- Notifications
You must be signed in to change notification settings - Fork1.1k
feat: addcoder exp sync commands to allow script orchestration#20579
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Draft
SasSwart wants to merge18 commits intomainChoose a base branch fromjjs/internal-1095-cli
base:main
Could not load branches
Branch not found:{{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline, and old review comments may become outdated.
+4,725 −0
Draft
Changes from1 commit
Commits
Show all changes
18 commits Select commitHold shift + click to select a range
e3dfe45 LLM generated implementation of unit status change communication
SasSwart851c4f9 add a socket to the agent for local IPC
SasSwart34c1370 fix agent socket tests
SasSwart9ca30e2 add a prototype cli command that uses the agent socket
SasSwart4616c82 switch agent socket to drpc. factor components and add tests
SasSwart55c5b70 Rename unit.DependencyTracker to unit.Manager
SasSwart8644712 make the agent socket path configurable
SasSwart216a5ac document initSocketServer and tweak its log levels
SasSwartc322b92 remove agent socket auth for now
SasSwart8c0bfcb Improve agentsocket rpc naming and documentation
SasSwarte6873c8 rename dependency_tracker.go to manager.go
SasSwartf550028 Move unit statuses to the appropriate package
SasSwart820d53b streamline agentsocket server initialization
SasSwart89b060e hide functions that do not need to be public
SasSwart0d3d493 fix an incomplete refactor
SasSwart217ddf4 fix an incomplete refactor
SasSwart10d4e42 remove defunct files
SasSwart9764926 remove defunct test file
SasSwartFile filter
Filter by extension
Conversations
Failed to load comments.
Loading
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Jump to file
Failed to load files.
Loading
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
NextNext commit
LLM generated implementation of unit status change communication
- Loading branch information
Uh oh!
There was an error while loading.Please reload this page.
commite3dfe45f35a66e0c1a87735a4f9954278ec6c855
There are no files selected for viewing
227 changes: 227 additions & 0 deletionsagent/unit/dependency_tracker.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,227 @@ | ||
| package unit | ||
| import ( | ||
| "sync" | ||
| "golang.org/x/xerrors" | ||
| ) | ||
| // ErrConsumerNotFound is returned when a consumer ID is not registered. | ||
| var ErrConsumerNotFound = xerrors.New("consumer not found") | ||
| // ErrCannotUpdateOtherConsumer is returned when attempting to update another consumer's status. | ||
| var ErrCannotUpdateOtherConsumer = xerrors.New("cannot update other consumer's status") | ||
| // dependencyVertex represents a vertex in the dependency graph that is associated with a consumer. | ||
| type dependencyVertex[ConsumerID comparable] struct { | ||
| ID ConsumerID | ||
| } | ||
| // Dependency represents a dependency relationship between consumers. | ||
| type Dependency[StatusType, ConsumerID comparable] struct { | ||
| Consumer ConsumerID | ||
| DependsOn ConsumerID | ||
| RequiredStatus StatusType | ||
| CurrentStatus StatusType | ||
| IsSatisfied bool | ||
| } | ||
| // DependencyTracker provides reactive dependency tracking over a Graph. | ||
| // It manages consumer registration, dependency relationships, and status updates | ||
| // with automatic recalculation of readiness when dependencies are satisfied. | ||
| type DependencyTracker[StatusType, ConsumerID comparable] struct { | ||
| mu sync.RWMutex | ||
| // The underlying graph that stores dependency relationships | ||
| graph *Graph[StatusType, *dependencyVertex[ConsumerID]] | ||
| // Track current status of each consumer | ||
| consumerStatus map[ConsumerID]StatusType | ||
| // Track readiness state (cached to avoid repeated graph traversal) | ||
| consumerReadiness map[ConsumerID]bool | ||
| // Track which consumers are registered | ||
| registeredConsumers map[ConsumerID]bool | ||
| // Store vertex instances for each consumer to ensure consistent references | ||
| consumerVertices map[ConsumerID]*dependencyVertex[ConsumerID] | ||
| } | ||
| // NewDependencyTracker creates a new DependencyTracker instance. | ||
| func NewDependencyTracker[StatusType, ConsumerID comparable]() *DependencyTracker[StatusType, ConsumerID] { | ||
| return &DependencyTracker[StatusType, ConsumerID]{ | ||
| graph: &Graph[StatusType, *dependencyVertex[ConsumerID]]{}, | ||
| consumerStatus: make(map[ConsumerID]StatusType), | ||
| consumerReadiness: make(map[ConsumerID]bool), | ||
| registeredConsumers: make(map[ConsumerID]bool), | ||
| consumerVertices: make(map[ConsumerID]*dependencyVertex[ConsumerID]), | ||
| } | ||
| } | ||
| // Register registers a new consumer as a vertex in the dependency graph. | ||
| func (dt *DependencyTracker[StatusType, ConsumerID]) Register(id ConsumerID) error { | ||
| dt.mu.Lock() | ||
| defer dt.mu.Unlock() | ||
| if dt.registeredConsumers[id] { | ||
| return xerrors.Errorf("consumer %v is already registered", id) | ||
| } | ||
| // Create and store the vertex for this consumer | ||
| vertex := &dependencyVertex[ConsumerID]{ID: id} | ||
| dt.consumerVertices[id] = vertex | ||
| dt.registeredConsumers[id] = true | ||
| dt.consumerReadiness[id] = true // New consumers start as ready (no dependencies) | ||
| return nil | ||
| } | ||
| // AddDependency adds a dependency relationship between consumers. | ||
| // The consumer depends on the dependsOn consumer reaching the requiredStatus. | ||
| func (dt *DependencyTracker[StatusType, ConsumerID]) AddDependency(consumer ConsumerID, dependsOn ConsumerID, requiredStatus StatusType) error { | ||
| dt.mu.Lock() | ||
| defer dt.mu.Unlock() | ||
| if !dt.registeredConsumers[consumer] { | ||
| return xerrors.Errorf("consumer %v is not registered", consumer) | ||
| } | ||
| if !dt.registeredConsumers[dependsOn] { | ||
| return xerrors.Errorf("consumer %v is not registered", dependsOn) | ||
| } | ||
| // Get the stored vertices for both consumers | ||
| consumerVertex := dt.consumerVertices[consumer] | ||
| dependsOnVertex := dt.consumerVertices[dependsOn] | ||
| // Add the dependency edge to the graph | ||
| // The edge goes from consumer to dependsOn, representing the dependency | ||
| err := dt.graph.AddEdge(consumerVertex, dependsOnVertex, requiredStatus) | ||
| if err != nil { | ||
| return xerrors.Errorf("failed to add dependency: %w", err) | ||
| } | ||
| // Recalculate readiness for the consumer since it now has a dependency | ||
| dt.recalculateReadinessUnsafe(consumer) | ||
| return nil | ||
| } | ||
| // UpdateStatus updates a consumer's status and recalculates readiness for affected dependents. | ||
| func (dt *DependencyTracker[StatusType, ConsumerID]) UpdateStatus(consumer ConsumerID, newStatus StatusType) error { | ||
| dt.mu.Lock() | ||
| defer dt.mu.Unlock() | ||
| if !dt.registeredConsumers[consumer] { | ||
| return ErrConsumerNotFound | ||
| } | ||
| // Update the consumer's status | ||
| dt.consumerStatus[consumer] = newStatus | ||
| // Get all consumers that depend on this one (reverse adjacent vertices) | ||
| consumerVertex := dt.consumerVertices[consumer] | ||
| dependentEdges := dt.graph.GetReverseAdjacentVertices(consumerVertex) | ||
| // Recalculate readiness for all dependents | ||
| for _, edge := range dependentEdges { | ||
| dt.recalculateReadinessUnsafe(edge.From.ID) | ||
| } | ||
| return nil | ||
| } | ||
| // IsReady checks if all dependencies for a consumer are satisfied. | ||
| func (dt *DependencyTracker[StatusType, ConsumerID]) IsReady(consumer ConsumerID) (bool, error) { | ||
| dt.mu.RLock() | ||
| defer dt.mu.RUnlock() | ||
| if !dt.registeredConsumers[consumer] { | ||
| return false, ErrConsumerNotFound | ||
| } | ||
| return dt.consumerReadiness[consumer], nil | ||
| } | ||
| // GetUnmetDependencies returns a list of unsatisfied dependencies for a consumer. | ||
| func (dt *DependencyTracker[StatusType, ConsumerID]) GetUnmetDependencies(consumer ConsumerID) ([]Dependency[StatusType, ConsumerID], error) { | ||
| dt.mu.RLock() | ||
| defer dt.mu.RUnlock() | ||
| if !dt.registeredConsumers[consumer] { | ||
| return nil, ErrConsumerNotFound | ||
| } | ||
| consumerVertex := dt.consumerVertices[consumer] | ||
| forwardEdges := dt.graph.GetForwardAdjacentVertices(consumerVertex) | ||
| var unmetDependencies []Dependency[StatusType, ConsumerID] | ||
| for _, edge := range forwardEdges { | ||
| dependsOnConsumer := edge.To.ID | ||
| requiredStatus := edge.Edge | ||
| currentStatus, exists := dt.consumerStatus[dependsOnConsumer] | ||
| if !exists { | ||
| // If the dependency consumer has no status, it's not satisfied | ||
| var zeroStatus StatusType | ||
| unmetDependencies = append(unmetDependencies, Dependency[StatusType, ConsumerID]{ | ||
| Consumer: consumer, | ||
| DependsOn: dependsOnConsumer, | ||
| RequiredStatus: requiredStatus, | ||
| CurrentStatus: zeroStatus, // Zero value | ||
| IsSatisfied: false, | ||
| }) | ||
| } else { | ||
| isSatisfied := currentStatus == requiredStatus | ||
| if !isSatisfied { | ||
| unmetDependencies = append(unmetDependencies, Dependency[StatusType, ConsumerID]{ | ||
| Consumer: consumer, | ||
| DependsOn: dependsOnConsumer, | ||
| RequiredStatus: requiredStatus, | ||
| CurrentStatus: currentStatus, | ||
| IsSatisfied: false, | ||
| }) | ||
| } | ||
| } | ||
| } | ||
| return unmetDependencies, nil | ||
| } | ||
| // recalculateReadinessUnsafe recalculates the readiness state for a consumer. | ||
| // This method assumes the caller holds the write lock. | ||
| func (dt *DependencyTracker[StatusType, ConsumerID]) recalculateReadinessUnsafe(consumer ConsumerID) { | ||
| consumerVertex := dt.consumerVertices[consumer] | ||
| forwardEdges := dt.graph.GetForwardAdjacentVertices(consumerVertex) | ||
| // If there are no dependencies, the consumer is ready | ||
| if len(forwardEdges) == 0 { | ||
| dt.consumerReadiness[consumer] = true | ||
| return | ||
| } | ||
| // Check if all dependencies are satisfied | ||
| allSatisfied := true | ||
| for _, edge := range forwardEdges { | ||
| dependsOnConsumer := edge.To.ID | ||
| requiredStatus := edge.Edge | ||
| currentStatus, exists := dt.consumerStatus[dependsOnConsumer] | ||
| if !exists || currentStatus != requiredStatus { | ||
| allSatisfied = false | ||
| break | ||
| } | ||
| } | ||
| dt.consumerReadiness[consumer] = allSatisfied | ||
| } | ||
| // GetGraph returns the underlying graph for visualization and debugging. | ||
| // This should be used carefully as it exposes the internal graph structure. | ||
| func (dt *DependencyTracker[StatusType, ConsumerID]) GetGraph() *Graph[StatusType, *dependencyVertex[ConsumerID]] { | ||
| return dt.graph | ||
| } | ||
| // ExportDOT exports the dependency graph to DOT format for visualization. | ||
| func (dt *DependencyTracker[StatusType, ConsumerID]) ExportDOT(name string) (string, error) { | ||
| return dt.graph.ToDOT(name) | ||
| } |
Oops, something went wrong.
Uh oh!
There was an error while loading.Please reload this page.
Oops, something went wrong.
Uh oh!
There was an error while loading.Please reload this page.
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.