@@ -7,11 +7,13 @@ import (
77"strconv"
88"strings"
99"sync/atomic"
10+ "time"
1011
1112"github.com/google/uuid"
1213"github.com/hashicorp/yamux"
1314"storj.io/drpc/drpcmux"
1415"storj.io/drpc/drpcserver"
16+ "tailscale.com/tailcfg"
1517
1618"cdr.dev/slog"
1719"github.com/coder/coder/v2/tailnet/proto"
@@ -92,10 +94,22 @@ type ClientService struct {
9294
9395// NewClientService returns a ClientService based on the given Coordinator pointer. The pointer is
9496// loaded on each processed connection.
95- func NewClientService (logger slog.Logger ,coordPtr * atomic.Pointer [Coordinator ]) (* ClientService ,error ) {
97+ func NewClientService (
98+ logger slog.Logger ,
99+ coordPtr * atomic.Pointer [Coordinator ],
100+ derpMapUpdateFrequency time.Duration ,
101+ derpMapFn func ()* tailcfg.DERPMap ,
102+ ) (
103+ * ClientService ,error ,
104+ ) {
96105s := & ClientService {logger :logger ,coordPtr :coordPtr }
97106mux := drpcmux .New ()
98- drpcService := NewDRPCService (logger ,coordPtr )
107+ drpcService := & DRPCService {
108+ CoordPtr :coordPtr ,
109+ Logger :logger ,
110+ DerpMapUpdateFrequency :derpMapUpdateFrequency ,
111+ DerpMapFn :derpMapFn ,
112+ }
99113err := proto .DRPCRegisterClient (mux ,drpcService )
100114if err != nil {
101115return nil ,xerrors .Errorf ("register DRPC service: %w" ,err )
@@ -145,20 +159,37 @@ func (s *ClientService) ServeClient(ctx context.Context, version string, conn ne
145159
146160// DRPCService is the dRPC-based, version 2.x of the tailnet API and implements proto.DRPCClientServer
147161type DRPCService struct {
148- coordPtr * atomic.Pointer [Coordinator ]
149- logger slog.Logger
162+ CoordPtr * atomic.Pointer [Coordinator ]
163+ Logger slog.Logger
164+ DerpMapUpdateFrequency time.Duration
165+ DerpMapFn func ()* tailcfg.DERPMap
150166}
151167
152- func NewDRPCService (logger slog.Logger ,coordPtr * atomic.Pointer [Coordinator ])* DRPCService {
153- return & DRPCService {
154- coordPtr :coordPtr ,
155- logger :logger ,
156- }
157- }
168+ func (s * DRPCService )StreamDERPMaps (_ * proto.StreamDERPMapsRequest ,stream proto.DRPCClient_StreamDERPMapsStream )error {
169+ defer stream .Close ()
170+
171+ ticker := time .NewTicker (s .DerpMapUpdateFrequency )
172+ defer ticker .Stop ()
158173
159- func (* DRPCService )StreamDERPMaps (* proto.StreamDERPMapsRequest , proto.DRPCClient_StreamDERPMapsStream )error {
160- // TODO integrate with Dean's PR implementation
161- return xerrors .New ("unimplemented" )
174+ var lastDERPMap * tailcfg.DERPMap
175+ for {
176+ derpMap := s .DerpMapFn ()
177+ if lastDERPMap == nil || ! CompareDERPMaps (lastDERPMap ,derpMap ) {
178+ protoDERPMap := DERPMapToProto (derpMap )
179+ err := stream .Send (protoDERPMap )
180+ if err != nil {
181+ return xerrors .Errorf ("send derp map: %w" ,err )
182+ }
183+ lastDERPMap = derpMap
184+ }
185+
186+ ticker .Reset (s .DerpMapUpdateFrequency )
187+ select {
188+ case <- stream .Context ().Done ():
189+ return nil
190+ case <- ticker .C :
191+ }
192+ }
162193}
163194
164195func (s * DRPCService )CoordinateTailnet (stream proto.DRPCClient_CoordinateTailnetStream )error {
@@ -168,9 +199,9 @@ func (s *DRPCService) CoordinateTailnet(stream proto.DRPCClient_CoordinateTailne
168199_ = stream .Close ()
169200return xerrors .New ("no Stream ID" )
170201}
171- logger := s .logger .With (slog .F ("peer_id" ,streamID ),slog .F ("name" ,streamID .Name ))
202+ logger := s .Logger .With (slog .F ("peer_id" ,streamID ),slog .F ("name" ,streamID .Name ))
172203logger .Debug (ctx ,"starting tailnet Coordinate" )
173- coord := * (s .coordPtr .Load ())
204+ coord := * (s .CoordPtr .Load ())
174205reqs ,resps := coord .Coordinate (ctx ,streamID .ID ,streamID .Name ,streamID .Auth )
175206c := communicator {
176207logger :logger ,