Skip to content

Commit b756067

Browse files
ivovozcangungor
authored andcommitted
fix(core): Support task runner in execute and execute-batch commands (n8n-io#15147)
1 parent 7e8fe8f commit b756067

File tree

7 files changed

+196
-15
lines changed

7 files changed

+196
-15
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import { GlobalConfig } from '@n8n/config';
2+
import type { User, WorkflowEntity } from '@n8n/db';
3+
import { Container } from '@n8n/di';
4+
import type { SelectQueryBuilder } from '@n8n/typeorm';
5+
import type { Config } from '@oclif/core';
6+
import { mock } from 'jest-mock-extended';
7+
import type { IRun } from 'n8n-workflow';
8+
9+
import { ActiveExecutions } from '@/active-executions';
10+
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
11+
import { DeprecationService } from '@/deprecation/deprecation.service';
12+
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
13+
import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay';
14+
import { ExternalHooks } from '@/external-hooks';
15+
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
16+
import { PostHogClient } from '@/posthog';
17+
import { OwnershipService } from '@/services/ownership.service';
18+
import { ShutdownService } from '@/shutdown/shutdown.service';
19+
import { TaskRunnerModule } from '@/task-runners/task-runner-module';
20+
import { WorkflowRunner } from '@/workflow-runner';
21+
import { mockInstance } from '@test/mocking';
22+
23+
import { ExecuteBatch } from '../execute-batch';
24+
25+
const taskRunnerModule = mockInstance(TaskRunnerModule);
26+
const workflowRepository = mockInstance(WorkflowRepository);
27+
const ownershipService = mockInstance(OwnershipService);
28+
const workflowRunner = mockInstance(WorkflowRunner);
29+
const activeExecutions = mockInstance(ActiveExecutions);
30+
const loadNodesAndCredentials = mockInstance(LoadNodesAndCredentials);
31+
const shutdownService = mockInstance(ShutdownService);
32+
const deprecationService = mockInstance(DeprecationService);
33+
mockInstance(MessageEventBus);
34+
const posthogClient = mockInstance(PostHogClient);
35+
const telemetryEventRelay = mockInstance(TelemetryEventRelay);
36+
const externalHooks = mockInstance(ExternalHooks);
37+
38+
jest.mock('@/db', () => ({
39+
init: jest.fn().mockResolvedValue(undefined),
40+
migrate: jest.fn().mockResolvedValue(undefined),
41+
connectionState: { connected: false },
42+
close: jest.fn().mockResolvedValue(undefined),
43+
}));
44+
45+
test('should start a task runner when task runners are enabled', async () => {
46+
// arrange
47+
48+
const workflow = mock<WorkflowEntity>({
49+
id: '123',
50+
nodes: [{ type: 'n8n-nodes-base.manualTrigger' }],
51+
});
52+
53+
const run = mock<IRun>({ data: { resultData: { error: undefined } } });
54+
55+
const queryBuilder = mock<SelectQueryBuilder<WorkflowEntity>>({
56+
andWhere: jest.fn().mockReturnThis(),
57+
getMany: jest.fn().mockResolvedValue([workflow]),
58+
});
59+
60+
loadNodesAndCredentials.init.mockResolvedValue(undefined);
61+
shutdownService.shutdown.mockReturnValue();
62+
deprecationService.warn.mockReturnValue();
63+
posthogClient.init.mockResolvedValue();
64+
telemetryEventRelay.init.mockResolvedValue();
65+
externalHooks.init.mockResolvedValue();
66+
67+
workflowRepository.createQueryBuilder.mockReturnValue(queryBuilder);
68+
ownershipService.getInstanceOwner.mockResolvedValue(mock<User>({ id: '123' }));
69+
workflowRunner.run.mockResolvedValue('123');
70+
activeExecutions.getPostExecutePromise.mockResolvedValue(run);
71+
72+
Container.set(
73+
GlobalConfig,
74+
mock<GlobalConfig>({
75+
taskRunners: { enabled: true },
76+
nodes: { communityPackages: { enabled: false } },
77+
}),
78+
);
79+
80+
const cmd = new ExecuteBatch([], {} as Config);
81+
// @ts-expect-error Private property
82+
cmd.parse = jest.fn().mockResolvedValue({ flags: {} });
83+
// @ts-expect-error Private property
84+
cmd.runTests = jest.fn().mockResolvedValue({ summary: { failedExecutions: [] } });
85+
86+
// act
87+
88+
await cmd.init();
89+
await cmd.run();
90+
91+
// assert
92+
93+
expect(taskRunnerModule.start).toHaveBeenCalledTimes(1);
94+
});
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import { GlobalConfig } from '@n8n/config';
2+
import type { User, WorkflowEntity } from '@n8n/db';
3+
import { Container } from '@n8n/di';
4+
import type { Config } from '@oclif/core';
5+
import { mock } from 'jest-mock-extended';
6+
import type { IRun } from 'n8n-workflow';
7+
8+
import { ActiveExecutions } from '@/active-executions';
9+
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
10+
import { DeprecationService } from '@/deprecation/deprecation.service';
11+
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
12+
import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay';
13+
import { ExternalHooks } from '@/external-hooks';
14+
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
15+
import { PostHogClient } from '@/posthog';
16+
import { OwnershipService } from '@/services/ownership.service';
17+
import { ShutdownService } from '@/shutdown/shutdown.service';
18+
import { TaskRunnerModule } from '@/task-runners/task-runner-module';
19+
import { WorkflowRunner } from '@/workflow-runner';
20+
import { mockInstance } from '@test/mocking';
21+
22+
import { Execute } from '../execute';
23+
24+
const taskRunnerModule = mockInstance(TaskRunnerModule);
25+
const workflowRepository = mockInstance(WorkflowRepository);
26+
const ownershipService = mockInstance(OwnershipService);
27+
const workflowRunner = mockInstance(WorkflowRunner);
28+
const activeExecutions = mockInstance(ActiveExecutions);
29+
const loadNodesAndCredentials = mockInstance(LoadNodesAndCredentials);
30+
const shutdownService = mockInstance(ShutdownService);
31+
const deprecationService = mockInstance(DeprecationService);
32+
mockInstance(MessageEventBus);
33+
const posthogClient = mockInstance(PostHogClient);
34+
const telemetryEventRelay = mockInstance(TelemetryEventRelay);
35+
const externalHooks = mockInstance(ExternalHooks);
36+
37+
jest.mock('@/db', () => ({
38+
init: jest.fn().mockResolvedValue(undefined),
39+
migrate: jest.fn().mockResolvedValue(undefined),
40+
connectionState: { connected: false },
41+
close: jest.fn().mockResolvedValue(undefined),
42+
}));
43+
44+
test('should start a task runner when task runners are enabled', async () => {
45+
// arrange
46+
47+
const workflow = mock<WorkflowEntity>({
48+
id: '123',
49+
nodes: [{ type: 'n8n-nodes-base.manualTrigger' }],
50+
});
51+
52+
const run = mock<IRun>({ data: { resultData: { error: undefined } } });
53+
54+
loadNodesAndCredentials.init.mockResolvedValue(undefined);
55+
shutdownService.shutdown.mockReturnValue();
56+
deprecationService.warn.mockReturnValue();
57+
posthogClient.init.mockResolvedValue();
58+
telemetryEventRelay.init.mockResolvedValue();
59+
externalHooks.init.mockResolvedValue();
60+
61+
workflowRepository.findOneBy.mockResolvedValue(workflow);
62+
ownershipService.getInstanceOwner.mockResolvedValue(mock<User>({ id: '123' }));
63+
workflowRunner.run.mockResolvedValue('123');
64+
activeExecutions.getPostExecutePromise.mockResolvedValue(run);
65+
66+
Container.set(
67+
GlobalConfig,
68+
mock<GlobalConfig>({
69+
taskRunners: { enabled: true },
70+
nodes: { communityPackages: { enabled: false } },
71+
}),
72+
);
73+
74+
const cmd = new Execute([], {} as Config);
75+
// @ts-expect-error Private property
76+
cmd.parse = jest.fn().mockResolvedValue({ flags: { id: '123' } });
77+
78+
// act
79+
80+
await cmd.init();
81+
await cmd.run();
82+
83+
// assert
84+
85+
expect(taskRunnerModule.start).toHaveBeenCalledTimes(1);
86+
});

packages/cli/src/commands/base-command.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ export abstract class BaseCommand extends Command {
6969
/** Whether to init community packages (if enabled) */
7070
protected needsCommunityPackages = false;
7171

72+
/** Whether to init task runner (if enabled). */
73+
protected needsTaskRunner = false;
74+
7275
protected async loadModules() {
7376
for (const moduleName of this.modulesConfig.modules) {
7477
let preInitModule: ModulePreInit | undefined;
@@ -156,6 +159,11 @@ export abstract class BaseCommand extends Command {
156159
await Container.get(CommunityPackagesService).checkForMissingPackages();
157160
}
158161

162+
if (this.needsTaskRunner && this.globalConfig.taskRunners.enabled) {
163+
const { TaskRunnerModule } = await import('@/task-runners/task-runner-module');
164+
await Container.get(TaskRunnerModule).start();
165+
}
166+
159167
// TODO: remove this after the cyclic dependencies around the event-bus are resolved
160168
Container.get(MessageEventBus);
161169

packages/cli/src/commands/execute-batch.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ export class ExecuteBatch extends BaseCommand {
112112

113113
override needsCommunityPackages = true;
114114

115+
override needsTaskRunner = true;
116+
115117
/**
116118
* Gracefully handles exit.
117119
* @param {boolean} skipExit Whether to skip exit or number according to received signal
@@ -335,7 +337,6 @@ export class ExecuteBatch extends BaseCommand {
335337
if (results.summary.failedExecutions > 0) {
336338
this.exit(1);
337339
}
338-
this.exit(0);
339340
}
340341

341342
mergeResults(results: IResult, retryResults: IResult) {

packages/cli/src/commands/execute.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ export class Execute extends BaseCommand {
2929

3030
override needsCommunityPackages = true;
3131

32+
override needsTaskRunner = true;
33+
3234
async init() {
3335
await super.init();
3436
await this.initBinaryDataService();

packages/cli/src/commands/start.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ export class Start extends BaseCommand {
6868

6969
override needsCommunityPackages = true;
7070

71+
override needsTaskRunner = true;
72+
7173
private getEditorUrl = () => Container.get(UrlService).getInstanceBaseUrl();
7274

7375
/**
@@ -234,13 +236,6 @@ export class Start extends BaseCommand {
234236
await this.generateStaticAssets();
235237
}
236238

237-
const { taskRunners: taskRunnerConfig } = this.globalConfig;
238-
if (taskRunnerConfig.enabled) {
239-
const { TaskRunnerModule } = await import('@/task-runners/task-runner-module');
240-
const taskRunnerModule = Container.get(TaskRunnerModule);
241-
await taskRunnerModule.start();
242-
}
243-
244239
await this.loadModules();
245240
}
246241

packages/cli/src/commands/worker.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ export class Worker extends BaseCommand {
3939

4040
override needsCommunityPackages = true;
4141

42+
override needsTaskRunner = true;
43+
4244
/**
4345
* Stop n8n in a graceful way.
4446
* Make for example sure that all the webhooks from third party services
@@ -108,13 +110,6 @@ export class Worker extends BaseCommand {
108110
}),
109111
);
110112

111-
const { taskRunners: taskRunnerConfig } = this.globalConfig;
112-
if (taskRunnerConfig.enabled) {
113-
const { TaskRunnerModule } = await import('@/task-runners/task-runner-module');
114-
const taskRunnerModule = Container.get(TaskRunnerModule);
115-
await taskRunnerModule.start();
116-
}
117-
118113
await this.loadModules();
119114
}
120115

0 commit comments

Comments
 (0)