001    /*
002     * SGEDispatcher.java
003     *
004     * Created on December 23, 2002, 11:30 AM
005     *
006     * This file is part of the STAR Scheduler.
007     * Copyright (c) 2002-2006 STAR Collaboration - Brookhaven National Laboratory
008     *
009     * STAR Scheduler is free software; you can redistribute it and/or modify
010     * it under the terms of the GNU General Public License as published by
011     * the Free Software Foundation; either version 2 of the License, or
012     * (at your option) any later version.
013     *
014     * STAR Scheduler is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017     * GNU General Public License for more details.
018     *
019     * You should have received a copy of the GNU General Public License
020     * along with STAR Scheduler; if not, write to the Free Software
021     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
022     */
023    package gov.bnl.star.offline.scheduler.Dispatchers.sge;
024    
025    import gov.bnl.star.offline.scheduler.Dispatchers.lsf.*;
026    import gov.bnl.star.offline.scheduler.ComponentLibrary;
027    import gov.bnl.star.offline.scheduler.Dispatcher;
028    import gov.bnl.star.offline.scheduler.Queue;
029    import gov.bnl.star.offline.scheduler.Job;
030    import gov.bnl.star.offline.scheduler.request.Request;
031    import gov.bnl.star.offline.scheduler.catalog.PhysicalFile;
032    import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask;
033    //import gov.bnl.star.offline.scheduler.util.StatisticsRecorder; //Moved Statistics recording to Scheduler.java LH
034    import gov.bnl.star.offline.scheduler.util.GenericResourceRequirementStringDefinition;
035    import gov.bnl.star.offline.scheduler.Dispatchers.AbstractResourceStrategy;
036    
037    import gov.bnl.star.offline.scheduler.util.FilesystemToolkit;
038    
039    import java.util.*;
040    //import java.util.List;
041    import java.util.logging.Level;
042    import org.apache.log4j.Logger;
043    
044    
045    
046    /** Dispatches a jobs using SGE.
047     * <p>
048     * For each process, two files are created: a script for the execution and
049     * a text file containing the file list. The script basically sets the
050     * environment variables and executes the command line. The file list
051     * contains the input file requested, one full path for each line in the list.
052     * <p>
053     * Each script is submitted through bsub.
054     * <p>
055     * The simulation flag will make the scheduler not actually execute the command
056     * lines. Therefore scripts and fileLists are created, but the bsub and chmod
057     * commands are not executed. Log and output won't be affected, except that there
058     * will be a message warning that the submission is simulated.
059     *
060     * @author Levente Hajdu, Jerome Lauret
061     * @version 1.0 2002/12/26
062     */
063    public class SGEDispatcher extends LSFDispatcher implements Dispatcher {
064    
065        static private Logger log = Logger.getLogger(SGEDispatcher.class.getName());
066    
067     // private String resourceStrategy;
068     // protected String scratchDir;
069        private String bsubEx;
070        protected boolean simulation = false;
071    //  private String queueName;
072        private String bsubOptions;
073        private int maxBsubAttempts;
074        private int msBtwnSuccess;
075        private int msBtwnFailure;
076        private String ResReqDefinitionObj;
077        
078        public void setResourceRequirementStringDefinition(String ResReqDefinitionObj){
079            this.ResReqDefinitionObj = ResReqDefinitionObj;
080            
081        }
082     
083        
084    //    public String getScratchDir() {
085    //        return scratchDir;
086    //    }
087        
088    //    public void setScratchDir(String scratchDir) {
089    //        this.scratchDir = scratchDir;
090    //    }
091        
092        public String getQsubEx() {
093            return bsubEx;
094        }
095        
096        public void setQsubEx(String bsubEx) {
097            this.bsubEx = bsubEx;
098        }
099        
100        
101    
102    //    public String getQueueName() {
103    //        return queueName;
104    //    }
105        
106    //    public void setQueueName(String queueName) {
107    //        this.queueName = queueName;
108    //    }
109        
110    //    public String getBsubOptions() {
111    //        return bsubOptions;
112    //    }
113        
114    //    public void setBsubOptions(String bsubOptions) {
115    //        this.bsubOptions = bsubOptions;
116    //    }
117        
118        public int getMaxAttempts() {
119            return maxBsubAttempts;
120        }
121        
122        public void setMaxAttempts(int maxAttempts) {
123            this.maxBsubAttempts = maxAttempts;
124        }
125        
126        public int getMsBtwnSuccess() {
127            return msBtwnSuccess;
128        }
129        
130        public void setMsBtwnSuccess(int msBtwnSuccess) {
131            this.msBtwnSuccess = msBtwnSuccess;
132        }
133        
134        public int getMsBtwnFailure() {
135            return msBtwnFailure;
136        }
137        
138        public void setMsBtwnFailure(int msBtwnFailure) {
139            this.msBtwnFailure = msBtwnFailure;
140        }
141        
142        protected boolean reportedFailure;
143        protected CSHApplication application;
144        private String resSwitch;
145        
146        void getGenericResourceRequirementStringDefinition(){
147            
148        }
149        
150        //GenericResourceRequirementStringDefinition
151    
152        /** Creates the scripts and dispatches the job on the target machine.
153         * @param request the job request
154         */
155        
156        public void dispatch(Request request, List jobs) {
157            log.info("Dispatching using LSF: \"" + request.getCommand() + "\"");
158    
159            // Enables the simulation mode if necessary
160            useSimulationMode(request.getSimulation());
161            
162            reportedFailure = false;
163    
164            // Submits from the higher to the lower JobID. This way the
165            // user has a feel of  when the last job is going to be
166            // submitted
167            for (int nJob = jobs.size() - 1; nJob >= 0;
168                    nJob--) {
169                Job job = (Job) jobs.get(nJob);
170    
171                System.out.print("Dispatching process " +
172                    job.getJobID() + ".");
173                dispatch(request, job);
174                if (getClusterName() != null) job.setCluster(getClusterName());
175            }
176    
177            //StatisticsRecorder.getIntance().recordStatistics(request, jobs); //Moved Statistics recording to Scheduler.java LH
178        }
179       
180        /* Enables or disables the simulation mode. The simulation mode will deactivate
181         * every command line execution.
182         */
183        
184        
185        public void useSimulationMode(boolean simulation) {
186            this.simulation = simulation;
187    
188            if (simulation) {
189                // Warn the user that we are entering simulated submission mode
190                log.warn("Simulating submission");
191                System.out.println("Simulating submission");
192            }
193        }
194        
195     
196    
197        /* Dispatches a single process of a job request.
198         */
199    
200        protected void dispatch(Request request, Job job) {
201            
202            //application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
203            
204            //No long get the CSHApplication object from the config file. Get it using the setApplication() and getApplication() via the config file
205            if(application == null){ //If this was not set in the config file or if we'er in junit testing mode then print an error, and use the default file
206                System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized.");  
207                String notSet = "The CSHApplication for this dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"CSHApplication\" in ComponentLibrary.";
208                log.warn(notSet);
209                System.out.println(notSet);
210                application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
211            }
212            
213            
214            // TODO: all the parameters should be passed in one go
215            application.setJob(request, job);
216            //application.setScratchDir(scratchDir);
217            application.setSubmissionCommand(getBsubCommand(request, job));
218    
219            application.prepareJob();
220    
221            log.info("Executing \"" + getBsubCommand(request, job) + "\"");
222    
223            if (!simulation) {
224                //System.out.println("!Not simulation"); //used for debuginh 
225                try {
226                    Thread.sleep(msBtwnSuccess);
227                } catch (Exception e) {
228                }
229    
230                long StarTime = System.currentTimeMillis();
231                int attempt = 0;
232                boolean success = false;
233                String pe="";
234    
235                while (!success && (attempt < maxBsubAttempts)) {
236                    try {
237                        CSHCommandLineTask task = new CSHCommandLineTask(getBsubCommand(
238                                    request, job), true, getMaxElapseTime());
239                        task.execute();
240    
241                        if (task.getExitStatus() != 0) {
242                            log.warn(getQsubEx() + " failed: " + task.getOutput());
243                            Thread.sleep(msBtwnFailure);
244                            System.out.print("/");
245                            attempt++;
246                        } else {
247                            success = true;
248                            job.DispatchSuccessful();
249                            job.AddProcesseID(task.getOutput().substring(task.getOutput().indexOf("your job") + 8 ,task.getOutput().indexOf('(') - 1).trim());
250                            job.setDispatchTime(((int) Math.min(System.currentTimeMillis() - StarTime, java.lang.Integer.MAX_VALUE)));
251                            //System.out.println("\n\npid=" +  task.getOutput() + "\n" );
252                            //System.out.println("\n\npid=" +  ((String) job.getProcesseIDs().get(0)) + "\n" );
253                            
254                            
255                        }
256                    } catch (Exception e) {
257                        log.error("Couldn't submit the script to SGE", e);
258                        // JL: write also to STDOUT to help users debug what happened
259                        if (pe == ""){
260                           pe = e.toString();
261                           System.out.print("Couldn't submit the script to SGE" + pe);
262                        }
263                        try {
264                            Thread.sleep(msBtwnFailure);
265                        } catch (Exception e1) {
266                        }
267    
268                        System.out.print("/");
269                        attempt++;
270                    }
271                }
272    
273                if (success) {
274                    System.out.println(" done.");
275                } else {
276                    System.out.println(" FAILED!!");
277                }
278            } else {
279                System.out.println(" simulated.");
280            }
281        }
282    
283        String getBsubCommand(Request request, Job job){
284            return getQsubCommand(request,job);
285        }
286        
287        
288        /** Returns the full qsub command to be executed to dispatch the process. This
289        * command must executed in the directory in which the script resides.
290        * @return the bsub command
291        */
292        String getQsubCommand(Request request, Job job) {
293           
294            //StringBuffer qsub = new StringBuffer(getQsubEx());
295            StringBuffer qsub = new StringBuffer("cd " + FilesystemToolkit.getCurrentDirectory() + "; " + bsubEx);
296           
297           
298           if (application.getJobName() != null) {
299                qsub.append(" -N '").append(application.getJobName()).append("'");
300            }
301           
302           if (application.getStdout() != null) {
303                qsub.append(" -o ").append(application.getStdout());
304            }
305           
306           if (application.getStderr() != null) {
307                qsub.append(" -e ").append(application.getStderr());
308           }else{
309                qsub.append(" -j y");
310           }
311           
312           //fills in the -l resources (if any)
313           int minMem = request.getResource("Memory").getMin();
314           if (getResourceUsageSwitch(job) != null 
315               || (job.getTarget() != null) 
316               || (minMem != -1)) {
317               qsub.append(" -l ");
318               if(getResourceUsageSwitch(job) != null)qsub.append(getResourceUsageSwitch(job).replace(':', ',')).append(",");
319               if(job.getTarget() != null) qsub.append("hostname=").append(job.getTarget()).append(",");
320               if(minMem != -1) qsub.append("h_vmem=").append(minMem).append("M,");
321               int minStorage = request.getResource("StorageSpace").getMin();
322               if(minStorage != -1) qsub.append("scratchfree=").append(minStorage).append("M,");
323               //remore the last ','    
324               qsub.deleteCharAt(qsub.length() - 1);  
325           }
326    
327           int maxMem =  request.getResource("Memory").getMax();
328           if (maxMem != -1) {
329               System.out.println("\nWarning your request for MaxMemory="+ maxMem + "MB will not be honored by SGEDispatcher !!!!\n");
330               log.warn("\nWarning your request for MaxMemory="+ maxMem + "MB will not be honored by SGEDispatcher !!!!\n");
331           }
332    
333           int maxStorage = request.getResource("StorageSpace").getMax();
334           if (maxStorage != -1) {
335               System.out.println("\nWarning your request for MaxStorageSpace=" + maxStorage + "MB will not be honored by SGEDispatcher !!!!\n");
336               log.warn("\nWarning your request for MaxStorageSpace=" + maxStorage + "MB will not be honored by SGEDispatcher !!!!\n");
337           }
338           
339           
340           
341           if(job.getQueueObj().getName() != null){
342               if(job.getQueueObj().getName().trim().length() != 0 ) qsub.append(" -q ").append(job.getQueueObj().getName());
343           }    
344           
345           
346           if(getQsubOptions() != null) qsub.append(" " + getQsubOptions());
347           
348           
349           
350           
351           /*
352           The path is being dropped because different drivers for mounting the PanFS path are reporting the path differently. Instated we will put "cd $path" before the command we execute.   
353           Example:
354           /pdirect/star+institutions/data05/scratch/lbhajdu/
355           /star/data05/scratch/lbhajdu/
356           */
357           qsub.append(' ').append(application.getCSHScriptFileName());
358           //qsub.append(' ').append(application.getCommandLine());
359    
360    
361           //This is used if you need to see the command
362           //System.out.println("\n" + qsub.toString() + "\n");
363           
364           
365           return qsub.toString();
366        }
367      
368        
369            private List sgeResourceStrategy = new ArrayList();
370            public List getResourceStrategyList(){ return sgeResourceStrategy; }
371            public void setResourceStrategyList(List sgeResourceStrategy){ this.sgeResourceStrategy = sgeResourceStrategy; }
372            public void addResourceStrategy(AbstractResourceStrategy resourceStrategy){sgeResourceStrategy.add(resourceStrategy);}
373            
374            /** Biuld an SGE resource usage switch for this job to be appanded to the submitting comaand
375             *  @param job job to biuld the resource uisage string from
376             *  @return  SGE resource usage switch for this job
377             **/
378            protected String getResourceUsageSwitch(Job job) {
379                resSwitch = null;
380    
381                //FIXME: cache value
382                //if (getResourceStrategy() == null) {return null;}
383                if (sgeResourceStrategy.isEmpty()) {return null;}
384    
385                //resSwitch = getResourceStrategy().prepareResourceUsageSwitch(job);
386    
387                //Run the jobs though all of the resource strategies and concatenate all the strings they output together.  
388                for(int i = 0; i != sgeResourceStrategy.size(); i++){
389                    AbstractResourceStrategy sgeStrategy = (AbstractResourceStrategy) sgeResourceStrategy.get(i);
390    
391                    String resSwichFrag = sgeStrategy.prepareResourceUsageSwitch(job);
392    
393                    if((resSwitch == null)&&(resSwichFrag != null)){
394                        resSwitch = resSwichFrag;
395                    }
396                    else if((resSwitch != null)&&(resSwichFrag != null)){
397                        resSwitch = resSwitch.concat(","+resSwichFrag);
398                    }
399                }
400                return resSwitch;
401            }
402            
403            
404            /** Set the class that writes the sricpt that will be executed by the batch system */
405            public void setApplication(CSHApplication application){
406                    this.application = application;
407            }
408            
409            /** Get the class that writes the sricpt that will be executed by the batch system */
410            public CSHApplication getApplication(){
411                    return application;
412            }
413         
414        
415            public void Kill(Request request, List jobs) {
416            
417            //System.out.println("lsfr kill");
418            
419            for(int z=0; z != jobs.size(); z++){
420                Job job = (Job)jobs.get(z);
421                
422                //System.out.println("working no job : " + job.getJobID());
423            
424                if(job.getProcesseIDs().size() == 0){
425                    System.out.println("No ProcesseIDs found for job " + job.getJobID());
426                    jobs.remove(z);
427                    z--;
428                  
429                }
430                else{
431                    for(int i=0; job.getProcesseIDs().size() != i; i++){
432    
433                        int attempt = 0;
434                        boolean success = false;
435                        String commmandOutput = "";
436                        System.out.print("ProcesseID: <" + job.getProcesseIDs().get(i) + "> of Job: <" + job.getJobID() + ">");
437    
438                        while (!success && (attempt < maxBsubAttempts)) {
439                                try {
440                                   CSHCommandLineTask task = new CSHCommandLineTask("qdel " + ((String) job.getProcesseIDs().get(i)) , true, getMaxElapseTime());
441                                    task.execute();
442                                    if (task.getExitStatus() != 0) {
443                                        log.warn("qdel " + task.getOutput());
444                                        Thread.sleep(msBtwnFailure);
445                                        if(task.getOutput().lastIndexOf("does not exist") != -1) success = true;
446                                        System.out.print(task.getOutput());
447                                        attempt++;
448                                    } 
449                                    else{ 
450                                        success = true;
451                                        System.out.println("Killed");
452                                    }
453    
454                                    commmandOutput = task.getOutput();
455                                } 
456                                catch (Exception e) { System.out.print("qdel failed" + e); 
457                                System.out.print(commmandOutput);
458                                }
459                                try { Thread.sleep(msBtwnFailure);} 
460                                catch (Exception e1) {System.out.print("qdel failed");}
461                                    if(!success) System.out.print("/");
462                                    attempt++;
463                        }
464    
465                    }
466                    job.clearProcesseIDs();
467                    jobs.remove(z);
468                    z--;
469                } 
470           }
471     }
472        
473            public String Status(Job job, int Processe) {
474             
475                if(job.getProcesseIDs().size() == 0) return "No ProcesseIDs found for job " + job.getJobID();
476                if(job.getProcesseIDs().size() < Processe) return job.getJobID() + " only has " + job.getProcesseIDs().size() + "processes, processe " + Processe + "dose not exist.";
477                
478                    
479               // for(int i=0; job.getProcesseIDs().size() != i; i++){
480    
481                    int attempt = 0;
482                    boolean success = false;
483                    String commmandOutput = "";
484                    System.out.print("ProcesseID: <" + job.getProcesseIDs().get(Processe) + "> of Job: <" + job.getJobID() + ">");
485    
486                    while (!success && (attempt < maxBsubAttempts)) {
487                            try {
488                               CSHCommandLineTask task = new CSHCommandLineTask("qstat | grep '^ " + ((String) job.getProcesseIDs().get(Processe)) + " '"  +"; echo text" , true, getMaxElapseTime());
489                                task.execute();
490                                if (task.getExitStatus() != 0) {
491                                    log.warn("qstat " + task.getOutput());
492                                    Thread.sleep(msBtwnFailure);
493                                    
494                                   // if(task.getOutput().lastIndexOf("already finished") != -1) success = true;
495                                    //return (task.getOutput().replace('\n',' ');
496                                    attempt++;
497                                } 
498                                else{ 
499                                    success = true;
500                                    
501                                    //task.getOutput().indexOf(" STAT ");
502                                    //task.getOutput().indexOf(" QUEUE ");
503                                   // task.getOutput().indexOf("\n");
504                                    if(task.getOutput().length() < 50) return "No data avalable ";
505                                    else
506                                    return (task.getOutput().substring(38,42).trim());
507                                    //System.out.println("Killed");
508                                }
509    
510                                commmandOutput = task.getOutput();
511                            } 
512                            catch (Exception e) { System.out.print("qstat failed" + e); 
513                            System.out.print(commmandOutput);
514                            }
515                            try { Thread.sleep(msBtwnFailure);} 
516                            catch (Exception e1) {System.out.print("qstat failed");}
517                                if(!success) System.out.print("/");
518                                attempt++;
519                    }
520    
521               // }
522                
523            return "No data avalable ";
524         }
525        
526            public void retrieveOutput(Request job, List jobs) {
527            }
528            
529            
530            private String qsubOptions;
531            public String getQsubOptions() { return qsubOptions;}
532            public void setQsubOptions(String qsubOptions) { this.qsubOptions = qsubOptions;}
533            
534           
535        String qstat = "";
536        /**
537         * Runs test(s) on underlying components to determine if submitting jobs should be attempted.
538         * @param queue The queue object to be tested
539         * @return Will return true to indicate everything is alright and false if the test has failed
540         */
541        public boolean test(Queue queue){
542            
543            
544            if(qstat.length() == 0){ //only run the command one time
545                if(! runInTimeLimitedThread("qstat -g c" , getMaxElapseTime(), msBtwnFailure, 5  )){
546                    return false;
547                }
548                else{
549                    //System.out.println(super.threadOuput); //show the output for debugging
550                    qstat = super.threadOuput.replace('\n', ' '); //keep everything on one line 
551                }
552            }
553             
554            if(qstat.length() == 0) return false; //just in case notting was returnned;
555            if(qstat.matches(".* [123456789]* .*")){ //find any non zero digit in bewteen speces. This will find eider n available or n used nodes, in both cases the site is alright. 
556    
557                    return true;  
558                }
559            
560            return false;    
561                
562        }
563            
564            
565    }