Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit46af7da

Browse files
committed
Fixing serialization
1 parentda75db0 commit46af7da

File tree

4 files changed

+447
-375
lines changed

4 files changed

+447
-375
lines changed

‎collector.c

Lines changed: 72 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
PGWS_DIMENSIONS_APPNAME))
4141
staticvolatilesig_atomic_tshutdown_requested= false;
4242

43-
intsaved_profile_dimensions;//TODO should be initialized with the same value as GUC?
43+
intsaved_profile_dimensions;
4444
intsaved_history_dimensions;
4545

4646
staticvoidhandle_sigterm(SIGNAL_ARGS);
@@ -72,12 +72,15 @@ pgws_register_wait_collector(void)
7272
staticvoid
7373
alloc_history(History*observations,intcount)
7474
{
75-
observations->items= (HistoryItem*)palloc0(sizeof(HistoryItem)*count);
75+
intserialized_size;
76+
77+
saved_history_dimensions=pgws_history_dimensions;
78+
serialized_size=get_serialized_size(saved_history_dimensions, true);
79+
80+
observations->serialized_items= (char*)palloc0(serialized_size*count);
7681
observations->index=0;
7782
observations->count=count;
7883
observations->wraparound= false;
79-
80-
saved_history_dimensions=pgws_history_dimensions;
8184
}
8285

8386
/*
@@ -86,13 +89,17 @@ alloc_history(History *observations, int count)
8689
staticvoid
8790
realloc_history(History*observations,intcount)
8891
{
89-
HistoryItem*newitems;
92+
char*newitems;
9093
intcopyCount,
9194
i,
9295
j;
96+
intserialized_size;
97+
98+
//saved_history_dimensions = pgws_history_dimensions; // TODO вроде как
99+
serialized_size=get_serialized_size(saved_history_dimensions, true);
93100

94101
/* Allocate new array for history */
95-
newitems= (HistoryItem*)palloc0(sizeof(HistoryItem)*count);
102+
newitems= (char*)palloc0(serialized_size*count);
96103

97104
/* Copy entries from old array to the new */
98105
if (observations->wraparound)
@@ -111,19 +118,19 @@ realloc_history(History *observations, int count)
111118
{
112119
if (j >=observations->count)
113120
j=0;
114-
memcpy(&newitems[i],&observations->items[j],sizeof(HistoryItem));
121+
memcpy((newitems+i*serialized_size),
122+
(observations->serialized_items+j*serialized_size),
123+
serialized_size);
115124
i++;
116125
j++;
117126
}
118127

119128
/* Switch to new history array */
120-
pfree(observations->items);
121-
observations->items=newitems;
129+
pfree(observations->serialized_items);
130+
observations->serialized_items=newitems;
122131
observations->index=copyCount;
123132
observations->count=count;
124133
observations->wraparound= false;
125-
126-
saved_history_dimensions=pgws_history_dimensions;
127134
}
128135

129136
staticvoid
@@ -140,18 +147,19 @@ handle_sigterm(SIGNAL_ARGS)
140147
/*
141148
* Get next item of history with rotation.
142149
*/
143-
staticHistoryItem*
150+
staticchar*
144151
get_next_observation(History*observations)
145152
{
146-
HistoryItem*result;
153+
char*result;
154+
intserialized_size=get_serialized_size(saved_history_dimensions, true);
147155

148156
/* Check for wraparound */
149157
if (observations->index >=observations->count)
150158
{
151159
observations->index=0;
152160
observations->wraparound= true;
153161
}
154-
result=&observations->items[observations->index];
162+
result=&observations->serialized_items[observations->index*serialized_size];
155163
observations->index++;
156164
returnresult;
157165
}
@@ -413,8 +421,8 @@ serialize_item(SamplingDimensions dimensions, int dimensions_mask,
413421
}
414422

415423
/* copy all the fields without ts/count */
416-
*serialized_key=palloc0(*serialized_size+1);
417-
strcpy(*serialized_key,dummy_array);
424+
*serialized_key=palloc0(*serialized_size);
425+
memcpy(*serialized_key,dummy_array,*serialized_size);
418426

419427
if (is_history)
420428
{
@@ -430,8 +438,8 @@ serialize_item(SamplingDimensions dimensions, int dimensions_mask,
430438
}
431439

432440
/* copy everything */
433-
*serialized_item=palloc0(*serialized_size+1);
434-
strcpy(*serialized_item,dummy_array);
441+
*serialized_item=palloc0(*serialized_size);
442+
memcpy(*serialized_item,dummy_array,*serialized_size);
435443
}
436444

437445
void
@@ -570,17 +578,17 @@ probe_waits(History *observations, HTAB *profile_hash,
570578
LWLockAcquire(ProcArrayLock,LW_SHARED);
571579
for (i=0;i<ProcGlobal->allProcCount;i++)
572580
{
573-
HistoryItemitem_history,
574-
*observation;
575-
ProfileItemitem_profile;
581+
//HistoryItem item_history,
582+
// *observation;
583+
//ProfileItem item_profile;
576584
PGPROC*proc=&ProcGlobal->allProcs[i];
577585
intpid;
578586
uint32wait_event_info;
579587
SamplingDimensionscommon_dimensions,
580588
history_dimensions,
581589
profile_dimensions;
582-
intdimensions_mask_common=pgws_history_dimensions |
583-
pgws_profile_dimensions;
590+
intdimensions_mask_common=saved_history_dimensions |
591+
saved_profile_dimensions;
584592

585593
/* Check if we need to sample this process */
586594
if (!pgws_should_sample_proc(proc,&pid,&wait_event_info))
@@ -598,21 +606,27 @@ probe_waits(History *observations, HTAB *profile_hash,
598606

599607
copy_dimensions(&history_dimensions,
600608
&common_dimensions,
601-
pgws_history_dimensions);
609+
saved_history_dimensions);
602610
copy_dimensions(&profile_dimensions,
603611
&common_dimensions,
604-
pgws_profile_dimensions);
612+
saved_profile_dimensions);
605613

606-
item_history.ts=ts;
607-
item_history.dimensions=history_dimensions;
614+
//item_history.ts = ts;
615+
//item_history.dimensions = history_dimensions;
608616

609617
/* Write to the history if needed */
610618
if (write_history)
611619
{
612-
//TODO вот тут что-то сделать нужно??? потому что мы не запаковываем
613-
//историю
620+
char*serialized_key,
621+
*serialized_item,
622+
*observation;
623+
intserialized_size=0;
624+
614625
observation=get_next_observation(observations);
615-
*observation=item_history;
626+
serialize_item(history_dimensions,saved_history_dimensions,
627+
&serialized_item,&serialized_key,&serialized_size,
628+
ts, (uint64)0, true);
629+
memcpy(observation,serialized_item,serialized_size);
616630
}
617631

618632
/* Write to the profile if needed */
@@ -626,9 +640,9 @@ probe_waits(History *observations, HTAB *profile_hash,
626640
*stored_item;
627641

628642
if (!profile_pid)
629-
item_profile.dimensions.pid=0;
643+
profile_dimensions.pid=0;
630644

631-
serialize_item(item_profile.dimensions,saved_profile_dimensions,
645+
serialize_item(profile_dimensions,saved_profile_dimensions,
632646
&serialized_item,&serialized_key,&serialized_size,
633647
(TimestampTz)0,count, false);
634648

@@ -659,8 +673,9 @@ probe_waits(History *observations, HTAB *profile_hash,
659673
* Send waits history to shared memory queue.
660674
*/
661675
staticvoid
662-
send_history(History*observations,shm_mq_handle*mqh)
676+
send_history(History*observations,shm_mq_handle*mqh)//TODO TODO TODO
663677
{
678+
intserialized_size=get_serialized_size(saved_history_dimensions, true);
664679
Sizecount,
665680
i;
666681
shm_mq_resultmq_result;
@@ -679,11 +694,20 @@ send_history(History *observations, shm_mq_handle *mqh)
679694
"receiver of message queue has been detached")));
680695
return;
681696
}
697+
/* Send saved_dimensions next */
698+
mq_result=shm_mq_send_compat(mqh,sizeof(saved_history_dimensions),&saved_history_dimensions, false, true);
699+
if (mq_result==SHM_MQ_DETACHED)
700+
{
701+
ereport(WARNING,
702+
(errmsg("pg_wait_sampling collector: "
703+
"receiver of message queue has been detached")));
704+
return;
705+
}
682706
for (i=0;i<count;i++)
683707
{
684708
mq_result=shm_mq_send_compat(mqh,
685-
sizeof(HistoryItem),
686-
&observations->items[i],
709+
serialized_size,
710+
(observations->serialized_items+i*serialized_size),
687711
false,
688712
true);
689713
if (mq_result==SHM_MQ_DETACHED)
@@ -703,7 +727,8 @@ static void
703727
send_profile(HTAB*profile_hash,shm_mq_handle*mqh)
704728
{
705729
HASH_SEQ_STATUSscan_status;
706-
ProfileItem*item;
730+
char*serialized_item;
731+
intserialized_size=get_serialized_size(saved_profile_dimensions, true);
707732
Sizecount=hash_get_num_entries(profile_hash);
708733
shm_mq_resultmq_result;
709734

@@ -716,10 +741,19 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
716741
"receiver of message queue has been detached")));
717742
return;
718743
}
744+
/* Send saved_dimensions next */
745+
mq_result=shm_mq_send_compat(mqh,sizeof(saved_profile_dimensions),&saved_profile_dimensions, false, true);
746+
if (mq_result==SHM_MQ_DETACHED)
747+
{
748+
ereport(WARNING,
749+
(errmsg("pg_wait_sampling collector: "
750+
"receiver of message queue has been detached")));
751+
return;
752+
}
719753
hash_seq_init(&scan_status,profile_hash);
720-
while ((item= (ProfileItem*)hash_seq_search(&scan_status))!=NULL)
754+
while ((serialized_item= (char*)hash_seq_search(&scan_status))!=NULL)
721755
{
722-
mq_result=shm_mq_send_compat(mqh,sizeof(ProfileItem),item, false,
756+
mq_result=shm_mq_send_compat(mqh,serialized_size,serialized_item, false,
723757
true);
724758
if (mq_result==SHM_MQ_DETACHED)
725759
{

‎pg_wait_sampling--1.1--1.2.sql

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,26 @@
33
-- complain if script is sourced in psql, rather than via ALTER EXTENSION
44
\echo Use"ALTER EXTENSION pg_wait_sampling UPDATE TO 1.2" to load this file. \quit
55

6-
DROPFUNCTION pg_wait_sampling_get_current CASCADE;
7-
DROPFUNCTION pg_wait_sampling_get_profile CASCADE;
8-
DROPFUNCTION pg_wait_sampling_get_history CASCADE;
6+
--DROP FUNCTION pg_wait_sampling_get_current (
7+
--pid int4,
8+
--OUT pid int4,
9+
--OUT event_type text,
10+
--OUT event text
11+
--) CASCADE;
12+
--
13+
--DROP FUNCTION pg_wait_sampling_get_history (
14+
--OUT pid int4,
15+
--OUT ts timestamptz,
16+
--OUT event_type text,
17+
--OUT event text
18+
--) CASCADE;
19+
--
20+
--DROP FUNCTION pg_wait_sampling_get_profile (
21+
--OUT pid int4,
22+
--OUT event_type text,
23+
--OUT event text,
24+
--OUT count bigint
25+
--) CASCADE;
926

1027
CREATEFUNCTIONpg_wait_sampling_get_current_extended (
1128
pid int4,
@@ -35,7 +52,6 @@ GRANT SELECT ON pg_wait_sampling_current TO PUBLIC;
3552

3653
CREATEFUNCTIONpg_wait_sampling_get_history_extended (
3754
OUT pid int4,
38-
OUT tstimestamptz,
3955
OUT event_typetext,
4056
OUT eventtext,
4157
OUT queryid int8,
@@ -48,7 +64,8 @@ CREATE FUNCTION pg_wait_sampling_get_history_extended (
4864
OUT proc_starttimestamptz,
4965
OUT client_addrtext,
5066
OUT client_hostnametext,
51-
OUT appnametext
67+
OUT appnametext,
68+
OUT tstimestamptz
5269
)
5370
RETURNS SETOF record
5471
AS'MODULE_PATHNAME'
@@ -85,9 +102,9 @@ CREATE VIEW pg_wait_sampling_profile_extended AS
85102

86103
GRANTSELECTON pg_wait_sampling_profile_extended TO PUBLIC;
87104

88-
CREATEVIEWpg_wait_sampling_profileAS
89-
SELECT pid, event_type, event, queryid,SUM(count)FROM pg_wait_sampling_profile_extended
90-
GROUP BY pid, event_type, event, queryid;
91-
92-
GRANTSELECTON pg_wait_sampling_profile TO PUBLIC;
105+
--CREATE VIEW pg_wait_sampling_profile AS
106+
--SELECT pid, event_type, event, queryid, SUM(count) FROM pg_wait_sampling_profile_extended
107+
--GROUP BY pid, event_type, event, queryid;
108+
--
109+
--GRANT SELECT ON pg_wait_sampling_profile TO PUBLIC;
93110

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp