001    /*
002     * CondorGRSLDispatcher.java
003     *
004     * Created on June 8, 2004, 11:17 AM
005     *
006     * This file is part of the STAR Scheduler.
007     * Copyright (c) 2002-2006 STAR Collaboration - Brookhaven National Laboratory
008     *
009     * STAR Scheduler is free software; you can redistribute it and/or modify
010     * it under the terms of the GNU General Public License as published by
011     * the Free Software Foundation; either version 2 of the License, or
012     * (at your option) any later version.
013     *
014     * STAR Scheduler is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017     * GNU General Public License for more details.
018     *
019     * You should have received a copy of the GNU General Public License
020     * along with STAR Scheduler; if not, write to the Free Software
021     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
022     */
023    package gov.bnl.star.offline.scheduler.Dispatchers.condorg;
024    
025    import gov.bnl.star.offline.scheduler.*;
026    import gov.bnl.star.offline.scheduler.request.Request;
027    import gov.bnl.star.offline.scheduler.Queue;
028    import gov.bnl.star.offline.scheduler.Dispatchers.lsf.CSHApplication;
029    import gov.bnl.star.offline.scheduler.Dispatchers.lsf.LSFDispatcher;
030    import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask;
031    import gov.bnl.star.offline.scheduler.util.FilesystemToolkit;
032    //import gov.bnl.star.offline.scheduler.util.StatisticsRecorder;
033    import gov.bnl.star.offline.scheduler.util.sandbox.Sandbox;
034    
035    import java.io.File;
036    import java.io.FileOutputStream;
037    import java.io.PrintStream;
038    import java.util.*;
039    
040    import java.util.logging.Level;
041    import org.apache.log4j.Logger;
042    
043    
044    /** Dispatches jobs using Condor-G on a remote site that uses PBS. 
045     * It will NOT use extra rsl attributes for PBS.  If needed they will
046     * be added later. 
047     * @author Alex Withers, Levente Hajdu
048     * @version 1.0 2004/06/08
049     */
050    public class CondorGRSLDispatcher extends LSFDispatcher {
051        static private Logger log = Logger.getLogger(CondorGRSLDispatcher.class.getName());
052    
053        private static String condorEx;
054        protected CSHApplication application;
055        
056        
057        private boolean useRSL = false;
058        /* This RSL hack can be turned on to pass additional data to the batch system when submitting via condorG, However the patch must be in place or else submitting with this property turned on will fail. Please see http://www.star.bnl.gov/STAR/comp/Grid/scheduler/dev/lsfPatch.html for additional information. */ 
059        public void setUseRSL(boolean useRSL){this.useRSL = useRSL;}; 
060        public boolean getUseRSL(){return useRSL;}; 
061        
062    
063        public void setCondorEx(String condorEx) { this.condorEx = condorEx; }
064        
065        public String getCondorEx() { return condorEx; }
066    
067        /** Creates a new dispatcher */
068        public CondorGRSLDispatcher() {
069        }
070        
071        private String condorGOptions;
072        public String getCondorGOptions(){ return condorGOptions; }
073        public void setCondorGOptions(String condorGOptions){ this.condorGOptions = condorGOptions; }
074    
075        /** Creates the scripts and dispatches the job on the target machine.
076         * @param request the job request
077         */
078        public void dispatch(Request request, List jobs) {
079            log.info("Dispatching using Condor-g and LSF: \"" + request.getCommand() +
080                "\"");
081    
082            // Enables the simulation mode if necessary
083            useSimulationMode(request.getSimulation());
084            reportedFailure = false;
085    
086            // Submits from the higher to the lower JobID. This way the
087            // user has a feel of  when the last job is going to be
088            // submitted
089            for (int nProcess = jobs.size() - 1; nProcess >= 0;
090                    nProcess--) {
091                Job job = (Job) jobs.get(nProcess);
092    
093                System.out.print("Dispatching process " +
094                    job.getJobID() + ".");
095                dispatch(request, job);
096            }
097    
098            //StatisticsRecorder.getInstance().recordStatistics(request, jobs); //removed and moved to frame-work
099        }
100    
101        protected void dispatch(Request request, Job job) {
102            //application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
103            
104            //No long get the CSHApplication object from the config file. Get it using the setApplication() and getApplication() via the config file
105            if(application == null){ //If this was not set in the config file or if we'er in junit testing mode then print an error, and use the default file
106                System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized.");  
107                String notSet = "The CSHApplication for this dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"CSHApplication\" in ComponentLibrary.";
108                log.warn(notSet);
109                System.out.println(notSet);
110                application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
111            }
112            
113            // TODO: all the parameters should be passed in one go
114            application.setJob(request, job);
115    //        application.setScratchDir(scratchDir);
116            application.setSubmissionCommand(getCondorGCommand(request, job));
117    
118            application.prepareJob();
119            prepareClassAd(request, job);
120    
121            log.info("Executing \"" + getCondorGCommand(request, job) + "\"");
122    
123            if (!simulation) {
124                try {
125                    Thread.sleep(getMsBtwnSuccess());
126                } catch (Exception e) {
127                }
128    
129                long StarTime = System.currentTimeMillis();
130                int attempt = 0;
131                boolean success = false;
132    
133                while (!success && (attempt < getMaxAttempts())) {
134                    try {
135                        CSHCommandLineTask task = new CSHCommandLineTask(getCondorGCommand(
136                                    request, job), true, 30000);
137                        task.execute();
138    
139                        if (task.getExitStatus() != 0) {
140                            log.warn("bsub failed: " + task.getOutput());
141                            Thread.sleep(getMsBtwnFailure());
142                            System.out.print("/");
143                            attempt++;
144                        } else {
145                            success = true;
146                            job.DispatchSuccessful();
147                            job.AddProcesseID(task.getOutput().substring(task.getOutput().indexOf("submitted to cluster") + 20, task.getOutput().length()).replace('.', ' ').trim());
148                            job.setDispatchTime(((int) Math.min(System.currentTimeMillis() - StarTime, java.lang.Integer.MAX_VALUE)));
149                        }
150                    } catch (Exception e) {
151                        log.fatal("Couldn't submit the script to Condor-g", e);
152    
153                        try {
154                            Thread.sleep(getMsBtwnFailure());
155                        } catch (Exception e1) {
156                        }
157    
158                        System.out.print("/");
159                        attempt++;
160                    }
161                }
162    
163                if (success) {
164                    System.out.println(" done.");
165                } else {
166                    System.out.println(" FAILED!!");
167                }
168            } else {
169                System.out.println(" simulated.");
170            }
171        }
172    
173        /** Returns the command line to submit the job through condor-g.
174         * @param request the request that originated the job
175         * @param job the job to be dispatched
176         * @return the commandline to submit the job
177         */
178        protected String getCondorGCommand(Request request, Job job) {
179            //The "cd" command was added here, just incase the dir is "somehow" chaged while running
180            return "cd " + FilesystemToolkit.getCurrentDirectory() + "; " + condorEx + " " + getClassAdName(request, job);
181            //return condorEx + " " + getClassAdName(request, job);
182        }
183    
184        /** Returns the name of the file containing the class ad. Class ad is the job
185         * description required by condor to submit a job.
186         * @param request the request that originated the job
187         * @param job the job to be submitted
188         * @return the file name of the class ad
189         */
190        protected String getClassAdName(Request request, Job job) {
191            return "sched" + job.getJobID() + ".condorg";
192        }
193    
194        private void prepareClassAd(Request request, Job job) {
195      
196            try {
197                
198                PrintStream classAd = new PrintStream(new FileOutputStream( new File(getClassAdName(request, job))));
199                createClassAd(request, job, classAd);
200                
201            } catch (Exception e) {
202                log.fatal("Couldn't create the class ad", e);
203                throw new RuntimeException("Couldn't create the class ad " +
204                    getClassAdName(request, job) + ": " + e.getMessage());
205            }
206        }
207    
208        private void createClassAd(Request request, Job job, PrintStream classAd) {
209            
210            
211            classAd.print("executable = ");
212            classAd.println(getExecutable());
213    
214            if (getArguments() != null) {
215                classAd.print("arguments = ");
216                classAd.println(getArguments());
217            }
218    
219            classAd.print("globusscheduler = ");
220            
221    //        if(job.getQueueObj().getBatchSystem().getGatekeeper() != null){  
222    //            classAd.println(job.getQueueObj().getBatchSystem().getGatekeeper().getName() + "/jobmanager-" + job.getQueueObj().getBatchSystem().getName());  
223    //        } 
224            
225            String batchSystem = job.getAccessMethod().getBatchSystem();
226            String gateKeeper = ((GateKeeperAccessPoint) job.getAccessPoint()).getName();
227            if((batchSystem != null) && (gateKeeper != null)){
228                classAd.println(gateKeeper + "/jobmanager-" + batchSystem);  
229            }else{
230                System.out.println("Could not find gatekeeper for this batch sytem, please check config file");
231                log.fatal("Could not find gatekeeper for this batch sytem, please check config file");
232                throw new RuntimeException("Could not find gatekeeper for this batch sytem, please check config file");
233            }
234    
235            if (application.getStdin() != null) {
236                classAd.print("input = ");
237                classAd.println(application.getStdin());
238            }
239    
240            if (application.getStdout() != null) {
241                classAd.print("output = ");
242                classAd.println(application.getStdout());
243            }
244    
245            if (application.getStderr() != null) {
246                classAd.print("error = ");
247                classAd.println(application.getStderr());
248            }
249    
250            classAd.print("log = ");
251            classAd.println(getLogName(job));
252    
253            if (getRemoteDirectory() != null) {
254                classAd.print("remote_initialdir = ");
255                classAd.println(getRemoteDirectory());
256            }
257            
258            Sandbox sandbox = request.getSandbox();
259            if(sandbox != null){
260                List sandboxedFiles = sandbox.getSandboxedFiles(); //get the list of files that need to be copyed
261                if(sandboxedFiles != null){ 
262                    if(sandboxedFiles.size() > 0){
263                        classAd.println("WhenToTransferOutput = ON_EXIT");
264                        classAd.print("transfer_input_files = ");
265                        for(int i = 0; i != sandboxedFiles.size(); i++){
266                            classAd.print(sandboxedFiles.get(i)); //write out each file.
267                            if((i + 1) != sandboxedFiles.size()) classAd.print(",");
268                        }
269                        classAd.println();
270                    }
271                }
272            }
273            
274            /* This is basically the main difference from
275             * CondorGLSFDispatcher.java.  No globus-rsl stuff.
276             * -- Alex Withers 
277             */
278                
279            if(getUseRSL()){
280                
281                classAd.print("globusrsl =");
282                
283                if (job.getQueue() != null) {
284                    classAd.print(" (queue = ");
285                    classAd.print(job.getQueue());
286                    classAd.print(")");
287                }
288                
289                if(getUseLSFMod()){
290                    if (job.getTarget() != null) {
291                        classAd.print(" (xlsfmachine = ");
292                        classAd.print(job.getTarget());
293                        classAd.print(")");
294                    }
295    
296                    if (application.getJobName() != null) {
297                        classAd.print(" (xlsfjobname = ");
298                        classAd.print(application.getJobName());
299                        classAd.print(")");
300                    }
301    
302                    if (request.getMail()) {
303                        classAd.print(" (xlsfmailreport = ");
304                        classAd.print("false");
305                        classAd.print(")");
306                    } else {
307                        classAd.print(" (xlsfmailreport = ");
308                        classAd.print("true");
309                        classAd.print(")");
310                    }
311    
312                    if (getResourceUsageSwitch(job) != null) {
313                        classAd.print(" (xlsfresources = ");
314                        classAd.print(getResourceUsageSwitch(job));
315                        classAd.print(")");
316                    }
317                }
318    
319    
320                classAd.println();
321            }
322            
323    
324            if (isTransferExecutable()) {
325                classAd.println("transfer_executable = true");
326            } else {
327                classAd.println("transfer_executable = false");
328            }
329            
330            classAd.println("notification = never");
331            classAd.println("universe = globus");
332            
333            if (getCondorGOptions() != null) {
334                classAd.println(getCondorGOptions());
335            }
336            
337            classAd.println("queue");
338             
339        }
340    
341        private String getExecutable() {
342            
343            
344            if (application.getCommandLine().indexOf(' ') == -1) {
345                return application.getCommandLine();
346            }
347    
348            return application.getCommandLine().substring(0, application.getCommandLine().indexOf(' '));
349        }
350    
351        private String getArguments() {
352            if (application.getCommandLine().indexOf(' ') == -1) {
353                return null;
354            }
355    
356            return application.getCommandLine().substring(application.getCommandLine().indexOf(' ') + 1);
357        }
358    
359        private String getLogName(Job job) {
360            // TODO maybe log filename should be put as a general property of Process (as stds)
361            return "sched" + job.getJobID() + ".condorg.log";
362        }
363        /*
364        private String getGlobusScheduler() {
365            //TODO make it flexible
366            return getGlobusGatekeeper();
367        }
368        
369        private String gatekeeper;
370        */
371        /** Holds value of property transferExecutable. */
372        private boolean transferExecutable;
373        /*
374        public void setGlobusGatekeeper(String gatekeeper) {
375            this.gatekeeper = gatekeeper;
376        }
377        
378        public String getGlobusGatekeeper() {
379            return gatekeeper;
380        }
381        */
382        private String remoteInitialDir;
383        
384        public void setRemoteInitialDir(String remoteInitialDir) {
385            this.remoteInitialDir = remoteInitialDir;
386        }
387        
388        public String getRemoteInitialDir() {
389            return remoteInitialDir;
390        }
391        
392        private String getRemoteDirectory() {
393            // TODO this has to be specified better: remote execution directory could be different from scheduler execution directory
394            if (".".equals(getRemoteInitialDir())) return FilesystemToolkit.getCurrentDirectory();
395            return getRemoteInitialDir();
396        }
397    
398        protected String getResourceUsageSwitch(Job job) {
399            String res = super.getResourceUsageSwitch(job);
400            if (res == null) return res;
401    
402            return res.replaceAll("\"", "\\\\\"");
403        }
404        
405        /** Getter for property transferExecutable.
406         * @return Value of property transferExecutable.
407         *
408         */
409        public boolean isTransferExecutable() {
410            return this.transferExecutable;
411        }
412        
413        /** Setter for property transferExecutable.
414         * @param transferExecutable New value of property transferExecutable.
415         *
416         */
417        public void setTransferExecutable(boolean transferExecutable) {
418            this.transferExecutable = transferExecutable;
419        }
420        
421        /** Set the class that writes the sricpt that will be executed by the batch system */
422        public void setApplication(CSHApplication application){
423                this.application = application;
424        }
425    
426        /** Get the class that writes the sricpt that will be executed by the batch system */
427        public CSHApplication getApplication(){
428                return application;
429        }
430    
431        
432         public void Kill(Request request, List jobs) {
433                //System.out.println("condor kill");
434            
435             for(int z=0; z != jobs.size(); z++){
436                Job job = (Job)jobs.get(z);
437            
438                if(job.getProcesseIDs().size() == 0){
439                    System.out.println("No ProcesseIDs found for job " + job.getJobID());
440                    jobs.remove(z);
441                    z--;
442                }
443                else{
444                    for(int i=0; job.getProcesseIDs().size() != i; i++){
445    
446                        int attempt = 0;
447                        boolean success = false;
448                        String commmandOutput = "";
449                        System.out.print("ProcesseID: <" + job.getProcesseIDs().get(i) + "> of Job: <" + job.getJobID() + ">");
450    
451                        while (!success && (attempt < getMaxAttempts())) {
452                                try {
453                                   CSHCommandLineTask task = new CSHCommandLineTask("condor_rm " + ((String) job.getProcesseIDs().get(i)) , true, getMaxElapseTime());
454                                    task.execute();
455                                    if (task.getExitStatus() != 0) {
456                                        log.warn("condor_rm " + task.getOutput());
457                                        Thread.sleep(getMsBtwnFailure());
458                                        if(task.getOutput().lastIndexOf("Couldn't find") != -1) success = true;
459                                        System.out.print(task.getOutput());
460                                        attempt++;
461                                    } 
462                                    else{ 
463                                        success = true;
464                                        System.out.println("Killed");
465                                    }
466    
467                                    commmandOutput = task.getOutput();
468                                } 
469                                catch (Exception e) { System.out.print("condor_rm failed" + e); 
470                                System.out.print(commmandOutput);
471                                }
472                                try { Thread.sleep(getMsBtwnFailure());} 
473                                catch (Exception e1) {System.out.print("condor_rm failed");}
474                                    if(!success) System.out.print("/");
475                                    attempt++;
476                        }
477    
478                    }
479                    job.clearProcesseIDs();
480                    jobs.remove(z);
481                    z--;
482                }      
483           }
484        }    
485        
486        public String Status(Job job, int Processe) {
487                        if(job.getProcesseIDs().size() == 0) return "No ProcesseIDs found for job " + job.getJobID();
488                if(job.getProcesseIDs().size() < Processe) return job.getJobID() + " only has " + job.getProcesseIDs().size() + "processes, processe " + Processe + "dose not exist.";
489                
490                    
491               // for(int i=0; job.getProcesseIDs().size() != i; i++){
492    
493                    int attempt = 0;
494                    boolean success = false;
495                    String commmandOutput = "";
496                    System.out.print("ProcesseID: <" + job.getProcesseIDs().get(Processe) + "> of Job: <" + job.getJobID() + ">");
497    
498                    while (!success && (attempt < getMaxAttempts())) {
499                            try {
500                               CSHCommandLineTask task = new CSHCommandLineTask("condor_q " + ((String) job.getProcesseIDs().get(Processe)) , true, getMaxElapseTime());
501                                task.execute();
502                                if (task.getExitStatus() != 0) {
503                                    log.warn("condor_q " + task.getOutput());
504                                    Thread.sleep(getMsBtwnFailure());
505                                    
506                                   // if(task.getOutput().lastIndexOf("already finished") != -1) success = true;
507                                    //return (task.getOutput().replace('\n',' ');
508                                    attempt++;
509                                } 
510                                else{ 
511                                    success = true;
512                                    job.DispatchSuccessful();
513                                    job.AddProcesseID(task.getOutput().substring(task.getOutput().indexOf("submitted to cluster") + 20, task.getOutput().length()).replace('.', ' ').trim());
514                                    
515                                    if(task.getOutput().length() < 217) return("Done or Killed");
516                                    else{
517                                        String state = task.getOutput().substring(214,216).trim();
518                                        if( state.startsWith("R")) state = "RUN";
519                                        return(task.getOutput().substring(214,216).trim());
520                                    }
521    
522                                }
523    
524                                commmandOutput = task.getOutput();
525                            } 
526                            catch (Exception e) { System.out.print("condor_q failed" + e); 
527                            System.out.print(commmandOutput);
528                            }
529                            try { Thread.sleep(getMsBtwnFailure());} 
530                            catch (Exception e1) {System.out.print("condor_q failed");}
531                                if(!success) System.out.print("/");
532                                attempt++;
533                    }
534    
535               // }
536                
537           return "condor_q failed";
538        }
539        
540        public void stop() {
541        }
542        
543        
544        private List passedQueues =   new ArrayList(); // stores a list of queue that have passed, queue check test
545        /**
546         * Runs test(s) on underlying components to determine if submitting jobs should be attempted.
547         * @param queue The queue object to be tested
548         * @return Will return true to indicate everything is alright and false if the test has failed
549         */
550         public boolean test(Queue queue){
551             
552            /* 
553            if(passedQueues.contains(queue)) return true; //If the queue has already passed one time don't test it agin  
554             
555            if(! runInTimeLimitedThread("globus-job-run " + queue.getAssociatedAccessMethod().getBatchSystem() + " /bin/echo passed", getMaxElapseTime() + 300000, getMsBtwnFailure(), 4  )){
556                System.out.print("(ExeFailed)");
557                return false;
558            }
559            else{
560                String output = super.threadOuput.replace('\n',' ').trim();
561                if(output.matches("passed")){ //check if queue is open
562                    passedQueues.add(queue);
563                    return true;   
564                }
565                else{   
566                   return false;
567               }
568            }
569             */ return true;
570         }
571         
572         
573         private boolean useLSFMod = false;
574         public void setUseLSFMod( boolean useLSFMod){this.useLSFMod = useLSFMod;} 
575         public boolean getUseLSFMod(){return  useLSFMod;} 
576         
577        
578        
579    }