001    /*
002     * $RCSfile: AssignmentByQueueMonitorPolicy.java,v $
003     *
004     * Created on July 22, 2006, 2:40 pm
005     *
006     * This file is part of the STAR Scheduler.
007     * Copyright (c) 2002-2003 STAR Collaboration - Brookhaven National Laboratory
008     *
009     * STAR Scheduler is free software; you can redistribute it and/or modify
010     * it under the terms of the GNU General Public License as published by
011     * the Free Software Foundation; either version 2 of the License, or
012     * (at your option) any later version.
013     *
014     * STAR Scheduler is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017     * GNU General Public License for more details.
018     *
019     * You should have received a copy of the GNU General Public License
020     * along with STAR Scheduler; if not, write to the Free Software
021     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
022     */
023    package gov.bnl.star.offline.scheduler.policy;
024    
025    import gov.bnl.star.offline.scheduler.monitor.QueueInfo;
026    import gov.bnl.star.offline.scheduler.monitor.MonaLisaQueueInfoFinder;
027    import gov.bnl.star.offline.scheduler.monitor.MonaLisaPCQueueInfoFinder;
028    
029    import gov.bnl.star.offline.scheduler.*;
030    import gov.bnl.star.offline.scheduler.Queue;
031    import gov.bnl.star.offline.scheduler.Queues;
032    import gov.bnl.star.offline.scheduler.Policy;
033    import gov.bnl.star.offline.scheduler.catalog.*;
034    import gov.bnl.star.offline.scheduler.catalog.CatalogTask;
035    import gov.bnl.star.offline.scheduler.catalog.QueryResult;
036    import gov.bnl.star.offline.scheduler.catalog.StarCatalog;
037    import gov.bnl.star.offline.scheduler.monitor.ClusterInfo;
038    import gov.bnl.star.offline.scheduler.monitor.ClusterInfoFinder;
039    import gov.bnl.star.offline.scheduler.monitor.MonaLisaHostInfo;
040    import gov.bnl.star.offline.scheduler.monitor.MonaLisaPCHostInfoFinder;
041    import gov.bnl.star.offline.scheduler.monitor.MonaLisaQueueInfo;
042    import gov.bnl.star.offline.scheduler.monitor.MonaLisaPCQueueInfoFinder;
043    import gov.bnl.star.offline.scheduler.policy.copyselector.*;
044    import gov.bnl.star.offline.scheduler.request.Request;
045    
046    import java.io.File;
047    import java.io.FileOutputStream;
048    import java.io.InputStreamReader;
049    import java.io.BufferedReader;
050    import java.io.PrintStream;
051    import java.net.*;
052    import java.util.*;
053    
054    import java.util.Iterator;
055    import java.util.List;
056    import java.util.Set;
057    import java.util.logging.Level;
058    import org.apache.log4j.Logger;
059    
060    import java.text.BreakIterator;
061    
062    
063    /** This policy takes the assignment from another policy, and assigns a queue
064     * depending on the result of the monitoring information.
065     *
066     * @author Levente Hajdu
067     */
068    public class AssignmentByQueueMonitorPolicy extends PassivePolicy{ // implements Policy {
069        
070        
071        public void AssignmentByQueueMonitorPolicy(){}
072        
073        //private ClusterInfoFinder clusterMonitoring;
074        int ncpu = 0;
075        double loadSum = 0;
076        public long msTimeRefresh =0;
077        private boolean usingLoad = true;
078        boolean hasfailed = false;
079        
080        public void assignQueues(Request request, List jobs) {
081            Queues queues = new Queues();
082            
083            //super.
084            
085            //Queues queses = new Queues(); // Load all queues from the Queue object in the config file
086            
087            if(genericQueueList.size() == 0){ //Test if queues where loaded. This also works as a hack for junit
088                System.out.println("Could not find any queues.");
089                log.warn("Could not find any queues.");
090                request.addToReportText("Could not find any queues.\n\n");
091                return;
092            }
093            
094            request.addToReportText("\n*************************************************************************\n");
095            request.addToReportText("*   This is a history of Job assignments.                               *\n");
096            request.addToReportText("*   If your Jobs are going to the wrong queue, this may tell you why.   *\n");
097            request.addToReportText("*************************************************************************\n");
098            
099             long startTime = System.currentTimeMillis(); // time used to test if more data should be pulled from Ml
100             Date date = new Date();
101             request.addToReportText("The date and time is now : " + date.toString() + "\n"); //Print out the time to the report file
102             orderQueues();
103            
104            //assignQueues to jobs
105            for (int nJob = 0; nJob < jobs.size(); nJob++) { //loop over all jobs
106    
107                    long elapsedTime = System.currentTimeMillis() - startTime;
108    
109                    if ((msTimeRefresh > 0) && (elapsedTime > msTimeRefresh)) {
110                        orderQueues();
111                        startTime = System.currentTimeMillis();
112                    }
113                
114                Job job = (Job) jobs.get(nJob);
115                boolean jobHasNoQueue = true;
116                
117                List queuesForJobN = new ArrayList(); //list of all queues the job fits in to               //   = queses.OrderQueuesBySearchOrderPriority(genericQueueList);
118                queuesForJobN.clear();
119                queuesForJobN = FilterNonViableQueues(job, request);
120                
121                if(queuesForJobN.size() == 0){ //Test if queues where loaded. This also works as a hack for junit
122                    System.out.println("Could not find any queues for job : " + job.getJobID());
123                    log.warn("Could not find any queues for job : " + job.getJobID());
124                    request.addToReportText("Could not find any queues for job : " + job.getJobID() + "\n\n");
125                }
126                
127                queuesForJobN = queues.OrderQueuesBySearchOrderPriority(queuesForJobN);
128                
129                
130                //assign the jobs to the queue
131                if(jobHasNoQueue){
132                    for(int i = 0; (i < queuesForJobN.size()) && jobHasNoQueue; i++){ //loop over the list as until you get to the end or the job finds a q
133                        Queue queue = (Queue) queuesForJobN.get(i);
134                        request.addToReportText(job.getJobID() + " trying " + queue.getName()  + "(" + queue.getID() + ")");
135                        if(queue.doesJobFit(job, request)){ //these is where the queue a assinged
136                            //job.setQueue(queue);
137                            assignQueue(queue, job);
138                            
139                            request.addToReportText(job.getJobID() + " assigned to " + queue.getName()  + "(" + queue.getID() + ")");
140                            jobHasNoQueue = false;
141                            
142                            if(! usingLoad){ //only do this if the queue TRi is being used and not the queue load
143                                
144    
145                                 double jobTimeLimit = (((double) job.getInput().size()) / request.getFilesPerHour()) * 60; //how long a job will take one job to run in min
146                                 if((job.getInput().size() == 0)&&(request.getFilesPerHour() != Double.POSITIVE_INFINITY)){
147                                        jobTimeLimit = 60 / request.getFilesPerHour(); //If the job has a FilesPerHour but no file, then the FilesPerHour is the time the job takes
148                                 }
149    
150                                 MonaLisaPCQueueInfoFinder qFinder = new MonaLisaPCQueueInfoFinder();
151                                 QueueInfo qInfo = new MonaLisaQueueInfo();
152                                 qInfo = qFinder.getQueueInfo(getMLQueueName(queue));
153                                 
154                                 if((((int)qInfo.getEstimatedResponseTime()) == -1)||(((int)qInfo.getAverageRunTime()) == -1)){ //If ML returns no data go back to PP
155                                     System.out.println("Can't get data, swiching back to PP\nRe-Setting SOP of all queues to 1 \n");
156                                     request.addToReportText("Can't get data, swiching back to PP\nRe-Setting SOP of all queues to 1 \n");
157                                     for(int j = 0; (j < genericQueueList.size()); j++){ //reset SOP of all queues to 1
158                                            ((Queue) genericQueueList.get(j)).setSearchOrderPriority(1);
159                                     }
160                                     
161                                     super.assignQueues(request, jobs);
162                                     return;
163                                 }
164    
165                                if(jobTimeLimit != 0) //if we know how log a job is going to take using FilesPerHour use it to get the accumulatedResponseTime else use the AverageRunTime
166                                    queue.accumulatedResponseTime += ((jobTimeLimit * 60) / queue.Ncpus);
167                                else
168                                    queue.accumulatedResponseTime += (qInfo.getAverageRunTime()/ queue.Ncpus);
169    
170                                System.out.println("getAverageRunTime = " + qInfo.getAverageRunTime());
171                                queue.setSearchOrderPriority((int)(queue.baseResponseTime + queue.accumulatedResponseTime));
172                                System.out.println("Razing queue \"" + queue.getName() + "\" order priority to: " + queue.getSearchOrderPriority() + " = " + queue.baseResponseTime + "(baseResponseTime) + " + queue.accumulatedResponseTime + "(accumulatedResponseTime)");
173                            }
174                            
175                            queuesForJobN = queues.OrderQueuesBySearchOrderPriority(queuesForJobN);
176                            queuesForJobN = queues.rotateSameLevelSearchOrderPriority(queuesForJobN);
177                        }
178                        else{
179                            request.addToReportText(queue.getMessages());
180                        }
181                    }
182                }
183                
184                if(jobHasNoQueue){
185                    //error code, to handel what happin if a job can not find a q
186                    System.out.println("Job could not find a queue for " + job.getJobID() + " !!!!!!!!!!!!!\n The job will be removed.");
187                    log.warn("Job could not find a queue for " + job.getJobID() + " !!!!!!!!!!!!!");
188                    request.addToReportText("\n\nJob could not find a queue for " + job.getJobID() + " !!!!!!!!!!!!!\n The job will be removed.");
189                    jobs.remove(nJob); //This will remove the job
190                    nJob--; //because one job was removed,nJobs has to be set back by one job
191                }
192                
193                request.addToReportText(" "); //between every job put a blank line in the report
194            }
195            
196    //        try{ //If the job table can not be printed let the user know in the log file
197    //            request.addToReportText(JobInfo(jobs, request));
198    //        }catch (Exception e){ request.addToReportText("\n\nCould not print out \"Jobs\" Talbe\n"); }
199            
200            try{ //If the queue table can not be printed let the user know in the log file
201                request.addToReportText(QueueInfo(localQueueList, genericQueueList));
202            }catch (Exception e){ request.addToReportText("\n\nCould not print out Queue Talbe\n"); }
203              
204        } 
205        
206       /*
207         public void setClusterMonitoring(ClusterInfoFinder clusterMonitoring) {
208            this.clusterMonitoring = clusterMonitoring;
209        }
210        */
211        String getMLQueueName(Queue queue){
212            
213            return queue.getType() + "_" + queue.getName();
214        }
215        
216      
217        public List FilterNonViableQueues(Job job,Request request){ //list of all queues the job fits in to
218            
219            List queuesForJobN = new ArrayList();
220            for(int i = 0; (i < genericQueueList.size()); i++){ //make a list of only queues that fit the job
221                Queue queue = (Queue) genericQueueList.get(i);
222                if(queue.doesJobFit(job, request)){
223                    queuesForJobN.add(queue);
224                }
225                else{
226                    request.addToReportText(queue.getName() + " was droped because it will not fit job : " + job.getJobID());
227                }
228            }
229            return queuesForJobN;
230        }
231       
232        private static BufferedReader processCMD(String cmd) {
233            try{
234                Process p = Runtime.getRuntime().exec(cmd);
235                BufferedReader stdInput = new BufferedReader(new InputStreamReader(p.getInputStream()));
236                return stdInput;
237            } catch (Exception e) {
238                System.out.println("exception happened - here's what I know: ");
239                e.printStackTrace();
240                System.exit(-1);
241                return null;
242            }
243        } 
244       
245        public int QueueLoad(Queue queue){ //returns the load of a queue object * 100
246            
247            //display something to the user so they know it's not chrashed
248            List display = new ArrayList();
249            display.add("\b\b\b\b\b\b\b\b \\      ");
250            display.add("\b\b\b\b\b\b\b\b |      ");
251            display.add("\b\b\b\b\b\b\b\b /      ");
252            display.add("\b\b\b\b\b\b\b\b -      ");
253            
254            
255            //List hosts = new ArrayList();
256            loadSum = 0;
257            ncpu = 0;
258            MonaLisaHostInfo hostInfo = new MonaLisaHostInfo();
259            MonaLisaPCHostInfoFinder finder = new MonaLisaPCHostInfoFinder();
260            
261            
262            try {
263                BufferedReader br = processCMD("bqueues -l " + queue.getName());
264                String line=null;
265                while ((line = br.readLine()) != null){
266                    if(line.startsWith("HOSTS:")) break;
267                }
268                line = line.replaceFirst("HOSTS:", "").trim();
269                String lines=null;
270                if(line.endsWith("/")){
271                    br = processCMD("bmgroup " + line.replace('/', ' ').trim());
272    
273                    while ((lines = br.readLine()) != null){
274                        if(lines.startsWith(line.replace('/', ' ').trim())) break;
275                       /* else{
276                            log.warning("error finding the host of : " + queue.getName());
277                            System.out.println("error finding the host of : " + queue.getName());
278                            return -1;
279                        }*/
280                    }
281                }   
282                
283                else lines = line;
284                
285                //pars each host out
286                BreakIterator wordIterator = BreakIterator.getWordInstance(new Locale("en","US"));
287                lines = lines.replaceFirst(line.replace('/', ' ').trim(), " ");
288                wordIterator.setText(lines);
289                int start = wordIterator.first();
290                int end = wordIterator.next();
291                System.out.print("Getting info ->        ");
292                while (end != BreakIterator.DONE) {
293                    String host = lines.substring(start,end);
294    
295                    if (Character.isLetterOrDigit(host.charAt(0))) {
296                        hostInfo = finder.getHostInfo(host+".rcf.bnl.gov");
297                        loadSum += hostInfo.getLoad5m();
298                        ncpu += hostInfo.getNoCPUs();
299                        
300                        // System.out.println(hostInfo.getLoad5m() + " = load cps = " + hostInfo.getNoCPUs());
301    
302                        // Display something to keep the user occupied 
303                        System.out.print(display.get(0));
304                        display.add(display.get(0));
305                        display.remove(0);
306    
307                        //System.out.println("Load5m for host "+host+" : "+hostInfo.getLoad5m() ); //used for debuging
308                        //System.out.println("noCPUs for host "+host+" : "+hostInfo.getNoCPUs() ); //used for debuging
309                    }
310                    start = end;
311                    end = wordIterator.next();
312                }
313                System.out.print("\b\b\b\b\b\b\b");
314                
315            } catch (Throwable t) {
316                System.out.println("error getting queueLoad !!\n" + t.toString());
317                log.warn("error getting queueLoad !!\n" + t.toString());
318                hasfailed = true;
319                return -1;
320            }
321            
322            
323            queue.Ncpus = ncpu;
324            return (int) ((loadSum / (double)ncpu) * 100);
325        }
326        
327        public void orderQueues(){
328            
329            setSOP();
330            try{ //If the queue table can not be printed let the user know in the log file
331                //addToReportText("The queues have been re-ordered :\n");
332                //addToReportText(QueueInfo(localQueueList, genericQueueList));
333            }catch (Exception e){ /*addToReportText("\n\nCould not print out Queue Talbe\n"); */}
334             
335        }
336    
337      
338        public void setSOP(){ //gets all the data and sorts the queues
339            Queues queues = new Queues();
340            System.out.println("Collecting fresh queue data");
341            Queue queue;
342            /* Loking at the load is just not work !!!!!!!!!
343            //Set to SOP of every queue to the value of  (queueload * 100)
344            for(int i = 0; (i < genericQueueList.size()); i++){ 
345                queue = (Queue) genericQueueList.get(i);
346                
347                int qload = QueueLoad(queue);
348                if(qload == -1) //if it fails try on more time
349                    qload = QueueLoad(queue);
350                
351                queue.setSearchOrderPriority(qload);
352                
353                addToReportText(queue.getName() + " SOP changed to -> " + queue.getSearchOrderPriority());  
354                if(queue.getSearchOrderPriority() == -1) hasfailed = true; // make note that getting this value failed
355                System.out.println(queue.getName() + " SOP changed to -> " + queue.getSearchOrderPriority());
356            }
357            
358            //test i load is too high, if so move over to <ResponseTime>
359            genericQueueList = queues.OrderQueuesBySearchOrderPriority(genericQueueList); //order the queues by SOP
360            queue = (Queue) genericQueueList.get(0);
361            if(queue.getSearchOrderPriority() <  (maxLoadBeforeSwichOverToRunTime * 100) ) return; //if the queue load is smaller then the hold over just return;
362            */
363            usingLoad = false;
364            //request.addToReportText("Queue loads are too high, swiching to <ResponseTime>\n\n");  
365            System.out.println("Using <ResponseTime>");
366            
367            //Set the SOP of every queue to the <ResponseTime>
368            MonaLisaPCQueueInfoFinder qFinder = new MonaLisaPCQueueInfoFinder();
369            QueueInfo qInfo = new MonaLisaQueueInfo();
370            
371            for(int i = 0; (i < genericQueueList.size()); i++){ 
372                queue = (Queue) genericQueueList.get(i);
373                //qInfo = qFinder.getQueueInfo("LSF_star_cas_dd");
374                qInfo = qFinder.getQueueInfo(getMLQueueName(queue));
375                
376                int qload = QueueLoad(queue);
377                if(qload == -1) //if it fails try on more time
378                    qload = QueueLoad(queue);
379                
380                //note: getAverageRunTime() is getting the <running time>
381    
382                
383                double estimatedResponseTime = qInfo.getEstimatedResponseTime();
384                double averageRunTime = qInfo.getAverageRunTime();
385                if(((int) estimatedResponseTime == -1)||((int) averageRunTime == -1))hasfailed = true;
386                
387                int RTi =(int)(qInfo.getEstimatedResponseTime() + (((double)qInfo.getWaitingJobs() * qInfo.getAverageRunTime()) / (double)queue.Ncpus));
388                queue.baseResponseTime = RTi;
389                queue.setSearchOrderPriority(RTi + (int) queue.accumulatedResponseTime);
390        
391                //request.addToReportText(queue.getName() + " SOP changed to -> " + queue.getSearchOrderPriority() + "\n");  
392                System.out.println(queue.getName() + " SOP changed to -> " + queue.getSearchOrderPriority());
393            }
394            
395            return;
396        }
397      
398        
399        double maxLoadBeforeSwichOverToRunTime = 1.0;
400       
401        public void setMaxLoadBeforeSwichOverToRunTime(double maxLoadBeforeSwichOverToRunTime){
402            this.maxLoadBeforeSwichOverToRunTime = maxLoadBeforeSwichOverToRunTime;
403        }
404        
405        public double getMaxLoadBeforeSwichOverToRunTime(){
406            return maxLoadBeforeSwichOverToRunTime;
407        }
408        
409        
410        public void setMsTimeRefresh(long msTimeRefresh){
411            this.msTimeRefresh = msTimeRefresh;
412        }
413        
414        public long getMsTimeRefresh(){
415            return msTimeRefresh;
416        }
417         
418    }