001 /* 002 * PBSDispatcher.java 003 * 004 * Created on Thu Apr 8 12:40:51 EDT 2004 005 * 006 * This file is part of the STAR Scheduler. 007 * Copyright (c) 2002-2006 STAR Collaboration - Brookhaven National Laboratory 008 * 009 * STAR Scheduler is free software; you can redistribute it and/or modify 010 * it under the terms of the GNU General Public License as published by 011 * the Free Software Foundation; either version 2 of the License, or 012 * (at your option) any later version. 013 * 014 * STAR Scheduler is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 017 * GNU General Public License for more details. 018 * 019 * You should have received a copy of the GNU General Public License 020 * along with STAR Scheduler; if not, write to the Free Software 021 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 022 */ 023 024 025 /* 026 * This module was derived from the module LSFDispatcher.java developed by G. Crcassi. 027 * The module is dedicated to be the interface to the batch job scheduler PBS. 028 * This version was prepared by Andrey Shevel@bnl.gov. 029 * 20 April 2004. 030 */ 031 032 033 034 035 package gov.bnl.star.offline.scheduler.Dispatchers.pbs; 036 037 import gov.bnl.star.offline.scheduler.ComponentLibrary; 038 import gov.bnl.star.offline.scheduler.Dispatcher; 039 import gov.bnl.star.offline.scheduler.Queue; 040 import gov.bnl.star.offline.scheduler.Job; 041 import gov.bnl.star.offline.scheduler.request.Request; 042 import gov.bnl.star.offline.scheduler.catalog.PhysicalFile; 043 import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask; 044 //import gov.bnl.star.offline.scheduler.util.StatisticsRecorder; //Moved Statistics recording to Scheduler.java LH 045 046 // I (Andrey Shevel@bnl.gov) added the line 047 import gov.bnl.star.offline.scheduler.Dispatchers.lsf.CSHApplication; 048 import gov.bnl.star.offline.scheduler.util.FilesystemToolkit; 049 050 import java.util.List; 051 import java.util.logging.Level; 052 import org.apache.log4j.Logger; 053 054 055 056 /** Dispatches a job using PBS. 057 * <p> 058 * For each process (job), two files are created: a script for the execution and 059 * a text file containing the file list. The script basically sets the 060 * environment variables and executes the command line. The file list 061 * contains the input file requested, one full path for each line in the list. 062 * <p> 063 * Each script is submitted through qsub. 064 * <p> 065 * The simulation flag will make the scheduler not actually execute the command 066 * lines. Therefore scripts and fileLists are created, but the qsub and chmod 067 * commands are not executed. Log and output won't be affected, except that there 068 * will be a message warning that the submission is simulated. 069 * @author Gabriele Carcassi 070 * @version 1.0 2002/12/26 071 */ 072 public class PBSDispatcher implements Dispatcher { 073 static private Logger log = Logger.getLogger(PBSDispatcher.class.getName()); 074 private String resourceStrategy; 075 // protected String scratchDir; 076 private String qsubEx; 077 protected boolean simulation = false; 078 private String queueName; 079 private String qsubOptions; 080 private int maxQsubAttempts; 081 private int msBtwnSuccess; 082 private int msBtwnFailure; 083 084 // public String getScratchDir() { 085 // return scratchDir; 086 // } 087 // 088 // public void setScratchDir(String scratchDir) { 089 // this.scratchDir = scratchDir; 090 // } 091 092 public String getQsubEx() { 093 return qsubEx; 094 } 095 096 public void setQsubEx(String qsubEx) { 097 this.qsubEx = qsubEx; 098 } 099 100 public String getQueueName() { 101 return queueName; 102 } 103 104 public void setQueueName(String queueName) { 105 this.queueName = queueName; 106 } 107 108 public String getQsubOptions() { 109 return qsubOptions; 110 } 111 112 public void setQsubOptions(String qsubOptions) { 113 this.qsubOptions = qsubOptions; 114 } 115 116 public int getMaxAttempts() { 117 return maxQsubAttempts; 118 } 119 120 public void setMaxAttempts(int maxAttempts) { 121 this.maxQsubAttempts = maxAttempts; 122 } 123 124 public int getMsBtwnSuccess() { 125 return msBtwnSuccess; 126 } 127 128 public void setMsBtwnSuccess(int msBtwnSuccess) { 129 this.msBtwnSuccess = msBtwnSuccess; 130 } 131 132 public int getMsBtwnFailure() { 133 return msBtwnFailure; 134 } 135 136 public void setMsBtwnFailure(int msBtwnFailure) { 137 this.msBtwnFailure = msBtwnFailure; 138 } 139 140 protected boolean reportedFailure; 141 protected CSHApplication application; 142 private String resSwitch; 143 144 /** Creates the scripts and dispatches the job on the target machine. 145 * @param request the job request 146 */ 147 public void dispatch(Request request, List jobs) { 148 log.info("Dispatching using PBS: \"" + request.getCommand() + "\""); 149 150 // Enables the simulation mode if necessary 151 useSimulationMode(request.getSimulation()); 152 reportedFailure = false; 153 154 // Submits from the higher to the lower JobID. This way the 155 // user has a feel of when the last job is going to be 156 // submitted 157 for (int nJob = jobs.size() - 1; nJob >= 0; 158 nJob--) { 159 Job job = (Job) jobs.get(nJob); 160 161 System.out.print("Dispatching process " + 162 job.getJobID() + "."); 163 dispatch(request, job); 164 if (getClusterName() != null) job.setCluster(getClusterName()); 165 } 166 167 //StatisticsRecorder.getIntance().recordStatistics(request, jobs); //Moved Statistics recording to Scheduler.java LH 168 } 169 170 /* Enables or disables the simulation mode. The simulation mode will deactivate 171 * every command line execution. 172 */ 173 protected void useSimulationMode(boolean simulation) { 174 this.simulation = simulation; 175 176 if (simulation) { 177 // Warn the user that we are entering simulated submission mode 178 log.warn("Simulating submission"); 179 System.out.println("Simulating submission"); 180 } 181 } 182 183 protected void reportProcessSubmissionFailure(Request request, Job job, int jobNumber, String message) { 184 reportFailure(job); 185 System.out.println("Process number " + jobNumber + " wasn't submitted."); 186 System.out.println(message); 187 System.out.println(); 188 System.out.println("The process input file were:"); 189 190 List list = job.getInput(); 191 192 for (int nFile = 0; nFile < list.size(); nFile++) { 193 PhysicalFile file = (PhysicalFile) list.get(nFile); 194 System.out.println(" - " + file.getPath() + "/" + 195 file.getFilename()); 196 } 197 } 198 199 protected void reportFailure(Job job) { 200 if (!reportedFailure) { 201 System.out.println("There were some errors during job submission."); 202 System.out.println("Some processes weren't submitted:"); 203 } 204 } 205 206 /** Currently not implemented 207 * @param request the job for which to retrieve the output 208 */ 209 public void retrieveOutput(Request request, List jobs) { 210 } 211 212 /* Dispatches a single process of a job request. 213 */ 214 protected void dispatch(Request request, Job job) { 215 //application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication"); 216 217 //No longer get the CSHApplication object from the config file. Get it using the setApplication() and getApplication() via the config file 218 if(application == null){ //If this was not set in the config file or if we'er in junit testing mode then print an error, and use the default file 219 System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized."); 220 String notSet = "The CSHApplication for this dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"CSHApplication\" in ComponentLibrary."; 221 log.warn(notSet); 222 System.out.println(notSet); 223 application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication"); 224 } 225 226 227 // TODO: all the parameters should be passed in one go 228 application.setJob(request, job); 229 //application.setScratchDir(scratchDir); 230 application.setSubmissionCommand(getQsubCommand(request, job)); 231 232 application.prepareJob(); 233 234 log.info("Executing \"" + getQsubCommand(request, job) + "\""); 235 236 if (!simulation) { 237 try { 238 Thread.sleep(msBtwnSuccess); 239 } catch (Exception e) { 240 } 241 242 long StarTime = System.currentTimeMillis(); 243 int attempt = 0; 244 boolean success = false; 245 246 while (!success && (attempt < maxQsubAttempts)) { 247 try { 248 CSHCommandLineTask task = new CSHCommandLineTask(getQsubCommand( 249 request, job), true, 30000); 250 task.execute(); 251 252 if (task.getExitStatus() != 0) { 253 log.warn("qsub failed: " + task.getOutput()); 254 Thread.sleep(msBtwnFailure); 255 System.out.print("/"); 256 attempt++; 257 } else { 258 success = true; 259 job.DispatchSuccessful(); 260 job.setDispatchTime(((int) Math.min(System.currentTimeMillis() - StarTime, java.lang.Integer.MAX_VALUE))); 261 } 262 } catch (Exception e) { 263 log.error("Couldn't submit the script to PBS", e); 264 265 try { 266 Thread.sleep(msBtwnFailure); 267 } catch (Exception e1) { 268 } 269 270 System.out.print("/"); 271 attempt++; 272 } 273 } 274 275 if (success) { 276 System.out.println(" done."); 277 } else { 278 System.out.println(" FAILED!!"); 279 } 280 } else { 281 System.out.println(" simulated."); 282 } 283 } 284 285 /** Returns the full qsub command to be executed to dispatch the process. This 286 * command must executed in the directory in which the script resides. 287 * @return the qsub command 288 */ 289 String getQsubCommand(Request request, Job job) { 290 291 StringBuffer qsub = new StringBuffer("cd " + FilesystemToolkit.getCurrentDirectory() + "; " + qsubEx); 292 //StringBuffer qsub = new StringBuffer(qsubEx); 293 294 //qsub.append(" -q ").append(getQueueName(job)); 295 if(getQueueName(job) != null){ 296 //if(job.getQueueObj().getName().trim().length() != 0 ) bsub.append(" -q ").append(job.getQueueObj().getName()); 297 if(getQueueName(job).trim().length() != 0 ) qsub.append(" -q ").append(getQueueName(job)); 298 } 299 300 if (job.getTarget() != null) { 301 // qsub.append(" -m ").append(job.getTarget()); 302 qsub.append(" -l nodes=").append(job.getTarget()); 303 } 304 305 if (application.getJobName() != null) { 306 // qsub.append(" -J '").append(application.getJobName()).append("'"); 307 qsub.append(" -N '").append(application.getJobName()).append("'"); 308 } 309 310 // if (application.getStdin() != null) { 311 // qsub.append(" -i ").append(application.getStdin()); 312 // } 313 314 if (application.getStdout() != null) { 315 qsub.append(" -o ").append(application.getStdout()); 316 } 317 318 if (application.getStderr() != null) { 319 qsub.append(" -e ").append(application.getStderr()); 320 } 321 322 int maxWallTime = request.getResource("WallTime").getMax(); 323 if((getResourceUsageSwitch(job) != null) || maxWallTime != -1) { 324 qsub.append(" -l "); 325 if (maxWallTime != -1) { 326 //An alternative syntax for the time specification is: #HH:#MM:#SS (HH = hours, MM = minutes, SS = seconds) 327 qsub.append("walltime=").append(maxWallTime); 328 if(getResourceUsageSwitch(job) != null) qsub.append(","); 329 } 330 if (getResourceUsageSwitch(job) != null){ 331 qsub.append("nodes=").append(getResourceUsageSwitch(job)); 332 } 333 } 334 qsub.append(' ').append(qsubOptions); 335 336 337 /* 338 The path is being dropped because different drivers for mounting the PanFS path are reporting the path differently. Instated we will put "cd $path" before the command we execute. 339 Example: 340 /pdirect/star+institutions/data05/scratch/lbhajdu/ 341 /star/data05/scratch/lbhajdu/ 342 */ 343 qsub.append(' ').append(application.getCSHScriptFileName()); 344 //qsub.append(' ').append(application.getCommandLine()); 345 346 return qsub.toString(); 347 } 348 349 350 private PBSResourceStrategy pbsResourceStrategy; 351 352 /** Holds value of property clusterName. */ 353 private String clusterName; 354 355 public void setResourceStrategy(PBSResourceStrategy resourceStrategy) { 356 this.pbsResourceStrategy = resourceStrategy; 357 } 358 359 public PBSResourceStrategy getResourceStrategy() { 360 return pbsResourceStrategy; 361 } 362 363 protected String getResourceUsageSwitch(Job job) { 364 //FIXME: cache value 365 if (getResourceStrategy() == null) { 366 return null; 367 } 368 369 resSwitch = getResourceStrategy().prepareResourceUsageSwitch(job); 370 371 return resSwitch; 372 } 373 374 protected String getQueueName(Job job) { 375 String queue = job.getQueue(); 376 377 if (queue == null) { 378 return queueName; 379 } 380 381 return queue; 382 } 383 384 /** Getter for property clusterName. 385 * @return Value of property clusterName. 386 * 387 */ 388 public String getClusterName() { 389 return this.clusterName; 390 } 391 392 /** Setter for property clusterName. 393 * @param clusterName New value of property clusterName. 394 * 395 */ 396 public void setClusterName(String clusterName) { 397 this.clusterName = clusterName; 398 } 399 400 401 /** Set the class that writes the sricpt that will be executed by the batch system */ 402 public void setApplication(CSHApplication application){ 403 this.application = application; 404 } 405 406 /** Get the class that writes the sricpt that will be executed by the batch system */ 407 public CSHApplication getApplication(){ 408 return application; 409 } 410 411 public void Kill(Request request, List jobs) { 412 } 413 414 public String Status(Job job, int Processe) { 415 return "status unavailable"; 416 } 417 418 public void stop() { 419 } 420 421 422 /** 423 * Runs test(s) on underlying components to determine if submitting jobs should be attempted. 424 * @param queue The queue object to be tested 425 * @return Will return true to indicate everything is alright and false if the test has failed 426 */ 427 public boolean test(Queue queue){ 428 return true; 429 } 430 431 }