想复习一下生产者和消费者通过Java代码如何实现,网上搜集了一个,《Thinking in Java》上面有两个,实现各有侧重。与大家分享,也当自己学习。
介绍:
生产者、消费者简单说这个模型核心角色有3个,即生产者、消费者、产品(关键区)。
生产者和消费者对产品(关键区)的操作时要互斥,保证并发时的正确性。
代码实现:
网络上最常见也是最简单的实现,直接对关键区加锁。生产者、消费者公共使用主线程MultiThread的container。通过对container进行synchronized控制,使用wait,notify来控制进程间的交替。
这种实现方式实现需要注意的有以下几点:
1.对关键区进行synchronized控制。
2.只有线程获取了关键区的访问权,才可以通过关键区对象调用notify,notifyAll之类的方法。否则就会抛出IllegalMonitorStateException的异常。
3.wait,notify为Object的方法,在该模式下都是关键区对象,如container来调用相关方法,而非其他线程。
消费者代码
package ProductAndConsume;
import java.util.List;
public class Consume implements Runnable{
private List container = null;
private int count;
public Consume(List lst){
this.container = lst;
}
public void run() {
while(true){
synchronized (container) {
if(container.size()== 0){
try {
container.wait();//容器为空,放弃锁,等待生产
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
container.remove(0);
container.notify();
System.out.println("我吃了"+(++count)+"个");
}
}
}
}
生产者:
package ProductAndConsume;
import java.util.List;
public class Product implements Runnable {
private List container = null;
private int count;
public Product(List lst) {
this.container = lst;
}
public void run() {
while (true) {
synchronized (container) {
if (container.size() > MultiThread.MAX) {
//如果容器超过了最大值,就不要在生产了,等待消费
try {
container.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
container.add(new Object());
container.notify();
System.out.println("我生产了"+(++count)+"个");
}
}
}
}
主线程,包括公共资源。
package ProductAndConsume;
import java.util.ArrayList;
import java.util.List;
public class MultiThread {
private List container = new ArrayList();
public final static int MAX = 5;
public static void main(String args[]){
MultiThread m = new MultiThread();
new Thread(new Consume(m.getContainer())).start();
new Thread(new Consume(m.getContainer())).start();
new Thread(new Product(m.getContainer())).start();
new Thread(new Consume(m.getContainer())).start();
new Thread(new Product(m.getContainer())).start();
}
public List getContainer() {
return container;
}
public void setContainer(List container) {
this.container = container;
}
}
Thinking in Java上面关于消费者、生产者的实现与上面方式基本相同。唯一不同的是,线程直接对自身加锁,而非关键区。因为线程中保留了对关键区的一个引用。
这种设计方式很严谨,同时考虑了信号量丢失的问题。最后的打印结果也很值得分析。
Out of food, closing
Order up!
Chef interrupted
WaitPerson interrupted
当食物数量满了之后,关闭线程池被关闭,interrupt所有线程。此时线程在调用sleep或wait方法,就会抛出InterruptedException异常,从而打印出上面的结果。
实现代码如下:
package TIJ4PAC;
/**
* 生产者,消费者完整的例子
*/
//: concurrency/Restaurant.java
// The producer-consumer approach to task cooperation.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class Meal {
private final int orderNum;
public Meal(int orderNum) { this.orderNum = orderNum; }
public String toString() { return "Meal " + orderNum; }
}
class WaitPerson implements Runnable {
private Restaurant restaurant;
public WaitPerson(Restaurant r) { restaurant = r; }
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
while(restaurant.meal == null)
wait(); // ... for the chef to produce a meal,防止其他服务员突然闯入,夺走订单
}
System.out.println("Waitperson got " + restaurant.meal);
synchronized(restaurant.chef) {
restaurant.meal = null;
restaurant.chef.notifyAll(); // Ready for another
}
}
} catch(InterruptedException e) {
System.out.println("WaitPerson interrupted");
}
}
}
class Chef implements Runnable {
private Restaurant restaurant;
private int count = 0;
public Chef(Restaurant r) { restaurant = r; }
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
while(restaurant.meal != null)
wait(); // ... for the meal to be taken,防止其他初始突然闯入,夺走机会
}
if(++count == 10) {
System.out.println("Out of food, closing");
restaurant.exec.shutdownNow();
}
System.out.println("Order up! ");
synchronized(restaurant.waitPerson) {
restaurant.meal = new Meal(count);
restaurant.waitPerson.notifyAll();
}
TimeUnit.MILLISECONDS.sleep(100);
}
} catch(InterruptedException e) {
System.out.println("Chef interrupted");
}
}
}
public class Restaurant {
Meal meal;
ExecutorService exec = Executors.newCachedThreadPool();
WaitPerson waitPerson = new WaitPerson(this);
Chef chef = new Chef(this);
public Restaurant() {
exec.execute(chef);
exec.execute(waitPerson);
}
public static void main(String[] args) {
new Restaurant();
}
} /* Output:
Order up!
Waitperson got Meal 1
Order up!
Waitperson got Meal 2
Order up!
Waitperson got Meal 3
Order up!
Waitperson got Meal 4
Order up!
Waitperson got Meal 5
Order up!
Waitperson got Meal 6
Order up!
Waitperson got Meal 7
Order up!
Waitperson got Meal 8
Order up!
Waitperson got Meal 9
Out of food, closing
Order up!
Chef interrupted
WaitPerson interrupted
*///:~
最后一种实现,采用了BlockingQueue的方式,采用这种方式的好处就是,生产者、消费者无需再对产品(关键区)进行加锁控制了。BlockingQueue的put和take方法都会自动对关键区进行互斥的,无需编码者手动控制。代码结构简单许多。
下面这个例子挺有意思,通过线程模拟制作吐司面包,然后抹黄油,最后加果酱,吃吐司的过程。老外就是幽默呀~
代码如下:
package BlockingQueue;
//: concurrency/ToastOMatic.java
// A toaster that uses queues.
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
class Toast {
public enum Status { DRY, BUTTERED, JAMMED }
private Status status = Status.DRY;
private final int id;
public Toast(int idn) { id = idn; }
public void butter() { status = Status.BUTTERED; } //抹黄油
public void jam() { status = Status.JAMMED; } //涂果酱
public Status getStatus() { return status; }
public int getId() { return id; }
public String toString() {
return "Toast " + id + ": " + status;
}
}
class ToastQueue extends LinkedBlockingQueue<Toast> {}
class Toaster implements Runnable { //制作土司的线程
private ToastQueue toastQueue;
private int count = 0;
private Random rand = new Random(47);
public Toaster(ToastQueue tq) { toastQueue = tq; }
public void run() {
try {
while(!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(
100 + rand.nextInt(500));
// Make toast
Toast t = new Toast(count++);
System.out.println(t);
// Insert into queue
toastQueue.put(t);
}
} catch(InterruptedException e) {
System.out.println("Toaster interrupted");
}
System.out.println("Toaster off");
}
}
// Apply butter to toast:
class Butterer implements Runnable { //抹黄油的人
private ToastQueue dryQueue, butteredQueue;
public Butterer(ToastQueue dry, ToastQueue buttered) {
dryQueue = dry;
butteredQueue = buttered;
}
public void run() {
try {
while(!Thread.interrupted()) {
// Blocks until next piece of toast is available:
Toast t = dryQueue.take();
t.butter();
System.out.println(t);
butteredQueue.put(t);
}
} catch(InterruptedException e) {
System.out.println("Butterer interrupted");
}
System.out.println("Butterer off");
}
}
// Apply jam to buttered toast:
class Jammer implements Runnable { //擦果酱的人
private ToastQueue butteredQueue, finishedQueue;
public Jammer(ToastQueue buttered, ToastQueue finished) {
butteredQueue = buttered;
finishedQueue = finished;
}
public void run() {
try {
while(!Thread.interrupted()) {
// Blocks until next piece of toast is available:
Toast t = butteredQueue.take();
t.jam();
System.out.println(t);
finishedQueue.put(t);
}
} catch(InterruptedException e) {
System.out.println("Jammer interrupted");
}
System.out.println("Jammer off");
}
}
// Consume the toast:
class Eater implements Runnable { //吃吐司的人
private ToastQueue finishedQueue;
private int counter = 0;
public Eater(ToastQueue finished) {
finishedQueue = finished;
}
public void run() {
try {
while(!Thread.interrupted()) {
// Blocks until next piece of toast is available:
Toast t = finishedQueue.take();
// Verify that the toast is coming in order,
// and that all pieces are getting jammed:
if(t.getId() != counter++ ||
t.getStatus() != Toast.Status.JAMMED) {
System.out.println(">>>> Error: " + t);
System.exit(1);
} else
System.out.println("Chomp! " + t);
}
} catch(InterruptedException e) {
System.out.println("Eater interrupted");
}
System.out.println("Eater off");
}
}
public class ToastOMatic {
public static void main(String[] args) throws Exception {
ToastQueue dryQueue = new ToastQueue(),
butteredQueue = new ToastQueue(),
finishedQueue = new ToastQueue(); //生成3个队列。分别是干吐司,抹了黄油的吐司,抹了果酱的吐司(完成的吐司)
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Toaster(dryQueue)); //制作
exec.execute(new Butterer(dryQueue, butteredQueue)); //抹黄油
exec.execute(new Jammer(butteredQueue, finishedQueue)); //抹果酱
exec.execute(new Eater(finishedQueue)); //吃
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
} /* (Execute to see output) *///:~
//使用BolckingQueue 简化明显,在使用显式的wait和notifyAll方法时存在的类和类直接的耦合被消除了,每一个类都和它的BlockinQueue通讯
就与大家分享到这里,代码如附件,如有错误欢迎指正。
分享到:
相关推荐
LinuxC语言实现生产者和消费者模型LinuxC语言实现生产者和消费者模型LinuxC语言实现生产者和消费者模型LinuxC语言实现生产者和消费者模型LinuxC语言实现生产者和消费者模型LinuxC语言实现生产者和消费者模型LinuxC...
多线程实现生产者消费者模型:锁(Lock)、信号量(Semaphore、BoundedSemaphore)、条件(Condition)、队列(Queue)、事件(Event) 多进程程实现生产者消费者模型:信号量(Semaphore)、条件(Condition)、...
Qt入门练习项目——生产者消费者模型 Qt入门练习项目源码,通过本项目从而了解Qt,进行一个简单的入门。 本文件是对应项目源码,希望多多交流。
java生产者消费者模型
java多线程经典模型生产者消费者java多线程经典模型生产者消费者java多线程经典模型生产者消费者java多线程经典模型生产者消费者java多线程经典模型生产者消费者java多线程经典模型生产者消费者java多线程经典模型...
内容:编程实现生产者-消费者问题的模拟。 基本要求: 1. 生产者消费者对缓冲区进行互斥操作。 2. 缓冲区大小为10,缓冲区满则不允许生产者生产数据,缓冲区空则不允许消费者消费数据。 3. 生产者消费者各循环操作10...
用Qt实现的生产者消费者模型(多进程),操作系统课程作业。 4消费者,4生产者,12缓冲区,可视化展示。
使用wait()和notify()实现的生产者与消费者模型,可以了解如何使用wait()和notify()进行线程间通信。(上一次上传的代码有一个问题没有考虑到,这次修补了——CSDN没法撤销资源,只能再上传了)
结合System V信号量机制,利用Linux下的多线程库实现了Linux下的操作系统生产者-消费者模型,具体原理可参考博文:: http://blog.csdn.net/Mikeoperfect/article/details/79431642
LINUX下的生产者消费者模型算法 利用进程模拟生产者消费者,利用共享存储区模拟缓冲区
生产者消费者为模型的多线程编程的c#实现,原创包括代码和程序,算法核心已经封装。可重用。
C++实现生产与消费者模型,利用锁实现互斥。
大学计算机专业,操作系统实验报告,生产者消费者模型模拟进程调度
生产者,消费者,线程同步,模型,锁
内容概要:基于QT Creator环境的多线程编程验证互斥的原理,通过使用semaphore,mutex等控制变量,实现对生产者消费者模型的真实模拟。 适合人群:初学操作系统的学生 能学到什么:调用了Qt多线程类库QThread进行...
设计目的:通过研究Linux 的进程机制和信号量实现生产者消费者问题的并发控制。说明:有界缓冲区内设有20 个存储单元,放入/取出的数据项设定为1‐20 这20 个整型数。设计要求:1)每个生产者和消费者对有界缓冲区...
基于Linux C++的条件变量实现的生产者消费者模型,旨在进一步帮助读者理解条件变量的使用。
这是本人学习labview生产者消费者模型时,收藏的资料,里面包含生产消费者模型的文档介绍和例程,保证资源真实有效,例程能跑。