001    /*
002     * $RCSfile: PassivePolicy.java,v $
003     *
004     * Created on August 6, 2006, 10:15 AM
005     *
006     * This file is part of the STAR Scheduler.
007     * Copyright (c) 2002-2003 STAR Collaboration - Brookhaven National Laboratory
008     *
009     * STAR Scheduler is free software; you can redistribute it and/or modify
010     * it under the terms of the GNU General Public License as published by
011     * the Free Software Foundation; either version 2 of the License, or
012     * (at your option) any later version.
013     *
014     * STAR Scheduler is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017     * GNU General Public License for more details.
018     *
019     * You should have received a copy of the GNU General Public License
020     * along with STAR Scheduler; if not, write to the Free Software
021     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
022     */
023    package gov.bnl.star.offline.scheduler.policy;
024    
025    
026    import gov.bnl.star.offline.scheduler.dataset.catalog.STARCatalog;
027    import gov.bnl.star.offline.scheduler.dataset.datasetManipulators.DropDuplicateRegX;
028    import gov.bnl.star.offline.scheduler.dataset.datasetManipulators.DropDuplicatesFromSortedList;
029    import gov.bnl.star.offline.scheduler.dataset.datasetManipulators.DropMatchingRegX;
030    import gov.bnl.star.offline.scheduler.dataset.datasetManipulators.SplitByRegX;
031    import gov.bnl.star.offline.scheduler.dataset.datasetManipulators.SortByRegX;
032    import gov.bnl.star.offline.scheduler.dataset.datasetManipulators.SplitByMinMaxEntries;
033    import gov.bnl.star.offline.scheduler.dataset.Dataset;
034    import gov.bnl.star.offline.scheduler.dataset.DatasetSubset;
035    import gov.bnl.star.offline.scheduler.dataset.EntryCounter;
036    import gov.bnl.star.offline.scheduler.dataset.EntryParser;
037    import gov.bnl.star.offline.scheduler.dataset.SubsetValidator;
038    import gov.bnl.star.offline.scheduler.util.ConfigToolkit;
039    import gov.bnl.star.offline.scheduler.util.ThreadSafeFilesystemToolkit;
040    import java.io.File;
041    
042    import org.globus.gsi.CertUtil;
043    import org.globus.gsi.GlobusCredential;
044    import org.globus.util.Util;
045    import org.globus.common.CoGProperties;
046    import org.globus.common.Version;
047    
048    
049    import gov.bnl.star.offline.scheduler.*;
050    import gov.bnl.star.offline.scheduler.catalog.*;
051    import gov.bnl.star.offline.scheduler.catalog.CatalogTask;
052    import gov.bnl.star.offline.scheduler.catalog.QueryResult;
053    import gov.bnl.star.offline.scheduler.catalog.StarCatalog;
054    import gov.bnl.star.offline.scheduler.request.Request;
055    import gov.bnl.star.offline.scheduler.policy.copyselector.*;
056    import gov.bnl.star.offline.scheduler.util.jobIDGen;
057    import gov.bnl.star.offline.scheduler.Dispatchers.DispatcherBase;
058    
059    
060    
061    import gov.bnl.star.offline.scheduler.informationService.VORS;
062    
063    import java.net.*;
064    import java.util.*;
065    
066    import java.util.Iterator;
067    import java.util.List;
068    import java.util.Set;
069    //import java.util.logging.Level;
070    import org.apache.log4j.Logger;
071    
072    
073    /** Decides how to split the job without triggering any resource redistribution.
074     * The passive policy doesn't trigger any file copy: it just collects
075     * information and does the best it can.
076     * <p>
077     * The policy will use copy selectors and file assignments to build the division.
078     * Refer to the documentation of those pieces for the details.
079     * <p>
080     * Overview of what the policy does <br>
081     * The policy basically solves all the queries, thus creating a big list of
 * files. Then, according to which machines the files reside on, the files are divided
083     * into different processes. At that point a new process is created.
084     * <p>How to configure the policy
 * <p>There are a few things you have to set in the policy to configure it. First
 * of all, the queue names:
087     * <ul>
088     *   <li>localQueue - is the queue used if the job is assigned a local file.</li>
089     *   <li>nfsQueue - is the queue used if the job wasn't assigned any local file.</li>
090     *   <li>longQueue - is the queue used for long jobs, that is when the expected
091     *   time is more than minTimeLimitQueue</li>
092     * </ul>
 * You will also need to configure the minTimeLimitQueue, which sets the limit
 * for the length of the jobs on the short queues. Both nfsQueue and localQueue
 * are short queues, meaning that any job longer than the time set will go
 * to the longQueue, regardless of its input.
097     * <p>The last thing you will need to set, is whether rootd is available on the
098     * cluster. This will allow the policy to assign a job files that are on different
099     * local disks. This helps to satisfy the requirements, such as minFilesPerProcess
100     * when using files on local disk.
101     * <p>An optional setting is the clusterName, which will be assigned to all the
102     * jobs being created by this policy.
103     *
104     * @author Gabriele Carcassi & Pavel Jakl & Levente Hajdu
105     * @version $Revision: 1.52 $ $Date: 2006/11/21 00:41:32 $
106     */
107    public class PassivePolicy implements Policy {
    /** Shared log4j logger for this policy. */
    static public Logger log = Logger.getLogger(PassivePolicy.class.getName());

    // Base string for all job IDs of the current request; set by initJobIDs().
    private String jobIDBase;
    // Counter appended to jobIDBase so each process ID is unique within a request.
    private int processNumber;
    private String nfsQueue; //Deprecated
    private String localQueue; //Deprecated
    protected String longQueue; //Deprecated
    private static boolean rootdAvailable=false; // TODO: this is messy if several services exists - Need to change
    private static boolean xrootdAvailable=false; // TODO: this is messy if several services exists - Need to change
    private static String xrootdRedirectorName=null;
    private static int xrootdPort=0;         // TODO: this is messy if several services exists - Need to change
    private int minTimeLimit; //Deprecated
    // NOTE(review): this initializer runs while minTimeLimit is still 0, so
    // timeLimit is always 0.0. Harmless only if the field is genuinely unused
    // (it is marked Deprecated above) — verify before relying on it.
    private double timeLimit = ((double) minTimeLimit) / 60;

    // Queues optimized as "local" to the submission site (populated by assignQueues).
    public List localQueueList = new ArrayList();
    // All remaining (generic) queues, tried when no local queue accepts the job.
    public List genericQueueList = new ArrayList();


    /** Holds value of property clusterName. */
    private String clusterName;
128    
129        /* Initializes the creation of the jobIDs for the processes of one job.
130         * It basically initializes the first part of the ID, which is actually
131         * the time at which this method is running.
132         */
133        private void initJobIDs() {
134    
135           jobIDBase =  jobIDGen.GetJobIDType2();
136    
137           //jobIDBase = String.valueOf(System.currentTimeMillis());
138    
139            processNumber = 0;
140        }
141    
142        /* Returns a unique jobID. Each call returns a unique String.
143         */
144        private String getNextJobID() {
145            return "" + jobIDBase + "_" + processNumber++;
146        }
147    
148        /* Executes all the queries to the file catalog contained in the job
149         * request using the STAR scheduler implementation.
150         * <p>
151         * The result is simply added to the input file list.
152         */
153        /*
154        private QueryResult[] resolveCatalogQueries(Request request, CatalogQuery[] queries) {
155            QueryResult[] results = new QueryResult[queries.length];
156    
157            for (int nQuery = 0; nQuery < queries.length; nQuery++) {
158                CatalogQuery query = queries[nQuery];
159                if (request.getInputOrder() != null) {
160                    if (query.getAttributes() == null) {
161                        query.setAttributes(request.getInputOrder());
162                    } else {
163                        query.getAttributes().addAll(request.getInputOrder());
164                    }
165                }
166                System.out.print("Retrieving files for query : " + query.getQuery() + " ");
167    
168                CatalogTask task = new CatalogTask(StarCatalog.class, query);
169                task.execute();
170    
171                if (task.getExitStatus() == -1) {
172                    throw new RuntimeException( "A query to the file catalog failed.");
173                }
174    
175                System.out.println(" " + task.getResult().getPhysicalCount() + " files retrieved.");
176                query.setFilesReturned(task.getResult().getPhysicalCount());
177    
178                //CopySelector selector = CopySelectorFactory.createCopySelector(query);
179                results[nQuery] = task.getResult();
180            }
181    
182            return results;
183        }
184         */
185    
186        private CatalogQuery[] getCatalogQueries(Request request) {
187            int nQueries = 0;
188    
189            for (int n = 0; n < request.getInputList().size(); n++) {
190                if (request.getInputList().get(n) instanceof CatalogQuery) {
191                    nQueries++;
192                }
193            }
194    
195            CatalogQuery[] queries = new CatalogQuery[nQueries];
196    
197            for (int n = 0; n < queries.length; n++) {
198                queries[n] = (CatalogQuery) request.getInputList().get(n);
199            }
200    
201            return queries;
202        }
203        
    /**
     * Binds {@code job} to {@code queue}: stores the queue object, then selects
     * an access method and an access point for the job.
     * <p>
     * Selection order: if the queue is local to this site, local access methods
     * are tried first; if none is found (or the queue is not local), grid
     * access methods are tried. NOTE(review): neither selection loop breaks on
     * the first match, so when several access methods qualify the LAST one in
     * the list wins — confirm this is intended.
     *
     * @param queue the queue the job is being assigned to
     * @param job the job whose queue, access method and access point are set
     * @throws RuntimeException if no usable access method or access point exists
     **/
    public static void assignQueue(gov.bnl.star.offline.scheduler.Queue queue, Job job){
        job.setQueueObj(queue);
        
        
        //////////////////////////////////////////find an access method for the job ////////////////////////////////////
        boolean accessMethodWasNotSet = true;
        List accessMethods = queue.getAccessMethods();
        if(ConfigToolkit.getToolkit().isLocalQueue(queue)){  //the queue is local, so start with the local access points
            for(int i = 0; i != accessMethods.size(); i++){  
                AccessMethod accessMethod = (AccessMethod) accessMethods.get(i);
                if( ((LocalAccessPoint) accessMethod.getAccessPoints().get(0)).isLocal() ){ //It is assumed that if one accessPoint is local all accessPoints associated with that point will be local. 
                    accessMethodWasNotSet = false;
                    job.setAccessMethod( accessMethod );
                }
            } 
            if(accessMethodWasNotSet) System.out.println("Could not find local access point for " + queue.getID() + " looking for grid gatekeeper access point.");
        } 
        
        
        if(accessMethodWasNotSet){  //if no local access method can be found try grid access methods
            for(int i = 0; i != accessMethods.size(); i++){  
                AccessMethod accessMethod = (AccessMethod) accessMethods.get(i);
                if(! ( (LocalAccessPoint) accessMethod.getAccessPoints().get(0) ).isLocal() ){  //It is assumed that if one accessPoint is local all accessPoints associated with that point will be local. 
                    accessMethodWasNotSet = false;
                    job.setAccessMethod( accessMethod );
                }
            }    
        }
        if(accessMethodWasNotSet){
            throw new RuntimeException("No Grid accessMethod could be found for the queue: " + queue.getID() + "\nThis may be becuase the policy you are using was not intended to be run from this site or it may be a misconfiguration.");
        }
        
        
        
        ////////////////////find an access point: take the first one of the chosen access method ////////////////////
        if(job.getAccessMethod().getAccessPoints().size() > 0)
            job.setAccessPoint( (LocalAccessPoint)  job.getAccessMethod().getAccessPoints().get(0) );
        else
           throw new RuntimeException("No accessPoint could be found for the queue: " + queue.getID() + "\nThis may be becuase the policy you are using was not intended to be run from this site or it may be a misconfiguration."); 
        
        
    }
252    
253    
254        public void assignQueues(Request request, List jobs) {
255    
256           gov.bnl.star.offline.scheduler.Queues queses = new Queues(); // Load all queues from the Queue object in the config file
257    
258           
259           /*
260           Object queueCheckFlag = ComponentLibrary.getInstance().getComponent("SUMS_FLAG_DO_NO_QUEUE_CHECK"); //get the ComponentLibrary flag to true off queue checking
261           
262           // Test all queues if the queues do not pass the test drop them from the list
263           for(int i = 0; i < genericQueueList.size(); i++){ 
264               gov.bnl.star.offline.scheduler.Queue queue = (gov.bnl.star.offline.scheduler.Queue) genericQueueList.get(i);
265               System.out.print("Testing queue (" + queue.getID() + "@" + queue.getBatchSystem().getSite().getSiteName() + ") : ");
266               if((! bypassQueueTests)&&(queueCheckFlag == null)){
267                   if(! queue.getAssociatedDispatcher().test(queue)){
268                       System.out.println("(faild)");
269                       if(! ConfigToolkit.getToolkit().isFlagSet("SUMS_FLAG_IGNORE_ERRORS")){
270                           String dropQueue = "The queue " + queue.getID() + (queue.getName()!=null?"("+ queue.getName() + ")":"") + " was dropped because it did not pass validation tests."; 
271                           System.out.println("Warning : " + dropQueue);//outoput notic the queue is bing dropped 
272                           log.warn(dropQueue); //log notic
273                           genericQueueList.remove(i); //remove queue 
274                           i--; //When the list gets small becuase something is removed, you need to go back one index else that element will not be tested
275                       }
276                   }
277                   else{
278                       System.out.println("(passed)");
279                   }
280               }
281               else{
282                   System.out.println("(Bypassed)");
283               }
284           }*/
285           
286           
287           if(genericQueueList.size() == 0){ //Test if queues where loaded. This also works as a hack for junit
288               System.out.println("Could not find any queues.");
289               log.warn("Could not find any queues.");
290               // System.exit(1); //This will kill the unit test, a bypass of some type is needed to privent this.
291               //job.addToReportText("Could not find any queues.\n\n");
292               return;
293           }
294    
295           //remove all the local queues for the generic queue list and put them on the local queue list
296           for(int i = 0; i < genericQueueList.size(); i++){
297               gov.bnl.star.offline.scheduler.Queue queue = (gov.bnl.star.offline.scheduler.Queue) genericQueueList.get(i);
298               if(queue.IsOptimizedAsLocalQueue()){
299                   localQueueList.add(queue);
300                   genericQueueList.remove(i);
301                   i = 0; //If the size of the list changes start over again from first to last
302               }
303           }
304    
305           //sort all the queues by "Search Order Priority"
306           gov.bnl.star.offline.scheduler.Queues queues = new Queues();
307           localQueueList = queses.OrderQueuesBySearchOrderPriority(localQueueList);
308           genericQueueList = queses.OrderQueuesBySearchOrderPriority(genericQueueList);
309           
310           try{ //If the queue table can not be printed let the user know in the log file
311                request.addToReportText(QueueInfo(localQueueList, genericQueueList));
312           }catch (Exception e){ request.addToReportText("\n\nCould not print out Queue Talbe\n"); }
313    
314    
315           //assignQueues to jobs
316            request.addToReportText("\n*************************************************************************\n");
317            request.addToReportText("*   This is a history of Job assignments.                               *\n");
318            request.addToReportText("*   If your Jobs are going to the wrong queue, this may tell you why.   *\n");
319            request.addToReportText("*************************************************************************\n");
320           for (int nJob = 0; nJob < jobs.size(); nJob++) { //loop over all jobs
321    
322               Job job = (Job) jobs.get(nJob);
323               boolean jobHasNoQueue = true;
324    
325               if((job.getTarget() != null) || "xrootd".equals(request.getFileListType()) || "rootd".equals(request.getFileListType())){ //if the job uses Ddisk files
326                   for(int i = 0; (i < localQueueList.size()) && jobHasNoQueue; i++){ //loop over the list as until you get to the end or the job finds a q
327                        gov.bnl.star.offline.scheduler.Queue queue = (gov.bnl.star.offline.scheduler.Queue) localQueueList.get(i);
328                        job.addToReportText(job.getJobID() + " trying " + queue.getName()  + "(" + queue.getID() + ")");
329                        if(queue.doesJobFit(job, request)){
330                            //job.setQueueObj(queue);
331                            assignQueue(queue, job);
332                            job.addToReportText(job.getJobID() + " assigned to " + queue.getName()  + "(" + queue.getID() + ")");
333                            jobHasNoQueue = false;
334                            localQueueList = queues.rotateSameLevelSearchOrderPriority(localQueueList);
335                        }
336                        else{
337                            job.addToReportText(queue.getMessages());
338                        }
339                   }
340               }
341    
342               if(jobHasNoQueue){ //if no queue has been found yet
343                   for(int i = 0; (i < genericQueueList.size()) && jobHasNoQueue; i++){ //loop over the list as until you get to the end or the job finds a q
344                        gov.bnl.star.offline.scheduler.Queue queue = (gov.bnl.star.offline.scheduler.Queue) genericQueueList.get(i);
345                        job.addToReportText(job.getJobID() + " trying " + queue.getName()  + "(" + queue.getID() + ")");
346                        if(queue.doesJobFit(job, request)){
347                            //job.setQueueObj(queue);
348                            assignQueue(queue, job);
349                            job.addToReportText(job.getJobID() + " assigned to " + queue.getName()  + "(" + queue.getID() + ")");
350                            jobHasNoQueue = false;
351                            genericQueueList = queues.rotateSameLevelSearchOrderPriority(genericQueueList);
352                        }
353                        else{
354                            job.addToReportText(queue.getMessages());
355                        }
356                   }
357               }
358    
359               if(jobHasNoQueue){
360                   //error code, to handel what happin if a job can not find a q
361                   System.out.println("Job could not find a queue for " + job.getJobID() + " !!!!!!!!!!!!!\n The job will be removed.");
362                   System.out.println("Job report \n" + job.GetReportText());
363                   log.warn("Job could not find a queue for " + job.getJobID() + " !!!!!!!!!!!!!");
364                   job.addToReportText("\n\nJob could not find a queue !!!!!!!!!!!!!\n The job will be removed.");
365                   jobs.remove(nJob); //This will remove the job
366                   nJob--; //because one job was removed,nJobs has to be set back by one job
367               }
368    
369               job.addToReportText(" "); //between every job put a blank line in the report
370           }
371    
372        }
373    
    /* Reports a file distribution that failed the subset validator and asks the
     * user on stdin whether to dispatch anyway.
     * <p>
     * If the SUMS_FLAG_IGNORE_ERRORS flag is set, the method returns without
     * waiting for input (note the prompt has already been printed by then).
     * Otherwise anything other than a leading 'y' on stdin — including an I/O
     * error, which the empty catch deliberately treats as "no answer" — is
     * taken as a cancel and raises a RuntimeException.
     */
    private void askFeedback(SubsetValidator validator) {
        log.info("Assignment not valid. Asking for user feedback");
        System.out.println();
        System.out.println("The scheduler wasn't able to meet the requirement.\nThis is the proposed distribution:");
        /* System.out.println(assignment.getReport()); */
        
                
          
        /////////////////////////////// print the per-job index table
        System.out.println("\n-------------------------");
        System.out.println("JobIndex -> FilesInJob -> EventsInJob");
        List indexs = validator.getDatasetIndexList();
        // NOTE(review): the bound (size() - 1) skips the last DatasetSubset —
        // presumably it is a remainder/sentinel entry, matching the same bound
        // used in createJobs(); confirm against SubsetValidator.
        for(int i = 0; (i != (indexs.size() - 1)); i++){
             DatasetSubset datasetSubset = (DatasetSubset) indexs.get(i);
             //System.out.println(i + " -> " + datasetSubset.getPhysicalFileList().size() );
             System.out.println("job" + i + " -> " + datasetSubset.getFilesInSubset() + "files -> " + datasetSubset.getEventsInSubset() + "Events"  );
        }
        System.out.println("\n-------------------------");
        //////////////////////////////
        
        
        System.out.print("Should it be dispatched anyway? (press y to dispatch) ");
        
        if( ConfigToolkit.getToolkit().isFlagSet("SUMS_FLAG_IGNORE_ERRORS")) return; //ignore the error if the flag is set
        
        try {
            if (System.in.read() == 'y')
                return;
        } catch (Exception e) {} // deliberate: a read failure falls through to cancel
    
        log.fatal("User chose to cancel");
        throw new RuntimeException("Scheduling didn't meet user requirements, and user cancelled.\nTry changing minFilesPerProcess, or the number of files returned by the query.");
    }
407    
408    
409    
410        Comparator createComparator(List attrs) {
411            return new InputOrderComparator(attrs);
412        }
413        class InputOrderComparator implements Comparator {
414            private List attrs;
415            InputOrderComparator(List attrs) {
416                this.attrs = attrs;
417            }
418                    public int compare(Object o1, Object o2){
419                        PhysicalFile p1 = (PhysicalFile) o1;
420                        PhysicalFile p2 = (PhysicalFile) o2;
421                        Iterator iter = attrs.iterator();
422                        while (iter.hasNext()) {
423                            String att = (String) iter.next();
424                            Comparable a1 = (Comparable) p1.getAtribute(att);
425                            Comparable a2 = (Comparable) p2.getAtribute(att);
426                            if (a1 == null) return -1;
427                            if (a2 == null) return -1;
428                            int res = a1.compareTo(a2);
429                            if (res != 0) return res;
430                        }
431                        return 0;
432                    }
433                };
434    
435    
436    
437    
438        private List createJobs(Request request, List datasetIndexList) {
439            
440            /*
441            if (request.getInputOrder() != null) {
442                List attrs = request.getInputOrder();
443                assignment.orderFiles(createComparator(attrs));
444            }
445    
446            Set locations = assignment.getLocations();
447            Iterator iter = locations.iterator();
448            List jobs = new ArrayList();
449    
450            while (iter.hasNext()) {
451                Location location = (Location) iter.next();
452                String machine = null;
453                
454                //if (!location.equals(Location.getNFS())&& !location.equals(Location.getHPSS())) {
455                //Don't set the node name if the file is xrootd or NSF or HPSS
456                if (!"xrootd".equals(request.getFileListType()) &&!location.equals(Location.getNFS())&& !location.equals(Location.getHPSS())) {
457                
458                    
459                    machine = location.toString();
460    
461                    if ("".equals(machine)) {
462                        machine = null;
463                    }
464                }
465    
466                for (int nProc = 0; nProc < assignment.minProcs(location);nProc++) {
467    
468                    List files = assignment.getFiles(location, nProc);
469    
470                    Job job = new Job(getNextJobID());
471                    job.setRequest(request);
472                    job.setTarget(machine);
473                    job.setStdin(request.getStdIn());
474                    job.setStdout(request.getStdOut());
475                    job.setStderr(request.getStdErr());
476                    job.setInput(new ArrayList());
477                    job.getInput().addAll(files);
478                    job.setCluster(getClusterName());
479                    jobs.add(job);
480                }
481            }
482            return jobs;
483             *
484             */
485            
486            
487            List jobs = new ArrayList();
488            
489            //datasetIndexList
490            for(int i = 0; (i != (datasetIndexList.size() - 1)); i++){
491                
492                    DatasetSubset datasetSubset = (DatasetSubset) datasetIndexList.get(i);
493                    //System.out.println("job index --->" + i);
494    
495                    Job job = new Job(getNextJobID());
496                    job.setRequest(request);
497                    //job.setTarget(datasetSubset.getNode());
498                    job.setStdin(request.getStdIn());
499                    job.setStdout(request.getStdOut());
500                    job.setStderr(request.getStdErr());
501                    
502                    //job.setInput(new ArrayList());
503                    //job.getInput().addAll(files); //replaced
504                    
505                    job.setDatasetSubset(datasetSubset);
506                    
507                    job.setCluster(getClusterName());
508                    jobs.add(job);
509                
510            }
511    
512            return jobs;
513     
514        }
515    
516        /** Processes a job requests, splitting into multiple processes and assigning
517         * the target machines.
518         * <p>
519         * The step followed here are:
520         * <ul>
521         * <li>Initializes the job ID </li>
522         * <li>Executes the catalog queries</li>
523         * <li>Changes the maxFilePerProcess to fit in the short queue if possible</li>
524         * <li>Using AssignmentStrategyFactory, retrieves the assignmentStrategies
525         * for each query, and assign the file to the different machines</li>
526         * <li>If the job had no input, just dispatch a job with no input</li>
527         * <li>If the job had input, split into processes according to the distribution</li>
528         * <li>If the the distribution is invalid (some user constraint weren't met)
529         * ask user permission to continue</li>
530         * <li>Assigns the correct queue depending on the estimated job length</li>
531         * <li>Saves the report</li>
532         * </ul>
533         * Most of these actions are done in a separate private method. Refer to those
534         * for more information.
535         * @param request the job to be processed
536         */
537        public List assignTargetMachine(Request request) {
538            
539            
540            ////////test block///////////
541    //        System.out.println(" debug point -1 ");
542    //        DynamicConfigPolicy configpol = new DynamicConfigPolicy();
543    //        
544    //        if(true) return configpol.assignTargetMachine(request);
545            /////////////////////////////
546            
547            
548            
549            log.info("Assign target with PassivePolicy");
550            initJobIDs();
551            AdjustIOFileNames(request);
552            
553            
554            //get user grid cert if any exist using globus API if there is not cert use <username>@<site> example lbhajdu@rhic.bnl.gov
555            try {
556                GlobusCredential proxy = null;
557                String  file = CoGProperties.getDefault().getProxyFile();
558                proxy = new GlobusCredential(file);
559                request.setAuthenticatedUser( proxy.getSubject() );
560            } catch(Exception e) {
561                request.setAuthenticatedUser( System.getProperty("user.name") + "@" + ConfigToolkit.getToolkit().myLocalSite().getSiteName()  );
562            }
563            
564            
565            
566            /////////////////////test block/////////////////////////////
567            /*
568            VORS vors = new VORS();
569            vors.setService("http://vors.grid.iu.edu");
570            vors.setSiteID("STAR-BNL");
571            vors.refreshConnections();
572            vors.refreshConnections();
573            
574            VORS vors2 = new VORS();
575            vors2.setService("http://vors.grid.iu.edu");
576            vors2.setSiteID("NERSC-PDSF");
577            vors2.refreshConnections();
578            */
579            ////////////////////////////////////////////////////////////
580            /* //replaed by new fileBUffer based catalog
581            CatalogQuery[] queries = getCatalogQueries(request);
582            log.info("Executing catalog queries");
583            QueryResult[] results = resolveCatalogQueries(request, queries);
584            log.debug("Assigning input files to processes accordint to their position");
585            */
586    
587    ////////////////////lbh////Look at all the local queues in order end take the 1st one that has a time limit and try to make all jobs fit that
588            //gov.bnl.star.offline.scheduler.Queues queues = new Queues(); //load all the queues from the log file
589            
590    /////////////////////////////////////////////////
591            
592            
593            
594            
595            
596    ////////////////////////////////////////////////clac the min/max files split/////////////////////////////////////////        
597            int gap = 25;//%
598            int shortQueueTimeLimit = Integer.MAX_VALUE;
599            int optFiles = - 1;
600            int maxFiles = request.getResource("FilesPerProcess").getMax();
601            int minFiles = request.getResource("FilesPerProcess").getMin();
602            
603            //get the queue with the shartest time limit
604            for(int i = 0; i < genericQueueList.size(); i++){
605                gov.bnl.star.offline.scheduler.Queue queue = (gov.bnl.star.offline.scheduler.Queue) genericQueueList.get(i);
606                if( (queue.getTimeLimit() > 0) && (queue.getTimeLimit() < shortQueueTimeLimit )) {
607                    shortQueueTimeLimit = queue.getTimeLimit();
608                }
609            } 
610            if((shortQueueTimeLimit != Integer.MAX_VALUE) && (request.getFilesPerHour() != Double.POSITIVE_INFINITY)){ //if there is some queue time data and files perhour from the user, clac how many will fit into the smallest queue
611                optFiles = (int) (((float)request.getFilesPerHour() * ((float)shortQueueTimeLimit / (float)60)) - .5 );
612                if(optFiles < 1) optFiles = 1;
613            }
614                
615            
616            //System.out.println("optFiles-------------->" + optFiles);
617            
618            String splitStatment; 
619            String splitDebugging = " file splitting mode ---> "; 
620            
621    
622            if((maxFiles == -1) && (minFiles==-1) && (optFiles == -1)){ //if the user sets notting just use somedefult value
623                splitDebugging = splitDebugging + "0";
624                maxFiles = 150;
625                minFiles = (int) ((double)maxFiles - ((double)maxFiles * (double)gap * (double).01)) ;
626                splitStatment = "No values (maxFilesPerProcess, minFilesPerProcess, filesPerHour) to calculate an optimal file split values, so using default splitFileList(min=" + minFiles +", max="+ maxFiles +")";   
627            }else if((maxFiles > 0) && (minFiles==-1) && (optFiles == -1)){ //if only max files is given
628                splitDebugging = splitDebugging + "1";
629                minFiles = (int) ((double)maxFiles - ((double)maxFiles * (double)gap * (double).01)) ;
630                splitStatment = "Using split valuse : splitFileList(min=" + minFiles +", max="+ maxFiles +")";
631            }else if((maxFiles == -1) && (minFiles > 0) && (optFiles == -1)){ //if only min files is given
632                splitDebugging = splitDebugging + "2";
633                maxFiles = (int) ((double)minFiles + ((double)minFiles * (double)gap * (double).01)) ;
634                splitStatment = "Using split valuse : splitFileList(min=" + minFiles +", max="+ maxFiles +")";  
635            }else if((maxFiles  > 0) && (minFiles > 0) && (optFiles == -1)){ //if min and max files is given
636                splitDebugging = splitDebugging + "3";
637                splitStatment = "Using split valuse : splitFileList(min=" + minFiles +", max="+ maxFiles +")";
638            }else if((maxFiles == -1) && (minFiles == -1) && (optFiles > 0)){ //if optFiles via FilesPerHour is given
639                splitDebugging = splitDebugging + "4";
640                maxFiles = optFiles;
641                minFiles = (int) ((double)maxFiles - ((double)maxFiles * (double)gap * (double).01)) ;
642                splitStatment = "Using split valuse : splitFileList(min=" + minFiles +", max="+ maxFiles +")";  
643            }else if((maxFiles > 0) && (minFiles > 0) && (optFiles > 0)){ //if optFiles via FilesPerHour is given
644                splitDebugging = splitDebugging + "5";
645                if((minFiles < optFiles) && (optFiles < maxFiles)){ minFiles = optFiles; } //This is min becuase the splitter favors slipping by the min 
646                splitStatment = "Using split valuse : splitFileList(min=" + minFiles +", max="+ maxFiles +")";
647            }else if((maxFiles > 0) && (minFiles == -1) && (optFiles > 0)){
648                 splitDebugging = splitDebugging + "6";
649                 if(optFiles < ((double)maxFiles - ((double)maxFiles * (double)gap * (double).01))){
650                     minFiles = optFiles; 
651                 }else{
652                     minFiles = (int) ((double)maxFiles - ((double)maxFiles * (double)gap * (double).01)) ;
653                 } 
654                 splitStatment = "Using split valuse : splitFileList(min=" + minFiles +", max="+ maxFiles +")";   
655            }else if((maxFiles == -1) && (minFiles > 0) && (optFiles > 0)){   
656                splitDebugging = splitDebugging + "7";
657                if(optFiles < ((double)minFiles + ((double)minFiles * (double)gap * (double).01))){
658                    maxFiles = optFiles;
659                }else{
660                    maxFiles = (int) ((double)minFiles + ((double)minFiles * (double)gap * (double).01));
661                }
662                splitStatment = "Using split valuse : splitFileList(min=" + minFiles +", max="+ maxFiles +")";
663            }else{
664                log.fatal("hit error trap when tring to clac min max file split");
665                throw new RuntimeException("The PassivePolicy hit an error trap while trying to calculate a min/max split value.");
666            }
667    
668            if(minFiles < 1) minFiles = 1; //this is in case the math makes the min smaller then zero
669            
670     
671            log.info(splitDebugging + 
672                    "\n MinSplit = " + minFiles + " (uses by splitter)" +
673                    "\n maxSplit = " + maxFiles + " (uses by splitter)" +
674                    "\n optFiles = " + optFiles +
675                    "\n request.Maxfiles = " + request.getResource("FilesPerProcess").getMax() +
676                    "\n request.Minfiles = " + request.getResource("FilesPerProcess").getMin() );
677      
678     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////           
679            
680            
681            //System.out.println(">>>>>>>>>>>>>>>  maxFiles = " + maxFiles + ", minFiles = " + minFiles);
682            
683            //System.out.println( "Integer.MAX_VALUE=" +  Integer.MAX_VALUE );
684            //System.out.println( "request.getFilesPerHour() = " + request.getFilesPerHour() );
685            //System.out.println(gap);
686            //System.out.println(shortQueueTimeLimit);      
687            //System.out.println(optFiles);
688            //System.out.println(maxFiles);
689            //System.out.println(minFiles);
690            
691            
692            //System.out.println("...............>" + request.getResource("FilesPerProcess").getMax());
693            
694            
695            /*
696            int maxFiles = request.getResource("FilesPerProcess").getMax();
697            
698            
699            request.getResource("FilesPerProcess").
700            
701            int newMaxFiles = request.
702            
703            int newMaxFiles = (int) (((float)request.getFilesPerHour() * ((float)shortQueueTimeLimit / (float)60)) - .5 );
704            */
705            
706            
707            //Math.min(
708            
709            
710    /////////////////////////////////////////////////        
711            
712           // List genericQueueList = queues.OrderQueuesBySearchOrderPriority(queues.getQueues(PPQueues)); //make a list of all the generic  queue objects
713    
714    /*        
715            System.out.println("s - debug point 1");
716            boolean requestResized = false;
717            for(int i = 0; i < genericQueueList.size(); i++){ //loop over the list as until you get to the end or the job finds a q
718                
719                System.out.println("s - debug point 2");
720                
721                gov.bnl.star.offline.scheduler.Queue queue = (gov.bnl.star.offline.scheduler.Queue) genericQueueList.get(i);
722                if(queue.willJobsFit(request) && (! requestResized) && (queue.getTimeLimit() > 0)){
723                    
724                    System.out.println("s - debug point 3");
725                    
726                    timeLimit = (int)((float)queue.getTimeLimit() / (float)60);
727                    // TODO this should be more general
728                    // If it can fit in the short queue, then use it!
729                    if ((request.getFilesPerHour() != Double.POSITIVE_INFINITY) && (((double)request.getResource("FilesPerProcess").getMin())/request.getFilesPerHour() < timeLimit)) {
730                        
731                        System.out.println("s - debug point 4");
732                        
733                        //These two lines are almost the same however the one at the bottom has less round off error //LH
734                        //request.setMaxFilesPerProcess(Math.min((int) (request.getFilesPerHour() * timeLimit), request.getMaxFilesPerProcess())); //you can't set it higher then MaxFilesPerProces
735                        int maxFiles = request.getResource("FilesPerProcess").getMax();
736                        int newMaxFiles = Math.min((int) (((float)request.getFilesPerHour() * ((float)queue.getTimeLimit() / (float)60)) - .5 ), maxFiles);
737                        if(newMaxFiles!=0){
738                            request.setResourceMax("FilesPerProcess",newMaxFiles); //you can't set it higher then MaxFilesPerProcess
739                        }
740                        request.addToReportText("Note : Setting MaxFilesPerProcess to " + newMaxFiles + " to try and optimized job length to fit in  " + queue.getName() + " queue.\n");
741                        log.debug("Setting MaxFilesPerProcess to " + newMaxFiles + " to try and optimized job length to fit in  " + queue.getName() + " queue.");
742                        
743                        System.out.println("s - debug newmax=" + newMaxFiles);   
744                    }
745                    requestResized = true;
746                }
747            }
748    */        
749    
750            // log.finer("Number of input files to assign: " + assignment.g.getFileCount());
751            // The job has no input
752            if (request.getInputList().size() == 0) {
753                // If the job was specified as not having input, then submit
754                // anywhere.
755                if (request.hasNoInput()) {
756                    // If the number of processes is set, then submit the number of
757                    // jobs. Otherwise just submit one
758                    int nProcesses = request.getNProcesses();
759                    if (nProcesses == -1) nProcesses = 1;
760    
761                    List jobs = new ArrayList();
762                    for (int n = 0; n < nProcesses; n++) {
763                        Job job = new Job(getNextJobID());
764                        job.setRequest(request);
765                        job.setStdin(request.getStdIn());
766                        job.setStdout(request.getStdOut());
767                        job.setStderr(request.getStdErr());
768                        //job.setTarget(null);
769                        //job.setInput(new ArrayList());
770                        request.setID(jobIDBase);
771                        jobs.add(job);
772                    }
773    
774                    assignQueues(request, jobs); //This is called there as a replacment for set cluster.
775                    request.setJobs(jobs); //make pointers from the request to the job objects
776                    return jobs;
777                } else {
778                    // Otherwise don't submit anything
779                    System.out.println("Queries and wildcards returned no input files");
780                    return null;
781                }
782            }       
783            
784            
785            
786            //////////////////// User Requested  Special Sorting //////////////////////////////////////////////
787            
788            // query.setSingleCopy(true); 
789            //req.prepareInputOrder(attrs.getValue("inputOrder"));
790            //attrs.getValue("preferStorage")
791            
792            String preferStorage = null;
793            boolean singleCopy = true;
794            
795            for (int n = 0; n < request.getInputList().size(); n++) {
796                Object object = request.getInputList().get(n);
797                if (object instanceof CatalogQuery) {
798                   CatalogQuery  query = (CatalogQuery) object;
799                   if(preferStorage == null) preferStorage = query.getPreferStorage();
800                   else if(! query.getPreferStorage().equals(preferStorage)){
801                       System.out.println("Wringing : More then one preferStorage attribute can not  honored !!!");
802                   } //else it is the same and we don't need to do anything  
803                   
804                   if(! query.isSingleCopy()){
805                       System.out.println("Wringing : You have specified “singleCopy = false” this means duplicate files may exist in you dataset, and they will not be filter out for any of your inputs!!!!!!!! Please be certain this is what you intended.");
806                       singleCopy = false;
807                   }
808      
809                }    
810            }
811            
812            
813            SortByRegX userRequestedSort = null; //defins user requested sorting
814           
815            List inputOrderList = request.getInputOrder();
816            if(inputOrderList != null){
817                for(int i = 0; i != inputOrderList.size(); i++){
818                   String item = (String) inputOrderList.get(i); 
819                   
820                   if(userRequestedSort != null){
821                       System.out.println("Wanning : If more then one inputOrder is used, it may not be honored  !!!");
822                   }
823                   
824                   if(item.equals("fdid")){
825                       userRequestedSort =  new SortByRegX("([0-9]*)::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*");
826                   }else if(item.equals("storage")){
827                       userRequestedSort =  new SortByRegX("[0-9]*::([a-zA-Z]*)::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*");
828                   }else if(item.equals("site")){
829                       userRequestedSort =  new SortByRegX("[0-9]*::[a-zA-Z]*::([a-zA-Z]*)::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*");
830                   }else if(item.equals("node")){
831                       userRequestedSort =  new SortByRegX("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::([a-zA-Z0-9:.]*)::[^:]*::[^:/]*::[0-9]*.*");
832                   }else if(item.equals("path")){
833                       userRequestedSort =  new SortByRegX("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::([^:]*)::[^:/]*::[0-9]*.*");
834                   }else if(item.equals("filename")){
835                       userRequestedSort =  new SortByRegX("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::([^:/]*)::[0-9]*.*");
836                   }else if(item.equals("events")){
837                       userRequestedSort =  new SortByRegX("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::([0-9]*).*");
838                   }else {
839                       userRequestedSort =  new SortByRegX("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*::(.*)");
840                   }
841                }
842            }
843            
844            
845    
846            //////////////////////////////////////////////////////////////////////////////////////////////////////////
847            EntryParser  entryParser = new EntryParser();
848            entryParser.addLFN             ("([0-9]*)::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*");
849            entryParser.addSTORAGE_SERVISE ("[0-9]*::([a-zA-Z]*)::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*");
850            entryParser.addSITE            ("[0-9]*::[a-zA-Z]*::([a-zA-Z]*)::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*");
851            entryParser.addHOST            ("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::([a-zA-Z0-9:.]*)::[^:]*::[^:/]*::[0-9]*.*");
852            entryParser.addPATH            ("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::([^:]*)::[^:/]*::[0-9]*.*");
853            entryParser.addFILE_NAME       ("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::([^:/]*)::[0-9]*.*");
854            entryParser.addNUMBER_OF_EVENTS("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::([0-9]*).*");
855            
856            Dataset dataset = new Dataset("sched" + jobIDBase);
857            dataset.setEntryParser(entryParser);
858            
859    
860            gov.bnl.star.offline.scheduler.dataset.catalog.STARCatalog catalog = new STARCatalog();        
861            catalog.fillDataset(dataset ,request);
862            EntryCounter.countAndPrint(dataset, false);
863            
864            
865            //Defines objects for preparing the dataset 
866            DropMatchingRegX dropHPSSFiles = new DropMatchingRegX(".*::HPSS::.*::.*::.*::.*::.*");
867            DropMatchingRegX dropLocalRCRSFiles = new DropMatchingRegX("^[0-9]*::[a-zA-Z]*::[a-zA-Z]*::rcrs[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*");
868            DropDuplicateRegX dropDuplicateLFN = new DropDuplicateRegX("([0-9]*)::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*");
869            //add pref storage here
870            
871            
872            if(preferStorage != null){ //This is of the user asks for a preferStorage
873                if(preferStorage.equals("NFS")) dropDuplicateLFN.addPreferredCopyRegx("A","^[0-9]*::NFS::.*"); //this is not truely needed because NFS is the top of the list already
874                else if((preferStorage.equals("local"))) dropDuplicateLFN.addPreferredCopyRegx("A","^[0-9]*::local::.*");
875                else if((preferStorage.equals("HPSS"))) dropDuplicateLFN.addPreferredCopyRegx("A","^[0-9]*::HPSS::.*");
876            }
877            
878            
879            dropDuplicateLFN.addPreferredCopyRegx("B","^[0-9]*::NFS::.*");
880            dropDuplicateLFN.addPreferredCopyRegx("C","^[0-9]*::local::.*");
881            dropDuplicateLFN.addPreferredCopyRegx("D","^[0-9]*::HPSS::.*");
882            SortByRegX sortByNode = new SortByRegX("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::([a-zA-Z0-9:.]*)::[^:]*::[^:/]*::[0-9]*.*");
883            SplitByRegX groupByNode = new SplitByRegX("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::([a-zA-Z0-9:.]*)::[^:]*::[^:/]*::[0-9]*.*");
884            SplitByMinMaxEntries splitByMinMaxEntries = new SplitByMinMaxEntries(minFiles, maxFiles);
885            
886            
887            //Sorting and splitting for xrootd
888            //SortByRegX xrootdSort = new SortByRegX("[0-9]*::([a-zA-Z]*)::[a-zA-Z]*::[a-zA-Z0-9:.]*::([^:]*)::([^:/]*)::[0-9]*.*", "$1$2$3");
889            //SplitByRegX SplitByStorage = new SplitByRegX("[0-9]*::([a-zA-Z]*)::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*");
890            
891            //////
892            if("paths".equals(request.getFileListType())){ 
893                
894                        System.out.println("-------Processing recovered dataset for paths-------");
895                                    
896                        System.out.println("Dropping HPSS files");
897                        dropHPSSFiles.modify(dataset, request);      //drop hpss files
898                        EntryCounter.countAndPrint(dataset, false);
899                        
900                        System.out.println("Dropping RCRS node node files");
901                        dropLocalRCRSFiles.modify(dataset, request); //drop files on rcrs nodes
902                        EntryCounter.countAndPrint(dataset, false);
903                        
904                        if(singleCopy){
905                            System.out.println("Dropping files with duplicate LFN");
906                            dropDuplicateLFN.modify(dataset, request);  //drop files so there is only one of each LFN
907                            EntryCounter.countAndPrint(dataset, false);
908                        }
909                        
910                        if(userRequestedSort == null){
911                            System.out.println("Sorting files by node");
912                            sortByNode.modify(dataset, request);        //sort by node
913                            EntryCounter.countAndPrint(dataset, false);
914                        }else{
915                            System.out.println("Sorting files by user requested inputOrder");
916                            userRequestedSort.modify(dataset, request);
917                            EntryCounter.countAndPrint(dataset, false);
918                        }
919                        
920                        
921                        System.out.println("Splitting by node");
922                        groupByNode.modify(dataset, request);       //Grouping by node
923                        EntryCounter.countAndPrint(dataset, false); 
924                        
925            }
926            else if("rootd".equals(request.getFileListType())){
927                
928                        System.out.println("-------Processing recovered dataset for rootd-------");
929                        
930                        System.out.println("Dropping HPSS files");
931                        dropHPSSFiles.modify(dataset, request);     //drop hpss files
932                        EntryCounter.countAndPrint(dataset, false);
933                        
934                        System.out.println("Dropping RCRS node files");
935                        dropLocalRCRSFiles.modify(dataset, request); //drop files on rcrs nodes
936                        EntryCounter.countAndPrint(dataset, false);        
937                        
938                        if(singleCopy){
939                            System.out.println("Dropping files with duplicate LFN");
940                            dropDuplicateLFN.modify(dataset, request);  //drop files so there is only one of each LFN
941                            EntryCounter.countAndPrint(dataset, false);
942                        }
943                        
944                        if(userRequestedSort == null){
945                            System.out.println("Sorting files by node");
946                            sortByNode.modify(dataset, request);        //sort by node / but -not- split by node
947                            EntryCounter.countAndPrint(dataset, false);
948                        }else{
949                            System.out.println("Sorting files by user requested inputOrder");
950                            userRequestedSort.modify(dataset, request);
951                            EntryCounter.countAndPrint(dataset, false);
952                        }
953                        
954                        //throw new RuntimeException("------------halted by code-----------"); 
955                
956            }
957            else if("xrootd".equals(request.getFileListType())){ 
958                
959                        System.out.println("-------Processing recovered dataset for xrootd-------");
960                
961    //                  //ask P.J. what he wants to do here
962    //                  System.out.println("Dropping HPSS files");
963    //                  dropHPSSFiles.modify(dataset, request);     //drop hpss files
964    //                  EntryCounter.countAndPrint(dataset, false);  
965                        
966                        
967                        
968                        if(singleCopy){
969                            System.out.println("Dropping files with duplicate LFN");
970                            dropDuplicateLFN.modify(dataset, request);  //drop files so there is only one of each LFN
971                            EntryCounter.countAndPrint(dataset, false);
972                        }
973                        
974                        if(userRequestedSort != null){
975                            System.out.println("Sorting files by user requested inputOrder");
976                            userRequestedSort.modify(dataset, request);
977                            EntryCounter.countAndPrint(dataset, false);
978                        }
979                        
980                        
981                        
982            }else{
983                        throw new RuntimeException("FileListType is not known"); 
984            }
985     
986            splitByMinMaxEntries.modify(dataset, request);  
987            EntryCounter.countAndPrint(dataset, false);
988            
989            System.out.println("----------------------------------------------------");
990            
991            dataset.deleteBuffer();
992                  
993            
994            
995            System.out.print("validating dataset ..");
996            
997            SubsetValidator validator = new SubsetValidator();
998            //setting up the validator
999            if("paths".equals(request.getFileListType())) validator.setMustHaveHomogeneousHost(true);
1000            else if("rootd".equals(request.getFileListType())) validator.setMustHaveHomogeneousHost(false);
1001            else if("xrootd".equals(request.getFileListType())) validator.setMustHaveHomogeneousHost(false);    
1002            validator.setMustHaveHomogeneousSite(true);
1003            if(request.getResource("FilesPerProcess").getMax() != -1)  validator.setMaxFiles(request.getResource("FilesPerProcess").getMax());
1004            if(request.getResource("FilesPerProcess").getMin() != -1)  validator.setMinFiles(request.getResource("FilesPerProcess").getMin());
1005            
1006            boolean testResult = validator.validate(dataset);
1007            System.out.println((testResult ? "..passed" : "..faild"));
1008            
1009            if(! testResult){
1010                System.out.println("The file splitter has encountered a problem meeting your request.");
1011                System.out.println( validator.getErrors() );
1012                if(validator.isErrorFatal()) throw new RuntimeException("This error is fatal. Exitting....");
1013                else{
1014                    //ask user
1015                    askFeedback(validator);
1016                }
1017            }
1018    
1019            
1020            //if(true) throw new RuntimeException("-------- stopped --------");
1021            
1022            //int indexCount = validator.getIndexCount();
1023            /*
1024            for(int x = 0; x != indexCount - 1; x++ ){
1025                
1026                System.out.println("getting index --->" + x);
1027            
1028                DatasetSubset subset0 = new DatasetSubset(x, dataset);
1029    
1030    
1031                List pfiles = subset0.getPhysicalFileList();
1032                */
1033                /* //This code is used for debugging to print out the file list.
1034                System.out.println(">>>>>>>>>>>>>>>>index<<<<<<<<<<<<<<<<<");
1035                for(int i = 0; i != pfiles.size(); i++){
1036                    PhysicalFile pfile = (PhysicalFile) pfiles.get(i);
1037                    System.out.println(pfile.asURL().toString());
1038                    if (pfile.getAtribute("events") != null) {
1039                        System.out.println(pfile.getAtribute("events"));
1040                    }   
1041                }
1042                System.out.println(">>>>>>>>>>>>>>>>index<<<<<<<<<<<<<<<<<");
1043               
1044            }   
1045             */
1046            //////////////////////////////////////////////////////////////////////////////////////////////////////////
1047                 
1048    /*        
1049            AssignmentStrategy assigner = AssignmentStrategyFactory.createCopySelector();
1050            //TODO Temporary fix: should be refactored
1051            List fileList = new ArrayList();
1052            for (int n = 0; n < request.getInputList().size(); n++) {
1053                if (request.getInputList().get(n) instanceof URL)
1054                    fileList.add(request.getInputList().get(n));
1055            }
1056            FileAssignment assignment = assigner.assignFiles(request, results, queries, fileList);
1057    */        
1058            
1059            
1060          
1061            
1062            
1063            List jobs = createJobs(request, validator.getDatasetIndexList());
1064            assignQueues(request, jobs);
1065            //request.addToReportText(assignment.getReport());
1066            request.setJobs(jobs); //make pointers from the request to the job objects
1067            request.setID(jobIDBase);
1068            return jobs;
1069            
1070        }
1071    
1072    
1073        /** Changes the name of the NFS queue. The NFS queue is used for all
1074         * those jobs that aren't assigned any file on local disk and whose
1075         * length is less than minTimeLimitQueue.
1076         */
1077        public void setNfsQueue(String nfsQueue) {
1078            this.nfsQueue = nfsQueue;
1079        }
1080    
1081        public String getNfsQueue() {
1082            return nfsQueue;
1083        }
1084    
1085        /** Changes the name of the local queue. The local queue is used for all
1086         * those jobs that are assigned at least one file on local disk and whose
1087         * length is less than minTimeLimitQueue.
1088         */
1089        public void setLocalQueue(String localQueue) {
1090            this.localQueue = localQueue;
1091        }
1092    
1093        public String getLocalQueue() {
1094            return localQueue;
1095        }
1096    
1097        /** Changes the name of the long queue. A job is assigned to the long queue
1098         * when the minFilePerProcess and filesPerHour do not allow a job length
1099         * smaller than minTimeLimitQueue.
1100         */
1101        public void setLongQueue(String longQueue) {
1102            this.longQueue = longQueue;
1103        }
1104    
1105        public String getLongQueue() {
1106            return longQueue;
1107        }
1108    
1109        /** Sets the maximum duration of a job that will be sent to the short queue.
1110         * This is also the minimum duration of a job on the long queue. Both
1111         * the localQueue and the nfsQueue are considered short queues.
1112         */
1113        public void setMinTimeLimitQueue(int minTimeLimit) {
1114            this.minTimeLimit = minTimeLimit;
1115            timeLimit = ((double) minTimeLimit) / 60;
1116        }
1117    
1118        public int getMinTimeLimitQueue() {
1119            return minTimeLimit;
1120        }
1121    
    /** Tells the policy that rootd is available on the cluster. Having rootd
     * available in the cluster means that, if the filelistSyntax of the job
     * is rootd, the job will be able to access local disk files from every node.
     * This enables the policy to assign to a single job files from different
     * local disks, which will make it easier to satisfy request requirements,
     * such as minFilesPerProcess.
     * (Note: this documents the instance setter that is commented out below;
     * rootd availability is now controlled through the static
     * isRootdAvailable/setRootdAvailable accessors.)
     */
1129       /* public void setRootdAvailable(boolean rootd) {
1130            // FIXME This is a very bad way to enable rootd... don't have much choice now...
1131            CopySelectorFactory.setCrossLocalDiskAccessEnabled(rootd);
1132            this.isRootdAvailable = rootd;
1133        }
1134    
1135        public boolean isRootdAvailable() {
1136            return isRootdAvailable;
1137        } */
1138        public static boolean isRootdAvailable() {
1139            return rootdAvailable;
1140        }
1141    
1142        public static void setRootdAvailable(boolean rootdAvailable) {
1143            PassivePolicy.rootdAvailable = rootdAvailable;
1144        }
1145    
1146        public static boolean isXrootdAvailable() {
1147            return xrootdAvailable;
1148        }
1149    
1150        public static void setXrootdAvailable(boolean xrootdAvailable) {
1151            PassivePolicy.xrootdAvailable = xrootdAvailable;
1152        }   
1153    
1154        public static int getXrootdPort() {
1155            if(xrootdPort==0){
1156               throw new RuntimeException("Please specify xrootd port number in your scheduler configuration file !!!");
1157            }
1158            return xrootdPort;
1159        }
1160    
1161        public static void setXrootdPort(int xrootdPort) {
1162            PassivePolicy.xrootdPort = xrootdPort;
1163        }
1164    
1165        public static String getXrootdRedirectorName() {
1166             if(xrootdRedirectorName==null){
1167                throw new RuntimeException("Please specify hostname of xrootd redirector node in your scheduler configuration file !!!");
1168            }
1169            return xrootdRedirectorName;
1170        }
1171    
1172        public static void setXrootdRedirectorName(String xrootdRedirectorName) {
1173            PassivePolicy.xrootdRedirectorName = xrootdRedirectorName;
1174        }
1175    
1176    
1177        /** Getter for property clusterName.
1178         * @return Value of property clusterName.
1179         *
1180         */
1181        public String getClusterName() {
1182            return this.clusterName;
1183        }
1184    
    /** Sets the name of the cluster that will be assigned to all the jobs.
     * This parameter allows creating a single CompositeDispatcher with all
     * the different clusters. By changing this parameter in the policy, you
     * can then control on which cluster jobs will be dispatched, without
     * having to reconfigure the dispatchers.
     *
     */
1192    
1193    
1194        public void setClusterName(String clusterName) {
1195            this.clusterName = clusterName;
1196        }
1197    
1198    /////////////////////////////////////lbh/////////////////////////////////
1199    
1200         public void ClearAllQueues() { //This is called from the config file to fill the PPQueues list
1201            localQueueList.clear();
1202            genericQueueList.clear();
1203        }
1204    
1205    
1206    
1207    
1208        //A small function for working with strings
1209        public String resize(String string, int size){
1210            while(true){
1211            if(string.length() == size) return string;
1212            if(string.length() > size){
1213                string = string.substring(0,string.length() - 4);
1214                string = string.concat("~");
1215            }
1216            if(string.length() < size) string = string.concat(" ");
1217            }
1218        }
1219    
    //returns a table describing the queues, formatted as a fixed-width string
1221        public String QueueInfo(List localQueueList, List genericQueueList){
1222    
1223            //Put all the queue objects together in on big list.
1224            List QueueList = new ArrayList();
1225            QueueList.addAll(localQueueList);
1226            QueueList.addAll(genericQueueList);
1227    
1228            //prit the qeues out by order of SearchOrderPriority
1229            gov.bnl.star.offline.scheduler.Queues queues = new Queues();
1230            QueueList = queues.OrderQueuesBySearchOrderPriority(QueueList);
1231    
1232            //get the lenght of the header of all the rows
1233            int IDLength = "ID".length();
1234            int nameLength = "Name".length();
1235            int typeLength = "Type".length();
1236            int timeLimitLength = "TimeLimit".length();
1237            int maxMenLength = "MaxMem".length();
1238            int localLength = "Local".length();
1239            int SOPLength = "S.O.P.".length();
1240            int clusterLength = "Cluster".length();
1241    
1242            //get the max length of all the rows
1243            for(int i = 0; i < QueueList.size(); i++){
1244                gov.bnl.star.offline.scheduler.Queue queue = (gov.bnl.star.offline.scheduler.Queue) QueueList.get(i);
1245                if(queue.getID() != null) IDLength = Math.max(IDLength, queue.getID().length());
1246                else IDLength = Math.max(IDLength, "N/A".length());
1247                nameLength = Math.max(nameLength, queue.getName().length());
1248                if(queue.getType() == null) typeLength = Math.max(typeLength, queue.getType().length());
1249                else typeLength = Math.max(typeLength, "N/A".length());
1250                if(queue.getTimeLimit() != -1) timeLimitLength = Math.max(timeLimitLength, String.valueOf(queue.getTimeLimit()).length() + "min".length());
1251                else timeLimitLength = Math.max(timeLimitLength, "none".length());
1252                if(queue.getMaxMemory() != -1) maxMenLength = Math.max(maxMenLength, String.valueOf(queue.getMaxMemory()).length() + "MB".length());
1253                else maxMenLength = Math.max(maxMenLength, "N/A".length());
1254                //No test for the test of local is need because we know local is always one
1255                SOPLength = Math.max(SOPLength, String.valueOf(queue.getSearchOrderPriority()).length());
1256    
1257                if(queue.getCluster() != null) clusterLength = Math.max(clusterLength, queue.getCluster().length());
1258                else clusterLength = Math.max(clusterLength, "none".length());
1259            }
1260    
1261            String Info ="";
1262            String table ="";
1263    
1264            int tableWide = 1 + IDLength + 2 + nameLength + 2  + typeLength + 2  + timeLimitLength + 2  + maxMenLength + 2  + localLength + 2  + SOPLength + 2  + clusterLength;
1265    
1266            String TableStart = "\n\n"; //draw the header
1267            for(int i = 0; i < (((tableWide) / 2) - (" Queues ".length() / 2)) ; i++) TableStart = TableStart.concat("-");
1268            TableStart = TableStart.concat(" Queues ");
1269            for(int i = 0; i < (tableWide) ; i++) TableStart = TableStart.concat("-");
1270            while((tableWide + 2) !=  TableStart.length()) TableStart = TableStart.substring(0,TableStart.length() - 1); //trim until it's the throws right size
1271            table = table.concat(TableStart);
1272    
1273            //draw col names
1274            table = table.concat("\n " + resize("ID", IDLength + 2));
1275            table = table.concat(resize("Name", nameLength + 2));
1276            table = table.concat(resize("Type", typeLength + 2));
1277            table = table.concat(resize( "TimeLimit" , timeLimitLength + 2));
1278            table = table.concat(resize("MaxMem", maxMenLength + 2));
1279            table = table.concat(resize("Local", localLength + 2));
1280            table = table.concat(resize("S.O.P.", SOPLength + 2));
1281            table = table.concat(resize("Cluster", clusterLength + 2) + "\n");
1282    
1283            //draw ----- line
1284            for(int i = 0; i < (tableWide) ; i++) table = table.concat("-");
1285    
1286            table = table.concat("\n");
1287    
1288            for(int i = 0; i < QueueList.size(); i++){ //print out all the local queues in the report file
1289                gov.bnl.star.offline.scheduler.Queue queue = (gov.bnl.star.offline.scheduler.Queue) QueueList.get(i);
1290                Info = " " + resize(queue.getID(), IDLength + 2); //15
1291                Info = Info.concat(resize(queue.getName(), nameLength + 2)); //18
1292                if(queue.getType() == null)
1293                    Info = Info.concat(resize("N/A", typeLength + 2)); //10
1294                else
1295                    Info = Info.concat(resize(queue.getType(), typeLength + 2)); //10
1296                if(queue.getTimeLimit() != -1)
1297                    Info = Info.concat(resize( String.valueOf(queue.getTimeLimit()) + "min" , timeLimitLength + 2)); //10
1298                else
1299                    Info = Info.concat(resize("N/A", timeLimitLength + 2)); //10
1300                if(queue.getMaxMemory() != -1)
1301                    Info = Info.concat(resize( String.valueOf(queue.getMaxMemory()) + "MB" , maxMenLength + 2)); //7
1302                else
1303                    Info = Info.concat(resize("none", maxMenLength + 2));  //7
1304                if(queue.IsOptimizedAsLocalQueue())
1305                    Info = Info.concat(resize("Y", localLength + 2));
1306                else
1307                    Info = Info.concat(resize("N", localLength + 2));
1308                Info = Info.concat(resize( String.valueOf(queue.getSearchOrderPriority()), SOPLength + 2)); //7
1309                if(queue.getCluster() == null)
1310                    Info = Info.concat("N/A");
1311                else
1312                    Info = Info.concat(queue.getCluster());
1313                table = table.concat(Info + "\n");
1314           }
1315    
1316            //draw -----
1317            for(int i = 0; i < (tableWide) ; i++) table = table.concat("-");
1318            return table;
1319       }
1320    
1321    
1322    
1323        public void addQueue(gov.bnl.star.offline.scheduler.Queue queue){
1324            genericQueueList.add(queue);
1325        }
1326        
1327        private boolean bypassQueueTests = false;
1328        public void setBypassQueueTests(boolean bypassQueueTests){this.bypassQueueTests = bypassQueueTests;}
1329        public boolean getBypassQueueTests(){return bypassQueueTests;}
1330        
1331        
1332        private void AdjustIOFileNames(Request request){
1333           request.getTask().setStdErr(AdjustIOFileName(request.getTask().getStdErr(), "err"));
1334           request.getTask().setStdOut(AdjustIOFileName(request.getTask().getStdOut(), "out"));
1335        }
1336        
1337        private URL AdjustIOFileName(URL url, String ext){
1338           if(url == null) return url;
1339           //ThreadSafeFilesystemToolkit threadSafeFilesystemToolkit = new ThreadSafeFilesystemToolkit();
1340           File file = new File(url.getPath());
1341           if( file.isDirectory() ){
1342               String path = url.getPath();
1343               if(! (path.endsWith(System.getProperty("file.separator")))){
1344                   path = path + System.getProperty("file.separator");
1345               }
1346                try {
1347                    return new URL("file:" + url.getPath() + "$JOBID." + ext);
1348                } catch (MalformedURLException ex) {
1349                    ex.printStackTrace();
1350                    throw new RuntimeException("MalformedURLException in passive police");
1351                }
1352    
1353           } else{
1354               return url;
1355           } 
1356        }
1357       
1358        
1359        
1360    
1361    }