//: concurrency/PriorityBlockingQueueDemo.java import java.util.concurrent.*; import java.util.*; import static net.mindview.util.Print.*; class PrioritizedTask implements Runnable, Comparable { private Random rand = new Random(47); private static int counter = 0; private final int id = counter++; private final int priority; protected static List sequence = new ArrayList<>(); public PrioritizedTask(int priority) { this.priority = priority; sequence.add(this); } @Override public int compareTo(PrioritizedTask arg) { return priority < arg.priority ? 1 : (priority > arg.priority ? -1 : 0); } @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(rand.nextInt(250)); } catch(InterruptedException e) { // Acceptable way to exit } print(this); } @Override public String toString() { return String.format("[%1$-3d]", priority) + " Task " + id; } public String summary() { return "(" + id + ":" + priority + ")"; } public static class EndSentinel extends PrioritizedTask { private ExecutorService exec; public EndSentinel(ExecutorService e) { super(-1); // Lowest priority in this program exec = e; } @Override public void run() { int count = 0; for(PrioritizedTask pt : sequence) { printnb(pt.summary()); if(++count % 5 == 0) print(); } print(); print(this + " Calling shutdownNow()"); exec.shutdownNow(); } } } class PrioritizedTaskProducer implements Runnable { private Random rand = new Random(47); private Queue queue; private ExecutorService exec; public PrioritizedTaskProducer( Queue q, ExecutorService e) { queue = q; exec = e; // Used for EndSentinel } @Override public void run() { // Unbounded queue; never blocks. // Fill it up fast with random priorities: for(int i = 0; i < 20; i++) { queue.add(new PrioritizedTask(rand.nextInt(10))); Thread.yield(); } // Trickle in highest-priority jobs: try { for(int i = 0; i < 10; i++) { TimeUnit.MILLISECONDS.sleep(250); queue.add(new PrioritizedTask(10)); } // Add jobs, lowest priority first: for(int i = 0; i < 10; i++) queue.add(new PrioritizedTask(i)); // A sentinel to stop all the tasks: queue.add(new PrioritizedTask.EndSentinel(exec)); } catch(InterruptedException e) { // Acceptable way to exit } print("Finished PrioritizedTaskProducer"); } } class PrioritizedTaskConsumer implements Runnable { private PriorityBlockingQueue q; public PrioritizedTaskConsumer( PriorityBlockingQueue q) { this.q = q; } @Override public void run() { try { while(!Thread.interrupted()) // Use current thread to run the task: q.take().run(); } catch(InterruptedException e) { // Acceptable way to exit } print("Finished PrioritizedTaskConsumer"); } } public class PriorityBlockingQueueDemo { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); exec.execute(new PrioritizedTaskProducer(queue, exec)); exec.execute(new PrioritizedTaskConsumer(queue)); } } /* (Execute to see output) *///:~