Worker.java
001 /*
002  *
003  * All content copyright Terracotta, Inc., unless otherwise indicated. All rights reserved.
004  *
005  */
006 package demo.sharedqueue;
007 
008 import java.util.Collections;
009 import java.util.LinkedList;
010 import java.util.List;
011 
012 import com.tcclient.cluster.DsoNode;
013 
014 class Worker implements Runnable {
015 
016    private final static int HEALTH_ALIVE = 0;
017    private final static int HEALTH_DYING = 1;
018    private final static int HEALTH_DEAD = 2;
019    private final static int MAX_LOAD = 10;
020 
021    private final String name;
022    private final int port;
023    private final Queue queue;
024    private final List<Job> jobs;
025    private final DsoNode node;
026 
027    private int health = HEALTH_ALIVE;
028 
029    public Worker(final Queue queue, final int port, final DsoNode node) {
030       this.name = Queue.getHostName();
031       this.port = port;
032       this.queue = queue;
033       this.node = node;
034       jobs = Collections.synchronizedList(new LinkedList<Job>());
035    }
036 
037    public final DsoNode getNode() {
038       return node;
039    }
040 
041    public final String getName() {
042       return "node: " + node + " (" + name + ":" + port + ")";
043    }
044 
045    public final String toXml() {
046       synchronized (jobs) {
047          String data = "<worker><name>" + getName() "</name><jobs>";
048          for (Job job : jobs) {
049             data += job.toXml();
050          }
051          data += "</jobs></worker>";
052          return data;
053       }
054    }
055 
056    /**
057     * Attempt to mark the Worker as dead (if it's already dying); Note that we
058     * synchronize this method since it's mutating a shared object (this class)
059     *
060     @return True if the Worker is dead.
061     */
062    public final synchronized boolean expire() {
063       if (HEALTH_DYING == health) {
064          // a dying Worker wont die until it has
065          // consumed all of it's jobs
066          if (jobs.size() 0) {
067             queue.addJob(jobs.remove(0));
068          else {
069             setHealth(HEALTH_DEAD);
070          }
071       }
072       return (HEALTH_DEAD == health);
073    }
074 
075    /**
076     * Set the state of the Worker's health; Note that we synchronize this
077     * method since it's mutating a shared object (this class)
078     *
079     @param health
080     */
081    private final synchronized void setHealth(final int health) {
082       this.health = health;
083    }
084 
085    /**
086     * Set the state of the Worker's health to dying; Note that we synchronize
087     * this method since it's mutating a shared object (this class)
088     *
089     @param health
090     */
091    public final synchronized void markForExpiration() {
092       setHealth(HEALTH_DYING);
093    }
094 
095    public final void run() {
096       while (HEALTH_DEAD != health) {
097          if ((HEALTH_ALIVE == health&& (jobs.size() < MAX_LOAD)) {
098             final Job job = queue.getJob();
099 
100             try {
101                Thread.sleep(500);
102             catch (InterruptedException ie) {
103                System.err.println(ie.getMessage());
104             }
105 
106             synchronized (jobs) {
107                jobs.add(job);
108             }
109 
110             Thread processor = new Thread(new Runnable() {
111                public void run() {
112                   job.run(Worker.this);
113                   synchronized (jobs) {
114                      jobs.remove(job);
115                   }
116                   queue.log(job);
117                }
118             });
119             processor.start();
120          }
121       }
122    }
123 }