typed Actor


如果我们有一个基于面向对象的应用程序,可以使用Typed Actor连接它与Akka system。

TypeActor允许定义多个方法,分别接受不同的消息。就像java中的接口。

注:Untyped Actor对消息进行回应,而Typed Actor 对方法进行回应。

什么是Typed Actor ?

Typed Actor包含两部分,接口和实现。Typed Actor的实现使用了主动对象模式(Active Object Design).

(注:主动对象模式:对象分为主动对象和被动对象,主动对象内部包含一个线程,可以自动完成动作或改变状态,而一般的被动对象只能通过被其他对象调用才有所作为。在多线程程序中,经常把一个线程封装到主动对象里面。

主动对象模式关键是隔离了方法执行和方法调用的过程,提高了并行性,对内部拥有控制线程的主动对象,降低了异步访问的复杂性。
)

如何解耦
为了解耦方法的调用和执行,主动模式使用了代理模式分离接口和实现。分别使用了不同的线程运行代理和实现。如下图所示。

4.1

让我们进一步看看主动模式如何实现分离方法的调用和执行

  1. 运行时,客户端调用代理对象执行方法;
  2. 代理对象反过来将方法调用转变为请求到一个Scheduler或者Invocation handler拦截调用;
  3. Scheduler或者Invocation handler将请求入对;
  4. 调度器持续监测队列,并监测可运行的方法。当同步约束满足时,调度器将请求出对列;
  5. Scheduler或者Invocation handler将请求转发到实现对象;
  6. 实现对象和Scheduler运行在相同的线程中,处理请求并将Future结果返回客户端。

在Akka中,代理模式是通过JDK proxy 实现的。

使用Typed Actor 尽量避免使用阻塞的方法,可以使用返回值为Unit或者Future的方法。

创建示例


默认构造函数

    CalculatorInt calculatorInt = TypedActor.get(system).typedActorOf(
            new TypedProps<CalculatorInt>(ICalculatorInt.class,CalculatorInt.class));

有参数的构造函数

    Squarer otherSquarer =
          TypedActor.get(system).typedActorOf(
            new TypedProps<SquarerImpl>(Squarer.class,
              new Creator<SquarerImpl>() {
                public SquarerImpl create() { return new SquarerImpl("foo"); }
              }),
            "name");

消息


强烈建议消息是不可变的。

发送消息

4.2

    // Invoke the method and wait for result
    Future<Integer> future = calculator.add(Integer.valueOf(14),
    Integer.valueOf(6));
    Integer result = Await.result(future, timeout.duration());

终止Actor


TypedActor.get(system).stop(calculator);

or

TypedActor.get(system).poisonPill(calculator);

生命周期


同unTyped Actor一样,有preStart,PostStop钩子方法,但是Typed Actor 是接口,要实现该接口,而不是重写。

接收任意消息


如果你的有类型actor的实现类扩展了akka.actor.TypedActor.Receiver,所有非方法调用MethodCall的消息会被传给onReceive方法. 这使你能够对DeathWatch的Terminated消息及其它类型的消息进行处理,例如,与无类型actor进行交互的场合。

代理


为了发送消息到Typed Actor ,你需要动态代理接口获得这个ActorRef.

注意

目标Actor引用需要能处理MethodCall消息.

    //Get access to the ActorRef
    ActorRef calActor = TypedActor.get(_system) .getActorRefFor(calculator);
    Chapter 4
    //pass a message
    calActor.tell("Hi there")s;

监控策略


当Typed Actor 处理子actor时,父Actor需要管理子actor的失败,因此需要定义监控策略。

通过实现类实现TypedActor.Supervisor方法,你可以定义用来监管子actor的策略。

Actor监控树


为了创建子actor,我们需要Typed Actor的context并执行axtorof方法。

ActorRef childActor = TypedActor.context().actorOf(
new Props(ChildActor.class), "childActor");

调度器和路由器


Dispatcher 作用是控制和协调消息的发送,它会运行在自己的线程上,分发来自各个actor mailbox的消息,将其分配到执行线程的堆上。Akka提供了多个dispatcher策略,根据硬件和应用负载,可以自己选择策略。

AKKA的Dispatcher是基于Java Executor Framework来实现的,这一框架属于java.util.concurrent的一部分,主要用于异步任务的执行。通过Dispatcher创建Actor,多个Actor映射到n个线程上。线程池的大小可以配置。

在Akka中,Rounter也是一种Actor类型,通常大量并行的Actor处理消息时,可以通过Rounter路由消息到目的Actor.

通常代表一组相同Actor,处理消息时,根据策略路由到不同的Actor,用于负载均衡场景。

例子


有一个基于Typed Actor实现了加,减,计数的功能。

参阅:CalculatorTypedActor

方法派发语义

方法返回:

  • Unit ::: 会以 fire-and-forget语义进行派发,与Untyped Actor,ActorRef.tell完全一致。

  • Future[_] ::: 会以 send-request-reply语义进行派发,与 ActorRef.ask完全一致,方法是非阻塞的。

  • Option[] ::: 会以send-request-reply语义派发,但是会阻塞等待应答, 如果在超时时限内没有应答则返回scala.None,否则返回包含结果的scala.Some[]。在这个调用中发生的异常将被重新抛出。

  • 任何其它类型的值将以send-request-reply语义进行派发,但会阻塞地等待应答,如果超时会抛出java.util.concurrent.TimeoutException,如果发生异常则将异常重新抛出。