@@ -10,6 +10,14 @@ def __init__(self):
10
10
self .queue = Queue ()
11
11
self .events = []
12
12
self .running_events = {}
13
+ self .last_aggregation = datetime .datetime .now ()
14
+ self .agg_template = {
15
+ 'commit' :0 ,
16
+ 'rollback' :0 ,
17
+ 'max_latency' :0.0 ,
18
+ 'running' :0 ,
19
+ 'running_latency' :0.0
20
+ }
13
21
14
22
def register_start (self ,name ):
15
23
event_id = uuid .uuid4 ()
@@ -54,22 +62,24 @@ def aggregate(self):
54
62
55
63
agg = {}
56
64
for ev in self .events :
57
- if ev ['name' ]in agg :
58
- named_agg = agg [ev ['name' ]]
59
- latency = (ev ['finished_at' ]- ev ['started_at' ]).total_seconds ()
60
- if ev ['status' ]in named_agg :
61
- named_agg [ev ['status' ]]+= 1
62
- if named_agg ['max_latency' ]< latency :
63
- named_agg ['max_latency' ]= latency
64
- else :
65
- named_agg [ev ['status' ]]= 0
66
- named_agg ['max_latency' ]= latency
67
- else :
68
- agg [ev ['name' ]]= {}
65
+ if ev ['finished_at' ]< self .last_aggregation :
66
+ continue
67
+
68
+ if ev ['name' ]not in agg :
69
+ agg [ev ['name' ]]= self .agg_template .copy ()
70
+
71
+ named_agg = agg [ev ['name' ]]
72
+ latency = (ev ['finished_at' ]- ev ['started_at' ]).total_seconds ()
73
+ named_agg [ev ['status' ]]+= 1
74
+ if named_agg ['max_latency' ]< latency :
75
+ named_agg ['max_latency' ]= latency
69
76
70
77
for value in self .running_events .itervalues ():
78
+ if value ['name' ]not in agg :
79
+ agg [value ['name' ]]= self .agg_template .copy ()
80
+
71
81
named_agg = agg [value ['name' ]]
72
- latency = (datetime .datetime .now ()- ev [ 'started_at ' ]).total_seconds ()
82
+ latency = (datetime .datetime .now ()- value [ 'time ' ]).total_seconds ()
73
83
if 'started' in named_agg :
74
84
named_agg ['running' ]+= 1
75
85
if latency > named_agg ['running_latency' ]:
@@ -78,6 +88,7 @@ def aggregate(self):
78
88
named_agg ['running' ]= 1
79
89
named_agg ['running_latency' ]= latency
80
90
91
+ self .last_aggregation = datetime .datetime .now ()
81
92
return agg
82
93
83
94
def aggregate_by (self ,period ):