/*
 * LSFDispatcher.java
 *
 * Created on December 23, 2002, 11:30 AM
 *
 * This file is part of the STAR Scheduler.
 * Copyright (c) 2002-2006 STAR Collaboration - Brookhaven National Laboratory
 *
 * STAR Scheduler is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * STAR Scheduler is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with STAR Scheduler; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
package gov.bnl.star.offline.scheduler.Dispatchers.lsf;

import gov.bnl.star.offline.scheduler.ComponentLibrary;
import gov.bnl.star.offline.scheduler.Dispatcher;
import gov.bnl.star.offline.scheduler.Queue;
import gov.bnl.star.offline.scheduler.Job;
import gov.bnl.star.offline.scheduler.request.Request;
import gov.bnl.star.offline.scheduler.catalog.PhysicalFile;
import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask;
//import gov.bnl.star.offline.scheduler.util.StatisticsRecorder; //Moved Statistics recording to Scheduler.java LH
import gov.bnl.star.offline.scheduler.util.GenericResourceRequirementStringDefinition;


import gov.bnl.star.offline.scheduler.Dispatchers.lsf.LSFNodePriorityStringGenerator;

import gov.bnl.star.offline.scheduler.Dispatchers.AbstractResourceStrategy;
import gov.bnl.star.offline.scheduler.util.FilesystemToolkit;



import java.util.List;
import java.util.ArrayList;
import java.util.logging.Level;
import org.apache.log4j.Logger;



/** Dispatches a job using LSF.
 * <p>
 * For each process, two files are created: a script for the execution and
 * a text file containing the file list. The script basically sets the
 * environment variables and executes the command line. The file list
 * contains the input file requested, one full path for each line in the list.
 * <p>
 * Each script is submitted through bsub.
 * <p>
 * The simulation flag will make the scheduler not actually execute the command
 * lines. Therefore scripts and fileLists are created, but the bsub and chmod
 * commands are not executed. Log and output won't be affected, except that there
 * will be a message warning that the submission is simulated.
 * <p>
 * The class is a plain bean: every tunable is exposed through a getter/setter
 * pair, and all private field names are kept stable because the class is
 * {@link java.io.Serializable}.
 *
 * @author Gabriele Carcassi, Jerome Lauret, Levente Hajdu
 * @version 1.0 2002/12/26
 */
public class LSFDispatcher extends gov.bnl.star.offline.scheduler.Dispatchers.DispatcherBase implements Dispatcher, java.io.Serializable {
    private static final Logger log = Logger.getLogger(LSFDispatcher.class.getName());

    /** Not read or written anywhere in this class; retained so the serialized form is unchanged. */
    private String resourceStrategy;
    /** Scratch directory; this class neither reads nor writes it. */
    protected String scratchDir;
    /** Submission executable placed at the start of the generated command (after the {@code cd}). */
    private String bsubEx;
    /** When true, scripts are prepared but no command line is executed. */
    protected boolean simulation = false;
    /** Queue used when the job itself does not name one. */
    private String queueName;
    /** Extra options appended verbatim to the submission command; may be null. */
    private String bsubOptions;
    /** Maximum number of failed attempts tolerated by dispatch, Kill and Status. */
    private int maxBsubAttempts;
    /** Pause, in milliseconds, taken before each real (non-simulated) submission. */
    private int msBtwnSuccess;
    /** Pause, in milliseconds, taken after each failed attempt. */
    private int msBtwnFailure;
    /** Time limit handed to every {@link CSHCommandLineTask}; defaults to 30000. */
    private int maxElapseTime = 30000;
    /** ComponentLibrary name of the GenericResourceRequirementStringDefinition to use, or null. */
    private String ResReqDefinitionObj;
    /** Stored and exposed through its accessors only; not consulted elsewhere in this class. */
    private boolean omitTargetNode = false;

    public void setOmitTargetNode(boolean omitTargetNode) {
        this.omitTargetNode = omitTargetNode;
    }

    public boolean getOmitTargetNode() {
        return omitTargetNode;
    }

    public void setMaxElapseTime(int maxElapseTime) {
        this.maxElapseTime = maxElapseTime;
    }

    public int getMaxElapseTime() {
        return maxElapseTime;
    }

    /**
     * Sets the ComponentLibrary name under which the
     * GenericResourceRequirementStringDefinition for this dispatcher is looked up.
     * @param ResReqDefinitionObj the component name, or null to use a default-constructed definition
     */
    public void setResourceRequirementStringDefinition(String ResReqDefinitionObj) {
        this.ResReqDefinitionObj = ResReqDefinitionObj;
    }

    public String getBsubEx() {
        return bsubEx;
    }

    public void setBsubEx(String bsubEx) {
        this.bsubEx = bsubEx;
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public String getBsubOptions() {
        return bsubOptions;
    }

    public void setBsubOptions(String bsubOptions) {
        this.bsubOptions = bsubOptions;
    }

    public int getMaxAttempts() {
        return maxBsubAttempts;
    }

    public void setMaxAttempts(int maxAttempts) {
        this.maxBsubAttempts = maxAttempts;
    }

    public int getMsBtwnSuccess() {
        return msBtwnSuccess;
    }

    public void setMsBtwnSuccess(int msBtwnSuccess) {
        this.msBtwnSuccess = msBtwnSuccess;
    }

    public int getMsBtwnFailure() {
        return msBtwnFailure;
    }

    public void setMsBtwnFailure(int msBtwnFailure) {
        this.msBtwnFailure = msBtwnFailure;
    }

    /** True once the failure banner has been printed for the current dispatch round. */
    protected boolean reportedFailure;
    /** Writer of the CSH script; looked up in the ComponentLibrary on first use when not injected. */
    protected CSHApplication application;
    /** Last comma-joined resource fragment computed by {@link #getResourceUsageSwitch(Job)}. */
    private String resSwitch;

    /** Creates the scripts and dispatches the jobs on the target machine.
     * <p>
     * Jobs are submitted from the last list element to the first, so the user
     * has a feel of when the last job is going to be submitted. After each
     * submission the job is tagged with the cluster name, when one is set.
     * Statistics recording was moved to Scheduler.java (LH).
     *
     * @param request the job request
     * @param jobs the {@link Job}s belonging to the request
     */
    public void dispatch(Request request, List jobs) {
        log.info("Dispatching using LSF: \"" + request.getCommand() + "\"");

        // Enables the simulation mode if necessary
        useSimulationMode(request.getSimulation());
        reportedFailure = false;

        for (int nJob = jobs.size() - 1; nJob >= 0; nJob--) {
            Job job = (Job) jobs.get(nJob);

            System.out.print("Dispatching process " + job.getJobID() + ".");
            dispatch(request, job);
            if (getClusterName() != null) {
                job.setCluster(getClusterName());
            }
        }
    }

    /**
     * Enables or disables the simulation mode. The simulation mode will deactivate
     * every command line execution.
     * @param simulation true to simulate submissions
     */
    public void useSimulationMode(boolean simulation) {
        this.simulation = simulation;
        if (simulation) {
            // Warn the user that we are entering simulated submission mode
            log.info("Simulating submission");
            System.out.println("Simulating submission");
        }
    }

    /**
     * Prints, on standard output, that a process was not submitted together with
     * the given message and the list of the process' input files.
     * @param request the request the job belongs to (not used here)
     * @param job the job that could not be submitted
     * @param jobNumber the number shown to the user
     * @param message explanation printed after the headline
     */
    protected void reportProcessSubmissionFailure(Request request, Job job, int jobNumber, String message) {
        reportFailure(job);
        System.out.println("Process number " + jobNumber + " wasn't submitted.");
        System.out.println(message);
        System.out.println();
        System.out.println("The process input file were:");

        List inputFiles = job.getInput();

        for (int nFile = 0; nFile < inputFiles.size(); nFile++) {
            PhysicalFile file = (PhysicalFile) inputFiles.get(nFile);
            System.out.println(" - " + file.getPath() + "/" + file.getFilename());
        }
    }

    /**
     * Prints the failure banner the first time it is called after
     * {@link #dispatch(Request, List)} reset {@link #reportedFailure}.
     * @param job the failed job (not used here)
     */
    protected void reportFailure(Job job) {
        if (!reportedFailure) {
            System.out.println("There were some errors during job submission.");
            System.out.println("Some processes weren't submitted:");
            // Remember that the banner is out; without this it was repeated for every failure.
            reportedFailure = true;
        }
    }

    /** Currently not implemented
     * @param request the job for which to retrieve the output
     * @param jobs the jobs of the request
     */
    public void retrieveOutput(Request request, List jobs) {
    }

    /**
     * Dispatches a single process of a job request.
     * <p>
     * The CSH application is handed the job and a first rendering of the
     * submission command, then asked to prepare the job. Unless simulating, the
     * command is executed until it exits with status 0, the configured number of
     * attempts is used up, or {@link #stop()} is called.
     *
     * @param request the job request
     * @param job the process to submit
     */
    protected void dispatch(Request request, Job job) {
        if (application == null) {
            System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized.");

            String notSet = "The CSHApplication for this dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"CSHApplication\" in ComponentLibrary.";
            log.warn(notSet);
            System.out.println(notSet);
            application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
        }

        // TODO: all the parameters should be passed in one go
        application.setJob(request, job);
        application.setSubmissionCommand(getBsubCommand(request, job));

        application.prepareJob();

        // Rendered again after prepareJob() because the command reads several
        // values back from the application. It is built once so that the logged
        // command is exactly the one executed on every attempt.
        // NOTE(review): assumes the node-priority generator and the resource
        // strategies return the same text when asked twice for the same job.
        String bsubCommand = getBsubCommand(request, job);
        log.info("Executing \"" + bsubCommand + "\"");

        if (simulation) {
            System.out.println(" simulated.");
            return;
        }

        pause(msBtwnSuccess);

        if (submitWithRetries(bsubCommand, job)) {
            System.out.println(" done.");
        } else {
            System.out.println(" FAILED!!");
        }
    }

    /**
     * Runs the submission command until it succeeds, the attempt budget is spent
     * or the dispatcher is stopped. Each failed attempt is followed by one pause
     * of {@code msBtwnFailure} and a "/" progress mark.
     * @return true when one attempt exited with status 0
     */
    private boolean submitWithRetries(String bsubCommand, Job job) {
        long startTime = System.currentTimeMillis();
        int attempt = 0;
        boolean success = false;
        String firstError = "";

        while (!success && (attempt < maxBsubAttempts) && run) {
            try {
                CSHCommandLineTask task = new CSHCommandLineTask(bsubCommand, true, getMaxElapseTime());

                task.execute();
                if (task.getExitStatus() == 0) {
                    success = true;
                    job.DispatchSuccessful();

                    String processId = extractProcessId(task.getOutput());
                    if (processId != null) {
                        job.AddProcesseID(processId);
                    } else {
                        // The submission itself worked; only the id could not be read back.
                        log.warn("bsub succeeded but no <id> was found in its output: " + task.getOutput());
                    }
                    job.setDispatchTime(((int) Math.min(System.currentTimeMillis() - startTime, java.lang.Integer.MAX_VALUE)));
                } else {
                    log.warn("bsub failed ("+getMaxElapseTime()+"): " + task.getOutput());
                }
            } catch (Exception e) {
                log.error("Couldn't submit the script to LSF", e);
                // JL: write also to STDOUT to help users debug what happened
                // (only the first error is echoed)
                if (firstError.length() == 0) {
                    firstError = e.toString();
                    System.out.print("Couldn't submit the script to LSF" + firstError);
                }
            }

            if (!success) {
                pause(msBtwnFailure);
                System.out.print("/");
                attempt++;
            }
        }

        return success;
    }

    /**
     * Extracts the text between the first '&lt;' and the next '&gt;' of the
     * submission output.
     * @return the enclosed text, or null when the output is null or has no such pair
     */
    private static String extractProcessId(String output) {
        if (output == null) {
            return null;
        }
        int open = output.indexOf("<");
        if (open < 0) {
            return null;
        }
        int close = output.indexOf(">", open + 1);
        if (close < 0) {
            return null;
        }
        return output.substring(open + 1, close);
    }

    /**
     * Sleeps for the given number of milliseconds on a best-effort basis.
     * @return false when the sleep did not complete (interrupted or invalid duration)
     */
    private boolean pause(int millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (Exception ignored) {
            // The pause only throttles retries; a failed pause must not abort the caller.
            return false;
        }
    }

    /** Returns the full bsub command to be executed to dispatch the process. This
     * command must executed in the directory in which the script resides, which is
     * why it starts with a {@code cd} into the current directory.
     * <p>
     * Options are appended only when the corresponding value is available:
     * {@code -q} queue, {@code -m} target, {@code -J} job name, {@code -i/-o/-e}
     * streams, {@code -M} memory limit, {@code -R} resource requirement, then the
     * free-form bsub options and finally the CSH script file name.
     *
     * @param request the job request
     * @param job the process to submit
     * @return the bsub command
     */
    String getBsubCommand(Request request, Job job) {
        StringBuffer bsub = new StringBuffer("cd " + FilesystemToolkit.getCurrentDirectory() + "; " + bsubEx);

        String queue = getQueueName(job);
        if ((queue != null) && (queue.trim().length() != 0)) {
            bsub.append(" -q ").append(queue);
        }

        if (job.getTarget() != null) {
            if ("rootd".equals(request.getFileListType())) {
                if (getNodePriorityStringGenerator() == null) {
                    System.out.println("Warring the LSFNodePriorityStringGenerator has note been initialized in the config file for this dispatcher. Please contact your SUMS administrator and report this message.");
                    bsub.append(" -m ").append(job.getTarget());
                } else {
                    bsub.append(" -m ").append(getNodePriorityStringGenerator().generateSyntax(job));
                }
            } else {
                bsub.append(" -m ").append(job.getTarget());
            }
        }

        if (application.getJobName() != null) {
            bsub.append(" -J '").append(application.getJobName()).append("'");
        }

        if (application.getStdin() != null) {
            bsub.append(" -i ").append(application.getStdin());
        }

        if (application.getStdout() != null) {
            bsub.append(" -o ").append(application.getStdout());
        }

        if (application.getStderr() != null) {
            bsub.append(" -e ").append(application.getStderr());
        }

        if (job.getMaxMemory() != -1) {
            // NOTE(review): the factor 1000 presumably converts the job's unit into the one bsub expects - confirm.
            bsub.append(" -M ").append(job.getMaxMemory() * 1000);
        }

        // Takes the GenericResourceRequirementStringDefinition (if any), adds the
        // rusage[] section to it when there is one, and writes out the string.
        GenericResourceRequirementStringDefinition lsfResReqDef = new GenericResourceRequirementStringDefinition();
        if (ResReqDefinitionObj != null) {
            lsfResReqDef = (GenericResourceRequirementStringDefinition) ComponentLibrary.getInstance().getComponent(ResReqDefinitionObj);
        }

        // Both values are computed once and reused by the three cases below.
        String rusage = getResourceUsageSwitch(job);
        boolean hasDefinition = lsfResReqDef.hasResourcesDefinition(job);

        if ((rusage != null) && hasDefinition) {
            // Inner quotes are stripped from both parts and one pair is put around the whole.
            String combined = "\"(" + lsfResReqDef.makeString(job).replaceAll("\"", "") + ") " + rusage.replaceAll("\"", "") + "\"";
            bsub.append(" -R ").append(combined);
        } else if (rusage != null) {
            bsub.append(" -R ").append(rusage);
        } else if (hasDefinition) {
            bsub.append(" -R ").append(lsfResReqDef.makeString(job));
        }

        // Unset options used to be rendered as the literal text "null" on the command line.
        if (bsubOptions != null) {
            bsub.append(' ').append(bsubOptions);
        }

        /*
         The path is being dropped because different drivers for mounting the PanFS path are reporting the path differently. Instead we put "cd $path" before the command we execute.
         Example:
           /pdirect/star+institutions/data05/scratch/lbhajdu/
           /star/data05/scratch/lbhajdu/
         */
        bsub.append(' ').append(application.getCSHScriptFileName());

        return bsub.toString();
    }

    /** Holds value of property clusterName. */
    private String clusterName;

    /** Resource strategies, in the order in which their fragments are joined. */
    private List lsfResourceStrategy = new ArrayList();

    public List getResourceStrategyList() {
        return lsfResourceStrategy;
    }

    public void setResourceStrategyList(List lsfResourceStrategy) {
        this.lsfResourceStrategy = lsfResourceStrategy;
    }

    public void addResourceStrategy(AbstractResourceStrategy resourceStrategy) {
        lsfResourceStrategy.add(resourceStrategy);
    }

    /** This function is deprecated but still exists to be backwards compatible with older configuration files. The function really calls addResourceStrategy(). For new configuration files please use addResourceStrategy() instead. */
    public void setResourceStrategy(AbstractResourceStrategy resourceStrategy) {
        addResourceStrategy(resourceStrategy);
    }

    /**
     * Builds the quoted {@code rusage[...]} section from all registered resource
     * strategies. Every strategy is asked for its fragment; null fragments are
     * skipped and the others are joined with commas.
     * @param job the process being submitted
     * @return the text {@code "rusage[...]"} including the surrounding double
     *         quotes, or null when there are no strategies or none produced a fragment
     */
    protected String getResourceUsageSwitch(Job job) {
        resSwitch = null;
        if (lsfResourceStrategy.isEmpty()) {
            return null;
        }

        for (int i = 0; i != lsfResourceStrategy.size(); i++) {
            AbstractResourceStrategy strategy = (AbstractResourceStrategy) lsfResourceStrategy.get(i);
            String fragment = strategy.prepareResourceUsageSwitch(job);

            if (fragment == null) {
                continue;
            }
            resSwitch = (resSwitch == null) ? fragment : resSwitch.concat("," + fragment);
        }

        if (resSwitch == null) {
            return null;
        }
        return "\"rusage[" + resSwitch + "]\"";
    }

    /**
     * Returns the queue of the job, falling back to the dispatcher-wide queue
     * name when the job has none.
     * @param job the process being submitted
     * @return the queue name, possibly null
     */
    protected String getQueueName(Job job) {
        String queue = job.getQueue();
        return (queue != null) ? queue : queueName;
    }

    /** Getter for property clusterName.
     * @return Value of property clusterName.
     *
     */
    public String getClusterName() {
        return this.clusterName;
    }

    /** Setter for property clusterName.
     * @param clusterName New value of property clusterName.
     *
     */
    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }

    /**
     * Runs {@code bkill} for every recorded process id of every job.
     * <p>
     * Jobs are taken from the front of the list and removed from it once
     * handled, so the list passed in is empty when this method returns. Jobs
     * without process ids are reported and removed; for the others the process
     * ids are cleared after the kill attempts.
     *
     * @param request the request the jobs belong to (not used here)
     * @param jobs the {@link Job}s to kill; emptied by this call
     */
    public void Kill(Request request, List jobs) {
        while (jobs.size() != 0) {
            Job job = (Job) jobs.get(0);

            if (job.getProcesseIDs().size() == 0) {
                System.out.println("No ProcesseIDs found for job " + job.getJobID());
            } else {
                for (int i = 0; job.getProcesseIDs().size() != i; i++) {
                    killProcess(job, i);
                }
                job.clearProcesseIDs();
            }
            jobs.remove(0);
        }
    }

    /**
     * Tries to kill one process of a job. An attempt counts as successful when
     * bkill exits with status 0 or its output contains "already finished". Each
     * failed attempt is counted once and followed by one pause.
     */
    private void killProcess(Job job, int index) {
        int attempt = 0;
        boolean success = false;
        String lastOutput = "";
        System.out.print("ProcesseID: <" + job.getProcesseIDs().get(index) + "> of Job: <" + job.getJobID() + ">");

        while (!success && (attempt < maxBsubAttempts)) {
            try {
                CSHCommandLineTask task = new CSHCommandLineTask("bkill " + ((String) job.getProcesseIDs().get(index)), true, getMaxElapseTime());
                task.execute();
                lastOutput = task.getOutput();

                if (task.getExitStatus() == 0) {
                    success = true;
                    System.out.println("Killed");
                } else {
                    log.warn("bkill " + task.getOutput());
                    if (task.getOutput().lastIndexOf("already finished") != -1) {
                        success = true;
                    }
                    System.out.print(task.getOutput());
                }
            } catch (Exception e) {
                System.out.print("bkill failed" + e);
                System.out.print(lastOutput);
            }

            if (!success) {
                if (!pause(msBtwnFailure)) {
                    System.out.print("bkill failed");
                }
                System.out.print("/");
                attempt++;
            }
        }
    }

    /**
     * Queries {@code bjobs} for one process of a job and returns its status text.
     * <p>
     * The status is cut out of the line following the first newline of the
     * output, between the column offsets at which " STAT " and " QUEUE " occur.
     * NOTE(review): this presumably relies on bjobs printing a header line
     * followed by one data line - confirm against the LSF version in use.
     *
     * @param job the job to query
     * @param Processe zero-based index into the job's process id list
     * @return the trimmed status text, an explanatory message when the index or
     *         the id list is unusable, or "bjobs failed" when no attempt succeeded
     */
    public String Status(Job job, int Processe) {
        if (job.getProcesseIDs().size() == 0) {
            return "No ProcesseIDs found for job " + job.getJobID();
        }
        // Valid indexes are 0 .. size-1; the previous check let Processe == size through to get().
        if ((Processe < 0) || (Processe >= job.getProcesseIDs().size())) {
            return job.getJobID() + " only has " + job.getProcesseIDs().size() + "processes, processe " + Processe + "dose not exist.";
        }

        int attempt = 0;
        boolean success = false;
        String lastOutput = "";
        System.out.print("ProcesseID: <" + job.getProcesseIDs().get(Processe) + "> of Job: <" + job.getJobID() + ">");

        while (!success && (attempt < maxBsubAttempts)) {
            try {
                CSHCommandLineTask task = new CSHCommandLineTask("bjobs " + ((String) job.getProcesseIDs().get(Processe)), true, getMaxElapseTime());
                task.execute();
                lastOutput = task.getOutput();

                if (task.getExitStatus() == 0) {
                    // Marked before parsing: output that cannot be parsed ends the loop instead of being retried.
                    success = true;
                    String output = task.getOutput();
                    int firstNewline = output.indexOf("\n");
                    return output.substring(1 + firstNewline + output.indexOf(" STAT "), firstNewline + output.indexOf(" QUEUE ")).trim();
                }
                log.warn("bjobs " + task.getOutput());
            } catch (Exception e) {
                System.out.print("bjobs failed" + e);
                System.out.print(lastOutput);
            }

            if (!success) {
                if (!pause(msBtwnFailure)) {
                    System.out.print("bjobs failed");
                }
                System.out.print("/");
                attempt++;
            }
        }

        return "bjobs failed";
    }

    /**
     * Sets the object initialized that writes the CSH script.
     * @param application The initialized CSH writer object
     **/
    public void setApplication(CSHApplication application) {
        this.application = application;
    }

    /** recovers the initialized object that writes the CSH application
     * @return the object that writes the csh script **/
    public CSHApplication getApplication() {
        return application;
    }

    /** Cleared by {@link #stop()}; the submission retry loop checks it before every attempt. */
    public volatile boolean run = true;

    /** Asks the dispatcher to stop retrying submissions. */
    public void stop() {
        run = false;
    }

    private LSFNodePriorityStringGenerator nodePriorityStringGenerator;

    /**
     * Set the object used to generate the node priority string.
     **/
    public void setNodePriorityStringGenerator(LSFNodePriorityStringGenerator nodePriorityStringGenerator) {
        this.nodePriorityStringGenerator = nodePriorityStringGenerator;
    }

    public LSFNodePriorityStringGenerator getNodePriorityStringGenerator() {
        return nodePriorityStringGenerator;
    }

    /** Lines of the bqueues output, filled on the first successful call of {@link #test(Queue)}. */
    String[] bqueuesLines = {};

    /**
     * Tests the queue to get an indication if a job can be submitted successfully to the queue.
     * <p>
     * {@code bqueues} is run only until it has produced output once; later calls
     * reuse the stored lines. A queue passes when some line contains its name
     * followed, later on the same line, by "Open", any one character and "Active".
     * NOTE(review): the name is matched as a substring of the line, so a queue
     * whose name is a prefix of another queue's name also matches that line.
     *
     * @param queue The queue object to be tested
     * @return True if the job will most likely run, and false if the job will most likely fail.
     **/
    public boolean test(Queue queue) {
        if (bqueuesLines.length == 0) { // only run the command one time
            if (!runInTimeLimitedThread("bqueues", maxElapseTime, msBtwnFailure, 5)) {
                return false;
            }
            if (super.threadOuput == null) {
                return false;
            }
            bqueuesLines = super.threadOuput.split("\n");
        }

        if (bqueuesLines.length == 0) {
            return false; // just in case nothing was returned
        }

        // \Q...\E makes the queue name literal so characters such as '.' or '+' are not read as regex syntax.
        String openQueuePattern = ".*\\Q" + queue.getName() + "\\E.*Open.Active.*";

        for (int i = 0; i < bqueuesLines.length; i++) {
            if (bqueuesLines[i].matches(openQueuePattern)) {
                return true;
            }
        }

        return false;
    }
}