|
| 1 | +/* |
| 2 | + * multimaster.c |
| 3 | + * |
| 4 | + * Multimaster based on logical replication |
| 5 | + * |
| 6 | + */ |
| 7 | + |
#include "postgres.h"

#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#ifdef USE_EPOLL
#include <sys/epoll.h>
#endif

#include "access/commit_ts.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "access/xtm.h"
#include "commands/dbcommands.h"
#include "executor/executor.h"
#include "fmgr.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "port/atomics.h"
#include "postmaster/autovacuum.h"
#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/s_lock.h"
#include "storage/shmem.h"
#include "storage/spin.h"
#include "tcop/utility.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/tqual.h"

#include "multimaster.h"
| 52 | + |
/* Number of additional connect rounds attempted before giving up on a peer. */
#define MAX_CONNECT_ATTEMPTS 10
/* Capacity of one per-node message buffer, in DtmCommitMessage entries. */
#define BUFFER_SIZE 1024
| 56 | + |
| 57 | +typedefstruct |
| 58 | +{ |
| 59 | +TransactionIdxid; |
| 60 | +csn_tcsn; |
| 61 | +}DtmCommitMessage; |
| 62 | + |
| 63 | +typedefstruct |
| 64 | +{ |
| 65 | +DtmCommitMessagedata[BUFFER_SIZE]; |
| 66 | +intused; |
| 67 | +}DtmBuffer; |
| 68 | + |
| 69 | +staticint*sockets; |
| 70 | + |
| 71 | +staticBackgroundWorkerDtmSender= { |
| 72 | +"mm-sender", |
| 73 | +0,/* do not need connection to the database */ |
| 74 | +BgWorkerStart_PostmasterStart, |
| 75 | +1,/* restrart in one second (is it possible to restort immediately?) */ |
| 76 | +DtmTransSender |
| 77 | +}; |
| 78 | + |
| 79 | +staticBackgroundWorkerDtmRecevier= { |
| 80 | +"mm-receiver", |
| 81 | +0,/* do not need connection to the database */ |
| 82 | +BgWorkerStart_PostmasterStart, |
| 83 | +1,/* restrart in one second (is it possible to restort immediately?) */ |
| 84 | +DtmTransReceiver |
| 85 | +}; |
| 86 | + |
| 87 | +voidMMArbiterInitialize() |
| 88 | +{ |
| 89 | +RegisterBackgroundWorker(&DtmSender); |
| 90 | +RegisterBackgroundWorker(&DtmRecevier); |
| 91 | +} |
| 92 | + |
| 93 | + |
/*
 * Resolve `hostname` into a list of IPv4 addresses (network byte order).
 *
 * hostname - dotted-quad literal, or a name looked up with gethostbyname()
 * addrs    - output array that receives the addresses
 * n_addrs  - in: capacity of addrs[]; out: number of addresses stored
 *
 * Returns 1 when at least one address was stored, 0 otherwise (NULL
 * argument, zero capacity, failed lookup, non-IPv4 result or empty result).
 * On a 0 return caused by bad arguments or a failed lookup *n_addrs is left
 * untouched.
 */
static int resolve_host_by_name(const char *hostname, unsigned *addrs, unsigned *n_addrs)
{
    struct in_addr  literal;
    struct hostent *hp;
    unsigned        capacity;
    unsigned        i;

    /* Never write into addrs[] unless there is room for at least one entry. */
    if (hostname == NULL || addrs == NULL || n_addrs == NULL || *n_addrs == 0) {
        return 0;
    }
    capacity = *n_addrs;

    /* Fast path: the string already is a numeric IPv4 address. */
    literal.s_addr = inet_addr(hostname);
    if (literal.s_addr != INADDR_NONE) {
        memcpy(&addrs[0], &literal.s_addr, sizeof(addrs[0]));
        *n_addrs = 1;
        return 1;
    }

    hp = gethostbyname(hostname);
    if (hp == NULL || hp->h_addrtype != AF_INET) {
        return 0;
    }
    for (i = 0; i < capacity && hp->h_addr_list[i] != NULL; i++) {
        memcpy(&addrs[i], hp->h_addr_list[i], sizeof(addrs[i]));
    }
    *n_addrs = i;

    /* A lookup that yields no address gives the caller nothing to connect to. */
    return i != 0;
}
| 117 | + |
| 118 | +#ifdefUSE_EPOLL |
| 119 | +staticintepollfd; |
| 120 | +#else |
| 121 | +staticintmax_fd; |
| 122 | +staticfd_setinset; |
| 123 | +#endif |
| 124 | + |
| 125 | +inlinevoidregisterSocket(intfd,inti) |
| 126 | +{ |
| 127 | +#ifdefUSE_EPOLL |
| 128 | +structepoll_eventev; |
| 129 | +ev.events=EPOLLIN; |
| 130 | +ev.data.u32=i; |
| 131 | +if (epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev)<0) { |
| 132 | +charbuf[ERR_BUF_SIZE]; |
| 133 | +sprintf(buf,"Failed to add socket %d to epoll set",fd); |
| 134 | +shub->params->error_handler(buf,SHUB_FATAL_ERROR); |
| 135 | + } |
| 136 | +#else |
| 137 | +FD_SET(fd,&inset); |
| 138 | +if (fd>max_fd) { |
| 139 | +max_fd=fd; |
| 140 | + } |
| 141 | +#endif |
| 142 | +} |
| 143 | + |
| 144 | + |
| 145 | + |
| 146 | +staticintconnectSocket(charconst*host,intport) |
| 147 | +{ |
| 148 | +structsockaddr_insock_inet; |
| 149 | +unsignedaddrs[128]; |
| 150 | +unsignedi,n_addrs=sizeof(addrs) /sizeof(addrs[0]); |
| 151 | +intmax_attempts=MAX_CONNECT_ATTEMPTS; |
| 152 | +intsd; |
| 153 | + |
| 154 | +sock_inet.sin_family=AF_INET; |
| 155 | +sock_inet.sin_port=htons(port); |
| 156 | + |
| 157 | +if (!resolve_host_by_name(host,addrs,&n_addrs)) { |
| 158 | +elog(ERROR,"Failed to resolve host '%s' by name",host); |
| 159 | +} |
| 160 | +sd=socket(AF_INET,SOCK_STREAM,0); |
| 161 | +if (sd<0) { |
| 162 | +elog(ERROR,"Failed to create socket: %d",errno); |
| 163 | + } |
| 164 | +while (1) { |
| 165 | +intrc=-1; |
| 166 | +for (i=0;i<n_addrs;++i) { |
| 167 | +memcpy(&sock_inet.sin_addr,&addrs[i],sizeofsock_inet.sin_addr); |
| 168 | +do { |
| 169 | +rc=connect(sd, (structsockaddr*)&sock_inet,sizeof(sock_inet)); |
| 170 | +}while (rc<0&&errno==EINTR); |
| 171 | + |
| 172 | +if (rc >=0||errno==EINPROGRESS) { |
| 173 | +break; |
| 174 | +} |
| 175 | +} |
| 176 | +if (rc<0) { |
| 177 | +if ((errno!=ENOENT&&errno!=ECONNREFUSED&&errno!=EINPROGRESS)||max_attempts==0) { |
| 178 | +elog(ERROR,"Sockhub failed to connect to %s:%d: %d",host,port,errno); |
| 179 | +}else { |
| 180 | +max_attempts-=1; |
| 181 | +sleep(1); |
| 182 | +} |
| 183 | +continue; |
| 184 | +}else { |
| 185 | +intoptval=1; |
| 186 | +setsockopt(shub->output,IPPROTO_TCP,TCP_NODELAY, (charconst*)&optval,sizeof(optval)); |
| 187 | +returnsd; |
| 188 | +} |
| 189 | + } |
| 190 | +} |
| 191 | + |
| 192 | +staticvoidopenConnections() |
| 193 | +{ |
| 194 | +intnNodes=dtm->nNodes; |
| 195 | +inti; |
| 196 | +char*connStr=pstrdup(MMConnStrs); |
| 197 | + |
| 198 | +sockets= (int*)palloc(sizeof(int)*nNodes); |
| 199 | + |
| 200 | +for (i=0;i<nNodes;i++) { |
| 201 | +char*host=strstr(connStr,"host="); |
| 202 | +char*end; |
| 203 | +if (host==NULL) { |
| 204 | +elog(ERROR,"Invalid connection string: '%s'",MMConnStrs); |
| 205 | +} |
| 206 | +for (end=host+5;*end!=' '&&*end!=','&&end!='\0';end++); |
| 207 | +*end='\0'; |
| 208 | +connStr=end+1; |
| 209 | +sockets[i]=i+1!=MMNodeId ?connectSocket(host,MMArbiterPort+i) :-1; |
| 210 | +} |
| 211 | +} |
| 212 | + |
| 213 | +staticvoidacceptConnections() |
| 214 | +{ |
| 215 | +intnNodes=dtm->nNodes-1; |
| 216 | +sockaddr_insock_inet; |
| 217 | +inti; |
| 218 | +intsd; |
| 219 | +inton=1; |
| 220 | + |
| 221 | +sockets= (int*)palloc(sizeof(int)*nNodes); |
| 222 | + |
| 223 | +sock_inet.sin_family=AF_INET; |
| 224 | +sock_inet.sin_addr.s_addr=htonl(INADDR_ANY); |
| 225 | +sock_inet.sin_port=htons(MMArbiterPort+MMNodeId); |
| 226 | + |
| 227 | +sd=socket(u.sock.sa_family,SOCK_STREAM,0); |
| 228 | +if (sd<0) { |
| 229 | +elog(ERROR,"Failed to create socket: %d",errno); |
| 230 | +} |
| 231 | +setsockopt(sd,SOL_SOCKET,SO_REUSEADDR, (char*)&on,sizeofon); |
| 232 | + |
| 233 | +if (bind(fd, (sockaddr*)&sock_init,nNodes-1)<0) { |
| 234 | +elog(ERROR,"Failed to bind socket: %d",errno); |
| 235 | +} |
| 236 | + |
| 237 | +for (i=0;i<nNodes;i++) { |
| 238 | +intfd=accept(sd,NULL,NULL); |
| 239 | +if (fd<0) { |
| 240 | +elog(ERROR,"Failed to accept socket: %d",errno); |
| 241 | +} |
| 242 | +registerSocket(fd,i); |
| 243 | +sockets[i]=fd; |
| 244 | +} |
| 245 | +} |
| 246 | + |
| 247 | +staticvoidWriteSocket(intsd,voidconst*buf,intsize) |
| 248 | +{ |
| 249 | +char*src= (char*)buf; |
| 250 | +while (size!=0) { |
| 251 | +intn=send(sd,src,size,0); |
| 252 | +if (n <=0) { |
| 253 | +return0; |
| 254 | + } |
| 255 | +size-=n; |
| 256 | +src+=n; |
| 257 | + } |
| 258 | +} |
| 259 | + |
| 260 | +staticintReadSocket(intsd,void*buf,intbuf_size) |
| 261 | +{ |
| 262 | +intrc=recv(sd,buf,buf_size,0); |
| 263 | +if (rc <=0) { |
| 264 | +elog(ERROR,"Arbiter failed to read socket: %d",rc); |
| 265 | +} |
| 266 | +returnrc; |
| 267 | +} |
| 268 | + |
| 269 | + |
| 270 | +staticvoidDtmTransSender(Datumarg) |
| 271 | +{ |
| 272 | +intnNodes=dtm->nNodes; |
| 273 | +inti; |
| 274 | +DtmTxBuffer*txBuffer= (DtmTxBuffer*)palloc(sizeof(DtmTxBuffer)*nNodes); |
| 275 | + |
| 276 | +sockets= (int*)palloc(sizeof(int)*nNodes); |
| 277 | + |
| 278 | +openConnections(); |
| 279 | + |
| 280 | +for (i=0;i<nNodes;i++) { |
| 281 | +txBuffer[i].used=0; |
| 282 | +} |
| 283 | + |
| 284 | +while (true) { |
| 285 | +DtmTransState*ts; |
| 286 | +PGSemaphoreLock(&dtm->semphore); |
| 287 | +CHECK_FOR_INTERRUPTS(); |
| 288 | + |
| 289 | +SpinLockAcquire(&dtm->spinlock); |
| 290 | +ts=dtm->pendingTransactions; |
| 291 | +dtm->pendingTransactions=NULL; |
| 292 | +SpinLockRelease(&dtm->spinlock); |
| 293 | + |
| 294 | +for (;ts!=NULL;ts=ts->nextPending) { |
| 295 | +i=ts->gtid.node-1; |
| 296 | +Assert(i!=MMNodeId); |
| 297 | +if (txBuffer[i].used==BUFFER_SIZE) { |
| 298 | +WriteSocket(sockets[i],txBuffer[i].data,txBuffer[i].used*sizeof(DtmCommitRequest)); |
| 299 | +txBuffer[i].used=0; |
| 300 | +} |
| 301 | +txBuffer[i].data[txBuffer[i].used].xid=ts->xid; |
| 302 | +txBuffer[i].data[txBuffer[i].used].csn=ts->csn; |
| 303 | +txBuffer[i].used+=1; |
| 304 | +} |
| 305 | +for (i=0;i<nNodes;i++) { |
| 306 | +if (txBuffer[i].used!=0) { |
| 307 | +WriteSocket(sockets[i],txBuffer[i].data,txBuffer[i].used*sizeof(DtmCommitRequest)); |
| 308 | +txBuffer[i].used=0; |
| 309 | +} |
| 310 | +} |
| 311 | +} |
| 312 | +} |
| 313 | + |
| 314 | +staticvoidDtmTransReceiver(Datumarg) |
| 315 | +{ |
| 316 | +intnNodes=dtm->nNodes-1; |
| 317 | +inti,j,rc; |
| 318 | +intrxBufPos=0; |
| 319 | +DtmBuffer*rxBuffer= (DtmBuffer*)palloc(sizeof(DtmBuffer)*nNodes); |
| 320 | +HTAB*xid2state; |
| 321 | + |
| 322 | +#ifdefUSE_EPOLL |
| 323 | +structepoll_event*events= (structepoll_event*)palloc(SIZEOF(structepoll_event)*nNodes); |
| 324 | +epollfd=epoll_create(nNodes); |
| 325 | +#else |
| 326 | +FD_ZERO(&inset); |
| 327 | +max_fd=0; |
| 328 | +#endif |
| 329 | + |
| 330 | +acceptConnections(); |
| 331 | +xid2state=MMCreateHash(); |
| 332 | + |
| 333 | +for (i=0;i<nNodes;i++) { |
| 334 | +txBuffer[i].used=0; |
| 335 | +} |
| 336 | + |
| 337 | +while (true) { |
| 338 | +#ifdefUSE_EPOLL |
| 339 | +rc=epoll_wait(epollfd,events,MAX_EVENTS,shub->in_buffer_used==0 ?-1 :shub->params->delay); |
| 340 | +if (rc<0) { |
| 341 | +elog(ERROR,"epoll failed: %d",errno); |
| 342 | +} |
| 343 | +for (j=0;j<rc;j++) { |
| 344 | +i=events[j].data.u32; |
| 345 | +if (events[j].events&EPOLLERR) { |
| 346 | +structsockaddr_ininsock; |
| 347 | +socklen_tlen=sizeof(insock); |
| 348 | +getpeername(fd, (structsockaddr*)&insock,&len); |
| 349 | +elog(WARNING,"Loose connection with %s",inet_ntoa(insock.sin_addr_)); |
| 350 | +epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,NULL); |
| 351 | +} |
| 352 | +elseif (events[j].events&EPOLLIN) |
| 353 | +#else |
| 354 | +fd_setevents; |
| 355 | +events=inset; |
| 356 | +rc=select(max_fd+1,&events,NULL,NULL,NULL); |
| 357 | +if (rc<0) { |
| 358 | +elog(ERROR,"select failed: %d",errno); |
| 359 | +} |
| 360 | +for (i=0;i<nNodes;i++) { |
| 361 | +if (FD_ISSET(sockets[i],&events)) |
| 362 | +#endif |
| 363 | +{ |
| 364 | +intnResponses; |
| 365 | +rxBuffer[i].used+=ReadSocket(sockets[i], (char*)rxBuffer[i].data+rxBuffer[i].used,RX_BUFFER_SIZE-rxBufPos); |
| 366 | +nResponses=rxBuffer[i].used/sizeof(DtmCommitRequest); |
| 367 | + |
| 368 | +LWLockAcquire(&dtm->hashLock,LW_SHARED); |
| 369 | + |
| 370 | +for (j=0;j<nResponses;j++) { |
| 371 | +DtmCommitRequest*req=&rxBuffer[i].data[j]; |
| 372 | +DtmTransState*ts= (DtmTransState*)hash_search(xid2state,&req->xid,HASH_FIND,NULL); |
| 373 | +Assert(ts!=NULL); |
| 374 | +if (req->csn>ts->csn) { |
| 375 | +ts->csn=req->csn; |
| 376 | +} |
| 377 | +if (ts->nVotes==dtm->nNodes-1) { |
| 378 | +SetLatch(&ProcGlobal->allProcs[ts->pid].procLatch); |
| 379 | +} |
| 380 | +} |
| 381 | +if (rxBuffer[i].used!=nResponses*sizeof(DtmCommitRequest)) { |
| 382 | +rxBuffer[i].used-=nResponses*sizeof(DtmCommitRequest); |
| 383 | +memmove(rxBuffer[i].data, (char*)rxBuffer[i].data+nResponses*sizeof(DtmCommitRequest),rxBuffer[i].used); |
| 384 | +} |
| 385 | +} |
| 386 | +} |
| 387 | +} |
| 388 | +} |
| 389 | + |