FutureTask(附源码解析)

FutureTask源码

简介

实现了Future、Runnable接口,可以看成一个带执行结果的线程任务;代码很简单,就不说太多了,有些基础请私下问我,或者找度娘

成员变量

    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    private Callable<V> callable;
    private Object outcome;
    private volatile Thread runner;
    private volatile WaitNode waiters;

    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long STATE;
    private static final long RUNNER;
    private static final long WAITERS;
    static {
        try {
            STATE = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("state"));
            RUNNER = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("runner"));
            WAITERS = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("waiters"));
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        Class<?> ensureLoaded = LockSupport.class;
    }

关键信息解读

  • state,waiters, runner都可以做原子操作(Unsafe类)
  • state 有7种状态,0 可执行态,1是执行完毕态,2是可以获取结果状态,3是执行出现异常,4,5是未执行取消状态,6是执行过程中取消状态
  • callable 具体执行动作
  • outcome 执行结果
  • runner 动作执行所在线程
  • waiters 单向链表,表头,内部含有线程信息;表示获取结果的挂起线程队列

成员方法

构造方法

可以执行callble或者runnable对象,runnable对象使用适配器的方式,是配成新的callble对象,并且传入的result为结果;并确定当前状态为new,可执行状态

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;
    }

report方法

如果正常执行完毕,则返回执行结果,否则,根据状态抛出异常

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

状态判断

根据state对状态进行判断

 public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }

cancel任务

用户主动取消任务,根据用户是否可打断、任务是否执行,来改变状态 4,5,6

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              U.compareAndSwapInt(this, STATE, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    U.putOrderedInt(this, STATE, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

get获取执行结果

获取任务执行结果,最多等待时间模式,和无线等待时间模式

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

set执行结果

设置执行结果,如果是执行时出现异常,则结果为异常对象,否则执行结果为正常运算结果;并进行状态置换,1是一个临时态,2,3是最终结果态

   protected void set(V v) {
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = v;
            U.putOrderedInt(this, STATE, NORMAL); 
            finishCompletion();
        }
    }

    protected void setException(Throwable t) {
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = t;
            U.putOrderedInt(this, STATE, EXCEPTIONAL); 
            finishCompletion();
        }
    }

任务执行项

具体执行任务动作,run执行返回结果,runAndReset执行不带结果,可再次执行

public void run() {
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    protected boolean runAndReset() {
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            runner = null;
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }
  • 检查状态是否是可执行,并cas设置线程对象从null到当前线程
  • 进行callable对象执行,如果出现异常,则设置异常结果,正常执行,返回正常结果
  • 最终设置线程为空,如果状态执行时状态发生了变化,处理可能的取消打断操作

finishCompletion方法

结束任务方法,执行结束,打断结束,取消结束 waiters节点置空,并且清除队列,并且唤醒每个非空线程节点,callable对象置空;done方法可被继承,本类中什么都不做

      private void finishCompletion() {
        for (WaitNode q; (q = waiters) != null;) {
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null;
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;
    }

awaitDone方法

等待获取执行结果方法,传入参数有两种,(false, 0) (true,大于0的整数值);自旋处理

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
        long startTime = 0L; 
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                queued = U.compareAndSwapObject(this, WAITERS,q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) {
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

自旋过程按照下面书序来执行

  1. 状态判断,有3中状态判断:已有结果态,马上有结果态,获取结果被打断状态
  • 状态大于1,不是有结果了,就是异常了,就是被主动取消了,这时直接返回结果;状态等于1表示马上就到2或者3状态,下个循环就是状态大于1,直接返回结果
  • 如果线程正在执行,且被打断,重置打断状态,并且异常当前线程节点,抛出打断异常
  1. 新建当前线程获取结果排队节点
  2. 加入获取结果队列中,并且置为waiters,之前waiters值为next指向节点
  3. 进行获取,有限时间内获取,或者无限制时间获取
  • 如果有限时间内获取,如果超时,返回当前状态,如果未超时且当前状态在正在执行状态,则线程挂起计算时间
  • 不限制时间获取,则直接等待

removeWaiter方法

自旋处理,删除节点线程属性为空的节点

    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null)
                            continue retry;
                    }
                    else if (!U.compareAndSwapObject(this, WAITERS, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

处理思路:

  • 如果当前第一个节点的线程为空,则把下个线程置为头节点,如果还空继续此过程,知道头节点线程不为空,这是pred指向头,q为头下个节点,s为q下个节点
  • 如果q的线程为空,则把pred的next指向s
  • 如果q的线程不为空,则继续移动这3个节点对象,直到q为空,退出自旋

源码总结:

  • 获取结果为阻塞过程,会挂起线程;等待结果执行完毕会被全部唤醒
  • 线程排队队列为单向链表,有指向下个节点属性;每次插入链表头部
  • 执行过程,进行结果处理,和唤醒链表中节点线程
  • 执行一次带结果动作,或者重复执行不带结果动作

   转载规则


《FutureTask(附源码解析)》 Malroy 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
面试官:如果让你来设计一个 MQ,该如何下手? 面试官:如果让你来设计一个 MQ,该如何下手?
本文主要讲解 MQ 的通用知识,让大家先弄明白:如果让你来设计一个 MQ,该如何下手?需要考虑哪些问题?又有哪些技术挑战? 有了这个基础后,我相信后面再讲 Kafka 和 RocketMQ 这两种具体的消息中间件时,大家能很快地抓住主脉络,
2021-01-04
下一篇 
大话CY的那些日子 大话CY的那些日子
谈谈CY的日子其实还是很开心的拉。只是现在看到阿喵更开心~~ PS:不过阿喵 虽然被鸡哥哥坑掉了,但是后面还是去了hz,开发了云平台,还是很不错滴拉。!
2020-06-19
  目录