Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite5fe570

Browse files
committed
aio: Implement smgr/md/fd write support
TODO:- Right now the sync.c integration with smgr.c/md.c isn't properly safe to use in a critical section The only reason it doesn't immediately fail is that it's reasonably rare that RegisterSyncRequest() fails *and* either: - smgropen()->hash_search(HASH_ENTER) decides to resize the hash table, even though the lookup is guaranteed to succeed for io_method=worker. - an io_method=uring completion is run in a different backend and smgropen() needs to build a new entry and thus needs to allocate memory For a bit I thought this could be worked around easily enough by not doing an smgropen() in mdsyncfiletag(), or adding a "fallible" smgropen() and instead just opening the file directly. That actually does kinda solve the problem, but only because the memory allocation in PathNameOpenFile() uses malloc(), not palloc() and thus doesn't trigger- temp_file_limit implementation
1 parent4ce424d commite5fe570

File tree

8 files changed

+269
-0
lines changed

8 files changed

+269
-0
lines changed

‎src/backend/storage/aio/aio_callback.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
4141
CALLBACK_ENTRY(PGAIO_HCB_INVALID,aio_invalid_cb),
4242

4343
CALLBACK_ENTRY(PGAIO_HCB_MD_READV,aio_md_readv_cb),
44+
CALLBACK_ENTRY(PGAIO_HCB_MD_WRITEV,aio_md_writev_cb),
4445

4546
CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV,aio_shared_buffer_readv_cb),
4647

‎src/backend/storage/file/fd.c

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2348,6 +2348,34 @@ FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset,
23482348
returnreturnCode;
23492349
}
23502350

2351+
int
2352+
FileStartWriteV(PgAioHandle*ioh,Filefile,
2353+
intiovcnt,off_toffset,
2354+
uint32wait_event_info)
2355+
{
2356+
intreturnCode;
2357+
Vfd*vfdP;
2358+
2359+
Assert(FileIsValid(file));
2360+
2361+
DO_DB(elog(LOG,"FileStartWriteV: %d (%s) "INT64_FORMAT" %d",
2362+
file,VfdCache[file].fileName,
2363+
(int64)offset,
2364+
iovcnt));
2365+
2366+
returnCode=FileAccess(file);
2367+
if (returnCode<0)
2368+
returnreturnCode;
2369+
2370+
vfdP=&VfdCache[file];
2371+
2372+
/* FIXME: think about / reimplement temp_file_limit */
2373+
2374+
pgaio_io_start_writev(ioh,vfdP->fd,iovcnt,offset);
2375+
2376+
return0;
2377+
}
2378+
23512379
int
23522380
FileSync(Filefile,uint32wait_event_info)
23532381
{

‎src/backend/storage/smgr/md.c

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,12 +155,19 @@ static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
155155

156156
staticPgAioResultmd_readv_complete(PgAioHandle*ioh,PgAioResultprior_result,uint8cb_data);
157157
staticvoidmd_readv_report(PgAioResultresult,constPgAioTargetData*target_data,intelevel);
158+
staticPgAioResultmd_writev_complete(PgAioHandle*ioh,PgAioResultprior_result,uint8cb_data);
159+
staticvoidmd_writev_report(PgAioResultresult,constPgAioTargetData*target_data,intelevel);
158160

159161
constPgAioHandleCallbacksaio_md_readv_cb= {
160162
.complete_shared=md_readv_complete,
161163
.report=md_readv_report,
162164
};
163165

166+
constPgAioHandleCallbacksaio_md_writev_cb= {
167+
.complete_shared=md_writev_complete,
168+
.report=md_writev_report,
169+
};
170+
164171

165172
staticinlineint
166173
_mdfd_open_flags(void)
@@ -1115,6 +1122,64 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
11151122
}
11161123
}
11171124

1125+
/*
1126+
* mdstartwritev() -- Asynchronous version of mdrwritev().
1127+
*/
1128+
void
1129+
mdstartwritev(PgAioHandle*ioh,
1130+
SMgrRelationreln,ForkNumberforknum,BlockNumberblocknum,
1131+
constvoid**buffers,BlockNumbernblocks,boolskipFsync)
1132+
{
1133+
off_tseekpos;
1134+
MdfdVec*v;
1135+
BlockNumbernblocks_this_segment;
1136+
structiovec*iov;
1137+
intiovcnt;
1138+
intret;
1139+
1140+
v=_mdfd_getseg(reln,forknum,blocknum, false,
1141+
EXTENSION_FAIL |EXTENSION_CREATE_RECOVERY);
1142+
1143+
seekpos= (off_t)BLCKSZ* (blocknum % ((BlockNumber)RELSEG_SIZE));
1144+
1145+
Assert(seekpos< (off_t)BLCKSZ*RELSEG_SIZE);
1146+
1147+
nblocks_this_segment=
1148+
Min(nblocks,
1149+
RELSEG_SIZE- (blocknum % ((BlockNumber)RELSEG_SIZE)));
1150+
1151+
if (nblocks_this_segment!=nblocks)
1152+
elog(ERROR,"write crossing segment boundary");
1153+
1154+
iovcnt=pgaio_io_get_iovec(ioh,&iov);
1155+
1156+
Assert(nblocks <=iovcnt);
1157+
1158+
iovcnt=buffers_to_iovec(iov,unconstify(void**,buffers),nblocks_this_segment);
1159+
1160+
Assert(iovcnt <=nblocks_this_segment);
1161+
1162+
if (!(io_direct_flags&IO_DIRECT_DATA))
1163+
pgaio_io_set_flag(ioh,PGAIO_HF_BUFFERED);
1164+
1165+
pgaio_io_set_target_smgr(ioh,
1166+
reln,
1167+
forknum,
1168+
blocknum,
1169+
nblocks,
1170+
skipFsync);
1171+
pgaio_io_register_callbacks(ioh,PGAIO_HCB_MD_WRITEV,0);
1172+
1173+
ret=FileStartWriteV(ioh,v->mdfd_vfd,iovcnt,seekpos,WAIT_EVENT_DATA_FILE_WRITE);
1174+
if (ret!=0)
1175+
ereport(ERROR,
1176+
(errcode_for_file_access(),
1177+
errmsg("could not start writing blocks %u..%u in file \"%s\": %m",
1178+
blocknum,
1179+
blocknum+nblocks_this_segment-1,
1180+
FilePathName(v->mdfd_vfd))));
1181+
}
1182+
11181183

11191184
/*
11201185
* mdwriteback() -- Tell the kernel to write pages back to storage.
@@ -1503,6 +1568,40 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
15031568
}
15041569
}
15051570

1571+
/*
1572+
* Like register_dirty_segment(), except for use by AIO. In the completion
1573+
* callback we don't have access to the MdfdVec (the completion callback might
1574+
* be executed in a different backend than the issuing backend), therefore we
1575+
* have to implement this slightly differently.
1576+
*/
1577+
staticvoid
1578+
register_dirty_segment_aio(RelFileLocatorlocator,ForkNumberforknum,uint64segno)
1579+
{
1580+
FileTagtag;
1581+
1582+
INIT_MD_FILETAG(tag,locator,forknum,segno);
1583+
1584+
/*
1585+
* Can't block here waiting for checkpointer to accept our sync request,
1586+
* as checkpointer might be waiting for this AIO to finish if offloaded to
1587+
* a worker.
1588+
*/
1589+
if (!RegisterSyncRequest(&tag,SYNC_REQUEST, false/* retryOnError */ ))
1590+
{
1591+
charpath[MAXPGPATH];
1592+
1593+
ereport(DEBUG1,
1594+
(errmsg_internal("could not forward fsync request because request queue is full")));
1595+
1596+
/* reuse mdsyncfiletag() to avoid duplicating code */
1597+
if (mdsyncfiletag(&tag,path))
1598+
ereport(data_sync_elevel(ERROR),
1599+
(errcode_for_file_access(),
1600+
errmsg("could not fsync file \"%s\": %m",
1601+
path)));
1602+
}
1603+
}
1604+
15061605
/*
15071606
* register_unlink_segment() -- Schedule a file to be deleted after next checkpoint
15081607
*/
@@ -2037,3 +2136,103 @@ md_readv_report(PgAioResult result, const PgAioTargetData *td, int elevel)
20372136
td->smgr.nblocks* (size_t)BLCKSZ));
20382137
}
20392138
}
2139+
2140+
/*
2141+
* AIO completion callback for mdstartwritev().
2142+
*/
2143+
staticPgAioResult
2144+
md_writev_complete(PgAioHandle*ioh,PgAioResultprior_result,uint8cb_data)
2145+
{
2146+
PgAioTargetData*td=pgaio_io_get_target_data(ioh);
2147+
PgAioResultresult=prior_result;
2148+
2149+
if (prior_result.result<0)
2150+
{
2151+
result.status=PGAIO_RS_ERROR;
2152+
result.id=PGAIO_HCB_MD_WRITEV;
2153+
/* For "hard" errors, track the error number in error_data */
2154+
result.error_data=-prior_result.result;
2155+
result.result=0;
2156+
2157+
pgaio_result_report(result,td,LOG);
2158+
2159+
returnresult;
2160+
}
2161+
2162+
/*
2163+
* As explained above smgrstartwritev(), the smgr API operates on the
2164+
* level of blocks, rather than bytes. Convert.
2165+
*/
2166+
result.result /=BLCKSZ;
2167+
2168+
Assert(result.result <=td->smgr.nblocks);
2169+
2170+
if (result.result==0)
2171+
{
2172+
/* consider 0 blocks written a failure */
2173+
result.status=PGAIO_RS_ERROR;
2174+
result.id=PGAIO_HCB_MD_WRITEV;
2175+
result.error_data=0;
2176+
2177+
pgaio_result_report(result,td,LOG);
2178+
2179+
returnresult;
2180+
}
2181+
2182+
if (result.status!=PGAIO_RS_ERROR&&
2183+
result.result<td->smgr.nblocks)
2184+
{
2185+
/* partial writes should be retried at upper level */
2186+
result.status=PGAIO_RS_PARTIAL;
2187+
result.id=PGAIO_HCB_MD_WRITEV;
2188+
}
2189+
2190+
if (!td->smgr.skip_fsync)
2191+
register_dirty_segment_aio(td->smgr.rlocator,td->smgr.forkNum,
2192+
td->smgr.blockNum / ((BlockNumber)RELSEG_SIZE));
2193+
2194+
returnresult;
2195+
}
2196+
2197+
/*
2198+
* AIO error reporting callback for mdstartwritev().
2199+
*/
2200+
staticvoid
2201+
md_writev_report(PgAioResultresult,constPgAioTargetData*td,intelevel)
2202+
{
2203+
RelPathStrpath;
2204+
2205+
path=relpathbackend(td->smgr.rlocator,
2206+
td->smgr.is_temp ?MyProcNumber :INVALID_PROC_NUMBER,
2207+
td->smgr.forkNum);
2208+
2209+
if (result.error_data!=0)
2210+
{
2211+
errno=result.error_data;/* for errcode_for_file_access() */
2212+
2213+
ereport(elevel,
2214+
errcode_for_file_access(),
2215+
errmsg("could not write blocks %u..%u in file \"%s\": %m",
2216+
td->smgr.blockNum,
2217+
td->smgr.blockNum+td->smgr.nblocks,
2218+
path.str)
2219+
);
2220+
}
2221+
else
2222+
{
2223+
/*
2224+
* NB: This will typically only be output in debug messages, while
2225+
* retrying a partial IO.
2226+
*/
2227+
ereport(elevel,
2228+
errcode(ERRCODE_DATA_CORRUPTED),
2229+
errmsg("could not write blocks %u..%u in file \"%s\": wrote only %zu of %zu bytes",
2230+
td->smgr.blockNum,
2231+
td->smgr.blockNum+td->smgr.nblocks-1,
2232+
path.str,
2233+
result.result* (size_t)BLCKSZ,
2234+
td->smgr.nblocks* (size_t)BLCKSZ
2235+
)
2236+
);
2237+
}
2238+
}

‎src/backend/storage/smgr/smgr.c

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ typedef struct f_smgr
115115
BlockNumberblocknum,
116116
constvoid**buffers,BlockNumbernblocks,
117117
boolskipFsync);
118+
void(*smgr_startwritev) (PgAioHandle*ioh,
119+
SMgrRelationreln,ForkNumberforknum,
120+
BlockNumberblocknum,
121+
constvoid**buffers,BlockNumbernblocks,
122+
boolskipFsync);
118123
void(*smgr_writeback) (SMgrRelationreln,ForkNumberforknum,
119124
BlockNumberblocknum,BlockNumbernblocks);
120125
BlockNumber (*smgr_nblocks) (SMgrRelationreln,ForkNumberforknum);
@@ -142,6 +147,7 @@ static const f_smgr smgrsw[] = {
142147
.smgr_readv=mdreadv,
143148
.smgr_startreadv=mdstartreadv,
144149
.smgr_writev=mdwritev,
150+
.smgr_startwritev=mdstartwritev,
145151
.smgr_writeback=mdwriteback,
146152
.smgr_nblocks=mdnblocks,
147153
.smgr_truncate=mdtruncate,
@@ -787,6 +793,29 @@ smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
787793
RESUME_INTERRUPTS();
788794
}
789795

796+
/*
797+
* smgrstartwritev() -- asynchronous version of smgrwritev()
798+
*
799+
* This starts an asynchronous writev IO using the IO handle `ioh`. Other than
800+
* `ioh` all parameters are the same as smgrwritev().
801+
*
802+
* Completion callbacks above smgr will be passed the result as the number of
803+
* successfully written blocks if the write [partially] succeeds. This
804+
* maintains the abstraction that smgr operates on the level of blocks, rather
805+
* than bytes.
806+
*/
807+
void
808+
smgrstartwritev(PgAioHandle*ioh,
809+
SMgrRelationreln,ForkNumberforknum,BlockNumberblocknum,
810+
constvoid**buffers,BlockNumbernblocks,boolskipFsync)
811+
{
812+
HOLD_INTERRUPTS();
813+
smgrsw[reln->smgr_which].smgr_startwritev(ioh,
814+
reln,forknum,blocknum,buffers,
815+
nblocks,skipFsync);
816+
RESUME_INTERRUPTS();
817+
}
818+
790819
/*
791820
* smgrwriteback() -- Trigger kernel writeback for the supplied range of
792821
* blocks.

‎src/include/storage/aio.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ typedef enum PgAioHandleCallbackID
194194
PGAIO_HCB_INVALID=0,
195195

196196
PGAIO_HCB_MD_READV,
197+
PGAIO_HCB_MD_WRITEV,
197198

198199
PGAIO_HCB_SHARED_BUFFER_READV,
199200

‎src/include/storage/fd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ extern intFilePrefetch(File file, off_t offset, off_t amount, uint32 wait_event
112112
externssize_tFileReadV(Filefile,conststructiovec*iov,intiovcnt,off_toffset,uint32wait_event_info);
113113
externssize_tFileWriteV(Filefile,conststructiovec*iov,intiovcnt,off_toffset,uint32wait_event_info);
114114
externintFileStartReadV(structPgAioHandle*ioh,Filefile,intiovcnt,off_toffset,uint32wait_event_info);
115+
externintFileStartWriteV(structPgAioHandle*ioh,Filefile,intiovcnt,off_toffset,uint32wait_event_info);
115116
externintFileSync(Filefile,uint32wait_event_info);
116117
externintFileZero(Filefile,off_toffset,off_tamount,uint32wait_event_info);
117118
externintFileFallocate(Filefile,off_toffset,off_tamount,uint32wait_event_info);

‎src/include/storage/md.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include"storage/sync.h"
2222

2323
externconstPgAioHandleCallbacksaio_md_readv_cb;
24+
externconstPgAioHandleCallbacksaio_md_writev_cb;
2425

2526
/* md storage manager functionality */
2627
externvoidmdinit(void);
@@ -45,6 +46,10 @@ extern void mdstartreadv(PgAioHandle *ioh,
4546
externvoidmdwritev(SMgrRelationreln,ForkNumberforknum,
4647
BlockNumberblocknum,
4748
constvoid**buffers,BlockNumbernblocks,boolskipFsync);
49+
externvoidmdstartwritev(PgAioHandle*ioh,
50+
SMgrRelationreln,ForkNumberforknum,
51+
BlockNumberblocknum,
52+
constvoid**buffers,BlockNumbernblocks,boolskipFsync);
4853
externvoidmdwriteback(SMgrRelationreln,ForkNumberforknum,
4954
BlockNumberblocknum,BlockNumbernblocks);
5055
externBlockNumbermdnblocks(SMgrRelationreln,ForkNumberforknum);

‎src/include/storage/smgr.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ extern void smgrwritev(SMgrRelation reln, ForkNumber forknum,
108108
BlockNumberblocknum,
109109
constvoid**buffers,BlockNumbernblocks,
110110
boolskipFsync);
111+
externvoidsmgrstartwritev(PgAioHandle*ioh,
112+
SMgrRelationreln,ForkNumberforknum,
113+
BlockNumberblocknum,
114+
constvoid**buffers,BlockNumbernblocks,
115+
boolskipFsync);
111116
externvoidsmgrwriteback(SMgrRelationreln,ForkNumberforknum,
112117
BlockNumberblocknum,BlockNumbernblocks);
113118
externBlockNumbersmgrnblocks(SMgrRelationreln,ForkNumberforknum);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp