001 /* 002 * LSFDispatcher.java 003 * 004 * Created on December 23, 2002, 11:30 AM 005 * 006 * This file is part of the STAR Scheduler. 007 * Copyright (c) 2002-2006 STAR Collaboration - Brookhaven National Laboratory 008 * 009 * STAR Scheduler is free software; you can redistribute it and/or modify 010 * it under the terms of the GNU General Public License as published by 011 * the Free Software Foundation; either version 2 of the License, or 012 * (at your option) any later version. 013 * 014 * STAR Scheduler is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 017 * GNU General Public License for more details. 018 * 019 * You should have received a copy of the GNU General Public License 020 * along with STAR Scheduler; if not, write to the Free Software 021 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 022 */ 023 package gov.bnl.star.offline.scheduler.Dispatchers.sge; 024 025 import gov.bnl.star.offline.scheduler.Dispatchers.lsf.*; 026 import gov.bnl.star.offline.scheduler.ComponentLibrary; 027 import gov.bnl.star.offline.scheduler.Dispatcher; 028 import gov.bnl.star.offline.scheduler.Queue; 029 import gov.bnl.star.offline.scheduler.Job; 030 import gov.bnl.star.offline.scheduler.request.Request; 031 import gov.bnl.star.offline.scheduler.catalog.PhysicalFile; 032 import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask; 033 //import gov.bnl.star.offline.scheduler.util.StatisticsRecorder; //Moved Statistics recording to Scheduler.java LH 034 import gov.bnl.star.offline.scheduler.util.GenericResourceRequirementStringDefinition; 035 import gov.bnl.star.offline.scheduler.Dispatchers.AbstractResourceStrategy; 036 037 import gov.bnl.star.offline.scheduler.util.FilesystemToolkit; 038 039 import java.util.*; 040 //import java.util.List; 041 import java.util.logging.Level; 042 import org.apache.log4j.Logger; 043 044 045 046 /** Dispatches a jobs using SGE. 047 * <p> 048 * For each process, two files are created: a script for the execution and 049 * a text file containing the file list. The script basically sets the 050 * environment variables and executes the command line. The file list 051 * contains the input file requested, one full path for each line in the list. 052 * <p> 053 * Each script is submitted through bsub. 054 * <p> 055 * The simulation flag will make the scheduler not actually execute the command 056 * lines. Therefore scripts and fileLists are created, but the bsub and chmod 057 * commands are not executed. Log and output won't be affected, except that there 058 * will be a message warning that the submission is simulated. 059 * 060 * @author Levente Hajdu, Jerome Lauret 061 * @version 1.0 2002/12/26 062 */ 063 public class SGEDispatcher extends LSFDispatcher implements Dispatcher { 064 065 static private Logger log = Logger.getLogger(SGEDispatcher.class.getName()); 066 067 // private String resourceStrategy; 068 // protected String scratchDir; 069 private String bsubEx; 070 protected boolean simulation = false; 071 // private String queueName; 072 private String bsubOptions; 073 private int maxBsubAttempts; 074 private int msBtwnSuccess; 075 private int msBtwnFailure; 076 private String ResReqDefinitionObj; 077 078 public void setResourceRequirementStringDefinition(String ResReqDefinitionObj){ 079 this.ResReqDefinitionObj = ResReqDefinitionObj; 080 081 } 082 083 084 // public String getScratchDir() { 085 // return scratchDir; 086 // } 087 088 // public void setScratchDir(String scratchDir) { 089 // this.scratchDir = scratchDir; 090 // } 091 092 public String getQsubEx() { 093 return bsubEx; 094 } 095 096 public void setQsubEx(String bsubEx) { 097 this.bsubEx = bsubEx; 098 } 099 100 101 102 // public String getQueueName() { 103 // return queueName; 104 // } 105 106 // public void setQueueName(String queueName) { 107 // this.queueName = queueName; 108 // } 109 110 // public String getBsubOptions() { 111 // return bsubOptions; 112 // } 113 114 // public void setBsubOptions(String bsubOptions) { 115 // this.bsubOptions = bsubOptions; 116 // } 117 118 public int getMaxAttempts() { 119 return maxBsubAttempts; 120 } 121 122 public void setMaxAttempts(int maxAttempts) { 123 this.maxBsubAttempts = maxAttempts; 124 } 125 126 public int getMsBtwnSuccess() { 127 return msBtwnSuccess; 128 } 129 130 public void setMsBtwnSuccess(int msBtwnSuccess) { 131 this.msBtwnSuccess = msBtwnSuccess; 132 } 133 134 public int getMsBtwnFailure() { 135 return msBtwnFailure; 136 } 137 138 public void setMsBtwnFailure(int msBtwnFailure) { 139 this.msBtwnFailure = msBtwnFailure; 140 } 141 142 protected boolean reportedFailure; 143 protected CSHApplication application; 144 private String resSwitch; 145 146 void getGenericResourceRequirementStringDefinition(){ 147 148 } 149 150 //GenericResourceRequirementStringDefinition 151 152 /** Creates the scripts and dispatches the job on the target machine. 153 * @param request the job request 154 */ 155 156 public void dispatch(Request request, List jobs) { 157 log.info("Dispatching using LSF: \"" + request.getCommand() + "\""); 158 159 // Enables the simulation mode if necessary 160 useSimulationMode(request.getSimulation()); 161 162 reportedFailure = false; 163 164 // Submits from the higher to the lower JobID. This way the 165 // user has a feel of when the last job is going to be 166 // submitted 167 for (int nJob = jobs.size() - 1; nJob >= 0; 168 nJob--) { 169 Job job = (Job) jobs.get(nJob); 170 171 System.out.print("Dispatching process " + 172 job.getJobID() + "."); 173 dispatch(request, job); 174 if (getClusterName() != null) job.setCluster(getClusterName()); 175 } 176 177 //StatisticsRecorder.getIntance().recordStatistics(request, jobs); //Moved Statistics recording to Scheduler.java LH 178 } 179 180 /* Enables or disables the simulation mode. The simulation mode will deactivate 181 * every command line execution. 182 */ 183 184 185 public void useSimulationMode(boolean simulation) { 186 this.simulation = simulation; 187 188 if (simulation) { 189 // Warn the user that we are entering simulated submission mode 190 log.warn("Simulating submission"); 191 System.out.println("Simulating submission"); 192 } 193 } 194 195 196 197 /* Dispatches a single process of a job request. 198 */ 199 200 protected void dispatch(Request request, Job job) { 201 202 //application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication"); 203 204 //No long get the CSHApplication object from the config file. Get it using the setApplication() and getApplication() via the config file 205 if(application == null){ //If this was not set in the config file or if we'er in junit testing mode then print an error, and use the default file 206 System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized."); 207 String notSet = "The CSHApplication for this dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"CSHApplication\" in ComponentLibrary."; 208 log.warn(notSet); 209 System.out.println(notSet); 210 application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication"); 211 } 212 213 214 // TODO: all the parameters should be passed in one go 215 application.setJob(request, job); 216 //application.setScratchDir(scratchDir); 217 application.setSubmissionCommand(getBsubCommand(request, job)); 218 219 application.prepareJob(); 220 221 log.info("Executing \"" + getBsubCommand(request, job) + "\""); 222 223 if (!simulation) { 224 //System.out.println("!Not simulation"); //used for debuginh 225 try { 226 Thread.sleep(msBtwnSuccess); 227 } catch (Exception e) { 228 } 229 230 long StarTime = System.currentTimeMillis(); 231 int attempt = 0; 232 boolean success = false; 233 String pe=""; 234 235 while (!success && (attempt < maxBsubAttempts)) { 236 try { 237 CSHCommandLineTask task = new CSHCommandLineTask(getBsubCommand( 238 request, job), true, getMaxElapseTime()); 239 task.execute(); 240 241 if (task.getExitStatus() != 0) { 242 log.warn(getQsubEx() + " failed: " + task.getOutput()); 243 Thread.sleep(msBtwnFailure); 244 System.out.print("/"); 245 attempt++; 246 } else { 247 success = true; 248 job.DispatchSuccessful(); 249 job.AddProcesseID(task.getOutput().substring(task.getOutput().indexOf("your job") + 8 ,task.getOutput().indexOf('(') - 1).trim()); 250 job.setDispatchTime(((int) Math.min(System.currentTimeMillis() - StarTime, java.lang.Integer.MAX_VALUE))); 251 //System.out.println("\n\npid=" + task.getOutput() + "\n" ); 252 //System.out.println("\n\npid=" + ((String) job.getProcesseIDs().get(0)) + "\n" ); 253 254 255 } 256 } catch (Exception e) { 257 log.error("Couldn't submit the script to SGE", e); 258 // JL: write also to STDOUT to help users debug what happened 259 if (pe == ""){ 260 pe = e.toString(); 261 System.out.print("Couldn't submit the script to SGE" + pe); 262 } 263 try { 264 Thread.sleep(msBtwnFailure); 265 } catch (Exception e1) { 266 } 267 268 System.out.print("/"); 269 attempt++; 270 } 271 } 272 273 if (success) { 274 System.out.println(" done."); 275 } else { 276 System.out.println(" FAILED!!"); 277 } 278 } else { 279 System.out.println(" simulated."); 280 } 281 } 282 283 String getBsubCommand(Request request, Job job){ 284 return getQsubCommand(request,job); 285 } 286 287 288 /** Returns the full qsub command to be executed to dispatch the process. This 289 * command must executed in the directory in which the script resides. 290 * @return the bsub command 291 */ 292 String getQsubCommand(Request request, Job job) { 293 294 //StringBuffer qsub = new StringBuffer(getQsubEx()); 295 StringBuffer qsub = new StringBuffer("cd " + FilesystemToolkit.getCurrentDirectory() + "; " + bsubEx); 296 297 298 if (application.getJobName() != null) { 299 qsub.append(" -N '").append(application.getJobName()).append("'"); 300 } 301 302 if (application.getStdout() != null) { 303 qsub.append(" -o ").append(application.getStdout()); 304 } 305 306 if (application.getStderr() != null) { 307 qsub.append(" -e ").append(application.getStderr()); 308 }else{ 309 qsub.append(" -j y"); 310 } 311 312 //fills in the -l resources (if any) 313 int minMem = request.getResource("Memory").getMin(); 314 if (getResourceUsageSwitch(job) != null 315 || (job.getTarget() != null) 316 || (minMem != -1)) { 317 qsub.append(" -l "); 318 if(getResourceUsageSwitch(job) != null)qsub.append(getResourceUsageSwitch(job).replace(':', ',')).append(","); 319 if(job.getTarget() != null) qsub.append("hostname=").append(job.getTarget()).append(","); 320 if(minMem != -1) qsub.append("h_vmem=").append(minMem).append("M,"); 321 int minStorage = request.getResource("StorageSpace").getMin(); 322 if(minStorage != -1) qsub.append("scratchfree=").append(minStorage).append("M,"); 323 //remore the last ',' 324 qsub.deleteCharAt(qsub.length() - 1); 325 } 326 327 int maxMem = request.getResource("Memory").getMax(); 328 if (maxMem != -1) { 329 System.out.println("\nWarning your request for MaxMemory="+ maxMem + "MB will not be honored by SGEDispatcher !!!!\n"); 330 log.warn("\nWarning your request for MaxMemory="+ maxMem + "MB will not be honored by SGEDispatcher !!!!\n"); 331 } 332 333 int maxStorage = request.getResource("StorageSpace").getMax(); 334 if (maxStorage != -1) { 335 System.out.println("\nWarning your request for MaxStorageSpace=" + maxStorage + "MB will not be honored by SGEDispatcher !!!!\n"); 336 log.warn("\nWarning your request for MaxStorageSpace=" + maxStorage + "MB will not be honored by SGEDispatcher !!!!\n"); 337 } 338 339 340 341 if(job.getQueueObj().getName() != null){ 342 if(job.getQueueObj().getName().trim().length() != 0 ) qsub.append(" -q ").append(job.getQueueObj().getName()); 343 } 344 345 346 if(getQsubOptions() != null) qsub.append(" " + getQsubOptions()); 347 348 349 350 351 /* 352 The path is being dropped because different drivers for mounting the PanFS path are reporting the path differently. Instated we will put "cd $path" before the command we execute. 353 Example: 354 /pdirect/star+institutions/data05/scratch/lbhajdu/ 355 /star/data05/scratch/lbhajdu/ 356 */ 357 qsub.append(' ').append(application.getCSHScriptFileName()); 358 //qsub.append(' ').append(application.getCommandLine()); 359 360 361 //This is used if you need to see the command 362 //System.out.println("\n" + qsub.toString() + "\n"); 363 364 365 return qsub.toString(); 366 } 367 368 369 private List sgeResourceStrategy = new ArrayList(); 370 public List getResourceStrategyList(){ return sgeResourceStrategy; } 371 public void setResourceStrategyList(List sgeResourceStrategy){ this.sgeResourceStrategy = sgeResourceStrategy; } 372 public void addResourceStrategy(AbstractResourceStrategy resourceStrategy){sgeResourceStrategy.add(resourceStrategy);} 373 374 /** Biuld an SGE resource usage switch for this job to be appanded to the submitting comaand 375 * @param job job to biuld the resource uisage string from 376 * @return SGE resource usage switch for this job 377 **/ 378 protected String getResourceUsageSwitch(Job job) { 379 resSwitch = null; 380 381 //FIXME: cache value 382 //if (getResourceStrategy() == null) {return null;} 383 if (sgeResourceStrategy.isEmpty()) {return null;} 384 385 //resSwitch = getResourceStrategy().prepareResourceUsageSwitch(job); 386 387 //Run the jobs though all of the resource strategies and concatenate all the strings they output together. 388 for(int i = 0; i != sgeResourceStrategy.size(); i++){ 389 AbstractResourceStrategy sgeStrategy = (AbstractResourceStrategy) sgeResourceStrategy.get(i); 390 391 String resSwichFrag = sgeStrategy.prepareResourceUsageSwitch(job); 392 393 if((resSwitch == null)&&(resSwichFrag != null)){ 394 resSwitch = resSwichFrag; 395 } 396 else if((resSwitch != null)&&(resSwichFrag != null)){ 397 resSwitch = resSwitch.concat(","+resSwichFrag); 398 } 399 } 400 return resSwitch; 401 } 402 403 404 /** Set the class that writes the sricpt that will be executed by the batch system */ 405 public void setApplication(CSHApplication application){ 406 this.application = application; 407 } 408 409 /** Get the class that writes the sricpt that will be executed by the batch system */ 410 public CSHApplication getApplication(){ 411 return application; 412 } 413 414 415 public void Kill(Request request, List jobs) { 416 417 //System.out.println("lsfr kill"); 418 419 for(int z=0; z != jobs.size(); z++){ 420 Job job = (Job)jobs.get(z); 421 422 //System.out.println("working no job : " + job.getJobID()); 423 424 if(job.getProcesseIDs().size() == 0){ 425 System.out.println("No ProcesseIDs found for job " + job.getJobID()); 426 jobs.remove(z); 427 z--; 428 429 } 430 else{ 431 for(int i=0; job.getProcesseIDs().size() != i; i++){ 432 433 int attempt = 0; 434 boolean success = false; 435 String commmandOutput = ""; 436 System.out.print("ProcesseID: <" + job.getProcesseIDs().get(i) + "> of Job: <" + job.getJobID() + ">"); 437 438 while (!success && (attempt < maxBsubAttempts)) { 439 try { 440 CSHCommandLineTask task = new CSHCommandLineTask("qdel " + ((String) job.getProcesseIDs().get(i)) , true, getMaxElapseTime()); 441 task.execute(); 442 if (task.getExitStatus() != 0) { 443 log.warn("qdel " + task.getOutput()); 444 Thread.sleep(msBtwnFailure); 445 if(task.getOutput().lastIndexOf("does not exist") != -1) success = true; 446 System.out.print(task.getOutput()); 447 attempt++; 448 } 449 else{ 450 success = true; 451 System.out.println("Killed"); 452 } 453 454 commmandOutput = task.getOutput(); 455 } 456 catch (Exception e) { System.out.print("qdel failed" + e); 457 System.out.print(commmandOutput); 458 } 459 try { Thread.sleep(msBtwnFailure);} 460 catch (Exception e1) {System.out.print("qdel failed");} 461 if(!success) System.out.print("/"); 462 attempt++; 463 } 464 465 } 466 job.clearProcesseIDs(); 467 jobs.remove(z); 468 z--; 469 } 470 } 471 } 472 473 public String Status(Job job, int Processe) { 474 475 if(job.getProcesseIDs().size() == 0) return "No ProcesseIDs found for job " + job.getJobID(); 476 if(job.getProcesseIDs().size() < Processe) return job.getJobID() + " only has " + job.getProcesseIDs().size() + "processes, processe " + Processe + "dose not exist."; 477 478 479 // for(int i=0; job.getProcesseIDs().size() != i; i++){ 480 481 int attempt = 0; 482 boolean success = false; 483 String commmandOutput = ""; 484 System.out.print("ProcesseID: <" + job.getProcesseIDs().get(Processe) + "> of Job: <" + job.getJobID() + ">"); 485 486 while (!success && (attempt < maxBsubAttempts)) { 487 try { 488 CSHCommandLineTask task = new CSHCommandLineTask("qstat | grep '^ " + ((String) job.getProcesseIDs().get(Processe)) + " '" +"; echo text" , true, getMaxElapseTime()); 489 task.execute(); 490 if (task.getExitStatus() != 0) { 491 log.warn("qstat " + task.getOutput()); 492 Thread.sleep(msBtwnFailure); 493 494 // if(task.getOutput().lastIndexOf("already finished") != -1) success = true; 495 //return (task.getOutput().replace('\n',' '); 496 attempt++; 497 } 498 else{ 499 success = true; 500 501 //task.getOutput().indexOf(" STAT "); 502 //task.getOutput().indexOf(" QUEUE "); 503 // task.getOutput().indexOf("\n"); 504 if(task.getOutput().length() < 50) return "No data avalable "; 505 else 506 return (task.getOutput().substring(38,42).trim()); 507 //System.out.println("Killed"); 508 } 509 510 commmandOutput = task.getOutput(); 511 } 512 catch (Exception e) { System.out.print("qstat failed" + e); 513 System.out.print(commmandOutput); 514 } 515 try { Thread.sleep(msBtwnFailure);} 516 catch (Exception e1) {System.out.print("qstat failed");} 517 if(!success) System.out.print("/"); 518 attempt++; 519 } 520 521 // } 522 523 return "No data avalable "; 524 } 525 526 public void retrieveOutput(Request job, List jobs) { 527 } 528 529 530 private String qsubOptions; 531 public String getQsubOptions() { return qsubOptions;} 532 public void setQsubOptions(String qsubOptions) { this.qsubOptions = qsubOptions;} 533 534 535 String qstat = ""; 536 /** 537 * Runs test(s) on underlying components to determine if submitting jobs should be attempted. 538 * @param queue The queue object to be tested 539 * @return Will return true to indicate everything is alright and false if the test has failed 540 */ 541 public boolean test(Queue queue){ 542 543 544 if(qstat.length() == 0){ //only run the command one time 545 if(! runInTimeLimitedThread("qstat -g c" , getMaxElapseTime(), msBtwnFailure, 5 )){ 546 return false; 547 } 548 else{ 549 //System.out.println(super.threadOuput); //show the output for debugging 550 qstat = super.threadOuput.replace('\n', ' '); //keep everything on one line 551 } 552 } 553 554 if(qstat.length() == 0) return false; //just in case notting was returnned; 555 if(qstat.matches(".* [123456789]* .*")){ //find any non zero digit in bewteen speces. This will find eider n available or n used nodes, in both cases the site is alright. 556 557 return true; 558 } 559 560 return false; 561 562 } 563 564 565 }