- Notifications
You must be signed in to change notification settings - Fork68
Expand file tree
/
Copy pathtest_example.py
More file actions
64 lines (49 loc) · 1.64 KB
/
test_example.py
File metadata and controls
64 lines (49 loc) · 1.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Module with examples from the tutorial section of the docs"""
fromlibimport*
fromgitdbimportIStream
fromgitdb.db.pyimportPureLooseObjectODB
fromgitdb.utilimportpool
fromcStringIOimportStringIO
fromasyncimportIteratorReader
class TestExamples(TestBase):
    """Executes the code samples used in the tutorial section of the docs."""

    def test_base(self):
        """Read every loose object, store a new blob, then stream objects asynchronously."""
        loose_db = PureLooseObjectODB(fixture_path("../../../.git/objects"))

        # For each object, the info and the stream must agree on their first
        # three fields, and the stream must yield exactly `size` bytes.
        for binsha in loose_db.sha_iter():
            obj_info = loose_db.info(binsha)
            obj_stream = loose_db.stream(binsha)
            assert obj_info[:3] == obj_stream[:3]
            assert len(obj_stream.read()) == obj_stream.size
            assert loose_db.has_object(obj_info.binsha)
        # END for each sha in database

        # Drop the loop variables so no file stays open. If the database held
        # no loose objects the names were never bound, which is fine.
        try:
            del obj_stream
            del obj_info
        except UnboundLocalError:
            pass
        # END ignore exception if there are no loose objects

        # Store a fresh blob in the database.
        data = "my data"
        new_stream = IStream("blob", len(data), StringIO(data))

        # before storing, the object has no sha yet
        assert new_stream.binsha is None
        loose_db.store(new_stream)

        # after storing, a 20 byte sha is set and the object can be found
        assert len(new_stream.binsha) == 20
        assert loose_db.has_object(new_stream.binsha)

        # --- async operation ---
        # wrap the sha iterator into a reader
        sha_reader = IteratorReader(loose_db.sha_iter())

        # obtain a reader which produces the object streams
        stream_reader = loose_db.stream_async(sha_reader)

        # fetch a single item ...
        first_item = stream_reader.read(1)[0]

        # ... then everything that is left, until the reader is depleted
        remaining_items = stream_reader.read()

        # let the pool use two threads
        pool.set_size(2)

        # go back to the synchronous mode of operation
        pool.set_size(0)