生产者-消费者问题是多线程环境中一个经典的同步问题。它描述了两个进程(这里是线程)——生产者和消费者,如何通过一个有限的缓冲区进行通信。生产者生成数据并将其放入缓冲区,而消费者则从缓冲区中取出数据并使用它。关键是要确保这两个进程不会同时对同一个数据项进行操作,这通常通过互斥锁实现。
javapublic class ProducerConsumerExample {
// 定义一个固定大小的缓冲区,用于存储生产者生成的数据项
private static final Object[] buffer = new Object[10];
// 计数器,用于追踪当前缓冲区中数据项的数量
private static int count = 0;
public static void main(String[] args) {
// 创建并启动生产者线程
Thread producerThread = new Thread(new Producer(), "ProducerThread");
// 创建并启动消费者线程
Thread consumerThread = new Thread(new Consumer(), "ConsumerThread");
producerThread.start();
consumerThread.start();
}
static class Producer implements Runnable {
@Override
public void run() {
while (true) { // 生产者持续运行
synchronized (buffer) { // 对缓冲区加锁以保证同步
// 如果缓冲区已满,则等待直到有空间可用
while (count == buffer.length) {
try {
System.out.println("Buffer is full, waiting...");
buffer.wait(); // Wait if the buffer is full
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 在缓冲区中放入一个新的数据项,并增加计数器
buffer[count++] = new Object(); // Produce an item
buffer.notifyAll(); // 通知所有可能在等待的消费者线程
System.out.println("Produced one item. Buffer size: " + count);
}
try {
Thread.sleep(100); // 模拟生产所需的时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
while (true) { // 消费者持续运行
synchronized (buffer) { // 对缓冲区加锁以保证同步
// 如果缓冲区为空,则等待直到有数据项可供消费
while (count == 0) {
try {
System.out.println("Buffer is empty, waiting...");
buffer.wait(); // Wait if the buffer is empty
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 从缓冲区取出一个数据项,并减少计数器
buffer[--count] = null; // Consume an item
buffer.notifyAll(); // 通知所有可能在等待的生产者线程
System.out.println("Consumed one item. Buffer size: " + count);
}
try {
Thread.sleep(150); // 模拟消费所需的时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
这段代码首先定义了一个固定大小的缓冲区(buffer),以及一个计数器count用于追踪当前缓冲区中的项目数量。Producer类实现了Runnable接口,并在其run方法中模拟生产过程。当缓冲区满时,生产者将等待直到有可用空间。Consumer类也实现了Runnable接口,负责从缓冲区消费项目。如果缓冲区为空,消费者也将等待直到有项目可供消费。在生产和消费操作之后,都会调用notifyAll()方法通知另一个可能正在等待的线程
java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ProducerConsumerWithBlockingQueue {
private static final int CAPACITY = 10; // 缓冲区大小
private static BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(CAPACITY);
public static void main(String[] args) {
Thread producerThread = new Thread(new Producer(), "Producer");
Thread consumerThread = new Thread(new Consumer(), "Consumer");
producerThread.start();
consumerThread.start();
}
static class Producer implements Runnable {
@Override
public void run() {
while (true) {
try {
Integer value = produce(); // 生产数据
queue.put(value); // 将数据放入队列
System.out.println(Thread.currentThread().getName() + " produced: " + value);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
break;
}
try {
Thread.sleep(100); // 模拟生产时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
}
}
}
private Integer produce() {
return (int) (Math.random() * 100); // 随机产生一个数
}
}
static class Consumer implements Runnable {
@Override
public void run() {
while (true) {
try {
Integer value = queue.take(); // 从队列中取数据
consume(value); // 处理数据
System.out.println(Thread.currentThread().getName() + " consumed: " + value);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
break;
}
try {
Thread.sleep(150); // 模拟消费时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
}
}
}
private void consume(Integer value) {
// 这里可以进行实际的数据处理逻辑
}
}
}
在这个例子中,我们创建了一个固定容量为10的LinkedBlockingQueue实例。生产者线程不断地向队列中添加随机整数,而消费者线程则从中取出这些整数并打印出来。这里使用了put()方法来添加元素到队列中(如果队列已满,则会阻塞),以及take()方法来从队列中移除元素(如果队列为空,则会阻塞)。通过这种方式,我们无需手动管理同步或等待/通知机制,简化了多线程编程中的复杂性。
javaimport java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerWithCondition {
private static final int CAPACITY = 10; // 缓冲区大小
private final Lock lock = new ReentrantLock();
// 生产者使用的条件:当缓冲区满了的时候等待
private final Condition notFull = lock.newCondition();
// 消费者使用的条件:当缓冲区为空的时候等待
private final Condition notEmpty = lock.newCondition();
private final Object[] buffer = new Object[CAPACITY];
private int count = 0;
public static void main(String[] args) {
ProducerConsumerWithCondition pc = new ProducerConsumerWithCondition();
Thread producerThread = new Thread(pc.new Producer(), "Producer");
Thread consumerThread = new Thread(pc.new Consumer(), "Consumer");
producerThread.start();
consumerThread.start();
}
class Producer implements Runnable {
@Override
public void run() {
while (true) {
lock.lock();
try {
// 如果缓冲区已满,则等待直到有空间可用
while (count == CAPACITY) {
System.out.println("Buffer is full, waiting...");
notFull.await();
}
buffer[count++] = new Object(); // 生产一个项目
notEmpty.signal(); // 通知可能在等待的消费者
System.out.println(Thread.currentThread().getName() + " produced one item. Buffer size: " + count);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
try {
Thread.sleep(100); // 模拟生产时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
while (true) {
lock.lock();
try {
// 如果缓冲区为空,则等待直到有项目可供消费
while (count == 0) {
System.out.println("Buffer is empty, waiting...");
notEmpty.await();
}
buffer[--count] = null; // 消费一个项目
notFull.signal(); // 通知可能在等待的生产者
System.out.println(Thread.currentThread().getName() + " consumed one item. Buffer size: " + count);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
try {
Thread.sleep(150); // 模拟消费时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
在这个例子中,我们首先定义了一个固定容量为10的缓冲区,并初始化了一个ReentrantLock实例以及两个Condition对象——notFull和notEmpty,分别用于生产者和消费者的同步控制。生产者在缓冲区满时调用notFull.await()方法等待,而消费者在缓冲区空时调用notEmpty.await()方法等待。每当生产或消费发生时,相应的条件(notEmpty或notFull)会通过调用signal()方法通知其他线程。这种方式提供了比简单的wait()/notifyAll()机制更加精细的控制。
本文作者:Weee
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!