`

生产者消费者模型的演变

阅读更多

想复习一下生产者和消费者通过Java代码如何实现,网上搜集了一个,《Thinking in Java》上面有两个,实现各有侧重。与大家分享,也当自己学习。

 

介绍:

生产者、消费者简单说这个模型核心角色有3个,即生产者、消费者、产品(关键区)。

生产者和消费者对产品(关键区)的操作时要互斥,保证并发时的正确性。

 

 

代码实现:

网络上最常见也是最简单的实现,直接对关键区加锁。生产者、消费者公共使用主线程MultiThread的container。通过对container进行synchronized控制,使用wait,notify来控制进程间的交替。

这种实现方式实现需要注意的有以下几点:

1.对关键区进行synchronized控制。

2.只有线程获取了关键区的访问权,才可以通过关键区对象调用notify,notifyAll之类的方法。否则就会抛出IllegalMonitorStateException的异常。

3.wait,notify为Object的方法,在该模式下都是关键区对象,如container来调用相关方法,而非其他线程。

 

消费者代码

package ProductAndConsume; 
import java.util.List; 

public class Consume implements Runnable{ 
	private List container = null; 
	private int count; 
	public Consume(List lst){ 
		this.container = lst; 
	} 
	public void run() { 

		while(true){ 
			synchronized (container) { 
				if(container.size()== 0){ 
					try { 
						container.wait();//容器为空,放弃锁,等待生产 
					} catch (InterruptedException e) { 
						e.printStackTrace(); 
					} 
				} 
				try { 
					Thread.sleep(100); 
				} catch (InterruptedException e) { 
					e.printStackTrace(); 
				} 
				container.remove(0); 
				container.notify(); 
				System.out.println("我吃了"+(++count)+"个"); 
			} 
		} 

	} 

} 

 生产者:

package ProductAndConsume; 
import java.util.List; 

public class Product implements Runnable { 
	private List container = null; 
	private int count; 
	public Product(List lst) { 
		this.container = lst; 
	} 

	public void run() { 
		while (true) { 
			synchronized (container) { 
				if (container.size() > MultiThread.MAX) { 
					//如果容器超过了最大值,就不要在生产了,等待消费 
					try { 
						container.wait(); 
					} catch (InterruptedException e) { 
						e.printStackTrace(); 
					} 
				} 
				try { 
					Thread.sleep(100); 
				} catch (InterruptedException e) { 
					e.printStackTrace(); 
				} 
				container.add(new Object()); 
				container.notify(); 
				System.out.println("我生产了"+(++count)+"个"); 
			} 
		} 

	} 

} 

 主线程,包括公共资源。

package ProductAndConsume; 
import java.util.ArrayList; 
import java.util.List; 

public class MultiThread { 
	private List container = new ArrayList(); 
	public final static int MAX = 5; 
	public static void main(String args[]){ 

		MultiThread m = new MultiThread(); 

		new Thread(new Consume(m.getContainer())).start();
		new Thread(new Consume(m.getContainer())).start();
		new Thread(new Product(m.getContainer())).start(); 
		new Thread(new Consume(m.getContainer())).start(); 
		new Thread(new Product(m.getContainer())).start(); 
	} 
	public List getContainer() { 
		return container; 
	} 

	public void setContainer(List container) { 
		this.container = container; 
	}
}

  

Thinking in Java上面关于消费者、生产者的实现与上面方式基本相同。唯一不同的是,线程直接对自身加锁,而非关键区。因为线程中保留了对关键区的一个引用。

这种设计方式很严谨,同时考虑了信号量丢失的问题。最后的打印结果也很值得分析。

Out of food, closing
Order up! 
Chef interrupted
WaitPerson interrupted 

当食物数量满了之后,关闭线程池被关闭,interrupt所有线程。此时线程在调用sleep或wait方法,就会抛出InterruptedException异常,从而打印出上面的结果。

实现代码如下:

package TIJ4PAC;
/**
 * 生产者,消费者完整的例子
 */
//: concurrency/Restaurant.java
// The producer-consumer approach to task cooperation.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class Meal {
  private final int orderNum;
  public Meal(int orderNum) { this.orderNum = orderNum; }
  public String toString() { return "Meal " + orderNum; }
}

class WaitPerson implements Runnable {
  private Restaurant restaurant;
  public WaitPerson(Restaurant r) { restaurant = r; }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        synchronized(this) {
          while(restaurant.meal == null)
            wait(); // ... for the chef to produce a meal,防止其他服务员突然闯入,夺走订单
        }
        System.out.println("Waitperson got " + restaurant.meal);
        synchronized(restaurant.chef) {
          restaurant.meal = null;
          restaurant.chef.notifyAll(); // Ready for another
        }
      }
    } catch(InterruptedException e) {
      System.out.println("WaitPerson interrupted");
    }
  }
}

class Chef implements Runnable {
  private Restaurant restaurant;
  private int count = 0;
  public Chef(Restaurant r) { restaurant = r; }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        synchronized(this) {
          while(restaurant.meal != null)
            wait(); // ... for the meal to be taken,防止其他初始突然闯入,夺走机会
        }
        if(++count == 10) {
          System.out.println("Out of food, closing");
          restaurant.exec.shutdownNow();
        }
        System.out.println("Order up! ");
        synchronized(restaurant.waitPerson) {
          restaurant.meal = new Meal(count);
          restaurant.waitPerson.notifyAll();
        }
        TimeUnit.MILLISECONDS.sleep(100);
      }
    } catch(InterruptedException e) {
      System.out.println("Chef interrupted");
    }
  }
}

public class Restaurant {
  Meal meal;
  ExecutorService exec = Executors.newCachedThreadPool();
  WaitPerson waitPerson = new WaitPerson(this);
  Chef chef = new Chef(this);
  public Restaurant() {
    exec.execute(chef);
    exec.execute(waitPerson);
  }
  public static void main(String[] args) {
    new Restaurant();
  }
} /* Output:
Order up! 
Waitperson got Meal 1
Order up! 
Waitperson got Meal 2
Order up! 
Waitperson got Meal 3
Order up! 
Waitperson got Meal 4
Order up! 
Waitperson got Meal 5
Order up! 
Waitperson got Meal 6
Order up! 
Waitperson got Meal 7
Order up! 
Waitperson got Meal 8
Order up! 
Waitperson got Meal 9
Out of food, closing
Order up! 
Chef interrupted
WaitPerson interrupted
*///:~

 

最后一种实现,采用了BlockingQueue的方式,采用这种方式的好处就是,生产者、消费者无需再对产品(关键区)进行加锁控制了。BlockingQueue的put和take方法都会自动对关键区进行互斥的,无需编码者手动控制。代码结构简单许多。

下面这个例子挺有意思,通过线程模拟制作吐司面包,然后抹黄油,最后加果酱,吃吐司的过程。老外就是幽默呀~

代码如下:

package BlockingQueue;

//: concurrency/ToastOMatic.java
// A toaster that uses queues.
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class Toast {
	public enum Status { DRY, BUTTERED, JAMMED }
	private Status status = Status.DRY;
	private final int id;
	public Toast(int idn) { id = idn; }
	public void butter() { status = Status.BUTTERED; }	//抹黄油
	public void jam() { status = Status.JAMMED; }	//涂果酱
	public Status getStatus() { return status; }
	public int getId() { return id; }
	public String toString() {
		return "Toast " + id + ": " + status;
	}
}

class ToastQueue extends LinkedBlockingQueue<Toast> {}

class Toaster implements Runnable {	//制作土司的线程
	private ToastQueue toastQueue;
	private int count = 0;
	private Random rand = new Random(47);
	public Toaster(ToastQueue tq) { toastQueue = tq; }
	public void run() {
		try {
			while(!Thread.interrupted()) {
				TimeUnit.MILLISECONDS.sleep(
						100 + rand.nextInt(500));
				// Make toast
				Toast t = new Toast(count++);
				System.out.println(t);
				// Insert into queue
				toastQueue.put(t);
			}
		} catch(InterruptedException e) {
			System.out.println("Toaster interrupted");
		}
		System.out.println("Toaster off");
	}
}

// Apply butter to toast:
class Butterer implements Runnable {	//抹黄油的人
	private ToastQueue dryQueue, butteredQueue;
	public Butterer(ToastQueue dry, ToastQueue buttered) {
		dryQueue = dry;
		butteredQueue = buttered;
	}
	public void run() {
		try {
			while(!Thread.interrupted()) {
				// Blocks until next piece of toast is available:
				Toast t = dryQueue.take();
				t.butter();
				System.out.println(t);
				butteredQueue.put(t);
			}
		} catch(InterruptedException e) {
			System.out.println("Butterer interrupted");
		}
		System.out.println("Butterer off");
	}
}

// Apply jam to buttered toast:
class Jammer implements Runnable {	//擦果酱的人
	private ToastQueue butteredQueue, finishedQueue;
	public Jammer(ToastQueue buttered, ToastQueue finished) {
		butteredQueue = buttered;
		finishedQueue = finished;
	}
	public void run() {
		try {
			while(!Thread.interrupted()) {
				// Blocks until next piece of toast is available:
				Toast t = butteredQueue.take();
				t.jam();
				System.out.println(t);
				finishedQueue.put(t);
			}
		} catch(InterruptedException e) {
			System.out.println("Jammer interrupted");
		}
		System.out.println("Jammer off");
	}
}

// Consume the toast:
class Eater implements Runnable {	//吃吐司的人
	private ToastQueue finishedQueue;
	private int counter = 0;
	public Eater(ToastQueue finished) {
		finishedQueue = finished;
	}
	public void run() {
		try {
			while(!Thread.interrupted()) {
				// Blocks until next piece of toast is available:
				Toast t = finishedQueue.take();
				// Verify that the toast is coming in order,
				// and that all pieces are getting jammed:
				if(t.getId() != counter++ ||
						t.getStatus() != Toast.Status.JAMMED) {
					System.out.println(">>>> Error: " + t);
					System.exit(1);
				} else
					System.out.println("Chomp! " + t);
			}
		} catch(InterruptedException e) {
			System.out.println("Eater interrupted");
		}
		System.out.println("Eater off");
	}
}

public class ToastOMatic {
	public static void main(String[] args) throws Exception {
		ToastQueue dryQueue = new ToastQueue(),
		butteredQueue = new ToastQueue(),
		finishedQueue = new ToastQueue();	//生成3个队列。分别是干吐司,抹了黄油的吐司,抹了果酱的吐司(完成的吐司)
		
		ExecutorService exec = Executors.newCachedThreadPool();
		exec.execute(new Toaster(dryQueue));	//制作
		exec.execute(new Butterer(dryQueue, butteredQueue));	//抹黄油
		exec.execute(new Jammer(butteredQueue, finishedQueue));		//抹果酱
		exec.execute(new Eater(finishedQueue));		//吃
		TimeUnit.SECONDS.sleep(5);
		exec.shutdownNow();
	}
} /* (Execute to see output) *///:~
//使用BolckingQueue 简化明显,在使用显式的wait和notifyAll方法时存在的类和类直接的耦合被消除了,每一个类都和它的BlockinQueue通讯

 

就与大家分享到这里,代码如附件,如有错误欢迎指正。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics