首页>>后端>>java->Disruptor

Disruptor

时间:2023-12-07 本站 点击:0

笔者在之前的文章中介绍过 RingBuffer 的一些理论知识和自己动手实现了一个简单的 RingBuffer 。而 Disruptor 就实现了一个高性能的 RingBuffer。下面对这个项目进行分析和一些简单的使用。用来解决笔者写的一个分布式ID生成器 rain 中雪花算法利用RingBuffer的缓存问题免得自己造轮子。

1. 背景

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。

目前包括有Apache Storm、Camel、Log4j2在内的多个知名的项目使用了Disruptor,这也同时说明了Disruptor的高可用和高质量。

2. Disruptor介绍

Disruptor 是一个提供并发环形缓冲区数据结构的库。它旨在在异步事件处理架构中提供低延迟、高吞吐量的工作队列。本质上还是一个队列

2.1 核心概念

Ring Buffer:Ring Buffer 通常被认为是 Disruptor 的主要方面。但是,从 3.0 开始,Ring Buffer 只负责存储和更新Event通过 Disruptor 的数据。对于一些高级用例,它甚至可以完全由用户替换。

Sequence:Disruptor 使用Sequences 作为识别特定组件在哪里的一种手段。每个消费者(事件处理器)Sequence都像 Disruptor 本身一样维护一个。大多数并发代码依赖于这些序列值的移动,因此Sequence支持许多当前的特性AtomicLong。事实上,两者之间唯一真正的区别是Sequence包含额外的功能来防止Sequences 和其他值之间的错误共享。

Sequencer:Sequencer 是 Disruptor 的真正核心。该接口的两种实现(单生产者、多生产者)实现了所有并发算法,以在生产者和消费者之间快速、正确地传递数据。

Sequence Barrier:Sequencer 生成一个 Sequence Barrier,其中包含对Sequence从 Sequencer 发布的 main 和Sequence任何依赖消费者的 s 的引用。它包含确定是否有任何事件可供消费者处理的逻辑。

Wait Strategy:等待策略决定了消费者将如何等待生产者将事件放入 Disruptor。有关可选无锁的部分提供了更多详细信息。

Event:从生产者传递给消费者的数据单位。事件没有特定的代码表示,因为它完全由用户定义。

Event Processor:用于处理来自 Disruptor 的事件的主事件循环,并拥有消费者序列的所有权。有一个称为 BatchEventProcessor 的表示,它包含事件循环的有效实现,并将回调到 EventHandler 接口的使用提供的实现。

Event Handler:由用户实现的接口,代表 Disruptor 的消费者。

Producer:这是调用 Disruptor 入队Event的用户代码。这个概念在代码中也没有表示。

示意图如下:

3. 示例

maven依赖引入:

<dependency>  <groupId>com.lmax</groupId>  <artifactId>disruptor</artifactId>  <version>4.0.0.RC1</version></dependency>

3.1 消费和生产

/** * @author mxsm * @date 2022/5/28 10:58 * @Since 1.0.0 */public class UidEvent {    private long value;    public void set(long value) {        this.value = value;    }}

然后构造一个EventFactory:

/** * @author mxsm * @date 2022/5/28 11:00 * @Since 1.0.0 */public class UidEventFactory implements EventFactory<UidEvent> {    @Override    public UidEvent newInstance() {        return new UidEvent();    }}

定义一个消息处理器:

/** * @author mxsm * @date 2022/5/28 14:55 * @Since 1.0.0 */public class UidEventHandler implements EventHandler<UidEvent> {    @Override    public void onEvent(UidEvent event, long sequence, boolean endOfBatch) {        System.out.println(event.getValue());    }}

编写一个启动类:

public class UidEventMain {    public static void main(String[] args) throws Exception    {        int bufferSize = 1024;        Disruptor<UidEvent> disruptor =            new Disruptor<>(UidEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);        disruptor.handleEventsWith(new UidEventHandler());        disruptor.start();        RingBuffer<UidEvent> ringBuffer = disruptor.getRingBuffer();        ByteBuffer bb = ByteBuffer.allocate(8);        for (long l = 0; ; l++)        {            bb.putLong(0, l);            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);            Thread.sleep(1000);        }    }}

运行结果:

3. disruptor性能测试

多生产者和单生产者的性能对比(图片来自官网):

在官网还对比了java.util.concurrent.ArrayBlockingQueue 和 Disruptor 的性能。 生产者和消费模式如下图(图来自官网):

两者对比图:

从上图可以看出来Disruptor 的性能比 ABQ 高出一个数量级。

延迟表现:

从官网和使用了Disruptor 的项目都提供了很大的性能提升。

我是蚂蚁背大象??,文章对你有帮助点赞关注我,文章有不正确的地方请您斧正留言评论~谢谢??!

参考文档:

https://lmax-exchange.github.io/disruptor/user-guide/index.html

https://logging.apache.org/log4j/2.x/manual/async.html

https://lmax-exchange.github.io/disruptor/disruptor.html

https://lmax-exchange.github.io/disruptor/

原文:https://juejin.cn/post/7102686333264461832


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/18658.html