From eb68d8cb2e4e95e347342094473a974365dbe171 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Thu, 26 Feb 2026 17:53:35 +0000 Subject: [PATCH 1/5] tests: deploy, update, deploy --- integration-tests/cli/test/deploy.test.ts | 121 ++++++++++++++++++++++ packages/lightning-mock/src/api-dev.ts | 110 ++++++++++++++++++++ packages/lightning-mock/src/api-rest.ts | 35 ++++++- packages/lightning-mock/src/types.ts | 1 + packages/project/src/gen/generator.ts | 1 + 5 files changed, 264 insertions(+), 4 deletions(-) diff --git a/integration-tests/cli/test/deploy.test.ts b/integration-tests/cli/test/deploy.test.ts index 6fd11dd3f..6ef8a3be9 100644 --- a/integration-tests/cli/test/deploy.test.ts +++ b/integration-tests/cli/test/deploy.test.ts @@ -1,4 +1,6 @@ import test from 'ava'; +import path from 'node:path'; +import fs from 'node:fs/promises'; import run from '../src/run'; import createLightningServer, { DEFAULT_PROJECT_ID, @@ -14,10 +16,129 @@ const port = 8967; const endpoint = `http://localhost:${port}`; +let tmpDir = path.resolve('tmp/deploy'); + +const testProject = ` +name: test-project +workflows: + my-workflow: + name: My Workflow + jobs: + my-job: + name: My Job + adaptor: '@openfn/language-common@latest' + body: 'fn(s => s)' + triggers: + webhook: + type: webhook + enabled: true + edges: + webhook->my-job: + condition_type: always + source_trigger: webhook + target_job: my-job +`.trim(); + test.before(async () => { server = await createLightningServer({ port }); }); +test.beforeEach(async () => { + await fs.mkdir(tmpDir, { recursive: true }); +}); + +test.afterEach(async () => { + await rimraf(tmpDir); +}); + +test.serial('deploy a local project to lightning mock', async (t) => { + await fs.writeFile(path.join(tmpDir, 'project.yaml'), testProject); + + const { stdout, stderr } = await run( + `OPENFN_ENDPOINT=${endpoint} OPENFN_API_KEY=test-key openfn deploy \ + --project-path ${tmpDir}/project.yaml \ + --state-path ${tmpDir}/.state.json \ + --no-confirm \ + --log-json -l debug` + ); + + t.falsy(stderr); + + const logs = extractLogs(stdout); + assertLog(t, logs, /Deployed/); +}); + +test.serial('deploy a project, update workflow, deploy again', async (t) => { + const projectYamlUpdated = testProject.replace( + "body: 'fn(s => s)'", + "body: 'fn(s => ({ ...s, updated: true }))'" + ); + + const projectPath = path.join(tmpDir, 'project.yaml'); + const statePath = path.join(tmpDir, '.state.json'); + + const deployCmd = `OPENFN_ENDPOINT=${endpoint} OPENFN_API_KEY=test-key openfn deploy \ + --project-path ${projectPath} \ + --state-path ${statePath} \ + --no-confirm \ + --log-json -l debug`; + + // first deployment + await fs.writeFile(projectPath, testProject); + const first = await run(deployCmd); + t.falsy(first.stderr); + assertLog(t, extractLogs(first.stdout), /Deployed/); + + // second deployment after update + await fs.writeFile(projectPath, projectYamlUpdated); + const { stdout, stderr } = await run(deployCmd); + const logs = extractLogs(stdout); + t.falsy(stderr); + assertLog(t, logs, /Deployed/); + const changesLog = logs.find( + (log) => log.level === 'always' && /Changes\:/.test(`${log.message}`) + ); + t.regex(changesLog.message[0], /fn\(s => s\)/); + t.regex(changesLog.message[0], /fn\(s => \(\{ \.\.\.s, updated: true \}\)\)/); +}); + +test.skip('deploy then pull to check version history', async (t) => { + const projectPath = path.join(tmpDir, 'project.yaml'); + const statePath = path.join(tmpDir, '.state.json'); + + await fs.writeFile(projectPath, testProject); + + const deployCmd = `OPENFN_ENDPOINT=${endpoint} OPENFN_API_KEY=test-key openfn deploy \ + --project-path ${projectPath} \ + --state-path ${statePath} \ + --no-confirm \ + --log-json -l debug`; + + const deployResult = await run(deployCmd); + t.falsy(deployResult.stderr); + assertLog(t, extractLogs(deployResult.stdout), /Deployed/); + + const stateAfterDeploy = JSON.parse(await fs.readFile(statePath, 'utf8')); + // console.log("passed-here", stateAfterDeploy) + const projectId = stateAfterDeploy.id; + t.truthy(projectId); + + const pullResult = await run( + `OPENFN_ENDPOINT=${endpoint} OPENFN_API_KEY=test-key openfn pull ${projectId} \ + --project-path ${projectPath} \ + --state-path ${statePath} \ + --log-json` + ); + + t.falsy(pullResult.stderr); + assertLog(t, extractLogs(pullResult.stdout), /Project pulled successfully/i); + + const pulledState = JSON.parse(await fs.readFile(statePath, 'utf8')); + const workflow = Object.values(pulledState.workflows)[0] as any; + t.truthy(workflow.version_history); + t.is(workflow.version_history.length, 1); +}); + // This should fail against the built CLI right now test.serial( `OPENFN_ENDPOINT=${endpoint} openfn pull ${DEFAULT_PROJECT_ID} --log-json`, diff --git a/packages/lightning-mock/src/api-dev.ts b/packages/lightning-mock/src/api-dev.ts index 473c02343..595c738b1 100644 --- a/packages/lightning-mock/src/api-dev.ts +++ b/packages/lightning-mock/src/api-dev.ts @@ -2,6 +2,7 @@ * This module sets up a bunch of dev-only APIs * These are not intended to be reflected in Lightning itself */ +import { createHash } from 'node:crypto'; import Koa from 'koa'; import crypto from 'node:crypto'; import Router from '@koa/router'; @@ -17,6 +18,51 @@ import { RUN_COMPLETE } from './events'; import type { DevServer, LightningEvents } from './types'; import { PhoenixEvent } from './socket-server'; +function hashWorkflow(wf: any): string { + const pick = (obj: any, keys: string[]) => { + const out: any = {}; + keys.forEach((k) => { + if (obj[k] !== undefined) out[k] = obj[k]; + }); + return out; + }; + + const data = { + name: wf.name, + jobs: Object.values(wf.jobs ?? {}) + .map((j: any) => + pick(j, [ + 'name', + 'adaptor', + 'body', + 'project_credential_id', + 'keychain_credential_id', + ]) + ) + .sort((a: any, b: any) => (a.name ?? '').localeCompare(b.name ?? '')), + triggers: Object.values(wf.triggers ?? {}) + .map((t: any) => pick(t, ['type', 'cron_expression', 'enabled'])) + .sort((a: any, b: any) => (a.type ?? '').localeCompare(b.type ?? '')), + edges: Object.values(wf.edges ?? {}) + .map((e: any) => + pick(e, [ + 'condition_type', + 'condition_label', + 'condition_expression', + 'enabled', + ]) + ) + .sort((a: any, b: any) => + (a.condition_type ?? '').localeCompare(b.condition_type ?? '') + ), + }; + + return createHash('sha256') + .update(JSON.stringify(data)) + .digest('hex') + .slice(0, 12); +} + type Api = { startRun(runId: string): void; messageClients(message: PhoenixEvent): void; @@ -69,6 +115,70 @@ const setupDevAPI = ( state.projects[project.id] = project; }; + app.updateWorkflow = (projectId: string, wf: Provisioner.Workflow) => { + const project = state.projects[projectId]; + if (!project) { + throw new Error(`updateWorkflow: project ${projectId} not found`); + } + + const now = new Date().toISOString(); + + if (Array.isArray(project.workflows)) { + (project as any).workflows = (project.workflows as any[]).reduce( + (acc: any, w: any) => { + acc[w.id] = w; + return acc; + }, + {} + ); + } + + const workflows = project.workflows as Record; + const w = wf as any; + + if (w.delete) { + const key = Object.keys(workflows).find((k) => workflows[k].id === w.id); + if (key) delete workflows[key]; + return; + } + + const existingEntry = Object.entries(workflows).find( + ([, v]: any) => v.id === w.id + ) as [string, any] | undefined; + + const newHash = hashWorkflow(w); + + if (!existingEntry) { + workflows[w.id] = { + ...w, + lock_version: w.lock_version ?? 1, + inserted_at: now, + updated_at: now, + deleted_at: w.deleted_at ?? null, + version_history: [newHash], + }; + } else { + const [existingKey, existingWf] = existingEntry; + const existingHash = hashWorkflow(existingWf); + + if (newHash !== existingHash) { + const prevHistory: string[] = existingWf.version_history ?? []; + const newHistory = + prevHistory.length > 0 + ? [...prevHistory.slice(0, -1), newHash] // squash + : [newHash]; + + workflows[existingKey] = { + ...existingWf, + ...w, + lock_version: (existingWf.lock_version ?? 1) + 1, + updated_at: now, + version_history: newHistory, + }; + } + } + }; + // Promise which returns when a workflow is complete app.waitForResult = (runId: string) => { return new Promise((resolve) => { diff --git a/packages/lightning-mock/src/api-rest.ts b/packages/lightning-mock/src/api-rest.ts index bed2a562b..52eec5dcb 100644 --- a/packages/lightning-mock/src/api-rest.ts +++ b/packages/lightning-mock/src/api-rest.ts @@ -104,17 +104,44 @@ export default ( ctx.response.body = yaml; } else { // TODO what if doesn't exist? - ctx.response.body = { data: state.projects[ctx.params.id] }; + const project = state.projects[ctx.params.id]; + if (project && project.workflows && !Array.isArray(project.workflows)) { + ctx.response.body = { + data: { ...project, workflows: Object.values(project.workflows) }, + }; + } else { + ctx.response.body = { data: project }; + } } }); router.post('/api/provision', (ctx) => { - const proj: any = ctx.request.body; + const incoming: any = ctx.request.body; + const now = new Date().toISOString(); + + if (!state.projects[incoming.id]) { + state.projects[incoming.id] = { + ...incoming, + workflows: {}, + inserted_at: now, + updated_at: now, + }; + } else { + const { workflows: _ignored, ...projectFields } = incoming; + state.projects[incoming.id] = { + ...state.projects[incoming.id], + ...projectFields, + updated_at: now, + }; + console.log('han:', _ignored); + } - state.projects[proj.id] = proj; + Object.values(incoming.workflows ?? {}).forEach((wf: any) => { + app.updateWorkflow(incoming.id, wf); + }); ctx.response.status = 200; - ctx.response.body = { data: proj }; + ctx.response.body = { data: state.projects[incoming.id] }; }); // list with query diff --git a/packages/lightning-mock/src/types.ts b/packages/lightning-mock/src/types.ts index 389ed7c2d..04f0836e1 100644 --- a/packages/lightning-mock/src/types.ts +++ b/packages/lightning-mock/src/types.ts @@ -15,6 +15,7 @@ export type DevServer = Koa & { addCredential(id: string, cred: Credential): void; addDataclip(id: string, data: DataClip): void; addProject(proj: Provisioner.Project_v1): void; + updateWorkflow(projectId: string, workflow: Provisioner.Workflow): void; enqueueRun(run: LightningPlan): void; destroy: () => Promise; getRun(id: string): LightningPlan; diff --git a/packages/project/src/gen/generator.ts b/packages/project/src/gen/generator.ts index c4f2665cf..a2620b610 100644 --- a/packages/project/src/gen/generator.ts +++ b/packages/project/src/gen/generator.ts @@ -196,6 +196,7 @@ const initOperations = (options: any = {}) => { export const createParser = () => { // Load the grammar // TODO: is there any way I can compile/serialize the grammar into JS? + // @ts-expect-error const grammarPath = path.resolve(import.meta.dirname, 'workflow.ohm'); const contents = readFileSync(grammarPath, 'utf-8'); const parser = grammar(contents); From bb63175ef12bab896f729d194313a7119bc58a02 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Fri, 27 Feb 2026 13:28:01 +0000 Subject: [PATCH 2/5] chore: proper support version histories --- packages/lightning-mock/src/api-dev.ts | 50 ++++++++++++++++++++----- packages/lightning-mock/src/api-rest.ts | 5 +-- 2 files changed, 43 insertions(+), 12 deletions(-) diff --git a/packages/lightning-mock/src/api-dev.ts b/packages/lightning-mock/src/api-dev.ts index 595c738b1..485bc6c26 100644 --- a/packages/lightning-mock/src/api-dev.ts +++ b/packages/lightning-mock/src/api-dev.ts @@ -120,17 +120,49 @@ const setupDevAPI = ( if (!project) { throw new Error(`updateWorkflow: project ${projectId} not found`); } - const now = new Date().toISOString(); - if (Array.isArray(project.workflows)) { - (project as any).workflows = (project.workflows as any[]).reduce( - (acc: any, w: any) => { - acc[w.id] = w; - return acc; - }, - {} - ); + const _newHash = hashWorkflow(wf); + + const _exists = Object.values(project.workflows).find((wf) => { + wf.id === wf.id; + }); + + if (!_exists) { + const new_workflow = { + ...wf, + lock_version: wf.lock_version ?? 1, + inserted_at: now, + updated_at: now, + deleted_at: wf.deleted_at ?? null, + version_history: [_newHash], + }; + // @ts-ignore + project.workflows = [...Object.values(project.workflows), new_workflow]; + } else { + // if existing. update it + const existingHash = hashWorkflow(_exists); + + if (_newHash !== existingHash) { + const prevHistory: string[] = _exists.version_history ?? []; + const newHistory = + prevHistory.length > 0 + ? [...prevHistory.slice(0, -1), _newHash] // squash + : [_newHash]; + + // @ts-ignore + project.workflows = Object.values(project.workflows).map((wf) => { + if (wf.id === _exists.id) { + return { + ..._exists, + ...wf, + lock_version: (_exists.lock_version ?? 1) + 1, + updated_at: now, + version_history: newHistory, + }; + } + }); + } } const workflows = project.workflows as Record; diff --git a/packages/lightning-mock/src/api-rest.ts b/packages/lightning-mock/src/api-rest.ts index 52eec5dcb..dde945372 100644 --- a/packages/lightning-mock/src/api-rest.ts +++ b/packages/lightning-mock/src/api-rest.ts @@ -122,7 +122,7 @@ export default ( if (!state.projects[incoming.id]) { state.projects[incoming.id] = { ...incoming, - workflows: {}, + workflows: [], inserted_at: now, updated_at: now, }; @@ -133,10 +133,9 @@ export default ( ...projectFields, updated_at: now, }; - console.log('han:', _ignored); } - Object.values(incoming.workflows ?? {}).forEach((wf: any) => { + (incoming.workflows ?? []).forEach((wf: any) => { app.updateWorkflow(incoming.id, wf); }); From 7015188969aa7a9bd65fcfefffd40de3597572b6 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Fri, 27 Feb 2026 13:34:08 +0000 Subject: [PATCH 3/5] test: add version history test --- integration-tests/cli/test/deploy.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-tests/cli/test/deploy.test.ts b/integration-tests/cli/test/deploy.test.ts index 6ef8a3be9..01a90036d 100644 --- a/integration-tests/cli/test/deploy.test.ts +++ b/integration-tests/cli/test/deploy.test.ts @@ -102,7 +102,7 @@ test.serial('deploy a project, update workflow, deploy again', async (t) => { t.regex(changesLog.message[0], /fn\(s => \(\{ \.\.\.s, updated: true \}\)\)/); }); -test.skip('deploy then pull to check version history', async (t) => { +test.serial('deploy then pull to check version history', async (t) => { const projectPath = path.join(tmpDir, 'project.yaml'); const statePath = path.join(tmpDir, '.state.json'); @@ -117,7 +117,7 @@ test.skip('deploy then pull to check version history', async (t) => { const deployResult = await run(deployCmd); t.falsy(deployResult.stderr); assertLog(t, extractLogs(deployResult.stdout), /Deployed/); - + const stateAfterDeploy = JSON.parse(await fs.readFile(statePath, 'utf8')); // console.log("passed-here", stateAfterDeploy) const projectId = stateAfterDeploy.id; From 23959efd8237eba9e8a0d3c7e1da0626c78e2ebf Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Fri, 27 Feb 2026 13:35:58 +0000 Subject: [PATCH 4/5] chore: ts-ignore --- packages/project/src/gen/generator.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/project/src/gen/generator.ts b/packages/project/src/gen/generator.ts index a2620b610..b45aa630f 100644 --- a/packages/project/src/gen/generator.ts +++ b/packages/project/src/gen/generator.ts @@ -196,7 +196,7 @@ const initOperations = (options: any = {}) => { export const createParser = () => { // Load the grammar // TODO: is there any way I can compile/serialize the grammar into JS? - // @ts-expect-error + // @ts-ignore const grammarPath = path.resolve(import.meta.dirname, 'workflow.ohm'); const contents = readFileSync(grammarPath, 'utf-8'); const parser = grammar(contents); From e7b6909518bf3aabb55fd1142c07ae85007c0372 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Fri, 27 Feb 2026 14:52:13 +0000 Subject: [PATCH 5/5] test: update one workflow and deploy --- integration-tests/cli/test/deploy.test.ts | 91 ++++++++++++++++++++++- 1 file changed, 90 insertions(+), 1 deletion(-) diff --git a/integration-tests/cli/test/deploy.test.ts b/integration-tests/cli/test/deploy.test.ts index 01a90036d..1ff64173d 100644 --- a/integration-tests/cli/test/deploy.test.ts +++ b/integration-tests/cli/test/deploy.test.ts @@ -39,6 +39,43 @@ workflows: target_job: my-job `.trim(); +const testProjectMulti = ` +name: test-project +workflows: + my-workflow: + name: My Workflow + jobs: + my-job: + name: My Job + adaptor: '@openfn/language-common@latest' + body: 'fn(s => s)' + triggers: + webhook: + type: webhook + enabled: true + edges: + webhook->my-job: + condition_type: always + source_trigger: webhook + target_job: my-job + another-workflow: + name: Another Workflow + jobs: + another-job: + name: Another Job + adaptor: '@openfn/language-http@latest' + body: "get('http://example.com')" + triggers: + webhook: + type: webhook + enabled: true + edges: + webhook->another-job: + condition_type: always + source_trigger: webhook + target_job: another-job +`.trim(); + test.before(async () => { server = await createLightningServer({ port }); }); @@ -119,7 +156,6 @@ test.serial('deploy then pull to check version history', async (t) => { assertLog(t, extractLogs(deployResult.stdout), /Deployed/); const stateAfterDeploy = JSON.parse(await fs.readFile(statePath, 'utf8')); - // console.log("passed-here", stateAfterDeploy) const projectId = stateAfterDeploy.id; t.truthy(projectId); @@ -139,6 +175,59 @@ test.serial('deploy then pull to check version history', async (t) => { t.is(workflow.version_history.length, 1); }); +test.serial('deploy then pull, changes one workflow, deploy', async (t) => { + const projectYamlUpdated = testProjectMulti.replace( + 'body: "get(\'http://example.com\')"', + 'body: "post(\'http://success.org\')"' + ); + const projectPath = path.join(tmpDir, 'project.yaml'); + const statePath = path.join(tmpDir, '.state.json'); + + await fs.writeFile(projectPath, testProjectMulti); + + // deploy fresh project + const deployCmd = `OPENFN_ENDPOINT=${endpoint} OPENFN_API_KEY=test-key openfn deploy \ + --project-path ${projectPath} \ + --state-path ${statePath} \ + --no-confirm \ + --log-json -l debug`; + + const deployResult = await run(deployCmd); + t.falsy(deployResult.stderr); + assertLog(t, extractLogs(deployResult.stdout), /Deployed/); + + const stateAfterDeploy = JSON.parse(await fs.readFile(statePath, 'utf8')); + const projectId = stateAfterDeploy.id; + t.truthy(projectId); + + // pull the project back + const pullResult = await run( + `OPENFN_ENDPOINT=${endpoint} OPENFN_API_KEY=test-key openfn pull ${projectId} \ + --project-path ${projectPath} \ + --state-path ${statePath} \ + --log-json` + ); + + t.falsy(pullResult.stderr); + assertLog(t, extractLogs(pullResult.stdout), /Project pulled successfully/i); + const pulledState = JSON.parse(await fs.readFile(statePath, 'utf8')); + const workflow = Object.values(pulledState.workflows)[0] as any; + t.truthy(workflow.version_history); + t.is(workflow.version_history.length, 1); + + // now deploy with changes to one workflow + await fs.writeFile(projectPath, projectYamlUpdated); + const { stdout, stderr } = await run(deployCmd); + const logs = extractLogs(stdout); + t.falsy(stderr); + assertLog(t, logs, /Deployed/); + const changesLog = logs.find( + (log) => log.level === 'always' && /Changes\:/.test(`${log.message}`) + ); + t.regex(changesLog.message[0], /\-.+body: \"get\('http:\/\/example.com'\)\"/); + t.regex(changesLog.message[0], /\+.+body: \"post\('http:\/\/success.org'\)"/); +}); + // This should fail against the built CLI right now test.serial( `OPENFN_ENDPOINT=${endpoint} openfn pull ${DEFAULT_PROJECT_ID} --log-json`,