@@ -7,11 +7,13 @@ import (
7
7
"strconv"
8
8
"strings"
9
9
"sync/atomic"
10
+ "time"
10
11
11
12
"github.com/google/uuid"
12
13
"github.com/hashicorp/yamux"
13
14
"storj.io/drpc/drpcmux"
14
15
"storj.io/drpc/drpcserver"
16
+ "tailscale.com/tailcfg"
15
17
16
18
"cdr.dev/slog"
17
19
"github.com/coder/coder/v2/tailnet/proto"
@@ -92,10 +94,22 @@ type ClientService struct {
92
94
93
95
// NewClientService returns a ClientService based on the given Coordinator pointer. The pointer is
94
96
// loaded on each processed connection.
95
- func NewClientService (logger slog.Logger ,coordPtr * atomic.Pointer [Coordinator ]) (* ClientService ,error ) {
97
+ func NewClientService (
98
+ logger slog.Logger ,
99
+ coordPtr * atomic.Pointer [Coordinator ],
100
+ derpMapUpdateFrequency time.Duration ,
101
+ derpMapFn func ()* tailcfg.DERPMap ,
102
+ ) (
103
+ * ClientService ,error ,
104
+ ) {
96
105
s := & ClientService {logger :logger ,coordPtr :coordPtr }
97
106
mux := drpcmux .New ()
98
- drpcService := NewDRPCService (logger ,coordPtr )
107
+ drpcService := & DRPCService {
108
+ CoordPtr :coordPtr ,
109
+ Logger :logger ,
110
+ DerpMapUpdateFrequency :derpMapUpdateFrequency ,
111
+ DerpMapFn :derpMapFn ,
112
+ }
99
113
err := proto .DRPCRegisterClient (mux ,drpcService )
100
114
if err != nil {
101
115
return nil ,xerrors .Errorf ("register DRPC service: %w" ,err )
@@ -145,20 +159,37 @@ func (s *ClientService) ServeClient(ctx context.Context, version string, conn ne
145
159
146
160
// DRPCService is the dRPC-based, version 2.x of the tailnet API and implements proto.DRPCClientServer
147
161
type DRPCService struct {
148
- coordPtr * atomic.Pointer [Coordinator ]
149
- logger slog.Logger
162
+ CoordPtr * atomic.Pointer [Coordinator ]
163
+ Logger slog.Logger
164
+ DerpMapUpdateFrequency time.Duration
165
+ DerpMapFn func ()* tailcfg.DERPMap
150
166
}
151
167
152
- func NewDRPCService (logger slog.Logger ,coordPtr * atomic.Pointer [Coordinator ])* DRPCService {
153
- return & DRPCService {
154
- coordPtr :coordPtr ,
155
- logger :logger ,
156
- }
157
- }
168
+ func (s * DRPCService )StreamDERPMaps (_ * proto.StreamDERPMapsRequest ,stream proto.DRPCClient_StreamDERPMapsStream )error {
169
+ defer stream .Close ()
170
+
171
+ ticker := time .NewTicker (s .DerpMapUpdateFrequency )
172
+ defer ticker .Stop ()
158
173
159
- func (* DRPCService )StreamDERPMaps (* proto.StreamDERPMapsRequest , proto.DRPCClient_StreamDERPMapsStream )error {
160
- // TODO integrate with Dean's PR implementation
161
- return xerrors .New ("unimplemented" )
174
+ var lastDERPMap * tailcfg.DERPMap
175
+ for {
176
+ derpMap := s .DerpMapFn ()
177
+ if lastDERPMap == nil || ! CompareDERPMaps (lastDERPMap ,derpMap ) {
178
+ protoDERPMap := DERPMapToProto (derpMap )
179
+ err := stream .Send (protoDERPMap )
180
+ if err != nil {
181
+ return xerrors .Errorf ("send derp map: %w" ,err )
182
+ }
183
+ lastDERPMap = derpMap
184
+ }
185
+
186
+ ticker .Reset (s .DerpMapUpdateFrequency )
187
+ select {
188
+ case <- stream .Context ().Done ():
189
+ return nil
190
+ case <- ticker .C :
191
+ }
192
+ }
162
193
}
163
194
164
195
func (s * DRPCService )CoordinateTailnet (stream proto.DRPCClient_CoordinateTailnetStream )error {
@@ -168,9 +199,9 @@ func (s *DRPCService) CoordinateTailnet(stream proto.DRPCClient_CoordinateTailne
168
199
_ = stream .Close ()
169
200
return xerrors .New ("no Stream ID" )
170
201
}
171
- logger := s .logger .With (slog .F ("peer_id" ,streamID ),slog .F ("name" ,streamID .Name ))
202
+ logger := s .Logger .With (slog .F ("peer_id" ,streamID ),slog .F ("name" ,streamID .Name ))
172
203
logger .Debug (ctx ,"starting tailnet Coordinate" )
173
- coord := * (s .coordPtr .Load ())
204
+ coord := * (s .CoordPtr .Load ())
174
205
reqs ,resps := coord .Coordinate (ctx ,streamID .ID ,streamID .Name ,streamID .Auth )
175
206
c := communicator {
176
207
logger :logger ,