001    /*
002     * BOSSDispatcher.java
003     *
004     * Created on Thu Apr  8 12:40:51 EDT 2004
005     *
006     * This file is part of the STAR Scheduler.
007     * Copyright (c) 2002-2003 STAR Collaboration - Brookhaven National Laboratory
008     *
009     * STAR Scheduler is free software; you can redistribute it and/or modify
010     * it under the terms of the GNU General Public License as published by
011     * the Free Software Foundation; either version 2 of the License, or
012     * (at your option) any later version.
013     *
014     * STAR Scheduler is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017     * GNU General Public License for more details.
018     *
019     * You should have received a copy of the GNU General Public License
020     * along with STAR Scheduler; if not, write to the Free Software
021     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
022     */
023    
024    
/* This module was derived from the module PBSDispatcher.java.
   It was prepared as the interface to the BOSS tracking system,
   using the set of scripts available at
   ftp://ram3.chem.sunysb.edu/pub/suny-gt-2/gsuny.tar.gz.
   The interface was prepared by Andrey Shevel@bnl.gov
     20 April 2004
 */
032    
033    
034    
035    package gov.bnl.star.offline.scheduler.Dispatchers.boss;
036    
037    import gov.bnl.star.offline.scheduler.ComponentLibrary;
038    import gov.bnl.star.offline.scheduler.Dispatcher;
039    import gov.bnl.star.offline.scheduler.Queue;
040    import gov.bnl.star.offline.scheduler.Job;
041    import gov.bnl.star.offline.scheduler.request.Request;
042    import gov.bnl.star.offline.scheduler.catalog.PhysicalFile;
043    import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask;
044    //import gov.bnl.star.offline.scheduler.util.StatisticsRecorder; //Moved Statistics recording to Scheduler.java LH
045    
046    import gov.bnl.star.offline.scheduler.util.FilesystemToolkit;
047    
048    // I (Andrey Shevel@bnl.gov) added the line
049    import gov.bnl.star.offline.scheduler.Dispatchers.lsf.CSHApplication;
050    
051    import java.util.List;
052    //import java.util.logging.Level;
053    import org.apache.log4j.Logger;
054    
055    
056    
057    /** Dispatches a job using BOSS.
058     * <p>
059     * For each process (job), two files are created: a script for the execution and
060     * a text file containing the file list. The script basically sets the
061     * environment variables and executes the command line. The file list
062     * contains the input file requested, one full path for each line in the list.
063     * <p>
064     * Each script is submitted through boss.
065     * <p>
066     * The simulation flag will make the scheduler not actually execute the command
067     * lines. Therefore scripts and fileLists are created, but the boss and chmod
068     * commands are not executed. Log and output won't be affected, except that there
069     * will be a message warning that the submission is simulated.
070     * @author Gabriele Carcassi
071     * @version 1.0 2002/12/26
072     */
073    public class BOSSDispatcher implements Dispatcher {
074        static private Logger log = Logger.getLogger(BOSSDispatcher.class.getName());
075        private String resourceStrategy;
076        //protected String scratchDir;
077        private String bossEx;
078        protected boolean simulation = false;
079        private String queueName;
080        private String bossOptions;
081        private int maxBossAttempts;
082        private int msBtwnSuccess;
083        private int msBtwnFailure;
084        
085    //    public String getScratchDir() {
086    //        return scratchDir;
087    //    }
088    //    
089    //    public void setScratchDir(String scratchDir) {
090    //        this.scratchDir = scratchDir;
091    //    }
092        
093        public String getBossEx() {
094            return bossEx;
095        }
096        
097        public void setBossEx(String bossEx) {
098            this.bossEx = bossEx;
099        }
100    
101        public String getQueueName() {
102            return queueName;
103        }
104        
105        public void setQueueName(String queueName) {
106            this.queueName = queueName;
107        }
108        
109        public String getBossOptions() {
110            return bossOptions;
111        }
112        
113        public void setBossOptions(String bossOptions) {
114            this.bossOptions = bossOptions;
115        }
116        
117        public int getMaxAttempts() {
118            return maxBossAttempts;
119        }
120        
121        public void setMaxAttempts(int maxAttempts) {
122            this.maxBossAttempts = maxAttempts;
123        }
124        
125        public int getMsBtwnSuccess() {
126            return msBtwnSuccess;
127        }
128        
129        public void setMsBtwnSuccess(int msBtwnSuccess) {
130            this.msBtwnSuccess = msBtwnSuccess;
131        }
132        
133        public int getMsBtwnFailure() {
134            return msBtwnFailure;
135        }
136        
137        public void setMsBtwnFailure(int msBtwnFailure) {
138            this.msBtwnFailure = msBtwnFailure;
139        }
140        
141        protected boolean reportedFailure;
142        protected CSHApplication application;
143        private String resSwitch;
144    
145        /** Creates the scripts and dispatches the job on the target machine.
146         * @param request the job request
147         */
148        public void dispatch(Request request, List jobs) {
149            log.info("Dispatching using BOSS: \"" + request.getCommand() + "\"");
150    
151            // Enables the simulation mode if necessary
152            useSimulationMode(request.getSimulation());
153            reportedFailure = false;
154    
155            // Submits from the higher to the lower JobID. This way the
156            // user has a feel of  when the last job is going to be
157            // submitted
158            for (int nJob = jobs.size() - 1; nJob >= 0;
159                    nJob--) {
160                Job job = (Job) jobs.get(nJob);
161    
162                System.out.print("Dispatching process " +
163                    job.getJobID() + ".");
164                dispatch(request, job);
165                if (getClusterName() != null) job.setCluster(getClusterName());
166            }
167    
168            //StatisticsRecorder.getIntance().recordStatistics(request, jobs); //Moved Statistics recording to Scheduler.java LH
169        }
170    
171        /* Enables or disables the simulation mode. The simulation mode will deactivate
172         * every command line execution.
173         */
174        protected void useSimulationMode(boolean simulation) {
175            this.simulation = simulation;
176    
177            if (simulation) {
178                // Warn the user that we are entering simulated submission mode
179                log.warn("Simulating submission");
180                System.out.println("Simulating submission");
181            }
182        }
183    
184        protected void reportProcessSubmissionFailure(Request request, Job job, int jobNumber, String message) {
185            reportFailure(job);
186            System.out.println("Process number " + jobNumber + " wasn't submitted.");
187            System.out.println(message);
188            System.out.println();
189            System.out.println("The process input file were:");
190    
191            List list = job.getInput();
192    
193            for (int nFile = 0; nFile < list.size(); nFile++) {
194                PhysicalFile file = (PhysicalFile) list.get(nFile);
195                System.out.println(" - " + file.getPath() + "/" +
196                    file.getFilename());
197            }
198        }
199    
200        protected void reportFailure(Job job) {
201            if (!reportedFailure) {
202                System.out.println("There were some errors during job submission.");
203                System.out.println("Some processes weren't submitted:");
204            }
205        }
206    
207        /** Currently not implemented
208         * @param request the job for which to retrieve the output
209         */
210        public void retrieveOutput(Request request, List jobs) {
211        }
212    
213        /* Dispatches a single process of a job request.
214         */
215        protected void dispatch(Request request, Job job) {
216            //application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
217            
218            //No longer get the CSHApplication object from the config file. Get it using the setApplication() and getApplication() via the config file
219            if(application == null){ //If this was not set in the config file or if we'er in junit testing mode then print an error, and use the default file
220                System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized.");  
221                String notSet = "The CSHApplication for this dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"CSHApplication\" in ComponentLibrary.";
222                log.warn(notSet);
223                System.out.println(notSet);
224                application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
225            }
226            
227            // TODO: all the parameters should be passed in one go
228            application.setJob(request, job);
229            //application.setScratchDir(scratchDir);
230            application.setSubmissionCommand(getBossCommand(request, job));
231    
232            application.prepareJob();
233    
234            log.info("Executing \"" + getBossCommand(request, job) + "\"");
235    
236            if (!simulation) {
237                try {
238                    Thread.sleep(msBtwnSuccess);
239                } catch (Exception e) {
240                }
241    
242                long StarTime = System.currentTimeMillis();
243                int attempt = 0;
244                boolean success = false;
245    
246                while (!success && (attempt < maxBossAttempts)) {
247                    try {
248                        CSHCommandLineTask task = new CSHCommandLineTask(getBossCommand(
249    //                               request, job), true, 30000);
250                                   request, job), true, 130000);
251                        task.execute();
252    
253                        if (task.getExitStatus() != 0) {
254                            log.warn("boss failed: " + task.getOutput());
255                            Thread.sleep(msBtwnFailure);
256                            System.out.print("/");
257                            attempt++;
258                        } else {
259                            success = true;
260                            job.DispatchSuccessful();
261                            job.setDispatchTime(((int) Math.min(System.currentTimeMillis() - StarTime, java.lang.Integer.MAX_VALUE)));
262                        }
263                    } catch (Exception e) {
264                        log.warn("Couldn't submit the script to BOSS", e);
265    
266                        try {
267                            Thread.sleep(msBtwnFailure);
268                        } catch (Exception e1) {
269                        }
270    
271                        System.out.print("/");
272                        attempt++;
273                    }
274                }
275    
276                if (success) {
277                    System.out.println(" done.");
278                } else {
279                    System.out.println(" FAILED!!");
280                }
281            } else {
282                System.out.println(" simulated.");
283            }
284        }
285    
286        /** Returns the full boss command to be executed to dispatch the process. This
287        * command must executed in the directory in which the script resides.
288        * @return the boss command
289        */
290        String getBossCommand(Request request, Job job) {
291              
292              //StringBuffer boss = new StringBuffer(bossEx);
293              StringBuffer boss = new StringBuffer("cd " + FilesystemToolkit.getCurrentDirectory() + "; " + bossEx);
294            
295    //        boss.append(" submit -jobtype ").append(getQueueName(job));
296    
297    //        19 Apr 2004 Andrey 
298    //        boss.append(" -executable ").append(application.getCommandLine());
299    
300    //        if (job.getTarget() != null) {
301    //            boss.append(" -m ").append(job.getTarget());
302    //            boss.append(" -l nodes=").append(job.getTarget());
303    //        }
304    
305    //        if (application.getJobName() != null) {
306    //          boss.append(" -J '").append(application.getJobName()).append("'");
307    //            boss.append(" -jobtype '").append(application.getJobName()).append("'");
308    //        }
309    
310    //        if (application.getStdin() != null) {
311    //            boss.append(" -stdin ").append(application.getStdin());
312    //        }
313    
314    //        if (application.getStdout() != null) {
315    //            boss.append(" -stdout ").append(application.getStdout());
316    //        }
317    
318    //        if (application.getStderr() != null) {
319    //            boss.append(" -stderr ").append(application.getStderr());
320    //        }
321    
322    //        if (getResourceUsageSwitch(job) != null) {
323    //            boss.append(" -R ").append(getResourceUsageSwitch(job));
324    //              boss.append(" -l nodes=").append(getResourceUsageSwitch(job));
325    //        }
326            
327    //        boss.append(' ').append(bossOptions);
328    
329              boss.append(' ').append(application.getCSHScriptFileName());
330              //boss.append(' ').append(application.getCommandLine());
331    
332              return boss.toString();
333        }
334    
335        
336        private BOSSResourceStrategy bossResourceStrategy;
337        
338        /** Holds value of property clusterName. */
339        private String clusterName;
340        
341        public void setResourceStrategy(BOSSResourceStrategy resourceStrategy) {
342            this.bossResourceStrategy = resourceStrategy;
343        }
344        
345        public BOSSResourceStrategy getResourceStrategy() {
346            return bossResourceStrategy;
347        }
348    
349        protected String getResourceUsageSwitch(Job job) {
350            //FIXME: cache value
351            if (getResourceStrategy() == null) {
352                return null;
353            }
354    
355            resSwitch = getResourceStrategy().prepareResourceUsageSwitch(job);
356    
357            return resSwitch;
358        }
359    
360        protected String getQueueName(Job job) {
361            String queue = job.getQueue();
362    
363            if (queue == null) {
364                return queueName;
365            }
366    
367            return queue;
368        }
369        
370        /** Getter for property clusterName.
371         * @return Value of property clusterName.
372         *
373         */
374        public String getClusterName() {
375            return this.clusterName;
376        }
377        
378        /** Setter for property clusterName.
379         * @param clusterName New value of property clusterName.
380         *
381         */
382        public void setClusterName(String clusterName) {
383            this.clusterName = clusterName;
384        }
385        
386        /** Set the class that writes the sricpt that will be executed by the batch system */
387        public void setApplication(CSHApplication application){
388                this.application = application;
389        }
390    
391        /** Get the class that writes the sricpt that will be executed by the batch system */
392        public CSHApplication getApplication(){
393                return application;
394        }
395    
396        
397        public void Kill(Request request, List jobs) {
398        }    
399        
400        public String Status(Job job, int Processe) {
401            return "status unavailable";
402        }    
403        
404        public void stop() {
405        }
406        
407        
408        /**
409         * Runs test(s) on underlying components to determine if submitting jobs should be attempted.
410         * @param queue The queue object to be tested
411         * @return Will return true to indicate everything is alright and false if the test has failed
412         */
413         public boolean test(Queue queue){
414            return true;
415         }
416    
417        
418        
419    }