|
| 1 | +/* |
| 2 | + * raftable.c |
| 3 | + * |
| 4 | + * A key-value table replicated over Raft. |
| 5 | + * |
| 6 | + */ |
| 7 | + |
| 8 | +#include"postgres.h" |
| 9 | +#include"utils/builtins.h" |
| 10 | +#include"utils/hsearch.h" |
| 11 | +#include"storage/lwlock.h" |
| 12 | +#include"storage/ipc.h" |
| 13 | +#include"funcapi.h" |
| 14 | +#include"access/htup_details.h" |
| 15 | +#include"miscadmin.h" |
| 16 | + |
| 17 | +#include"raftable.h" |
| 18 | + |
| 19 | +#defineRAFTABLE_SHMEM_SIZE (16 * 1024) |
| 20 | +#defineRAFTABLE_HASH_SIZE (127) |
| 21 | +#defineRAFTABLE_VALUE_LEN 64 |
| 22 | + |
| 23 | +void_PG_init(void); |
| 24 | +void_PG_fini(void); |
| 25 | + |
| 26 | +PG_MODULE_MAGIC; |
| 27 | + |
| 28 | +PG_FUNCTION_INFO_V1(raftable_sql_get); |
| 29 | +PG_FUNCTION_INFO_V1(raftable_sql_set); |
| 30 | +PG_FUNCTION_INFO_V1(raftable_sql_list); |
| 31 | + |
| 32 | +staticHTAB*data; |
| 33 | +staticLWLockIddatalock; |
| 34 | +staticshmem_startup_hook_typePreviousShmemStartupHook; |
| 35 | + |
| 36 | +typedefstructRaftableEntry |
| 37 | +{ |
| 38 | +intkey; |
| 39 | +charvalue[RAFTABLE_VALUE_LEN]; |
| 40 | +}RaftableEntry; |
| 41 | + |
| 42 | +Datum |
| 43 | +raftable_sql_get(PG_FUNCTION_ARGS) |
| 44 | +{ |
| 45 | +RaftableEntry*entry; |
| 46 | +intkey=PG_GETARG_INT32(0); |
| 47 | + |
| 48 | +LWLockAcquire(datalock,LW_SHARED); |
| 49 | +entry=hash_search(data,&key,HASH_FIND,NULL); |
| 50 | + |
| 51 | +if (entry) |
| 52 | +{ |
| 53 | +text*t=cstring_to_text(entry->value); |
| 54 | +LWLockRelease(datalock); |
| 55 | +PG_RETURN_TEXT_P(t); |
| 56 | +} |
| 57 | +else |
| 58 | +{ |
| 59 | +LWLockRelease(datalock); |
| 60 | +PG_RETURN_NULL(); |
| 61 | +} |
| 62 | +} |
| 63 | + |
| 64 | +Datum |
| 65 | +raftable_sql_set(PG_FUNCTION_ARGS) |
| 66 | +{ |
| 67 | +intkey=PG_GETARG_INT32(0); |
| 68 | + |
| 69 | +LWLockAcquire(datalock,LW_EXCLUSIVE); |
| 70 | +if (PG_ARGISNULL(1)) |
| 71 | +hash_search(data,&key,HASH_REMOVE,NULL); |
| 72 | +else |
| 73 | +{ |
| 74 | +RaftableEntry*entry=hash_search(data,&key,HASH_ENTER,NULL); |
| 75 | +entry->key=key; |
| 76 | +text_to_cstring_buffer(PG_GETARG_TEXT_P(1),entry->value,sizeof(entry->value)); |
| 77 | +} |
| 78 | +LWLockRelease(datalock); |
| 79 | + |
| 80 | +PG_RETURN_VOID(); |
| 81 | +} |
| 82 | + |
| 83 | +Datum |
| 84 | +raftable_sql_list(PG_FUNCTION_ARGS) |
| 85 | +{ |
| 86 | +FuncCallContext*funcctx; |
| 87 | +MemoryContextoldcontext; |
| 88 | +HASH_SEQ_STATUS*scan; |
| 89 | +RaftableEntry*entry; |
| 90 | + |
| 91 | +if (SRF_IS_FIRSTCALL()) |
| 92 | +{ |
| 93 | +TypeFuncClasstfc; |
| 94 | +funcctx=SRF_FIRSTCALL_INIT(); |
| 95 | + |
| 96 | +oldcontext=MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); |
| 97 | + |
| 98 | +tfc=get_call_result_type(fcinfo,NULL,&funcctx->tuple_desc); |
| 99 | +Assert(tfc==TYPEFUNC_COMPOSITE); |
| 100 | +funcctx->tuple_desc=BlessTupleDesc(funcctx->tuple_desc); |
| 101 | + |
| 102 | +scan= (HASH_SEQ_STATUS*)palloc(sizeof(HASH_SEQ_STATUS)); |
| 103 | +LWLockAcquire(datalock,LW_SHARED); |
| 104 | +hash_seq_init(scan,data); |
| 105 | + |
| 106 | +MemoryContextSwitchTo(oldcontext); |
| 107 | + |
| 108 | +funcctx->user_fctx=scan; |
| 109 | +} |
| 110 | + |
| 111 | +funcctx=SRF_PERCALL_SETUP(); |
| 112 | +scan=funcctx->user_fctx; |
| 113 | + |
| 114 | +if ((entry= (RaftableEntry*)hash_seq_search(scan))) |
| 115 | +{ |
| 116 | +HeapTupletuple; |
| 117 | +Datumvals[2]; |
| 118 | +boolisnull[2]; |
| 119 | + |
| 120 | +vals[0]=Int32GetDatum(entry->key); |
| 121 | +vals[1]=CStringGetTextDatum(entry->value); |
| 122 | +isnull[0]=isnull[1]= false; |
| 123 | + |
| 124 | +tuple=heap_form_tuple(funcctx->tuple_desc,vals,isnull); |
| 125 | +SRF_RETURN_NEXT(funcctx,HeapTupleGetDatum(tuple)); |
| 126 | +} |
| 127 | +else |
| 128 | +{ |
| 129 | +LWLockRelease(datalock); |
| 130 | +SRF_RETURN_DONE(funcctx); |
| 131 | +} |
| 132 | + |
| 133 | +} |
| 134 | + |
| 135 | +staticuint32raftable_hash_fn(constvoid*key,Sizekeysize) |
| 136 | +{ |
| 137 | +return (uint32)*(int*)key; |
| 138 | +} |
| 139 | + |
| 140 | +staticintraftable_match_fn(constvoid*key1,constvoid*key2,Sizekeysize) |
| 141 | +{ |
| 142 | +return*(int*)key1!=*(int*)key2; |
| 143 | +} |
| 144 | + |
| 145 | +staticvoidstartup_shmem(void) |
| 146 | +{ |
| 147 | +HASHCTLinfo; |
| 148 | + |
| 149 | +if (PreviousShmemStartupHook){ |
| 150 | +PreviousShmemStartupHook(); |
| 151 | +} |
| 152 | + |
| 153 | +datalock=LWLockAssign(); |
| 154 | + |
| 155 | +info.keysize=sizeof(int); |
| 156 | +info.entrysize=sizeof(RaftableEntry); |
| 157 | +info.hash=raftable_hash_fn; |
| 158 | +info.match=raftable_match_fn; |
| 159 | + |
| 160 | +data=ShmemInitHash( |
| 161 | +"raftable", |
| 162 | +RAFTABLE_HASH_SIZE,RAFTABLE_HASH_SIZE, |
| 163 | +&info, |
| 164 | +HASH_ELEM |HASH_FUNCTION |HASH_COMPARE |
| 165 | +); |
| 166 | +} |
| 167 | + |
| 168 | +void |
| 169 | +_PG_init(void) |
| 170 | +{ |
| 171 | +if (!process_shared_preload_libraries_in_progress) |
| 172 | +elog(ERROR,"please add 'raftable' to shared_preload_libraries list"); |
| 173 | +RequestAddinShmemSpace(RAFTABLE_SHMEM_SIZE); |
| 174 | +RequestAddinLWLocks(1); |
| 175 | + |
| 176 | +PreviousShmemStartupHook=shmem_startup_hook; |
| 177 | +shmem_startup_hook=startup_shmem; |
| 178 | +} |
| 179 | + |
| 180 | +void |
| 181 | +_PG_fini(void) |
| 182 | +{ |
| 183 | +shmem_startup_hook=PreviousShmemStartupHook; |
| 184 | +} |