001    /*
002     * LSFDispatcher.java
003     *
004     * Created on December 23, 2002, 11:30 AM
005     *
006     * This file is part of the STAR Scheduler.
007     * Copyright (c) 2002-2006 STAR Collaboration - Brookhaven National Laboratory
008     *
009     * STAR Scheduler is free software; you can redistribute it and/or modify
010     * it under the terms of the GNU General Public License as published by
011     * the Free Software Foundation; either version 2 of the License, or
012     * (at your option) any later version.
013     *
014     * STAR Scheduler is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017     * GNU General Public License for more details.
018     *
019     * You should have received a copy of the GNU General Public License
020     * along with STAR Scheduler; if not, write to the Free Software
021     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
022     */
023    package gov.bnl.star.offline.scheduler.Dispatchers.local;
024    
025    import gov.bnl.star.offline.scheduler.Dispatchers.lsf.*;
026    import gov.bnl.star.offline.scheduler.ComponentLibrary;
027    import gov.bnl.star.offline.scheduler.Dispatcher;
028    import gov.bnl.star.offline.scheduler.Queue;
029    import gov.bnl.star.offline.scheduler.Job;
030    import gov.bnl.star.offline.scheduler.request.Request;
031    import gov.bnl.star.offline.scheduler.catalog.PhysicalFile;
032    import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask;
033    //import gov.bnl.star.offline.scheduler.util.StatisticsRecorder; //Moved Statistics recording to Scheduler.java LH
034    import gov.bnl.star.offline.scheduler.util.GenericResourceRequirementStringDefinition;
035    
036    
037    import java.util.List;
038    import java.util.logging.Level;
039    import org.apache.log4j.Logger;
040    
041    
042    
043    /** A dispatcher for dispatching jobs to fork on the local node.
044     * 
045     * The simulation flag will make the scheduler not actually execute the command
046     * lines. Therefore scripts and fileLists are created, but the 
047     * commands are not executed. Log and output won't be affected, except that there
048     * will be a message warning that the submission is simulated.
049     *
050     * @author Levente Hajdu
051     * @version 1.0 2002/12/26
052     */
053    public class LocalDispatcher extends LSFDispatcher implements Dispatcher {
054    
055        static private Logger log = Logger.getLogger(LocalDispatcher.class.getName());
056    
057        private String bsubEx;
058        protected boolean simulation = false;
059        private String bsubOptions;
060        private int maxBsubAttempts;
061        private int msBtwnSuccess;
062        private int msBtwnFailure;
063        private String ResReqDefinitionObj;
064        
065        public void setResourceRequirementStringDefinition(String ResReqDefinitionObj){
066            this.ResReqDefinitionObj = ResReqDefinitionObj;
067            
068        }
069     
070        
071        /** @return max number of times submitting the job should be tried. **/
072        public int getMaxAttempts() {
073            return maxBsubAttempts;
074        }
075        
076        /** Set max number of times submitting the job should be tried. 
077         *  @param maxAttempts numbmer of attempts 
078         **/
079        public void setMaxAttempts(int maxAttempts) {
080            this.maxBsubAttempts = maxAttempts;
081        }
082        
083        /** @return The number of milliseconds to delay between successful submissions. */
084        public int getMsBtwnSuccess() {
085            return msBtwnSuccess;
086        }
087        
088        /** sets the number of milliseconds to delay between successful submissions. 
089         *  @param msBtwnSuccess delay in milliseconds
090         */
091        public void setMsBtwnSuccess(int msBtwnSuccess) {
092            this.msBtwnSuccess = msBtwnSuccess;
093        }
094        
095        /** @return The number of milliseconds to delay between failed submissions. 
096         */
097        public int getMsBtwnFailure() {
098            return msBtwnFailure;
099        }
100        
101        /* Sets the number of milliseconds to delay between failed submissions. 
102         * @param msBtwnSuccess delay in milliseconds
103         */
104        public void setMsBtwnFailure(int msBtwnFailure) {
105            this.msBtwnFailure = msBtwnFailure;
106        }
107        
108        protected boolean reportedFailure;
109        protected CSHApplication application;
110        private String resSwitch;
111        
112        void getGenericResourceRequirementStringDefinition(){
113            
114        }
115        
116        //GenericResourceRequirementStringDefinition
117    
118        /** Creates the scripts and dispatches the job on the target machine.
119         * @param request the job request
120         */
121        
122        public void dispatch(Request request, List jobs) {
123            log.info("Dispatching using local system: \"" + request.getCommand() + "\"");
124    
125            // Enables the simulation mode if necessary
126            useSimulationMode(request.getSimulation());
127            
128            
129            
130            reportedFailure = false;
131    
132            // Submits from the higher to the lower JobID. This way the
133            // user has a feel of  when the last job is going to be
134            // submitted
135            for (int nJob = jobs.size() - 1; nJob >= 0;
136                    nJob--) {
137                Job job = (Job) jobs.get(nJob);
138    
139                System.out.print("Dispatching process " +
140                    job.getJobID() + ".");
141                dispatch(request, job);
142                if (getClusterName() != null) job.setCluster(getClusterName());
143            }
144    
145            //StatisticsRecorder.getIntance().recordStatistics(request, jobs); //Moved Statistics recording to Scheduler.java LH
146        }
147       
148        /* Enables or disables the simulation mode. The simulation mode will deactivate
149         * every command line execution.
150         */
151        
152        
153        public void useSimulationMode(boolean simulation) {
154            this.simulation = simulation;
155    
156            if (simulation) {
157                // Warn the user that we are entering simulated submission mode
158                log.warn("Simulating submission");
159                System.out.println("Simulating submission");
160            }
161        }
162        
163    
164    
165        /* Dispatches a single process of a job request.*/
166        protected void dispatch(Request request, Job job) {
167            //application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
168            //No longer get the CSHApplication object from the config file. Get it using the setApplication() and getApplication() via the config file
169            if(application == null){ //If this was not set in the config file or if we'er in junit testing mode then print an error, and use the default file
170                System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized.");  
171                String notSet = "The CSHApplication for this dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"CSHApplication\" in ComponentLibrary.";
172                log.warn(notSet);
173                System.out.println(notSet);
174                application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
175            }
176            
177            //System.out.println("debug 100 dispatching!!");
178            
179            // TODO: all the parameters should be passed in one go
180            application.setJob(request, job);
181            //application.setScratchDir(scratchDir);
182            application.setSubmissionCommand(getLocalDispatchCommand(request, job));
183    
184            application.prepareJob();
185    
186            log.info("Executing \"" + getLocalDispatchCommand(request, job) + "\"");
187    
188            if (!simulation) {
189                //System.out.println("!Not simulation"); //used for debuginh 
190                try {
191                    Thread.sleep(msBtwnSuccess);
192                } catch (Exception e) {
193                }
194    
195                long StarTime = System.currentTimeMillis();
196                int attempt = 0;
197                boolean success = false;
198                String pe="";
199    
200                while (!success && (attempt < maxBsubAttempts) && run) {
201                    try {
202                        CSHCommandLineTask task = new CSHCommandLineTask(getLocalDispatchCommand(
203                                    request, job), true, getMaxElapseTime());
204                        task.execute();
205    
206                        if (task.getExitStatus() != 0) {
207                            log.warn("Local submit failed: " + task.getOutput());
208                            Thread.sleep(msBtwnFailure);
209                            System.out.print("/");
210                            attempt++;
211                        } else {
212                            success = true;
213                            job.DispatchSuccessful();
214                            
215                            
216                            //change code to get the local pid
217                            //job.AddProcesseID(task.getOutput().substring(task.getOutput().indexOf("your job") + 8 ,task.getOutput().indexOf('(') - 1).trim());
218                            
219                            
220                            job.setDispatchTime(((int) Math.min(System.currentTimeMillis() - StarTime, java.lang.Integer.MAX_VALUE)));
221                            
222                            
223                            //The // lines are used for debuging to verfiy the pid was recovored corectly. 
224                            //System.out.println("output >> " +  task.getOutput() + "\n" );
225                            //System.out.println("pid = \"" + task.getOutput().substring(task.getOutput().indexOf("]") + 1,task.getOutput().indexOf('\n')).trim() + "\"");
226                            job.AddProcesseID(task.getOutput().substring(task.getOutput().indexOf("]") + 1,task.getOutput().indexOf('\n')).trim());
227                           
228                             
229                        }
230                    } catch (Exception e) {
231                        log.error("Couldn't submit the script to SGE", e);
232                        // JL: write also to STDOUT to help users debug what happened
233                        if (pe == ""){
234                           pe = e.toString();
235                           System.out.print("Couldn't submit the script to SGE" + pe);
236                        }
237                        try {
238                            Thread.sleep(msBtwnFailure);
239                        } catch (Exception e1) {
240                        }
241    
242                        System.out.print("/");
243                        attempt++;
244                    }
245                }
246    
247                if (success) {
248                    System.out.println(" done.");
249                } else {
250                    System.out.println(" FAILED!!");
251                }
252            } else {
253                System.out.println(" simulated.");
254            }
255        }
256    
257        /** Retunes the srtring (command) to submit the job. **/
258        String getLocalDispatchCommand(Request request, Job job) {
259        
260            //System.out.println("commmand = \"" + application.getCommandLine() + "\"");
261            //return "csh -c " + application.getCommandLine();
262            
263            
264            //The output should look like this "(job.csh > file.out) >& error.out &"
265            
266            StringBuffer LocalDispatchCommand = new StringBuffer("( " + application.getCommandLine());
267    
268           if (application.getStdout() != null) {
269                LocalDispatchCommand.append("> " + application.getStdout());
270            }
271           
272           LocalDispatchCommand.append(" )");
273    
274           if (application.getStderr() != null) {
275                LocalDispatchCommand.append(" >& ").append(application.getStderr());
276           }
277                
278           LocalDispatchCommand.append(" &");
279           
280           return LocalDispatchCommand.toString();
281           
282        
283        }
284    
285      
286            /** Set the class that writes the sricpt that will be executed by the batch system */
287            public void setApplication(CSHApplication application){
288                    this.application = application;
289            }
290            
291            /** Get the class that writes the sricpt that will be executed by the batch system */
292            public CSHApplication getApplication(){
293                    return application;
294            }
295        
296        
297        
298            /**Kill all submitted jobs in the job list.
299             *  @param request the job request
300             *  @param jobs a list of jobs to be killed
301             **/
302            public void Kill(Request request, List jobs) {
303            
304            //System.out.println("lsfr kill");
305            
306            for(int z=0; z != jobs.size(); z++){
307                Job job = (Job)jobs.get(z);
308                
309                //System.out.println("working no job : " + job.getJobID());
310            
311                if(job.getProcesseIDs().size() == 0){
312                    System.out.println("No ProcesseIDs found for job " + job.getJobID());
313                    jobs.remove(z);
314                    z--;
315                  
316                }
317                else{
318                    for(int i=0; job.getProcesseIDs().size() != i; i++){
319    
320                        int attempt = 0;
321                        boolean success = false;
322                        String commmandOutput = "";
323                        System.out.print("ProcesseID: <" + job.getProcesseIDs().get(i) + "> of Job: <" + job.getJobID() + ">");
324    
325                        while (!success && (attempt < maxBsubAttempts) && run) {
326                                try {
327                                   CSHCommandLineTask task = new CSHCommandLineTask("kill " + ((String) job.getProcesseIDs().get(i)) , true, getMaxElapseTime());
328                                    task.execute();
329                                    if (task.getExitStatus() != 0) {
330                                        log.warn("kill " + task.getOutput());
331                                        Thread.sleep(msBtwnFailure);
332                                        if(task.getOutput().lastIndexOf("does not exist") != -1) success = true;
333                                        System.out.print(task.getOutput());
334                                        attempt++;
335                                    } 
336                                    else{ 
337                                        success = true;
338                                        System.out.println("Killed");
339                                    }
340    
341                                    commmandOutput = task.getOutput();
342                                } 
343                                catch (Exception e) { System.out.print("kill failed" + e); 
344                                System.out.print(commmandOutput);
345                                }
346                                try { Thread.sleep(msBtwnFailure);} 
347                                catch (Exception e1) {System.out.print("kill failed");}
348                                    if(!success) System.out.print("/");
349                                    attempt++;
350                        }
351    
352                    }
353                    job.clearProcesseIDs();
354                    jobs.remove(z);
355                    z--;
356                } 
357           }
358     }
359            
360            /** Get the status of the jobs object processe N. 
361              * @param job The job object of the running process
362              * @param Processe the index of the running process of this object
363             **/
364            public String Status(Job job, int Processe) {
365                //return "No data avalable (Local Processe)";
366                
367                 //ps -o stat 11000
368                
369                
370                if(job.getProcesseIDs().size() == 0) return "No ProcesseIDs found for job " + job.getJobID();
371                if(job.getProcesseIDs().size() < Processe) return job.getJobID() + " only has " + job.getProcesseIDs().size() + "processes, processe " + Processe + "dose not exist.";
372                
373                    
374               // for(int i=0; job.getProcesseIDs().size() != i; i++){
375    
376                    int attempt = 0;
377                    boolean success = false;
378                    String commmandOutput = "";
379                    System.out.print("ProcesseID: <" + job.getProcesseIDs().get(Processe) + "> of Job: <" + job.getJobID() + ">");
380    
381                    while (!success && (attempt < maxBsubAttempts)) {
382                            try {
383                               CSHCommandLineTask task = new CSHCommandLineTask("ps -o stat " + ((String) job.getProcesseIDs().get(Processe)), true, getMaxElapseTime());
384                                task.execute();
385                                if (task.getExitStatus() != 0) {
386                                    log.warn("ps : " + task.getOutput());
387                                    Thread.sleep(msBtwnFailure);
388                                    
389                                    attempt++;
390                                } 
391                                else{ 
392                                    success = true;
393                                    
394                                    String state = task.getOutput().replaceFirst("STAT","").replaceAll("\n", "").trim();
395                                   
396                                    if(state == null) return "job is currently not running";
397                                    
398                                    //state.compareTo("D") == 0
399                                    
400                                    else if(state == "") return "job is currently not running";
401                                    else if(state.compareTo("D") == 0) return "job is in uninterruptible sleep";
402                                    else if(state.compareTo("R") == 0) return "job is on the run queue";
403                                    else if(state.compareTo("S") == 0) return "sleeping";
404                                    else if(state.compareTo("T") == 0) return "job is traced or stopped";
405                                    else if(state.compareTo("Z") == 0) return "job is defunct (\"zombie\") process";
406                                    else if(state.compareTo("W") == 0) return "job has no resident pages";
407                                    else if(state.compareTo("<") == 0) return "job is high-priority";
408                                    else if(state.compareTo("N") == 0) return "job is low-priority task";
409                                    else if(state.compareTo("N") == 0) return "job is low-priority task";
410                                    else if(state.compareTo("L") == 0) return "Job has pages locked into memory";
411                                    else return "\"" + state + "\"";
412                                    
413                                }
414    
415                                commmandOutput = task.getOutput();
416                            } 
417                            catch (Exception e) { System.out.print("ps failed" + e); 
418                            System.out.print(commmandOutput);
419                            }
420                            try { Thread.sleep(msBtwnFailure);} 
421                            catch (Exception e1) {System.out.print("ps failed");}
422                                if(!success) System.out.print("/");
423                                attempt++;
424                    }
425    
426                
427               
428            return "No status could be recovered / job is currently not running";
429         }
430        
431            public void retrieveOutput(Request job, List jobs) {}
432            
433            public volatile boolean run = true;
434            public void stop(){ run = false; }
435            
436            
437        /**
438         * Runs test(s) on underlying components to determine if submitting jobs should be attempted.
439         * @param queue The queue object one is trying to submit to.
440         * @return Will return true to indicate everything is alright and false if the test has failed
441         */
442         public boolean test(Queue queue){
443            return true;
444         }
445            
446            
447    }