-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Add diagnostics_channel TracingChannel support #3650
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
b7f80ac
91ace72
5b50561
73c9946
c31eab5
3957e3c
b922f0d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| 'use strict' | ||
|
|
||
| const noopChannel = { hasSubscribers: false } | ||
|
|
||
| let poolConnectChannel = noopChannel | ||
| let poolReleaseChannel = noopChannel | ||
| let poolRemoveChannel = noopChannel | ||
|
|
||
| try { | ||
| let dc | ||
| if (typeof process.getBuiltInModule === 'function') { | ||
| dc = process.getBuiltInModule('diagnostics_channel') | ||
| } else { | ||
| dc = require('diagnostics_channel') | ||
| } | ||
| if (typeof dc.tracingChannel === 'function') { | ||
| poolConnectChannel = dc.tracingChannel('pg:pool:connect') | ||
| } | ||
| if (typeof dc.channel === 'function') { | ||
| poolReleaseChannel = dc.channel('pg:pool:release') | ||
| poolRemoveChannel = dc.channel('pg:pool:remove') | ||
| } | ||
| } catch (e) { | ||
| // diagnostics_channel not available (non-Node environment) | ||
| } | ||
|
|
||
| // Check explicitly for `false` rather than truthiness because the aggregated | ||
| // `hasSubscribers` getter on TracingChannel is `undefined` on Node 18 (which | ||
| // backported TracingChannel but not the getter). When `undefined`, we assume | ||
| // there may be subscribers and trace unconditionally. | ||
| function shouldTrace(channel) { | ||
| return channel.hasSubscribers !== false | ||
| } | ||
|
|
||
| module.exports = { poolConnectChannel, poolReleaseChannel, poolRemoveChannel, shouldTrace } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,197 @@ | ||
| 'use strict' | ||
|
|
||
| const expect = require('expect.js') | ||
| const EventEmitter = require('events').EventEmitter | ||
| const describe = require('mocha').describe | ||
| const it = require('mocha').it | ||
| const dc = require('diagnostics_channel') | ||
| const Pool = require('../') | ||
|
|
||
| // TracingChannel exists on Node 18+ but the aggregated hasSubscribers getter | ||
| // and stable unsubscribe behavior require Node 19.9+/20.5+. Skip tracing | ||
| // tests on older versions where TracingChannel is missing or has internal bugs. | ||
| const hasStableTracingChannel = | ||
| typeof dc.tracingChannel === 'function' && typeof dc.tracingChannel('pg:pool:test:probe').hasSubscribers === 'boolean' | ||
|
|
||
| function mockClient(methods) { | ||
| return function () { | ||
| const client = new EventEmitter() | ||
| client.end = function (cb) { | ||
| if (cb) process.nextTick(cb) | ||
| } | ||
| client._queryable = true | ||
| client._ending = false | ||
| client.processID = 12345 | ||
| Object.assign(client, methods) | ||
| return client | ||
| } | ||
| } | ||
|
|
||
| describe('diagnostics channels', function () { | ||
| describe('pg:pool:connect', function () { | ||
| ;(hasStableTracingChannel ? it : it.skip)('publishes start event when connect is called', function (done) { | ||
| const pool = new Pool({ | ||
| Client: mockClient({ | ||
| connect: function (cb) { | ||
| process.nextTick(() => cb(null)) | ||
| }, | ||
| }), | ||
| }) | ||
|
|
||
| let capturedContext | ||
| const channel = dc.tracingChannel('pg:pool:connect') | ||
| const subs = { | ||
| start: (ctx) => { | ||
| capturedContext = ctx | ||
| }, | ||
| end: () => {}, | ||
| asyncStart: () => {}, | ||
| asyncEnd: () => {}, | ||
| error: () => {}, | ||
| } | ||
|
|
||
| channel.subscribe(subs) | ||
|
|
||
| pool.connect(function (err, client, release) { | ||
| if (err) return done(err) | ||
| release() | ||
| pool.end(() => { | ||
| expect(capturedContext).to.be.ok() | ||
| expect(capturedContext.pool).to.be.ok() | ||
| expect(capturedContext.pool.maxSize).to.be(10) | ||
| expect(capturedContext.pool.totalCount).to.be.a('number') | ||
|
|
||
| channel.unsubscribe(subs) | ||
| done() | ||
| }) | ||
| }) | ||
| }) | ||
| ;(hasStableTracingChannel ? it : it.skip)('enriches context with client info on asyncEnd', function (done) { | ||
| const pool = new Pool({ | ||
| Client: mockClient({ | ||
| connect: function (cb) { | ||
| process.nextTick(() => cb(null)) | ||
| }, | ||
| }), | ||
| }) | ||
|
|
||
| const channel = dc.tracingChannel('pg:pool:connect') | ||
| const subs = { | ||
| start: () => {}, | ||
| end: () => {}, | ||
| asyncStart: () => {}, | ||
| asyncEnd: (ctx) => { | ||
| expect(ctx.client).to.be.ok() | ||
| expect(ctx.client.processID).to.be(12345) | ||
|
|
||
| channel.unsubscribe(subs) | ||
| done() | ||
| }, | ||
| error: () => {}, | ||
| } | ||
|
|
||
| channel.subscribe(subs) | ||
|
|
||
| pool.connect(function (err, client, release) { | ||
| if (err) return done(err) | ||
| release() | ||
| pool.end() | ||
| }) | ||
| }) | ||
| }) | ||
|
|
||
| describe('pg:pool:release', function () { | ||
| it('publishes when a client is released', function (done) { | ||
| const pool = new Pool({ | ||
| Client: mockClient({ | ||
| connect: function (cb) { | ||
| process.nextTick(() => cb(null)) | ||
| }, | ||
| }), | ||
| }) | ||
|
|
||
| let releaseMessage | ||
| const channel = dc.channel('pg:pool:release') | ||
| const onMessage = (msg) => { | ||
| releaseMessage = msg | ||
| } | ||
| channel.subscribe(onMessage) | ||
|
|
||
| pool.connect(function (err, client, release) { | ||
| if (err) return done(err) | ||
| release() | ||
| pool.end(() => { | ||
| expect(releaseMessage).to.be.ok() | ||
| expect(releaseMessage.client).to.be.ok() | ||
| expect(releaseMessage.client.processID).to.be(12345) | ||
|
|
||
| channel.unsubscribe(onMessage) | ||
| done() | ||
| }) | ||
| }) | ||
| }) | ||
|
|
||
| it('includes error when released with error', function (done) { | ||
| const pool = new Pool({ | ||
| Client: mockClient({ | ||
| connect: function (cb) { | ||
| process.nextTick(() => cb(null)) | ||
| }, | ||
| }), | ||
| }) | ||
|
|
||
| let releaseMessage | ||
| const channel = dc.channel('pg:pool:release') | ||
| const onMessage = (msg) => { | ||
| releaseMessage = msg | ||
| } | ||
| channel.subscribe(onMessage) | ||
|
|
||
| pool.connect(function (err, client, release) { | ||
| if (err) return done(err) | ||
| const releaseError = new Error('test error') | ||
| release(releaseError) | ||
| pool.end(() => { | ||
| expect(releaseMessage).to.be.ok() | ||
| expect(releaseMessage.error).to.be(releaseError) | ||
|
|
||
| channel.unsubscribe(onMessage) | ||
| done() | ||
| }) | ||
| }) | ||
| }) | ||
| }) | ||
|
|
||
| describe('pg:pool:remove', function () { | ||
| it('publishes when a client is removed', function (done) { | ||
| const pool = new Pool({ | ||
| Client: mockClient({ | ||
| connect: function (cb) { | ||
| process.nextTick(() => cb(null)) | ||
| }, | ||
| }), | ||
| }) | ||
|
|
||
| let removeMessage | ||
| const channel = dc.channel('pg:pool:remove') | ||
| const onMessage = (msg) => { | ||
| removeMessage = msg | ||
| } | ||
| channel.subscribe(onMessage) | ||
|
|
||
| pool.connect(function (err, client, release) { | ||
| if (err) return done(err) | ||
| // release with error to trigger removal | ||
| release(new Error('force remove')) | ||
| pool.end(() => { | ||
| expect(removeMessage).to.be.ok() | ||
| expect(removeMessage.client).to.be.ok() | ||
| expect(removeMessage.client.processID).to.be(12345) | ||
|
|
||
| channel.unsubscribe(onMessage) | ||
| done() | ||
| }) | ||
| }) | ||
| }) | ||
| }) | ||
| }) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ const Query = require('./query') | |
| const defaults = require('./defaults') | ||
| const Connection = require('./connection') | ||
| const crypto = require('./crypto/utils') | ||
| const { queryChannel, connectionChannel, shouldTrace } = require('./diagnostics') | ||
|
|
||
| const activeQueryDeprecationNotice = nodeUtils.deprecate( | ||
| () => {}, | ||
|
|
@@ -207,18 +208,30 @@ class Client extends EventEmitter { | |
|
|
||
| connect(callback) { | ||
| if (callback) { | ||
| this._connect(callback) | ||
| if (shouldTrace(connectionChannel)) { | ||
| const context = { | ||
| connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl }, | ||
| } | ||
| connectionChannel.traceCallback((tracedCb) => this._connect(tracedCb), 0, context, null, callback) | ||
| } else { | ||
| this._connect(callback) | ||
| } | ||
| return | ||
| } | ||
|
|
||
| return new this._Promise((resolve, reject) => { | ||
| this._connect((error) => { | ||
| if (error) { | ||
| reject(error) | ||
| } else { | ||
| resolve(this) | ||
| const callback = (error) => { | ||
| if (error) reject(error) | ||
| else resolve(this) | ||
| } | ||
| if (shouldTrace(connectionChannel)) { | ||
| const context = { | ||
| connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl }, | ||
| } | ||
| }) | ||
| connectionChannel.traceCallback((tracedCb) => this._connect(tracedCb), 0, context, null, callback) | ||
| } else { | ||
| this._connect(callback) | ||
| } | ||
| }) | ||
| } | ||
|
|
||
|
|
@@ -687,11 +700,42 @@ class Client extends EventEmitter { | |
| return result | ||
| } | ||
|
|
||
| if (this._queryQueue.length > 0) { | ||
| queryQueueLengthDeprecationNotice() | ||
| const enqueue = () => { | ||
| if (this._queryQueue.length > 0) queryQueueLengthDeprecationNotice() | ||
| this._queryQueue.push(query) | ||
| this._pulseQueryQueue() | ||
| } | ||
|
|
||
| if (shouldTrace(queryChannel) && query.callback) { | ||
| const context = { | ||
| query: { text: query.text, name: query.name, rowMode: query._rowMode }, | ||
| client: { | ||
| database: this.database, | ||
| host: this.host, | ||
| port: this.port, | ||
| user: this.user, | ||
| processID: this.processID, | ||
| ssl: !!this.ssl, | ||
| }, | ||
| } | ||
| const origCb = query.callback | ||
| const enrichedCb = (err, res) => { | ||
| if (res) context.result = { rowCount: res.rowCount, command: res.command } | ||
| return origCb(err, res) | ||
| } | ||
| queryChannel.traceCallback( | ||
| (tracedCb) => { | ||
| query.callback = tracedCb | ||
| enqueue() | ||
| }, | ||
| 0, | ||
| context, | ||
| null, | ||
| enrichedCb | ||
| ) | ||
| } else { | ||
| enqueue() | ||
|
Comment on lines
+709
to
+737
|
||
| } | ||
| this._queryQueue.push(query) | ||
| this._pulseQueryQueue() | ||
| return result | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldTrace()returns true whenchannel.hasSubscribersisundefined, which can causepublish()/traceCallback()to run unconditionally on runtimes wherehasSubscribersisn’t implemented. If the goal is “no overhead when there are no subscribers”, consider a safer subscriber check (or defaultundefinedtofalse) and document any Node-version limitations explicitly.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already discussed above.