Tripal 0.3b
tripal_core/jobs.php
Go to the documentation of this file.
00001 <?php
00002 
00003 /** 
00004  * @defgroup tripal_jobs_api Core Module Jobs API
00005  * @{
00006  * Tripal offers a job management subsystem for managing tasks that may require an extended period of time for 
00007  * completion.  Drupal uses a UNIX-based cron job to handle tasks such as  checking  the  availability of updates, 
00008  * indexing new nodes for searching, etc.   Drupal's cron uses the web interface for launching these tasks, however, 
00009  * Tripal provides several administrative tasks that may time out and not complete due to limitations of the web 
00010  * server.  Examples include syncing of a large number of features between chado and Drupal.  To circumvent this, 
00011  * as well as provide more fine-grained control and monitoring, Tripal uses a jobs management sub-system built into 
00012  * the Tripal Core module.   It is anticipated that this functionality will be used for managing analysis jobs provided by 
00013  * future tools, with eventual support for distributed computing.   
00014  *
00015  * The  Tripal jobs management system allows administrators to submit tasks to be performed which can then  be 
00016  * launched through a UNIX command-line PHP script or cron job.  This command-line script can be added to a cron 
00017  * entry along-side the Drupal cron entry for automatic, regular launching of Tripal jobs.  The order of execution of 
00018  * waiting jobs is determined first by priority and second by the order the jobs were entered.  
00019  *
00020  * The API functions described below provide a programmatic interface for adding, checking and viewing jobs.
00021  * @}
00022  * @ingroup tripal_api
00023  */
00024  
/**
 * Adds a job to the Tripal Job queue
 *
 * @param $job_name
 *    The human readable name for the job
 * @param $modulename
 *    The name of the module adding the job
 * @param $callback
 *    The name of a function to be called when the job is executed
 * @param $arguments
 *    An array of arguments to be passed on to the callback.  The values are
 *    stored as one string joined with "::", so an individual argument must
 *    not itself contain "::".  Anything other than an array is stored as
 *    "no arguments".
 * @param $uid
 *    The uid of the user adding the job
 * @param $priority
 *    The priority at which to run the job.  Waiting jobs are launched in
 *    ascending order of this number, so the lower the number the higher the
 *    priority.  The default priority is 10.
 *
 * @return
 *    The job_id of the registered job, or NULL if the job could not be saved
 *
 * @ingroup tripal_jobs_api
 */
function tripal_add_job ($job_name, $modulename, $callback, $arguments, $uid, $priority = 10) {

  // convert the arguments into a string for storage in the database
  $args = is_array($arguments) ? implode("::", $arguments) : '';

  $record = new stdClass();
  $record->job_name = $job_name;
  $record->modulename = $modulename;
  $record->callback = $callback;
  $record->status = 'Waiting';
  $record->submit_date = time();
  $record->uid = $uid;
  $record->priority = $priority;  // the lower the number the higher the priority
  if ($args) {
    $record->arguments = $args;
  }

  if (drupal_write_record('tripal_jobs', $record)) {
    // use placeholders so the job name is escaped and the string stays translatable
    $jobs_url = url("admin/tripal/tripal_jobs");
    drupal_set_message(t(
      "Job '@job_name' submitted.  Check the <a href='@jobs_url'>jobs page</a> for status",
      array('@job_name' => $job_name, '@jobs_url' => $jobs_url)
    ));
  }
  else {
    drupal_set_message(t("Failed to add job @job_name.", array('@job_name' => $job_name)), 'error');
  }

  // job_id is only populated when the record was actually written
  return isset($record->job_id) ? $record->job_id : NULL;
}
00072 
/**
 * An internal function for setting the progress for a current job
 *
 * @param $job_id
 *   The job_id to set the progress for
 * @param $percentage
 *   The progress to set the job to: a whole number from 0 to 100
 *
 * @return
 *   1 if the percentage was valid and the record was written, 0 otherwise
 *
 * @ingroup tripal_core
 */
function tripal_job_set_progress($job_id, $percentage) {

  // accept one or two digits (0-99) or exactly 100; the D modifier stops
  // "$" from matching before a trailing newline
  if (!preg_match('/^(100|\d{1,2})$/D', (string) $percentage)) {
    return 0;
  }

  $record = new stdClass();
  $record->job_id = $job_id;
  $record->progress = $percentage;
  if (drupal_write_record('tripal_jobs', $record, 'job_id')) {
    return 1;
  }
  return 0;
}
00098 
/**
 * Looks up an unfinished job belonging to the given module
 *
 * A job counts as unfinished while its end_time is NULL.
 *
 * @param $modulename
 *    The module whose unfinished jobs are looked up
 *
 * @return
 *    The value of a single db_fetch_object() call on the query: one object
 *    describing the first matching tripal job.  Only one row is fetched even
 *    when several jobs match.
 *
 * @ingroup tripal_jobs_api
 */
function tripal_get_module_active_jobs ($modulename) {
  $query = "SELECT * FROM {tripal_jobs} TJ WHERE TJ.end_time IS NULL and TJ.modulename = '%s' ";
  $result = db_query($query, $modulename);
  $first_row = db_fetch_object($result);
  return $first_row;
}
/**
 * Returns the Tripal Job Report
 *
 * Builds a paged HTML table (ten jobs per page, newest job first) with one
 * row per job, including links to cancel, re-run and view the job.
 *
 * @return
 *   The HTML to be rendered which describes the job report
 *
 * @ingroup tripal_core
 */
function tripal_jobs_report () {
  // both tables are wrapped in {} so that database table prefixes are applied
  $jobs = pager_query(
    "SELECT TJ.job_id,TJ.uid,TJ.job_name,TJ.modulename,TJ.progress,
            TJ.status as job_status, TJ.submit_date,TJ.start_time,
            TJ.end_time,TJ.priority,U.name as username
     FROM {tripal_jobs} TJ
       INNER JOIN {users} U on TJ.uid = U.uid
     ORDER BY job_id DESC", 10, 0, "SELECT count(*) FROM {tripal_jobs}");

  // create a table with each row containing stats for
  // an individual job in the results set.
  $output = "Waiting jobs are executed first by priority level (the lower the ".
            "number the higher the priority) and second by the order they ".
            "were entered";
  $output .= "<table class=\"tripal-table tripal-table-horz\">".
             "  <tr>".
             "    <th>Job ID</th>".
             "    <th>User</th>".
             "    <th>Job Name</th>".
             "    <th nowrap>Dates</th>".
             "    <th>Priority</th>".
             "    <th>Progress</th>".
             "    <th>Status</th>".
             "    <th>Actions</th>".
             "  </tr>";
  $i = 0;
  while ($job = db_fetch_object($jobs)) {
    $class = ($i % 2 == 0) ? 'tripal-table-even-row' : 'tripal-table-odd-row';

    $submit = tripal_jobs_get_submit_date($job);
    $start = tripal_jobs_get_start_time($job);
    $end = tripal_jobs_get_end_time($job);

    // user names and job names are free text, so escape them for HTML
    $username = check_plain($job->username);
    $job_name = check_plain($job->job_name);

    // only a job that has neither started nor ended can be cancelled
    $cancel_link = '';
    if ($job->start_time == 0 and $job->end_time == 0) {
      $cancel_link = "<a href=\"".url("admin/tripal/tripal_jobs/cancel/".$job->job_id)."\">Cancel</a><br>";
    }
    $rerun_link = "<a href=\"".url("admin/tripal/tripal_jobs/rerun/".$job->job_id)."\">Re-run</a><br>";
    $view_link = "<a href=\"".url("admin/tripal/tripal_jobs/view/".$job->job_id)."\">View</a>";

    $output .= "  <tr class=\"$class\">";
    $output .= "    <td>$job->job_id</td>".
               "    <td>$username</td>".
               "    <td>$job_name</td>".
               "    <td nowrap>Submit Date: $submit".
               "    <br>Start Time: $start".
               "    <br>End Time: $end</td>".
               "    <td>$job->priority</td>".
               "    <td>$job->progress%</td>".
               "    <td>$job->job_status</td>".
               "    <td>$cancel_link $rerun_link $view_link</td>".
               "  </tr>";
    $i++;
  }
  $output .= "</table>";

  // go through theme() rather than calling theme_pager() directly so that
  // theme overrides of the pager are honoured
  $output .= theme('pager');
  return $output;
}
00184 
/**
 * Returns the start time for a given job
 *
 * @param $job
 *   An object describing the job; its start_time and job_status members
 *   are read
 *
 * @return
 *   The formatted start time when start_time is greater than zero.
 *   Otherwise "Cancelled" when job_status is 'Cancelled' and
 *   "Not Yet Started" for any other status.
 *
 * @ingroup tripal_jobs_api
 */
function tripal_jobs_get_start_time($job) {
  // a job that has started simply reports its formatted start time
  if ($job->start_time > 0) {
    return format_date($job->start_time);
  }

  // otherwise the label depends on whether the job was cancelled
  $was_cancelled = (strcmp($job->job_status, 'Cancelled') == 0);
  return $was_cancelled ? 'Cancelled' : 'Not Yet Started';
}
00208 
/**
 * Returns the end time for a given job
 *
 * @param $job
 *   An object describing the job; its end_time member is read
 *
 * @return
 *   The formatted end time when end_time is greater than zero and an
 *   empty string otherwise
 *
 * @ingroup tripal_jobs_api
 */
function tripal_jobs_get_end_time($job) {
  return ($job->end_time > 0) ? format_date($job->end_time) : '';
}
00228 
/**
 * Returns the date the job was added to the queue
 *
 * @param $job
 *   An object describing the job; its submit_date member is read
 *
 * @return
 *   The date the job was submitted, as formatted by format_date()
 *
 * @ingroup tripal_jobs_api
 */
function tripal_jobs_get_submit_date($job) {
  $submitted = $job->submit_date;
  return format_date($submitted);
}
00243 
/**
 * A function used to manually launch all queued tripal jobs
 *
 * Every job that has neither a start time nor an end time is run in turn,
 * ordered first by ascending priority and then by ascending job_id.  Each
 * job's callback is called with the stored arguments followed by the job_id.
 *
 * @param $do_parallel
 *   A boolean indicating whether jobs should be attempted to run in parallel.
 *   When false, nothing is launched if tripal_jobs_check_running() reports
 *   that a job is already running.
 *
 * @ingroup tripal_jobs_api
 */
function tripal_jobs_launch ($do_parallel = 0) {

  // first check if any jobs are currently running
  // if they are, don't continue, we don't want to have
  // more than one job script running at a time
  if (!$do_parallel and tripal_jobs_check_running()) {
    return;
  }

  // get all jobs that have not started, ordered by priority (lowest
  // number first) and then by the order in which they were entered
  $sql = "SELECT * FROM {tripal_jobs} TJ ".
         "WHERE TJ.start_time IS NULL and TJ.end_time IS NULL ".
         "ORDER BY priority ASC,job_id ASC";
  $job_res = db_query($sql);
  while ($job = db_fetch_object($job_res)) {

    // set the start time for this job
    $record = new stdClass();
    $record->job_id = $job->job_id;
    $record->start_time = time();
    $record->status = 'Running';
    $record->pid = getmypid();
    drupal_write_record('tripal_jobs', $record, 'job_id');

    $callback = $job->callback;

    // a callback that cannot be called is recorded as an error instead of
    // being reported as a successfully completed job
    if (!is_callable($callback)) {
      $record->end_time = time();
      $record->status = 'Error';
      $record->error_msg = "The job callback '$callback' is not a callable function.";
      drupal_write_record('tripal_jobs', $record, 'job_id');
      continue;
    }

    // call the function provided in the callback column.
    // Add the job_id as the last item in the list of arguments. All
    // callback functions should support this argument.
    // explode() replaces split(), which no longer exists as of PHP 7.
    // NOTE(review): as before, an empty arguments column still yields a
    // single '' argument ahead of the job_id.
    $args = explode("::", (string) $job->arguments);
    $args[] = $job->job_id;
    print "Calling: $callback(" . implode(", ",$args) . ")\n";
    call_user_func_array($callback, $args);

    // set the end time for this job
    $record->end_time = time();
    $record->status = 'Completed';
    $record->progress = '100';
    drupal_write_record('tripal_jobs', $record, 'job_id');

    // send an email to the user advising that the job has finished
  }
}
00295 
/**
 * Checks whether a previously started tripal job is still running
 *
 * Every job that has a start time but no end time is examined.  If the job
 * has a pid and posix_kill() with signal 0 succeeds for it, the job is
 * reported as running and the check stops there.  Any other such job is
 * closed: its end time is set to now, its status to 'Error' and its error
 * message to 'Job has terminated unexpectedly.'
 *
 * @return
 *    1 as soon as a running job is found, 0 if no job is running
 *
 * @ingroup tripal_jobs_api
 */
function tripal_jobs_check_running () {
  $query = "SELECT * FROM {tripal_jobs} TJ WHERE TJ.end_time IS NULL and NOT TJ.start_time IS NULL ";
  $unfinished = db_query($query);

  while ($job = db_fetch_object($unfinished)) {
    // signal 0 delivers nothing; it only tests whether the pid can be signalled
    if ($job->pid && posix_kill($job->pid, 0)) {
      // the job is still running so let it go
      print "Job is still running (pid $job->pid)\n";
      return 1;
    }

    // the job is not running so mark it as terminated with an error
    $closed = new stdClass();
    $closed->job_id = $job->job_id;
    $closed->end_time = time();
    $closed->status = 'Error';
    $closed->error_msg = 'Job has terminated unexpectedly.';
    drupal_write_record('tripal_jobs', $closed, 'job_id');
  }

  // no unfinished job turned out to be running
  return 0;
}
00331 
/**
 * Returns the HTML code to display a given job
 *
 * The rendering is delegated to the 'tripal_core_job_view' theme hook.
 *
 * @param $job_id
 *   The job_id of the job to display
 *
 * @return
 *   The HTML describing the indicated job
 *
 * @ingroup tripal_core
 */
function tripal_jobs_view ($job_id) {
  $html = theme('tripal_core_job_view', $job_id);
  return $html;
}
00345 
/**
 * Registers variables for the tripal_core_job_view themeing function
 *
 * Loads the job named by $variables['job_id'] and exports it to the template
 * as $variables['job'].  The job's arguments are turned into an array and
 * its submit, start and end times into display strings.
 *
 * @param $variables
 *   An array containing all variables supplied to this template.  The
 *   'job_id' entry is read and the 'job' entry is set; 'job' is the false
 *   value returned by the lookup when no such job exists.
 *
 * @ingroup tripal_core
 */
function tripal_core_preprocess_tripal_core_job_view (&$variables) {
  // get the job record; both tables are wrapped in {} so that database
  // table prefixes are applied
  $job_id = $variables['job_id'];
  $sql =
    "SELECT TJ.job_id,TJ.uid,TJ.job_name,TJ.modulename,TJ.progress,
            TJ.status as job_status, TJ.submit_date,TJ.start_time,
            TJ.end_time,TJ.priority,U.name as username,TJ.arguments,
            TJ.callback,TJ.error_msg,TJ.pid
     FROM {tripal_jobs} TJ
       INNER JOIN {users} U on TJ.uid = U.uid
     WHERE TJ.job_id = %d";
  $job = db_fetch_object(db_query($sql, $job_id));

  // nothing to decorate when the job does not exist
  if (!$job) {
    $variables['job'] = $job;
    return;
  }

  // we do not know what the arguments are for and we want to provide a
  // meaningful description to the end-user. So we use a callback function
  // defined in the module that created the job to describe in an array
  // the arguments provided.  If the callback fails then just use the
  // arguments as they are
  $args = explode("::", (string) $job->arguments);
  $job->arguments = $args;
  $arg_hook = $job->modulename."_job_describe_args";
  if (is_callable($arg_hook)) {
    $new_args = call_user_func_array($arg_hook, array($job->callback, $args));
    if (is_array($new_args) and count($new_args)) {
      $job->arguments = $new_args;
    }
  }

  // make our start and end times more legible
  $job->submit_date = tripal_jobs_get_submit_date($job);
  $job->start_time = tripal_jobs_get_start_time($job);
  $job->end_time = tripal_jobs_get_end_time($job);

  // add the job to the variables that get exported to the template
  $variables['job'] = $job;
}
/**
 * Set a job to be re-ran (ie: add it back into the job queue)
 *
 * A new job is added with the same name, module, callback, arguments and
 * priority as the original, owned by the current user.  The request is then
 * redirected to the jobs page.
 *
 * @param $job_id
 *   The job_id of the job to be re-ran
 *
 * @ingroup tripal_jobs_api
 */
function tripal_jobs_rerun ($job_id) {
  global $user;

  $sql = "select * from {tripal_jobs} where job_id = %d";
  $job = db_fetch_object(db_query($sql, $job_id));

  if ($job) {
    $args = explode("::", (string) $job->arguments);
    tripal_add_job($job->job_name, $job->modulename, $job->callback, $args, $user->uid,
      $job->priority);
  }
  else {
    // without this guard an unknown job_id would queue an empty job
    drupal_set_message(t("Job #@job_id could not be found and was not re-run.",
      array('@job_id' => $job_id)), 'error');
  }

  drupal_goto("admin/tripal/tripal_jobs");
}
00413 
/**
 * Cancel a Tripal Job currently waiting in the job queue
 *
 * Only a job that has neither started nor ended is cancelled: its end time
 * is set to now, its status to 'Cancelled' and its progress to 0.  A status
 * message is set in every case and the request is then redirected to the
 * jobs page.
 *
 * @param $job_id
 *   The job_id of the job to be cancelled
 *
 * @ingroup tripal_jobs_api
 */
function tripal_jobs_cancel ($job_id) {
  $sql = "select * from {tripal_jobs} where job_id = %d";
  $job = db_fetch_object(db_query($sql, $job_id));

  // the id is echoed back in a message, so escape it for HTML
  $shown_id = check_plain($job_id);

  if (!$job) {
    drupal_set_message("Job #$shown_id could not be found.", 'error');
  }
  elseif ($job->start_time == 0 and $job->end_time == 0) {
    // still waiting: set the end time for this job and mark it cancelled
    $record = new stdClass();
    $record->job_id = $job->job_id;
    $record->end_time = time();
    $record->status = 'Cancelled';
    $record->progress = '0';
    drupal_write_record('tripal_jobs', $record, 'job_id');
    drupal_set_message("Job #$shown_id cancelled");
  }
  else {
    drupal_set_message("Job #$shown_id cannot be cancelled. It is in progress or has finished.");
  }
  drupal_goto("admin/tripal/tripal_jobs");
}
 All Classes Files Functions Variables