编辑
2024-03-17
学习记录
00
请注意,本文编写于 382 天前,最后修改于 17 天前,其中某些信息可能已经过时。

目录

简介
wait和notityAll实现
BlockingQueue实现
Condition和Lock实现

简介

生产者-消费者问题是多线程环境中一个经典的同步问题。它描述了两个进程(这里是线程)——生产者和消费者,如何通过一个有限的缓冲区进行通信。生产者生成数据并将其放入缓冲区,而消费者则从缓冲区中取出数据并使用它。关键是要确保这两个进程不会同时对同一个数据项进行操作,这通常通过互斥锁实现。

wait和notityAll实现

java
public class ProducerConsumerExample { // 定义一个固定大小的缓冲区,用于存储生产者生成的数据项 private static final Object[] buffer = new Object[10]; // 计数器,用于追踪当前缓冲区中数据项的数量 private static int count = 0; public static void main(String[] args) { // 创建并启动生产者线程 Thread producerThread = new Thread(new Producer(), "ProducerThread"); // 创建并启动消费者线程 Thread consumerThread = new Thread(new Consumer(), "ConsumerThread"); producerThread.start(); consumerThread.start(); } static class Producer implements Runnable { @Override public void run() { while (true) { // 生产者持续运行 synchronized (buffer) { // 对缓冲区加锁以保证同步 // 如果缓冲区已满,则等待直到有空间可用 while (count == buffer.length) { try { System.out.println("Buffer is full, waiting..."); buffer.wait(); // Wait if the buffer is full } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } // 在缓冲区中放入一个新的数据项,并增加计数器 buffer[count++] = new Object(); // Produce an item buffer.notifyAll(); // 通知所有可能在等待的消费者线程 System.out.println("Produced one item. Buffer size: " + count); } try { Thread.sleep(100); // 模拟生产所需的时间 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } static class Consumer implements Runnable { @Override public void run() { while (true) { // 消费者持续运行 synchronized (buffer) { // 对缓冲区加锁以保证同步 // 如果缓冲区为空,则等待直到有数据项可供消费 while (count == 0) { try { System.out.println("Buffer is empty, waiting..."); buffer.wait(); // Wait if the buffer is empty } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } // 从缓冲区取出一个数据项,并减少计数器 buffer[--count] = null; // Consume an item buffer.notifyAll(); // 通知所有可能在等待的生产者线程 System.out.println("Consumed one item. Buffer size: " + count); } try { Thread.sleep(150); // 模拟消费所需的时间 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } }

这段代码首先定义了一个固定大小的缓冲区(buffer),以及一个计数器count用于追踪当前缓冲区中的项目数量。Producer类实现了Runnable接口,并在其run方法中模拟生产过程。当缓冲区满时,生产者将等待直到有可用空间。Consumer类也实现了Runnable接口,负责从缓冲区消费项目。如果缓冲区为空,消费者也将等待直到有项目可供消费。在生产和消费操作之后,都会调用notifyAll()方法通知另一个可能正在等待的线程

BlockingQueue实现

java
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class ProducerConsumerWithBlockingQueue { private static final int CAPACITY = 10; // 缓冲区大小 private static BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(CAPACITY); public static void main(String[] args) { Thread producerThread = new Thread(new Producer(), "Producer"); Thread consumerThread = new Thread(new Consumer(), "Consumer"); producerThread.start(); consumerThread.start(); } static class Producer implements Runnable { @Override public void run() { while (true) { try { Integer value = produce(); // 生产数据 queue.put(value); // 将数据放入队列 System.out.println(Thread.currentThread().getName() + " produced: " + value); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 恢复中断状态 break; } try { Thread.sleep(100); // 模拟生产时间 } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 恢复中断状态 } } } private Integer produce() { return (int) (Math.random() * 100); // 随机产生一个数 } } static class Consumer implements Runnable { @Override public void run() { while (true) { try { Integer value = queue.take(); // 从队列中取数据 consume(value); // 处理数据 System.out.println(Thread.currentThread().getName() + " consumed: " + value); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 恢复中断状态 break; } try { Thread.sleep(150); // 模拟消费时间 } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 恢复中断状态 } } } private void consume(Integer value) { // 这里可以进行实际的数据处理逻辑 } } }

在这个例子中,我们创建了一个固定容量为10的LinkedBlockingQueue实例。生产者线程不断地向队列中添加随机整数,而消费者线程则从中取出这些整数并打印出来。这里使用了put()方法来添加元素到队列中(如果队列已满,则会阻塞),以及take()方法来从队列中移除元素(如果队列为空,则会阻塞)。通过这种方式,我们无需手动管理同步或等待/通知机制,简化了多线程编程中的复杂性。

Condition和Lock实现

java
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ProducerConsumerWithCondition { private static final int CAPACITY = 10; // 缓冲区大小 private final Lock lock = new ReentrantLock(); // 生产者使用的条件:当缓冲区满了的时候等待 private final Condition notFull = lock.newCondition(); // 消费者使用的条件:当缓冲区为空的时候等待 private final Condition notEmpty = lock.newCondition(); private final Object[] buffer = new Object[CAPACITY]; private int count = 0; public static void main(String[] args) { ProducerConsumerWithCondition pc = new ProducerConsumerWithCondition(); Thread producerThread = new Thread(pc.new Producer(), "Producer"); Thread consumerThread = new Thread(pc.new Consumer(), "Consumer"); producerThread.start(); consumerThread.start(); } class Producer implements Runnable { @Override public void run() { while (true) { lock.lock(); try { // 如果缓冲区已满,则等待直到有空间可用 while (count == CAPACITY) { System.out.println("Buffer is full, waiting..."); notFull.await(); } buffer[count++] = new Object(); // 生产一个项目 notEmpty.signal(); // 通知可能在等待的消费者 System.out.println(Thread.currentThread().getName() + " produced one item. Buffer size: " + count); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { lock.unlock(); } try { Thread.sleep(100); // 模拟生产时间 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } class Consumer implements Runnable { @Override public void run() { while (true) { lock.lock(); try { // 如果缓冲区为空,则等待直到有项目可供消费 while (count == 0) { System.out.println("Buffer is empty, waiting..."); notEmpty.await(); } buffer[--count] = null; // 消费一个项目 notFull.signal(); // 通知可能在等待的生产者 System.out.println(Thread.currentThread().getName() + " consumed one item. Buffer size: " + count); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { lock.unlock(); } try { Thread.sleep(150); // 模拟消费时间 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } }

在这个例子中,我们首先定义了一个固定容量为10的缓冲区,并初始化了一个ReentrantLock实例以及两个Condition对象——notFull和notEmpty,分别用于生产者和消费者的同步控制。生产者在缓冲区满时调用notFull.await()方法等待,而消费者在缓冲区空时调用notEmpty.await()方法等待。每当生产或消费发生时,相应的条件(notEmpty或notFull)会通过调用signal()方法通知其他线程。这种方式提供了比简单的wait()/notifyAll()机制更加精细的控制。

本文作者:Weee

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!