001    /*
002     * LSFDispatcher.java
003     *
004     * Created on December 23, 2002, 11:30 AM
005     *
006     * This file is part of the STAR Scheduler.
007     * Copyright (c) 2002-2006 STAR Collaboration - Brookhaven National Laboratory
008     *
009     * STAR Scheduler is free software; you can redistribute it and/or modify
010     * it under the terms of the GNU General Public License as published by
011     * the Free Software Foundation; either version 2 of the License, or
012     * (at your option) any later version.
013     *
014     * STAR Scheduler is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017     * GNU General Public License for more details.
018     *
019     * You should have received a copy of the GNU General Public License
020     * along with STAR Scheduler; if not, write to the Free Software
021     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
022     */
023    package gov.bnl.star.offline.scheduler.Dispatchers.lsf;
024    
025    import gov.bnl.star.offline.scheduler.ComponentLibrary;
026    import gov.bnl.star.offline.scheduler.Dispatcher;
027    import gov.bnl.star.offline.scheduler.Queue;
028    import gov.bnl.star.offline.scheduler.Job;
029    import gov.bnl.star.offline.scheduler.request.Request;
030    import gov.bnl.star.offline.scheduler.catalog.PhysicalFile;
031    import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask;
032    //import gov.bnl.star.offline.scheduler.util.StatisticsRecorder; //Moved Statistics recording to Scheduler.java LH
033    import gov.bnl.star.offline.scheduler.util.GenericResourceRequirementStringDefinition;
034    
035    
036    import gov.bnl.star.offline.scheduler.Dispatchers.lsf.LSFNodePriorityStringGenerator;
037    
038    import gov.bnl.star.offline.scheduler.Dispatchers.AbstractResourceStrategy;
039    import gov.bnl.star.offline.scheduler.util.FilesystemToolkit;
040    
041    
042    
043    import java.util.List;
044    import java.util.ArrayList;
045    import java.util.logging.Level;
046    import org.apache.log4j.Logger;
047    
048    
049    
050    /** Dispatches a job using LSF.
051     * <p>
052     * For each process, two files are created: a script for the execution and
053     * a text file containing the file list. The script basically sets the
054     * environment variables and executes the command line. The file list
055     * contains the input file requested, one full path for each line in the list.
056     * <p>
057     * Each script is submitted through bsub.
058     * <p>
059     * The simulation flag will make the scheduler not actually execute the command
060     * lines. Therefore scripts and fileLists are created, but the bsub and chmod
061     * commands are not executed. Log and output won't be affected, except that there
062     * will be a message warning that the submission is simulated.
063     *
064     * @author Gabriele Carcassi, Jerome Lauret, Levente Hajdu
065     * @version 1.0 2002/12/26
066     */
067    public class LSFDispatcher extends gov.bnl.star.offline.scheduler.Dispatchers.DispatcherBase implements Dispatcher, java.io.Serializable {
068        static private Logger log = Logger.getLogger(LSFDispatcher.class.getName());
069        private String resourceStrategy;
070        protected String scratchDir;
071        private String bsubEx;
072        protected boolean simulation = false;
073        private String queueName;
074        private String bsubOptions;
075        private int maxBsubAttempts;
076        private int msBtwnSuccess;
077        private int msBtwnFailure;
078        private int maxElapseTime = 30000;
079        private String ResReqDefinitionObj;
080        private  boolean omitTargetNode  = false;
081        
082        public void setOmitTargetNode(boolean  omitTargetNode){this.omitTargetNode = omitTargetNode;}
083        public boolean getOmitTargetNode(){return omitTargetNode;}
084        
085        public void setMaxElapseTime(int maxElapseTime){
086            this.maxElapseTime = maxElapseTime;
087        }
088        
089        public int getMaxElapseTime(){
090            return maxElapseTime;
091        }
092        
093        
094        public void setResourceRequirementStringDefinition(String ResReqDefinitionObj){
095            this.ResReqDefinitionObj = ResReqDefinitionObj;    
096        }
097      
098        
099    //    public String getScratchDir() {
100    //        return scratchDir;
101    //    }
102    //    
103    //    public void setScratchDir(String scratchDir) {
104    //        this.scratchDir = scratchDir;
105    //    }
106        
107        public String getBsubEx() {
108            return bsubEx;
109        }
110        
111        public void setBsubEx(String bsubEx) {
112            this.bsubEx = bsubEx;
113        }
114    
115        public String getQueueName() {
116            return queueName;
117        }
118        
119        public void setQueueName(String queueName) {
120            this.queueName = queueName;
121        }
122        
123        public String getBsubOptions() {
124            return bsubOptions;
125        }
126        
127        public void setBsubOptions(String bsubOptions) {
128            this.bsubOptions = bsubOptions;
129        }
130        
131        public int getMaxAttempts() {
132            return maxBsubAttempts;
133        }
134        
135        public void setMaxAttempts(int maxAttempts) {
136            this.maxBsubAttempts = maxAttempts;
137        }
138        
139        public int getMsBtwnSuccess() {
140            return msBtwnSuccess;
141        }
142        
143        public void setMsBtwnSuccess(int msBtwnSuccess) {
144            this.msBtwnSuccess = msBtwnSuccess;
145        }
146        
147        public int getMsBtwnFailure() {
148            return msBtwnFailure;
149        }
150        
151        public void setMsBtwnFailure(int msBtwnFailure) {
152            this.msBtwnFailure = msBtwnFailure;
153        }
154        
155        protected boolean reportedFailure;
156        protected CSHApplication application;
157        private String resSwitch;
158    
159        /** Creates the scripts and dispatches the job on the target machine.
160         * @param request the job request
161         */
162        public void dispatch(Request request, List jobs) {
163            log.info("Dispatching using LSF: \"" + request.getCommand() + "\"");
164    
165            // Enables the simulation mode if necessary
166            useSimulationMode(request.getSimulation());
167            reportedFailure = false;
168    
169            // Submits from the higher to the lower JobID. This way the
170            // user has a feel of  when the last job is going to be
171            // submitted
172            for (int nJob = jobs.size() - 1; nJob >= 0;
173                    nJob--) {
174                Job job = (Job) jobs.get(nJob);
175    
176                System.out.print("Dispatching process " +
177                    job.getJobID() + ".");
178                dispatch(request, job);
179                if (getClusterName() != null) job.setCluster(getClusterName());
180            }
181    
182            //StatisticsRecorder.getIntance().recordStatistics(request, jobs); //Moved Statistics recording to Scheduler.java LH
183        }
184    
185        /* Enables or disables the simulation mode. The simulation mode will deactivate
186         * every command line execution.
187         */
188        public void useSimulationMode(boolean simulation) {
189            this.simulation = simulation;
190            if (simulation) {
191                // Warn the user that we are entering simulated submission mode
192                log.info("Simulating submission");
193                System.out.println("Simulating submission");
194            }
195        }
196    
197        protected void reportProcessSubmissionFailure(Request request, Job job, int jobNumber, String message) {
198            reportFailure(job);
199            System.out.println("Process number " + jobNumber + " wasn't submitted.");
200            System.out.println(message);
201            System.out.println();
202            System.out.println("The process input file were:");
203    
204            List list = job.getInput();
205    
206            for (int nFile = 0; nFile < list.size(); nFile++) {
207                PhysicalFile file = (PhysicalFile) list.get(nFile);
208                System.out.println(" - " + file.getPath() + "/" +
209                    file.getFilename());
210            }
211        }
212    
213        protected void reportFailure(Job job) {
214            if (!reportedFailure) {
215                System.out.println("There were some errors during job submission.");
216                System.out.println("Some processes weren't submitted:");
217            }
218        }
219    
220        /** Currently not implemented
221         * @param request the job for which to retrieve the output
222         */
223        public void retrieveOutput(Request request, List jobs) {
224        }
225    
226        /* Dispatches a single process of a job request.
227         */
228        protected void dispatch(Request request, Job job) {
229            
230            
231            
232            if(application == null){ 
233                System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized.");
234              
235                String notSet = "The CSHApplication for this dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"CSHApplication\" in ComponentLibrary.";
236                log.warn(notSet);
237                System.out.println(notSet);
238                application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
239           }
240            
241            
242            /*
243             
244             Trying to use set/get for CSH applecation has major problems 
245             
246             */
247            //application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
248                
249            
250            // TODO: all the parameters should be passed in one go
251            application.setJob(request, job);
252            //application.setScratchDir(scratchDir);
253            application.setSubmissionCommand(getBsubCommand(request, job));
254    
255            application.prepareJob();
256    
257            log.info("Executing \"" + getBsubCommand(request, job) + "\"");
258            
259            
260    
261            if (!simulation) {
262                try {
263                    Thread.sleep(msBtwnSuccess);
264                } catch (Exception e) {
265                }
266                
267                long StarTime = System.currentTimeMillis();  
268                int attempt = 0;
269                boolean success = false;
270                String pe="";
271    
272                while (!success && (attempt < maxBsubAttempts) && run) {
273                    try {
274                       //TODO: check if this object is really created multiple times or not 
275                        CSHCommandLineTask task = new CSHCommandLineTask(getBsubCommand(request, job), true, getMaxElapseTime());
276                       
277                       task.execute();
278                        if (task.getExitStatus() != 0) {
279                            log.warn("bsub failed ("+getMaxElapseTime()+"): " + task.getOutput());
280                            Thread.sleep(msBtwnFailure);
281                            System.out.print("/");
282                            attempt++;
283                        } else {
284                            success = true;
285                            job.DispatchSuccessful();
286                            job.AddProcesseID(task.getOutput().substring(task.getOutput().indexOf("<") + 1, task.getOutput().indexOf(">")));
287                            job.setDispatchTime(((int) Math.min(System.currentTimeMillis() - StarTime, java.lang.Integer.MAX_VALUE))); 
288                            //System.out.println(job.getDispatchTime() + "ms");
289                        }
290                    } catch (Exception e) {
291                        log.error("Couldn't submit the script to LSF", e);
292                        // JL: write also to STDOUT to help users debug what happened
293                        if (pe == ""){
294                           pe = e.toString();
295                           System.out.print("Couldn't submit the script to LSF" + pe);
296                        }
297                        try {
298                            Thread.sleep(msBtwnFailure);
299                        } catch (Exception e1) {
300                        }
301    
302                        System.out.print("/");
303                        attempt++;
304                    }
305                }
306    
307                if (success) {
308                    System.out.println(" done.");
309                    
310                } else {
311                    System.out.println(" FAILED!!");
312                }
313            } else {
314                System.out.println(" simulated.");
315            }
316        }
317    
318        /** Returns the full bsub command to be executed to dispatch the process. This
319        * command must executed in the directory in which the script resides.
320        * @return the bsub command
321        */
322        String getBsubCommand(Request request, Job job) {
323             StringBuffer bsub = new StringBuffer("cd " + FilesystemToolkit.getCurrentDirectory() + "; " + bsubEx);
324            //bsub.append(bsubEx);
325            
326            //bsub.append(" -q ").append(getQueueName(job));
327            if(getQueueName(job) != null){
328               //if(job.getQueueObj().getName().trim().length() != 0 ) bsub.append(" -q ").append(job.getQueueObj().getName());
329                if(getQueueName(job).trim().length() != 0 ) bsub.append(" -q ").append(getQueueName(job));
330           }
331    
332             
333            if (job.getTarget() != null) {
334    
335                if("rootd".equals(request.getFileListType())){
336                    if(getNodePriorityStringGenerator() == null){
337                        System.out.println("Warring the LSFNodePriorityStringGenerator has note been initialized in the config file for this dispatcher. Please contact your SUMS administrator and report this message."); 
338                        bsub.append(" -m ").append(job.getTarget());
339                    }
340                    else{
341                        bsub.append(" -m ").append(getNodePriorityStringGenerator().generateSyntax(job));
342                    }
343                }
344                else {
345                    bsub.append(" -m ").append(job.getTarget());
346                }
347            }
348             
349             
350             
351            if (application.getJobName() != null) {
352                bsub.append(" -J '").append(application.getJobName()).append("'");
353            }
354    
355            if (application.getStdin() != null) {
356                bsub.append(" -i ").append(application.getStdin());
357            }
358    
359            if (application.getStdout() != null) {
360                bsub.append(" -o ").append(application.getStdout());
361            }
362    
363            if (application.getStderr() != null) {
364                bsub.append(" -e ").append(application.getStderr());
365            }
366    
367            if(job.getMaxMemory() != -1){
368                bsub.append(" -M ").append(job.getMaxMemory() * 1000);  
369            }
370             
371            
372            //This takes the GenericResourceRequirementStringDefinition (if any) and adds the ruage[] to it if the is any then it write out the string
373            GenericResourceRequirementStringDefinition lsfResReqDef = new GenericResourceRequirementStringDefinition();
374            if(ResReqDefinitionObj != null)
375            lsfResReqDef = (GenericResourceRequirementStringDefinition) ComponentLibrary.getInstance().getComponent(ResReqDefinitionObj);
376            
377           
378            if ((getResourceUsageSwitch(job) != null)&&( lsfResReqDef.hasResourcesDefinition(job))) {
379                
380           String Res = "\"(" + lsfResReqDef.makeString(job).replaceAll("\"", "").concat(") ").concat(getResourceUsageSwitch(job).replaceAll("\"", "")).concat("\"");
381            bsub.append(" -R ").append(Res);
382            }
383            else if(getResourceUsageSwitch(job) != null){
384                bsub.append(" -R ").append(getResourceUsageSwitch(job));
385            }
386            else if( lsfResReqDef.hasResourcesDefinition(job)){
387                bsub.append(" -R ").append(lsfResReqDef.makeString(job));
388            }
389            
390       /*   //////////////////////////lbh
391            
392            if (getResourceUsageSwitch(job) != null) {
393                bsub.append(" -R ").append(getResourceUsageSwitch(job));
394            }
395      */  
396            bsub.append(' ').append(bsubOptions);
397    
398    
399            
400            /*
401            The path is being dropped because different drivers for mounting the PanFS path are reporting the path differently. Instated we will put "cd $path" before the command we execute.   
402            Example:
403            /pdirect/star+institutions/data05/scratch/lbhajdu/
404            /star/data05/scratch/lbhajdu/
405            */
406            bsub.append(' ').append(application.getCSHScriptFileName());
407          //bsub.append(' ').append(application.getCommandLine());
408            
409            
410            return bsub.toString();
411        }
412    
413        /** Holds value of property clusterName. */
414        private String clusterName;
415        
416        
417        /*
418    //    private LSFResourceStrategy lsfResourceStrategy;
419          private AbstractResourceStrategy lsfResourceStrategy;
420    //    public void setResourceStrategy(LSFResourceStrategy resourceStrategy) { this.lsfResourceStrategy = resourceStrategy;}
421    //    public LSFResourceStrategy getResourceStrategy() { return lsfResourceStrategy;
422          public void setResourceStrategy(AbstractResourceStrategy resourceStrategy) { this.lsfResourceStrategy = resourceStrategy;}
423          public AbstractResourceStrategy getResourceStrategy() { return lsfResourceStrategy;}
424    
425        protected String getResourceUsageSwitch(Job job) {
426            //FIXME: cache value
427            if (getResourceStrategy() == null) return null;
428            resSwitch = getResourceStrategy().prepareResourceUsageSwitch(job);
429            return "\"rusage[" + resSwitch + "]\"";
430        }*/
431        
432        
433         
434          private List lsfResourceStrategy = new ArrayList();  
435          public List getResourceStrategyList(){ return lsfResourceStrategy; }
436          public void setResourceStrategyList(List lsfResourceStrategy){ this.lsfResourceStrategy = lsfResourceStrategy; }
437          public void addResourceStrategy(AbstractResourceStrategy resourceStrategy){lsfResourceStrategy.add(resourceStrategy);}
438    
439          /**This function is deprecated but still exists to be backwards compatible with older configuration files. The function really calls addResourceStrategy(). For new configuration files please use addResourceStrategy() instead.*/
440          public void setResourceStrategy(AbstractResourceStrategy resourceStrategy) { addResourceStrategy(resourceStrategy);}
441          
442          //public AbstractResourceStrategy getResourceStrategy() { return lsfResourceStrategy;}
443    
444        protected String getResourceUsageSwitch(Job job) {
445            
446            resSwitch = null;
447            if (lsfResourceStrategy.isEmpty()) {return null;}
448            
449            for(int i = 0; i != lsfResourceStrategy.size(); i++){
450                    AbstractResourceStrategy lsfStrategy = (AbstractResourceStrategy) lsfResourceStrategy.get(i);
451    
452                    String resSwichFrag = lsfStrategy.prepareResourceUsageSwitch(job);
453    
454                    if((resSwitch == null)&&(resSwichFrag != null)){
455                        resSwitch = resSwichFrag;
456                    }
457                    else if((resSwitch != null)&&(resSwichFrag != null)){
458                        resSwitch = resSwitch.concat(","+resSwichFrag);
459                    }
460                }
461            
462            
463                if(resSwitch == null) return null;
464                return "\"rusage[" + resSwitch + "]\"";
465                
466            
467            //if (getResourceStrategy() == null) return null;
468            //resSwitch = getResourceStrategy().prepareResourceUsageSwitch(job);
469            //return "\"rusage[" + resSwitch + "]\"";
470            
471        }
472        
473        
474        
475        
476        
477        
478    
479        protected String getQueueName(Job job) {
480            String queue = job.getQueue();
481    
482            if (queue == null) {
483                return queueName;
484            }
485    
486            return queue;
487        }
488        
489        /** Getter for property clusterName.
490         * @return Value of property clusterName.
491         *
492         */
493        public String getClusterName() {
494            return this.clusterName;
495        }
496        
497        /** Setter for property clusterName.
498         * @param clusterName New value of property clusterName.
499         *
500         */
501        public void setClusterName(String clusterName) {
502            this.clusterName = clusterName;
503        }
504        
505        public void Kill(Request request, List jobs) {
506            
507            //System.out.println("lsfr kill");
508            
509            for(int z=0; z != jobs.size(); z++){
510                Job job = (Job)jobs.get(z);
511                
512                //System.out.println("working no job : " + job.getJobID());
513            
514                if(job.getProcesseIDs().size() == 0){
515                    System.out.println("No ProcesseIDs found for job " + job.getJobID());
516                    jobs.remove(z);
517                    z--;
518                  
519                }
520                else{
521                    for(int i=0; job.getProcesseIDs().size() != i; i++){
522    
523                        int attempt = 0;
524                        boolean success = false;
525                        String commmandOutput = "";
526                        System.out.print("ProcesseID: <" + job.getProcesseIDs().get(i) + "> of Job: <" + job.getJobID() + ">");
527    
528                        while (!success && (attempt < maxBsubAttempts)) {
529                                try {
530                                   CSHCommandLineTask task = new CSHCommandLineTask("bkill " + ((String) job.getProcesseIDs().get(i)) , true, getMaxElapseTime());
531                                    task.execute();
532                                    if (task.getExitStatus() != 0) {
533                                        log.warn("bkill " + task.getOutput());
534                                        Thread.sleep(msBtwnFailure);
535                                        if(task.getOutput().lastIndexOf("already finished") != -1) success = true;
536                                        System.out.print(task.getOutput());
537                                        attempt++;
538                                    } 
539                                    else{ 
540                                        success = true;
541                                        System.out.println("Killed");
542                                    }
543    
544                                    commmandOutput = task.getOutput();
545                                } 
546                                catch (Exception e) { System.out.print("bkill failed" + e); 
547                                System.out.print(commmandOutput);
548                                }
549                                try { Thread.sleep(msBtwnFailure);} 
550                                catch (Exception e1) {System.out.print("bkill failed");}
551                                    if(!success) System.out.print("/");
552                                    attempt++;
553                        }
554    
555                    }
556                    job.clearProcesseIDs();
557                    jobs.remove(z);
558                    z--;
559                } 
560           }
561     }
562            
563    
564        
565        public String Status(Job job, int Processe) {
566             
567                if(job.getProcesseIDs().size() == 0) return "No ProcesseIDs found for job " + job.getJobID();
568                if(job.getProcesseIDs().size() < Processe) return job.getJobID() + " only has " + job.getProcesseIDs().size() + "processes, processe " + Processe + "dose not exist.";
569                
570                    
571               // for(int i=0; job.getProcesseIDs().size() != i; i++){
572    
573                    int attempt = 0;
574                    boolean success = false;
575                    String commmandOutput = "";
576                    System.out.print("ProcesseID: <" + job.getProcesseIDs().get(Processe) + "> of Job: <" + job.getJobID() + ">");
577    
578                    while (!success && (attempt < maxBsubAttempts)) {
579                            try {
580                               CSHCommandLineTask task = new CSHCommandLineTask("bjobs " + ((String) job.getProcesseIDs().get(Processe)) , true, getMaxElapseTime());
581                                task.execute();
582                                if (task.getExitStatus() != 0) {
583                                    log.warn("bkill " + task.getOutput());
584                                    Thread.sleep(msBtwnFailure);
585                                    
586                                   // if(task.getOutput().lastIndexOf("already finished") != -1) success = true;
587                                    //return (task.getOutput().replace('\n',' ');
588                                    attempt++;
589                                } 
590                                else{ 
591                                    success = true;
592                                    
593                                    //task.getOutput().indexOf(" STAT ");
594                                    //task.getOutput().indexOf(" QUEUE ");
595                                   // task.getOutput().indexOf("\n");
596                                    
597                                    return (task.getOutput().substring(1 + task.getOutput().indexOf("\n") + task.getOutput().indexOf(" STAT "), task.getOutput().indexOf("\n") + task.getOutput().indexOf(" QUEUE ")).trim());
598                                    //System.out.println("Killed");
599                                }
600    
601                                commmandOutput = task.getOutput();
602                            } 
603                            catch (Exception e) { System.out.print("bjobs failed" + e); 
604                            System.out.print(commmandOutput);
605                            }
606                            try { Thread.sleep(msBtwnFailure);} 
607                            catch (Exception e1) {System.out.print("bjobs failed");}
608                                if(!success) System.out.print("/");
609                                attempt++;
610                    }
611    
612               // }
613                
614           return "bjobs failed";
615     }
616        
617        /** 
618         * Sets the object initialized that writes the CSH script.
619         * @param application The initialized CSH writer object 
620         **/
621        public void setApplication(CSHApplication application){
622            this.application = application;
623        }
624        
625        /** recovers the initialized object that writes the CSH application
626         * @return the object that writes the csh script **/
627        public CSHApplication getApplication(){
628            return application;
629        }
630        public volatile boolean run = true;
631        public void stop(){ run = false; } 
632        
633        private LSFNodePriorityStringGenerator nodePriorityStringGenerator;
634        /**
635         *Set the object used to generate the node priority string.
636         **/
637        public void setNodePriorityStringGenerator(LSFNodePriorityStringGenerator nodePriorityStringGenerator){this.nodePriorityStringGenerator = nodePriorityStringGenerator;}
638        public LSFNodePriorityStringGenerator getNodePriorityStringGenerator(){ return nodePriorityStringGenerator; }  
639        
640        
641        
642        
643        
644        String[] bqueuesLines = {};
645        /**
646         *Test the queue get an indication if job can be submitted successfully to the queue.
647         *@param queue The queue object to be tested 
648         *@return Ture if the job will most likely run, and false if the job will most likely fail.
649        **/
650        public boolean test(Queue queue){
651            
652            if(bqueuesLines.length == 0){ //only run the command one time
653                if(! runInTimeLimitedThread("bqueues" , maxElapseTime, msBtwnFailure, 5  )){
654                    return false;
655                }
656                else{
657                    bqueuesLines = super.threadOuput.split("\n"); 
658                }
659            }
660            
661            if(bqueuesLines.length == 0) return false; //just in case notting was returnned;
662            
663            for(int i = 0; i < bqueuesLines.length; i++){ //bqueuesLines[i] = bqueuesLines[i].replace('\n', ' ');
664                if(bqueuesLines[i].matches( ".*" + queue.getName() + ".*Open.Active.*")){
665                    return true;  
666                }     
667            }
668                   
669            return false;    
670        
671        }
672        
673       
674    }