Skip to content

Commit 13f508f

Browse files
add recover mechanism for pg when pool is unexpectingly full when connections are not resolved (for instance because of short network issues)
1 parent 7e2ebc8 commit 13f508f

File tree

2 files changed

+180
-0
lines changed

2 files changed

+180
-0
lines changed

packages/cli/src/databases/db-connection.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { ErrorReporter } from 'n8n-core';
88
import { DbConnectionTimeoutError, ensureError } from 'n8n-workflow';
99

1010
import { DbConnectionOptions } from './db-connection-options';
11+
import { PgRecover } from './pg-recover';
1112

1213
type ConnectionState = {
1314
connected: boolean;
@@ -28,6 +29,7 @@ export class DbConnection {
2829
constructor(
2930
private readonly errorReporter: ErrorReporter,
3031
private readonly connectionOptions: DbConnectionOptions,
32+
private readonly pgRecover: PgRecover,
3133
) {
3234
this.dataSource = new DataSource(this.options);
3335
Container.set(DataSource, this.dataSource);
@@ -43,6 +45,9 @@ export class DbConnection {
4345
if (connectionState.connected) return;
4446
try {
4547
await this.dataSource.initialize();
48+
if (this.options.type === 'postgres') {
49+
this.pgRecover.recoverOnError();
50+
}
4651
} catch (e) {
4752
let error = ensureError(e);
4853
if (
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
import { Service } from '@n8n/di';
2+
import { DataSource } from '@n8n/typeorm';
3+
import { PostgresDriver } from '@n8n/typeorm/driver/postgres/PostgresDriver';
4+
import { ErrorReporter, Logger } from 'n8n-core';
5+
import { ensureError } from 'n8n-workflow';
6+
import type { Pool, PoolClient } from 'pg';
7+
8+
// Internal structure of pg Pool and Client for connection recovery
9+
// These are not part of the public API but are needed for advanced recovery
10+
export interface InternalPoolClient extends PoolClient {
11+
_ended?: boolean;
12+
_ending?: boolean;
13+
}
14+
15+
export interface InternalPool extends Pool {
16+
_clients?: Array<InternalPoolClient | null>;
17+
}
18+
19+
@Service()
20+
export class PgRecover {
21+
constructor(
22+
private readonly dataSource: DataSource,
23+
private readonly errorReporter: ErrorReporter,
24+
private readonly logger: Logger,
25+
) {}
26+
27+
/**
28+
* Test method to simulate different types of network issues
29+
* This is useful for testing the connection pool recovery mechanism
30+
* @param issueType Type of issue to simulate ('network-drop', 'stalled-connections')
31+
* @param count Number of connections to affect
32+
* @returns Information about the simulated issue
33+
*/
34+
async simulateNetworkIssue(
35+
issueType: 'network-drop' | 'stalled-connections',
36+
count = 1,
37+
): Promise<{ success: boolean; message: string }> {
38+
if (!this.dataSource.isInitialized) {
39+
return { success: false, message: 'Database not initialized or not PostgreSQL' };
40+
}
41+
42+
try {
43+
const pgDriver = this.dataSource.driver as PostgresDriver;
44+
const pgPool = pgDriver.master as Pool;
45+
46+
switch (issueType) {
47+
case 'network-drop':
48+
// Simulate network drops by forcing pg driver to emit error events
49+
this.errorReporter.info(`Simulating ${count} network drops`);
50+
// Use EventEmitter's emit method to simulate network errors
51+
for (let i = 0; i < count; i++) {
52+
(pgPool as unknown as { emit(event: string, e: Error): boolean }).emit(
53+
'error',
54+
new Error(`Simulated network drop #${i + 1}`),
55+
);
56+
}
57+
return { success: true, message: `Simulated ${count} network drops` };
58+
59+
case 'stalled-connections':
60+
// Simulate stalled connections by marking some connections as ended
61+
this.errorReporter.info(`Simulating ${count} stalled connections`);
62+
const internalPool = pgPool as unknown as InternalPool;
63+
if (internalPool._clients && Array.isArray(internalPool._clients)) {
64+
let stalledCount = 0;
65+
for (let i = 0; i < internalPool._clients.length && stalledCount < count; i++) {
66+
const client = internalPool._clients[i];
67+
this.logger.debug(`Checking client at index ${i}: ${client?._ended}`);
68+
if (client && !client._ended && !client._ending) {
69+
// Mark the connection as ended but don't actually end it
70+
// This simulates a connection that's in a bad state but still in the pool
71+
client._ended = true;
72+
stalledCount++;
73+
this.logger.debug(`Marked connection ${i} as stalled`);
74+
}
75+
}
76+
return {
77+
success: stalledCount > 0,
78+
message:
79+
stalledCount > 0
80+
? `Simulated ${stalledCount} stalled connections`
81+
: 'No connections could be marked as stalled',
82+
};
83+
}
84+
return { success: false, message: 'Could not access internal pool structure' };
85+
86+
default:
87+
return { success: false, message: `Unknown issue type: ${issueType as string}` };
88+
}
89+
} catch (error) {
90+
this.errorReporter.error(
91+
`Failed to simulate network issue: ${error instanceof Error ? error.message : String(error)}`,
92+
);
93+
return {
94+
success: false,
95+
message: `Error: ${error instanceof Error ? error.message : String(error)}`,
96+
};
97+
}
98+
}
99+
100+
recoverOnError() {
101+
if (!this.dataSource.isInitialized) return;
102+
if (this.dataSource.driver instanceof PostgresDriver) {
103+
this.logger.debug('Recovering Postgres connection pool');
104+
const pgDriver = this.dataSource.driver;
105+
const pgPool = pgDriver.master as Pool;
106+
pgPool.on('error', async (error) => {
107+
this.logger.debug(`Postgres pool error: ${ensureError(error).message}`);
108+
// Log the current pool state
109+
this.logger.debug(
110+
`Recovering connection pool. Total: ${pgPool.totalCount}, Idle: ${pgPool.idleCount}, Waiting: ${pgPool.waitingCount}`,
111+
);
112+
// Attempt to recover the connection pool
113+
const recoveryAttempted = await this.recoverConnectionPool(pgPool);
114+
if (recoveryAttempted) {
115+
this.logger.debug('Connection pool recovery attempted');
116+
} else {
117+
this.logger.debug('Failed to recover connection pool');
118+
}
119+
});
120+
}
121+
}
122+
123+
/**
124+
* Attempt to recover the connection pool when it's in an unhealthy state
125+
* This method will try to release any stalled connections and create new ones
126+
* @returns True if recovery was attempted, false otherwise
127+
*/
128+
private async recoverConnectionPool(pgPool: Pool): Promise<boolean> {
129+
if (!this.dataSource.isInitialized) {
130+
return false;
131+
}
132+
133+
try {
134+
// Access the internal pool structure to find stalled connections
135+
// Note: This uses internal properties that are not part of the public API
136+
// but necessary for our specific use case of recovering from stalled connections
137+
const internalPool = pgPool as InternalPool;
138+
if (internalPool._clients && Array.isArray(internalPool._clients)) {
139+
this.logger.debug('Recovering connection pool...');
140+
let releasedCount = 0;
141+
const clients = internalPool._clients;
142+
143+
for (let i = 0; i < clients.length; i++) {
144+
const client = clients[i];
145+
this.logger.debug(`Checking client at index ${i}: ${client?._ended}`);
146+
// Check if the client is in an ended state but still in the pool
147+
if (client && (client._ended === true || client._ending === true)) {
148+
// Properly release the client instead of just setting it to null
149+
this.logger.debug(`Found stalled connection at index ${i}, properly releasing it`);
150+
try {
151+
// Force release the client (true parameter forces termination)
152+
client.release(true);
153+
} catch (error) {
154+
// If release fails, manually remove it from the pool
155+
this.logger.debug(
156+
`Failed to release client, removing from pool: ${ensureError(error).message}`,
157+
);
158+
clients[i] = null;
159+
}
160+
releasedCount++;
161+
}
162+
}
163+
164+
if (releasedCount > 0) {
165+
this.errorReporter.info(`Released ${releasedCount} stalled connections from the pool`);
166+
}
167+
}
168+
169+
return true;
170+
} catch (error) {
171+
this.errorReporter.error(`Failed to recover connection pool: ${ensureError(error).message}`);
172+
return false;
173+
}
174+
}
175+
}

0 commit comments

Comments
 (0)