Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3a5c3ca

Browse files
committed
feat(agent): add agent unit manager
1 parente96ab0e commit3a5c3ca

File tree

2 files changed

+998
-0
lines changed

2 files changed

+998
-0
lines changed

‎agent/unit/manager.go‎

Lines changed: 307 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,307 @@
1+
package unit
2+
3+
import (
4+
"sync"
5+
6+
"golang.org/x/xerrors"
7+
)
8+
9+
// ErrConsumerNotFound is returned when a consumer ID is not registered.
10+
varErrConsumerNotFound=xerrors.New("consumer not found")
11+
12+
// ErrConsumerAlreadyRegistered is returned when a consumer ID is already registered.
13+
varErrConsumerAlreadyRegistered=xerrors.New("consumer already registered")
14+
15+
// ErrCannotUpdateOtherConsumer is returned when attempting to update another consumer's status.
16+
varErrCannotUpdateOtherConsumer=xerrors.New("cannot update other consumer's status")
17+
18+
// ErrDependenciesNotSatisfied is returned when a consumer's dependencies are not satisfied.
19+
varErrDependenciesNotSatisfied=xerrors.New("unit dependencies not satisfied")
20+
21+
// ErrSameStatusAlreadySet is returned when attempting to set the same status as the current status.
22+
varErrSameStatusAlreadySet=xerrors.New("same status already set")
23+
24+
// Status constants for dependency tracking
25+
const (
26+
StatusStarted="started"
27+
StatusComplete="completed"
28+
)
29+
30+
// dependencyVertex represents a vertex in the dependency graph that is associated with a consumer.
31+
typedependencyVertex[ConsumerIDcomparable]struct {
32+
IDConsumerID
33+
}
34+
35+
// Dependency represents a dependency relationship between consumers.
36+
typeDependency[StatusType,ConsumerIDcomparable]struct {
37+
ConsumerConsumerID
38+
DependsOnConsumerID
39+
RequiredStatusStatusType
40+
CurrentStatusStatusType
41+
IsSatisfiedbool
42+
}
43+
44+
// Manager provides reactive dependency tracking over a Graph.
45+
// It manages consumer registration, dependency relationships, and status updates
46+
// with automatic recalculation of readiness when dependencies are satisfied.
47+
typeManager[StatusType,ConsumerIDcomparable]struct {
48+
mu sync.RWMutex
49+
50+
// The underlying graph that stores dependency relationships
51+
graph*Graph[StatusType,*dependencyVertex[ConsumerID]]
52+
53+
// Track current status of each consumer
54+
consumerStatusmap[ConsumerID]StatusType
55+
56+
// Track readiness state (cached to avoid repeated graph traversal)
57+
consumerReadinessmap[ConsumerID]bool
58+
59+
// Track which consumers are registered
60+
registeredConsumersmap[ConsumerID]bool
61+
62+
// Store vertex instances for each consumer to ensure consistent references
63+
consumerVerticesmap[ConsumerID]*dependencyVertex[ConsumerID]
64+
}
65+
66+
// NewManager creates a new Manager instance.
67+
funcNewManager[StatusType,ConsumerIDcomparable]()*Manager[StatusType,ConsumerID] {
68+
return&Manager[StatusType,ConsumerID]{
69+
graph:&Graph[StatusType,*dependencyVertex[ConsumerID]]{},
70+
consumerStatus:make(map[ConsumerID]StatusType),
71+
consumerReadiness:make(map[ConsumerID]bool),
72+
registeredConsumers:make(map[ConsumerID]bool),
73+
consumerVertices:make(map[ConsumerID]*dependencyVertex[ConsumerID]),
74+
}
75+
}
76+
77+
// Register registers a new consumer as a vertex in the dependency graph.
78+
func (dt*Manager[StatusType,ConsumerID])Register(idConsumerID)error {
79+
dt.mu.Lock()
80+
deferdt.mu.Unlock()
81+
82+
ifdt.registeredConsumers[id] {
83+
returnErrConsumerAlreadyRegistered
84+
}
85+
86+
// Create and store the vertex for this consumer
87+
vertex:=&dependencyVertex[ConsumerID]{ID:id}
88+
dt.consumerVertices[id]=vertex
89+
dt.registeredConsumers[id]=true
90+
dt.consumerReadiness[id]=true// New consumers start as ready (no dependencies)
91+
92+
returnnil
93+
}
94+
95+
// AddDependency adds a dependency relationship between consumers.
96+
// The consumer depends on the dependsOn consumer reaching the requiredStatus.
97+
func (dt*Manager[StatusType,ConsumerID])AddDependency(consumerConsumerID,dependsOnConsumerID,requiredStatusStatusType)error {
98+
dt.mu.Lock()
99+
deferdt.mu.Unlock()
100+
101+
if!dt.registeredConsumers[consumer] {
102+
returnxerrors.Errorf("consumer %v is not registered",consumer)
103+
}
104+
if!dt.registeredConsumers[dependsOn] {
105+
returnxerrors.Errorf("consumer %v is not registered",dependsOn)
106+
}
107+
108+
// Get the stored vertices for both consumers
109+
consumerVertex:=dt.consumerVertices[consumer]
110+
dependsOnVertex:=dt.consumerVertices[dependsOn]
111+
112+
// Add the dependency edge to the graph
113+
// The edge goes from consumer to dependsOn, representing the dependency
114+
err:=dt.graph.AddEdge(consumerVertex,dependsOnVertex,requiredStatus)
115+
iferr!=nil {
116+
returnxerrors.Errorf("failed to add dependency: %w",err)
117+
}
118+
119+
// Recalculate readiness for the consumer since it now has a dependency
120+
dt.recalculateReadinessUnsafe(consumer)
121+
122+
returnnil
123+
}
124+
125+
// UpdateStatus updates a consumer's status and recalculates readiness for affected dependents.
126+
func (dt*Manager[StatusType,ConsumerID])UpdateStatus(consumerConsumerID,newStatusStatusType)error {
127+
dt.mu.Lock()
128+
deferdt.mu.Unlock()
129+
130+
if!dt.registeredConsumers[consumer] {
131+
returnErrConsumerNotFound
132+
}
133+
134+
// Update the consumer's status
135+
ifdt.consumerStatus[consumer]==newStatus {
136+
returnErrSameStatusAlreadySet
137+
}
138+
dt.consumerStatus[consumer]=newStatus
139+
140+
// Get all consumers that depend on this one (reverse adjacent vertices)
141+
consumerVertex:=dt.consumerVertices[consumer]
142+
dependentEdges:=dt.graph.GetReverseAdjacentVertices(consumerVertex)
143+
144+
// Recalculate readiness for all dependents
145+
for_,edge:=rangedependentEdges {
146+
dt.recalculateReadinessUnsafe(edge.From.ID)
147+
}
148+
149+
returnnil
150+
}
151+
152+
// IsReady checks if all dependencies for a consumer are satisfied.
153+
func (dt*Manager[StatusType,ConsumerID])IsReady(consumerConsumerID) (bool,error) {
154+
dt.mu.RLock()
155+
deferdt.mu.RUnlock()
156+
157+
if!dt.registeredConsumers[consumer] {
158+
returnfalse,ErrConsumerNotFound
159+
}
160+
161+
returndt.consumerReadiness[consumer],nil
162+
}
163+
164+
// GetUnmetDependencies returns a list of unsatisfied dependencies for a consumer.
165+
func (dt*Manager[StatusType,ConsumerID])GetUnmetDependencies(consumerConsumerID) ([]Dependency[StatusType,ConsumerID],error) {
166+
dt.mu.RLock()
167+
deferdt.mu.RUnlock()
168+
169+
if!dt.registeredConsumers[consumer] {
170+
returnnil,ErrConsumerNotFound
171+
}
172+
173+
consumerVertex:=dt.consumerVertices[consumer]
174+
forwardEdges:=dt.graph.GetForwardAdjacentVertices(consumerVertex)
175+
176+
varunmetDependencies []Dependency[StatusType,ConsumerID]
177+
178+
for_,edge:=rangeforwardEdges {
179+
dependsOnConsumer:=edge.To.ID
180+
requiredStatus:=edge.Edge
181+
currentStatus,exists:=dt.consumerStatus[dependsOnConsumer]
182+
if!exists {
183+
// If the dependency consumer has no status, it's not satisfied
184+
varzeroStatusStatusType
185+
unmetDependencies=append(unmetDependencies,Dependency[StatusType,ConsumerID]{
186+
Consumer:consumer,
187+
DependsOn:dependsOnConsumer,
188+
RequiredStatus:requiredStatus,
189+
CurrentStatus:zeroStatus,// Zero value
190+
IsSatisfied:false,
191+
})
192+
}else {
193+
isSatisfied:=currentStatus==requiredStatus
194+
if!isSatisfied {
195+
unmetDependencies=append(unmetDependencies,Dependency[StatusType,ConsumerID]{
196+
Consumer:consumer,
197+
DependsOn:dependsOnConsumer,
198+
RequiredStatus:requiredStatus,
199+
CurrentStatus:currentStatus,
200+
IsSatisfied:false,
201+
})
202+
}
203+
}
204+
}
205+
206+
returnunmetDependencies,nil
207+
}
208+
209+
// recalculateReadinessUnsafe recalculates the readiness state for a consumer.
210+
// This method assumes the caller holds the write lock.
211+
func (dt*Manager[StatusType,ConsumerID])recalculateReadinessUnsafe(consumerConsumerID) {
212+
consumerVertex:=dt.consumerVertices[consumer]
213+
forwardEdges:=dt.graph.GetForwardAdjacentVertices(consumerVertex)
214+
215+
// If there are no dependencies, the consumer is ready
216+
iflen(forwardEdges)==0 {
217+
dt.consumerReadiness[consumer]=true
218+
return
219+
}
220+
221+
// Check if all dependencies are satisfied
222+
allSatisfied:=true
223+
for_,edge:=rangeforwardEdges {
224+
dependsOnConsumer:=edge.To.ID
225+
requiredStatus:=edge.Edge
226+
currentStatus,exists:=dt.consumerStatus[dependsOnConsumer]
227+
if!exists||currentStatus!=requiredStatus {
228+
allSatisfied=false
229+
break
230+
}
231+
}
232+
233+
dt.consumerReadiness[consumer]=allSatisfied
234+
}
235+
236+
// GetGraph returns the underlying graph for visualization and debugging.
237+
// This should be used carefully as it exposes the internal graph structure.
238+
func (dt*Manager[StatusType,ConsumerID])GetGraph()*Graph[StatusType,*dependencyVertex[ConsumerID]] {
239+
returndt.graph
240+
}
241+
242+
// GetStatus returns the current status of a consumer.
243+
func (dt*Manager[StatusType,ConsumerID])GetStatus(consumerConsumerID) (StatusType,error) {
244+
dt.mu.RLock()
245+
deferdt.mu.RUnlock()
246+
247+
if!dt.registeredConsumers[consumer] {
248+
varzeroStatusStatusType
249+
returnzeroStatus,ErrConsumerNotFound
250+
}
251+
252+
status,exists:=dt.consumerStatus[consumer]
253+
if!exists {
254+
varzeroStatusStatusType
255+
returnzeroStatus,nil
256+
}
257+
258+
returnstatus,nil
259+
}
260+
261+
// GetAllDependencies returns all dependencies for a consumer, both satisfied and unsatisfied.
262+
func (dt*Manager[StatusType,ConsumerID])GetAllDependencies(consumerConsumerID) ([]Dependency[StatusType,ConsumerID],error) {
263+
dt.mu.RLock()
264+
deferdt.mu.RUnlock()
265+
266+
if!dt.registeredConsumers[consumer] {
267+
returnnil,ErrConsumerNotFound
268+
}
269+
270+
consumerVertex:=dt.consumerVertices[consumer]
271+
forwardEdges:=dt.graph.GetForwardAdjacentVertices(consumerVertex)
272+
273+
varallDependencies []Dependency[StatusType,ConsumerID]
274+
275+
for_,edge:=rangeforwardEdges {
276+
dependsOnConsumer:=edge.To.ID
277+
requiredStatus:=edge.Edge
278+
currentStatus,exists:=dt.consumerStatus[dependsOnConsumer]
279+
if!exists {
280+
// If the dependency consumer has no status, it's not satisfied
281+
varzeroStatusStatusType
282+
allDependencies=append(allDependencies,Dependency[StatusType,ConsumerID]{
283+
Consumer:consumer,
284+
DependsOn:dependsOnConsumer,
285+
RequiredStatus:requiredStatus,
286+
CurrentStatus:zeroStatus,// Zero value
287+
IsSatisfied:false,
288+
})
289+
}else {
290+
isSatisfied:=currentStatus==requiredStatus
291+
allDependencies=append(allDependencies,Dependency[StatusType,ConsumerID]{
292+
Consumer:consumer,
293+
DependsOn:dependsOnConsumer,
294+
RequiredStatus:requiredStatus,
295+
CurrentStatus:currentStatus,
296+
IsSatisfied:isSatisfied,
297+
})
298+
}
299+
}
300+
301+
returnallDependencies,nil
302+
}
303+
304+
// ExportDOT exports the dependency graph to DOT format for visualization.
305+
func (dt*Manager[StatusType,ConsumerID])ExportDOT(namestring) (string,error) {
306+
returndt.graph.ToDOT(name)
307+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp