# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: https://opensource.org/license/bsd-3-clause/
"""Contains the MemoryDatabase implementation"""
from gitdb.db.loose import LooseObjectDB
from gitdb.db.base import (
    ObjectDBR,
    ObjectDBW
)

from gitdb.base import (
    OStream,
    IStream,
)

from gitdb.exc import (
    BadObject,
    UnsupportedOperation
)

from gitdb.stream import (
    ZippedStoreShaWriter,
    DecompressMemMapReader,
)

from io import BytesIO

__all__ = ("MemoryDB", )


class MemoryDB(ObjectDBR, ObjectDBW):

    """A memory database stores everything to memory, providing fast IO and object
    retrieval. It should be used to buffer results and obtain SHAs before writing
    them to the actual physical storage, as it allows one to query whether an object
    already exists in the target storage before incurring actual IO"""

    def __init__(self):
        super().__init__()
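        # The loose DB below is used only for its hashing and compression
        # machinery; store() redirects its output stream into memory, so the
        # dummy path is never actually accessed on disk.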
        self._db = LooseObjectDB("path/doesnt/matter")

        # maps 20 byte shas to their OStream objects
        self._cache = dict()

    def set_ostream(self, stream):
        raise UnsupportedOperation("MemoryDBs always stream into memory")

    def store(self, istream):
        zstream = ZippedStoreShaWriter()
        self._db.set_ostream(zstream)

        istream = self._db.store(istream)
        zstream.close()     # close to flush
        zstream.seek(0)

        # don't provide a size, the stream is written in object format, hence the
        # header needs decompression
        decomp_stream = DecompressMemMapReader(zstream.getvalue(), close_on_deletion=False)
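        # The reader yields decompressed object data on read(), while its data()
        # method still exposes the raw compressed bytes (header included) that
        # stream_copy() below forwards verbatim.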
        self._cache[istream.binsha] = OStream(istream.binsha, istream.type, istream.size, decomp_stream)

        return istream

    def has_object(self, sha):
        return sha in self._cache

    def info(self, sha):
        # we always return streams, which are infos as well
        return self.stream(sha)

    def stream(self, sha):
        try:
            ostream = self._cache[sha]
            # rewind stream for the next one to read
            ostream.stream.seek(0)
            return ostream
        except KeyError as e:
            raise BadObject(sha) from e
        # END exception handling

    def size(self):
        return len(self._cache)

    def sha_iter(self):
        return self._cache.keys()

    #{ Interface
    def stream_copy(self, sha_iter, odb):
        """Copy the streams identified by the shas yielded by sha_iter into the given odb.
        The streams will be copied directly.
        **Note:** an object will only be written if it did not yet exist in the target db
        :return: number of streams actually copied into odb. If smaller than the number
            of input shas, one or more objects already existed in odb"""
        count = 0
        for sha in sha_iter:
            if odb.has_object(sha):
                continue
            # END check object existence

            ostream = self.stream(sha)
            # compressed data including header
            sio = BytesIO(ostream.stream.data())
            istream = IStream(ostream.type, ostream.size, sio, sha)

            odb.store(istream)
            count += 1
        # END for each sha
        return count

    #} END interface
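

# A minimal usage sketch (illustrative only, not part of the GitDB API): buffer an
# object in a MemoryDB, read it back, then flush everything into an on-disk loose
# object database via stream_copy(). The b"blob" type tag and the temporary target
# directory are assumptions made for this demo.
if __name__ == "__main__":
    import tempfile

    mdb = MemoryDB()
    data = b"hello world"

    # store() returns the IStream with its binsha filled in by the loose-object machinery
    istream = mdb.store(IStream(b"blob", len(data), BytesIO(data)))
    assert mdb.has_object(istream.binsha)
    assert mdb.stream(istream.binsha).read() == data

    # objects already present in the target database would be skipped
    odb = LooseObjectDB(tempfile.mkdtemp())
    copied = mdb.stream_copy(mdb.sha_iter(), odb)
    print("copied %d object(s)" % copied)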