Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/bright-keys-shine.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

Expose user-provided idempotency key and scope in task context. `ctx.run.idempotencyKey` now returns the original key passed to `idempotencyKeys.create()` instead of the hash, and `ctx.run.idempotencyKeyScope` shows the scope ("run", "attempt", or "global").
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
logger,
} from "@trigger.dev/core/v3";
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
import { getUserProvidedIdempotencyKey } from "@trigger.dev/core/v3/serverOnly";
import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database";
import assertNever from "assert-never";
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
Expand Down Expand Up @@ -38,6 +39,7 @@ const commonRunSelect = {
baseCostInCents: true,
usageDurationMs: true,
idempotencyKey: true,
idempotencyKeyOptions: true,
isTest: true,
depth: true,
scheduleId: true,
Expand Down Expand Up @@ -442,7 +444,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V
return {
id: run.friendlyId,
taskIdentifier: run.taskIdentifier,
idempotencyKey: run.idempotencyKey ?? undefined,
idempotencyKey: getUserProvidedIdempotencyKey(run),
version: run.lockedToVersion?.version,
status: ApiRetrieveRunPresenter.apiStatusFromRunStatus(run.status, apiVersion),
createdAt: run.createdAt,
Expand Down
7 changes: 5 additions & 2 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
type V3TaskRunContext,
} from "@trigger.dev/core/v3";
import { AttemptId, getMaxDuration, parseTraceparent } from "@trigger.dev/core/v3/isomorphic";
import { getUserProvidedIdempotencyKey } from "@trigger.dev/core/v3/serverOnly";
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
import { logger } from "~/services/logger.server";
import { rehydrateAttribute } from "~/v3/eventRepository/eventRepository.server";
Expand Down Expand Up @@ -229,7 +230,7 @@ export class SpanPresenter extends BasePresenter {
isTest: run.isTest,
replayedFromTaskRunFriendlyId: run.replayedFromTaskRunFriendlyId,
environmentId: run.runtimeEnvironment.id,
idempotencyKey: run.idempotencyKey,
idempotencyKey: getUserProvidedIdempotencyKey(run),
idempotencyKeyExpiresAt: run.idempotencyKeyExpiresAt,
debounce: run.debounce as { key: string; delay: string; createdAt: Date } | null,
schedule: await this.resolveSchedule(run.scheduleId ?? undefined),
Expand Down Expand Up @@ -355,6 +356,7 @@ export class SpanPresenter extends BasePresenter {
//idempotency
idempotencyKey: true,
idempotencyKeyExpiresAt: true,
idempotencyKeyOptions: true,
//debounce
debounce: true,
//delayed
Expand Down Expand Up @@ -644,7 +646,7 @@ export class SpanPresenter extends BasePresenter {
createdAt: run.createdAt,
tags: run.runTags,
isTest: run.isTest,
idempotencyKey: run.idempotencyKey ?? undefined,
idempotencyKey: getUserProvidedIdempotencyKey(run) ?? undefined,
startedAt: run.startedAt ?? run.createdAt,
durationMs: run.usageDurationMs,
costInCents: run.costInCents,
Expand Down Expand Up @@ -704,4 +706,5 @@ export class SpanPresenter extends BasePresenter {

return parsedTraceparent?.traceId;
}

}
1 change: 1 addition & 0 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ export class RunEngineTriggerTaskService {
environment: environment,
idempotencyKey,
idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined,
idempotencyKeyOptions: body.options?.idempotencyKeyOptions,
taskIdentifier: taskId,
payload: payloadPacket.data ?? "",
payloadType: payloadPacket.dataType,
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/services/runsReplicationService.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
import { Logger, type LogLevel } from "@trigger.dev/core/logger";
import { tryCatch } from "@trigger.dev/core/utils";
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
import { extractIdempotencyKeyUser, getIdempotencyKeyScope } from "@trigger.dev/core/v3/serverOnly";
import { type TaskRun } from "@trigger.dev/database";
import { nanoid } from "nanoid";
import EventEmitter from "node:events";
Expand Down Expand Up @@ -891,6 +892,8 @@ export class RunsReplicationService {
run.spanId, // span_id
run.traceId, // trace_id
run.idempotencyKey ?? "", // idempotency_key
extractIdempotencyKeyUser(run) ?? "", // idempotency_key_user
getIdempotencyKeyScope(run) ?? "", // idempotency_key_scope
run.ttl ?? "", // expiration_ttl
run.isTest ?? false, // is_test
_version.toString(), // _version
Expand Down Expand Up @@ -951,6 +954,7 @@ export class RunsReplicationService {

return { data: parsedData };
}

}

export type ConcurrentFlushSchedulerConfig<T> = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- +goose Up

-- Add columns for storing user-provided idempotency key and scope for searching
ALTER TABLE trigger_dev.task_runs_v2
ADD COLUMN idempotency_key_user String DEFAULT '';

ALTER TABLE trigger_dev.task_runs_v2
ADD COLUMN idempotency_key_scope String DEFAULT '';

-- +goose Down

ALTER TABLE trigger_dev.task_runs_v2
DROP COLUMN idempotency_key_user;

ALTER TABLE trigger_dev.task_runs_v2
DROP COLUMN idempotency_key_scope;
8 changes: 8 additions & 0 deletions internal-packages/clickhouse/src/taskRuns.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ describe("Task Runs V2", () => {
"span_1234", // span_id
"trace_1234", // trace_id
"idempotency_key_1234", // idempotency_key
"my-user-key", // idempotency_key_user
"run", // idempotency_key_scope
"1h", // expiration_ttl
true, // is_test
"1", // _version
Expand Down Expand Up @@ -189,6 +191,8 @@ describe("Task Runs V2", () => {
"538677637f937f54", // span_id
"20a28486b0b9f50c647b35e8863e36a5", // trace_id
"", // idempotency_key
"", // idempotency_key_user
"", // idempotency_key_scope
"", // expiration_ttl
true, // is_test
"1", // _version
Expand Down Expand Up @@ -237,6 +241,8 @@ describe("Task Runs V2", () => {
"538677637f937f54", // span_id
"20a28486b0b9f50c647b35e8863e36a5", // trace_id
"", // idempotency_key
"", // idempotency_key_user
"", // idempotency_key_scope
"", // expiration_ttl
true, // is_test
"2", // _version
Expand Down Expand Up @@ -332,6 +338,8 @@ describe("Task Runs V2", () => {
"538677637f937f54", // span_id
"20a28486b0b9f50c647b35e8863e36a5", // trace_id
"", // idempotency_key
"", // idempotency_key_user
"", // idempotency_key_scope
"", // expiration_ttl
true, // is_test
"1", // _version
Expand Down
8 changes: 8 additions & 0 deletions internal-packages/clickhouse/src/taskRuns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ export const TaskRunV2 = z.object({
span_id: z.string(),
trace_id: z.string(),
idempotency_key: z.string(),
idempotency_key_user: z.string().default(""),
idempotency_key_scope: z.string().default(""),
expiration_ttl: z.string(),
is_test: z.boolean().default(false),
concurrency_key: z.string().default(""),
Expand Down Expand Up @@ -91,6 +93,8 @@ export const TASK_RUN_COLUMNS = [
"span_id",
"trace_id",
"idempotency_key",
"idempotency_key_user",
"idempotency_key_scope",
"expiration_ttl",
"is_test",
"_version",
Expand Down Expand Up @@ -151,6 +155,8 @@ export type TaskRunFieldTypes = {
span_id: string;
trace_id: string;
idempotency_key: string;
idempotency_key_user: string;
idempotency_key_scope: string;
expiration_ttl: string;
is_test: boolean;
_version: string;
Expand Down Expand Up @@ -282,6 +288,8 @@ export type TaskRunInsertArray = [
span_id: string,
trace_id: string,
idempotency_key: string,
idempotency_key_user: string,
idempotency_key_scope: string,
expiration_ttl: string,
is_test: boolean,
_version: string,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "TaskRun" ADD COLUMN "idempotencyKeyOptions" JSONB;
2 changes: 2 additions & 0 deletions internal-packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,8 @@ model TaskRun {

idempotencyKey String?
idempotencyKeyExpiresAt DateTime?
/// Stores the user-provided key and scope: { key: string, scope: "run" | "attempt" | "global" }
idempotencyKeyOptions Json?

/// Debounce options: { key: string, delay: string, createdAt: Date }
debounce Json?
Expand Down
2 changes: 2 additions & 0 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ export class RunEngine {
environment,
idempotencyKey,
idempotencyKeyExpiresAt,
idempotencyKeyOptions,
taskIdentifier,
payload,
payloadType,
Expand Down Expand Up @@ -544,6 +545,7 @@ export class RunEngine {
projectId: environment.project.id,
idempotencyKey,
idempotencyKeyExpiresAt,
idempotencyKeyOptions,
taskIdentifier,
payload,
payloadType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ import {
TaskRunInternalError,
TaskRunSuccessfulExecutionResult,
} from "@trigger.dev/core/v3/schemas";
import {
getIdempotencyKeyScope,
getUserProvidedIdempotencyKey,
} from "@trigger.dev/core/v3/serverOnly";
import { parsePacket } from "@trigger.dev/core/v3/utils/ioSerialization";
import {
$transaction,
Expand Down Expand Up @@ -194,6 +198,7 @@ export class RunAttemptSystem {
runTags: true,
isTest: true,
idempotencyKey: true,
idempotencyKeyOptions: true,
startedAt: true,
maxAttempts: true,
taskVersion: true,
Expand Down Expand Up @@ -261,7 +266,8 @@ export class RunAttemptSystem {
isTest: run.isTest,
createdAt: run.createdAt,
startedAt: run.startedAt ?? run.createdAt,
idempotencyKey: run.idempotencyKey ?? undefined,
idempotencyKey: getUserProvidedIdempotencyKey(run) ?? undefined,
idempotencyKeyScope: getIdempotencyKeyScope(run),
maxAttempts: run.maxAttempts ?? undefined,
version: run.taskVersion ?? "unknown",
maxDuration: run.maxDurationInSeconds ?? undefined,
Expand Down Expand Up @@ -422,6 +428,7 @@ export class RunAttemptSystem {
runTags: true,
isTest: true,
idempotencyKey: true,
idempotencyKeyOptions: true,
startedAt: true,
maxAttempts: true,
taskVersion: true,
Expand Down Expand Up @@ -570,7 +577,8 @@ export class RunAttemptSystem {
createdAt: updatedRun.createdAt,
tags: updatedRun.runTags,
isTest: updatedRun.isTest,
idempotencyKey: updatedRun.idempotencyKey ?? undefined,
idempotencyKey: getUserProvidedIdempotencyKey(updatedRun) ?? undefined,
idempotencyKeyScope: getIdempotencyKeyScope(updatedRun),
startedAt: updatedRun.startedAt ?? updatedRun.createdAt,
maxAttempts: updatedRun.maxAttempts ?? undefined,
version: updatedRun.taskVersion ?? "unknown",
Expand Down Expand Up @@ -1914,6 +1922,7 @@ export class RunAttemptSystem {
stackTrace: truncateString(error.stackTrace, 1024 * 16), // 16kb
};
}

}

export function safeParseGitMeta(git: unknown): GitMeta | undefined {
Expand Down
2 changes: 2 additions & 0 deletions internal-packages/run-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ export type TriggerParams = {
environment: MinimalAuthenticatedEnvironment;
idempotencyKey?: string;
idempotencyKeyExpiresAt?: Date;
/** The original user-provided idempotency key and scope */
idempotencyKeyOptions?: { key: string; scope: "run" | "attempt" | "global" };
taskIdentifier: string;
payload: string;
payloadType: string;
Expand Down
Loading