001    /*
002     * PBSDispatcher.java
003     *
004     * Created on Thu Apr  8 12:40:51 EDT 2004
005     *
006     * This file is part of the STAR Scheduler.
007     * Copyright (c) 2002-2006 STAR Collaboration - Brookhaven National Laboratory
008     *
009     * STAR Scheduler is free software; you can redistribute it and/or modify
010     * it under the terms of the GNU General Public License as published by
011     * the Free Software Foundation; either version 2 of the License, or
012     * (at your option) any later version.
013     *
014     * STAR Scheduler is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017     * GNU General Public License for more details.
018     *
019     * You should have received a copy of the GNU General Public License
020     * along with STAR Scheduler; if not, write to the Free Software
021     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
022     */
023    
024    
025    /*
026     *  This module was derived from the module LSFDispatcher.java developed by G. Carcassi.
027     *  The module is dedicated to be the interface to the batch job scheduler PBS.
028     *  This version was prepared by Andrey Shevel@bnl.gov.
029     *    20 April 2004.
030     */
031    
032    
033    
034    
035    package gov.bnl.star.offline.scheduler.Dispatchers.pbs;
036    
037    import gov.bnl.star.offline.scheduler.ComponentLibrary;
038    import gov.bnl.star.offline.scheduler.Dispatcher;
039    import gov.bnl.star.offline.scheduler.Queue;
040    import gov.bnl.star.offline.scheduler.Job;
041    import gov.bnl.star.offline.scheduler.request.Request;
042    import gov.bnl.star.offline.scheduler.catalog.PhysicalFile;
043    import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask;
044    //import gov.bnl.star.offline.scheduler.util.StatisticsRecorder; //Moved Statistics recording to Scheduler.java LH
045    
046    // I (Andrey Shevel@bnl.gov) added the line
047    import gov.bnl.star.offline.scheduler.Dispatchers.lsf.CSHApplication;
048    import gov.bnl.star.offline.scheduler.util.FilesystemToolkit;
049    
050    import java.util.List;
051    import java.util.logging.Level;
052    import org.apache.log4j.Logger;
053    
054    
055    
056    /** Dispatches a job using PBS.
057     * <p>
058     * For each process (job), two files are created: a script for the execution and
059     * a text file containing the file list. The script basically sets the
060     * environment variables and executes the command line. The file list
061     * contains the input file requested, one full path for each line in the list.
062     * <p>
063     * Each script is submitted through qsub.
064     * <p>
065     * The simulation flag will make the scheduler not actually execute the command
066     * lines. Therefore scripts and fileLists are created, but the qsub and chmod
067     * commands are not executed. Log and output won't be affected, except that there
068     * will be a message warning that the submission is simulated.
069     * @author Gabriele Carcassi
070     * @version 1.0 2002/12/26
071     */
072    public class PBSDispatcher implements Dispatcher {
073        static private Logger log = Logger.getLogger(PBSDispatcher.class.getName());
074        private String resourceStrategy;
075       // protected String scratchDir;
076        private String qsubEx;
077        protected boolean simulation = false;
078        private String queueName;
079        private String qsubOptions;
080        private int maxQsubAttempts;
081        private int msBtwnSuccess;
082        private int msBtwnFailure;
083        
084    //    public String getScratchDir() {
085    //        return scratchDir;
086    //    }
087    //    
088    //    public void setScratchDir(String scratchDir) {
089    //        this.scratchDir = scratchDir;
090    //    }
091        
092        public String getQsubEx() {
093            return qsubEx;
094        }
095        
096        public void setQsubEx(String qsubEx) {
097            this.qsubEx = qsubEx;
098        }
099    
100        public String getQueueName() {
101            return queueName;
102        }
103        
104        public void setQueueName(String queueName) {
105            this.queueName = queueName;
106        }
107        
108        public String getQsubOptions() {
109            return qsubOptions;
110        }
111        
112        public void setQsubOptions(String qsubOptions) {
113            this.qsubOptions = qsubOptions;
114        }
115        
116        public int getMaxAttempts() {
117            return maxQsubAttempts;
118        }
119        
120        public void setMaxAttempts(int maxAttempts) {
121            this.maxQsubAttempts = maxAttempts;
122        }
123        
124        public int getMsBtwnSuccess() {
125            return msBtwnSuccess;
126        }
127        
128        public void setMsBtwnSuccess(int msBtwnSuccess) {
129            this.msBtwnSuccess = msBtwnSuccess;
130        }
131        
132        public int getMsBtwnFailure() {
133            return msBtwnFailure;
134        }
135        
136        public void setMsBtwnFailure(int msBtwnFailure) {
137            this.msBtwnFailure = msBtwnFailure;
138        }
139        
140        protected boolean reportedFailure;
141        protected CSHApplication application;
142        private String resSwitch;
143    
144        /** Creates the scripts and dispatches the job on the target machine.
145         * @param request the job request
146         */
147        public void dispatch(Request request, List jobs) {
148            log.info("Dispatching using PBS: \"" + request.getCommand() + "\"");
149    
150            // Enables the simulation mode if necessary
151            useSimulationMode(request.getSimulation());
152            reportedFailure = false;
153    
154            // Submits from the higher to the lower JobID. This way the
155            // user has a feel of  when the last job is going to be
156            // submitted
157            for (int nJob = jobs.size() - 1; nJob >= 0;
158                    nJob--) {
159                Job job = (Job) jobs.get(nJob);
160    
161                System.out.print("Dispatching process " +
162                    job.getJobID() + ".");
163                dispatch(request, job);
164                if (getClusterName() != null) job.setCluster(getClusterName());
165            }
166    
167            //StatisticsRecorder.getIntance().recordStatistics(request, jobs); //Moved Statistics recording to Scheduler.java LH
168        }
169    
170        /* Enables or disables the simulation mode. The simulation mode will deactivate
171         * every command line execution.
172         */
173        protected void useSimulationMode(boolean simulation) {
174            this.simulation = simulation;
175    
176            if (simulation) {
177                // Warn the user that we are entering simulated submission mode
178                log.warn("Simulating submission");
179                System.out.println("Simulating submission");
180            }
181        }
182    
183        protected void reportProcessSubmissionFailure(Request request, Job job, int jobNumber, String message) {
184            reportFailure(job);
185            System.out.println("Process number " + jobNumber + " wasn't submitted.");
186            System.out.println(message);
187            System.out.println();
188            System.out.println("The process input file were:");
189    
190            List list = job.getInput();
191    
192            for (int nFile = 0; nFile < list.size(); nFile++) {
193                PhysicalFile file = (PhysicalFile) list.get(nFile);
194                System.out.println(" - " + file.getPath() + "/" +
195                    file.getFilename());
196            }
197        }
198    
199        protected void reportFailure(Job job) {
200            if (!reportedFailure) {
201                System.out.println("There were some errors during job submission.");
202                System.out.println("Some processes weren't submitted:");
203            }
204        }
205    
206        /** Currently not implemented
207         * @param request the job for which to retrieve the output
208         */
209        public void retrieveOutput(Request request, List jobs) {
210        }
211    
212        /* Dispatches a single process of a job request.
213         */
214        protected void dispatch(Request request, Job job) {
215            //application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
216            
217            //No longer get the CSHApplication object from the config file. Get it using the setApplication() and getApplication() via the config file
218            if(application == null){ //If this was not set in the config file or if we'er in junit testing mode then print an error, and use the default file
219                System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized.");  
220                String notSet = "The CSHApplication for this dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"CSHApplication\" in ComponentLibrary.";
221                log.warn(notSet);
222                System.out.println(notSet);
223                application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
224            }
225            
226            
227            // TODO: all the parameters should be passed in one go
228            application.setJob(request, job);
229            //application.setScratchDir(scratchDir);
230            application.setSubmissionCommand(getQsubCommand(request, job));
231    
232            application.prepareJob();
233    
234            log.info("Executing \"" + getQsubCommand(request, job) + "\"");
235    
236            if (!simulation) {
237                try {
238                    Thread.sleep(msBtwnSuccess);
239                } catch (Exception e) {
240                }
241    
242                long StarTime = System.currentTimeMillis();
243                int attempt = 0;
244                boolean success = false;
245    
246                while (!success && (attempt < maxQsubAttempts)) {
247                    try {
248                        CSHCommandLineTask task = new CSHCommandLineTask(getQsubCommand(
249                                    request, job), true, 30000);
250                        task.execute();
251    
252                        if (task.getExitStatus() != 0) {
253                            log.warn("qsub failed: " + task.getOutput());
254                            Thread.sleep(msBtwnFailure);
255                            System.out.print("/");
256                            attempt++;
257                        } else {
258                            success = true;
259                            job.DispatchSuccessful();
260                            job.setDispatchTime(((int) Math.min(System.currentTimeMillis() - StarTime, java.lang.Integer.MAX_VALUE)));
261                        }
262                    } catch (Exception e) {
263                        log.error("Couldn't submit the script to PBS", e);
264    
265                        try {
266                            Thread.sleep(msBtwnFailure);
267                        } catch (Exception e1) {
268                        }
269    
270                        System.out.print("/");
271                        attempt++;
272                    }
273                }
274    
275                if (success) {
276                    System.out.println(" done.");
277                } else {
278                    System.out.println(" FAILED!!");
279                }
280            } else {
281                System.out.println(" simulated.");
282            }
283        }
284    
285        /** Returns the full qsub command to be executed to dispatch the process. This
286        * command must executed in the directory in which the script resides.
287        * @return the qsub command
288        */
289        String getQsubCommand(Request request, Job job) {
290            
291            StringBuffer qsub = new StringBuffer("cd " + FilesystemToolkit.getCurrentDirectory() + "; " + qsubEx);
292            //StringBuffer qsub = new StringBuffer(qsubEx);
293            
294            //qsub.append(" -q ").append(getQueueName(job));
295            if(getQueueName(job) != null){
296               //if(job.getQueueObj().getName().trim().length() != 0 ) bsub.append(" -q ").append(job.getQueueObj().getName());
297                if(getQueueName(job).trim().length() != 0 ) qsub.append(" -q ").append(getQueueName(job));
298           }
299    
300            if (job.getTarget() != null) {
301    //            qsub.append(" -m ").append(job.getTarget());
302                qsub.append(" -l nodes=").append(job.getTarget());
303            }
304    
305            if (application.getJobName() != null) {
306    //          qsub.append(" -J '").append(application.getJobName()).append("'");
307                qsub.append(" -N '").append(application.getJobName()).append("'");
308            }
309    
310    //        if (application.getStdin() != null) {
311    //            qsub.append(" -i ").append(application.getStdin());
312    //        }
313    
314            if (application.getStdout() != null) {
315                qsub.append(" -o ").append(application.getStdout());
316            }
317    
318            if (application.getStderr() != null) {
319                qsub.append(" -e ").append(application.getStderr());
320            }
321    
322            int maxWallTime = request.getResource("WallTime").getMax();
323            if((getResourceUsageSwitch(job) != null) || maxWallTime !=  -1) {
324                qsub.append(" -l ");
325                if (maxWallTime !=  -1) {
326                    //An alternative syntax for the time specification is: #HH:#MM:#SS  (HH = hours, MM = minutes, SS = seconds)
327                    qsub.append("walltime=").append(maxWallTime);
328                    if(getResourceUsageSwitch(job) != null) qsub.append(",");   
329                }
330                if (getResourceUsageSwitch(job) != null){ 
331                    qsub.append("nodes=").append(getResourceUsageSwitch(job));
332                }  
333            }
334            qsub.append(' ').append(qsubOptions);
335    
336            
337            /*
338            The path is being dropped because different drivers for mounting the PanFS path are reporting the path differently. Instated we will put "cd $path" before the command we execute.   
339            Example:
340            /pdirect/star+institutions/data05/scratch/lbhajdu/
341            /star/data05/scratch/lbhajdu/
342            */
343            qsub.append(' ').append(application.getCSHScriptFileName());
344            //qsub.append(' ').append(application.getCommandLine());
345    
346            return qsub.toString();
347        }
348    
349        
350        private PBSResourceStrategy pbsResourceStrategy;
351        
352        /** Holds value of property clusterName. */
353        private String clusterName;
354        
355        public void setResourceStrategy(PBSResourceStrategy resourceStrategy) {
356            this.pbsResourceStrategy = resourceStrategy;
357        }
358        
359        public PBSResourceStrategy getResourceStrategy() {
360            return pbsResourceStrategy;
361        }
362    
363        protected String getResourceUsageSwitch(Job job) {
364            //FIXME: cache value
365            if (getResourceStrategy() == null) {
366                return null;
367            }
368    
369            resSwitch = getResourceStrategy().prepareResourceUsageSwitch(job);
370    
371            return resSwitch;
372        }
373    
374        protected String getQueueName(Job job) {
375            String queue = job.getQueue();
376    
377            if (queue == null) {
378                return queueName;
379            }
380    
381            return queue;
382        }
383        
384        /** Getter for property clusterName.
385         * @return Value of property clusterName.
386         *
387         */
388        public String getClusterName() {
389            return this.clusterName;
390        }
391        
392        /** Setter for property clusterName.
393         * @param clusterName New value of property clusterName.
394         *
395         */
396        public void setClusterName(String clusterName) {
397            this.clusterName = clusterName;
398        }
399        
400        
401        /** Set the class that writes the sricpt that will be executed by the batch system */
402        public void setApplication(CSHApplication application){
403                this.application = application;
404        }
405    
406        /** Get the class that writes the sricpt that will be executed by the batch system */
407        public CSHApplication getApplication(){
408                return application;
409        }      
410        
411        public void Kill(Request request, List jobs) {
412        }
413        
414        public String Status(Job job, int Processe) {
415            return "status unavailable";
416        }   
417        
418        public void stop() {
419        }
420        
421        
422        /**
423         * Runs test(s) on underlying components to determine if submitting jobs should be attempted.
424         * @param queue The queue object to be tested
425         * @return Will return true to indicate everything is alright and false if the test has failed
426         */
427         public boolean test(Queue queue){
428            return true;
429         }
430        
431    }