001 /* 002 * LSFDispatcher.java 003 * 004 * Created on December 23, 2002, 11:30 AM 005 * 006 * This file is part of the STAR Scheduler. 007 * Copyright (c) 2002-2006 STAR Collaboration - Brookhaven National Laboratory 008 * 009 * STAR Scheduler is free software; you can redistribute it and/or modify 010 * it under the terms of the GNU General Public License as published by 011 * the Free Software Foundation; either version 2 of the License, or 012 * (at your option) any later version. 013 * 014 * STAR Scheduler is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 017 * GNU General Public License for more details. 018 * 019 * You should have received a copy of the GNU General Public License 020 * along with STAR Scheduler; if not, write to the Free Software 021 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 022 */ 023 package gov.bnl.star.offline.scheduler.Dispatchers.local; 024 025 import gov.bnl.star.offline.scheduler.Dispatchers.lsf.*; 026 import gov.bnl.star.offline.scheduler.ComponentLibrary; 027 import gov.bnl.star.offline.scheduler.Dispatcher; 028 import gov.bnl.star.offline.scheduler.Queue; 029 import gov.bnl.star.offline.scheduler.Job; 030 import gov.bnl.star.offline.scheduler.request.Request; 031 import gov.bnl.star.offline.scheduler.catalog.PhysicalFile; 032 import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask; 033 //import gov.bnl.star.offline.scheduler.util.StatisticsRecorder; //Moved Statistics recording to Scheduler.java LH 034 import gov.bnl.star.offline.scheduler.util.GenericResourceRequirementStringDefinition; 035 036 037 import java.util.List; 038 import java.util.logging.Level; 039 import org.apache.log4j.Logger; 040 041 042 043 /** A dispatcher for dispatching jobs to fork on the local node. 044 * 045 * The simulation flag will make the scheduler not actually execute the command 046 * lines. Therefore scripts and fileLists are created, but the 047 * commands are not executed. Log and output won't be affected, except that there 048 * will be a message warning that the submission is simulated. 049 * 050 * @author Levente Hajdu 051 * @version 1.0 2002/12/26 052 */ 053 public class LocalDispatcher extends LSFDispatcher implements Dispatcher { 054 055 static private Logger log = Logger.getLogger(LocalDispatcher.class.getName()); 056 057 private String bsubEx; 058 protected boolean simulation = false; 059 private String bsubOptions; 060 private int maxBsubAttempts; 061 private int msBtwnSuccess; 062 private int msBtwnFailure; 063 private String ResReqDefinitionObj; 064 065 public void setResourceRequirementStringDefinition(String ResReqDefinitionObj){ 066 this.ResReqDefinitionObj = ResReqDefinitionObj; 067 068 } 069 070 071 /** @return max number of times submitting the job should be tried. **/ 072 public int getMaxAttempts() { 073 return maxBsubAttempts; 074 } 075 076 /** Set max number of times submitting the job should be tried. 077 * @param maxAttempts numbmer of attempts 078 **/ 079 public void setMaxAttempts(int maxAttempts) { 080 this.maxBsubAttempts = maxAttempts; 081 } 082 083 /** @return The number of milliseconds to delay between successful submissions. */ 084 public int getMsBtwnSuccess() { 085 return msBtwnSuccess; 086 } 087 088 /** sets the number of milliseconds to delay between successful submissions. 089 * @param msBtwnSuccess delay in milliseconds 090 */ 091 public void setMsBtwnSuccess(int msBtwnSuccess) { 092 this.msBtwnSuccess = msBtwnSuccess; 093 } 094 095 /** @return The number of milliseconds to delay between failed submissions. 096 */ 097 public int getMsBtwnFailure() { 098 return msBtwnFailure; 099 } 100 101 /* Sets the number of milliseconds to delay between failed submissions. 102 * @param msBtwnSuccess delay in milliseconds 103 */ 104 public void setMsBtwnFailure(int msBtwnFailure) { 105 this.msBtwnFailure = msBtwnFailure; 106 } 107 108 protected boolean reportedFailure; 109 protected CSHApplication application; 110 private String resSwitch; 111 112 void getGenericResourceRequirementStringDefinition(){ 113 114 } 115 116 //GenericResourceRequirementStringDefinition 117 118 /** Creates the scripts and dispatches the job on the target machine. 119 * @param request the job request 120 */ 121 122 public void dispatch(Request request, List jobs) { 123 log.info("Dispatching using local system: \"" + request.getCommand() + "\""); 124 125 // Enables the simulation mode if necessary 126 useSimulationMode(request.getSimulation()); 127 128 129 130 reportedFailure = false; 131 132 // Submits from the higher to the lower JobID. This way the 133 // user has a feel of when the last job is going to be 134 // submitted 135 for (int nJob = jobs.size() - 1; nJob >= 0; 136 nJob--) { 137 Job job = (Job) jobs.get(nJob); 138 139 System.out.print("Dispatching process " + 140 job.getJobID() + "."); 141 dispatch(request, job); 142 if (getClusterName() != null) job.setCluster(getClusterName()); 143 } 144 145 //StatisticsRecorder.getIntance().recordStatistics(request, jobs); //Moved Statistics recording to Scheduler.java LH 146 } 147 148 /* Enables or disables the simulation mode. The simulation mode will deactivate 149 * every command line execution. 150 */ 151 152 153 public void useSimulationMode(boolean simulation) { 154 this.simulation = simulation; 155 156 if (simulation) { 157 // Warn the user that we are entering simulated submission mode 158 log.warn("Simulating submission"); 159 System.out.println("Simulating submission"); 160 } 161 } 162 163 164 165 /* Dispatches a single process of a job request.*/ 166 protected void dispatch(Request request, Job job) { 167 //application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication"); 168 //No longer get the CSHApplication object from the config file. Get it using the setApplication() and getApplication() via the config file 169 if(application == null){ //If this was not set in the config file or if we'er in junit testing mode then print an error, and use the default file 170 System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized."); 171 String notSet = "The CSHApplication for this dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"CSHApplication\" in ComponentLibrary."; 172 log.warn(notSet); 173 System.out.println(notSet); 174 application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication"); 175 } 176 177 //System.out.println("debug 100 dispatching!!"); 178 179 // TODO: all the parameters should be passed in one go 180 application.setJob(request, job); 181 //application.setScratchDir(scratchDir); 182 application.setSubmissionCommand(getLocalDispatchCommand(request, job)); 183 184 application.prepareJob(); 185 186 log.info("Executing \"" + getLocalDispatchCommand(request, job) + "\""); 187 188 if (!simulation) { 189 //System.out.println("!Not simulation"); //used for debuginh 190 try { 191 Thread.sleep(msBtwnSuccess); 192 } catch (Exception e) { 193 } 194 195 long StarTime = System.currentTimeMillis(); 196 int attempt = 0; 197 boolean success = false; 198 String pe=""; 199 200 while (!success && (attempt < maxBsubAttempts) && run) { 201 try { 202 CSHCommandLineTask task = new CSHCommandLineTask(getLocalDispatchCommand( 203 request, job), true, getMaxElapseTime()); 204 task.execute(); 205 206 if (task.getExitStatus() != 0) { 207 log.warn("Local submit failed: " + task.getOutput()); 208 Thread.sleep(msBtwnFailure); 209 System.out.print("/"); 210 attempt++; 211 } else { 212 success = true; 213 job.DispatchSuccessful(); 214 215 216 //change code to get the local pid 217 //job.AddProcesseID(task.getOutput().substring(task.getOutput().indexOf("your job") + 8 ,task.getOutput().indexOf('(') - 1).trim()); 218 219 220 job.setDispatchTime(((int) Math.min(System.currentTimeMillis() - StarTime, java.lang.Integer.MAX_VALUE))); 221 222 223 //The // lines are used for debuging to verfiy the pid was recovored corectly. 224 //System.out.println("output >> " + task.getOutput() + "\n" ); 225 //System.out.println("pid = \"" + task.getOutput().substring(task.getOutput().indexOf("]") + 1,task.getOutput().indexOf('\n')).trim() + "\""); 226 job.AddProcesseID(task.getOutput().substring(task.getOutput().indexOf("]") + 1,task.getOutput().indexOf('\n')).trim()); 227 228 229 } 230 } catch (Exception e) { 231 log.error("Couldn't submit the script to SGE", e); 232 // JL: write also to STDOUT to help users debug what happened 233 if (pe == ""){ 234 pe = e.toString(); 235 System.out.print("Couldn't submit the script to SGE" + pe); 236 } 237 try { 238 Thread.sleep(msBtwnFailure); 239 } catch (Exception e1) { 240 } 241 242 System.out.print("/"); 243 attempt++; 244 } 245 } 246 247 if (success) { 248 System.out.println(" done."); 249 } else { 250 System.out.println(" FAILED!!"); 251 } 252 } else { 253 System.out.println(" simulated."); 254 } 255 } 256 257 /** Retunes the srtring (command) to submit the job. **/ 258 String getLocalDispatchCommand(Request request, Job job) { 259 260 //System.out.println("commmand = \"" + application.getCommandLine() + "\""); 261 //return "csh -c " + application.getCommandLine(); 262 263 264 //The output should look like this "(job.csh > file.out) >& error.out &" 265 266 StringBuffer LocalDispatchCommand = new StringBuffer("( " + application.getCommandLine()); 267 268 if (application.getStdout() != null) { 269 LocalDispatchCommand.append("> " + application.getStdout()); 270 } 271 272 LocalDispatchCommand.append(" )"); 273 274 if (application.getStderr() != null) { 275 LocalDispatchCommand.append(" >& ").append(application.getStderr()); 276 } 277 278 LocalDispatchCommand.append(" &"); 279 280 return LocalDispatchCommand.toString(); 281 282 283 } 284 285 286 /** Set the class that writes the sricpt that will be executed by the batch system */ 287 public void setApplication(CSHApplication application){ 288 this.application = application; 289 } 290 291 /** Get the class that writes the sricpt that will be executed by the batch system */ 292 public CSHApplication getApplication(){ 293 return application; 294 } 295 296 297 298 /**Kill all submitted jobs in the job list. 299 * @param request the job request 300 * @param jobs a list of jobs to be killed 301 **/ 302 public void Kill(Request request, List jobs) { 303 304 //System.out.println("lsfr kill"); 305 306 for(int z=0; z != jobs.size(); z++){ 307 Job job = (Job)jobs.get(z); 308 309 //System.out.println("working no job : " + job.getJobID()); 310 311 if(job.getProcesseIDs().size() == 0){ 312 System.out.println("No ProcesseIDs found for job " + job.getJobID()); 313 jobs.remove(z); 314 z--; 315 316 } 317 else{ 318 for(int i=0; job.getProcesseIDs().size() != i; i++){ 319 320 int attempt = 0; 321 boolean success = false; 322 String commmandOutput = ""; 323 System.out.print("ProcesseID: <" + job.getProcesseIDs().get(i) + "> of Job: <" + job.getJobID() + ">"); 324 325 while (!success && (attempt < maxBsubAttempts) && run) { 326 try { 327 CSHCommandLineTask task = new CSHCommandLineTask("kill " + ((String) job.getProcesseIDs().get(i)) , true, getMaxElapseTime()); 328 task.execute(); 329 if (task.getExitStatus() != 0) { 330 log.warn("kill " + task.getOutput()); 331 Thread.sleep(msBtwnFailure); 332 if(task.getOutput().lastIndexOf("does not exist") != -1) success = true; 333 System.out.print(task.getOutput()); 334 attempt++; 335 } 336 else{ 337 success = true; 338 System.out.println("Killed"); 339 } 340 341 commmandOutput = task.getOutput(); 342 } 343 catch (Exception e) { System.out.print("kill failed" + e); 344 System.out.print(commmandOutput); 345 } 346 try { Thread.sleep(msBtwnFailure);} 347 catch (Exception e1) {System.out.print("kill failed");} 348 if(!success) System.out.print("/"); 349 attempt++; 350 } 351 352 } 353 job.clearProcesseIDs(); 354 jobs.remove(z); 355 z--; 356 } 357 } 358 } 359 360 /** Get the status of the jobs object processe N. 361 * @param job The job object of the running process 362 * @param Processe the index of the running process of this object 363 **/ 364 public String Status(Job job, int Processe) { 365 //return "No data avalable (Local Processe)"; 366 367 //ps -o stat 11000 368 369 370 if(job.getProcesseIDs().size() == 0) return "No ProcesseIDs found for job " + job.getJobID(); 371 if(job.getProcesseIDs().size() < Processe) return job.getJobID() + " only has " + job.getProcesseIDs().size() + "processes, processe " + Processe + "dose not exist."; 372 373 374 // for(int i=0; job.getProcesseIDs().size() != i; i++){ 375 376 int attempt = 0; 377 boolean success = false; 378 String commmandOutput = ""; 379 System.out.print("ProcesseID: <" + job.getProcesseIDs().get(Processe) + "> of Job: <" + job.getJobID() + ">"); 380 381 while (!success && (attempt < maxBsubAttempts)) { 382 try { 383 CSHCommandLineTask task = new CSHCommandLineTask("ps -o stat " + ((String) job.getProcesseIDs().get(Processe)), true, getMaxElapseTime()); 384 task.execute(); 385 if (task.getExitStatus() != 0) { 386 log.warn("ps : " + task.getOutput()); 387 Thread.sleep(msBtwnFailure); 388 389 attempt++; 390 } 391 else{ 392 success = true; 393 394 String state = task.getOutput().replaceFirst("STAT","").replaceAll("\n", "").trim(); 395 396 if(state == null) return "job is currently not running"; 397 398 //state.compareTo("D") == 0 399 400 else if(state == "") return "job is currently not running"; 401 else if(state.compareTo("D") == 0) return "job is in uninterruptible sleep"; 402 else if(state.compareTo("R") == 0) return "job is on the run queue"; 403 else if(state.compareTo("S") == 0) return "sleeping"; 404 else if(state.compareTo("T") == 0) return "job is traced or stopped"; 405 else if(state.compareTo("Z") == 0) return "job is defunct (\"zombie\") process"; 406 else if(state.compareTo("W") == 0) return "job has no resident pages"; 407 else if(state.compareTo("<") == 0) return "job is high-priority"; 408 else if(state.compareTo("N") == 0) return "job is low-priority task"; 409 else if(state.compareTo("N") == 0) return "job is low-priority task"; 410 else if(state.compareTo("L") == 0) return "Job has pages locked into memory"; 411 else return "\"" + state + "\""; 412 413 } 414 415 commmandOutput = task.getOutput(); 416 } 417 catch (Exception e) { System.out.print("ps failed" + e); 418 System.out.print(commmandOutput); 419 } 420 try { Thread.sleep(msBtwnFailure);} 421 catch (Exception e1) {System.out.print("ps failed");} 422 if(!success) System.out.print("/"); 423 attempt++; 424 } 425 426 427 428 return "No status could be recovered / job is currently not running"; 429 } 430 431 public void retrieveOutput(Request job, List jobs) {} 432 433 public volatile boolean run = true; 434 public void stop(){ run = false; } 435 436 437 /** 438 * Runs test(s) on underlying components to determine if submitting jobs should be attempted. 439 * @param queue The queue object one is trying to submit to. 440 * @return Will return true to indicate everything is alright and false if the test has failed 441 */ 442 public boolean test(Queue queue){ 443 return true; 444 } 445 446 447 }