Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4f65ca6

Browse files
authored
Merge pull request#31 from postgrespro/issue#29
Resolve Issue#29
2 parents38e5d3b +a0b05e4 commit4f65ca6

File tree

2 files changed

+61
-28
lines changed

2 files changed

+61
-28
lines changed

‎collector.c

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ send_history(History *observations, shm_mq_handle *mqh)
225225
{
226226
ereport(WARNING,
227227
(errmsg("pg_wait_sampling collector: "
228-
"receiver of message queuehave been detached")));
228+
"receiver of message queuehas been detached")));
229229
return;
230230
}
231231
for (i=0;i<count;i++)
@@ -238,7 +238,7 @@ send_history(History *observations, shm_mq_handle *mqh)
238238
{
239239
ereport(WARNING,
240240
(errmsg("pg_wait_sampling collector: "
241-
"receiver of message queuehave been detached")));
241+
"receiver of message queuehas been detached")));
242242
return;
243243
}
244244
}
@@ -260,7 +260,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
260260
{
261261
ereport(WARNING,
262262
(errmsg("pg_wait_sampling collector: "
263-
"receiver of message queuehave been detached")));
263+
"receiver of message queuehas been detached")));
264264
return;
265265
}
266266
hash_seq_init(&scan_status,profile_hash);
@@ -272,7 +272,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
272272
hash_seq_term(&scan_status);
273273
ereport(WARNING,
274274
(errmsg("pg_wait_sampling collector: "
275-
"receiver of message queuehave been detached")));
275+
"receiver of message queuehas been detached")));
276276
return;
277277
}
278278
}
@@ -468,7 +468,7 @@ collector_main(Datum main_arg)
468468
caseSHM_MQ_DETACHED:
469469
ereport(WARNING,
470470
(errmsg("pg_wait_sampling collector: "
471-
"receiver of message queuehave been "
471+
"receiver of message queuehas been "
472472
"detached")));
473473
break;
474474
default:

‎pg_wait_sampling.c

Lines changed: 56 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ shm_mq *collector_mq = NULL;
4747
uint64*proc_queryids=NULL;
4848
CollectorShmqHeader*collector_hdr=NULL;
4949

50+
/* Receiver (backend) local shm_mq pointers and lock */
51+
shm_mq*recv_mq=NULL;
52+
shm_mq_handle*recv_mqh=NULL;
53+
LOCKTAGqueueTag;
54+
5055
staticshmem_startup_hook_typeprev_shmem_startup_hook=NULL;
5156
staticPGPROC*search_proc(intbackendPid);
5257
staticPlannedStmt*pgws_planner_hook(Query*parse,
@@ -290,6 +295,14 @@ check_shmem(void)
290295
}
291296
}
292297

298+
staticvoid
299+
pgws_cleanup_callback(intcode,Datumarg)
300+
{
301+
elog(DEBUG3,"pg_wait_sampling cleanup: detaching shm_mq and releasing queue lock");
302+
shm_mq_detach_compat(recv_mqh,recv_mq);
303+
LockRelease(&queueTag,ExclusiveLock, false);
304+
}
305+
293306
/*
294307
* Module load callback
295308
*/
@@ -499,16 +512,14 @@ init_lock_tag(LOCKTAG *tag, uint32 lock)
499512
staticvoid*
500513
receive_array(SHMRequestrequest,Sizeitem_size,Size*count)
501514
{
502-
LOCKTAGqueueTag;
503515
LOCKTAGcollectorTag;
504-
shm_mq*mq;
505-
shm_mq_handle*mqh;
506516
shm_mq_resultres;
507517
Sizelen,
508518
i;
509519
void*data;
510520
Pointerresult,
511521
ptr;
522+
MemoryContextoldctx;
512523

513524
/* Ensure nobody else trying to send request to queue */
514525
init_lock_tag(&queueTag,PGWS_QUEUE_LOCK);
@@ -519,7 +530,7 @@ receive_array(SHMRequest request, Size item_size, Size *count)
519530
LockAcquire(&collectorTag,ExclusiveLock, false, false);
520531
LockRelease(&collectorTag,ExclusiveLock, false);
521532

522-
mq=shm_mq_create(collector_mq,COLLECTOR_QUEUE_SIZE);
533+
recv_mq=shm_mq_create(collector_mq,COLLECTOR_QUEUE_SIZE);
523534
collector_hdr->request=request;
524535

525536
if (!collector_hdr->latch)
@@ -528,33 +539,55 @@ receive_array(SHMRequest request, Size item_size, Size *count)
528539

529540
SetLatch(collector_hdr->latch);
530541

531-
shm_mq_set_receiver(mq,MyProc);
532-
mqh=shm_mq_attach(mq,NULL,NULL);
542+
shm_mq_set_receiver(recv_mq,MyProc);
533543

534-
res=shm_mq_receive(mqh,&len,&data, false);
535-
if (res!=SHM_MQ_SUCCESS||len!=sizeof(*count))
536-
{
537-
shm_mq_detach_compat(mqh,mq);
538-
elog(ERROR,"Error reading mq.");
539-
}
540-
memcpy(count,data,sizeof(*count));
541-
542-
result=palloc(item_size* (*count));
543-
ptr=result;
544+
/*
545+
* We switch to TopMemoryContext, so that recv_mqh is allocated there
546+
* and is guaranteed to survive until before_shmem_exit callbacks are
547+
* fired. Anyway, shm_mq_detach() will free handler on its own.
548+
*/
549+
oldctx=MemoryContextSwitchTo(TopMemoryContext);
550+
recv_mqh=shm_mq_attach(recv_mq,NULL,NULL);
551+
MemoryContextSwitchTo(oldctx);
544552

545-
for (i=0;i<*count;i++)
553+
/*
554+
* Now we surely attached to the shm_mq and got collector's attention.
555+
* If anything went wrong (e.g. Ctrl+C received from the client) we have
556+
* to cleanup some things, i.e. detach from the shm_mq, so collector was
557+
* able to continue responding to other requests.
558+
*
559+
* PG_ENSURE_ERROR_CLEANUP() guaranties that cleanup callback will be
560+
* fired for both ERROR and FATAL.
561+
*/
562+
PG_ENSURE_ERROR_CLEANUP(pgws_cleanup_callback,0);
546563
{
547-
res=shm_mq_receive(mqh,&len,&data, false);
548-
if (res!=SHM_MQ_SUCCESS||len!=item_size)
564+
res=shm_mq_receive(recv_mqh,&len,&data, false);
565+
if (res!=SHM_MQ_SUCCESS||len!=sizeof(*count))
549566
{
550-
shm_mq_detach_compat(mqh,mq);
567+
shm_mq_detach_compat(recv_mqh,recv_mq);
551568
elog(ERROR,"Error reading mq.");
552569
}
553-
memcpy(ptr,data,item_size);
554-
ptr+=item_size;
570+
memcpy(count,data,sizeof(*count));
571+
572+
result=palloc(item_size* (*count));
573+
ptr=result;
574+
575+
for (i=0;i<*count;i++)
576+
{
577+
res=shm_mq_receive(recv_mqh,&len,&data, false);
578+
if (res!=SHM_MQ_SUCCESS||len!=item_size)
579+
{
580+
shm_mq_detach_compat(recv_mqh,recv_mq);
581+
elog(ERROR,"Error reading mq.");
582+
}
583+
memcpy(ptr,data,item_size);
584+
ptr+=item_size;
585+
}
555586
}
587+
PG_END_ENSURE_ERROR_CLEANUP(pgws_cleanup_callback,0);
556588

557-
shm_mq_detach_compat(mqh,mq);
589+
/* We still have to detach and release lock during normal operation. */
590+
shm_mq_detach_compat(recv_mqh,recv_mq);
558591

559592
LockRelease(&queueTag,ExclusiveLock, false);
560593

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp