调度器
Dispatcher
Dispatcher 是Akka的核心功能。相当于Akka 应用的引擎,对于构建并发可拓展应用相当重要。
Dispatcher模式
所谓dispatcher模式就是一个集中然后分发的模式,类似邮件的派发。 Dispatcher模式提供了如下功能:
- 集中控制:Dispatcher为各种消息和请求提供了中控管理能力,从而可以重用代码减少重复。
- 应用隔离:业务层和显示层隔离
- 减少内部依赖:更少的竞争资源。
Dispatcher 底层是使用java的Executor框架。Executor是一个异步执行框架。基于生产者-消费者模式,任务的提交(生产者)与任务的执行(消费者)是分离的,是不同的线程。
Executor的两个重要的实现如下:
ThreadPoolExecutor: 它执行提交的任务,是通过已经预先定义和配置好的线程池。
FockJoinPool: 它使用相同的线程池模型,但却使用了work-stealing模式,即池中的线程会去执行其他线程创建的任务,或者任务被分配到其他正在等待执行的线程。
Fork join
是基于细粒度,并行,分而治之的并行模型。其思想是将大的数据块分解成较小的块,利用底层多核优势,并行的处理任务。
使用Executor框架时,允许定义任务的上限,可以指定如下内容:
- 允许多少线程?(thread pool size)
- 消息处理前,任务如何入队?
- 允许的最大并发数?
- 系统过载,或者任务被拒绝策略?
- 执行任务的顺序?(FIFO,LIFO)
AKKA 调度器
在akka的世界中,调度器协调着Actor之间消息通信,其自己运行在自己的线程中。这样能保证资源最优,消息能尽快的得到处理。
如果我们把现实世界中的飞机场比作akka系统,映射关系如下。
在Akka中,调度器,actor,信箱,线程的关系如下:
调度器运行在线程上,它协调actor和message,基于关联的邮箱和分配在堆上的线程。
AKKA提供了四种Dispatcher,我们可以根据项目的情况选择不同的Dispatcher。
- Dispatcher默认: 对于这种Dispatcher,每个Actor都有自己的Mailbox;它可以被多个Actor所共享。Dispatcher可以由线程池或fork join pool支持。而且,它针对非阻塞代码进行了优化。
- Pinned Dispatcher: 每个Actor都有单独的线程,即不共享Dispatcher.这种Dispatcher比较适合处理对外部资源操作(如IO操作)或耗时比较长的Actor,并针对阻塞操作进行了优化。
- Balancing dispatcher:它是基于事件的Dispatcher,它可以合理地协调Actor资源,若某个Actor上的任务较为繁忙,就可以将它的工作分发给闲置的Actor。但是,能够进行任务重新分配的前提是所有Actor都属于相同类型。对于这种Dispatcher,所有Actor只有唯一一个mailbox。它可以被相同类型的Actor所共享。
- Calling thread dispatcher:这个Dispatcher主要用于测试,它会将任务执行在当前线程上,而不会创建任何一个新的线程,也不提供确定的执行顺序。如果调用没有及时执行,则任务会入队到thead-local queue,等待前面调用结束再执行。对于这个Dispatcher,每个Actor都有自己的mailbox,它可以被多个Actor共享,为Calling Thread支持。
同样,有四种默认的邮箱:
- Unbounded mailbox:默认邮箱,ConcurrentLinkedQueue,无阻塞,无界;
- Bounded mailbox:LinkedBlockingQueue,阻塞,有界;
- Unbounded priority mailbox:PriorityBlockingQueue阻塞,无界,基于优先级的队列;
- Bounded priority mailbox:BoundedBlockingQueue,阻塞,有界的基于优先级的队列。�
Dispatcher配置
Dispatcher可以配置在application.conf文件中,例如配置为采用线程池Executor的默认Dispatcher:
# Configuration for the thread pool thread-pool-executor
{
# minimum number of threads
core-pool-size-min = 2
# available processors * factor
core-pool-size-factor = 2.0
# maximum number of threads
core-pool-size-max = 10
}
Fork join executor
# Configuration for the fork join pool fork-join-executor
{
# Min number of threads
parallelism-min = 2
# available processors * factor
parallelism-factor = 2.0
# Max number of threads
parallelism-max = 10
}
为了配置Dispatcher,除了上述外,还有如下参数:
参数名 | 描述 | 可能值 |
---|---|---|
type | 正在使用的事件类型的Dispatcher的标识符名称 | Dispatcher or PinnedDispatcher or BalancingDispatcher |
executor | executor的类型 | fork-join-executor or thread-pool-executor |
fork-join-executor | 使用上述forjoin的参数 | |
thread-pool-executor | 使用上述thread pool的参数 | |
throughput(吞吐量) | 定义了线程切换到下一个actor之前处理的消息数上限 | 设置成1表示尽可能公平 |
mailbox-capacity(可选) | 指定基于队列的邮箱的容量 | 0 或者false 表示无界队列(默认) |
mailbox-type(可选) | 指定邮箱类型 | Bounded or unbounded mailbox |
一个完整的示例如下:
my-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 10
}
throughput = 100
mailbox-capacity = -1
mailbox-type =""
}
基于PinnedDispatcher
my-dispatcher {
type = PinnedDispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 2
core-pool-size-factor = 2.0
core-pool-size-max = 10
}
throughput = 100
mailbox-capacity = -1 mailbox-type =""
}
一旦dispatcher配置好后(application.conf),你就可以更具策略创建Actor,你可以为不同的Actor定义不同的策略。
ActorRef ref = this.getContext().actorOf(Props.create(WriteActor.class).withDispatcher("writer-dispatcher"),"Writer-"+i);
例子
参阅:DispatcherApplication