001 /* 002 * CondorDispatcher.java 003 * 004 * Created on July 17, 2003, 11:17 AM 005 * 006 * This file is part of the STAR Scheduler. 007 * Copyright (c) 2002-2006 STAR Collaboration - Brookhaven National Laboratory 008 * 009 * STAR Scheduler is free software; you can redistribute it and/or modify 010 * it under the terms of the GNU General Public License as published by 011 * the Free Software Foundation; either version 2 of the License, or 012 * (at your option) any later version. 013 * 014 * STAR Scheduler is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 017 * GNU General Public License for more details. 018 * 019 * You should have received a copy of the GNU General Public License 020 * along with STAR Scheduler; if not, write to the Free Software 021 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 022 */ 023 package gov.bnl.star.offline.scheduler.Dispatchers.condorg; 024 025 import gov.bnl.star.offline.scheduler.*; 026 import gov.bnl.star.offline.scheduler.Queue; 027 import gov.bnl.star.offline.scheduler.request.Request; 028 import gov.bnl.star.offline.scheduler.ComponentLibrary; 029 import gov.bnl.star.offline.scheduler.Dispatchers.lsf.CSHApplication; 030 import gov.bnl.star.offline.scheduler.Dispatchers.lsf.LSFDispatcher; 031 import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask; 032 import gov.bnl.star.offline.scheduler.util.FilesystemToolkit; 033 //import gov.bnl.star.offline.scheduler.util.StatisticsRecorder; //Moved Statistics recording to Scheduler.java LH 034 035 import java.io.File; 036 import java.io.FileOutputStream; 037 import java.io.PrintStream; 038 import java.util.*; 039 040 import java.util.logging.Level; 041 import org.apache.log4j.Logger; 042 043 044 /** Dispatches jobs using Condor-G on a remote site that uses LSF. It will use some 045 * extra rsl attributes created to command some extra features such as mail 046 * notification, resource usage, job name and target machine. These extra LSF 047 * attribute require a patch to the LSF job manager. 048 * @author Gabriele Carcassi 049 * @version 1.0 2003/07/23 050 */ 051 public class CondorDispatcher extends LSFDispatcher { 052 static private Logger log = Logger.getLogger(CondorDispatcher.class.getName()); 053 private String condorEx; 054 055 private String condorOptions; 056 057 public void setCondorEx(String condorEx) { 058 this.condorEx = condorEx; 059 } 060 061 public String getCondorEx() { 062 return condorEx; 063 } 064 065 /** Creates a new dispatcher */ 066 public CondorDispatcher() { 067 } 068 069 /** Creates the scripts and dispatches the job on the target machine. 070 * @param request the job request 071 */ 072 public void dispatch(Request request, List jobs) { 073 log.info("Dispatching using Condor: \"" + request.getCommand() + "\""); 074 075 // Enables the simulation mode if necessary 076 useSimulationMode(request.getSimulation()); 077 reportedFailure = false; 078 079 // Submits from the higher to the lower JobID. This way the 080 // user has a feel of when the last job is going to be 081 // submitted 082 for (int nProcess = jobs.size() - 1; nProcess >= 0; 083 nProcess--) { 084 Job job = (Job) jobs.get(nProcess); 085 086 System.out.print("Dispatching process " + 087 job.getJobID() + "."); 088 dispatch(request, job); 089 } 090 091 //StatisticsRecorder.getIntance().recordStatistics(request, jobs); 092 } 093 094 protected void dispatch(Request request, Job job) { 095 096 //CondorNodePriorityStringGenerator priorityGen = new CondorNodePriorityStringGenerator(); 097 //System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>" + priorityGen.generateSyntax(job)); 098 099 100 101 //application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication"); 102 //No longer get the CSHApplication object from the config file. Get it using the setApplication() and getApplication() via the config file 103 if(application == null){ //If this was not set in the config file or if we'er in junit testing mode then print an error, and use the default file 104 System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized."); 105 String notSet = "The CSHApplication for this dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"CSHApplication\" in ComponentLibrary."; 106 log.warn(notSet); 107 System.out.println(notSet); 108 application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication"); 109 } 110 111 // TODO: all the parameters should be passed in one go 112 application.setJob(request, job); 113 //application.setScratchDir(scratchDir); 114 application.setSubmissionCommand(getCondorCommand(request, job)); 115 116 application.prepareJob(); 117 prepareClassAd(request, job); 118 119 log.info("Executing \"" + getCondorCommand(request, job) + "\""); 120 121 if (!simulation) { 122 try { 123 Thread.sleep(getMsBtwnSuccess()); 124 } catch (Exception e) { 125 } 126 127 long StarTime = System.currentTimeMillis(); 128 int attempt = 0; 129 boolean success = false; 130 131 while (!success && (attempt < getMaxAttempts())) { 132 try { 133 CSHCommandLineTask task = new CSHCommandLineTask(getCondorCommand(request, job), true, 30000); 134 task.execute(); 135 136 if (task.getExitStatus() != 0) { 137 attempt++; 138 log.warn("condor-submit failed: " + task.getOutput()); 139 if (attempt < getMaxAttempts()) { 140 Thread.sleep(getMsBtwnFailure()); 141 System.out.print("/"); 142 } 143 } else { 144 success = true; 145 job.DispatchSuccessful(); 146 147 //recover the processID 148 String processeID = task.getOutput().replace('\n', ' ').replaceAll(".*submitted to cluster ([0-9]*).*","$1"); 149 if( processeID.matches("[0-9]*") ) job.AddProcesseID( processeID ); 150 else System.out.println("Wanning: Could not record condor processID of job, trying to kill job with -k option will fail if attempted. Otherwise this has no harmful effect.\n"); 151 152 job.setDispatchTime(((int) Math.min(System.currentTimeMillis() - StarTime, java.lang.Integer.MAX_VALUE))); 153 } 154 } catch (Exception e) { 155 log.fatal("Couldn't submit the script to Condor-g", e); 156 try { 157 Thread.sleep(getMsBtwnFailure()); 158 } catch (Exception e1) { 159 } 160 161 System.out.print("/"); 162 attempt++; 163 } 164 } 165 166 if (success) { 167 System.out.println(" done."); 168 } else { 169 System.out.println(" FAILED!!"); 170 } 171 } else { 172 System.out.println(" simulated."); 173 } 174 } 175 176 /** Returns the command line to submit the job through condor-g. 177 * @param request the request that originated the job 178 * @param job the job to be dispatched 179 * @return the commandline to submit the job 180 */ 181 protected String getCondorCommand(Request request, Job job) { 182 //The "cd" command was added here, just incase the dir is "somehow" chaged while running 183 return "cd " + FilesystemToolkit.getCurrentDirectory() + "; " + condorEx + " " + getClassAdName(request, job); 184 } 185 186 /** Returns the name of the file containing the class ad. Class ad is the job 187 * description required by condor to submit a job. 188 * @param request the request that originated the job 189 * @param job the job to be submitted 190 * @return the file name of the class ad 191 */ 192 protected String getClassAdName(Request request, Job job) { 193 return "sched" + job.getJobID() + ".condor"; 194 } 195 private void prepareClassAd(Request request, Job job) { 196 try { 197 PrintStream classAd = new PrintStream(new FileOutputStream( 198 new File(getClassAdName(request, job)))); 199 createClassAd(request, job, classAd); 200 } catch (Exception e) { 201 log.fatal("Couldn't create the class ad", e); 202 throw new RuntimeException("Couldn't create the class ad " + 203 getClassAdName(request, job) + ": " + e.getMessage()); 204 } 205 } 206 207 private void createClassAd(Request request, Job job, PrintStream classAd) { 208 209 classAd.println("Universe = vanilla"); 210 classAd.println(); 211 //classAd.println("+Experiment = \"star\""); 212 classAd.println("Notification = never"); 213 classAd.print("Executable = "); 214 classAd.println(getExecutable()); 215 216 if (getArguments() != null) { 217 classAd.print("Arguments = "); 218 classAd.println(getArguments()); 219 } 220 221 if (application.getStdin() != null) { 222 classAd.print("Input = "); 223 classAd.println(application.getStdin()); 224 } 225 226 if (application.getStdout() != null) { 227 classAd.print("Output = "); 228 classAd.println(application.getStdout()); 229 } 230 231 if (application.getStderr() != null) { 232 classAd.print("Error = "); 233 classAd.println(application.getStderr()); 234 } 235 236 237 if("rootd".equals(request.getFileListType())){ 238 String rank = CondorNodePriorityStringGenerator.generateSyntax(job); 239 if(!rank.equals("")){ 240 classAd.println("Rank = " + rank); 241 } 242 } 243 244 245 /////////////////////////// 246 List summedRequirements = new ArrayList(); 247 summedRequirements.addAll(getRequirements()); 248 249 if (job.getTarget() != null) { 250 summedRequirements.add("(Machine == \"" + job.getTarget() + "\")"); 251 } 252 253 if(summedRequirements.size() > 0){ //if the are some requorment 254 String requirements = "Requirements = "; 255 256 for(int i = 0; i != summedRequirements.size(); i++){ 257 if(i > 0 ) requirements = requirements + " && "; 258 requirements = requirements + summedRequirements.get(i); 259 } 260 261 classAd.println(requirements); 262 } 263 264 265 266 /* //reminder conder needs to work like LSF 267 if (job.getTarget() != null) { 268 269 if("rootd".equals(request.getFileListType())){ 270 if(getNodePriorityStringGenerator() == null){ 271 System.out.println("Warring the LSFNodePriorityStringGenerator has note been initialized in the config file for this dispatcher. Please contact your SUMS administrator and report this message."); 272 bsub.append(" -m ").append(job.getTarget()); 273 } 274 else{ 275 bsub.append(" -m ").append(getNodePriorityStringGenerator().generateSyntax(job)); 276 } 277 } 278 else { 279 280 //if there are more requirements the logic will need to be change use && to delimit 281 classAd.println("Requirements = ); 282 } 283 } 284 */ 285 /////////////////////////// 286 287 288 classAd.print("Log = "); 289 classAd.println(getLogName(job)); 290 classAd.println("Getenv = true"); 291 292 if (getRemoteDirectory() != null) { 293 classAd.print("Initialdir = "); 294 classAd.println(getRemoteDirectory()); 295 } 296 297 if (getCondorOptions() != null) { 298 classAd.println(getCondorOptions()); 299 } 300 301 // classAd.println("transfer_executable = false"); 302 classAd.println("Queue"); 303 } 304 305 private String getExecutable() { 306 if (application.getCommandLine().indexOf(' ') == -1) { 307 return application.getCommandLine(); 308 } 309 310 return application.getCommandLine().substring(0, application.getCommandLine().indexOf(' ')); 311 } 312 313 private String getArguments() { 314 if (application.getCommandLine().indexOf(' ') == -1) { 315 return null; 316 } 317 318 return application.getCommandLine().substring(application.getCommandLine() 319 .indexOf(' ') + 320 1); 321 } 322 323 private String getLogName(Job job) { 324 // TODO maybe log filename should be put as a general property of Process (as stds) 325 return "sched" + job.getJobID() + ".condor.log"; 326 } 327 328 /* private String getGlobusScheduler() { 329 //TODO make it flexible 330 return "stargrid01.rcf.bnl.gov/jobmanager-lsf"; 331 }*/ 332 333 private String getRemoteDirectory() { 334 // TODO this has to be specified better: remote execution directory could be different from scheduler execution directory 335 return FilesystemToolkit.getCurrentDirectory(); 336 } 337 338 /** Getter for property condorOptions. 339 * @return Value of property condorOptions. 340 * 341 */ 342 public String getCondorOptions() { 343 return this.condorOptions; 344 } 345 346 /** Setter for property condorOptions. 347 * @param condorOptions New value of property condorOptions. 348 * 349 */ 350 public void setCondorOptions(String condorOptions) { 351 this.condorOptions = condorOptions; 352 } 353 354 /* protected String getResourceUsageSwitch(Process job) { 355 String res = super.getResourceUsageSwitch(job); 356 357 return res.replaceAll("\"", "\\\\\""); 358 }*/ 359 360 361 /** Set the class that writes the sricpt that will be executed by the batch system */ 362 public void setApplication(CSHApplication application){ 363 this.application = application; 364 } 365 366 /** Get the class that writes the sricpt that will be executed by the batch system */ 367 public CSHApplication getApplication(){ 368 return application; 369 } 370 371 372 public void Kill(Request request, List jobs) { 373 //System.out.println("condor kill -----------started"); 374 375 376 for(int z=0; z != jobs.size(); z++){ 377 Job job = (Job)jobs.get(z); 378 379 if(job.getProcesseIDs().size() == 0){ 380 System.out.println("No ProcesseIDs found for job " + job.getJobID()); 381 jobs.remove(z); 382 z--; 383 } 384 else{ 385 for(int i=0; job.getProcesseIDs().size() != i; i++){ 386 387 int attempt = 0; 388 boolean success = false; 389 String commmandOutput = ""; 390 System.out.print("ProcesseID: <" + job.getProcesseIDs().get(i) + "> of Job: <" + job.getJobID() + ">"); 391 392 while (!success && (attempt < getMaxAttempts())) { 393 try { 394 395 //System.out.println("exec string -----> \"" + "condor_rm " + ((String) job.getProcesseIDs().get(i)) + "\""); 396 397 CSHCommandLineTask task = new CSHCommandLineTask("condor_rm " + ((String) job.getProcesseIDs().get(i)) , true, getMaxElapseTime()); 398 task.execute(); 399 if (task.getExitStatus() != 0) { 400 log.warn("condor_rm " + task.getOutput()); 401 Thread.sleep(getMsBtwnFailure()); 402 if(task.getOutput().lastIndexOf("Couldn't find") != -1) success = true; 403 System.out.print(task.getOutput()); 404 attempt++; 405 } 406 else{ 407 success = true; 408 System.out.println("Killed"); 409 } 410 411 commmandOutput = task.getOutput(); 412 } 413 catch (Exception e) { System.out.print("condor_rm failed" + e); 414 System.out.print(commmandOutput); 415 } 416 try { Thread.sleep(getMsBtwnFailure());} 417 catch (Exception e1) {System.out.print("condor_rm failed");} 418 if(!success) System.out.print("/"); 419 attempt++; 420 } 421 422 } 423 job.clearProcesseIDs(); 424 jobs.remove(z); 425 z--; 426 } 427 } 428 429 430 431 //System.out.println("condor kill -----------done"); 432 433 434 } 435 436 public String Status(Job job, int Processe) { 437 if(job.getProcesseIDs().size() == 0) return "No ProcesseIDs found for job " + job.getJobID(); 438 if(job.getProcesseIDs().size() < Processe) return job.getJobID() + " only has " + job.getProcesseIDs().size() + "processes, processe " + Processe + "dose not exist."; 439 440 441 // for(int i=0; job.getProcesseIDs().size() != i; i++){ 442 443 int attempt = 0; 444 boolean success = false; 445 String commmandOutput = ""; 446 System.out.print("ProcesseID: <" + job.getProcesseIDs().get(Processe) + "> of Job: <" + job.getJobID() + ">"); 447 448 while (!success && (attempt < getMaxAttempts())) { 449 try { 450 CSHCommandLineTask task = new CSHCommandLineTask("condor_q " + ((String) job.getProcesseIDs().get(Processe)) , true, getMaxElapseTime()); 451 task.execute(); 452 if (task.getExitStatus() != 0) { 453 log.warn("condor_q " + task.getOutput()); 454 Thread.sleep(getMsBtwnFailure()); 455 456 // if(task.getOutput().lastIndexOf("already finished") != -1) success = true; 457 //return (task.getOutput().replace('\n',' '); 458 attempt++; 459 } 460 else{ 461 success = true; 462 463 464 if(task.getOutput().length() < 217) return("Done or Killed"); 465 else{ 466 String state = task.getOutput().substring(214,216).trim(); 467 if( state.startsWith("R")) state = "RUN"; 468 return(task.getOutput().substring(214,216).trim()); 469 } 470 471 472 } 473 474 commmandOutput = task.getOutput(); 475 } 476 catch (Exception e) { System.out.print("condor_q failed" + e); 477 System.out.print(commmandOutput); 478 } 479 try { Thread.sleep(getMsBtwnFailure());} 480 catch (Exception e1) {System.out.print("condor_q failed");} 481 if(!success) System.out.print("/"); 482 attempt++; 483 } 484 485 // } 486 487 return "condor_q failed"; 488 } 489 490 public void stop() { 491 } 492 493 String qstat = ""; 494 /** 495 * Runs test(s) on underlying components to determine if submitting jobs should be attempted. 496 * @param queue queue object to be tested 497 * @return Will return true to indicate everything is alright and false if the test has failed 498 */ 499 public boolean test(Queue queue){ 500 501 502 if(qstat.length() == 0){ //only run the command one time 503 if(! runInTimeLimitedThread("condor_status -total" , getMaxElapseTime(), getMsBtwnFailure(), 5 )){ 504 return false; 505 } 506 else{ 507 //System.out.println(super.threadOuput); //show the output for debugging 508 qstat = super.threadOuput.replace('\n', ' '); //keep everything on one line 509 } 510 } 511 512 if(qstat.length() == 0) return false; //just in case notting was returnned; 513 if(qstat.matches(".* [123456789]* .*")){ //find any non zero digit in bewteen speces. This will find eider n available or n used nodes, in both cases the site most likly alright. 514 515 return true; 516 } 517 518 return false; 519 520 } 521 522 523 private List requirements = new ArrayList(); 524 /** 525 Add requirements to the requirements="" string in the .condor file. 526 Note: Machine should not be added as SUMS will fill in this value for you. 527 */ 528 public void addRequirement(String requirement){ requirements.add(requirement.trim()); } 529 public List getRequirements(){return requirements;} 530 public void setRequirements(List requirements) {this.requirements = requirements;} 531 532 }