FutureTask源码
简介
实现了Future、Runnable接口,可以看成一个带执行结果的线程任务;代码很简单,就不说太多了,有些基础请私下问我,或者找度娘
成员变量
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
private Callable<V> callable;
private Object outcome;
private volatile Thread runner;
private volatile WaitNode waiters;
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long STATE;
private static final long RUNNER;
private static final long WAITERS;
static {
try {
STATE = U.objectFieldOffset
(FutureTask.class.getDeclaredField("state"));
RUNNER = U.objectFieldOffset
(FutureTask.class.getDeclaredField("runner"));
WAITERS = U.objectFieldOffset
(FutureTask.class.getDeclaredField("waiters"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
Class<?> ensureLoaded = LockSupport.class;
}
关键信息解读
- state,waiters, runner都可以做原子操作(Unsafe类)
- state 有7种状态,0 可执行态,1是执行完毕态,2是可以获取结果状态,3是执行出现异常,4,5是未执行取消状态,6是执行过程中取消状态
- callable 具体执行动作
- outcome 执行结果
- runner 动作执行所在线程
- waiters 单向链表,表头,内部含有线程信息;表示获取结果的挂起线程队列
成员方法
构造方法
可以执行callble或者runnable对象,runnable对象使用适配器的方式,是配成新的callble对象,并且传入的result为结果;并确定当前状态为new,可执行状态
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
report方法
如果正常执行完毕,则返回执行结果,否则,根据状态抛出异常
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
状态判断
根据state对状态进行判断
public boolean isCancelled() {
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}
cancel任务
用户主动取消任务,根据用户是否可打断、任务是否执行,来改变状态 4,5,6
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
U.compareAndSwapInt(this, STATE, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
U.putOrderedInt(this, STATE, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
get获取执行结果
获取任务执行结果,最多等待时间模式,和无线等待时间模式
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
set执行结果
设置执行结果,如果是执行时出现异常,则结果为异常对象,否则执行结果为正常运算结果;并进行状态置换,1是一个临时态,2,3是最终结果态
protected void set(V v) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
U.putOrderedInt(this, STATE, NORMAL);
finishCompletion();
}
}
protected void setException(Throwable t) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = t;
U.putOrderedInt(this, STATE, EXCEPTIONAL);
finishCompletion();
}
}
任务执行项
具体执行任务动作,run执行返回结果,runAndReset执行不带结果,可再次执行
public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected boolean runAndReset() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
runner = null;
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
- 检查状态是否是可执行,并cas设置线程对象从null到当前线程
- 进行callable对象执行,如果出现异常,则设置异常结果,正常执行,返回正常结果
- 最终设置线程为空,如果状态执行时状态发生了变化,处理可能的取消打断操作
finishCompletion方法
结束任务方法,执行结束,打断结束,取消结束 waiters节点置空,并且清除队列,并且唤醒每个非空线程节点,callable对象置空;done方法可被继承,本类中什么都不做
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null;
q = next;
}
break;
}
}
done();
callable = null;
}
awaitDone方法
等待获取执行结果方法,传入参数有两种,(false, 0) (true,大于0的整数值);自旋处理
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
long startTime = 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
Thread.yield();
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)
queued = U.compareAndSwapObject(this, WAITERS,q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) {
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
LockSupport.park(this);
}
}
自旋过程按照下面书序来执行
- 状态判断,有3中状态判断:已有结果态,马上有结果态,获取结果被打断状态
- 状态大于1,不是有结果了,就是异常了,就是被主动取消了,这时直接返回结果;状态等于1表示马上就到2或者3状态,下个循环就是状态大于1,直接返回结果
- 如果线程正在执行,且被打断,重置打断状态,并且异常当前线程节点,抛出打断异常
- 新建当前线程获取结果排队节点
- 加入获取结果队列中,并且置为waiters,之前waiters值为next指向节点
- 进行获取,有限时间内获取,或者无限制时间获取
- 如果有限时间内获取,如果超时,返回当前状态,如果未超时且当前状态在正在执行状态,则线程挂起计算时间
- 不限制时间获取,则直接等待
removeWaiter方法
自旋处理,删除节点线程属性为空的节点
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null)
continue retry;
}
else if (!U.compareAndSwapObject(this, WAITERS, q, s))
continue retry;
}
break;
}
}
}
处理思路:
- 如果当前第一个节点的线程为空,则把下个线程置为头节点,如果还空继续此过程,知道头节点线程不为空,这是pred指向头,q为头下个节点,s为q下个节点
- 如果q的线程为空,则把pred的next指向s
- 如果q的线程不为空,则继续移动这3个节点对象,直到q为空,退出自旋
源码总结:
- 获取结果为阻塞过程,会挂起线程;等待结果执行完毕会被全部唤醒
- 线程排队队列为单向链表,有指向下个节点属性;每次插入链表头部
- 执行过程,进行结果处理,和唤醒链表中节点线程
- 执行一次带结果动作,或者重复执行不带结果动作