Actor 概要
什么是Actor?
actor是应用创建中最小的单元。actor是封装状态和行为的对象,他们唯一的通讯方式是交换消息——把消息存放在接收方的邮箱里。从某种意义上来说,actor是面向对象最严格的形式。
Actor分为UntypedActor和TypedActor。 其中UntypedActor为通用的Actor模型实现,要实现一个Actor只需继承UntypedActor类即可, 然后在实现的onReceive(Object msg)的方法中去写各种消息的处理逻辑。 TyepedActor作为中间层用来连接已有的业务服务层和Actor模型,这样已有的业务服务层几乎不用怎么修改。
一个Actor是一个容器,它包含了状态,行为,一个邮箱,子Actor和一个监管策略。所有这些封装在一个Actor引用里。
状态
Actor对象通常包含一些变量来反映其所处的可能状态。这可以是一个明确的状态机(例如使用 FSM 模块),或是一个计数器,一组监听器,待处理的请求,等等。
当actor失败并被其监管者重新启动时,状态会被重新创建,就象第一次创建这个actor一样。这是为了实现系统的“自愈合”。
行为
每当一个消息被处理,它会与actor的当前行为进行匹配。行为是一个函数,它定义了在某个时间点处理当前消息所要采取的动作。
邮箱
Actor的目的是处理消息,这些消息是从其它actor(或者从actor系统外部)发送过来的。连接发送者与接收者的纽带是actor的邮箱:每个actor有且仅有一个邮箱,所有的发来的消息都在邮箱里排队。排队按照发送操作的时间顺序来进行,这意味着由于actor分布在不同的线程中,所以从不同的actor发来的消息在运行时没有一个固定的顺序。从另一个角度讲,从同一个actor发送到相同目标actor的多个消息,会按发送的顺序排队。
AKKA提供了四种邮箱,默认的是无阻塞,无界的,``注意限流```,否则会导致内存溢出;或者使用有界邮箱;
ActorSystem system = ActorSystem.create("MyActorSystem");
ActorRef pingActorWithMailbox = system.actorOf(PingActor.props().withMailbox("akka.actor.boundedmailbox"), "pingActor");
akka.actor.boundedmailbox{
mailbox-type = "akka.dispatch.BoundedMailbox"
mailbox-capacity = 1000000
}
也可以自定义邮箱, 或者参考:
http://letitcrash.com/post/28901663062/throttling-messages-in-akka-2
https://github.com/yngui/akka-disruptor
子Actor
每个actor都是一个潜在的监管者:如果它创建了子actor来委托处理子任务,它会自动地监管它们。子actor列表维护在actor的上下文中,actor可以访问它。对列表的更改是通过创建(context.actorOf(...)
)或者停止(context.stop(child)
)子actor来完成,并且这些更改会立刻生效。实际的创建和停止操作是在幕后以异步方式完成的,这样它们就不会“阻塞”其监管者。
监管策略
唯一需要了解的前提是每个actor有且仅有一个监管者,就是创建它的那个actor。
具体参考监管与监控
Actor引用
一个actor对象需要与外界隔离开才能从actor模型中获益。因此actor是以actor引用的形式展现给外界的,actor引用作为对象,可以被无限制地自由传递。内部和外部对象的这种划分使得所有想要的操作都能够透明:重启actor而不需要更新别处的引用,将实际actor对象放置到远程主机上,向另外一个应用程序发送消息。但最重要的方面是从外界不可能到actor对象的内部获取其状态,除非这个actor非常不明智地将信息公布出去。
- 获取引用的两种方式?
- Actor引用的几种类型?
- Actor引用与路径的区别?
Actor生命周期
Hook 方法
- preStart()和postStop可用用来初始化和消息处理后清理资源
- preRestart()和postRestop()可用用于异常重启时的一些预操作
终止Actor
如何正确的停止Actor呢。这里采用的思想也类似并发设计模式two phase termination,也就是分为两步,先是Stop状态,这时候已经不处理新来的消息了,但是要处理完之前位处理完的消息,等所有消息处理完了就进入了Terminate状态然后做后续的终止处理。
当Actor收到了Stop信号,就会进行下面的步奏:
Actor停止处理邮箱的消息。
Actor发送Stop信号给它所有的child actor。
Actor等待来自所有child actor的termination消息。
Actor开始自己本身的终止步骤:调用postStop方法;Dumping关联的邮箱;发布terminated消息给DeathWatch;通知自己的Supervisor自身的终止情况。
具体调用的api接口如下。当调用ActorSystem的shutdown方法时候,它会连带关闭所有的这个系统关联的actor;可以通过给指定的actor发送PoisonPill消息来终止它;可以通过调用context.stop方法来终止指定的actor。
问题:
- Stop PosionPill 终止Actor的区别?
如何创建actor
Akka采用强制性的父子监管,每一个actor都被监管着,并且(会)监管它的子actor们
定义Actor类 定义actor,需要继承UntypedActorActor,并实现onReceive 方法
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class MyUntypedActor extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public void onReceive(Object message) throws Exception {
if (message instanceof String)
log.info("Received String message: {}", message);
else
unhandled(message);
}
}
Props Props是一个用来在创建actor时指定选项的配置类.
Props props1 = new Props();
Props props2 = new Props(MyUntypedActor.class);
Props props3 = new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new MyUntypedActor();
}
});
Props props4 = props1.withCreator(new UntypedActorFactory() {
public UntypedActor create() {
return new MyUntypedActor();
}
});
使用Props创建Actor Actor可以通过将 Props 实例传入 actorOf 工厂方法来创建。
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor");
创建无参Actor actorof将返回ActorRef的实例。
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class), "myactor");
除了通过system创建actor,还可以通过其他actors的context来创建actor。区别就是监控数如何组织,通过system创建是顶级actor,由系统监管,而context创建的actor由父actor监管。
public class FatherUntypedActor extends UntypedActor {
ActorRef childActor = getContext().actorOf(new Props(ChileActor.class), "childActor");
name参数是可选的,建议起一个合适的名字,方便日志中标示,名字不可以为空,而且必须唯一。
Actor在创建后,自动的异步启动,并且自动执行preStart
方法,这是一个非常好的用来添加actor初始化代码的位置。
@Override
public void preStart() {
... // initialization code
}
创建有参Actor
如果你的UntypedActor需要参数,不能通过actorOf(new Props(clazz))创建,那么你可以通过new Props(new UntypedActorFactory() {..})来创建有参数的actor.
// 允许传参数给 MyActor 构造方法
ActorRef master = system.actorOf(Props.create(new UntypedActorFactory()
{
public UntypedActor create()
{
return new Master(nrOfWorkers, nrOfMessages, nrOfElements, listener);
}
}), "master");
UntypedActor API
UntypedActor只有一个抽象方法onReceive,如果当前没有匹配到任何消息,则会调用unhandled方法,它的缺省实现是向actor系统的事件流中发布一条 akka.actor.UnhandledMessage(message, sender, recipient)。 另外,它还包括:
self 代表本actor的 ActorRef
sender 代表最近收到的消息的发送actor,通常用于下面将讲到的 回应消息中
supervisorStrategy 用户可重写它来定义对子actor的监管策略
context 暴露actor和当前消息的上下文信息,如:
用于创建子actor的工厂方法 (actorOf)
actor所属的系统
父监管者
所监管的子actor
生命周期监控
hotswap行为栈
其余的可见方法是可以被用户重写的生命周期hook,描述如下:
public void preStart() {
}
public void preRestart(Throwable reason, Option<Object> message) {
for (ActorRef each : getContext().getChildren())
getContext().stop(each);
postStop();
}
public void postRestart(Throwable reason) {
preStart();
}
public void postStop() {
}
以上是UntypedActor的缺省实现
消息
可以通过如下方法向actor发送消息:
tell 意思是“fire-and-forget”, e.g. 异步发送一个消息并立即返回。
ask 异步发送一条消息并返回一个 Future代表一个可能的回应.
首先Actor之间的通信方式是通过中间介质:不可变消息。发送消息有两种方式,其一是fire and forget也就是只管发送没有返回值,实现方法是ActorRef的tell方法;
其二是ask and return也就是发送并且等待返回值,实现方法是ActorRef的ask方法,这个方法会返回一个Future欠条对象,
然后可以调用Await的result(future, timeout)方法去做超时等待返回结果。这其中tell是建议使用的通信方式,这样更符合高并发的异步模型,容易获得更高的吞吐量和更低的延时。
每一个消息发送者分别保证自己的消息的次序.
Tell: Fire-forget 这是发送消息的推荐方式。 不会阻塞地等待消息。它拥有最好的并发性和可扩展性。
actor.tell("Hello");
如果是在一个Actor中调用 ,那么发送方的actor引用会被隐式地作为消息的getSender: ActorRef成员一起发送.目的actor可以使用它来向原actor发送回应, 使用
getSender().tell(replyMsg)
actor.tell("Hello", getSelf());
如果 不 是从Actor实例发送的, sender成员缺省为 deadLetters actor 引用。
Ask:Send-And-Receive-Future ....
回应消息
通常有两种方法来从一个Actor获取回应:
- 第一种是发送一个消息,这种方法只在发送者是一个Actor时有效),
public void onReceive(Object request) {
String result = process(request);
getSender().tell(result);
}
- 第二种是通过一个Future。
Timeout timeout = new Timeout(Duration.create(5, "seconds"));
Future<Object> future = Patterns.ask(actor, msg, timeout);
String result = (String) Await.result(future, timeout.duration());
Scala 的Future有一些 monadic 方法,与Scala集合所使用的方法非常相似. 这使你可以构造出可以传递结果的 ‘管道’ 或 ‘数据流’ 。
参考:FutureTest.java FutureActor.java
第一条消息接收超时
在接收消息时,如果在一段时间内没有收到第一条消息,可以使用超时机制。 要检测这种超时你必须设置 receiveTimeout 属性并声明一个处理 ReceiveTimeout 对象的匹配分支。
public class MyReceivedTimeoutUntypedActor extends UntypedActor {
public MyReceivedTimeoutUntypedActor() {
getContext().setReceiveTimeout(Duration.parse("30 seconds"));
}
public void onReceive(Object message) {
if (message.equals("Hello")) {
getSender().tell("Hello world");
} else if (message == Actors.receiveTimeout()) {
throw new RuntimeException("received timeout");
} else {
unhandled(message);
}
}
}
消息发送规则
- 至多一次投递,即不保证投递
- 对每个 “发送者-接收者” 对,有消息排序
Actor 与 异常
在消息被actor处理的过程中可能会抛出异常,例如数据库异常。
消息会怎样
如果消息处理过程中(即从邮箱中取出并交给receive后)发生了异常,这个消息将被丢失。必须明白它不会被放回到邮箱中。所以如果你希望重试对消息的处理,你需要自己抓住异常然后在异常处理流程中重试. 请确保你限制重试的次数,因为你不会希望系统产生活锁 (从而消耗大量CPU而于事无补)。
邮箱会怎样
如果消息处理过程中发生异常,邮箱没有任何变化。如果actor被重启,邮箱会被保留。邮箱中的所有消息不会丢失。
actor会怎样
如果抛出了异常,actor实例将被丢弃而生成一个新的实例。这个新的实例会被该actor的引用所引用(所以这个过程对开发人员来说是不可见的)。注意这意味着如果你不在preRestart 回调中进行保存,并在postRestart回调中恢复,那么失败的actor实例的当前状态会被丢失。