3434#include "lib/stringinfo.h"
3535#include "libpq/pqsignal.h"
3636#include "miscadmin.h"
37+ #include "nodes/pg_list.h"
3738#include "pgtime.h"
3839#include "postmaster/fork_process.h"
3940#include "postmaster/postmaster.h"
@@ -93,11 +94,14 @@ static char *last_file_name = NULL;
9394static char * last_csv_file_name = NULL ;
9495
9596/*
96- * Buffers for saving partial messages from different backends. We don't expect
97- * that there will be very many outstanding at one time, so 20 seems plenty of
98- * leeway. If this array gets full we won't lose messages, but we will lose
99- * the protocol protection against them being partially written or interleaved.
97+ * Buffers for saving partial messages from different backends.
10098 *
99+ * Keep NBUFFER_LISTS lists of these, with the entry for a given source pid
100+ * being in the list numbered (pid % NBUFFER_LISTS), so as to cut down on
101+ * the number of entries we have to examine for any one incoming message.
102+ * There must never be more than one entry for the same source pid.
103+ *
104+ * An inactive buffer is not removed from its list, just held for re-use.
101105 * An inactive buffer has pid == 0 and undefined contents of data.
102106 */
103107typedef struct
@@ -106,8 +110,8 @@ typedef struct
106110StringInfoData data ;/* accumulated data, as a StringInfo */
107111}save_buffer ;
108112
109- #define CHUNK_SLOTS 20
110- static save_buffer saved_chunks [ CHUNK_SLOTS ];
113+ #define NBUFFER_LISTS 256
114+ static List * buffer_lists [ NBUFFER_LISTS ];
111115
112116/* These must be exported for EXEC_BACKEND case ... annoying */
113117#ifndef WIN32
@@ -592,7 +596,7 @@ SysLogger_Start(void)
592596 * Now we are done with the write end of the pipe.
593597 * CloseHandle() must not be called because the preceding
594598 * close() closes the underlying handle.
595- */
599+ */
596600syslogPipe [1 ]= 0 ;
597601#endif
598602redirection_done = true;
@@ -734,6 +738,12 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
734738(p .is_last == 't' || p .is_last == 'f' ||
735739p .is_last == 'T' || p .is_last == 'F' ))
736740{
741+ List * buffer_list ;
742+ ListCell * cell ;
743+ save_buffer * existing_slot = NULL ,
744+ * free_slot = NULL ;
745+ StringInfo str ;
746+
737747chunklen = PIPE_HEADER_SIZE + p .len ;
738748
739749/* Fall out of loop if we don't have the whole chunk yet */
@@ -743,80 +753,70 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
743753dest = (p .is_last == 'T' || p .is_last == 'F' ) ?
744754LOG_DESTINATION_CSVLOG :LOG_DESTINATION_STDERR ;
745755
746- if (p .is_last == 'f' || p .is_last == 'F' )
756+ /* Locate any existing buffer for this source pid */
757+ buffer_list = buffer_lists [p .pid %NBUFFER_LISTS ];
758+ foreach (cell ,buffer_list )
747759{
748- /*
749- * Save a complete non-final chunk in the per-pid buffer if
750- * possible - if not just write it out.
751- */
752- int free_slot = -1 ,
753- existing_slot = -1 ;
754- int i ;
755- StringInfo str ;
760+ save_buffer * buf = (save_buffer * )lfirst (cell );
756761
757- for ( i = 0 ; i < CHUNK_SLOTS ; i ++ )
762+ if ( buf -> pid == p . pid )
758763{
759- if (saved_chunks [i ].pid == p .pid )
760- {
761- existing_slot = i ;
762- break ;
763- }
764- if (free_slot < 0 && saved_chunks [i ].pid == 0 )
765- free_slot = i ;
764+ existing_slot = buf ;
765+ break ;
766766}
767- if (existing_slot >=0 )
767+ if (buf -> pid == 0 && free_slot == NULL )
768+ free_slot = buf ;
769+ }
770+
771+ if (p .is_last == 'f' || p .is_last == 'F' )
772+ {
773+ /*
774+ * Save a complete non-final chunk in a per-pid buffer
775+ */
776+ if (existing_slot != NULL )
768777{
769- str = & (saved_chunks [existing_slot ].data );
778+ /* Add chunk to data from preceding chunks */
779+ str = & (existing_slot -> data );
770780appendBinaryStringInfo (str ,
771781cursor + PIPE_HEADER_SIZE ,
772782p .len );
773783}
774- else if ( free_slot >= 0 )
784+ else
775785{
776- saved_chunks [free_slot ].pid = p .pid ;
777- str = & (saved_chunks [free_slot ].data );
786+ /* First chunk of message, save in a new buffer */
787+ if (free_slot == NULL )
788+ {
789+ /*
790+ * Need a free slot, but there isn't one in the list,
791+ * so create a new one and extend the list with it.
792+ */
793+ free_slot = palloc (sizeof (save_buffer ));
794+ buffer_list = lappend (buffer_list ,free_slot );
795+ buffer_lists [p .pid %NBUFFER_LISTS ]= buffer_list ;
796+ }
797+ free_slot -> pid = p .pid ;
798+ str = & (free_slot -> data );
778799initStringInfo (str );
779800appendBinaryStringInfo (str ,
780801cursor + PIPE_HEADER_SIZE ,
781802p .len );
782803}
783- else
784- {
785- /*
786- * If there is no free slot we'll just have to take our
787- * chances and write out a partial message and hope that
788- * it's not followed by something from another pid.
789- */
790- write_syslogger_file (cursor + PIPE_HEADER_SIZE ,p .len ,
791- dest );
792- }
793804}
794805else
795806{
796807/*
797808 * Final chunk --- add it to anything saved for that pid, and
798809 * either way write the whole thing out.
799810 */
800- int existing_slot = -1 ;
801- int i ;
802- StringInfo str ;
803-
804- for (i = 0 ;i < CHUNK_SLOTS ;i ++ )
805- {
806- if (saved_chunks [i ].pid == p .pid )
807- {
808- existing_slot = i ;
809- break ;
810- }
811- }
812- if (existing_slot >=0 )
811+ if (existing_slot != NULL )
813812{
814- str = & (saved_chunks [ existing_slot ]. data );
813+ str = & (existing_slot -> data );
815814appendBinaryStringInfo (str ,
816815cursor + PIPE_HEADER_SIZE ,
817816p .len );
818817write_syslogger_file (str -> data ,str -> len ,dest );
819- saved_chunks [existing_slot ].pid = 0 ;
818+ /* Mark the buffer unused, and reclaim string storage */
819+ existing_slot -> pid = 0 ;
820820pfree (str -> data );
821821}
822822else
@@ -872,17 +872,27 @@ static void
872872flush_pipe_input (char * logbuffer ,int * bytes_in_logbuffer )
873873{
874874int i ;
875- StringInfo str ;
876875
877876/* Dump any incomplete protocol messages */
878- for (i = 0 ;i < CHUNK_SLOTS ;i ++ )
877+ for (i = 0 ;i < NBUFFER_LISTS ;i ++ )
879878{
880- if (saved_chunks [i ].pid != 0 )
879+ List * list = buffer_lists [i ];
880+ ListCell * cell ;
881+
882+ foreach (cell ,list )
881883{
882- str = & (saved_chunks [i ].data );
883- write_syslogger_file (str -> data ,str -> len ,LOG_DESTINATION_STDERR );
884- saved_chunks [i ].pid = 0 ;
885- pfree (str -> data );
884+ save_buffer * buf = (save_buffer * )lfirst (cell );
885+
886+ if (buf -> pid != 0 )
887+ {
888+ StringInfo str = & (buf -> data );
889+
890+ write_syslogger_file (str -> data ,str -> len ,
891+ LOG_DESTINATION_STDERR );
892+ /* Mark the buffer unused, and reclaim string storage */
893+ buf -> pid = 0 ;
894+ pfree (str -> data );
895+ }
886896}
887897}
888898