Posts Java Multithreading - Producer/Consumer
Post
Cancel

Java Multithreading - Producer/Consumer

This following post is based on the Udemy Course: Java Multithreading


Blocking Queue (thread-safe queue implementation)

A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

When retrieving an element from a BlockingQueue, the take() method is used, which blocks the calling thread until an element becomes available in the queue. If the queue is empty, the thread will remain blocked until another thread inserts an item.

When storing an element in a BlockingQueue, the put() method is used. If the queue is full, the put() method will block the calling thread until space becomes available in the queue, either by other threads dequeuing elements or by clearing the queue completely.

Oracle Doc

Summary of BlockingQueue methods

BlockingQueue methods come in four forms, with different ways of handling operations that cannot be satisfied immediately, but may be satisfied at some point in the future: one throws an exception, the second returns a special value (either null or false, depending on the operation), the third blocks the current thread indefinitely until the operation can succeed, and the fourth blocks for only a given maximum time limit before giving up.

OperationThrows exceptionSpecial valueBlocksTimes out
Insertadd(e)offer(e)put(e)offer(e, time, unit)
Removeremove()poll()take()poll(time, unit)
Examineelement()peek()N/AN/A

Producer / Consumer with Blocking Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package com.threads;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class App {
 
 private static BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);
 
 public static void main(String[] args) throws InterruptedException {
  
  //producer thread
  Thread t1 = new Thread(new Runnable() {
   
   @Override
   public void run() {
    try {
     producer();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  });
  
  //consumer thread
  Thread t2 = new Thread(new Runnable() {
   
   @Override
   public void run() {
    try {
     consumer();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  });
  
  //both these threads have a while loop so they will continue running indefinitely
  t1.start();
  t2.start();
  
  //waits for this thread to die. 
  t1.join();
  t2.join();
  
 }
 
 //sleep for random amount of time between 0-200 miliseconds and then add an item to the blocking queue
 private static void producer() throws InterruptedException {
  
  Random random = new Random();
  
  while(true) {

   //blockingQueue.put can cause InterruptedException
   Thread.sleep(random.nextInt(200));
   
   blockingQueue.put(random.nextInt(100));
  }
  
 }

 //sleep for random amount of time between 0-50 miliseconds and then take an item from the blocking queue
 private static void consumer() throws InterruptedException {
  
  Random random = new Random();
  
  while(true) {
   
   Thread.sleep(random.nextInt(50));
   
   if(random.nextInt(5) == 0) {

    Integer val = blockingQueue.take();
    System.out.println("Taken: " + val + "; Queue Size: " + blockingQueue.size());

   }
   
  }
  
 }
}

SAMPLE OUTPUT:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Taken: 11; Queue Size: 0
Taken: 15; Queue Size: 0
Taken: 63; Queue Size: 1
Taken: 25; Queue Size: 0
Taken: 6; Queue Size: 0
Taken: 49; Queue Size: 0
Taken: 41; Queue Size: 3
Taken: 95; Queue Size: 2
Taken: 53; Queue Size: 2
Taken: 79; Queue Size: 3
Taken: 85; Queue Size: 2
Taken: 71; Queue Size: 4
Taken: 76; Queue Size: 3
Taken: 39; Queue Size: 3
Taken: 82; Queue Size: 2
Taken: 73; Queue Size: 1
Taken: 85; Queue Size: 0
Taken: 50; Queue Size: 2
Taken: 13; Queue Size: 3
Taken: 21; Queue Size: 6
Taken: 93; Queue Size: 5
Taken: 57; Queue Size: 5
Taken: 2; Queue Size: 4
Taken: 55; Queue Size: 5
Taken: 31; Queue Size: 4
Taken: 71; Queue Size: 4
Taken: 94; Queue Size: 7
Taken: 45; Queue Size: 7
Taken: 37; Queue Size: 7
Taken: 6; Queue Size: 8
Taken: 48; Queue Size: 9
Taken: 61; Queue Size: 9
Taken: 60; Queue Size: 9
Taken: 63; Queue Size: 9
Taken: 14; Queue Size: 9
Taken: 59; Queue Size: 9
Taken: 46; Queue Size: 8
Taken: 28; Queue Size: 7
Taken: 2; Queue Size: 6
Taken: 4; Queue Size: 5
This post is licensed under CC BY 4.0 by the author.