洛阳java培训
洛阳王城中心

4000178985

热门课程

深度解析– ScheduledThreadPoolExecutor源码解析

  • 时间:2018-02-12 10:39
  • 发布:达内
  • 来源:达内

1. 背景:

提到worker,读者的第一反应都是最常用的quartz框架,它提供了一套简单的定义周期性任务的方式并与Spring做了融合,可以让一个worker的配置简化到只需定义一个任务和执行周期后即完成了一个work的配置。比如:

1
2
3
4
5
<task:scheduled-tasks>
    <task:scheduled ref="testJob" method="work" cron="0 */1 * * * ?" />
</task:scheduled-tasks>
 
 <bean id="testJob" class="com.TestJob"/>

要清楚quartz是怎么实现任务的周期性执行的我们要先一起来看下定义task标签的xsd文件:其中,task标签声明了一组通过ref属性关联周期执行的任务,通过method属性指定了方法,cron属性指定了周期性执行任务的时间表达式。值得注意的是,或许是为了方便quartz将该表达式的格式和类UNIX系统上的定期指定任务的cron表达式保持了一致,但实际上quartz的时间表达式和类UNIX系统上cron的时间表达式完全没有关系,既不是通过JDK的封装去调用cron,也不是将该表达式传递给操作系统执行,而是仅仅表达式格式上保持了一致,具体功能的实现是交给了JDK的另外一个工具类,也就是本文将要说明的JDK的工具类去完成的。

可以看到上述的配置信息会映射到类型为TaskScheduler的scheduler属性上,它有下面这么几个子类

除了TimerManagerTaskScheduler 是为 CommonJ提供的封装外,其余两个子类都持有一个ScheduledExecutorService类型的变量,该变量会完成周期性执行任务的功能,而quartz默认会使用ConcurrentTaskScheduler这个实现类,当然,也可以在配置scheduled-tasks的时候通过ref属性重新指定。

既然quartz默认使用了ConcurrentTaskScheduler,那我们先从ConcurrentTaskScheduler开始,看下quartz是怎么实现定时任务的功能的。

通过ConcurrentTaskScheduler类的简介,可以知道他继承了ConcurrentTaskExecutor用于持有任务的配置信息,并且自己持有了一个ScheduledExecutorService的引用用于调度定时任务,通过查看他的schedule方法可以知道对于定时任务的调用ConcurrentTaskScheduler最终都是委派给scheduledExecutor属性的。

事实上ConcurrentTaskScheduler本身更像是对时间配置,任务配置,融合Spring方面做了些封装,最终执行任务的,还是JDK的这个接口ScheduledExecutorService的实现类ScheduledThreadPoolExecutor。

可以说ScheduledThreadPoolExecutor是quartz实现定时任务的关键。

2. 解读ScheduledThreadPoolExecutor

下面我们一起来看看ScheduledThreadPoolExecutor到底做了些什么事情,先看下它的结构(为了保持一致,这次对ScheduledThreadPoolExecutor的讲解同样基于jdk1.8.0_20,和JDK1.6相比有不少改动):

继承自ThreadPoolExecutor并且实现了ScheduledExecutorService接口。

内部有很多方法和两个内部类:DelayedWorkQueue和ScheduledFutureTask,这两个类具体做了什么事情,后面再详述。

为了说明ScheduledThreadPoolExecutor是怎么实现的定时,延迟,重复执行任务的,我们从他最常用的API,schedule方法开始说起:

接受 一个Runnable接口的子类,推迟启动的时间单位和数量。

并且将他们包装到ScheduledFutureTask中,也是上面提到的ScheduledThreadPoolExecutor的内部类之一。

ScheduledFutureTask 有这么几个私有变量,时间(纳秒为单位,在上面构造时,通过triggerTime方法计算出来的),重复周期period,和一个序列号sequenceNumber,sequencer由ScheduledThreadPoolExecutor持有,

new好ScheduledFutureTask后用decorateTask方法包装一下,其实,什么也没做,直接将task返回了,这里应该是兼容老代码的原因,又偷懒了一下,没有直接改掉这个方法。

包装后调用delayedExecute方法,提交task。

delayedExecute方法中,如果当前线程池已经关闭,调用拒绝策略执行command,之所以说执行不是拒绝,是因为JDK的拒绝策略中并非都是直接将任务拒绝:

看注释可以知道,JDK拒绝策略中,有直接将任务丢弃的,有抛出异常的,还有在当前线程下直接调用run()方法执行的。

之所以说这个,是因为很多使用者在new线程池的时候没有注意到构造方法里面是需要这个参数的,并且默认的这个参数是:

就是抛出异常的拒绝策略,这回导致在线程池添加满任务后,直接抛出RejectExecutionException,而不是用当前线程执行任务。

因此通常我们更常用的应该是:CallerRunsPolicy。

回到原题,看下delayedExecute方法:

如果线程池没有关闭,检查当前线程池已启动的线程数,是否达到corePoolSize,没有的话,新建一个线程并启动它。注意,这个时候新建的线程是没有持有任何Runnable对象的,它是在启动后到queue(工作队列)中去取出任务执行。

创建完线程后向工作队列中添加commond任务。

到此为止,一个用ScheduledFutureTask包装的任务提交到了线程池的工作队列中,等待线程取出后执行。

任务被取出后,会先调用Runnable接口的run方法,那我们来看下run方法做了些什么

1
2
3
4
5
6
7
8
9
10
11
public void run() {
       boolean periodic = isPeriodic();
       if (!canRunInCurrentRunState(periodic))// 检查当前线程池状态是否需要取消
            cancel(false);
       else if (!periodic)// 如果不是周期性任务,直接调用父类FutureTask的run方法执行任务。
            ScheduledFutureTask.super.run();
        else if (ScheduledFutureTask.super.runAndReset()) {//否则,调用父类run方法执行任务,但是不设置结果,以便计算出下次执行的时间后,重复执行。
            setNextRunTime();
            reExecutePeriodic(outerTask);
       }
 }

1. ScheduledFutureTask是FutureTask的子类,它使用了FutureTask执行但是不设置结果的API:runAndReset()。这里有几点需要注意:

其他功能,有兴趣的可以看我原来的博客:futuretask-源码解析

2. 所谓的延迟启动,周期性重复执行的功能在这里还没有体现出来,到底怎么做的,我们继续看。

我们知道,每次调用schedule方法, 都会向线程池的工作队列中添加一个任务,而ScheduledThreadPoolExecutor自己实现了一个工作队列DelayedWorkQueue,也就是ScheduledThreadPoolExecutor中的另外一个内部类,会不会和这个有关系呢?

3. 解读DelayedWorkQueue

看看DelayedWorkQueue的add方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public boolean offer(Runnable x) {
   if (x == null)
           throw new NullPointerException();
      RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; //将x转成RunnableScheduledFuture 实时上ScheduledFutureTask也是RunnableScheduledFuture 的子类
      final ReentrantLock lock = this.lock;
      lock.lock();//加锁添加元素
      try {
           int i = size;
           if (i >= queue.length)
               grow(); //队列满,扩容
           size = i + 1;
           if (i == 0) {
               queue[0] = e;
               setIndex(e, 0);
           } else {
               siftUp(i, e);//注意:当队列中元素不为空时,调用了siftUp方法
           }
           if (queue[0] == e) {
               leader = null;
               available.signal();//唤醒取元素的线程,告诉它工作队列中有元素可以取了。
           }
       } finally {
            lock.unlock();
       }
      return true;
}

想知道 condition怎么实现的,看我原来的一篇博客:怎么理解Condition

整个方法没有什么特殊的地方,不外乎向队列中添加了一个元素,唯独有个siftup方法,这是理解整个ScheduledThreadPoolExecutor的重点。

在说siftup方法前,我们先看下DelayedWorkQueue的数据结构,其实ScheduledThreadPoolExecutor之所以自己实现工作队列,原因就是这个工作队列有非常特殊的结构。

我们知道如果有一个二叉树的数组,抽象起来用树表示是这个样子:

放在数组中后,是这个样子:

并且二叉树还有一个特性是:

任意结点的子节点的索引位置是其本身索引位置乘2后+1,比如,元素2的子节点是2*2+1=5

任意结点的父节点的索引位置是该结点的索引位置-1后除2并向下取整,比如6的子节点是(6-1)/2 = 2

这个特性对于查找父子结点来说非常的方便。

这个结构被ScheduledThreadPoolExecutor的作者Doug Lea用到了DelayedWorkQueue中,并且将DelayedWorkQueue变成了一个按超时时间升序排序的队列,遵循”左结点比右节点小(下次执行的时间更短)的原则“。

而按超时时间升序排序的队列的原因是为了将即将要执行的任务放在尽量靠前的位置,用二叉树的原因是因为二叉树的查找,加入都相对较快。

明白以上结构后,再看siftup方法

举个例子:

现在要插入一个新的元素,这个时候结合上面的代码可以知道,变量k=10,key为图中new结点,

1. 经过计算,parent = 4 e=parent[4]=3 (注意是索引为4的结点,它的值是3号元素)

2. key=new,e=parent[4] 两者比较下次执行的时间(超时时间)

比较方法如下:

3. 比较后,若key>=e 说明key元素下次执行的时间(超时时间)更长,应该靠后,直接添加在队列尾部。

4. 若key

同理,我们再来看看remove方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public boolean remove(Object x) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
             int i = indexOf(x);
             if (i < 0)
                return false;
 
             setIndex(queue[i], -1);
             int s = --size;
             RunnableScheduledFuture<?> replacement = queue[s]; //把最后一个元素取出来,用于缩短队列,并且,将取出的元素尝试放在被移除元素的位置上,至于是否要真的放在该位置上要看最后这个元素的下次一次的执行时间(超时时间)决定,因为,DelayedWorkQueue是个有序队列
             queue[s] = null;
             if (s != i) {
                siftDown(i, replacement);
                if (queue[i] == replacement)
                    siftUp(i, replacement);
            }
            return true;
       } finally {
              lock.unlock();
       }
}

当移除的元素不是最后一个元素时,意味着,最后一个元素要开始寻找合适的位置,这个时候就会调用siftDown方法, 该方法的目的是为replacement找到合适的位置,可能是被移除元素的位置,也可能不是。

先附上siftDown代码,对比下面例子看:

我们举例说明:

这里的被移除元素分为两类,一类是被移除元素没有子节点的,一类是被移除元素有子节点的。

从表中,根据二叉树的性质可以知道,位置索引大于等于队列长度一半的,没有子节点,小于队列长度一半的有子节点。

上一篇:Java 8简明教程
下一篇:ThreadLocal 内存泄露的实例分析

洛阳java培训班:为 Java 程序员而生的 10+ 最佳库

Java虚拟机学习(1):体系结构 内存模型

选择城市和中心
贵州省

广西省

海南省