当前位置: > > > SpringBoot - 并发框架Disruptor使用详解2(多生产者、多消费者、消费者依赖关系)

SpringBoot - 并发框架Disruptor使用详解2(多生产者、多消费者、消费者依赖关系)

三、多生产者、多消费者

1,多生产者

(1)下面测试样例在前文的基础上在增加一个生产者,即两个生产者、一个消费者。
注意:多生产者的时候要将 isMoreProducer 参数设置为 true,即 ProducerType 使用 ProducerType.MULTI。否则会出现数据丢失的情况。
public class DisruptorTest {
    public static void main(String[] args) throws InterruptedException {

        // 创建一个消费者
        MyConsumer myConsumer = new MyConsumer("---->消费者1");

        // 创建一个Disruptor队列操作类对象(RingBuffer大小为4,true表示有多个生产者)
        DisruptorQueue disruptorQueue = DisruptorQueueFactory.getHandleEventsQueue(4,
                true, myConsumer);

        // 创建两个生产者,开始模拟生产数据
        MyProducerThread myProducerThread1 = new MyProducerThread("11111生产者1", disruptorQueue);
        Thread t1 = new Thread(myProducerThread1);
        t1.start();
        MyProducerThread myProducerThread2 = new MyProducerThread("22222生产者2", disruptorQueue);
        Thread t2 = new Thread(myProducerThread2);
        t2.start();

        // 执行3s后,生产者不再生产
        Thread.sleep(3 * 1000);
        myProducerThread1.stopThread();
        myProducerThread2.stopThread();
    }
}

(2)运行结果如下,可以看到整个过程两个生产者前后一共生产了 7 个元素,并由消费者消费掉:

2,多消费者

(1)下面代码在上面的基础上增加一个消费者(即一共两个消费者),同时通过 DisruptorQueueFactory.getHandleEventsQueue 创建“发布订阅”模式的操作队列,可以看到同一事件会被多个消费者并行消费:
public class DisruptorTest {
    public static void main(String[] args) throws InterruptedException {

        // 创建两个消费者
        MyConsumer myConsumer1 = new MyConsumer("---->消费者1");
        MyConsumer myConsumer2 = new MyConsumer("---->消费者2");

        // 创建一个Disruptor队列操作类对象(RingBuffer大小为4,true表示有多个生产者)
        DisruptorQueue disruptorQueue = DisruptorQueueFactory.getHandleEventsQueue(4,
                true, myConsumer1, myConsumer2);

        // 创建两个生产者,开始模拟生产数据
        MyProducerThread myProducerThread1 = new MyProducerThread("11111生产者1", disruptorQueue);
        Thread t1 = new Thread(myProducerThread1);
        t1.start();
        MyProducerThread myProducerThread2 = new MyProducerThread("22222生产者2", disruptorQueue);
        Thread t2 = new Thread(myProducerThread2);
        t2.start();

        // 执行3s后,生产者不再生产
        Thread.sleep(3 * 1000);
        myProducerThread1.stopThread();
        myProducerThread2.stopThread();
    }
}

(2)我们也可以通过 DisruptorQueueFactory.getWorkPoolQueue 方法创建“点对点”模式的操作队列,这样同一事件只会被一组消费者其中之一消费:
public class DisruptorTest {
    public static void main(String[] args) throws InterruptedException {

        // 创建两个消费者
        MyConsumer myConsumer1 = new MyConsumer("---->消费者1");
        MyConsumer myConsumer2 = new MyConsumer("---->消费者2");

        // 创建一个Disruptor队列操作类对象(RingBuffer大小为4,true表示有多个生产者)
        DisruptorQueue disruptorQueue = DisruptorQueueFactory.getWorkPoolQueue(4,
                true, myConsumer1, myConsumer2);

        // 创建两个生产者,开始模拟生产数据
        MyProducerThread myProducerThread1 = new MyProducerThread("11111生产者1", disruptorQueue);
        Thread t1 = new Thread(myProducerThread1);
        t1.start();
        MyProducerThread myProducerThread2 = new MyProducerThread("22222生产者2", disruptorQueue);
        Thread t2 = new Thread(myProducerThread2);
        t2.start();

        // 执行3s后,生产者不再生产
        Thread.sleep(3 * 1000);
        myProducerThread1.stopThread();
        myProducerThread2.stopThread();
    }
}

四、消费者依赖关系

1,简单的依赖

(1)下面是一个简单的消费者依赖关系,消费者 C3 消费时,必须保证消费者 C1 和消费者 C2 已经完成对该消息的消费。举个例子,在处理实际的业务逻辑(C3)之前,需要校验数据(C1),以及将数据写入磁盘(C2)。

(2)下面是样例代码:
public class DisruptorTest {
    public static void main(String[] args) throws InterruptedException {

        // 创建两个消费者
        MyConsumer myConsumer1 = new MyConsumer("---->消费者C1");
        MyConsumer myConsumer2 = new MyConsumer("---->消费者C2");
        MyConsumer myConsumer3 = new MyConsumer("------->消费者C3");

        // 创建一个Disruptor对象
        Disruptor disruptor = new Disruptor(new ObjectEventFactory(),
                4, Executors.defaultThreadFactory(), ProducerType.SINGLE,
                new SleepingWaitStrategy());

        // 设置消费者依赖关系(先让C1和C2消费,再让C3消费)
        disruptor.handleEventsWith(myConsumer1, myConsumer2).then(myConsumer3);

        // 创建一个Disruptor队列操作类对象
        DisruptorQueue disruptorQueue = DisruptorQueueFactory.getQueue(disruptor);

        // 创建一个生产者,开始模拟生产数据
        MyProducerThread myProducerThread1 = new MyProducerThread("11111生产者1", disruptorQueue);
        Thread t1 = new Thread(myProducerThread1);
        t1.start();

        // 执行3s后,生产者不再生产
        Thread.sleep(3 * 1000);
        myProducerThread1.stopThread();
    }
}

2,复杂的消费者依赖关系

(1)下面是一个更加复杂的消费者依赖关系:
  • 消费者 C1B 消费时,必须保证消费者 C1A 已经完成对该消息的消费;
  • 消费者 C2B 消费时,必须保证消费者 C2A 已经完成对该消息的消费;
  • 消费者 C3 消费时,必须保证消费者 C1B C2B 已经完成对该消息的消费。

(2)下面是样例代码:
public class DisruptorTest {
    public static void main(String[] args) throws InterruptedException {

        // 创建两个消费者
        MyConsumer myConsumerC1A = new MyConsumer("---->消费者C1A");
        MyConsumer myConsumerC1B = new MyConsumer("------->消费者C1B");
        MyConsumer myConsumerC2A = new MyConsumer("---->消费者C2A");
        MyConsumer myConsumerC2B = new MyConsumer("------->消费者C2B");
        MyConsumer myConsumerC3 = new MyConsumer("----------->消费者C3");

        // 创建一个Disruptor对象
        Disruptor disruptor = new Disruptor(new ObjectEventFactory(),
                4, Executors.defaultThreadFactory(), ProducerType.SINGLE,
                new SleepingWaitStrategy());

        // 设置消费者依赖关系
        disruptor.handleEventsWith(myConsumerC1A, myConsumerC2A);
        disruptor.after(myConsumerC1A).then(myConsumerC1B);
        disruptor.after(myConsumerC2A).then(myConsumerC2B);
        disruptor.after(myConsumerC1B, myConsumerC2B).then(myConsumerC3);

        // 创建一个Disruptor队列操作类对象
        DisruptorQueue disruptorQueue = DisruptorQueueFactory.getQueue(disruptor);

        // 创建一个生产者,开始模拟生产数据
        MyProducerThread myProducerThread1 = new MyProducerThread("11111生产者1", disruptorQueue);
        Thread t1 = new Thread(myProducerThread1);
        t1.start();

        // 执行3s后,生产者不再生产
        Thread.sleep(3 * 1000);
        myProducerThread1.stopThread();
    }
}
评论0