feat: AI Workflow Builder core (no-changelog) #17423
base: master
Conversation
feat: AI Workflow Builder core (no-changelog)
- Replace chain-based system with tool-based agent architecture
- Add tools for node operations (add, connect, remove, update)
- Implement new prompt engineering system
- Add node search engine with fuzzy matching
- Add tests
cubic found 15 issues across 77 files. Review them in cubic.dev.
/**
 * Count connections that will be removed for a node
 */
function countNodeConnections(nodeId: string, connections: IConnections): number {
Outgoing connections to the same node are counted once in the "outgoing" loop and again in the "incoming" loop, so self-loop connections will be double-counted, leading to an incorrect `connectionsRemoved` value that is shown to users.
Prompt for AI agents: address this comment in packages/@n8n/ai-workflow-builder.ee/src/tools/remove-node.tool.ts at line 23.
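One possible fix, sketched against the same `IConnections` shape the tool already uses: count self-loops only in the outgoing pass and skip them in the incoming pass.

```ts
import type { IConnections } from 'n8n-workflow';

// Sketch: outgoing connections (including self-loops) are counted once in the
// first pass; the incoming pass skips connections that originate from the node
// itself so self-loops are not counted a second time.
function countNodeConnections(nodeId: string, connections: IConnections): number {
	let count = 0;

	// Outgoing (covers self-loops exactly once)
	for (const outputs of Object.values(connections[nodeId] ?? {})) {
		if (!Array.isArray(outputs)) continue;
		for (const outputConnections of outputs) {
			if (Array.isArray(outputConnections)) count += outputConnections.length;
		}
	}

	// Incoming from other nodes only
	for (const [sourceNodeId, nodeConnections] of Object.entries(connections)) {
		if (sourceNodeId === nodeId) continue; // self-loops already counted above
		for (const outputs of Object.values(nodeConnections)) {
			if (!Array.isArray(outputs)) continue;
			for (const outputConnections of outputs) {
				if (Array.isArray(outputConnections)) {
					count += outputConnections.filter((conn) => conn.node === nodeId).length;
				}
			}
		}
	}

	return count;
}
```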
packages/@n8n/ai-workflow-builder.ee/src/chains/parameter-updater.ts (outdated; thread resolved)
packages/@n8n/ai-workflow-builder.ee/src/tools/utils/connection-parameters.utils.ts (outdated; thread resolved)
packages/@n8n/ai-workflow-builder.ee/src/ai-workflow-builder-agent.service.ts (thread resolved)
 */
const searchQuerySchema = z.object({
	queryType: z.enum(['name', 'subNodeSearch']).describe('Type of search to perform'),
	query: z.string().optional().describe('Search term to filter results'),
The validation schema allows queryType "name" while leaving query undefined, but processQuery later treats a missing query as an error case that silently returns no results. This leads to silent failures that should instead be caught during validation.
Prompt for AI agents: address this comment in packages/@n8n/ai-workflow-builder.ee/src/tools/node-search.tool.ts at line 17.
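One way to catch this at validation time instead (a sketch, not the PR's code) is to make the fields conditionally required with `superRefine`:

```ts
import { z } from 'zod';
import { NodeConnectionTypes } from 'n8n-workflow';

// Sketch: reject name searches without a query (and sub-node searches without a
// connection type) during schema validation instead of silently returning no results.
const searchQuerySchema = z
	.object({
		queryType: z.enum(['name', 'subNodeSearch']).describe('Type of search to perform'),
		query: z.string().optional().describe('Search term to filter results'),
		connectionType: z
			.nativeEnum(NodeConnectionTypes)
			.optional()
			.describe('For subNodeSearch: connection type like ai_languageModel, ai_tool, etc.'),
	})
	.superRefine((value, ctx) => {
		if (value.queryType === 'name' && !value.query) {
			ctx.addIssue({
				code: z.ZodIssueCode.custom,
				path: ['query'],
				message: 'query is required when queryType is "name"',
			});
		}
		if (value.queryType === 'subNodeSearch' && !value.connectionType) {
			ctx.addIssue({
				code: z.ZodIssueCode.custom,
				path: ['connectionType'],
				message: 'connectionType is required when queryType is "subNodeSearch"',
			});
		}
	});
```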
	typeof getCurrentTaskInput
>;
jest.mock('@langchain/langgraph', () => ({
jest.mock is called inside a helper function, so the mock is applied only after the function is executed rather than being hoisted to the top of the module. This can lead to the real module being loaded before the mock is in place, causing unpredictable test behaviour.
Prompt for AI agents: address this comment in packages/@n8n/ai-workflow-builder.ee/test/test-utils.ts at line 334.
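A sketch of the conventional fix: declare the mock at module scope so babel-jest hoists it above the imports, and let the helper only return the typed handle.

```ts
import { getCurrentTaskInput } from '@langchain/langgraph';

// Module-scope jest.mock is hoisted above the imports, so '@langchain/langgraph'
// is mocked before any test (or test-utils consumer) loads the real module.
jest.mock('@langchain/langgraph', () => ({
	getCurrentTaskInput: jest.fn(),
	Command: jest.fn().mockImplementation((params: Record<string, unknown>) => ({
		content: JSON.stringify(params),
	})),
}));

// The helper now only resolves the already-mocked, typed handle.
export const setupLangGraphMocks = () => {
	const mockGetCurrentTaskInput = getCurrentTaskInput as jest.MockedFunction<
		typeof getCurrentTaskInput
	>;
	return { mockGetCurrentTaskInput };
};
```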
// Start with a copy of the current workflow
let result: SimpleWorkflow = {
	nodes: [...workflow.nodes],
	connections: { ...workflow.connections },
Shallow-copying only the top-level `connections` object means nested arrays/objects are still shared with the original workflow. Subsequent mutations inside `mergeConnections` mutate both the new and the original workflow, breaking immutability assumptions and causing hard-to-trace state bugs.
Prompt for AI agents: address this comment in packages/@n8n/ai-workflow-builder.ee/src/utils/operations-processor.ts at line 17.
Suggested change:
-	connections: { ...workflow.connections },
+	connections: structuredClone(workflow.connections),
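For illustration, the difference the suggestion makes (plain TypeScript, no n8n types involved):

```ts
const original = { main: [[{ node: 'B', type: 'main', index: 0 }]] };

// Spread copies only the top level, so the nested arrays are shared.
const shallow = { ...original };
shallow.main[0].push({ node: 'C', type: 'main', index: 0 });
console.log(original.main[0].length); // 2 – the mutation leaked into the original

// structuredClone deep-copies, so mutations stay local to the clone.
const deep = structuredClone(original);
deep.main[0].push({ node: 'D', type: 'main', index: 0 });
console.log(original.main[0].length); // still 2 – the original is untouched
```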
cubic found 14 issues across 77 files. Review them in cubic.dev.
AI Workflow Builder: Core Agent Architecture Refactoring (Part 1/3)
Overview
This PR introduces fundamental architectural changes to the AI Workflow Builder, refactoring from a sequential chain-based approach to a flexible agent-based architecture. This is the first of three PRs that need to be merged to fully implement the new system.
Architecture Overview
The new architecture centers around a LangGraph state machine that orchestrates tool execution and state management:
Key Changes
1. Core Agent Implementation
The refactoring replaces sequential chain execution with a stateful LangGraph agent that orchestrates tool execution and state management.
Implementation:
packages/@n8n/ai-workflow-builder.ee/src/workflow-builder-agent.ts
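For orientation, a minimal sketch of this kind of tool-calling loop with @langchain/langgraph; `llmWithTools` and `tools` are placeholders, and this is not the PR's actual graph:

```ts
import type { AIMessage } from '@langchain/core/messages';
import type { StructuredToolInterface } from '@langchain/core/tools';
import { StateGraph, MessagesAnnotation, MemorySaver, START, END } from '@langchain/langgraph';
import { ToolNode } from '@langchain/langgraph/prebuilt';

// Placeholders: a chat model with the builder tools bound to it, and the tool list itself.
declare const llmWithTools: { invoke: (messages: unknown[]) => Promise<AIMessage> };
declare const tools: StructuredToolInterface[];

// Agent node: call the model with the conversation so far.
const callAgent = async (state: typeof MessagesAnnotation.State) => ({
	messages: [await llmWithTools.invoke(state.messages)],
});

// Route back to the tools node while the model keeps requesting tool calls.
const shouldContinue = (state: typeof MessagesAnnotation.State) => {
	const last = state.messages.at(-1) as AIMessage;
	return last.tool_calls?.length ? 'tools' : END;
};

export const agent = new StateGraph(MessagesAnnotation)
	.addNode('agent', callAgent)
	.addNode('tools', new ToolNode(tools))
	.addEdge(START, 'agent')
	.addConditionalEdges('agent', shouldContinue)
	.addEdge('tools', 'agent')
	.compile({ checkpointer: new MemorySaver() }); // enables per-session persistence
```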
2. Tool System Implementation
Six specialized tools replace the previous chain components:
- `node-search`: Fuzzy search across available node types
- `node-details`: Retrieve detailed parameter schemas for nodes
- `add-node`: Add nodes with intelligent positioning and naming
- `connect-nodes`: Create connections with proper output handling
- `remove-node`: Safely remove nodes and their connections
- `update-node-parameters`: Configure node parameters using a dedicated LLM chain

Tools execute in parallel when possible, improving response times for complex operations.
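All of them follow the same factory pattern visible in the diffs above: a zod schema plus `tool()` from @langchain/core. Roughly (helper logic elided, factory name hypothetical):

```ts
import { tool } from '@langchain/core/tools';
import { z } from 'zod';

// Sketch of the shared shape: validate input with zod, do the work, and return a
// response. Progress reporting and state-update helpers from the diffs are elided.
const removeNodeSchema = z.object({
	nodeId: z.string().describe('The ID of the node to remove from the workflow'),
});

export function createExampleRemoveNodeTool() {
	return tool(
		(input) => {
			const { nodeId } = removeNodeSchema.parse(input);
			// ...look up the node, count its connections, queue the state update...
			return `Successfully removed node ${nodeId}`;
		},
		{
			name: 'remove_node',
			description: 'Remove a node from the workflow by its ID.',
			schema: removeNodeSchema,
		},
	);
}
```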
3. State Management and Operations Queue
The system uses an operations queue pattern to ensure atomic state updates:
packages/@n8n/ai-workflow-builder.ee/src/workflow-state.ts
packages/@n8n/ai-workflow-builder.ee/src/utils/operations-processor.ts
Key features:
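Chief among these: tools never mutate the workflow directly; they enqueue typed operations that a processing step applies atomically. A sketch of the operation union, with field names taken from the operations-processor code shown in the review thread above:

```ts
import type { INode, IConnections } from 'n8n-workflow';

// Sketch of the operation union carried by the queue (field names follow the
// operations-processor code; the real type lives in the PR's types/workflow module).
export type WorkflowOperation =
	| { type: 'clear' }
	| { type: 'removeNode'; nodeIds: string[] }
	| { type: 'addNodes'; nodes: INode[] }
	| { type: 'updateNode'; nodeId: string; updates: Partial<INode> }
	| { type: 'setConnections'; connections: IConnections }
	| { type: 'mergeConnections'; connections: IConnections };

// Tools only enqueue operations like these; a processing step later applies the whole
// queue in one pass (see processOperations further down) and clears it, so concurrent
// tool calls cannot interleave partial workflow mutations.
```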
4. Message Streaming and Real-time Updates
The streaming architecture provides real-time feedback:
packages/@n8n/ai-workflow-builder.ee/src/utils/stream-processor.ts
packages/@n8n/ai-workflow-builder.ee/src/tools/helpers/progress.ts
Stream modes:
- `updates`: State changes and agent messages
- `custom`: Tool execution progress
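Consuming both modes from the compiled graph might look like this sketch (the `agent`, `workflowId`, and `userId` placeholders stand in for values provided by the service layer):

```ts
// Placeholders for this sketch – the real graph and ids come from the service layer.
declare const agent: {
	stream: (input: unknown, config: unknown) => Promise<AsyncIterable<[string, unknown]>>;
};
declare const workflowId: string;
declare const userId: string;

const stream = await agent.stream(
	{ messages: [{ role: 'user', content: 'Build me a webhook workflow' }] },
	{
		// Both modes at once: 'updates' for state/agent messages, 'custom' for tool progress.
		streamMode: ['updates', 'custom'],
		configurable: { thread_id: `workflow-${workflowId}-user-${userId}` },
	},
);

for await (const [mode, chunk] of stream) {
	if (mode === 'custom') {
		console.log('tool progress', chunk); // progress messages written by the tools
	} else {
		console.log('state update', chunk); // new agent messages / workflow state changes
	}
}
```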
5. Session Management
Conversation persistence through LangGraph's MemorySaver:
- Sessions are keyed as workflow-{workflowId}-user-{userId}
- /clear (reset session), /compact (summarize conversation)
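A minimal sketch of how the checkpointer keys sessions (the thread id value here is made up):

```ts
// Placeholder for this sketch.
declare const agent: { invoke: (input: unknown, config: unknown) => Promise<unknown> };

// MemorySaver keys checkpoints by thread_id, so reusing the same id continues the
// conversation while a different id starts a fresh session.
const config = { configurable: { thread_id: 'workflow-abc123-user-42' } };

await agent.invoke({ messages: [{ role: 'user', content: 'Add an HTTP Request node' }] }, config);
// A later turn with the same config sees the earlier messages via the checkpointer.
await agent.invoke({ messages: [{ role: 'user', content: 'Connect it to the trigger' }] }, config);
```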
6. Supporting Chains
Three chains enhance the agent's capabilities:
7. Error Handling
Error handling maintains conversation flow:
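A sketch of the shared pattern, with the PR's error classes and response helpers declared as placeholders since their exact signatures are assumed from the diffs above:

```ts
import { z } from 'zod';

// Placeholders: these come from the PR's errors module and tool helpers.
declare class ValidationError extends Error {
	constructor(message: string, options?: { extra?: unknown });
}
declare class ToolExecutionError extends Error {
	constructor(message: string, options?: { toolName?: string; cause?: Error });
}
declare function createErrorResponse(config: unknown, error: Error): unknown;
declare const reporter: { error: (error: Error) => void };

// Failures become structured error responses inside the conversation instead of
// throwing out of the graph, so the agent can recover and keep working.
export function runToolSafely(config: unknown, execute: () => unknown) {
	try {
		return execute();
	} catch (error) {
		if (error instanceof z.ZodError) {
			const validationError = new ValidationError('Invalid input parameters', {
				extra: { errors: error.errors },
			});
			reporter.error(validationError);
			return createErrorResponse(config, validationError);
		}
		const toolError = new ToolExecutionError(
			error instanceof Error ? error.message : 'Unknown error occurred',
			{ toolName: 'remove_node', cause: error instanceof Error ? error : undefined },
		);
		reporter.error(toolError);
		return createErrorResponse(config, toolError);
	}
}
```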
Technical Deep Dive
1. LangGraph Agent State Machine
The agent orchestrates all operations through a state machine:
State Flow Details:
packages/@n8n/ai-workflow-builder.ee/src/workflow-builder-agent.ts
2. State Mutations Through Operations Queue
The system ensures atomic state updates through an operations queue pattern:
Key Concepts:
packages/@n8n/ai-workflow-builder.ee/src/workflow-state.ts
packages/@n8n/ai-workflow-builder.ee/src/utils/operations-processor.ts
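For reference, the queue-draining node as it appears in operations-processor.ts (applyOperations is defined earlier in the same file):

```ts
import type { WorkflowState } from '../workflow-state';

// Apply every queued operation, then clear the queue so the same operations
// are not applied twice on the next state update.
export function processOperations(state: typeof WorkflowState.State) {
	const { workflowJSON, workflowOperations } = state;

	// If no operations to process, return unchanged
	if (!workflowOperations || workflowOperations.length === 0) {
		return {};
	}

	// Apply all operations to get the new workflow
	const newWorkflow = applyOperations(workflowJSON, workflowOperations);

	// Return updated state with cleared operations
	return {
		workflowJSON: newWorkflow,
		workflowOperations: null, // Clear processed operations
	};
}
```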
3. Stream Processing and Real-time Updates
The system provides real-time feedback through streaming:
Implementation Flow:
progressReporter.progress()
packages/@n8n/ai-workflow-builder.ee/src/utils/stream-processor.ts
packages/@n8n/ai-workflow-builder.ee/src/tools/helpers/progress.ts
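On the producer side, tools push progress chunks through the custom-stream writer exposed on their runnable config; a sketch, with the message shape assumed from the test utilities rather than guaranteed:

```ts
import type { LangGraphRunnableConfig } from '@langchain/langgraph';

// Sketch: the reporter wraps the config's custom-stream writer and emits structured
// progress messages that the frontend renders live. The message shape below is
// assumed from the PR's test utilities, not guaranteed to match the real helpers.
export function createProgressReporter(
	config: LangGraphRunnableConfig & { writer?: (chunk: unknown) => void },
	toolName: string,
) {
	const write = (status: 'running' | 'completed' | 'error', updates: unknown[]) =>
		config.writer?.({ toolName, status, updates });

	return {
		start: (input: unknown) => write('running', [{ type: 'input', data: input }]),
		progress: (message: string) => write('running', [{ type: 'progress', data: message }]),
		complete: (output: unknown) => write('completed', [{ type: 'output', data: output }]),
		error: (error: Error) => write('error', [{ type: 'error', data: error.message }]),
	};
}
```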
4. Parameter Update Chain and Prompt Caching
The parameter update chain dynamically assembles prompts with caching for efficiency:
Key Features:
packages/@n8n/ai-workflow-builder.ee/src/chains/parameter-updater.ts
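A rough sketch of such a chain (prompt into a model with structured output); the prompt text, schema, and model are placeholders, and provider-specific prompt caching is omitted:

```ts
import { ChatPromptTemplate } from '@langchain/core/prompts';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import { z } from 'zod';

// Sketch only: `llm` stands in for whatever chat model the service wires in.
declare const llm: BaseChatModel;

const outputSchema = z.object({
	parameters: z.record(z.unknown()).describe('The complete, updated parameters object'),
});

const prompt = ChatPromptTemplate.fromMessages([
	['system', 'You update n8n node parameters. Return the complete parameters object.'],
	[
		'human',
		'Node type: {nodeType}\nCurrent parameters: {currentParameters}\nRequested changes: {changes}',
	],
]);

export const parameterUpdaterChain = prompt.pipe(llm.withStructuredOutput(outputSchema));

// Usage sketch:
// const { parameters } = await parameterUpdaterChain.invoke({
//   nodeType: 'n8n-nodes-base.httpRequest',
//   currentParameters: '{}',
//   changes: 'Set the method to POST',
// });
```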
Integration Points
Service Layer Integration
- packages/cli/src/services/ai-workflow-builder.service.ts: Manages AI builder lifecycle
API Controller
- packages/cli/src/controllers/ai.controller.ts: Streaming endpoint
Frontend State Management
Review / Merge checklist
- release/backport (if the PR is an urgent fix that needs to be backported)