001    /*
002     * $RCSfile: StatisticsRecorder.java,v $ 
003     *
004     * Created on December 23, 2002, 4:40 PM
005     *
006     * This file is part of the STAR Scheduler.
007     * Copyright (c) 2002-2006 STAR Collaboration - Brookhaven National Laboratory
008     *
009     * STAR Scheduler is free software; you can redistribute it and/or modify
010     * it under the terms of the GNU General Public License as published by
011     * the Free Software Foundation; either version 2 of the License, or
012     * (at your option) any later version.
013     *
014     * STAR Scheduler is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017     * GNU General Public License for more details.
018     *
019     * You should have received a copy of the GNU General Public License
020     * along with STAR Scheduler; if not, write to the Free Software
021     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
022     */
023    package gov.bnl.star.offline.scheduler.util;
024    
025    import gov.bnl.star.offline.scheduler.CatalogQuery;
026    import gov.bnl.star.offline.scheduler.ComponentLibrary;
027    import gov.bnl.star.offline.scheduler.Job;
028    import gov.bnl.star.offline.scheduler.dataset.DatasetSubset;
029    import gov.bnl.star.offline.scheduler.request.Request;
030    import gov.bnl.star.offline.scheduler.catalog.PhysicalFile;
031    import gov.bnl.star.offline.scheduler.util.ConfigToolkit;
032    
033    import java.net.URL;
034    import java.sql.*;
035    import java.util.Date;
036    import java.util.Calendar;
037    import java.util.*;
038    import java.util.List;
039    import java.util.Map;
040    import java.util.logging.Level;
041    import org.apache.log4j.Logger;
042    
043    
044    /** Utility class to record scheduler statistics in a database.
045     * @author Gabriele Carcassi
046     * @version $Revision: 1.37 $ $Date: 2006/11/21 00:41:30 $
047     */
048    public class StatisticsRecorder {
049        static private Logger log = Logger.getLogger(StatisticsRecorder.class.getName());
050        private static StatisticsRecorder theRecorder = new StatisticsRecorder();
051        private int jobInsertCount;
052        private StringBuffer jobInsert;
053        private int inputFileInsertCount;
054        private StringBuffer inputFileInsert;
055        private Connection conn;
056        private Map configuration;
057        private long systemTime = System.currentTimeMillis();
058    
059        /** Creates a new instance of StatisticsRecorder */
060        private StatisticsRecorder() {
061            //configuration = (Map) ComponentLibrary.getInstance().getComponent("statistics"); //part of config file reshape
062            configuration = (Map) ConfigToolkit.getToolkit().myLocalSite().getStatisticsConf();
063        }
064    
065        /** Retrieve the statistics recorder.
066         * @return the statistics recorder
067         */
068        public static StatisticsRecorder getInstance() {
069            return theRecorder;
070        }
071        
072    //    private String replaceJobID(String jobID){ //Replaces the job Id with I new job ID in the old deprecated job id format. By doing it this way we do not have to change the db and other programs that us it. 
073    //        return String.valueOf(systemTime).concat(jobID.substring(jobID.indexOf("_")));
074    //    }
075    
076        /** Record the statistics of the job request in the database.
077         * @param request the request to be recorded
078         */
079        public void recordStatistics(Request request, List jobs) {
080            if (configuration == null) {
081                return;
082            }
083            
084            
085    
086            try {
087                Class.forName("com.mysql.jdbc.Driver");
088            } catch (Exception e) {
089                log.warn("JDBC driver not found", e);
090    
091                return;
092            }
093    
094            try {
095                String url = (String) configuration.get("URL");
096                String username = (String) configuration.get("username");
097                String password = (String) configuration.get("password");
098    
099                if (url == null) {
100                    log.info(
101                        "Statistics DB not specified. No statistics will be recorded");
102    
103                    return;
104                }
105    
106                conn = DriverManager.getConnection(url, username, password);
107            } catch (Exception e) {
108                log.error("Couldn't establish the connection to the stat DB", e);
109    
110                return;
111            }
112    
113            try {
114                System.out.print("Reporting statistics... ");
115    
116                if (!request.getSimulation()) {
117                    reportJobs(request, jobs);
118                    reportInputFiles(request, jobs);
119                    reportCatalogQueries(request);
120                }
121    
122                reportRequest(request);
123    
124                log.debug("Statistics logged in the db");
125                System.out.println("done.");
126            } catch (Exception e) {
127                System.out.println("failed.");
128                log.error("Couldn't write statistics", e);
129            }
130                
131        }
132    
133        private void reportRequest(Request request) {
134            /*
135            String query =
136                "INSERT DELAYED INTO Request (jobID, simulateSubmission, name, " +
137                "mail, maxFilePerProcess, minFilesPerProcess, nQueries, nFiles, " +
138                "preferStorage, queryNFiles, singleCopy, stdout, stdin, stderror, " +
139                "filesPerHour, fileListSyntax, nOutput) " +
140                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ? ,?, ?, ?, ?, ?, ?, ?, ?)";
141            */
142            
143            
144            Calendar rightNow = Calendar.getInstance();
145            String query =
146                "INSERT DELAYED INTO Request_" + String.valueOf(rightNow.get(Calendar.YEAR)).substring(2) + " (jobID_MD5, simulateSubmission, name, " +
147                "mail, maxFilePerProcess, minFilesPerProcess, nQueries, nFiles, " +
148                "preferStorage, queryNFiles, singleCopy, stdout, stdin, stderror, " +
149                "filesPerHour, fileListSyntax, nOutput, maxMem, minMem, minStorage, maxStorage, user) " +
150                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ? ,?, ?, ?, ?, ?, ?, ?, ?,?,?,?,?,?)";
151            
152            /* This table is not woring just right yet
153            String query =
154                "INSERT DELAYED INTO Request_all (jobID_MD5, simulateSubmission, name, " +
155                "mail, maxFilePerProcess, minFilesPerProcess, nQueries, nFiles, " +
156                "preferStorage, queryNFiles, singleCopy, stdout, stdin, stderror, " +
157                "filesPerHour, fileListSyntax, nOutput, maxMem, minMem, minStorage, maxStorage) " +
158                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ? ,?, ?, ?, ?, ?, ?, ?, ?,?,?,?,?)";
159            */    
160            try {
161                PreparedStatement stmt = conn.prepareStatement(query);
162                //stmt.setInt(1, 0);
163                stmt.setString(1, request.getID());
164                stmt.setString(2, Boolean.toString(request.getSimulation()));
165                stmt.setString(3, request.getName());
166                stmt.setString(4, Boolean.toString(request.getMail()));
167                stmt.setInt(5, request.getResource("FilesPerProcess").getMax());
168                stmt.setInt(6, request.getResource("FilesPerProcess").getMin());
169                int nQueries = 0;
170                int nFiles = 0;
171                CatalogQuery catalogQuery = null;
172                List input = request.getInputList();
173                for (int n = 0; n < input.size(); n++) {
174                    if (input.get(n) instanceof CatalogQuery) { 
175                        nQueries++;
176                        if (catalogQuery == null) catalogQuery = (CatalogQuery) input.get(n);
177                    }
178                    if (input.get(n) instanceof URL) nFiles++;
179                }
180                stmt.setInt(7, nQueries);
181                stmt.setInt(8, nFiles);
182    
183                String storage;
184                Integer queryNFiles;
185    
186                if (nQueries > 0) {
187                    stmt.setString(9,
188                        catalogQuery.getPreferStorage());
189                    stmt.setObject(10,
190                        catalogQuery.getNFiles());
191                    stmt.setString(11,
192                        Boolean.toString(catalogQuery.isSingleCopy()));
193                } else {
194                    stmt.setNull(9, Types.VARCHAR);
195                    stmt.setNull(10, Types.INTEGER);
196                    stmt.setNull(11, Types.VARCHAR);
197                }
198    
199                if (request.getStdOut() != null) {
200                    stmt.setString(12, request.getStdOut().toString());
201                } else {
202                    stmt.setNull(12, Types.VARCHAR);
203                }
204    
205                if (request.getStdIn() != null) {
206                    stmt.setString(13, request.getStdIn().toString());
207                } else {
208                    stmt.setNull(13, Types.VARCHAR);
209                }
210    
211                if (request.getStdErr() != null) {
212                    stmt.setString(14, request.getStdErr().toString());
213                } else {
214                    stmt.setNull(14, Types.VARCHAR);
215                }
216                
217                if (request.getFilesPerHour() == Double.POSITIVE_INFINITY) {
218                    stmt.setNull(15, Types.DOUBLE);
219                } else {
220                    stmt.setDouble(15, request.getFilesPerHour());
221                }
222                stmt.setString(16, request.getFileListType());
223                stmt.setInt(17, request.getOutputList().size());
224                
225                //request.getMaxMemory()
226                int maxMem = request.getResource("Memory").getMax();
227                if (maxMem == -1) stmt.setNull(18, Types.INTEGER);
228                else stmt.setInt(18, maxMem);
229               
230                int minMem = request.getResource("Memory").getMin(); 
231                if (minMem == -1) stmt.setNull(19, Types.INTEGER);
232                else stmt.setInt(19, minMem);
233    
234                int maxStorage = request.getResource("StorageSpace").getMax(); 
235                if (maxStorage == -1) stmt.setNull(20, Types.INTEGER);
236                else stmt.setInt(20, maxStorage);
237    
238                int minStorage = request.getResource("StorageSpace").getMin(); 
239                if (minStorage == -1) stmt.setNull(21, Types.INTEGER);
240                else stmt.setInt(21, minStorage);
241                
242                stmt.setString(22, System.getProperty("user.name"));
243                
244                //System.out.println("\n\nsql request table ->" + stmt +"\n\n");
245    
246                log.debug("Query: " + query);
247                stmt.execute();
248            } catch (Exception e) {
249                log.error("Couldn't write statistics: " + query, e);
250                throw new RuntimeException("Couldn't write statistics: " + e.getMessage());
251            }
252        }
253    
254        private void reportJobs(Request request, List jobs) {
255            String user = System.getProperty("user.name");
256    
257            for (int nJob = 0; nJob < jobs.size(); nJob++) {
258                Job job = (Job) jobs.get(nJob);
259                String node = job.getTarget();
260    
261                if (node == null) {
262                    node = "any";
263                }
264                
265                String dispatcher = job.getAssociatedDispatcher().toString().substring(1 + job.getAssociatedDispatcher().toString().lastIndexOf("."),job.getAssociatedDispatcher().toString().lastIndexOf("@"));
266    
267                reportJob(job.getJobID(), node, user, job.getQueueObj().getID(), job.getCluster(), dispatcher, ((Job) jobs.get(nJob)).getDispatchTime());
268            }
269    
270            flushJob();
271        }
272    
273        private void reportInputFiles(Request request, List jobs) {
274            for (int nJob = 0; nJob < jobs.size(); nJob++) {
275                Job job = (Job) jobs.get(nJob);
276                List list = job.getInput();
277    
278                for (int nFile = 0; nFile < list.size(); nFile++) {
279                    PhysicalFile file = (PhysicalFile) list.get(nFile);
280    
281                    reportInputFile(job.getJobID(), file.getPath() + "/" + file.getFilename(), file.getStorage());
282                }
283            }
284    
285            flushInputFile();
286        }
287    
288        private void reportInputFile(String jobID, String filename, String storage) {
289            boolean first = false;
290    
291            if (inputFileInsert == null) {
292                inputFileInsert = new StringBuffer(
293                        "INSERT DELAYED INTO InputFile (jobID_MD5, processID, filename, storage) VALUES ");
294                inputFileInsertCount = 0;
295                first = true;
296            }
297    
298            if (!first) {
299                inputFileInsert.append(", ");
300            }
301    
302            inputFileInsert.append("('").append(jobID(jobID)).append("', ").append(processID(jobID));
303            inputFileInsert.append(", '").append(filename).append("', '").append(storage);
304            inputFileInsert.append("')");
305            inputFileInsertCount++;
306            
307            //System.out.println("The DB input-> " + inputFileInsert.toString());
308            
309    
310            if (inputFileInsertCount == 100) {
311                flushInputFile();
312            }
313        }
314    
315        private void flushInputFile() {
316            if (inputFileInsert == null) {
317                return;
318            }
319    
320            try {
321                Statement stmt = conn.createStatement();
322                stmt.execute(inputFileInsert.toString());
323                log.debug("Query: " + inputFileInsert.toString());
324                inputFileInsert = null;
325            } catch (Exception e) {
326                log.error("Couldn't write statistics: " + inputFileInsert.toString(), e);
327                throw new RuntimeException("Couldn't write statistics: " +
328                    e.getMessage());
329            }
330        }
331      //  Write to old table 
332        /*
333       private void reportJob(String jobID, String node, String user, String queue, String cluster) {
334            boolean first = false;
335    
336            if (jobInsert == null) {
337                jobInsert = new StringBuffer(
338                        "INSERT DELAYED INTO Job (jobID, processID, node, user, queue, cluster) VALUES ");
339                jobInsertCount = 0;
340                first = true;
341            }
342    
343            if (!first) {
344                jobInsert.append(", ");
345            }
346    
347            jobInsert.append("(").append(jobID(jobID)).append(", ").append(processID(jobID));
348            jobInsert.append(", '").append(node).append("', '").append(user);
349            jobInsert.append("', '").append(queue).append("', '").append(cluster);
350            jobInsert.append("')");
351    
352            jobInsertCount++;
353    
354            if (jobInsertCount == 100) {
355                flushJob();
356            }
357        }*/
358         
359         private void reportJob(String jobID, String node, String user, String queue, String cluster, String dispatcher, int dispatchTime) {
360            boolean first = false;
361    
362            if (jobInsert == null) {
363                //Have to get date
364                Date date = new Date();
365                //date.
366                Calendar rightNow = Calendar.getInstance();
367                
368                //System.out.println(String.valueOf(rightNow.get(Calendar.YEAR)));///
369                
370                jobInsert = new StringBuffer("INSERT DELAYED INTO Job_"+ String.valueOf(rightNow.get(Calendar.YEAR)).substring(2) +" (dataID, jobID_MD5, processID, node, queue, cluster, dispatcher, dispatchTime) VALUES ");
371                
372                //This table is not woring yet
373                //jobInsert = new StringBuffer("INSERT DELAYED INTO Job_all (dataID, jobID_MD5, processID, node, user, queue, cluster, dispatcher, dispatchTime) VALUES ");
374                
375                jobInsertCount = 0;
376                first = true;
377            }
378    
379            if (!first) jobInsert.append(", ");
380            
381    
382            jobInsert.append("( 0, '").append(jobID.substring(0,32)).append("', ").append(processID(jobID));
383            jobInsert.append(", '").append(node).append("', '");
384            jobInsert.append(queue).append("', '").append(cluster);
385            //job.getAssociatedDispatcher().toString().substring(1 + job.getAssociatedDispatcher().toString().lastIndexOf("."),job.getAssociatedDispatcher().toString().lastIndexOf("@"))
386            jobInsert.append("', '").append(dispatcher).append("',").append(dispatchTime);
387            jobInsert.append(")");
388            
389           // System.out.println(jobInsert);///
390    
391            jobInsertCount++;
392            
393    //        System.out.println("\n\n" + jobInsert); ////for debuging 
394    
395            if (jobInsertCount == 100) flushJob();
396            
397        }     
398        
399        
400        
401        private void flushJob() {
402            if (jobInsert == null) {
403                return;
404            }
405    
406            try {
407                Statement stmt = conn.createStatement();
408                stmt.execute(jobInsert.toString());
409                log.debug("Query: " + jobInsert.toString());
410                jobInsert = null;
411            } catch (Exception e) {
412                log.error("Couldn't write statistics: " + jobInsert.toString(), e);
413                throw new RuntimeException("Couldn't write statistics: " + e.getMessage());
414            }
415        }
416        
417        
418        private void reportCatalogQueries(Request request){
419            
420            
421            if(! request.getSimulation()) return; //do not record simulation
422            
423            boolean writeQueriesReturningZeroFiles = false;
424                
425                CatalogQuery catalogQuery = null;
426                boolean hasCatalogQuery = false;
427                int filesReturned = 0;
428            
429                //refactor this to only record when one catalog queuery is used and no file list
430                for (int n = 0; n != request.getInputList().size(); n++) {
431                    if (request.getInputList().get(n) instanceof CatalogQuery) {
432                        hasCatalogQuery = true;
433                        catalogQuery = (CatalogQuery) request.getInputList().get(n);
434                        filesReturned += catalogQuery.getFilesReturned();
435                    }
436                }    
437            
438                ///
439            
440    //        
441    //            for (int n = 0; n != request.getInputList().size(); n++) {
442    //                if (request.getInputList().get(n) instanceof CatalogQuery) {
443    //                  CatalogQuery catalogQuery = (CatalogQuery) request.getInputList().get(n);
444                        //System.out.println( catalogQuery.getFilesReturned() + "---------------> " + catalogQuery.getQuery()); //used for debugging 
445                     
446                if(hasCatalogQuery){
447                
448                        if((catalogQuery.getFilesReturned() > 0) || writeQueriesReturningZeroFiles){
449                            Calendar rightNow = Calendar.getInstance();
450                            
451                            StringBuffer SQLString;
452                            int nEventsSelected = 0;
453                            int nFilesSelected = 0;      
454                            
455                            
456                            List jobs = request.getJobs();
457                            for(int i = 0; i != jobs.size(); i++){
458                                DatasetSubset datasetSubset = ((Job) jobs.get(i)).getDatasetSubset();
459                                nFilesSelected += datasetSubset.getFilesInSubset();
460                                nEventsSelected += datasetSubset.getEventsInSubset();
461                            }
462                            
463                            SQLString = new StringBuffer("INSERT DELAYED INTO Query_"+ String.valueOf(rightNow.get(Calendar.YEAR)).substring(2) +" (jobID_MD5, Query, nFilesReturned, nFilesSelected, nEventsSelected) VALUES ( " );
464                            if(request.getInputList().size() == 1 ){ //If this is the only input then the data is pure
465                                SQLString.append(" \"" + request.getID() + "\" , \"" + catalogQuery.getCatalogName() +"?"+ catalogQuery.getQuery().replace('\"','\'') + "\", " + catalogQuery.getFilesReturned() + ", " + nFilesSelected +", " + nEventsSelected + " )");
466                                //System.out.println(">>>>>>>> " + SQLString.toString() );
467                            }else{ //If there is more then one catalog query there is no telling where the files came from
468                                SQLString.append(" \"" + request.getID() + "\" , \"" + catalogQuery.getCatalogName() +"?"+ "other\", " + catalogQuery.getFilesReturned() + ", " + nFilesSelected +", " + nEventsSelected + " )");
469                                //System.out.println(">>>>>>>> " + SQLString.toString() );
470                            }
471    
472                            try {
473                                Statement stmt = conn.createStatement();
474                                stmt.execute(SQLString.toString());
475                                log.debug("Query: " + SQLString.toString());
476    
477                            } catch (Exception e) {
478                                log.error("Couldn't write statistics: " + SQLString.toString(), e);
479                                throw new RuntimeException("Couldn't write statistics: " + e.getMessage());
480                            }
481                            SQLString = null;
482                        }
483    
484                }
485      
486        }
487        
488        
489    
490        private String jobID(String jobID) {
491            int separator = jobID.indexOf('_');
492    
493            return jobID.substring(0, separator);
494        }
495    
496        private String processID(String jobID) {
497            int separator = jobID.indexOf('_');
498    
499            return jobID.substring(separator + 1);
500        }
501    }
502