Queue.java
001 /*
002  *
003  * All content copyright Terracotta, Inc., unless otherwise indicated. All rights reserved.
004  *
005  */
006 package demo.sharedqueue;
007 
008 import java.net.InetAddress;
009 import java.net.UnknownHostException;
010 import java.util.Collections;
011 import java.util.LinkedList;
012 import java.util.List;
013 import java.util.ListIterator;
014 
015 import com.tcclient.cluster.DsoNode;
016 
017 public class Queue {
018 
019    private static final int MAX_HISTORY_LENGTH = 15;
020    private static final int MAX_QUEUE_LENGTH = 150;
021 
022    private List<Job> queue = Collections.synchronizedList(new LinkedList<Job>());
023    private List<Worker> workers = Collections.synchronizedList(new LinkedList<Worker>());
024    private List<Job> completedJobs = Collections.synchronizedList(new LinkedList<Job>());
025 
026    private int nextJobId;
027    private int port;
028 
029    public Queue(final int port) {
030       this.port = port;
031       this.nextJobId = 1;
032    }
033 
034    public final Job getJob() {
035       synchronized (queue) {
036          while (queue.size() == 0) {
037             try {
038                queue.wait();
039             catch (InterruptedException e) {
040                throw new RuntimeException(e);
041             }
042          }
043          return queue.remove(0);
044       }
045    }
046 
047    public final String getXmlData() {
048       // the list of jobs in the queue
049       String data = "<workqueue>";
050       synchronized (queue) {
051          for (Job job : queue) {
052             data += job.toXml();
053          }
054       }
055       data += "</workqueue>";
056 
057       // the list of completed jobs
058       data += "<completed>";
059       synchronized (completedJobs) {
060          for (Job job : completedJobs) {
061             data += job.toXml();
062          }
063       }
064       data += "</completed>";
065 
066       // the list of registered job consumers
067       data += "<consumers>";
068       synchronized (workers) {
069          for (Worker worker : workers) {
070             data += worker.toXml();
071          }
072       }
073       data += "</consumers>";
074       return data;
075    }
076 
077    public final Worker createWorker(final DsoNode node) {
078       synchronized (workers) {
079          Worker worker = new Worker(this, port, node);
080          workers.add(worker);
081          Thread t = new Thread(worker);
082          t.setDaemon(true);
083          t.start();
084          return worker;
085       }
086    }
087 
088    public final Worker getWorker(final DsoNode node) {
089       synchronized (workers) {
090          for (Worker worker : workers) {
091             if (worker.getNode().equals(node)) {
092                return worker;
093             }
094          }
095       }
096       return null;
097    }
098 
099    public final void log(final Job job) {
100       synchronized (completedJobs) {
101          completedJobs.add(0, job);
102          if (completedJobs.size() > MAX_HISTORY_LENGTH) {
103             completedJobs.remove(completedJobs.size() 1);
104          }
105       }
106    }
107 
108    public final void reap() {
109       synchronized (workers) {
110          ListIterator<Worker> i = workers.listIterator();
111          while (i.hasNext()) {
112             Worker worker = i.next();
113             if (worker.expire()) {
114                i.remove();
115             }
116          }
117       }
118    }
119 
120    public final void addJob() {
121       synchronized (queue) {
122          if (queue.size() < MAX_QUEUE_LENGTH) {
123             Job job = new Job(Queue.getHostName() " " + port, nextJobId);
124             nextJobId = nextJobId < 999 ? nextJobId + 1;
125             queue.add(job);
126             queue.notifyAll();
127          }
128       }
129    }
130 
131    public final void addJob(final Job job) {
132       synchronized (queue) {
133          queue.add(job);
134          queue.notifyAll();
135       }
136    }
137 
138    public final static String getHostName() {
139       try {
140          final InetAddress addr = InetAddress.getLocalHost();
141          return addr.getHostName();
142       catch (UnknownHostException e) {
143          return "Unknown";
144       }
145    }
146 }