# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: https://opensource.org/license/bsd-3-clause/
"""Test for object db"""

from gitdb.test.lib import (
    TestBase,
    DummyStream,
    make_bytes,
    make_object,
    fixture_path
)

from gitdb import (
    DecompressMemMapReader,
    FDCompressedSha1Writer,
    LooseObjectDB,
    Sha1Writer,
    MemoryDB,
    IStream,
)
from gitdb.util import hex_to_bin

import zlib
from gitdb.typ import (
    str_blob_type
)

import tempfile
import os
from io import BytesIO


class TestStream(TestBase):

    """Test stream classes"""

    data_sizes = (15, 10000, 1000 * 1024 + 512)

    def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
        """Make stream tests - the given stream is seekable, allowing it to be
        rewound and reused
        :param cdata: the data we expect to read from stream, the contents
        :param rewind_stream: function called to rewind the stream to make it ready
            for reuse"""
        ns = 10
        assert len(cdata) > ns - 1, "Data must be larger than %i, was %i" % (ns, len(cdata))

        # read in small steps
        ss = len(cdata) // ns
        for i in range(ns):
            data = stream.read(ss)
            chunk = cdata[i * ss:(i + 1) * ss]
            assert data == chunk
        # END for each step
        rest = stream.read()
        if rest:
            assert rest == cdata[-len(rest):]
        # END handle rest

        if isinstance(stream, DecompressMemMapReader):
            assert len(stream.data()) == stream.compressed_bytes_read()
        # END handle special type

        rewind_stream(stream)

        # read everything
        rdata = stream.read()
        assert rdata == cdata

        if isinstance(stream, DecompressMemMapReader):
            assert len(stream.data()) == stream.compressed_bytes_read()
        # END handle special type

    def test_decompress_reader(self):
        for close_on_deletion in range(2):
            for with_size in range(2):
                for ds in self.data_sizes:
                    cdata = make_bytes(ds, randomize=False)

                    # zdata = zipped actual data
                    # cdata = original content data

                    # create reader
                    if with_size:
                        # need object data
                        zdata = zlib.compress(make_object(str_blob_type, cdata))
                        typ, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
                        assert size == len(cdata)
                        assert typ == str_blob_type

                        # even if we don't set the size, it will be set automatically on first read
                        test_reader = DecompressMemMapReader(zdata, close_on_deletion=False)
                        assert test_reader._s == len(cdata)
                    else:
                        # here we need content data
                        zdata = zlib.compress(cdata)
                        reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
                        assert reader._s == len(cdata)
                    # END get reader

                    self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))

                    # put in a dummy stream for closing
                    dummy = DummyStream()
                    reader._m = dummy

                    assert not dummy.closed
                    del(reader)
                    assert dummy.closed == close_on_deletion
                # END for each datasize
            # END whether size should be used
        # END whether stream should be closed when deleted
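
    # Hedged, standalone restatement of the round trip the test above
    # exercises: compressed bytes plus an explicit size should decompress
    # back to the original content. Values and the method name are
    # illustrative.
    def _example_decompress_roundtrip(self):
        cdata = make_bytes(4096, randomize=False)
        reader = DecompressMemMapReader(zlib.compress(cdata), False, len(cdata))
        assert reader.read() == cdata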

    def test_sha_writer(self):
        writer = Sha1Writer()
        assert 2 == writer.write(b"hi")
        assert len(writer.sha(as_hex=1)) == 40
        assert len(writer.sha(as_hex=0)) == 20

        # make sure it does something ;)
        prev_sha = writer.sha()
        writer.write(b"hi again")
        assert writer.sha() != prev_sha
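
    # Illustrative cross-check, under the assumption (not asserted by the
    # original suite) that Sha1Writer hashes exactly the bytes written to it,
    # so its binary digest should agree with hashlib over the same input.
    # The method name is hypothetical.
    def _example_sha_writer_crosscheck(self):
        import hashlib
        writer = Sha1Writer()
        writer.write(b"hello")
        # assumption: sha(as_hex=0) is the raw 20-byte digest of the written bytes
        assert writer.sha(as_hex=0) == hashlib.sha1(b"hello").digest()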

    def test_compressed_writer(self):
        for ds in self.data_sizes:
            fd, path = tempfile.mkstemp()
            ostream = FDCompressedSha1Writer(fd)
            data = make_bytes(ds, randomize=False)

            # for now, just a single write, code doesn't care about chunking
            assert len(data) == ostream.write(data)
            ostream.close()

            # it's closed already
            self.assertRaises(OSError, os.close, fd)

            # read everything back, compare to data we zip
            fd = os.open(path, os.O_RDONLY | getattr(os, 'O_BINARY', 0))
            written_data = os.read(fd, os.path.getsize(path))
            assert len(written_data) == os.path.getsize(path)
            os.close(fd)
            assert written_data == zlib.compress(data, 1)  # best speed

            os.remove(path)
        # END for each data size
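
    # Hedged follow-up sketch: since the test above shows the file holds
    # exactly zlib.compress(data, 1), decompressing the file should recover
    # the original bytes. Sizes and the method name are illustrative.
    def _example_compressed_writer_roundtrip(self):
        fd, path = tempfile.mkstemp()
        try:
            data = make_bytes(1000, randomize=False)
            ostream = FDCompressedSha1Writer(fd)
            ostream.write(data)
            ostream.close()  # closes the file descriptor as well
            with open(path, 'rb') as fp:
                assert zlib.decompress(fp.read()) == data
        finally:
            os.remove(path)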

    def test_decompress_reader_special_case(self):
        odb = LooseObjectDB(fixture_path('objects'))
        mdb = MemoryDB()
        for sha in (b'888401851f15db0eed60eb1bc29dec5ddcace911',
                    b'7bb839852ed5e3a069966281bb08d50012fb309b',):
            ostream = odb.stream(hex_to_bin(sha))

            # if there is a bug, we will be missing one byte exactly!
            data = ostream.read()
            assert len(data) == ostream.size

            # Putting it back in should yield nothing new - after all, we have
            # the same data already
            dump = mdb.store(IStream(ostream.type, ostream.size, BytesIO(data)))
            assert dump.hexsha == sha
        # end for each loose object sha to test
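

# Hedged, fixture-free sketch of the MemoryDB round trip the last test relies
# on: store a blob via IStream, then stream it back out and compare. The
# payload and function name are illustrative; binsha/stream follow the usage
# seen in the tests above.
def _example_memorydb_roundtrip():
    mdb = MemoryDB()
    content = b"example blob payload"
    istream = mdb.store(IStream(str_blob_type, len(content), BytesIO(content)))
    # the stored IStream carries the computed sha; read the object back by it
    ostream = mdb.stream(istream.binsha)
    assert ostream.read() == content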