001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.backends.task;
028    import org.opends.messages.Message;
029    
030    
031    
032    import org.opends.server.api.DirectoryThread;
033    import org.opends.server.loggers.debug.DebugTracer;
034    import org.opends.server.types.DebugLogLevel;
035    
036    import static org.opends.server.loggers.debug.DebugLogger.*;
037    import static org.opends.server.loggers.ErrorLogger.*;
038    import static org.opends.messages.BackendMessages.*;
039    
040    import static org.opends.server.util.StaticUtils.*;
041    
042    import java.util.Map;
043    
044    
045    
046    /**
047     * This class defines a thread that will be used to execute a scheduled task
048     * within the server and provide appropriate notification that the task is
049     * complete.
050     */
051    public class TaskThread
052           extends DirectoryThread
053    {
054      /**
055       * The tracer object for the debug logger.
056       */
057      private static final DebugTracer TRACER = getTracer();
058    
059    
060    
061      // Indicates whether a request has been made for this thread to exit.
062      private boolean exitRequested;
063    
064      // The thread ID for this task thread.
065      private int threadID;
066    
067      // The reference to the scheduler with which this thread is associated.
068      private TaskScheduler taskScheduler;
069    
070      // The object that will be used for signaling the thread when there is new
071      // work to perform.
072      private Object notifyLock;
073    
074    
075    
076      /**
077       * Creates a new task thread with the provided information.
078       *
079       * @param  taskScheduler  The reference to the task scheduler with which this
080       *                        thread is associated.
081       * @param  threadID       The ID assigned to this task thread.
082       */
083      public TaskThread(TaskScheduler taskScheduler, int threadID)
084      {
085        super("Task Thread " + threadID);
086    
087    
088        this.taskScheduler = taskScheduler;
089        this.threadID      = threadID;
090    
091        notifyLock    = new Object();
092        exitRequested = false;
093    
094        setAssociatedTask(null);
095      }
096    
097    
098    
099      /**
100       * Retrieves the task currently being processed by this thread, if it is
101       * active.
102       *
103       * @return  The task currently being processed by this thread, or
104       *          <CODE>null</CODE> if it is not processing any task.
105       */
106      public Task getTask()
107      {
108        return getAssociatedTask();
109      }
110    
111    
112    
113      /**
114       * Provides a new task for processing by this thread.  This does not do any
115       * check to ensure that no task is already in process.
116       *
117       * @param  task  The task to be processed.
118       */
119      public void setTask(Task task)
120      {
121        setAssociatedTask(task);
122    
123        synchronized (notifyLock)
124        {
125          notifyLock.notify();
126        }
127      }
128    
129    
130    
131      /**
132       * Attempts to interrupt processing on the task in progress.
133       *
134       * @param  interruptState   The state to use for the task if it is
135       *                          successfully interrupted.
136       * @param  interruptReason  The human-readable reason that the task is to be
137       *                          interrupted.
138       * @param  exitThread       Indicates whether this thread should exit when
139       *                          processing on the active task has completed.
140       */
141      public void interruptTask(TaskState interruptState, Message interruptReason,
142                                boolean exitThread)
143      {
144        if (getAssociatedTask() != null)
145        {
146          try
147          {
148            getAssociatedTask().interruptTask(interruptState, interruptReason);
149          }
150          catch (Exception e)
151          {
152            if (debugEnabled())
153            {
154              TRACER.debugCaught(DebugLogLevel.ERROR, e);
155            }
156          }
157        }
158    
159        if (exitThread)
160        {
161          exitRequested = true;
162        }
163      }
164    
165    
166    
167      /**
168       * Operates in a loop, sleeping until there is no work to do, then
169       * processing the task and returning to the scheduler for more work.
170       */
171      public void run()
172      {
173        while (! exitRequested)
174        {
175          if (getAssociatedTask() == null)
176          {
177            try
178            {
179              synchronized (notifyLock)
180              {
181                notifyLock.wait(5000);
182              }
183            }
184            catch (InterruptedException ie)
185            {
186              if (debugEnabled())
187              {
188                TRACER.debugCaught(DebugLogLevel.ERROR, ie);
189              }
190            }
191    
192            continue;
193          }
194    
195          try
196          {
197            TaskState returnState = getAssociatedTask().execute();
198            getAssociatedTask().setTaskState(returnState);
199          }
200          catch (Exception e)
201          {
202            if (debugEnabled())
203            {
204              TRACER.debugCaught(DebugLogLevel.ERROR, e);
205            }
206    
207            Task task = getAssociatedTask();
208    
209            Message message = ERR_TASK_EXECUTE_FAILED.
210                get(String.valueOf(task.getTaskEntry().getDN()),
211                    stackTraceToSingleLineString(e));
212            logError(message);
213            task.setTaskState(TaskState.STOPPED_BY_ERROR);
214          }
215    
216          Task completedTask = getAssociatedTask();
217          setAssociatedTask(null);
218          if (! taskScheduler.threadDone(this, completedTask))
219          {
220            exitRequested = true;
221            break;
222          }
223        }
224    
225        if (getAssociatedTask() != null)
226        {
227          Task task = getAssociatedTask();
228          task.setTaskState(TaskState.STOPPED_BY_SHUTDOWN);
229          taskScheduler.threadDone(this, task);
230        }
231      }
232    
233    
234    
235      /**
236       * Retrieves any relevent debug information with which this tread is
237       * associated so they can be included in debug messages.
238       *
239       * @return debug information about this thread as a string.
240       */
241      public Map<String, String> getDebugProperties()
242      {
243        Map<String, String> properties = super.getDebugProperties();
244    
245        if (getAssociatedTask() != null)
246        {
247          properties.put("task", getAssociatedTask().toString());
248        }
249    
250        return properties;
251      }
252    }
253