001 /*
002 *
003 * All content copyright Terracotta, Inc., unless otherwise indicated. All rights reserved.
004 *
005 */
006 package demo.sharedqueue;
007
008 import java.net.InetAddress;
009 import java.net.UnknownHostException;
010 import java.util.Collections;
011 import java.util.LinkedList;
012 import java.util.List;
013 import java.util.ListIterator;
014
015 import com.tcclient.cluster.DsoNode;
016
017 public class Queue {
018
019 private static final int MAX_HISTORY_LENGTH = 15;
020 private static final int MAX_QUEUE_LENGTH = 150;
021
022 private List<Job> queue = Collections.synchronizedList(new LinkedList<Job>());
023 private List<Worker> workers = Collections.synchronizedList(new LinkedList<Worker>());
024 private List<Job> completedJobs = Collections.synchronizedList(new LinkedList<Job>());
025
026 private int nextJobId;
027 private int port;
028
029 public Queue(final int port) {
030 this.port = port;
031 this.nextJobId = 1;
032 }
033
034 public final Job getJob() {
035 synchronized (queue) {
036 while (queue.size() == 0) {
037 try {
038 queue.wait();
039 } catch (InterruptedException e) {
040 throw new RuntimeException(e);
041 }
042 }
043 return queue.remove(0);
044 }
045 }
046
047 public final String getXmlData() {
048 // the list of jobs in the queue
049 String data = "<workqueue>";
050 synchronized (queue) {
051 for (Job job : queue) {
052 data += job.toXml();
053 }
054 }
055 data += "</workqueue>";
056
057 // the list of completed jobs
058 data += "<completed>";
059 synchronized (completedJobs) {
060 for (Job job : completedJobs) {
061 data += job.toXml();
062 }
063 }
064 data += "</completed>";
065
066 // the list of registered job consumers
067 data += "<consumers>";
068 synchronized (workers) {
069 for (Worker worker : workers) {
070 data += worker.toXml();
071 }
072 }
073 data += "</consumers>";
074 return data;
075 }
076
077 public final Worker createWorker(final DsoNode node) {
078 synchronized (workers) {
079 Worker worker = new Worker(this, port, node);
080 workers.add(worker);
081 Thread t = new Thread(worker);
082 t.setDaemon(true);
083 t.start();
084 return worker;
085 }
086 }
087
088 public final Worker getWorker(final DsoNode node) {
089 synchronized (workers) {
090 for (Worker worker : workers) {
091 if (worker.getNode().equals(node)) {
092 return worker;
093 }
094 }
095 }
096 return null;
097 }
098
099 public final void log(final Job job) {
100 synchronized (completedJobs) {
101 completedJobs.add(0, job);
102 if (completedJobs.size() > MAX_HISTORY_LENGTH) {
103 completedJobs.remove(completedJobs.size() - 1);
104 }
105 }
106 }
107
108 public final void reap() {
109 synchronized (workers) {
110 ListIterator<Worker> i = workers.listIterator();
111 while (i.hasNext()) {
112 Worker worker = i.next();
113 if (worker.expire()) {
114 i.remove();
115 }
116 }
117 }
118 }
119
120 public final void addJob() {
121 synchronized (queue) {
122 if (queue.size() < MAX_QUEUE_LENGTH) {
123 Job job = new Job(Queue.getHostName() + " " + port, nextJobId);
124 nextJobId = nextJobId < 999 ? nextJobId + 1 : 1;
125 queue.add(job);
126 queue.notifyAll();
127 }
128 }
129 }
130
131 public final void addJob(final Job job) {
132 synchronized (queue) {
133 queue.add(job);
134 queue.notifyAll();
135 }
136 }
137
138 public final static String getHostName() {
139 try {
140 final InetAddress addr = InetAddress.getLocalHost();
141 return addr.getHostName();
142 } catch (UnknownHostException e) {
143 return "Unknown";
144 }
145 }
146 }
|