调度器

Dispatcher


Dispatcher 是Akka的核心功能。相当于Akka 应用的引擎,对于构建并发可拓展应用相当重要。

Dispatcher模式

所谓dispatcher模式就是一个集中然后分发的模式,类似邮件的派发。 Dispatcher模式提供了如下功能:

  • 集中控制:Dispatcher为各种消息和请求提供了中控管理能力,从而可以重用代码减少重复。
  • 应用隔离:业务层和显示层隔离
  • 减少内部依赖:更少的竞争资源。

Dispatcher 底层是使用java的Executor框架。Executor是一个异步执行框架。基于生产者-消费者模式,任务的提交(生产者)与任务的执行(消费者)是分离的,是不同的线程。

Executor的两个重要的实现如下:

  • ThreadPoolExecutor: 它执行提交的任务,是通过已经预先定义和配置好的线程池。

  • FockJoinPool: 它使用相同的线程池模型,但却使用了work-stealing模式,即池中的线程会去执行其他线程创建的任务,或者任务被分配到其他正在等待执行的线程。

Fork join

是基于细粒度,并行,分而治之的并行模型。其思想是将大的数据块分解成较小的块,利用底层多核优势,并行的处理任务。 5.1

使用Executor框架时,允许定义任务的上限,可以指定如下内容:

  1. 允许多少线程?(thread pool size)
  2. 消息处理前,任务如何入队?
  3. 允许的最大并发数?
  4. 系统过载,或者任务被拒绝策略?
  5. 执行任务的顺序?(FIFO,LIFO)

AKKA 调度器


在akka的世界中,调度器协调着Actor之间消息通信,其自己运行在自己的线程中。这样能保证资源最优,消息能尽快的得到处理。

如果我们把现实世界中的飞机场比作akka系统,映射关系如下。 5.2

在Akka中,调度器,actor,信箱,线程的关系如下:

5.3

调度器运行在线程上,它协调actor和message,基于关联的邮箱和分配在堆上的线程。

AKKA提供了四种Dispatcher,我们可以根据项目的情况选择不同的Dispatcher。

  • Dispatcher默认: 对于这种Dispatcher,每个Actor都有自己的Mailbox;它可以被多个Actor所共享。Dispatcher可以由线程池或fork join pool支持。而且,它针对非阻塞代码进行了优化。

5.4

  • Pinned Dispatcher: 每个Actor都有单独的线程,即不共享Dispatcher.这种Dispatcher比较适合处理对外部资源操作(如IO操作)或耗时比较长的Actor,并针对阻塞操作进行了优化。

5.5

  • Balancing dispatcher:它是基于事件的Dispatcher,它可以合理地协调Actor资源,若某个Actor上的任务较为繁忙,就可以将它的工作分发给闲置的Actor。但是,能够进行任务重新分配的前提是所有Actor都属于相同类型。对于这种Dispatcher,所有Actor只有唯一一个mailbox。它可以被相同类型的Actor所共享。

5.6

  • Calling thread dispatcher:这个Dispatcher主要用于测试,它会将任务执行在当前线程上,而不会创建任何一个新的线程,也不提供确定的执行顺序。如果调用没有及时执行,则任务会入队到thead-local queue,等待前面调用结束再执行。对于这个Dispatcher,每个Actor都有自己的mailbox,它可以被多个Actor共享,为Calling Thread支持。

同样,有四种默认的邮箱:

  • Unbounded mailbox:默认邮箱,ConcurrentLinkedQueue,无阻塞,无界;
  • Bounded mailbox:LinkedBlockingQueue,阻塞,有界;
  • Unbounded priority mailbox:PriorityBlockingQueue阻塞,无界,基于优先级的队列;
  • Bounded priority mailbox:BoundedBlockingQueue,阻塞,有界的基于优先级的队列。�

5.7

Dispatcher配置

Dispatcher可以配置在application.conf文件中,例如配置为采用线程池Executor的默认Dispatcher:

# Configuration for the thread pool thread-pool-executor 
{
    # minimum number of threads 
    core-pool-size-min = 2
    # available processors * factor 
    core-pool-size-factor = 2.0
    # maximum number of threads 
    core-pool-size-max = 10
}

Fork join executor

# Configuration for the fork join pool fork-join-executor 
{
    # Min number of threads 
    parallelism-min = 2
    # available processors * factor 
    parallelism-factor = 2.0
    # Max number of threads 
    parallelism-max = 10
}

为了配置Dispatcher,除了上述外,还有如下参数:

参数名 描述 可能值
type 正在使用的事件类型的Dispatcher的标识符名称 Dispatcher or PinnedDispatcher or BalancingDispatcher
executor executor的类型 fork-join-executor or thread-pool-executor
fork-join-executor 使用上述forjoin的参数
thread-pool-executor 使用上述thread pool的参数
throughput(吞吐量) 定义了线程切换到下一个actor之前处理的消息数上限 设置成1表示尽可能公平
mailbox-capacity(可选) 指定基于队列的邮箱的容量 0 或者false 表示无界队列(默认)
mailbox-type(可选) 指定邮箱类型 Bounded or unbounded mailbox

一个完整的示例如下:

my-dispatcher {
    type = Dispatcher
    executor = "fork-join-executor"
    fork-join-executor { 
        parallelism-min = 2 
        parallelism-factor = 2.0 
        parallelism-max = 10
    }
    throughput = 100
    mailbox-capacity = -1 
    mailbox-type =""
}

基于PinnedDispatcher
my-dispatcher {
type = PinnedDispatcher
executor = "thread-pool-executor"
thread-pool-executor { 
    core-pool-size-min = 2 
    core-pool-size-factor = 2.0 
    core-pool-size-max = 10
}
throughput = 100
mailbox-capacity = -1 mailbox-type =""
}

一旦dispatcher配置好后(application.conf),你就可以更具策略创建Actor,你可以为不同的Actor定义不同的策略。

ActorRef ref = this.getContext().actorOf(Props.create(WriteActor.class).withDispatcher("writer-dispatcher"),"Writer-"+i);

例子


参阅:DispatcherApplication