- Notifications
You must be signed in to change notification settings - Fork68
Expand file tree
/
Copy pathtest_pack.py
More file actions
249 lines (206 loc) · 9.01 KB
/
test_pack.py
File metadata and controls
249 lines (206 loc) · 9.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: https://opensource.org/license/bsd-3-clause/
"""Test everything about packs reading and writing"""
fromgitdb.test.libimport (
TestBase,
with_rw_directory,
fixture_path
)
fromgitdb.streamimportDeltaApplyReader
fromgitdb.packimport (
PackEntity,
PackIndexFile,
PackFile
)
fromgitdb.baseimport (
OInfo,
OStream,
)
fromgitdb.funimportdelta_types
fromgitdb.excimportUnsupportedOperation
fromgitdb.utilimportto_bin_sha
importpytest
importos
importtempfile
#{ Utilities
def bin_sha_from_filename(filename):
    """Return the binary sha encoded in a pack fixture's file name.

    Pack fixtures are named ``pack-<hexsha>.<ext>``: strip the directory
    and extension, drop the leading ``pack-`` prefix, and convert the
    remaining hex digits to their 20-byte binary form.
    """
    stem, _ext = os.path.splitext(os.path.basename(filename))
    hexsha = stem[5:]  # everything after the 'pack-' prefix
    return to_bin_sha(hexsha)
#} END utilities
class TestPack(TestBase):

    """Read and write tests for pack files and their accompanying indices.

    Each fixture tuple below is ``(path, version, object_count)``; the pack
    fixtures reuse the object counts of their matching index fixtures so the
    two stay in sync.
    """

    packindexfile_v1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.idx'), 1, 67)
    packindexfile_v2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.idx'), 2, 30)
    packindexfile_v2_3_ascii = (fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.idx'), 2, 42)
    packfile_v2_1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.pack'), 2, packindexfile_v1[2])
    packfile_v2_2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack'), 2, packindexfile_v2[2])
    packfile_v2_3_ascii = (
        fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.pack'), 2, packindexfile_v2_3_ascii[2])

    def _assert_index_file(self, index, version, size):
        """Exercise the full read API of ``index`` and check its invariants.

        :param index: a ``PackIndexFile`` instance
        :param version: expected index file version
        :param size: expected number of objects in the index
        """
        assert index.packfile_checksum() != index.indexfile_checksum()
        assert len(index.packfile_checksum()) == 20
        assert len(index.indexfile_checksum()) == 20
        assert index.version() == version
        assert index.size() == size
        assert len(index.offsets()) == size

        # get all data of all objects
        for oidx in range(index.size()):
            sha = index.sha(oidx)
            assert oidx == index.sha_to_index(sha)

            entry = index.entry(oidx)
            assert len(entry) == 3

            assert entry[0] == index.offset(oidx)
            assert entry[1] == sha
            assert entry[2] == index.crc(oidx)

            # verify partial sha lookups of several prefix lengths
            for l in (4, 8, 11, 17, 20):
                assert index.partial_sha_to_index(sha[:l], l * 2) == oidx

        # END for each object index in indexfile
        self.assertRaises(ValueError, index.partial_sha_to_index, "\0", 2)

    def _assert_pack_file(self, pack, version, size):
        """Stream every object out of ``pack`` and verify stream consistency.

        :param pack: a ``PackFile`` instance
        :param version: fixture pack version (all fixture packs are v2,
            hence the hard-coded assertion below)
        :param size: expected number of objects in the pack
        """
        assert pack.version() == 2
        assert pack.size() == size
        assert len(pack.checksum()) == 20

        num_obj = 0
        for obj in pack.stream_iter():
            num_obj += 1
            info = pack.info(obj.pack_offset)
            stream = pack.stream(obj.pack_offset)

            assert info.pack_offset == stream.pack_offset
            assert info.type_id == stream.type_id
            assert hasattr(stream, 'read')

            # it should be possible to read from both streams
            assert obj.read() == stream.read()

            streams = pack.collect_streams(obj.pack_offset)
            assert streams

            # read the stream
            try:
                dstream = DeltaApplyReader.new(streams)
            except ValueError:
                # ignore these, old git versions use only ref deltas,
                # which we haven't resolved ( as we are without an index )
                # Also ignore non-delta streams
                continue
            # END get deltastream

            # read all
            data = dstream.read()
            assert len(data) == dstream.size

            # test seek
            dstream.seek(0)
            assert dstream.read() == data

            # read chunks
            # NOTE: the current implementation is safe, it basically transfers
            # all calls to the underlying memory map

        # END for each object
        assert num_obj == size

    def test_pack_index(self):
        # check version 1 and 2
        for indexfile, version, size in (self.packindexfile_v1, self.packindexfile_v2):
            index = PackIndexFile(indexfile)
            self._assert_index_file(index, version, size)
        # END run tests

    def test_pack(self):
        # there is this special version 3, but apparently its like 2 ...
        for packfile, version, size in (self.packfile_v2_3_ascii, self.packfile_v2_1, self.packfile_v2_2):
            pack = PackFile(packfile)
            self._assert_pack_file(pack, version, size)
        # END for each pack to test

    @with_rw_directory
    def test_pack_entity(self, rw_dir):
        """Round-trip test: read all fixture entities, then write all their
        objects into a single new pack (with and without an index) and
        verify the result."""
        pack_objs = list()
        for packinfo, indexinfo in ((self.packfile_v2_1, self.packindexfile_v1),
                                    (self.packfile_v2_2, self.packindexfile_v2),
                                    (self.packfile_v2_3_ascii, self.packindexfile_v2_3_ascii)):
            packfile, version, size = packinfo
            indexfile, version, size = indexinfo
            entity = PackEntity(packfile)
            assert entity.pack().path() == packfile
            assert entity.index().path() == indexfile
            pack_objs.extend(entity.stream_iter())

            count = 0
            for info, stream in zip(entity.info_iter(), entity.stream_iter()):
                count += 1
                assert info.binsha == stream.binsha
                assert len(info.binsha) == 20
                assert info.type_id == stream.type_id
                assert info.size == stream.size

                # we return fully resolved items, which is implied by the sha centric access
                assert not info.type_id in delta_types

                # try all calls
                assert len(entity.collect_streams(info.binsha))
                oinfo = entity.info(info.binsha)
                assert isinstance(oinfo, OInfo)
                assert oinfo.binsha is not None
                ostream = entity.stream(info.binsha)
                assert isinstance(ostream, OStream)
                assert ostream.binsha is not None

                # verify the stream
                try:
                    assert entity.is_valid_stream(info.binsha, use_crc=True)
                except UnsupportedOperation:
                    pass
                # END ignore version issues
                assert entity.is_valid_stream(info.binsha, use_crc=False)
            # END for each info, stream tuple
            assert count == size
        # END for each entity

        # pack writing - write all packs into one
        # index path can be None
        # NOTE: tempfile.mktemp is deprecated and racy; rw_dir is a fresh
        # per-test directory, so plain joins give us unique, safe paths.
        pack_path1 = os.path.join(rw_dir, "pack1")
        pack_path2 = os.path.join(rw_dir, "pack2")
        index_path = os.path.join(rw_dir, "index")
        iteration = 0

        def rewind_streams():
            # the same stream objects are written repeatedly - reset them
            for obj in pack_objs:
                obj.stream.seek(0)
        # END utility

        for ppath, ipath, num_obj in zip((pack_path1, pack_path2),
                                         (index_path, None),
                                         (len(pack_objs), None)):
            iwrite = None
            if ipath:
                ifile = open(ipath, 'wb')
                iwrite = ifile.write
            # END handle ip

            # make sure we rewind the streams ... we work on the same objects over and over again
            if iteration > 0:
                rewind_streams()
            # END rewind streams
            iteration += 1

            with open(ppath, 'wb') as pfile:
                pack_sha, index_sha = PackEntity.write_pack(pack_objs, pfile.write, iwrite, object_count=num_obj)
            assert os.path.getsize(ppath) > 100

            # verify pack
            pf = PackFile(ppath)
            assert pf.size() == len(pack_objs)
            assert pf.version() == PackFile.pack_version_default
            assert pf.checksum() == pack_sha
            pf.close()

            # verify index
            if ipath is not None:
                ifile.close()
                assert os.path.getsize(ipath) > 100
                idx = PackIndexFile(ipath)
                assert idx.version() == PackIndexFile.index_version_default
                assert idx.packfile_checksum() == pack_sha
                assert idx.indexfile_checksum() == index_sha
                assert idx.size() == len(pack_objs)
                idx.close()
            # END verify files exist
        # END for each packpath, indexpath pair

        # verify the packs thoroughly
        rewind_streams()
        entity = PackEntity.create(pack_objs, rw_dir)
        count = 0
        for info in entity.info_iter():
            count += 1
            for use_crc in range(2):
                assert entity.is_valid_stream(info.binsha, use_crc)
            # END for each crc mode
        # END for each info
        assert count == len(pack_objs)
        entity.close()

    def test_pack_64(self):
        # TODO: hex-edit a pack helping us to verify that we can handle 64 byte offsets
        # of course without really needing such a huge pack
        pytest.skip('not implemented')