001 /* 002 * $RCSfile: PassivePolicy.java,v $ 003 * 004 * Created on August 6, 2006, 10:15 AM 005 * 006 * This file is part of the STAR Scheduler. 007 * Copyright (c) 2002-2003 STAR Collaboration - Brookhaven National Laboratory 008 * 009 * STAR Scheduler is free software; you can redistribute it and/or modify 010 * it under the terms of the GNU General Public License as published by 011 * the Free Software Foundation; either version 2 of the License, or 012 * (at your option) any later version. 013 * 014 * STAR Scheduler is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 017 * GNU General Public License for more details. 018 * 019 * You should have received a copy of the GNU General Public License 020 * along with STAR Scheduler; if not, write to the Free Software 021 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 022 */ 023 package gov.bnl.star.offline.scheduler.policy; 024 025 026 import gov.bnl.star.offline.scheduler.dataset.catalog.STARCatalog; 027 import gov.bnl.star.offline.scheduler.dataset.datasetManipulators.DropDuplicateRegX; 028 import gov.bnl.star.offline.scheduler.dataset.datasetManipulators.DropDuplicatesFromSortedList; 029 import gov.bnl.star.offline.scheduler.dataset.datasetManipulators.DropMatchingRegX; 030 import gov.bnl.star.offline.scheduler.dataset.datasetManipulators.SplitByRegX; 031 import gov.bnl.star.offline.scheduler.dataset.datasetManipulators.SortByRegX; 032 import gov.bnl.star.offline.scheduler.dataset.datasetManipulators.SplitByMinMaxEntries; 033 import gov.bnl.star.offline.scheduler.dataset.Dataset; 034 import gov.bnl.star.offline.scheduler.dataset.DatasetSubset; 035 import gov.bnl.star.offline.scheduler.dataset.EntryCounter; 036 import gov.bnl.star.offline.scheduler.dataset.EntryParser; 037 import gov.bnl.star.offline.scheduler.dataset.SubsetValidator; 038 import 
gov.bnl.star.offline.scheduler.util.ConfigToolkit; 039 import gov.bnl.star.offline.scheduler.util.ThreadSafeFilesystemToolkit; 040 import java.io.File; 041 042 import org.globus.gsi.CertUtil; 043 import org.globus.gsi.GlobusCredential; 044 import org.globus.util.Util; 045 import org.globus.common.CoGProperties; 046 import org.globus.common.Version; 047 048 049 import gov.bnl.star.offline.scheduler.*; 050 import gov.bnl.star.offline.scheduler.catalog.*; 051 import gov.bnl.star.offline.scheduler.catalog.CatalogTask; 052 import gov.bnl.star.offline.scheduler.catalog.QueryResult; 053 import gov.bnl.star.offline.scheduler.catalog.StarCatalog; 054 import gov.bnl.star.offline.scheduler.request.Request; 055 import gov.bnl.star.offline.scheduler.policy.copyselector.*; 056 import gov.bnl.star.offline.scheduler.util.jobIDGen; 057 import gov.bnl.star.offline.scheduler.Dispatchers.DispatcherBase; 058 059 060 061 import gov.bnl.star.offline.scheduler.informationService.VORS; 062 063 import java.net.*; 064 import java.util.*; 065 066 import java.util.Iterator; 067 import java.util.List; 068 import java.util.Set; 069 //import java.util.logging.Level; 070 import org.apache.log4j.Logger; 071 072 073 /** Decides how to split the job without triggering any resource redistribution. 074 * The passive policy doesn't trigger any file copy: it just collects 075 * information and does the best it can. 076 * <p> 077 * The policy will use copy selectors and file assignments to build the division. 078 * Refer to the documentation of those pieces for the details. 079 * <p> 080 * Overview of what the policy does <br> 081 * The policy basically solves all the queries, thus creating a big list of 082 * files. Then, according to which machines the file reside, the files are divided 083 * into different processes. At that point a new process is created. 084 * <p>How to configure the policy 085 * <p>There are few things how have to set in the policy to configure it. 
First
 * of all, the queue names:
 * <ul>
 * <li>localQueue - the queue used if the job is assigned a local file.</li>
 * <li>nfsQueue - the queue used if the job wasn't assigned any local file.</li>
 * <li>longQueue - the queue used for long jobs, that is, when the expected
 * time is more than minTimeLimitQueue.</li>
 * </ul>
 * You will also need to configure minTimeLimitQueue, which sets the limit
 * on the length of jobs in the short queues. Both nfsQueue and localQueue
 * are short queues, meaning that any job longer than the configured time will go
 * to the longQueue, regardless of its input.
 * <p>The last thing you will need to set is whether rootd is available on the
 * cluster. This allows the policy to assign a job files that are on different
 * local disks, which helps satisfy requirements such as minFilesPerProcess
 * when using files on local disk.
 * <p>An optional setting is the clusterName, which will be assigned to all the
 * jobs being created by this policy.
103 * 104 * @author Gabriele Carcassi & Pavel Jakl & Levente Hajdu 105 * @version $Revision: 1.52 $ $Date: 2006/11/21 00:41:32 $ 106 */ 107 public class PassivePolicy implements Policy { 108 static public Logger log = Logger.getLogger(PassivePolicy.class.getName()); 109 110 private String jobIDBase; 111 private int processNumber; 112 private String nfsQueue; //Deprecated 113 private String localQueue; //Deprecated 114 protected String longQueue; //Deprecated 115 private static boolean rootdAvailable=false; // TODO: this is messy if several services exists - Need to change 116 private static boolean xrootdAvailable=false; // TODO: this is messy if several services exists - Need to change 117 private static String xrootdRedirectorName=null; 118 private static int xrootdPort=0; // TODO: this is messy if several services exists - Need to change 119 private int minTimeLimit; //Deprecated 120 private double timeLimit = ((double) minTimeLimit) / 60; 121 122 public List localQueueList = new ArrayList(); 123 public List genericQueueList = new ArrayList(); 124 125 126 /** Holds value of property clusterName. */ 127 private String clusterName; 128 129 /* Initializes the creation of the jobIDs for the processes of one job. 130 * It basically initializes the first part of the ID, which is actually 131 * the time at which this method is running. 132 */ 133 private void initJobIDs() { 134 135 jobIDBase = jobIDGen.GetJobIDType2(); 136 137 //jobIDBase = String.valueOf(System.currentTimeMillis()); 138 139 processNumber = 0; 140 } 141 142 /* Returns a unique jobID. Each call returns a unique String. 143 */ 144 private String getNextJobID() { 145 return "" + jobIDBase + "_" + processNumber++; 146 } 147 148 /* Executes all the queries to the file catalog contained in the job 149 * request using the STAR scheduler implementation. 150 * <p> 151 * The result is simply added to the input file list. 
152 */ 153 /* 154 private QueryResult[] resolveCatalogQueries(Request request, CatalogQuery[] queries) { 155 QueryResult[] results = new QueryResult[queries.length]; 156 157 for (int nQuery = 0; nQuery < queries.length; nQuery++) { 158 CatalogQuery query = queries[nQuery]; 159 if (request.getInputOrder() != null) { 160 if (query.getAttributes() == null) { 161 query.setAttributes(request.getInputOrder()); 162 } else { 163 query.getAttributes().addAll(request.getInputOrder()); 164 } 165 } 166 System.out.print("Retrieving files for query : " + query.getQuery() + " "); 167 168 CatalogTask task = new CatalogTask(StarCatalog.class, query); 169 task.execute(); 170 171 if (task.getExitStatus() == -1) { 172 throw new RuntimeException( "A query to the file catalog failed."); 173 } 174 175 System.out.println(" " + task.getResult().getPhysicalCount() + " files retrieved."); 176 query.setFilesReturned(task.getResult().getPhysicalCount()); 177 178 //CopySelector selector = CopySelectorFactory.createCopySelector(query); 179 results[nQuery] = task.getResult(); 180 } 181 182 return results; 183 } 184 */ 185 186 private CatalogQuery[] getCatalogQueries(Request request) { 187 int nQueries = 0; 188 189 for (int n = 0; n < request.getInputList().size(); n++) { 190 if (request.getInputList().get(n) instanceof CatalogQuery) { 191 nQueries++; 192 } 193 } 194 195 CatalogQuery[] queries = new CatalogQuery[nQueries]; 196 197 for (int n = 0; n < queries.length; n++) { 198 queries[n] = (CatalogQuery) request.getInputList().get(n); 199 } 200 201 return queries; 202 } 203 204 /** 205 *This member associates the job to the queue. 
In order to set the association the following classes have to be set: 206 * job.setQueueObj(queue); 207 * job.setAccessMethod() 208 * job.setAccessPoint(); 209 **/ 210 public static void assignQueue(gov.bnl.star.offline.scheduler.Queue queue, Job job){ 211 job.setQueueObj(queue); 212 213 214 //////////////////////////////////////////find an access method for the job //////////////////////////////////// 215 boolean accessMethodWasNotSet = true; 216 List accessMethods = queue.getAccessMethods(); 217 if(ConfigToolkit.getToolkit().isLocalQueue(queue)){ //the queue is local, start with local the local access points 218 for(int i = 0; i != accessMethods.size(); i++){ 219 AccessMethod accessMethod = (AccessMethod) accessMethods.get(i); 220 if( ((LocalAccessPoint) accessMethod.getAccessPoints().get(0)).isLocal() ){ //It is assumed that if one accessPoint is local all accessPoints associated with that point will be local. 221 accessMethodWasNotSet = false; 222 job.setAccessMethod( accessMethod ); 223 } 224 } 225 if(accessMethodWasNotSet) System.out.println("Could not find local access point for " + queue.getID() + " looking for grid gatekeeper access point."); 226 } 227 228 229 if(accessMethodWasNotSet){ //if no local access method can be found try grid acess methods 230 for(int i = 0; i != accessMethods.size(); i++){ 231 AccessMethod accessMethod = (AccessMethod) accessMethods.get(i); 232 if(! ( (LocalAccessPoint) accessMethod.getAccessPoints().get(0) ).isLocal() ){ //It is assumed that if one accessPoint is local all accessPoints associated with that point will be local. 
233 accessMethodWasNotSet = false; 234 job.setAccessMethod( accessMethod ); 235 } 236 } 237 } 238 if(accessMethodWasNotSet){ 239 throw new RuntimeException("No Grid accessMethod could be found for the queue: " + queue.getID() + "\nThis may be becuase the policy you are using was not intended to be run from this site or it may be a misconfiguration."); 240 } 241 242 243 244 ////////////////////////////////////find an access point /////////////////////////////////////////////////// 245 if(job.getAccessMethod().getAccessPoints().size() > 0) 246 job.setAccessPoint( (LocalAccessPoint) job.getAccessMethod().getAccessPoints().get(0) ); 247 else 248 throw new RuntimeException("No accessPoint could be found for the queue: " + queue.getID() + "\nThis may be becuase the policy you are using was not intended to be run from this site or it may be a misconfiguration."); 249 250 251 } 252 253 254 public void assignQueues(Request request, List jobs) { 255 256 gov.bnl.star.offline.scheduler.Queues queses = new Queues(); // Load all queues from the Queue object in the config file 257 258 259 /* 260 Object queueCheckFlag = ComponentLibrary.getInstance().getComponent("SUMS_FLAG_DO_NO_QUEUE_CHECK"); //get the ComponentLibrary flag to true off queue checking 261 262 // Test all queues if the queues do not pass the test drop them from the list 263 for(int i = 0; i < genericQueueList.size(); i++){ 264 gov.bnl.star.offline.scheduler.Queue queue = (gov.bnl.star.offline.scheduler.Queue) genericQueueList.get(i); 265 System.out.print("Testing queue (" + queue.getID() + "@" + queue.getBatchSystem().getSite().getSiteName() + ") : "); 266 if((! bypassQueueTests)&&(queueCheckFlag == null)){ 267 if(! queue.getAssociatedDispatcher().test(queue)){ 268 System.out.println("(faild)"); 269 if(! 
ConfigToolkit.getToolkit().isFlagSet("SUMS_FLAG_IGNORE_ERRORS")){ 270 String dropQueue = "The queue " + queue.getID() + (queue.getName()!=null?"("+ queue.getName() + ")":"") + " was dropped because it did not pass validation tests."; 271 System.out.println("Warning : " + dropQueue);//outoput notic the queue is bing dropped 272 log.warn(dropQueue); //log notic 273 genericQueueList.remove(i); //remove queue 274 i--; //When the list gets small becuase something is removed, you need to go back one index else that element will not be tested 275 } 276 } 277 else{ 278 System.out.println("(passed)"); 279 } 280 } 281 else{ 282 System.out.println("(Bypassed)"); 283 } 284 }*/ 285 286 287 if(genericQueueList.size() == 0){ //Test if queues where loaded. This also works as a hack for junit 288 System.out.println("Could not find any queues."); 289 log.warn("Could not find any queues."); 290 // System.exit(1); //This will kill the unit test, a bypass of some type is needed to privent this. 291 //job.addToReportText("Could not find any queues.\n\n"); 292 return; 293 } 294 295 //remove all the local queues for the generic queue list and put them on the local queue list 296 for(int i = 0; i < genericQueueList.size(); i++){ 297 gov.bnl.star.offline.scheduler.Queue queue = (gov.bnl.star.offline.scheduler.Queue) genericQueueList.get(i); 298 if(queue.IsOptimizedAsLocalQueue()){ 299 localQueueList.add(queue); 300 genericQueueList.remove(i); 301 i = 0; //If the size of the list changes start over again from first to last 302 } 303 } 304 305 //sort all the queues by "Search Order Priority" 306 gov.bnl.star.offline.scheduler.Queues queues = new Queues(); 307 localQueueList = queses.OrderQueuesBySearchOrderPriority(localQueueList); 308 genericQueueList = queses.OrderQueuesBySearchOrderPriority(genericQueueList); 309 310 try{ //If the queue table can not be printed let the user know in the log file 311 request.addToReportText(QueueInfo(localQueueList, genericQueueList)); 312 }catch (Exception 
e){ request.addToReportText("\n\nCould not print out Queue Talbe\n"); } 313 314 315 //assignQueues to jobs 316 request.addToReportText("\n*************************************************************************\n"); 317 request.addToReportText("* This is a history of Job assignments. *\n"); 318 request.addToReportText("* If your Jobs are going to the wrong queue, this may tell you why. *\n"); 319 request.addToReportText("*************************************************************************\n"); 320 for (int nJob = 0; nJob < jobs.size(); nJob++) { //loop over all jobs 321 322 Job job = (Job) jobs.get(nJob); 323 boolean jobHasNoQueue = true; 324 325 if((job.getTarget() != null) || "xrootd".equals(request.getFileListType()) || "rootd".equals(request.getFileListType())){ //if the job uses Ddisk files 326 for(int i = 0; (i < localQueueList.size()) && jobHasNoQueue; i++){ //loop over the list as until you get to the end or the job finds a q 327 gov.bnl.star.offline.scheduler.Queue queue = (gov.bnl.star.offline.scheduler.Queue) localQueueList.get(i); 328 job.addToReportText(job.getJobID() + " trying " + queue.getName() + "(" + queue.getID() + ")"); 329 if(queue.doesJobFit(job, request)){ 330 //job.setQueueObj(queue); 331 assignQueue(queue, job); 332 job.addToReportText(job.getJobID() + " assigned to " + queue.getName() + "(" + queue.getID() + ")"); 333 jobHasNoQueue = false; 334 localQueueList = queues.rotateSameLevelSearchOrderPriority(localQueueList); 335 } 336 else{ 337 job.addToReportText(queue.getMessages()); 338 } 339 } 340 } 341 342 if(jobHasNoQueue){ //if no queue has been found yet 343 for(int i = 0; (i < genericQueueList.size()) && jobHasNoQueue; i++){ //loop over the list as until you get to the end or the job finds a q 344 gov.bnl.star.offline.scheduler.Queue queue = (gov.bnl.star.offline.scheduler.Queue) genericQueueList.get(i); 345 job.addToReportText(job.getJobID() + " trying " + queue.getName() + "(" + queue.getID() + ")"); 346 
if(queue.doesJobFit(job, request)){ 347 //job.setQueueObj(queue); 348 assignQueue(queue, job); 349 job.addToReportText(job.getJobID() + " assigned to " + queue.getName() + "(" + queue.getID() + ")"); 350 jobHasNoQueue = false; 351 genericQueueList = queues.rotateSameLevelSearchOrderPriority(genericQueueList); 352 } 353 else{ 354 job.addToReportText(queue.getMessages()); 355 } 356 } 357 } 358 359 if(jobHasNoQueue){ 360 //error code, to handel what happin if a job can not find a q 361 System.out.println("Job could not find a queue for " + job.getJobID() + " !!!!!!!!!!!!!\n The job will be removed."); 362 System.out.println("Job report \n" + job.GetReportText()); 363 log.warn("Job could not find a queue for " + job.getJobID() + " !!!!!!!!!!!!!"); 364 job.addToReportText("\n\nJob could not find a queue !!!!!!!!!!!!!\n The job will be removed."); 365 jobs.remove(nJob); //This will remove the job 366 nJob--; //because one job was removed,nJobs has to be set back by one job 367 } 368 369 job.addToReportText(" "); //between every job put a blank line in the report 370 } 371 372 } 373 374 private void askFeedback(SubsetValidator validator) { 375 log.info("Assignment not valid. 
Asking for user feedback"); 376 System.out.println(); 377 System.out.println("The scheduler wasn't able to meet the requirement.\nThis is the proposed distribution:"); 378 /* System.out.println(assignment.getReport()); */ 379 380 381 382 /////////////////////////////// print index 383 System.out.println("\n-------------------------"); 384 System.out.println("JobIndex -> FilesInJob -> EventsInJob"); 385 List indexs = validator.getDatasetIndexList(); 386 for(int i = 0; (i != (indexs.size() - 1)); i++){ 387 DatasetSubset datasetSubset = (DatasetSubset) indexs.get(i); 388 //System.out.println(i + " -> " + datasetSubset.getPhysicalFileList().size() ); 389 System.out.println("job" + i + " -> " + datasetSubset.getFilesInSubset() + "files -> " + datasetSubset.getEventsInSubset() + "Events" ); 390 } 391 System.out.println("\n-------------------------"); 392 ////////////////////////////// 393 394 395 System.out.print("Should it be dispatched anyway? (press y to dispatch) "); 396 397 if( ConfigToolkit.getToolkit().isFlagSet("SUMS_FLAG_IGNORE_ERRORS")) return; //ignore the error if the flag is set 398 399 try { 400 if (System.in.read() == 'y') 401 return; 402 } catch (Exception e) {} 403 404 log.fatal("User chose to cancel"); 405 throw new RuntimeException("Scheduling didn't meet user requirements, and user cancelled.\nTry changing minFilesPerProcess, or the number of files returned by the query."); 406 } 407 408 409 410 Comparator createComparator(List attrs) { 411 return new InputOrderComparator(attrs); 412 } 413 class InputOrderComparator implements Comparator { 414 private List attrs; 415 InputOrderComparator(List attrs) { 416 this.attrs = attrs; 417 } 418 public int compare(Object o1, Object o2){ 419 PhysicalFile p1 = (PhysicalFile) o1; 420 PhysicalFile p2 = (PhysicalFile) o2; 421 Iterator iter = attrs.iterator(); 422 while (iter.hasNext()) { 423 String att = (String) iter.next(); 424 Comparable a1 = (Comparable) p1.getAtribute(att); 425 Comparable a2 = (Comparable) 
p2.getAtribute(att); 426 if (a1 == null) return -1; 427 if (a2 == null) return -1; 428 int res = a1.compareTo(a2); 429 if (res != 0) return res; 430 } 431 return 0; 432 } 433 }; 434 435 436 437 438 private List createJobs(Request request, List datasetIndexList) { 439 440 /* 441 if (request.getInputOrder() != null) { 442 List attrs = request.getInputOrder(); 443 assignment.orderFiles(createComparator(attrs)); 444 } 445 446 Set locations = assignment.getLocations(); 447 Iterator iter = locations.iterator(); 448 List jobs = new ArrayList(); 449 450 while (iter.hasNext()) { 451 Location location = (Location) iter.next(); 452 String machine = null; 453 454 //if (!location.equals(Location.getNFS())&& !location.equals(Location.getHPSS())) { 455 //Don't set the node name if the file is xrootd or NSF or HPSS 456 if (!"xrootd".equals(request.getFileListType()) &&!location.equals(Location.getNFS())&& !location.equals(Location.getHPSS())) { 457 458 459 machine = location.toString(); 460 461 if ("".equals(machine)) { 462 machine = null; 463 } 464 } 465 466 for (int nProc = 0; nProc < assignment.minProcs(location);nProc++) { 467 468 List files = assignment.getFiles(location, nProc); 469 470 Job job = new Job(getNextJobID()); 471 job.setRequest(request); 472 job.setTarget(machine); 473 job.setStdin(request.getStdIn()); 474 job.setStdout(request.getStdOut()); 475 job.setStderr(request.getStdErr()); 476 job.setInput(new ArrayList()); 477 job.getInput().addAll(files); 478 job.setCluster(getClusterName()); 479 jobs.add(job); 480 } 481 } 482 return jobs; 483 * 484 */ 485 486 487 List jobs = new ArrayList(); 488 489 //datasetIndexList 490 for(int i = 0; (i != (datasetIndexList.size() - 1)); i++){ 491 492 DatasetSubset datasetSubset = (DatasetSubset) datasetIndexList.get(i); 493 //System.out.println("job index --->" + i); 494 495 Job job = new Job(getNextJobID()); 496 job.setRequest(request); 497 //job.setTarget(datasetSubset.getNode()); 498 job.setStdin(request.getStdIn()); 499 
job.setStdout(request.getStdOut()); 500 job.setStderr(request.getStdErr()); 501 502 //job.setInput(new ArrayList()); 503 //job.getInput().addAll(files); //replaced 504 505 job.setDatasetSubset(datasetSubset); 506 507 job.setCluster(getClusterName()); 508 jobs.add(job); 509 510 } 511 512 return jobs; 513 514 } 515 516 /** Processes a job requests, splitting into multiple processes and assigning 517 * the target machines. 518 * <p> 519 * The step followed here are: 520 * <ul> 521 * <li>Initializes the job ID </li> 522 * <li>Executes the catalog queries</li> 523 * <li>Changes the maxFilePerProcess to fit in the short queue if possible</li> 524 * <li>Using AssignmentStrategyFactory, retrieves the assignmentStrategies 525 * for each query, and assign the file to the different machines</li> 526 * <li>If the job had no input, just dispatch a job with no input</li> 527 * <li>If the job had input, split into processes according to the distribution</li> 528 * <li>If the the distribution is invalid (some user constraint weren't met) 529 * ask user permission to continue</li> 530 * <li>Assigns the correct queue depending on the estimated job length</li> 531 * <li>Saves the report</li> 532 * </ul> 533 * Most of these actions are done in a separate private method. Refer to those 534 * for more information. 
535 * @param request the job to be processed 536 */ 537 public List assignTargetMachine(Request request) { 538 539 540 ////////test block/////////// 541 // System.out.println(" debug point -1 "); 542 // DynamicConfigPolicy configpol = new DynamicConfigPolicy(); 543 // 544 // if(true) return configpol.assignTargetMachine(request); 545 ///////////////////////////// 546 547 548 549 log.info("Assign target with PassivePolicy"); 550 initJobIDs(); 551 AdjustIOFileNames(request); 552 553 554 //get user grid cert if any exist using globus API if there is not cert use <username>@<site> example lbhajdu@rhic.bnl.gov 555 try { 556 GlobusCredential proxy = null; 557 String file = CoGProperties.getDefault().getProxyFile(); 558 proxy = new GlobusCredential(file); 559 request.setAuthenticatedUser( proxy.getSubject() ); 560 } catch(Exception e) { 561 request.setAuthenticatedUser( System.getProperty("user.name") + "@" + ConfigToolkit.getToolkit().myLocalSite().getSiteName() ); 562 } 563 564 565 566 /////////////////////test block///////////////////////////// 567 /* 568 VORS vors = new VORS(); 569 vors.setService("http://vors.grid.iu.edu"); 570 vors.setSiteID("STAR-BNL"); 571 vors.refreshConnections(); 572 vors.refreshConnections(); 573 574 VORS vors2 = new VORS(); 575 vors2.setService("http://vors.grid.iu.edu"); 576 vors2.setSiteID("NERSC-PDSF"); 577 vors2.refreshConnections(); 578 */ 579 //////////////////////////////////////////////////////////// 580 /* //replaed by new fileBUffer based catalog 581 CatalogQuery[] queries = getCatalogQueries(request); 582 log.info("Executing catalog queries"); 583 QueryResult[] results = resolveCatalogQueries(request, queries); 584 log.debug("Assigning input files to processes accordint to their position"); 585 */ 586 587 ////////////////////lbh////Look at all the local queues in order end take the 1st one that has a time limit and try to make all jobs fit that 588 //gov.bnl.star.offline.scheduler.Queues queues = new Queues(); //load all the queues 
from the log file 589 590 ///////////////////////////////////////////////// 591 592 593 594 595 596 ////////////////////////////////////////////////clac the min/max files split///////////////////////////////////////// 597 int gap = 25;//% 598 int shortQueueTimeLimit = Integer.MAX_VALUE; 599 int optFiles = - 1; 600 int maxFiles = request.getResource("FilesPerProcess").getMax(); 601 int minFiles = request.getResource("FilesPerProcess").getMin(); 602 603 //get the queue with the shartest time limit 604 for(int i = 0; i < genericQueueList.size(); i++){ 605 gov.bnl.star.offline.scheduler.Queue queue = (gov.bnl.star.offline.scheduler.Queue) genericQueueList.get(i); 606 if( (queue.getTimeLimit() > 0) && (queue.getTimeLimit() < shortQueueTimeLimit )) { 607 shortQueueTimeLimit = queue.getTimeLimit(); 608 } 609 } 610 if((shortQueueTimeLimit != Integer.MAX_VALUE) && (request.getFilesPerHour() != Double.POSITIVE_INFINITY)){ //if there is some queue time data and files perhour from the user, clac how many will fit into the smallest queue 611 optFiles = (int) (((float)request.getFilesPerHour() * ((float)shortQueueTimeLimit / (float)60)) - .5 ); 612 if(optFiles < 1) optFiles = 1; 613 } 614 615 616 //System.out.println("optFiles-------------->" + optFiles); 617 618 String splitStatment; 619 String splitDebugging = " file splitting mode ---> "; 620 621 622 if((maxFiles == -1) && (minFiles==-1) && (optFiles == -1)){ //if the user sets notting just use somedefult value 623 splitDebugging = splitDebugging + "0"; 624 maxFiles = 150; 625 minFiles = (int) ((double)maxFiles - ((double)maxFiles * (double)gap * (double).01)) ; 626 splitStatment = "No values (maxFilesPerProcess, minFilesPerProcess, filesPerHour) to calculate an optimal file split values, so using default splitFileList(min=" + minFiles +", max="+ maxFiles +")"; 627 }else if((maxFiles > 0) && (minFiles==-1) && (optFiles == -1)){ //if only max files is given 628 splitDebugging = splitDebugging + "1"; 629 minFiles = (int) 
((double)maxFiles - ((double)maxFiles * (double)gap * (double).01)) ; 630 splitStatment = "Using split valuse : splitFileList(min=" + minFiles +", max="+ maxFiles +")"; 631 }else if((maxFiles == -1) && (minFiles > 0) && (optFiles == -1)){ //if only min files is given 632 splitDebugging = splitDebugging + "2"; 633 maxFiles = (int) ((double)minFiles + ((double)minFiles * (double)gap * (double).01)) ; 634 splitStatment = "Using split valuse : splitFileList(min=" + minFiles +", max="+ maxFiles +")"; 635 }else if((maxFiles > 0) && (minFiles > 0) && (optFiles == -1)){ //if min and max files is given 636 splitDebugging = splitDebugging + "3"; 637 splitStatment = "Using split valuse : splitFileList(min=" + minFiles +", max="+ maxFiles +")"; 638 }else if((maxFiles == -1) && (minFiles == -1) && (optFiles > 0)){ //if optFiles via FilesPerHour is given 639 splitDebugging = splitDebugging + "4"; 640 maxFiles = optFiles; 641 minFiles = (int) ((double)maxFiles - ((double)maxFiles * (double)gap * (double).01)) ; 642 splitStatment = "Using split valuse : splitFileList(min=" + minFiles +", max="+ maxFiles +")"; 643 }else if((maxFiles > 0) && (minFiles > 0) && (optFiles > 0)){ //if optFiles via FilesPerHour is given 644 splitDebugging = splitDebugging + "5"; 645 if((minFiles < optFiles) && (optFiles < maxFiles)){ minFiles = optFiles; } //This is min becuase the splitter favors slipping by the min 646 splitStatment = "Using split valuse : splitFileList(min=" + minFiles +", max="+ maxFiles +")"; 647 }else if((maxFiles > 0) && (minFiles == -1) && (optFiles > 0)){ 648 splitDebugging = splitDebugging + "6"; 649 if(optFiles < ((double)maxFiles - ((double)maxFiles * (double)gap * (double).01))){ 650 minFiles = optFiles; 651 }else{ 652 minFiles = (int) ((double)maxFiles - ((double)maxFiles * (double)gap * (double).01)) ; 653 } 654 splitStatment = "Using split valuse : splitFileList(min=" + minFiles +", max="+ maxFiles +")"; 655 }else if((maxFiles == -1) && (minFiles > 0) && (optFiles > 0)){ 
656 splitDebugging = splitDebugging + "7"; 657 if(optFiles < ((double)minFiles + ((double)minFiles * (double)gap * (double).01))){ 658 maxFiles = optFiles; 659 }else{ 660 maxFiles = (int) ((double)minFiles + ((double)minFiles * (double)gap * (double).01)); 661 } 662 splitStatment = "Using split valuse : splitFileList(min=" + minFiles +", max="+ maxFiles +")"; 663 }else{ 664 log.fatal("hit error trap when tring to clac min max file split"); 665 throw new RuntimeException("The PassivePolicy hit an error trap while trying to calculate a min/max split value."); 666 } 667 668 if(minFiles < 1) minFiles = 1; //this is in case the math makes the min smaller then zero 669 670 671 log.info(splitDebugging + 672 "\n MinSplit = " + minFiles + " (uses by splitter)" + 673 "\n maxSplit = " + maxFiles + " (uses by splitter)" + 674 "\n optFiles = " + optFiles + 675 "\n request.Maxfiles = " + request.getResource("FilesPerProcess").getMax() + 676 "\n request.Minfiles = " + request.getResource("FilesPerProcess").getMin() ); 677 678 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 679 680 681 //System.out.println(">>>>>>>>>>>>>>> maxFiles = " + maxFiles + ", minFiles = " + minFiles); 682 683 //System.out.println( "Integer.MAX_VALUE=" + Integer.MAX_VALUE ); 684 //System.out.println( "request.getFilesPerHour() = " + request.getFilesPerHour() ); 685 //System.out.println(gap); 686 //System.out.println(shortQueueTimeLimit); 687 //System.out.println(optFiles); 688 //System.out.println(maxFiles); 689 //System.out.println(minFiles); 690 691 692 //System.out.println("...............>" + request.getResource("FilesPerProcess").getMax()); 693 694 695 /* 696 int maxFiles = request.getResource("FilesPerProcess").getMax(); 697 698 699 request.getResource("FilesPerProcess"). 700 701 int newMaxFiles = request. 
702 703 int newMaxFiles = (int) (((float)request.getFilesPerHour() * ((float)shortQueueTimeLimit / (float)60)) - .5 ); 704 */ 705 706 707 //Math.min( 708 709 710 ///////////////////////////////////////////////// 711 712 // List genericQueueList = queues.OrderQueuesBySearchOrderPriority(queues.getQueues(PPQueues)); //make a list of all the generic queue objects 713 714 /* 715 System.out.println("s - debug point 1"); 716 boolean requestResized = false; 717 for(int i = 0; i < genericQueueList.size(); i++){ //loop over the list as until you get to the end or the job finds a q 718 719 System.out.println("s - debug point 2"); 720 721 gov.bnl.star.offline.scheduler.Queue queue = (gov.bnl.star.offline.scheduler.Queue) genericQueueList.get(i); 722 if(queue.willJobsFit(request) && (! requestResized) && (queue.getTimeLimit() > 0)){ 723 724 System.out.println("s - debug point 3"); 725 726 timeLimit = (int)((float)queue.getTimeLimit() / (float)60); 727 // TODO this should be more general 728 // If it can fit in the short queue, then use it! 
729 if ((request.getFilesPerHour() != Double.POSITIVE_INFINITY) && (((double)request.getResource("FilesPerProcess").getMin())/request.getFilesPerHour() < timeLimit)) { 730 731 System.out.println("s - debug point 4"); 732 733 //These two lines are almost the same however the one at the bottom has less round off error //LH 734 //request.setMaxFilesPerProcess(Math.min((int) (request.getFilesPerHour() * timeLimit), request.getMaxFilesPerProcess())); //you can't set it higher then MaxFilesPerProces 735 int maxFiles = request.getResource("FilesPerProcess").getMax(); 736 int newMaxFiles = Math.min((int) (((float)request.getFilesPerHour() * ((float)queue.getTimeLimit() / (float)60)) - .5 ), maxFiles); 737 if(newMaxFiles!=0){ 738 request.setResourceMax("FilesPerProcess",newMaxFiles); //you can't set it higher then MaxFilesPerProcess 739 } 740 request.addToReportText("Note : Setting MaxFilesPerProcess to " + newMaxFiles + " to try and optimized job length to fit in " + queue.getName() + " queue.\n"); 741 log.debug("Setting MaxFilesPerProcess to " + newMaxFiles + " to try and optimized job length to fit in " + queue.getName() + " queue."); 742 743 System.out.println("s - debug newmax=" + newMaxFiles); 744 } 745 requestResized = true; 746 } 747 } 748 */ 749 750 // log.finer("Number of input files to assign: " + assignment.g.getFileCount()); 751 // The job has no input 752 if (request.getInputList().size() == 0) { 753 // If the job was specified as not having input, then submit 754 // anywhere. 755 if (request.hasNoInput()) { 756 // If the number of processes is set, then submit the number of 757 // jobs. 
Otherwise just submit one 758 int nProcesses = request.getNProcesses(); 759 if (nProcesses == -1) nProcesses = 1; 760 761 List jobs = new ArrayList(); 762 for (int n = 0; n < nProcesses; n++) { 763 Job job = new Job(getNextJobID()); 764 job.setRequest(request); 765 job.setStdin(request.getStdIn()); 766 job.setStdout(request.getStdOut()); 767 job.setStderr(request.getStdErr()); 768 //job.setTarget(null); 769 //job.setInput(new ArrayList()); 770 request.setID(jobIDBase); 771 jobs.add(job); 772 } 773 774 assignQueues(request, jobs); //This is called there as a replacment for set cluster. 775 request.setJobs(jobs); //make pointers from the request to the job objects 776 return jobs; 777 } else { 778 // Otherwise don't submit anything 779 System.out.println("Queries and wildcards returned no input files"); 780 return null; 781 } 782 } 783 784 785 786 //////////////////// User Requested Special Sorting ////////////////////////////////////////////// 787 788 // query.setSingleCopy(true); 789 //req.prepareInputOrder(attrs.getValue("inputOrder")); 790 //attrs.getValue("preferStorage") 791 792 String preferStorage = null; 793 boolean singleCopy = true; 794 795 for (int n = 0; n < request.getInputList().size(); n++) { 796 Object object = request.getInputList().get(n); 797 if (object instanceof CatalogQuery) { 798 CatalogQuery query = (CatalogQuery) object; 799 if(preferStorage == null) preferStorage = query.getPreferStorage(); 800 else if(! query.getPreferStorage().equals(preferStorage)){ 801 System.out.println("Wringing : More then one preferStorage attribute can not honored !!!"); 802 } //else it is the same and we don't need to do anything 803 804 if(! query.isSingleCopy()){ 805 System.out.println("Wringing : You have specified “singleCopy = false” this means duplicate files may exist in you dataset, and they will not be filter out for any of your inputs!!!!!!!! 
Please be certain this is what you intended."); 806 singleCopy = false; 807 } 808 809 } 810 } 811 812 813 SortByRegX userRequestedSort = null; //defins user requested sorting 814 815 List inputOrderList = request.getInputOrder(); 816 if(inputOrderList != null){ 817 for(int i = 0; i != inputOrderList.size(); i++){ 818 String item = (String) inputOrderList.get(i); 819 820 if(userRequestedSort != null){ 821 System.out.println("Wanning : If more then one inputOrder is used, it may not be honored !!!"); 822 } 823 824 if(item.equals("fdid")){ 825 userRequestedSort = new SortByRegX("([0-9]*)::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*"); 826 }else if(item.equals("storage")){ 827 userRequestedSort = new SortByRegX("[0-9]*::([a-zA-Z]*)::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*"); 828 }else if(item.equals("site")){ 829 userRequestedSort = new SortByRegX("[0-9]*::[a-zA-Z]*::([a-zA-Z]*)::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*"); 830 }else if(item.equals("node")){ 831 userRequestedSort = new SortByRegX("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::([a-zA-Z0-9:.]*)::[^:]*::[^:/]*::[0-9]*.*"); 832 }else if(item.equals("path")){ 833 userRequestedSort = new SortByRegX("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::([^:]*)::[^:/]*::[0-9]*.*"); 834 }else if(item.equals("filename")){ 835 userRequestedSort = new SortByRegX("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::([^:/]*)::[0-9]*.*"); 836 }else if(item.equals("events")){ 837 userRequestedSort = new SortByRegX("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::([0-9]*).*"); 838 }else { 839 userRequestedSort = new SortByRegX("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*::(.*)"); 840 } 841 } 842 } 843 844 845 846 ////////////////////////////////////////////////////////////////////////////////////////////////////////// 847 EntryParser entryParser = new EntryParser(); 848 entryParser.addLFN ("([0-9]*)::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*"); 849 
entryParser.addSTORAGE_SERVISE ("[0-9]*::([a-zA-Z]*)::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*"); 850 entryParser.addSITE ("[0-9]*::[a-zA-Z]*::([a-zA-Z]*)::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*"); 851 entryParser.addHOST ("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::([a-zA-Z0-9:.]*)::[^:]*::[^:/]*::[0-9]*.*"); 852 entryParser.addPATH ("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::([^:]*)::[^:/]*::[0-9]*.*"); 853 entryParser.addFILE_NAME ("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::([^:/]*)::[0-9]*.*"); 854 entryParser.addNUMBER_OF_EVENTS("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::([0-9]*).*"); 855 856 Dataset dataset = new Dataset("sched" + jobIDBase); 857 dataset.setEntryParser(entryParser); 858 859 860 gov.bnl.star.offline.scheduler.dataset.catalog.STARCatalog catalog = new STARCatalog(); 861 catalog.fillDataset(dataset ,request); 862 EntryCounter.countAndPrint(dataset, false); 863 864 865 //Defines objects for preparing the dataset 866 DropMatchingRegX dropHPSSFiles = new DropMatchingRegX(".*::HPSS::.*::.*::.*::.*::.*"); 867 DropMatchingRegX dropLocalRCRSFiles = new DropMatchingRegX("^[0-9]*::[a-zA-Z]*::[a-zA-Z]*::rcrs[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*"); 868 DropDuplicateRegX dropDuplicateLFN = new DropDuplicateRegX("([0-9]*)::[a-zA-Z]*::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*"); 869 //add pref storage here 870 871 872 if(preferStorage != null){ //This is of the user asks for a preferStorage 873 if(preferStorage.equals("NFS")) dropDuplicateLFN.addPreferredCopyRegx("A","^[0-9]*::NFS::.*"); //this is not truely needed because NFS is the top of the list already 874 else if((preferStorage.equals("local"))) dropDuplicateLFN.addPreferredCopyRegx("A","^[0-9]*::local::.*"); 875 else if((preferStorage.equals("HPSS"))) dropDuplicateLFN.addPreferredCopyRegx("A","^[0-9]*::HPSS::.*"); 876 } 877 878 879 dropDuplicateLFN.addPreferredCopyRegx("B","^[0-9]*::NFS::.*"); 880 
dropDuplicateLFN.addPreferredCopyRegx("C","^[0-9]*::local::.*"); 881 dropDuplicateLFN.addPreferredCopyRegx("D","^[0-9]*::HPSS::.*"); 882 SortByRegX sortByNode = new SortByRegX("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::([a-zA-Z0-9:.]*)::[^:]*::[^:/]*::[0-9]*.*"); 883 SplitByRegX groupByNode = new SplitByRegX("[0-9]*::[a-zA-Z]*::[a-zA-Z]*::([a-zA-Z0-9:.]*)::[^:]*::[^:/]*::[0-9]*.*"); 884 SplitByMinMaxEntries splitByMinMaxEntries = new SplitByMinMaxEntries(minFiles, maxFiles); 885 886 887 //Sorting and splitting for xrootd 888 //SortByRegX xrootdSort = new SortByRegX("[0-9]*::([a-zA-Z]*)::[a-zA-Z]*::[a-zA-Z0-9:.]*::([^:]*)::([^:/]*)::[0-9]*.*", "$1$2$3"); 889 //SplitByRegX SplitByStorage = new SplitByRegX("[0-9]*::([a-zA-Z]*)::[a-zA-Z]*::[a-zA-Z0-9:.]*::[^:]*::[^:/]*::[0-9]*.*"); 890 891 ////// 892 if("paths".equals(request.getFileListType())){ 893 894 System.out.println("-------Processing recovered dataset for paths-------"); 895 896 System.out.println("Dropping HPSS files"); 897 dropHPSSFiles.modify(dataset, request); //drop hpss files 898 EntryCounter.countAndPrint(dataset, false); 899 900 System.out.println("Dropping RCRS node node files"); 901 dropLocalRCRSFiles.modify(dataset, request); //drop files on rcrs nodes 902 EntryCounter.countAndPrint(dataset, false); 903 904 if(singleCopy){ 905 System.out.println("Dropping files with duplicate LFN"); 906 dropDuplicateLFN.modify(dataset, request); //drop files so there is only one of each LFN 907 EntryCounter.countAndPrint(dataset, false); 908 } 909 910 if(userRequestedSort == null){ 911 System.out.println("Sorting files by node"); 912 sortByNode.modify(dataset, request); //sort by node 913 EntryCounter.countAndPrint(dataset, false); 914 }else{ 915 System.out.println("Sorting files by user requested inputOrder"); 916 userRequestedSort.modify(dataset, request); 917 EntryCounter.countAndPrint(dataset, false); 918 } 919 920 921 System.out.println("Splitting by node"); 922 groupByNode.modify(dataset, request); //Grouping by node 923 
EntryCounter.countAndPrint(dataset, false); 924 925 } 926 else if("rootd".equals(request.getFileListType())){ 927 928 System.out.println("-------Processing recovered dataset for rootd-------"); 929 930 System.out.println("Dropping HPSS files"); 931 dropHPSSFiles.modify(dataset, request); //drop hpss files 932 EntryCounter.countAndPrint(dataset, false); 933 934 System.out.println("Dropping RCRS node files"); 935 dropLocalRCRSFiles.modify(dataset, request); //drop files on rcrs nodes 936 EntryCounter.countAndPrint(dataset, false); 937 938 if(singleCopy){ 939 System.out.println("Dropping files with duplicate LFN"); 940 dropDuplicateLFN.modify(dataset, request); //drop files so there is only one of each LFN 941 EntryCounter.countAndPrint(dataset, false); 942 } 943 944 if(userRequestedSort == null){ 945 System.out.println("Sorting files by node"); 946 sortByNode.modify(dataset, request); //sort by node / but -not- split by node 947 EntryCounter.countAndPrint(dataset, false); 948 }else{ 949 System.out.println("Sorting files by user requested inputOrder"); 950 userRequestedSort.modify(dataset, request); 951 EntryCounter.countAndPrint(dataset, false); 952 } 953 954 //throw new RuntimeException("------------halted by code-----------"); 955 956 } 957 else if("xrootd".equals(request.getFileListType())){ 958 959 System.out.println("-------Processing recovered dataset for xrootd-------"); 960 961 // //ask P.J. 
what he wants to do here 962 // System.out.println("Dropping HPSS files"); 963 // dropHPSSFiles.modify(dataset, request); //drop hpss files 964 // EntryCounter.countAndPrint(dataset, false); 965 966 967 968 if(singleCopy){ 969 System.out.println("Dropping files with duplicate LFN"); 970 dropDuplicateLFN.modify(dataset, request); //drop files so there is only one of each LFN 971 EntryCounter.countAndPrint(dataset, false); 972 } 973 974 if(userRequestedSort != null){ 975 System.out.println("Sorting files by user requested inputOrder"); 976 userRequestedSort.modify(dataset, request); 977 EntryCounter.countAndPrint(dataset, false); 978 } 979 980 981 982 }else{ 983 throw new RuntimeException("FileListType is not known"); 984 } 985 986 splitByMinMaxEntries.modify(dataset, request); 987 EntryCounter.countAndPrint(dataset, false); 988 989 System.out.println("----------------------------------------------------"); 990 991 dataset.deleteBuffer(); 992 993 994 995 System.out.print("validating dataset .."); 996 997 SubsetValidator validator = new SubsetValidator(); 998 //setting up the validator 999 if("paths".equals(request.getFileListType())) validator.setMustHaveHomogeneousHost(true); 1000 else if("rootd".equals(request.getFileListType())) validator.setMustHaveHomogeneousHost(false); 1001 else if("xrootd".equals(request.getFileListType())) validator.setMustHaveHomogeneousHost(false); 1002 validator.setMustHaveHomogeneousSite(true); 1003 if(request.getResource("FilesPerProcess").getMax() != -1) validator.setMaxFiles(request.getResource("FilesPerProcess").getMax()); 1004 if(request.getResource("FilesPerProcess").getMin() != -1) validator.setMinFiles(request.getResource("FilesPerProcess").getMin()); 1005 1006 boolean testResult = validator.validate(dataset); 1007 System.out.println((testResult ? "..passed" : "..faild")); 1008 1009 if(! 
testResult){ 1010 System.out.println("The file splitter has encountered a problem meeting your request."); 1011 System.out.println( validator.getErrors() ); 1012 if(validator.isErrorFatal()) throw new RuntimeException("This error is fatal. Exitting...."); 1013 else{ 1014 //ask user 1015 askFeedback(validator); 1016 } 1017 } 1018 1019 1020 //if(true) throw new RuntimeException("-------- stopped --------"); 1021 1022 //int indexCount = validator.getIndexCount(); 1023 /* 1024 for(int x = 0; x != indexCount - 1; x++ ){ 1025 1026 System.out.println("getting index --->" + x); 1027 1028 DatasetSubset subset0 = new DatasetSubset(x, dataset); 1029 1030 1031 List pfiles = subset0.getPhysicalFileList(); 1032 */ 1033 /* //This code is used for debugging to print out the file list. 1034 System.out.println(">>>>>>>>>>>>>>>>index<<<<<<<<<<<<<<<<<"); 1035 for(int i = 0; i != pfiles.size(); i++){ 1036 PhysicalFile pfile = (PhysicalFile) pfiles.get(i); 1037 System.out.println(pfile.asURL().toString()); 1038 if (pfile.getAtribute("events") != null) { 1039 System.out.println(pfile.getAtribute("events")); 1040 } 1041 } 1042 System.out.println(">>>>>>>>>>>>>>>>index<<<<<<<<<<<<<<<<<"); 1043 1044 } 1045 */ 1046 ////////////////////////////////////////////////////////////////////////////////////////////////////////// 1047 1048 /* 1049 AssignmentStrategy assigner = AssignmentStrategyFactory.createCopySelector(); 1050 //TODO Temporary fix: should be refactored 1051 List fileList = new ArrayList(); 1052 for (int n = 0; n < request.getInputList().size(); n++) { 1053 if (request.getInputList().get(n) instanceof URL) 1054 fileList.add(request.getInputList().get(n)); 1055 } 1056 FileAssignment assignment = assigner.assignFiles(request, results, queries, fileList); 1057 */ 1058 1059 1060 1061 1062 1063 List jobs = createJobs(request, validator.getDatasetIndexList()); 1064 assignQueues(request, jobs); 1065 //request.addToReportText(assignment.getReport()); 1066 request.setJobs(jobs); //make 
pointers from the request to the job objects 1067 request.setID(jobIDBase); 1068 return jobs; 1069 1070 } 1071 1072 1073 /** Changes the name of the NFS queue. The NFS queue is used for all 1074 * those jobs that aren't assigned any file on local disk and whose 1075 * length is less than minTimeLimitQueue. 1076 */ 1077 public void setNfsQueue(String nfsQueue) { 1078 this.nfsQueue = nfsQueue; 1079 } 1080 1081 public String getNfsQueue() { 1082 return nfsQueue; 1083 } 1084 1085 /** Changes the name of the local queue. The local queue is used for all 1086 * those jobs that are assigned at least one file on local disk and whose 1087 * length is less than minTimeLimitQueue. 1088 */ 1089 public void setLocalQueue(String localQueue) { 1090 this.localQueue = localQueue; 1091 } 1092 1093 public String getLocalQueue() { 1094 return localQueue; 1095 } 1096 1097 /** Changes the name of the long queue. A job is assigned to the long queue 1098 * when the minFilePerProcess and filesPerHour do not allow a job length 1099 * smaller than minTimeLimitQueue. 1100 */ 1101 public void setLongQueue(String longQueue) { 1102 this.longQueue = longQueue; 1103 } 1104 1105 public String getLongQueue() { 1106 return longQueue; 1107 } 1108 1109 /** Sets the maximum duration of a job that will be sent to the short queue. 1110 * This is also the minimum duration of a job on the long queue. Both 1111 * the localQueue and the nfsQueue are considered short queues. 1112 */ 1113 public void setMinTimeLimitQueue(int minTimeLimit) { 1114 this.minTimeLimit = minTimeLimit; 1115 timeLimit = ((double) minTimeLimit) / 60; 1116 } 1117 1118 public int getMinTimeLimitQueue() { 1119 return minTimeLimit; 1120 } 1121 1122 /** Tells the policy that rootd is available on the cluster. Having rootd 1123 * available in the cluster means that, if the filelistSyntax of the job 1124 * is rootd, the job will be able to access local disk files from every node. 
1125 * This enables the policy to assign to a single job files from different 1126 * local disks, which will make it easier to satisfy request requirement, 1127 * such as minFilesPerProcess. 1128 */ 1129 /* public void setRootdAvailable(boolean rootd) { 1130 // FIXME This is a very bad way to enable rootd... don't have much choice now... 1131 CopySelectorFactory.setCrossLocalDiskAccessEnabled(rootd); 1132 this.isRootdAvailable = rootd; 1133 } 1134 1135 public boolean isRootdAvailable() { 1136 return isRootdAvailable; 1137 } */ 1138 public static boolean isRootdAvailable() { 1139 return rootdAvailable; 1140 } 1141 1142 public static void setRootdAvailable(boolean rootdAvailable) { 1143 PassivePolicy.rootdAvailable = rootdAvailable; 1144 } 1145 1146 public static boolean isXrootdAvailable() { 1147 return xrootdAvailable; 1148 } 1149 1150 public static void setXrootdAvailable(boolean xrootdAvailable) { 1151 PassivePolicy.xrootdAvailable = xrootdAvailable; 1152 } 1153 1154 public static int getXrootdPort() { 1155 if(xrootdPort==0){ 1156 throw new RuntimeException("Please specify xrootd port number in your scheduler configuration file !!!"); 1157 } 1158 return xrootdPort; 1159 } 1160 1161 public static void setXrootdPort(int xrootdPort) { 1162 PassivePolicy.xrootdPort = xrootdPort; 1163 } 1164 1165 public static String getXrootdRedirectorName() { 1166 if(xrootdRedirectorName==null){ 1167 throw new RuntimeException("Please specify hostname of xrootd redirector node in your scheduler configuration file !!!"); 1168 } 1169 return xrootdRedirectorName; 1170 } 1171 1172 public static void setXrootdRedirectorName(String xrootdRedirectorName) { 1173 PassivePolicy.xrootdRedirectorName = xrootdRedirectorName; 1174 } 1175 1176 1177 /** Getter for property clusterName. 1178 * @return Value of property clusterName. 
1179 * 1180 */ 1181 public String getClusterName() { 1182 return this.clusterName; 1183 } 1184 1185 /** Sets the name of the cluster that will be assigned to all the jobs. 1186 * This parameter allows to create a single CompositeDispatcher with all 1187 * the diffferent clusters. By changing this parameter in the policy, you 1188 * will then control on which cluster jobs will be dispatched, without 1189 * having to reconfigure the dispatchers. 1190 * 1191 */ 1192 1193 1194 public void setClusterName(String clusterName) { 1195 this.clusterName = clusterName; 1196 } 1197 1198 /////////////////////////////////////lbh///////////////////////////////// 1199 1200 public void ClearAllQueues() { //This is called from the config file to fill the PPQueues list 1201 localQueueList.clear(); 1202 genericQueueList.clear(); 1203 } 1204 1205 1206 1207 1208 //A small function for working with strings 1209 public String resize(String string, int size){ 1210 while(true){ 1211 if(string.length() == size) return string; 1212 if(string.length() > size){ 1213 string = string.substring(0,string.length() - 4); 1214 string = string.concat("~"); 1215 } 1216 if(string.length() < size) string = string.concat(" "); 1217 } 1218 } 1219 1220 //returns a table in string fromat on queues 1221 public String QueueInfo(List localQueueList, List genericQueueList){ 1222 1223 //Put all the queue objects together in on big list. 
1224 List QueueList = new ArrayList(); 1225 QueueList.addAll(localQueueList); 1226 QueueList.addAll(genericQueueList); 1227 1228 //prit the qeues out by order of SearchOrderPriority 1229 gov.bnl.star.offline.scheduler.Queues queues = new Queues(); 1230 QueueList = queues.OrderQueuesBySearchOrderPriority(QueueList); 1231 1232 //get the lenght of the header of all the rows 1233 int IDLength = "ID".length(); 1234 int nameLength = "Name".length(); 1235 int typeLength = "Type".length(); 1236 int timeLimitLength = "TimeLimit".length(); 1237 int maxMenLength = "MaxMem".length(); 1238 int localLength = "Local".length(); 1239 int SOPLength = "S.O.P.".length(); 1240 int clusterLength = "Cluster".length(); 1241 1242 //get the max length of all the rows 1243 for(int i = 0; i < QueueList.size(); i++){ 1244 gov.bnl.star.offline.scheduler.Queue queue = (gov.bnl.star.offline.scheduler.Queue) QueueList.get(i); 1245 if(queue.getID() != null) IDLength = Math.max(IDLength, queue.getID().length()); 1246 else IDLength = Math.max(IDLength, "N/A".length()); 1247 nameLength = Math.max(nameLength, queue.getName().length()); 1248 if(queue.getType() == null) typeLength = Math.max(typeLength, queue.getType().length()); 1249 else typeLength = Math.max(typeLength, "N/A".length()); 1250 if(queue.getTimeLimit() != -1) timeLimitLength = Math.max(timeLimitLength, String.valueOf(queue.getTimeLimit()).length() + "min".length()); 1251 else timeLimitLength = Math.max(timeLimitLength, "none".length()); 1252 if(queue.getMaxMemory() != -1) maxMenLength = Math.max(maxMenLength, String.valueOf(queue.getMaxMemory()).length() + "MB".length()); 1253 else maxMenLength = Math.max(maxMenLength, "N/A".length()); 1254 //No test for the test of local is need because we know local is always one 1255 SOPLength = Math.max(SOPLength, String.valueOf(queue.getSearchOrderPriority()).length()); 1256 1257 if(queue.getCluster() != null) clusterLength = Math.max(clusterLength, queue.getCluster().length()); 1258 else 
clusterLength = Math.max(clusterLength, "none".length()); 1259 } 1260 1261 String Info =""; 1262 String table =""; 1263 1264 int tableWide = 1 + IDLength + 2 + nameLength + 2 + typeLength + 2 + timeLimitLength + 2 + maxMenLength + 2 + localLength + 2 + SOPLength + 2 + clusterLength; 1265 1266 String TableStart = "\n\n"; //draw the header 1267 for(int i = 0; i < (((tableWide) / 2) - (" Queues ".length() / 2)) ; i++) TableStart = TableStart.concat("-"); 1268 TableStart = TableStart.concat(" Queues "); 1269 for(int i = 0; i < (tableWide) ; i++) TableStart = TableStart.concat("-"); 1270 while((tableWide + 2) != TableStart.length()) TableStart = TableStart.substring(0,TableStart.length() - 1); //trim until it's the throws right size 1271 table = table.concat(TableStart); 1272 1273 //draw col names 1274 table = table.concat("\n " + resize("ID", IDLength + 2)); 1275 table = table.concat(resize("Name", nameLength + 2)); 1276 table = table.concat(resize("Type", typeLength + 2)); 1277 table = table.concat(resize( "TimeLimit" , timeLimitLength + 2)); 1278 table = table.concat(resize("MaxMem", maxMenLength + 2)); 1279 table = table.concat(resize("Local", localLength + 2)); 1280 table = table.concat(resize("S.O.P.", SOPLength + 2)); 1281 table = table.concat(resize("Cluster", clusterLength + 2) + "\n"); 1282 1283 //draw ----- line 1284 for(int i = 0; i < (tableWide) ; i++) table = table.concat("-"); 1285 1286 table = table.concat("\n"); 1287 1288 for(int i = 0; i < QueueList.size(); i++){ //print out all the local queues in the report file 1289 gov.bnl.star.offline.scheduler.Queue queue = (gov.bnl.star.offline.scheduler.Queue) QueueList.get(i); 1290 Info = " " + resize(queue.getID(), IDLength + 2); //15 1291 Info = Info.concat(resize(queue.getName(), nameLength + 2)); //18 1292 if(queue.getType() == null) 1293 Info = Info.concat(resize("N/A", typeLength + 2)); //10 1294 else 1295 Info = Info.concat(resize(queue.getType(), typeLength + 2)); //10 1296 if(queue.getTimeLimit() != 
-1) 1297 Info = Info.concat(resize( String.valueOf(queue.getTimeLimit()) + "min" , timeLimitLength + 2)); //10 1298 else 1299 Info = Info.concat(resize("N/A", timeLimitLength + 2)); //10 1300 if(queue.getMaxMemory() != -1) 1301 Info = Info.concat(resize( String.valueOf(queue.getMaxMemory()) + "MB" , maxMenLength + 2)); //7 1302 else 1303 Info = Info.concat(resize("none", maxMenLength + 2)); //7 1304 if(queue.IsOptimizedAsLocalQueue()) 1305 Info = Info.concat(resize("Y", localLength + 2)); 1306 else 1307 Info = Info.concat(resize("N", localLength + 2)); 1308 Info = Info.concat(resize( String.valueOf(queue.getSearchOrderPriority()), SOPLength + 2)); //7 1309 if(queue.getCluster() == null) 1310 Info = Info.concat("N/A"); 1311 else 1312 Info = Info.concat(queue.getCluster()); 1313 table = table.concat(Info + "\n"); 1314 } 1315 1316 //draw ----- 1317 for(int i = 0; i < (tableWide) ; i++) table = table.concat("-"); 1318 return table; 1319 } 1320 1321 1322 1323 public void addQueue(gov.bnl.star.offline.scheduler.Queue queue){ 1324 genericQueueList.add(queue); 1325 } 1326 1327 private boolean bypassQueueTests = false; 1328 public void setBypassQueueTests(boolean bypassQueueTests){this.bypassQueueTests = bypassQueueTests;} 1329 public boolean getBypassQueueTests(){return bypassQueueTests;} 1330 1331 1332 private void AdjustIOFileNames(Request request){ 1333 request.getTask().setStdErr(AdjustIOFileName(request.getTask().getStdErr(), "err")); 1334 request.getTask().setStdOut(AdjustIOFileName(request.getTask().getStdOut(), "out")); 1335 } 1336 1337 private URL AdjustIOFileName(URL url, String ext){ 1338 if(url == null) return url; 1339 //ThreadSafeFilesystemToolkit threadSafeFilesystemToolkit = new ThreadSafeFilesystemToolkit(); 1340 File file = new File(url.getPath()); 1341 if( file.isDirectory() ){ 1342 String path = url.getPath(); 1343 if(! 
(path.endsWith(System.getProperty("file.separator")))){ 1344 path = path + System.getProperty("file.separator"); 1345 } 1346 try { 1347 return new URL("file:" + url.getPath() + "$JOBID." + ext); 1348 } catch (MalformedURLException ex) { 1349 ex.printStackTrace(); 1350 throw new RuntimeException("MalformedURLException in passive police"); 1351 } 1352 1353 } else{ 1354 return url; 1355 } 1356 } 1357 1358 1359 1360 1361 }