Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 210 additions & 0 deletions integration-tests/cli/test/deploy.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import test from 'ava';
import path from 'node:path';
import fs from 'node:fs/promises';
import run from '../src/run';
import createLightningServer, {
DEFAULT_PROJECT_ID,
Expand All @@ -14,10 +16,218 @@ const port = 8967;

const endpoint = `http://localhost:${port}`;

// Scratch directory shared by every test in this file; created fresh in
// beforeEach and removed in afterEach so state can't leak between tests.
let tmpDir = path.resolve('tmp/deploy');

// Minimal single-workflow project spec: one webhook trigger wired to one
// no-op job. Used as the baseline payload for deploy/pull round-trips.
const testProject = `
name: test-project
workflows:
  my-workflow:
    name: My Workflow
    jobs:
      my-job:
        name: My Job
        adaptor: '@openfn/language-common@latest'
        body: 'fn(s => s)'
    triggers:
      webhook:
        type: webhook
        enabled: true
    edges:
      webhook->my-job:
        condition_type: always
        source_trigger: webhook
        target_job: my-job
`.trim();

// Two-workflow variant of the project above; tests that change only the
// second workflow use this to verify diffs are scoped per workflow.
const testProjectMulti = `
name: test-project
workflows:
  my-workflow:
    name: My Workflow
    jobs:
      my-job:
        name: My Job
        adaptor: '@openfn/language-common@latest'
        body: 'fn(s => s)'
    triggers:
      webhook:
        type: webhook
        enabled: true
    edges:
      webhook->my-job:
        condition_type: always
        source_trigger: webhook
        target_job: my-job
  another-workflow:
    name: Another Workflow
    jobs:
      another-job:
        name: Another Job
        adaptor: '@openfn/language-http@latest'
        body: "get('http://example.com')"
    triggers:
      webhook:
        type: webhook
        enabled: true
    edges:
      webhook->another-job:
        condition_type: always
        source_trigger: webhook
        target_job: another-job
`.trim();

// Boot one Lightning mock server for the whole file; the serial tests below
// all talk to this single instance.
test.before(async () => {
  server = await createLightningServer({ port });
});

// Give each test a clean scratch directory for project.yaml / state files.
test.beforeEach(async () => {
  await fs.mkdir(tmpDir, { recursive: true });
});

// Remove scratch files so one test's state can't bleed into the next.
test.afterEach(async () => {
  await rimraf(tmpDir);
});

// Happy path: write a project file, deploy it once, and expect a
// "Deployed" log line with nothing on stderr.
test.serial('deploy a local project to lightning mock', async (t) => {
  const projectPath = path.join(tmpDir, 'project.yaml');
  await fs.writeFile(projectPath, testProject);

  const cmd = [
    `OPENFN_ENDPOINT=${endpoint}`,
    'OPENFN_API_KEY=test-key',
    'openfn deploy',
    `--project-path ${projectPath}`,
    `--state-path ${path.join(tmpDir, '.state.json')}`,
    '--no-confirm',
    '--log-json -l debug',
  ].join(' ');

  const { stdout, stderr } = await run(cmd);

  t.falsy(stderr);
  assertLog(t, extractLogs(stdout), /Deployed/);
});

// Deploy twice — once with the baseline project, once after editing the job
// body — and assert the CLI prints a diff containing both versions.
test.serial('deploy a project, update workflow, deploy again', async (t) => {
  // Same project as above but with the job body changed, so the second
  // deploy produces a non-empty diff.
  const projectYamlUpdated = testProject.replace(
    "body: 'fn(s => s)'",
    "body: 'fn(s => ({ ...s, updated: true }))'"
  );

  const projectPath = path.join(tmpDir, 'project.yaml');
  const statePath = path.join(tmpDir, '.state.json');

  const deployCmd = `OPENFN_ENDPOINT=${endpoint} OPENFN_API_KEY=test-key openfn deploy \
    --project-path ${projectPath} \
    --state-path ${statePath} \
    --no-confirm \
    --log-json -l debug`;

  // First deployment creates the project on the mock server.
  await fs.writeFile(projectPath, testProject);
  const first = await run(deployCmd);
  t.falsy(first.stderr);
  assertLog(t, extractLogs(first.stdout), /Deployed/);

  // Second deployment after updating the job body.
  await fs.writeFile(projectPath, projectYamlUpdated);
  const { stdout, stderr } = await run(deployCmd);
  const logs = extractLogs(stdout);
  t.falsy(stderr);
  assertLog(t, logs, /Deployed/);

  // Guard against `find` returning undefined so a missing diff line fails
  // the assertion cleanly instead of throwing a TypeError on `.message`.
  const changesLog = logs.find(
    (log) => log.level === 'always' && /Changes:/.test(`${log.message}`)
  );
  t.truthy(changesLog, 'expected a "Changes:" diff log entry');
  // The diff should show both the old and the new job body.
  t.regex(changesLog!.message[0], /fn\(s => s\)/);
  t.regex(changesLog!.message[0], /fn\(s => \(\{ \.\.\.s, updated: true \}\)\)/);
});

// Deploy a project, pull it back by id, and confirm the pulled state records
// exactly one version-history entry for the workflow.
test.serial('deploy then pull to check version history', async (t) => {
  const projectPath = path.join(tmpDir, 'project.yaml');
  const statePath = path.join(tmpDir, '.state.json');

  await fs.writeFile(projectPath, testProject);

  const commonFlags = `--project-path ${projectPath} --state-path ${statePath}`;

  const deployed = await run(
    `OPENFN_ENDPOINT=${endpoint} OPENFN_API_KEY=test-key openfn deploy ${commonFlags} --no-confirm --log-json -l debug`
  );
  t.falsy(deployed.stderr);
  assertLog(t, extractLogs(deployed.stdout), /Deployed/);

  // Deploy writes the server-assigned project id into the state file.
  const { id: projectId } = JSON.parse(await fs.readFile(statePath, 'utf8'));
  t.truthy(projectId);

  const pulled = await run(
    `OPENFN_ENDPOINT=${endpoint} OPENFN_API_KEY=test-key openfn pull ${projectId} ${commonFlags} --log-json`
  );
  t.falsy(pulled.stderr);
  assertLog(t, extractLogs(pulled.stdout), /Project pulled successfully/i);

  const pulledState = JSON.parse(await fs.readFile(statePath, 'utf8'));
  const [workflow] = Object.values(pulledState.workflows) as any[];
  t.truthy(workflow.version_history);
  t.is(workflow.version_history.length, 1);
});

// Full round-trip on a two-workflow project: deploy, pull (checking version
// history), then redeploy with one workflow changed and assert the printed
// diff shows the removed and added job bodies.
test.serial('deploy then pull, changes one workflow, deploy', async (t) => {
  // Change only the second workflow's job body so the diff is scoped to it.
  const projectYamlUpdated = testProjectMulti.replace(
    'body: "get(\'http://example.com\')"',
    'body: "post(\'http://success.org\')"'
  );
  const projectPath = path.join(tmpDir, 'project.yaml');
  const statePath = path.join(tmpDir, '.state.json');

  await fs.writeFile(projectPath, testProjectMulti);

  // Deploy the fresh two-workflow project.
  const deployCmd = `OPENFN_ENDPOINT=${endpoint} OPENFN_API_KEY=test-key openfn deploy \
    --project-path ${projectPath} \
    --state-path ${statePath} \
    --no-confirm \
    --log-json -l debug`;

  const deployResult = await run(deployCmd);
  t.falsy(deployResult.stderr);
  assertLog(t, extractLogs(deployResult.stdout), /Deployed/);

  // Deploy writes the server-assigned project id into the state file.
  const stateAfterDeploy = JSON.parse(await fs.readFile(statePath, 'utf8'));
  const projectId = stateAfterDeploy.id;
  t.truthy(projectId);

  // Pull the project back down.
  const pullResult = await run(
    `OPENFN_ENDPOINT=${endpoint} OPENFN_API_KEY=test-key openfn pull ${projectId} \
    --project-path ${projectPath} \
    --state-path ${statePath} \
    --log-json`
  );

  t.falsy(pullResult.stderr);
  assertLog(t, extractLogs(pullResult.stdout), /Project pulled successfully/i);
  const pulledState = JSON.parse(await fs.readFile(statePath, 'utf8'));
  const workflow = Object.values(pulledState.workflows)[0] as any;
  t.truthy(workflow.version_history);
  t.is(workflow.version_history.length, 1);

  // Now deploy with changes to one workflow.
  await fs.writeFile(projectPath, projectYamlUpdated);
  const { stdout, stderr } = await run(deployCmd);
  const logs = extractLogs(stdout);
  t.falsy(stderr);
  assertLog(t, logs, /Deployed/);

  // Guard against `find` returning undefined so a missing diff line fails
  // the assertion cleanly instead of throwing a TypeError on `.message`.
  const changesLog = logs.find(
    (log) => log.level === 'always' && /Changes:/.test(`${log.message}`)
  );
  t.truthy(changesLog, 'expected a "Changes:" diff log entry');
  // Old body removed (-) and new body added (+); dots escaped so the regex
  // matches the literal hostnames.
  t.regex(changesLog!.message[0], /-.+body: "get\('http:\/\/example\.com'\)"/);
  t.regex(changesLog!.message[0], /\+.+body: "post\('http:\/\/success\.org'\)"/);
});

// This should fail against the built CLI right now
test.serial(
`OPENFN_ENDPOINT=${endpoint} openfn pull ${DEFAULT_PROJECT_ID} --log-json`,
Expand Down
142 changes: 142 additions & 0 deletions packages/lightning-mock/src/api-dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* This module sets up a bunch of dev-only APIs
* These are not intended to be reflected in Lightning itself
*/
import { createHash } from 'node:crypto';
import Koa from 'koa';
import crypto from 'node:crypto';
import Router from '@koa/router';
Expand All @@ -17,6 +18,51 @@ import { RUN_COMPLETE } from './events';
import type { DevServer, LightningEvents } from './types';
import { PhoenixEvent } from './socket-server';

/**
 * Compute a short, stable fingerprint of a workflow's meaningful content.
 *
 * Only fields that affect behavior are hashed (job name/adaptor/body and
 * credentials, trigger type/cron/enabled, edge conditions/enabled); ids,
 * timestamps etc. are ignored. Jobs, triggers and edges are sorted so that
 * key order in the source object does not change the hash.
 *
 * Returns the first 12 hex characters of a sha256 digest.
 */
function hashWorkflow(wf: any): string {
  // Copy only the listed fields, skipping ones that are undefined.
  const subset = (source: any, fields: string[]) =>
    fields.reduce((acc: any, field) => {
      if (source[field] !== undefined) acc[field] = source[field];
      return acc;
    }, {});

  // Stable comparator on a single (possibly missing) string field.
  const byField = (field: string) => (a: any, b: any) =>
    (a[field] ?? '').localeCompare(b[field] ?? '');

  const jobs = Object.values(wf.jobs ?? {})
    .map((job: any) =>
      subset(job, [
        'name',
        'adaptor',
        'body',
        'project_credential_id',
        'keychain_credential_id',
      ])
    )
    .sort(byField('name'));

  const triggers = Object.values(wf.triggers ?? {})
    .map((trigger: any) => subset(trigger, ['type', 'cron_expression', 'enabled']))
    .sort(byField('type'));

  const edges = Object.values(wf.edges ?? {})
    .map((edge: any) =>
      subset(edge, [
        'condition_type',
        'condition_label',
        'condition_expression',
        'enabled',
      ])
    )
    .sort(byField('condition_type'));

  const canonical = JSON.stringify({ name: wf.name, jobs, triggers, edges });
  return createHash('sha256').update(canonical).digest('hex').slice(0, 12);
}

type Api = {
startRun(runId: string): void;
messageClients(message: PhoenixEvent): void;
Expand Down Expand Up @@ -69,6 +115,102 @@ const setupDevAPI = (
state.projects[project.id] = project;
};

/**
 * Create, update or delete a workflow on a project, mimicking Lightning's
 * provisioner semantics:
 * - `wf.delete` removes the workflow from the project.
 * - A brand-new workflow is inserted with `version_history: [hash]`.
 * - An existing workflow is only touched when its content hash changes; the
 *   latest history entry is then squashed to the new hash and `lock_version`
 *   is bumped.
 *
 * NOTE: the previous implementation ran a second, broken copy of this logic
 * first — its `find` predicate had no `return` (so it always missed) and the
 * "new workflow" branch replaced the `workflows` record with an array,
 * corrupting project state. That dead pass has been removed.
 */
app.updateWorkflow = (projectId: string, wf: Provisioner.Workflow) => {
  const project = state.projects[projectId];
  if (!project) {
    throw new Error(`updateWorkflow: project ${projectId} not found`);
  }
  const now = new Date().toISOString();

  const workflows = project.workflows as Record<string, any>;
  const w = wf as any;

  // Deletion request: drop the entry whose value carries this id.
  if (w.delete) {
    const key = Object.keys(workflows).find((k) => workflows[k].id === w.id);
    if (key) delete workflows[key];
    return;
  }

  const existingEntry = Object.entries(workflows).find(
    ([, v]: any) => v.id === w.id
  ) as [string, any] | undefined;

  const newHash = hashWorkflow(w);

  if (!existingEntry) {
    // Brand-new workflow: seed bookkeeping fields and start its history.
    workflows[w.id] = {
      ...w,
      lock_version: w.lock_version ?? 1,
      inserted_at: now,
      updated_at: now,
      deleted_at: w.deleted_at ?? null,
      version_history: [newHash],
    };
  } else {
    const [existingKey, existingWf] = existingEntry;
    const existingHash = hashWorkflow(existingWf);

    // No-op update: identical content must not bump version or history.
    if (newHash !== existingHash) {
      const prevHistory: string[] = existingWf.version_history ?? [];
      // Squash: replace the most recent entry rather than appending, so
      // repeated edits between pulls collapse into one history entry.
      const newHistory =
        prevHistory.length > 0
          ? [...prevHistory.slice(0, -1), newHash]
          : [newHash];

      workflows[existingKey] = {
        ...existingWf,
        ...w,
        lock_version: (existingWf.lock_version ?? 1) + 1,
        updated_at: now,
        version_history: newHistory,
      };
    }
  }
};

// Promise which returns when a workflow is complete
app.waitForResult = (runId: string) => {
return new Promise((resolve) => {
Expand Down
Loading