00001 #include <stdio.h>
00002 #include <unistd.h>
00003 #include <sys/types.h>
00004 #include <sys/stat.h>
00005 #include <fcntl.h>
00006 #include <errno.h>
00007 #include <string.h>
00008 #include <poll.h>
00009 #include <sys/socket.h>
00010 #include <netinet/in.h>
00011 #include <arpa/inet.h>
00012 #include <netdb.h>
00013
00014 #ifdef __linux
00015
00016 #endif
00017
00018 #include <rtsLog.h>
00019
00020
00021 #include "msgNQLib.h"
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042 int msgNQCreate(char *host, int port, int msglen)
00043 {
00044 struct sockaddr_in me ;
00045 int size, dsc ;
00046 struct hostent *hostent ;
00047 int optval ;
00048 int ret ;
00049
00050 size = sizeof(struct sockaddr_in) ;
00051 memset((char *)&me,0,size) ;
00052
00053
00054 me.sin_family = AF_INET ;
00055 me.sin_port = htons(port) ;
00056
00057
00058 hostent = gethostbyname(host) ;
00059 if(hostent == NULL) {
00060 LOG(CRIT,"Unknown host %s",host,0,0,0,0) ;
00061 return -1 ;
00062 }
00063
00064
00065 memcpy(&me.sin_addr.s_addr,*(hostent->h_addr_list),sizeof(me.sin_addr.s_addr)) ;
00066
00067 errno = 0 ;
00068 dsc = socket(AF_INET, SOCK_STREAM, 0) ;
00069 if(dsc < 0) {
00070 LOG(CRIT,"socket() failed [%s]",strerror(errno),0,0,0,0) ;
00071 return -1 ;
00072 }
00073
00074 optval = 1 ;
00075 setsockopt(dsc,SOL_SOCKET, SO_KEEPALIVE, (char *)&optval, sizeof(optval)) ;
00076 setsockopt(dsc,SOL_SOCKET, SO_REUSEADDR, (char *)&optval, sizeof(optval)) ;
00077
00078 errno = 0 ;
00079 if(connect(dsc,(struct sockaddr *)&me,size) < 0) {
00080 LOG(CRIT,"connect() to %s, port %d failed [%s]",host,port,strerror(errno),0,0) ;
00081 close(dsc) ;
00082 return -1 ;
00083 }
00084
00085
00086 #ifdef __linux
00087 LOG(DBG,"Before fcntl") ;
00088 errno = 0 ;
00089 ret = fcntl(dsc,F_SETFL, O_NONBLOCK) ;
00090 if(ret < 0) {
00091 LOG(CRIT,"fcntl() failed [%s]",strerror(errno),0,0,0,0) ;
00092 close(dsc) ;
00093 return -1 ;
00094 }
00095
00096
00097 #else
00098 LOG(DBG,"Before fcntl...",0,0,0,0,0) ;
00099 int modes ;
00100 errno = 0 ;
00101 ret = fcntl(dsc,F_GETFL,&modes) ;
00102 if(ret < 0) {
00103 LOG(CRIT,"fcntl() failed [%s]",strerror(errno),0,0,0,0) ;
00104 close(dsc) ;
00105 return -1 ;
00106 }
00107
00108 LOG(DBG,"Before fcntl 0x%X",modes,0,0,0,0) ;
00109 errno = 0 ;
00110 ret = fcntl(dsc,F_SETFL, modes|O_NONBLOCK) ;
00111 if(ret < 0) {
00112 LOG(CRIT,"fcntl() failed [%s]",strerror(errno),0,0,0,0) ;
00113 close(dsc) ;
00114 return -1 ;
00115 }
00116 #endif
00117
00118 return dsc ;
00119 }
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132 int msgNQSend(int dsc, char *what, int size, int timeout, int prio)
00133 {
00134 int ret ;
00135 struct pollfd pollstruct ;
00136
00137
00138 if(timeout == WAIT_FOREVER) timeout = 100000000 ;
00139
00140 pollstruct.fd = dsc ;
00141 pollstruct.events = POLLOUT ;
00142
00143
00144 size = 120 ;
00145
00146
00147 for(;;) {
00148 if(msgNQCheck(dsc)==0) {
00149 LOG(ERR,"Task %d not there...",dsc,0,0,0,0) ;
00150 return MSG_Q_NOTASK ;
00151 }
00152
00153 errno = 0 ;
00154 ret = write(dsc,what,size) ;
00155 if(ret < 0) {
00156 if(errno == EAGAIN) {
00157 if(timeout) {
00158
00159 errno = 0 ;
00160 ret = poll(&pollstruct,1,1000) ;
00161
00162 if((timeout % 10) == 0) {
00163 LOG(DBG,"Unable to send to task %d in 10 seconds...",dsc,0,0,0,0) ;
00164 }
00165
00166 timeout-- ;
00167
00168 if(ret >= 0) {
00169 continue ;
00170 }
00171
00172 if(errno == EINTR) {
00173 continue ;
00174 }
00175 LOG(ERR,"poll() returned (%s)",strerror(errno),0,0,0,0) ;
00176 return MSG_Q_ERROR ;
00177
00178 }
00179 else break ;
00180 }
00181 else {
00182 LOG(ERR,"Can't write to task (%s)",strerror(errno),0,0,0,0) ;
00183 return MSG_Q_ERROR ;
00184 }
00185 }
00186 else break ;
00187 } ;
00188
00189 if(ret < 0) return MSG_Q_TIMEOUT ;
00190
00191 if(ret != size) {
00192 LOG(ERR,"Bad size (%d != ret %d) in task %d (%s)",120,ret,dsc,strerror(errno),prio) ;
00193 return MSG_Q_ERROR ;
00194 }
00195
00196 return size ;
00197 }
00198
00199
00200
00201
00202 int msgNQReceive(int dsc, char *where, int size, int timeout)
00203 {
00204 int ret ;
00205 struct pollfd pollstruct ;
00206
00207
00208
00209
00210 if(timeout < 0) timeout = 100000000 ;
00211
00212 pollstruct.fd = dsc ;
00213 #ifdef __linux
00214 pollstruct.events = POLLIN | POLLPRI ;
00215 #else
00216 pollstruct.events = POLLIN | POLLRDNORM | POLLRDBAND | POLLPRI ;
00217 #endif
00218
00219 size = 120 ;
00220
00221 for(;;) {
00222 if(!msgNQCheck(dsc)) {
00223 LOG(ERR,"Task %d not there ...",dsc,0,0,0,0) ;
00224 return MSG_Q_NOTASK ;
00225 }
00226
00227 LOG(DBG,"Before read %d",size,0,0,0,0) ;
00228 errno = 0 ;
00229 ret = read(dsc,where,size) ;
00230 LOG(DBG,"After read %d, %d",ret,errno,0,0,0);
00231
00232 if(ret < 0) {
00233 if(errno == EAGAIN) {
00234 if(timeout) {
00235
00236 errno = 0 ;
00237 ret = poll(&pollstruct,1,1000) ;
00238
00239 if((timeout % 10) == 0) {
00240 LOG(DBG,"Unable to rcv. from task %d in 10 seconds...",dsc,0,0,0,0) ;
00241 }
00242
00243 timeout-- ;
00244 if(ret >= 0) {
00245 continue ;
00246 }
00247
00248 if(errno == EINTR) {
00249 LOG(DBG,"Signal caught while in rcv. poll() from task %d...",dsc,0,0,0,0) ;
00250 continue ;
00251 }
00252 LOG(ERR,"poll() returned (%s)",strerror(errno),0,0,0,0) ;
00253 return MSG_Q_ERROR ;
00254
00255 }
00256 else break ;
00257
00258 }
00259 else {
00260 LOG(ERR,"Can't read from task %d (%s)",dsc,strerror(errno),0,0,0) ;
00261 return MSG_Q_ERROR ;
00262 }
00263 }
00264 break ;
00265
00266 } ;
00267
00268 if(ret < 0) return MSG_Q_TIMEOUT ;
00269
00270 if(ret != size) {
00271 LOG(ERR,"Read returned %d instead of %d - task %d",ret,size,dsc,0,0) ;
00272 return MSG_Q_ERROR ;
00273 }
00274
00275
00276 return 120 ;
00277 }
00278
00279
00280
00281
00282
/*
 * msgNQDelete - release a network queue by closing its descriptor.
 *
 * The result of close() is deliberately discarded; the function
 * reports 0 regardless of whether the descriptor was valid.
 */
int msgNQDelete(int desc)
{
	(void) close(desc) ;
	return 0 ;
}
00290
00291
00292
00293
00294
00295
00296
00297 int msgNQCheck(int dsc)
00298 {
00299 int optval ;
00300 int ret ;
00301 #ifdef __linux
00302 socklen_t size;
00303 #else
00304 int size ;
00305 #endif
00306
00307 if(dsc < 0) {
00308 LOG(WARN,"No such NQueue %d",dsc,0,0,0,0) ;
00309 return 0 ;
00310 }
00311
00312 size = sizeof(optval) ;
00313
00314
00315 ret = getsockopt(dsc, SOL_SOCKET, SO_KEEPALIVE, (char *)&optval, &size) ;
00316 if(ret < 0) {
00317 LOG(ERR,"getsockopt() returned error for dsc %d [%s]",dsc,strerror(errno),0,0,0) ;
00318 return 0 ;
00319 }
00320
00321
00322 return 1 ;
00323
00324 }
00325
00326
00327
00328