Posts Java Multithreading - Producer/Consumer (Using Low-Level Synchronization)
Post
Cancel

Java Multithreading - Producer/Consumer (Using Low-Level Synchronization)

This following post is based on the Udemy Course: Java Multithreading


In the updated version of the application, the main app remains unchanged. It creates two threads: one for the producer and another for the consumer. However, instead of using a high-level data structure like a BlockingQueue, we will implement low-level synchronization using the wait() and notify() methods.

The producer thread will produce items and notify the consumer when an item is available. If the consumer thread tries to consume an item when none are available, it will wait until the producer notifies it.

This approach allows for more fine-grained control over synchronization but requires explicit handling of thread coordination. By using the wait() and notify() methods, we ensure that the consumer waits patiently until an item is produced, and the producer notifies the consumer when it has produced an item.

Processor.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package com.threads;

import java.util.LinkedList;
import java.util.Random;

public class Processor {

 //FIFO
 LinkedList<Integer> list = new LinkedList<>();

 //we will maintain max 10 elements in the queue
 private static final int LIMIT = 10;
 
 //used for synchronization between the producer and consumer threads
 Object lock = new Object();
 
 public void produce() throws InterruptedException {
  
  Random random = new Random();
  
  int value = 0;
  
  //run forever
  while(true) {
   
   //get intrinsic lock on the "lock" object
   synchronized (lock) {
    
    //if the size has reached the limit, wait for notification
    while(list.size() == LIMIT) {
     lock.wait();
    }
    
    //code will reach here when size of the queue is below its limit
    //add element to the queue
    list.add(value);
    
    //increase the value to  be added next time
    value++;
    System.out.println("Producer added " + value + " to the list; ListSize: " + list.size());
    
    //notify the waiting thread
    //wakes up a single thread that is waiting on this object's monitor. 
    lock.notify();
    
   }
   
   //sleep for random amount of time between [0-49]
   Thread.sleep(random.nextInt(50));
   
  }
  
 }
  
 
 public void consume() throws InterruptedException {
  
  Random random = new Random();
  
  //run forever
  while(true) {
   
   //get the intrinsic lock on the "lock" object
   synchronized (lock) {
    
    //if the size of the queue is 0, wait for notification
    while(list.size() == 0) {
     lock.wait();
    }
    
    //we can reach here only when list size is not 0
    //remove the first element from the queue
    int value = list.removeFirst();
    
    System.out.println("Consumed: " + value + "; LisSize: " + list.size());
   
    //notify the other thread
    //wakes up a single thread that is waiting on this object's monitor. 
    lock.notify();
    
   }
   
   //sleep for random amount of time between [0-59]
   Thread.sleep(random.nextInt(60));
   
  }
 }
 
}

App.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.threads;

public class App {
 
 public static void main(String[] args){
   
  Processor processor = new Processor();
  
  //producer thread
  Thread t1 = new Thread(new Runnable() {
   
   @Override
   public void run() {
    try {
     processor.produce();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  });
  
  //consumer thread
  Thread t2 = new Thread(new Runnable() {
   
   @Override
   public void run() {
    try {
     processor.consume();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  });
  
  t1.start();
  t2.start();
  
  try {
   t1.join();
   t2.join();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  
 }

}
This post is licensed under CC BY 4.0 by the author.