|
Tripal 0.3b
|
00001 <?php 00002 00003 /** 00004 * @defgroup tripal_jobs_api Core Module Jobs API 00005 * @{ 00006 * Tripal offers a job management subsystem for managing tasks that may require an extended period of time for 00007 * completion. Drupal uses a UNIX-based cron job to handle tasks such as checking the availability of updates, 00008 * indexing new nodes for searching, etc. Drupal's cron uses the web interface for launching these tasks, however, 00009 * Tripal provides several administrative tasks that may time out and not complete due to limitations of the web 00010 * server. Examples including syncing of a large number of features between chado and Drupal. To circumvent this, 00011 * as well as provide more fine-grained control and monitoring, Tripal uses a jobs management sub-system built into 00012 * the Tripal Core module. It is anticipated that this functionality will be used for managing analysis jobs provided by 00013 * future tools, with eventual support for distributed computing. 00014 * 00015 * The Tripal jobs management system allows administrators to submit tasks to be performed which can then be 00016 * launched through a UNIX command-line PHP script or cron job. This command-line script can be added to a cron 00017 * entry along-side the Drupal cron entry for automatic, regular launching of Tripal jobs. The order of execution of 00018 * waiting jobs is determined first by priority and second by the order the jobs were entered. 00019 * 00020 * The API functions described below provide a programmatic interface for adding, checking and viewing jobs. 00021 * @} 00022 * @ingroup tripal_api 00023 */ 00024 00025 /** 00026 * Adds a job to the Tripal Jbo queue 00027 * 00028 * @param $job_name 00029 * The human readable name for the job 00030 * @param $modulename 00031 * The name of the module adding the job 00032 * @param $callback 00033 * The name of a function to be called when the job is executed 00034 * @param $arguments 00035 * An array of arguements to be passed on to the callback 00036 * @param $uid 00037 * The uid of the user adding the job 00038 * @param $priority 00039 * The priority at which to run the job where the highest priority is 10 and the lowest priority 00040 * is 1. The default priority is 10. 00041 * 00042 * @return 00043 * The job_id of the registered job 00044 * 00045 * @ingroup tripal_jobs_api 00046 */ 00047 function tripal_add_job ($job_name,$modulename,$callback,$arguments,$uid,$priority = 10){ 00048 00049 # convert the arguments into a string for storage in the database 00050 $args = implode("::",$arguments); 00051 00052 $record = new stdClass(); 00053 $record->job_name = $job_name; 00054 $record->modulename = $modulename; 00055 $record->callback = $callback; 00056 $record->status = 'Waiting'; 00057 $record->submit_date = time(); 00058 $record->uid = $uid; 00059 $record->priority = $priority; # the lower the number the higher the priority 00060 if($args){ 00061 $record->arguments = $args; 00062 } 00063 if(drupal_write_record('tripal_jobs',$record)){ 00064 $jobs_url = url("admin/tripal/tripal_jobs"); 00065 drupal_set_message(t("Job '$job_name' submitted. Check the <a href='$jobs_url'>jobs page</a> for status")); 00066 } else { 00067 drupal_set_message("Failed to add job $job_name."); 00068 } 00069 00070 return $record->job_id; 00071 } 00072 00073 /** 00074 * An internal function for setting the progress for a current job 00075 * 00076 * @param $job_id 00077 * The job_id to set the progress for 00078 * @param $percentage 00079 * The progress to set the job to 00080 * 00081 * @return 00082 * True on success and False otherwise 00083 * 00084 * @ingroup tripal_core 00085 */ 00086 function tripal_job_set_progress($job_id,$percentage){ 00087 00088 if(preg_match("/^(\d\d|100)$/",$percentage)){ 00089 $record = new stdClass(); 00090 $record->job_id = $job_id; 00091 $record->progress = $percentage; 00092 if(drupal_write_record('tripal_jobs',$record,'job_id')){ 00093 return 1; 00094 } 00095 } 00096 return 0; 00097 } 00098 00099 /** 00100 * Returns a list of jobs associated with the given module 00101 * 00102 * @param $modulename 00103 * The module to return a list of jobs for 00104 * 00105 * @return 00106 * An array of objects where each object describes a tripal job 00107 * 00108 * @ingroup tripal_jobs_api 00109 */ 00110 function tripal_get_module_active_jobs ($modulename){ 00111 $sql = "SELECT * FROM {tripal_jobs} TJ ". 00112 "WHERE TJ.end_time IS NULL and TJ.modulename = '%s' "; 00113 return db_fetch_object(db_query($sql,$modulename)); 00114 00115 } 00116 /** 00117 * Returns the Tripal Job Report 00118 * 00119 * @return 00120 * The HTML to be rendered which describes the job report 00121 * 00122 * @ingroup tripal_core 00123 */ 00124 function tripal_jobs_report () { 00125 //$jobs = db_query("SELECT * FROM {tripal_jobs} ORDER BY job_id DESC"); 00126 $jobs = pager_query( 00127 "SELECT TJ.job_id,TJ.uid,TJ.job_name,TJ.modulename,TJ.progress, 00128 TJ.status as job_status, TJ,submit_date,TJ.start_time, 00129 TJ.end_time,TJ.priority,U.name as username 00130 FROM {tripal_jobs} TJ 00131 INNER JOIN users U on TJ.uid = U.uid 00132 ORDER BY job_id DESC", 10,0,"SELECT count(*) FROM {tripal_jobs}"); 00133 00134 // create a table with each row containig stats for 00135 // an individual job in the results set. 00136 $output .= "Waiting jobs are executed first by priority level (the lower the ". 00137 "number the higher the priority) and second by the order they ". 00138 "were entered"; 00139 $output .= "<table class=\"tripal-table tripal-table-horz\">". 00140 " <tr>". 00141 " <th>Job ID</th>". 00142 " <th>User</th>". 00143 " <th>Job Name</th>". 00144 " <th nowrap>Dates</th>". 00145 " <th>Priority</th>". 00146 " <th>Progress</th>". 00147 " <th>Status</th>". 00148 " <th>Actions</th>". 00149 " </tr>"; 00150 $i = 0; 00151 while($job = db_fetch_object($jobs)){ 00152 $class = 'tripal-table-odd-row'; 00153 if($i % 2 == 0 ){ 00154 $class = 'tripal-table-even-row'; 00155 } 00156 $submit = tripal_jobs_get_submit_date($job); 00157 $start = tripal_jobs_get_start_time($job); 00158 $end = tripal_jobs_get_end_time($job); 00159 00160 $cancel_link = ''; 00161 if($job->start_time == 0 and $job->end_time == 0){ 00162 $cancel_link = "<a href=\"".url("admin/tripal/tripal_jobs/cancel/".$job->job_id)."\">Cancel</a><br>"; 00163 } 00164 $rerun_link = "<a href=\"".url("admin/tripal/tripal_jobs/rerun/".$job->job_id)."\">Re-run</a><br>"; 00165 $view_link ="<a href=\"".url("admin/tripal/tripal_jobs/view/".$job->job_id)."\">View</a>"; 00166 $output .= " <tr class=\"$class\">"; 00167 $output .= " <td>$job->job_id</td>". 00168 " <td>$job->username</td>". 00169 " <td>$job->job_name</td>". 00170 " <td nowrap>Submit Date: $submit". 00171 " <br>Start Time: $start". 00172 " <br>End Time: $end</td>". 00173 " <td>$job->priority</td>". 00174 " <td>$job->progress%</td>". 00175 " <td>$job->job_status</td>". 00176 " <td>$cancel_link $rerun_link $view_link</td>". 00177 " </tr>"; 00178 $i++; 00179 } 00180 $output .= "</table>"; 00181 $output .= theme_pager(); 00182 return $output; 00183 } 00184 00185 /** 00186 * Returns the start time for a given job 00187 * 00188 * @param $job 00189 * An object describing the job 00190 * 00191 * @return 00192 * The start time of the job if it was already run and either "Cancelled" or "Not Yet Started" otherwise 00193 * 00194 * @ingroup tripal_jobs_api 00195 */ 00196 function tripal_jobs_get_start_time($job){ 00197 if($job->start_time > 0){ 00198 $start = format_date($job->start_time); 00199 } else { 00200 if(strcmp($job->job_status,'Cancelled')==0){ 00201 $start = 'Cancelled'; 00202 } else { 00203 $start = 'Not Yet Started'; 00204 } 00205 } 00206 return $start; 00207 } 00208 00209 /** 00210 * Returns the end time for a given job 00211 * 00212 * @param $job 00213 * An object describing the job 00214 * 00215 * @return 00216 * The end time of the job if it was already run and empty otherwise 00217 * 00218 * @ingroup tripal_jobs_api 00219 */ 00220 function tripal_jobs_get_end_time($job){ 00221 if($job->end_time > 0){ 00222 $end = format_date($job->end_time); 00223 } else { 00224 $end = ''; 00225 } 00226 return $end; 00227 } 00228 00229 /** 00230 * Returns the date the job was added to the queue 00231 * 00232 * @param $job 00233 * An object describing the job 00234 * 00235 * @return 00236 * The date teh job was submitted 00237 * 00238 * @ingroup tripal_jobs_api 00239 */ 00240 function tripal_jobs_get_submit_date($job){ 00241 return format_date($job->submit_date); 00242 } 00243 00244 /** 00245 * A function used to manually launch all queued tripal jobs 00246 * 00247 * @param $do_parallel 00248 * A boolean indicating whether jobs should be attempted to run in parallel 00249 * 00250 * @ingroup tripal_jobs_api 00251 */ 00252 function tripal_jobs_launch ($do_parallel = 0){ 00253 00254 // first check if any jobs are currently running 00255 // if they are, don't continue, we don't want to have 00256 // more than one job script running at a time 00257 if(!$do_parallel and tripal_jobs_check_running()){ 00258 return; 00259 } 00260 00261 // get all jobs that have not started and order them such that 00262 // they are processed in a FIFO manner. 00263 $sql = "SELECT * FROM {tripal_jobs} TJ ". 00264 "WHERE TJ.start_time IS NULL and TJ.end_time IS NULL ". 00265 "ORDER BY priority ASC,job_id ASC"; 00266 $job_res = db_query($sql); 00267 while($job = db_fetch_object($job_res)){ 00268 00269 // set the start time for this job 00270 $record = new stdClass(); 00271 $record->job_id = $job->job_id; 00272 $record->start_time = time(); 00273 $record->status = 'Running'; 00274 $record->pid = getmypid(); 00275 drupal_write_record('tripal_jobs',$record,'job_id'); 00276 00277 // call the function provided in the callback column. 00278 // Add the job_id as the last item in the list of arguments. All 00279 // callback functions should support this argument. 00280 $callback = $job->callback; 00281 $args = split("::",$job->arguments); 00282 $args[] = $job->job_id; 00283 print "Calling: $callback(" . implode(", ",$args) . ")\n"; 00284 call_user_func_array($callback,$args); 00285 00286 // set the end time for this job 00287 $record->end_time = time(); 00288 $record->status = 'Completed'; 00289 $record->progress = '100'; 00290 drupal_write_record('tripal_jobs',$record,'job_id'); 00291 00292 // send an email to the user advising that the job has finished 00293 } 00294 } 00295 00296 /** 00297 * Returns a list of running tripal jobs 00298 * 00299 * @return 00300 * and array of objects where each object describes a running job or false if no jobs are running 00301 * 00302 * @ingroup tripal_jobs_api 00303 */ 00304 function tripal_jobs_check_running () { 00305 // iterate through each job that has not ended 00306 // and see if it is still running. If it is not 00307 // running but does not have an end_time then 00308 // set the end time and set the status to 'Error' 00309 $sql = "SELECT * FROM {tripal_jobs} TJ ". 00310 "WHERE TJ.end_time IS NULL and NOT TJ.start_time IS NULL "; 00311 $jobs = db_query($sql); 00312 while($job = db_fetch_object($jobs)){ 00313 if($job->pid and posix_kill($job->pid, 0)) { 00314 // the job is still running so let it go 00315 // we return 1 to indicate that a job is running 00316 print "Job is still running (pid $job->pid)\n"; 00317 return 1; 00318 } else { 00319 // the job is not running so terminate it 00320 $record = new stdClass(); 00321 $record->job_id = $job->job_id; 00322 $record->end_time = time(); 00323 $record->status = 'Error'; 00324 $record->error_msg = 'Job has terminated unexpectedly.'; 00325 drupal_write_record('tripal_jobs',$record,'job_id'); 00326 } 00327 } 00328 // return 1 to indicate that no jobs are currently running. 00329 return 0; 00330 } 00331 00332 /** 00333 * Returns the HTML code to display a given job 00334 * 00335 * @param $job_id 00336 * The job_id of the job to display 00337 * 00338 * @return 00339 * The HTML describing the indicated job 00340 * @ingroup tripal_core 00341 */ 00342 function tripal_jobs_view ($job_id){ 00343 return theme('tripal_core_job_view',$job_id); 00344 } 00345 00346 /** 00347 * Registers variables for the tripal_core_job_view themeing function 00348 * 00349 * @param $variables 00350 * An array containing all variables supplied to this template 00351 * 00352 * @ingroup tripal_core 00353 */ 00354 function tripal_core_preprocess_tripal_core_job_view (&$variables){ 00355 // get the job record 00356 $job_id = $variables['job_id']; 00357 $sql = 00358 "SELECT TJ.job_id,TJ.uid,TJ.job_name,TJ.modulename,TJ.progress, 00359 TJ.status as job_status, TJ,submit_date,TJ.start_time, 00360 TJ.end_time,TJ.priority,U.name as username,TJ.arguments, 00361 TJ.callback,TJ.error_msg,TJ.pid 00362 FROM {tripal_jobs} TJ 00363 INNER JOIN users U on TJ.uid = U.uid 00364 WHERE TJ.job_id = %d"; 00365 $job = db_fetch_object(db_query($sql,$job_id)); 00366 00367 // we do not know what the arguments are for and we want to provide a 00368 // meaningful description to the end-user. So we use a callback function 00369 // deinfed in the module that created the job to describe in an array 00370 // the arguments provided. If the callback fails then just use the 00371 // arguments as they are 00372 $args = preg_split("/::/",$job->arguments); 00373 $arg_hook = $job->modulename."_job_describe_args"; 00374 if(is_callable($arg_hook)){ 00375 $new_args = call_user_func_array($arg_hook,array($job->callback,$args)); 00376 if(is_array($new_args) and count($new_args)){ 00377 $job->arguments = $new_args; 00378 } else { 00379 $job->arguments = $args; 00380 } 00381 } else { 00382 $job->arguments = $args; 00383 } 00384 00385 // make our start and end times more legible 00386 $job->submit_date = tripal_jobs_get_submit_date($job); 00387 $job->start_time = tripal_jobs_get_start_time($job); 00388 $job->end_time = tripal_jobs_get_end_time($job); 00389 00390 // add the job to the variables that get exported to the template 00391 $variables['job'] = $job; 00392 } 00393 /** 00394 * Set a job to be re-ran (ie: add it back into the job queue) 00395 * 00396 * @param $job_id 00397 * The job_id of the job to be re-ran 00398 * 00399 * @ingroup tripal_jobs_api 00400 */ 00401 function tripal_jobs_rerun ($job_id){ 00402 global $user; 00403 00404 $sql = "select * from {tripal_jobs} where job_id = %d"; 00405 $job = db_fetch_object(db_query($sql,$job_id)); 00406 00407 $args = explode("::",$job->arguments); 00408 tripal_add_job ($job->job_name,$job->modulename,$job->callback,$args,$user->uid, 00409 $job->priority); 00410 00411 drupal_goto("admin/tripal/tripal_jobs"); 00412 } 00413 00414 /** 00415 * Cancel a Tripal Job currently waiting in the job queue 00416 * 00417 * @param $job_id 00418 * The job_id of the job to be cancelled 00419 * 00420 * @ingroup tripal_jobs_api 00421 */ 00422 function tripal_jobs_cancel ($job_id){ 00423 $sql = "select * from {tripal_jobs} where job_id = %d"; 00424 $job = db_fetch_object(db_query($sql,$job_id)); 00425 00426 // set the end time for this job 00427 if($job->start_time == 0){ 00428 $record = new stdClass(); 00429 $record->job_id = $job->job_id; 00430 $record->end_time = time(); 00431 $record->status = 'Cancelled'; 00432 $record->progress = '0'; 00433 drupal_write_record('tripal_jobs',$record,'job_id'); 00434 drupal_set_message("Job #$job_id cancelled"); 00435 } else { 00436 drupal_set_message("Job #$job_id cannot be cancelled. It is in progress or has finished."); 00437 } 00438 drupal_goto("admin/tripal/tripal_jobs"); 00439 }