Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0589ddc

Browse files
committed
WIP
Signed-off-by: Danny Kopping <dannykopping@gmail.com>
1 parent1e407fb commit0589ddc

File tree

13 files changed

+1427
-11
lines changed

13 files changed

+1427
-11
lines changed

‎Makefile

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,7 @@ GEN_FILES := \
566566
provisionersdk/proto/provisioner.pb.go\
567567
provisionerd/proto/provisionerd.pb.go\
568568
vpn/vpn.pb.go\
569+
aibridged/proto/aibridged.pb.go\
569570
$(DB_GEN_FILES)\
570571
$(SITE_GEN_FILES)\
571572
coderd/rbac/object_gen.go\
@@ -725,6 +726,14 @@ vpn/vpn.pb.go: vpn/vpn.proto
725726
--go_opt=paths=source_relative\
726727
./vpn/vpn.proto
727728

729+
aibridged/proto/aibridged.pb.go: aibridged/proto/aibridged.proto
730+
protoc\
731+
--go_out=.\
732+
--go_opt=paths=source_relative\
733+
--go-drpc_out=.\
734+
--go-drpc_opt=paths=source_relative\
735+
./aibridged/proto/aibridged.proto
736+
728737
site/src/api/typesGenerated.ts: site/node_modules/.installed$(wildcard scripts/apitypings/*)$(shell find ./codersdk$(FIND_EXCLUSIONS) -type f -name '*.go')
729738
# -C sets the directory for the go run command
730739
go run -C ./scripts/apitypings main.go>$@

‎aibridged/aibridged.go

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
package aibridged
2+
3+
import (
4+
"context"
5+
"errors"
6+
"io"
7+
"net/http"
8+
"sync"
9+
"time"
10+
11+
"cdr.dev/slog"
12+
"github.com/coder/retry"
13+
"github.com/hashicorp/yamux"
14+
"github.com/valyala/fasthttp/fasthttputil"
15+
"golang.org/x/xerrors"
16+
17+
"github.com/coder/coder/v2/aibridged/proto"
18+
"github.com/coder/coder/v2/codersdk"
19+
)
20+
21+
typeDialerfunc(ctx context.Context) (proto.DRPCAIBridgeDaemonClient,error)
22+
23+
typeServerstruct {
24+
clientDialerDialer
25+
clientChchan proto.DRPCAIBridgeDaemonClient
26+
27+
logger slog.Logger
28+
wg sync.WaitGroup
29+
30+
// initConnectionCh will receive when the daemon connects to coderd for the
31+
// first time.
32+
initConnectionChchanstruct{}
33+
initConnectionOnce sync.Once
34+
35+
// mutex protects all subsequent fields
36+
mutex sync.Mutex
37+
// closeContext is canceled when we start closing.
38+
closeContext context.Context
39+
closeCancel context.CancelFunc
40+
// closeError stores the error when closing to return to subsequent callers
41+
closeErrorerror
42+
// closingB is set to true when we start closing
43+
closingBbool
44+
// closedCh will receive when we complete closing
45+
closedChchanstruct{}
46+
// shuttingDownB is set to true when we start graceful shutdown
47+
shuttingDownBbool
48+
// shuttingDownCh will receive when we start graceful shutdown
49+
shuttingDownChchanstruct{}
50+
}
51+
52+
funcNew(clientDialerDialer,logger slog.Logger) (*Server,error) {
53+
ctx,ctxCancel:=context.WithCancel(context.Background())
54+
daemon:=&Server{
55+
logger:logger,
56+
clientDialer:clientDialer,
57+
clientCh:make(chan proto.DRPCAIBridgeDaemonClient),
58+
closeContext:ctx,
59+
closeCancel:ctxCancel,
60+
closedCh:make(chanstruct{}),
61+
shuttingDownCh:make(chanstruct{}),
62+
initConnectionCh:make(chanstruct{}),
63+
}
64+
godaemon.connect()
65+
66+
returndaemon,nil
67+
}// Connect establishes a connection to coderd.
68+
func (s*Server)connect() {
69+
defers.logger.Debug(s.closeContext,"connect loop exited")
70+
defers.wg.Done()
71+
logConnect:=s.logger.Debug
72+
// An exponential back-off occurs when the connection is failing to dial.
73+
// This is to prevent server spam in case of a coderd outage.
74+
connectLoop:
75+
forretrier:=retry.New(50*time.Millisecond,10*time.Second);retrier.Wait(s.closeContext); {
76+
// TODO(dannyk): handle premature close.
77+
//// It's possible for the provisioner daemon to be shut down
78+
//// before the wait is complete!
79+
//if s.isClosed() {
80+
//return
81+
//}
82+
83+
s.logger.Debug(s.closeContext,"dialing coderd")
84+
client,err:=s.clientDialer(s.closeContext)
85+
iferr!=nil {
86+
iferrors.Is(err,context.Canceled) {
87+
return
88+
}
89+
varsdkErr*codersdk.Error
90+
// If something is wrong with our auth, stop trying to connect.
91+
iferrors.As(err,&sdkErr)&&sdkErr.StatusCode()==http.StatusForbidden {
92+
s.logger.Error(s.closeContext,"not authorized to dial coderd",slog.Error(err))
93+
return
94+
}
95+
ifs.isClosed() {
96+
return
97+
}
98+
s.logger.Warn(s.closeContext,"coderd client failed to dial",slog.Error(err))
99+
continue
100+
}
101+
102+
// This log is useful to verify that an external provisioner daemon is
103+
// successfully connecting to coderd. It doesn't add much value if the
104+
// daemon is built-in, so we only log it on the info level if p.externalProvisioner
105+
// is true. This log message is mentioned in the docs:
106+
// https://github.com/coder/coder/blob/5bd86cb1c06561d1d3e90ce689da220467e525c0/docs/admin/provisioners.md#L346
107+
logConnect(s.closeContext,"successfully connected to coderd")
108+
retrier.Reset()
109+
s.initConnectionOnce.Do(func() {
110+
close(s.initConnectionCh)
111+
})
112+
113+
// serve the client until we are closed or it disconnects
114+
for {
115+
select {
116+
case<-s.closeContext.Done():
117+
client.DRPCConn().Close()
118+
return
119+
case<-client.DRPCConn().Closed():
120+
logConnect(s.closeContext,"connection to coderd closed")
121+
continue connectLoop
122+
cases.clientCh<-client:
123+
continue
124+
}
125+
}
126+
}
127+
}
128+
129+
func (s*Server)client() (proto.DRPCAIBridgeDaemonClient,bool) {
130+
select {
131+
case<-s.closeContext.Done():
132+
returnnil,false
133+
case<-s.shuttingDownCh:
134+
// Shutting down should return a nil client and unblock
135+
returnnil,false
136+
caseclient:=<-s.clientCh:
137+
returnclient,true
138+
}
139+
}
140+
141+
func (s*Server)AuditPrompt(ctx context.Context,in*proto.AuditPromptRequest) (*proto.AuditPromptResponse,error) {
142+
out,err:=clientDoWithRetries(ctx,s.client,func(ctx context.Context,client proto.DRPCAIBridgeDaemonClient) (*proto.AuditPromptResponse,error) {
143+
returnclient.AuditPrompt(ctx,in)
144+
})
145+
iferr!=nil {
146+
returnnil,err
147+
}
148+
returnout,nil
149+
}
150+
151+
func (s*Server)ChatCompletions(payload*proto.JSONPayload,stream proto.DRPCOpenAIService_ChatCompletionsStream)error {
152+
// TODO: call OpenAI API.
153+
154+
select {
155+
case<-stream.Context().Done():
156+
returnnil
157+
default:
158+
}
159+
160+
err:=stream.Send(&proto.JSONPayload{
161+
Content:`
162+
{
163+
"id": "chatcmpl-B9MBs8CjcvOU2jLn4n570S5qMJKcT",
164+
"object": "chat.completion",
165+
"created": 1741569952,
166+
"model": "gpt-4.1-2025-04-14",
167+
"choices": [
168+
{
169+
"index": 0,
170+
"message": {
171+
"role": "assistant",
172+
"content": "Hello! How can I assist you today?",
173+
"refusal": null,
174+
"annotations": []
175+
},
176+
"logprobs": null,
177+
"finish_reason": "stop"
178+
}
179+
],
180+
"usage": {
181+
"prompt_tokens": 19,
182+
"completion_tokens": 10,
183+
"total_tokens": 29,
184+
"prompt_tokens_details": {
185+
"cached_tokens": 0,
186+
"audio_tokens": 0
187+
},
188+
"completion_tokens_details": {
189+
"reasoning_tokens": 0,
190+
"audio_tokens": 0,
191+
"accepted_prediction_tokens": 0,
192+
"rejected_prediction_tokens": 0
193+
}
194+
},
195+
"service_tier": "default"
196+
}
197+
`})
198+
iferr!=nil {
199+
returnxerrors.Errorf("stream chat completion response: %w",err)
200+
}
201+
returnnil
202+
}
203+
204+
// TODO: direct copy/paste from provisionerd, abstract into common util.
205+
funcretryable(errerror)bool {
206+
returnxerrors.Is(err,yamux.ErrSessionShutdown)||xerrors.Is(err,io.EOF)||xerrors.Is(err,fasthttputil.ErrInmemoryListenerClosed)||
207+
// annoyingly, dRPC sometimes returns context.Canceled if the transport was closed, even if the context for
208+
// the RPC *is not canceled*. Retrying is fine if the RPC context is not canceled.
209+
xerrors.Is(err,context.Canceled)
210+
}
211+
212+
// clientDoWithRetries runs the function f with a client, and retries with
213+
// backoff until either the error returned is not retryable() or the context
214+
// expires.
215+
// TODO: direct copy/paste from provisionerd, abstract into common util.
216+
funcclientDoWithRetries[Tany](ctx context.Context,
217+
getClientfunc() (proto.DRPCAIBridgeDaemonClient,bool),
218+
ffunc(context.Context, proto.DRPCAIBridgeDaemonClient) (T,error),
219+
) (retT,_error) {
220+
forretrier:=retry.New(25*time.Millisecond,5*time.Second);retrier.Wait(ctx); {
221+
client,ok:=getClient()
222+
if!ok {
223+
continue
224+
}
225+
resp,err:=f(ctx,client)
226+
ifretryable(err) {
227+
continue
228+
}
229+
returnresp,err
230+
}
231+
returnret,ctx.Err()
232+
}
233+
234+
// isClosed returns whether the API is closed or not.
235+
func (s*Server)isClosed()bool {
236+
select {
237+
case<-s.closeContext.Done():
238+
returntrue
239+
default:
240+
returnfalse
241+
}
242+
}
243+
244+
// closeWithError closes the provisioner; subsequent reads/writes will return the error err.
245+
func (s*Server)closeWithError(errerror)error {
246+
s.mutex.Lock()
247+
first:=false
248+
if!s.closingB {
249+
first=true
250+
s.closingB=true
251+
}
252+
// don't hold the mutex while doing I/O.
253+
s.mutex.Unlock()
254+
255+
iffirst {
256+
s.closeCancel()
257+
s.logger.Debug(context.Background(),"waiting for goroutines to exit")
258+
s.wg.Wait()
259+
s.logger.Debug(context.Background(),"closing server with error",slog.Error(err))
260+
s.closeError=err
261+
close(s.closedCh)
262+
returnerr
263+
}
264+
s.logger.Debug(s.closeContext,"waiting for first closer to complete")
265+
<-s.closedCh
266+
s.logger.Debug(s.closeContext,"first closer completed")
267+
returns.closeError
268+
}
269+
270+
// Close ends the aibridge daemon.
271+
func (s*Server)Close()error {
272+
ifs==nil {
273+
returnnil
274+
}
275+
276+
s.logger.Info(s.closeContext,"closing aibridged")
277+
// TODO: invalidate all running requests (cancelling context should be enough?).
278+
errMsg:="aibridged closed gracefully"
279+
err:=s.closeWithError(nil)
280+
iferr!=nil {
281+
errMsg=err.Error()
282+
}
283+
s.logger.Warn(s.closeContext,errMsg)
284+
285+
returnerr
286+
}
287+
288+
func (s*Server)Shutdown(ctx context.Context)error {
289+
// TODO: implement or remove.
290+
returnnil
291+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp