Class: BlockingQueue
package sachi.test.datastructures;
import java.util.LinkedList;
import java.util.List;
/**
 * A bounded first-in/first-out queue whose operations block instead of failing.
 *
 * <p>{@link #enqueue(Object)} waits while the queue holds {@code queueLimit}
 * elements and {@link #dequeue()} waits while it is empty. Both methods are
 * {@code synchronized} on this instance, so all access to the backing list
 * happens under this object's monitor.
 */
public class BlockingQueue {
    /** Elements in insertion order; only touched while holding this object's monitor. */
    private final List<Object> queue = new LinkedList<Object>();

    /** Maximum number of elements held at once; validated to be positive. */
    private final int queueLimit;

    /**
     * Creates an empty queue with the given capacity.
     *
     * @param queueLimit maximum number of elements the queue may hold
     * @throws IllegalArgumentException if {@code queueLimit} is not positive;
     *         a limit of zero would make every {@code enqueue} wait forever and
     *         a negative limit would never apply any bound at all
     */
    public BlockingQueue(int queueLimit) {
        if (queueLimit <= 0) {
            throw new IllegalArgumentException("queueLimit must be positive: " + queueLimit);
        }
        this.queueLimit = queueLimit;
    }

    /**
     * Appends an element, waiting while the queue is full.
     *
     * <p>The element is printed to standard output with a {@code "Producer :: "}
     * prefix just before it is stored.
     *
     * @param o element to append; must not be {@code null}
     * @throws NullPointerException if {@code o} is {@code null}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void enqueue(Object o) throws InterruptedException {
        if (o == null) {
            throw new NullPointerException("o must not be null");
        }
        // Re-check in a loop: wait() may return spuriously, or another producer
        // may have refilled the queue before this thread reacquired the monitor.
        while (this.queue.size() >= this.queueLimit) {
            wait();
        }
        System.out.println("Producer :: " + o);
        this.queue.add(o);
        // The queue is now non-empty, so wake any consumer waiting in dequeue().
        notifyAll();
    }

    /**
     * Removes and returns the oldest element, waiting while the queue is empty.
     *
     * @return the element that has been in the queue the longest
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized Object dequeue() throws InterruptedException {
        // Loop for the same reason as in enqueue(): the condition must be
        // re-tested every time wait() returns.
        while (this.queue.isEmpty()) {
            wait();
        }
        Object head = this.queue.remove(0);
        // A slot has been freed, so wake any producer waiting in enqueue().
        notifyAll();
        return head;
    }
}
Class: Consumer
package sachi.test.datastructures;
/**
 * Repeatedly takes elements from a {@link BlockingQueue} and prints each one
 * to standard output with a {@code "Consumer :: "} prefix.
 *
 * <p>Constructing an instance immediately starts a thread named
 * {@code "Consumer"} that executes {@link #run()}.
 */
public class Consumer implements Runnable {
    /** Queue this consumer drains. */
    final BlockingQueue bq;

    /**
     * Stores the queue and starts the consuming thread.
     *
     * @param bq queue to take elements from
     */
    Consumer(BlockingQueue bq) {
        this.bq = bq;
        new Thread(this, "Consumer").start();
    }

    /**
     * Dequeues and prints elements until the executing thread is interrupted.
     *
     * <p>Interruption is treated as a request to stop: the interrupt status is
     * restored and the method returns, rather than logging and looping again.
     */
    @Override
    public void run() {
        try {
            // Checking the flag lets the loop stop even when the queue is never
            // empty and dequeue() therefore never reaches wait().
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("Consumer :: " + bq.dequeue().toString());
            }
        } catch (InterruptedException e) {
            // Catching the exception cleared the flag; set it again so code
            // further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
Class: Producer
package sachi.test.datastructures;
/**
 * Feeds an ever-increasing sequence of integers, starting at 0, into a
 * {@link BlockingQueue}.
 *
 * <p>Constructing an instance immediately starts a thread named
 * {@code "Producer"} that executes {@link #run()}.
 */
public class Producer implements Runnable {
    /** Queue this producer fills. */
    final BlockingQueue bq;

    /**
     * Stores the queue and starts the producing thread.
     *
     * @param bq queue to add elements to
     */
    Producer(BlockingQueue bq) {
        this.bq = bq;
        new Thread(this, "Producer").start();
    }

    /**
     * Enqueues 0, 1, 2, ... until the executing thread is interrupted.
     *
     * <p>Interruption is treated as a request to stop: the interrupt status is
     * restored and the method returns, rather than logging and looping again
     * (which would also skip the number whose enqueue was interrupted).
     */
    @Override
    public void run() {
        int i = 0;
        try {
            // Checking the flag lets the loop stop even when the queue is never
            // full and enqueue() therefore never reaches wait().
            while (!Thread.currentThread().isInterrupted()) {
                bq.enqueue(i++);
            }
        } catch (InterruptedException e) {
            // Catching the exception cleared the flag; set it again so code
            // further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
Class: ThreadConsumerProducerTest
package sachi.test.datastructures;
/**
 * Entry point that wires one {@link Producer} and one {@link Consumer} to a
 * single shared {@link BlockingQueue}.
 */
public class ThreadConsumerProducerTest {
    /** Capacity of the queue shared by the producer and the consumer. */
    private static final int QUEUE_CAPACITY = 10;

    /**
     * Creates the shared queue, then the producer, then the consumer. Each of
     * the two constructors starts its own thread, so nothing else is needed
     * here to get the exchange running.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        final BlockingQueue sharedQueue = new BlockingQueue(QUEUE_CAPACITY);
        new Producer(sharedQueue);
        new Consumer(sharedQueue);
    }
}
No comments:
Post a Comment