eu.xtreemos.xosd.execMng
Class ExecMng

java.lang.Object
  extended by eu.xtreemos.system.eventmachine.stage.AbstractStage
      extended by eu.xtreemos.system.eventmachine.stage.AbstractReceivingStage
          extended by eu.xtreemos.system.eventmachine.stage.Abstract2wayStage
              extended by eu.xtreemos.xosd.execMng.ExecMng
All Implemented Interfaces:
eu.xtreemos.system.eventmachine.queue.IEventHandler, eu.xtreemos.system.eventmachine.stage.IStage

public class ExecMng
extends eu.xtreemos.system.eventmachine.stage.Abstract2wayStage

Execution Manager is in charge of the processes running on its node. It keeps a list of JobUnits indexed by JobId; each JobUnit holds the information of the job (pids, jobMngAddr, ...).

Author:
martag, fguim

Nested Class Summary
 class ExecMng.ExecMngEvent
           
private  class ExecMng.JobsResInfo
          Class that keeps callback information while obtaining information about the jobs running in the resource (getJobsResource): pendent counts the remaining answers, and info is the string containing the information already obtained.
 class ExecMng.startJobParams
           
 
Field Summary
(package private) static int BASIC
           
(package private) static int JOB_DEFINITION
           
private  java.util.Hashtable<java.lang.Integer,java.lang.String> jobPid
           
private  java.util.Hashtable<java.lang.String,JobUnit> jobUnits
           
private  java.util.Hashtable<java.lang.Integer,java.lang.Long> lastTimestamp
           
(package private) static org.apache.log4j.Logger logger
           
private  eu.xtreemos.xosd.utilities.metrics.JobMetrics metrics
           
private  eu.xtreemos.xosd.utilities.metrics.ProcMetricsData metricsData
           
private  java.util.Hashtable<ExecMng.ExecMngEvent,java.lang.Integer> PendingEvents
           
(package private) static int RESOURCES_ALLOCATED
           
(package private) static int RESOURCES_CONSUMED
           
(package private) static org.apache.log4j.Logger tracer
           
(package private) static int USER_METRICS
           
private  int XOS_SIG_CHLD
           
 
Fields inherited from class eu.xtreemos.system.eventmachine.stage.Abstract2wayStage
context, counter, curContext, sink
 
Fields inherited from class eu.xtreemos.system.eventmachine.stage.AbstractReceivingStage
queue
 
Fields inherited from class eu.xtreemos.system.eventmachine.stage.AbstractStage
handlerChain, handlerGroup, handlerThreads, name, running, serviceListeners
 
Constructor Summary
ExecMng()
          Constructor
 
Method Summary
 java.lang.Integer addJobMetric(java.lang.String jobId, eu.xtreemos.xosd.utilities.metrics.MetricsDesc metric)
           
 void assignProcessesToJobAtRestart(java.lang.String jobId, java.lang.String procListString)
           
 java.lang.Integer createProcess(java.lang.String jobId, java.lang.String JSDL, java.lang.String reservationId, CommunicationAddress resource, java.security.cert.X509Certificate userCtx)
          It is supposed to be "similar" to a fork but in XOS.
private  java.lang.Integer execJob(JobUnit ju)
           
private  int execJobParams(java.lang.String jobID, java.lang.String path, java.lang.String input, java.lang.String output, java.lang.String error, java.lang.String[] params, java.lang.String[] env_var, java.lang.String userCert)
          Execute Job with the given parameters
 void exitJob(java.lang.String jobId, java.lang.Integer exitValue)
           
 java.lang.String getHandledEventType()
           
 java.lang.String getJobInfoCB(java.lang.String info)
           
 java.lang.String getJobSelf(java.lang.Integer pid)
          Return the JobId of the calling process (identified by its pid).
 java.lang.String getJobsResource(java.security.cert.X509Certificate certificate)
          Return the information of the jobs running in this resource. TODO: Is that a required feature?
 void getProcessList(java.lang.String jobId, java.lang.String initialJobId, java.lang.String jsdlFile, java.lang.String jobUnitID, java.util.ArrayList<java.lang.String> dependentJobs, java.lang.String executable, CommunicationAddress jobCpAddr, CommunicationAddress superJobCpAddr, java.lang.String kernelCheckpointer, java.lang.String checkpointVersion, java.lang.String pidCGroupName, java.lang.String strategy, java.lang.String options, java.lang.String mode, java.security.cert.X509Certificate userCert)
          Get a process from the job unit, so that the kernel checkpointer can determine the process group it uses for checkpoint/restart (cp/rst).
 java.lang.String getProcsInfo(java.lang.String jobId, java.lang.Integer flags, java.lang.Integer infoLevel, java.util.ArrayList<java.lang.String> metricsList, java.lang.String user)
           
 java.util.ArrayList<java.lang.Integer> getProcsJob(java.lang.String jobId)
           
private  void getProcStatus(java.lang.Integer pid)
          Temporary monitoring daemon patch.
private  int getSIG_CHLD()
          Get the value of the SIG_CHLD
 void handleEvent(java.lang.Object event)
           
 void handleProcStatus(int pid, boolean enable)
           
private  java.lang.Integer inheritMetricsAndStart(JobUnit ju)
          This method is part of the startJob/startProcess workflow, whenever a new jobUnit is created.
 java.lang.Integer inheritMetricsCB(java.util.ArrayList<eu.xtreemos.xosd.utilities.metrics.MetricsDesc> metricsList)
          This method receives a list of metrics from jobMng, stores them in its corresponding structure for the job and sets initial values for some of them.
 void init()
           
private  void newEvent(java.lang.String jobId, java.lang.String event, int pid, int ppid, int exitValue)
          Receive a notification of a new event
private  int pidWait(int pid)
          Wait on the pid to prevent defunct (zombie) processes.
private  void processEvent(java.lang.String jobId, java.lang.String event, int pid, int ppid, int exitValue)
          New event. TODO: update this javadoc.
 void rebuildJobUnit(java.lang.String jobId, java.lang.String initialJobId, java.lang.String jobUnitId, java.lang.String checkpointVersion, CommunicationAddress jobMngAddr, java.lang.String processGroupReferenceId, java.lang.String processGroupReferenceType, java.lang.String pidCGroupName, java.lang.String kernelCheckpointer, java.lang.String jsdlFile, java.lang.String input, java.lang.String output, java.lang.String cmd, java.lang.String error, java.security.cert.X509Certificate userCert)
           
 void removeBuffers(java.lang.String jobId)
          To be called from jobMng whenever it's cleaning time.
 java.lang.Integer removeJobMetric(java.lang.String jobId, java.lang.String metricName)
           
 void removeJobMetrics(java.lang.String jobId)
          JobUnit scope: removes metrics data, but leaves metadata and buffers.
 void removeProcMetrics(java.lang.String jobId, java.lang.Integer pid)
          Process scope: removes metrics data, but leaves metadata and buffers.
 java.lang.Object returnCB(java.lang.Object obj)
           
 java.lang.Object returnCBE(java.lang.Exception ex)
           
 java.lang.Integer sendEvent(java.lang.String jobId, java.lang.Integer signal)
           
private  int sendSignal(int pid, int signal)
          Send a signal to a process
private  void sendXOS_SIGCHLD(java.lang.String jobId)
           
 java.lang.Integer setMetricValue(java.lang.String jobId, java.lang.String metricName, java.lang.Integer pid, java.lang.String value)
           
 java.lang.Integer setMonitoringBuffering(java.lang.String jobId, java.lang.String metricName, java.lang.Integer pid, java.lang.Boolean enable, java.lang.String user)
          TODO update to setMonitorBuffering semantics, if required.
 java.lang.Integer startJob(java.lang.String jobId, CommunicationAddress jobMngAddr, java.lang.String command, java.lang.Object params_aux, java.lang.Object env_aux, java.lang.String output, java.lang.String error, java.lang.String input, java.security.cert.X509Certificate userCtx)
          Starts running an already created job.
 java.lang.Integer startProcess(java.lang.String jobId, CommunicationAddress jobMngAddr, java.lang.String command, java.lang.Object params_aux, java.lang.Object env_aux, java.lang.String output, java.lang.String error, java.lang.String input, java.security.cert.X509Certificate userCtx)
          Starts a new process, might create a new jobUnit if it's the first one of the job in this resource.
private  void startReceiving()
          Initialize the system to receive events from the processes. Must be called when the execMng starts.
 void updateJobMetric(java.lang.String jobId, eu.xtreemos.xosd.utilities.metrics.MetricsDesc metric)
           
private  void waitJobEvent()
          Wait for an event from the running processes. This is a blocking call and must be called in a new thread.
private  void waitJobsEvents()
          Wait for the events from the running processes. This function never returns.
 
Methods inherited from class eu.xtreemos.system.eventmachine.stage.Abstract2wayStage
getContext, removeContext, SendException, SendException, SendException, SendReply, SendReply, SendReply, setSink
 
Methods inherited from class eu.xtreemos.system.eventmachine.stage.AbstractReceivingStage
dequeue, getSource
 
Methods inherited from class eu.xtreemos.system.eventmachine.stage.AbstractStage
addHandler, addHandler, addServiceListener, getName, getShortName, getThreadCount, notifyServiceInitialised, notifyServiceStarted, notifyServiceStopped, processEvent, removeHandler, removeServiceListener, setThreadCount, start, stop
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

jobUnits

private java.util.Hashtable<java.lang.String,JobUnit> jobUnits

jobPid

private java.util.Hashtable<java.lang.Integer,java.lang.String> jobPid

PendingEvents

private java.util.Hashtable<ExecMng.ExecMngEvent,java.lang.Integer> PendingEvents

lastTimestamp

private java.util.Hashtable<java.lang.Integer,java.lang.Long> lastTimestamp

BASIC

static final int BASIC

JOB_DEFINITION

static final int JOB_DEFINITION

RESOURCES_ALLOCATED

static final int RESOURCES_ALLOCATED

RESOURCES_CONSUMED

static final int RESOURCES_CONSUMED

USER_METRICS

static final int USER_METRICS

metrics

private eu.xtreemos.xosd.utilities.metrics.JobMetrics metrics

metricsData

private eu.xtreemos.xosd.utilities.metrics.ProcMetricsData metricsData

logger

static final org.apache.log4j.Logger logger

tracer

static final org.apache.log4j.Logger tracer

XOS_SIG_CHLD

private int XOS_SIG_CHLD

Constructor Detail

ExecMng

public ExecMng()
Constructor

Method Detail

execJobParams

private int execJobParams(java.lang.String jobID,
                          java.lang.String path,
                          java.lang.String input,
                          java.lang.String output,
                          java.lang.String error,
                          java.lang.String[] params,
                          java.lang.String[] env_var,
                          java.lang.String userCert)
Execute Job with the given parameters

Parameters:
jobID - ID of the job
path - path to the executable
input - path to the file for the standard input
output - path to the file for the standard output
error - path to the file for the standard error
params - parameters of the job
env_var - environment variables of the job
Returns:
exit status (0 means OK)
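To illustrate how the parameters of execJobParams relate to a child process, here is a minimal pure-Java sketch using the standard ProcessBuilder. It is an assumption, not ExecMng's implementation (which may launch jobs natively); JobLauncherSketch and configure are hypothetical names, env_var entries are assumed to be "NAME=VALUE" strings, and userCert is ignored.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical pure-Java analogue of execJobParams: it only shows how the parameters
// could map onto a child process; ExecMng itself may launch jobs differently.
class JobLauncherSketch {

    /** Builds, but does not start, a ProcessBuilder for the given job parameters. */
    static ProcessBuilder configure(String path, String input, String output, String error,
                                    String[] params, String[] envVar) {
        List<String> command = new ArrayList<>();
        command.add(path);                          // the executable comes first
        if (params != null) {
            for (String p : params) {
                command.add(p);                     // then the job arguments, in order
            }
        }
        ProcessBuilder pb = new ProcessBuilder(command);
        if (envVar != null) {
            for (String kv : envVar) {              // assumed "NAME=VALUE" entries
                int eq = kv.indexOf('=');
                if (eq > 0) {
                    pb.environment().put(kv.substring(0, eq), kv.substring(eq + 1));
                }
            }
        }
        // Standard streams are redirected to the given files when a path is supplied.
        if (input != null) {
            pb.redirectInput(new File(input));
        }
        if (output != null) {
            pb.redirectOutput(new File(output));
        }
        if (error != null) {
            pb.redirectError(new File(error));
        }
        return pb;
    }
}
```

Calling start() on the returned builder would launch the job; reaping it afterwards is the concern of pidWait and the event-waiting methods.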

startReceiving

private void startReceiving()
Initialize the system to receive events from the processes. Must be called when the execMng starts.


waitJobEvent

private void waitJobEvent()
Wait for an event from the running processes. This is a blocking call and must be called in a new thread.
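Because waitJobEvent blocks, it has to run on its own thread. The sketch below shows that pattern under the assumption that process events arrive on a BlockingQueue as plain strings; EventWaiterSketch, post and startWaiting are hypothetical names, and the real ExecMng receives events through its own mechanism.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical: process events are modelled as strings placed on a queue by some notifier.
class EventWaiterSketch {
    private final BlockingQueue<String> events = new LinkedBlockingQueue<>();

    /** Producer side: called when a process event is observed. */
    void post(String event) {
        events.add(event);
    }

    /** Starts a daemon thread that blocks on the queue and hands each event to the handler. */
    Thread startWaiting(Consumer<String> handler) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    handler.accept(events.take()); // take() blocks until an event arrives
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop waiting when interrupted
            }
        }, "execMng-event-waiter");
        t.setDaemon(true); // do not keep the JVM alive just for this loop
        t.start();
        return t;
    }
}
```

Running the loop on a daemon thread keeps the calling stage responsive while events are handled in arrival order.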


sendSignal

private int sendSignal(int pid,
                       int signal)
Send a signal to a process

Parameters:
pid - ID of the process to send the signal to
signal - number of the signal to send to the process
Returns:
exit status: 0 on success, otherwise errno
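The "0 on success, otherwise errno" convention can be illustrated with a pure-Java approximation built on java.lang.ProcessHandle. This is an assumption for illustration only: ProcessHandle can only request termination, so the hypothetical SignalSketch handles just signal 0 (existence check), SIGKILL (9) and SIGTERM (15), and uses the usual Linux errno values.

```java
import java.util.Optional;

// Hypothetical pure-Java approximation of sendSignal. ProcessHandle can only ask a process
// to terminate, so only signals 0, 9 (SIGKILL) and 15 (SIGTERM) are handled here.
class SignalSketch {
    static final int EPERM = 1;   // termination request was not accepted
    static final int ESRCH = 3;   // no such process
    static final int EINVAL = 22; // signal not supported by this sketch

    static int sendSignal(long pid, int signal) {
        Optional<ProcessHandle> ph = ProcessHandle.of(pid);
        if (!ph.isPresent() || !ph.get().isAlive()) {
            return ESRCH;
        }
        switch (signal) {
            case 0:
                return 0; // the process exists; nothing is sent
            case 9:
                return ph.get().destroyForcibly() ? 0 : EPERM;
            case 15:
                return ph.get().destroy() ? 0 : EPERM;
            default:
                return EINVAL;
        }
    }
}
```

A native implementation would instead call kill(2) directly and return its errno.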

getSIG_CHLD

private int getSIG_CHLD()
Get the value of the SIG_CHLD

Returns:
the int value of the SIG_CHLD

pidWait

private int pidWait(int pid)
Wait on the pid to prevent defunct (zombie) processes.

Parameters:
pid - ID of the process to wait on
Returns:
exit status of the process

init

public void init()
Specified by:
init in interface eu.xtreemos.system.eventmachine.stage.IStage
Overrides:
init in class eu.xtreemos.system.eventmachine.stage.AbstractStage

waitJobsEvents

private void waitJobsEvents()
Wait for the events from the running processes. This function never returns.


newEvent

private void newEvent(java.lang.String jobId,
                      java.lang.String event,
                      int pid,
                      int ppid,
                      int exitValue)
Receive a notification of a new event

Parameters:
jobId - ID of the job that owns the process that generated the event
event - code of the raised event: NP (New Process), EP (Exited Process) or JF (Job Finished)
pid - ID of the process that raised the event
exitValue - exit value, in the case of an Exited Process or Job Finished event
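The three event codes accepted by newEvent can be captured in a small enum. This is a hypothetical sketch (ProcEvent, fromCode and carriesExitValue are illustrative names); the real ExecMng may compare the raw strings directly.

```java
// Hypothetical enum for the NP / EP / JF event codes; ExecMng may use plain strings instead.
enum ProcEvent {
    NEW_PROCESS("NP"),
    EXITED_PROCESS("EP"),
    JOB_FINISHED("JF");

    private final String code;

    ProcEvent(String code) {
        this.code = code;
    }

    /** Maps a raw event code such as "EP" to its enum constant. */
    static ProcEvent fromCode(String code) {
        for (ProcEvent e : values()) {
            if (e.code.equals(code)) {
                return e;
            }
        }
        throw new IllegalArgumentException("unknown event code: " + code);
    }

    /** exitValue is only meaningful for Exited Process and Job Finished events. */
    boolean carriesExitValue() {
        return this != NEW_PROCESS;
    }
}
```

Rejecting unknown codes early makes a malformed notification fail loudly instead of being silently ignored.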

processEvent

private void processEvent(java.lang.String jobId,
                          java.lang.String event,
                          int pid,
                          int ppid,
                          int exitValue)
New event. TODO: update this javadoc.

Parameters:
jobId - ID of the job that owns the process that generated the event
event - code of the raised event: NP (New Process), EP (Exited Process) or JF (Job Finished)
pid - ID of the process that raised the event
exitValue - exit value, in the case of an Exited Process or Job Finished event

inheritMetricsCB

public java.lang.Integer inheritMetricsCB(java.util.ArrayList<eu.xtreemos.xosd.utilities.metrics.MetricsDesc> metricsList)
This method receives a list of metrics from jobMng, stores them in the corresponding structure for the job and sets initial values for some of them. It needs the newly created jobUnit in its context to execute the job afterwards. It should be called in this workflow: startJob/startProcess: new jobUnit? -> inheritMetrics -> JobMng(getMetricsByScope) -> this callback -> execJob.

Parameters:
metricsList - an ArrayList of metrics that the job needs to handle.
Returns:
an Integer with the returnValue of execJob

inheritMetricsAndStart

private java.lang.Integer inheritMetricsAndStart(JobUnit ju)
This method is part of the startJob/startProcess workflow, whenever a new jobUnit is created. It asks jobMng for the metrics that the newly created jobUnit needs, and sets the callback to inheritMetricsCB.

Parameters:
ju - a JobUnit describing the newly created job in this context
Returns:
null (as an Integer); the result is delivered later through the inheritMetricsCB callback.

startJob

public java.lang.Integer startJob(java.lang.String jobId,
                                  CommunicationAddress jobMngAddr,
                                  java.lang.String command,
                                  java.lang.Object params_aux,
                                  java.lang.Object env_aux,
                                  java.lang.String output,
                                  java.lang.String error,
                                  java.lang.String input,
                                  java.security.cert.X509Certificate userCtx)
Starts running an already created job.

Returns:
an Integer with the returnValue of execJob (not the pid).

execJob

private java.lang.Integer execJob(JobUnit ju)

startProcess

public java.lang.Integer startProcess(java.lang.String jobId,
                                      CommunicationAddress jobMngAddr,
                                      java.lang.String command,
                                      java.lang.Object params_aux,
                                      java.lang.Object env_aux,
                                      java.lang.String output,
                                      java.lang.String error,
                                      java.lang.String input,
                                      java.security.cert.X509Certificate userCtx)
Starts a new process, might create a new jobUnit if it's the first one of the job in this resource.

Returns:
an Integer with the returnValue of execJob (not the pid).
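The "first process of the job on this resource creates the jobUnit" rule maps naturally onto computeIfAbsent over the jobUnits table, with jobPid as the reverse index that getJobSelf consults. This sketch assumes a simplified JobUnit that only tracks pids; JobUnitRegistrySketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;

// Hypothetical: mirrors jobUnits (jobId -> JobUnit) and jobPid (pid -> jobId),
// with a simplified JobUnit that only tracks pids.
class JobUnitRegistrySketch {
    static final class JobUnit {
        final String jobId;
        final List<Integer> pids = new ArrayList<>();

        JobUnit(String jobId) {
            this.jobId = jobId;
        }
    }

    private final Map<String, JobUnit> jobUnits = new Hashtable<>();
    private final Map<Integer, String> jobPid = new Hashtable<>();

    /** Registers a started process; returns true if it created the job's first unit here. */
    synchronized boolean registerProcess(String jobId, int pid) {
        boolean isNew = !jobUnits.containsKey(jobId);
        JobUnit ju = jobUnits.computeIfAbsent(jobId, JobUnit::new);
        ju.pids.add(pid);
        jobPid.put(pid, jobId); // reverse index for getJobSelf-style lookups
        return isNew;
    }

    /** Compare getProcsJob: the pids of the job on this resource (empty if unknown). */
    synchronized List<Integer> procsOf(String jobId) {
        JobUnit ju = jobUnits.get(jobId);
        if (ju == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(ju.pids);
    }

    /** Compare getJobSelf: the jobId owning pid, or null if the pid is not registered. */
    synchronized String jobSelf(int pid) {
        return jobPid.get(pid);
    }
}
```

Keeping both tables under one lock ensures a pid never appears in jobPid without its jobUnit.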

getProcsJob

public java.util.ArrayList<java.lang.Integer> getProcsJob(java.lang.String jobId)

addJobMetric

public java.lang.Integer addJobMetric(java.lang.String jobId,
                                      eu.xtreemos.xosd.utilities.metrics.MetricsDesc metric)

updateJobMetric

public void updateJobMetric(java.lang.String jobId,
                            eu.xtreemos.xosd.utilities.metrics.MetricsDesc metric)

removeJobMetric

public java.lang.Integer removeJobMetric(java.lang.String jobId,
                                         java.lang.String metricName)
Parameters:
jobId -
metricName -
Returns:
an Integer to be determined

removeJobMetrics

public void removeJobMetrics(java.lang.String jobId)
JobUnit scope: removes metrics data, but leaves metadata and buffers.

Parameters:
jobId -

removeProcMetrics

public void removeProcMetrics(java.lang.String jobId,
                              java.lang.Integer pid)
Process scope: removes metrics data, but leaves metadata and buffers.

Parameters:
jobId -
pid -

removeBuffers

public void removeBuffers(java.lang.String jobId)
To be called from jobMng whenever it's cleaning time. Removes buffers and metadata.

Parameters:
jobId -
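The three removal methods differ only in what they clear: removeJobMetrics drops all data of a job, removeProcMetrics drops the data of one pid, and removeBuffers drops buffers and metadata. The hypothetical store below makes that split explicit; it assumes string-valued metrics and uses a null pid for job-scope metrics (as setMonitoringBuffering documents). ExecMng's real JobMetrics and ProcMetricsData structures are not shown here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical store illustrating the three removal scopes.
class MetricsStoreSketch {
    final Map<String, Map<String, String>> metadata = new HashMap<>(); // jobId -> metric -> description
    final Map<String, Map<String, String>> data = new HashMap<>();     // jobId -> "pid/metric" -> value
    final Map<String, StringBuilder> buffers = new HashMap<>();        // jobId -> buffered samples

    /** Key for a value; a null pid means a job-scope metric. */
    static String key(Integer pid, String metric) {
        return (pid == null ? "job" : pid.toString()) + "/" + metric;
    }

    void addMetric(String jobId, String metric, String description) {
        metadata.computeIfAbsent(jobId, k -> new HashMap<>()).put(metric, description);
    }

    void setValue(String jobId, String metric, Integer pid, String value) {
        data.computeIfAbsent(jobId, k -> new HashMap<>()).put(key(pid, metric), value);
        buffers.computeIfAbsent(jobId, k -> new StringBuilder()).append(value).append('\n');
    }

    /** JobUnit scope: drop all data of the job, keep metadata and buffers. */
    void removeJobMetrics(String jobId) {
        data.remove(jobId);
    }

    /** Process scope: drop only the data of one pid, keep metadata and buffers. */
    void removeProcMetrics(String jobId, int pid) {
        Map<String, String> m = data.get(jobId);
        if (m != null) {
            m.keySet().removeIf(k -> k.startsWith(pid + "/"));
        }
    }

    /** Cleaning time: drop buffers and metadata. */
    void removeBuffers(String jobId) {
        buffers.remove(jobId);
        metadata.remove(jobId);
    }
}
```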

setMetricValue

public java.lang.Integer setMetricValue(java.lang.String jobId,
                                        java.lang.String metricName,
                                        java.lang.Integer pid,
                                        java.lang.String value)
Parameters:
jobId -
metricName -
pid -
value -
Returns:
an Integer to be determined

setMonitoringBuffering

public java.lang.Integer setMonitoringBuffering(java.lang.String jobId,
                                                java.lang.String metricName,
                                                java.lang.Integer pid,
                                                java.lang.Boolean enable,
                                                java.lang.String user)
TODO update to setMonitorBuffering semantics, if required.

Parameters:
jobId -
metricName -
pid - should be null if it's not a process scope metric
enable -
user -
Returns:
an Integer to be determined

handleProcStatus

public void handleProcStatus(int pid,
                             boolean enable)

getProcStatus

private void getProcStatus(java.lang.Integer pid)
Temporary monitoring daemon patch. It dumps the newest information from /tmp/amond/PID into the internal buffering structures of metricsData. To determine which information is new, it uses the lastTimestamp hashtable.

Parameters:
pid - Integer with the pid of the process to monitor
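The lastTimestamp bookkeeping can be sketched as a filter that keeps only samples newer than the last one seen for each pid. The format of /tmp/amond/PID is not documented here, so this sketch assumes one "timestamp value" pair per line; ProcStatusSketch and newSamples are hypothetical names. In practice the lines could be read with java.nio.file.Files.readAllLines.

```java
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;

// Hypothetical: each line is assumed to be "<timestamp> <value>".
class ProcStatusSketch {
    private final Map<Integer, Long> lastTimestamp = new Hashtable<>();

    /** Returns only the samples newer than the last one seen for pid, then records the newest timestamp. */
    List<String> newSamples(int pid, List<String> lines) {
        long last = lastTimestamp.getOrDefault(pid, Long.MIN_VALUE);
        long newest = last;
        List<String> fresh = new ArrayList<>();
        for (String line : lines) {
            String[] parts = line.trim().split("\\s+", 2);
            if (parts.length < 2) {
                continue; // skip lines without a value
            }
            long ts = Long.parseLong(parts[0]);
            if (ts > last) {
                fresh.add(parts[1]);
                newest = Math.max(newest, ts);
            }
        }
        lastTimestamp.put(pid, newest);
        return fresh;
    }
}
```

Re-reading the whole file on each call is then harmless, because already-seen samples are filtered out.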

getProcsInfo

public java.lang.String getProcsInfo(java.lang.String jobId,
                                     java.lang.Integer flags,
                                     java.lang.Integer infoLevel,
                                     java.util.ArrayList<java.lang.String> metricsList,
                                     java.lang.String user)

sendEvent

public java.lang.Integer sendEvent(java.lang.String jobId,
                                   java.lang.Integer signal)

getJobsResource

public java.lang.String getJobsResource(java.security.cert.X509Certificate certificate)
Return the information of the jobs running in this resource. TODO: Is that a required feature? For now, it needs to be run in a core node.

Returns:
String with the information of the running jobs
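getJobsResource gathers answers asynchronously, and ExecMng.JobsResInfo tracks them with a pendent counter and an info string. The sketch below mirrors that idea under stated assumptions: each answer is a string, answers are newline-joined, and the number of expected answers is known up front. JobsResInfoSketch and onAnswer are hypothetical names.

```java
// Hypothetical: mirrors the "pendent" counter and "info" string of ExecMng.JobsResInfo.
class JobsResInfoSketch {
    private int pendent;                                    // answers still outstanding
    private final StringBuilder info = new StringBuilder(); // information gathered so far

    JobsResInfoSketch(int expectedAnswers) {
        this.pendent = expectedAnswers;
    }

    /** Called once per answer (compare getJobInfoCB); returns the full text after the last answer, else null. */
    synchronized String onAnswer(String answer) {
        info.append(answer).append('\n');
        pendent--;
        return pendent == 0 ? info.toString() : null;
    }
}
```

With zero expected answers, the caller would reply immediately instead of creating this object.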

getJobInfoCB

public java.lang.String getJobInfoCB(java.lang.String info)

exitJob

public void exitJob(java.lang.String jobId,
                    java.lang.Integer exitValue)

createProcess

public java.lang.Integer createProcess(java.lang.String jobId,
                                       java.lang.String JSDL,
                                       java.lang.String reservationId,
                                       CommunicationAddress resource,
                                       java.security.cert.X509Certificate userCtx)
                                throws org.xml.sax.SAXException,
                                       java.io.IOException
It is supposed to be "similar" to a fork, but in XOS. If JSDL is null, the one provided in createJob is used. If a reservationId is provided, it is used; otherwise, if a resource is provided, it is used; otherwise, the process is created locally.

Parameters:
jobId -
JSDL -
reservationId -
resource -
userCtx -
Returns:
an Integer to be determined. TODO: return the resource and pid of the new process.
Throws:
org.xml.sax.SAXException
java.io.IOException
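The placement rules for createProcess (reservation first, then an explicit resource, else local; fall back to createJob's JSDL when none is given) reduce to two small decisions. This sketch assumes "provided" means non-null and uses Object as a stand-in for CommunicationAddress; CreateProcessSketch, Target, chooseJsdl and chooseTarget are hypothetical names.

```java
// Hypothetical decision helpers for createProcess; not part of the ExecMng API.
class CreateProcessSketch {
    enum Target { RESERVATION, RESOURCE, LOCAL }

    /** Use the JSDL passed in, else the one given to createJob. */
    static String chooseJsdl(String jsdl, String jsdlFromCreateJob) {
        return jsdl != null ? jsdl : jsdlFromCreateJob;
    }

    /** A reservation wins over an explicit resource; with neither, run locally. */
    static Target chooseTarget(String reservationId, Object resource) { // resource stands in for CommunicationAddress
        if (reservationId != null) {
            return Target.RESERVATION;
        }
        if (resource != null) {
            return Target.RESOURCE;
        }
        return Target.LOCAL;
    }
}
```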

sendXOS_SIGCHLD

private void sendXOS_SIGCHLD(java.lang.String jobId)

returnCB

public java.lang.Object returnCB(java.lang.Object obj)

returnCBE

public java.lang.Object returnCBE(java.lang.Exception ex)
                           throws java.lang.Exception
Throws:
java.lang.Exception

getProcessList

public void getProcessList(java.lang.String jobId,
                           java.lang.String initialJobId,
                           java.lang.String jsdlFile,
                           java.lang.String jobUnitID,
                           java.util.ArrayList<java.lang.String> dependentJobs,
                           java.lang.String executable,
                           CommunicationAddress jobCpAddr,
                           CommunicationAddress superJobCpAddr,
                           java.lang.String kernelCheckpointer,
                           java.lang.String checkpointVersion,
                           java.lang.String pidCGroupName,
                           java.lang.String strategy,
                           java.lang.String options,
                           java.lang.String mode,
                           java.security.cert.X509Certificate userCert)
Get a process from the job unit, so that the kernel checkpointer can determine the process group it uses for checkpoint/restart (cp/rst).


rebuildJobUnit

public void rebuildJobUnit(java.lang.String jobId,
                           java.lang.String initialJobId,
                           java.lang.String jobUnitId,
                           java.lang.String checkpointVersion,
                           CommunicationAddress jobMngAddr,
                           java.lang.String processGroupReferenceId,
                           java.lang.String processGroupReferenceType,
                           java.lang.String pidCGroupName,
                           java.lang.String kernelCheckpointer,
                           java.lang.String jsdlFile,
                           java.lang.String input,
                           java.lang.String output,
                           java.lang.String cmd,
                           java.lang.String error,
                           java.security.cert.X509Certificate userCert)
                    throws java.lang.Exception
Parameters:
jobId -
jobMngAddr -
Throws:
java.lang.Exception

assignProcessesToJobAtRestart

public void assignProcessesToJobAtRestart(java.lang.String jobId,
                                          java.lang.String procListString)

getJobSelf

public java.lang.String getJobSelf(java.lang.Integer pid)
Return the JobId of the calling process (identified by its pid).

Parameters:
pid - of the calling process
Returns:
String containing the jobId

handleEvent

public void handleEvent(java.lang.Object event)
                 throws java.lang.Exception
Specified by:
handleEvent in interface eu.xtreemos.system.eventmachine.queue.IEventHandler
Specified by:
handleEvent in class eu.xtreemos.system.eventmachine.stage.AbstractReceivingStage
Throws:
java.lang.Exception

getHandledEventType

public java.lang.String getHandledEventType()
Specified by:
getHandledEventType in class eu.xtreemos.system.eventmachine.stage.AbstractReceivingStage