22
22
#define SEQ_MAXVALUE ((int4)0x7FFFFFFF)
23
23
#define SEQ_MINVALUE -(SEQ_MAXVALUE)
24
24
25
- typedef struct FormData_pg_sequence
26
- {
27
- NameData sequence_name ;
28
- int4 last_value ;
29
- int4 increment_by ;
30
- int4 max_value ;
31
- int4 min_value ;
32
- int4 cache_value ;
33
- char is_cycled ;
34
- char is_called ;
35
- }FormData_pg_sequence ;
36
-
37
- typedef FormData_pg_sequence * Form_pg_sequence ;
25
+ /*
26
+ * We don't want to log each fetching values from sequences,
27
+ * so we pre-log a few fetches in advance. In the event of
28
+ * crash we can lose as much as we pre-logged.
29
+ */
30
+ #define SEQ_LOG_VALS 32
38
31
39
32
typedef struct sequence_magic
40
33
{
@@ -138,6 +131,11 @@ DefineSequence(CreateSeqStmt *seq)
138
131
coldef -> colname = "cache_value" ;
139
132
value [i - 1 ]= Int32GetDatum (new .cache_value );
140
133
break ;
134
+ case SEQ_COL_LOG :
135
+ typnam -> name = "int4" ;
136
+ coldef -> colname = "log_cnt" ;
137
+ value [i - 1 ]= Int32GetDatum ((int32 )1 );
138
+ break ;
141
139
case SEQ_COL_CYCLE :
142
140
typnam -> name = "char" ;
143
141
coldef -> colname = "is_cycled" ;
@@ -196,10 +194,14 @@ nextval(PG_FUNCTION_ARGS)
196
194
int32 incby ,
197
195
maxv ,
198
196
minv ,
199
- cache ;
197
+ cache ,
198
+ log ,
199
+ fetch ,
200
+ last ;
200
201
int32 result ,
201
202
next ,
202
203
rescnt = 0 ;
204
+ bool logit = false;
203
205
204
206
if (pg_aclcheck (seqname ,GetUserId (),ACL_WR )!= ACLCHECK_OK )
205
207
elog (ERROR ,"%s.nextval: you don't have permissions to set sequence %s" ,
@@ -219,16 +221,27 @@ nextval(PG_FUNCTION_ARGS)
219
221
seq = read_info ("nextval" ,elm ,& buf );/* lock page' buffer and
220
222
* read tuple */
221
223
222
- next = result = seq -> last_value ;
224
+ last = next = result = seq -> last_value ;
223
225
incby = seq -> increment_by ;
224
226
maxv = seq -> max_value ;
225
227
minv = seq -> min_value ;
226
- cache = seq -> cache_value ;
228
+ fetch = cache = seq -> cache_value ;
229
+ log = seq -> log_cnt ;
227
230
228
231
if (seq -> is_called != 't' )
232
+ {
229
233
rescnt ++ ;/* last_value if not called */
234
+ fetch -- ;
235
+ log -- ;
236
+ }
230
237
231
- while (rescnt < cache )/* try to fetch cache numbers */
238
+ if (log < fetch )
239
+ {
240
+ fetch = log = fetch - log + SEQ_LOG_VALS ;
241
+ logit = true;
242
+ }
243
+
244
+ while (fetch )/* try to fetch cache [+ log ] numbers */
232
245
{
233
246
234
247
/*
@@ -242,7 +255,7 @@ nextval(PG_FUNCTION_ARGS)
242
255
(maxv < 0 && next + incby > maxv ))
243
256
{
244
257
if (rescnt > 0 )
245
- break ;/* stopcaching */
258
+ break ;/* stopfetching */
246
259
if (seq -> is_cycled != 't' )
247
260
elog (ERROR ,"%s.nextval: got MAXVALUE (%d)" ,
248
261
elm -> name ,maxv );
@@ -258,7 +271,7 @@ nextval(PG_FUNCTION_ARGS)
258
271
(minv >=0 && next + incby < minv ))
259
272
{
260
273
if (rescnt > 0 )
261
- break ;/* stopcaching */
274
+ break ;/* stopfetching */
262
275
if (seq -> is_cycled != 't' )
263
276
elog (ERROR ,"%s.nextval: got MINVALUE (%d)" ,
264
277
elm -> name ,minv );
@@ -267,17 +280,43 @@ nextval(PG_FUNCTION_ARGS)
267
280
else
268
281
next += incby ;
269
282
}
270
- rescnt ++ ;/* got result */
271
- if (rescnt == 1 )/* if it's first one - */
272
- result = next ;/* it's what to return */
283
+ fetch -- ;
284
+ if (rescnt < cache )
285
+ {
286
+ log -- ;
287
+ rescnt ++ ;
288
+ last = next ;
289
+ if (rescnt == 1 )/* if it's first result - */
290
+ result = next ;/* it's what to return */
291
+ }
273
292
}
274
293
275
294
/* save info in local cache */
276
295
elm -> last = result ;/* last returned number */
277
- elm -> cached = next ;/* last cached number */
296
+ elm -> cached = last ;/* last fetched number */
297
+
298
+ if (logit )
299
+ {
300
+ xl_seq_rec xlrec ;
301
+ XLogRecPtr recptr ;
302
+
303
+ if (fetch )/* not all numbers were fetched */
304
+ log -= fetch ;
305
+
306
+ xlrec .node = elm -> rel -> rd_node ;
307
+ xlrec .value = next ;
308
+
309
+ recptr = XLogInsert (RM_SEQ_ID ,XLOG_SEQ_LOG |XLOG_NO_TRAN ,
310
+ (char * )& xlrec ,sizeof (xlrec ),NULL ,0 );
311
+
312
+ PageSetLSN (BufferGetPage (buf ),recptr );
313
+ PageSetSUI (BufferGetPage (buf ),ThisStartUpID );
314
+ }
278
315
279
316
/* save info in sequence relation */
280
- seq -> last_value = next ;/* last fetched number */
317
+ seq -> last_value = last ;/* last fetched number */
318
+ Assert (log >=0 );
319
+ seq -> log_cnt = log ;/* how much is logged */
281
320
seq -> is_called = 't' ;
282
321
283
322
LockBuffer (buf ,BUFFER_LOCK_UNLOCK );
@@ -349,6 +388,21 @@ do_setval(char *seqname, int32 next, bool iscalled)
349
388
/* save info in sequence relation */
350
389
seq -> last_value = next ;/* last fetched number */
351
390
seq -> is_called = iscalled ?'t' :'f' ;
391
+ seq -> log_cnt = (iscalled ) ?0 :1 ;
392
+
393
+ {
394
+ xl_seq_rec xlrec ;
395
+ XLogRecPtr recptr ;
396
+
397
+ xlrec .node = elm -> rel -> rd_node ;
398
+ xlrec .value = next ;
399
+
400
+ recptr = XLogInsert (RM_SEQ_ID ,XLOG_SEQ_SET |XLOG_NO_TRAN ,
401
+ (char * )& xlrec ,sizeof (xlrec ),NULL ,0 );
402
+
403
+ PageSetLSN (BufferGetPage (buf ),recptr );
404
+ PageSetSUI (BufferGetPage (buf ),ThisStartUpID );
405
+ }
352
406
353
407
LockBuffer (buf ,BUFFER_LOCK_UNLOCK );
354
408
@@ -638,7 +692,6 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
638
692
639
693
}
640
694
641
-
642
695
static int
643
696
get_param (DefElem * def )
644
697
{
@@ -651,3 +704,80 @@ get_param(DefElem *def)
651
704
elog (ERROR ,"DefineSequence: \"%s\" is to be integer" ,def -> defname );
652
705
return -1 ;
653
706
}
707
+
708
+ void seq_redo (XLogRecPtr lsn ,XLogRecord * record )
709
+ {
710
+ uint8 info = record -> xl_info & ~XLR_INFO_MASK ;
711
+ Relation reln ;
712
+ Buffer buffer ;
713
+ Page page ;
714
+ ItemId lp ;
715
+ HeapTupleData tuple ;
716
+ Form_pg_sequence seq ;
717
+ xl_seq_rec * xlrec ;
718
+
719
+ if (info != XLOG_SEQ_LOG && info != XLOG_SEQ_SET )
720
+ elog (STOP ,"seq_redo: unknown op code %u" ,info );
721
+
722
+ xlrec = (xl_seq_rec * )XLogRecGetData (record );
723
+
724
+ reln = XLogOpenRelation (true,RM_SEQ_ID ,xlrec -> node );
725
+ if (!RelationIsValid (reln ))
726
+ return ;
727
+
728
+ buffer = XLogReadBuffer (false,reln ,0 );
729
+ if (!BufferIsValid (buffer ))
730
+ elog (STOP ,"seq_redo: can't read block of %u/%u" ,
731
+ xlrec -> node .tblNode ,xlrec -> node .relNode );
732
+
733
+ page = (Page )BufferGetPage (buffer );
734
+ if (PageIsNew ((PageHeader )page )||
735
+ ((sequence_magic * )PageGetSpecialPointer (page ))-> magic != SEQ_MAGIC )
736
+ elog (STOP ,"seq_redo: uninitialized page of %u/%u" ,
737
+ xlrec -> node .tblNode ,xlrec -> node .relNode );
738
+
739
+ if (XLByteLE (lsn ,PageGetLSN (page )))
740
+ {
741
+ UnlockAndReleaseBuffer (buffer );
742
+ return ;
743
+ }
744
+
745
+ lp = PageGetItemId (page ,FirstOffsetNumber );
746
+ Assert (ItemIdIsUsed (lp ));
747
+ tuple .t_data = (HeapTupleHeader )PageGetItem ((Page )page ,lp );
748
+
749
+ seq = (Form_pg_sequence )GETSTRUCT (& tuple );
750
+
751
+ seq -> last_value = xlrec -> value ;/* last logged value */
752
+ seq -> is_called = 't' ;
753
+ seq -> log_cnt = 0 ;
754
+
755
+ PageSetLSN (page ,lsn );
756
+ PageSetSUI (page ,ThisStartUpID );
757
+ UnlockAndWriteBuffer (buffer );
758
+
759
+ return ;
760
+ }
761
+
762
+ void seq_undo (XLogRecPtr lsn ,XLogRecord * record )
763
+ {
764
+ }
765
+
766
+ void seq_desc (char * buf ,uint8 xl_info ,char * rec )
767
+ {
768
+ uint8 info = xl_info & ~XLR_INFO_MASK ;
769
+ xl_seq_rec * xlrec = (xl_seq_rec * )rec ;
770
+
771
+ if (info == XLOG_SEQ_LOG )
772
+ strcat (buf ,"log: " );
773
+ else if (info == XLOG_SEQ_SET )
774
+ strcat (buf ,"set: " );
775
+ else
776
+ {
777
+ strcat (buf ,"UNKNOWN" );
778
+ return ;
779
+ }
780
+
781
+ sprintf (buf + strlen (buf ),"node %u/%u; value %d" ,
782
+ xlrec -> node .tblNode ,xlrec -> node .relNode ,xlrec -> value );
783
+ }