Actor 概要

什么是Actor?


actor是应用创建中最小的单元。actor是封装状态和行为的对象,他们唯一的通讯方式是交换消息——把消息存放在接收方的邮箱里。从某种意义上来说,actor是面向对象最严格的形式。

Actor分为UntypedActor和TypedActor。 其中UntypedActor为通用的Actor模型实现,要实现一个Actor只需继承UntypedActor类即可, 然后在实现的onReceive(Object msg)的方法中去写各种消息的处理逻辑。 TyepedActor作为中间层用来连接已有的业务服务层和Actor模型,这样已有的业务服务层几乎不用怎么修改。

一个Actor是一个容器,它包含了状态行为,一个邮箱子Actor和一个监管策略。所有这些封装在一个Actor引用里。

状态

Actor对象通常包含一些变量来反映其所处的可能状态。这可以是一个明确的状态机(例如使用 FSM 模块),或是一个计数器,一组监听器,待处理的请求,等等。

当actor失败并被其监管者重新启动时,状态会被重新创建,就象第一次创建这个actor一样。这是为了实现系统的“自愈合”。

行为

每当一个消息被处理,它会与actor的当前行为进行匹配。行为是一个函数,它定义了在某个时间点处理当前消息所要采取的动作。

图示1

邮箱

Actor的目的是处理消息,这些消息是从其它actor(或者从actor系统外部)发送过来的。连接发送者与接收者的纽带是actor的邮箱:每个actor有且仅有一个邮箱,所有的发来的消息都在邮箱里排队。排队按照发送操作的时间顺序来进行,这意味着由于actor分布在不同的线程中,所以从不同的actor发来的消息在运行时没有一个固定的顺序。从另一个角度讲,从同一个actor发送到相同目标actor的多个消息,会按发送的顺序排队。

图示2

AKKA提供了四种邮箱,默认的是无阻塞,无界的,``注意限流```,否则会导致内存溢出;或者使用有界邮箱;

    ActorSystem system = ActorSystem.create("MyActorSystem");
    ActorRef pingActorWithMailbox = system.actorOf(PingActor.props().withMailbox("akka.actor.boundedmailbox"), "pingActor");

    akka.actor.boundedmailbox{
      mailbox-type = "akka.dispatch.BoundedMailbox"
      mailbox-capacity = 1000000
    }

图示3

也可以自定义邮箱, 或者参考:

http://letitcrash.com/post/28901663062/throttling-messages-in-akka-2

https://github.com/yngui/akka-disruptor

子Actor

每个actor都是一个潜在的监管者:如果它创建了子actor来委托处理子任务,它会自动地监管它们。子actor列表维护在actor的上下文中,actor可以访问它。对列表的更改是通过创建(context.actorOf(...))或者停止(context.stop(child))子actor来完成,并且这些更改会立刻生效。实际的创建和停止操作是在幕后以异步方式完成的,这样它们就不会“阻塞”其监管者。

监管策略

唯一需要了解的前提是每个actor有且仅有一个监管者,就是创建它的那个actor。

具体参考监管与监控

Actor引用

一个actor对象需要与外界隔离开才能从actor模型中获益。因此actor是以actor引用的形式展现给外界的,actor引用作为对象,可以被无限制地自由传递。内部和外部对象的这种划分使得所有想要的操作都能够透明:重启actor而不需要更新别处的引用,将实际actor对象放置到远程主机上,向另外一个应用程序发送消息。但最重要的方面是从外界不可能到actor对象的内部获取其状态,除非这个actor非常不明智地将信息公布出去。

  • 获取引用的两种方式?
  • Actor引用的几种类型?
  • Actor引用与路径的区别?

Actor生命周期

图示3

Hook 方法

  • preStart()和postStop可用用来初始化和消息处理后清理资源
  • preRestart()和postRestop()可用用于异常重启时的一些预操作

终止Actor

    如何正确的停止Actor呢。这里采用的思想也类似并发设计模式two phase termination,也就是分为两步,先是Stop状态,这时候已经不处理新来的消息了,但是要处理完之前位处理完的消息,等所有消息处理完了就进入了Terminate状态然后做后续的终止处理。
    当Actor收到了Stop信号,就会进行下面的步奏:

    Actor停止处理邮箱的消息。
    Actor发送Stop信号给它所有的child actor。
    Actor等待来自所有child actor的termination消息。
    Actor开始自己本身的终止步骤:调用postStop方法;Dumping关联的邮箱;发布terminated消息给DeathWatch;通知自己的Supervisor自身的终止情况。
    具体调用的api接口如下。当调用ActorSystem的shutdown方法时候,它会连带关闭所有的这个系统关联的actor;可以通过给指定的actor发送PoisonPill消息来终止它;可以通过调用context.stop方法来终止指定的actor。

问题:

  • Stop PosionPill 终止Actor的区别?

如何创建actor


Akka采用强制性的父子监管,每一个actor都被监管着,并且(会)监管它的子actor们

定义Actor类 定义actor,需要继承UntypedActorActor,并实现onReceive 方法

import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class MyUntypedActor extends UntypedActor {
  LoggingAdapter log = Logging.getLogger(getContext().system(), this);

  public void onReceive(Object message) throws Exception {
    if (message instanceof String)
      log.info("Received String message: {}", message);
    else
      unhandled(message);
  }
}

Props Props是一个用来在创建actor时指定选项的配置类.

Props props1 = new Props();
Props props2 = new Props(MyUntypedActor.class);
Props props3 = new Props(new UntypedActorFactory() {
  public UntypedActor create() {
    return new MyUntypedActor();
  }
});
Props props4 = props1.withCreator(new UntypedActorFactory() {
  public UntypedActor create() {
    return new MyUntypedActor();
  }
});

使用Props创建Actor Actor可以通过将 Props 实例传入 actorOf 工厂方法来创建。

ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor");

创建无参Actor actorof将返回ActorRef的实例。

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class), "myactor");

除了通过system创建actor,还可以通过其他actors的context来创建actor。区别就是监控数如何组织,通过system创建是顶级actor,由系统监管,而context创建的actor由父actor监管。

public class FatherUntypedActor extends UntypedActor {
  ActorRef childActor = getContext().actorOf(new Props(ChileActor.class), "childActor");

name参数是可选的,建议起一个合适的名字,方便日志中标示,名字不可以为空,而且必须唯一。 Actor在创建后,自动的异步启动,并且自动执行preStart方法,这是一个非常好的用来添加actor初始化代码的位置。

@Override
public void preStart() {
  ... // initialization code
}

创建有参Actor

如果你的UntypedActor需要参数,不能通过actorOf(new Props(clazz))创建,那么你可以通过new Props(new UntypedActorFactory() {..})来创建有参数的actor.

// 允许传参数给 MyActor 构造方法
ActorRef master = system.actorOf(Props.create(new UntypedActorFactory()
        {
            public UntypedActor create()
            {
                return new Master(nrOfWorkers, nrOfMessages, nrOfElements, listener);
            }
        }), "master");
UntypedActor API

UntypedActor只有一个抽象方法onReceive,如果当前没有匹配到任何消息,则会调用unhandled方法,它的缺省实现是向actor系统的事件流中发布一条 akka.actor.UnhandledMessage(message, sender, recipient)。 另外,它还包括:

self 代表本actor的 ActorRef
sender 代表最近收到的消息的发送actor,通常用于下面将讲到的 回应消息中
supervisorStrategy 用户可重写它来定义对子actor的监管策略
context 暴露actor和当前消息的上下文信息,如:
    用于创建子actor的工厂方法 (actorOf)
    actor所属的系统
    父监管者
    所监管的子actor
    生命周期监控
    hotswap行为栈

其余的可见方法是可以被用户重写的生命周期hook,描述如下:

public void preStart() {
}

public void preRestart(Throwable reason, Option<Object> message) {
  for (ActorRef each : getContext().getChildren())
    getContext().stop(each);
  postStop();
}

public void postRestart(Throwable reason) {
  preStart();
}

public void postStop() {
}

以上是UntypedActor的缺省实现

消息

可以通过如下方法向actor发送消息:

tell 意思是“fire-and-forget”, e.g. 异步发送一个消息并立即返回。
ask 异步发送一条消息并返回一个 Future代表一个可能的回应.

首先Actor之间的通信方式是通过中间介质:不可变消息。发送消息有两种方式,其一是fire and forget也就是只管发送没有返回值,实现方法是ActorRef的tell方法;
其二是ask and return也就是发送并且等待返回值,实现方法是ActorRef的ask方法,这个方法会返回一个Future欠条对象,
然后可以调用Await的result(future, timeout)方法去做超时等待返回结果。这其中tell是建议使用的通信方式,这样更符合高并发的异步模型,容易获得更高的吞吐量和更低的延时。

每一个消息发送者分别保证自己的消息的次序.

Tell: Fire-forget 这是发送消息的推荐方式。 不会阻塞地等待消息。它拥有最好的并发性和可扩展性。

actor.tell("Hello");

如果是在一个Actor中调用 ,那么发送方的actor引用会被隐式地作为消息的getSender: ActorRef成员一起发送.目的actor可以使用它来向原actor发送回应, 使用 getSender().tell(replyMsg)

actor.tell("Hello", getSelf());

如果 不 是从Actor实例发送的, sender成员缺省为 deadLetters actor 引用。

Ask:Send-And-Receive-Future ....

回应消息

通常有两种方法来从一个Actor获取回应:

  • 第一种是发送一个消息,这种方法只在发送者是一个Actor时有效),
public void onReceive(Object request) {
  String result = process(request);
  getSender().tell(result);        
}
  • 第二种是通过一个Future。
    Timeout timeout = new Timeout(Duration.create(5, "seconds"));
    Future<Object> future = Patterns.ask(actor, msg, timeout);
    String result = (String) Await.result(future, timeout.duration());

Scala 的Future有一些 monadic 方法,与Scala集合所使用的方法非常相似. 这使你可以构造出可以传递结果的 ‘管道’ 或 ‘数据流’ 。

参考:FutureTest.java FutureActor.java

第一条消息接收超时

在接收消息时,如果在一段时间内没有收到第一条消息,可以使用超时机制。 要检测这种超时你必须设置 receiveTimeout 属性并声明一个处理 ReceiveTimeout 对象的匹配分支。

public class MyReceivedTimeoutUntypedActor extends UntypedActor {

  public MyReceivedTimeoutUntypedActor() {
    getContext().setReceiveTimeout(Duration.parse("30 seconds"));
  }

  public void onReceive(Object message) {
    if (message.equals("Hello")) {
      getSender().tell("Hello world");
    } else if (message == Actors.receiveTimeout()) {
      throw new RuntimeException("received timeout");
    } else {
      unhandled(message);
    }
  }
}

消息发送规则

  • 至多一次投递,即不保证投递
  • 对每个 “发送者-接收者” 对,有消息排序

消息发送语义

Actor 与 异常

在消息被actor处理的过程中可能会抛出异常,例如数据库异常。

消息会怎样

如果消息处理过程中(即从邮箱中取出并交给receive后)发生了异常,这个消息将被丢失。必须明白它不会被放回到邮箱中。所以如果你希望重试对消息的处理,你需要自己抓住异常然后在异常处理流程中重试. 请确保你限制重试的次数,因为你不会希望系统产生活锁 (从而消耗大量CPU而于事无补)。

邮箱会怎样

如果消息处理过程中发生异常,邮箱没有任何变化。如果actor被重启,邮箱会被保留。邮箱中的所有消息不会丢失。

actor会怎样

如果抛出了异常,actor实例将被丢弃而生成一个新的实例。这个新的实例会被该actor的引用所引用(所以这个过程对开发人员来说是不可见的)。注意这意味着如果你不在preRestart 回调中进行保存,并在postRestart回调中恢复,那么失败的actor实例的当前状态会被丢失。