001 /* 002 * CondorGPBSDispatcher.java 003 * 004 * Created on June 8, 2004, 11:17 AM 005 * 006 * This file is part of the STAR Scheduler. 007 * Copyright (c) 2002-2006 STAR Collaboration - Brookhaven National Laboratory 008 * 009 * STAR Scheduler is free software; you can redistribute it and/or modify 010 * it under the terms of the GNU General Public License as published by 011 * the Free Software Foundation; either version 2 of the License, or 012 * (at your option) any later version. 013 * 014 * STAR Scheduler is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 017 * GNU General Public License for more details. 018 * 019 * You should have received a copy of the GNU General Public License 020 * along with STAR Scheduler; if not, write to the Free Software 021 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 022 */ 023 package gov.bnl.star.offline.scheduler.Dispatchers.condorg; 024 025 import gov.bnl.star.offline.scheduler.*; 026 import gov.bnl.star.offline.scheduler.request.Request; 027 import gov.bnl.star.offline.scheduler.Queue; 028 import gov.bnl.star.offline.scheduler.Dispatchers.lsf.CSHApplication; 029 import gov.bnl.star.offline.scheduler.Dispatchers.lsf.LSFDispatcher; 030 import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask; 031 import gov.bnl.star.offline.scheduler.util.FilesystemToolkit; 032 //import gov.bnl.star.offline.scheduler.util.StatisticsRecorder; 033 import gov.bnl.star.offline.scheduler.util.sandbox.Sandbox; 034 035 import java.io.File; 036 import java.io.FileOutputStream; 037 import java.io.PrintStream; 038 import java.util.*; 039 040 import java.util.logging.Level; 041 import org.apache.log4j.Logger; 042 043 044 /** Dispatches jobs using Condor-G on a remote site that uses PBS. 045 * It will NOT use extra rsl attributes for PBS. If needed they will 046 * be added later. 047 * @author Alex Withers, Levente Hajdu 048 * @version 1.0 2004/06/08 049 */ 050 public class CondorGRSLDispatcher extends LSFDispatcher { 051 static private Logger log = Logger.getLogger(CondorGRSLDispatcher.class.getName()); 052 053 private static String condorEx; 054 protected CSHApplication application; 055 056 057 private boolean useRSL = false; 058 /* This RSL hack can be turned on to pass additional data to the batch system when submitting via condorG, However the patch must be in place or else submitting with this property turned on will fail. Please see http://www.star.bnl.gov/STAR/comp/Grid/scheduler/dev/lsfPatch.html for additional information. */ 059 public void setUseRSL(boolean useRSL){this.useRSL = useRSL;}; 060 public boolean getUseRSL(){return useRSL;}; 061 062 063 public void setCondorEx(String condorEx) { this.condorEx = condorEx; } 064 065 public String getCondorEx() { return condorEx; } 066 067 /** Creates a new dispatcher */ 068 public CondorGRSLDispatcher() { 069 } 070 071 private String condorGOptions; 072 public String getCondorGOptions(){ return condorGOptions; } 073 public void setCondorGOptions(String condorGOptions){ this.condorGOptions = condorGOptions; } 074 075 /** Creates the scripts and dispatches the job on the target machine. 076 * @param request the job request 077 */ 078 public void dispatch(Request request, List jobs) { 079 log.info("Dispatching using Condor-g and LSF: \"" + request.getCommand() + 080 "\""); 081 082 // Enables the simulation mode if necessary 083 useSimulationMode(request.getSimulation()); 084 reportedFailure = false; 085 086 // Submits from the higher to the lower JobID. This way the 087 // user has a feel of when the last job is going to be 088 // submitted 089 for (int nProcess = jobs.size() - 1; nProcess >= 0; 090 nProcess--) { 091 Job job = (Job) jobs.get(nProcess); 092 093 System.out.print("Dispatching process " + 094 job.getJobID() + "."); 095 dispatch(request, job); 096 } 097 098 //StatisticsRecorder.getInstance().recordStatistics(request, jobs); //removed and moved to frame-work 099 } 100 101 protected void dispatch(Request request, Job job) { 102 //application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication"); 103 104 //No long get the CSHApplication object from the config file. Get it using the setApplication() and getApplication() via the config file 105 if(application == null){ //If this was not set in the config file or if we'er in junit testing mode then print an error, and use the default file 106 System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized."); 107 String notSet = "The CSHApplication for this dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"CSHApplication\" in ComponentLibrary."; 108 log.warn(notSet); 109 System.out.println(notSet); 110 application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication"); 111 } 112 113 // TODO: all the parameters should be passed in one go 114 application.setJob(request, job); 115 // application.setScratchDir(scratchDir); 116 application.setSubmissionCommand(getCondorGCommand(request, job)); 117 118 application.prepareJob(); 119 prepareClassAd(request, job); 120 121 log.info("Executing \"" + getCondorGCommand(request, job) + "\""); 122 123 if (!simulation) { 124 try { 125 Thread.sleep(getMsBtwnSuccess()); 126 } catch (Exception e) { 127 } 128 129 long StarTime = System.currentTimeMillis(); 130 int attempt = 0; 131 boolean success = false; 132 133 while (!success && (attempt < getMaxAttempts())) { 134 try { 135 CSHCommandLineTask task = new CSHCommandLineTask(getCondorGCommand( 136 request, job), true, 30000); 137 task.execute(); 138 139 if (task.getExitStatus() != 0) { 140 log.warn("bsub failed: " + task.getOutput()); 141 Thread.sleep(getMsBtwnFailure()); 142 System.out.print("/"); 143 attempt++; 144 } else { 145 success = true; 146 job.DispatchSuccessful(); 147 job.AddProcesseID(task.getOutput().substring(task.getOutput().indexOf("submitted to cluster") + 20, task.getOutput().length()).replace('.', ' ').trim()); 148 job.setDispatchTime(((int) Math.min(System.currentTimeMillis() - StarTime, java.lang.Integer.MAX_VALUE))); 149 } 150 } catch (Exception e) { 151 log.fatal("Couldn't submit the script to Condor-g", e); 152 153 try { 154 Thread.sleep(getMsBtwnFailure()); 155 } catch (Exception e1) { 156 } 157 158 System.out.print("/"); 159 attempt++; 160 } 161 } 162 163 if (success) { 164 System.out.println(" done."); 165 } else { 166 System.out.println(" FAILED!!"); 167 } 168 } else { 169 System.out.println(" simulated."); 170 } 171 } 172 173 /** Returns the command line to submit the job through condor-g. 174 * @param request the request that originated the job 175 * @param job the job to be dispatched 176 * @return the commandline to submit the job 177 */ 178 protected String getCondorGCommand(Request request, Job job) { 179 //The "cd" command was added here, just incase the dir is "somehow" chaged while running 180 return "cd " + FilesystemToolkit.getCurrentDirectory() + "; " + condorEx + " " + getClassAdName(request, job); 181 //return condorEx + " " + getClassAdName(request, job); 182 } 183 184 /** Returns the name of the file containing the class ad. Class ad is the job 185 * description required by condor to submit a job. 186 * @param request the request that originated the job 187 * @param job the job to be submitted 188 * @return the file name of the class ad 189 */ 190 protected String getClassAdName(Request request, Job job) { 191 return "sched" + job.getJobID() + ".condorg"; 192 } 193 194 private void prepareClassAd(Request request, Job job) { 195 196 try { 197 198 PrintStream classAd = new PrintStream(new FileOutputStream( new File(getClassAdName(request, job)))); 199 createClassAd(request, job, classAd); 200 201 } catch (Exception e) { 202 log.fatal("Couldn't create the class ad", e); 203 throw new RuntimeException("Couldn't create the class ad " + 204 getClassAdName(request, job) + ": " + e.getMessage()); 205 } 206 } 207 208 private void createClassAd(Request request, Job job, PrintStream classAd) { 209 210 211 classAd.print("executable = "); 212 classAd.println(getExecutable()); 213 214 if (getArguments() != null) { 215 classAd.print("arguments = "); 216 classAd.println(getArguments()); 217 } 218 219 classAd.print("globusscheduler = "); 220 221 // if(job.getQueueObj().getBatchSystem().getGatekeeper() != null){ 222 // classAd.println(job.getQueueObj().getBatchSystem().getGatekeeper().getName() + "/jobmanager-" + job.getQueueObj().getBatchSystem().getName()); 223 // } 224 225 String batchSystem = job.getAccessMethod().getBatchSystem(); 226 String gateKeeper = ((GateKeeperAccessPoint) job.getAccessPoint()).getName(); 227 if((batchSystem != null) && (gateKeeper != null)){ 228 classAd.println(gateKeeper + "/jobmanager-" + batchSystem); 229 }else{ 230 System.out.println("Could not find gatekeeper for this batch sytem, please check config file"); 231 log.fatal("Could not find gatekeeper for this batch sytem, please check config file"); 232 throw new RuntimeException("Could not find gatekeeper for this batch sytem, please check config file"); 233 } 234 235 if (application.getStdin() != null) { 236 classAd.print("input = "); 237 classAd.println(application.getStdin()); 238 } 239 240 if (application.getStdout() != null) { 241 classAd.print("output = "); 242 classAd.println(application.getStdout()); 243 } 244 245 if (application.getStderr() != null) { 246 classAd.print("error = "); 247 classAd.println(application.getStderr()); 248 } 249 250 classAd.print("log = "); 251 classAd.println(getLogName(job)); 252 253 if (getRemoteDirectory() != null) { 254 classAd.print("remote_initialdir = "); 255 classAd.println(getRemoteDirectory()); 256 } 257 258 Sandbox sandbox = request.getSandbox(); 259 if(sandbox != null){ 260 List sandboxedFiles = sandbox.getSandboxedFiles(); //get the list of files that need to be copyed 261 if(sandboxedFiles != null){ 262 if(sandboxedFiles.size() > 0){ 263 classAd.println("WhenToTransferOutput = ON_EXIT"); 264 classAd.print("transfer_input_files = "); 265 for(int i = 0; i != sandboxedFiles.size(); i++){ 266 classAd.print(sandboxedFiles.get(i)); //write out each file. 267 if((i + 1) != sandboxedFiles.size()) classAd.print(","); 268 } 269 classAd.println(); 270 } 271 } 272 } 273 274 /* This is basically the main difference from 275 * CondorGLSFDispatcher.java. No globus-rsl stuff. 276 * -- Alex Withers 277 */ 278 279 if(getUseRSL()){ 280 281 classAd.print("globusrsl ="); 282 283 if (job.getQueue() != null) { 284 classAd.print(" (queue = "); 285 classAd.print(job.getQueue()); 286 classAd.print(")"); 287 } 288 289 if(getUseLSFMod()){ 290 if (job.getTarget() != null) { 291 classAd.print(" (xlsfmachine = "); 292 classAd.print(job.getTarget()); 293 classAd.print(")"); 294 } 295 296 if (application.getJobName() != null) { 297 classAd.print(" (xlsfjobname = "); 298 classAd.print(application.getJobName()); 299 classAd.print(")"); 300 } 301 302 if (request.getMail()) { 303 classAd.print(" (xlsfmailreport = "); 304 classAd.print("false"); 305 classAd.print(")"); 306 } else { 307 classAd.print(" (xlsfmailreport = "); 308 classAd.print("true"); 309 classAd.print(")"); 310 } 311 312 if (getResourceUsageSwitch(job) != null) { 313 classAd.print(" (xlsfresources = "); 314 classAd.print(getResourceUsageSwitch(job)); 315 classAd.print(")"); 316 } 317 } 318 319 320 classAd.println(); 321 } 322 323 324 if (isTransferExecutable()) { 325 classAd.println("transfer_executable = true"); 326 } else { 327 classAd.println("transfer_executable = false"); 328 } 329 330 classAd.println("notification = never"); 331 classAd.println("universe = globus"); 332 333 if (getCondorGOptions() != null) { 334 classAd.println(getCondorGOptions()); 335 } 336 337 classAd.println("queue"); 338 339 } 340 341 private String getExecutable() { 342 343 344 if (application.getCommandLine().indexOf(' ') == -1) { 345 return application.getCommandLine(); 346 } 347 348 return application.getCommandLine().substring(0, application.getCommandLine().indexOf(' ')); 349 } 350 351 private String getArguments() { 352 if (application.getCommandLine().indexOf(' ') == -1) { 353 return null; 354 } 355 356 return application.getCommandLine().substring(application.getCommandLine().indexOf(' ') + 1); 357 } 358 359 private String getLogName(Job job) { 360 // TODO maybe log filename should be put as a general property of Process (as stds) 361 return "sched" + job.getJobID() + ".condorg.log"; 362 } 363 /* 364 private String getGlobusScheduler() { 365 //TODO make it flexible 366 return getGlobusGatekeeper(); 367 } 368 369 private String gatekeeper; 370 */ 371 /** Holds value of property transferExecutable. */ 372 private boolean transferExecutable; 373 /* 374 public void setGlobusGatekeeper(String gatekeeper) { 375 this.gatekeeper = gatekeeper; 376 } 377 378 public String getGlobusGatekeeper() { 379 return gatekeeper; 380 } 381 */ 382 private String remoteInitialDir; 383 384 public void setRemoteInitialDir(String remoteInitialDir) { 385 this.remoteInitialDir = remoteInitialDir; 386 } 387 388 public String getRemoteInitialDir() { 389 return remoteInitialDir; 390 } 391 392 private String getRemoteDirectory() { 393 // TODO this has to be specified better: remote execution directory could be different from scheduler execution directory 394 if (".".equals(getRemoteInitialDir())) return FilesystemToolkit.getCurrentDirectory(); 395 return getRemoteInitialDir(); 396 } 397 398 protected String getResourceUsageSwitch(Job job) { 399 String res = super.getResourceUsageSwitch(job); 400 if (res == null) return res; 401 402 return res.replaceAll("\"", "\\\\\""); 403 } 404 405 /** Getter for property transferExecutable. 406 * @return Value of property transferExecutable. 407 * 408 */ 409 public boolean isTransferExecutable() { 410 return this.transferExecutable; 411 } 412 413 /** Setter for property transferExecutable. 414 * @param transferExecutable New value of property transferExecutable. 415 * 416 */ 417 public void setTransferExecutable(boolean transferExecutable) { 418 this.transferExecutable = transferExecutable; 419 } 420 421 /** Set the class that writes the sricpt that will be executed by the batch system */ 422 public void setApplication(CSHApplication application){ 423 this.application = application; 424 } 425 426 /** Get the class that writes the sricpt that will be executed by the batch system */ 427 public CSHApplication getApplication(){ 428 return application; 429 } 430 431 432 public void Kill(Request request, List jobs) { 433 //System.out.println("condor kill"); 434 435 for(int z=0; z != jobs.size(); z++){ 436 Job job = (Job)jobs.get(z); 437 438 if(job.getProcesseIDs().size() == 0){ 439 System.out.println("No ProcesseIDs found for job " + job.getJobID()); 440 jobs.remove(z); 441 z--; 442 } 443 else{ 444 for(int i=0; job.getProcesseIDs().size() != i; i++){ 445 446 int attempt = 0; 447 boolean success = false; 448 String commmandOutput = ""; 449 System.out.print("ProcesseID: <" + job.getProcesseIDs().get(i) + "> of Job: <" + job.getJobID() + ">"); 450 451 while (!success && (attempt < getMaxAttempts())) { 452 try { 453 CSHCommandLineTask task = new CSHCommandLineTask("condor_rm " + ((String) job.getProcesseIDs().get(i)) , true, getMaxElapseTime()); 454 task.execute(); 455 if (task.getExitStatus() != 0) { 456 log.warn("condor_rm " + task.getOutput()); 457 Thread.sleep(getMsBtwnFailure()); 458 if(task.getOutput().lastIndexOf("Couldn't find") != -1) success = true; 459 System.out.print(task.getOutput()); 460 attempt++; 461 } 462 else{ 463 success = true; 464 System.out.println("Killed"); 465 } 466 467 commmandOutput = task.getOutput(); 468 } 469 catch (Exception e) { System.out.print("condor_rm failed" + e); 470 System.out.print(commmandOutput); 471 } 472 try { Thread.sleep(getMsBtwnFailure());} 473 catch (Exception e1) {System.out.print("condor_rm failed");} 474 if(!success) System.out.print("/"); 475 attempt++; 476 } 477 478 } 479 job.clearProcesseIDs(); 480 jobs.remove(z); 481 z--; 482 } 483 } 484 } 485 486 public String Status(Job job, int Processe) { 487 if(job.getProcesseIDs().size() == 0) return "No ProcesseIDs found for job " + job.getJobID(); 488 if(job.getProcesseIDs().size() < Processe) return job.getJobID() + " only has " + job.getProcesseIDs().size() + "processes, processe " + Processe + "dose not exist."; 489 490 491 // for(int i=0; job.getProcesseIDs().size() != i; i++){ 492 493 int attempt = 0; 494 boolean success = false; 495 String commmandOutput = ""; 496 System.out.print("ProcesseID: <" + job.getProcesseIDs().get(Processe) + "> of Job: <" + job.getJobID() + ">"); 497 498 while (!success && (attempt < getMaxAttempts())) { 499 try { 500 CSHCommandLineTask task = new CSHCommandLineTask("condor_q " + ((String) job.getProcesseIDs().get(Processe)) , true, getMaxElapseTime()); 501 task.execute(); 502 if (task.getExitStatus() != 0) { 503 log.warn("condor_q " + task.getOutput()); 504 Thread.sleep(getMsBtwnFailure()); 505 506 // if(task.getOutput().lastIndexOf("already finished") != -1) success = true; 507 //return (task.getOutput().replace('\n',' '); 508 attempt++; 509 } 510 else{ 511 success = true; 512 job.DispatchSuccessful(); 513 job.AddProcesseID(task.getOutput().substring(task.getOutput().indexOf("submitted to cluster") + 20, task.getOutput().length()).replace('.', ' ').trim()); 514 515 if(task.getOutput().length() < 217) return("Done or Killed"); 516 else{ 517 String state = task.getOutput().substring(214,216).trim(); 518 if( state.startsWith("R")) state = "RUN"; 519 return(task.getOutput().substring(214,216).trim()); 520 } 521 522 } 523 524 commmandOutput = task.getOutput(); 525 } 526 catch (Exception e) { System.out.print("condor_q failed" + e); 527 System.out.print(commmandOutput); 528 } 529 try { Thread.sleep(getMsBtwnFailure());} 530 catch (Exception e1) {System.out.print("condor_q failed");} 531 if(!success) System.out.print("/"); 532 attempt++; 533 } 534 535 // } 536 537 return "condor_q failed"; 538 } 539 540 public void stop() { 541 } 542 543 544 private List passedQueues = new ArrayList(); // stores a list of queue that have passed, queue check test 545 /** 546 * Runs test(s) on underlying components to determine if submitting jobs should be attempted. 547 * @param queue The queue object to be tested 548 * @return Will return true to indicate everything is alright and false if the test has failed 549 */ 550 public boolean test(Queue queue){ 551 552 /* 553 if(passedQueues.contains(queue)) return true; //If the queue has already passed one time don't test it agin 554 555 if(! runInTimeLimitedThread("globus-job-run " + queue.getAssociatedAccessMethod().getBatchSystem() + " /bin/echo passed", getMaxElapseTime() + 300000, getMsBtwnFailure(), 4 )){ 556 System.out.print("(ExeFailed)"); 557 return false; 558 } 559 else{ 560 String output = super.threadOuput.replace('\n',' ').trim(); 561 if(output.matches("passed")){ //check if queue is open 562 passedQueues.add(queue); 563 return true; 564 } 565 else{ 566 return false; 567 } 568 } 569 */ return true; 570 } 571 572 573 private boolean useLSFMod = false; 574 public void setUseLSFMod( boolean useLSFMod){this.useLSFMod = useLSFMod;} 575 public boolean getUseLSFMod(){return useLSFMod;} 576 577 578 579 }