00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #ifdef WIN32
00017 #include <windows.h>
00018 #endif
00019
00020 #include "MySQLAppender.h"
00021 #include "TSystem.h"
00022 #include "TString.h"
00023
00024 #if 1
00025
00026
00027 #include <log4cxx/helpers/loglog.h>
00028 #include <log4cxx/helpers/optionconverter.h>
00029 #include <log4cxx/patternlayout.h>
00030
00031 using namespace log4cxx;
00032 using namespace log4cxx::helpers;
00033 using namespace log4cxx::db;
00034 using namespace log4cxx::spi;
00035
00036 IMPLEMENT_LOG4CXX_OBJECT(MySQLAppender)
00037
00038
00039 MySQLAppender::MySQLAppender()
00040 : connection(0), bufferSize(5),fLastId(0),fIsConnectionOpen(false)
00041 {
00042
00043 }
00044
00045
00046 MySQLAppender::~MySQLAppender()
00047 {
00048
00049 finalize();
00050 }
00051
00052
00053 void MySQLAppender::setOption(const String& option,
00054 const String& value)
00055 {
00056 if (equalsIgnoreCase(option, _T("buffersize")))
00057 {
00058 setBufferSize((size_t)OptionConverter::toInt(value, 1));
00059 }
00060 else if (equalsIgnoreCase(option, _T("password")))
00061 {
00062 setPassword(value);
00063 }
00064 else if (equalsIgnoreCase(option, _T("sql")))
00065 {
00066 setSql(value);
00067 }
00068 else if (equalsIgnoreCase(option, _T("url"))
00069 || equalsIgnoreCase(option, _T("dns")))
00070 {
00071 setURL(value);
00072 }
00073 else if (equalsIgnoreCase(option, _T("user")))
00074 {
00075 setUser(value);
00076 }
00077 else
00078 {
00079 AppenderSkeleton::setOption(name, value);
00080 }
00081 }
00082
00083
00084 void MySQLAppender::append(const spi::LoggingEventPtr& event)
00085 {
00086 buffer.push_back(event);
00087
00088 if (buffer.size() >= bufferSize)
00089 flushBuffer();
00090 }
00091
00092
00093
00094 String MySQLAppender::getLogStatement(const spi::LoggingEventPtr& event)
00095 {
00096 #if (STAR_LOG4CXX_VERSION == 9)
00097 StringBuffer sbuf;
00098 ((MySQLAppender*)this)->getLayout()->format(sbuf, event);
00099 return sbuf.str();
00100 #else
00101 String sbuf;
00102 ((MySQLAppender*)this)->getLayout()->format(sbuf, event,pool);
00103 return sbuf;
00104 #endif
00105 }
00106
00107
00108 unsigned int MySQLAppender::execute(const String& sql)
00109 {
00110 unsigned int ret=1;
00111 if (getConnection()) {
00112
00113
00114
00115
00116
00117 String query = sql;
00118 if (( ret = mysql_query(connection,query.c_str()) )) {
00119 fprintf(stderr, "MYSQL QUERY: %s \n",mysql_error(connection));
00120 } else {
00121
00122
00123
00124
00125 }
00126 }
00127
00128 return ret;
00129
00130 }
00131
00132
00133
00134 void MySQLAppender::closeConnection()
00135 {
00136 if (fIsConnectionOpen) {
00137
00138 mysql_close(connection);
00139 if (mysql_errno(connection)) fprintf(stderr,"MYSQL close ERROR %s \n",mysql_error(connection));
00140 connection = 0;
00141 fIsConnectionOpen = false;
00142 }
00143 }
00144
00145
00146 MYSQL *MySQLAppender::getConnection()
00147 {
00148 if (!fIsConnectionOpen) {
00149
00150 if ( !(connection= mysql_init(connection)) ) {
00151 fprintf(stderr,"MYSQL: ---- > No init connection \n");
00152 } else {
00153
00154 const char *host = "heston.star.bnl.gov";
00155 const char *user = "StarLogger";
00156 const char *passwd = "logger";
00157 const char *db = "logger";
00158 unsigned int port = 3306;
00159
00160 if (!(mysql_real_connect(connection
00161 , host
00162 , user
00163 , passwd
00164 , db
00165 , port
00166 , 0,0
00167 )))
00168 {
00169 fprintf(stderr, "MYSQL: ---- > No connection: %s \n",mysql_error(connection));
00170 connection = 0;
00171 fIsConnectionOpen = false;
00172 } else {
00173 fIsConnectionOpen = true;
00174 }
00175 }
00176 }
00177 return connection;
00178 }
00179
00180
00181 void MySQLAppender::close()
00182 {
00183 flushBuffer();
00184 closeConnection();
00185 this->closed = true;
00186 }
00187
00188 static void ReplaceVariable(TString &string, const char *var)
00189 {
00190
00191 TString spec;
00192 const char *varValue = gSystem->Getenv(var);
00193 if (!varValue) {
00194
00195 spec = var;
00196 if (spec == "REQUESTID") {
00197 spec.Form("%d",gSystem->GetPid());
00198 varValue= spec.Data();
00199 } else if (spec == "JOBINDEX") {
00200 spec.Form("%d",0);
00201 varValue= spec.Data();
00202 }
00203 }
00204
00205 if (varValue) {
00206 TString fullName = "$"; fullName += var;
00207
00208 string.ReplaceAll(fullName,varValue);
00209 }
00210 }
00211
00212 void MySQLAppender::flushBuffer()
00213 {
00214
00215
00216 static bool TaskEntryDone = false;
00217 std::list<spi::LoggingEventPtr>::iterator i;
00218 if ( getConnection()) {
00219 for (i = buffer.begin(); i != buffer.end(); i++)
00220 {
00221 TString expandCommand;
00222 String sql;
00223 if (!TaskEntryDone) {
00224
00226 expandCommand =
00227
00228 #ifdef OLDTABLE
00229 "INSERT DELAYED IGNORE TaskDescription (TaskDescriptionID, TaskRequestID_MD5, TaskSize, TaskRemainSize, EntryTime, UpdateTime, TaskUser,TaskDescription,TaskCredential,BrokerID)"
00230 " VALUES ( DEFAULT, \"$REQUESTID\", \"$SUMS_nProcesses\",\"$SUMS_nProcesses\",\"$SUBMIT_TIME\",DEFAULT,\"$SUMS_USER\",\"$SUMS_name\",\"$SUMS_AUTHENTICATED_USER\",\"SUMS\");";
00231 #else
00232 "INSERT DELAYED IGNORE Tasks (taskID, brokerTaskID, taskName, taskSize, taskRemainSize, submitTime, updateTime, requesterID,taskDescription)"
00233 " VALUES ( DEFAULT, \"$REQUESTID\", \"Short name of task\", \"$SUMS_nProcesses\",\"$SUMS_nProcesses\",\"$SUBMIT_TIME\",DEFAULT,\"$SUMS_USER\",\"$SUMS_name\");";
00234 #endif
00235
00236
00237
00238
00239
00240
00241
00242 ReplaceVariable(expandCommand, "REQUESTID");
00243 ReplaceVariable(expandCommand, "SUMS_nProcesses");
00244 ReplaceVariable(expandCommand, "SUBMIT_TIME");
00245
00246 ReplaceVariable(expandCommand, "SUMS_name");
00247 ReplaceVariable(expandCommand, "SUMS_USER");
00248 ReplaceVariable(expandCommand, "SUMS_AUTHENTICATED_USER");
00249 sql = expandCommand.Data();
00250 if (!execute(sql)) TaskEntryDone = true;
00251 }
00252
00253 if (TaskEntryDone) {
00254
00255
00256
00257
00258 #ifdef OLDTABLE
00259 expandCommand ="INSERT DELAYED IGNORE INTO JobDescription SET ";
00260
00261 expandCommand += "TaskDescriptionID = (SELECT TaskDescriptionID FROM TaskDescription WHERE TaskRequestID_MD5=\"$REQUESTID\")";
00262 expandCommand += ", ";
00263 expandCommand += "TaskRequestID_MD5=\"$REQUESTID\"";
00264 expandCommand += ", ";
00265 expandCommand += "BrokerProcessID=\"$JOBINDEX\"";
00266 expandCommand += ", ";
00267 expandCommand += "JobLocationURL=\"$HOSTNAME\"";
00268 expandCommand += ", ";
00269 expandCommand += "JobUser=\"$USER\"";
00270 expandCommand += "; ";
00271 #else
00272 expandCommand ="INSERT DELAYED IGNORE INTO Jobs SET ";
00273
00274 expandCommand += "taskID = (SELECT taskID FROM Tasks WHERE brokerTaskID=\"$REQUESTID\")";
00275 expandCommand += ", ";
00276 expandCommand += "brokerJobID=\"$JOBINDEX\"";
00277 expandCommand += ", ";
00278 expandCommand += "startTime=NOW()";
00279 expandCommand += ", ";
00280 expandCommand += "nodeLocation=\"$HOSTNAME\"";
00281 expandCommand += ", ";
00282 expandCommand += "stateID=\"4\"";
00283 expandCommand += ", ";
00284 expandCommand += "executionUserName=\"$USER\"";
00285 expandCommand += "; ";
00286 #endif
00287
00288
00289
00290
00291
00292
00293
00294 ReplaceVariable(expandCommand, "USER");
00295 ReplaceVariable(expandCommand, "HOSTNAME");
00296 ReplaceVariable(expandCommand, "REQUESTID");
00297 ReplaceVariable(expandCommand, "JOBINDEX");
00298 sql = expandCommand.Data();
00299 if (!execute(sql) ) {
00300
00301
00302
00303 const LoggingEventPtr& logEvent = *i;
00304 String sql = getLogStatement(logEvent);
00305 expandCommand = sql.c_str();
00306
00307 ReplaceVariable(expandCommand, "REQUESTID");
00308 ReplaceVariable(expandCommand, "JOBINDEX");
00309
00310 sql = expandCommand.Data();
00311 if (!execute(sql)) {
00312 #ifdef NEWTABLE_EXPANSION
00313 expandCommand = "UPDATE LOW_PRIORITY IGNORE Jobs SET updateTime=NOW() WHERE brokerJobID=\"$JOBINDEX\"" AND taskID=(SELECT taskID FROM Tasks WHERE brokerTaskID=\"$REQUESTID\");";
00314 ReplaceVariable(expandCommand, "REQUESTID");
00315 ReplaceVariable(expandCommand, "JOBINDEX");
00316 if (execute(sql)) {
00317 fprintf(stderr," MYSQL ----> can not update the Jobs record%s \n", expandCommand.c_str());
00318 }
00319 #endif
00320 } else {
00321
00322 fprintf(stderr," MYSQL ----> skip and lose event \n");
00323 }
00324 }
00325 }
00326 }
00327 buffer.clear();
00328 }
00329 closeConnection();
00330 }
00331
00332
00333 void MySQLAppender::setSql(const String& s)
00334 {
00335 sqlStatement = s;
00336 if (getLayout() == 0)
00337 {
00338 this->setLayout(new PatternLayout(s));
00339 }
00340 else
00341 {
00342 PatternLayoutPtr patternLayout = this->getLayout();
00343 if (patternLayout != 0)
00344 {
00345 patternLayout->setConversionPattern(s);
00346 }
00347 }
00348 }
00349 #if (STAR_LOG4CXX_VERSION == 10)
00350
00351 void MySQLAppender::append(const spi::LoggingEventPtr& event, log4cxx::helpers::Pool& p)
00352 {
00353 append(event);
00354 }
00355 #endif
00356 #endif //HAVE_MySQL