@@ -7,11 +7,14 @@ import (
7
7
"strconv"
8
8
"strings"
9
9
"sync/atomic"
10
+ "time"
10
11
11
12
"github.com/google/uuid"
12
13
"github.com/hashicorp/yamux"
14
+ "storj.io/drpc"
13
15
"storj.io/drpc/drpcmux"
14
16
"storj.io/drpc/drpcserver"
17
+ "tailscale.com/tailcfg"
15
18
16
19
"cdr.dev/slog"
17
20
"github.com/coder/coder/v2/tailnet/proto"
@@ -92,10 +95,22 @@ type ClientService struct {
92
95
93
96
// NewClientService returns a ClientService based on the given Coordinator pointer. The pointer is
94
97
// loaded on each processed connection.
95
- func NewClientService (logger slog.Logger ,coordPtr * atomic.Pointer [Coordinator ]) (* ClientService ,error ) {
98
+ func NewClientService (
99
+ logger slog.Logger ,
100
+ coordPtr * atomic.Pointer [Coordinator ],
101
+ derpMapUpdateFrequency time.Duration ,
102
+ derpMapFn func ()* tailcfg.DERPMap ,
103
+ ) (
104
+ * ClientService ,error ,
105
+ ) {
96
106
s := & ClientService {logger :logger ,coordPtr :coordPtr }
97
107
mux := drpcmux .New ()
98
- drpcService := NewDRPCService (logger ,coordPtr )
108
+ drpcService := & DRPCService {
109
+ CoordPtr :coordPtr ,
110
+ Logger :logger ,
111
+ DerpMapUpdateFrequency :derpMapUpdateFrequency ,
112
+ DerpMapFn :derpMapFn ,
113
+ }
99
114
err := proto .DRPCRegisterClient (mux ,drpcService )
100
115
if err != nil {
101
116
return nil ,xerrors .Errorf ("register DRPC service: %w" ,err )
@@ -145,20 +160,42 @@ func (s *ClientService) ServeClient(ctx context.Context, version string, conn ne
145
160
146
161
// DRPCService is the dRPC-based, version 2.x of the tailnet API and implements proto.DRPCClientServer
147
162
type DRPCService struct {
148
- coordPtr * atomic.Pointer [Coordinator ]
149
- logger slog.Logger
163
+ CoordPtr * atomic.Pointer [Coordinator ]
164
+ Logger slog.Logger
165
+ DerpMapUpdateFrequency time.Duration
166
+ DerpMapFn func ()* tailcfg.DERPMap
150
167
}
151
168
152
- func NewDRPCService (logger slog.Logger ,coordPtr * atomic.Pointer [Coordinator ])* DRPCService {
153
- return & DRPCService {
154
- coordPtr :coordPtr ,
155
- logger :logger ,
156
- }
169
+ type StreamDERPMapsStream interface {
170
+ drpc.Stream
171
+ Send (* proto.DERPMap )error
157
172
}
158
173
159
- func (* DRPCService )StreamDERPMaps (* proto.StreamDERPMapsRequest , proto.DRPCClient_StreamDERPMapsStream )error {
160
- // TODO integrate with Dean's PR implementation
161
- return xerrors .New ("unimplemented" )
174
+ func (s * DRPCService )StreamDERPMaps (_ * proto.StreamDERPMapsRequest ,stream proto.DRPCClient_StreamDERPMapsStream )error {
175
+ defer stream .Close ()
176
+
177
+ ticker := time .NewTicker (s .DerpMapUpdateFrequency )
178
+ defer ticker .Stop ()
179
+
180
+ var lastDERPMap * tailcfg.DERPMap
181
+ for {
182
+ derpMap := s .DerpMapFn ()
183
+ if lastDERPMap == nil || ! CompareDERPMaps (lastDERPMap ,derpMap ) {
184
+ protoDERPMap := DERPMapToProto (derpMap )
185
+ err := stream .Send (protoDERPMap )
186
+ if err != nil {
187
+ return xerrors .Errorf ("send derp map: %w" ,err )
188
+ }
189
+ lastDERPMap = derpMap
190
+ }
191
+
192
+ ticker .Reset (s .DerpMapUpdateFrequency )
193
+ select {
194
+ case <- stream .Context ().Done ():
195
+ return nil
196
+ case <- ticker .C :
197
+ }
198
+ }
162
199
}
163
200
164
201
func (s * DRPCService )CoordinateTailnet (stream proto.DRPCClient_CoordinateTailnetStream )error {
@@ -168,9 +205,9 @@ func (s *DRPCService) CoordinateTailnet(stream proto.DRPCClient_CoordinateTailne
168
205
_ = stream .Close ()
169
206
return xerrors .New ("no Stream ID" )
170
207
}
171
- logger := s .logger .With (slog .F ("peer_id" ,streamID ),slog .F ("name" ,streamID .Name ))
208
+ logger := s .Logger .With (slog .F ("peer_id" ,streamID ),slog .F ("name" ,streamID .Name ))
172
209
logger .Debug (ctx ,"starting tailnet Coordinate" )
173
- coord := * (s .coordPtr .Load ())
210
+ coord := * (s .CoordPtr .Load ())
174
211
reqs ,resps := coord .Coordinate (ctx ,streamID .ID ,streamID .Name ,streamID .Auth )
175
212
c := communicator {
176
213
logger :logger ,