@@ -31,7 +31,6 @@ import (
3131"golang.org/x/exp/slices"
3232"golang.org/x/sync/errgroup"
3333"golang.org/x/xerrors"
34- "storj.io/drpc"
3534"tailscale.com/net/speedtest"
3635"tailscale.com/tailcfg"
3736"tailscale.com/types/netlogtype"
@@ -94,7 +93,9 @@ type Options struct {
9493}
9594
9695type Client interface {
97- ConnectRPC (ctx context.Context ) (drpc.Conn ,error )
96+ ConnectRPC23 (ctx context.Context ) (
97+ proto.DRPCAgentClient23 , tailnetproto.DRPCTailnetClient23 ,error ,
98+ )
9899RewriteDERPMap (derpMap * tailcfg.DERPMap )
99100}
100101
@@ -410,7 +411,7 @@ func (t *trySingleflight) Do(key string, fn func()) {
410411fn ()
411412}
412413
413- func (a * agent )reportMetadata (ctx context.Context ,conn drpc. Conn )error {
414+ func (a * agent )reportMetadata (ctx context.Context ,aAPI proto. DRPCAgentClient23 )error {
414415tickerDone := make (chan struct {})
415416collectDone := make (chan struct {})
416417ctx ,cancel := context .WithCancel (ctx )
@@ -572,7 +573,6 @@ func (a *agent) reportMetadata(ctx context.Context, conn drpc.Conn) error {
572573reportTimeout = 30 * time .Second
573574reportError = make (chan error ,1 )
574575reportInFlight = false
575- aAPI = proto .NewDRPCAgentClient (conn )
576576)
577577
578578for {
@@ -627,8 +627,7 @@ func (a *agent) reportMetadata(ctx context.Context, conn drpc.Conn) error {
627627
628628// reportLifecycle reports the current lifecycle state once. All state
629629// changes are reported in order.
630- func (a * agent )reportLifecycle (ctx context.Context ,conn drpc.Conn )error {
631- aAPI := proto .NewDRPCAgentClient (conn )
630+ func (a * agent )reportLifecycle (ctx context.Context ,aAPI proto.DRPCAgentClient23 )error {
632631for {
633632select {
634633case <- a .lifecycleUpdate :
@@ -710,8 +709,7 @@ func (a *agent) setLifecycle(state codersdk.WorkspaceAgentLifecycle) {
710709// fetchServiceBannerLoop fetches the service banner on an interval. It will
711710// not be fetched immediately; the expectation is that it is primed elsewhere
712711// (and must be done before the session actually starts).
713- func (a * agent )fetchServiceBannerLoop (ctx context.Context ,conn drpc.Conn )error {
714- aAPI := proto .NewDRPCAgentClient (conn )
712+ func (a * agent )fetchServiceBannerLoop (ctx context.Context ,aAPI proto.DRPCAgentClient23 )error {
715713ticker := time .NewTicker (a .announcementBannersRefreshInterval )
716714defer ticker .Stop ()
717715for {
@@ -737,7 +735,7 @@ func (a *agent) fetchServiceBannerLoop(ctx context.Context, conn drpc.Conn) erro
737735}
738736
739737func (a * agent )run () (retErr error ) {
740- // This allows the agent to refreshit's token if necessary.
738+ // This allows the agent to refresh its token if necessary.
741739// For instance identity this is required, since the instance
742740// may not have re-provisioned, but a new agent ID was created.
743741sessionToken ,err := a .exchangeToken (a .hardCtx )
@@ -747,12 +745,12 @@ func (a *agent) run() (retErr error) {
747745a .sessionToken .Store (& sessionToken )
748746
749747// ConnectRPC23 returns the dRPC clients we use for the Agent and Tailnet v2+ APIs
750- conn , err := a .client .ConnectRPC (a .hardCtx )
748+ aAPI , tAPI , err := a .client .ConnectRPC23 (a .hardCtx )
751749if err != nil {
752750return err
753751}
754752defer func () {
755- cErr := conn .Close ()
753+ cErr := aAPI . DRPCConn () .Close ()
756754if cErr != nil {
757755a .logger .Debug (a .hardCtx ,"error closing drpc connection" ,slog .Error (err ))
758756}
@@ -761,11 +759,10 @@ func (a *agent) run() (retErr error) {
761759// A lot of routines need the agent API / tailnet API connection. We run them in their own
762760// goroutines in parallel, but errors in any routine will cause them all to exit so we can
763761// redial the coder server and retry.
764- connMan := newAPIConnRoutineManager (a .gracefulCtx ,a .hardCtx ,a .logger ,conn )
762+ connMan := newAPIConnRoutineManager (a .gracefulCtx ,a .hardCtx ,a .logger ,aAPI , tAPI )
765763
766- connMan .start ("init notification banners" ,gracefulShutdownBehaviorStop ,
767- func (ctx context.Context ,conn drpc.Conn )error {
768- aAPI := proto .NewDRPCAgentClient (conn )
764+ connMan .startAgentAPI ("init notification banners" ,gracefulShutdownBehaviorStop ,
765+ func (ctx context.Context ,aAPI proto.DRPCAgentClient23 )error {
769766bannersProto ,err := aAPI .GetAnnouncementBanners (ctx ,& proto.GetAnnouncementBannersRequest {})
770767if err != nil {
771768return xerrors .Errorf ("fetch service banner: %w" ,err )
@@ -781,9 +778,9 @@ func (a *agent) run() (retErr error) {
781778
782779// sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by
783780// shutdown scripts.
784- connMan .start ("send logs" ,gracefulShutdownBehaviorRemain ,
785- func (ctx context.Context ,conn drpc. Conn )error {
786- err := a .logSender .SendLoop (ctx ,proto . NewDRPCAgentClient ( conn ) )
781+ connMan .startAgentAPI ("send logs" ,gracefulShutdownBehaviorRemain ,
782+ func (ctx context.Context ,aAPI proto. DRPCAgentClient23 )error {
783+ err := a .logSender .SendLoop (ctx ,aAPI )
787784if xerrors .Is (err ,agentsdk .LogLimitExceededError ) {
788785// we don't want this error to tear down the API connection and propagate to the
789786// other routines that use the API. The LogSender has already dropped a warning
@@ -795,10 +792,10 @@ func (a *agent) run() (retErr error) {
795792
796793// part of graceful shut down is reporting the final lifecycle states, e.g "ShuttingDown" so the
797794// lifecycle reporting has to be via gracefulShutdownBehaviorRemain
798- connMan .start ("report lifecycle" ,gracefulShutdownBehaviorRemain ,a .reportLifecycle )
795+ connMan .startAgentAPI ("report lifecycle" ,gracefulShutdownBehaviorRemain ,a .reportLifecycle )
799796
800797// metadata reporting can cease as soon as we start gracefully shutting down
801- connMan .start ("report metadata" ,gracefulShutdownBehaviorStop ,a .reportMetadata )
798+ connMan .startAgentAPI ("report metadata" ,gracefulShutdownBehaviorStop ,a .reportMetadata )
802799
803800// channels to sync goroutines below
804801// handle manifest
@@ -819,55 +816,55 @@ func (a *agent) run() (retErr error) {
819816networkOK := newCheckpoint (a .logger )
820817manifestOK := newCheckpoint (a .logger )
821818
822- connMan .start ("handle manifest" ,gracefulShutdownBehaviorStop ,a .handleManifest (manifestOK ))
819+ connMan .startAgentAPI ("handle manifest" ,gracefulShutdownBehaviorStop ,a .handleManifest (manifestOK ))
823820
824- connMan .start ("app health reporter" ,gracefulShutdownBehaviorStop ,
825- func (ctx context.Context ,conn drpc. Conn )error {
821+ connMan .startAgentAPI ("app health reporter" ,gracefulShutdownBehaviorStop ,
822+ func (ctx context.Context ,aAPI proto. DRPCAgentClient23 )error {
826823if err := manifestOK .wait (ctx );err != nil {
827824return xerrors .Errorf ("no manifest: %w" ,err )
828825}
829826manifest := a .manifest .Load ()
830827NewWorkspaceAppHealthReporter (
831- a .logger ,manifest .Apps ,agentsdk .AppHealthPoster (proto . NewDRPCAgentClient ( conn ) ),
828+ a .logger ,manifest .Apps ,agentsdk .AppHealthPoster (aAPI ),
832829)(ctx )
833830return nil
834831})
835832
836- connMan .start ("create or update network" ,gracefulShutdownBehaviorStop ,
833+ connMan .startAgentAPI ("create or update network" ,gracefulShutdownBehaviorStop ,
837834a .createOrUpdateNetwork (manifestOK ,networkOK ))
838835
839- connMan .start ("coordination" ,gracefulShutdownBehaviorStop ,
840- func (ctx context.Context ,conn drpc. Conn )error {
836+ connMan .startTailnetAPI ("coordination" ,gracefulShutdownBehaviorStop ,
837+ func (ctx context.Context ,tAPI tailnetproto. DRPCTailnetClient23 )error {
841838if err := networkOK .wait (ctx );err != nil {
842839return xerrors .Errorf ("no network: %w" ,err )
843840}
844- return a .runCoordinator (ctx ,conn ,a .network )
841+ return a .runCoordinator (ctx ,tAPI ,a .network )
845842},
846843)
847844
848- connMan .start ("derp map subscriber" ,gracefulShutdownBehaviorStop ,
849- func (ctx context.Context ,conn drpc. Conn )error {
845+ connMan .startTailnetAPI ("derp map subscriber" ,gracefulShutdownBehaviorStop ,
846+ func (ctx context.Context ,tAPI tailnetproto. DRPCTailnetClient23 )error {
850847if err := networkOK .wait (ctx );err != nil {
851848return xerrors .Errorf ("no network: %w" ,err )
852849}
853- return a .runDERPMapSubscriber (ctx ,conn ,a .network )
850+ return a .runDERPMapSubscriber (ctx ,tAPI ,a .network )
854851})
855852
856- connMan .start ("fetch service banner loop" ,gracefulShutdownBehaviorStop ,a .fetchServiceBannerLoop )
853+ connMan .startAgentAPI ("fetch service banner loop" ,gracefulShutdownBehaviorStop ,a .fetchServiceBannerLoop )
857854
858- connMan .start ("stats report loop" ,gracefulShutdownBehaviorStop ,func (ctx context.Context ,conn drpc. Conn )error {
855+ connMan .startAgentAPI ("stats report loop" ,gracefulShutdownBehaviorStop ,func (ctx context.Context ,aAPI proto. DRPCAgentClient23 )error {
859856if err := networkOK .wait (ctx );err != nil {
860857return xerrors .Errorf ("no network: %w" ,err )
861858}
862- return a .statsReporter .reportLoop (ctx ,proto . NewDRPCAgentClient ( conn ) )
859+ return a .statsReporter .reportLoop (ctx ,aAPI )
863860})
864861
865862return connMan .wait ()
866863}
867864
868865// handleManifest returns a function that fetches and processes the manifest
869- func (a * agent )handleManifest (manifestOK * checkpoint )func (ctx context.Context ,conn drpc. Conn )error {
870- return func (ctx context.Context ,conn drpc. Conn )error {
866+ func (a * agent )handleManifest (manifestOK * checkpoint )func (ctx context.Context ,aAPI proto. DRPCAgentClient23 )error {
867+ return func (ctx context.Context ,aAPI proto. DRPCAgentClient23 )error {
871868var (
872869sentResult = false
873870err error
@@ -877,7 +874,6 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,
877874manifestOK .complete (err )
878875}
879876}()
880- aAPI := proto .NewDRPCAgentClient (conn )
881877mp ,err := aAPI .GetManifest (ctx ,& proto.GetManifestRequest {})
882878if err != nil {
883879return xerrors .Errorf ("fetch metadata: %w" ,err )
@@ -977,8 +973,8 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,
977973
978974// createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates
979975// the tailnet using the information in the manifest
980- func (a * agent )createOrUpdateNetwork (manifestOK ,networkOK * checkpoint )func (context.Context ,drpc. Conn )error {
981- return func (ctx context.Context ,_ drpc. Conn ) (retErr error ) {
976+ func (a * agent )createOrUpdateNetwork (manifestOK ,networkOK * checkpoint )func (context.Context ,proto. DRPCAgentClient23 )error {
977+ return func (ctx context.Context ,_ proto. DRPCAgentClient23 ) (retErr error ) {
982978if err := manifestOK .wait (ctx );err != nil {
983979return xerrors .Errorf ("no manifest: %w" ,err )
984980}
@@ -1325,9 +1321,8 @@ func (a *agent) createTailnet(ctx context.Context, agentID uuid.UUID, derpMap *t
13251321
13261322// runCoordinator runs a coordinator and returns whether a reconnect
13271323// should occur.
1328- func (a * agent )runCoordinator (ctx context.Context ,conn drpc. Conn ,network * tailnet.Conn )error {
1324+ func (a * agent )runCoordinator (ctx context.Context ,tClient tailnetproto. DRPCTailnetClient23 ,network * tailnet.Conn )error {
13291325defer a .logger .Debug (ctx ,"disconnected from coordination RPC" )
1330- tClient := tailnetproto .NewDRPCTailnetClient (conn )
13311326// we run the RPC on the hardCtx so that we have a chance to send the disconnect message if we
13321327// gracefully shut down.
13331328coordinate ,err := tClient .Coordinate (a .hardCtx )
@@ -1373,11 +1368,10 @@ func (a *agent) runCoordinator(ctx context.Context, conn drpc.Conn, network *tai
13731368}
13741369
13751370// runDERPMapSubscriber runs a coordinator and returns if a reconnect should occur.
1376- func (a * agent )runDERPMapSubscriber (ctx context.Context ,conn drpc. Conn ,network * tailnet.Conn )error {
1371+ func (a * agent )runDERPMapSubscriber (ctx context.Context ,tClient tailnetproto. DRPCTailnetClient23 ,network * tailnet.Conn )error {
13771372defer a .logger .Debug (ctx ,"disconnected from derp map RPC" )
13781373ctx ,cancel := context .WithCancel (ctx )
13791374defer cancel ()
1380- tClient := tailnetproto .NewDRPCTailnetClient (conn )
13811375stream ,err := tClient .StreamDERPMaps (ctx ,& tailnetproto.StreamDERPMapsRequest {})
13821376if err != nil {
13831377return xerrors .Errorf ("stream DERP Maps: %w" ,err )
@@ -1981,13 +1975,17 @@ const (
19811975
19821976type apiConnRoutineManager struct {
19831977logger slog.Logger
1984- conn drpc.Conn
1978+ aAPI proto.DRPCAgentClient23
1979+ tAPI tailnetproto.DRPCTailnetClient23
19851980eg * errgroup.Group
19861981stopCtx context.Context
19871982remainCtx context.Context
19881983}
19891984
1990- func newAPIConnRoutineManager (gracefulCtx ,hardCtx context.Context ,logger slog.Logger ,conn drpc.Conn )* apiConnRoutineManager {
1985+ func newAPIConnRoutineManager (
1986+ gracefulCtx ,hardCtx context.Context ,logger slog.Logger ,
1987+ aAPI proto.DRPCAgentClient23 ,tAPI tailnetproto.DRPCTailnetClient23 ,
1988+ )* apiConnRoutineManager {
19911989// routines that remain in operation during graceful shutdown use the remainCtx. They'll still
19921990// exit if the errgroup hits an error, which usually means a problem with the conn.
19931991eg ,remainCtx := errgroup .WithContext (hardCtx )
@@ -2007,17 +2005,60 @@ func newAPIConnRoutineManager(gracefulCtx, hardCtx context.Context, logger slog.
20072005stopCtx := eitherContext (remainCtx ,gracefulCtx )
20082006return & apiConnRoutineManager {
20092007logger :logger ,
2010- conn :conn ,
2008+ aAPI :aAPI ,
2009+ tAPI :tAPI ,
20112010eg :eg ,
20122011stopCtx :stopCtx ,
20132012remainCtx :remainCtx ,
20142013}
20152014}
20162015
2017- func (a * apiConnRoutineManager )start (name string ,b gracefulShutdownBehavior ,f func (context.Context , drpc.Conn )error ) {
2016+ // startAgentAPI starts a routine that uses the Agent API. c.f. startTailnetAPI which is the same
2017+ // but for Tailnet.
2018+ func (a * apiConnRoutineManager )startAgentAPI (
2019+ name string ,behavior gracefulShutdownBehavior ,
2020+ f func (context.Context , proto.DRPCAgentClient23 )error ,
2021+ ) {
2022+ logger := a .logger .With (slog .F ("name" ,name ))
2023+ var ctx context.Context
2024+ switch behavior {
2025+ case gracefulShutdownBehaviorStop :
2026+ ctx = a .stopCtx
2027+ case gracefulShutdownBehaviorRemain :
2028+ ctx = a .remainCtx
2029+ default :
2030+ panic ("unknown behavior" )
2031+ }
2032+ a .eg .Go (func ()error {
2033+ logger .Debug (ctx ,"starting agent routine" )
2034+ err := f (ctx ,a .aAPI )
2035+ if xerrors .Is (err ,context .Canceled )&& ctx .Err ()!= nil {
2036+ logger .Debug (ctx ,"swallowing context canceled" )
2037+ // Don't propagate context canceled errors to the error group, because we don't want the
2038+ // graceful context being canceled to halt the work of routines with
2039+ // gracefulShutdownBehaviorRemain. Note that we check both that the error is
2040+ // context.Canceled and that *our* context is currently canceled, because when Coderd
2041+ // unilaterally closes the API connection (for example if the build is outdated), it can
2042+ // sometimes show up as context.Canceled in our RPC calls.
2043+ return nil
2044+ }
2045+ logger .Debug (ctx ,"routine exited" ,slog .Error (err ))
2046+ if err != nil {
2047+ return xerrors .Errorf ("error in routine %s: %w" ,name ,err )
2048+ }
2049+ return nil
2050+ })
2051+ }
2052+
2053+ // startTailnetAPI starts a routine that uses the Tailnet API. c.f. startAgentAPI which is the same
2054+ // but for the Agent API.
2055+ func (a * apiConnRoutineManager )startTailnetAPI (
2056+ name string ,behavior gracefulShutdownBehavior ,
2057+ f func (context.Context , tailnetproto.DRPCTailnetClient23 )error ,
2058+ ) {
20182059logger := a .logger .With (slog .F ("name" ,name ))
20192060var ctx context.Context
2020- switch b {
2061+ switch behavior {
20212062case gracefulShutdownBehaviorStop :
20222063ctx = a .stopCtx
20232064case gracefulShutdownBehaviorRemain :
@@ -2026,8 +2067,8 @@ func (a *apiConnRoutineManager) start(name string, b gracefulShutdownBehavior, f
20262067panic ("unknown behavior" )
20272068}
20282069a .eg .Go (func ()error {
2029- logger .Debug (ctx ,"starting routine" )
2030- err := f (ctx ,a .conn )
2070+ logger .Debug (ctx ,"startingtailnet routine" )
2071+ err := f (ctx ,a .tAPI )
20312072if xerrors .Is (err ,context .Canceled )&& ctx .Err ()!= nil {
20322073logger .Debug (ctx ,"swallowing context canceled" )
20332074// Don't propagate context canceled errors to the error group, because we don't want the