关于BlockingQueue的实例
本文将给出一个使用 LinkedBlockingQueue 实现生产者-消费者模型的例子。
项目结构
+--queue
|  |
|  +ResourceQueue.java
|
+--res
|  |
|  +Resource.java
|
+Customer.java
|
+Processor.java
|
+Transporter.java
|
+Test.java
其中,queue是存放队列源码的包,res是存放资源源码的包。
queue
/**
 * Blocking queue that holds {@link Resource} items between pipeline stages.
 *
 * <p>Adds no behavior of its own; it only gives
 * {@code LinkedBlockingQueue<Resource>} a dedicated name.
 *
 * @author StupidZhe (created 2017/11/27)
 */
public class ResourceQueue
        extends LinkedBlockingQueue<Resource> {
    // Intentionally empty: everything is inherited from LinkedBlockingQueue.
}
res
/**
 * A work item that is passed through the transport, process and finish stages.
 *
 * <p>The id is fixed at construction time; only {@link #status} changes as the
 * resource moves from one stage to the next.
 *
 * @author StupidZhe (created 2017/11/27)
 */
public class Resource {

    /** Stages a resource can be in. */
    public enum STATUS {
        NULL, TRANSPORT, PROCESS, FINISH
    }

    /** Current stage of this resource; starts out as {@link STATUS#NULL}. */
    public STATUS status = STATUS.NULL;

    private final int id;

    /**
     * Creates a resource in the {@link STATUS#NULL} stage.
     *
     * @param id identifier reported by {@link #getId()} and {@link #toString()}
     */
    public Resource(int id) {
        this.id = id;
    }

    /**
     * Returns the identifier given at construction.
     *
     * @return this resource's id
     */
    public int getId() {
        return id;
    }

    /** Marks this resource as being transported. */
    public void transport() {
        this.status = STATUS.TRANSPORT;
    }

    /** Marks this resource as being processed. */
    public void process() {
        this.status = STATUS.PROCESS;
    }

    /** Marks this resource as finished. */
    public void finish() {
        this.status = STATUS.FINISH;
    }

    /**
     * Marks this resource as finished.
     *
     * <p>Kept under its original upper-case name so existing callers keep
     * compiling; new code should call {@link #finish()}.
     */
    public void FINISH() {
        finish();
    }

    /**
     * Returns {@code "Resource id:<id>:<status>"}.
     */
    @Override
    public String toString() {
        return "Resource id:" + id + ":" + status;
    }
}
生产者消费者
Transporter.java
/**
 * Producer task: creates a new {@link Resource} about once per second, marks
 * it as transported and adds it to the transport queue.
 *
 * <p>The task runs until its thread is interrupted.
 *
 * @author StupidZhe (created 2017/11/27)
 */
public class Transporter implements Runnable {

    /** Queue that receives every resource this task creates. */
    public ResourceQueue transportQueue;

    /** Number of resources created so far; also the id of the latest one. */
    private int count;

    /**
     * @param transportQueue queue the created resources are added to
     */
    public Transporter(ResourceQueue transportQueue) {
        this.transportQueue = transportQueue;
    }

    /**
     * Produces resources until the running thread is interrupted.
     *
     * <p>On interruption the loop ends and the thread's interrupt status is
     * left set, so whoever owns the thread can still observe the request.
     */
    @Override
    public void run() {
        try {
            // isInterrupted() does not clear the flag, unlike Thread.interrupted().
            while (!Thread.currentThread().isInterrupted()) {
                Resource resource = new Resource(++count);
                TimeUnit.SECONDS.sleep(1);
                System.out.println("add new " + resource);
                resource.transport();
                System.out.println(resource);
                transportQueue.add(resource);
            }
        } catch (InterruptedException e) {
            // sleep() cleared the flag when it threw; restore it instead of
            // printing a stack trace for what is a normal stop request.
            Thread.currentThread().interrupt();
        }
    }
}
Processor.java
/**
 * Middle stage: takes each {@link Resource} from the transport queue, marks it
 * as processed and adds it to the process queue.
 *
 * <p>The task runs until its thread is interrupted.
 *
 * @author StupidZhe (created 2017/11/27)
 */
public class Processor implements Runnable {

    private final ResourceQueue transportQueue;
    private final ResourceQueue processQueue;

    /**
     * @param transportQueue queue resources are taken from
     * @param processQueue   queue processed resources are added to
     */
    public Processor(ResourceQueue transportQueue, ResourceQueue processQueue) {
        this.transportQueue = transportQueue;
        this.processQueue = processQueue;
    }

    /**
     * Processes resources until the running thread is interrupted.
     *
     * <p>The try block wraps the whole loop: an interrupt raised while blocked
     * in {@code take()} ends the loop instead of being caught per iteration,
     * which would clear the flag and let the loop continue indefinitely.
     */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Resource resource = transportQueue.take();
                resource.process();
                System.out.println(resource);
                processQueue.add(resource);
            }
        } catch (InterruptedException e) {
            // take() cleared the flag when it threw; restore it for the thread's owner.
            Thread.currentThread().interrupt();
        }
    }
}
Customer.java
/**
 * Final stage: takes each {@link Resource} from the process queue, marks it as
 * finished and adds it to the custom queue.
 *
 * <p>The task runs until its thread is interrupted.
 *
 * @author StupidZhe (created 2017/11/27)
 */
public class Customer implements Runnable {

    private final ResourceQueue processQueue;
    private final ResourceQueue customQueue;

    /**
     * @param processQueue queue resources are taken from
     * @param customQueue  queue finished resources are added to
     */
    public Customer(ResourceQueue processQueue, ResourceQueue customQueue) {
        this.processQueue = processQueue;
        this.customQueue = customQueue;
    }

    /**
     * Consumes resources until the running thread is interrupted.
     *
     * <p>The try block wraps the whole loop: an interrupt raised while blocked
     * in {@code take()} ends the loop instead of being caught per iteration,
     * which would clear the flag and let the loop continue indefinitely.
     */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Resource resource = processQueue.take();
                resource.FINISH();
                System.out.println(resource);
                customQueue.add(resource);
            }
        } catch (InterruptedException e) {
            // take() cleared the flag when it threw; restore it for the thread's owner.
            Thread.currentThread().interrupt();
        }
    }
}
Test.java
/**
 * Entry point: creates the three queues and starts the Customer, Processor
 * and Transporter tasks on a cached thread pool.
 *
 * @author StupidZhe (created 2017/11/27)
 */
public class Test {
    public static void main(String[] args) {
        // One queue per hand-off: Transporter -> Processor -> Customer -> customQueue.
        ResourceQueue transportQueue = new ResourceQueue();
        ResourceQueue processQueue = new ResourceQueue();
        // Nothing in the code shown here takes from customQueue; it only receives finished resources.
        ResourceQueue customQueue = new ResourceQueue();
        ExecutorService executor = Executors.newCachedThreadPool();
        // The consuming stages are submitted first; they block in take() until items arrive.
        executor.execute(new Customer(processQueue, customQueue));
        executor.execute(new Processor(transportQueue, processQueue));
        executor.execute(new Transporter(transportQueue));
        // shutdown() only rejects new submissions; the three tasks already submitted keep running.
        executor.shutdown();
    }
}
----output----- add new Resource id:1:NULL Resource id:1:TRANSPORT Resource id:1:PROCESS Resource id:1:FINISH add new Resource id:2:NULL Resource id:2:TRANSPORT Resource id:2:PROCESS Resource id:2:FINISH add new Resource id:3:NULL Resource id:3:TRANSPORT Resource id:3:PROCESS Resource id:3:FINISH ---------------