Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

[PGPRO-5531] Fixed crashes in the receive_msg_by_parts function#30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
kovdb75 merged 1 commit intomasterfromPGPRO-5531
Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletionspg_query_state.c
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -61,7 +61,7 @@ static void qs_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction,
static void qs_ExecutorFinish(QueryDesc *queryDesc);

static shm_mq_result receive_msg_by_parts(shm_mq_handle *mqh, Size *total,
void **datap, bool nowait);
void **datap,int64 timeout, int *rc,bool nowait);

/* Global variables */
List *QueryDescStack = NIL;
Expand DownExpand Up@@ -780,7 +780,7 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh,
{
shm_mq_result mq_receive_result;

mq_receive_result = receive_msg_by_parts(mqh, nbytesp, datap, true);
mq_receive_result = receive_msg_by_parts(mqh, nbytesp, datap,timeout, &rc,true);
if (mq_receive_result != SHM_MQ_WOULD_BLOCK)
return mq_receive_result;
if (rc & WL_TIMEOUT || delay <= 0)
Expand DownExpand Up@@ -967,33 +967,61 @@ copy_msg(shm_mq_msg *msg)

static shm_mq_result
receive_msg_by_parts(shm_mq_handle *mqh, Size *total, void **datap,
bool nowait)
int64 timeout, int *rc,bool nowait)
{
shm_mq_result mq_receive_result;
shm_mq_msg*buff;
intoffset;
Size*expected;
Sizeexpected_data;
Size*expected;
Sizeexpected_data;
Sizelen;

/* Get the expected number of bytes in message */
mq_receive_result = shm_mq_receive(mqh, &len, (void **) &expected, nowait);
expected_data = *expected;
if (mq_receive_result != SHM_MQ_SUCCESS)
return mq_receive_result;
Assert(len == sizeof(Size));

expected_data = *expected;
*datap = palloc0(expected_data);

/* Get the message itself */
for (offset = 0; offset < expected_data; )
{
int64 delay = timeout;
/* Keep receiving new messages until we assemble the full message */
mq_receive_result = shm_mq_receive(mqh, &len, ((void **) &buff), nowait);
for (;;)
{
mq_receive_result = shm_mq_receive(mqh, &len, ((void **) &buff), nowait);
if (mq_receive_result != SHM_MQ_SUCCESS)
{
if (nowait && mq_receive_result == SHM_MQ_WOULD_BLOCK)
{
/*
* We can't leave this function during reading parts with
* error code SHM_MQ_WOULD_BLOCK because can be be error
* at next call receive_msg_by_parts() with continuing
* reading non-readed parts.
* So we should wait whole MAX_RCV_TIMEOUT timeout and
* return error after that only.
*/
if (delay > 0)
{
pg_usleep(PART_RCV_DELAY * 1000);
delay -= PART_RCV_DELAY;
continue;
}
if (rc)
{/* Mark that the timeout has expired: */
*rc |= WL_TIMEOUT;
}
}
return mq_receive_result;
}
break;
}
memcpy((char *) *datap + offset, buff, len);
offset += len;
if (mq_receive_result != SHM_MQ_SUCCESS)
return mq_receive_result;
}

*total = offset;
Expand DownExpand Up@@ -1074,7 +1102,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
mqh = shm_mq_attach(mq, NULL, NULL);
elog(DEBUG1, "Wait response from leader %d", leader->pid);
mq_receive_result = receive_msg_by_parts(mqh, &len, (void **) &msg,
false);
0, NULL,false);
if (mq_receive_result != SHM_MQ_SUCCESS)
goto mq_error;
if (msg->reqid != reqid)
Expand Down
5 changes: 5 additions & 0 deletionspg_query_state.h
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -31,6 +31,11 @@
#define MAX_RCV_TIMEOUT 6000 /* 6 seconds */
#define MAX_SND_TIMEOUT 3000 /* 3 seconds */

/*
* Delay for receiving parts of full message (in case SHM_MQ_WOULD_BLOCK code),
* should be tess than MAX_RCV_TIMEOUT
*/
#define PART_RCV_DELAY 1000 /* 1 second */

/*
* Result status on query state request from asked backend
Expand Down

[8]ページ先頭

©2009-2025 Movatter.jp