Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit10c0558

Browse files
committed
Fix several mistakes around parallel workers and client_encoding.
Previously, workers sent data to the leader using the client encoding.That mostly worked, but the leader the converted the data back to theserver encoding. Since not all encoding conversions are reversible,that could provoke failures. Fix by using the database encoding forall communication between worker and leader.Also, while temporary changes to GUC settings, as from the SET clauseof a function, are in general OK for parallel query, changingclient_encoding this way inside of a parallel worker is not OK.Previously, that would have confused the leader; with these changes,it would not confuse the leader, but it wouldn't do anything either.So refuse such changes in parallel workers.Also, the previous code naively assumed that when it received aNotifyResonse from the worker, it could pass that directly back to theuser. But now that worker-to-leader communication always uses thedatabase encoding, that's clearly no longer correct - though,actually, the old way was always broken for V2 clients. Sodisassemble and reconstitute the message instead.Issues reported by Peter Eisentraut. Patch by me, reviewed byPeter Eisentraut.
1 parentf8c5855 commit10c0558

File tree

7 files changed

+78
-6
lines changed

7 files changed

+78
-6
lines changed

‎src/backend/access/transam/parallel.c

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -810,7 +810,17 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
810810
case'A':/* NotifyResponse */
811811
{
812812
/* Propagate NotifyResponse. */
813-
pq_putmessage(msg->data[0],&msg->data[1],msg->len-1);
813+
int32pid;
814+
constchar*channel;
815+
constchar*payload;
816+
817+
pid=pq_getmsgint(msg,4);
818+
channel=pq_getmsgrawstring(msg);
819+
payload=pq_getmsgrawstring(msg);
820+
pq_endmessage(msg);
821+
822+
NotifyMyFrontEnd(channel,payload,pid);
823+
814824
break;
815825
}
816826

@@ -988,6 +998,12 @@ ParallelWorkerMain(Datum main_arg)
988998
BackgroundWorkerInitializeConnectionByOid(fps->database_id,
989999
fps->authenticated_user_id);
9901000

1001+
/*
1002+
* Set the client encoding to the database encoding, since that is what
1003+
* the leader will expect.
1004+
*/
1005+
SetClientEncoding(GetDatabaseEncoding());
1006+
9911007
/* Restore GUC values from launching backend. */
9921008
gucspace=shm_toc_lookup(toc,PARALLEL_KEY_GUC);
9931009
Assert(gucspace!=NULL);

‎src/backend/commands/async.c

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -390,9 +390,6 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
390390
char*page_buffer);
391391
staticvoidasyncQueueAdvanceTail(void);
392392
staticvoidProcessIncomingNotify(void);
393-
staticvoidNotifyMyFrontEnd(constchar*channel,
394-
constchar*payload,
395-
int32srcPid);
396393
staticboolAsyncExistsPendingNotify(constchar*channel,constchar*payload);
397394
staticvoidClearPendingActionsAndNotifies(void);
398395

@@ -2076,7 +2073,7 @@ ProcessIncomingNotify(void)
20762073
/*
20772074
* Send NOTIFY message to my front end.
20782075
*/
2079-
staticvoid
2076+
void
20802077
NotifyMyFrontEnd(constchar*channel,constchar*payload,int32srcPid)
20812078
{
20822079
if (whereToSendOutput==DestRemote)

‎src/backend/commands/variable.c

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,30 @@ assign_client_encoding(const char *newval, void *extra)
755755
{
756756
intencoding=*((int*)extra);
757757

758+
/*
759+
* Parallel workers send data to the leader, not the client. They always
760+
* send data using the database encoding.
761+
*/
762+
if (IsParallelWorker())
763+
{
764+
/*
765+
* During parallel worker startup, we want to accept the leader's
766+
* client_encoding setting so that anyone who looks at the value in
767+
* the worker sees the same value that they would see in the leader.
768+
*/
769+
if (InitializingParallelWorker)
770+
return;
771+
772+
/*
773+
* A change other than during startup, for example due to a SET clause
774+
* attached to a function definition, should be rejected, as there is
775+
* nothing we can do inside the worker to make it take effect.
776+
*/
777+
ereport(ERROR,
778+
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
779+
errmsg("cannot change client_encoding in a parallel worker")));
780+
}
781+
758782
/* We do not expect an error if PrepareClientEncoding succeeded */
759783
if (SetClientEncoding(encoding)<0)
760784
elog(LOG,"SetClientEncoding(%d) failed",encoding);

‎src/backend/libpq/pqformat.c

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
*pq_copymsgbytes - copy raw data from a message buffer
6666
*pq_getmsgtext- get a counted text string (with conversion)
6767
*pq_getmsgstring - get a null-terminated text string (with conversion)
68+
*pq_getmsgrawstring - get a null-terminated text string - NO conversion
6869
*pq_getmsgend- verify message fully consumed
6970
*/
7071

@@ -639,6 +640,35 @@ pq_getmsgstring(StringInfo msg)
639640
returnpg_client_to_server(str,slen);
640641
}
641642

643+
/* --------------------------------
644+
*pq_getmsgrawstring - get a null-terminated text string - NO conversion
645+
*
646+
*Returns a pointer directly into the message buffer.
647+
* --------------------------------
648+
*/
649+
constchar*
650+
pq_getmsgrawstring(StringInfomsg)
651+
{
652+
char*str;
653+
intslen;
654+
655+
str=&msg->data[msg->cursor];
656+
657+
/*
658+
* It's safe to use strlen() here because a StringInfo is guaranteed to
659+
* have a trailing null byte. But check we found a null inside the
660+
* message.
661+
*/
662+
slen=strlen(str);
663+
if (msg->cursor+slen >=msg->len)
664+
ereport(ERROR,
665+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
666+
errmsg("invalid string in message")));
667+
msg->cursor+=slen+1;
668+
669+
returnstr;
670+
}
671+
642672
/* --------------------------------
643673
*pq_getmsgend- verify message fully consumed
644674
* --------------------------------

‎src/backend/libpq/pqmq.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ pq_parse_errornotice(StringInfo msg, ErrorData *edata)
232232
pq_getmsgend(msg);
233233
break;
234234
}
235-
value=pq_getmsgstring(msg);
235+
value=pq_getmsgrawstring(msg);
236236

237237
switch (code)
238238
{

‎src/include/commands/async.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ extern volatile sig_atomic_t notifyInterruptPending;
2828
externSizeAsyncShmemSize(void);
2929
externvoidAsyncShmemInit(void);
3030

31+
externvoidNotifyMyFrontEnd(constchar*channel,
32+
constchar*payload,
33+
int32srcPid);
34+
3135
/* notify-related SQL statements */
3236
externvoidAsync_Notify(constchar*channel,constchar*payload);
3337
externvoidAsync_Listen(constchar*channel);

‎src/include/libpq/pqformat.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ extern const char *pq_getmsgbytes(StringInfo msg, int datalen);
4444
externvoidpq_copymsgbytes(StringInfomsg,char*buf,intdatalen);
4545
externchar*pq_getmsgtext(StringInfomsg,intrawbytes,int*nbytes);
4646
externconstchar*pq_getmsgstring(StringInfomsg);
47+
externconstchar*pq_getmsgrawstring(StringInfomsg);
4748
externvoidpq_getmsgend(StringInfomsg);
4849

4950
#endif/* PQFORMAT_H */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp