00001 #include <stdio.h>
00002 #include <ctype.h>
00003 #include <sys/stat.h>
00004 #include <fcntl.h>
00005 #include <string.h>
00006 #include <sys/mman.h>
00007 #include <unistd.h>
00008 #include <stdlib.h>
00009 #include <errno.h>
00010 #include <time.h>
00011 #include <arpa/inet.h>
00012 #include <pwd.h>
00013 #include <assert.h>
00014 #include <iccp2k.h>
00015
00016 #ifdef __linux__
00017 #include <sched.h>
00018
00019
00020
00021 #define madvise(x,y,z)
00022
00023 #else
00024 #include <procfs.h>
00025 #endif
00026
00027
00028
00029
00030 #include <rtsLog.h>
00031 #include <daqFormats.h>
00032 #include <iccp.h>
00033 #include <SFS/sfs_index.h>
00034 #include <rts.h>
00035
00036 #include "daqReader.h"
00037 #include "msgNQLib.h"
00038 #include "cfgutil.h"
00039 #include "daq_det.h"
00040
00041 #ifndef MADV_DONTNEED
00042 #define madvise(x,y,z)
00043 #endif
00044
00045
00046 u_int evp_daqbits ;	// namespace-scope object with external linkage (zero-initialized at program start)
00047
00048 // CVS revision tag; init() logs it at INFO level when RTS_ONLINE is not defined.
00049 static const char cvs_id_string[] = "$Id: daqReader.cxx,v 1.47 2012/05/11 09:30:55 tonko Exp $" ;
00050 // File-local helpers called from getNextEventFilenameFromLive(); only their declarations are visible here.
00051 static int evtwait(int task, ic_msg *m) ;
00052 static int ask(int desc, ic_msg *m) ;
00053 // Forward declaration only; not called in the part of the file shown here.
00054 DATAP *getlegacydatap(char *mem, int bytes);
00055
00056
00057 // Forward declaration only; not called in the part of the file shown here.
00058 static const char *getCommand(void) ;
00059
00060
00061
00062
00063
00064 daqReader::daqReader(char *mem, int size)
00065 {
00066
00067
00068 init();
00069 input_type = pointer;
00070
00071 data_memory = mem;
00072 data_size = size;
00073
00074 crit_cou = 0;
00075
00076
00077 }
00078
00079 daqReader::daqReader(char *name)
00080 {
00081 struct stat64 stat_buf ;
00082
00083 init();
00084
00085 if(name == NULL) {
00086 input_type = live;
00087 isevp = 1;
00088 if(reconnect() < 0) status = EVP_STAT_CRIT;
00089 return ;
00090 }
00091
00092
00093 strcpy(fname, name);
00094
00095
00096
00097
00098 if(stat64(fname, &stat_buf) < 0) {
00099 LOG(CRIT,"Can't stat \"%s\" [%s]",fname,strerror(errno),0,0,0);
00100 status = EVP_STAT_CRIT;
00101 sleep(1);
00102 return;
00103 }
00104
00105 LOG(NOTE,"Using file \"%s\"...",fname,0,0,0,0) ;
00106
00107
00108 if(stat_buf.st_mode & S_IFDIR) {
00109 LOG(DBG,"Running through a directory %s...",fname,0,0,0,0) ;
00110 input_type = dir;
00111
00112
00113
00114
00115
00116
00117
00118
00119 status = EVP_STAT_OK ;
00120 return ;
00121 }
00122
00123
00124 file_size = stat_buf.st_size ;
00125 evt_offset_in_file = 0;
00126 input_type = file;
00127
00128 strcpy(file_name,fname) ;
00129
00130
00131 LOG(DBG,"Running through a file %s of %d bytes",fname,file_size,0,0,0) ;
00132
00133 desc = open64(fname,O_RDONLY,0444) ;
00134 if(desc < 0) {
00135 LOG(ERR,"Can't open %s [%s]",fname,strerror(errno),0,0,0) ;
00136 sleep(1) ;
00137 return ;
00138 }
00139
00140 status = EVP_STAT_OK ;
00141
00142 return ;
00143 }
00144
00145 int daqReader::getDetectorSize(const char *det)
00146 {
00147 if (!sfs) return 0;
00148
00149 SfsDirsize sz;
00150
00151 sfs->getDirSize((char *)det, &sz);
00152
00153
00154 return (int)sz.size;
00155 }
00156
00157 void daqReader::init()
00158 {
00159
00160 EVP_HOSTNAME = (char *)_EVP_HOSTNAME;
00161 static char evp_hostname[100];
00162 char *str = getenv((char *)"EVP_HOSTNAME");
00163
00164 if(str) {
00165 strcpy(evp_hostname, str);
00166 EVP_HOSTNAME = evp_hostname;
00167 }
00168
00169
00170 #ifndef RTS_ONLINE
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180 rtsLogOutput(RTS_LOG_STDERR) ;
00181 LOG(INFO,"%s",cvs_id_string) ;
00182 #endif
00183
00184
00185
00186 LOG(DBG,"EVP_HOSTNAME set to %s", EVP_HOSTNAME);
00187
00188
00189 isevp = 0;
00190 fname[0] = '\0';
00191 memmap = new MemMap();
00192 sfs = new sfs_index();
00193 sfs_lastevt = 0;
00194
00195 runconfig = (rccnf *) valloc(sizeof(rccnf)) ;
00196 memset(runconfig,0,sizeof(rccnf));
00197
00198 memset(dets,0,sizeof(dets)) ;
00199 memset(pseudo_dets,0,sizeof(pseudo_dets));
00200
00201
00202 do_open = 1 ;
00203 do_mmap = 1 ;
00204 strcpy(evp_disk,"") ;
00205
00206 desc = -1 ;
00207
00208
00209
00210 issued = 0 ;
00211 last_issued = time(NULL) ;
00212 status = EVP_STAT_CRIT ;
00213
00214
00215 input_type = none;
00216
00217 event_number = 0 ;
00218 total_events = 0 ;
00219 readall_reset();
00220 bytes = 0 ;
00221
00222 file_size = 0 ;
00223
00224 status = EVP_STAT_OK ;
00225
00226 return ;
00227 }
00228
00229 int daqReader::setOpen(int flg)
00230 {
00231 int ret ;
00232
00233 ret = do_open ;
00234 do_open = flg ;
00235
00236 return ret ;
00237 }
00238
00239 int daqReader::setMmap(int flg)
00240 {
00241 int ret ;
00242
00243 ret = do_mmap ;
00244 do_mmap = flg ;
00245
00246 return ret ;
00247 }
00248
00249 int daqReader::setLog(int flg)
00250 {
00251
00252 if(flg) {
00253 rtsLogOutput(RTS_LOG_STDERR|RTS_LOG_NET) ;
00254 }
00255 else {
00256 rtsLogOutput(RTS_LOG_NET) ;
00257 }
00258
00259 return 0 ;
00260 }
00261
00262
00263 char *daqReader::setEvpDisk(char *name)
00264 {
00265 char *saved = _static_str_return_;
00266
00267 strcpy(saved, evp_disk) ;
00268 strncpy(evp_disk,name,sizeof(evp_disk)-1) ;
00269
00270 return saved ;
00271 }
00272
00273 daqReader::~daqReader(void)
00274 {
00275 LOG(DBG,"Destructor %s",fname,0,0,0,0) ;
00276
00277
00278 if(desc >= 0) close(desc) ;
00279
00280
00281 if(input_type == live) {
00282 msgNQDelete(evpDesc) ;
00283 }
00284
00285 if(memmap) delete(memmap);
00286 if(sfs) delete(sfs) ;
00287 if(runconfig) free(runconfig) ;
00288
00289
00290 for(int i=0;i<32;i++) {
00291 if(dets[i]) {
00292 LOG(DBG,"Will destruct det %d",i) ;
00293 delete(dets[i]) ;
00294 }
00295 if(pseudo_dets[i]) {
00296 LOG(DBG,"Will destruct pseudo det %d",i) ;
00297 delete(pseudo_dets[i]) ;
00298 }
00299 }
00300
00301
00302 return ;
00303 }
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314
00315
00316
00317
00318 char *daqReader::get(int num, int type)
00319 {
00320
00321
00322 int ret ;
00323
00324
00325
00326
00327 memset(trgIds, 0xffffffff, sizeof(trgIds));
00328 trgIdsSet = 0;
00329 trgIdsNotPresent = 0;
00330
00331 event_number++;
00332 LOG(DBG, "processing nth event. n=%d from %s",event_number, getInputType());
00333
00334
00335 evb_type = 0 ;
00336 evb_type_cou = 0 ;
00337 evb_cou = 0 ;
00338 run = 0 ;
00339
00340
00341
00342 int delay = getStatusBasedEventDelay();
00343 if(delay) {
00344 if(delay == -1) {
00345 LOG(CRIT, "Exiting because of critical errors");
00346 exit(0);
00347 }
00348
00349 LOG(DBG, "Delay of %d usec because of previous event status %d",delay, status);
00350 usleep(delay);
00351 }
00352
00353
00354 if(memmap->mem) {
00355 memmap->unmap();
00356 }
00357
00358 if((input_type == dir) ||
00359 (input_type == live)) {
00360
00361 if(getNextEventFilename(num, type) < 0) {
00362 event_number--;
00363 return NULL;
00364 }
00365
00366 if(openEventFile() < 0) {
00367 if(input_type == dir) {
00368 LOG(NOTE,"No File, but didn't see token 0 - stopping...",event_number) ;
00369 status = EVP_STAT_EOR ;
00370 event_number--;
00371
00372 }
00373 return NULL;
00374 }
00375 }
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394 if(input_type == pointer) {
00395 if(event_memory == NULL) event_memory = data_memory;
00396 }
00397
00398
00399
00400
00401 repeat:
00402
00403 int error = getEventSize();
00404
00405 if(status == EVP_STAT_LOG) {
00406
00407
00408
00409
00410 LOG(DBG, "Skipping a non-event SFS file...");
00411
00412 lseek64(desc, event_size, SEEK_CUR);
00413 evt_offset_in_file += event_size;
00414 event_size = 0;
00415 goto repeat;
00416 }
00417
00418
00419
00420
00421
00422 int padchecks = 0;
00423 for(;;) {
00424 padchecks++;
00425 int inc = addToEventSize(event_size);
00426
00427 if(inc == 0) {
00428 LOG(DBG, "No extra increment... event size=%d",event_size);
00429 break;
00430 }
00431
00432 LOG(WARN, "Found a padding bug. Adding %d to event size",inc);
00433 event_size += inc;
00434
00435 if(padchecks > 5) {
00436 LOG(ERR, "Error finding next event...");
00437 status = EVP_STAT_EOR;
00438 return NULL;
00439 }
00440 }
00441
00442 LOG(DBG, "Event size is %d (%d) %d",event_size, error, status);
00443
00444 if(status == EVP_STAT_EOR) {
00445 LOG(DBG, "Status = EOR");
00446 return NULL;
00447 }
00448
00449 if(error == 0) {
00450 status = EVP_STAT_EOR;
00451 return NULL;
00452 }
00453
00454 if(error < 0) {
00455 status = EVP_STAT_EVT;
00456 return NULL;
00457 }
00458
00459 if((event_size + evt_offset_in_file) > file_size) {
00460 LOG(WARN,"This event is truncated... Good events %d [%d+%d > %d]...", total_events,evt_offset_in_file,event_size,file_size,0) ;
00461 if((input_type == pointer) ||
00462 (input_type == dir)) {
00463 status = EVP_STAT_EVT ;
00464 }
00465 else {
00466 status = EVP_STAT_EOR ;
00467 }
00468
00469 return NULL;
00470 }
00471
00472
00473 if(input_type != pointer) {
00474 LOG(DBG, "Mapping event file %s, offset %d, size %d",
00475 file_name, evt_offset_in_file, event_size);
00476
00477 char *mapmem = memmap->map(desc, evt_offset_in_file, event_size);
00478 if(!mapmem) {
00479 LOG(CRIT, "Error mapping memory for event");
00480 exit(0);
00481 }
00482 }
00483
00484 LOG(DBG, "Event is now in memory: start=0x%x, length=%d",memmap->mem,event_size);
00485
00486
00487 if(input_type == pointer) {
00488 if(run == 0) {
00489 LOG(DBG, "Does this ever get called?");
00490 run = readall_run;
00491 }
00492 }
00493
00494 LOG(DBG, "about to mount sfs file: %s %d 0x%x",file_name, evt_offset_in_file, sfs);
00495
00496
00497
00498 LOG(DBG, "mounting single dir(mem): off=%d sz=%d",evt_offset_in_file, event_size);
00499 ret = sfs->mountSingleDirMem(memmap->mem, event_size, evt_offset_in_file);
00500
00501 if(ret < 0) {
00502 LOG(ERR, "Error mounting sfs?");
00503 status = EVP_STAT_EVT;
00504 return NULL;
00505 }
00506
00507
00508 fs_dir *fsdir = sfs->opendir("/");
00509 for(;;) {
00510 fs_dirent *ent = sfs->readdir(fsdir);
00511 if(!ent) {
00512 sfs->closedir(fsdir);
00513 LOG(ERR, "Error finding event directory in sfs?");
00514
00515
00516 status = EVP_STAT_EVT;
00517 return NULL;
00518 }
00519
00520 LOG(DBG, "does dir (%s) satisfy '/#' or '/nnnn'",ent->full_name);
00521
00522 if(memcmp(ent->full_name, "/#", 2) == 0) {
00523 LOG(DBG, "change sfs dir to %s",ent->full_name);
00524
00525 seq = atoi(&ent->full_name[2]);
00526
00527 sfs->cd(ent->full_name);
00528 sfs->closedir(fsdir);
00529 break;
00530 }
00531
00532 if(allnumeric(&ent->full_name[1])) {
00533 seq = atoi(&ent->full_name[1]);
00534 sfs->cd(ent->full_name);
00535 sfs->closedir(fsdir);
00536 break;
00537 }
00538
00539
00540 LOG(DBG, "SFS event directory not yet found: %s",ent->full_name);
00541 }
00542
00543 fs_dirent *datap = sfs->opendirent("legacy");
00544 if(datap) {
00545 mem = memmap->mem + datap->offset;
00546 LOG(DBG, "Event has a datap bank at 0x%x",mem);
00547 }
00548 else {
00549 mem = NULL;
00550 LOG(DBG, "Event has no DATAP bank");
00551 }
00552
00553
00554 bytes = 0;
00555 if(mem) {
00556 bytes = event_size - (mem - memmap->mem);
00557 LOG(DBG, "size = %d %d",event_size, bytes);
00558 }
00559
00560
00561
00562 run = 0;
00563 fs_dirent *lrhd_ent = sfs->opendirent("lrhd");
00564 if(lrhd_ent) {
00565 char *lrhd_buff = memmap->mem + lrhd_ent->offset;
00566 LOGREC *lrhd_rec = (LOGREC *)lrhd_buff;
00567 run = lrhd_rec->lh.run;
00568
00569 if(lrhd_rec->lh.byte_order != 0x04030201) {
00570 run = swap32(run);
00571 }
00572 }
00573
00574
00575
00576
00577 SummaryInfo info;
00578 fs_dirent *summary = sfs->opendirent("EventSummary");
00579 if(summary) {
00580 char *buff = memmap->mem + summary->offset;
00581 fillSummaryInfo(&info,(gbPayload *)buff);
00582 copySummaryInfoIn(&info);
00583 }
00584 else {
00585 LOG(DBG, "No EventSummary, search for legacy datap");
00586 summary = sfs->opendirent("legacy");
00587 if(!summary) {
00588 LOG(DBG, "No EventSummary and no DATAP... hacking summary info");
00589 hackSummaryInfo();
00590 }
00591 else {
00592 char *buff = memmap->mem + summary->offset;
00593 fillSummaryInfo(&info,(DATAP *)buff);
00594 copySummaryInfoIn(&info);
00595 }
00596 }
00597
00598
00599 status = EVP_STAT_OK ;
00600
00601 total_events++ ;
00602
00603
00604
00605 long long int endpos = lseek64(desc, 0, SEEK_CUR);
00606
00607 long long int nexteventpos = lseek64(desc, event_size, SEEK_CUR) ;
00608
00609 LOG(DBG,"End of event: start_offset=%d end_offset=%lld, file size %lld",endpos,nexteventpos,file_size) ;
00610
00611 evt_offset_in_file = nexteventpos;
00612
00613
00614
00615
00616 fs_dirent *esum = sfs->opendirent("EvbSummary");
00617 if(esum) {
00618 LOG(DBG, "We've got an EvbSummary");
00619 char *buff = (char *)memmap->mem + esum->offset;
00620 EvbSummary *evbsum = (EvbSummary *)buff;
00621 detsinrun = evbsum->detectorsInRun;
00622 evpgroupsinrun = 0xffffffff;
00623 }
00624 else {
00625 LOG(DBG, "No EvbSummary Record");
00626 if(run != (unsigned int)runconfig->run) {
00627 char rccnf_file[256];
00628
00629 runconfig->run = 0;
00630
00631
00632 if(input_type == live) {
00633 sprintf(rccnf_file,"%s%s/%d/0",evp_disk,_evp_basedir_,run) ;
00634 }
00635 else if(input_type == dir) {
00636 sprintf(rccnf_file,"%s/0",fname);
00637 }
00638 else if (input_type == file) {
00639 sprintf(rccnf_file,"rccnf_%d.txt",run);
00640 }
00641
00642 if(input_type != pointer) {
00643 if(getRccnf(rccnf_file, runconfig) < 0) {
00644 LOG(DBG, "No runconfig file %s",rccnf_file,0,0,0,0);
00645 }
00646 }
00647
00648 if(runconfig->run == 0) {
00649 detsinrun = 0xffffffff;
00650 evpgroupsinrun = 0xffffffff;
00651 }
00652 else {
00653 detsinrun = runconfig->detMask;
00654 evpgroupsinrun = runconfig->grpMask;
00655 }
00656 }
00657 }
00658
00659
00660
00661
00662 detector_bugs = 0;
00663
00664 if(detectors & (1 << TPX_ID)) {
00665 if((run >= 13114025) && (run < 13130030)) {
00666 if(trgcmd == 4) {
00667
00668 for(int s=1;s<=24;s++) {
00669 for(int r=1;r<=6;r++) {
00670
00671 if((s==5) && (r==1)) continue ;
00672 if((s==6) && (r==1)) continue ;
00673 if((s==7) && (r==1)) continue ;
00674 if((s==14) && (r==3)) continue ;
00675 if((s==21) && (r==1)) continue ;
00676 if((s==22) && (r==2)) continue ;
00677
00678
00679 char name[32] ;
00680
00681 sprintf(name,"tpx/sec%02d/cld%02d",s,r) ;
00682 if(!get_sfs_name(name)) {
00683 detectors &= ~(1<<TPX_ID) ;
00684 LOG(WARN,"run %d, seq %d -- removing TPX due to FY12 UU future-protection bug",run,seq) ;
00685
00686 detector_bugs |= (1<<TPX_ID) ;
00687 goto bug_check_done ;
00688 }
00689 }
00690 }
00691 }
00692 }
00693 bug_check_done:;
00694 }
00695
00696
00697
00698
00699
00700
00701
00702
00703
00704
00705
00706
00707
00708
00709
00710
00711
00712
00713
00714
00715
00716
00717
00718
00719
00720
00721 return (char *)this;
00722 }
00723
00724
00725 int daqReader::addToEventSize(int sz)
00726 {
00727 if(input_type == pointer) return 0;
00728
00729 long long int orig_offset = lseek64(desc, 0, SEEK_CUR);
00730
00731 LOG(DBG, "orig_offset = %lld sz=%d",orig_offset,sz);
00732
00733 lseek64(desc, sz, SEEK_CUR);
00734
00735 char buff[10];
00736 int ret = read(desc, buff, 8);
00737 if(ret == 0) {
00738 lseek64(desc, orig_offset, SEEK_SET);
00739 return 0;
00740 }
00741
00742 if(memcmp(buff, "LRHD",4) == 0) {
00743 lseek64(desc, orig_offset, SEEK_SET);
00744 return 0;
00745 }
00746
00747 if(memcmp(buff, "DATAP",4) == 0) {
00748 lseek64(desc, orig_offset, SEEK_SET);
00749 return 0;
00750 }
00751
00752 if(memcmp(buff, "SFS", 3) == 0) {
00753 lseek64(desc, orig_offset, SEEK_SET);
00754 return 0;
00755 }
00756
00757 if(memcmp(buff, "FILE", 4) == 0) {
00758 lseek64(desc, orig_offset, SEEK_SET);
00759 return 0;
00760 }
00761
00762 LOG(DBG, "buff = %c%c%c off=%lld",buff[0],buff[1],buff[2], orig_offset);
00763
00764 lseek64(desc, orig_offset, SEEK_SET);
00765
00766
00767 return 8192;
00768 }
00769
00770 int daqReader::getEventSize()
00771 {
00772 MemMap headermap;
00773 char *m;
00774 int swap;
00775 int padding_check;
00776
00777 int ret = -1;
00778 long long int offset = 0;
00779 long long int space_left;
00780
00781 status = EVP_STAT_OK;
00782 event_size = 0;
00783
00784 if(input_type == pointer) {
00785 m = event_memory;
00786 space_left = data_size - (m - data_memory);
00787 }
00788 else {
00789 offset = lseek64(desc, 0, SEEK_CUR);
00790 space_left = file_size - offset;
00791
00792 if(space_left > 1024) space_left = 1024;
00793
00794 LOG(DBG, "Space left = %d",space_left);
00795
00796 if(space_left == 0) return 0;
00797 if(space_left < (long long int)sizeof(LOGREC)) return -1;
00798
00799 m = headermap.map(desc, offset, space_left);
00800 if(!m) {
00801 LOG(ERR, "Error mapping header information");
00802 return -1;
00803 }
00804 }
00805
00806
00807 if(space_left == 0) return 0;
00808 if(space_left < (long long int)sizeof(LOGREC)) {
00809 LOG(NOTE, "File truncated: only %lld bytes left",space_left);
00810 status = EVP_STAT_EOR;
00811 goto done;
00812 }
00813
00814 LOG(DBG, "OFFSET = %lld", offset);
00815
00816
00817 padding_check=0;
00818 while((memcmp(m, "LRHD", 4) != 0) &&
00819 (memcmp(m, "DATAP", 5) != 0)) {
00820
00821 LOG(DBG, "Event starts with %c%c%c%c%c not LRHD or DATAP. Check if sfs file...",m[0],m[1],m[2],m[3],m[4]);
00822
00823 sfs_index *tmp_sfs = new sfs_index();
00824 int sz = tmp_sfs->getSingleDirSize(file_name, evt_offset_in_file);
00825
00826 LOG(DBG, "single dir size = %d",sz);
00827
00828
00829 if(sz > 0) {
00830 tmp_sfs->mountSingleDir(file_name, evt_offset_in_file);
00831
00832 int satisfy=0;
00833
00834
00835 fs_dir *fsdir = tmp_sfs->opendir("/");
00836 fs_dirent *ent = tmp_sfs->readdir(fsdir);
00837 if(!ent) {
00838 tmp_sfs->closedir(fsdir);
00839 LOG(ERR, "Error finding event directory in sfs?");
00840
00841
00842 status = EVP_STAT_EVT;
00843 sz = -1;
00844 goto done;
00845 }
00846
00847 LOG(DBG, "does dir (%s) satisfy '/#' or '/nnnn'",ent->full_name);
00848
00849 if(memcmp(ent->full_name, "/#", 2) == 0) {
00850 satisfy = 1;
00851 }
00852
00853 if(allnumeric(&ent->full_name[1])) {
00854 satisfy = 1;
00855 }
00856
00857 tmp_sfs->closedir(fsdir);
00858
00859 if(satisfy == 0) {
00860 status = EVP_STAT_LOG;
00861 }
00862
00863 tmp_sfs->umount();
00864 }
00865
00866 delete tmp_sfs;
00867
00868 if(sz < 0) {
00869 LOG(ERR, "Event starts with %c%c%c%c%c not LRHD or DATAP and not a SFS file... bad event",m[0],m[1],m[2],m[3],m[4]);
00870
00871 status = EVP_STAT_EVT;
00872 goto done;
00873 }
00874
00875 event_size = sz;
00876 ret = 0;
00877 goto done;
00878 }
00879
00880
00881
00882
00883 if(memcmp(m, "DATAP", 5) == 0) {
00884 DATAP *datap = (DATAP *)m;
00885 int swap = (datap->bh.byte_order == 0x04030201) ? 0 : 1;
00886
00887 event_size = qswap32(swap, datap->len) * 4;
00888 ret = 0;
00889 goto done;
00890 }
00891
00892
00893
00894 LOGREC *lrhd;
00895
00896 for(;;) {
00897 if(memcmp(m, "LRHD", 4) == 0) {
00898 lrhd = (LOGREC *)m;
00899
00900 LOG(DBG, "record_type = %c%c%c%c",
00901 lrhd->record_type[0],
00902 lrhd->record_type[1],
00903 lrhd->record_type[2],
00904 lrhd->record_type[3]);
00905
00906 if(memcmp(lrhd->record_type, "DATA", 4) == 0) break;
00907
00908 if(memcmp(lrhd->record_type, "ENDR", 4) == 0) {
00909
00910 LOG(DBG, "Got ENDR record");
00911
00912 status = EVP_STAT_EOR;
00913 goto done;
00914 }
00915
00916 LOG(DBG, "Skipping LRHD record type %c%c%c%c",
00917 lrhd->record_type[0],
00918 lrhd->record_type[1],
00919 lrhd->record_type[2],
00920 lrhd->record_type[3]);
00921
00922 space_left -= sizeof(LOGREC);
00923 offset += sizeof(LOGREC);
00924 m += sizeof(LOGREC);
00925 event_size += sizeof(LOGREC);
00926
00927 if(space_left < (long long int)sizeof(LOGREC)) {
00928 LOG(NOTE, "File truncated: only %lld bytes left",space_left);
00929 status = EVP_STAT_EOR;
00930 goto done;
00931 }
00932
00933 }
00934 else {
00935 LOG(DBG, "Corrupted headers: %c%c%c%c%c",
00936 m[0],m[1],m[2],m[3],m[4]);
00937
00938 goto done;
00939 }
00940 }
00941
00942
00943 ret = 0;
00944
00945
00946 swap = (lrhd->lh.byte_order == 0x04030201) ? 0 : 1;
00947 event_size += qswap32(swap,lrhd->length) * 4;
00948
00949 done:
00950
00951 if(ret == 0) {
00952 ret = event_size;
00953 }
00954
00955 headermap.unmap();
00956 return ret;
00957 }
00958
00959
00960 char *daqReader::getInputType()
00961 {
00962 switch(input_type) {
00963 case none:
00964 return "none";
00965 case pointer:
00966 return "pointer";
00967 case file:
00968 return "file";
00969 case live:
00970 return "live";
00971 case dir:
00972 return "dir";
00973 }
00974
00975 return "null";
00976 }
00977
00978 int daqReader::openEventFile()
00979 {
00980 struct stat64 stat_buf ;
00981
00982
00983 if(desc > 0) {
00984 close(desc);
00985 desc = -1;
00986 }
00987
00988 errno = 0 ;
00989
00990 desc = open64(file_name,O_RDONLY,0666) ;
00991 if(desc < 0) {
00992 LOG(NOTE,"Error opening file %s [%s] - skipping...",file_name,strerror(errno),0,0,0) ;
00993 status = EVP_STAT_EVT ;
00994 return -1;
00995 }
00996
00997
00998 int ret = stat64(file_name,&stat_buf) ;
00999 if(ret < 0) {
01000 LOG(ERR,"Can't stat %s",file_name,0,0,0,0) ;
01001 status = EVP_STAT_EVT ;
01002 close(desc) ;
01003 desc = -1 ;
01004 return -1;
01005 }
01006
01007 file_size = stat_buf.st_size ;
01008 evt_offset_in_file = 0;
01009 bytes = 0 ;
01010 return 0;
01011 }
01012
01013 int daqReader::getNextEventFilename(int num, int type)
01014 {
01015 if((event_number != 1) && (token == 0)) {
01016 LOG(DBG,"Previous event (%d) was Token 0 in directory - stopping...",event_number,0,0,0,0) ;
01017 status = EVP_STAT_EOR ;
01018 if(input_type == live) {
01019 event_number = 1;
01020 }
01021 return -1;
01022 }
01023
01024 if(input_type == dir) {
01025 return getNextEventFilenameFromDir(num);
01026 }
01027 else if (input_type == live) {
01028 return getNextEventFilenameFromLive(type);
01029 }
01030 else {
01031 LOG(ERR, "Wrong input type");
01032 return -1;
01033 }
01034 }
01035
01036 int daqReader::copySummaryInfoIn(SummaryInfo *info)
01037 {
01038
01039 token = info->token;
01040 evt_time = info->evt_time;
01041 detectors = info->detectors;
01042 daqbits_l1 = info->daqbits_l1;
01043 daqbits_l2 = info->daqbits_l2;
01044 evpgroups = info->evpgroups;
01045 daqbits = info->daqbits;
01046 evp_daqbits = info->evp_daqbits;
01047
01048
01049
01050 trgword = info->trgword;
01051 trgcmd = info->trgcmd;
01052 daqcmd = info->daqcmd;
01053
01054 memcpy(L1summary, info->L1summary, sizeof(L1summary));
01055 memcpy(L2summary, info->L2summary, sizeof(L2summary));
01056 memcpy(L3summary, info->L3summary, sizeof(L3summary));
01057
01058
01059 daqbits64 = ((u_longlong)L3summary[1]) << 32;
01060 daqbits64 += L3summary[0];
01061 daqbits64_l1 = ((u_longlong)L1summary[1]) << 32;
01062 daqbits64_l1 += L1summary[0];
01063 daqbits64_l2 = ((u_longlong)L2summary[1]) << 32;
01064 daqbits64_l2 += L2summary[0];
01065
01066 return 0;
01067 }
01068
01069
01070 int daqReader::hackSummaryInfo()
01071 {
01072
01073 token = 0;
01074 evt_time = 0;
01075 detectors = 0;
01076 daqbits_l1 = 0;
01077 daqbits_l2 = 0;
01078 evpgroups = 0;
01079 daqbits = 0;
01080 evp_daqbits = 0;
01081
01082 daqbits64 = 0ll;
01083 daqbits64_l1 = 0ll;
01084 daqbits64_l2 = 0ll;
01085
01086
01087 trgword = 0;
01088 trgcmd = 0;
01089 daqcmd = 0;
01090
01091 memset(L1summary, 0, sizeof(L1summary));
01092 memset(L2summary, 0, sizeof(L2summary));
01093 memset(L3summary, 0, sizeof(L3summary));
01094
01095 return 0;
01096 }
01097
01098 int daqReader::fillSummaryInfo(SummaryInfo *info, gbPayload *pay)
01099 {
01100
01101
01102 LOG(DBG, "pay=0x%x",pay);
01103
01104 u_int version = pay->gbPayloadVersion;
01105
01106 LOG(NOTE, "version = 0x%x", version);
01107
01108 if(((version & 0xff000000) != 0xda000000) && ((b2h32(version) & 0x000000ff ) != 0x40)) {
01109 LOG(NOTE, "gbPayload version 0x10");
01110
01111 gbPayload_0x01 *pv = (gbPayload_0x01 *)pay;
01112 LOG(NOTE, "gbPayload 0x01: v#=0x%x",b2h32(version));
01113 return fillSummaryInfo_v01(info, pv);
01114 }
01115
01116 if(((version & 0xff000000) != 0xda000000) && ((b2h32(version) & 0x000000ff ) == 0x40)) {
01117 LOG(NOTE, "gbPayload version 0x01a");
01118 gbPayload_0x01a *pv = (gbPayload_0x01a *)pay;
01119 LOG(NOTE, "gbPayload 0x01a: v#=0x%x", b2h32(version));
01120 return fillSummaryInfo_v01a(info, pv);
01121 }
01122
01123 if(version == GB_PAYLOAD_VERSION) {
01124 LOG(NOTE, "gbPayload 0x02: v#=0x%x",b2h32(version));
01125 return fillSummaryInfo_v02(info, pay);
01126 }
01127
01128 LOG(ERR, "gbPayload Version: 0x%x not known", version);
01129 return -1;
01130 }
01131
01132
01133 int daqReader::fillSummaryInfo_v02(SummaryInfo *info, gbPayload *pay) {
01134
01135
01136 LOG(NOTE, "gbPayloadVersion=0x%x, trgVersion=0x%x", pay->gbPayloadVersion, pay->EventDescriptor.TrgDataFmtVer);
01137
01138 info->token = l2h32(pay->token);
01139 info->evt_time = l2h32(pay->sec);
01140 info->detectors = l2h32(pay->rtsDetMask);
01141 info->daqbits_l1 = l2h32(pay->L1summary[0]);
01142 info->daqbits_l2 = l2h32(pay->L2summary[0]);
01143 info->evpgroups = l2h32(pay->L3summary[3]);
01144 info->daqbits = l2h32(pay->L3summary[0]);
01145 info->evp_daqbits = daqbits;
01146
01147
01148 info->trgword = b2h16(pay->EventDescriptor.TriggerWord);
01149 info->trgcmd = pay->EventDescriptor.actionWdTrgCommand;
01150 info->daqcmd = pay->EventDescriptor.actionWdDaqCommand;
01151
01152 for(int i=0;i<2;i++) info->L1summary[i] = l2h32(pay->L1summary[i]);
01153 for(int i=0;i<2;i++) info->L2summary[i] = l2h32(pay->L2summary[i]);
01154 for(int i=0;i<4;i++) info->L3summary[i] = l2h32(pay->L3summary[i]);
01155
01156 return 0;
01157 }
01158
01159 int daqReader::fillSummaryInfo_v01a(SummaryInfo *info, gbPayload_0x01a *pay)
01160 {
01161 LOG(NOTE, "gbPayloadVersion=0xda000001, trgVersion=0x%x", pay->EventDescriptor.TrgDataFmtVer);
01162
01163
01164 info->token = l2h32(pay->token);
01165 info->evt_time = l2h32(pay->sec);
01166 info->detectors = l2h32(pay->rtsDetMask);
01167 info->daqbits_l1 = l2h32(pay->L1summary[0]);
01168 info->daqbits_l2 = l2h32(pay->L2summary[0]);
01169 info->evpgroups = l2h32(pay->L3summary[2]);
01170 info->daqbits = l2h32(pay->L3summary[0]);
01171 info->evp_daqbits = daqbits;
01172
01173
01174 info->trgword = b2h16(pay->EventDescriptor.TriggerWord);
01175 info->trgcmd = pay->EventDescriptor.actionWdTrgCommand;
01176 info->daqcmd = pay->EventDescriptor.actionWdDaqCommand;
01177
01178 for(int i=0;i<2;i++) info->L1summary[i] = l2h32(pay->L1summary[i]);
01179 for(int i=0;i<2;i++) info->L2summary[i] = l2h32(pay->L2summary[i]);
01180 for(int i=0;i<4;i++) info->L3summary[i] = l2h32(pay->L3summary[i]);
01181
01182 return 0;
01183 }
01184
01185 int daqReader::fillSummaryInfo_v01(SummaryInfo *info, gbPayload_0x01 *pay)
01186 {
01187 LOG(NOTE, "gbPayloadVersion=0xda000001, trgVersion=0x%x", pay->EventDescriptor.TrgDataFmtVer);
01188
01189
01190 info->token = l2h32(pay->token);
01191 info->evt_time = l2h32(pay->sec);
01192 info->detectors = l2h32(pay->rtsDetMask);
01193 info->daqbits_l1 = l2h32(pay->L1summary[0]);
01194 info->daqbits_l2 = l2h32(pay->L2summary[0]);
01195 info->evpgroups = l2h32(pay->L3summary[2]);
01196 info->daqbits = l2h32(pay->L3summary[0]);
01197 info->evp_daqbits = daqbits;
01198
01199
01200 info->trgword = b2h16(pay->EventDescriptor.TriggerWord);
01201 info->trgcmd = pay->EventDescriptor.actionWdTrgCommand;
01202 info->daqcmd = pay->EventDescriptor.actionWdDaqCommand;
01203
01204 for(int i=0;i<2;i++) info->L1summary[i] = l2h32(pay->L1summary[i]);
01205 for(int i=0;i<2;i++) info->L2summary[i] = l2h32(pay->L2summary[i]);
01206 for(int i=0;i<4;i++) info->L3summary[i] = l2h32(pay->L3summary[i]);
01207
01208 return 0;
01209 }
01210
01211
01212 int daqReader::fillSummaryInfo(SummaryInfo *info, DATAP *datap)
01213 {
01214 int swap = (datap->bh.byte_order == 0x04030201) ? 0 : 1;
01215
01216 info->token = qswap32(swap, datap->bh.token);
01217 info->evt_time = qswap32(swap, datap->time);
01218 info->detectors = qswap32(swap, datap->detector);
01219 info->seq = qswap32(swap, datap->seq);
01220 info->daqbits_l1 = qswap32(swap, datap->TRG_L1_summary[0]);
01221 info->daqbits_l2 = qswap32(swap, datap->TRG_L2_summary[0]);
01222 info->evpgroups = qswap32(swap, datap->L3_Summary[2]);
01223 info->trgword = qswap32(swap, datap->trg_word);
01224
01225 info->trgcmd = (qswap32(swap, datap->trg_in_word) >> 12) & 0xF ;
01226 info->daqcmd = (qswap32(swap, datap->trg_in_word) >> 8) & 0xF ;
01227
01228
01229 info->daqbits = qswap32(swap, datap->L3_Summary[0]);
01230 info->evp_daqbits = daqbits;
01231
01232 for(int i=0;i<2;i++) info->L1summary[i] = qswap32(swap, datap->TRG_L1_summary[i]);
01233 for(int i=0;i<2;i++) info->L2summary[i] = qswap32(swap, datap->TRG_L2_summary[i]);
01234 for(int i=0;i<4;i++) info->L3summary[i] = qswap32(swap, datap->L3_Summary[i]);
01235
01236 return 0;
01237 }
01238
01239
01240
01241
01242
01243
01244 int daqReader::getNextEventFilenameFromDir(int eventNum)
01245 {
01246 LOG(DBG, "Getting next event from dir: event_number=%d eventNum=%d",event_number, eventNum);
01247
01248 if(eventNum==0) eventNum = event_number;
01249 sprintf(file_name,"%s/%d",fname,eventNum) ;
01250
01251 LOG(DBG, "Getting next event from dir: event_number=%s",file_name);
01252
01253 event_number = eventNum ;
01254 return STAT_OK;
01255 }
01256
01257 int daqReader::getNextEventFilenameFromLive(int type)
01258 {
01259 int ret;
01260 ic_msg m ;
01261
01262
01263
01264
01265 if(issued) {
01266 if((time(NULL) - last_issued) > 10) {
01267 LOG(DBG,"Re-issueing request...",0,0,0,0,0) ;
01268 issued = 0 ;
01269 }
01270 }
01271
01272 if(!issued) {
01273
01274 m.ld.dword[0] = htonl(type) ;
01275
01276 LOG(DBG, "dword[0] is type=%d",type);
01277
01278 ret = ask(evpDesc,&m) ;
01279 if(ret != STAT_OK) {
01280
01281 LOG(ERR,"Queue error %d - recheck EVP...",ret,0,0,0,0) ;
01282 reconnect() ;
01283 status = EVP_STAT_EVT ;
01284 return -1;
01285 }
01286 issued = 1 ;
01287 last_issued = time(NULL) ;
01288 }
01289
01290
01291
01292
01293 if(!readall_rundone) {
01294 int timedout=0;
01295 ret = evtwait(evpDesc, &m) ;
01296
01297
01298 issued = 0;
01299
01300
01301 if((ret != STAT_OK) && (ret != STAT_TIMED_OUT)) {
01302 reconnect() ;
01303 LOG(ERR,"Queue error %d - recheck EVP...",ret,0,0,0,0) ;
01304 status = EVP_STAT_EVT ;
01305 return -1;
01306 }
01307
01308 if(ret == STAT_TIMED_OUT) {
01309 timedout = 1;
01310
01311 #ifdef __linux
01312 sched_yield() ;
01313 #else
01314 yield() ;
01315 #endif
01316
01317 LOG(DBG, "Waiting 1 second, no event yet...");
01318 usleep(100000) ;
01319 status = EVP_STAT_OK ;
01320 return -1;
01321 }
01322
01323 LOG(DBG, "m.head.status = %d",m.head.status);
01324
01325
01326
01327 switch(m.head.status) {
01328 case STAT_SEQ:
01329 {
01330
01331 LOG(DBG,"End of Run!",0,0,0,0,0) ;
01332
01333 status = EVP_STAT_EOR ;
01334 return -1;
01335 }
01336 break;
01337
01338 case STAT_OK:
01339 {
01340
01341
01342 if(!timedout) {
01343 evb_type = ntohl(m.ld.dword[2]) ;
01344 evb_type_cou = ntohl(m.ld.dword[3]) ;
01345 evb_cou = ntohl(m.ld.dword[4]) ;
01346 run = ntohl(m.ld.dword[0]) ;
01347 readall_run = run;
01348 readall_lastevt = ntohl(m.ld.dword[1]);
01349 strcpy(_evp_basedir_, (char *)&m.ld.dword[5]);
01350
01351 event_number = readall_lastevt;
01352
01353 sprintf(_last_evp_dir_, "%s%s/%d",evp_disk,_evp_basedir_,run);
01354 }
01355 }
01356 break;
01357
01358 default:
01359 {
01360 if(!timedout) {
01361 LOG(WARN,"Event in error - not stored...",0,0,0,0,0) ;
01362 status = EVP_STAT_EVT ;
01363 return -1;
01364 }
01365 }
01366 break;
01367 }
01368 }
01369
01370
01371
01372
01373
01374
01375
01376
01377
01378
01379
01380
01381
01382
01383
01384
01385
01386
01387
01388
01389
01390
01391
01392
01393
01394 sprintf(file_name,"%s/%d",_last_evp_dir_, event_number) ;
01395
01396 struct stat64 s;
01397 if(stat64(file_name, &s) < 0) {
01398 LOG(DBG, "No file %s, try _DELETE",file_name);
01399 sprintf(file_name,"%s_DELETE/%d",_last_evp_dir_,event_number);
01400 }
01401
01402 LOG(DBG,"Live Event: file->%s",file_name,0,0,0,0) ;
01403 return 0;
01404 }
01405
01406 int daqReader::getOfflineId(int bit)
01407 {
01408 if(trgIdsNotPresent) return -1;
01409
01410 if(trgIdsSet) {
01411 return trgIds[bit];
01412 }
01413
01414 trgIdsSet = 1;
01415
01416 int ret = sfs->read("TRGID", (char *)trgIds, sizeof(trgIds));
01417
01418 if(ret != sizeof(trgIds)) {
01419 LOG(ERR, "Can't find TRGID bank, can't get the offline id");
01420 memset(trgIds, 0xffffffff, sizeof(trgIds));
01421 trgIdsNotPresent = 1;
01422 }
01423
01424 return trgIds[bit];
01425 }
01426
01428
01429 char *daqReader::get_sfs_name(char *right)
01430 {
01431 if(sfs == 0) return 0 ;
01432
01433 if(right == 0) right = "/" ;
01434
01435 fs_dirent *d = sfs->opendirent(right) ;
01436 if(d == 0) return 0 ;
01437
01438 LOG(DBG,"opendirent(%s) returns %s as full name, %s as d_name ",right,d->full_name,d->d_name) ;
01439 return d->full_name ;
01440
01441 }
01442
01443 #if 0 // unused
01444
01445
01446
01447
01448 static u_int parse_det_string(const char *list)
01449 {
01450 u_int mask = 0 ;
01451
01452
01453
01454
01455
01456 reparse:;
01457
01458 for(int i=0;i<32;i++) {
01459 const char *name = rts2name(i) ;
01460
01461 if(name==0) continue ;
01462
01463
01464
01465 if(strncasecmp(list,name,strlen(name))==0) {
01466
01467 mask |= (1<<i) ;
01468 break ;
01469 }
01470 }
01471
01472
01473 int got_space = 0 ;
01474 while(*list != 0) {
01475 if(*list == ' ') got_space = 1 ;
01476 list++ ;
01477
01478 if(got_space) goto reparse ;
01479
01480 } ;
01481
01482
01483 return mask ;
01484 }
01485
01486 #endif
01487
01488 daq_det *daqReader::det(const char *which)
01489 {
01490 assert(which) ;
01491
01492
01493
01494 for(int i=0;i<DAQ_READER_MAX_DETS;i++) {
01495
01496 if(dets[i]) {
01497 if(strcasecmp(which, dets[i]->name)==0) return dets[i] ;
01498 }
01499 if(pseudo_dets[i]) {
01500 if(strcasecmp(which, pseudo_dets[i]->name)==0) return pseudo_dets[i] ;
01501 }
01502 }
01503
01504 LOG(NOTE,"det %s not yet created... attempting through factory...",which) ;
01505 int id = -1000 ;
01506
01507
01508 for(int i=0;i<DAQ_READER_MAX_DETS;i++) {
01509 const char *name = rts2name(i) ;
01510 if(name == 0) continue ;
01511
01512 if(strcasecmp(name,which)==0) {
01513
01514 dets[i] = daq_det_factory::make_det(i) ;
01515 dets[i]->managed_by(this) ;
01516
01517 return dets[i] ;
01518 }
01519 }
01520
01521
01522 if(strcasecmp(which,"emc_pseudo")==0) id = -BTOW_ID ;
01523 if(strcasecmp(which,"hlt")==0) id = -L3_ID ;
01524
01525 if(id < -32) {
01526 LOG(CRIT,"Requested det \"%s\" not created -- check spelling!",which) ;
01527 assert(!"UNKNOWN DET") ;
01528 return 0 ;
01529 }
01530
01531 int wh = -id ;
01532 pseudo_dets[wh] = daq_det_factory::make_det(id) ;
01533 pseudo_dets[wh]->managed_by(this) ;
01534
01535 return pseudo_dets[wh] ;
01536 }
01537
01538 void daqReader::insert(class daq_det *which, int id)
01539 {
01540 assert(which) ;
01541 LOG(DBG,"calling insert(%d): name %s",id,which->name) ;
01542
01543 if((id>=0) && (id<DAQ_READER_MAX_DETS)) {
01544 dets[id] = which ;
01545 return ;
01546 }
01547 else if(id <0) {
01548 id *= -1 ;
01549 if(id >= DAQ_READER_MAX_DETS) ;
01550 else {
01551 pseudo_dets[id] = which ;
01552 return ;
01553 }
01554 }
01555
01556 LOG(ERR,"rts_id %d out of bounds for %s",id,which->name) ;
01557
01558 }
01559
01560 void daqReader::Make()
01561 {
01562 for(int i=0;i<DAQ_READER_MAX_DETS;i++) {
01563 if(dets[i]) {
01564 LOG(DBG,"Calling %s make",dets[i]->name) ;
01565 dets[i]->Make() ;
01566 }
01567 }
01568 }
01569
01570 void daqReader::de_insert(int id)
01571 {
01572 LOG(DBG,"calling de_insert(%d)",id) ;
01573
01574 if((id>=0) && (id<DAQ_READER_MAX_DETS)) {
01575 if(dets[id]) {
01576 LOG(DBG,"Should destruct %d?",id) ;
01577 }
01578 dets[id] = 0 ;
01579 return ;
01580 }
01581 else if(id < 0) {
01582 id *= -1 ;
01583 if(id >= DAQ_READER_MAX_DETS) ;
01584 else {
01585 if(pseudo_dets[id]) LOG(DBG,"Should destruct %d?",id) ;
01586 pseudo_dets[id] = 0 ;
01587 return ;
01588 }
01589
01590 }
01591
01592 LOG(ERR,"rts_id %d out of bounds",id) ;
01593 }
01594
01595
01596
01597
01598
01599
01600
01601
01602
01603
01604
01605
01606
01607
01608
01609
01610
01611
01612
01613
01614
01615
01616
01617
01618
01619
01620
01621
01622
01623
01624
01625
01626
01627
01628
01629
01630
01631
01632
01633
01634
01635
01636
01637
01638
01639
01640
01641
01642
01643
01644
01645
01646
01647
01648
01649
01650
01651
01652
01653
01654
01655
01656
01657
01658
01659
01660
01661
01662
01663
01664
01665
01666
01667
01668 static int evtwait(int desc, ic_msg *m)
01669 {
01670 int ret ;
01671 static int counter = 0 ;
01672
01673
01674 ret = msgNQReceive(desc,(char *)m, sizeof(ic_msg),NO_WAIT) ;
01675
01676 LOG(DBG,"msgNQReceive returned %d",ret,0,0,0,0) ;
01677
01678 if(ret == MSG_Q_TIMEOUT) {
01679 LOG(DBG, "read a timeout count=%d",counter);
01680 counter++ ;
01681 if(counter >= 100) {
01682 counter = 0 ;
01683 if(msgNQCheck(desc)) {
01684 LOG(DBG, "check returned ok...");
01685 return STAT_TIMED_OUT ;
01686 }
01687 else {
01688 LOG(DBG,"EVP_TASK died",0,0,0,0,0) ;
01689 return STAT_ERROR ;
01690 }
01691 }
01692
01693 return STAT_TIMED_OUT ;
01694 }
01695
01696 counter = 0 ;
01697
01698 if(ret > 0) {
01699 int i, *intp ;
01700 intp = (int *) m ;
01701 LOG(DBG,"0x%08X 0x%08X 0x%08X; %d %d",*intp,*(intp+1),*(intp+2),m->head.daq_cmd,m->head.status) ;
01702
01703 for(i=0;i<3;i++) {
01704 *(intp+i) = ntohl(*(intp+i)) ;
01705 }
01706 LOG(DBG,"0x%08X 0x%08X 0x%08X; %d %d",*intp,*(intp+1),*(intp+2),m->head.daq_cmd,m->head.status) ;
01707 return STAT_OK ;
01708 }
01709
01710 return STAT_ERROR ;
01711 }
01712
01713
01714
01715 int daqReader::fixDatapSummary(DATAP *datap)
01716 {
01717 DATAP *sumdatap;
01718
01719 fs_dirent *ent = sfs->opendirent("legacy");
01720 if(!ent) {
01721 LOG(ERR, "Can't find legacy data");
01722 return -1;
01723 }
01724
01725 sumdatap = (DATAP *)(memmap->mem + ent->offset);
01726
01727 if(memcmp(sumdatap->bh.bank_type, "DATAP", 5) != 0) {
01728 char *x = sumdatap->bh.bank_type;
01729 LOG(ERR, "fixDatapSummary... legacy not DATAP: %c%c%c%c%c",
01730 x[0],x[1],x[2],x[3],x[4]);
01731 return -1;
01732 }
01733
01734
01735 datap->len = sumdatap->len;
01736 datap->time = sumdatap->time;
01737 datap->seq = sumdatap->seq;
01738 datap->trg_word = sumdatap->trg_word;
01739 datap->trg_in_word = sumdatap->trg_in_word;
01740 datap->detector = sumdatap->detector;
01741 memcpy(datap->TRG_L1_summary, sumdatap->TRG_L1_summary, sizeof(datap->TRG_L1_summary));
01742 memcpy(datap->TRG_L2_summary, sumdatap->TRG_L2_summary, sizeof(datap->TRG_L2_summary));
01743 memcpy(datap->L3_Summary, sumdatap->L3_Summary, sizeof(datap->L3_Summary));
01744 memcpy(&datap->evtdes, &sumdatap->evtdes, sizeof(datap->evtdes));
01745
01746 return 0;
01747 }
01748
01749 int daqReader::reconnect(void)
01750 {
01751 int ret ;
01752 int retries ;
01753 ic_msg msg ;
01754
01755
01756 evpDesc = -1 ;
01757
01758 retries = 0 ;
01759
01760 for(;;) {
01761
01762 evpDesc = msgNQCreate(EVP_HOSTNAME,EVP_PORT,120) ;
01763
01764 if(evpDesc < 0) {
01765 LOG(ERR,"Can't create connection to %s:%d [%s] - will retry...",EVP_HOSTNAME,EVP_PORT,
01766 strerror(errno),0,0) ;
01767 fprintf(stderr,"CRITICAL: Can't create connection to %s:%d [%s] - will retry...\n",EVP_HOSTNAME,EVP_PORT,
01768 strerror(errno)) ;
01769 sleep(1) ;
01770 return -1;
01771 }
01772
01773
01774 if(retries) {
01775 LOG(WARN,"Connection suceeded!",0,0,0,0,0) ;
01776 }
01777
01778
01779 LOG(DBG,"Opened connection to %s, port %d on descriptor %d",EVP_HOSTNAME, EVP_PORT,evpDesc,0,0) ;
01780
01781 msg.head.daq_cmd = RTS_ETHDOOR_ANNOUNCE ;
01782 msg.head.status = 0 ;
01783 msg.ld.dword[0] = htonl(getpid()) ;
01784
01785 char *user ;
01786
01787 struct passwd *passwd = getpwuid(getuid()) ;
01788 if(passwd == NULL) {
01789 LOG(WARN,"User doesn't exist?",0,0,0,0,0) ;
01790 user = "????" ;
01791 }
01792 else {
01793 user = passwd->pw_name ;
01794 }
01795
01796
01797 strncpy((char *)&msg.ld.dword[1],user,12) ;
01798 strncpy((char *)&msg.ld.dword[4],getCommand(),12) ;
01799 msg.head.valid_words = 1+7 ;
01800
01801 #define BABABA
01802 #ifdef BABABA
01803 {
01804 int jj ;
01805 int *intp = (int *) &msg ;
01806 for(jj=0;jj<3;jj++) {
01807 *(intp+jj) = htonl(*(intp+jj)) ;
01808 }
01809 }
01810 #endif
01811
01812 ret = msgNQSend(evpDesc,(char *)&msg,120,60) ;
01813 if(ret < 0) {
01814 LOG(ERR,"Can't send data to %s! - will reconnect...",EVP_HOSTNAME,0,0,0,0) ;
01815 msgNQDelete(evpDesc) ;
01816 evpDesc = -1 ;
01817 continue ;
01818
01819 }
01820
01821
01822 status = EVP_STAT_OK ;
01823 LOG(DBG,"Returning to caller, status %d",status,0,0,0,0) ;
01824 break ;
01825 }
01826
01827
01828
01829
01830 return 0 ;
01831 }
01832
01833 static int ask(int desc, ic_msg *m)
01834 {
01835 int ret ;
01836 time_t tm ;
01837 int jj ;
01838 int *intp = (int *) m ;
01839
01840 m->head.daq_cmd = EVP_REQ_EVENT ;
01841 m->head.status = STAT_OK ;
01842 m->head.dst_task = EVP_TASK ;
01843 m->head.valid_words = 1+1 ;
01844 m->head.source_id = EVP_NODE ;
01845 m->head.src_task = EVP_TASK_READER ;
01846
01847 LOG(DBG,"Sending request to EVP_TASK",0,0,0,0,0) ;
01848 tm = time(NULL) ;
01849
01850
01851 for(jj=0;jj<3;jj++) {
01852 *(intp+jj) = htonl(*(intp+jj)) ;
01853 }
01854
01855 ret = msgNQSend(desc, (char *)m, 120,10) ;
01856
01857 LOG(DBG,"msgNQSend returned %d in %d seconds",ret,time(NULL)-tm,0,0,0) ;
01858
01859 if(ret < 0) {
01860 return STAT_ERROR ;
01861 }
01862 else {
01863 return STAT_OK ;
01864 }
01865 }
01866
01867
/*
 * Return the command name of the running process.
 *
 * Linux:   the text between the first '(' and the last ')' of
 *          /proc/self/stat.
 * Others:  pr_fname of /proc/self/psinfo.
 *
 * returns  pointer to static storage (overwritten by the next call), or the
 *          constant "(no-name)" if the name can not be determined
 */
static const char *getCommand(void)
{
	static const char *str = "(no-name)" ;
#ifdef __linux__
	static char name[128] ;
	char line[256] ;

	FILE *file = fopen("/proc/self/stat","r") ;
	if(file==NULL) return str ;

	char *got = fgets(line,sizeof(line),file) ;
	fclose(file) ;
	if(got==NULL) return str ;

	// The name is enclosed in parentheses and may itself contain blanks
	// or parentheses, hence first '(' and last ')'
	char *first = strchr(line,'(') ;
	char *last = strrchr(line,')') ;
	if((first==NULL) || (last==NULL) || (last < first)) return str ;

	size_t len = (size_t)(last - first - 1) ;
	if(len >= sizeof(name)) len = sizeof(name) - 1 ;

	memcpy(name,first+1,len) ;
	name[len] = 0 ;

	return name ;
#else	// solaris
	int fd, ret ;
	static struct psinfo ps ;

	fd = open("/proc/self/psinfo",O_RDONLY,0666) ;
	if(fd < 0) return str ;

	ret = read(fd,(char *)&ps,sizeof(ps)) ;
	close(fd) ;

	if(ret != sizeof(ps)) {
		return str ;
	}

	return ps.pr_fname ;
#endif
}
01902
01903
01904
01905
01906
01907
01908
01909
01910
01911
01912
01913
01914
01915
01916
01917
01918
01919
01920 struct copy_SFS_File {
01921 char type[4];
01922 UINT32 byte_order;
01923 UINT32 sz;
01924 UINT8 head_sz;
01925 UINT8 attr;
01926 UINT16 reserved;
01927 char name[4];
01928 };
01929
01930
01931 DATAP *getlegacydatap(char *mem, int bytes)
01932 {
01933 int off = 0;
01934 char *curr = mem;
01935
01936 LOG(DBG, "off = %d bytes = %d", off, bytes);
01937 while(off < bytes) {
01938
01939
01940 if(memcmp(curr, "LRHD", 4) == 0) {
01941 LOG(DBG, "hop over LRHD");
01942 curr += sizeof(LOGREC);
01943 off += sizeof(LOGREC);
01944 }
01945 else if(memcmp(curr, "DATAP", 5) == 0) {
01946 LOG(DBG, "hop over DATAP");
01947 curr += sizeof(DATAP);
01948 off += sizeof(DATAP);
01949 }
01950 else if(memcmp(curr, "SFS", 3) == 0) {
01951 LOG(DBG, "hop over SFS volume spec");
01952 curr += 12;
01953 off += 12;
01954 }
01955 else if(memcmp(curr, "HEAD", 4) == 0) {
01956 LOG(DBG, "hop over SFS header");
01957 curr += 12;
01958 off += 12;
01959 }
01960 else if(memcmp(curr, "FILE", 4) == 0) {
01961 copy_SFS_File *file = (copy_SFS_File *)curr;
01962
01963 int swap = (file->byte_order == 0x04030201) ? 0 : 1;
01964
01965 if(strstr(file->name, "legacy")) {
01966 LOG(DBG, "Found legacy file");
01967 off += file->head_sz;
01968 curr += file->head_sz;
01969
01970 if(memcmp(curr, "DATAP", 5) != 0) {
01971 LOG(ERR, "Got to legacy file, but not DATAP? is %c%c%c%c%c",
01972 curr[0],curr[1],curr[2],curr[3],curr[4]);
01973 return NULL;
01974 }
01975
01976 return (DATAP *)curr;
01977 }
01978 else {
01979 LOG(DBG, "hop over SFS File (%s)", file->name);
01980 off += file->head_sz + ((qswap32(swap, file->sz)+3) & 0xfffffffc);
01981 curr += file->head_sz + ((qswap32(swap, file->sz)+3) & 0xfffffffc);
01982 }
01983 }
01984 else {
01985 LOG(DBG, "There is no legacy datap");
01986 return NULL;
01987 }
01988 }
01989
01990 LOG(ERR, "no legacy datap");
01991 return NULL;
01992 }
01993
01994 int daqReader::getStatusBasedEventDelay()
01995 {
01996
01997 int delay = 0;
01998
01999 switch(status) {
02000 case EVP_STAT_EVT :
02001 delay = 100000;
02002 LOG(DBG, "Delaying for %d usec because of error on last event",delay);
02003 break ;
02004 case EVP_STAT_EOR :
02005 delay = 100000;
02006 LOG(DBG, "Delaying for %d usec because last event was end of run",delay);
02007 break ;
02008 case EVP_STAT_CRIT :
02009 delay = 1000000;
02010 LOG(ERR, "Delaying for %d usec because last event had critical status",delay);
02011
02012 crit_cou++;
02013 if(crit_cou > 10) {
02014 LOG(ERR,"That's IT! Bye...",0,0,0,0,0);
02015 sleep(1) ;
02016 exit(-1) ;
02017 }
02018
02019 default :
02020 break ;
02021 }
02022
02023 if(status != EVP_STAT_CRIT) crit_cou = 0;
02024
02025 return delay;
02026 }
02027
02028
02029 int daqReader::writeCurrentEventToDisk(char *ofilename)
02030 {
02031 int fdo;
02032 int ret;
02033
02034 if(memmap->mem == NULL) {
02035 LOG(ERR, "Can't write current event. No event");
02036 return -1;
02037 }
02038
02039
02040 fdo = open(ofilename, O_APPEND | O_WRONLY | O_CREAT, 0666);
02041 if(fdo < 0) {
02042 LOG(ERR, "Error opening output file %s (%s)", ofilename, strerror(errno));
02043 return -1;
02044 }
02045
02046 ret = write(fdo, memmap->mem, event_size);
02047 if(ret != event_size) {
02048 LOG(ERR, "Error writing event data (%s)",strerror(errno));
02049 close(fdo);
02050 return -1;
02051 }
02052
02053 close(fdo);
02054 return 0;
02055 }
02056
02057 MemMap::MemMap()
02058 {
02059 mem=NULL;
02060 actual_mem_start=NULL;
02061 actual_size=0;
02062 fd = -1;
02063
02064 page_size = sysconf(_SC_PAGESIZE);
02065 }
02066
02067 MemMap::~MemMap()
02068 {
02069 unmap();
02070 }
02071
02072
02073 char *MemMap::map(int _fd, long long int _offset, int _size)
02074 {
02075 offset = _offset;
02076 size = _size;
02077 fd = _fd;
02078
02079 LOG(DBG, "Calling mmap fd=%d offset=%d size=%d",
02080 _fd, _offset, _size);
02081
02082 int excess = offset % page_size;
02083 actual_offset = offset - excess;
02084 actual_size = size + excess;
02085
02086 LOG(DBG, " mmap excess=%d aoffset=%d asize=%d",
02087 excess, actual_offset, actual_size);
02088
02089 actual_mem_start = (char *) mmap64(NULL, actual_size, PROT_READ,MAP_SHARED|MAP_NORESERVE, fd, actual_offset) ;
02090 madvise(actual_mem_start, actual_size, MADV_SEQUENTIAL);
02091
02092 if(((void *)actual_mem_start) == MAP_FAILED) {
02093 LOG(ERR,"Error in mmap (%s)",strerror(errno),0,0,0,0) ;
02094
02095 mem=NULL;
02096 offset=0;
02097 size=0;
02098 actual_offset=0;
02099 actual_mem_start=NULL;
02100 actual_size=0;
02101 return NULL;
02102 }
02103
02104 mem = actual_mem_start + excess;
02105 return mem;
02106 }
02107
02108 void MemMap::unmap()
02109 {
02110 if(mem==NULL) return;
02111
02112 madvise(actual_mem_start, actual_size, MADV_DONTNEED);
02113 munmap(actual_mem_start, actual_size);
02114
02115 mem=NULL;
02116 offset=0;
02117 size=0;
02118 actual_offset=0;
02119 actual_mem_start=NULL;
02120 actual_size=0;
02121 }
02122
02123
02124
02125
02126
02127
02128
02129 int daqReader::readNextFutureSummaryInfo(SummaryInfo *info)
02130 {
02131 memset(info, 0, sizeof(SummaryInfo));
02132
02133 if(input_type != file) {
02134 LOG(ERR, "Can't read next future evt header unless reading from daq file");
02135 return -1;
02136 }
02137
02138 if(sfs->singleDirMount == 0) {
02139 LOG(ERR, "Need a current file to read the next one...");
02140 return -1;
02141 }
02142
02143 LOG(DBG, "(sfs) singleDirOffset = %d singleDirSize = %d", sfs->singleDirOffset, sfs->singleDirSize);
02144
02145 long long int offset = sfs->singleDirOffset + sfs->singleDirSize;
02146
02147 sfs_index *nsfs = new sfs_index();
02148 if(!nsfs) {
02149 LOG(ERR, "Couldn't create sfs_index");
02150 return -1;
02151 }
02152
02153 LOG(DBG, "mounting dir at offset %s:%d",file_name,offset);
02154 int ret = nsfs->mountSingleDir(file_name, offset);
02155 if(ret < 0) {
02156 LOG("ERR", "Error mounting dir at offset %s:%d",file_name,offset);
02157
02158 delete nsfs;
02159 return ret;
02160 }
02161 if(ret == 0) {
02162 LOG(NOTE, "End of file reading next dir...");
02163 delete nsfs;
02164 return ret;
02165 }
02166
02167 LOG(DBG, "Mounted dir (nsfs) off: %d sz: %d",nsfs->singleDirOffset, nsfs->singleDirSize);
02168
02169
02170
02171 int fd = open(file_name, O_RDONLY);
02172 if(fd <=0) {
02173 LOG(ERR, "No defined file descriptor");
02174 delete nsfs;
02175 return -1;
02176 }
02177
02178 MemMap *nmem = new MemMap();
02179 char *mymem = nmem->map(fd, nsfs->singleDirOffset, nsfs->singleDirSize);
02180 if(!mymem) {
02181 LOG(ERR, "Couldn't map memory");
02182 delete nmem;
02183 delete nsfs;
02184 close(fd);
02185 return -1;
02186 }
02187
02188 LOG(DBG, "mapped off=%d sz=%d into 0x%x",nsfs->singleDirOffset, nsfs->singleDirSize, mymem);
02189
02190 fs_dir *rootdir = nsfs->opendir("/");
02191 for(;;) {
02192 fs_dirent *ent = nsfs->readdir(rootdir);
02193 if(!ent) {
02194 nsfs->closedir(rootdir);
02195 LOG(ERR, "Error finding event directory in sfs?");
02196
02197
02198 nsfs->closedir(rootdir);
02199 delete nmem;
02200 delete nsfs;
02201 close(fd);
02202 return -1;
02203 }
02204
02205 if(memcmp(ent->full_name, "/#", 2) == 0) {
02206 info->seq = atoi(&ent->full_name[2]);
02207
02208 nsfs->cd(ent->full_name);
02209 nsfs->closedir(rootdir);
02210 break;
02211 }
02212
02213 if(allnumeric(&ent->full_name[1])) {
02214 info->seq = atoi(&ent->full_name[1]);
02215 nsfs->cd(ent->full_name);
02216 nsfs->closedir(rootdir);
02217 break;
02218 }
02219
02220 LOG(DBG, "SFS event directory not yet found: %s",ent->full_name);
02221 }
02222
02223 fs_dirent *summary = nsfs->opendirent("EventSummary");
02224 if(summary) {
02225
02226
02227
02228 int mem_offset = summary->offset-nsfs->singleDirOffset;
02229 LOG(DBG, "found summary %d: file[%d-%d] sz=%d corr=%d)",summary->offset,nsfs->singleDirOffset, nsfs->singleDirOffset+nsfs->singleDirSize,nsfs->singleDirSize, summary->offset-nsfs->singleDirOffset);
02230
02231 char *buff = mymem + mem_offset;
02232 fillSummaryInfo(info,(gbPayload *)buff);
02233 }
02234 else {
02235 LOG(NOTE, "No EventSummary, search for legacy datap");
02236 summary = nsfs->opendirent("legacy");
02237 if(!summary) {
02238 LOG(NOTE, "No EventSummary and no DATAP... hacking summary info");
02239 }
02240 else {
02241 long long int mem_offset = summary->offset - nsfs->singleDirOffset;
02242 char *buff = mymem + mem_offset;
02243 fillSummaryInfo(info,(DATAP *)buff);
02244 copySummaryInfoIn(info);
02245 }
02246 }
02247
02248 delete nsfs;
02249 delete nmem;
02250 close(fd);
02251
02252 return 1;
02253 }
02254
02255