34
34
#include "lib/stringinfo.h"
35
35
#include "libpq/pqsignal.h"
36
36
#include "miscadmin.h"
37
+ #include "nodes/pg_list.h"
37
38
#include "pgtime.h"
38
39
#include "postmaster/fork_process.h"
39
40
#include "postmaster/postmaster.h"
@@ -92,11 +93,14 @@ static char *last_file_name = NULL;
92
93
static char * last_csv_file_name = NULL ;
93
94
94
95
/*
95
- * Buffers for saving partial messages from different backends. We don't expect
96
- * that there will be very many outstanding at one time, so 20 seems plenty of
97
- * leeway. If this array gets full we won't lose messages, but we will lose
98
- * the protocol protection against them being partially written or interleaved.
96
+ * Buffers for saving partial messages from different backends.
99
97
*
98
+ * Keep NBUFFER_LISTS lists of these, with the entry for a given source pid
99
+ * being in the list numbered (pid % NBUFFER_LISTS), so as to cut down on
100
+ * the number of entries we have to examine for any one incoming message.
101
+ * There must never be more than one entry for the same source pid.
102
+ *
103
+ * An inactive buffer is not removed from its list, just held for re-use.
100
104
* An inactive buffer has pid == 0 and undefined contents of data.
101
105
*/
102
106
typedef struct
@@ -105,8 +109,8 @@ typedef struct
105
109
StringInfoData data ;/* accumulated data, as a StringInfo */
106
110
}save_buffer ;
107
111
108
- #define CHUNK_SLOTS 20
109
- static save_buffer saved_chunks [ CHUNK_SLOTS ];
112
+ #define NBUFFER_LISTS 256
113
+ static List * buffer_lists [ NBUFFER_LISTS ];
110
114
111
115
/* These must be exported for EXEC_BACKEND case ... annoying */
112
116
#ifndef WIN32
@@ -597,7 +601,7 @@ SysLogger_Start(void)
597
601
* Now we are done with the write end of the pipe.
598
602
* CloseHandle() must not be called because the preceding
599
603
* close() closes the underlying handle.
600
- */
604
+ */
601
605
syslogPipe [1 ]= 0 ;
602
606
#endif
603
607
redirection_done = true;
@@ -739,6 +743,12 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
739
743
(p .is_last == 't' || p .is_last == 'f' ||
740
744
p .is_last == 'T' || p .is_last == 'F' ))
741
745
{
746
+ List * buffer_list ;
747
+ ListCell * cell ;
748
+ save_buffer * existing_slot = NULL ,
749
+ * free_slot = NULL ;
750
+ StringInfo str ;
751
+
742
752
chunklen = PIPE_HEADER_SIZE + p .len ;
743
753
744
754
/* Fall out of loop if we don't have the whole chunk yet */
@@ -748,80 +758,70 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
748
758
dest = (p .is_last == 'T' || p .is_last == 'F' ) ?
749
759
LOG_DESTINATION_CSVLOG :LOG_DESTINATION_STDERR ;
750
760
751
- if (p .is_last == 'f' || p .is_last == 'F' )
761
+ /* Locate any existing buffer for this source pid */
762
+ buffer_list = buffer_lists [p .pid %NBUFFER_LISTS ];
763
+ foreach (cell ,buffer_list )
752
764
{
753
- /*
754
- * Save a complete non-final chunk in the per-pid buffer if
755
- * possible - if not just write it out.
756
- */
757
- int free_slot = -1 ,
758
- existing_slot = -1 ;
759
- int i ;
760
- StringInfo str ;
765
+ save_buffer * buf = (save_buffer * )lfirst (cell );
761
766
762
- for ( i = 0 ; i < CHUNK_SLOTS ; i ++ )
767
+ if ( buf -> pid == p . pid )
763
768
{
764
- if (saved_chunks [i ].pid == p .pid )
765
- {
766
- existing_slot = i ;
767
- break ;
768
- }
769
- if (free_slot < 0 && saved_chunks [i ].pid == 0 )
770
- free_slot = i ;
769
+ existing_slot = buf ;
770
+ break ;
771
771
}
772
- if (existing_slot >=0 )
772
+ if (buf -> pid == 0 && free_slot == NULL )
773
+ free_slot = buf ;
774
+ }
775
+
776
+ if (p .is_last == 'f' || p .is_last == 'F' )
777
+ {
778
+ /*
779
+ * Save a complete non-final chunk in a per-pid buffer
780
+ */
781
+ if (existing_slot != NULL )
773
782
{
774
- str = & (saved_chunks [existing_slot ].data );
783
+ /* Add chunk to data from preceding chunks */
784
+ str = & (existing_slot -> data );
775
785
appendBinaryStringInfo (str ,
776
786
cursor + PIPE_HEADER_SIZE ,
777
787
p .len );
778
788
}
779
- else if ( free_slot >= 0 )
789
+ else
780
790
{
781
- saved_chunks [free_slot ].pid = p .pid ;
782
- str = & (saved_chunks [free_slot ].data );
791
+ /* First chunk of message, save in a new buffer */
792
+ if (free_slot == NULL )
793
+ {
794
+ /*
795
+ * Need a free slot, but there isn't one in the list,
796
+ * so create a new one and extend the list with it.
797
+ */
798
+ free_slot = palloc (sizeof (save_buffer ));
799
+ buffer_list = lappend (buffer_list ,free_slot );
800
+ buffer_lists [p .pid %NBUFFER_LISTS ]= buffer_list ;
801
+ }
802
+ free_slot -> pid = p .pid ;
803
+ str = & (free_slot -> data );
783
804
initStringInfo (str );
784
805
appendBinaryStringInfo (str ,
785
806
cursor + PIPE_HEADER_SIZE ,
786
807
p .len );
787
808
}
788
- else
789
- {
790
- /*
791
- * If there is no free slot we'll just have to take our
792
- * chances and write out a partial message and hope that
793
- * it's not followed by something from another pid.
794
- */
795
- write_syslogger_file (cursor + PIPE_HEADER_SIZE ,p .len ,
796
- dest );
797
- }
798
809
}
799
810
else
800
811
{
801
812
/*
802
813
* Final chunk --- add it to anything saved for that pid, and
803
814
* either way write the whole thing out.
804
815
*/
805
- int existing_slot = -1 ;
806
- int i ;
807
- StringInfo str ;
808
-
809
- for (i = 0 ;i < CHUNK_SLOTS ;i ++ )
810
- {
811
- if (saved_chunks [i ].pid == p .pid )
812
- {
813
- existing_slot = i ;
814
- break ;
815
- }
816
- }
817
- if (existing_slot >=0 )
816
+ if (existing_slot != NULL )
818
817
{
819
- str = & (saved_chunks [ existing_slot ]. data );
818
+ str = & (existing_slot -> data );
820
819
appendBinaryStringInfo (str ,
821
820
cursor + PIPE_HEADER_SIZE ,
822
821
p .len );
823
822
write_syslogger_file (str -> data ,str -> len ,dest );
824
- saved_chunks [existing_slot ].pid = 0 ;
823
+ /* Mark the buffer unused, and reclaim string storage */
824
+ existing_slot -> pid = 0 ;
825
825
pfree (str -> data );
826
826
}
827
827
else
@@ -877,17 +877,27 @@ static void
877
877
flush_pipe_input (char * logbuffer ,int * bytes_in_logbuffer )
878
878
{
879
879
int i ;
880
- StringInfo str ;
881
880
882
881
/* Dump any incomplete protocol messages */
883
- for (i = 0 ;i < CHUNK_SLOTS ;i ++ )
882
+ for (i = 0 ;i < NBUFFER_LISTS ;i ++ )
884
883
{
885
- if (saved_chunks [i ].pid != 0 )
884
+ List * list = buffer_lists [i ];
885
+ ListCell * cell ;
886
+
887
+ foreach (cell ,list )
886
888
{
887
- str = & (saved_chunks [i ].data );
888
- write_syslogger_file (str -> data ,str -> len ,LOG_DESTINATION_STDERR );
889
- saved_chunks [i ].pid = 0 ;
890
- pfree (str -> data );
889
+ save_buffer * buf = (save_buffer * )lfirst (cell );
890
+
891
+ if (buf -> pid != 0 )
892
+ {
893
+ StringInfo str = & (buf -> data );
894
+
895
+ write_syslogger_file (str -> data ,str -> len ,
896
+ LOG_DESTINATION_STDERR );
897
+ /* Mark the buffer unused, and reclaim string storage */
898
+ buf -> pid = 0 ;
899
+ pfree (str -> data );
900
+ }
891
901
}
892
902
}
893
903