diff --git a/lowlevel/DelayQueueDemo.java b/lowlevel/DelayQueueDemo.java index 4fc0bba2..e2fbb295 100644 --- a/lowlevel/DelayQueueDemo.java +++ b/lowlevel/DelayQueueDemo.java @@ -63,14 +63,11 @@ public class DelayQueueDemo { new Random(47).ints(20, 0, 4000) .mapToObj(DelayedTask::new), // Add the summarizing task: - Stream.of( - new DelayedTask.EndTask(4000))) + Stream.of(new DelayedTask.EndTask(4000))) .collect(Collectors .toCollection(DelayQueue::new)); - DelayQueue delayQueue = - new DelayQueue<>(tasks); - while(delayQueue.size() > 0) - delayQueue.take().run(); + while(tasks.size() > 0) + tasks.take().run(); } } /* Output: diff --git a/lowlevel/PriorityBlockingQueueDemo.java b/lowlevel/PriorityBlockingQueueDemo.java index 32cbebef..f9159846 100644 --- a/lowlevel/PriorityBlockingQueueDemo.java +++ b/lowlevel/PriorityBlockingQueueDemo.java @@ -2,148 +2,142 @@ // (c)2017 MindView LLC: see Copyright.txt // We make no guarantees that this code is fit for any purpose. // Visit http://OnJava8.com for more book information. -import java.util.concurrent.*; import java.util.*; +import java.util.stream.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import onjava.Nap; -class PrioritizedTask implements -Runnable, Comparable { - private SplittableRandom rand = new SplittableRandom(47); - private static int counter = 0; - private final int id = counter++; +class Prioritized implements Comparable { + private static AtomicInteger counter = + new AtomicInteger(); + private final int id = counter.getAndAdd(1); private final int priority; - protected static List sequence = - new ArrayList<>(); - public PrioritizedTask(int priority) { + private static List sequence = + new CopyOnWriteArrayList<>(); + public Prioritized(int priority) { this.priority = priority; sequence.add(this); } @Override - public int compareTo(PrioritizedTask arg) { + public int compareTo(Prioritized arg) { return priority < arg.priority ? 1 : (priority > arg.priority ? -1 : 0); } @Override - public void run() { - new Nap(rand.nextInt(250)); - System.out.println(this); - } - @Override public String toString() { - return String.format("[%1$-3d]", priority) + - " Task " + id; + return String.format( + "[%d] Prioritized %d", priority, id); } - public String summary() { - return "(" + id + ":" + priority + ")"; + public void displaySequence() { + int count = 0; + for(Prioritized pt : sequence) { + System.out.printf("(%d:%d)", pt.id, pt.priority); + if(++count % 5 == 0) + System.out.println(); + } } - public static - class EndSentinel extends PrioritizedTask { - private ExecutorService exec; - public EndSentinel(ExecutorService e) { - super(-1); // Lowest priority in this program - exec = e; - } - @Override - public void run() { - int count = 0; - for(PrioritizedTask pt : sequence) { - System.out.print(pt.summary()); - if(++count % 5 == 0) - System.out.println(); - } - System.out.println(); - System.out.println(this + " Calling shutdownNow()"); - exec.shutdownNow(); - } + public static class EndSentinel extends Prioritized { + public EndSentinel() { super(-1); } } } -class PrioritizedTaskProducer implements Runnable { - private SplittableRandom rand = new SplittableRandom(47); - private Queue queue; - private ExecutorService exec; - public PrioritizedTaskProducer( - Queue q, ExecutorService e) { +class Producer implements Runnable { + private static AtomicInteger seed = + new AtomicInteger(47); + private SplittableRandom rand = + new SplittableRandom(seed.getAndAdd(10)); + private Queue queue; + public Producer(Queue q) { queue = q; - exec = e; // Used for EndSentinel } @Override public void run() { - // Unbounded queue; never blocks. - // Fill it up fast with random priorities: - for(int i = 0; i < 20; i++) { - queue.add(new PrioritizedTask(rand.nextInt(10))); - new Nap(10); - } - // Trickle in highest-priority jobs: - for(int i = 0; i < 10; i++) { - new Nap(250); - queue.add(new PrioritizedTask(10)); - } - // Add jobs, lowest priority first: - for(int i = 0; i < 10; i++) - queue.add(new PrioritizedTask(i)); - // A sentinel to stop all the tasks: - queue.add(new PrioritizedTask.EndSentinel(exec)); - System.out.println( - "Finished PrioritizedTaskProducer"); + rand.ints(10, 0, 20) + .mapToObj(Prioritized::new) + .peek(p -> new Nap(rand.nextInt( + PriorityBlockingQueueDemo.NAPTIME))) + .forEach(p -> queue.add(p)); + queue.add(new Prioritized.EndSentinel()); } } -class PrioritizedTaskConsumer implements Runnable { - private PriorityBlockingQueue q; - public PrioritizedTaskConsumer( - PriorityBlockingQueue q) { +class Consumer implements Runnable { + private PriorityBlockingQueue q; + private SplittableRandom rand = + new SplittableRandom(47); + public + Consumer(PriorityBlockingQueue q) { this.q = q; } @Override public void run() { - try { - while(!Thread.interrupted()) - // Use current thread to run the task: - q.take().run(); - } catch(InterruptedException e) { - // Acceptable way to exit + while(true) { + try { + Prioritized pt = q.take(); + System.out.println(pt); + if(pt instanceof Prioritized.EndSentinel) { + pt.displaySequence(); + break; + } + new Nap(rand.nextInt( + PriorityBlockingQueueDemo.NAPTIME)); + } catch(InterruptedException e) { + throw new RuntimeException(e); + } } - System.out.println( - "Finished PrioritizedTaskConsumer"); } } public class PriorityBlockingQueueDemo { - public static void - main(String[] args) throws Exception { - ExecutorService es = Executors.newCachedThreadPool(); - PriorityBlockingQueue queue = + static final int NAPTIME = 50; + public static void main(String[] args) { + PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); - es.execute(new PrioritizedTaskProducer(queue, es)); - es.execute(new PrioritizedTaskConsumer(queue)); + CompletableFuture.runAsync(new Producer(queue)); + CompletableFuture.runAsync(new Producer(queue)); + CompletableFuture.runAsync(new Producer(queue)); + CompletableFuture.runAsync(new Consumer(queue)) + .join(); } } -/* Output: (First and Last 12 Lines) -[9 ] Task 5 -[8 ] Task 7 -[10 ] Task 20 -[8 ] Task 8 -[10 ] Task 21 -[7 ] Task 4 -[10 ] Task 22 -[7 ] Task 19 -[10 ] Task 23 -[7 ] Task 11 -[10 ] Task 24 -[7 ] Task 1 -...________...________...________...________... -[0 ] Task 16 -(0:5)(1:7)(2:1)(3:0)(4:7) -(5:9)(6:6)(7:8)(8:8)(9:3) -(10:0)(11:7)(12:0)(13:5)(14:2) -(15:4)(16:0)(17:2)(18:1)(19:7) -(20:10)(21:10)(22:10)(23:10)(24:10) -(25:10)(26:10)(27:10)(28:10)(29:10) -(30:0)(31:1)(32:2)(33:3)(34:4) -(35:5)(36:6)(37:7)(38:8)(39:9) -(40:-1) -[-1 ] Task 40 Calling shutdownNow() -Finished PrioritizedTaskConsumer +/* Output: +[12] Prioritized 1 +[17] Prioritized 2 +[15] Prioritized 0 +[18] Prioritized 17 +[17] Prioritized 10 +[16] Prioritized 20 +[16] Prioritized 16 +[15] Prioritized 15 +[14] Prioritized 8 +[14] Prioritized 13 +[13] Prioritized 12 +[12] Prioritized 19 +[12] Prioritized 14 +[11] Prioritized 6 +[11] Prioritized 22 +[11] Prioritized 4 +[11] Prioritized 31 +[10] Prioritized 30 +[10] Prioritized 26 +[8] Prioritized 18 +[8] Prioritized 23 +[8] Prioritized 9 +[6] Prioritized 24 +[3] Prioritized 3 +[2] Prioritized 29 +[1] Prioritized 7 +[0] Prioritized 27 +[0] Prioritized 5 +[0] Prioritized 21 +[0] Prioritized 11 +[-1] Prioritized 25 +(0:15)(2:17)(1:12)(3:3)(4:11) +(5:0)(6:11)(7:1)(8:14)(9:8) +(10:17)(11:0)(12:13)(13:14)(14:12) +(15:15)(16:16)(17:18)(18:8)(19:12) +(20:16)(21:0)(22:11)(23:8)(24:6) +(25:-1)(26:10)(27:0)(28:-1)(29:2) +(30:10)(31:11)(32:-1) */