- JUC中的有界队列ArrayBlockingQueue和LinkedBlockingQueue,都是基于 ReentrantLock
- 在高并发场景下,锁的效率并不高,Disruptor是一款 性能更高 的有界内存队列
- Disruptor高性能的原因
- 内存分配更合理,使用 RingBuffer ,数组元素在初始化时 一次性 全部创建
- 提升缓存命中率 ,对象循环利用, 避免频繁GC
- 能够 避免伪共享 ,提升缓存利用率
- 采用 无锁算法 ,避免频繁加锁、解锁的性能消耗
- 支持 批量消费 ,消费者可以以无锁的方式消费多个消息
- 内存分配更合理,使用 RingBuffer ,数组元素在初始化时 一次性 全部创建
简单使用
public class DisruptorExample { public static void main(String[] args) throws InterruptedException { // RingBuffer大小,必须是2的N次方 int bufferSize = 1024; // 构建Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE); // 注册事件处理器 disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("E: " + event)); // 启动Disruptor disruptor.start(); RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); // 生产Event ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); // 生产者生产消息 ringBuffer.publishEvent((event, sequence, buffer) -> event.setValue(buffer.getLong(0)), bb); TimeUnit.SECONDS.sleep(1); } } } @Data class LongEvent { private long value; }
- 在Disruptor中,生产者生产的对象和消费者消费的对象称为Event,使用Disruptor必须定义Event
- 构建Disruptor对象需要传入EventFactory(LongEvent::new)
- 消费Disruptor中的Event需要通过handleEventsWith方法注册一个事件处理器
- 发布Event需要通过publishEvent方法
优点
RingBuffer
局部性原理
- 在一段时间内程序的执行会限定在一个局部范围内,包括时间局部性和空间局部性
- 时间局部性
- 程序中的某条 指令 一旦被执行,不久之后这条指令很可能被再次执行
- 如果某条 数据 被访问,不久之后这条数据很可能被再次访问
- 空间局部性
- 某块 内存 一旦被访问,不久之后这块内存 附近 的内存也有可能被访问
- CPU缓存利用了程序的局部性原理
- CPU从内存中加载数据X时,会将数据X及其 附近 的数据缓存在高速Cache中
- 如果程序能够很好地体现出局部性原理,就能更好地利用CPU缓存,从而提升程序的性能
ArrayBlockingQueue
- 生产者向ArrayBlockingQueue增加一个元素之前,都需要先创建对象E
- 创建这些元素的时间基本上是 离散 的,所以这些元素的内存地址大概率也 不是连续 的
Disruptor
// com.lmax.disruptor.RingBufferFields for (int i = 0; i < bufferSize; i++) { entries[BUFFER_PAD + i] = eventFactory.newInstance(); }
- Disruptor内部的RingBuffer也是用数组实现的
- 但这个数组中的所有元素在初始化时是 一次性 全部创建,所以这些元素的内存地址大概率是 连续 的
- 如果数组中所有元素的内存地址是 连续 的,能够提升性能
- 消费者线程在消费的时候,遵循 空间局部性原理 ,消费完第1个元素,很快就会消费第2个元素
- 而在消费第1个元素的时候,CPU会把内存中E1后面的数据也加载进高速Cache
- 如果E1和E2是连续的,那么E2也就会被加载进高速Cache
- 当消费第2个元素的时候,由于E2已经在高速Cache中了,不再需要从内存中加载,能大大提升性能
- 另外在Disruptor中,生产者线程通过publishEvent发布Event时,并不是创建一个新的Event
- 而是通过event.setValue来修改Event,即 循环利用 RingBuffer中的Event
- 这样能避免频繁创建和销毁Event而导致的 GC问题
避免伪共享
伪共享
- CPU缓存内部是按照 缓存行 (Cache Line)进行管理的,一个缓存行通常为 64 Bytes
- CPU从内存中加载数据X,会同时加载后面(64-size(X))个字节的数据
ArrayBlockingQueue
/** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count;
- 当CPU从内存中加载takeIndex时,会同时将putIndex和count都加载进高速Cache
- 假设线程A运行在CPU-1上,执行入队操作,入队操作会修改putIndex
- 而修改putIndex会导致CPU-2上putIndex所在的缓存行失效
- 假设线程B运行在CPU-2上,执行出队操作,出队操作需要读取takeIndex
- 但由于takeIndex所在的缓存行已经失效,所以CPU-2必须从 内存 中重新读取
- 入队操作本身不会修改takeIndex,但由于takeIndex和putIndex共享同一个缓存行
- 导致出队操作不能很好地利用Cache,这就是伪共享
- 伪共享:由于 共享缓存行而导致缓存无效 的场景
- ArrayBlockingQueue的入队操作和出队操作是用 锁 来保证互斥的,所以入队和出队不会同时发生
- 如果允许入队和出队同时发生,可以采用 缓存行填充 ,保证每个变量 独占一个缓存行
- 如果想让takeIndex独占一个缓存行,可以在takeIndex的前后各填充 56 个字节
Disruptor
// 前:填充56字节 class LhsPadding { protected long p1, p2, p3, p4, p5, p6, p7; } class Value extends LhsPadding { protected volatile long value; } // 后:填充56字节 class RhsPadding extends Value { protected long p9, p10, p11, p12, p13, p14, p15; } public class Sequence extends RhsPadding { }
Contended
- Java 8引入了@sun.misc.Contended注解,能够轻松避免伪共享,需要设置JVM参数-XX:RestrictContended
- 避免伪共享是以 牺牲内存 为代价的
无锁算法
- ArrayBlockingQueue利用 管程 实现,生产和消费都需要 加锁 ,实现简单,但 性能不太理想
- Disruptor采用的是 无锁 算法,实现复杂,核心操作是生产和消费,最复杂的是入队操作
- 对于入队操作,不能覆盖没有消费的元素,对于出队操作,不能读取没有写入的元素
- Disruptor中的RingBuffer维护了入队索引,但没有维护出队索引
- 因为Disruptor支持多个消费者同时消费,每个消费者都会有一个出队索引
- 所以RingBuffer的 出队索引 是所有消费者里 最小 的一个
- 入队逻辑: 如果没有足够的空余位置,就出让CPU使用权,然后重新计算,反之使用CAS设置入队索引
// com.lmax.disruptor.MultiProducerSequencer public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long current; long next; // 生产者获取n个写入位置 do { // current相当于入队索引,表示上次生产到这里 current = cursor.get(); // 目标是再生产n个 next = current + n; // 减掉一个循环 long wrapPoint = next - bufferSize; // 获取上一次的最小消费位置 long cachedGatingSequence = gatingSequenceCache.get(); // 没有足够的空余位置 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { // 重新计算所有消费者里面的最小值位置 long gatingSequence = Util.getMinimumSequence(gatingSequences, current); // 仍然没有足够的空余位置,出让CPU使用权,重新执行下一循环 if (wrapPoint > gatingSequence) { LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } // 重新设置上一次的最小消费位置 gatingSequenceCache.set(gatingSequence); } else if (cursor.compareAndSet(current, next)) { // 获取写入位置成功,跳出循环 break; } } while (true); return next; }
转载请注明出处:http://zhongmingmao.me/2019/05/31/java-concurrent-disruptor/
访问原文「 Java并发 -- Disruptor 」获取最佳阅读体验并参与讨论
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Java并发系列—并发编程基础
- [Java并发-17-并发设计模式] Immutability模式:如何利用不变性解决并发问题?
- JAVA并发编程之并发模拟工具
- Java并发系列—并发编程的挑战
- Core Java 并发:理解并发概念
- [Java并发-11] 并发容器的使用
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。