|
| 1 | +#include<sys/ioctl.h> |
| 2 | +#include<fcntl.h> |
| 3 | +#include<sys/time.h> |
| 4 | +#include<sys/types.h> |
| 5 | +#include<sys/socket.h> |
| 6 | +#include<sys/utsname.h> |
| 7 | +#include<sys/select.h> |
| 8 | +#include<netinet/in.h> |
| 9 | +#include<netinet/tcp.h> |
| 10 | +#include<arpa/inet.h> |
| 11 | +#include<stdio.h> |
| 12 | +#include<netdb.h> |
| 13 | +#include<stdlib.h> |
| 14 | +#include<unistd.h> |
| 15 | +#include<string.h> |
| 16 | +#include<errno.h> |
| 17 | + |
| 18 | +staticvoiddefault_error_handler(charconst*msg,ShubErrorSeverityseverity) |
| 19 | +{ |
| 20 | +perror(msg); |
| 21 | +if (severity==SHUB_FATAL_ERROR) { |
| 22 | +exit(1); |
| 23 | + } |
| 24 | +} |
| 25 | + |
| 26 | + |
| 27 | +voidShubInitPrams(ShubParams*params) |
| 28 | +{ |
| 29 | +memset(params,0,sizeofparams); |
| 30 | +params->buffer_size=64*1025; |
| 31 | +params->port=54321; |
| 32 | +params->queue_size=100; |
| 33 | +params->max_attempts=10; |
| 34 | +params->error_handler=default_error_handler; |
| 35 | +} |
| 36 | + |
| 37 | + |
| 38 | +staticintresolve_host_by_name(constchar*hostname,unsigned*addrs,unsigned*n_addrs) |
| 39 | +{ |
| 40 | +structsockaddr_insin; |
| 41 | +structhostent*hp; |
| 42 | +unsignedi; |
| 43 | + |
| 44 | +sin.sin_addr.s_addr=inet_addr(hostname); |
| 45 | +if (sin.sin_addr.s_addr!=INADDR_NONE) { |
| 46 | +memcpy(&addrs[0],&sin.sin_addr.s_addr,sizeof(sin.sin_addr.s_addr)); |
| 47 | +*n_addrs=1; |
| 48 | +return1; |
| 49 | + } |
| 50 | + |
| 51 | +hp=gethostbyname(hostname); |
| 52 | +if (hp==NULL||hp->h_addrtype!=AF_INET) { |
| 53 | +return0; |
| 54 | + } |
| 55 | +for (i=0;hp->h_addr_list[i]!=NULL&&i<*n_addrs;i++) { |
| 56 | +memcpy(&addrs[i],hp->h_addr_list[i],sizeof(addrs[i])); |
| 57 | + } |
| 58 | +*n_addrs=i; |
| 59 | +return1; |
| 60 | +} |
| 61 | + |
| 62 | + |
| 63 | + |
| 64 | +staticvoidclose_socket(Shub*shub,intfd) |
| 65 | +{ |
| 66 | +inti,max_fd; |
| 67 | +fd_setcopy; |
| 68 | +FD_ZERO(©); |
| 69 | +close(fd); |
| 70 | +for (i=0,max_fd=shub->max_fd;i <=max_fd;i++) { |
| 71 | +if (i!=fd&&FD_ISSET(i,&shub->inset)) { |
| 72 | +FD_SET(i,©); |
| 73 | + } |
| 74 | + } |
| 75 | +FD_COPY(©,&shub->inset); |
| 76 | +} |
| 77 | + |
| 78 | +staticintread_socket(intsd,char*buf,intsize) |
| 79 | +{ |
| 80 | +while (size!=0) { |
| 81 | +intn=recv(sd,buf,size ,0); |
| 82 | +if (n <=0) { |
| 83 | +return0; |
| 84 | + } |
| 85 | +size-=n; |
| 86 | +buf+=n; |
| 87 | + } |
| 88 | +return1; |
| 89 | +} |
| 90 | + |
| 91 | +staticintwrite_socket(intsd,charconst*buf,intsize) |
| 92 | +{ |
| 93 | +while (size!=0) { |
| 94 | +intn=send(sd,buf,size,0); |
| 95 | +if (n <=0) { |
| 96 | +return0; |
| 97 | + } |
| 98 | +size-=n; |
| 99 | +buf+=n; |
| 100 | + } |
| 101 | +return1; |
| 102 | +} |
| 103 | + |
| 104 | + |
| 105 | +staticvoidreconnect(Shub*shub) |
| 106 | +{ |
| 107 | +structsockaddr_insock_inet; |
| 108 | +unsignedaddrs[128]; |
| 109 | +unsignedi,n_addrs=sizeof(addrs) /sizeof(addrs[0]); |
| 110 | +intmax_attemtps=shub->params->max_attempts; |
| 111 | + |
| 112 | +if (shub->output >=0) { |
| 113 | +close_socket(shub,shub->output); |
| 114 | + } |
| 115 | + |
| 116 | +sock_inet.sin_family=AF_INET; |
| 117 | +sock_inet.sin_port=htons(port); |
| 118 | +if (!resolve_host_by_name(host,addrs,&n_addrs)) { |
| 119 | +shub->error_handler("Failed to resolve host by name",SHUB_FATAL_ERROR); |
| 120 | + } |
| 121 | +shub->output=socket(AF_INET,SOCK_STREAM,0); |
| 122 | +if (shub->output<0) { |
| 123 | +shub->error_handler("Failed to create inet socket",SHUB_FATAL_ERROR); |
| 124 | + } |
| 125 | +while (1) { |
| 126 | +intrc=-1; |
| 127 | +for (i=0;i<n_addrs;++i) { |
| 128 | +memcpy(&sock_inet.sin_addr,&addrs[i],sizeofsock_inet.sin_addr); |
| 129 | +do { |
| 130 | +rc=connect(output, (structsockaddr*)&sock_inet,sizeof(sock_inet)); |
| 131 | + }while (rc<0&&errno==EINTR); |
| 132 | + |
| 133 | +if (rc >=0||errno==EINPROGRESS) { |
| 134 | +if (rc >=0) { |
| 135 | + } |
| 136 | +break; |
| 137 | + } |
| 138 | + } |
| 139 | +if (rc<0) { |
| 140 | +if (errno!=ENOENT&&errno!=ECONNREFUSED) { |
| 141 | +shub->error_handler("Connection can not be establish",SHUB_FATAL_ERROR); |
| 142 | + } |
| 143 | +if (max_attempts--!=0) { |
| 144 | +sleep(1); |
| 145 | + }else { |
| 146 | +shub->error_handler("Failed to connect to host",SHUB_FATAL_ERROR); |
| 147 | + } |
| 148 | + }else { |
| 149 | +intoptval=1; |
| 150 | +setsockopt(shub->output,IPPROTO_TCP,TCP_NODELAY, (charconst*)&optval,sizeof(int)); |
| 151 | +FD_SET(shub->output,&inset); |
| 152 | +break; |
| 153 | + } |
| 154 | + } |
| 155 | +} |
| 156 | + |
| 157 | +staticvoidrecovery(Shub*shub) |
| 158 | +{ |
| 159 | +inti,max_fd; |
| 160 | +fd_setokset; |
| 161 | +fd_settryset; |
| 162 | + |
| 163 | +for (i=0,max_fd=shub->max_fd;i <=max_fd;i++) { |
| 164 | +if (FD_ISSET(i,&inset)) { |
| 165 | +structtimevaltm= {0,0}; |
| 166 | +FD_ZERO(&tryset); |
| 167 | +FD_SET(i,&tryset); |
| 168 | +if (select(i+1,&tryset,NULL,NULL,&tm) >=0) { |
| 169 | +FD_SET(i,&okset); |
| 170 | + } |
| 171 | + } |
| 172 | + } |
| 173 | +FD_COPY(&okset,&shub->inset); |
| 174 | +} |
| 175 | + |
| 176 | +voidShubIntialize(Shub*shub,ShubParams*params) |
| 177 | +{ |
| 178 | +structsockaddrsock; |
| 179 | + |
| 180 | +shub->params=params; |
| 181 | +sock.sa_family=AF_UNIX; |
| 182 | +strcpy(sock.sa_data,params->file); |
| 183 | +unlink(params->file); |
| 184 | +shub->input=socket(AF_UNIX,SOCK_STREAM,0); |
| 185 | +if (shub->input<0) { |
| 186 | +shub->error_handler("Failed to create local socket",SHUB_FATAL_ERROR); |
| 187 | + } |
| 188 | +if (bind(shub->input,&sock, ((char*)sock.sa_data- (char*)&sock)+strlen(params->file))<0) { |
| 189 | +shub->error_handler("Failed to bind local socket",SHUB_FATAL_ERROR); |
| 190 | + } |
| 191 | +if (listen(shub->input,params->queue_size)<0) { |
| 192 | +shub->error_handler("Failed to listen local socket"); |
| 193 | + } |
| 194 | +FD_ZERO(&shub->inset); |
| 195 | +FD_SET(shub->input,&shub->inset); |
| 196 | + |
| 197 | +reconnect(shub); |
| 198 | + |
| 199 | +shub->in_buffer=malloc(params->buffer_size); |
| 200 | +shub->out_buffer=malloc(params->buffer_size); |
| 201 | +if (shub->in_buffer==NULL||shub->out_buffer==NULL) { |
| 202 | +shub->error_handler("Failed to allocate buffer",SHUB_FATAL_ERROR); |
| 203 | + } |
| 204 | +} |
| 205 | + |
| 206 | + |
| 207 | +voidShubLoop(Shub*shub) |
| 208 | +{ |
| 209 | +intbuffer_size=shub->params->buffer_size; |
| 210 | + |
| 211 | +while (1) { |
| 212 | +fd_setevents; |
| 213 | +structtimevaltm; |
| 214 | +inti,max_fd,rc; |
| 215 | +unsignedintn,size; |
| 216 | + |
| 217 | +tm.tv_sec=delay/1000; |
| 218 | +tm.tv_usec=delay %1000*1000; |
| 219 | + |
| 220 | + |
| 221 | +FD_COPY(&shub->inset,&events); |
| 222 | +rc=select(shub->max_fd+1,&events,NULL,NULL,shub->in_buffer_used==0 ?NULL :&tm); |
| 223 | +if (rc<0) { |
| 224 | +if (errno!=EINTR) { |
| 225 | +shub->error_handler("Select failed",SHUB_RECOVERABLE_ERROR); |
| 226 | +recovery(shub); |
| 227 | + } |
| 228 | + }else { |
| 229 | +if (rc>0) { |
| 230 | +for (i=0,max_fd=shub->max_fd;i <=max_fd;i++) { |
| 231 | +if (FD_ISSET(i,&events)) { |
| 232 | +if (i==shub->input) { |
| 233 | +ints=accept(i,NULL,NULL); |
| 234 | +if (s<0) { |
| 235 | +shub->error_handler("Failed to accept socket",SHUB_RECOVERABLE_ERROR); |
| 236 | + }else { |
| 237 | +if (s>max_fd) { |
| 238 | +shub->max_fd=s; |
| 239 | + } |
| 240 | +FD_SET(s,&shub->inset); |
| 241 | + } |
| 242 | + }elseif (i==shub->output) { |
| 243 | +intavailable=recv(shub->output,shub->out_buffer+shub->out_buffer_used,buffer_size-shub->out_buffer_used,0); |
| 244 | +intpos=0; |
| 245 | +if (available <=0) { |
| 246 | +pshub->error_handler("Failed to read inet socket",SHUB_RECOVERABLE_ERROR); |
| 247 | +reconnect(shub); |
| 248 | + } |
| 249 | +shub->out_buffer_used+=available; |
| 250 | +while (pos+sizeof(ShubMessageHdr) <=shub->out_buffer_used) { |
| 251 | +ShubMessageHdr*hdr= (ShubMessageHdr*)shub->out_buffer; |
| 252 | +intchan=hdr->chan; |
| 253 | +pos+=sizeof(ShubMessageHdr); |
| 254 | +n=pos+hdr->size <=shub->out_buffer_used ?hdr->size+sizeof(ShubMessageHdr) :shub->out_buffer_used-pos; |
| 255 | +if (!write_socket(chan,hdr,n)) { |
| 256 | +shub->error_handler("Failed to write to local socket",SHUB_RECOVERABLE_ERROR); |
| 257 | +close_socket(shub,chan); |
| 258 | +chan=-1; |
| 259 | + } |
| 260 | +if (n!=hdr->size+sizeof(ShubMessageHdr)) { |
| 261 | +inttail=hdr->size+sizeof(ShubMessageHdr)-n; |
| 262 | +do { |
| 263 | +n=tail<shub->out_buffer_size ?tail :shub->out_buffer_size; |
| 264 | +if (!read_socket(output,out_buffer,n)) { |
| 265 | +shub->error_handler("Failed to read inet socket",SHUB_RECOVERABLE_ERROR); |
| 266 | +reconnect(shub); |
| 267 | +continue; |
| 268 | + } |
| 269 | +if (chan >=0&& !write_socket(chan,out_buffer,n)) { |
| 270 | +shub->error_handler("Failed to write to local socket",SHUB_RECOVERABLE_ERROR); |
| 271 | +close_socket(shub,chan); |
| 272 | +chan=-1; |
| 273 | + } |
| 274 | +tail-=n; |
| 275 | + }while (tail!=0); |
| 276 | + } |
| 277 | + } |
| 278 | +memcpy(shub->out_buffer,shub->out_buffer+pos,shub->out_buffer_used-pos); |
| 279 | +shub->out_buffer_used-=pos; |
| 280 | + }else { |
| 281 | +ShubMessageHdr*hdr= (MessgeHdr*)&shub->in_buffer[shub->in_buffer_used]; |
| 282 | +if (!read_socket(i,hdr,sizeof(ShubMessageHdr))) { |
| 283 | +shub->error_handler("Failed to read local socket",SHUB_RECOVERABLE_ERROR); |
| 284 | +close_socket(shub,i); |
| 285 | + }else { |
| 286 | +unsignedintsize=hdr->size; |
| 287 | +hdr->chan=i; |
| 288 | +if (size+shub->in_buffer_used+sizeof(ShubMessageHdr)>buffer_size) { |
| 289 | +if (shub->in_buffer_used!=0) { |
| 290 | +while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) { |
| 291 | +shub->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR); |
| 292 | +reconnect(shub); |
| 293 | + } |
| 294 | +memcpy(shub->in_buffer,shub->in_buffer+shub->in_buffer_used,sizeof(ShubMessageHdr)); |
| 295 | +shub->in_buffer_used=0; |
| 296 | + } |
| 297 | + } |
| 298 | +shub->in_buffer_used+=sizeof(ShubMessageHdr); |
| 299 | + |
| 300 | +while (1) { |
| 301 | +unsignedintn=size+shub->in_buffer_used>buffer_size ?buffer_size-shub->in_buffer_used :size; |
| 302 | +if (!read_socket(i,shub->in_buffer+shub->in_buffer_used,n)) { |
| 303 | +shub->error_handler("Failed to read local socket",SHUB_RECOVERABLE_ERROR); |
| 304 | +close_socket(shub,i); |
| 305 | +break; |
| 306 | + }else { |
| 307 | +if (n!=size) { |
| 308 | +while (!write_socket(output,in_buffer,n)) { |
| 309 | +shub->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR); |
| 310 | +reconnect(shub); |
| 311 | + } |
| 312 | +size-=n; |
| 313 | +shub->in_buffer_used=0; |
| 314 | + }else { |
| 315 | +shub->in_buffer_used+=n; |
| 316 | +break; |
| 317 | + } |
| 318 | + } |
| 319 | + } |
| 320 | + } |
| 321 | + } |
| 322 | + } |
| 323 | + } |
| 324 | + }elseif (shub->in_buffer_used!=0) { |
| 325 | +while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) { |
| 326 | +shub->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR); |
| 327 | +reconnect(shub); |
| 328 | + } |
| 329 | + } |
| 330 | + } |
| 331 | + } |
| 332 | +} |
| 333 | + |