Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit28f5cb5

Browse files
committed
feat: aibridge package
1 parent7bbab58 commit28f5cb5

File tree

19 files changed

+1514
-40
lines changed

19 files changed

+1514
-40
lines changed

‎Makefile‎

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,10 @@ TAILNETTEST_MOCKS := \
630630
tailnet/tailnettest/workspaceupdatesprovidermock.go\
631631
tailnet/tailnettest/subscriptionmock.go
632632

633+
AIBRIDGED_MOCKS :=\
634+
aibridged/aibridgedmock/clientmock.go\
635+
aibridged/aibridgedmock/poolmock.go
636+
633637
GEN_FILES :=\
634638
tailnet/proto/tailnet.pb.go\
635639
agent/proto/agent.pb.go\
@@ -654,7 +658,8 @@ GEN_FILES := \
654658
agent/agentcontainers/acmock/acmock.go\
655659
agent/agentcontainers/dcspec/dcspec_gen.go\
656660
coderd/httpmw/loggermw/loggermock/loggermock.go\
657-
codersdk/workspacesdk/agentconnmock/agentconnmock.go
661+
codersdk/workspacesdk/agentconnmock/agentconnmock.go\
662+
$(AIBRIDGED_MOCKS)
658663

659664
# all gen targets should be added here and to gen/mark-fresh
660665
gen: gen/db gen/golden-files$(GEN_FILES)
@@ -706,6 +711,7 @@ gen/mark-fresh:
706711
agent/agentcontainers/dcspec/dcspec_gen.go\
707712
coderd/httpmw/loggermw/loggermock/loggermock.go\
708713
codersdk/workspacesdk/agentconnmock/agentconnmock.go\
714+
$(AIBRIDGED_MOCKS)\
709715
"
710716

711717
for file in $$files; do
@@ -753,6 +759,10 @@ codersdk/workspacesdk/agentconnmock/agentconnmock.go: codersdk/workspacesdk/agen
753759
go generate ./codersdk/workspacesdk/agentconnmock/
754760
touch"$@"
755761

762+
$(AIBRIDGED_MOCKS): aibridged/client.go aibridged/pool.go
763+
go generate ./aibridged/aibridgedmock/
764+
touch"$@"
765+
756766
agent/agentcontainers/dcspec/dcspec_gen.go:\
757767
node_modules/.installed\
758768
agent/agentcontainers/dcspec/devContainer.base.schema.json\

‎aibridged/aibridged.go‎

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
package aibridged
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net/http"
7+
"sync"
8+
"time"
9+
10+
"golang.org/x/xerrors"
11+
12+
"cdr.dev/slog"
13+
"github.com/coder/coder/v2/codersdk"
14+
"github.com/coder/retry"
15+
)
16+
17+
// Server provides the AI Bridge functionality.
18+
// It is responsible for:
19+
// - receiving requests on /api/experimental/aibridged/* // TODO: update endpoint once out of experimental
20+
// - manipulating the requests
21+
// - relaying requests to upstream AI services and relaying responses to caller
22+
//
23+
// It requires a [Dialer] to provide a [DRPCClient] implementation to
24+
// communicate with a [DRPCServer] implementation, to persist state and perform other functions.
25+
typeServerstruct {
26+
clientDialerDialer
27+
clientChchanDRPCClient
28+
29+
// A pool of [aibridge.RequestBridge] instances, which service incoming requests.
30+
requestBridgePoolPooler
31+
32+
logger slog.Logger
33+
wg sync.WaitGroup
34+
35+
// initConnectionCh will receive when the daemon connects to coderd for the
36+
// first time.
37+
initConnectionChchanstruct{}
38+
initConnectionOnce sync.Once
39+
40+
// lifecycleCtx is canceled when we start closing.
41+
lifecycleCtx context.Context
42+
// cancelFn closes the lifecycleCtx.
43+
cancelFnfunc()
44+
45+
shutdownOnce sync.Once
46+
}
47+
48+
funcNew(ctx context.Context,poolPooler,rpcDialerDialer,logger slog.Logger) (*Server,error) {
49+
ifrpcDialer==nil {
50+
returnnil,xerrors.Errorf("nil rpcDialer given")
51+
}
52+
53+
ctx,cancel:=context.WithCancel(ctx)
54+
daemon:=&Server{
55+
logger:logger,
56+
clientDialer:rpcDialer,
57+
requestBridgePool:pool,
58+
clientCh:make(chanDRPCClient),
59+
lifecycleCtx:ctx,
60+
cancelFn:cancel,
61+
initConnectionCh:make(chanstruct{}),
62+
}
63+
64+
daemon.wg.Add(1)
65+
godaemon.connect()
66+
67+
returndaemon,nil
68+
}
69+
70+
// Connect establishes a connection to coderd.
71+
func (d*Server)connect() {
72+
deferd.logger.Debug(d.lifecycleCtx,"connect loop exited")
73+
deferd.wg.Done()
74+
75+
logConnect:=d.logger.With(slog.F("context","aibridged.server")).Debug
76+
// An exponential back-off occurs when the connection is failing to dial.
77+
// This is to prevent server spam in case of a coderd outage.
78+
connectLoop:
79+
forretrier:=retry.New(50*time.Millisecond,10*time.Second);retrier.Wait(d.lifecycleCtx); {
80+
// It's possible for the aibridge daemon to be shut down
81+
// before the wait is complete!
82+
ifd.isShutdown() {
83+
return
84+
}
85+
d.logger.Debug(d.lifecycleCtx,"dialing coderd")
86+
client,err:=d.clientDialer(d.lifecycleCtx)
87+
iferr!=nil {
88+
iferrors.Is(err,context.Canceled) {
89+
return
90+
}
91+
varsdkErr*codersdk.Error
92+
// If something is wrong with our auth, stop trying to connect.
93+
iferrors.As(err,&sdkErr)&&sdkErr.StatusCode()==http.StatusForbidden {
94+
d.logger.Error(d.lifecycleCtx,"not authorized to dial coderd",slog.Error(err))
95+
return
96+
}
97+
ifd.isShutdown() {
98+
return
99+
}
100+
d.logger.Warn(d.lifecycleCtx,"coderd client failed to dial",slog.Error(err))
101+
continue
102+
}
103+
104+
// TODO: log this with INFO level when we implement external aibridge daemons.
105+
logConnect(d.lifecycleCtx,"successfully connected to coderd")
106+
retrier.Reset()
107+
d.initConnectionOnce.Do(func() {
108+
close(d.initConnectionCh)
109+
})
110+
111+
// Serve the client until we are closed or it disconnects.
112+
for {
113+
select {
114+
case<-d.lifecycleCtx.Done():
115+
client.DRPCConn().Close()
116+
return
117+
case<-client.DRPCConn().Closed():
118+
logConnect(d.lifecycleCtx,"connection to coderd closed")
119+
continue connectLoop
120+
cased.clientCh<-client:
121+
continue
122+
}
123+
}
124+
}
125+
}
126+
127+
func (d*Server)Client() (DRPCClient,error) {
128+
select {
129+
case<-d.lifecycleCtx.Done():
130+
returnnil,xerrors.New("context closed")
131+
caseclient:=<-d.clientCh:
132+
returnclient,nil
133+
}
134+
}
135+
136+
// GetRequestHandler retrieves a (possibly reused) [*aibridge.RequestBridge] from the pool, for the given user.
137+
func (d*Server)GetRequestHandler(ctx context.Context,reqRequest) (http.Handler,error) {
138+
ifd.requestBridgePool==nil {
139+
returnnil,xerrors.New("nil requestBridgePool")
140+
}
141+
142+
reqBridge,err:=d.requestBridgePool.Acquire(ctx,req,d.Client)
143+
iferr!=nil {
144+
returnnil,xerrors.Errorf("acquire request bridge: %w",err)
145+
}
146+
147+
returnreqBridge,nil
148+
}
149+
150+
// isShutdown returns whether the Server is shutdown or not.
151+
func (d*Server)isShutdown()bool {
152+
select {
153+
case<-d.lifecycleCtx.Done():
154+
returntrue
155+
default:
156+
returnfalse
157+
}
158+
}
159+
160+
// Shutdown waits for all exiting in-flight requests to complete, or the context to expire, whichever comes first.
161+
func (d*Server)Shutdown(ctx context.Context)error {
162+
varerrerror
163+
d.shutdownOnce.Do(func() {
164+
d.cancelFn()
165+
166+
// Wait for any outstanding connections to terminate.
167+
d.wg.Wait()
168+
169+
select {
170+
case<-ctx.Done():
171+
d.logger.Warn(ctx,"graceful shutdown failed",slog.Error(ctx.Err()))
172+
err=ctx.Err()
173+
return
174+
default:
175+
}
176+
177+
d.logger.Info(ctx,"shutting down request pool")
178+
iferr=d.requestBridgePool.Shutdown(ctx);err!=nil {
179+
d.logger.Error(ctx,"request pool shutdown failed with error",slog.Error(err))
180+
}
181+
182+
d.logger.Info(ctx,"gracefully shutdown")
183+
})
184+
returnerr
185+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp