- Notifications
You must be signed in to change notification settings - Fork1k
feat(coderd): add experimental tasks send endpoint#19941
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Merged
Uh oh!
There was an error while loading.Please reload this page.
Merged
Changes fromall commits
Commits
Show all changes
14 commits Select commitHold shift + click to select a range
812b2b2
feat(coderd): add tasks send endpoint
mafredri2b97e27
fix dbauthz/test
mafredric9b76f0
fix httpapi.write
mafredri2f98497
simplify
mafredrie1d8125
clean up
mafredridd7f459
nolint
mafredrifdb7af3
gen
mafredri04d6c60
fix pr caught missing return
mafredri6a4a085
avoid extra db call for apps, data is already available
mafredric37d9d0
add agentapi status check
mafredrif607b45
integration test
mafredri4c80700
rename
mafredri4103faa
fix match agentapi responses
mafredri942cb12
fix import
mafredriFile filter
Filter by extension
Conversations
Failed to load comments.
Loading
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Jump to file
Failed to load files.
Loading
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
293 changes: 292 additions & 1 deletioncoderd/aitasks.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,19 +1,25 @@ | ||
package coderd | ||
import ( | ||
"bytes" | ||
"context" | ||
"database/sql" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"net" | ||
"net/http" | ||
"net/url" | ||
"path" | ||
"slices" | ||
"strings" | ||
"time" | ||
"github.com/go-chi/chi/v5" | ||
"github.com/google/uuid" | ||
"cdr.dev/slog" | ||
"github.com/coder/coder/v2/coderd/audit" | ||
"github.com/coder/coder/v2/coderd/database" | ||
"github.com/coder/coder/v2/coderd/httpapi" | ||
@@ -590,3 +596,288 @@ func (api *API) taskDelete(rw http.ResponseWriter, r *http.Request) { | ||
// Delete build created successfully. | ||
rw.WriteHeader(http.StatusAccepted) | ||
} | ||
// taskSend submits task input to the tasks sidebar app by dialing the agent | ||
// directly over the tailnet. We enforce ApplicationConnect RBAC on the | ||
// workspace and validate the sidebar app health. | ||
func (api *API) taskSend(rw http.ResponseWriter, r *http.Request) { | ||
ctx := r.Context() | ||
idStr := chi.URLParam(r, "id") | ||
taskID, err := uuid.Parse(idStr) | ||
if err != nil { | ||
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ | ||
Message: fmt.Sprintf("Invalid UUID %q for task ID.", idStr), | ||
}) | ||
return | ||
} | ||
var req codersdk.TaskSendRequest | ||
if !httpapi.Read(ctx, rw, r, &req) { | ||
return | ||
} | ||
if req.Input == "" { | ||
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ | ||
Message: "Task input is required.", | ||
}) | ||
return | ||
} | ||
if err = api.authAndDoWithTaskSidebarAppClient(r, taskID, func(ctx context.Context, client *http.Client, appURL *url.URL) error { | ||
status, err := agentapiDoStatusRequest(ctx, client, appURL) | ||
if err != nil { | ||
return err | ||
} | ||
if status != "stable" { | ||
return httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{ | ||
Message: "Task app is not ready to accept input.", | ||
Detail: fmt.Sprintf("Status: %s", status), | ||
}) | ||
} | ||
var reqBody struct { | ||
Content string `json:"content"` | ||
Type string `json:"type"` | ||
} | ||
reqBody.Content = req.Input | ||
reqBody.Type = "user" | ||
req, err := agentapiNewRequest(ctx, http.MethodPost, appURL, "message", reqBody) | ||
if err != nil { | ||
return err | ||
} | ||
resp, err := client.Do(req) | ||
if err != nil { | ||
return httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{ | ||
Message: "Failed to reach task app endpoint.", | ||
Detail: err.Error(), | ||
}) | ||
} | ||
defer resp.Body.Close() | ||
if resp.StatusCode != http.StatusOK { | ||
body, _ := io.ReadAll(io.LimitReader(resp.Body, 128)) | ||
return httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{ | ||
Message: "Task app rejected the message.", | ||
Detail: fmt.Sprintf("Upstream status: %d; Body: %s", resp.StatusCode, body), | ||
}) | ||
} | ||
// {"$schema":"http://localhost:3284/schemas/MessageResponseBody.json","ok":true} | ||
// {"$schema":"http://localhost:3284/schemas/ErrorModel.json","title":"Unprocessable Entity","status":422,"detail":"validation failed","errors":[{"location":"body.type","value":"oof"}]} | ||
var respBody map[string]any | ||
if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil { | ||
return httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{ | ||
Message: "Failed to decode task app response body.", | ||
Detail: err.Error(), | ||
}) | ||
} | ||
if v, ok := respBody["status"].(string); !ok || v != "ok" { | ||
return httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{ | ||
Message: "Task app rejected the message.", | ||
Detail: fmt.Sprintf("Upstream response: %v", respBody), | ||
}) | ||
} | ||
return nil | ||
}); err != nil { | ||
httperror.WriteResponseError(ctx, rw, err) | ||
mafredri marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
return | ||
} | ||
rw.WriteHeader(http.StatusNoContent) | ||
} | ||
// authAndDoWithTaskSidebarAppClient centralizes the shared logic to: | ||
// | ||
// - Fetch the task workspace | ||
// - Authorize ApplicationConnect on the workspace | ||
// - Validate the AI task and sidebar app health | ||
// - Dial the agent and construct an HTTP client to the apps loopback URL | ||
// | ||
// The provided callback receives the context, an HTTP client that dials via the | ||
// agent, and the base app URL (as a value URL) to perform any request. | ||
func (api *API) authAndDoWithTaskSidebarAppClient( | ||
r *http.Request, | ||
taskID uuid.UUID, | ||
do func(ctx context.Context, client *http.Client, appURL *url.URL) error, | ||
) error { | ||
ctx := r.Context() | ||
workspaceID := taskID | ||
workspace, err := api.Database.GetWorkspaceByID(ctx, workspaceID) | ||
if err != nil { | ||
if httpapi.Is404Error(err) { | ||
return httperror.ErrResourceNotFound | ||
} | ||
return httperror.NewResponseError(http.StatusInternalServerError, codersdk.Response{ | ||
Message: "Internal error fetching workspace.", | ||
Detail: err.Error(), | ||
}) | ||
} | ||
// Connecting to applications requires ApplicationConnect on the workspace. | ||
if !api.Authorize(r, policy.ActionApplicationConnect, workspace) { | ||
return httperror.ErrResourceNotFound | ||
} | ||
data, err := api.workspaceData(ctx, []database.Workspace{workspace}) | ||
if err != nil { | ||
return httperror.NewResponseError(http.StatusInternalServerError, codersdk.Response{ | ||
Message: "Internal error fetching workspace resources.", | ||
Detail: err.Error(), | ||
}) | ||
} | ||
if len(data.builds) == 0 || len(data.templates) == 0 { | ||
return httperror.ErrResourceNotFound | ||
} | ||
build := data.builds[0] | ||
if build.HasAITask == nil || !*build.HasAITask || build.AITaskSidebarAppID == nil || *build.AITaskSidebarAppID == uuid.Nil { | ||
return httperror.NewResponseError(http.StatusBadRequest, codersdk.Response{ | ||
Message: "Task is not configured with a sidebar app.", | ||
}) | ||
} | ||
// Find the sidebar app details to get the URL and validate app health. | ||
sidebarAppID := *build.AITaskSidebarAppID | ||
agentID, sidebarApp, ok := func() (uuid.UUID, codersdk.WorkspaceApp, bool) { | ||
for _, res := range build.Resources { | ||
for _, agent := range res.Agents { | ||
for _, app := range agent.Apps { | ||
if app.ID == sidebarAppID { | ||
return agent.ID, app, true | ||
} | ||
} | ||
} | ||
} | ||
return uuid.Nil, codersdk.WorkspaceApp{}, false | ||
}() | ||
if !ok { | ||
return httperror.NewResponseError(http.StatusBadRequest, codersdk.Response{ | ||
Message: "Task sidebar app not found in latest build.", | ||
}) | ||
} | ||
// Return an informative error if the app isn't healthy rather than trying | ||
// and failing. | ||
switch sidebarApp.Health { | ||
case codersdk.WorkspaceAppHealthDisabled: | ||
// No health check, pass through. | ||
case codersdk.WorkspaceAppHealthInitializing: | ||
return httperror.NewResponseError(http.StatusServiceUnavailable, codersdk.Response{ | ||
Message: "Task sidebar app is initializing. Try again shortly.", | ||
}) | ||
case codersdk.WorkspaceAppHealthUnhealthy: | ||
return httperror.NewResponseError(http.StatusServiceUnavailable, codersdk.Response{ | ||
Message: "Task sidebar app is unhealthy.", | ||
}) | ||
} | ||
// Build the direct app URL and dial the agent. | ||
if sidebarApp.URL == "" { | ||
return httperror.NewResponseError(http.StatusInternalServerError, codersdk.Response{ | ||
Message: "Task sidebar app URL is not configured.", | ||
}) | ||
} | ||
parsedURL, err := url.Parse(sidebarApp.URL) | ||
if err != nil { | ||
return httperror.NewResponseError(http.StatusInternalServerError, codersdk.Response{ | ||
Message: "Internal error parsing task app URL.", | ||
Detail: err.Error(), | ||
}) | ||
} | ||
if parsedURL.Scheme != "http" { | ||
return httperror.NewResponseError(http.StatusBadRequest, codersdk.Response{ | ||
Message: "Only http scheme is supported for direct agent-dial.", | ||
}) | ||
} | ||
johnstcn marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*30) | ||
defer dialCancel() | ||
agentConn, release, err := api.agentProvider.AgentConn(dialCtx, agentID) | ||
if err != nil { | ||
return httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{ | ||
Message: "Failed to reach task app endpoint.", | ||
Detail: err.Error(), | ||
}) | ||
} | ||
defer release() | ||
client := &http.Client{ | ||
Transport: &http.Transport{ | ||
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { | ||
return agentConn.DialContext(ctx, network, addr) | ||
}, | ||
}, | ||
} | ||
return do(ctx, client, parsedURL) | ||
} | ||
func agentapiNewRequest(ctx context.Context, method string, appURL *url.URL, appURLPath string, body any) (*http.Request, error) { | ||
u := *appURL | ||
u.Path = path.Join(appURL.Path, appURLPath) | ||
var bodyReader io.Reader | ||
if body != nil { | ||
b, err := json.Marshal(body) | ||
if err != nil { | ||
return nil, httperror.NewResponseError(http.StatusBadRequest, codersdk.Response{ | ||
Message: "Failed to marshal task app request body.", | ||
Detail: err.Error(), | ||
}) | ||
} | ||
bodyReader = bytes.NewReader(b) | ||
} | ||
req, err := http.NewRequestWithContext(ctx, method, u.String(), bodyReader) | ||
if err != nil { | ||
return nil, httperror.NewResponseError(http.StatusBadRequest, codersdk.Response{ | ||
Message: "Failed to create task app request.", | ||
Detail: err.Error(), | ||
}) | ||
} | ||
req.Header.Set("Content-Type", "application/json") | ||
req.Header.Set("Accept", "application/json") | ||
return req, nil | ||
} | ||
func agentapiDoStatusRequest(ctx context.Context, client *http.Client, appURL *url.URL) (string, error) { | ||
req, err := agentapiNewRequest(ctx, http.MethodGet, appURL, "status", nil) | ||
if err != nil { | ||
return "", err | ||
} | ||
resp, err := client.Do(req) | ||
if err != nil { | ||
return "", httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{ | ||
Message: "Failed to reach task app endpoint.", | ||
Detail: err.Error(), | ||
}) | ||
} | ||
defer resp.Body.Close() | ||
if resp.StatusCode != http.StatusOK { | ||
return "", httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{ | ||
Message: "Task app status returned an error.", | ||
Detail: fmt.Sprintf("Status code: %d", resp.StatusCode), | ||
}) | ||
} | ||
// {"$schema":"http://localhost:3284/schemas/StatusResponseBody.json","status":"stable"} | ||
var respBody struct { | ||
Status string `json:"status"` | ||
} | ||
if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil { | ||
return "", httperror.NewResponseError(http.StatusBadGateway, codersdk.Response{ | ||
Message: "Failed to decode task app status response body.", | ||
Detail: err.Error(), | ||
}) | ||
} | ||
return respBody.Status, nil | ||
} |
Oops, something went wrong.
Uh oh!
There was an error while loading.Please reload this page.
Oops, something went wrong.
Uh oh!
There was an error while loading.Please reload this page.
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.