@@ -21,6 +21,7 @@ import (
21
21
"time"
22
22
23
23
"github.com/coder/coder/v2/aibridged"
24
+ aibridgedproto"github.com/coder/coder/v2/aibridged/proto"
24
25
"github.com/coder/coder/v2/coderd/oauth2provider"
25
26
"github.com/coder/coder/v2/coderd/pproflabel"
26
27
"github.com/coder/coder/v2/coderd/prebuilds"
@@ -45,6 +46,9 @@ import (
45
46
"tailscale.com/types/key"
46
47
"tailscale.com/util/singleflight"
47
48
49
+ "github.com/coder/coder/v2/coderd/aibridgedserver"
50
+ "github.com/coder/coder/v2/provisionerd/proto"
51
+
48
52
"cdr.dev/slog"
49
53
"github.com/coder/quartz"
50
54
"github.com/coder/serpent"
@@ -96,7 +100,6 @@ import (
96
100
"github.com/coder/coder/v2/coderd/workspacestats"
97
101
"github.com/coder/coder/v2/codersdk"
98
102
"github.com/coder/coder/v2/codersdk/healthsdk"
99
- "github.com/coder/coder/v2/provisionerd/proto"
100
103
"github.com/coder/coder/v2/provisionersdk"
101
104
"github.com/coder/coder/v2/site"
102
105
"github.com/coder/coder/v2/tailnet"
@@ -633,6 +636,7 @@ func New(options *Options) *API {
633
636
api .PortSharer .Store (& portsharing .DefaultPortSharer )
634
637
api .PrebuildsClaimer .Store (& prebuilds .DefaultClaimer )
635
638
api .PrebuildsReconciler .Store (& prebuilds .DefaultReconciler )
639
+ api .AIBridgeDaemon .Store (& aibridged .DefaultServer )
636
640
buildInfo := codersdk.BuildInfoResponse {
637
641
ExternalURL :buildinfo .ExternalURL (),
638
642
Version :buildinfo .Version (),
@@ -1828,6 +1832,10 @@ func (api *API) Close() error {
1828
1832
(* current ).Stop (ctx ,nil )
1829
1833
}
1830
1834
1835
+ if current := api .AIBridgeDaemon .Load ();current != nil {
1836
+ _ = (* current ).Close ()
1837
+ }
1838
+
1831
1839
return nil
1832
1840
}
1833
1841
@@ -2001,6 +2009,76 @@ func (api *API) CreateInMemoryTaggedProvisionerDaemon(dialCtx context.Context, n
2001
2009
return proto .NewDRPCProvisionerDaemonClient (clientSession ),nil
2002
2010
}
2003
2011
2012
+ func (api * API )CreateInMemoryAIBridgeDaemon (dialCtx context.Context ) (client aibridged.DRPCClient ,err error ) {
2013
+ // TODO(dannyk): implement options.
2014
+ // TODO(dannyk): implement tracing.
2015
+ // TODO(dannyk): implement API versioning.
2016
+
2017
+ clientSession ,serverSession := drpcsdk .MemTransportPipe ()
2018
+ defer func () {
2019
+ if err != nil {
2020
+ _ = clientSession .Close ()
2021
+ _ = serverSession .Close ()
2022
+ }
2023
+ }()
2024
+
2025
+ mux := drpcmux .New ()
2026
+ api .Logger .Debug (dialCtx ,"starting in-memory aibridge daemon" )
2027
+ logger := api .Logger .Named ("inmem-aibridged" )
2028
+ srv ,err := aibridgedserver .NewServer (api .ctx ,api .Database ,logger ,
2029
+ api .DeploymentValues .AccessURL .String (),api .ExternalAuthConfigs )
2030
+ if err != nil {
2031
+ return nil ,err
2032
+ }
2033
+ err = aibridgedproto .DRPCRegisterRecorder (mux ,srv )
2034
+ if err != nil {
2035
+ return nil ,xerrors .Errorf ("register recorder service: %w" ,err )
2036
+ }
2037
+ err = aibridgedproto .DRPCRegisterMCPConfigurator (mux ,srv )
2038
+ if err != nil {
2039
+ return nil ,xerrors .Errorf ("register MCP configurator service: %w" ,err )
2040
+ }
2041
+ err = aibridgedproto .DRPCRegisterAuthenticator (mux ,srv )
2042
+ if err != nil {
2043
+ return nil ,xerrors .Errorf ("register authenticator service: %w" ,err )
2044
+ }
2045
+ server := drpcserver .NewWithOptions (& tracing.DRPCHandler {Handler :mux },
2046
+ drpcserver.Options {
2047
+ Manager :drpcsdk .DefaultDRPCOptions (nil ),
2048
+ Log :func (err error ) {
2049
+ if xerrors .Is (err ,io .EOF ) {
2050
+ return
2051
+ }
2052
+ logger .Debug (dialCtx ,"drpc server error" ,slog .Error (err ))
2053
+ },
2054
+ },
2055
+ )
2056
+ // in-mem pipes aren't technically "websockets" but they have the same properties as far as the
2057
+ // API is concerned: they are long-lived connections that we need to close before completing
2058
+ // shutdown of the API.
2059
+ api .WebsocketWaitMutex .Lock ()
2060
+ api .WebsocketWaitGroup .Add (1 )
2061
+ api .WebsocketWaitMutex .Unlock ()
2062
+ go func () {
2063
+ defer api .WebsocketWaitGroup .Done ()
2064
+ // Here we pass the background context, since we want the server to keep serving until the
2065
+ // client hangs up. The aibridged is local, in-mem, so there isn't a danger of losing contact with it and
2066
+ // having a dead connection we don't know the status of.
2067
+ err := server .Serve (context .Background (),serverSession )
2068
+ logger .Info (dialCtx ,"aibridge daemon disconnected" ,slog .Error (err ))
2069
+ // Close the sessions, so we don't leak goroutines serving them.
2070
+ _ = clientSession .Close ()
2071
+ _ = serverSession .Close ()
2072
+ }()
2073
+
2074
+ return & aibridged.Client {
2075
+ Conn :clientSession ,
2076
+ DRPCRecorderClient :aibridgedproto .NewDRPCRecorderClient (clientSession ),
2077
+ DRPCMCPConfiguratorClient :aibridgedproto .NewDRPCMCPConfiguratorClient (clientSession ),
2078
+ DRPCAuthenticatorClient :aibridgedproto .NewDRPCAuthenticatorClient (clientSession ),
2079
+ },nil
2080
+ }
2081
+
2004
2082
func (api * API )DERPMap ()* tailcfg.DERPMap {
2005
2083
fn := api .DERPMapper .Load ()
2006
2084
if fn != nil {