00001 #ifndef TMSGQUEUE_HH
00002 #define TMSGQUEUE_HH
00003
00004 #ifdef sun
00005 #include <synch.h>
00006 #include <thread.h>
00007 #else
00008 #include <semaphore.h>
00009 #include <pthread.h>
00010 #endif
00011 #include <errno.h>
00012 #include "StaticSizedDQueue.hh"
00013
00014
00015
00016 template <class T> class thrMsgQueue
00017 {
00018 public:
00019 thrMsgQueue(int s) ;
00020 ~thrMsgQueue() ;
00021 int send(T* a, int prio = 0 ) ;
00022 int receive(T* a) ;
00023 #ifndef sun
00024 int peek(T* a);
00025 #endif
00026 int entries() { return(q->entries()) ;} ;
00027 int free() { return(q->free()) ;} ;
00028
00029
00030
00031 private:
00032 sdqueue<T> *q ;
00033 #ifdef sun
00034 sema_t empty ;
00035 sema_t occupied ;
00036 mutex_t mp ;
00037 #else
00038 sem_t empty ;
00039 sem_t occupied ;
00040 pthread_mutexattr_t mattr ;
00041 pthread_mutex_t mp ;
00042 #endif
00043 int elements ;
00044 };
00045
00046
00047 template <class T> thrMsgQueue<T>::~thrMsgQueue()
00048 {
00049 #ifdef sun
00050 sema_destroy(&empty);
00051 sema_destroy(&occupied) ;
00052 mutex_destroy(&mp) ;
00053 #else
00054 sem_destroy(&empty) ;
00055 sem_destroy(&occupied) ;
00056 pthread_mutex_destroy(&mp) ;
00057 #endif
00058 delete q;
00059 };
00060
00061
00062 template <class T> thrMsgQueue<T>::thrMsgQueue(int s)
00063 {
00064 elements = s;
00065 q = new sdqueue<T>(elements);
00066
00067 #ifdef sun
00068 mutex_init(&mp,USYNC_THREAD,NULL);
00069 sema_init(&empty,elements,USYNC_THREAD,NULL) ;
00070 sema_init(&occupied,0,USYNC_THREAD,NULL) ;
00071 #else
00072 pthread_mutex_init(&mp,NULL);
00073 sem_init(&empty,0,elements) ;
00074 sem_init(&occupied,0,0) ;
00075 #endif
00076 };
00077
00078 #ifdef linux
00079 template <class T> int thrMsgQueue<T>::send(T* a, int prio)
00080 #else
00081 template <class T> int thrMsgQueue<T>::send(T* a, int prio = 0)
00082 #endif
00083 {
00084 int iret ;
00085 l1:
00086 errno = 0 ;
00087 #ifdef sun
00088 iret = sema_wait(&empty);
00089 #else
00090 iret = sem_wait(&empty) ;
00091 #endif
00092 if(iret)
00093 {
00094 if(errno == EINTR) goto l1 ;
00095 else return(-1) ;
00096 }
00097 #ifdef sun
00098 mutex_lock(&mp);
00099 #else
00100 pthread_mutex_lock(&mp) ;
00101 #endif
00102
00103 if(prio)
00104 iret = q->prepend(a) ;
00105 else
00106 iret = q->insert(a) ;
00107
00108 #ifdef sun
00109 mutex_unlock(&mp);
00110 #else
00111 pthread_mutex_unlock(&mp);
00112 #endif
00113
00114 if(iret)
00115 {
00116 return(-1) ;
00117 }
00118
00119 #ifdef sun
00120 sema_post(&occupied) ;
00121 #else
00122 sem_post(&occupied) ;
00123 #endif
00124 return(0) ;
00125 };
00126
00127 template <class T> int thrMsgQueue<T>::receive(T* a)
00128 {
00129 int iret ;
00130
00131 l1:
00132 errno = 0 ;
00133 #ifdef sun
00134 iret = sema_wait(&occupied);
00135 #else
00136 iret = sem_wait(&occupied);
00137 #endif
00138
00139 if(iret)
00140 {
00141 if(errno == EINTR) goto l1;
00142 return(-1);
00143 }
00144
00145 #ifdef sun
00146 mutex_lock(&mp);
00147 #else
00148 pthread_mutex_lock(&mp) ;
00149 #endif
00150
00151 iret = q->get(a) ;
00152
00153 #ifdef sun
00154 mutex_unlock(&mp);
00155 #else
00156 pthread_mutex_unlock(&mp) ;
00157 #endif
00158
00159 if(iret)
00160 {
00161 return(-2) ;
00162 }
00163
00164 #ifdef sun
00165 sema_post(&empty);
00166 #else
00167 sem_post(&empty);
00168 #endif
00169
00170 return(0);
00171 };
00172
00173 #ifndef sun
00174 template <class T> int thrMsgQueue<T>::peek(T* a)
00175 {
00176 int iret ;
00177 int semval = 0;
00178 errno = 0 ;
00179
00180 pthread_mutex_lock(&mp) ;
00181 iret = sem_getvalue(&occupied, &semval);
00182
00183 if(semval == 0) {
00184 iret = -1;
00185 }
00186 else {
00187 iret = q->first(a);
00188 }
00189
00190 pthread_mutex_unlock(&mp) ;
00191
00192 return iret;
00193 };
00194 #endif
00195
00196
00197
00198 #endif
00199
00200
00201