StRoot  1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
ThreadsMsgQueue.hh
1 #ifndef TMSGQUEUE_HH
2 #define TMSGQUEUE_HH
3 
4 #ifdef __unix__
5 #ifndef unix
6 #define unix
7 #endif
8 #endif
9 
10 #ifdef __linux__
11 #ifndef linux
12 #define linux
13 #endif
14 #endif
15 
16 
17 #ifdef sun
18 #include <synch.h>
19 #include <thread.h>
20 #else
21 #include <semaphore.h>
22 #include <pthread.h>
23 #endif
24 #include <errno.h>
25 #include "StaticSizedDQueue.hh"
26 
27 // The queue has to be protected on both ends
28 
29 template <class T> class thrMsgQueue
30 {
31 public:
32  thrMsgQueue(int s) ; // number of elements of type T deep
33  ~thrMsgQueue() ;
34  int send(T* a, int prio = 0 ) ; // 0 = low prio != 0 == high
35  int receive(T*, int block=1) ; // Blocking
36 #ifndef sun
37  int peek(T* a);
38 #endif
39  int entries() { return(q->entries()) ;} ;
40  int free() { return(q->free()) ;} ;
41  // int clear() ; // 0 is O.K >0 some fatal error !!!!
42  // I removed clear since clear would require another set of locks
43  // to avoid changing of the semaphores during wiping
44 private:
45  sdqueue<T> *q ;
46 #ifdef sun
47  sema_t empty ;
48  sema_t occupied ;
49  mutex_t mp ;
50 #else
51  sem_t empty ;
52  sem_t occupied ;
53  pthread_mutexattr_t mattr ;
54  pthread_mutex_t mp ;
55 #endif
56  int elements ;
57 };
58 
59 // Implementation
60 template <class T> thrMsgQueue<T>::~thrMsgQueue()
61 {
62 #ifdef sun
63  sema_destroy(&empty);
64  sema_destroy(&occupied) ;
65  mutex_destroy(&mp) ;
66 #else
67  sem_destroy(&empty) ;
68  sem_destroy(&occupied) ;
69  pthread_mutex_destroy(&mp) ;
70 #endif
71  delete q;
72 };
73 
74 //--------------------------------------------------------------
75 template <class T> thrMsgQueue<T>::thrMsgQueue(int s)
76 {
77  elements = s;
78  q = new sdqueue<T>(elements);
79 
80 #ifdef sun
81  mutex_init(&mp,USYNC_THREAD,NULL);
82  sema_init(&empty,elements,USYNC_THREAD,NULL) ;
83  sema_init(&occupied,0,USYNC_THREAD,NULL) ;
84 #else
85  pthread_mutex_init(&mp,NULL);
86  sem_init(&empty,0,elements) ;
87  sem_init(&occupied,0,0) ;
88 #endif
89 };
90 //--------------------------------------------------------------
91 #if defined(linux) || defined(__APPLE__)
92 template <class T> int thrMsgQueue<T>::send(T* a, int prio)
93 #else
94 template <class T> int thrMsgQueue<T>::send(T* a, int prio = 0)
95 #endif
96 {
97  int iret ;
98  l1:
99  errno = 0 ;
100 #ifdef sun
101  iret = sema_wait(&empty);
102 #else
103  iret = sem_wait(&empty) ;
104 #endif
105  if(iret)
106  {
107  if(errno == EINTR) goto l1 ; // a signal
108  else return(-1) ;
109  }
110 #ifdef sun
111  mutex_lock(&mp);
112 #else
113  pthread_mutex_lock(&mp) ;
114 #endif
115 
116  if(prio)
117  iret = q->prepend(a) ;
118  else
119  iret = q->insert(a) ;
120 
121 #ifdef sun
122  mutex_unlock(&mp);
123 #else
124  pthread_mutex_unlock(&mp);
125 #endif
126 
127  if(iret)
128  {
129  return(-1) ;
130  }
131 
132 #ifdef sun
133  sema_post(&occupied) ;
134 #else
135  sem_post(&occupied) ;
136 #endif
137  return(0) ; // send it !!!
138 };
139 //-----------------------------------------------------
140 template <class T> int thrMsgQueue<T>::receive(T* a, int block)
141 {
142  int iret ;
143 
144  l1:
145  errno = 0 ;
146 
147  if(block) {
148 #ifdef sun
149  iret = sema_wait(&occupied);
150 #else
151  iret = sem_wait(&occupied);
152 #endif
153  }
154  else {
155  iret = sem_trywait(&occupied);
156  if(iret < 0) {
157  return -1;
158  }
159  }
160 
161 
162  if(iret)
163  {
164  if(errno == EINTR) goto l1;
165  return(-10);
166  }
167 
168 #ifdef sun
169  mutex_lock(&mp);
170 #else
171  pthread_mutex_lock(&mp) ;
172 #endif
173 
174  iret = q->get(a) ;
175 
176 #ifdef sun
177  mutex_unlock(&mp);
178 #else
179  pthread_mutex_unlock(&mp) ;
180 #endif
181 
182  if(iret)
183  {
184  return(-20) ;
185  }
186 
187 #ifdef sun
188  sema_post(&empty);
189 #else
190  sem_post(&empty);
191 #endif
192 
193  return(0);
194 };
195 
196 #ifndef sun
197 template <class T> int thrMsgQueue<T>::peek(T* a)
198 {
199  int iret ;
200  int semval = 0;
201  errno = 0 ;
202 
203  pthread_mutex_lock(&mp) ;
204  iret = sem_getvalue(&occupied, &semval);
205 
206  if(semval == 0) {
207  iret = -1;
208  }
209  else {
210  iret = q->first(a);
211  }
212 
213  pthread_mutex_unlock(&mp) ;
214 
215  return iret;
216 };
217 #endif
218 
219 //-------------------------------------------
220 
221 #endif
222 
223 
224