路由器

路由器


上一节我们看到了可以通过正确地的调度策略增加消息处理的吞吐量。当有大量的Actor并行工作,这时需要一个路由器将消息从源Actor路由到目标Actor.

在Akka中,路由器也是一种Actor,路由消息到外部Actor.对路由器来说,外部Actor叫做routee。路由器使用不同的算法路由消息。

6.1

(注:为了避免单点问题,路由actor是一种特殊的类型-RounterActorRef。RounterActorRef不是利用存储转发的机制,相反,他直接路由消息到Rountee的邮箱,而不是到Router的邮箱,当routee答复路由消息时,回复将发送到原始发件人,而不是路由actor。)

默认,Akka rounter 提供如下内部机制:

  • Round robin rounter: 循环将消息转发到routee;
  • Random rounter: 随机选择routee;
  • Smallest mailbox rounter:将消息路由到邮箱中消息量最少的actor;
  • Broadcast router:广播相同的消息到所有的routee;
  • Scatter gather first completed router:广播所有,返回future,当获取到结果,立马返回。

路由Actor的两种模式


路由Actor有两种模式:

  • 池-- 路由器创建routee作为子actor,并在该子actor终止时将它从路由器中移除;
  • 群组--routee actor在路由器外部创建,而路由器将通过使用actor选择将消息发送到指定路径,而不监控其终止。

路由器使用


为了创建路由器,设置指定数量的rountee,我们需要以下信息-rounter类型和routee实例的数量。

ActorRef masterAct = system.actorOf(Props.create(MasterActor.class).withRouter(new RoundRobinPool(5)),
    "RounterActor");
在这,我们定义了一个Actor,通过实例化一个路由器实例,这里是RoundRobin,它的构造函数接受一个参数,表示创建rountees的数量.

(注:当我们定义一个rounter Actor,我们提供了名字-RounterActor。一个Actor只能有一个名字,routee是这个Actor的child,他负责监控child actor(rountees)的生命周期)

通过application.conf使用路由器


路由器通用可以通过配置application.conf使用,具体如下

MyRounterExample {
    akka.actor.deployment {
      /myRandomRouterActor {
        router = random
        nr-of-instances = 5
      }
    }
}

代码中使用如下:

ActorSystem _system = ActorSystem.create("RandomRouterExample", ConfigFactory.load().getConfig("MyRouterExample"));

ActorRef randomRouter = _system.actorOf(
new Props(MsgEchoActor.class).withRouter(new FromConfig()), "myRandomRouterActor") ;

分布式Actor中使用路由器


有些时候我们可能需要将消息路由到远程Actor,这种情况下,每个Actor都有一个不同的地址。为了处理这种情况,我们首先创建一个远程对象地址列表,包含远程节点系详情。

Address addr1 = new Address("akka", "remotesys", "host1", 1234);
Address addr2 = new Address("akka", "remotesys", "host2", 1234);
Address[] addresses = new Address[] { addr1, addr2 };
ActorRef routerRemote = system.actorOf(new Props(MyEchoActor.class) .withRouter(new RemoteRouterConfig(new RoundRobinRouter(5),
addresses)));

远程节点的地址同样可以配置在application.conf中,如下:

akka.actor.deployment {
    /myRandomRouterActor {
        router = round-robin
        nr-of-instances = 5 
        target {
            nodes = ["akka://[email protected]:2552", "akka: / /[email protected]:2552"]
        } 
    }
}

监管


路由器也是子actor的监管者。路由器的默认策略是总是上溯,所以错误会传递给路由器的监管者处理。

注意路由器的监管者会将错误当做路由器的错位,因此会重启路由器,并将导致他的孩子全部重启。

应该提到的是路由器重新启动行为已被重写,以便重新启动时,仍重新创建这些孩子,并会在池中保留相同数量的actor。

注意

如果路由器池的子actor终止,池路由器不会自动产生一个新的actor。在池路由器所有子actor都终止的事件中,路由器将终止本身,除非它是一个动态的路由器,例如使用了大小调整。

动态调整Rounters大小


为了处理处理大量消息,动态调整actor数量非常重要。Rounter提供了resize方法,允许运行是改变实例的数量。

int lowerBound = 2;
int upperBound = 15;
DefaultResizer resizer = new DefaultResizer(lowerBound, upperBound);
ActorRef randomRouter = _system.actorOf(new Props(MsgEchoActor.class). withRouter(new RandomRouter(resizer)));

同样可以在application.conf中配置:

akka.actor.deployment {
    /myRandomRouterActor {
        router = round-robin 
        nr-of-instances = 5 
        resizer {
            lower-bound = 2
            upper-bound = 15 
        }
    } 
}

自定义Rounter


如果默认的Rounter类型不能满足需求,Akka同样支持自定义的Rounter。Akka提供了RouterConfig接口,允许自己定制,同时还提供了ScatterGatherFirstCompletedLike接口,允许自己定义分散-收集-最先完成rounter模型。

现在我们自定义一个批量消息处理路由器--路由器切换到下一个actor之前,必须路由指定数量的消息到当前actor。 如果我们定义批量数字为5,那么消息处理如下图:

6.2

例子


参阅:Rounter.java