Tuesday, April 1, 2014

Producer–Consumer with Java Threads: Implementing a Blocking Queue


Class BlockingQueue

package sachi.test.datastructures;

import java.util.LinkedList;
import java.util.List;

/**
 * A bounded first-in, first-out queue whose operations block instead of failing.
 *
 * <p>All access is serialized on this instance's monitor: {@link #enqueue(Object)}
 * waits while the queue is full and {@link #dequeue()} waits while it is empty.
 * Every successful operation wakes all waiting threads so they can re-check
 * their condition.
 */
public class BlockingQueue {

    /** Stored elements in arrival order; only touched while holding this monitor. */
    private final List<Object> queue = new LinkedList<Object>();

    /** Maximum number of elements the queue may hold at once; always positive. */
    private final int queueLimit;

    /**
     * Creates an empty queue with the given capacity.
     *
     * @param queueLimit maximum number of elements held at once; must be positive
     * @throws IllegalArgumentException if {@code queueLimit} is zero or negative
     */
    public BlockingQueue(int queueLimit) {
        // A limit of 0 would make every enqueue wait forever, and a negative
        // limit could never be "reached", silently turning the queue unbounded.
        if (queueLimit <= 0) {
            throw new IllegalArgumentException("queueLimit must be positive: " + queueLimit);
        }
        this.queueLimit = queueLimit;
    }

    /**
     * Appends an element, waiting while the queue is at capacity.
     *
     * <p>Prints the element to standard output, prefixed with
     * {@code "Producer  :: "}, just before it is stored.
     *
     * @param o the element to append; must not be {@code null}
     * @throws NullPointerException if {@code o} is {@code null}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void enqueue(Object o) throws InterruptedException {
        // Reject null up front rather than after a potentially long wait.
        if (o == null) {
            throw new NullPointerException("o");
        }
        // Loop (not 'if'): notifyAll wakes every waiter, so the condition must be re-checked.
        while (this.queue.size() >= this.queueLimit) {
            wait();
        }
        System.out.println("Producer  :: " + o.toString());
        this.queue.add(o);
        // Waiters only resume once this method releases the monitor.
        notifyAll();
    }

    /**
     * Removes and returns the oldest element, waiting while the queue is empty.
     *
     * @return the element that has been in the queue the longest
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized Object dequeue() throws InterruptedException {
        while (this.queue.isEmpty()) {
            wait();
        }
        Object head = this.queue.remove(0);
        // A slot was freed; let blocked producers re-check the capacity.
        notifyAll();
        return head;
    }

}


Class Consumer

package sachi.test.datastructures;

/**
 * Task that repeatedly takes elements from a {@link BlockingQueue} and prints
 * each one to standard output prefixed with {@code "Consumer  :: "}.
 *
 * <p>Constructing an instance immediately starts a thread named
 * {@code "Consumer"} that runs it. Interrupting that thread ends the loop.
 */
public class Consumer implements Runnable {

    /** Queue the elements are taken from. */
    final BlockingQueue bq;

    /**
     * Stores the queue and starts the consuming thread.
     *
     * @param bq the queue to take elements from
     */
    Consumer(BlockingQueue bq) {
        this.bq = bq;
        // Kept in the constructor because callers only construct the object
        // and expect consumption to begin; it must stay the last statement so
        // the field is assigned before the new thread runs.
        new Thread(this, "Consumer").start();
    }

    /**
     * Takes and prints elements until the running thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                System.out.println("Consumer  :: " + bq.dequeue().toString());
            } catch (InterruptedException e) {
                // Catching the exception clears the interrupt status; restore it
                // and stop instead of looping forever on a cancelled thread.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

}


Class Producer

package sachi.test.datastructures;

public class Producer implements Runnable {

BlockingQueue bq;
Producer(BlockingQueue bq){
this.bq=bq;
new Thread(this, "Producer").start();
}
@Override
public void run() {
int i = 0;
  while (true) {
  try {
bq.enqueue(i++);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
  }
}


}


Class ThreadConsumerProducerTest

package sachi.test.datastructures;

/**
 * Entry point that wires one producer and one consumer to a shared queue.
 */
public class ThreadConsumerProducerTest {

    /** Capacity of the queue shared by the producer and the consumer. */
    private static final int QUEUE_CAPACITY = 10;

    /**
     * Creates the shared queue, then a producer and a consumer on it, in that order.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        final BlockingQueue sharedQueue = new BlockingQueue(QUEUE_CAPACITY);
        new Producer(sharedQueue);
        new Consumer(sharedQueue);
    }

}

No comments:

Post a Comment