Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit2bd9e41

Browse files
committed
Support frontend-backend protocol communication using a shm_mq.
A background worker can use pq_redirect_to_shm_mq() to direct protocolthat would normally be sent to the frontend to a shm_mq so that anotherprocess may read them.The receiving process may use pq_parse_errornotice() to parse anErrorResponse or NoticeResponse from the background worker and, ifit wishes, ThrowErrorData() to propagate the error (with or withoutfurther modification).Patch by me. Review by Andres Freund.
1 parent252e652 commit2bd9e41

File tree

9 files changed

+428
-47
lines changed

9 files changed

+428
-47
lines changed

‎src/backend/libpq/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global
1515
# be-fsstubs is here for historical reasons, probably belongs elsewhere
1616

1717
OBJS = be-fsstubs.o be-secure.o auth.o crypt.o hba.o ip.o md5.o pqcomm.o\
18-
pqformat.o pqsignal.o
18+
pqformat.opqmq.opqsignal.o
1919

2020
ifeq ($(with_openssl),yes)
2121
OBJS += be-secure-openssl.o

‎src/backend/libpq/pqcomm.c

Lines changed: 62 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@
102102
intUnix_socket_permissions;
103103
char*Unix_socket_group;
104104

105-
106105
/* Where the Unix socket files are (list of palloc'd strings) */
107106
staticList*sock_paths=NIL;
108107

@@ -134,16 +133,38 @@ static bool DoingCopyOut;
134133

135134

136135
/* Internal functions */
137-
staticvoidpq_close(intcode,Datumarg);
136+
staticvoidsocket_comm_reset(void);
137+
staticvoidsocket_close(intcode,Datumarg);
138+
staticvoidsocket_set_nonblocking(boolnonblocking);
139+
staticintsocket_flush(void);
140+
staticintsocket_flush_if_writable(void);
141+
staticboolsocket_is_send_pending(void);
142+
staticintsocket_putmessage(charmsgtype,constchar*s,size_tlen);
143+
staticvoidsocket_putmessage_noblock(charmsgtype,constchar*s,size_tlen);
144+
staticvoidsocket_startcopyout(void);
145+
staticvoidsocket_endcopyout(boolerrorAbort);
138146
staticintinternal_putbytes(constchar*s,size_tlen);
139147
staticintinternal_flush(void);
140-
staticvoidpq_set_nonblocking(boolnonblocking);
148+
staticvoidsocket_set_nonblocking(boolnonblocking);
141149

142150
#ifdefHAVE_UNIX_SOCKETS
143151
staticintLock_AF_UNIX(char*unixSocketDir,char*unixSocketPath);
144152
staticintSetup_AF_UNIX(char*sock_path);
145153
#endif/* HAVE_UNIX_SOCKETS */
146154

155+
PQcommMethodsPQcommSocketMethods;
156+
157+
staticPQcommMethodsPqCommSocketMethods= {
158+
socket_comm_reset,
159+
socket_flush,
160+
socket_flush_if_writable,
161+
socket_is_send_pending,
162+
socket_putmessage,
163+
socket_putmessage_noblock,
164+
socket_startcopyout,
165+
socket_endcopyout
166+
};
167+
147168

148169
/* --------------------------------
149170
*pq_init - initialize libpq at backend startup
@@ -152,24 +173,25 @@ static intSetup_AF_UNIX(char *sock_path);
152173
void
153174
pq_init(void)
154175
{
176+
PqCommMethods=&PqCommSocketMethods;
155177
PqSendBufferSize=PQ_SEND_BUFFER_SIZE;
156178
PqSendBuffer=MemoryContextAlloc(TopMemoryContext,PqSendBufferSize);
157179
PqSendPointer=PqSendStart=PqRecvPointer=PqRecvLength=0;
158180
PqCommBusy= false;
159181
DoingCopyOut= false;
160-
on_proc_exit(pq_close,0);
182+
on_proc_exit(socket_close,0);
161183
}
162184

163185
/* --------------------------------
164-
*pq_comm_reset - reset libpq during error recovery
186+
*socket_comm_reset - reset libpq during error recovery
165187
*
166188
* This is called from error recovery at the outer idle loop. It's
167189
* just to get us out of trouble if we somehow manage to elog() from
168190
* inside a pqcomm.c routine (which ideally will never happen, but...)
169191
* --------------------------------
170192
*/
171-
void
172-
pq_comm_reset(void)
193+
staticvoid
194+
socket_comm_reset(void)
173195
{
174196
/* Do not throw away pending data, but do reset the busy flag */
175197
PqCommBusy= false;
@@ -178,14 +200,14 @@ pq_comm_reset(void)
178200
}
179201

180202
/* --------------------------------
181-
*pq_close - shutdown libpq at backend exit
203+
*socket_close - shutdown libpq at backend exit
182204
*
183205
* Note: in a standalone backend MyProcPort will be null,
184206
* don't crash during exit...
185207
* --------------------------------
186208
*/
187209
staticvoid
188-
pq_close(intcode,Datumarg)
210+
socket_close(intcode,Datumarg)
189211
{
190212
if (MyProcPort!=NULL)
191213
{
@@ -783,15 +805,20 @@ TouchSocketFiles(void)
783805
*/
784806

785807
/* --------------------------------
786-
*pq_set_nonblocking - set socket blocking/non-blocking
808+
*socket_set_nonblocking - set socket blocking/non-blocking
787809
*
788810
* Sets the socket non-blocking if nonblocking is TRUE, or sets it
789811
* blocking otherwise.
790812
* --------------------------------
791813
*/
792814
staticvoid
793-
pq_set_nonblocking(boolnonblocking)
815+
socket_set_nonblocking(boolnonblocking)
794816
{
817+
if (MyProcPort==NULL)
818+
ereport(ERROR,
819+
(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
820+
errmsg("there is no client connection")));
821+
795822
if (MyProcPort->noblock==nonblocking)
796823
return;
797824

@@ -844,7 +871,7 @@ pq_recvbuf(void)
844871
}
845872

846873
/* Ensure that we're in blocking mode */
847-
pq_set_nonblocking(false);
874+
socket_set_nonblocking(false);
848875

849876
/* Can fill buffer from PqRecvLength and upwards */
850877
for (;;)
@@ -935,7 +962,7 @@ pq_getbyte_if_available(unsigned char *c)
935962
}
936963

937964
/* Put the socket into non-blocking mode */
938-
pq_set_nonblocking(true);
965+
socket_set_nonblocking(true);
939966

940967
r=secure_read(MyProcPort,c,1);
941968
if (r<0)
@@ -1194,7 +1221,7 @@ internal_putbytes(const char *s, size_t len)
11941221
/* If buffer is full, then flush it out */
11951222
if (PqSendPointer >=PqSendBufferSize)
11961223
{
1197-
pq_set_nonblocking(false);
1224+
socket_set_nonblocking(false);
11981225
if (internal_flush())
11991226
returnEOF;
12001227
}
@@ -1210,21 +1237,21 @@ internal_putbytes(const char *s, size_t len)
12101237
}
12111238

12121239
/* --------------------------------
1213-
*pq_flush- flush pending output
1240+
*socket_flush- flush pending output
12141241
*
12151242
*returns 0 if OK, EOF if trouble
12161243
* --------------------------------
12171244
*/
1218-
int
1219-
pq_flush(void)
1245+
staticint
1246+
socket_flush(void)
12201247
{
12211248
intres;
12221249

12231250
/* No-op if reentrant call */
12241251
if (PqCommBusy)
12251252
return0;
12261253
PqCommBusy= true;
1227-
pq_set_nonblocking(false);
1254+
socket_set_nonblocking(false);
12281255
res=internal_flush();
12291256
PqCommBusy= false;
12301257
returnres;
@@ -1310,8 +1337,8 @@ internal_flush(void)
13101337
* Returns 0 if OK, or EOF if trouble.
13111338
* --------------------------------
13121339
*/
1313-
int
1314-
pq_flush_if_writable(void)
1340+
staticint
1341+
socket_flush_if_writable(void)
13151342
{
13161343
intres;
13171344

@@ -1324,7 +1351,7 @@ pq_flush_if_writable(void)
13241351
return0;
13251352

13261353
/* Temporarily put the socket into non-blocking mode */
1327-
pq_set_nonblocking(true);
1354+
socket_set_nonblocking(true);
13281355

13291356
PqCommBusy= true;
13301357
res=internal_flush();
@@ -1333,11 +1360,11 @@ pq_flush_if_writable(void)
13331360
}
13341361

13351362
/* --------------------------------
1336-
*pq_is_send_pending- is there any pending data in the output buffer?
1363+
*socket_is_send_pending- is there any pending data in the output buffer?
13371364
* --------------------------------
13381365
*/
1339-
bool
1340-
pq_is_send_pending(void)
1366+
staticbool
1367+
socket_is_send_pending(void)
13411368
{
13421369
return (PqSendStart<PqSendPointer);
13431370
}
@@ -1351,7 +1378,7 @@ pq_is_send_pending(void)
13511378

13521379

13531380
/* --------------------------------
1354-
*pq_putmessage- send a normal message (suppressed in COPY OUT mode)
1381+
*socket_putmessage- send a normal message (suppressed in COPY OUT mode)
13551382
*
13561383
*If msgtype is not '\0', it is a message type code to place before
13571384
*the message body. If msgtype is '\0', then the message has no type
@@ -1375,8 +1402,8 @@ pq_is_send_pending(void)
13751402
*returns 0 if OK, EOF if trouble
13761403
* --------------------------------
13771404
*/
1378-
int
1379-
pq_putmessage(charmsgtype,constchar*s,size_tlen)
1405+
staticint
1406+
socket_putmessage(charmsgtype,constchar*s,size_tlen)
13801407
{
13811408
if (DoingCopyOut||PqCommBusy)
13821409
return0;
@@ -1408,8 +1435,8 @@ pq_putmessage(char msgtype, const char *s, size_t len)
14081435
*If the output buffer is too small to hold the message, the buffer
14091436
*is enlarged.
14101437
*/
1411-
void
1412-
pq_putmessage_noblock(charmsgtype,constchar*s,size_tlen)
1438+
staticvoid
1439+
socket_putmessage_noblock(charmsgtype,constchar*s,size_tlen)
14131440
{
14141441
intresPG_USED_FOR_ASSERTS_ONLY;
14151442
intrequired;
@@ -1431,18 +1458,18 @@ pq_putmessage_noblock(char msgtype, const char *s, size_t len)
14311458

14321459

14331460
/* --------------------------------
1434-
*pq_startcopyout - inform libpq that an old-style COPY OUT transfer
1461+
*socket_startcopyout - inform libpq that an old-style COPY OUT transfer
14351462
*is beginning
14361463
* --------------------------------
14371464
*/
1438-
void
1439-
pq_startcopyout(void)
1465+
staticvoid
1466+
socket_startcopyout(void)
14401467
{
14411468
DoingCopyOut= true;
14421469
}
14431470

14441471
/* --------------------------------
1445-
*pq_endcopyout- end an old-style COPY OUT transfer
1472+
*socket_endcopyout- end an old-style COPY OUT transfer
14461473
*
14471474
*If errorAbort is indicated, we are aborting a COPY OUT due to an error,
14481475
*and must send a terminator line. Since a partial data line might have
@@ -1451,8 +1478,8 @@ pq_startcopyout(void)
14511478
*not allow binary transfers, so a textual terminator is always correct.
14521479
* --------------------------------
14531480
*/
1454-
void
1455-
pq_endcopyout(boolerrorAbort)
1481+
staticvoid
1482+
socket_endcopyout(boolerrorAbort)
14561483
{
14571484
if (!DoingCopyOut)
14581485
return;
@@ -1462,7 +1489,6 @@ pq_endcopyout(bool errorAbort)
14621489
DoingCopyOut= false;
14631490
}
14641491

1465-
14661492
/*
14671493
* Support for TCP Keepalive parameters
14681494
*/

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp