001 /* 002 * $RCSfile: AssignmentByQueueMonitorPolicy.java,v $ 003 * 004 * Created on July 22, 2006, 2:40 pm 005 * 006 * This file is part of the STAR Scheduler. 007 * Copyright (c) 2002-2003 STAR Collaboration - Brookhaven National Laboratory 008 * 009 * STAR Scheduler is free software; you can redistribute it and/or modify 010 * it under the terms of the GNU General Public License as published by 011 * the Free Software Foundation; either version 2 of the License, or 012 * (at your option) any later version. 013 * 014 * STAR Scheduler is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 017 * GNU General Public License for more details. 018 * 019 * You should have received a copy of the GNU General Public License 020 * along with STAR Scheduler; if not, write to the Free Software 021 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 022 */ 023 package gov.bnl.star.offline.scheduler.policy; 024 025 import gov.bnl.star.offline.scheduler.monitor.QueueInfo; 026 import gov.bnl.star.offline.scheduler.monitor.MonaLisaQueueInfoFinder; 027 import gov.bnl.star.offline.scheduler.monitor.MonaLisaPCQueueInfoFinder; 028 029 import gov.bnl.star.offline.scheduler.*; 030 import gov.bnl.star.offline.scheduler.Queue; 031 import gov.bnl.star.offline.scheduler.Queues; 032 import gov.bnl.star.offline.scheduler.Policy; 033 import gov.bnl.star.offline.scheduler.catalog.*; 034 import gov.bnl.star.offline.scheduler.catalog.CatalogTask; 035 import gov.bnl.star.offline.scheduler.catalog.QueryResult; 036 import gov.bnl.star.offline.scheduler.catalog.StarCatalog; 037 import gov.bnl.star.offline.scheduler.monitor.ClusterInfo; 038 import gov.bnl.star.offline.scheduler.monitor.ClusterInfoFinder; 039 import gov.bnl.star.offline.scheduler.monitor.MonaLisaHostInfo; 040 import gov.bnl.star.offline.scheduler.monitor.MonaLisaPCHostInfoFinder; 041 import gov.bnl.star.offline.scheduler.monitor.MonaLisaQueueInfo; 042 import gov.bnl.star.offline.scheduler.monitor.MonaLisaPCQueueInfoFinder; 043 import gov.bnl.star.offline.scheduler.policy.copyselector.*; 044 import gov.bnl.star.offline.scheduler.request.Request; 045 046 import java.io.File; 047 import java.io.FileOutputStream; 048 import java.io.InputStreamReader; 049 import java.io.BufferedReader; 050 import java.io.PrintStream; 051 import java.net.*; 052 import java.util.*; 053 054 import java.util.Iterator; 055 import java.util.List; 056 import java.util.Set; 057 import java.util.logging.Level; 058 import org.apache.log4j.Logger; 059 060 import java.text.BreakIterator; 061 062 063 /** This policy takes the assignment from another policy, and assigns a queue 064 * depending on the result of the monitoring information. 065 * 066 * @author Levente Hajdu 067 */ 068 public class AssignmentByQueueMonitorPolicy extends PassivePolicy{ // implements Policy { 069 070 071 public void AssignmentByQueueMonitorPolicy(){} 072 073 //private ClusterInfoFinder clusterMonitoring; 074 int ncpu = 0; 075 double loadSum = 0; 076 public long msTimeRefresh =0; 077 private boolean usingLoad = true; 078 boolean hasfailed = false; 079 080 public void assignQueues(Request request, List jobs) { 081 Queues queues = new Queues(); 082 083 //super. 084 085 //Queues queses = new Queues(); // Load all queues from the Queue object in the config file 086 087 if(genericQueueList.size() == 0){ //Test if queues where loaded. This also works as a hack for junit 088 System.out.println("Could not find any queues."); 089 log.warn("Could not find any queues."); 090 request.addToReportText("Could not find any queues.\n\n"); 091 return; 092 } 093 094 request.addToReportText("\n*************************************************************************\n"); 095 request.addToReportText("* This is a history of Job assignments. *\n"); 096 request.addToReportText("* If your Jobs are going to the wrong queue, this may tell you why. *\n"); 097 request.addToReportText("*************************************************************************\n"); 098 099 long startTime = System.currentTimeMillis(); // time used to test if more data should be pulled from Ml 100 Date date = new Date(); 101 request.addToReportText("The date and time is now : " + date.toString() + "\n"); //Print out the time to the report file 102 orderQueues(); 103 104 //assignQueues to jobs 105 for (int nJob = 0; nJob < jobs.size(); nJob++) { //loop over all jobs 106 107 long elapsedTime = System.currentTimeMillis() - startTime; 108 109 if ((msTimeRefresh > 0) && (elapsedTime > msTimeRefresh)) { 110 orderQueues(); 111 startTime = System.currentTimeMillis(); 112 } 113 114 Job job = (Job) jobs.get(nJob); 115 boolean jobHasNoQueue = true; 116 117 List queuesForJobN = new ArrayList(); //list of all queues the job fits in to // = queses.OrderQueuesBySearchOrderPriority(genericQueueList); 118 queuesForJobN.clear(); 119 queuesForJobN = FilterNonViableQueues(job, request); 120 121 if(queuesForJobN.size() == 0){ //Test if queues where loaded. This also works as a hack for junit 122 System.out.println("Could not find any queues for job : " + job.getJobID()); 123 log.warn("Could not find any queues for job : " + job.getJobID()); 124 request.addToReportText("Could not find any queues for job : " + job.getJobID() + "\n\n"); 125 } 126 127 queuesForJobN = queues.OrderQueuesBySearchOrderPriority(queuesForJobN); 128 129 130 //assign the jobs to the queue 131 if(jobHasNoQueue){ 132 for(int i = 0; (i < queuesForJobN.size()) && jobHasNoQueue; i++){ //loop over the list as until you get to the end or the job finds a q 133 Queue queue = (Queue) queuesForJobN.get(i); 134 request.addToReportText(job.getJobID() + " trying " + queue.getName() + "(" + queue.getID() + ")"); 135 if(queue.doesJobFit(job, request)){ //these is where the queue a assinged 136 //job.setQueue(queue); 137 assignQueue(queue, job); 138 139 request.addToReportText(job.getJobID() + " assigned to " + queue.getName() + "(" + queue.getID() + ")"); 140 jobHasNoQueue = false; 141 142 if(! usingLoad){ //only do this if the queue TRi is being used and not the queue load 143 144 145 double jobTimeLimit = (((double) job.getInput().size()) / request.getFilesPerHour()) * 60; //how long a job will take one job to run in min 146 if((job.getInput().size() == 0)&&(request.getFilesPerHour() != Double.POSITIVE_INFINITY)){ 147 jobTimeLimit = 60 / request.getFilesPerHour(); //If the job has a FilesPerHour but no file, then the FilesPerHour is the time the job takes 148 } 149 150 MonaLisaPCQueueInfoFinder qFinder = new MonaLisaPCQueueInfoFinder(); 151 QueueInfo qInfo = new MonaLisaQueueInfo(); 152 qInfo = qFinder.getQueueInfo(getMLQueueName(queue)); 153 154 if((((int)qInfo.getEstimatedResponseTime()) == -1)||(((int)qInfo.getAverageRunTime()) == -1)){ //If ML returns no data go back to PP 155 System.out.println("Can't get data, swiching back to PP\nRe-Setting SOP of all queues to 1 \n"); 156 request.addToReportText("Can't get data, swiching back to PP\nRe-Setting SOP of all queues to 1 \n"); 157 for(int j = 0; (j < genericQueueList.size()); j++){ //reset SOP of all queues to 1 158 ((Queue) genericQueueList.get(j)).setSearchOrderPriority(1); 159 } 160 161 super.assignQueues(request, jobs); 162 return; 163 } 164 165 if(jobTimeLimit != 0) //if we know how log a job is going to take using FilesPerHour use it to get the accumulatedResponseTime else use the AverageRunTime 166 queue.accumulatedResponseTime += ((jobTimeLimit * 60) / queue.Ncpus); 167 else 168 queue.accumulatedResponseTime += (qInfo.getAverageRunTime()/ queue.Ncpus); 169 170 System.out.println("getAverageRunTime = " + qInfo.getAverageRunTime()); 171 queue.setSearchOrderPriority((int)(queue.baseResponseTime + queue.accumulatedResponseTime)); 172 System.out.println("Razing queue \"" + queue.getName() + "\" order priority to: " + queue.getSearchOrderPriority() + " = " + queue.baseResponseTime + "(baseResponseTime) + " + queue.accumulatedResponseTime + "(accumulatedResponseTime)"); 173 } 174 175 queuesForJobN = queues.OrderQueuesBySearchOrderPriority(queuesForJobN); 176 queuesForJobN = queues.rotateSameLevelSearchOrderPriority(queuesForJobN); 177 } 178 else{ 179 request.addToReportText(queue.getMessages()); 180 } 181 } 182 } 183 184 if(jobHasNoQueue){ 185 //error code, to handel what happin if a job can not find a q 186 System.out.println("Job could not find a queue for " + job.getJobID() + " !!!!!!!!!!!!!\n The job will be removed."); 187 log.warn("Job could not find a queue for " + job.getJobID() + " !!!!!!!!!!!!!"); 188 request.addToReportText("\n\nJob could not find a queue for " + job.getJobID() + " !!!!!!!!!!!!!\n The job will be removed."); 189 jobs.remove(nJob); //This will remove the job 190 nJob--; //because one job was removed,nJobs has to be set back by one job 191 } 192 193 request.addToReportText(" "); //between every job put a blank line in the report 194 } 195 196 // try{ //If the job table can not be printed let the user know in the log file 197 // request.addToReportText(JobInfo(jobs, request)); 198 // }catch (Exception e){ request.addToReportText("\n\nCould not print out \"Jobs\" Talbe\n"); } 199 200 try{ //If the queue table can not be printed let the user know in the log file 201 request.addToReportText(QueueInfo(localQueueList, genericQueueList)); 202 }catch (Exception e){ request.addToReportText("\n\nCould not print out Queue Talbe\n"); } 203 204 } 205 206 /* 207 public void setClusterMonitoring(ClusterInfoFinder clusterMonitoring) { 208 this.clusterMonitoring = clusterMonitoring; 209 } 210 */ 211 String getMLQueueName(Queue queue){ 212 213 return queue.getType() + "_" + queue.getName(); 214 } 215 216 217 public List FilterNonViableQueues(Job job,Request request){ //list of all queues the job fits in to 218 219 List queuesForJobN = new ArrayList(); 220 for(int i = 0; (i < genericQueueList.size()); i++){ //make a list of only queues that fit the job 221 Queue queue = (Queue) genericQueueList.get(i); 222 if(queue.doesJobFit(job, request)){ 223 queuesForJobN.add(queue); 224 } 225 else{ 226 request.addToReportText(queue.getName() + " was droped because it will not fit job : " + job.getJobID()); 227 } 228 } 229 return queuesForJobN; 230 } 231 232 private static BufferedReader processCMD(String cmd) { 233 try{ 234 Process p = Runtime.getRuntime().exec(cmd); 235 BufferedReader stdInput = new BufferedReader(new InputStreamReader(p.getInputStream())); 236 return stdInput; 237 } catch (Exception e) { 238 System.out.println("exception happened - here's what I know: "); 239 e.printStackTrace(); 240 System.exit(-1); 241 return null; 242 } 243 } 244 245 public int QueueLoad(Queue queue){ //returns the load of a queue object * 100 246 247 //display something to the user so they know it's not chrashed 248 List display = new ArrayList(); 249 display.add("\b\b\b\b\b\b\b\b \\ "); 250 display.add("\b\b\b\b\b\b\b\b | "); 251 display.add("\b\b\b\b\b\b\b\b / "); 252 display.add("\b\b\b\b\b\b\b\b - "); 253 254 255 //List hosts = new ArrayList(); 256 loadSum = 0; 257 ncpu = 0; 258 MonaLisaHostInfo hostInfo = new MonaLisaHostInfo(); 259 MonaLisaPCHostInfoFinder finder = new MonaLisaPCHostInfoFinder(); 260 261 262 try { 263 BufferedReader br = processCMD("bqueues -l " + queue.getName()); 264 String line=null; 265 while ((line = br.readLine()) != null){ 266 if(line.startsWith("HOSTS:")) break; 267 } 268 line = line.replaceFirst("HOSTS:", "").trim(); 269 String lines=null; 270 if(line.endsWith("/")){ 271 br = processCMD("bmgroup " + line.replace('/', ' ').trim()); 272 273 while ((lines = br.readLine()) != null){ 274 if(lines.startsWith(line.replace('/', ' ').trim())) break; 275 /* else{ 276 log.warning("error finding the host of : " + queue.getName()); 277 System.out.println("error finding the host of : " + queue.getName()); 278 return -1; 279 }*/ 280 } 281 } 282 283 else lines = line; 284 285 //pars each host out 286 BreakIterator wordIterator = BreakIterator.getWordInstance(new Locale("en","US")); 287 lines = lines.replaceFirst(line.replace('/', ' ').trim(), " "); 288 wordIterator.setText(lines); 289 int start = wordIterator.first(); 290 int end = wordIterator.next(); 291 System.out.print("Getting info -> "); 292 while (end != BreakIterator.DONE) { 293 String host = lines.substring(start,end); 294 295 if (Character.isLetterOrDigit(host.charAt(0))) { 296 hostInfo = finder.getHostInfo(host+".rcf.bnl.gov"); 297 loadSum += hostInfo.getLoad5m(); 298 ncpu += hostInfo.getNoCPUs(); 299 300 // System.out.println(hostInfo.getLoad5m() + " = load cps = " + hostInfo.getNoCPUs()); 301 302 // Display something to keep the user occupied 303 System.out.print(display.get(0)); 304 display.add(display.get(0)); 305 display.remove(0); 306 307 //System.out.println("Load5m for host "+host+" : "+hostInfo.getLoad5m() ); //used for debuging 308 //System.out.println("noCPUs for host "+host+" : "+hostInfo.getNoCPUs() ); //used for debuging 309 } 310 start = end; 311 end = wordIterator.next(); 312 } 313 System.out.print("\b\b\b\b\b\b\b"); 314 315 } catch (Throwable t) { 316 System.out.println("error getting queueLoad !!\n" + t.toString()); 317 log.warn("error getting queueLoad !!\n" + t.toString()); 318 hasfailed = true; 319 return -1; 320 } 321 322 323 queue.Ncpus = ncpu; 324 return (int) ((loadSum / (double)ncpu) * 100); 325 } 326 327 public void orderQueues(){ 328 329 setSOP(); 330 try{ //If the queue table can not be printed let the user know in the log file 331 //addToReportText("The queues have been re-ordered :\n"); 332 //addToReportText(QueueInfo(localQueueList, genericQueueList)); 333 }catch (Exception e){ /*addToReportText("\n\nCould not print out Queue Talbe\n"); */} 334 335 } 336 337 338 public void setSOP(){ //gets all the data and sorts the queues 339 Queues queues = new Queues(); 340 System.out.println("Collecting fresh queue data"); 341 Queue queue; 342 /* Loking at the load is just not work !!!!!!!!! 343 //Set to SOP of every queue to the value of (queueload * 100) 344 for(int i = 0; (i < genericQueueList.size()); i++){ 345 queue = (Queue) genericQueueList.get(i); 346 347 int qload = QueueLoad(queue); 348 if(qload == -1) //if it fails try on more time 349 qload = QueueLoad(queue); 350 351 queue.setSearchOrderPriority(qload); 352 353 addToReportText(queue.getName() + " SOP changed to -> " + queue.getSearchOrderPriority()); 354 if(queue.getSearchOrderPriority() == -1) hasfailed = true; // make note that getting this value failed 355 System.out.println(queue.getName() + " SOP changed to -> " + queue.getSearchOrderPriority()); 356 } 357 358 //test i load is too high, if so move over to <ResponseTime> 359 genericQueueList = queues.OrderQueuesBySearchOrderPriority(genericQueueList); //order the queues by SOP 360 queue = (Queue) genericQueueList.get(0); 361 if(queue.getSearchOrderPriority() < (maxLoadBeforeSwichOverToRunTime * 100) ) return; //if the queue load is smaller then the hold over just return; 362 */ 363 usingLoad = false; 364 //request.addToReportText("Queue loads are too high, swiching to <ResponseTime>\n\n"); 365 System.out.println("Using <ResponseTime>"); 366 367 //Set the SOP of every queue to the <ResponseTime> 368 MonaLisaPCQueueInfoFinder qFinder = new MonaLisaPCQueueInfoFinder(); 369 QueueInfo qInfo = new MonaLisaQueueInfo(); 370 371 for(int i = 0; (i < genericQueueList.size()); i++){ 372 queue = (Queue) genericQueueList.get(i); 373 //qInfo = qFinder.getQueueInfo("LSF_star_cas_dd"); 374 qInfo = qFinder.getQueueInfo(getMLQueueName(queue)); 375 376 int qload = QueueLoad(queue); 377 if(qload == -1) //if it fails try on more time 378 qload = QueueLoad(queue); 379 380 //note: getAverageRunTime() is getting the <running time> 381 382 383 double estimatedResponseTime = qInfo.getEstimatedResponseTime(); 384 double averageRunTime = qInfo.getAverageRunTime(); 385 if(((int) estimatedResponseTime == -1)||((int) averageRunTime == -1))hasfailed = true; 386 387 int RTi =(int)(qInfo.getEstimatedResponseTime() + (((double)qInfo.getWaitingJobs() * qInfo.getAverageRunTime()) / (double)queue.Ncpus)); 388 queue.baseResponseTime = RTi; 389 queue.setSearchOrderPriority(RTi + (int) queue.accumulatedResponseTime); 390 391 //request.addToReportText(queue.getName() + " SOP changed to -> " + queue.getSearchOrderPriority() + "\n"); 392 System.out.println(queue.getName() + " SOP changed to -> " + queue.getSearchOrderPriority()); 393 } 394 395 return; 396 } 397 398 399 double maxLoadBeforeSwichOverToRunTime = 1.0; 400 401 public void setMaxLoadBeforeSwichOverToRunTime(double maxLoadBeforeSwichOverToRunTime){ 402 this.maxLoadBeforeSwichOverToRunTime = maxLoadBeforeSwichOverToRunTime; 403 } 404 405 public double getMaxLoadBeforeSwichOverToRunTime(){ 406 return maxLoadBeforeSwichOverToRunTime; 407 } 408 409 410 public void setMsTimeRefresh(long msTimeRefresh){ 411 this.msTimeRefresh = msTimeRefresh; 412 } 413 414 public long getMsTimeRefresh(){ 415 return msTimeRefresh; 416 } 417 418 }