Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions packages/pg-pool/diagnostics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
'use strict'

const noopChannel = { hasSubscribers: false }

let poolConnectChannel = noopChannel
let poolReleaseChannel = noopChannel
let poolRemoveChannel = noopChannel

try {
let dc
if (typeof process.getBuiltInModule === 'function') {
dc = process.getBuiltInModule('diagnostics_channel')
} else {
dc = require('diagnostics_channel')
}
if (typeof dc.tracingChannel === 'function') {
poolConnectChannel = dc.tracingChannel('pg:pool:connect')
}
if (typeof dc.channel === 'function') {
poolReleaseChannel = dc.channel('pg:pool:release')
poolRemoveChannel = dc.channel('pg:pool:remove')
}
} catch (e) {
// diagnostics_channel not available (non-Node environment)
}

// Check explicitly for `false` rather than truthiness because the aggregated
// `hasSubscribers` getter on TracingChannel is `undefined` on Node 18 (which
// backported TracingChannel but not the getter). When `undefined`, we assume
// there may be subscribers and trace unconditionally.
function shouldTrace(channel) {
return channel.hasSubscribers !== false
Comment on lines +27 to +32
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldTrace() returns true when channel.hasSubscribers is undefined, which can cause publish()/traceCallback() to run unconditionally on runtimes where hasSubscribers isn’t implemented. If the goal is “no overhead when there are no subscribers”, consider a safer subscriber check (or default undefined to false) and document any Node-version limitations explicitly.

Suggested change
// Check explicitly for `false` rather than truthiness because the aggregated
// `hasSubscribers` getter on TracingChannel is `undefined` on Node 18 (which
// backported TracingChannel but not the getter). When `undefined`, we assume
// there may be subscribers and trace unconditionally.
function shouldTrace(channel) {
return channel.hasSubscribers !== false
// Only trace when subscriber presence is known. Some runtimes expose
// `TracingChannel` without an aggregated `hasSubscribers` getter, in which case
// `channel.hasSubscribers` is `undefined`. Default that case to `false` so we
// avoid tracing overhead when subscriber state cannot be determined.
function shouldTrace(channel) {
return channel.hasSubscribers === true

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already discussed above.

}

module.exports = { poolConnectChannel, poolReleaseChannel, poolRemoveChannel, shouldTrace }
34 changes: 34 additions & 0 deletions packages/pg-pool/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'
const EventEmitter = require('events').EventEmitter
const { poolConnectChannel, poolReleaseChannel, poolRemoveChannel, shouldTrace } = require('./diagnostics')

const NOOP = function () {}

Expand Down Expand Up @@ -178,6 +179,10 @@ class Pool extends EventEmitter {

this._clients = this._clients.filter((c) => c !== client)
const context = this
if (shouldTrace(poolRemoveChannel)) {
poolRemoveChannel.publish({ client: { processID: client.processID } })
}

client.end(() => {
context.emit('remove', client)

Expand All @@ -196,6 +201,31 @@ class Pool extends EventEmitter {
const response = promisify(this.Promise, cb)
const result = response.result

if (shouldTrace(poolConnectChannel)) {
const context = {
pool: {
totalCount: this.totalCount,
idleCount: this.idleCount,
waitingCount: this.waitingCount,
maxSize: this.options.max,
},
}
const origCb = response.callback
const enrichedCb = (err, client, done) => {
if (client) context.client = { processID: client.processID, reused: !!client._poolUseCount }
return origCb(err, client, done)
}
poolConnectChannel.traceCallback(
(tracedCb) => {
response.callback = tracedCb
},
0,
context,
null,
enrichedCb
)
}

// if we don't have to connect a new client, don't do so
if (this._isFull() || this._idle.length) {
// if we have idle clients schedule a pulse immediately
Expand Down Expand Up @@ -388,6 +418,10 @@ class Pool extends EventEmitter {

this.emit('release', err, client)

if (shouldTrace(poolReleaseChannel)) {
poolReleaseChannel.publish({ client: { processID: client.processID }, error: err || undefined })
}

// TODO(bmc): expose a proper, public interface _queryable and _ending
if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) {
if (client._poolUseCount >= this.options.maxUses) {
Expand Down
197 changes: 197 additions & 0 deletions packages/pg-pool/test/diagnostics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
'use strict'

const expect = require('expect.js')
const EventEmitter = require('events').EventEmitter
const describe = require('mocha').describe
const it = require('mocha').it
const dc = require('diagnostics_channel')
const Pool = require('../')

// TracingChannel exists on Node 18+ but the aggregated hasSubscribers getter
// and stable unsubscribe behavior require Node 19.9+/20.5+. Skip tracing
// tests on older versions where TracingChannel is missing or has internal bugs.
const hasStableTracingChannel =
typeof dc.tracingChannel === 'function' && typeof dc.tracingChannel('pg:pool:test:probe').hasSubscribers === 'boolean'

function mockClient(methods) {
return function () {
const client = new EventEmitter()
client.end = function (cb) {
if (cb) process.nextTick(cb)
}
client._queryable = true
client._ending = false
client.processID = 12345
Object.assign(client, methods)
return client
}
}

describe('diagnostics channels', function () {
describe('pg:pool:connect', function () {
;(hasStableTracingChannel ? it : it.skip)('publishes start event when connect is called', function (done) {
const pool = new Pool({
Client: mockClient({
connect: function (cb) {
process.nextTick(() => cb(null))
},
}),
})

let capturedContext
const channel = dc.tracingChannel('pg:pool:connect')
const subs = {
start: (ctx) => {
capturedContext = ctx
},
end: () => {},
asyncStart: () => {},
asyncEnd: () => {},
error: () => {},
}

channel.subscribe(subs)

pool.connect(function (err, client, release) {
if (err) return done(err)
release()
pool.end(() => {
expect(capturedContext).to.be.ok()
expect(capturedContext.pool).to.be.ok()
expect(capturedContext.pool.maxSize).to.be(10)
expect(capturedContext.pool.totalCount).to.be.a('number')

channel.unsubscribe(subs)
done()
})
})
})
;(hasStableTracingChannel ? it : it.skip)('enriches context with client info on asyncEnd', function (done) {
const pool = new Pool({
Client: mockClient({
connect: function (cb) {
process.nextTick(() => cb(null))
},
}),
})

const channel = dc.tracingChannel('pg:pool:connect')
const subs = {
start: () => {},
end: () => {},
asyncStart: () => {},
asyncEnd: (ctx) => {
expect(ctx.client).to.be.ok()
expect(ctx.client.processID).to.be(12345)

channel.unsubscribe(subs)
done()
},
error: () => {},
}

channel.subscribe(subs)

pool.connect(function (err, client, release) {
if (err) return done(err)
release()
pool.end()
})
})
})

describe('pg:pool:release', function () {
it('publishes when a client is released', function (done) {
const pool = new Pool({
Client: mockClient({
connect: function (cb) {
process.nextTick(() => cb(null))
},
}),
})

let releaseMessage
const channel = dc.channel('pg:pool:release')
const onMessage = (msg) => {
releaseMessage = msg
}
channel.subscribe(onMessage)

pool.connect(function (err, client, release) {
if (err) return done(err)
release()
pool.end(() => {
expect(releaseMessage).to.be.ok()
expect(releaseMessage.client).to.be.ok()
expect(releaseMessage.client.processID).to.be(12345)

channel.unsubscribe(onMessage)
done()
})
})
})

it('includes error when released with error', function (done) {
const pool = new Pool({
Client: mockClient({
connect: function (cb) {
process.nextTick(() => cb(null))
},
}),
})

let releaseMessage
const channel = dc.channel('pg:pool:release')
const onMessage = (msg) => {
releaseMessage = msg
}
channel.subscribe(onMessage)

pool.connect(function (err, client, release) {
if (err) return done(err)
const releaseError = new Error('test error')
release(releaseError)
pool.end(() => {
expect(releaseMessage).to.be.ok()
expect(releaseMessage.error).to.be(releaseError)

channel.unsubscribe(onMessage)
done()
})
})
})
})

describe('pg:pool:remove', function () {
it('publishes when a client is removed', function (done) {
const pool = new Pool({
Client: mockClient({
connect: function (cb) {
process.nextTick(() => cb(null))
},
}),
})

let removeMessage
const channel = dc.channel('pg:pool:remove')
const onMessage = (msg) => {
removeMessage = msg
}
channel.subscribe(onMessage)

pool.connect(function (err, client, release) {
if (err) return done(err)
// release with error to trigger removal
release(new Error('force remove'))
pool.end(() => {
expect(removeMessage).to.be.ok()
expect(removeMessage.client).to.be.ok()
expect(removeMessage.client.processID).to.be(12345)

channel.unsubscribe(onMessage)
done()
})
})
})
})
})
66 changes: 55 additions & 11 deletions packages/pg/lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const Query = require('./query')
const defaults = require('./defaults')
const Connection = require('./connection')
const crypto = require('./crypto/utils')
const { queryChannel, connectionChannel, shouldTrace } = require('./diagnostics')

const activeQueryDeprecationNotice = nodeUtils.deprecate(
() => {},
Expand Down Expand Up @@ -207,18 +208,30 @@ class Client extends EventEmitter {

connect(callback) {
if (callback) {
this._connect(callback)
if (shouldTrace(connectionChannel)) {
const context = {
connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl },
}
connectionChannel.traceCallback((tracedCb) => this._connect(tracedCb), 0, context, null, callback)
} else {
this._connect(callback)
}
return
}

return new this._Promise((resolve, reject) => {
this._connect((error) => {
if (error) {
reject(error)
} else {
resolve(this)
const callback = (error) => {
if (error) reject(error)
else resolve(this)
}
if (shouldTrace(connectionChannel)) {
const context = {
connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl },
}
})
connectionChannel.traceCallback((tracedCb) => this._connect(tracedCb), 0, context, null, callback)
} else {
this._connect(callback)
}
})
}

Expand Down Expand Up @@ -687,11 +700,42 @@ class Client extends EventEmitter {
return result
}

if (this._queryQueue.length > 0) {
queryQueueLengthDeprecationNotice()
const enqueue = () => {
if (this._queryQueue.length > 0) queryQueueLengthDeprecationNotice()
this._queryQueue.push(query)
this._pulseQueryQueue()
}

if (shouldTrace(queryChannel) && query.callback) {
const context = {
query: { text: query.text, name: query.name, rowMode: query._rowMode },
client: {
database: this.database,
host: this.host,
port: this.port,
user: this.user,
processID: this.processID,
ssl: !!this.ssl,
},
}
const origCb = query.callback
const enrichedCb = (err, res) => {
if (res) context.result = { rowCount: res.rowCount, command: res.command }
return origCb(err, res)
}
queryChannel.traceCallback(
(tracedCb) => {
query.callback = tracedCb
enqueue()
},
0,
context,
null,
enrichedCb
)
} else {
enqueue()
Comment on lines +709 to +737
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tracing wrapper is only applied when query.callback is present. Queries executed in EventEmitter mode (user supplies a Query instance and listens for 'row'/'end' without a callback/promise) won’t emit pg:query lifecycle events, so instrumentation coverage is incomplete. Consider tracing those queries by attaching to the query’s 'end'/'error' events (or using a TracingChannel API that supports non-callback lifecycles) so all query styles are covered.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With OTel instrumentation as the guide here, they don't handle this so this PR also doesn't. I think this is an advanced use-case and users who do it are in a better position to instrument on their own with their APM's API.

}
this._queryQueue.push(query)
this._pulseQueryQueue()
return result
}

Expand Down
Loading
Loading