/*
 * $RCSfile: StatisticsRecorder.java,v $
 *
 * Created on December 23, 2002, 4:40 PM
 *
 * This file is part of the STAR Scheduler.
 * Copyright (c) 2002-2006 STAR Collaboration - Brookhaven National Laboratory
 *
 * STAR Scheduler is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * STAR Scheduler is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with STAR Scheduler; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
package gov.bnl.star.offline.scheduler.util;

import gov.bnl.star.offline.scheduler.CatalogQuery;
import gov.bnl.star.offline.scheduler.ComponentLibrary;
import gov.bnl.star.offline.scheduler.Job;
import gov.bnl.star.offline.scheduler.dataset.DatasetSubset;
import gov.bnl.star.offline.scheduler.request.Request;
import gov.bnl.star.offline.scheduler.catalog.PhysicalFile;
import gov.bnl.star.offline.scheduler.util.ConfigToolkit;

import java.net.URL;
import java.sql.*;
import java.util.Date;
import java.util.Calendar;
import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import org.apache.log4j.Logger;


/** Utility class to record scheduler statistics in a database.
 * <p>
 * A single instance is shared through {@link #getInstance()}. Each call to
 * {@link #recordStatistics(Request, List)} opens its own JDBC connection,
 * writes the rows and closes the connection again. The instance keeps the
 * connection and the pending rows in fields while a call is running.
 *
 * @author Gabriele Carcassi
 * @version $Revision: 1.37 $ $Date: 2006/11/21 00:41:30 $
 */
public class StatisticsRecorder {
    private static final Logger log = Logger.getLogger(StatisticsRecorder.class.getName());

    /** Number of pending rows that triggers a multi-row INSERT. */
    private static final int ROWS_PER_INSERT = 100;

    private static final StatisticsRecorder theRecorder = new StatisticsRecorder();

    /** Rows ({@link JobRow}) waiting to be written by {@link #flushJob()}. */
    private final List pendingJobRows = new ArrayList();

    /** Rows ({@link InputFileRow}) waiting to be written by {@link #flushInputFile()}. */
    private final List pendingInputFileRows = new ArrayList();

    /** Connection used by the report methods; only set while recordStatistics runs. */
    private Connection conn;

    /** Statistics settings ("URL", "username", "password"); null disables recording. */
    private final Map configuration;

    /** One pending row of the yearly Job table. */
    private static final class JobRow {
        final String jobIdMd5;
        final String processId;
        final String node;
        final String queue;
        final String cluster;
        final String dispatcher;
        final int dispatchTime;

        JobRow(String jobIdMd5, String processId, String node, String queue,
               String cluster, String dispatcher, int dispatchTime) {
            this.jobIdMd5 = jobIdMd5;
            this.processId = processId;
            this.node = node;
            this.queue = queue;
            this.cluster = cluster;
            this.dispatcher = dispatcher;
            this.dispatchTime = dispatchTime;
        }
    }

    /** One pending row of the InputFile table. */
    private static final class InputFileRow {
        final String jobIdMd5;
        final String processId;
        final String filename;
        final String storage;

        InputFileRow(String jobIdMd5, String processId, String filename, String storage) {
            this.jobIdMd5 = jobIdMd5;
            this.processId = processId;
            this.filename = filename;
            this.storage = storage;
        }
    }

    /** Creates a new instance of StatisticsRecorder */
    private StatisticsRecorder() {
        configuration = (Map) ConfigToolkit.getToolkit().myLocalSite().getStatisticsConf();
    }

    /** Retrieve the statistics recorder.
     * @return the statistics recorder
     */
    public static StatisticsRecorder getInstance() {
        return theRecorder;
    }

    /** Record the statistics of the job request in the database.
     * <p>
     * Nothing is recorded when there is no statistics configuration, when the
     * MySQL driver cannot be loaded, when no URL is configured or when the
     * connection cannot be opened. Failures while writing are logged and
     * reported on standard output; they are not propagated to the caller.
     * For a simulated request only the request row itself is written.
     *
     * @param request the request to be recorded
     * @param jobs the {@link Job}s generated for the request
     */
    public void recordStatistics(Request request, List jobs) {
        if (configuration == null) {
            return;
        }

        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (Exception e) {
            log.warn("JDBC driver not found", e);

            return;
        }

        try {
            String url = (String) configuration.get("URL");
            String username = (String) configuration.get("username");
            String password = (String) configuration.get("password");

            if (url == null) {
                log.info(
                    "Statistics DB not specified. No statistics will be recorded");

                return;
            }

            conn = DriverManager.getConnection(url, username, password);
        } catch (Exception e) {
            log.error("Couldn't establish the connection to the stat DB", e);

            return;
        }

        try {
            System.out.print("Reporting statistics... ");

            if (!request.getSimulation()) {
                reportJobs(request, jobs);
                reportInputFiles(request, jobs);
                reportCatalogQueries(request);
            }

            reportRequest(request);

            log.debug("Statistics logged in the db");
            System.out.println("done.");
        } catch (Exception e) {
            System.out.println("failed.");
            log.error("Couldn't write statistics", e);
        } finally {
            // The recorder is a long-lived singleton: never let rows from an
            // aborted run leak into the next one, and never leak the connection.
            pendingJobRows.clear();
            pendingInputFileRows.clear();
            closeConnection();
        }
    }

    /** Writes the row describing the request into the Request table of the current year.
     *
     * @param request the request to be recorded
     * @throws RuntimeException if the row cannot be written
     */
    private void reportRequest(Request request) {
        String query =
            "INSERT DELAYED INTO Request_" + yearSuffix() + " (jobID_MD5, simulateSubmission, name, " +
            "mail, maxFilePerProcess, minFilesPerProcess, nQueries, nFiles, " +
            "preferStorage, queryNFiles, singleCopy, stdout, stdin, stderror, " +
            "filesPerHour, fileListSyntax, nOutput, maxMem, minMem, minStorage, maxStorage, user) " +
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ? ,?, ?, ?, ?, ?, ?, ?, ?,?,?,?,?,?)";

        PreparedStatement stmt = null;
        try {
            stmt = conn.prepareStatement(query);
            stmt.setString(1, request.getID());
            stmt.setString(2, Boolean.toString(request.getSimulation()));
            stmt.setString(3, request.getName());
            stmt.setString(4, Boolean.toString(request.getMail()));
            stmt.setInt(5, request.getResource("FilesPerProcess").getMax());
            stmt.setInt(6, request.getResource("FilesPerProcess").getMin());

            // Count the inputs by kind; the first catalog query supplies the
            // per-query columns (9-11).
            int nQueries = 0;
            int nFiles = 0;
            CatalogQuery catalogQuery = null;
            List input = request.getInputList();
            for (int n = 0; n < input.size(); n++) {
                if (input.get(n) instanceof CatalogQuery) {
                    nQueries++;
                    if (catalogQuery == null) {
                        catalogQuery = (CatalogQuery) input.get(n);
                    }
                }
                if (input.get(n) instanceof URL) {
                    nFiles++;
                }
            }
            stmt.setInt(7, nQueries);
            stmt.setInt(8, nFiles);

            if (nQueries > 0) {
                stmt.setString(9, catalogQuery.getPreferStorage());
                stmt.setObject(10, catalogQuery.getNFiles());
                stmt.setString(11, Boolean.toString(catalogQuery.isSingleCopy()));
            } else {
                stmt.setNull(9, Types.VARCHAR);
                stmt.setNull(10, Types.INTEGER);
                stmt.setNull(11, Types.VARCHAR);
            }

            if (request.getStdOut() != null) {
                stmt.setString(12, request.getStdOut().toString());
            } else {
                stmt.setNull(12, Types.VARCHAR);
            }

            if (request.getStdIn() != null) {
                stmt.setString(13, request.getStdIn().toString());
            } else {
                stmt.setNull(13, Types.VARCHAR);
            }

            if (request.getStdErr() != null) {
                stmt.setString(14, request.getStdErr().toString());
            } else {
                stmt.setNull(14, Types.VARCHAR);
            }

            if (request.getFilesPerHour() == Double.POSITIVE_INFINITY) {
                stmt.setNull(15, Types.DOUBLE);
            } else {
                stmt.setDouble(15, request.getFilesPerHour());
            }
            stmt.setString(16, request.getFileListType());
            stmt.setInt(17, request.getOutputList().size());

            // A resource limit of -1 is recorded as NULL.
            int maxMem = request.getResource("Memory").getMax();
            if (maxMem == -1) {
                stmt.setNull(18, Types.INTEGER);
            } else {
                stmt.setInt(18, maxMem);
            }

            int minMem = request.getResource("Memory").getMin();
            if (minMem == -1) {
                stmt.setNull(19, Types.INTEGER);
            } else {
                stmt.setInt(19, minMem);
            }

            // Unlike the memory columns, the storage columns are listed
            // min first: parameter 20 is minStorage, parameter 21 is maxStorage.
            int minStorage = request.getResource("StorageSpace").getMin();
            if (minStorage == -1) {
                stmt.setNull(20, Types.INTEGER);
            } else {
                stmt.setInt(20, minStorage);
            }

            int maxStorage = request.getResource("StorageSpace").getMax();
            if (maxStorage == -1) {
                stmt.setNull(21, Types.INTEGER);
            } else {
                stmt.setInt(21, maxStorage);
            }

            stmt.setString(22, System.getProperty("user.name"));

            log.debug("Query: " + query);
            stmt.execute();
        } catch (Exception e) {
            log.error("Couldn't write statistics: " + query, e);
            throw new RuntimeException("Couldn't write statistics: " + e.getMessage(), e);
        } finally {
            closeQuietly(stmt);
        }
    }

    /** Queues one Job row per job and writes whatever is still pending at the end.
     *
     * @param request the request the jobs belong to (not read here)
     * @param jobs the {@link Job}s to record
     */
    private void reportJobs(Request request, List jobs) {
        String user = System.getProperty("user.name");

        for (int nJob = 0; nJob < jobs.size(); nJob++) {
            Job job = (Job) jobs.get(nJob);
            String node = job.getTarget();

            if (node == null) {
                node = "any";
            }

            // The dispatcher is recorded as the text between the last '.' and
            // the last '@' of its toString().
            String dispatcherText = job.getAssociatedDispatcher().toString();
            String dispatcher = dispatcherText.substring(
                    1 + dispatcherText.lastIndexOf("."), dispatcherText.lastIndexOf("@"));

            reportJob(job.getJobID(), node, user, job.getQueueObj().getID(),
                    job.getCluster(), dispatcher, job.getDispatchTime());
        }

        flushJob();
    }

    /** Queues one InputFile row per input file of every job and writes whatever is still pending.
     *
     * @param request the request the jobs belong to (not read here)
     * @param jobs the {@link Job}s whose input files are recorded
     */
    private void reportInputFiles(Request request, List jobs) {
        for (int nJob = 0; nJob < jobs.size(); nJob++) {
            Job job = (Job) jobs.get(nJob);
            List list = job.getInput();

            for (int nFile = 0; nFile < list.size(); nFile++) {
                PhysicalFile file = (PhysicalFile) list.get(nFile);

                reportInputFile(job.getJobID(), file.getPath() + "/" + file.getFilename(), file.getStorage());
            }
        }

        flushInputFile();
    }

    /** Queues one InputFile row; the queue is written out once it holds {@link #ROWS_PER_INSERT} rows.
     *
     * @param jobID full job id; the part before the first '_' and the part after it are stored separately
     * @param filename full name of the input file
     * @param storage storage of the input file
     */
    private void reportInputFile(String jobID, String filename, String storage) {
        pendingInputFileRows.add(new InputFileRow(jobID(jobID), processID(jobID), filename, storage));

        if (pendingInputFileRows.size() >= ROWS_PER_INSERT) {
            flushInputFile();
        }
    }

    /** Writes all pending InputFile rows with a single multi-row INSERT.
     * The pending rows are discarded whether or not the insert succeeds.
     *
     * @throws RuntimeException if the rows cannot be written
     */
    private void flushInputFile() {
        if (pendingInputFileRows.isEmpty()) {
            return;
        }

        String sql = "INSERT DELAYED INTO InputFile (jobID_MD5, processID, filename, storage) VALUES "
                + rowPlaceholders("(?, ?, ?, ?)", pendingInputFileRows.size());

        PreparedStatement stmt = null;
        try {
            stmt = conn.prepareStatement(sql);
            int param = 1;
            for (int n = 0; n < pendingInputFileRows.size(); n++) {
                InputFileRow row = (InputFileRow) pendingInputFileRows.get(n);
                stmt.setString(param++, row.jobIdMd5);
                // processID is stored as a number, as in the former unquoted literal.
                stmt.setLong(param++, Long.parseLong(row.processId));
                // Values are bound, not concatenated, so quotes in them are harmless.
                // String.valueOf keeps the former rendering of a null value as "null".
                stmt.setString(param++, String.valueOf(row.filename));
                stmt.setString(param++, String.valueOf(row.storage));
            }
            stmt.execute();
            log.debug("Query: " + sql);
        } catch (Exception e) {
            log.error("Couldn't write statistics: " + sql, e);
            throw new RuntimeException("Couldn't write statistics: " + e.getMessage(), e);
        } finally {
            pendingInputFileRows.clear();
            closeQuietly(stmt);
        }
    }

    /** Queues one Job row; the queue is written out once it holds {@link #ROWS_PER_INSERT} rows.
     *
     * @param jobID full job id; its first 32 characters and the part after the first '_' are stored separately
     * @param node node the job is targeted to
     * @param user user name; the yearly Job table has no user column, so it is not stored
     * @param queue id of the queue
     * @param cluster cluster of the job
     * @param dispatcher short name of the dispatcher
     * @param dispatchTime dispatch time of the job
     */
    private void reportJob(String jobID, String node, String user, String queue, String cluster, String dispatcher, int dispatchTime) {
        pendingJobRows.add(new JobRow(jobID.substring(0, 32), processID(jobID),
                node, queue, cluster, dispatcher, dispatchTime));

        if (pendingJobRows.size() >= ROWS_PER_INSERT) {
            flushJob();
        }
    }

    /** Writes all pending Job rows with a single multi-row INSERT into the Job table of the current year.
     * The pending rows are discarded whether or not the insert succeeds.
     *
     * @throws RuntimeException if the rows cannot be written
     */
    private void flushJob() {
        if (pendingJobRows.isEmpty()) {
            return;
        }

        String sql = "INSERT DELAYED INTO Job_" + yearSuffix()
                + " (dataID, jobID_MD5, processID, node, queue, cluster, dispatcher, dispatchTime) VALUES "
                + rowPlaceholders("( 0, ?, ?, ?, ?, ?, ?, ?)", pendingJobRows.size());

        PreparedStatement stmt = null;
        try {
            stmt = conn.prepareStatement(sql);
            int param = 1;
            for (int n = 0; n < pendingJobRows.size(); n++) {
                JobRow row = (JobRow) pendingJobRows.get(n);
                stmt.setString(param++, row.jobIdMd5);
                // processID is stored as a number, as in the former unquoted literal.
                stmt.setLong(param++, Long.parseLong(row.processId));
                // String.valueOf keeps the former rendering of a null value as "null".
                stmt.setString(param++, String.valueOf(row.node));
                stmt.setString(param++, String.valueOf(row.queue));
                stmt.setString(param++, String.valueOf(row.cluster));
                stmt.setString(param++, String.valueOf(row.dispatcher));
                stmt.setInt(param++, row.dispatchTime);
            }
            stmt.execute();
            log.debug("Query: " + sql);
        } catch (Exception e) {
            log.error("Couldn't write statistics: " + sql, e);
            throw new RuntimeException("Couldn't write statistics: " + e.getMessage(), e);
        } finally {
            pendingJobRows.clear();
            closeQuietly(stmt);
        }
    }

    /** Writes one row into the Query table of the current year when the request used a catalog query.
     * <p>
     * The row carries the request id, the number of files returned by the last
     * catalog query of the input list and the files/events selected over all
     * jobs of the request. The query text is recorded only when the catalog
     * query is the sole input; otherwise "other" is stored in its place.
     *
     * @param request the request to be recorded
     * @throws RuntimeException if the row cannot be written
     */
    private void reportCatalogQueries(Request request) {
        // Do not record simulations. (This guard used to be inverted, which
        // silently disabled the method for every real submission.)
        if (request.getSimulation()) {
            return;
        }

        boolean writeQueriesReturningZeroFiles = false;

        // Keep the last catalog query of the input list.
        CatalogQuery catalogQuery = null;
        List input = request.getInputList();
        for (int n = 0; n < input.size(); n++) {
            if (input.get(n) instanceof CatalogQuery) {
                catalogQuery = (CatalogQuery) input.get(n);
            }
        }

        if (catalogQuery == null) {
            return;
        }

        // NOTE(review): with several catalog queries only the last one's count
        // is recorded, as before - confirm whether the total was intended.
        long filesReturned = 0;
        filesReturned += catalogQuery.getFilesReturned();

        if (filesReturned <= 0 && !writeQueriesReturningZeroFiles) {
            return;
        }

        long nEventsSelected = 0;
        long nFilesSelected = 0;

        List jobs = request.getJobs();
        for (int i = 0; i < jobs.size(); i++) {
            DatasetSubset datasetSubset = ((Job) jobs.get(i)).getDatasetSubset();
            if (datasetSubset == null) {
                continue; // nothing to count for this job
            }
            nFilesSelected += datasetSubset.getFilesInSubset();
            nEventsSelected += datasetSubset.getEventsInSubset();
        }

        String recordedQuery;
        if (input.size() == 1) {
            // The catalog query is the only input, so the data is pure. The
            // text is bound as a parameter and therefore stored verbatim.
            recordedQuery = catalogQuery.getCatalogName() + "?" + catalogQuery.getQuery();
        } else {
            // With more than one input there is no telling where the files came from.
            recordedQuery = catalogQuery.getCatalogName() + "?" + "other";
        }

        String sql = "INSERT DELAYED INTO Query_" + yearSuffix()
                + " (jobID_MD5, Query, nFilesReturned, nFilesSelected, nEventsSelected) VALUES (?, ?, ?, ?, ?)";

        PreparedStatement stmt = null;
        try {
            stmt = conn.prepareStatement(sql);
            stmt.setString(1, request.getID());
            stmt.setString(2, recordedQuery);
            stmt.setLong(3, filesReturned);
            stmt.setLong(4, nFilesSelected);
            stmt.setLong(5, nEventsSelected);
            stmt.execute();
            log.debug("Query: " + sql);
        } catch (Exception e) {
            log.error("Couldn't write statistics: " + sql, e);
            throw new RuntimeException("Couldn't write statistics: " + e.getMessage(), e);
        } finally {
            closeQuietly(stmt);
        }
    }

    /** Returns the part of the job id that precedes the first '_'.
     *
     * @param jobID full job id
     * @return the text before the first '_'
     * @throws StringIndexOutOfBoundsException if the id contains no '_'
     */
    private String jobID(String jobID) {
        int separator = jobID.indexOf('_');

        return jobID.substring(0, separator);
    }

    /** Returns the part of the job id that follows the first '_'.
     *
     * @param jobID full job id
     * @return the text after the first '_', or the whole id if it contains no '_'
     */
    private String processID(String jobID) {
        int separator = jobID.indexOf('_');

        return jobID.substring(separator + 1);
    }

    /** Returns the current year without its first two digits, used as the suffix of the yearly tables. */
    private static String yearSuffix() {
        return String.valueOf(Calendar.getInstance().get(Calendar.YEAR)).substring(2);
    }

    /** Builds the VALUES list of a multi-row INSERT: the row template repeated and comma separated.
     *
     * @param rowTemplate placeholder group of one row, e.g. "(?, ?)"
     * @param rows number of rows
     * @return the template repeated {@code rows} times, separated by ", "
     */
    private static String rowPlaceholders(String rowTemplate, int rows) {
        StringBuffer values = new StringBuffer();

        for (int n = 0; n < rows; n++) {
            if (n > 0) {
                values.append(", ");
            }
            values.append(rowTemplate);
        }

        return values.toString();
    }

    /** Closes a statement, logging (not propagating) a failure to do so. */
    private static void closeQuietly(Statement stmt) {
        if (stmt == null) {
            return;
        }

        try {
            stmt.close();
        } catch (SQLException e) {
            log.debug("Couldn't close a statement of the stat DB", e);
        }
    }

    /** Closes and forgets the current connection, logging (not propagating) a failure to do so. */
    private void closeConnection() {
        Connection open = conn;
        conn = null;

        if (open == null) {
            return;
        }

        try {
            open.close();
        } catch (SQLException e) {
            log.warn("Couldn't close the connection to the stat DB", e);
        }
    }
}