@@ -6,17 +6,18 @@ import {
66} from "axios" ;
77import { Api } from "coder/site/src/api/api" ;
88import {
9+ type ServerSentEvent ,
910type GetInboxNotificationResponse ,
1011type ProvisionerJobLog ,
11- type ServerSentEvent ,
1212type Workspace ,
1313type WorkspaceAgent ,
1414} from "coder/site/src/api/typesGenerated" ;
1515import * as vscode from "vscode" ;
16- import { type ClientOptions } from "ws" ;
16+ import { type ClientOptions , type CloseEvent , type ErrorEvent } from "ws" ;
1717
1818import { CertificateError } from "../error" ;
1919import { getHeaderCommand , getHeaders } from "../headers" ;
20+ import { EventStreamLogger } from "../logging/eventStreamLogger" ;
2021import {
2122createRequestMeta ,
2223logRequest ,
@@ -29,11 +30,12 @@ import {
2930HttpClientLogLevel ,
3031} from "../logging/types" ;
3132import { sizeOf } from "../logging/utils" ;
32- import { WsLogger } from "../logging/wsLogger " ;
33+ import { type UnidirectionalStream } from "../websocket/eventStreamConnection " ;
3334import {
3435OneWayWebSocket ,
3536type OneWayWebSocketInit ,
3637} from "../websocket/oneWayWebSocket" ;
38+ import { SseConnection } from "../websocket/sseConnection" ;
3739
3840import { createHttpAgent } from "./utils" ;
3941
@@ -84,8 +86,9 @@ export class CoderApi extends Api {
8486} ;
8587
8688watchWorkspace = async ( workspace :Workspace , options ?:ClientOptions ) => {
87- return this . createWebSocket < ServerSentEvent > ( {
89+ return this . createWebSocketWithFallback < ServerSentEvent > ( {
8890apiRoute :`/api/v2/workspaces/${ workspace . id } /watch-ws` ,
91+ fallbackApiRoute :`/api/v2/workspaces/${ workspace . id } /watch` ,
8992options,
9093} ) ;
9194} ;
@@ -94,15 +97,17 @@ export class CoderApi extends Api {
9497agentId :WorkspaceAgent [ "id" ] ,
9598options ?:ClientOptions ,
9699) => {
97- return this . createWebSocket < ServerSentEvent > ( {
100+ return this . createWebSocketWithFallback < ServerSentEvent > ( {
98101apiRoute :`/api/v2/workspaceagents/${ agentId } /watch-metadata-ws` ,
102+ fallbackApiRoute :`/api/v2/workspaceagents/${ agentId } /watch-metadata` ,
99103options,
100104} ) ;
101105} ;
102106
103107watchBuildLogsByBuildId = async (
104108buildId :string ,
105109logs :ProvisionerJobLog [ ] ,
110+ options ?:ClientOptions ,
106111) => {
107112const searchParams = new URLSearchParams ( { follow :"true" } ) ;
108113if ( logs . length ) {
@@ -112,6 +117,7 @@ export class CoderApi extends Api {
112117return this . createWebSocket < ProvisionerJobLog > ( {
113118apiRoute :`/api/v2/workspacebuilds/${ buildId } /logs` ,
114119searchParams,
120+ options,
115121} ) ;
116122} ;
117123
@@ -128,7 +134,7 @@ export class CoderApi extends Api {
128134coderSessionTokenHeader
129135] as string | undefined ;
130136
131- const headers = await getHeaders (
137+ const headersFromCommand = await getHeaders (
132138baseUrlRaw ,
133139getHeaderCommand ( vscode . workspace . getConfiguration ( ) ) ,
134140this . output ,
@@ -137,43 +143,154 @@ export class CoderApi extends Api {
137143const httpAgent = await createHttpAgent (
138144vscode . workspace . getConfiguration ( ) ,
139145) ;
146+
147+ /**
148+ * Similar to the REST client, we want to prioritize headers in this order (highest to lowest):
149+ * 1. Headers from the header command
150+ * 2. Any headers passed directly to this function
151+ * 3. Coder session token from the Api client (if set)
152+ */
153+ const headers = {
154+ ...( token ?{ [ coderSessionTokenHeader ] :token } :{ } ) ,
155+ ...configs . options ?. headers ,
156+ ...headersFromCommand ,
157+ } ;
158+
140159const webSocket = new OneWayWebSocket < TData > ( {
141160location :baseUrl ,
142161...configs ,
143162options :{
163+ ...configs . options ,
144164agent :httpAgent ,
145165followRedirects :true ,
146- headers :{
147- ...( token ?{ [ coderSessionTokenHeader ] :token } :{ } ) ,
148- ...configs . options ?. headers ,
149- ...headers ,
150- } ,
151- ...configs . options ,
166+ headers,
152167} ,
153168} ) ;
154169
155- const wsUrl = new URL ( webSocket . url ) ;
156- const pathWithQuery = wsUrl . pathname + wsUrl . search ;
157- const wsLogger = new WsLogger ( this . output , pathWithQuery ) ;
158- wsLogger . logConnecting ( ) ;
170+ this . attachStreamLogger ( webSocket ) ;
171+ return webSocket ;
172+ }
159173
160- webSocket . addEventListener ( "open" , ( ) => {
161- wsLogger . logOpen ( ) ;
162- } ) ;
174+ private attachStreamLogger < TData > (
175+ connection :UnidirectionalStream < TData > ,
176+ ) :void {
177+ const url = new URL ( connection . url ) ;
178+ const logger = new EventStreamLogger (
179+ this . output ,
180+ url . pathname + url . search ,
181+ url . protocol . startsWith ( "http" ) ?"SSE" :"WS" ,
182+ ) ;
183+ logger . logConnecting ( ) ;
163184
164- webSocket . addEventListener ( "message" , ( event ) => {
165- wsLogger . logMessage ( event . sourceEvent . data ) ;
166- } ) ;
185+ connection . addEventListener ( "open" , ( ) => logger . logOpen ( ) ) ;
186+ connection . addEventListener ( "close" , ( event :CloseEvent ) =>
187+ logger . logClose ( event . code , event . reason ) ,
188+ ) ;
189+ connection . addEventListener ( "error" , ( event :ErrorEvent ) =>
190+ logger . logError ( event . error , event . message ) ,
191+ ) ;
192+ connection . addEventListener ( "message" , ( event ) =>
193+ logger . logMessage ( event . sourceEvent . data ) ,
194+ ) ;
195+ }
167196
168- webSocket . addEventListener ( "close" , ( event ) => {
169- wsLogger . logClose ( event . code , event . reason ) ;
197+ /**
198+ * Create a WebSocket connection with SSE fallback on 404.
199+ *
200+ * Note: The fallback on SSE ignores all passed client options except the headers.
201+ */
202+ private async createWebSocketWithFallback < TData = unknown > ( configs :{
203+ apiRoute :string ;
204+ fallbackApiRoute :string ;
205+ searchParams ?:Record < string , string > | URLSearchParams ;
206+ options ?:ClientOptions ;
207+ } ) :Promise < UnidirectionalStream < TData > > {
208+ let webSocket :OneWayWebSocket < TData > ;
209+ try {
210+ webSocket = await this . createWebSocket < TData > ( {
211+ apiRoute :configs . apiRoute ,
212+ searchParams :configs . searchParams ,
213+ options :configs . options ,
214+ } ) ;
215+ } catch {
216+ // Failed to create WebSocket, use SSE fallback
217+ return this . createSseFallback < TData > (
218+ configs . fallbackApiRoute ,
219+ configs . searchParams ,
220+ configs . options ?. headers ,
221+ ) ;
222+ }
223+
224+ return this . waitForConnection ( webSocket , ( ) =>
225+ this . createSseFallback < TData > (
226+ configs . fallbackApiRoute ,
227+ configs . searchParams ,
228+ configs . options ?. headers ,
229+ ) ,
230+ ) ;
231+ }
232+
233+ private waitForConnection < TData > (
234+ connection :UnidirectionalStream < TData > ,
235+ onNotFound ?:( ) => Promise < UnidirectionalStream < TData > > ,
236+ ) :Promise < UnidirectionalStream < TData > > {
237+ return new Promise ( ( resolve , reject ) => {
238+ const cleanup = ( ) => {
239+ connection . removeEventListener ( "open" , handleOpen ) ;
240+ connection . removeEventListener ( "error" , handleError ) ;
241+ } ;
242+
243+ const handleOpen = ( ) => {
244+ cleanup ( ) ;
245+ resolve ( connection ) ;
246+ } ;
247+
248+ const handleError = ( event :ErrorEvent ) => {
249+ cleanup ( ) ;
250+ const is404 =
251+ event . message ?. includes ( "404" ) ||
252+ event . error ?. message ?. includes ( "404" ) ;
253+
254+ if ( is404 && onNotFound ) {
255+ connection . close ( ) ;
256+ onNotFound ( ) . then ( resolve ) . catch ( reject ) ;
257+ } else {
258+ reject ( event . error || new Error ( event . message ) ) ;
259+ }
260+ } ;
261+
262+ connection . addEventListener ( "open" , handleOpen ) ;
263+ connection . addEventListener ( "error" , handleError ) ;
170264} ) ;
265+ }
266+
267+ /**
268+ * Create SSE fallback connection
269+ */
270+ private async createSseFallback < TData = unknown > (
271+ apiRoute :string ,
272+ searchParams ?:Record < string , string > | URLSearchParams ,
273+ optionsHeaders ?:Record < string , string > ,
274+ ) :Promise < UnidirectionalStream < TData > > {
275+ this . output . warn ( `WebSocket failed, using SSE fallback:${ apiRoute } ` ) ;
276+
277+ const baseUrlRaw = this . getAxiosInstance ( ) . defaults . baseURL ;
278+ if ( ! baseUrlRaw ) {
279+ throw new Error ( "No base URL set on REST client" ) ;
280+ }
171281
172- webSocket . addEventListener ( "error" , ( event ) => {
173- wsLogger . logError ( event . error , event . message ) ;
282+ const baseUrl = new URL ( baseUrlRaw ) ;
283+ const sseConnection = new SseConnection ( {
284+ location :baseUrl ,
285+ apiRoute,
286+ searchParams,
287+ axiosInstance :this . getAxiosInstance ( ) ,
288+ optionsHeaders :optionsHeaders ,
289+ logger :this . output ,
174290} ) ;
175291
176- return webSocket ;
292+ this . attachStreamLogger ( sseConnection ) ;
293+ return this . waitForConnection ( sseConnection ) ;
177294}
178295}
179296