戳蓝字「TopCoder 」关注我们哦!
TTL(transmittable-thread-local)是一个线程间传递ThreadLocal,异步执行时上下文传递的解决方案。 整个库的核心是构建在TransmittableThreadLocal类(继承并加强InheritableThreadLocal类)之上,同时包含线程池修饰(ExecutorService/ForkJoinPool/TimerTask)以及Java Agent支持,代码小于1k行,短小精悍。
在往下看之前,最好大致看下 https://github.com/alibaba/transmittable-thread-local 文档,效果会更好。JDK的InheritableThreadLocal类可以完成父线程到子线程的值传递。但对于使用线程池等会池化复用线程的组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;这时父子线程关系的ThreadLocal值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时的ThreadLocal值传递 到 任务执行时 。原理是使用TtlRunnable/Ttlcallable包装了Runnable/Callable类:
-
在TtlRunnable/Ttlcallable初始化时 capture TransmittableThreadLocal变量
-
在run方法调用runnable.run()前进行 replay ,设置到当前线程ThreadLocal
-
在run方法调用runnable.run()后进行 restore ,上下文还原,也就是replay的反向操作
注意,步骤1和步骤2/3不是在同一个线程中执行的。
既然TTL的TransmittableThreadLocal是继承并加强InheritableThreadLocal类的,那么首先需要分析下InheritableThreadLocal是什么东东,源码如下:
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
/**
* 新建线程时,如果当前inheritableThreadLocals非空,则会获取当前inheritableThreadLocals传递给新线程
*/
protected T childValue(T parentValue) {
return parentValue;
}
/**
* InheritableThreadLocal变量的set/get/remove操作都是在inheritableThreadLocals上
*/
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
/**
* 创建inheritableThreadLocals
*/
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}
Thread类中有两个ThreadLocal相关的ThreadLocalMap属性,如下:
ThreadLocal.ThreadLocalMap threadLocals:ThreadLocal变量使用 ThreadLocal.ThreadLocalMap inheritableThreadLocals:InheritableThreadLocal变量使用
新建线程时,将当前线程的inheritableThreadLocals传递给新线程,这里的传递是对InheritableThreadLocal变量的数据做浅拷贝(引用复制),这样新线程可以使用同一个InheritableThreadLocal变量查看上一个线程的数据。
下面分析下使用InheritableThreadLocal的一个demo:
void testInheritableThreadLocal_线程池() throws InterruptedException {
final InheritableThreadLocal<String> parent = new InheritableThreadLocal<>();
parent.set("value-set-in-parent");
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(() -> System.out.println(Thread.currentThread().getName() + ": " + parent.get()));
}
// 输出结果:pool-1-thread-1: value-set-in-parent
上面代码在submit任务时会伴随着(线程池工作)线程的创建,会继承当前线程的InheritableThreadLocal,所以会有上述输出结果。如果将代码改成下面的样子,会有什么不同呢?
void testInheritableThreadLocal_线程池() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(() -> {}); // 先进行工作线程创建
final InheritableThreadLocal<String> parent = new InheritableThreadLocal<>();
parent.set("value-set-in-parent");
executor.submit(() -> System.out.println(Thread.currentThread().getName() + ": " + parent.get()));
}
// 输出结果:pool-1-thread-1: null
因为创建线程时当前线程并没有inheritableThreadLocals,所以线程池中线程打印结果为null。这种场景下如何才能获取到parent变量的数据呢?这时就该TTL出场了,将上述代码改成TTL方式如下:
void testTtlInheritableThreadLocal_线程池() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(() -> {}); // 先进行工作线程创建
// 使用TTL
final TransmittableThreadLocal<String> parent = new TransmittableThreadLocal<>();
parent.set("value-set-in-parent");
// 将Runnable通过TtlRunnable包装下
executor.submit(TtlRunnable.get(() -> System.out.println(Thread.currentThread().getName() + ": " + parent.get())));
}
// 输出结果:pool-1-thread-1: value-set-in-parent
下面以TtlRunnable.get()为起点分析TTL的设计实现,TtlRunnable.get源码如下(TtlRunnable.get流程对应的初始化时 capture 操作,保存快照。TtlCallable和TtlRunnable流程类似):
public static TtlRunnable get(@Nullable Runnable runnable) {
return get(runnable, false, false);
}
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
if (runnable instanceof TtlEnhanced) {
// 幂等时直接返回,否则执行会产生问题,直接抛异常
if (idempotent) return (TtlRunnable) runnable;
else throw new IllegalStateException("Already TtlRunnable!");
}
return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}
private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
this.capturedRef = new AtomicReference<Object>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
public static Object capture() {
Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
// 从holder获取所有threadLocal,存到captured,这里相当于对当前线程holder做一个快照保存
// 到TtlRunnable实例属性中,在执行TtlRunnable时进行回放
for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
captured.put(threadLocal, threadLocal.copyValue());
}
return captured;
}
在新建TtlRunnable过程中,会保存下TransmittableThreadLocal.holder到captured,记录到TtlRunnable实例中的capturedRef字段,TransmittableThreadLocal.holder类型是:
// Note about holder:
// 1. The value of holder is type Map<TransmittableThreadLocal<?>, ?> (WeakHashMap implementation),
// but it is used as *set*. 因为没有WeakSet的原因
// 2. WeakHashMap support null value.
private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
@Override
protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
}
@Override
protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
}
};
从上面代码我们知道初始化TtlRunnable时已经将TransmittableThreadLocal保存下来了,那么什么时候应用到当前线程ThreadLocal中呢,这是就需要看下TtlRunnable.run方法:
public void run() {
Object captured = capturedRef.get();
// captured不应该为空,releaseTtlValueReferenceAfterRun为true时设置capturedRef为null,防止当前Runnable重复执行
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
// captured进行回放,应用到当前线程中
Object backup = replay(captured);
try {
runnable.run();
} finally {
restore(backup);
}
}
注意,TTL中的replay操作是以captured为当前inheritableThreadLocals的(处理逻辑是在TtlRunable run时,会以TtlRunnable.get时间点获取的captured(类似TTL快照)为准,holder中不在captured的先移除,在的会被替换)。关于replay中的clean extra TTL value讨论可以参考:https://github.com/alibaba/transmittable-thread-local/issues/134。回放captured和执行完runnable.run之后,再restore恢复到原来inheritableThreadLocals的状态。
说完了TTL中的capture、replay和restore流程,再看下官方提供的这个时序图,是不是感觉清晰很多。
除了通过TtlRunable.get()修饰用户自定义的task之外,还可以修饰线程池和使用Java Agent修饰JDK线程池实现类的方式实现TTL功能。
修饰线程池省去每次Runnable和Callable传入线程池时的修饰,这个逻辑可以在线程池中完成,其实就是在提交task时调用TtlRunable.get()修饰下。通过 工具 类com.alibaba.ttl.threadpool.TtlExecutors完成,有下面的方法:
-
getTtlExecutor:修饰接口Executor
-
getTtlExecutorService:修饰接口ExecutorService
-
getTtlScheduledExecutorService:修饰接口ScheduledExecutorService
使用Java Agent来修饰JDK线程池实现类,这种方式,实现线程池的传递是透明的,代码中没有修饰Runnable或是线程池的代码。即可以做到应用代码无侵入。关于 无侵入 的更多说明参见文档 Java Agent方式对应用代码无侵入。
使用Java Agent 实例如下:
// 启动参数中需加上 -javaagent:path/transmittable-thread-local-x.x.x.jar
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(() -> {});
final TransmittableThreadLocal<String> parent = new TransmittableThreadLocal<>();
parent.set("value-set-in-parent");
executor.submit(() -> System.out.println(Thread.currentThread().getName() + ": " + parent.get()));
executor.shutdown();
TTL代码实现来看,确实短小精悍,值得花几个小时看下源码。通过看源码,我发现了,可以通过new ThreadLocal对象时,直接重写其initialValue方法,可以在threadLocal.get为空时初始化一个值,使用示例如下:
ThreadLocal<String> local = new ThreadLocal<String>() {
@Override
protected String initialValue() {
return "init";
}
};
System.out.println(local.get()); // init
local.set("hello world");
System.out.println(local.get()); // hello world
local.remove();
System.out.println(local.get()); // init
以上所述就是小编给大家介绍的《Transmittable-Thread-Local:阿里开源的线程间上下文传递解决方案》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 82、flask之基于DBUtils实现数据库连接池、本地线程、上下文
- DDD:识别限界上下文以及理解上下文映射
- DDD:识别限界上下文以及理解上下文映射
- 如何划分限界上下文
- 详解Flask上下文
- Spring 上下文的加载
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Writing Windows VxDs and Device Drivers, Second Edition
Karen Hazzah / CMP / 1996-01-12 / USD 54.95
Software developer and author Karen Hazzah expands her original treatise on device drivers in the second edition of "Writing Windows VxDs and Device Drivers." The book and companion disk include the a......一起来看看 《Writing Windows VxDs and Device Drivers, Second Edition》 这本书的介绍吧!