Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbd2cb9a

Browse files
committed
Implement a chunking protocol for writes to the syslogger pipe, with messages
reassembled in the syslogger before writing to the log file. This prevents partial messages from being written, which mucks up log rotation, and messages from different backends being interleaved, which causes garbled logs. Backport as far as 8.0, where the syslogger was introduced. Tom Lane and Andrew Dunstan
1 parent320f820 commitbd2cb9a

File tree

3 files changed

+345
-16
lines changed

3 files changed

+345
-16
lines changed

‎src/backend/postmaster/syslogger.c

Lines changed: 255 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*
1919
*
2020
* IDENTIFICATION
21-
* $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.31 2007/06/04 22:21:42 adunstan Exp $
21+
* $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.32 2007/06/14 01:48:51 adunstan Exp $
2222
*
2323
*-------------------------------------------------------------------------
2424
*/
@@ -31,6 +31,7 @@
3131
#include<sys/stat.h>
3232
#include<sys/time.h>
3333

34+
#include"lib/stringinfo.h"
3435
#include"libpq/pqsignal.h"
3536
#include"miscadmin.h"
3637
#include"pgtime.h"
@@ -54,6 +55,13 @@
5455
#defineLBF_MODE_IOLBF
5556
#endif
5657

58+
/*
59+
* We read() into a temp buffer twice as big as a chunk, so that any fragment
60+
* left after processing can be moved down to the front and we'll still have
61+
* room to read a full chunk.
62+
*/
63+
#defineREAD_BUF_SIZE (2 * PIPE_CHUNK_SIZE)
64+
5765

5866
/*
5967
* GUC parameters.Redirect_stderr cannot be changed after postmaster
@@ -75,15 +83,28 @@ boolam_syslogger = false;
7583
* Private state
7684
*/
7785
staticpg_time_tnext_rotation_time;
78-
7986
staticboolredirection_done= false;
80-
8187
staticboolpipe_eof_seen= false;
82-
8388
staticFILE*syslogFile=NULL;
84-
8589
staticchar*last_file_name=NULL;
8690

91+
/*
92+
* Buffers for saving partial messages from different backends. We don't expect
93+
* that there will be very many outstanding at one time, so 20 seems plenty of
94+
* leeway. If this array gets full we won't lose messages, but we will lose
95+
* the protocol protection against them being partially written or interleaved.
96+
*
97+
* An inactive buffer has pid == 0 and undefined contents of data.
98+
*/
99+
typedefstruct
100+
{
101+
int32pid;/* PID of source process */
102+
StringInfoDatadata;/* accumulated data, as a StringInfo */
103+
}save_buffer;
104+
105+
#defineCHUNK_SLOTS 20
106+
staticsave_buffersaved_chunks[CHUNK_SLOTS];
107+
87108
/* These must be exported for EXEC_BACKEND case ... annoying */
88109
#ifndefWIN32
89110
intsyslogPipe[2]= {-1,-1};
@@ -108,6 +129,8 @@ static volatile sig_atomic_t rotation_requested = false;
108129
staticpid_tsyslogger_forkexec(void);
109130
staticvoidsyslogger_parseArgs(intargc,char*argv[]);
110131
#endif
132+
staticvoidprocess_pipe_input(char*logbuffer,int*bytes_in_logbuffer);
133+
staticvoidflush_pipe_input(char*logbuffer,int*bytes_in_logbuffer);
111134

112135
#ifdefWIN32
113136
staticunsignedint __stdcallpipeThread(void*arg);
@@ -126,6 +149,10 @@ static void sigUsr1Handler(SIGNAL_ARGS);
126149
NON_EXEC_STATICvoid
127150
SysLoggerMain(intargc,char*argv[])
128151
{
152+
#ifndefWIN32
153+
charlogbuffer[READ_BUF_SIZE];
154+
intbytes_in_logbuffer=0;
155+
#endif
129156
char*currentLogDir;
130157
char*currentLogFilename;
131158
intcurrentLogRotationAge;
@@ -244,7 +271,6 @@ SysLoggerMain(int argc, char *argv[])
244271
booltime_based_rotation= false;
245272

246273
#ifndefWIN32
247-
charlogbuffer[1024];
248274
intbytesRead;
249275
intrc;
250276
fd_setrfds;
@@ -326,8 +352,8 @@ SysLoggerMain(int argc, char *argv[])
326352
elseif (rc>0&&FD_ISSET(syslogPipe[0],&rfds))
327353
{
328354
bytesRead=piperead(syslogPipe[0],
329-
logbuffer,sizeof(logbuffer));
330-
355+
logbuffer+bytes_in_logbuffer,
356+
sizeof(logbuffer)-bytes_in_logbuffer);
331357
if (bytesRead<0)
332358
{
333359
if (errno!=EINTR)
@@ -337,7 +363,8 @@ SysLoggerMain(int argc, char *argv[])
337363
}
338364
elseif (bytesRead>0)
339365
{
340-
write_syslogger_file(logbuffer,bytesRead);
366+
bytes_in_logbuffer+=bytesRead;
367+
process_pipe_input(logbuffer,&bytes_in_logbuffer);
341368
continue;
342369
}
343370
else
@@ -349,6 +376,9 @@ SysLoggerMain(int argc, char *argv[])
349376
* and all backends are shut down, and we are done.
350377
*/
351378
pipe_eof_seen= true;
379+
380+
/* if there's any data left then force it out now */
381+
flush_pipe_input(logbuffer,&bytes_in_logbuffer);
352382
}
353383
}
354384
#else/* WIN32 */
@@ -611,6 +641,207 @@ syslogger_parseArgs(int argc, char *argv[])
611641
#endif/* EXEC_BACKEND */
612642

613643

644+
/* --------------------------------
645+
*pipe protocol handling
646+
* --------------------------------
647+
*/
648+
649+
/*
650+
* Process data received through the syslogger pipe.
651+
*
652+
* This routine interprets the log pipe protocol which sends log messages as
653+
* (hopefully atomic) chunks - such chunks are detected and reassembled here.
654+
*
655+
* The protocol has a header that starts with two nul bytes, then has a 16 bit
656+
* length, the pid of the sending process, and a flag to indicate if it is
657+
* the last chunk in a message. Incomplete chunks are saved until we read some
658+
* more, and non-final chunks are accumulated until we get the final chunk.
659+
*
660+
* All of this is to avoid 2 problems:
661+
* . partial messages being written to logfiles (messes rotation), and
662+
* . messages from different backends being interleaved (messages garbled).
663+
*
664+
* Any non-protocol messages are written out directly. These should only come
665+
* from non-PostgreSQL sources, however (e.g. third party libraries writing to
666+
* stderr).
667+
*
668+
* logbuffer is the data input buffer, and *bytes_in_logbuffer is the number
669+
* of bytes present. On exit, any not-yet-eaten data is left-justified in
670+
* logbuffer, and *bytes_in_logbuffer is updated.
671+
*/
672+
staticvoid
673+
process_pipe_input(char*logbuffer,int*bytes_in_logbuffer)
674+
{
675+
char*cursor=logbuffer;
676+
intcount=*bytes_in_logbuffer;
677+
678+
/* While we have enough for a header, process data... */
679+
while (count >= (int)sizeof(PipeProtoHeader))
680+
{
681+
PipeProtoHeaderp;
682+
intchunklen;
683+
684+
/* Do we have a valid header? */
685+
memcpy(&p,cursor,sizeof(PipeProtoHeader));
686+
if (p.nuls[0]=='\0'&&p.nuls[1]=='\0'&&
687+
p.len>0&&p.len <=PIPE_MAX_PAYLOAD&&
688+
p.pid!=0&&
689+
(p.is_last=='t'||p.is_last=='f'))
690+
{
691+
chunklen=PIPE_HEADER_SIZE+p.len;
692+
693+
/* Fall out of loop if we don't have the whole chunk yet */
694+
if (count<chunklen)
695+
break;
696+
697+
if (p.is_last=='f')
698+
{
699+
/*
700+
* Save a complete non-final chunk in the per-pid buffer
701+
* if possible - if not just write it out.
702+
*/
703+
intfree_slot=-1,existing_slot=-1;
704+
inti;
705+
StringInfostr;
706+
707+
for (i=0;i<CHUNK_SLOTS;i++)
708+
{
709+
if (saved_chunks[i].pid==p.pid)
710+
{
711+
existing_slot=i;
712+
break;
713+
}
714+
if (free_slot<0&&saved_chunks[i].pid==0)
715+
free_slot=i;
716+
}
717+
if (existing_slot >=0)
718+
{
719+
str=&(saved_chunks[existing_slot].data);
720+
appendBinaryStringInfo(str,
721+
cursor+PIPE_HEADER_SIZE,
722+
p.len);
723+
}
724+
elseif (free_slot >=0)
725+
{
726+
saved_chunks[free_slot].pid=p.pid;
727+
str=&(saved_chunks[free_slot].data);
728+
initStringInfo(str);
729+
appendBinaryStringInfo(str,
730+
cursor+PIPE_HEADER_SIZE,
731+
p.len);
732+
}
733+
else
734+
{
735+
/*
736+
* If there is no free slot we'll just have to take our
737+
* chances and write out a partial message and hope that
738+
* it's not followed by something from another pid.
739+
*/
740+
write_syslogger_file(cursor+PIPE_HEADER_SIZE,p.len);
741+
}
742+
}
743+
else
744+
{
745+
/*
746+
* Final chunk --- add it to anything saved for that pid, and
747+
* either way write the whole thing out.
748+
*/
749+
intexisting_slot=-1;
750+
inti;
751+
StringInfostr;
752+
753+
for (i=0;i<CHUNK_SLOTS;i++)
754+
{
755+
if (saved_chunks[i].pid==p.pid)
756+
{
757+
existing_slot=i;
758+
break;
759+
}
760+
}
761+
if (existing_slot >=0)
762+
{
763+
str=&(saved_chunks[existing_slot].data);
764+
appendBinaryStringInfo(str,
765+
cursor+PIPE_HEADER_SIZE,
766+
p.len);
767+
write_syslogger_file(str->data,str->len);
768+
saved_chunks[existing_slot].pid=0;
769+
pfree(str->data);
770+
}
771+
else
772+
{
773+
/* The whole message was one chunk, evidently. */
774+
write_syslogger_file(cursor+PIPE_HEADER_SIZE,p.len);
775+
}
776+
}
777+
778+
/* Finished processing this chunk */
779+
cursor+=chunklen;
780+
count-=chunklen;
781+
}
782+
else
783+
{
784+
/* Process non-protocol data */
785+
786+
/*
787+
* Look for the start of a protocol header. If found, dump data
788+
* up to there and repeat the loop. Otherwise, dump it all and
789+
* fall out of the loop. (Note: we want to dump it all if
790+
* at all possible, so as to avoid dividing non-protocol messages
791+
* across logfiles. We expect that in many scenarios, a
792+
* non-protocol message will arrive all in one read(), and we
793+
* want to respect the read() boundary if possible.)
794+
*/
795+
for (chunklen=1;chunklen<count;chunklen++)
796+
{
797+
if (cursor[chunklen]=='\0')
798+
break;
799+
}
800+
write_syslogger_file(cursor,chunklen);
801+
cursor+=chunklen;
802+
count-=chunklen;
803+
}
804+
}
805+
806+
/* We don't have a full chunk, so left-align what remains in the buffer */
807+
if (count>0&&cursor!=logbuffer)
808+
memmove(logbuffer,cursor,count);
809+
*bytes_in_logbuffer=count;
810+
}
811+
812+
/*
813+
* Force out any buffered data
814+
*
815+
* This is currently used only at syslogger shutdown, but could perhaps be
816+
* useful at other times, so it is careful to leave things in a clean state.
817+
*/
818+
staticvoid
819+
flush_pipe_input(char*logbuffer,int*bytes_in_logbuffer)
820+
{
821+
inti;
822+
StringInfostr;
823+
824+
/* Dump any incomplete protocol messages */
825+
for (i=0;i<CHUNK_SLOTS;i++)
826+
{
827+
if (saved_chunks[i].pid!=0)
828+
{
829+
str=&(saved_chunks[i].data);
830+
write_syslogger_file(str->data,str->len);
831+
saved_chunks[i].pid=0;
832+
pfree(str->data);
833+
}
834+
}
835+
/*
836+
* Force out any remaining pipe data as-is; we don't bother trying to
837+
* remove any protocol headers that may exist in it.
838+
*/
839+
if (*bytes_in_logbuffer>0)
840+
write_syslogger_file(logbuffer,*bytes_in_logbuffer);
841+
*bytes_in_logbuffer=0;
842+
}
843+
844+
614845
/* --------------------------------
615846
*logfile routines
616847
* --------------------------------
@@ -653,12 +884,16 @@ write_syslogger_file(const char *buffer, int count)
653884
staticunsignedint __stdcall
654885
pipeThread(void*arg)
655886
{
656-
DWORDbytesRead;
657-
charlogbuffer[1024];
887+
charlogbuffer[READ_BUF_SIZE];
888+
intbytes_in_logbuffer=0;
658889

659890
for (;;)
660891
{
661-
if (!ReadFile(syslogPipe[0],logbuffer,sizeof(logbuffer),
892+
DWORDbytesRead;
893+
894+
if (!ReadFile(syslogPipe[0],
895+
logbuffer+bytes_in_logbuffer,
896+
sizeof(logbuffer)-bytes_in_logbuffer,
662897
&bytesRead,0))
663898
{
664899
DWORDerror=GetLastError();
@@ -672,11 +907,18 @@ pipeThread(void *arg)
672907
errmsg("could not read from logger pipe: %m")));
673908
}
674909
elseif (bytesRead>0)
675-
write_syslogger_file(logbuffer,bytesRead);
910+
{
911+
bytes_in_logbuffer+=bytesRead;
912+
process_pipe_input(logbuffer,&bytes_in_logbuffer);
913+
}
676914
}
677915

678916
/* We exit the above loop only upon detecting pipe EOF */
679917
pipe_eof_seen= true;
918+
919+
/* if there's any data left then force it out now */
920+
flush_pipe_input(logbuffer,&bytes_in_logbuffer);
921+
680922
_endthread();
681923
return0;
682924
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp