StRoot  1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
StDbServiceBroker.cxx
1 #ifndef NoXmlTreeReader
2 #include "StDbServiceBroker.h"
3 #include "ChapiStringUtilities.h"
4 #include "mysql.h"
5 #include "math.h"
6 #include <string.h>
7 #include <libxml/nanohttp.h>
8 #include <sys/stat.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11 #ifndef __STDB_STANDALONE__
12 #include "StMessMgr.h"
13 #else
14 #define LOG_DEBUG cout
15 #define LOG_INFO cout
16 #define LOG_WARN cout
17 #define LOG_ERROR cerr
18 #define LOG_FATAL cerr
19 #define LOG_QA cout
20 #define endm "\n"
21 #endif
22 #include <algorithm>
23 using namespace std;
24 using namespace chapi_string_utilities;
25 
26 typedef vector<string>::const_iterator VCI;
27 
28 using st_db_service_broker::MyScatalogVersion;
29 using st_db_service_broker::NO_ERROR;
30 using st_db_service_broker::NO_USER;
31 using st_db_service_broker::NO_DOMAIN;
32 using st_db_service_broker::NO_HOSTS;
33 using st_db_service_broker::NightBegins;
34 using st_db_service_broker::NightEnds;
35 
36 using stl_xml_tree::sep;
37 
// Element names used in the service-description XML; indexed by the enum below.
std::string nn[] = { "Scatalog", "Site", "Server", "Host", "Access" };

// Indices into nn[]; the order must match the initializer above.
enum
  {
    SCATALOG,
    SITE,
    SERVER,
    HOST,
    ACCESS
  };

// Attribute names looked up on XML elements / written into the filter tree.
std::string NAME("name");
std::string PORT("port");
std::string USER("user");
std::string SCOPE("scope");
std::string WHEN_ACTIVE("whenActive");
std::string ACCESS_MODE("accessMode");
std::string WRITER("writer");
std::string POWER("machinePower");
std::string CAP("cap"); // max # connections to a server
std::string INTERACTIVE("interactive");
67 
68 static MYSQL *conn;
69 
70 char* Socket = 0;
71 
72 
74 StDbServiceBroker::StDbServiceBroker(const string xmlbase) :
75  MyHostList(vector<ChapiDbHost>()),
76  MyStatus(st_db_service_broker::NO_ERROR)
77 {
78  last_succeeded_connect_time = time(NULL);
79  seconds_to_reach_for_connect = 1800;
80  char* secs = 0;
81  secs = getenv("STAR_DEBUG_DB_RETRIES_SECONDS");
82  if (secs) {
83  seconds_to_reach_for_connect = atoi(secs);
84  }
85 
86  char* whoami = getenv("USER");
87  if (!whoami) whoami = getenv("LOGNAME");
88  if (!whoami)
89  {
90  MyStatus = NO_USER;
91  return;
92  }
93 
94  const char* access_mode = getenv("DB_ACCESS_MODE");
95  if (!access_mode)
96  {
97  access_mode = "read";
98  }
99 
100  StlXmlTree* f = new StlXmlTree();
101  string ScatalogKey = sep+nn[SCATALOG];
102  f->InsertKeyValuePair(ScatalogKey, MyScatalogVersion);
103 
104  string QualifiedScatalog = sep + StlXmlTree::QualifyParent(nn[SCATALOG],MyScatalogVersion);
105  string ServerKey = QualifiedScatalog + sep + nn[SERVER];
106 
107  struct tm *tp;
108  time_t timeNow;
109  timeNow = time(NULL);
110  tp = localtime(&timeNow);
111  short hour = tp->tm_hour;
112  string DayOrNight;
113  if (hour<NightBegins && hour >NightEnds)
114  {
115  DayOrNight = "day";
116  }
117  else
118  {
119  DayOrNight = "night";
120  }
121 
122  // test for the interactive user session
123  std::string isInteractive = "no";
124  char* testInteractive = 0;
125  testInteractive = getenv ("INTERACTIVE");
126 
127  if (testInteractive) { // result would be either "yes" or "no", all lowercase
128  isInteractive = testInteractive;
129  if (isInteractive == "1" || isInteractive == "YES" || isInteractive == "yes"
130  || isInteractive == "Yes" || isInteractive == "true" || isInteractive == "TRUE") {
131  isInteractive = "yes";
132  } else {
133  isInteractive = "no";
134  };
135  }
136 
137  string ServerAttr = USER+"="+(string)whoami+";"+WHEN_ACTIVE+"="+DayOrNight+";"+ACCESS_MODE+"="+access_mode+";"+INTERACTIVE+"="+isInteractive;
138  if (strcmp(whoami,"starreco")==0)
139  {
140  ServerAttr = SCOPE+"=Production;"+ServerAttr;
141  }
142  else
143  {
144  ServerAttr = SCOPE+"=Analysis;"+ServerAttr;
145  }
146  f->InsertKeyValuePair(ServerKey, ServerAttr);
147 
148 #ifdef DEBUG
149  LOG_INFO << " Filter XML "<<endm;
150  f->ShowTree();
151 #endif
152 
153  ParsedXml = StlXmlTree(xmlbase,f);
154  if (ParsedXml.GetStatus()==stl_xml_tree::NO_XML_BASE)
155  {
156  MyStatus = st_db_service_broker::NO_XML_BASE;
157  LOG_WARN <<"StDbServiceBroker::StDbServiceBroker: no XML description of services found "<<endm;
158  }
159  if (ParsedXml.GetStatus()==stl_xml_tree::BAD_XML)
160  {
161  MyStatus = st_db_service_broker::BAD_XML;
162  LOG_WARN <<"StDbServiceBroker::StDbServiceBroker: malformed/incorrect XML description of services found "<<endm;
163  }
164 
165 #ifdef DEBUG
166  LOG_INFO << " Parsed XML "<<endm;
167  ParsedXml.ShowTree();
168 #endif
169 
170  xmlCleanupParser();
171  delete f;
172  FormHostList();
173 }
175 StDbServiceBroker::StDbServiceBroker
176 (const string xmlbase, const string xmlfilter) :
177  MyHostList(vector<ChapiDbHost>()),
178  MyStatus(st_db_service_broker::NO_ERROR)
179 {
180  last_succeeded_connect_time = time(NULL);
181  seconds_to_reach_for_connect = 1800;
182  char* secs = 0;
183  secs = getenv("STAR_DEBUG_DB_RETRIES_SECONDS");
184  if (secs) {
185  seconds_to_reach_for_connect = atoi(secs);
186  }
187 
188 
189  StlXmlTree* f = new StlXmlTree(xmlfilter);
190  f->ShowTree();
191  ParsedXml = StlXmlTree(xmlbase,f);
192 
193  if (ParsedXml.GetStatus()==stl_xml_tree::NO_XML_BASE)
194  {
195  MyStatus = st_db_service_broker::NO_XML_BASE;
196  LOG_WARN <<"StDbServiceBroker::StDbServiceBroker: no XML description of services found "<<endm;
197  }
198  if (ParsedXml.GetStatus()==stl_xml_tree::BAD_XML)
199  {
200  MyStatus = st_db_service_broker::BAD_XML;
201  LOG_WARN <<"StDbServiceBroker::StDbServiceBroker: malformed/incorrect XML description of services found "<<endm;
202  }
203 
204  ParsedXml.ShowTree();
205  xmlCleanupParser();
206  delete f;
207  FormHostList();
208 }
210 void StDbServiceBroker::DoLoadBalancing()
211 {
212  if (MyStatus!=st_db_service_broker::NO_ERROR)
213  {
214  LOG_ERROR << " StDbServiceBroker::DoLoadBalancing() errors" <<MyStatus<<endm;
215  }
216 #ifdef DEBUG
217  PrintHostList();
218 #endif
219  bool host_found = false;
220  const int MAX_COUNT = 500;
221  last_succeeded_connect_time = time(NULL);
222  for (int i = 0; i < MAX_COUNT; i++) {
223  if (!RecommendHost()) {
224  host_found = true;
225  break;
226  }
227  LOG_WARN << "Load Balancing attempt No " << i << ". Scanned all hosts in a Load Balancer list, no usable hosts found. Will try again in 60 seconds." << endm;
228  sleep(60);
229  }
230  if (!host_found) {
231  LOG_WARN << "Tried to find a host for " << MAX_COUNT << " times, will abort now." << endm;
232  exit(0);
233  }
234 
235 #ifdef DEBUG
236  cout << " go to "<<GiveHostName()<<" port "<<GiveHostPort()<<"\n";
237 #endif
238 }
240 void StDbServiceBroker::FormHostList()
241 {
242  /*
243 We expect:
244 -- a single SCATALOG version
245 -- possibly access to multiple sites
246 -- obviously multiple hosts
247  */
248 
249  MyHostList.clear();
250  string key = StlXmlTree::MakeKey("",nn[SCATALOG]);
251  vector<string> versions = ParsedXml.LookUpValueByKey(key);
252 
253  if (versions.size()!=1) return;
254 
255  string my_version = versions[0];
256 
257  vector<string> services = ParsedXml.LookUpValueByKey(key,my_version,nn[SERVER]);
258 
259  for (VCI ii = services.begin(); ii!=services.end(); ++ii)
260  {
261  vector<string> hosts = ParsedXml.LookUpValueByKey(key,*ii,nn[HOST]);
262 
263  for (VCI iii = hosts.begin(); iii!=hosts.end(); ++iii)
264  {
265  map<string,string> host_data = StlXmlTree::ParseAttributeString(*iii);
266 
267  int port;
268  if (!from_string<int>(port,host_data[PORT],std::dec))
269  {
270  // error: non-numeric port string
271  port = DefaultPort;
272  }
273 
274  double power;
275  short cap;
276 
277  if (!from_string<double>(power,host_data[POWER],std::dec))
278  {
279  // error: non-numeric port string
280 #ifdef DEBUG
281  LOG_ERROR << "StDbServiceBroker::FormHostList(): non-numeric port, using default power for host "<<*iii<<endm;
282 #endif
283  power = DefaultPower;
284  }
285 
286 
287  if (!from_string<short>(cap,host_data[CAP],std::dec))
288  {
289  // error: non-numeric cap string
290 #ifdef DEBUG
291  LOG_ERROR << "StDbServiceBroker::FormHostList(): non-numeric cap, using default cap for host "<<*iii<<endm;
292 #endif
293  cap = DefaultCap;
294  }
295 
296  ChapiDbHost host_entry = ChapiDbHost(host_data[NAME],port,power,cap);
297  MyHostList.push_back(host_entry);
298  }
299  cut_string_after_sub(key,">");
300  cut_string_after_sub(key,"(");
301  }
302 
303 #ifdef DEBUG
304  PrintHostList();
305 #endif
306 
307  if (MyHostList.size()==0)
308  {
309  MyStatus = NO_HOSTS;
310  LOG_DEBUG<<" StDbServiceBroker::RecommendHost() will have no hosts to choose from !"<<endm;
311  return;
312  }
313 
314 }
316 void StDbServiceBroker::PrintHostList()
317 {
318  LOG_DEBUG << " MyHostList contains:"<<endm;
319  for (vector<ChapiDbHost>::const_iterator i = MyHostList.begin(); i!=MyHostList.end(); ++i)
320  {
321  LOG_DEBUG << (*i).HostName << endm;
322  }
323 }
324 
namespace {

/// Returns the current local time formatted as "%Y-%m-%d.%X"
/// (for example 2024-01-31.13:45:10 in the C locale).
///
/// The time is broken down with localtime_r() into a local buffer instead of
/// dereferencing the pointer returned by localtime(), which may be null and
/// refers to storage shared with other callers. If the conversion or the
/// formatting fails, the placeholder "unknown-time" is returned rather than
/// the contents of an unset buffer.
const std::string currentDateTime() {
  const time_t now = time(0);

  struct tm local_tm;
  if (localtime_r(&now, &local_tm) == 0) {
    return "unknown-time";
  }

  char buf[80];
  // strftime() returns 0 when the result does not fit; buf is then unusable.
  if (strftime(buf, sizeof(buf), "%Y-%m-%d.%X", &local_tm) == 0) {
    return "unknown-time";
  }
  return buf;
}

}
337 
338 void StDbServiceBroker::SendEmail(time_t timediff) {
339 
340  std::string admin_emails;
341  char* admins = getenv("STAR_DEBUG_DB_RETRIES_ADMINS");
342  if (!admins) { return; }
343  char* hostname = getenv("HOSTNAME");
344  pid_t mypid = getpid();
345  std::string host = "unknown host";
346  if (hostname) { host = hostname; }
347  admin_emails = admins;
348 
349  // check modification ts of the marker file ( email auto-suppression )
350  struct stat attrib;
351  int res = stat( "/tmp/db_network_error.txt", &attrib );
352 
353  // create or re-create marker file file
354  std::ofstream marker_file( "/tmp/db_network_error.txt" );
355  if ( marker_file.is_open() ) {
356  marker_file << time(0);
357  marker_file.close();
358  chmod( "/tmp/db_network_error.txt", 0666 );
359  }
360 
361  if ( res == 0 && ( ( time(0) - attrib.st_mtime ) < seconds_to_reach_for_connect ) ) {
362  // if original file time was modified recently, do nothing
363  return;
364  }
365 
366  std::string curtime = currentDateTime();
367  std::stringstream exec_command;
368  exec_command << "echo \"We waited for " << timediff << " seconds (threshold: "<< seconds_to_reach_for_connect <<"), and did not get a db connection at " << host
369  << " at " << curtime << ", process id = " << mypid << "\" | /bin/mail -s \"DB RETRIES - Problem detected on "<< host << " at " << curtime << "\" " << admin_emails;
370  system(exec_command.str().c_str());
371 
372 }
373 
375 int StDbServiceBroker::RecommendHost()
376 {
377  double dproc_min = HUGE_VAL;
378 
379  // default initialization for vector iterator
380  MyBestHost = MyHostList.end();
381 
382  if (MyHostList.size()==1)
383  {
384  MyBestHost = MyHostList.begin();
385  return 0;
386  }
387 
388  srand ( unsigned ( time (NULL) ) );
389  random_shuffle( MyHostList.begin(), MyHostList.end() );
390 
391  int scanned_hosts = 0;
392  bool host_found = false;
393  for (vector<ChapiDbHost>::const_iterator I=MyHostList.begin(); I!=MyHostList.end(); ++I)
394  {
395  conn = mysql_init(0);
396 
397  if (conn==0)
398  {
399  LOG_WARN << "StDbServiceBroker::RecommendHost() mysql_init(0) failed "<<endm;
400  continue;
401  }
402 
403  conn->options.connect_timeout = 30;
404 
405  if (mysql_real_connect
406  (conn,((*I).HostName).c_str(), "loadbalancer","lbdb","test",(*I).Port,Socket,0)==NULL)
407  {
408  LOG_WARN << "StDbServiceBroker::RecommendHost() mysql_real_connect "<<
409  conn << " "<<((*I).HostName).c_str()<<" "<<(*I).Port <<" did not succeed"<<endm;
410  mysql_close(conn);
411  time_t timediff = time(NULL) - last_succeeded_connect_time;
412  if ( timediff > seconds_to_reach_for_connect) { // default: 1800
413  last_succeeded_connect_time = time(NULL);
414  SendEmail(timediff);
415  }
416  continue;
417  } else {
418  last_succeeded_connect_time = time(NULL);
419  }
420 
421  if (mysql_query(conn, "show status like \"%Threads_running\"") != 0 )
422  {
423  LOG_WARN <<"StDbServiceBroker::RecommendHost() show processlist failed"<<endm;
424  continue;
425  }
426 
427  unsigned int nproc = 0;
428 
429 
430  MYSQL_RES* res_set = mysql_store_result(conn);
431  if (res_set==0)
432  {
433  LOG_WARN << "StDbServiceBroker::RecommendHost(): mysql_store_result failed"<<endm;
434  }
435  else
436  {
437  MYSQL_ROW row = mysql_fetch_row(res_set);
438  if (row==0)
439  {
440  LOG_WARN << "StDbServiceBroker::RecommendHost(): mysql_fetch_row failed"<<endm;
441  }
442  else
443  {
444  if(!from_string<unsigned int>(nproc,row[1],std::dec))
445  {
446  LOG_WARN << "StDbServiceBroker::RecommendHost(): mysql_fetch_row returns non-numeric"<<endm;
447  }
448  }
449  mysql_free_result(res_set);
450  }
451 
452 
453  // unsigned int nproc = mysql_num_rows(res_set);
454 
455 
456 
457  double dproc = nproc/(*I).Power;
458 #ifdef DEBUG
459  LOG_DEBUG <<" Server "<<((*I).HostName).c_str()<< " actual "<< nproc << " effective "<< dproc <<" processes "<<endm;
460 #endif
461  mysql_close(conn);
462 
463 
464  if ( dproc<dproc_min && nproc<(*I).Cap ) {
465  dproc_min = dproc;
466  MyBestHost = I;
467  host_found = true;
468  }
469  scanned_hosts += 1;
470 
471  if (scanned_hosts > 2 && host_found == true ) {
472  LOG_INFO << "StDbLib: db server found in " << scanned_hosts << " iterations" << endm;
473  return 0;
474  }
475  }
476 
477  if ( MyBestHost != MyHostList.end() ) {
478  // found good host
479  return 0;
480  }
481  // no hosts found..
482  return 1;
483 }
485 string StDbServiceBroker::GiveHostName()
486 {
487  return (*MyBestHost).HostName;
488 }
490 short StDbServiceBroker::GiveHostPort()
491 {
492  return (*MyBestHost).Port;
493 }
495 
496 int StDbServiceBroker::updateLocalLbPolicy()
497 {
498 // MLK: this method will be most likely run as a static method through a root
499 // call by a specially created user once a day.
500 
501  string writableDir;
502  string dbLoadBalancerConfig;
503 
504  const char* loConfig = getenv("DB_SERVER_LOCAL_CONFIG");
505  dbLoadBalancerConfig = loConfig ? loConfig : "";
506 
507  if (!loConfig)
508  {
509  LOG_ERROR << "StDbManagerImpl::updateLocalLbPolicy(): DB_SERVER_LOCAL_CONFIG is undefined! "<<endm;
510  return lb_error::NO_LPD_ENV_VAR;
511  }
512  else
513  {
514  string::size_type last_slash = dbLoadBalancerConfig.find_last_of("/");
515  writableDir = dbLoadBalancerConfig.substr(0,last_slash+1);
516 
517 
518  struct stat dir_status;
519  if (stat(writableDir.c_str(),&dir_status)==0)
520  {
521  if (dir_status.st_mode & S_IWUSR)
522  {
523  // OK
524  }
525  else
526  {
527  LOG_ERROR << "StDbManagerImpl::lookUpServers() "<<writableDir<<" is not writable"<<endm;
528  return lb_error::NO_WRITE_PERMISSION;
529  }
530  }
531  else
532  {
533  LOG_ERROR << "StDbManagerImpl::lookUpServers() invalid dir "<<writableDir <<endm;
534  return lb_error::NO_LPD_DIR;
535  }
536 
537  }
538 
540 // do we need to fetch XML from the Web ?
541 
542  struct stat file_status;
543  bool fetchWorldConfig = true;
544 
545  if (stat(dbLoadBalancerConfig.c_str(), &file_status) == 0)
546  {
547  /* A STAR site has an option to state that they do not want to use global XML. They do that by
548  1) creating in a specific, group-writeable directory's WorldConfig file an element <Site>
549  2) making sure a <Site> element with name "World" does NOT exist in that file
550  3) the file has to be error-free XML
551  These conditions can be fullfilled by e.g. having <Site name="MyLittleWorld" /> as the only
552  Site element in that file. In that case, global XML from the Web will not be downloaded to that
553  group-writeable directory.
554  */
555 
556  StlXmlTree myLittleWorldCheck = StlXmlTree(dbLoadBalancerConfig.c_str());
557 #ifdef DEBUG
558  LOG_INFO << "myLittleWorldCheck is:"<<endm;
559  myLittleWorldCheck.ShowTree();
560 #endif
561  if (myLittleWorldCheck.GetStatus()==stl_xml_tree::NO_ERROR)
562  {
563  string key = StlXmlTree::MakeKey("","Scatalog");
564 
565  vector<string> versions = myLittleWorldCheck.LookUpValueByKey(key);
566 
567  string my_version = versions[0];
568 
569  vector<string> sites = myLittleWorldCheck.LookUpValueByKey(key,my_version,"Site");
570 
571  vector<string>::const_iterator I = sites.begin();
572  bool WorldNotFound = true;
573  if (sites.size()==0)
574  {
575  // protection is invalid, no Site element
576  WorldNotFound = false;
577  }
578  while( I!=sites.end() && WorldNotFound)
579  {
580  if (StlXmlTree::AttributesContain(*I,"name","World"))
581  {
582  WorldNotFound = false;
583  }
584  ++I;
585  }
586 
587  if (WorldNotFound)
588  {
589 #ifdef DEBUG
590  LOG_INFO <<"StDbManagerImpl::updateLocalLbPolicy() protection against WWW XML is activated"<<endm;
591 #endif
592  fetchWorldConfig = false;
593  }
594  else
595  {
596 #ifdef DEBUG
597  LOG_INFO<<"StDbManagerImpl::updateLocalLbPolicy() we found World, the user wants to read from the Web"<<endm;
598 #endif
599  }
600 
601  if (fetchWorldConfig)
602  {
603  time_t glbModTime = file_status.st_mtime;
604  time_t nowTime = time(0);
605  if (nowTime-glbModTime<60)
606  {
607  fetchWorldConfig = false;
608  }
609  }
610  }
611  else
612  {
613  LOG_ERROR << "StDbManagerImpl::updateLocalLbPolicy() invalid XML file "<<dbLoadBalancerConfig <<endm;
614  }
615  }
616  else
617  {
618  LOG_ERROR << "StDbManagerImpl::lookUpServers(): config file " << dbLoadBalancerConfig << " is invalid "<< endm;
619  }
620 
621  if (fetchWorldConfig)
622  {
623  // try AFS first
624  const char* glConfig = getenv("DB_SERVER_GLOBAL_CONFIG");
625  const string dbLoadBalancerWorldAFS = glConfig ? glConfig : "";
626 
627  if (!glConfig)
628  {
629  LOG_ERROR << "StDbManagerImpl::updateLocalLbPolicy(): DB_SERVER_GLOBAL_CONFIG is undefined! "<<endm;
630  }
631  else
632  {
633 
634  struct stat file_status;
635  if (stat(dbLoadBalancerWorldAFS.c_str(), &file_status) == 0)
636  {
637  system(("/bin/cp "+dbLoadBalancerConfig+" "+dbLoadBalancerConfig+".old").c_str());
638  system(("/bin/cp "+dbLoadBalancerWorldAFS+" "+dbLoadBalancerConfig).c_str());
639  system(("/bin/chmod u+w "+dbLoadBalancerConfig).c_str());
640  return lb_error::NO_ERROR;
641  }
642  else
643  {
644  LOG_ERROR << "StDbManagerImpl::updateLocalLbPolicy(): DB_SERVER_GLOBAL_CONFIG points to invalid file! "<<endm;
645  }
646 
647  }
648 
649  // try WWW
650  const char* wwwConfig = getenv("DB_SERVER_GLOBAL_CONFIG_URL");
651  const string dbLoadBalancerWorldURL = wwwConfig ? wwwConfig : "";
652 
653  if (!wwwConfig)
654  {
655  LOG_ERROR << "StDbManagerImpl::updateLocalLbPolicy(): DB_SERVER_GLOBAL_CONFIG_URL is undefined! "<<endm;
656  return lb_error::NO_GPD_ENV_VAR;
657  }
658 
659 #ifdef DEBUG
660  LOG_INFO <<"StDbManagerImpl::updateLocalLbPolicy() fetching world config "<<endm;
661 #endif
662  system(("cp "+dbLoadBalancerConfig+" "+dbLoadBalancerConfig+".old").c_str());
663  const char* proxy = getenv("http_proxy");
664  if (proxy)
665  {
666  xmlNanoHTTPScanProxy(proxy);
667  }
668 
669  int ret = xmlNanoHTTPFetch(dbLoadBalancerWorldURL.c_str(), dbLoadBalancerConfig.c_str(), 0);
670 
671  if (ret!=0)
672  {
673  LOG_ERROR << "StDbManagerImpl::updateLocalLbPolicy() xmlNanoHTTPFetch error "<<ret<<endm;
674  return lb_error::WWW_ERROR;
675  }
676  else
677  {
678  system(("chmod u+w "+dbLoadBalancerConfig).c_str());
679  return lb_error::NO_ERROR;
680  }
681  }
682 return lb_error::NO_ERROR;
683 
684 }
685 
686 
687 #endif