/*
 * XgridDispatcher.java
 *
 * Created on March 9, 2006, 11:47 AM
 *
 * This file is part of the STAR Scheduler.
 * Copyright (c) 2002-2006 STAR Collaboration - Brookhaven National Laboratory
 *
 * STAR Scheduler is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * STAR Scheduler is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with STAR Scheduler; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */

package gov.bnl.star.offline.scheduler.Dispatchers.xgrid;

import gov.bnl.star.offline.scheduler.Dispatchers.lsf.*;
import gov.bnl.star.offline.scheduler.ComponentLibrary;
import gov.bnl.star.offline.scheduler.Dispatcher;
import gov.bnl.star.offline.scheduler.Job;
import gov.bnl.star.offline.scheduler.request.Request;
import gov.bnl.star.offline.scheduler.catalog.PhysicalFile;
import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask;
import gov.bnl.star.offline.scheduler.util.GenericResourceRequirementStringDefinition;
import gov.bnl.star.offline.scheduler.Dispatchers.AbstractResourceStrategy;
import gov.bnl.star.offline.scheduler.util.sandbox.Sandbox;
import gov.bnl.star.offline.scheduler.util.sandbox.SandboxPackage;
import gov.bnl.star.offline.scheduler.Dispatchers.xgrid.BashApplication;

import gov.bnl.star.offline.scheduler.util.FilesystemToolkit;
import java.net.URL;
import java.io.*;
import java.util.*;
//import java.util.List; 044 import java.util.logging.Level; 045 import java.util.logging.Logger; 046 /** 047 * Dispatches a jobs using XGRID for Mac O.S. X 048 * 049 * @author mlmiller 050 */ 051 public class XgridDispatcher implements Dispatcher { 052 053 static private Logger log = Logger.getLogger(XgridDispatcher.class.getName()); 054 protected BashApplication application; 055 protected boolean simulation = false; 056 private int maxElapseTime = 30000; 057 private String xGridScratch = "./"; 058 private String xGridEx = "xgrid -h 18.77.0.68 -p kuba -job"; 059 private List retrievalCommands = new ArrayList(); 060 private List deleteCommands = new ArrayList(); 061 private boolean isSandboxDirCreated = false; 062 063 public String getXgridEx() { 064 return xGridEx; 065 } 066 067 public void setXgridEx(String v) { 068 this.xGridEx= v; 069 } 070 071 public void setMaxElapseTime(int maxElapseTime){ 072 this.maxElapseTime = maxElapseTime; 073 } 074 075 public int getMaxElapseTime(){ 076 return maxElapseTime; 077 } 078 079 /** Creates a new instance of XgridDispatcher */ 080 public XgridDispatcher() { 081 } 082 083 084 public String getOutputDirName(Job job) { 085 String dirN = FilesystemToolkit.getCurrentDirectory() + "/results_" + job.getRequestID(); 086 return dirN; 087 } 088 089 public String getInputDirName(Job job) { 090 String dirN = FilesystemToolkit.getCurrentDirectory() + "/input_" + job.getRequestID(); 091 return dirN; 092 } 093 094 public void dispatch(Request request, List jobs) { 095 096 097 try { 098 //System.out.println(""); 099 //System.out.println("XGridDispatcher::dipatch() -- beginning"); 100 //application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication"); 101 simulation = request.getSimulation(); 102 //System.out.println("\nWe are using simulation:\t" + simulation); 103 104 try { 105 //No long get the CSHApplication object from the config file. 
Get it using the setApplication() and getApplication() via the config file 106 if(application == null){ //If this was not set in the config file or if we'er in junit testing mode then print an error, and use the default file 107 //System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized."); 108 String notSet = "The BashApplication for the xgrid dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"BashApplication\" in ComponentLibrary."; 109 log.warning(notSet); 110 throw new RuntimeException(notSet); 111 //System.out.println(notSet); 112 //application = (BashApplication) ComponentLibrary.getInstance().getComponent("CSHApplication"); 113 } 114 } catch (Exception appE) { 115 System.out.println("\nXGridDispatcher::dispatch(Request,List). getApplication threw:\t" + appE + " abort"); 116 appE.printStackTrace(); 117 System.exit(1); 118 } 119 120 121 //System.out.println("Enter loop\n"); 122 for (int nJob = jobs.size() - 1; nJob >= 0; nJob--) { 123 Job job = (Job) jobs.get(nJob); 124 125 String jid = job.getJobID(); 126 127 //this is kind of a hack, but this directory has to exist for all jobs 128 if (isSandboxDirCreated==false) { 129 System.out.println("\n ------------ Make input directory -------------"); 130 String mkdir = "/bin/mkdir " + getInputDirName(job); 131 java.lang.Process dirShell = Runtime.getRuntime().exec(mkdir); 132 isSandboxDirCreated=true; 133 } 134 //System.out.print("Dispatching process " + jid + "."); 135 136 dispatch(request, job); 137 138 if (application.getCSHScriptFileName().endsWith("_0.csh")) {//last job to dispatch, now do afterburner 139 //System.out.println("\n ------------ Make results directory -------------"); 140 String mkdir = "/bin/mkdir " + getOutputDirName(job); 141 142 java.lang.Process dirShell = Runtime.getRuntime().exec(mkdir); 143 144 //System.out.println("\n------------- Make retrieval script---------------"); 145 146 StringBuffer rFileName = new 
StringBuffer(FilesystemToolkit.getCurrentDirectory() + "/" + job.getRequestID() + ".retrieve.csh"); 147 //System.out.println("open file:\t" + rFileName); 148 BufferedWriter rOut = new BufferedWriter(new FileWriter(rFileName.toString())); 149 150 for (int i=0; i<retrievalCommands.size(); ++i) { 151 String rcmd = (String) retrievalCommands.get(i); 152 //System.out.println("retrive job with:\t"+rcmd); 153 rOut.write(rcmd + "\n"); 154 } 155 rOut.close(); 156 //and make it executable 157 String chmod1 = "/bin/chmod +x " + rFileName.toString(); 158 java.lang.Process shell = Runtime.getRuntime().exec(chmod1); 159 160 161 162 //System.out.println("\n------------- Make delete script---------------"); 163 164 StringBuffer dFileName = new StringBuffer(FilesystemToolkit.getCurrentDirectory() + "/" + job.getRequestID() + ".delete.csh"); 165 //System.out.println("open file:\t" + dFileName); 166 BufferedWriter dOut = new BufferedWriter(new FileWriter(dFileName.toString())); 167 168 for (int i=0; i<deleteCommands.size(); ++i) { 169 String dcmd = (String) deleteCommands.get(i); 170 //System.out.println("delete job with:\t"+dcmd); 171 dOut.write(dcmd + "\n"); 172 } 173 dOut.close(); 174 //and make it executable 175 String chmod2 = "/bin/chmod +x " + dFileName.toString(); 176 java.lang.Process shell2 = Runtime.getRuntime().exec(chmod2); 177 178 } 179 180 181 182 } 183 184 //StatisticsRecorder.getIntance().recordStatistics(request, jobs); //Moved Statistics recording to Scheduler.java LH 185 //System.out.println("XGridDispatcher::dipatch() -- finished"); 186 return; 187 188 189 } catch (Exception e) { 190 System.out.println("\n XGridDispatcher::dispatch(Lists) threw:\t" + e + "abort()"); 191 e.printStackTrace(); 192 System.exit(1); 193 } 194 } 195 196 197 198 199 //To do (MLM): convert this to xgrid submission logic flow 200 protected void dispatch(Request request, Job job) { 201 202 203 try { 204 205 // TODO: all the parameters should be passed in one go 206 
application.setJob(request, job); 207 application.setSubmissionCommand(getXgridCommand(request, job)); 208 application.prepareJob(); 209 //System.out.println("\n \t Executing: " + getXgridCommand(request, job)); 210 211 long StarTime = System.currentTimeMillis(); 212 boolean success = false; 213 String pe=""; 214 215 216 } catch (Exception e) { 217 System.out.println("\nXGridDispatcher::dispatch(Request,Job). application.prepareJob threw:\t" + e); 218 e.printStackTrace(); 219 //System.out.println("return w/o dispatching job"); 220 System.exit(1); 221 return; 222 } 223 224 //Ok, here we hack in a copy of the .list file into the input directory: 225 try { 226 if (isSandboxDirCreated==false) { 227 System.out.println("\nFatal Error: input directory not yet created. Abort()"); 228 System.exit(1); 229 } 230 //System.out.println("\n ------------ copy list file-------------"); 231 String cpfile = "/bin/cp " + application.getInputFileListName() 232 + " " + getInputDirName(job) + "/" + application.getInputFileListName(); 233 java.lang.Process dirShell = Runtime.getRuntime().exec(cpfile); 234 235 } 236 catch (Exception cpE) { 237 cpE.printStackTrace(); 238 System.exit(1); 239 } 240 241 if (simulation==true) { 242 System.out.println("..Simulated"); 243 return; 244 } 245 246 String cmdReturn = null; 247 try { 248 // start up the command in child process 249 //String cmd = "/bin/ls"; 250 //String cmd = "xgrid -h lincoln.mit.edu -p kuba -job submit /bin/hostname"; 251 //String cmd = "/bin/pwd"; 252 String cmd = getXgridCommand(request, job); 253 System.out.println("dispatch job:\t" + job.getJobID()+"\twith command:" + cmd); 254 Process child = Runtime.getRuntime().exec(cmd); 255 //Process child = Runtime.getRuntime().exec(new String[] {csh, "-cf", cmd}); 256 257 // hook up child process output to parent 258 InputStream lsOut = child.getInputStream(); 259 InputStreamReader r = new InputStreamReader(lsOut); 260 BufferedReader in = new BufferedReader(r); 261 262 // read the child 
process' output 263 String line; 264 int counter = 0; 265 String tmpString = in.toString(); 266 //System.out.println("in:\t" + tmpString); 267 268 while ((line = in.readLine()) != null) { 269 //System.out.println("counter:\t" + counter + "\t" + line); 270 271 try { 272 if (counter==0) cmdReturn= line; 273 } 274 catch (Exception stringE) { 275 System.out.println("concat threw:\t" + stringE); 276 stringE.printStackTrace(); 277 System.exit(1); 278 } 279 } 280 try { 281 //System.out.println("\nSuccess. Retrieved string:\t"+cmdReturn); 282 } 283 catch (Exception soutE) { 284 System.out.println("cout threw:\t" + soutE); 285 soutE.printStackTrace(); 286 System.exit(1); 287 } 288 } 289 catch (Exception runTimeE) { // exception thrown 290 System.out.println("Command failed!"); 291 runTimeE.printStackTrace(); 292 System.exit(1); 293 } 294 295 //now get the job id 296 if (cmdReturn!=null) { 297 try { 298 String sub1 = "jobIdentifier = "; 299 String sub2 = "; }"; 300 int istart = cmdReturn.indexOf(sub1); 301 int istop = cmdReturn.indexOf(sub2); 302 int beginAt = istart + sub1.length(); 303 int endAt = istop; 304 //System.out.println("istart:\t" + istart + "\tistop:\t" + istop); 305 //System.out.println("loop from:\t" +beginAt + "\tto:\t" + endAt); 306 String jobId = cmdReturn.substring(beginAt, endAt); 307 System.out.println("\tsuccess. jobId:\t" + jobId); 308 //job.AddProcesseID(jobId); //This doesn't seem to work right... 
309 310 311 //ok, jobId is retived, so let's catalog for later use: 312 StringBuffer resultCmd = new StringBuffer(xGridEx); 313 resultCmd.append(" results"); 314 315 if (application.getStdout() != null) { 316 resultCmd.append(" -so ").append(application.getStdout()); 317 } 318 319 if (application.getStderr() != null) { 320 resultCmd.append(" -se ").append(application.getStderr()); 321 } 322 323 resultCmd.append(" -id " + jobId ); 324 resultCmd.append(" -out " + getOutputDirName(job) + "/"); 325 //System.out.println("retrive results with:\t" + resultCmd); 326 327 retrievalCommands.add(resultCmd.toString()); 328 329 330 //And add the delete command 331 StringBuffer delCmd = new StringBuffer(xGridEx); 332 delCmd.append(" delete"); 333 delCmd.append(" -id " + jobId ); 334 deleteCommands.add(delCmd.toString()); 335 336 337 } 338 catch (Exception idE) { 339 System.out.println("job id find threw:\t"+idE); 340 idE.printStackTrace(); 341 } 342 } 343 else { 344 System.out.println("error, could not dispatch job. abort()"); 345 System.exit(1); 346 } 347 348 349 } 350 351 352 /** Returns the full xgrid command to be executed to dispatch the process. This 353 * command must executed in the directory in which the script resides. 354 * @return the xgrid command 355 */ 356 //To do (MLM): convert this to real xgrid syntax 357 public String getXgridCommand(Request request, Job job) { 358 359 try { //careful, java strings seem to throw... 
360 //String thePwd = FilesystemToolkit.getCurrentDirectory(); 361 //System.out.println("XGridDispather::getXgridCommand() -- pwd:\t" + thePwd); 362 //StringBuffer xsub = new StringBuffer("cd " + FilesystemToolkit.getCurrentDirectory() + "; " + xGridEx); 363 StringBuffer xsub = new StringBuffer(xGridEx); 364 xsub.append(" submit"); 365 366 367 //MLM check here for sanbox elements ------ 368 Sandbox sandbox = request.getSandbox(); 369 List packages = sandbox.getPackages(); 370 371 //String jobSandbox = null; 372 for (int ipack = 0; ipack<1; ++ipack) { 373 SandboxPackage sp = (SandboxPackage) packages.get(ipack); 374 375 376 377 378 //System.out.println("List of Sandbox files:"); 379 List files = sp.getFiles(); 380 for (int ifile=0; ifile<files.size(); ++ifile) { 381 URL theUrl = (URL) files.get(ifile); 382 String fileName = theUrl.getFile(); 383 String pathName = theUrl.getPath(); 384 //System.out.println(" file: " + fileName); 385 //System.out.println(" path: " + pathName); 386 387 /* 388 try { 389 if(FilesystemToolkit.checkIfFileExists( new URL("file:" + fileName) ,true)){ 390 //this.addSandboxedFiles((new URL(packageObj.getPackageName()).getFile())); 391 //jobSandbox = fileName; 392 //instead of breaking here, we'll just copy each file into the input directory 393 //break; 394 395 try { 396 if (isSandboxDirCreated==false) { 397 System.out.println("\nFatal Error: input directory not yet created. 
Abort()"); 398 System.exit(1); 399 } 400 String cpfile = "/bin/cp " + fileName + " " + getInputDirName(job) + "/."; 401 System.out.println("\tcp sandbox file command:\t" + cpfile); 402 java.lang.Process dirShell = Runtime.getRuntime().exec(cpfile); 403 } 404 catch (Exception cpE) { 405 cpE.printStackTrace(); 406 System.exit(1); 407 } 408 } 409 else { 410 System.out.println("Error -- file:" + fileName + " does not exist!!!!!!!!!!!!!!!!!!!!"); 411 } 412 } catch (Exception e) { 413 String buffer = "Error -- file:" + fileName + " does not exist!!!!!!!!!!!!!!!!!!!!"; 414 System.out.println(buffer); 415 log.log(Level.SEVERE, buffer, e); 416 } 417 */ 418 419 try { 420 if (isSandboxDirCreated==false) { 421 System.out.println("\nFatal Error: input directory not yet created. Abort()"); 422 System.exit(1); 423 } 424 String cpfile = "/bin/cp " + fileName + " " + getInputDirName(job) + "/."; 425 //System.out.println("\tcp sandbox file command:\t" + cpfile); 426 java.lang.Process dirShell = Runtime.getRuntime().exec(cpfile); 427 } 428 catch (Exception cpE) { 429 cpE.printStackTrace(); 430 System.exit(1); 431 } 432 433 } 434 } 435 /* MLM, don't need this anymore 436 if (jobSandbox!=null) { 437 //System.out.println("using sandbox" + jobSandbox +" all others will be ignored"); 438 xsub.append(" -in " + jobSandbox); 439 } 440 */ 441 xsub.append(" -in " + getInputDirName(job)); 442 443 //xsub.append(" -file " + application.getInputFileListName()); 444 xsub.append(' ').append(application.getCSHScriptFileName()); 445 446 //System.out.println("\n" + qsub.toString() + "\n"); 447 return xsub.toString(); 448 } 449 catch (Exception e) { 450 System.out.println("\nXGridDispatcher::getXgridCommand() threw:\t" + e); 451 e.printStackTrace(); 452 System.exit(1); 453 } 454 455 //If we reached this point we're dead and should abort 456 String undef = "XGridDispatcher::getXgridCommand()--undefined. 
You should abort()"; 457 System.exit(1); 458 return undef; 459 } 460 461 462 463 public void setBashApplication(BashApplication application){ this.application = application; } 464 public BashApplication getBashApplication(){ return application; } 465 466 467 468 469 470 471 // ------------------------------------------------------------ We don't use these --------- 472 473 /** Retrieves the output of the job from the target machine. It will first 474 * check whether the job has terminated, and than it will retrieve the 475 * output files and delete any temporary files on the target machine. 476 * @param job job description and requirements, include input and output file and environment 477 * variables 478 */ 479 public void retrieveOutput(Request job, List jobs) { 480 481 } 482 483 /** 484 Kills the processes associated with this job. 485 */ 486 public void Kill(Request request, List jobs) { 487 } 488 489 /** 490 Returns The status of the job 491 */ 492 public String Status(Job job, int Processe) { 493 return null; 494 } 495 496 497 /** 498 Cases the dispacher to stop dispaching and trys to kill dispached jobs 499 */ 500 public void stop() { 501 } 502 503 public boolean test(gov.bnl.star.offline.scheduler.Queue queue) { 504 return true; 505 } 506 }