001    /*
002     * XgridDispatcher.java
003     *
004     * Created on March 9, 2006, 11:47 AM
005     *
006     * This file is part of the STAR Scheduler.
007     * Copyright (c) 2002-2006 STAR Collaboration - Brookhaven National Laboratory
008     *
009     * STAR Scheduler is free software; you can redistribute it and/or modify
010     * it under the terms of the GNU General Public License as published by
011     * the Free Software Foundation; either version 2 of the License, or
012     * (at your option) any later version.
013     *
014     * STAR Scheduler is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017     * GNU General Public License for more details.
018     *
019     * You should have received a copy of the GNU General Public License
020     * along with STAR Scheduler; if not, write to the Free Software
021     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
022     */
023    
024    package gov.bnl.star.offline.scheduler.Dispatchers.xgrid;
025    
026    import gov.bnl.star.offline.scheduler.Dispatchers.lsf.*;
027    import gov.bnl.star.offline.scheduler.ComponentLibrary;
028    import gov.bnl.star.offline.scheduler.Dispatcher;
029    import gov.bnl.star.offline.scheduler.Job;
030    import gov.bnl.star.offline.scheduler.request.Request;
031    import gov.bnl.star.offline.scheduler.catalog.PhysicalFile;
032    import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask;
033    import gov.bnl.star.offline.scheduler.util.GenericResourceRequirementStringDefinition;
034    import gov.bnl.star.offline.scheduler.Dispatchers.AbstractResourceStrategy;
035    import gov.bnl.star.offline.scheduler.util.sandbox.Sandbox;
036    import gov.bnl.star.offline.scheduler.util.sandbox.SandboxPackage;
037    import gov.bnl.star.offline.scheduler.Dispatchers.xgrid.BashApplication; 
038    
039    import gov.bnl.star.offline.scheduler.util.FilesystemToolkit;
040    import java.net.URL;
041    import java.io.*;
042    import java.util.*;
043    //import java.util.List;
044    import java.util.logging.Level;
045    import java.util.logging.Logger;
046    /**
047     * Dispatches a jobs using XGRID for Mac O.S. X
048     *
049     * @author  mlmiller
050     */
051    public class XgridDispatcher implements Dispatcher {
052        
053        static private Logger log = Logger.getLogger(XgridDispatcher.class.getName());
054        protected BashApplication application;
055        protected boolean simulation = false;
056        private int maxElapseTime = 30000;
057        private String xGridScratch = "./";
058        private String xGridEx = "xgrid -h 18.77.0.68 -p kuba -job";
059        private List retrievalCommands = new ArrayList();
060        private List deleteCommands = new ArrayList();
061        private boolean isSandboxDirCreated = false;
062        
063        public String getXgridEx() {
064            return xGridEx;
065        }
066        
067        public void setXgridEx(String v) {
068            this.xGridEx= v;
069        }
070    
071        public void setMaxElapseTime(int maxElapseTime){
072            this.maxElapseTime = maxElapseTime;
073        }
074        
075        public int getMaxElapseTime(){
076            return maxElapseTime;
077        }
078    
079        /** Creates a new instance of XgridDispatcher */
080        public XgridDispatcher() {
081        }
082        
083    
084        public String getOutputDirName(Job job) {
085            String dirN = FilesystemToolkit.getCurrentDirectory() + "/results_" + job.getRequestID();
086            return dirN;
087        }
088    
089        public String getInputDirName(Job job) {
090            String dirN = FilesystemToolkit.getCurrentDirectory() + "/input_" + job.getRequestID();
091            return dirN;
092        }
093    
094        public void dispatch(Request request, List jobs) {
095            
096            
097            try {
098                //System.out.println("");
099                //System.out.println("XGridDispatcher::dipatch() -- beginning");
100                //application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
101                simulation = request.getSimulation();
102                //System.out.println("\nWe are using simulation:\t" + simulation);
103    
104                try {
105                    //No long get the CSHApplication object from the config file. Get it using the setApplication() and getApplication() via the config file
106                    if(application == null){ //If this was not set in the config file or if we'er in junit testing mode then print an error, and use the default file
107                        //System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized.");  
108                        String notSet = "The BashApplication for the xgrid dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"BashApplication\" in ComponentLibrary.";
109                        log.warning(notSet);
110                        throw new RuntimeException(notSet);
111                        //System.out.println(notSet);
112                        //application = (BashApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
113                    }
114                } catch (Exception appE) {
115                    System.out.println("\nXGridDispatcher::dispatch(Request,List).  getApplication threw:\t" + appE + " abort");
116                    appE.printStackTrace();
117                    System.exit(1);
118                }
119                
120    
121                //System.out.println("Enter loop\n");
122                for (int nJob = jobs.size() - 1; nJob >= 0; nJob--) {
123                    Job job = (Job) jobs.get(nJob);
124    
125                     String jid = job.getJobID();
126    
127                     //this is kind of a hack, but this directory has to exist for all jobs
128                     if (isSandboxDirCreated==false) {
129                         System.out.println("\n ------------ Make input directory -------------");
130                         String mkdir = "/bin/mkdir " + getInputDirName(job);
131                         java.lang.Process dirShell = Runtime.getRuntime().exec(mkdir);
132                         isSandboxDirCreated=true;
133                     }
134                     //System.out.print("Dispatching process " + jid + ".");
135                
136                     dispatch(request, job);
137    
138                     if (application.getCSHScriptFileName().endsWith("_0.csh")) {//last job to dispatch, now do afterburner
139                         //System.out.println("\n ------------ Make results directory -------------");
140                         String mkdir = "/bin/mkdir " + getOutputDirName(job);
141                         
142                         java.lang.Process dirShell = Runtime.getRuntime().exec(mkdir);
143                         
144                         //System.out.println("\n------------- Make retrieval script---------------");
145                         
146                         StringBuffer rFileName = new StringBuffer(FilesystemToolkit.getCurrentDirectory() + "/" + job.getRequestID() + ".retrieve.csh");                                          
147                         //System.out.println("open file:\t" + rFileName);                     
148                         BufferedWriter rOut = new BufferedWriter(new FileWriter(rFileName.toString()));
149    
150                         for (int i=0; i<retrievalCommands.size(); ++i) {
151                             String rcmd = (String) retrievalCommands.get(i);
152                             //System.out.println("retrive job with:\t"+rcmd);
153                             rOut.write(rcmd + "\n");
154                         }
155                         rOut.close();
156                         //and make it executable
157                         String chmod1 = "/bin/chmod +x " + rFileName.toString();
158                         java.lang.Process shell = Runtime.getRuntime().exec(chmod1);                     
159                         
160                         
161                         
162                         //System.out.println("\n------------- Make delete script---------------");
163                         
164                         StringBuffer dFileName = new StringBuffer(FilesystemToolkit.getCurrentDirectory() + "/" + job.getRequestID() + ".delete.csh");                                          
165                         //System.out.println("open file:\t" + dFileName);                     
166                         BufferedWriter dOut = new BufferedWriter(new FileWriter(dFileName.toString()));
167    
168                         for (int i=0; i<deleteCommands.size(); ++i) {
169                             String dcmd = (String) deleteCommands.get(i);
170                             //System.out.println("delete job with:\t"+dcmd);
171                             dOut.write(dcmd + "\n");
172                         }
173                         dOut.close();
174                         //and make it executable
175                         String chmod2 = "/bin/chmod +x " + dFileName.toString();
176                         java.lang.Process shell2 = Runtime.getRuntime().exec(chmod2);                     
177    
178                     }
179    
180                     
181                     
182                }
183    
184                //StatisticsRecorder.getIntance().recordStatistics(request, jobs); //Moved Statistics recording to Scheduler.java LH
185                //System.out.println("XGridDispatcher::dipatch() -- finished");        
186                return;
187                
188            
189            } catch (Exception e) {
190                System.out.println("\n XGridDispatcher::dispatch(Lists) threw:\t" + e + "abort()");
191                e.printStackTrace();
192                System.exit(1);
193            }
194        }
195        
196        
197    
198    
199        //To do (MLM): convert this to xgrid submission logic flow
200        protected void dispatch(Request request, Job job) {
201            
202            
203            try {
204                
205                // TODO: all the parameters should be passed in one go
206                application.setJob(request, job);
207                application.setSubmissionCommand(getXgridCommand(request, job));
208                application.prepareJob();
209                //System.out.println("\n \t Executing: " + getXgridCommand(request, job));
210            
211                long StarTime = System.currentTimeMillis();
212                boolean success = false;
213                String pe="";
214    
215            
216            } catch (Exception e) {
217                System.out.println("\nXGridDispatcher::dispatch(Request,Job).  application.prepareJob threw:\t" + e);
218                e.printStackTrace();
219                //System.out.println("return w/o dispatching job");
220                System.exit(1);
221                return;
222            }
223    
224            //Ok, here we hack in a copy of the .list file into the input directory:
225            try {
226                if (isSandboxDirCreated==false) {
227                    System.out.println("\nFatal Error: input directory not yet created.  Abort()");
228                    System.exit(1);
229                }
230                //System.out.println("\n ------------ copy list file-------------");
231                String cpfile = "/bin/cp " + application.getInputFileListName() 
232                + " " + getInputDirName(job) + "/" + application.getInputFileListName();
233                java.lang.Process dirShell = Runtime.getRuntime().exec(cpfile);
234    
235            }
236            catch (Exception cpE) {
237                cpE.printStackTrace();
238                System.exit(1);
239            }
240    
241            if (simulation==true) {
242                System.out.println("..Simulated");
243                return;
244            }
245    
246            String cmdReturn = null;
247                try {
248                    // start up the command in child process
249                    //String cmd = "/bin/ls";
250                    //String cmd = "xgrid -h lincoln.mit.edu -p kuba -job submit /bin/hostname";
251                    //String cmd = "/bin/pwd";
252                    String cmd = getXgridCommand(request, job);
253                    System.out.println("dispatch job:\t" + job.getJobID()+"\twith command:" + cmd);
254                    Process child = Runtime.getRuntime().exec(cmd);
255                    //Process child = Runtime.getRuntime().exec(new String[] {csh, "-cf",  cmd});
256    
257                    // hook up child process output to parent
258                    InputStream lsOut = child.getInputStream();
259                    InputStreamReader r = new InputStreamReader(lsOut);
260                    BufferedReader in = new BufferedReader(r);
261                    
262                    // read the child process' output
263                    String line;
264                    int counter = 0;
265                    String tmpString = in.toString();
266                    //System.out.println("in:\t" + tmpString);
267                   
268                    while ((line = in.readLine()) != null) {
269                        //System.out.println("counter:\t" + counter + "\t" + line);
270                        
271                        try {
272                            if (counter==0) cmdReturn= line;
273                        }
274                        catch (Exception stringE) {
275                            System.out.println("concat threw:\t" + stringE);
276                            stringE.printStackTrace();
277                            System.exit(1);
278                        }
279                    }
280                    try {
281                        //System.out.println("\nSuccess.  Retrieved string:\t"+cmdReturn);
282                    }
283                    catch (Exception soutE) {
284                        System.out.println("cout threw:\t" + soutE);
285                        soutE.printStackTrace();
286                        System.exit(1);
287                    }
288                } 
289                catch (Exception runTimeE) {  // exception thrown
290                    System.out.println("Command failed!");
291                    runTimeE.printStackTrace();
292                    System.exit(1);
293                }
294            
295            //now get the job id
296            if (cmdReturn!=null) {
297            try {
298                String sub1 = "jobIdentifier = ";
299                String sub2 = "; }";
300                int istart = cmdReturn.indexOf(sub1);
301                int istop = cmdReturn.indexOf(sub2);
302                int beginAt = istart + sub1.length();
303                int endAt = istop;
304                //System.out.println("istart:\t" + istart + "\tistop:\t" + istop);
305                //System.out.println("loop from:\t" +beginAt + "\tto:\t" + endAt);
306                String jobId = cmdReturn.substring(beginAt, endAt);
307                System.out.println("\tsuccess.  jobId:\t" + jobId);
308                //job.AddProcesseID(jobId); //This doesn't seem to work right...
309                
310                
311                //ok, jobId is retived, so let's catalog for later use:
312                StringBuffer resultCmd = new StringBuffer(xGridEx);
313                resultCmd.append(" results");
314    
315                if (application.getStdout() != null) {
316                    resultCmd.append(" -so ").append(application.getStdout());
317                }
318                
319                if (application.getStderr() != null) {
320                    resultCmd.append(" -se ").append(application.getStderr());
321                }
322                
323                resultCmd.append(" -id " + jobId );
324                resultCmd.append(" -out " + getOutputDirName(job) + "/");
325                //System.out.println("retrive results with:\t" + resultCmd);
326                
327                retrievalCommands.add(resultCmd.toString());
328                
329                
330                //And add the delete command
331                StringBuffer delCmd = new StringBuffer(xGridEx);
332                delCmd.append(" delete");
333                delCmd.append(" -id " + jobId );   
334                deleteCommands.add(delCmd.toString());
335                
336                
337            }
338            catch (Exception idE) {
339                System.out.println("job id find threw:\t"+idE);
340                idE.printStackTrace();
341            }
342            }
343            else {
344                System.out.println("error, could not dispatch job.  abort()");
345                System.exit(1);
346            }
347                
348    
349        }
350    
351        
352        /** Returns the full xgrid command to be executed to dispatch the process. This
353        * command must executed in the directory in which the script resides.
354        * @return the xgrid command
355        */
356        //To do (MLM): convert this to real xgrid syntax
357        public String getXgridCommand(Request request, Job job) {
358           
359            try { //careful, java strings seem to throw...
360                //String thePwd = FilesystemToolkit.getCurrentDirectory();
361                //System.out.println("XGridDispather::getXgridCommand() -- pwd:\t" + thePwd);
362                //StringBuffer xsub = new StringBuffer("cd " + FilesystemToolkit.getCurrentDirectory() + "; " + xGridEx);
363                StringBuffer xsub = new StringBuffer(xGridEx);
364                xsub.append(" submit");
365                
366                      
367                //MLM check here for sanbox elements ------
368                Sandbox sandbox = request.getSandbox();
369                List packages = sandbox.getPackages();
370            
371                //String jobSandbox = null;
372                for (int ipack = 0; ipack<1; ++ipack) {
373                    SandboxPackage sp = (SandboxPackage) packages.get(ipack);
374                    
375                    
376                    
377                    
378                    //System.out.println("List of Sandbox files:");
379                    List files = sp.getFiles();
380                    for (int ifile=0; ifile<files.size(); ++ifile) {
381                        URL theUrl = (URL) files.get(ifile);
382                        String fileName = theUrl.getFile();
383                        String pathName = theUrl.getPath();
384                        //System.out.println("     file: " + fileName);
385                        //System.out.println("     path: " + pathName);
386                                    
387                        /*
388                        try {
389                            if(FilesystemToolkit.checkIfFileExists( new URL("file:" + fileName) ,true)){
390                                //this.addSandboxedFiles((new URL(packageObj.getPackageName()).getFile()));
391                                //jobSandbox = fileName;                            
392                                //instead of breaking here, we'll just copy each file into the input directory
393                                //break;
394    
395                                try {
396                                    if (isSandboxDirCreated==false) {
397                                        System.out.println("\nFatal Error: input directory not yet created.  Abort()");
398                                        System.exit(1);
399                                    }
400                                    String cpfile = "/bin/cp " + fileName + " " + getInputDirName(job) + "/.";
401                                    System.out.println("\tcp sandbox file command:\t" + cpfile);
402                                    java.lang.Process dirShell = Runtime.getRuntime().exec(cpfile);
403                                }
404                                catch (Exception cpE) {
405                                    cpE.printStackTrace();
406                                    System.exit(1);
407                                }
408                            }
409                            else {
410                                System.out.println("Error -- file:" + fileName + " does not exist!!!!!!!!!!!!!!!!!!!!");
411                            }
412                        } catch (Exception e) {
413                            String buffer = "Error -- file:" + fileName + " does not exist!!!!!!!!!!!!!!!!!!!!";
414                            System.out.println(buffer);
415                            log.log(Level.SEVERE, buffer, e);
416                        }
417                         */
418                                            
419                        try {
420                            if (isSandboxDirCreated==false) {
421                                System.out.println("\nFatal Error: input directory not yet created.  Abort()");
422                                System.exit(1);
423                            }
424                            String cpfile = "/bin/cp " + fileName + " " + getInputDirName(job) + "/.";
425                            //System.out.println("\tcp sandbox file command:\t" + cpfile);
426                            java.lang.Process dirShell = Runtime.getRuntime().exec(cpfile);
427                        }
428                        catch (Exception cpE) {
429                            cpE.printStackTrace();
430                            System.exit(1);
431                        }
432    
433                    }
434                }
435                /* MLM, don't need this anymore
436                if (jobSandbox!=null) {
437                    //System.out.println("using sandbox" + jobSandbox +" all others will be ignored");
438                    xsub.append(" -in " + jobSandbox);
439                }
440                 */
441                xsub.append(" -in " + getInputDirName(job));
442    
443                //xsub.append(" -file " + application.getInputFileListName());
444                xsub.append(' ').append(application.getCSHScriptFileName());
445    
446                //System.out.println("\n" + qsub.toString() + "\n");
447                return xsub.toString();
448            }
449            catch (Exception e) {
450                System.out.println("\nXGridDispatcher::getXgridCommand() threw:\t" + e);
451                e.printStackTrace();
452                System.exit(1);
453            }
454            
455            //If we reached this point we're dead and should abort
456            String undef = "XGridDispatcher::getXgridCommand()--undefined.   You should abort()";
457            System.exit(1);
458            return undef;
459        }
460    
461    
462        
463        public void setBashApplication(BashApplication application){ this.application = application; }
464        public BashApplication getBashApplication(){ return application; }
465        
466        
467        
468        
469        
470        
471        // ------------------------------------------------------------ We don't use these ---------
472        
473        /** Retrieves the output of the job from the target machine. It will first
474         * check whether the job has terminated, and than it will retrieve the
475         * output files and delete any temporary files on the target machine.
476         * @param job job description and requirements, include input and output file and environment
477         * variables
478         */
479        public void retrieveOutput(Request job, List jobs) {
480            
481        }
482        
483        /**
484        Kills the processes associated with this job.
485        */
486        public void Kill(Request request, List jobs) {
487        }
488        
489        /**
490        Returns The status of the job
491        */
492        public String Status(Job job, int Processe) {
493            return null;
494        }
495        
496        
497        /**
498        Cases the dispacher to stop dispaching and trys to kill dispached jobs
499        */
500        public void stop() {
501        }
502    
503        public boolean test(gov.bnl.star.offline.scheduler.Queue queue) {
504            return true;
505        }
506    }