调度器 Dispatcher


在akka的世界中,调度器协调着Actor之间消息通信,其自己运行在自己的线程中。这样能保证资源最优,消息能尽快的得到处理。

调度器的作用就是把消息分发到指定的线程进行消息的处理。

在Akka中,调度器,actor,信箱,线程的关系如下:

5.3

调度器运行在线程上,它协调actor和message,基于关联的邮箱和分配在堆上的线程。

Dispatcher的两个重要的实现如下:

  • ThreadPoolExecutor: 它执行提交的任务,是通过已经预先定义和配置好的线程池。

  • FockJoinPool: 它使用相同的线程池模型,但却使用了work-stealing模式,即池中的线程会去执行其他线程创建的任务,或者任务被分配到其他正在等待执行的线程。

Fork join

是基于细粒度,并行,分而治之的并行模型。其思想是将大的数据块分解成较小的块,利用底层多核优势,并行的处理任务。 Fork/Join的逻辑非常简单:(1)把大的任务快分为多个小任务,成为Fork;(2)在独自的线程里处理每个任务,如果需要的话,可以把这些任务细分为更小的任务;(3)合并(Join)结果。 Fork/Join框架介绍。参考示例:ForkjoinPoolTest.java

AKKA提供了四种Dispatcher,我们可以根据项目的情况选择不同的Dispatcher。

  • Dispatcher默认: 对于这种Dispatcher,每个Actor都有自己的Mailbox;它可以被多个Actor所共享。Dispatcher可以由线程池或fork join pool支持。而且,它针对非阻塞代码进行了优化。

5.4

  • Pinned Dispatcher: 每个Actor都有单独的线程,即不共享Dispatcher.这种Dispatcher比较适合处理对外部资源操作(如IO操作)或耗时比较长的Actor,并针对阻塞操作进行了优化。

5.5

  • Balancing dispatcher:它是基于事件的Dispatcher,它可以合理地协调Actor资源,若某个Actor上的任务较为繁忙,就可以将它的工作分发给闲置的Actor。但是,能够进行任务重新分配的前提是所有Actor都属于相同类型。对于这种Dispatcher,所有Actor只有唯一一个mailbox。它可以被相同类型的Actor所共享。

5.6

  • Calling thread dispatcher:这个Dispatcher主要用于测试,它会将任务执行在当前线程上,而不会创建任何一个新的线程,也不提供确定的执行顺序。如果调用没有及时执行,则任务会入队到thead-local queue,等待前面调用结束再执行。对于这个Dispatcher,每个Actor都有自己的mailbox,它可以被多个Actor共享,为Calling Thread支持。

Dispatcher配置

Dispatcher可以配置在application.conf文件中,例如配置为采用线程池Executor的默认Dispatcher:

# Configuration for the thread pool thread-pool-executor 
{
    # minimum number of threads 
    core-pool-size-min = 2
    # available processors * factor 
    core-pool-size-factor = 2.0
    # maximum number of threads 
    core-pool-size-max = 10
}

Fork join executor

# Configuration for the fork join pool fork-join-executor 
{
    # Min number of threads 
    parallelism-min = 2
    # available processors * factor 
    parallelism-factor = 2.0
    # Max number of threads 
    parallelism-max = 10
}

为了配置Dispatcher,除了上述外,还有如下参数:

参数名 描述 可能值
type 正在使用的事件类型的Dispatcher的标识符名称 Dispatcher or PinnedDispatcher or BalancingDispatcher
executor executor的类型 fork-join-executor or thread-pool-executor
fork-join-executor 使用上述forjoin的参数
thread-pool-executor 使用上述thread pool的参数
throughput(吞吐量) 定义了线程切换到下一个actor之前处理的消息数上限 设置成1表示尽可能公平
mailbox-capacity(可选) 指定基于队列的邮箱的容量 0 或者false 表示无界队列(默认)
mailbox-type(可选) 指定邮箱类型 Bounded or unbounded mailbox

一个完整的示例如下:

my-dispatcher {
    type = Dispatcher
    executor = "fork-join-executor"
    fork-join-executor { 
        parallelism-min = 2 
        parallelism-factor = 2.0 
        parallelism-max = 10
    }
    throughput = 100
    mailbox-capacity = -1 
    mailbox-type =""
}

基于PinnedDispatcher
my-dispatcher {
type = PinnedDispatcher
executor = "thread-pool-executor"
thread-pool-executor { 
    core-pool-size-min = 2 
    core-pool-size-factor = 2.0 
    core-pool-size-max = 10
}
throughput = 100
mailbox-capacity = -1 mailbox-type =""
}

一旦dispatcher配置好后(application.conf),你就可以更具策略创建Actor,你可以为不同的Actor定义不同的策略。

ActorRef ref = this.getContext().actorOf(Props.create(WriteActor.class).withDispatcher("writer-dispatcher"),"Writer-"+i);

例子


参阅:DispatcherApplication