SpringBoot - 并发框架Disruptor使用详解2(多生产者、多消费者、消费者依赖关系)
三、多生产者、多消费者
1,多生产者
(1)下面测试样例在前文的基础上在增加一个生产者,即两个生产者、一个消费者。
注意:多生产者的时候要将 isMoreProducer 参数设置为 true,即 ProducerType 使用 ProducerType.MULTI。否则会出现数据丢失的情况。
public class DisruptorTest { public static void main(String[] args) throws InterruptedException { // 创建一个消费者 MyConsumer myConsumer = new MyConsumer("---->消费者1"); // 创建一个Disruptor队列操作类对象(RingBuffer大小为4,true表示有多个生产者) DisruptorQueue disruptorQueue = DisruptorQueueFactory.getHandleEventsQueue(4, true, myConsumer); // 创建两个生产者,开始模拟生产数据 MyProducerThread myProducerThread1 = new MyProducerThread("11111生产者1", disruptorQueue); Thread t1 = new Thread(myProducerThread1); t1.start(); MyProducerThread myProducerThread2 = new MyProducerThread("22222生产者2", disruptorQueue); Thread t2 = new Thread(myProducerThread2); t2.start(); // 执行3s后,生产者不再生产 Thread.sleep(3 * 1000); myProducerThread1.stopThread(); myProducerThread2.stopThread(); } }
(2)运行结果如下,可以看到整个过程两个生产者前后一共生产了 7 个元素,并由消费者消费掉:

2,多消费者
(1)下面代码在上面的基础上增加一个消费者(即一共两个消费者),同时通过 DisruptorQueueFactory.getHandleEventsQueue 创建“发布订阅”模式的操作队列,可以看到同一事件会被多个消费者并行消费:
public class DisruptorTest { public static void main(String[] args) throws InterruptedException { // 创建两个消费者 MyConsumer myConsumer1 = new MyConsumer("---->消费者1"); MyConsumer myConsumer2 = new MyConsumer("---->消费者2"); // 创建一个Disruptor队列操作类对象(RingBuffer大小为4,true表示有多个生产者) DisruptorQueue disruptorQueue = DisruptorQueueFactory.getHandleEventsQueue(4, true, myConsumer1, myConsumer2); // 创建两个生产者,开始模拟生产数据 MyProducerThread myProducerThread1 = new MyProducerThread("11111生产者1", disruptorQueue); Thread t1 = new Thread(myProducerThread1); t1.start(); MyProducerThread myProducerThread2 = new MyProducerThread("22222生产者2", disruptorQueue); Thread t2 = new Thread(myProducerThread2); t2.start(); // 执行3s后,生产者不再生产 Thread.sleep(3 * 1000); myProducerThread1.stopThread(); myProducerThread2.stopThread(); } }

(2)我们也可以通过 DisruptorQueueFactory.getWorkPoolQueue 方法创建“点对点”模式的操作队列,这样同一事件只会被一组消费者其中之一消费:
public class DisruptorTest { public static void main(String[] args) throws InterruptedException { // 创建两个消费者 MyConsumer myConsumer1 = new MyConsumer("---->消费者1"); MyConsumer myConsumer2 = new MyConsumer("---->消费者2"); // 创建一个Disruptor队列操作类对象(RingBuffer大小为4,true表示有多个生产者) DisruptorQueue disruptorQueue = DisruptorQueueFactory.getWorkPoolQueue(4, true, myConsumer1, myConsumer2); // 创建两个生产者,开始模拟生产数据 MyProducerThread myProducerThread1 = new MyProducerThread("11111生产者1", disruptorQueue); Thread t1 = new Thread(myProducerThread1); t1.start(); MyProducerThread myProducerThread2 = new MyProducerThread("22222生产者2", disruptorQueue); Thread t2 = new Thread(myProducerThread2); t2.start(); // 执行3s后,生产者不再生产 Thread.sleep(3 * 1000); myProducerThread1.stopThread(); myProducerThread2.stopThread(); } }

四、消费者依赖关系
1,简单的依赖
(1)下面是一个简单的消费者依赖关系,消费者 C3 消费时,必须保证消费者 C1 和消费者 C2 已经完成对该消息的消费。举个例子,在处理实际的业务逻辑(C3)之前,需要校验数据(C1),以及将数据写入磁盘(C2)。

(2)下面是样例代码:
public class DisruptorTest { public static void main(String[] args) throws InterruptedException { // 创建两个消费者 MyConsumer myConsumer1 = new MyConsumer("---->消费者C1"); MyConsumer myConsumer2 = new MyConsumer("---->消费者C2"); MyConsumer myConsumer3 = new MyConsumer("------->消费者C3"); // 创建一个Disruptor对象 Disruptor disruptor = new Disruptor(new ObjectEventFactory(), 4, Executors.defaultThreadFactory(), ProducerType.SINGLE, new SleepingWaitStrategy()); // 设置消费者依赖关系(先让C1和C2消费,再让C3消费) disruptor.handleEventsWith(myConsumer1, myConsumer2).then(myConsumer3); // 创建一个Disruptor队列操作类对象 DisruptorQueue disruptorQueue = DisruptorQueueFactory.getQueue(disruptor); // 创建一个生产者,开始模拟生产数据 MyProducerThread myProducerThread1 = new MyProducerThread("11111生产者1", disruptorQueue); Thread t1 = new Thread(myProducerThread1); t1.start(); // 执行3s后,生产者不再生产 Thread.sleep(3 * 1000); myProducerThread1.stopThread(); } }

2,复杂的消费者依赖关系
(1)下面是一个更加复杂的消费者依赖关系:- 消费者 C1B 消费时,必须保证消费者 C1A 已经完成对该消息的消费;
- 消费者 C2B 消费时,必须保证消费者 C2A 已经完成对该消息的消费;
- 消费者 C3 消费时,必须保证消费者 C1B 和 C2B 已经完成对该消息的消费。

public class DisruptorTest { public static void main(String[] args) throws InterruptedException { // 创建两个消费者 MyConsumer myConsumerC1A = new MyConsumer("---->消费者C1A"); MyConsumer myConsumerC1B = new MyConsumer("------->消费者C1B"); MyConsumer myConsumerC2A = new MyConsumer("---->消费者C2A"); MyConsumer myConsumerC2B = new MyConsumer("------->消费者C2B"); MyConsumer myConsumerC3 = new MyConsumer("----------->消费者C3"); // 创建一个Disruptor对象 Disruptor disruptor = new Disruptor(new ObjectEventFactory(), 4, Executors.defaultThreadFactory(), ProducerType.SINGLE, new SleepingWaitStrategy()); // 设置消费者依赖关系 disruptor.handleEventsWith(myConsumerC1A, myConsumerC2A); disruptor.after(myConsumerC1A).then(myConsumerC1B); disruptor.after(myConsumerC2A).then(myConsumerC2B); disruptor.after(myConsumerC1B, myConsumerC2B).then(myConsumerC3); // 创建一个Disruptor队列操作类对象 DisruptorQueue disruptorQueue = DisruptorQueueFactory.getQueue(disruptor); // 创建一个生产者,开始模拟生产数据 MyProducerThread myProducerThread1 = new MyProducerThread("11111生产者1", disruptorQueue); Thread t1 = new Thread(myProducerThread1); t1.start(); // 执行3s后,生产者不再生产 Thread.sleep(3 * 1000); myProducerThread1.stopThread(); } }
