From 07f0c01dc477f9825a2f11c7974804f2377b44f3 Mon Sep 17 00:00:00 2001
From: Waleed
Date: Tue, 20 Jan 2026 09:27:45 -0800
Subject: [PATCH 01/14] fix(google): wrap primitive tool responses for Gemini
API compatibility (#2900)
---
apps/sim/providers/gemini/core.ts | 3 +-
apps/sim/providers/google/utils.test.ts | 453 ++++++++++++++++++++++++
apps/sim/providers/google/utils.ts | 19 +-
3 files changed, 473 insertions(+), 2 deletions(-)
create mode 100644 apps/sim/providers/google/utils.test.ts
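Note for reviewers: per the new docstring, Gemini's functionResponse.response
must be a google.protobuf.Struct (an object with string keys), so bare
primitives and arrays coming back from tools were not API-compatible. A
minimal sketch of the intended wrapping behavior, with illustrative values:

    // TypeScript sketch; inputs are hypothetical, behavior mirrors the new tests.
    ensureStructResponse({ ok: true }) // => { ok: true }          (objects pass through)
    ensureStructResponse(true)         // => { value: true }       (primitives are wrapped)
    ensureStructResponse([1, 2, 3])    // => { value: [1, 2, 3] }  (arrays are wrapped too)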
diff --git a/apps/sim/providers/gemini/core.ts b/apps/sim/providers/gemini/core.ts
index 76b9bb4293..c4ebf5b159 100644
--- a/apps/sim/providers/gemini/core.ts
+++ b/apps/sim/providers/gemini/core.ts
@@ -19,6 +19,7 @@ import {
convertToGeminiFormat,
convertUsageMetadata,
createReadableStreamFromGeminiStream,
+ ensureStructResponse,
extractFunctionCallPart,
extractTextContent,
mapToThinkingLevel,
@@ -104,7 +105,7 @@ async function executeToolCall(
const duration = toolCallEndTime - toolCallStartTime
const resultContent: Record<string, unknown> = result.success
- ? (result.output as Record<string, unknown>)
+ ? ensureStructResponse(result.output)
: { error: true, message: result.error || 'Tool execution failed', tool: toolName }
const toolCall: FunctionCallResponse = {
diff --git a/apps/sim/providers/google/utils.test.ts b/apps/sim/providers/google/utils.test.ts
new file mode 100644
index 0000000000..31d430e231
--- /dev/null
+++ b/apps/sim/providers/google/utils.test.ts
@@ -0,0 +1,453 @@
+/**
+ * @vitest-environment node
+ */
+import { describe, expect, it } from 'vitest'
+import { convertToGeminiFormat, ensureStructResponse } from '@/providers/google/utils'
+import type { ProviderRequest } from '@/providers/types'
+
+describe('ensureStructResponse', () => {
+ describe('should return objects unchanged', () => {
+ it('should return plain object unchanged', () => {
+ const input = { key: 'value', nested: { a: 1 } }
+ const result = ensureStructResponse(input)
+ expect(result).toBe(input) // Same reference
+ expect(result).toEqual({ key: 'value', nested: { a: 1 } })
+ })
+
+ it('should return empty object unchanged', () => {
+ const input = {}
+ const result = ensureStructResponse(input)
+ expect(result).toBe(input)
+ expect(result).toEqual({})
+ })
+ })
+
+ describe('should wrap primitive values in { value: ... }', () => {
+ it('should wrap boolean true', () => {
+ const result = ensureStructResponse(true)
+ expect(result).toEqual({ value: true })
+ expect(typeof result).toBe('object')
+ })
+
+ it('should wrap boolean false', () => {
+ const result = ensureStructResponse(false)
+ expect(result).toEqual({ value: false })
+ expect(typeof result).toBe('object')
+ })
+
+ it('should wrap string', () => {
+ const result = ensureStructResponse('success')
+ expect(result).toEqual({ value: 'success' })
+ expect(typeof result).toBe('object')
+ })
+
+ it('should wrap empty string', () => {
+ const result = ensureStructResponse('')
+ expect(result).toEqual({ value: '' })
+ expect(typeof result).toBe('object')
+ })
+
+ it('should wrap number', () => {
+ const result = ensureStructResponse(42)
+ expect(result).toEqual({ value: 42 })
+ expect(typeof result).toBe('object')
+ })
+
+ it('should wrap zero', () => {
+ const result = ensureStructResponse(0)
+ expect(result).toEqual({ value: 0 })
+ expect(typeof result).toBe('object')
+ })
+
+ it('should wrap null', () => {
+ const result = ensureStructResponse(null)
+ expect(result).toEqual({ value: null })
+ expect(typeof result).toBe('object')
+ })
+
+ it('should wrap undefined', () => {
+ const result = ensureStructResponse(undefined)
+ expect(result).toEqual({ value: undefined })
+ expect(typeof result).toBe('object')
+ })
+ })
+
+ describe('should wrap arrays in { value: ... }', () => {
+ it('should wrap array of strings', () => {
+ const result = ensureStructResponse(['a', 'b', 'c'])
+ expect(result).toEqual({ value: ['a', 'b', 'c'] })
+ expect(typeof result).toBe('object')
+ expect(Array.isArray(result)).toBe(false)
+ })
+
+ it('should wrap array of objects', () => {
+ const result = ensureStructResponse([{ id: 1 }, { id: 2 }])
+ expect(result).toEqual({ value: [{ id: 1 }, { id: 2 }] })
+ expect(typeof result).toBe('object')
+ expect(Array.isArray(result)).toBe(false)
+ })
+
+ it('should wrap empty array', () => {
+ const result = ensureStructResponse([])
+ expect(result).toEqual({ value: [] })
+ expect(typeof result).toBe('object')
+ expect(Array.isArray(result)).toBe(false)
+ })
+ })
+
+ describe('edge cases', () => {
+ it('should handle nested objects correctly', () => {
+ const input = { a: { b: { c: 1 } }, d: [1, 2, 3] }
+ const result = ensureStructResponse(input)
+ expect(result).toBe(input) // Same reference, unchanged
+ })
+
+ it('should handle object with array property correctly', () => {
+ const input = { items: ['a', 'b'], count: 2 }
+ const result = ensureStructResponse(input)
+ expect(result).toBe(input) // Same reference, unchanged
+ })
+ })
+})
+
+describe('convertToGeminiFormat', () => {
+ describe('tool message handling', () => {
+ it('should convert tool message with object response correctly', () => {
+ const request: ProviderRequest = {
+ model: 'gemini-2.5-flash',
+ messages: [
+ { role: 'user', content: 'Hello' },
+ {
+ role: 'assistant',
+ content: '',
+ tool_calls: [
+ {
+ id: 'call_123',
+ type: 'function',
+ function: { name: 'get_weather', arguments: '{"city": "London"}' },
+ },
+ ],
+ },
+ {
+ role: 'tool',
+ name: 'get_weather',
+ tool_call_id: 'call_123',
+ content: '{"temperature": 20, "condition": "sunny"}',
+ },
+ ],
+ }
+
+ const result = convertToGeminiFormat(request)
+
+ const toolResponseContent = result.contents.find(
+ (c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
+ )
+ expect(toolResponseContent).toBeDefined()
+
+ const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
+ ?.functionResponse as { response?: unknown }
+ expect(functionResponse?.response).toEqual({ temperature: 20, condition: 'sunny' })
+ expect(typeof functionResponse?.response).toBe('object')
+ })
+
+ it('should wrap boolean true response in an object for Gemini compatibility', () => {
+ const request: ProviderRequest = {
+ model: 'gemini-2.5-flash',
+ messages: [
+ { role: 'user', content: 'Check if user exists' },
+ {
+ role: 'assistant',
+ content: '',
+ tool_calls: [
+ {
+ id: 'call_456',
+ type: 'function',
+ function: { name: 'user_exists', arguments: '{"userId": "123"}' },
+ },
+ ],
+ },
+ {
+ role: 'tool',
+ name: 'user_exists',
+ tool_call_id: 'call_456',
+ content: 'true', // Boolean true as JSON string
+ },
+ ],
+ }
+
+ const result = convertToGeminiFormat(request)
+
+ const toolResponseContent = result.contents.find(
+ (c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
+ )
+ expect(toolResponseContent).toBeDefined()
+
+ const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
+ ?.functionResponse as { response?: unknown }
+
+ expect(typeof functionResponse?.response).toBe('object')
+ expect(functionResponse?.response).not.toBe(true)
+ expect(functionResponse?.response).toEqual({ value: true })
+ })
+
+ it('should wrap boolean false response in an object for Gemini compatibility', () => {
+ const request: ProviderRequest = {
+ model: 'gemini-2.5-flash',
+ messages: [
+ { role: 'user', content: 'Check if user exists' },
+ {
+ role: 'assistant',
+ content: '',
+ tool_calls: [
+ {
+ id: 'call_789',
+ type: 'function',
+ function: { name: 'user_exists', arguments: '{"userId": "999"}' },
+ },
+ ],
+ },
+ {
+ role: 'tool',
+ name: 'user_exists',
+ tool_call_id: 'call_789',
+ content: 'false', // Boolean false as JSON string
+ },
+ ],
+ }
+
+ const result = convertToGeminiFormat(request)
+
+ const toolResponseContent = result.contents.find(
+ (c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
+ )
+ const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
+ ?.functionResponse as { response?: unknown }
+
+ expect(typeof functionResponse?.response).toBe('object')
+ expect(functionResponse?.response).toEqual({ value: false })
+ })
+
+ it('should wrap string response in an object for Gemini compatibility', () => {
+ const request: ProviderRequest = {
+ model: 'gemini-2.5-flash',
+ messages: [
+ { role: 'user', content: 'Get status' },
+ {
+ role: 'assistant',
+ content: '',
+ tool_calls: [
+ {
+ id: 'call_str',
+ type: 'function',
+ function: { name: 'get_status', arguments: '{}' },
+ },
+ ],
+ },
+ {
+ role: 'tool',
+ name: 'get_status',
+ tool_call_id: 'call_str',
+ content: '"success"', // String as JSON
+ },
+ ],
+ }
+
+ const result = convertToGeminiFormat(request)
+
+ const toolResponseContent = result.contents.find(
+ (c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
+ )
+ const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
+ ?.functionResponse as { response?: unknown }
+
+ expect(typeof functionResponse?.response).toBe('object')
+ expect(functionResponse?.response).toEqual({ value: 'success' })
+ })
+
+ it('should wrap number response in an object for Gemini compatibility', () => {
+ const request: ProviderRequest = {
+ model: 'gemini-2.5-flash',
+ messages: [
+ { role: 'user', content: 'Get count' },
+ {
+ role: 'assistant',
+ content: '',
+ tool_calls: [
+ {
+ id: 'call_num',
+ type: 'function',
+ function: { name: 'get_count', arguments: '{}' },
+ },
+ ],
+ },
+ {
+ role: 'tool',
+ name: 'get_count',
+ tool_call_id: 'call_num',
+ content: '42', // Number as JSON
+ },
+ ],
+ }
+
+ const result = convertToGeminiFormat(request)
+
+ const toolResponseContent = result.contents.find(
+ (c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
+ )
+ const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
+ ?.functionResponse as { response?: unknown }
+
+ expect(typeof functionResponse?.response).toBe('object')
+ expect(functionResponse?.response).toEqual({ value: 42 })
+ })
+
+ it('should wrap null response in an object for Gemini compatibility', () => {
+ const request: ProviderRequest = {
+ model: 'gemini-2.5-flash',
+ messages: [
+ { role: 'user', content: 'Get data' },
+ {
+ role: 'assistant',
+ content: '',
+ tool_calls: [
+ {
+ id: 'call_null',
+ type: 'function',
+ function: { name: 'get_data', arguments: '{}' },
+ },
+ ],
+ },
+ {
+ role: 'tool',
+ name: 'get_data',
+ tool_call_id: 'call_null',
+ content: 'null', // null as JSON
+ },
+ ],
+ }
+
+ const result = convertToGeminiFormat(request)
+
+ const toolResponseContent = result.contents.find(
+ (c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
+ )
+ const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
+ ?.functionResponse as { response?: unknown }
+
+ expect(typeof functionResponse?.response).toBe('object')
+ expect(functionResponse?.response).toEqual({ value: null })
+ })
+
+ it('should wrap array response in an object since bare arrays are not valid Struct values', () => {
+ const request: ProviderRequest = {
+ model: 'gemini-2.5-flash',
+ messages: [
+ { role: 'user', content: 'Get items' },
+ {
+ role: 'assistant',
+ content: '',
+ tool_calls: [
+ {
+ id: 'call_arr',
+ type: 'function',
+ function: { name: 'get_items', arguments: '{}' },
+ },
+ ],
+ },
+ {
+ role: 'tool',
+ name: 'get_items',
+ tool_call_id: 'call_arr',
+ content: '["item1", "item2"]', // Array as JSON
+ },
+ ],
+ }
+
+ const result = convertToGeminiFormat(request)
+
+ const toolResponseContent = result.contents.find(
+ (c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
+ )
+ const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
+ ?.functionResponse as { response?: unknown }
+
+ expect(typeof functionResponse?.response).toBe('object')
+ expect(functionResponse?.response).toEqual({ value: ['item1', 'item2'] })
+ })
+
+ it('should handle invalid JSON by wrapping in output object', () => {
+ const request: ProviderRequest = {
+ model: 'gemini-2.5-flash',
+ messages: [
+ { role: 'user', content: 'Get data' },
+ {
+ role: 'assistant',
+ content: '',
+ tool_calls: [
+ {
+ id: 'call_invalid',
+ type: 'function',
+ function: { name: 'get_data', arguments: '{}' },
+ },
+ ],
+ },
+ {
+ role: 'tool',
+ name: 'get_data',
+ tool_call_id: 'call_invalid',
+ content: 'not valid json {',
+ },
+ ],
+ }
+
+ const result = convertToGeminiFormat(request)
+
+ const toolResponseContent = result.contents.find(
+ (c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
+ )
+ const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
+ ?.functionResponse as { response?: unknown }
+
+ expect(typeof functionResponse?.response).toBe('object')
+ expect(functionResponse?.response).toEqual({ output: 'not valid json {' })
+ })
+
+ it('should handle empty content by wrapping in output object', () => {
+ const request: ProviderRequest = {
+ model: 'gemini-2.5-flash',
+ messages: [
+ { role: 'user', content: 'Do something' },
+ {
+ role: 'assistant',
+ content: '',
+ tool_calls: [
+ {
+ id: 'call_empty',
+ type: 'function',
+ function: { name: 'do_action', arguments: '{}' },
+ },
+ ],
+ },
+ {
+ role: 'tool',
+ name: 'do_action',
+ tool_call_id: 'call_empty',
+ content: '', // Empty string is not valid JSON, so parsing throws
+ },
+ ],
+ }
+
+ const result = convertToGeminiFormat(request)
+
+ const toolResponseContent = result.contents.find(
+ (c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
+ )
+ const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
+ ?.functionResponse as { response?: unknown }
+
+ expect(typeof functionResponse?.response).toBe('object')
+ // Empty string is not valid JSON, so it falls back to { output: "" }
+ expect(functionResponse?.response).toEqual({ output: '' })
+ })
+ })
+})
diff --git a/apps/sim/providers/google/utils.ts b/apps/sim/providers/google/utils.ts
index 76d7961acb..7240947849 100644
--- a/apps/sim/providers/google/utils.ts
+++ b/apps/sim/providers/google/utils.ts
@@ -18,6 +18,22 @@ import { trackForcedToolUsage } from '@/providers/utils'
const logger = createLogger('GoogleUtils')
+/**
+ * Ensures a value is a valid object for Gemini's functionResponse.response field.
+ * Gemini's API requires functionResponse.response to be a google.protobuf.Struct,
+ * which must be an object with string keys. Primitive values (boolean, string,
+ * number, null) and arrays are wrapped in { value: ... }.
+ *
+ * @param value - The value to ensure is a Struct-compatible object
+ * @returns A Record<string, unknown> suitable for functionResponse.response
+ */
+export function ensureStructResponse(value: unknown): Record<string, unknown> {
+ if (typeof value === 'object' && value !== null && !Array.isArray(value)) {
+ return value as Record<string, unknown>
+ }
+ return { value }
+}
+
/**
* Usage metadata for Google Gemini responses
*/
@@ -180,7 +196,8 @@ export function convertToGeminiFormat(request: ProviderRequest): {
}
let responseData: Record<string, unknown>
try {
- responseData = JSON.parse(message.content ?? '{}')
+ const parsed = JSON.parse(message.content ?? '{}')
+ responseData = ensureStructResponse(parsed)
} catch {
responseData = { output: message.content }
}
From 689037a300e69ecf0531149e902ba89d670d07f7 Mon Sep 17 00:00:00 2001
From: Vikhyath Mondreti
Date: Tue, 20 Jan 2026 09:43:41 -0800
Subject: [PATCH 02/14] fix(canonical): copilot path + update parent (#2901)
---
apps/sim/app/api/workflows/[id]/state/route.ts | 1 +
apps/sim/hooks/use-collaborative-workflow.ts | 11 +++++++++++
apps/sim/socket/database/operations.ts | 10 ++++++----
3 files changed, 18 insertions(+), 4 deletions(-)
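Note for reviewers: detaching a block from its parent previously cleared the
whole data object; the rest-destructuring below strips only the parent linkage
and keeps sibling keys. A minimal sketch with hypothetical block data:

    // Only parentId/extent should be dropped on detach.
    const currentData = { parentId: 'loop-1', extent: 'parent', width: 320 }
    const { parentId: _removedParentId, extent: _removedExtent, ...restData } = currentData
    // restData => { width: 320 } (unrelated keys survive the detach)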
diff --git a/apps/sim/app/api/workflows/[id]/state/route.ts b/apps/sim/app/api/workflows/[id]/state/route.ts
index 7c8879430e..0c977a56c0 100644
--- a/apps/sim/app/api/workflows/[id]/state/route.ts
+++ b/apps/sim/app/api/workflows/[id]/state/route.ts
@@ -33,6 +33,7 @@ const BlockDataSchema = z.object({
doWhileCondition: z.string().optional(),
parallelType: z.enum(['collection', 'count']).optional(),
type: z.string().optional(),
+ canonicalModes: z.record(z.enum(['basic', 'advanced'])).optional(),
})
const SubBlockStateSchema = z.object({
diff --git a/apps/sim/hooks/use-collaborative-workflow.ts b/apps/sim/hooks/use-collaborative-workflow.ts
index 28940428f7..32ccf3147f 100644
--- a/apps/sim/hooks/use-collaborative-workflow.ts
+++ b/apps/sim/hooks/use-collaborative-workflow.ts
@@ -897,6 +897,17 @@ export function useCollaborativeWorkflow() {
// Collect all edge IDs to remove
const edgeIdsToRemove = updates.flatMap((u) => u.affectedEdges.map((e) => e.id))
if (edgeIdsToRemove.length > 0) {
+ const edgeOperationId = crypto.randomUUID()
+ addToQueue({
+ id: edgeOperationId,
+ operation: {
+ operation: EDGES_OPERATIONS.BATCH_REMOVE_EDGES,
+ target: OPERATION_TARGETS.EDGES,
+ payload: { ids: edgeIdsToRemove },
+ },
+ workflowId: activeWorkflowId || '',
+ userId: session?.user?.id || 'unknown',
+ })
useWorkflowStore.getState().batchRemoveEdges(edgeIdsToRemove)
}
diff --git a/apps/sim/socket/database/operations.ts b/apps/sim/socket/database/operations.ts
index 5fa69f8d98..991eac1a09 100644
--- a/apps/sim/socket/database/operations.ts
+++ b/apps/sim/socket/database/operations.ts
@@ -337,10 +337,11 @@ async function handleBlockOperationTx(
const currentData = currentBlock?.data || {}
// Update data with parentId and extent
+ const { parentId: _removedParentId, extent: _removedExtent, ...restData } = currentData
const updatedData = isRemovingFromParent
- ? {} // Clear data entirely when removing from parent
+ ? restData
: {
- ...currentData,
+ ...restData,
...(payload.parentId ? { parentId: payload.parentId } : {}),
...(payload.extent ? { extent: payload.extent } : {}),
}
@@ -828,10 +829,11 @@ async function handleBlocksOperationTx(
const currentData = currentBlock?.data || {}
+ const { parentId: _removedParentId, extent: _removedExtent, ...restData } = currentData
const updatedData = isRemovingFromParent
- ? {}
+ ? restData
: {
- ...currentData,
+ ...restData,
...(parentId ? { parentId, extent: 'parent' } : {}),
}
From a26a1a9737477447223078a790f64fdc9cb5d51e Mon Sep 17 00:00:00 2001
From: Waleed
Date: Tue, 20 Jan 2026 10:06:13 -0800
Subject: [PATCH 03/14] fix(rss): add top-level title, link, pubDate fields to
RSS trigger output (#2902)
* fix(rss): add top-level title, link, pubDate fields to RSS trigger output
* fix(imap): add top-level fields to IMAP trigger output
---
apps/sim/executor/utils/start-block.ts | 7 ++----
apps/sim/lib/webhooks/imap-polling-service.ts | 22 +++++++++++++++++++
apps/sim/lib/webhooks/rss-polling-service.ts | 6 +++++
apps/sim/lib/webhooks/utils.server.ts | 14 ++++++++++++
4 files changed, 44 insertions(+), 5 deletions(-)
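Note for reviewers: the trigger payload now mirrors the most commonly accessed
nested fields at the top level, so both payload.title and payload.item.title
resolve. A sketch of the resulting RSS payload shape (all values hypothetical):

    // Hypothetical item from the poller.
    const item = { title: 'Post title', link: 'https://example.com/post', pubDate: '2026-01-20T00:00:00Z' }
    const payload = {
      title: item.title,   // new top-level mirrors of item fields
      link: item.link,
      pubDate: item.pubDate,
      item,                // nested shape unchanged
      feed: { title: 'Example feed' },
      timestamp: new Date().toISOString(),
    }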
diff --git a/apps/sim/executor/utils/start-block.ts b/apps/sim/executor/utils/start-block.ts
index 1ed90c3710..23163cc6d3 100644
--- a/apps/sim/executor/utils/start-block.ts
+++ b/apps/sim/executor/utils/start-block.ts
@@ -377,10 +377,7 @@ function buildManualTriggerOutput(
return mergeFilesIntoOutput(output, workflowInput)
}
-function buildIntegrationTriggerOutput(
- _finalInput: unknown,
- workflowInput: unknown
-): NormalizedBlockOutput {
+function buildIntegrationTriggerOutput(workflowInput: unknown): NormalizedBlockOutput {
return isPlainObject(workflowInput) ? (workflowInput as NormalizedBlockOutput) : {}
}
@@ -430,7 +427,7 @@ export function buildStartBlockOutput(options: StartBlockOutputOptions): Normali
return buildManualTriggerOutput(finalInput, workflowInput)
case StartBlockPath.EXTERNAL_TRIGGER:
- return buildIntegrationTriggerOutput(finalInput, workflowInput)
+ return buildIntegrationTriggerOutput(workflowInput)
case StartBlockPath.LEGACY_STARTER:
return buildLegacyStarterOutput(
diff --git a/apps/sim/lib/webhooks/imap-polling-service.ts b/apps/sim/lib/webhooks/imap-polling-service.ts
index 709311ada7..49185f9d96 100644
--- a/apps/sim/lib/webhooks/imap-polling-service.ts
+++ b/apps/sim/lib/webhooks/imap-polling-service.ts
@@ -54,6 +54,17 @@ export interface SimplifiedImapEmail {
}
export interface ImapWebhookPayload {
+ messageId: string
+ subject: string
+ from: string
+ to: string
+ cc: string
+ date: string | null
+ bodyText: string
+ bodyHtml: string
+ mailbox: string
+ hasAttachments: boolean
+ attachments: ImapAttachment[]
email: SimplifiedImapEmail
timestamp: string
}
@@ -613,6 +624,17 @@ async function processEmails(
}
const payload: ImapWebhookPayload = {
+ messageId: simplifiedEmail.messageId,
+ subject: simplifiedEmail.subject,
+ from: simplifiedEmail.from,
+ to: simplifiedEmail.to,
+ cc: simplifiedEmail.cc,
+ date: simplifiedEmail.date,
+ bodyText: simplifiedEmail.bodyText,
+ bodyHtml: simplifiedEmail.bodyHtml,
+ mailbox: simplifiedEmail.mailbox,
+ hasAttachments: simplifiedEmail.hasAttachments,
+ attachments: simplifiedEmail.attachments,
email: simplifiedEmail,
timestamp: new Date().toISOString(),
}
diff --git a/apps/sim/lib/webhooks/rss-polling-service.ts b/apps/sim/lib/webhooks/rss-polling-service.ts
index f74f3ab616..d950486995 100644
--- a/apps/sim/lib/webhooks/rss-polling-service.ts
+++ b/apps/sim/lib/webhooks/rss-polling-service.ts
@@ -48,6 +48,9 @@ interface RssFeed {
}
export interface RssWebhookPayload {
+ title?: string
+ link?: string
+ pubDate?: string
item: RssItem
feed: {
title?: string
@@ -349,6 +352,9 @@ async function processRssItems(
`${webhookData.id}:${itemGuid}`,
async () => {
const payload: RssWebhookPayload = {
+ title: item.title,
+ link: item.link,
+ pubDate: item.pubDate,
item: {
title: item.title,
link: item.link,
diff --git a/apps/sim/lib/webhooks/utils.server.ts b/apps/sim/lib/webhooks/utils.server.ts
index 2cbe3f4281..cbf72ac0cf 100644
--- a/apps/sim/lib/webhooks/utils.server.ts
+++ b/apps/sim/lib/webhooks/utils.server.ts
@@ -686,6 +686,9 @@ export async function formatWebhookInput(
if (foundWebhook.provider === 'rss') {
if (body && typeof body === 'object' && 'item' in body) {
return {
+ title: body.title,
+ link: body.link,
+ pubDate: body.pubDate,
item: body.item,
feed: body.feed,
timestamp: body.timestamp,
@@ -697,6 +700,17 @@ export async function formatWebhookInput(
if (foundWebhook.provider === 'imap') {
if (body && typeof body === 'object' && 'email' in body) {
return {
+ messageId: body.messageId,
+ subject: body.subject,
+ from: body.from,
+ to: body.to,
+ cc: body.cc,
+ date: body.date,
+ bodyText: body.bodyText,
+ bodyHtml: body.bodyHtml,
+ mailbox: body.mailbox,
+ hasAttachments: body.hasAttachments,
+ attachments: body.attachments,
email: body.email,
timestamp: body.timestamp,
}
From 8344d68ca8fe0d6a1949639668f93edf03a5ea75 Mon Sep 17 00:00:00 2001
From: Vikhyath Mondreti
Date: Tue, 20 Jan 2026 11:08:47 -0800
Subject: [PATCH 04/14] improvement(browseruse): add profile id param (#2903)
* improvement(browseruse): add profile id param
* make request a stub since we have directExec
---
apps/sim/blocks/blocks/browser_use.ts | 7 +
apps/sim/tools/browser_use/run_task.ts | 417 +++++++++++++++++--------
apps/sim/tools/browser_use/types.ts | 1 +
3 files changed, 291 insertions(+), 134 deletions(-)
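Note for reviewers: when profile_id is set, directExecution brackets the task
in a browser-use session so profile state persists across runs. Rough control
flow (createTask is shorthand for the inline POST to /api/v2/tasks; error
paths and the no-profile case omitted):

    const session = await createSessionWithProfile(params.profile_id, params.apiKey)
    const taskId = await createTask(buildRequestBody(params, session.sessionId))
    const result = await pollForCompletion(taskId, params.apiKey) // 5s interval, 3 min cap
    await stopSession(session.sessionId, params.apiKey)           // session stopped on success and failure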
diff --git a/apps/sim/blocks/blocks/browser_use.ts b/apps/sim/blocks/blocks/browser_use.ts
index d54628fb77..b9f364e2b9 100644
--- a/apps/sim/blocks/blocks/browser_use.ts
+++ b/apps/sim/blocks/blocks/browser_use.ts
@@ -57,6 +57,12 @@ export const BrowserUseBlock: BlockConfig = {
type: 'switch',
placeholder: 'Save browser data',
},
+ {
+ id: 'profile_id',
+ title: 'Profile ID',
+ type: 'short-input',
+ placeholder: 'Enter browser profile ID (optional)',
+ },
{
id: 'apiKey',
title: 'API Key',
@@ -75,6 +81,7 @@ export const BrowserUseBlock: BlockConfig = {
variables: { type: 'json', description: 'Task variables' },
model: { type: 'string', description: 'AI model to use' },
save_browser_data: { type: 'boolean', description: 'Save browser data' },
+ profile_id: { type: 'string', description: 'Browser profile ID for persistent sessions' },
},
outputs: {
id: { type: 'string', description: 'Task execution identifier' },
diff --git a/apps/sim/tools/browser_use/run_task.ts b/apps/sim/tools/browser_use/run_task.ts
index dff20bd126..e5a6f53814 100644
--- a/apps/sim/tools/browser_use/run_task.ts
+++ b/apps/sim/tools/browser_use/run_task.ts
@@ -1,11 +1,214 @@
import { createLogger } from '@sim/logger'
import type { BrowserUseRunTaskParams, BrowserUseRunTaskResponse } from '@/tools/browser_use/types'
-import type { ToolConfig } from '@/tools/types'
+import type { ToolConfig, ToolResponse } from '@/tools/types'
const logger = createLogger('BrowserUseTool')
-const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
-const MAX_POLL_TIME_MS = 180000 // 3 minutes maximum polling time
+const POLL_INTERVAL_MS = 5000
+const MAX_POLL_TIME_MS = 180000
+const MAX_CONSECUTIVE_ERRORS = 3
+
+async function createSessionWithProfile(
+ profileId: string,
+ apiKey: string
+): Promise<{ sessionId: string } | { error: string }> {
+ try {
+ const response = await fetch('https://api.browser-use.com/api/v2/sessions', {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ 'X-Browser-Use-API-Key': apiKey,
+ },
+ body: JSON.stringify({
+ profileId: profileId.trim(),
+ }),
+ })
+
+ if (!response.ok) {
+ const errorText = await response.text()
+ logger.error(`Failed to create session with profile: ${errorText}`)
+ return { error: `Failed to create session with profile: ${response.statusText}` }
+ }
+
+ const data = (await response.json()) as { id: string }
+ logger.info(`Created session ${data.id} with profile ${profileId}`)
+ return { sessionId: data.id }
+ } catch (error: any) {
+ logger.error('Error creating session with profile:', error)
+ return { error: `Error creating session: ${error.message}` }
+ }
+}
+
+async function stopSession(sessionId: string, apiKey: string): Promise<void> {
+ try {
+ const response = await fetch(`https://api.browser-use.com/api/v2/sessions/${sessionId}`, {
+ method: 'PATCH',
+ headers: {
+ 'Content-Type': 'application/json',
+ 'X-Browser-Use-API-Key': apiKey,
+ },
+ body: JSON.stringify({ action: 'stop' }),
+ })
+
+ if (response.ok) {
+ logger.info(`Stopped session ${sessionId}`)
+ } else {
+ logger.warn(`Failed to stop session ${sessionId}: ${response.statusText}`)
+ }
+ } catch (error: any) {
+ logger.warn(`Error stopping session ${sessionId}:`, error)
+ }
+}
+
+function buildRequestBody(
+ params: BrowserUseRunTaskParams,
+ sessionId?: string
+): Record<string, any> {
+ const requestBody: Record<string, any> = {
+ task: params.task,
+ }
+
+ if (sessionId) {
+ requestBody.sessionId = sessionId
+ logger.info(`Using session ${sessionId} for task`)
+ }
+
+ if (params.variables) {
+ let secrets: Record<string, any> = {}
+
+ if (Array.isArray(params.variables)) {
+ logger.info('Converting variables array to dictionary format')
+ params.variables.forEach((row: any) => {
+ if (row.cells?.Key && row.cells.Value !== undefined) {
+ secrets[row.cells.Key] = row.cells.Value
+ logger.info(`Added secret for key: ${row.cells.Key}`)
+ } else if (row.Key && row.Value !== undefined) {
+ secrets[row.Key] = row.Value
+ logger.info(`Added secret for key: ${row.Key}`)
+ }
+ })
+ } else if (typeof params.variables === 'object' && params.variables !== null) {
+ logger.info('Using variables object directly')
+ secrets = params.variables
+ }
+
+ if (Object.keys(secrets).length > 0) {
+ logger.info(`Found ${Object.keys(secrets).length} secrets to include`)
+ requestBody.secrets = secrets
+ } else {
+ logger.warn('No usable secrets found in variables')
+ }
+ }
+
+ if (params.model) {
+ requestBody.llm_model = params.model
+ }
+
+ if (params.save_browser_data) {
+ requestBody.save_browser_data = params.save_browser_data
+ }
+
+ requestBody.use_adblock = true
+ requestBody.highlight_elements = true
+
+ return requestBody
+}
+
+async function fetchTaskStatus(
+ taskId: string,
+ apiKey: string
+): Promise<{ ok: true; data: any } | { ok: false; error: string }> {
+ try {
+ const response = await fetch(`https://api.browser-use.com/api/v2/tasks/${taskId}`, {
+ method: 'GET',
+ headers: {
+ 'X-Browser-Use-API-Key': apiKey,
+ },
+ })
+
+ if (!response.ok) {
+ return { ok: false, error: `HTTP ${response.status}: ${response.statusText}` }
+ }
+
+ const data = await response.json()
+ return { ok: true, data }
+ } catch (error: any) {
+ return { ok: false, error: error.message || 'Network error' }
+ }
+}
+
+async function pollForCompletion(
+ taskId: string,
+ apiKey: string
+): Promise<{ success: boolean; output: any; steps: any[]; error?: string }> {
+ let liveUrlLogged = false
+ let consecutiveErrors = 0
+ const startTime = Date.now()
+
+ while (Date.now() - startTime < MAX_POLL_TIME_MS) {
+ const result = await fetchTaskStatus(taskId, apiKey)
+
+ if (!result.ok) {
+ consecutiveErrors++
+ logger.warn(
+ `Error polling task ${taskId} (attempt ${consecutiveErrors}/${MAX_CONSECUTIVE_ERRORS}): ${result.error}`
+ )
+
+ if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) {
+ logger.error(`Max consecutive errors reached for task ${taskId}`)
+ return {
+ success: false,
+ output: null,
+ steps: [],
+ error: `Failed to poll task status after ${MAX_CONSECUTIVE_ERRORS} attempts: ${result.error}`,
+ }
+ }
+
+ await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
+ continue
+ }
+
+ consecutiveErrors = 0
+ const taskData = result.data
+ const status = taskData.status
+
+ logger.info(`BrowserUse task ${taskId} status: ${status}`)
+
+ if (['finished', 'failed', 'stopped'].includes(status)) {
+ return {
+ success: status === 'finished',
+ output: taskData.output ?? null,
+ steps: taskData.steps || [],
+ }
+ }
+
+ if (!liveUrlLogged && taskData.live_url) {
+ logger.info(`BrowserUse task ${taskId} live URL: ${taskData.live_url}`)
+ liveUrlLogged = true
+ }
+
+ await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
+ }
+
+ const finalResult = await fetchTaskStatus(taskId, apiKey)
+ if (finalResult.ok && ['finished', 'failed', 'stopped'].includes(finalResult.data.status)) {
+ return {
+ success: finalResult.data.status === 'finished',
+ output: finalResult.data.output ?? null,
+ steps: finalResult.data.steps || [],
+ }
+ }
+
+ logger.warn(
+ `Task ${taskId} did not complete within the maximum polling time (${MAX_POLL_TIME_MS / 1000}s)`
+ )
+ return {
+ success: false,
+ output: null,
+ steps: [],
+ error: `Task did not complete within the maximum polling time (${MAX_POLL_TIME_MS / 1000}s)`,
+ }
+}
export const runTaskTool: ToolConfig<BrowserUseRunTaskParams, BrowserUseRunTaskResponse> = {
id: 'browser_use_run_task',
@@ -44,7 +247,14 @@ export const runTaskTool: ToolConfig<BrowserUseRunTaskParams, BrowserUseRunTaskResponse> = {
- const requestBody: Record<string, any> = {
- task: params.task,
- }
-
- if (params.variables) {
- let secrets: Record<string, any> = {}
-
- if (Array.isArray(params.variables)) {
- logger.info('Converting variables array to dictionary format')
- params.variables.forEach((row) => {
- if (row.cells?.Key && row.cells.Value !== undefined) {
- secrets[row.cells.Key] = row.cells.Value
- logger.info(`Added secret for key: ${row.cells.Key}`)
- } else if (row.Key && row.Value !== undefined) {
- secrets[row.Key] = row.Value
- logger.info(`Added secret for key: ${row.Key}`)
- }
- })
- } else if (typeof params.variables === 'object' && params.variables !== null) {
- logger.info('Using variables object directly')
- secrets = params.variables
- }
-
- if (Object.keys(secrets).length > 0) {
- logger.info(`Found ${Object.keys(secrets).length} secrets to include`)
- requestBody.secrets = secrets
- } else {
- logger.warn('No usable secrets found in variables')
- }
- }
-
- if (params.model) {
- requestBody.llm_model = params.model
- }
-
- if (params.save_browser_data) {
- requestBody.save_browser_data = params.save_browser_data
- }
-
- requestBody.use_adblock = true
- requestBody.highlight_elements = true
-
- return requestBody
- },
- },
-
- transformResponse: async (response: Response) => {
- const data = (await response.json()) as { id: string }
- return {
- success: true,
- output: {
- id: data.id,
- success: true,
- output: null,
- steps: [],
- },
- }
},
- postProcess: async (result, params) => {
- if (!result.success) {
- return result
- }
-
- const taskId = result.output.id
- let liveUrlLogged = false
+ directExecution: async (params: BrowserUseRunTaskParams): Promise<ToolResponse> => {
+ let sessionId: string | undefined
- try {
- const initialTaskResponse = await fetch(
- `https://api.browser-use.com/api/v2/tasks/${taskId}`,
- {
- method: 'GET',
- headers: {
- 'X-Browser-Use-API-Key': params.apiKey,
+ if (params.profile_id) {
+ logger.info(`Creating session with profile ID: ${params.profile_id}`)
+ const sessionResult = await createSessionWithProfile(params.profile_id, params.apiKey)
+ if ('error' in sessionResult) {
+ return {
+ success: false,
+ output: {
+ id: null,
+ success: false,
+ output: null,
+ steps: [],
},
- }
- )
-
- if (initialTaskResponse.ok) {
- const initialTaskData = await initialTaskResponse.json()
- if (initialTaskData.live_url) {
- logger.info(
- `BrowserUse task ${taskId} launched with live URL: ${initialTaskData.live_url}`
- )
- liveUrlLogged = true
+ error: sessionResult.error,
}
}
- } catch (error) {
- logger.warn(`Failed to get initial task details for ${taskId}:`, error)
+ sessionId = sessionResult.sessionId
}
- let elapsedTime = 0
+ const requestBody = buildRequestBody(params, sessionId)
+ logger.info('Creating BrowserUse task', { hasSession: !!sessionId })
- while (elapsedTime < MAX_POLL_TIME_MS) {
- try {
- const statusResponse = await fetch(`https://api.browser-use.com/api/v2/tasks/${taskId}`, {
- method: 'GET',
- headers: {
- 'X-Browser-Use-API-Key': params.apiKey,
- },
- })
+ try {
+ const response = await fetch('https://api.browser-use.com/api/v2/tasks', {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ 'X-Browser-Use-API-Key': params.apiKey,
+ },
+ body: JSON.stringify(requestBody),
+ })
- if (!statusResponse.ok) {
- throw new Error(`Failed to get task status: ${statusResponse.statusText}`)
+ if (!response.ok) {
+ const errorText = await response.text()
+ logger.error(`Failed to create task: ${errorText}`)
+ return {
+ success: false,
+ output: {
+ id: null,
+ success: false,
+ output: null,
+ steps: [],
+ },
+ error: `Failed to create task: ${response.statusText}`,
}
+ }
- const taskData = await statusResponse.json()
- const status = taskData.status
-
- logger.info(`BrowserUse task ${taskId} status: ${status}`)
-
- if (['finished', 'failed', 'stopped'].includes(status)) {
- result.output = {
- id: taskId,
- success: status === 'finished',
- output: taskData.output ?? null,
- steps: taskData.steps || [],
- }
+ const data = (await response.json()) as { id: string }
+ const taskId = data.id
+ logger.info(`Created BrowserUse task: ${taskId}`)
- return result
- }
+ const result = await pollForCompletion(taskId, params.apiKey)
- if (!liveUrlLogged && status === 'running' && taskData.live_url) {
- logger.info(`BrowserUse task ${taskId} running with live URL: ${taskData.live_url}`)
- liveUrlLogged = true
- }
+ if (sessionId) {
+ await stopSession(sessionId, params.apiKey)
+ }
- await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
- elapsedTime += POLL_INTERVAL_MS
- } catch (error: any) {
- logger.error('Error polling for task status:', {
- message: error.message || 'Unknown error',
- taskId,
- })
+ return {
+ success: result.success && !result.error,
+ output: {
+ id: taskId,
+ success: result.success,
+ output: result.output,
+ steps: result.steps,
+ },
+ error: result.error,
+ }
+ } catch (error: any) {
+ logger.error('Error creating BrowserUse task:', error)
- return {
- ...result,
- error: `Error polling for task status: ${error.message || 'Unknown error'}`,
- }
+ if (sessionId) {
+ await stopSession(sessionId, params.apiKey)
}
- }
- logger.warn(
- `Task ${taskId} did not complete within the maximum polling time (${MAX_POLL_TIME_MS / 1000}s)`
- )
- return {
- ...result,
- error: `Task did not complete within the maximum polling time (${MAX_POLL_TIME_MS / 1000}s)`,
+ return {
+ success: false,
+ output: {
+ id: null,
+ success: false,
+ output: null,
+ steps: [],
+ },
+ error: `Error creating task: ${error.message}`,
+ }
}
},
diff --git a/apps/sim/tools/browser_use/types.ts b/apps/sim/tools/browser_use/types.ts
index 293bcbfa7d..f38c4524e4 100644
--- a/apps/sim/tools/browser_use/types.ts
+++ b/apps/sim/tools/browser_use/types.ts
@@ -6,6 +6,7 @@ export interface BrowserUseRunTaskParams {
variables?: Record<string, any>
model?: string
save_browser_data?: boolean
+ profile_id?: string
}
export interface BrowserUseTaskStep {
From 4afb245fa226af724a9e219febf9bd1ddf2cb9fd Mon Sep 17 00:00:00 2001
From: Waleed
Date: Tue, 20 Jan 2026 15:40:37 -0800
Subject: [PATCH 05/14] improvement(executor): upgraded abort controller to
handle aborts for loops and parallels (#2880)
* improvement(executor): upgraded abort controller to handle aborts for loops and parallels
* comments
---
apps/sim/executor/execution/engine.test.ts | 599 +++++++++++++++++++++
apps/sim/executor/execution/engine.ts | 68 ++-
2 files changed, 656 insertions(+), 11 deletions(-)
create mode 100644 apps/sim/executor/execution/engine.test.ts
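Note for reviewers: the engine now races in-flight executions against a single
cached abort promise, so cancellation returns as soon as the signal fires
instead of draining slow branches. The pattern in isolation (signal and
executing stand in for the engine's AbortSignal and its set of in-flight
promises):

    const abortPromise = new Promise<void>((resolve) =>
      signal.addEventListener('abort', () => resolve(), { once: true })
    )
    await Promise.race([Promise.all([...executing]), abortPromise])
    // If the signal won the race, the engine marks the run status 'cancelled'.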
diff --git a/apps/sim/executor/execution/engine.test.ts b/apps/sim/executor/execution/engine.test.ts
new file mode 100644
index 0000000000..f93ebc2068
--- /dev/null
+++ b/apps/sim/executor/execution/engine.test.ts
@@ -0,0 +1,599 @@
+/**
+ * @vitest-environment node
+ */
+import { loggerMock } from '@sim/testing'
+import { afterEach, beforeEach, describe, expect, it, type Mock, vi } from 'vitest'
+
+vi.mock('@sim/logger', () => loggerMock)
+
+vi.mock('@/lib/execution/cancellation', () => ({
+ isExecutionCancelled: vi.fn(),
+ isRedisCancellationEnabled: vi.fn(),
+}))
+
+import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
+import type { DAG, DAGNode } from '@/executor/dag/builder'
+import type { EdgeManager } from '@/executor/execution/edge-manager'
+import type { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
+import type { ExecutionContext } from '@/executor/types'
+import type { SerializedBlock } from '@/serializer/types'
+import { ExecutionEngine } from './engine'
+
+function createMockBlock(id: string): SerializedBlock {
+ return {
+ id,
+ metadata: { id: 'test', name: 'Test Block' },
+ position: { x: 0, y: 0 },
+ config: { tool: '', params: {} },
+ inputs: {},
+ outputs: {},
+ enabled: true,
+ }
+}
+
+function createMockNode(id: string, blockType = 'test'): DAGNode {
+ return {
+ id,
+ block: {
+ ...createMockBlock(id),
+ metadata: { id: blockType, name: `Block ${id}` },
+ },
+ outgoingEdges: new Map(),
+ incomingEdges: new Set(),
+ metadata: {},
+ }
+}
+
+function createMockContext(overrides: Partial<ExecutionContext> = {}): ExecutionContext {
+ return {
+ workflowId: 'test-workflow',
+ workspaceId: 'test-workspace',
+ executionId: 'test-execution',
+ userId: 'test-user',
+ blockStates: new Map(),
+ executedBlocks: new Set(),
+ blockLogs: [],
+ loopExecutions: new Map(),
+ parallelExecutions: new Map(),
+ completedLoops: new Set(),
+ activeExecutionPath: new Set(),
+ metadata: {
+ executionId: 'test-execution',
+ startTime: new Date().toISOString(),
+ pendingBlocks: [],
+ },
+ envVars: {},
+ ...overrides,
+ }
+}
+
+function createMockDAG(nodes: DAGNode[]): DAG {
+ const nodeMap = new Map()
+ nodes.forEach((node) => nodeMap.set(node.id, node))
+ return {
+ nodes: nodeMap,
+ loopConfigs: new Map(),
+ parallelConfigs: new Map(),
+ }
+}
+
+interface MockEdgeManager extends EdgeManager {
+ processOutgoingEdges: ReturnType<typeof vi.fn>
+}
+
+function createMockEdgeManager(
+ processOutgoingEdgesImpl?: (node: DAGNode) => string[]
+): MockEdgeManager {
+ const mockFn = vi.fn().mockImplementation(processOutgoingEdgesImpl || (() => []))
+ return {
+ processOutgoingEdges: mockFn,
+ isNodeReady: vi.fn().mockReturnValue(true),
+ deactivateEdgeAndDescendants: vi.fn(),
+ restoreIncomingEdge: vi.fn(),
+ clearDeactivatedEdges: vi.fn(),
+ clearDeactivatedEdgesForNodes: vi.fn(),
+ } as unknown as MockEdgeManager
+}
+
+interface MockNodeOrchestrator extends NodeExecutionOrchestrator {
+ executionCount: number
+}
+
+function createMockNodeOrchestrator(executeDelay = 0): MockNodeOrchestrator {
+ const mock = {
+ executionCount: 0,
+ executeNode: vi.fn().mockImplementation(async () => {
+ mock.executionCount++
+ if (executeDelay > 0) {
+ await new Promise((resolve) => setTimeout(resolve, executeDelay))
+ }
+ return { nodeId: 'test', output: {}, isFinalOutput: false }
+ }),
+ handleNodeCompletion: vi.fn(),
+ }
+ return mock as unknown as MockNodeOrchestrator
+}
+
+describe('ExecutionEngine', () => {
+ beforeEach(() => {
+ vi.clearAllMocks()
+ ;(isExecutionCancelled as Mock).mockResolvedValue(false)
+ ;(isRedisCancellationEnabled as Mock).mockReturnValue(false)
+ })
+
+ afterEach(() => {
+ vi.useRealTimers()
+ })
+
+ describe('Normal execution', () => {
+ it('should execute a simple linear workflow', async () => {
+ const startNode = createMockNode('start', 'starter')
+ const endNode = createMockNode('end', 'function')
+ startNode.outgoingEdges.set('edge1', { target: 'end' })
+ endNode.incomingEdges.add('start')
+
+ const dag = createMockDAG([startNode, endNode])
+ const context = createMockContext()
+ const edgeManager = createMockEdgeManager((node) => {
+ if (node.id === 'start') return ['end']
+ return []
+ })
+ const nodeOrchestrator = createMockNodeOrchestrator()
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+ const result = await engine.run('start')
+
+ expect(result.success).toBe(true)
+ expect(nodeOrchestrator.executionCount).toBe(2)
+ })
+
+ it('should mark execution as successful when completed without cancellation', async () => {
+ const startNode = createMockNode('start', 'starter')
+ const dag = createMockDAG([startNode])
+ const context = createMockContext()
+ const edgeManager = createMockEdgeManager()
+ const nodeOrchestrator = createMockNodeOrchestrator()
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+ const result = await engine.run('start')
+
+ expect(result.success).toBe(true)
+ expect(result.status).toBeUndefined()
+ })
+
+ it('should execute all nodes in a multi-node workflow', async () => {
+ const nodes = [
+ createMockNode('start', 'starter'),
+ createMockNode('middle1', 'function'),
+ createMockNode('middle2', 'function'),
+ createMockNode('end', 'function'),
+ ]
+
+ nodes[0].outgoingEdges.set('e1', { target: 'middle1' })
+ nodes[1].outgoingEdges.set('e2', { target: 'middle2' })
+ nodes[2].outgoingEdges.set('e3', { target: 'end' })
+
+ const dag = createMockDAG(nodes)
+ const context = createMockContext()
+ const edgeManager = createMockEdgeManager((node) => {
+ if (node.id === 'start') return ['middle1']
+ if (node.id === 'middle1') return ['middle2']
+ if (node.id === 'middle2') return ['end']
+ return []
+ })
+ const nodeOrchestrator = createMockNodeOrchestrator()
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+ const result = await engine.run('start')
+
+ expect(result.success).toBe(true)
+ expect(nodeOrchestrator.executionCount).toBe(4)
+ })
+ })
+
+ describe('Cancellation via AbortSignal', () => {
+ it('should stop execution immediately when aborted before start', async () => {
+ const abortController = new AbortController()
+ abortController.abort()
+
+ const startNode = createMockNode('start', 'starter')
+ const dag = createMockDAG([startNode])
+ const context = createMockContext({ abortSignal: abortController.signal })
+ const edgeManager = createMockEdgeManager()
+ const nodeOrchestrator = createMockNodeOrchestrator()
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+ const result = await engine.run('start')
+
+ expect(result.status).toBe('cancelled')
+ expect(nodeOrchestrator.executionCount).toBe(0)
+ })
+
+ it('should stop execution when aborted mid-workflow', async () => {
+ const abortController = new AbortController()
+
+ const nodes = Array.from({ length: 5 }, (_, i) => createMockNode(`node${i}`, 'function'))
+ for (let i = 0; i < nodes.length - 1; i++) {
+ nodes[i].outgoingEdges.set(`e${i}`, { target: `node${i + 1}` })
+ }
+
+ const dag = createMockDAG(nodes)
+ const context = createMockContext({ abortSignal: abortController.signal })
+
+ let callCount = 0
+ const edgeManager = createMockEdgeManager((node) => {
+ callCount++
+ if (callCount === 2) abortController.abort()
+ const idx = Number.parseInt(node.id.replace('node', ''))
+ if (idx < 4) return [`node${idx + 1}`]
+ return []
+ })
+ const nodeOrchestrator = createMockNodeOrchestrator()
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+ const result = await engine.run('node0')
+
+ expect(result.success).toBe(false)
+ expect(result.status).toBe('cancelled')
+ expect(nodeOrchestrator.executionCount).toBeLessThan(5)
+ })
+
+ it('should not wait for slow executions when cancelled', async () => {
+ const abortController = new AbortController()
+
+ const startNode = createMockNode('start', 'starter')
+ const slowNode = createMockNode('slow', 'function')
+ startNode.outgoingEdges.set('edge1', { target: 'slow' })
+
+ const dag = createMockDAG([startNode, slowNode])
+ const context = createMockContext({ abortSignal: abortController.signal })
+ const edgeManager = createMockEdgeManager((node) => {
+ if (node.id === 'start') return ['slow']
+ return []
+ })
+ const nodeOrchestrator = createMockNodeOrchestrator(500)
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+
+ const executionPromise = engine.run('start')
+ setTimeout(() => abortController.abort(), 50)
+
+ const startTime = Date.now()
+ const result = await executionPromise
+ const duration = Date.now() - startTime
+
+ expect(result.status).toBe('cancelled')
+ expect(duration).toBeLessThan(400)
+ })
+
+ it('should return cancelled status even if error thrown during cancellation', async () => {
+ const abortController = new AbortController()
+ abortController.abort()
+
+ const startNode = createMockNode('start', 'starter')
+ const dag = createMockDAG([startNode])
+ const context = createMockContext({ abortSignal: abortController.signal })
+ const edgeManager = createMockEdgeManager()
+ const nodeOrchestrator = createMockNodeOrchestrator()
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+ const result = await engine.run('start')
+
+ expect(result.status).toBe('cancelled')
+ expect(result.success).toBe(false)
+ })
+ })
+
+ describe('Cancellation via Redis', () => {
+ it('should check Redis for cancellation when enabled', async () => {
+ ;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
+ ;(isExecutionCancelled as Mock).mockResolvedValue(false)
+
+ const startNode = createMockNode('start', 'starter')
+ const dag = createMockDAG([startNode])
+ const context = createMockContext()
+ const edgeManager = createMockEdgeManager()
+ const nodeOrchestrator = createMockNodeOrchestrator()
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+ await engine.run('start')
+
+ expect(isExecutionCancelled as Mock).toHaveBeenCalled()
+ })
+
+ it('should stop execution when Redis reports cancellation', async () => {
+ ;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
+
+ let checkCount = 0
+ ;(isExecutionCancelled as Mock).mockImplementation(async () => {
+ checkCount++
+ return checkCount > 1
+ })
+
+ const nodes = Array.from({ length: 5 }, (_, i) => createMockNode(`node${i}`, 'function'))
+ for (let i = 0; i < nodes.length - 1; i++) {
+ nodes[i].outgoingEdges.set(`e${i}`, { target: `node${i + 1}` })
+ }
+
+ const dag = createMockDAG(nodes)
+ const context = createMockContext()
+ const edgeManager = createMockEdgeManager((node) => {
+ const idx = Number.parseInt(node.id.replace('node', ''))
+ if (idx < 4) return [`node${idx + 1}`]
+ return []
+ })
+ const nodeOrchestrator = createMockNodeOrchestrator(150)
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+ const result = await engine.run('node0')
+
+ expect(result.success).toBe(false)
+ expect(result.status).toBe('cancelled')
+ })
+
+ it('should respect cancellation check interval', async () => {
+ ;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
+ ;(isExecutionCancelled as Mock).mockResolvedValue(false)
+
+ const startNode = createMockNode('start', 'starter')
+ const dag = createMockDAG([startNode])
+ const context = createMockContext()
+ const edgeManager = createMockEdgeManager()
+ const nodeOrchestrator = createMockNodeOrchestrator()
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+ await engine.run('start')
+
+ expect((isExecutionCancelled as Mock).mock.calls.length).toBeGreaterThanOrEqual(1)
+ })
+ })
+
+ describe('Loop execution with cancellation', () => {
+ it('should break out of loop when cancelled mid-iteration', async () => {
+ const abortController = new AbortController()
+
+ const loopStartNode = createMockNode('loop-start', 'loop_sentinel')
+ loopStartNode.metadata = { isSentinel: true, sentinelType: 'start', loopId: 'loop1' }
+
+ const loopBodyNode = createMockNode('loop-body', 'function')
+ loopBodyNode.metadata = { isLoopNode: true, loopId: 'loop1' }
+
+ const loopEndNode = createMockNode('loop-end', 'loop_sentinel')
+ loopEndNode.metadata = { isSentinel: true, sentinelType: 'end', loopId: 'loop1' }
+
+ loopStartNode.outgoingEdges.set('edge1', { target: 'loop-body' })
+ loopBodyNode.outgoingEdges.set('edge2', { target: 'loop-end' })
+ loopEndNode.outgoingEdges.set('loop_continue', {
+ target: 'loop-start',
+ sourceHandle: 'loop_continue',
+ })
+
+ const dag = createMockDAG([loopStartNode, loopBodyNode, loopEndNode])
+ const context = createMockContext({ abortSignal: abortController.signal })
+
+ let iterationCount = 0
+ const edgeManager = createMockEdgeManager((node) => {
+ if (node.id === 'loop-start') return ['loop-body']
+ if (node.id === 'loop-body') return ['loop-end']
+ if (node.id === 'loop-end') {
+ iterationCount++
+ if (iterationCount === 3) abortController.abort()
+ return ['loop-start']
+ }
+ return []
+ })
+ const nodeOrchestrator = createMockNodeOrchestrator(5)
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+ const result = await engine.run('loop-start')
+
+ expect(result.status).toBe('cancelled')
+ expect(iterationCount).toBeLessThan(100)
+ })
+ })
+
+ describe('Parallel execution with cancellation', () => {
+ it('should stop queueing parallel branches when cancelled', async () => {
+ const abortController = new AbortController()
+
+ const startNode = createMockNode('start', 'starter')
+ const parallelNodes = Array.from({ length: 10 }, (_, i) =>
+ createMockNode(`parallel${i}`, 'function')
+ )
+
+ parallelNodes.forEach((_, i) => {
+ startNode.outgoingEdges.set(`edge${i}`, { target: `parallel${i}` })
+ })
+
+ const dag = createMockDAG([startNode, ...parallelNodes])
+ const context = createMockContext({ abortSignal: abortController.signal })
+ const edgeManager = createMockEdgeManager((node) => {
+ if (node.id === 'start') {
+ return parallelNodes.map((_, i) => `parallel${i}`)
+ }
+ return []
+ })
+ const nodeOrchestrator = createMockNodeOrchestrator(50)
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+
+ const executionPromise = engine.run('start')
+ setTimeout(() => abortController.abort(), 30)
+
+ const result = await executionPromise
+
+ expect(result.status).toBe('cancelled')
+ expect(nodeOrchestrator.executionCount).toBeLessThan(11)
+ })
+
+ it('should not wait for all parallel branches when cancelled', async () => {
+ const abortController = new AbortController()
+
+ const startNode = createMockNode('start', 'starter')
+ const slowNodes = Array.from({ length: 5 }, (_, i) => createMockNode(`slow${i}`, 'function'))
+
+ slowNodes.forEach((_, i) => {
+ startNode.outgoingEdges.set(`edge${i}`, { target: `slow${i}` })
+ })
+
+ const dag = createMockDAG([startNode, ...slowNodes])
+ const context = createMockContext({ abortSignal: abortController.signal })
+ const edgeManager = createMockEdgeManager((node) => {
+ if (node.id === 'start') return slowNodes.map((_, i) => `slow${i}`)
+ return []
+ })
+ const nodeOrchestrator = createMockNodeOrchestrator(200)
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+
+ const executionPromise = engine.run('start')
+ setTimeout(() => abortController.abort(), 50)
+
+ const startTime = Date.now()
+ const result = await executionPromise
+ const duration = Date.now() - startTime
+
+ expect(result.status).toBe('cancelled')
+ expect(duration).toBeLessThan(500)
+ })
+ })
+
+ describe('Edge cases', () => {
+ it('should handle empty DAG gracefully', async () => {
+ const dag = createMockDAG([])
+ const context = createMockContext()
+ const edgeManager = createMockEdgeManager()
+ const nodeOrchestrator = createMockNodeOrchestrator()
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+ const result = await engine.run()
+
+ expect(result.success).toBe(true)
+ expect(nodeOrchestrator.executionCount).toBe(0)
+ })
+
+ it('should preserve partial output when cancelled', async () => {
+ const abortController = new AbortController()
+
+ const startNode = createMockNode('start', 'starter')
+ const endNode = createMockNode('end', 'function')
+ endNode.outgoingEdges = new Map()
+
+ startNode.outgoingEdges.set('edge1', { target: 'end' })
+
+ const dag = createMockDAG([startNode, endNode])
+ const context = createMockContext({ abortSignal: abortController.signal })
+ const edgeManager = createMockEdgeManager((node) => {
+ if (node.id === 'start') return ['end']
+ return []
+ })
+
+ const nodeOrchestrator = {
+ executionCount: 0,
+ executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
+ if (nodeId === 'start') {
+ return { nodeId: 'start', output: { startData: 'value' }, isFinalOutput: false }
+ }
+ abortController.abort()
+ return { nodeId: 'end', output: { endData: 'value' }, isFinalOutput: true }
+ }),
+ handleNodeCompletion: vi.fn(),
+ } as unknown as MockNodeOrchestrator
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+ const result = await engine.run('start')
+
+ expect(result.status).toBe('cancelled')
+ expect(result.output).toBeDefined()
+ })
+
+ it('should populate metadata on cancellation', async () => {
+ const abortController = new AbortController()
+ abortController.abort()
+
+ const startNode = createMockNode('start', 'starter')
+ const dag = createMockDAG([startNode])
+ const context = createMockContext({ abortSignal: abortController.signal })
+ const edgeManager = createMockEdgeManager()
+ const nodeOrchestrator = createMockNodeOrchestrator()
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+ const result = await engine.run('start')
+
+ expect(result.metadata).toBeDefined()
+ expect(result.metadata.endTime).toBeDefined()
+ expect(result.metadata.duration).toBeDefined()
+ })
+
+ it('should return logs even when cancelled', async () => {
+ const abortController = new AbortController()
+
+ const startNode = createMockNode('start', 'starter')
+ const dag = createMockDAG([startNode])
+ const context = createMockContext({ abortSignal: abortController.signal })
+ context.blockLogs.push({
+ blockId: 'test',
+ blockName: 'Test',
+ blockType: 'test',
+ startedAt: '',
+ endedAt: '',
+ durationMs: 0,
+ success: true,
+ })
+
+ const edgeManager = createMockEdgeManager()
+ const nodeOrchestrator = createMockNodeOrchestrator()
+
+ abortController.abort()
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+ const result = await engine.run('start')
+
+ expect(result.logs).toBeDefined()
+ expect(result.logs.length).toBeGreaterThan(0)
+ })
+ })
+
+ describe('Cancellation flag behavior', () => {
+ it('should set cancelledFlag when abort signal fires', async () => {
+ const abortController = new AbortController()
+
+ const nodes = Array.from({ length: 3 }, (_, i) => createMockNode(`node${i}`, 'function'))
+ for (let i = 0; i < nodes.length - 1; i++) {
+ nodes[i].outgoingEdges.set(`e${i}`, { target: `node${i + 1}` })
+ }
+
+ const dag = createMockDAG(nodes)
+ const context = createMockContext({ abortSignal: abortController.signal })
+ const edgeManager = createMockEdgeManager((node) => {
+ if (node.id === 'node0') {
+ abortController.abort()
+ return ['node1']
+ }
+ return node.id === 'node1' ? ['node2'] : []
+ })
+ const nodeOrchestrator = createMockNodeOrchestrator()
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+ const result = await engine.run('node0')
+
+ expect(result.status).toBe('cancelled')
+ })
+
+ it('should cache Redis cancellation result', async () => {
+ ;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
+ ;(isExecutionCancelled as Mock).mockResolvedValue(true)
+
+ const nodes = Array.from({ length: 5 }, (_, i) => createMockNode(`node${i}`, 'function'))
+ const dag = createMockDAG(nodes)
+ const context = createMockContext()
+ const edgeManager = createMockEdgeManager()
+ const nodeOrchestrator = createMockNodeOrchestrator()
+
+ const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
+ await engine.run('node0')
+
+ expect((isExecutionCancelled as Mock).mock.calls.length).toBeLessThanOrEqual(3)
+ })
+ })
+})
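
The last test above caps Redis lookups at three for a five-node run, which implies checkCancellation throttles the Redis read and treats a positive result as sticky. A minimal sketch of that throttle-and-cache pattern, assuming only the isExecutionCancelled helper mocked in the tests and the 500 ms interval visible in the engine diff below; the body is illustrative, not the repository's implementation:

// Sketch: throttled cancellation polling with a sticky positive result.
// `isExecutionCancelled` mirrors the Redis-backed helper mocked in the tests;
// the class body itself is an assumption for illustration.
class CancellationPoller {
  private cancelled = false
  private lastCheck = 0
  private readonly intervalMs = 500

  constructor(
    private executionId: string,
    private isExecutionCancelled: (id: string) => Promise<boolean>
  ) {}

  async check(): Promise<boolean> {
    if (this.cancelled) return true // cached: never re-query once cancelled
    const now = Date.now()
    if (now - this.lastCheck < this.intervalMs) return false // throttled
    this.lastCheck = now
    this.cancelled = await this.isExecutionCancelled(this.executionId)
    return this.cancelled
  }
}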
diff --git a/apps/sim/executor/execution/engine.ts b/apps/sim/executor/execution/engine.ts
index 3ddea0ddcc..7c2317b047 100644
--- a/apps/sim/executor/execution/engine.ts
+++ b/apps/sim/executor/execution/engine.ts
@@ -28,6 +28,8 @@ export class ExecutionEngine {
private lastCancellationCheck = 0
private readonly useRedisCancellation: boolean
private readonly CANCELLATION_CHECK_INTERVAL_MS = 500
+ private abortPromise: Promise<void> | null = null
+ private abortResolve: (() => void) | null = null
constructor(
private context: ExecutionContext,
@@ -37,6 +39,34 @@ export class ExecutionEngine {
) {
this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true
this.useRedisCancellation = isRedisCancellationEnabled() && !!this.context.executionId
+ this.initializeAbortHandler()
+ }
+
+ /**
+ * Sets up a single abort promise that can be reused throughout execution.
+ * This avoids creating multiple event listeners and potential memory leaks.
+ */
+ private initializeAbortHandler(): void {
+ if (!this.context.abortSignal) return
+
+ if (this.context.abortSignal.aborted) {
+ this.cancelledFlag = true
+ this.abortPromise = Promise.resolve()
+ return
+ }
+
+ this.abortPromise = new Promise<void>((resolve) => {
+ this.abortResolve = resolve
+ })
+
+ this.context.abortSignal.addEventListener(
+ 'abort',
+ () => {
+ this.cancelledFlag = true
+ this.abortResolve?.()
+ },
+ { once: true }
+ )
}
private async checkCancellation(): Promise<boolean> {
@@ -73,12 +103,15 @@ export class ExecutionEngine {
this.initializeQueue(triggerBlockId)
while (this.hasWork()) {
- if ((await this.checkCancellation()) && this.executing.size === 0) {
+ if (await this.checkCancellation()) {
break
}
await this.processQueue()
}
- await this.waitForAllExecutions()
+
+ if (!this.cancelledFlag) {
+ await this.waitForAllExecutions()
+ }
if (this.pausedBlocks.size > 0) {
return this.buildPausedResult(startTime)
@@ -164,11 +197,7 @@ export class ExecutionEngine {
private trackExecution(promise: Promise<void>): void {
this.executing.add(promise)
- // Attach error handler to prevent unhandled rejection warnings
- // The actual error handling happens in waitForAllExecutions/waitForAnyExecution
- promise.catch(() => {
- // Error will be properly handled by Promise.all/Promise.race in wait methods
- })
+ promise.catch(() => {}) // swallow here; the wait methods surface errors
promise.finally(() => {
this.executing.delete(promise)
})
@@ -176,12 +205,30 @@ export class ExecutionEngine {
private async waitForAnyExecution(): Promise<void> {
if (this.executing.size > 0) {
- await Promise.race(this.executing)
+ const abortPromise = this.getAbortPromise()
+ if (abortPromise) {
+ await Promise.race([...this.executing, abortPromise])
+ } else {
+ await Promise.race(this.executing)
+ }
}
}
private async waitForAllExecutions(): Promise<void> {
- await Promise.all(Array.from(this.executing))
+ const abortPromise = this.getAbortPromise()
+ if (abortPromise) {
+ await Promise.race([Promise.all(this.executing), abortPromise])
+ } else {
+ await Promise.all(this.executing)
+ }
+ }
+
+ /**
+ * Returns the cached abort promise. This is safe to call multiple times
+ * as it reuses the same promise instance created during initialization.
+ */
+ private getAbortPromise(): Promise<void> | null {
+ return this.abortPromise
}
private async withQueueLock<T>(fn: () => Promise<T> | T): Promise<T> {
@@ -277,7 +324,7 @@ export class ExecutionEngine {
this.trackExecution(promise)
}
- if (this.executing.size > 0) {
+ if (this.executing.size > 0 && !this.cancelledFlag) {
await this.waitForAnyExecution()
}
}
@@ -336,7 +383,6 @@ export class ExecutionEngine {
this.addMultipleToQueue(readyNodes)
- // Check for dynamically added nodes (e.g., from parallel expansion)
if (this.context.pendingDynamicNodes && this.context.pendingDynamicNodes.length > 0) {
const dynamicNodes = this.context.pendingDynamicNodes
this.context.pendingDynamicNodes = []
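
The engine changes above all hinge on one idea: build a single promise that settles when the AbortSignal fires, then race it against the in-flight work so run() can report 'cancelled' without draining every node. A standalone sketch of the same pattern, written independently of the ExecutionEngine types:

// Sketch: one cached abort promise raced against pending work.
// Mirrors the pattern in the diff above; not the repository's exact code.
function abortPromiseFor(signal: AbortSignal): Promise<void> {
  if (signal.aborted) return Promise.resolve()
  return new Promise<void>((resolve) =>
    signal.addEventListener('abort', () => resolve(), { once: true })
  )
}

async function waitForAllOrAbort(
  executing: Set<Promise<unknown>>,
  signal?: AbortSignal
): Promise<void> {
  if (!signal) {
    await Promise.all(executing)
    return
  }
  // The abort promise wins the race as soon as the signal fires,
  // so callers stop awaiting work whose results will be discarded.
  await Promise.race([Promise.all(executing), abortPromiseFor(signal)])
}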
From 1f1f015031aae10481717af6b27ecc607ded9426 Mon Sep 17 00:00:00 2001
From: Vikhyath Mondreti
Date: Tue, 20 Jan 2026 17:49:00 -0800
Subject: [PATCH 06/14] improvement(files): update execution for passing base64
strings (#2906)
* progress
* improvement(execution): update execution for passing base64 strings
* fix types
* cleanup comments
* path security vuln
* reject promise correctly
* fix redirect case
* remove proxy routes
* fix tests
* use ipaddr
---
.../sim/app/api/copilot/execute-tool/route.ts | 2 +-
apps/sim/app/api/files/parse/route.ts | 173 ++++++--
apps/sim/app/api/proxy/route.ts | 395 ------------------
.../app/api/{proxy => tools}/image/route.ts | 0
.../sim/app/api/{proxy => tools}/stt/route.ts | 0
.../sim/app/api/{proxy => tools}/tts/route.ts | 0
.../api/{proxy => tools}/tts/unified/route.ts | 0
.../app/api/{proxy => tools}/video/route.ts | 0
.../app/api/workflows/[id]/execute/route.ts | 48 ++-
apps/sim/app/chat/hooks/use-chat-streaming.ts | 8 +-
.../components/tag-dropdown/tag-dropdown.tsx | 40 +-
apps/sim/background/schedule-execution.ts | 2 +
apps/sim/background/webhook-execution.ts | 3 +
apps/sim/background/workflow-execution.ts | 2 +
apps/sim/blocks/blocks/file.ts | 4 +
apps/sim/executor/execution/block-executor.ts | 12 +
apps/sim/executor/execution/executor.ts | 2 +
apps/sim/executor/execution/types.ts | 2 +
.../handlers/agent/agent-handler.test.ts | 6 +-
.../executor/handlers/agent/agent-handler.ts | 1 -
.../executor/handlers/api/api-handler.test.ts | 4 -
apps/sim/executor/handlers/api/api-handler.ts | 1 -
.../condition/condition-handler.test.ts | 1 -
.../handlers/condition/condition-handler.ts | 1 -
.../function/function-handler.test.ts | 3 -
.../handlers/function/function-handler.ts | 1 -
.../handlers/generic/generic-handler.test.ts | 1 -
.../handlers/generic/generic-handler.ts | 1 -
.../human-in-the-loop-handler.ts | 2 +-
apps/sim/executor/types.ts | 14 +
apps/sim/executor/utils/start-block.ts | 4 +-
.../sim/executor/variables/resolvers/block.ts | 62 ++-
.../core/security/input-validation.test.ts | 27 +-
.../sim/lib/core/security/input-validation.ts | 308 ++++++++++----
apps/sim/lib/core/security/redaction.test.ts | 94 +++++
apps/sim/lib/core/security/redaction.ts | 25 ++
apps/sim/lib/core/utils/display-filters.ts | 27 +-
apps/sim/lib/core/utils/user-file.ts | 57 +++
.../execution/execution-file-manager.ts | 4 +-
.../lib/uploads/utils/file-utils.server.ts | 41 +-
.../uploads/utils/user-file-base64.server.ts | 319 ++++++++++++++
apps/sim/lib/webhooks/rss-polling-service.ts | 9 +-
apps/sim/lib/webhooks/utils.server.ts | 17 +-
.../sim/lib/workflows/blocks/block-outputs.ts | 21 +-
.../workflows/executor/execute-workflow.ts | 4 +
.../lib/workflows/executor/execution-core.ts | 20 +-
.../executor/human-in-the-loop-manager.ts | 2 +
apps/sim/lib/workflows/streaming/streaming.ts | 46 +-
apps/sim/lib/workflows/types.ts | 10 +-
apps/sim/package.json | 1 +
apps/sim/providers/anthropic/index.ts | 2 +-
apps/sim/providers/azure-openai/index.ts | 2 +-
apps/sim/providers/bedrock/index.ts | 2 +-
apps/sim/providers/cerebras/index.ts | 2 +-
apps/sim/providers/deepseek/index.ts | 2 +-
apps/sim/providers/gemini/core.ts | 2 +-
apps/sim/providers/groq/index.ts | 2 +-
apps/sim/providers/mistral/index.ts | 2 +-
apps/sim/providers/ollama/index.ts | 2 +-
apps/sim/providers/openai/index.ts | 2 +-
apps/sim/providers/openrouter/index.ts | 2 +-
apps/sim/providers/vllm/index.ts | 2 +-
apps/sim/providers/xai/index.ts | 2 +-
apps/sim/tools/elevenlabs/tts.ts | 2 +-
apps/sim/tools/file/parser.ts | 57 ++-
apps/sim/tools/file/types.ts | 36 +-
apps/sim/tools/index.test.ts | 185 ++++----
apps/sim/tools/index.ts | 188 ++-------
apps/sim/tools/openai/image.ts | 2 +-
apps/sim/tools/stt/assemblyai.ts | 2 +-
apps/sim/tools/stt/deepgram.ts | 2 +-
apps/sim/tools/stt/elevenlabs.ts | 2 +-
apps/sim/tools/stt/gemini.ts | 2 +-
apps/sim/tools/stt/whisper.ts | 2 +-
apps/sim/tools/tts/azure.ts | 2 +-
apps/sim/tools/tts/cartesia.ts | 2 +-
apps/sim/tools/tts/deepgram.ts | 2 +-
apps/sim/tools/tts/elevenlabs.ts | 2 +-
apps/sim/tools/tts/google.ts | 2 +-
apps/sim/tools/tts/openai.ts | 2 +-
apps/sim/tools/tts/playht.ts | 2 +-
apps/sim/tools/video/falai.ts | 2 +-
apps/sim/tools/video/luma.ts | 2 +-
apps/sim/tools/video/minimax.ts | 2 +-
apps/sim/tools/video/runway.ts | 2 +-
apps/sim/tools/video/veo.ts | 2 +-
bun.lock | 6 +-
87 files changed, 1399 insertions(+), 964 deletions(-)
delete mode 100644 apps/sim/app/api/proxy/route.ts
rename apps/sim/app/api/{proxy => tools}/image/route.ts (100%)
rename apps/sim/app/api/{proxy => tools}/stt/route.ts (100%)
rename apps/sim/app/api/{proxy => tools}/tts/route.ts (100%)
rename apps/sim/app/api/{proxy => tools}/tts/unified/route.ts (100%)
rename apps/sim/app/api/{proxy => tools}/video/route.ts (100%)
create mode 100644 apps/sim/lib/core/utils/user-file.ts
create mode 100644 apps/sim/lib/uploads/utils/user-file-base64.server.ts
diff --git a/apps/sim/app/api/copilot/execute-tool/route.ts b/apps/sim/app/api/copilot/execute-tool/route.ts
index c8205821fb..e38309968b 100644
--- a/apps/sim/app/api/copilot/execute-tool/route.ts
+++ b/apps/sim/app/api/copilot/execute-tool/route.ts
@@ -224,7 +224,7 @@ export async function POST(req: NextRequest) {
hasApiKey: !!executionParams.apiKey,
})
- const result = await executeTool(resolvedToolName, executionParams, true)
+ const result = await executeTool(resolvedToolName, executionParams)
logger.info(`[${tracker.requestId}] Tool execution complete`, {
toolName,
diff --git a/apps/sim/app/api/files/parse/route.ts b/apps/sim/app/api/files/parse/route.ts
index 4e4d54f18b..50dc55572a 100644
--- a/apps/sim/app/api/files/parse/route.ts
+++ b/apps/sim/app/api/files/parse/route.ts
@@ -6,9 +6,10 @@ import { createLogger } from '@sim/logger'
import binaryExtensionsList from 'binary-extensions'
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
-import { createPinnedUrl, validateUrlWithDNS } from '@/lib/core/security/input-validation'
+import { secureFetchWithPinnedIP, validateUrlWithDNS } from '@/lib/core/security/input-validation'
import { isSupportedFileType, parseFile } from '@/lib/file-parsers'
import { isUsingCloudStorage, type StorageContext, StorageService } from '@/lib/uploads'
+import { uploadExecutionFile } from '@/lib/uploads/contexts/execution'
import { UPLOAD_DIR_SERVER } from '@/lib/uploads/core/setup.server'
import { getFileMetadataByKey } from '@/lib/uploads/server/metadata'
import {
@@ -21,6 +22,7 @@ import {
} from '@/lib/uploads/utils/file-utils'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
import { verifyFileAccess } from '@/app/api/files/authorization'
+import type { UserFile } from '@/executor/types'
import '@/lib/uploads/core/setup.server'
export const dynamic = 'force-dynamic'
@@ -30,6 +32,12 @@ const logger = createLogger('FilesParseAPI')
const MAX_DOWNLOAD_SIZE_BYTES = 100 * 1024 * 1024 // 100 MB
const DOWNLOAD_TIMEOUT_MS = 30000 // 30 seconds
+interface ExecutionContext {
+ workspaceId: string
+ workflowId: string
+ executionId: string
+}
+
interface ParseResult {
success: boolean
content?: string
@@ -37,6 +45,7 @@ interface ParseResult {
filePath: string
originalName?: string // Original filename from database (for workspace files)
viewerUrl?: string | null // Viewer URL for the file if available
+ userFile?: UserFile // UserFile object for the raw file
metadata?: {
fileType: string
size: number
@@ -70,27 +79,45 @@ export async function POST(request: NextRequest) {
const userId = authResult.userId
const requestData = await request.json()
- const { filePath, fileType, workspaceId } = requestData
+ const { filePath, fileType, workspaceId, workflowId, executionId } = requestData
if (!filePath || (typeof filePath === 'string' && filePath.trim() === '')) {
return NextResponse.json({ success: false, error: 'No file path provided' }, { status: 400 })
}
- logger.info('File parse request received:', { filePath, fileType, workspaceId, userId })
+ // Build execution context if all required fields are present
+ const executionContext: ExecutionContext | undefined =
+ workspaceId && workflowId && executionId
+ ? { workspaceId, workflowId, executionId }
+ : undefined
+
+ logger.info('File parse request received:', {
+ filePath,
+ fileType,
+ workspaceId,
+ userId,
+ hasExecutionContext: !!executionContext,
+ })
if (Array.isArray(filePath)) {
const results = []
- for (const path of filePath) {
- if (!path || (typeof path === 'string' && path.trim() === '')) {
+ for (const singlePath of filePath) {
+ if (!singlePath || (typeof singlePath === 'string' && singlePath.trim() === '')) {
results.push({
success: false,
error: 'Empty file path in array',
- filePath: path || '',
+ filePath: singlePath || '',
})
continue
}
- const result = await parseFileSingle(path, fileType, workspaceId, userId)
+ const result = await parseFileSingle(
+ singlePath,
+ fileType,
+ workspaceId,
+ userId,
+ executionContext
+ )
if (result.metadata) {
result.metadata.processingTime = Date.now() - startTime
}
@@ -106,6 +133,7 @@ export async function POST(request: NextRequest) {
fileType: result.metadata?.fileType || 'application/octet-stream',
size: result.metadata?.size || 0,
binary: false,
+ file: result.userFile,
},
filePath: result.filePath,
viewerUrl: result.viewerUrl,
@@ -121,7 +149,7 @@ export async function POST(request: NextRequest) {
})
}
- const result = await parseFileSingle(filePath, fileType, workspaceId, userId)
+ const result = await parseFileSingle(filePath, fileType, workspaceId, userId, executionContext)
if (result.metadata) {
result.metadata.processingTime = Date.now() - startTime
@@ -137,6 +165,7 @@ export async function POST(request: NextRequest) {
fileType: result.metadata?.fileType || 'application/octet-stream',
size: result.metadata?.size || 0,
binary: false,
+ file: result.userFile,
},
filePath: result.filePath,
viewerUrl: result.viewerUrl,
@@ -164,7 +193,8 @@ async function parseFileSingle(
filePath: string,
fileType: string,
workspaceId: string,
- userId: string
+ userId: string,
+ executionContext?: ExecutionContext
): Promise<ParseResult> {
logger.info('Parsing file:', filePath)
@@ -186,18 +216,18 @@ async function parseFileSingle(
}
if (filePath.includes('/api/files/serve/')) {
- return handleCloudFile(filePath, fileType, undefined, userId)
+ return handleCloudFile(filePath, fileType, undefined, userId, executionContext)
}
if (filePath.startsWith('http://') || filePath.startsWith('https://')) {
- return handleExternalUrl(filePath, fileType, workspaceId, userId)
+ return handleExternalUrl(filePath, fileType, workspaceId, userId, executionContext)
}
if (isUsingCloudStorage()) {
- return handleCloudFile(filePath, fileType, undefined, userId)
+ return handleCloudFile(filePath, fileType, undefined, userId, executionContext)
}
- return handleLocalFile(filePath, fileType, userId)
+ return handleLocalFile(filePath, fileType, userId, executionContext)
}
/**
@@ -230,12 +260,14 @@ function validateFilePath(filePath: string): { isValid: boolean; error?: string
/**
* Handle external URL
* If workspaceId is provided, checks if file already exists and saves to workspace if not
+ * If executionContext is provided, also stores the file in execution storage and returns a UserFile
*/
async function handleExternalUrl(
url: string,
fileType: string,
workspaceId: string,
- userId: string
+ userId: string,
+ executionContext?: ExecutionContext
): Promise<ParseResult> {
try {
logger.info('Fetching external URL:', url)
@@ -312,17 +344,13 @@ async function handleExternalUrl(
if (existingFile) {
const storageFilePath = `/api/files/serve/${existingFile.key}`
- return handleCloudFile(storageFilePath, fileType, 'workspace', userId)
+ return handleCloudFile(storageFilePath, fileType, 'workspace', userId, executionContext)
}
}
}
- const pinnedUrl = createPinnedUrl(url, urlValidation.resolvedIP!)
- const response = await fetch(pinnedUrl, {
- signal: AbortSignal.timeout(DOWNLOAD_TIMEOUT_MS),
- headers: {
- Host: urlValidation.originalHostname!,
- },
+ const response = await secureFetchWithPinnedIP(url, urlValidation.resolvedIP!, {
+ timeout: DOWNLOAD_TIMEOUT_MS,
})
if (!response.ok) {
throw new Error(`Failed to fetch URL: ${response.status} ${response.statusText}`)
@@ -341,6 +369,19 @@ async function handleExternalUrl(
logger.info(`Downloaded file from URL: ${url}, size: ${buffer.length} bytes`)
+ let userFile: UserFile | undefined
+ const mimeType = response.headers.get('content-type') || getMimeTypeFromExtension(extension)
+
+ if (executionContext) {
+ try {
+ userFile = await uploadExecutionFile(executionContext, buffer, filename, mimeType, userId)
+ logger.info(`Stored file in execution storage: ${filename}`, { key: userFile.key })
+ } catch (uploadError) {
+ logger.warn(`Failed to store file in execution storage:`, uploadError)
+ // Continue without userFile - parsing can still work
+ }
+ }
+
if (shouldCheckWorkspace) {
try {
const permission = await getUserEntityPermissions(userId, 'workspace', workspaceId)
@@ -353,8 +394,6 @@ async function handleExternalUrl(
})
} else {
const { uploadWorkspaceFile } = await import('@/lib/uploads/contexts/workspace')
- const mimeType =
- response.headers.get('content-type') || getMimeTypeFromExtension(extension)
await uploadWorkspaceFile(workspaceId, userId, buffer, filename, mimeType)
logger.info(`Saved URL file to workspace storage: ${filename}`)
}
@@ -363,17 +402,23 @@ async function handleExternalUrl(
}
}
+ let parseResult: ParseResult
if (extension === 'pdf') {
- return await handlePdfBuffer(buffer, filename, fileType, url)
- }
- if (extension === 'csv') {
- return await handleCsvBuffer(buffer, filename, fileType, url)
+ parseResult = await handlePdfBuffer(buffer, filename, fileType, url)
+ } else if (extension === 'csv') {
+ parseResult = await handleCsvBuffer(buffer, filename, fileType, url)
+ } else if (isSupportedFileType(extension)) {
+ parseResult = await handleGenericTextBuffer(buffer, filename, extension, fileType, url)
+ } else {
+ parseResult = handleGenericBuffer(buffer, filename, extension, fileType)
}
- if (isSupportedFileType(extension)) {
- return await handleGenericTextBuffer(buffer, filename, extension, fileType, url)
+
+ // Attach userFile to the result
+ if (userFile) {
+ parseResult.userFile = userFile
}
- return handleGenericBuffer(buffer, filename, extension, fileType)
+ return parseResult
} catch (error) {
logger.error(`Error handling external URL ${url}:`, error)
return {
@@ -386,12 +431,15 @@ async function handleExternalUrl(
/**
* Handle file stored in cloud storage
+ * If executionContext is provided and the file is not already in execution storage,
+ * copies it there and returns a UserFile
*/
async function handleCloudFile(
filePath: string,
fileType: string,
explicitContext: string | undefined,
- userId: string
+ userId: string,
+ executionContext?: ExecutionContext
): Promise<ParseResult> {
try {
const cloudKey = extractStorageKey(filePath)
@@ -438,6 +486,7 @@ async function handleCloudFile(
const filename = originalFilename || cloudKey.split('/').pop() || cloudKey
const extension = path.extname(filename).toLowerCase().substring(1)
+ const mimeType = getMimeTypeFromExtension(extension)
const normalizedFilePath = `/api/files/serve/${encodeURIComponent(cloudKey)}?context=${context}`
let workspaceIdFromKey: string | undefined
@@ -453,6 +502,39 @@ async function handleCloudFile(
const viewerUrl = getViewerUrl(cloudKey, workspaceIdFromKey)
+ // Store file in execution storage if executionContext is provided
+ let userFile: UserFile | undefined
+
+ if (executionContext) {
+ // If file is already from execution context, create UserFile reference without re-uploading
+ if (context === 'execution') {
+ userFile = {
+ id: `file_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`,
+ name: filename,
+ url: normalizedFilePath,
+ size: fileBuffer.length,
+ type: mimeType,
+ key: cloudKey,
+ context: 'execution',
+ }
+ logger.info(`Created UserFile reference for existing execution file: ${filename}`)
+ } else {
+ // Copy from workspace/other storage to execution storage
+ try {
+ userFile = await uploadExecutionFile(
+ executionContext,
+ fileBuffer,
+ filename,
+ mimeType,
+ userId
+ )
+ logger.info(`Copied file to execution storage: ${filename}`, { key: userFile.key })
+ } catch (uploadError) {
+ logger.warn(`Failed to copy file to execution storage:`, uploadError)
+ }
+ }
+ }
+
let parseResult: ParseResult
if (extension === 'pdf') {
parseResult = await handlePdfBuffer(fileBuffer, filename, fileType, normalizedFilePath)
@@ -477,6 +559,11 @@ async function handleCloudFile(
parseResult.viewerUrl = viewerUrl
+ // Attach userFile to the result
+ if (userFile) {
+ parseResult.userFile = userFile
+ }
+
return parseResult
} catch (error) {
logger.error(`Error handling cloud file ${filePath}:`, error)
@@ -500,7 +587,8 @@ async function handleCloudFile(
async function handleLocalFile(
filePath: string,
fileType: string,
- userId: string
+ userId: string,
+ executionContext?: ExecutionContext
): Promise<ParseResult> {
try {
const filename = filePath.split('/').pop() || filePath
@@ -540,13 +628,32 @@ async function handleLocalFile(
const hash = createHash('md5').update(fileBuffer).digest('hex')
const extension = path.extname(filename).toLowerCase().substring(1)
+ const mimeType = fileType || getMimeTypeFromExtension(extension)
+
+ // Store file in execution storage if executionContext is provided
+ let userFile: UserFile | undefined
+ if (executionContext) {
+ try {
+ userFile = await uploadExecutionFile(
+ executionContext,
+ fileBuffer,
+ filename,
+ mimeType,
+ userId
+ )
+ logger.info(`Stored local file in execution storage: ${filename}`, { key: userFile.key })
+ } catch (uploadError) {
+ logger.warn(`Failed to store local file in execution storage:`, uploadError)
+ }
+ }
return {
success: true,
content: result.content,
filePath,
+ userFile,
metadata: {
- fileType: fileType || getMimeTypeFromExtension(extension),
+ fileType: mimeType,
size: stats.size,
hash,
processingTime: 0,
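
The route now delegates its SSRF defense to secureFetchWithPinnedIP instead of hand-assembling a pinned URL plus Host header. A minimal sketch of what such a helper has to do, assuming the (url, resolvedIP, { timeout }) signature used at the call sites above; real code would also need TLS/SNI handling (e.g. a custom dispatcher), so this shows only the pinning idea:

// Sketch: fetch a validated URL via its DNS-resolved IP so a later
// re-resolution cannot redirect the request (DNS-rebinding defense).
// Signature matches the call sites above; the body is an assumption.
async function secureFetchWithPinnedIPSketch(
  url: string,
  resolvedIP: string,
  options: { timeout: number }
): Promise<Response> {
  const original = new URL(url)
  const pinned = new URL(url)
  pinned.hostname = resolvedIP // connect to the vetted IP ...
  return fetch(pinned.toString(), {
    signal: AbortSignal.timeout(options.timeout),
    headers: { Host: original.hostname }, // ... while presenting the real host
    // Redirects must be re-validated rather than auto-followed
    // (cf. the "fix redirect case" note in the commit message).
    redirect: 'manual',
  })
}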
diff --git a/apps/sim/app/api/proxy/route.ts b/apps/sim/app/api/proxy/route.ts
deleted file mode 100644
index 24702aa48f..0000000000
--- a/apps/sim/app/api/proxy/route.ts
+++ /dev/null
@@ -1,395 +0,0 @@
-import { createLogger } from '@sim/logger'
-import type { NextRequest } from 'next/server'
-import { NextResponse } from 'next/server'
-import { z } from 'zod'
-import { checkHybridAuth } from '@/lib/auth/hybrid'
-import { generateInternalToken } from '@/lib/auth/internal'
-import { isDev } from '@/lib/core/config/feature-flags'
-import { createPinnedUrl, validateUrlWithDNS } from '@/lib/core/security/input-validation'
-import { generateRequestId } from '@/lib/core/utils/request'
-import { getBaseUrl } from '@/lib/core/utils/urls'
-import { executeTool } from '@/tools'
-import { getTool, validateRequiredParametersAfterMerge } from '@/tools/utils'
-
-const logger = createLogger('ProxyAPI')
-
-const proxyPostSchema = z.object({
- toolId: z.string().min(1, 'toolId is required'),
- params: z.record(z.any()).optional().default({}),
- executionContext: z
- .object({
- workflowId: z.string().optional(),
- workspaceId: z.string().optional(),
- executionId: z.string().optional(),
- userId: z.string().optional(),
- })
- .optional(),
-})
-
-/**
- * Creates a minimal set of default headers for proxy requests
- * @returns Record of HTTP headers
- */
-const getProxyHeaders = (): Record<string, string> => {
- return {
- 'User-Agent':
- 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
- Accept: '*/*',
- 'Accept-Encoding': 'gzip, deflate, br',
- 'Cache-Control': 'no-cache',
- Connection: 'keep-alive',
- }
-}
-
-/**
- * Formats a response with CORS headers
- * @param responseData Response data object
- * @param status HTTP status code
- * @returns NextResponse with CORS headers
- */
-const formatResponse = (responseData: any, status = 200) => {
- return NextResponse.json(responseData, {
- status,
- headers: {
- 'Access-Control-Allow-Origin': '*',
- 'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
- 'Access-Control-Allow-Headers': 'Content-Type, Authorization',
- },
- })
-}
-
-/**
- * Creates an error response with consistent formatting
- * @param error Error object or message
- * @param status HTTP status code
- * @param additionalData Additional data to include in the response
- * @returns Formatted error response
- */
-const createErrorResponse = (error: any, status = 500, additionalData = {}) => {
- const errorMessage = error instanceof Error ? error.message : String(error)
- const errorStack = error instanceof Error ? error.stack : undefined
-
- logger.error('Creating error response', {
- errorMessage,
- status,
- stack: isDev ? errorStack : undefined,
- })
-
- return formatResponse(
- {
- success: false,
- error: errorMessage,
- stack: isDev ? errorStack : undefined,
- ...additionalData,
- },
- status
- )
-}
-
-/**
- * GET handler for direct external URL proxying
- * This allows for GET requests to external APIs
- */
-export async function GET(request: Request) {
- const url = new URL(request.url)
- const targetUrl = url.searchParams.get('url')
- const requestId = generateRequestId()
-
- // Vault download proxy: /api/proxy?vaultDownload=1&bucket=...&object=...&credentialId=...
- const vaultDownload = url.searchParams.get('vaultDownload')
- if (vaultDownload === '1') {
- try {
- const bucket = url.searchParams.get('bucket')
- const objectParam = url.searchParams.get('object')
- const credentialId = url.searchParams.get('credentialId')
-
- if (!bucket || !objectParam || !credentialId) {
- return createErrorResponse('Missing bucket, object, or credentialId', 400)
- }
-
- // Fetch access token using existing token API
- const baseUrl = new URL(getBaseUrl())
- const tokenUrl = new URL('/api/auth/oauth/token', baseUrl)
-
- // Build headers: forward session cookies if present; include internal auth for server-side
- const tokenHeaders: Record<string, string> = { 'Content-Type': 'application/json' }
- const incomingCookie = request.headers.get('cookie')
- if (incomingCookie) tokenHeaders.Cookie = incomingCookie
- try {
- const internalToken = await generateInternalToken()
- tokenHeaders.Authorization = `Bearer ${internalToken}`
- } catch (_e) {
- // best-effort internal auth
- }
-
- // Optional workflow context for collaboration auth
- const workflowId = url.searchParams.get('workflowId') || undefined
-
- const tokenRes = await fetch(tokenUrl.toString(), {
- method: 'POST',
- headers: tokenHeaders,
- body: JSON.stringify({ credentialId, workflowId }),
- })
-
- if (!tokenRes.ok) {
- const err = await tokenRes.text()
- return createErrorResponse(`Failed to fetch access token: ${err}`, 401)
- }
-
- const tokenJson = await tokenRes.json()
- const accessToken = tokenJson.accessToken
- if (!accessToken) {
- return createErrorResponse('No access token available', 401)
- }
-
- // Avoid double-encoding: incoming object may already be percent-encoded
- const objectDecoded = decodeURIComponent(objectParam)
- const gcsUrl = `https://storage.googleapis.com/storage/v1/b/${encodeURIComponent(
- bucket
- )}/o/${encodeURIComponent(objectDecoded)}?alt=media`
-
- const fileRes = await fetch(gcsUrl, {
- headers: { Authorization: `Bearer ${accessToken}` },
- })
-
- if (!fileRes.ok) {
- const errText = await fileRes.text()
- return createErrorResponse(errText || 'Failed to download file', fileRes.status)
- }
-
- const headers = new Headers()
- fileRes.headers.forEach((v, k) => headers.set(k, v))
- return new NextResponse(fileRes.body, { status: 200, headers })
- } catch (error: any) {
- logger.error(`[${requestId}] Vault download proxy failed`, {
- error: error instanceof Error ? error.message : String(error),
- })
- return createErrorResponse('Vault download failed', 500)
- }
- }
-
- if (!targetUrl) {
- logger.error(`[${requestId}] Missing 'url' parameter`)
- return createErrorResponse("Missing 'url' parameter", 400)
- }
-
- const urlValidation = await validateUrlWithDNS(targetUrl)
- if (!urlValidation.isValid) {
- logger.warn(`[${requestId}] Blocked proxy request`, {
- url: targetUrl.substring(0, 100),
- error: urlValidation.error,
- })
- return createErrorResponse(urlValidation.error || 'Invalid URL', 403)
- }
-
- const method = url.searchParams.get('method') || 'GET'
-
- const bodyParam = url.searchParams.get('body')
- let body: string | undefined
-
- if (bodyParam && ['POST', 'PUT', 'PATCH'].includes(method.toUpperCase())) {
- try {
- body = decodeURIComponent(bodyParam)
- } catch (error) {
- logger.warn(`[${requestId}] Failed to decode body parameter`, error)
- }
- }
-
- const customHeaders: Record<string, string> = {}
-
- for (const [key, value] of url.searchParams.entries()) {
- if (key.startsWith('header.')) {
- const headerName = key.substring(7)
- customHeaders[headerName] = value
- }
- }
-
- if (body && !customHeaders['Content-Type']) {
- customHeaders['Content-Type'] = 'application/json'
- }
-
- logger.info(`[${requestId}] Proxying ${method} request to: ${targetUrl}`)
-
- try {
- const pinnedUrl = createPinnedUrl(targetUrl, urlValidation.resolvedIP!)
- const response = await fetch(pinnedUrl, {
- method: method,
- headers: {
- ...getProxyHeaders(),
- ...customHeaders,
- Host: urlValidation.originalHostname!,
- },
- body: body || undefined,
- })
-
- const contentType = response.headers.get('content-type') || ''
- let data
-
- if (contentType.includes('application/json')) {
- data = await response.json()
- } else {
- data = await response.text()
- }
-
- const errorMessage = !response.ok
- ? data && typeof data === 'object' && data.error
- ? `${data.error.message || JSON.stringify(data.error)}`
- : response.statusText || `HTTP error ${response.status}`
- : undefined
-
- if (!response.ok) {
- logger.error(`[${requestId}] External API error: ${response.status} ${response.statusText}`)
- }
-
- return formatResponse({
- success: response.ok,
- status: response.status,
- statusText: response.statusText,
- headers: Object.fromEntries(response.headers.entries()),
- data,
- error: errorMessage,
- })
- } catch (error: any) {
- logger.error(`[${requestId}] Proxy GET request failed`, {
- url: targetUrl,
- error: error instanceof Error ? error.message : String(error),
- stack: error instanceof Error ? error.stack : undefined,
- })
-
- return createErrorResponse(error)
- }
-}
-
-export async function POST(request: NextRequest) {
- const requestId = generateRequestId()
- const startTime = new Date()
- const startTimeISO = startTime.toISOString()
-
- try {
- const authResult = await checkHybridAuth(request, { requireWorkflowId: false })
- if (!authResult.success) {
- logger.error(`[${requestId}] Authentication failed for proxy:`, authResult.error)
- return createErrorResponse('Unauthorized', 401)
- }
-
- let requestBody
- try {
- requestBody = await request.json()
- } catch (parseError) {
- logger.error(`[${requestId}] Failed to parse request body`, {
- error: parseError instanceof Error ? parseError.message : String(parseError),
- })
- throw new Error('Invalid JSON in request body')
- }
-
- const validationResult = proxyPostSchema.safeParse(requestBody)
- if (!validationResult.success) {
- logger.error(`[${requestId}] Request validation failed`, {
- errors: validationResult.error.errors,
- })
- const errorMessages = validationResult.error.errors
- .map((err) => `${err.path.join('.')}: ${err.message}`)
- .join(', ')
- throw new Error(`Validation failed: ${errorMessages}`)
- }
-
- const { toolId, params } = validationResult.data
-
- logger.info(`[${requestId}] Processing tool: ${toolId}`)
-
- const tool = getTool(toolId)
-
- if (!tool) {
- logger.error(`[${requestId}] Tool not found: ${toolId}`)
- throw new Error(`Tool not found: ${toolId}`)
- }
-
- try {
- validateRequiredParametersAfterMerge(toolId, tool, params)
- } catch (validationError) {
- logger.warn(`[${requestId}] Tool validation failed for ${toolId}`, {
- error: validationError instanceof Error ? validationError.message : String(validationError),
- })
-
- const endTime = new Date()
- const endTimeISO = endTime.toISOString()
- const duration = endTime.getTime() - startTime.getTime()
-
- return createErrorResponse(validationError, 400, {
- startTime: startTimeISO,
- endTime: endTimeISO,
- duration,
- })
- }
-
- const hasFileOutputs =
- tool.outputs &&
- Object.values(tool.outputs).some(
- (output) => output.type === 'file' || output.type === 'file[]'
- )
-
- const result = await executeTool(
- toolId,
- params,
- true, // skipProxy (we're already in the proxy)
- !hasFileOutputs, // skipPostProcess (don't skip if tool has file outputs)
- undefined // execution context is not available in proxy context
- )
-
- if (!result.success) {
- logger.warn(`[${requestId}] Tool execution failed for ${toolId}`, {
- error: result.error || 'Unknown error',
- })
-
- throw new Error(result.error || 'Tool execution failed')
- }
-
- const endTime = new Date()
- const endTimeISO = endTime.toISOString()
- const duration = endTime.getTime() - startTime.getTime()
-
- const responseWithTimingData = {
- ...result,
- startTime: startTimeISO,
- endTime: endTimeISO,
- duration,
- timing: {
- startTime: startTimeISO,
- endTime: endTimeISO,
- duration,
- },
- }
-
- logger.info(`[${requestId}] Tool executed successfully: ${toolId} (${duration}ms)`)
-
- return formatResponse(responseWithTimingData)
- } catch (error: any) {
- logger.error(`[${requestId}] Proxy request failed`, {
- error: error instanceof Error ? error.message : String(error),
- stack: error instanceof Error ? error.stack : undefined,
- name: error instanceof Error ? error.name : undefined,
- })
-
- const endTime = new Date()
- const endTimeISO = endTime.toISOString()
- const duration = endTime.getTime() - startTime.getTime()
-
- return createErrorResponse(error, 500, {
- startTime: startTimeISO,
- endTime: endTimeISO,
- duration,
- })
- }
-}
-
-export async function OPTIONS() {
- return new NextResponse(null, {
- status: 204,
- headers: {
- 'Access-Control-Allow-Origin': '*',
- 'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
- 'Access-Control-Allow-Headers': 'Content-Type, Authorization',
- 'Access-Control-Max-Age': '86400',
- },
- })
-}
diff --git a/apps/sim/app/api/proxy/image/route.ts b/apps/sim/app/api/tools/image/route.ts
similarity index 100%
rename from apps/sim/app/api/proxy/image/route.ts
rename to apps/sim/app/api/tools/image/route.ts
diff --git a/apps/sim/app/api/proxy/stt/route.ts b/apps/sim/app/api/tools/stt/route.ts
similarity index 100%
rename from apps/sim/app/api/proxy/stt/route.ts
rename to apps/sim/app/api/tools/stt/route.ts
diff --git a/apps/sim/app/api/proxy/tts/route.ts b/apps/sim/app/api/tools/tts/route.ts
similarity index 100%
rename from apps/sim/app/api/proxy/tts/route.ts
rename to apps/sim/app/api/tools/tts/route.ts
diff --git a/apps/sim/app/api/proxy/tts/unified/route.ts b/apps/sim/app/api/tools/tts/unified/route.ts
similarity index 100%
rename from apps/sim/app/api/proxy/tts/unified/route.ts
rename to apps/sim/app/api/tools/tts/unified/route.ts
diff --git a/apps/sim/app/api/proxy/video/route.ts b/apps/sim/app/api/tools/video/route.ts
similarity index 100%
rename from apps/sim/app/api/proxy/video/route.ts
rename to apps/sim/app/api/tools/video/route.ts
diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts
index df988f26a7..a850c7ac95 100644
--- a/apps/sim/app/api/workflows/[id]/execute/route.ts
+++ b/apps/sim/app/api/workflows/[id]/execute/route.ts
@@ -12,6 +12,10 @@ import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { processInputFileFields } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
+import {
+ cleanupExecutionBase64Cache,
+ hydrateUserFilesWithBase64,
+} from '@/lib/uploads/utils/user-file-base64.server'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
@@ -25,7 +29,7 @@ import type { WorkflowExecutionPayload } from '@/background/workflow-execution'
import { normalizeName } from '@/executor/constants'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types'
-import type { StreamingExecution } from '@/executor/types'
+import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types'
import { Serializer } from '@/serializer'
import { CORE_TRIGGER_TYPES, type CoreTriggerType } from '@/stores/logs/filters/types'
@@ -38,6 +42,8 @@ const ExecuteWorkflowSchema = z.object({
useDraftState: z.boolean().optional(),
input: z.any().optional(),
isClientSession: z.boolean().optional(),
+ includeFileBase64: z.boolean().optional().default(true),
+ base64MaxBytes: z.number().int().positive().optional(),
workflowStateOverride: z
.object({
blocks: z.record(z.any()),
@@ -214,6 +220,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
useDraftState,
input: validatedInput,
isClientSession = false,
+ includeFileBase64,
+ base64MaxBytes,
workflowStateOverride,
} = validation.data
@@ -227,6 +235,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
triggerType,
stream,
useDraftState,
+ includeFileBase64,
+ base64MaxBytes,
workflowStateOverride,
workflowId: _workflowId, // Also exclude workflowId used for internal JWT auth
...rest
@@ -427,16 +437,31 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
snapshot,
callbacks: {},
loggingSession,
+ includeFileBase64,
+ base64MaxBytes,
})
- const hasResponseBlock = workflowHasResponseBlock(result)
+ const outputWithBase64 = includeFileBase64
+ ? ((await hydrateUserFilesWithBase64(result.output, {
+ requestId,
+ executionId,
+ maxBytes: base64MaxBytes,
+ })) as NormalizedBlockOutput)
+ : result.output
+
+ const resultWithBase64 = { ...result, output: outputWithBase64 }
+
+ // Clean up the base64 cache for this execution
+ await cleanupExecutionBase64Cache(executionId)
+
+ const hasResponseBlock = workflowHasResponseBlock(resultWithBase64)
if (hasResponseBlock) {
- return createHttpResponseFromBlock(result)
+ return createHttpResponseFromBlock(resultWithBase64)
}
const filteredResult = {
success: result.success,
- output: result.output,
+ output: outputWithBase64,
error: result.error,
metadata: result.metadata
? {
@@ -498,6 +523,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
selectedOutputs: resolvedSelectedOutputs,
isSecureMode: false,
workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api',
+ includeFileBase64,
+ base64MaxBytes,
},
executionId,
})
@@ -698,6 +725,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
loggingSession,
abortSignal: abortController.signal,
+ includeFileBase64,
+ base64MaxBytes,
})
if (result.status === 'paused') {
@@ -750,12 +779,21 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
workflowId,
data: {
success: result.success,
- output: result.output,
+ output: includeFileBase64
+ ? await hydrateUserFilesWithBase64(result.output, {
+ requestId,
+ executionId,
+ maxBytes: base64MaxBytes,
+ })
+ : result.output,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || startTime.toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
},
})
+
+ // Clean up the base64 cache for this execution
+ await cleanupExecutionBase64Cache(executionId)
} catch (error: any) {
const errorMessage = error.message || 'Unknown error'
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)
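
From a caller's perspective, the new schema fields look like this (the endpoint and field names come from the schema above; the workflow id and all values are illustrative):

// Sketch: calling the execute endpoint with base64 hydration enabled.
const workflowId = 'wf_123' // hypothetical
const res = await fetch(`/api/workflows/${workflowId}/execute`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    input: { files: [] },
    includeFileBase64: true, // the schema above defaults this to true
    base64MaxBytes: 10 * 1024 * 1024, // skip hydration for files over 10 MB
  }),
})
const { output } = await res.json()
// Any UserFile in `output` now carries a `base64` field, subject to maxBytes.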
diff --git a/apps/sim/app/chat/hooks/use-chat-streaming.ts b/apps/sim/app/chat/hooks/use-chat-streaming.ts
index ac474fa377..e020870931 100644
--- a/apps/sim/app/chat/hooks/use-chat-streaming.ts
+++ b/apps/sim/app/chat/hooks/use-chat-streaming.ts
@@ -2,7 +2,7 @@
import { useRef, useState } from 'react'
import { createLogger } from '@sim/logger'
-import { isUserFile } from '@/lib/core/utils/display-filters'
+import { isUserFileWithMetadata } from '@/lib/core/utils/user-file'
import type { ChatFile, ChatMessage } from '@/app/chat/components/message/message'
import { CHAT_ERROR_MESSAGES } from '@/app/chat/constants'
@@ -17,7 +17,7 @@ function extractFilesFromData(
return files
}
- if (isUserFile(data)) {
+ if (isUserFileWithMetadata(data)) {
if (!seenIds.has(data.id)) {
seenIds.add(data.id)
files.push({
@@ -232,7 +232,7 @@ export function useChatStreaming() {
return null
}
- if (isUserFile(value)) {
+ if (isUserFileWithMetadata(value)) {
return null
}
@@ -285,7 +285,7 @@ export function useChatStreaming() {
const value = getOutputValue(blockOutputs, config.path)
- if (isUserFile(value)) {
+ if (isUserFileWithMetadata(value)) {
extractedFiles.push({
id: value.id,
name: value.name,
diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tag-dropdown/tag-dropdown.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tag-dropdown/tag-dropdown.tsx
index d5fde31199..32491d54e6 100644
--- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tag-dropdown/tag-dropdown.tsx
+++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tag-dropdown/tag-dropdown.tsx
@@ -214,40 +214,18 @@ const getOutputTypeForPath = (
outputPath: string,
mergedSubBlocksOverride?: Record<string, any>
): string => {
- if (block?.triggerMode && blockConfig?.triggers?.enabled) {
- return getBlockOutputType(block.type, outputPath, mergedSubBlocksOverride, true)
- }
- if (block?.type === 'starter') {
- const startWorkflowValue =
- mergedSubBlocksOverride?.startWorkflow?.value ?? getSubBlockValue(blockId, 'startWorkflow')
-
- if (startWorkflowValue === 'chat') {
- const chatModeTypes: Record<string, string> = {
- input: 'string',
- conversationId: 'string',
- files: 'files',
- }
- return chatModeTypes[outputPath] || 'any'
- }
- const inputFormatValue =
- mergedSubBlocksOverride?.inputFormat?.value ?? getSubBlockValue(blockId, 'inputFormat')
- if (inputFormatValue && Array.isArray(inputFormatValue)) {
- const field = inputFormatValue.find(
- (f: { name?: string; type?: string }) => f.name === outputPath
- )
- if (field?.type) return field.type
- }
- } else if (blockConfig?.category === 'triggers') {
- const blockState = useWorkflowStore.getState().blocks[blockId]
- const subBlocks = mergedSubBlocksOverride ?? (blockState?.subBlocks || {})
- return getBlockOutputType(block.type, outputPath, subBlocks)
- } else {
+ const subBlocks =
+ mergedSubBlocksOverride ?? useWorkflowStore.getState().blocks[blockId]?.subBlocks
+ const triggerMode = block?.triggerMode && blockConfig?.triggers?.enabled
+
+ if (blockConfig?.tools?.config?.tool) {
const operationValue = getSubBlockValue(blockId, 'operation')
- if (blockConfig && operationValue) {
+ if (operationValue) {
return getToolOutputType(blockConfig, operationValue, outputPath)
}
}
- return 'any'
+
+ return getBlockOutputType(block?.type ?? '', outputPath, subBlocks, triggerMode)
}
/**
@@ -1789,7 +1767,7 @@ export const TagDropdown: React.FC = ({
mergedSubBlocks
)
- if (fieldType === 'files' || fieldType === 'array') {
+ if (fieldType === 'files' || fieldType === 'file[]' || fieldType === 'array') {
const blockName = parts[0]
const remainingPath = parts.slice(2).join('.')
processedTag = `${blockName}.${arrayFieldName}[0].${remainingPath}`
diff --git a/apps/sim/background/schedule-execution.ts b/apps/sim/background/schedule-execution.ts
index f4fc2a4430..7d19dc0604 100644
--- a/apps/sim/background/schedule-execution.ts
+++ b/apps/sim/background/schedule-execution.ts
@@ -208,6 +208,8 @@ async function runWorkflowExecution({
snapshot,
callbacks: {},
loggingSession,
+ includeFileBase64: true,
+ base64MaxBytes: undefined,
})
if (executionResult.status === 'paused') {
diff --git a/apps/sim/background/webhook-execution.ts b/apps/sim/background/webhook-execution.ts
index fbe0f08839..c34b5497b0 100644
--- a/apps/sim/background/webhook-execution.ts
+++ b/apps/sim/background/webhook-execution.ts
@@ -240,6 +240,8 @@ async function executeWebhookJobInternal(
snapshot,
callbacks: {},
loggingSession,
+ includeFileBase64: true, // Enable base64 hydration
+ base64MaxBytes: undefined, // Use default limit
})
if (executionResult.status === 'paused') {
@@ -493,6 +495,7 @@ async function executeWebhookJobInternal(
snapshot,
callbacks: {},
loggingSession,
+ includeFileBase64: true,
})
if (executionResult.status === 'paused') {
diff --git a/apps/sim/background/workflow-execution.ts b/apps/sim/background/workflow-execution.ts
index 491c9863b1..6a8cca8b1e 100644
--- a/apps/sim/background/workflow-execution.ts
+++ b/apps/sim/background/workflow-execution.ts
@@ -109,6 +109,8 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
snapshot,
callbacks: {},
loggingSession,
+ includeFileBase64: true,
+ base64MaxBytes: undefined,
})
if (result.status === 'paused') {
diff --git a/apps/sim/blocks/blocks/file.ts b/apps/sim/blocks/blocks/file.ts
index 46bf0f1380..eed7c3a256 100644
--- a/apps/sim/blocks/blocks/file.ts
+++ b/apps/sim/blocks/blocks/file.ts
@@ -121,5 +121,9 @@ export const FileBlock: BlockConfig = {
type: 'string',
description: 'All file contents merged into a single text string',
},
+ processedFiles: {
+ type: 'files',
+ description: 'Array of UserFile objects for downstream use (attachments, uploads, etc.)',
+ },
},
}
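
Downstream blocks can then address the raw files by path, in the blockName.field[0].property shape the tag-dropdown change above normalizes. An illustrative reference (the block name file1 and all values are hypothetical):

// Sketch: consuming the File block's new output downstream. Only the
// field name `processedFiles` comes from the config above.
const output = {
  processedFiles: [
    { id: 'file_1', name: 'notes.txt', url: '/api/files/serve/notes.txt',
      size: 120, type: 'text/plain', key: 'execution/notes.txt', context: 'execution' },
  ],
}
// In the editor this would be addressed as <file1.processedFiles[0].name>
const firstFileName = output.processedFiles[0].name // 'notes.txt'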
diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts
index 116056d35e..2f60c96efc 100644
--- a/apps/sim/executor/execution/block-executor.ts
+++ b/apps/sim/executor/execution/block-executor.ts
@@ -3,6 +3,10 @@ import { mcpServers } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, inArray, isNull } from 'drizzle-orm'
import { getBaseUrl } from '@/lib/core/utils/urls'
+import {
+ containsUserFileWithMetadata,
+ hydrateUserFilesWithBase64,
+} from '@/lib/uploads/utils/user-file-base64.server'
import {
BlockType,
buildResumeApiUrl,
@@ -135,6 +139,14 @@ export class BlockExecutor {
normalizedOutput = this.normalizeOutput(output)
}
+ if (ctx.includeFileBase64 && containsUserFileWithMetadata(normalizedOutput)) {
+ normalizedOutput = (await hydrateUserFilesWithBase64(normalizedOutput, {
+ requestId: ctx.metadata.requestId,
+ executionId: ctx.executionId,
+ maxBytes: ctx.base64MaxBytes,
+ })) as NormalizedBlockOutput
+ }
+
const duration = Date.now() - startTime
if (blockLog) {
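
The guard means hydration runs only when the normalized output actually contains UserFile-shaped values, so ordinary outputs pay nothing. A minimal sketch of such a probe, using the field names from the UserFile interface in executor/types.ts; the real containsUserFileWithMetadata lives in user-file-base64.server.ts and may differ:

// Sketch: cheap recursive probe for UserFile-shaped values in an output tree.
// Field names follow the UserFile interface; the traversal is an assumption.
function looksLikeUserFile(value: unknown): boolean {
  if (typeof value !== 'object' || value === null) return false
  const v = value as Record<string, unknown>
  return (
    typeof v.id === 'string' &&
    typeof v.name === 'string' &&
    typeof v.key === 'string' &&
    typeof v.url === 'string'
  )
}

function containsUserFileSketch(value: unknown): boolean {
  if (looksLikeUserFile(value)) return true
  if (Array.isArray(value)) return value.some(containsUserFileSketch)
  if (typeof value === 'object' && value !== null) {
    return Object.values(value as Record<string, unknown>).some(containsUserFileSketch)
  }
  return false
}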
diff --git a/apps/sim/executor/execution/executor.ts b/apps/sim/executor/execution/executor.ts
index cf085b334f..c8da45234a 100644
--- a/apps/sim/executor/execution/executor.ts
+++ b/apps/sim/executor/execution/executor.ts
@@ -169,6 +169,8 @@ export class DAGExecutor {
onBlockStart: this.contextExtensions.onBlockStart,
onBlockComplete: this.contextExtensions.onBlockComplete,
abortSignal: this.contextExtensions.abortSignal,
+ includeFileBase64: this.contextExtensions.includeFileBase64,
+ base64MaxBytes: this.contextExtensions.base64MaxBytes,
}
if (this.contextExtensions.resumeFromSnapshot) {
diff --git a/apps/sim/executor/execution/types.ts b/apps/sim/executor/execution/types.ts
index 38d403f042..701f5de357 100644
--- a/apps/sim/executor/execution/types.ts
+++ b/apps/sim/executor/execution/types.ts
@@ -89,6 +89,8 @@ export interface ContextExtensions {
* When aborted, the execution should stop gracefully.
*/
abortSignal?: AbortSignal
+ includeFileBase64?: boolean
+ base64MaxBytes?: number
onStream?: (streamingExecution: unknown) => Promise<void>
onBlockStart?: (
blockId: string,
diff --git a/apps/sim/executor/handlers/agent/agent-handler.test.ts b/apps/sim/executor/handlers/agent/agent-handler.test.ts
index 05be8ee039..a30f1a0458 100644
--- a/apps/sim/executor/handlers/agent/agent-handler.test.ts
+++ b/apps/sim/executor/handlers/agent/agent-handler.test.ts
@@ -387,7 +387,6 @@ describe('AgentBlockHandler', () => {
code: 'return { result: "auto tool executed", input }',
input: 'test input',
}),
- false, // skipProxy
false, // skipPostProcess
expect.any(Object) // execution context
)
@@ -400,7 +399,6 @@ describe('AgentBlockHandler', () => {
code: 'return { result: "force tool executed", input }',
input: 'another test',
}),
- false, // skipProxy
false, // skipPostProcess
expect.any(Object) // execution context
)
@@ -1407,7 +1405,7 @@ describe('AgentBlockHandler', () => {
})
it('should handle MCP tools in agent execution', async () => {
- mockExecuteTool.mockImplementation((toolId, params, skipProxy, skipPostProcess, context) => {
+ mockExecuteTool.mockImplementation((toolId, params, skipPostProcess, context) => {
if (isMcpTool(toolId)) {
return Promise.resolve({
success: true,
@@ -1682,7 +1680,7 @@ describe('AgentBlockHandler', () => {
it('should provide workspaceId context for MCP tool execution', async () => {
let capturedContext: any
- mockExecuteTool.mockImplementation((toolId, params, skipProxy, skipPostProcess, context) => {
+ mockExecuteTool.mockImplementation((toolId, params, skipPostProcess, context) => {
capturedContext = context
if (isMcpTool(toolId)) {
return Promise.resolve({
diff --git a/apps/sim/executor/handlers/agent/agent-handler.ts b/apps/sim/executor/handlers/agent/agent-handler.ts
index 6775a43067..098b813af5 100644
--- a/apps/sim/executor/handlers/agent/agent-handler.ts
+++ b/apps/sim/executor/handlers/agent/agent-handler.ts
@@ -325,7 +325,6 @@ export class AgentBlockHandler implements BlockHandler {
},
},
false,
- false,
ctx
)
diff --git a/apps/sim/executor/handlers/api/api-handler.test.ts b/apps/sim/executor/handlers/api/api-handler.test.ts
index 1a930f57ff..3af7fac6fd 100644
--- a/apps/sim/executor/handlers/api/api-handler.test.ts
+++ b/apps/sim/executor/handlers/api/api-handler.test.ts
@@ -106,7 +106,6 @@ describe('ApiBlockHandler', () => {
body: { key: 'value' }, // Expect parsed body
_context: { workflowId: 'test-workflow-id' },
},
- false, // skipProxy
false, // skipPostProcess
mockContext // execution context
)
@@ -158,7 +157,6 @@ describe('ApiBlockHandler', () => {
expect(mockExecuteTool).toHaveBeenCalledWith(
'http_request',
expect.objectContaining({ body: expectedParsedBody }),
- false, // skipProxy
false, // skipPostProcess
mockContext // execution context
)
@@ -175,7 +173,6 @@ describe('ApiBlockHandler', () => {
expect(mockExecuteTool).toHaveBeenCalledWith(
'http_request',
expect.objectContaining({ body: 'This is plain text' }),
- false, // skipProxy
false, // skipPostProcess
mockContext // execution context
)
@@ -192,7 +189,6 @@ describe('ApiBlockHandler', () => {
expect(mockExecuteTool).toHaveBeenCalledWith(
'http_request',
expect.objectContaining({ body: undefined }),
- false, // skipProxy
false, // skipPostProcess
mockContext // execution context
)
diff --git a/apps/sim/executor/handlers/api/api-handler.ts b/apps/sim/executor/handlers/api/api-handler.ts
index c8db117b88..775b886745 100644
--- a/apps/sim/executor/handlers/api/api-handler.ts
+++ b/apps/sim/executor/handlers/api/api-handler.ts
@@ -82,7 +82,6 @@ export class ApiBlockHandler implements BlockHandler {
},
},
false,
- false,
ctx
)
diff --git a/apps/sim/executor/handlers/condition/condition-handler.test.ts b/apps/sim/executor/handlers/condition/condition-handler.test.ts
index 1a022514df..f3c05c6470 100644
--- a/apps/sim/executor/handlers/condition/condition-handler.test.ts
+++ b/apps/sim/executor/handlers/condition/condition-handler.test.ts
@@ -201,7 +201,6 @@ describe('ConditionBlockHandler', () => {
},
}),
false,
- false,
mockContext
)
})
diff --git a/apps/sim/executor/handlers/condition/condition-handler.ts b/apps/sim/executor/handlers/condition/condition-handler.ts
index deac6c99a6..f450460589 100644
--- a/apps/sim/executor/handlers/condition/condition-handler.ts
+++ b/apps/sim/executor/handlers/condition/condition-handler.ts
@@ -44,7 +44,6 @@ export async function evaluateConditionExpression(
},
},
false,
- false,
ctx
)
diff --git a/apps/sim/executor/handlers/function/function-handler.test.ts b/apps/sim/executor/handlers/function/function-handler.test.ts
index 67e6e0939c..f04de4662b 100644
--- a/apps/sim/executor/handlers/function/function-handler.test.ts
+++ b/apps/sim/executor/handlers/function/function-handler.test.ts
@@ -84,7 +84,6 @@ describe('FunctionBlockHandler', () => {
expect(mockExecuteTool).toHaveBeenCalledWith(
'function_execute',
expectedToolParams,
- false, // skipProxy
false, // skipPostProcess
mockContext // execution context
)
@@ -117,7 +116,6 @@ describe('FunctionBlockHandler', () => {
expect(mockExecuteTool).toHaveBeenCalledWith(
'function_execute',
expectedToolParams,
- false, // skipProxy
false, // skipPostProcess
mockContext // execution context
)
@@ -142,7 +140,6 @@ describe('FunctionBlockHandler', () => {
expect(mockExecuteTool).toHaveBeenCalledWith(
'function_execute',
expectedToolParams,
- false, // skipProxy
false, // skipPostProcess
mockContext // execution context
)
diff --git a/apps/sim/executor/handlers/function/function-handler.ts b/apps/sim/executor/handlers/function/function-handler.ts
index cc8603760d..c7b9b00978 100644
--- a/apps/sim/executor/handlers/function/function-handler.ts
+++ b/apps/sim/executor/handlers/function/function-handler.ts
@@ -42,7 +42,6 @@ export class FunctionBlockHandler implements BlockHandler {
},
},
false,
- false,
ctx
)
diff --git a/apps/sim/executor/handlers/generic/generic-handler.test.ts b/apps/sim/executor/handlers/generic/generic-handler.test.ts
index 661c7a1244..3a107df40a 100644
--- a/apps/sim/executor/handlers/generic/generic-handler.test.ts
+++ b/apps/sim/executor/handlers/generic/generic-handler.test.ts
@@ -95,7 +95,6 @@ describe('GenericBlockHandler', () => {
expect(mockExecuteTool).toHaveBeenCalledWith(
'some_custom_tool',
expectedToolParams,
- false, // skipProxy
false, // skipPostProcess
mockContext // execution context
)
diff --git a/apps/sim/executor/handlers/generic/generic-handler.ts b/apps/sim/executor/handlers/generic/generic-handler.ts
index fc910eafad..558a37dee5 100644
--- a/apps/sim/executor/handlers/generic/generic-handler.ts
+++ b/apps/sim/executor/handlers/generic/generic-handler.ts
@@ -70,7 +70,6 @@ export class GenericBlockHandler implements BlockHandler {
},
},
false,
- false,
ctx
)
diff --git a/apps/sim/executor/handlers/human-in-the-loop/human-in-the-loop-handler.ts b/apps/sim/executor/handlers/human-in-the-loop/human-in-the-loop-handler.ts
index e1d31cc228..e7ba38543c 100644
--- a/apps/sim/executor/handlers/human-in-the-loop/human-in-the-loop-handler.ts
+++ b/apps/sim/executor/handlers/human-in-the-loop/human-in-the-loop-handler.ts
@@ -633,7 +633,7 @@ export class HumanInTheLoopBlockHandler implements BlockHandler {
blockNameMapping: blockNameMappingWithPause,
}
- const result = await executeTool(toolId, toolParams, false, false, ctx)
+ const result = await executeTool(toolId, toolParams, false, ctx)
const durationMs = Date.now() - startTime
if (!result.success) {
diff --git a/apps/sim/executor/types.ts b/apps/sim/executor/types.ts
index c0d96a81e5..27eaa0c2bc 100644
--- a/apps/sim/executor/types.ts
+++ b/apps/sim/executor/types.ts
@@ -11,6 +11,7 @@ export interface UserFile {
type: string
key: string
context?: string
+ base64?: string
}
export interface ParallelPauseScope {
@@ -236,6 +237,19 @@ export interface ExecutionContext {
// Dynamically added nodes that need to be scheduled (e.g., from parallel expansion)
pendingDynamicNodes?: string[]
+
+ /**
+ * When true, UserFile objects in block outputs will be hydrated with base64 content
+ * before being stored in execution state. This ensures base64 is available for
+ * variable resolution in downstream blocks.
+ */
+ includeFileBase64?: boolean
+
+ /**
+ * Maximum file size in bytes for base64 hydration. Files larger than this limit
+ * will not have their base64 content fetched.
+ */
+ base64MaxBytes?: number
}
export interface ExecutionResult {
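
With the optional base64 field in place, a hydrated UserFile looks roughly like this (shape per the interface above; every concrete value is made up for illustration):

// Sketch: a UserFile after base64 hydration.
const hydrated = {
  id: 'file_abc123',
  name: 'report.pdf',
  url: '/api/files/serve/execution%2Fwf_1%2Fexec_1%2Freport.pdf?context=execution',
  size: 48231,
  type: 'application/pdf',
  key: 'execution/wf_1/exec_1/report.pdf',
  context: 'execution',
  base64: 'JVBERi0xLjcK...', // present only when includeFileBase64 is enabled
}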
diff --git a/apps/sim/executor/utils/start-block.ts b/apps/sim/executor/utils/start-block.ts
index 23163cc6d3..d18229d20e 100644
--- a/apps/sim/executor/utils/start-block.ts
+++ b/apps/sim/executor/utils/start-block.ts
@@ -1,4 +1,4 @@
-import { isUserFile } from '@/lib/core/utils/display-filters'
+import { isUserFileWithMetadata } from '@/lib/core/utils/user-file'
import {
classifyStartBlockType,
getLegacyStarterMode,
@@ -234,7 +234,7 @@ function getFilesFromWorkflowInput(workflowInput: unknown): UserFile[] | undefin
return undefined
}
const files = workflowInput.files
- if (Array.isArray(files) && files.every(isUserFile)) {
+ if (Array.isArray(files) && files.every(isUserFileWithMetadata)) {
return files
}
return undefined
diff --git a/apps/sim/executor/variables/resolvers/block.ts b/apps/sim/executor/variables/resolvers/block.ts
index 7b6b783e66..09904eed53 100644
--- a/apps/sim/executor/variables/resolvers/block.ts
+++ b/apps/sim/executor/variables/resolvers/block.ts
@@ -1,3 +1,4 @@
+import { USER_FILE_ACCESSIBLE_PROPERTIES } from '@/lib/workflows/types'
import {
isReference,
normalizeName,
@@ -20,11 +21,58 @@ function isPathInOutputSchema(
return true
}
+ const isFileArrayType = (value: any): boolean =>
+ value?.type === 'file[]' || value?.type === 'files'
+
let current: any = outputs
for (let i = 0; i < pathParts.length; i++) {
const part = pathParts[i]
+ const arrayMatch = part.match(/^([^[]+)\[(\d+)\]$/)
+ if (arrayMatch) {
+ const [, prop] = arrayMatch
+ let fieldDef: any
+
+ if (prop in current) {
+ fieldDef = current[prop]
+ } else if (current.properties && prop in current.properties) {
+ fieldDef = current.properties[prop]
+ } else if (current.type === 'array' && current.items) {
+ if (current.items.properties && prop in current.items.properties) {
+ fieldDef = current.items.properties[prop]
+ } else if (prop in current.items) {
+ fieldDef = current.items[prop]
+ }
+ }
+
+ if (!fieldDef) {
+ return false
+ }
+
+ if (isFileArrayType(fieldDef)) {
+ if (i + 1 < pathParts.length) {
+ return USER_FILE_ACCESSIBLE_PROPERTIES.includes(pathParts[i + 1] as any)
+ }
+ return true
+ }
+
+ if (fieldDef.type === 'array' && fieldDef.items) {
+ current = fieldDef.items
+ continue
+ }
+
+ current = fieldDef
+ continue
+ }
+
if (/^\d+$/.test(part)) {
+ if (isFileArrayType(current)) {
+ if (i + 1 < pathParts.length) {
+ const nextPart = pathParts[i + 1]
+ return USER_FILE_ACCESSIBLE_PROPERTIES.includes(nextPart as any)
+ }
+ return true
+ }
continue
}
@@ -33,7 +81,15 @@ function isPathInOutputSchema(
}
if (part in current) {
- current = current[part]
+ const nextCurrent = current[part]
+ if (nextCurrent?.type === 'file[]' && i + 1 < pathParts.length) {
+ const nextPart = pathParts[i + 1]
+ if (/^\d+$/.test(nextPart) && i + 2 < pathParts.length) {
+ const propertyPart = pathParts[i + 2]
+ return USER_FILE_ACCESSIBLE_PROPERTIES.includes(propertyPart as any)
+ }
+ }
+ current = nextCurrent
continue
}
@@ -53,6 +109,10 @@ function isPathInOutputSchema(
}
}
+ if (isFileArrayType(current) && USER_FILE_ACCESSIBLE_PROPERTIES.includes(part as any)) {
+ return true
+ }
+
if ('type' in current && typeof current.type === 'string') {
if (!current.properties && !current.items) {
return false
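
To make the new schema-walking rules concrete, a sketch of how a hypothetical `file[]` output field behaves under `isPathInOutputSchema`, assuming `USER_FILE_ACCESSIBLE_PROPERTIES` covers display fields such as `name` and `url` but not internal ones like `key`:

```ts
// Hypothetical schema; not taken from the patch.
const outputs = { attachments: { type: 'file[]' } }

// 'attachments[0].name' and 'attachments.0.url' are accepted: the file[] field is
// indexed and the trailing property is in USER_FILE_ACCESSIBLE_PROPERTIES.
// 'attachments[0].key' is rejected, assuming 'key' stays off the accessible list.
```
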
diff --git a/apps/sim/lib/core/security/input-validation.test.ts b/apps/sim/lib/core/security/input-validation.test.ts
index 7f455cb97e..c1979c1e3e 100644
--- a/apps/sim/lib/core/security/input-validation.test.ts
+++ b/apps/sim/lib/core/security/input-validation.test.ts
@@ -1,7 +1,6 @@
import { loggerMock } from '@sim/testing'
import { describe, expect, it, vi } from 'vitest'
import {
- createPinnedUrl,
validateAirtableId,
validateAlphanumericId,
validateEnum,
@@ -592,28 +591,6 @@ describe('validateUrlWithDNS', () => {
})
})
-describe('createPinnedUrl', () => {
- it('should replace hostname with IP', () => {
- const result = createPinnedUrl('https://example.com/api/data', '93.184.216.34')
- expect(result).toBe('https://93.184.216.34/api/data')
- })
-
- it('should preserve port if specified', () => {
- const result = createPinnedUrl('https://example.com:8443/api', '93.184.216.34')
- expect(result).toBe('https://93.184.216.34:8443/api')
- })
-
- it('should preserve query string', () => {
- const result = createPinnedUrl('https://example.com/api?foo=bar&baz=qux', '93.184.216.34')
- expect(result).toBe('https://93.184.216.34/api?foo=bar&baz=qux')
- })
-
- it('should preserve path', () => {
- const result = createPinnedUrl('https://example.com/a/b/c/d', '93.184.216.34')
- expect(result).toBe('https://93.184.216.34/a/b/c/d')
- })
-})
-
describe('validateInteger', () => {
describe('valid integers', () => {
it.concurrent('should accept positive integers', () => {
@@ -929,13 +906,13 @@ describe('validateExternalUrl', () => {
it.concurrent('should reject 127.0.0.1', () => {
const result = validateExternalUrl('https://127.0.0.1/api')
expect(result.isValid).toBe(false)
- expect(result.error).toContain('localhost')
+ expect(result.error).toContain('private IP')
})
it.concurrent('should reject 0.0.0.0', () => {
const result = validateExternalUrl('https://0.0.0.0/api')
expect(result.isValid).toBe(false)
- expect(result.error).toContain('localhost')
+ expect(result.error).toContain('private IP')
})
})
diff --git a/apps/sim/lib/core/security/input-validation.ts b/apps/sim/lib/core/security/input-validation.ts
index b5440ce166..f15b2412e8 100644
--- a/apps/sim/lib/core/security/input-validation.ts
+++ b/apps/sim/lib/core/security/input-validation.ts
@@ -1,5 +1,8 @@
import dns from 'dns/promises'
+import http from 'http'
+import https from 'https'
import { createLogger } from '@sim/logger'
+import * as ipaddr from 'ipaddr.js'
const logger = createLogger('InputValidation')
@@ -402,42 +405,20 @@ export function validateHostname(
}
}
- // Import the blocked IP ranges from url-validation
- const BLOCKED_IP_RANGES = [
- // Private IPv4 ranges (RFC 1918)
- /^10\./,
- /^172\.(1[6-9]|2[0-9]|3[01])\./,
- /^192\.168\./,
-
- // Loopback addresses
- /^127\./,
- /^localhost$/i,
-
- // Link-local addresses (RFC 3927)
- /^169\.254\./,
-
- // Cloud metadata endpoints
- /^169\.254\.169\.254$/,
-
- // Broadcast and other reserved ranges
- /^0\./,
- /^224\./,
- /^240\./,
- /^255\./,
-
- // IPv6 loopback and link-local
- /^::1$/,
- /^fe80:/i,
- /^::ffff:127\./i,
- /^::ffff:10\./i,
- /^::ffff:172\.(1[6-9]|2[0-9]|3[01])\./i,
- /^::ffff:192\.168\./i,
- ]
-
const lowerHostname = hostname.toLowerCase()
- for (const pattern of BLOCKED_IP_RANGES) {
- if (pattern.test(lowerHostname)) {
+ // Block localhost
+ if (lowerHostname === 'localhost') {
+ logger.warn('Hostname is localhost', { paramName })
+ return {
+ isValid: false,
+ error: `${paramName} cannot be a private IP address or localhost`,
+ }
+ }
+
+ // Use ipaddr.js to check if hostname is an IP and if it's private/reserved
+ if (ipaddr.isValid(lowerHostname)) {
+ if (isPrivateOrReservedIP(lowerHostname)) {
logger.warn('Hostname matches blocked IP range', {
paramName,
hostname: hostname.substring(0, 100),
@@ -710,33 +691,17 @@ export function validateExternalUrl(
// Block private IP ranges and localhost
const hostname = parsedUrl.hostname.toLowerCase()
- // Block localhost variations
- if (
- hostname === 'localhost' ||
- hostname === '127.0.0.1' ||
- hostname === '::1' ||
- hostname.startsWith('127.') ||
- hostname === '0.0.0.0'
- ) {
+ // Block localhost
+ if (hostname === 'localhost') {
return {
isValid: false,
error: `${paramName} cannot point to localhost`,
}
}
- // Block private IP ranges
- const privateIpPatterns = [
- /^10\./,
- /^172\.(1[6-9]|2[0-9]|3[0-1])\./,
- /^192\.168\./,
- /^169\.254\./, // Link-local
- /^fe80:/i, // IPv6 link-local
- /^fc00:/i, // IPv6 unique local
- /^fd00:/i, // IPv6 unique local
- ]
-
- for (const pattern of privateIpPatterns) {
- if (pattern.test(hostname)) {
+ // Use ipaddr.js to check if hostname is an IP and if it's private/reserved
+ if (ipaddr.isValid(hostname)) {
+ if (isPrivateOrReservedIP(hostname)) {
return {
isValid: false,
error: `${paramName} cannot point to private IP addresses`,
@@ -791,30 +756,25 @@ export function validateProxyUrl(
/**
* Checks if an IP address is private or reserved (not routable on the public internet)
+ * Uses ipaddr.js for robust handling of all IP formats including:
+ * - Octal notation (0177.0.0.1)
+ * - Hex notation (0x7f000001)
+ * - IPv4-mapped IPv6 (::ffff:127.0.0.1)
+ * - Various edge cases that regex patterns miss
*/
function isPrivateOrReservedIP(ip: string): boolean {
- const patterns = [
- /^127\./, // Loopback
- /^10\./, // Private Class A
- /^172\.(1[6-9]|2[0-9]|3[0-1])\./, // Private Class B
- /^192\.168\./, // Private Class C
- /^169\.254\./, // Link-local
- /^0\./, // Current network
- /^100\.(6[4-9]|[7-9][0-9]|1[0-1][0-9]|12[0-7])\./, // Carrier-grade NAT
- /^192\.0\.0\./, // IETF Protocol Assignments
- /^192\.0\.2\./, // TEST-NET-1
- /^198\.51\.100\./, // TEST-NET-2
- /^203\.0\.113\./, // TEST-NET-3
- /^224\./, // Multicast
- /^240\./, // Reserved
- /^255\./, // Broadcast
- /^::1$/, // IPv6 loopback
- /^fe80:/i, // IPv6 link-local
- /^fc00:/i, // IPv6 unique local
- /^fd00:/i, // IPv6 unique local
- /^::ffff:(127\.|10\.|172\.(1[6-9]|2[0-9]|3[0-1])\.|192\.168\.|169\.254\.)/i, // IPv4-mapped IPv6
- ]
- return patterns.some((pattern) => pattern.test(ip))
+ try {
+ if (!ipaddr.isValid(ip)) {
+ return true
+ }
+
+ const addr = ipaddr.process(ip)
+ const range = addr.range()
+
+ return range !== 'unicast'
+ } catch {
+ return true
+ }
}
/**
@@ -882,18 +842,194 @@ export async function validateUrlWithDNS(
}
}
}
+export interface SecureFetchOptions {
+ method?: string
+  headers?: Record<string, string>
+ body?: string
+ timeout?: number
+ maxRedirects?: number
+}
+
+export class SecureFetchHeaders {
+  private headers: Map<string, string>
+
+  constructor(headers: Record<string, string>) {
+ this.headers = new Map(Object.entries(headers).map(([k, v]) => [k.toLowerCase(), v]))
+ }
+
+ get(name: string): string | null {
+ return this.headers.get(name.toLowerCase()) ?? null
+ }
+
+  toRecord(): Record<string, string> {
+    const record: Record<string, string> = {}
+ for (const [key, value] of this.headers) {
+ record[key] = value
+ }
+ return record
+ }
+
+ [Symbol.iterator]() {
+ return this.headers.entries()
+ }
+}
+
+export interface SecureFetchResponse {
+ ok: boolean
+ status: number
+ statusText: string
+ headers: SecureFetchHeaders
+  text: () => Promise<string>
+  json: () => Promise<any>
+  arrayBuffer: () => Promise<ArrayBuffer>
+}
+
+const DEFAULT_MAX_REDIRECTS = 5
+
+function isRedirectStatus(status: number): boolean {
+ return status >= 300 && status < 400 && status !== 304
+}
+
+function resolveRedirectUrl(baseUrl: string, location: string): string {
+ try {
+ return new URL(location, baseUrl).toString()
+ } catch {
+ throw new Error(`Invalid redirect location: ${location}`)
+ }
+}
/**
- * Creates a fetch URL that uses a resolved IP address to prevent DNS rebinding
- *
- * @param originalUrl - The original URL
- * @param resolvedIP - The resolved IP address to use
- * @returns The URL with IP substituted for hostname
+ * Performs a fetch with IP pinning to prevent DNS rebinding attacks.
+ * Uses the pre-resolved IP address while preserving the original hostname for TLS SNI.
+ * Follows redirects securely by validating each redirect target.
*/
-export function createPinnedUrl(originalUrl: string, resolvedIP: string): string {
- const parsed = new URL(originalUrl)
- const port = parsed.port ? `:${parsed.port}` : ''
- return `${parsed.protocol}//${resolvedIP}${port}${parsed.pathname}${parsed.search}`
+export async function secureFetchWithPinnedIP(
+ url: string,
+ resolvedIP: string,
+ options: SecureFetchOptions = {},
+ redirectCount = 0
+): Promise<SecureFetchResponse> {
+ const maxRedirects = options.maxRedirects ?? DEFAULT_MAX_REDIRECTS
+
+ return new Promise((resolve, reject) => {
+ const parsed = new URL(url)
+ const isHttps = parsed.protocol === 'https:'
+ const defaultPort = isHttps ? 443 : 80
+ const port = parsed.port ? Number.parseInt(parsed.port, 10) : defaultPort
+
+ const isIPv6 = resolvedIP.includes(':')
+ const family = isIPv6 ? 6 : 4
+
+ const agentOptions = {
+ lookup: (
+ _hostname: string,
+ _options: unknown,
+ callback: (err: NodeJS.ErrnoException | null, address: string, family: number) => void
+ ) => {
+ callback(null, resolvedIP, family)
+ },
+ }
+
+ const agent = isHttps
+ ? new https.Agent(agentOptions as https.AgentOptions)
+ : new http.Agent(agentOptions as http.AgentOptions)
+
+ const requestOptions: http.RequestOptions = {
+ hostname: parsed.hostname,
+ port,
+ path: parsed.pathname + parsed.search,
+ method: options.method || 'GET',
+ headers: options.headers || {},
+ agent,
+ timeout: options.timeout || 30000,
+ }
+
+ const protocol = isHttps ? https : http
+ const req = protocol.request(requestOptions, (res) => {
+ const statusCode = res.statusCode || 0
+ const location = res.headers.location
+
+ if (isRedirectStatus(statusCode) && location && redirectCount < maxRedirects) {
+ res.resume()
+ const redirectUrl = resolveRedirectUrl(url, location)
+
+ validateUrlWithDNS(redirectUrl, 'redirectUrl')
+ .then((validation) => {
+ if (!validation.isValid) {
+ reject(new Error(`Redirect blocked: ${validation.error}`))
+ return
+ }
+ return secureFetchWithPinnedIP(
+ redirectUrl,
+ validation.resolvedIP!,
+ options,
+ redirectCount + 1
+ )
+ })
+ .then((response) => {
+ if (response) resolve(response)
+ })
+ .catch(reject)
+ return
+ }
+
+ if (isRedirectStatus(statusCode) && location && redirectCount >= maxRedirects) {
+ res.resume()
+ reject(new Error(`Too many redirects (max: ${maxRedirects})`))
+ return
+ }
+
+ const chunks: Buffer[] = []
+
+ res.on('data', (chunk: Buffer) => chunks.push(chunk))
+
+ res.on('error', (error) => {
+ reject(error)
+ })
+
+ res.on('end', () => {
+ const bodyBuffer = Buffer.concat(chunks)
+ const body = bodyBuffer.toString('utf-8')
+      const headersRecord: Record<string, string> = {}
+ for (const [key, value] of Object.entries(res.headers)) {
+ if (typeof value === 'string') {
+ headersRecord[key.toLowerCase()] = value
+ } else if (Array.isArray(value)) {
+ headersRecord[key.toLowerCase()] = value.join(', ')
+ }
+ }
+
+ resolve({
+ ok: statusCode >= 200 && statusCode < 300,
+ status: statusCode,
+ statusText: res.statusMessage || '',
+ headers: new SecureFetchHeaders(headersRecord),
+ text: async () => body,
+ json: async () => JSON.parse(body),
+ arrayBuffer: async () =>
+ bodyBuffer.buffer.slice(
+ bodyBuffer.byteOffset,
+ bodyBuffer.byteOffset + bodyBuffer.byteLength
+ ),
+ })
+ })
+ })
+
+ req.on('error', (error) => {
+ reject(error)
+ })
+
+ req.on('timeout', () => {
+ req.destroy()
+ reject(new Error('Request timeout'))
+ })
+
+ if (options.body) {
+ req.write(options.body)
+ }
+
+ req.end()
+ })
}
/**
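
Taken together, the intended call pattern looks roughly like the sketch below. It assumes `validateUrlWithDNS` returns a `resolvedIP` on success, which the redirect handling above already relies on. Note that ipaddr.js's `range()` returns labels such as `'private'`, `'loopback'`, and `'linkLocal'`, with `'unicast'` for ordinary public addresses — which is why `isPrivateOrReservedIP` can reduce to a single `range !== 'unicast'` check.

```ts
// Sketch of the expected usage; URL and options are illustrative.
const url = 'https://example.com/data.json'
const validation = await validateUrlWithDNS(url, 'url')
if (!validation.isValid) {
  throw new Error(validation.error)
}
const response = await secureFetchWithPinnedIP(url, validation.resolvedIP!, {
  method: 'GET',
  timeout: 10_000, // ms
  maxRedirects: 3, // each redirect target is re-validated before following
})
const data = await response.json()
```
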
diff --git a/apps/sim/lib/core/security/redaction.test.ts b/apps/sim/lib/core/security/redaction.test.ts
index dc68d3d597..b5a3c0896a 100644
--- a/apps/sim/lib/core/security/redaction.test.ts
+++ b/apps/sim/lib/core/security/redaction.test.ts
@@ -1,11 +1,13 @@
import { describe, expect, it } from 'vitest'
import {
+ isLargeDataKey,
isSensitiveKey,
REDACTED_MARKER,
redactApiKeys,
redactSensitiveValues,
sanitizeEventData,
sanitizeForLogging,
+ TRUNCATED_MARKER,
} from './redaction'
/**
@@ -18,6 +20,24 @@ describe('REDACTED_MARKER', () => {
})
})
+describe('TRUNCATED_MARKER', () => {
+ it.concurrent('should be the standard marker', () => {
+ expect(TRUNCATED_MARKER).toBe('[TRUNCATED]')
+ })
+})
+
+describe('isLargeDataKey', () => {
+ it.concurrent('should identify base64 as large data key', () => {
+ expect(isLargeDataKey('base64')).toBe(true)
+ })
+
+ it.concurrent('should not identify other keys as large data', () => {
+ expect(isLargeDataKey('content')).toBe(false)
+ expect(isLargeDataKey('data')).toBe(false)
+ expect(isLargeDataKey('base')).toBe(false)
+ })
+})
+
describe('isSensitiveKey', () => {
describe('exact matches', () => {
it.concurrent('should match apiKey variations', () => {
@@ -234,6 +254,80 @@ describe('redactApiKeys', () => {
expect(result.config.database.password).toBe('[REDACTED]')
expect(result.config.database.host).toBe('localhost')
})
+
+ it.concurrent('should truncate base64 fields', () => {
+ const obj = {
+ id: 'file-123',
+ name: 'document.pdf',
+ base64: 'VGhpcyBpcyBhIHZlcnkgbG9uZyBiYXNlNjQgc3RyaW5n...',
+ size: 12345,
+ }
+
+ const result = redactApiKeys(obj)
+
+ expect(result.id).toBe('file-123')
+ expect(result.name).toBe('document.pdf')
+ expect(result.base64).toBe('[TRUNCATED]')
+ expect(result.size).toBe(12345)
+ })
+
+ it.concurrent('should truncate base64 in nested UserFile objects', () => {
+ const obj = {
+ files: [
+ {
+ id: 'file-1',
+ name: 'doc1.pdf',
+ url: 'http://example.com/file1',
+ size: 1000,
+ base64: 'base64content1...',
+ },
+ {
+ id: 'file-2',
+ name: 'doc2.pdf',
+ url: 'http://example.com/file2',
+ size: 2000,
+ base64: 'base64content2...',
+ },
+ ],
+ }
+
+ const result = redactApiKeys(obj)
+
+ expect(result.files[0].id).toBe('file-1')
+ expect(result.files[0].base64).toBe('[TRUNCATED]')
+ expect(result.files[1].base64).toBe('[TRUNCATED]')
+ })
+
+ it.concurrent('should filter UserFile objects to only expose allowed fields', () => {
+ const obj = {
+ processedFiles: [
+ {
+ id: 'file-123',
+ name: 'document.pdf',
+ url: 'http://localhost/api/files/serve/...',
+ size: 12345,
+ type: 'application/pdf',
+ key: 'execution/workspace/workflow/file.pdf',
+ context: 'execution',
+ base64: 'VGhpcyBpcyBhIGJhc2U2NCBzdHJpbmc=',
+ },
+ ],
+ }
+
+ const result = redactApiKeys(obj)
+
+ // Exposed fields should be present
+ expect(result.processedFiles[0].id).toBe('file-123')
+ expect(result.processedFiles[0].name).toBe('document.pdf')
+ expect(result.processedFiles[0].url).toBe('http://localhost/api/files/serve/...')
+ expect(result.processedFiles[0].size).toBe(12345)
+ expect(result.processedFiles[0].type).toBe('application/pdf')
+ expect(result.processedFiles[0].base64).toBe('[TRUNCATED]')
+
+ // Internal fields should be filtered out
+ expect(result.processedFiles[0]).not.toHaveProperty('key')
+ expect(result.processedFiles[0]).not.toHaveProperty('context')
+ })
})
describe('primitive handling', () => {
diff --git a/apps/sim/lib/core/security/redaction.ts b/apps/sim/lib/core/security/redaction.ts
index 92241cc4d1..d29bd0264e 100644
--- a/apps/sim/lib/core/security/redaction.ts
+++ b/apps/sim/lib/core/security/redaction.ts
@@ -2,10 +2,16 @@
* Centralized redaction utilities for sensitive data
*/
+import { filterUserFileForDisplay, isUserFile } from '@/lib/core/utils/user-file'
+
export const REDACTED_MARKER = '[REDACTED]'
+export const TRUNCATED_MARKER = '[TRUNCATED]'
const BYPASS_REDACTION_KEYS = new Set(['nextPageToken'])
+/** Keys that contain large binary/encoded data that should be truncated in logs */
+const LARGE_DATA_KEYS = new Set(['base64'])
+
const SENSITIVE_KEY_PATTERNS: RegExp[] = [
/^api[_-]?key$/i,
/^access[_-]?token$/i,
@@ -88,6 +94,10 @@ export function redactSensitiveValues(value: string): string {
return result
}
+export function isLargeDataKey(key: string): boolean {
+ return LARGE_DATA_KEYS.has(key)
+}
+
export function redactApiKeys(obj: any): any {
if (obj === null || obj === undefined) {
return obj
@@ -101,11 +111,26 @@ export function redactApiKeys(obj: any): any {
return obj.map((item) => redactApiKeys(item))
}
+ if (isUserFile(obj)) {
+ const filtered = filterUserFileForDisplay(obj)
+    const result: Record<string, unknown> = {}
+ for (const [key, value] of Object.entries(filtered)) {
+ if (isLargeDataKey(key) && typeof value === 'string') {
+ result[key] = TRUNCATED_MARKER
+ } else {
+ result[key] = value
+ }
+ }
+ return result
+ }
+
const result: Record<string, unknown> = {}
for (const [key, value] of Object.entries(obj)) {
if (isSensitiveKey(key)) {
result[key] = REDACTED_MARKER
+ } else if (isLargeDataKey(key) && typeof value === 'string') {
+ result[key] = TRUNCATED_MARKER
} else if (typeof value === 'object' && value !== null) {
result[key] = redactApiKeys(value)
} else {
diff --git a/apps/sim/lib/core/utils/display-filters.ts b/apps/sim/lib/core/utils/display-filters.ts
index 21194e48a0..e801c1d4d9 100644
--- a/apps/sim/lib/core/utils/display-filters.ts
+++ b/apps/sim/lib/core/utils/display-filters.ts
@@ -1,3 +1,5 @@
+import { filterUserFileForDisplay, isUserFile } from '@/lib/core/utils/user-file'
+
const MAX_STRING_LENGTH = 15000
const MAX_DEPTH = 50
@@ -8,32 +10,9 @@ function truncateString(value: string, maxLength = MAX_STRING_LENGTH): string {
return `${value.substring(0, maxLength)}... [truncated ${value.length - maxLength} chars]`
}
-export function isUserFile(candidate: unknown): candidate is {
- id: string
- name: string
- url: string
- key: string
- size: number
- type: string
- context?: string
-} {
- if (!candidate || typeof candidate !== 'object') {
- return false
- }
-
- const value = candidate as Record
- return (
- typeof value.id === 'string' &&
- typeof value.key === 'string' &&
- typeof value.url === 'string' &&
- typeof value.name === 'string'
- )
-}
-
function filterUserFile(data: any): any {
if (isUserFile(data)) {
- const { id, name, url, size, type } = data
- return { id, name, url, size, type }
+ return filterUserFileForDisplay(data)
}
return data
}
diff --git a/apps/sim/lib/core/utils/user-file.ts b/apps/sim/lib/core/utils/user-file.ts
new file mode 100644
index 0000000000..f2b0340477
--- /dev/null
+++ b/apps/sim/lib/core/utils/user-file.ts
@@ -0,0 +1,57 @@
+import type { UserFile } from '@/executor/types'
+
+export type UserFileLike = Pick<UserFile, 'id' | 'name' | 'url' | 'key'> &
+  Partial<Omit<UserFile, 'id' | 'name' | 'url' | 'key'>>
+
+/**
+ * Fields exposed for UserFile objects in UI (tag dropdown) and logs.
+ * Internal fields like 'key' and 'context' are not exposed.
+ */
+export const USER_FILE_DISPLAY_FIELDS = ['id', 'name', 'url', 'size', 'type', 'base64'] as const
+
+export type UserFileDisplayField = (typeof USER_FILE_DISPLAY_FIELDS)[number]
+
+/**
+ * Checks if a value matches the minimal UserFile shape.
+ */
+export function isUserFile(value: unknown): value is UserFileLike {
+ if (!value || typeof value !== 'object') {
+ return false
+ }
+
+ const candidate = value as Record
+
+ return (
+ typeof candidate.id === 'string' &&
+ typeof candidate.key === 'string' &&
+ typeof candidate.url === 'string' &&
+ typeof candidate.name === 'string'
+ )
+}
+
+/**
+ * Checks if a value matches the full UserFile metadata shape.
+ */
+export function isUserFileWithMetadata(value: unknown): value is UserFile {
+ if (!isUserFile(value)) {
+ return false
+ }
+
+ const candidate = value as Record
+
+ return typeof candidate.size === 'number' && typeof candidate.type === 'string'
+}
+
+/**
+ * Filters a UserFile object to only include display fields.
+ * Used for both UI display and log sanitization.
+ */
+export function filterUserFileForDisplay(data: Record<string, unknown>): Record<string, unknown> {
+  const filtered: Record<string, unknown> = {}
+ for (const field of USER_FILE_DISPLAY_FIELDS) {
+ if (field in data) {
+ filtered[field] = data[field]
+ }
+ }
+ return filtered
+}
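
The three helpers compose as follows; a short usage sketch (the `candidate` value is hypothetical):

```ts
// isUserFile only requires id/key/url/name; isUserFileWithMetadata also
// requires size and type; filterUserFileForDisplay drops internal fields.
declare const candidate: unknown

if (isUserFileWithMetadata(candidate)) {
  const display = filterUserFileForDisplay(candidate)
  // display contains id/name/url/size/type (and base64 when present),
  // but never 'key' or 'context'.
}
```
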
diff --git a/apps/sim/lib/uploads/contexts/execution/execution-file-manager.ts b/apps/sim/lib/uploads/contexts/execution/execution-file-manager.ts
index 8f86950c9c..bbf2a123eb 100644
--- a/apps/sim/lib/uploads/contexts/execution/execution-file-manager.ts
+++ b/apps/sim/lib/uploads/contexts/execution/execution-file-manager.ts
@@ -1,5 +1,5 @@
import { createLogger } from '@sim/logger'
-import { isUserFile } from '@/lib/core/utils/display-filters'
+import { isUserFileWithMetadata } from '@/lib/core/utils/user-file'
import type { ExecutionContext } from '@/lib/uploads/contexts/execution/utils'
import { generateExecutionFileKey, generateFileId } from '@/lib/uploads/contexts/execution/utils'
import type { UserFile } from '@/executor/types'
@@ -169,7 +169,7 @@ export async function uploadFileFromRawData(
context: ExecutionContext,
userId?: string
): Promise {
- if (isUserFile(rawData)) {
+ if (isUserFileWithMetadata(rawData)) {
return rawData
}
diff --git a/apps/sim/lib/uploads/utils/file-utils.server.ts b/apps/sim/lib/uploads/utils/file-utils.server.ts
index b896853bfe..c2f14e97e2 100644
--- a/apps/sim/lib/uploads/utils/file-utils.server.ts
+++ b/apps/sim/lib/uploads/utils/file-utils.server.ts
@@ -1,6 +1,7 @@
'use server'
import type { Logger } from '@sim/logger'
+import { secureFetchWithPinnedIP, validateUrlWithDNS } from '@/lib/core/security/input-validation'
import type { StorageContext } from '@/lib/uploads'
import { isExecutionFile } from '@/lib/uploads/contexts/execution/utils'
import { inferContextFromKey } from '@/lib/uploads/utils/file-utils'
@@ -9,38 +10,32 @@ import type { UserFile } from '@/executor/types'
/**
* Download a file from a URL (internal or external)
* For internal URLs, uses direct storage access (server-side only)
- * For external URLs, uses HTTP fetch
+ * For external URLs, validates DNS/SSRF and uses secure fetch with IP pinning
*/
export async function downloadFileFromUrl(fileUrl: string, timeoutMs = 180000): Promise {
const { isInternalFileUrl } = await import('./file-utils')
const { parseInternalFileUrl } = await import('./file-utils')
- const controller = new AbortController()
- const timeoutId = setTimeout(() => controller.abort(), timeoutMs)
- try {
- if (isInternalFileUrl(fileUrl)) {
- const { key, context } = parseInternalFileUrl(fileUrl)
- const { downloadFile } = await import('@/lib/uploads/core/storage-service')
- const buffer = await downloadFile({ key, context })
- clearTimeout(timeoutId)
- return buffer
- }
+ if (isInternalFileUrl(fileUrl)) {
+ const { key, context } = parseInternalFileUrl(fileUrl)
+ const { downloadFile } = await import('@/lib/uploads/core/storage-service')
+ return downloadFile({ key, context })
+ }
- const response = await fetch(fileUrl, { signal: controller.signal })
- clearTimeout(timeoutId)
+ const urlValidation = await validateUrlWithDNS(fileUrl, 'fileUrl')
+ if (!urlValidation.isValid) {
+ throw new Error(`Invalid file URL: ${urlValidation.error}`)
+ }
- if (!response.ok) {
- throw new Error(`Failed to download file: ${response.statusText}`)
- }
+ const response = await secureFetchWithPinnedIP(fileUrl, urlValidation.resolvedIP!, {
+ timeout: timeoutMs,
+ })
- return Buffer.from(await response.arrayBuffer())
- } catch (error) {
- clearTimeout(timeoutId)
- if (error instanceof Error && error.name === 'AbortError') {
- throw new Error('File download timed out')
- }
- throw error
+ if (!response.ok) {
+ throw new Error(`Failed to download file: ${response.statusText}`)
}
+
+ return Buffer.from(await response.arrayBuffer())
}
/**
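
Usage is unchanged for callers: internal URLs still go straight to storage, while external URLs are now DNS-validated and fetched against the pinned IP, with the timeout handled inside `secureFetchWithPinnedIP` rather than via `AbortController`. A minimal sketch with an illustrative URL:

```ts
// Timeout is in milliseconds.
const buffer = await downloadFileFromUrl('https://files.example.com/report.pdf', 30_000)
```
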
diff --git a/apps/sim/lib/uploads/utils/user-file-base64.server.ts b/apps/sim/lib/uploads/utils/user-file-base64.server.ts
new file mode 100644
index 0000000000..33f7e62591
--- /dev/null
+++ b/apps/sim/lib/uploads/utils/user-file-base64.server.ts
@@ -0,0 +1,319 @@
+import type { Logger } from '@sim/logger'
+import { createLogger } from '@sim/logger'
+import { getRedisClient } from '@/lib/core/config/redis'
+import { isUserFileWithMetadata } from '@/lib/core/utils/user-file'
+import { bufferToBase64 } from '@/lib/uploads/utils/file-utils'
+import { downloadFileFromStorage, downloadFileFromUrl } from '@/lib/uploads/utils/file-utils.server'
+import type { UserFile } from '@/executor/types'
+
+const DEFAULT_MAX_BASE64_BYTES = 10 * 1024 * 1024
+const DEFAULT_TIMEOUT_MS = 180000
+const DEFAULT_CACHE_TTL_SECONDS = 300
+const REDIS_KEY_PREFIX = 'user-file:base64:'
+
+interface Base64Cache {
+  get(file: UserFile): Promise<string | null>
+  set(file: UserFile, value: string, ttlSeconds: number): Promise<void>
+}
+
+interface HydrationState {
+  seen: WeakSet<object>
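
The new file's diff is truncated in this excerpt. For orientation, a minimal sketch of what a Redis-backed implementation of the `Base64Cache` interface above might look like — the key scheme and client shape are assumptions, not the patch's actual code:

```ts
// Hypothetical implementation; assumes getRedisClient() returns an ioredis-style
// client (or null when Redis is not configured).
function createRedisBase64Cache(): Base64Cache {
  return {
    async get(file: UserFile): Promise<string | null> {
      const redis = getRedisClient()
      if (!redis) return null
      return redis.get(`${REDIS_KEY_PREFIX}${file.key}`)
    },
    async set(file: UserFile, value: string, ttlSeconds: number): Promise<void> {
      const redis = getRedisClient()
      if (!redis) return
      await redis.set(`${REDIS_KEY_PREFIX}${file.key}`, value, 'EX', ttlSeconds)
    },
  }
}
```
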
diff --git a/apps/docs/components/icons.tsx b/apps/docs/components/icons.tsx
index 1c245ffafd..689dbb50a5 100644
--- a/apps/docs/components/icons.tsx
+++ b/apps/docs/components/icons.tsx
export function SQSIcon(props: SVGProps<SVGSVGElement>) {
)
}
+export function TextractIcon(props: SVGProps<SVGSVGElement>) {
+ return (
+
+ )
+}
+
export function McpIcon(props: SVGProps<SVGSVGElement>) {
return (