feat: support sample-weighted aggregations for sampled trace data #1963

vinzee wants to merge 3 commits into hyperdxio:main from
Conversation
TraceSourceSchema has a new optional field `sampleRateExpression`. When undefined, SQL aggregations are unchanged. But when set (e.g. `SpanAttributes['SampleRate']`), SQL aggregations are rewritten to correct for upstream 1-in-N sampling:

| aggFn | Before | After (sample-corrected) | Overhead |
| -------------- | ---------------------- | --------------------------------------------------- | -------- |
| count | `count()` | `sum(weight)` | ~1x |
| count + cond | `countIf(cond)` | `sumIf(weight, cond)` | ~1x |
| avg | `avg(col)` | `sum(col * weight) / sum(weight)` | ~2x |
| sum | `sum(col)` | `sum(col * weight)` | ~1x |
| quantile(p) | `quantile(p)(col)` | `quantileTDigestWeighted(p)(col, toUInt32(weight))` | ~1.5x |
| min/max | unchanged | unchanged | 1x |
| count_distinct | unchanged | unchanged (cannot correct) | 1x |

Weight is wrapped as `greatest(toUInt64OrZero(toString(expr)), 1)` so spans without a SampleRate attribute default to weight 1 (unsampled data produces identical results to the original queries).

Types:
- Add sampleWeightExpression to ChartConfig schema
- Add sampleRateExpression to TraceSourceSchema + Mongoose model

Query builder:
- Rewrite aggFnExpr in renderChartConfig.ts when sampleWeightExpression is set, with safe default-to-1 wrapping

Integration (propagate sampleWeightExpression to all chart configs):
- ChartEditor/utils.ts, DBSearchPage, ServicesDashboardPage, sessions
- DBDashboardPage (raw SQL + builder branches)
- AlertPreviewChart
- SessionSubpanel
- ServiceDashboardEndpointPerformanceChart
- ServiceDashboardSlowestEventsTile (p95 query + events table)
- ServiceDashboardEndpointSidePanel (error rate + throughput)
- ServiceDashboardDbQuerySidePanel (total query time + throughput)
- External API v2 charts, AI controller, alerts (index + template)

UI:
- Add Sample Rate Expression field to trace source admin form
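The rewrite rules in the table can be sketched as a pure function. This is a hypothetical `weightedAggSql` helper illustrating the mapping, not the PR's actual `renderChartConfig` implementation:

```typescript
// Sketch of the aggregation rewrite described above (hypothetical helper,
// not the PR's code). Given an aggFn, a column, and an optional sample-rate
// expression, emit the original or sample-corrected ClickHouse fragment.
type AggFn = 'count' | 'sum' | 'avg' | 'quantile';

function weightedAggSql(
  aggFn: AggFn,
  col: string,
  weightExpr?: string,
  p = 0.95, // quantile level, only used for aggFn === 'quantile'
): string {
  if (!weightExpr) {
    // No sampling configured: the original aggregation is unchanged.
    if (aggFn === 'count') return 'count()';
    if (aggFn === 'quantile') return `quantile(${p})(${col})`;
    return `${aggFn}(${col})`;
  }
  // Clamp the weight to >= 1 so spans without a SampleRate attribute
  // behave as unsampled (weight 1).
  const w = `greatest(toUInt64OrZero(toString(${weightExpr})), 1)`;
  switch (aggFn) {
    case 'count':
      return `sum(${w})`;
    case 'sum':
      return `sum(${col} * ${w})`;
    case 'avg':
      return `sum(${col} * ${w}) / sum(${w})`;
    case 'quantile':
      return `quantileTDigestWeighted(${p})(${col}, toUInt32(${w}))`;
  }
}
```

With `weightExpr` undefined every branch reduces to the unmodified aggregation, matching the "When undefined, SQL aggregations are unchanged" guarantee.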
@vinzee is attempting to deploy a commit to the HyperDX Team on Vercel. A member of the Team first needs to authorize it.
🦋 Changeset detected

Latest commit: 23872e7

The changes in this PR will be included in the next version bump. This PR includes changesets to release 4 packages.

Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR.
PR Review
add SampleRate migration for existing deployments, standardize sampleWeightExpression propagation, and document percentile approximation

The initial sampling support (c61acd1) only added the SampleRate materialized column to the seed file, which runs on fresh installs. Existing deployments need an ALTER TABLE migration.

Changes:
- Add CH migration 000002 to create the SampleRate materialized column on existing otel_traces tables (IF NOT EXISTS, safe to re-run)
- Standardize sampleWeightExpression propagation across 5 components (SessionSubpanel, ServiceDashboardDbQuerySidePanel, ServiceDashboardEndpointSidePanel, ServiceDashboardEndpointPerformanceChart, ServiceDashboardSlowestEventsTile) from direct assignment to conditional spread, matching the pattern used elsewhere
- Note in SourceForm help text that percentiles under sampling use quantileTDigestWeighted (approximate T-Digest sketch)
```ts
source.sampleRateExpression && {
  sampleWeightExpression: source.sampleRateExpression,
}),
dateRange: [new Date(params.startTime), new Date(params.endTime)] as [
```
This is repeated several times. Perhaps we can abstract it into a helper?
```ts
function getSampleWeightExpression(source: TSource) {
  return source.kind === SourceKind.Trace &&
    'sampleRateExpression' in source &&
    source.sampleRateExpression
    ? source.sampleRateExpression
    : undefined;
}
```
```ts
const nullCheck = `${unsafeExpr.UNSAFE_RAW_SQL} IS NOT NULL`;
if (isWhereUsed) {
  const cond = { UNSAFE_RAW_SQL: `${where} AND ${nullCheck}` };
  return chSql`sumIf(${weightedVal}, ${cond}) / sumIf(${w}, ${cond})`;
```
If all rows are filtered out or all have NULL values, the denominator could be 0.
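To make the hazard concrete, here is a standalone sketch in plain TypeScript (not the PR's chSql code) of a weighted average with the zero-denominator guard the SQL would need:

```typescript
// Weighted average with a zero-denominator guard. When no rows survive the
// filter, both sums are 0 and a bare division computes 0/0 (NaN in JS;
// ClickHouse behaves similarly for Float64). Returning null instead
// mirrors a SQL guard of the form if(denom = 0, NULL, num / denom).
function weightedAvg(
  rows: Array<{ value: number; weight: number }>,
): number | null {
  const num = rows.reduce((acc, r) => acc + r.value * r.weight, 0);
  const den = rows.reduce((acc, r) => acc + r.weight, 0);
  return den === 0 ? null : num / den;
}
```

An unguarded `num / den` on an empty input would return `NaN`, which is exactly the failure mode this comment flags.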
```
'@hyperdx/common-utils': patch
'@hyperdx/api': patch
'@hyperdx/app': patch
---
```
Let's update this to a minor bump instead of a patch, please.
@vinzee Thank you for the PR. I left some feedback and will be testing this now.
Review: Additional Integration Test Coverage & Division-by-Zero Edge Case

Nice work on this feature — the core math and plumbing are solid. We ran the existing integration test suite against real ClickHouse and all 18 tests pass. We wrote 12 additional integration tests to cover scenarios the current suite doesn't exercise. All 30 pass.

Suggested additional test cases

Below are the tests we'd recommend adding:

- Empty result sets (3 tests)
- groupBy with weighted aggregations (3 tests)
- Time-series with granularity (2 tests)
- Edge cases with a separate table (4 tests)

Full test code to add. First, add an `executeChartConfigAllRows` helper:

// Helper: render chart config to SQL, execute it, return all rows
async function executeChartConfigAllRows(
config: ChartConfigWithOptDateRange,
): Promise<Record<string, string>[]> {
const generatedSql = await renderChartConfig(
config,
metadata,
querySettings,
);
const sql = parameterizedQueryToSql(generatedSql);
const result = await client.query({
query: sql,
format: 'JSONEachRow',
});
return (await result.json()) as Record<string, string>[];
}

Then add these tests after the existing ones:

it('weighted avg on empty result set: documents division-by-zero behavior', async () => {
const result = await executeChartConfig({
...baseConfig,
select: [
{
aggFn: 'avg',
valueExpression: 'Duration',
aggCondition: "ServiceName = 'nonexistent'",
aggConditionLanguage: 'sql',
},
],
});
const value = Object.values(result)[0];
const numValue = Number(value);
// ClickHouse may return NaN, inf, or 0 for 0/0 depending on column types.
// Any of these is acceptable — the key point is it doesn't error.
// A future improvement would wrap this in if(denom = 0, NULL, num/denom).
expect(
!Number.isFinite(numValue) || numValue === 0,
).toBe(true);
});
it('weighted sum on empty result set: should return 0', async () => {
const result = await executeChartConfig({
...baseConfig,
select: [
{
aggFn: 'sum',
valueExpression: 'Duration',
aggCondition: "ServiceName = 'nonexistent'",
aggConditionLanguage: 'sql',
},
],
});
const value = Number(Object.values(result)[0]);
expect(value).toBe(0);
});
it('weighted count on empty result set: should return 0', async () => {
const result = await executeChartConfig({
...baseConfig,
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: "ServiceName = 'nonexistent'",
aggConditionLanguage: 'sql',
},
],
});
const value = Number(Object.values(result)[0]);
expect(value).toBe(0);
});
it('groupBy ServiceName: weighted count per group', async () => {
// api rows weights: 1+5+10+1+1+5 = 23
// web rows weights: 1+5+10+1 = 17
const rows = await executeChartConfigAllRows({
...baseConfig,
groupBy: 'ServiceName',
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: '',
alias: 'weighted_count',
},
],
});
const byService = Object.fromEntries(
rows.map(r => [r['ServiceName'], Number(r['weighted_count'])]),
);
expect(byService['api']).toBe(23);
expect(byService['web']).toBe(17);
});
it('groupBy ServiceName: weighted avg(Duration) per group', async () => {
// api: sum=3530, weight=23, avg=153.478...
// web: sum=2700, weight=17, avg=158.823...
const rows = await executeChartConfigAllRows({
...baseConfig,
groupBy: 'ServiceName',
select: [
{
aggFn: 'avg',
valueExpression: 'Duration',
aggCondition: '',
alias: 'weighted_avg',
},
],
});
const byService = Object.fromEntries(
rows.map(r => [r['ServiceName'], Number(r['weighted_avg'])]),
);
expect(byService['api']).toBeCloseTo(3530 / 23, 2);
expect(byService['web']).toBeCloseTo(2700 / 17, 2);
});
it('groupBy ServiceName: weighted sum(Duration) per group', async () => {
// api: 100*1+200*5+150*10+250*1+80*1+120*5 = 3530
// web: 300*1+50*5+175*10+400*1 = 2700
const rows = await executeChartConfigAllRows({
...baseConfig,
groupBy: 'ServiceName',
select: [
{
aggFn: 'sum',
valueExpression: 'Duration',
aggCondition: '',
alias: 'weighted_sum',
},
],
});
const byService = Object.fromEntries(
rows.map(r => [r['ServiceName'], Number(r['weighted_sum'])]),
);
expect(byService['api']).toBe(3530);
expect(byService['web']).toBe(2700);
});
it('time-series with granularity: weighted count per time bucket', async () => {
const rows = await executeChartConfigAllRows({
...baseConfig,
displayType: DisplayType.Line,
granularity: '1 minute',
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: '',
alias: 'weighted_count',
},
],
});
expect(rows.length).toBeGreaterThanOrEqual(1);
const totalCount = rows.reduce(
(acc, r) => acc + Number(r['weighted_count']),
0,
);
expect(totalCount).toBe(40);
});
it('time-series with groupBy: weighted count per service per time bucket', async () => {
const rows = await executeChartConfigAllRows({
...baseConfig,
displayType: DisplayType.Line,
granularity: '1 minute',
groupBy: 'ServiceName',
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: '',
alias: 'weighted_count',
},
],
});
const byService = new Map<string, number>();
for (const r of rows) {
const svc = r['ServiceName'];
byService.set(svc, (byService.get(svc) ?? 0) + Number(r['weighted_count']));
}
expect(byService.get('api')).toBe(23);
expect(byService.get('web')).toBe(17);
});
describe('additional edge cases', () => {
const EDGE_TABLE = 'test_sample_weighted_edge';
beforeAll(async () => {
await client.command({
query: `
CREATE OR REPLACE TABLE ${DB}.${EDGE_TABLE} (
Timestamp DateTime64(9) CODEC(Delta(8), ZSTD(1)),
Duration Float64,
SampleRate UInt64,
ServiceName LowCardinality(String),
SpanAttributes Map(LowCardinality(String), String)
)
ENGINE = MergeTree()
ORDER BY (Timestamp)
`,
});
await client.command({
query: `
INSERT INTO ${DB}.${EDGE_TABLE}
(Timestamp, Duration, SampleRate, ServiceName, SpanAttributes)
VALUES
('2025-06-01 00:00:01', 100, 1, 'api', {'SampleRate': '1'}),
('2025-06-01 00:00:02', 200, 1, 'api', {'SampleRate': '1'}),
('2025-06-01 00:00:03', 300, 1, 'web', {'SampleRate': '1'}),
('2025-06-01 00:00:04', 400, 1, 'web', {'SampleRate': 'abc'}),
('2025-06-01 00:00:05', 50, 1000000, 'api', {'SampleRate': '1000000'})
`,
});
});
afterAll(async () => {
await client.command({
query: `DROP TABLE IF EXISTS ${DB}.${EDGE_TABLE}`,
});
});
const edgeConfig: ChartConfigWithOptDateRange = {
displayType: DisplayType.Table,
connection: 'test-connection',
from: { databaseName: DB, tableName: EDGE_TABLE },
select: [],
where: '',
whereLanguage: 'sql',
timestampValueExpression: 'Timestamp',
sampleWeightExpression: 'SampleRate',
dateRange: [new Date('2025-01-01'), new Date('2025-12-31')],
};
it('all SampleRate=1: weighted results should equal unweighted results', async () => {
const filterConfig = {
...edgeConfig,
where: "SampleRate = 1",
whereLanguage: 'sql' as const,
};
const weightedResult = await executeChartConfig({
...filterConfig,
select: [
{ aggFn: 'count', valueExpression: '', aggCondition: '', alias: 'wcount' },
{ aggFn: 'avg', valueExpression: 'Duration', aggCondition: '', alias: 'wavg' },
{ aggFn: 'sum', valueExpression: 'Duration', aggCondition: '', alias: 'wsum' },
],
});
const unweightedResult = await executeChartConfig({
...filterConfig,
sampleWeightExpression: undefined,
select: [
{ aggFn: 'count', valueExpression: '', aggCondition: '', alias: 'count' },
{ aggFn: 'avg', valueExpression: 'Duration', aggCondition: '', alias: 'avg' },
{ aggFn: 'sum', valueExpression: 'Duration', aggCondition: '', alias: 'sum' },
],
});
expect(Number(weightedResult['wcount'])).toBe(Number(unweightedResult['count']));
expect(Number(weightedResult['wavg'])).toBeCloseTo(Number(unweightedResult['avg']), 5);
expect(Number(weightedResult['wsum'])).toBeCloseTo(Number(unweightedResult['sum']), 5);
});
it('non-numeric SampleRate in SpanAttributes: should clamp to weight 1', async () => {
const result = await executeChartConfig({
...edgeConfig,
sampleWeightExpression: "SpanAttributes['SampleRate']",
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: "ServiceName = 'web'",
aggConditionLanguage: 'sql',
},
],
});
const value = Number(Object.values(result)[0]);
// web rows: SpanAttributes['SampleRate'] = '1' (weight 1) + 'abc' (weight 1) = 2
expect(value).toBe(2);
});
it('very large SampleRate: should handle without overflow', async () => {
const result = await executeChartConfig({
...edgeConfig,
select: [
{ aggFn: 'count', valueExpression: '', aggCondition: '', alias: 'wcount' },
{ aggFn: 'sum', valueExpression: 'Duration', aggCondition: '', alias: 'wsum' },
{ aggFn: 'avg', valueExpression: 'Duration', aggCondition: '', alias: 'wavg' },
],
});
// Total weights: 1+1+1+1+1000000 = 1000004
expect(Number(result['wcount'])).toBe(1000004);
// Total weighted sum: 100*1+200*1+300*1+400*1+50*1000000 = 50001000
expect(Number(result['wsum'])).toBe(50001000);
// Weighted avg: 50001000/1000004 ≈ 50.000800...
expect(Number(result['wavg'])).toBeCloseTo(50001000 / 1000004, 2);
});
it('very large SampleRate: weighted avg dominated by high-weight row', async () => {
const result = await executeChartConfig({
...edgeConfig,
select: [
{ aggFn: 'avg', valueExpression: 'Duration', aggCondition: '' },
],
});
const value = Number(Object.values(result)[0]);
// With weight 1M on Duration=50, the avg should be very close to 50
expect(value).toBeGreaterThan(49);
expect(value).toBeLessThan(51);
});
  });
});

Bug: Division by zero in weighted avg for empty result sets

In the weighted avg branch:

chSql`sumIf(${weightedVal}, ${cond}) / sumIf(${w}, ${cond})`

When no rows match the filter, both sumIf terms evaluate to 0, so the expression computes 0/0. Suggested fix — wrap the denominator:

// In the avg branch of aggFnExpr, for both the where and non-where cases:
const denom = isWhereUsed
? chSql`sumIf(${w}, ${cond})`
: chSql`sumIf(${w}, ${{ UNSAFE_RAW_SQL: nullCheck }})`;
const numer = isWhereUsed
? chSql`sumIf(${weightedVal}, ${cond})`
: chSql`sumIf(${weightedVal}, ${{ UNSAFE_RAW_SQL: nullCheck }})`;
return chSql`if(${denom} = 0, NULL, ${numer} / ${denom})`;

This is a minor edge case (requires all rows to be filtered out), but worth fixing for correctness.
Problem
High-throughput services can produce millions of spans per second. Storing every span is expensive, so we run the OpenTelemetry Collector's tail-sampling processor to keep only 1-in-N spans. Each kept span carries a `SampleRate` attribute recording N.

Once data is sampled, naive aggregations are wrong: count() returns roughly N× fewer events than actually occurred, sum()/avg() are biased, and percentiles shift. Dashboards show misleadingly low request counts, throughput, and error rates, making capacity planning and alerting unreliable.
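As a toy illustration of the bias and its correction (made-up numbers, not data from this PR):

```typescript
// Toy model of 1-in-N tail sampling: each kept span records its sampling
// rate N, meaning it stands in for ~N original spans. Weighting by N
// (a Horvitz-Thompson estimate) recovers the true totals, while a naive
// count of stored rows undercounts by ~N.
interface Span {
  durationMs: number;
  sampleRate: number; // N for 1-in-N sampling
}

// 3 spans survived 1-in-10 sampling of ~30 original spans.
const keptSpans: Span[] = [
  { durationMs: 120, sampleRate: 10 },
  { durationMs: 80, sampleRate: 10 },
  { durationMs: 200, sampleRate: 10 },
];

const naiveCount = keptSpans.length; // what count() over stored rows sees
const weightedCount = keptSpans.reduce((acc, s) => acc + s.sampleRate, 0);
const weightedDurationSum = keptSpans.reduce(
  (acc, s) => acc + s.durationMs * s.sampleRate,
  0,
); // estimate of the true sum(Duration) over all original spans
```

Here the naive count is 3 while the weighted estimate is 30, the 10× gap the dashboards currently show.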
Why Materialized Views Cannot Solve This Alone
A materialized view that pre-aggregates sampled spans is a useful performance optimization for known dashboard queries, but it cannot replace a sampling-aware query engine.
Fixed dimensions. A materialized view pre-aggregates by a fixed set of GROUP BY keys (e.g. `ServiceName`, `SpanName`, `StatusCode`, `TimestampBucket`). Trace exploration requires slicing by arbitrary span attributes -- `http.target`, `k8s.pod.name`, custom business tags -- in combinations that cannot be predicted at view creation time. Grouping by a different dimension requires either going back to the raw table or a separate materialized view for every possible dimension combination. If you try to work around the fixed-dimensions problem by adding high-cardinality span attributes to the GROUP BY, the materialized table approaches a 1:1 row ratio with the raw table. You end up doubling storage without meaningful compression.

Fixed aggregation fields. A typical MV only aggregates a single numeric column like `Duration`. Users want weighted aggregations over any numeric attribute: request body sizes, queue depths, retry counts, custom metrics attached to spans. Each new field requires adding more `AggregateFunction` columns and recreating the view.

Industry precedent. Platforms that rely solely on pre-aggregation (Datadog, Splunk, New Relic, Elastic) get accurate RED dashboards but cannot correct ad-hoc queries over sampled span data. Only query-engine weighting (Honeycomb) produces correct results for arbitrary ad-hoc queries, including weighted percentiles and heatmaps.
A better solution is to make the query engine itself sampling-aware, so that all queries from dashboards, alerts, and ad-hoc searches automatically weight by `SampleRate` regardless of which dimensions or fields the user picks. Materialized views remain a useful complement for accelerating known, fixed-dimension dashboard panels, but they are not a substitute for correct query-time weighting.

Summary
TraceSourceSchema gets a new optional field `sampleRateExpression` - the ClickHouse expression that evaluates to the per-span sample rate (e.g. `SpanAttributes['SampleRate']`). When not configured, all queries are unchanged.

When set, the query builder rewrites SQL aggregations to weight each span by its sample rate:
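For percentiles, the rewrite relies on ClickHouse's quantileTDigestWeighted. As a sketch of what that quantity means, here is a toy exact weighted quantile (not the T-Digest algorithm itself):

```typescript
// Toy exact weighted quantile: sort by value, then walk the cumulative
// weight until fraction p of the total weight is covered. ClickHouse's
// quantileTDigestWeighted approximates the same quantity with a T-Digest
// sketch instead of materializing and sorting every (value, weight) pair.
function weightedQuantile(
  samples: Array<{ value: number; weight: number }>,
  p: number, // e.g. 0.95 for p95
): number {
  const sorted = [...samples].sort((a, b) => a.value - b.value);
  const total = sorted.reduce((acc, s) => acc + s.weight, 0);
  let cumulative = 0;
  for (const s of sorted) {
    cumulative += s.weight;
    if (cumulative >= p * total) return s.value;
  }
  return sorted[sorted.length - 1].value;
}
```

With 1-in-N sampling, a single kept slow span carrying a large weight correctly pulls the p95 upward, which an unweighted quantile over only the kept rows would miss.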
Types:

Query builder:
- Rewrite `aggFnExpr` in `renderChartConfig.ts` when `sampleWeightExpression` is set, with safe default-to-1 wrapping: spans without a SampleRate attribute default to weight 1 (unsampled data produces identical results to the original queries)

Integration (propagate `sampleWeightExpression` to all chart configs):

UI:
Screenshots or video
How to test locally or on Vercel
References