Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9b4d973

Browse files
committed
Fix syslogger to not lose log coherency under high load.
The original coding of the syslogger had an arbitrary limit of 20 largemessages concurrently in progress, after which it would just punt and dumpmessage fragments to the output file separately. Our ambitions are a bithigher than that now, so allow the data structure to expand as necessary.Reported and patched by Andrew Dunstan; some editing by Tom
1 parent49281db commit9b4d973

File tree

1 file changed

+71
-61
lines changed

1 file changed

+71
-61
lines changed

‎src/backend/postmaster/syslogger.c

Lines changed: 71 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include"lib/stringinfo.h"
3535
#include"libpq/pqsignal.h"
3636
#include"miscadmin.h"
37+
#include"nodes/pg_list.h"
3738
#include"pgtime.h"
3839
#include"postmaster/fork_process.h"
3940
#include"postmaster/postmaster.h"
@@ -92,11 +93,14 @@ static char *last_file_name = NULL;
9293
staticchar*last_csv_file_name=NULL;
9394

9495
/*
95-
* Buffers for saving partial messages from different backends. We don't expect
96-
* that there will be very many outstanding at one time, so 20 seems plenty of
97-
* leeway. If this array gets full we won't lose messages, but we will lose
98-
* the protocol protection against them being partially written or interleaved.
96+
* Buffers for saving partial messages from different backends.
9997
*
98+
* Keep NBUFFER_LISTS lists of these, with the entry for a given source pid
99+
* being in the list numbered (pid % NBUFFER_LISTS), so as to cut down on
100+
* the number of entries we have to examine for any one incoming message.
101+
* There must never be more than one entry for the same source pid.
102+
*
103+
* An inactive buffer is not removed from its list, just held for re-use.
100104
* An inactive buffer has pid == 0 and undefined contents of data.
101105
*/
102106
typedefstruct
@@ -105,8 +109,8 @@ typedef struct
105109
StringInfoDatadata;/* accumulated data, as a StringInfo */
106110
}save_buffer;
107111

108-
#defineCHUNK_SLOTS 20
109-
staticsave_buffersaved_chunks[CHUNK_SLOTS];
112+
#defineNBUFFER_LISTS 256
113+
staticList*buffer_lists[NBUFFER_LISTS];
110114

111115
/* These must be exported for EXEC_BACKEND case ... annoying */
112116
#ifndefWIN32
@@ -597,7 +601,7 @@ SysLogger_Start(void)
597601
* Now we are done with the write end of the pipe.
598602
* CloseHandle() must not be called because the preceding
599603
* close() closes the underlying handle.
600-
*/
604+
*/
601605
syslogPipe[1]=0;
602606
#endif
603607
redirection_done= true;
@@ -739,6 +743,12 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
739743
(p.is_last=='t'||p.is_last=='f'||
740744
p.is_last=='T'||p.is_last=='F'))
741745
{
746+
List*buffer_list;
747+
ListCell*cell;
748+
save_buffer*existing_slot=NULL,
749+
*free_slot=NULL;
750+
StringInfostr;
751+
742752
chunklen=PIPE_HEADER_SIZE+p.len;
743753

744754
/* Fall out of loop if we don't have the whole chunk yet */
@@ -748,80 +758,70 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
748758
dest= (p.is_last=='T'||p.is_last=='F') ?
749759
LOG_DESTINATION_CSVLOG :LOG_DESTINATION_STDERR;
750760

751-
if (p.is_last=='f'||p.is_last=='F')
761+
/* Locate any existing buffer for this source pid */
762+
buffer_list=buffer_lists[p.pid %NBUFFER_LISTS];
763+
foreach(cell,buffer_list)
752764
{
753-
/*
754-
* Save a complete non-final chunk in the per-pid buffer if
755-
* possible - if not just write it out.
756-
*/
757-
intfree_slot=-1,
758-
existing_slot=-1;
759-
inti;
760-
StringInfostr;
765+
save_buffer*buf= (save_buffer*)lfirst(cell);
761766

762-
for (i=0;i<CHUNK_SLOTS;i++)
767+
if (buf->pid==p.pid)
763768
{
764-
if (saved_chunks[i].pid==p.pid)
765-
{
766-
existing_slot=i;
767-
break;
768-
}
769-
if (free_slot<0&&saved_chunks[i].pid==0)
770-
free_slot=i;
769+
existing_slot=buf;
770+
break;
771771
}
772-
if (existing_slot >=0)
772+
if (buf->pid==0&&free_slot==NULL)
773+
free_slot=buf;
774+
}
775+
776+
if (p.is_last=='f'||p.is_last=='F')
777+
{
778+
/*
779+
* Save a complete non-final chunk in a per-pid buffer
780+
*/
781+
if (existing_slot!=NULL)
773782
{
774-
str=&(saved_chunks[existing_slot].data);
783+
/* Add chunk to data from preceding chunks */
784+
str=&(existing_slot->data);
775785
appendBinaryStringInfo(str,
776786
cursor+PIPE_HEADER_SIZE,
777787
p.len);
778788
}
779-
elseif (free_slot >=0)
789+
else
780790
{
781-
saved_chunks[free_slot].pid=p.pid;
782-
str=&(saved_chunks[free_slot].data);
791+
/* First chunk of message, save in a new buffer */
792+
if (free_slot==NULL)
793+
{
794+
/*
795+
* Need a free slot, but there isn't one in the list,
796+
* so create a new one and extend the list with it.
797+
*/
798+
free_slot=palloc(sizeof(save_buffer));
799+
buffer_list=lappend(buffer_list,free_slot);
800+
buffer_lists[p.pid %NBUFFER_LISTS]=buffer_list;
801+
}
802+
free_slot->pid=p.pid;
803+
str=&(free_slot->data);
783804
initStringInfo(str);
784805
appendBinaryStringInfo(str,
785806
cursor+PIPE_HEADER_SIZE,
786807
p.len);
787808
}
788-
else
789-
{
790-
/*
791-
* If there is no free slot we'll just have to take our
792-
* chances and write out a partial message and hope that
793-
* it's not followed by something from another pid.
794-
*/
795-
write_syslogger_file(cursor+PIPE_HEADER_SIZE,p.len,
796-
dest);
797-
}
798809
}
799810
else
800811
{
801812
/*
802813
* Final chunk --- add it to anything saved for that pid, and
803814
* either way write the whole thing out.
804815
*/
805-
intexisting_slot=-1;
806-
inti;
807-
StringInfostr;
808-
809-
for (i=0;i<CHUNK_SLOTS;i++)
810-
{
811-
if (saved_chunks[i].pid==p.pid)
812-
{
813-
existing_slot=i;
814-
break;
815-
}
816-
}
817-
if (existing_slot >=0)
816+
if (existing_slot!=NULL)
818817
{
819-
str=&(saved_chunks[existing_slot].data);
818+
str=&(existing_slot->data);
820819
appendBinaryStringInfo(str,
821820
cursor+PIPE_HEADER_SIZE,
822821
p.len);
823822
write_syslogger_file(str->data,str->len,dest);
824-
saved_chunks[existing_slot].pid=0;
823+
/* Mark the buffer unused, and reclaim string storage */
824+
existing_slot->pid=0;
825825
pfree(str->data);
826826
}
827827
else
@@ -877,17 +877,27 @@ static void
877877
flush_pipe_input(char*logbuffer,int*bytes_in_logbuffer)
878878
{
879879
inti;
880-
StringInfostr;
881880

882881
/* Dump any incomplete protocol messages */
883-
for (i=0;i<CHUNK_SLOTS;i++)
882+
for (i=0;i<NBUFFER_LISTS;i++)
884883
{
885-
if (saved_chunks[i].pid!=0)
884+
List*list=buffer_lists[i];
885+
ListCell*cell;
886+
887+
foreach(cell,list)
886888
{
887-
str=&(saved_chunks[i].data);
888-
write_syslogger_file(str->data,str->len,LOG_DESTINATION_STDERR);
889-
saved_chunks[i].pid=0;
890-
pfree(str->data);
889+
save_buffer*buf= (save_buffer*)lfirst(cell);
890+
891+
if (buf->pid!=0)
892+
{
893+
StringInfostr=&(buf->data);
894+
895+
write_syslogger_file(str->data,str->len,
896+
LOG_DESTINATION_STDERR);
897+
/* Mark the buffer unused, and reclaim string storage */
898+
buf->pid=0;
899+
pfree(str->data);
900+
}
891901
}
892902
}
893903

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp