001    /*
002     * CondorDispatcher.java
003     *
004     * Created on July 17, 2003, 11:17 AM
005     *
006     * This file is part of the STAR Scheduler.
007     * Copyright (c) 2002-2006 STAR Collaboration - Brookhaven National Laboratory
008     *
009     * STAR Scheduler is free software; you can redistribute it and/or modify
010     * it under the terms of the GNU General Public License as published by
011     * the Free Software Foundation; either version 2 of the License, or
012     * (at your option) any later version.
013     *
014     * STAR Scheduler is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017     * GNU General Public License for more details.
018     *
019     * You should have received a copy of the GNU General Public License
020     * along with STAR Scheduler; if not, write to the Free Software
021     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
022     */
023    package gov.bnl.star.offline.scheduler.Dispatchers.condorg;
024    
025    import gov.bnl.star.offline.scheduler.*;
026    import gov.bnl.star.offline.scheduler.Queue;
027    import gov.bnl.star.offline.scheduler.request.Request;
028    import gov.bnl.star.offline.scheduler.ComponentLibrary;
029    import gov.bnl.star.offline.scheduler.Dispatchers.lsf.CSHApplication;
030    import gov.bnl.star.offline.scheduler.Dispatchers.lsf.LSFDispatcher;
031    import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask;
032    import gov.bnl.star.offline.scheduler.util.FilesystemToolkit;
033    //import gov.bnl.star.offline.scheduler.util.StatisticsRecorder; //Moved Statistics recording to Scheduler.java LH
034    
035    import java.io.File;
036    import java.io.FileOutputStream;
037    import java.io.PrintStream;
038    import java.util.*;
039    
040    import java.util.logging.Level;
041    import org.apache.log4j.Logger;
042    
043    
044    /** Dispatches jobs using Condor-G on a remote site that uses LSF. It will use some
045     * extra rsl attributes created to command some extra features such as mail
046     * notification, resource usage, job name and target machine. These extra LSF
047     * attribute require a patch to the LSF job manager.
048     * @author Gabriele Carcassi
049     * @version 1.0 2003/07/23
050     */
051    public class CondorDispatcher extends LSFDispatcher {
052        static private Logger log = Logger.getLogger(CondorDispatcher.class.getName());
053        private String condorEx;
054    
055        private String condorOptions;
056        
057        public void setCondorEx(String condorEx) {
058            this.condorEx = condorEx;
059        }
060        
061        public String getCondorEx() {
062            return condorEx;
063        }
064    
065        /** Creates a new dispatcher */
066        public CondorDispatcher() {
067        }
068    
069        /** Creates the scripts and dispatches the job on the target machine.
070         * @param request the job request
071         */
072        public void dispatch(Request request, List jobs) {
073            log.info("Dispatching using Condor: \"" + request.getCommand() + "\"");
074    
075            // Enables the simulation mode if necessary
076            useSimulationMode(request.getSimulation());
077            reportedFailure = false;
078    
079            // Submits from the higher to the lower JobID. This way the
080            // user has a feel of  when the last job is going to be
081            // submitted
082            for (int nProcess = jobs.size() - 1; nProcess >= 0;
083                    nProcess--) {
084                Job job = (Job) jobs.get(nProcess);
085    
086                System.out.print("Dispatching process " +
087                    job.getJobID() + ".");
088                dispatch(request, job);
089            }
090    
091            //StatisticsRecorder.getIntance().recordStatistics(request, jobs);
092        }
093    
094        protected void dispatch(Request request, Job job) {
095            
096            //CondorNodePriorityStringGenerator priorityGen = new CondorNodePriorityStringGenerator();
097            //System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>" + priorityGen.generateSyntax(job));
098            
099            
100            
101            //application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
102            //No longer get the CSHApplication object from the config file. Get it using the setApplication() and getApplication() via the config file
103            if(application == null){ //If this was not set in the config file or if we'er in junit testing mode then print an error, and use the default file
104                System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized.");  
105                String notSet = "The CSHApplication for this dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"CSHApplication\" in ComponentLibrary.";
106                log.warn(notSet);
107                System.out.println(notSet);
108                application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
109            }
110    
111            // TODO: all the parameters should be passed in one go
112            application.setJob(request, job);
113            //application.setScratchDir(scratchDir);
114            application.setSubmissionCommand(getCondorCommand(request, job));
115    
116            application.prepareJob();
117            prepareClassAd(request, job);
118    
119            log.info("Executing \"" + getCondorCommand(request, job) + "\"");
120    
121            if (!simulation) {
122                try {
123                    Thread.sleep(getMsBtwnSuccess());
124                } catch (Exception e) {
125                }
126    
127                long StarTime = System.currentTimeMillis();
128                int attempt = 0;
129                boolean success = false;
130    
131                while (!success && (attempt < getMaxAttempts())) {
132                    try {
133                        CSHCommandLineTask task = new CSHCommandLineTask(getCondorCommand(request, job), true, 30000);
134                        task.execute();
135    
136                        if (task.getExitStatus() != 0) {
137                            attempt++;
138                            log.warn("condor-submit failed: " + task.getOutput());
139                            if (attempt < getMaxAttempts()) { 
140                               Thread.sleep(getMsBtwnFailure());
141                               System.out.print("/");
142                            }
143                        } else {
144                            success = true;
145                            job.DispatchSuccessful();
146                            
147                            //recover the processID
148                            String processeID = task.getOutput().replace('\n', ' ').replaceAll(".*submitted to cluster ([0-9]*).*","$1");                        
149                            if( processeID.matches("[0-9]*") ) job.AddProcesseID( processeID );
150                            else System.out.println("Wanning: Could not record condor processID of job, trying to kill job with -k option will fail if attempted. Otherwise this has no harmful effect.\n");
151                            
152                            job.setDispatchTime(((int) Math.min(System.currentTimeMillis() - StarTime, java.lang.Integer.MAX_VALUE)));
153                        }
154                    } catch (Exception e) {
155                        log.fatal("Couldn't submit the script to Condor-g", e);
156                        try {
157                            Thread.sleep(getMsBtwnFailure());
158                        } catch (Exception e1) {
159                        }
160    
161                        System.out.print("/");
162                        attempt++;
163                    }
164                }
165    
166                if (success) {
167                    System.out.println(" done.");
168                } else {
169                    System.out.println(" FAILED!!");
170                }
171            } else {
172                System.out.println(" simulated.");
173            }
174        }
175    
176        /** Returns the command line to submit the job through condor-g.
177         * @param request the request that originated the job
178         * @param job the job to be dispatched
179         * @return the commandline to submit the job
180         */
181        protected String getCondorCommand(Request request, Job job) {
182            //The "cd" command was added here, just incase the dir is "somehow" chaged while running
183            return "cd " + FilesystemToolkit.getCurrentDirectory() + "; " + condorEx + " " + getClassAdName(request, job);
184        }
185    
186        /** Returns the name of the file containing the class ad. Class ad is the job
187         * description required by condor to submit a job.
188         * @param request the request that originated the job
189         * @param job the job to be submitted
190         * @return the file name of the class ad
191         */
192        protected String getClassAdName(Request request, Job job) {
193            return "sched" + job.getJobID() + ".condor";
194        }
195        private void prepareClassAd(Request request, Job job) {
196            try {
197                PrintStream classAd = new PrintStream(new FileOutputStream(
198                            new File(getClassAdName(request, job))));
199                createClassAd(request, job, classAd);
200            } catch (Exception e) {
201                log.fatal("Couldn't create the class ad", e);
202                throw new RuntimeException("Couldn't create the class ad " +
203                    getClassAdName(request, job) + ": " + e.getMessage());
204            }
205        }
206    
207        private void createClassAd(Request request, Job job, PrintStream classAd) {
208                
209            classAd.println("Universe       = vanilla");
210            classAd.println();
211            //classAd.println("+Experiment = \"star\"");
212            classAd.println("Notification   = never");
213            classAd.print("Executable     = ");
214            classAd.println(getExecutable());
215    
216            if (getArguments() != null) {
217                classAd.print("Arguments      = ");
218                classAd.println(getArguments());
219            }
220    
221            if (application.getStdin() != null) {
222                classAd.print("Input          = ");
223                classAd.println(application.getStdin());
224            }
225    
226            if (application.getStdout() != null) {
227                classAd.print("Output =        ");
228                classAd.println(application.getStdout());
229            }
230    
231            if (application.getStderr() != null) {
232                classAd.print("Error          = ");
233                classAd.println(application.getStderr());
234            }
235            
236            
237            if("rootd".equals(request.getFileListType())){
238                String rank =  CondorNodePriorityStringGenerator.generateSyntax(job);
239                if(!rank.equals("")){
240                    classAd.println("Rank          = " + rank);
241                }
242            }
243            
244            
245            ///////////////////////////
246            List summedRequirements = new ArrayList();
247            summedRequirements.addAll(getRequirements());
248       
249            if (job.getTarget() != null) {
250                summedRequirements.add("(Machine == \"" + job.getTarget() + "\")");
251            }
252            
253            if(summedRequirements.size() > 0){ //if the are some requorment 
254               String  requirements = "Requirements = ";
255               
256               for(int i = 0; i != summedRequirements.size(); i++){
257                   if(i > 0 ) requirements = requirements + " && ";
258                   requirements = requirements + summedRequirements.get(i);
259               }
260               
261               classAd.println(requirements);     
262            }
263            
264            
265    
266            /* //reminder conder needs to work like LSF
267            if (job.getTarget() != null) {
268                
269                if("rootd".equals(request.getFileListType())){
270                    if(getNodePriorityStringGenerator() == null){
271                        System.out.println("Warring the LSFNodePriorityStringGenerator has note been initialized in the config file for this dispatcher. Please contact your SUMS administrator and report this message."); 
272                        bsub.append(" -m ").append(job.getTarget());
273                    }
274                    else{
275                        bsub.append(" -m ").append(getNodePriorityStringGenerator().generateSyntax(job));
276                    }
277                }
278                else {
279                
280                    //if there are more requirements the logic will need to be change use && to delimit   
281                    classAd.println("Requirements = );
282                }
283            }
284             */
285            ///////////////////////////
286    
287            
288            classAd.print("Log            = ");
289            classAd.println(getLogName(job));
290            classAd.println("Getenv         = true");
291    
292            if (getRemoteDirectory() != null) {
293                classAd.print("Initialdir     = ");
294                classAd.println(getRemoteDirectory());
295            }
296    
297            if (getCondorOptions() != null) {
298                classAd.println(getCondorOptions());
299            }
300            
301    //        classAd.println("transfer_executable = false");
302            classAd.println("Queue");
303        }
304    
305        private String getExecutable() {
306            if (application.getCommandLine().indexOf(' ') == -1) {
307                return application.getCommandLine();
308            }
309    
310            return application.getCommandLine().substring(0, application.getCommandLine().indexOf(' '));
311        }
312    
313        private String getArguments() {
314            if (application.getCommandLine().indexOf(' ') == -1) {
315                return null;
316            }
317    
318            return application.getCommandLine().substring(application.getCommandLine()
319                                                                     .indexOf(' ') +
320                1);
321        }
322    
323        private String getLogName(Job job) {
324            // TODO maybe log filename should be put as a general property of Process (as stds)
325            return "sched" + job.getJobID() + ".condor.log";
326        }
327    
328    /*    private String getGlobusScheduler() {
329            //TODO make it flexible
330            return "stargrid01.rcf.bnl.gov/jobmanager-lsf";
331        }*/
332    
333        private String getRemoteDirectory() {
334            // TODO this has to be specified better: remote execution directory could be different from scheduler execution directory
335            return FilesystemToolkit.getCurrentDirectory();
336        }
337    
338        /** Getter for property condorOptions.
339         * @return Value of property condorOptions.
340         *
341         */
342        public String getCondorOptions() {
343            return this.condorOptions;
344        }
345        
346        /** Setter for property condorOptions.
347         * @param condorOptions New value of property condorOptions.
348         *
349         */
350        public void setCondorOptions(String condorOptions) {
351            this.condorOptions = condorOptions;
352        }
353        
354    /*    protected String getResourceUsageSwitch(Process job) {
355            String res = super.getResourceUsageSwitch(job);
356    
357            return res.replaceAll("\"", "\\\\\"");
358        }*/
359        
360        
361        /** Set the class that writes the sricpt that will be executed by the batch system */
362        public void setApplication(CSHApplication application){
363                this.application = application;
364        }
365    
366        /** Get the class that writes the sricpt that will be executed by the batch system */
367        public CSHApplication getApplication(){
368                return application;
369        }    
370        
371        
372        public void Kill(Request request, List jobs) {
373              //System.out.println("condor kill -----------started");
374            
375            
376             for(int z=0; z != jobs.size(); z++){
377                Job job = (Job)jobs.get(z);
378            
379                if(job.getProcesseIDs().size() == 0){
380                    System.out.println("No ProcesseIDs found for job " + job.getJobID());
381                    jobs.remove(z);
382                    z--;
383                }
384                else{
385                    for(int i=0; job.getProcesseIDs().size() != i; i++){
386    
387                        int attempt = 0;
388                        boolean success = false;
389                        String commmandOutput = "";
390                        System.out.print("ProcesseID: <" + job.getProcesseIDs().get(i) + "> of Job: <" + job.getJobID() + ">");
391    
392                        while (!success && (attempt < getMaxAttempts())) {
393                                try {
394                                    
395                                    //System.out.println("exec string -----> \"" + "condor_rm " + ((String) job.getProcesseIDs().get(i)) + "\"");
396                                    
397                                   CSHCommandLineTask task = new CSHCommandLineTask("condor_rm " + ((String) job.getProcesseIDs().get(i)) , true, getMaxElapseTime());
398                                    task.execute();
399                                    if (task.getExitStatus() != 0) {
400                                        log.warn("condor_rm " + task.getOutput());
401                                        Thread.sleep(getMsBtwnFailure());
402                                        if(task.getOutput().lastIndexOf("Couldn't find") != -1) success = true;
403                                        System.out.print(task.getOutput());
404                                        attempt++;
405                                    } 
406                                    else{ 
407                                        success = true;
408                                        System.out.println("Killed");
409                                    }
410    
411                                    commmandOutput = task.getOutput();
412                                } 
413                                catch (Exception e) { System.out.print("condor_rm failed" + e); 
414                                System.out.print(commmandOutput);
415                                }
416                                try { Thread.sleep(getMsBtwnFailure());} 
417                                catch (Exception e1) {System.out.print("condor_rm failed");}
418                                    if(!success) System.out.print("/");
419                                    attempt++;
420                        }
421    
422                    }
423                    job.clearProcesseIDs();
424                    jobs.remove(z);
425                    z--;
426                }      
427           }
428              
429              
430              
431         //System.out.println("condor kill -----------done");
432              
433              
434        }    
435        
436        public String Status(Job job, int Processe) {
437                        if(job.getProcesseIDs().size() == 0) return "No ProcesseIDs found for job " + job.getJobID();
438                if(job.getProcesseIDs().size() < Processe) return job.getJobID() + " only has " + job.getProcesseIDs().size() + "processes, processe " + Processe + "dose not exist.";
439                
440                    
441               // for(int i=0; job.getProcesseIDs().size() != i; i++){
442    
443                    int attempt = 0;
444                    boolean success = false;
445                    String commmandOutput = "";
446                    System.out.print("ProcesseID: <" + job.getProcesseIDs().get(Processe) + "> of Job: <" + job.getJobID() + ">");
447    
448                    while (!success && (attempt < getMaxAttempts())) {
449                            try {
450                               CSHCommandLineTask task = new CSHCommandLineTask("condor_q " + ((String) job.getProcesseIDs().get(Processe)) , true, getMaxElapseTime());
451                                task.execute();
452                                if (task.getExitStatus() != 0) {
453                                    log.warn("condor_q " + task.getOutput());
454                                    Thread.sleep(getMsBtwnFailure());
455                                    
456                                   // if(task.getOutput().lastIndexOf("already finished") != -1) success = true;
457                                    //return (task.getOutput().replace('\n',' ');
458                                    attempt++;
459                                } 
460                                else{ 
461                                    success = true;
462    
463                                    
464                                    if(task.getOutput().length() < 217) return("Done or Killed");
465                                    else{
466                                        String state = task.getOutput().substring(214,216).trim();
467                                        if( state.startsWith("R")) state = "RUN";
468                                        return(task.getOutput().substring(214,216).trim());
469                                    }
470    
471                              
472                                }
473    
474                                commmandOutput = task.getOutput();
475                            } 
476                            catch (Exception e) { System.out.print("condor_q failed" + e); 
477                            System.out.print(commmandOutput);
478                            }
479                            try { Thread.sleep(getMsBtwnFailure());} 
480                            catch (Exception e1) {System.out.print("condor_q failed");}
481                                if(!success) System.out.print("/");
482                                attempt++;
483                    }
484    
485               // }
486                
487           return "condor_q failed";
488        }
489        
490        public void stop() {
491        }
492        
493        String qstat = "";
494        /**
495         * Runs test(s) on underlying components to determine if submitting jobs should be attempted.
496         * @param queue queue object to be tested
497         * @return Will return true to indicate everything is alright and false if the test has failed
498         */
499        public boolean test(Queue queue){
500            
501            
502            if(qstat.length() == 0){ //only run the command one time
503                if(! runInTimeLimitedThread("condor_status -total" , getMaxElapseTime(), getMsBtwnFailure(), 5  )){
504                    return false;
505                }
506                else{
507                    //System.out.println(super.threadOuput); //show the output for debugging
508                    qstat = super.threadOuput.replace('\n', ' '); //keep everything on one line 
509                }
510            }
511             
512            if(qstat.length() == 0) return false; //just in case notting was returnned;
513            if(qstat.matches(".* [123456789]* .*")){ //find any non zero digit in bewteen speces. This will find eider n available or n used nodes, in both cases the site most likly alright. 
514    
515                    return true;  
516                }
517            
518            return false;    
519                
520        }
521        
522        
523        private List requirements = new ArrayList();
524        /**
525         Add requirements to the requirements="" string in the .condor file. 
526         Note: Machine should not be added as SUMS will fill in this value for you.
527        */
528        public void addRequirement(String requirement){ requirements.add(requirement.trim()); }
529        public List getRequirements(){return requirements;}
530        public void setRequirements(List requirements) {this.requirements = requirements;}
531        
532    }