diff --git a/go.mod b/go.mod index 5ad247cd4..f7eecf45d 100644 --- a/go.mod +++ b/go.mod @@ -30,17 +30,17 @@ require ( github.com/stretchr/testify v1.11.1 github.com/twitchtv/twirp v8.1.3+incompatible github.com/zeebo/xxh3 v1.0.2 - go.opentelemetry.io/otel v1.39.0 + go.opentelemetry.io/otel v1.40.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0 - go.opentelemetry.io/otel/sdk v1.39.0 - go.opentelemetry.io/otel/trace v1.39.0 + go.opentelemetry.io/otel/sdk v1.40.0 + go.opentelemetry.io/otel/trace v1.40.0 go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 go.uber.org/zap/exp v0.3.0 golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b golang.org/x/mod v0.29.0 - golang.org/x/sys v0.39.0 + golang.org/x/sys v0.40.0 golang.org/x/text v0.31.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 google.golang.org/grpc v1.77.0 @@ -89,7 +89,7 @@ require ( github.com/wlynxg/anet v0.0.5 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect - go.opentelemetry.io/otel/metric v1.39.0 // indirect + go.opentelemetry.io/otel/metric v1.40.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect golang.org/x/crypto v0.45.0 // indirect golang.org/x/net v0.47.0 // indirect diff --git a/go.sum b/go.sum index 2e3e79609..29f6a9888 100644 --- a/go.sum +++ b/go.sum @@ -217,20 +217,20 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= -go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= -go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= +go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= +go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 h1:f0cb2XPmrqn4XMy9PNliTgRKJgS5WcL/u0/WRYGz4t0= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0/go.mod h1:vnakAaFckOMiMtOIhFI2MNH4FYrZzXCYxmb1LlhoGz8= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0 h1:Ckwye2FpXkYgiHX7fyVrN1uA/UYd9ounqqTuSNAv0k4= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0/go.mod h1:teIFJh5pW2y+AN7riv6IBPX2DuesS3HgP39mwOspKwU= -go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= -go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= -go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= -go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= -go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= -go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= -go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= -go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= +go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= +go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= +go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= +go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= +go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw= +go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= +go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= +go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= @@ -274,8 +274,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= -golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= +golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= diff --git a/observability/egressobs/egress.go b/observability/egressobs/egress.go new file mode 100644 index 000000000..96f67ccf3 --- /dev/null +++ b/observability/egressobs/egress.go @@ -0,0 +1,149 @@ +package egressobs + +import ( + "encoding/json" + + "google.golang.org/protobuf/encoding/protojson" + + "github.com/pkg/errors" + + "github.com/livekit/protocol/livekit" +) + +type EgressResults struct { + FileResults []*livekit.FileInfo + StreamResults []*livekit.StreamInfo + SegmentResults []*livekit.SegmentsInfo + ImageResults []*livekit.ImagesInfo +} + +func GetSourceType(info *livekit.EgressInfo) SessionSourceType { + switch info.SourceType { + case livekit.EgressSourceType_EGRESS_SOURCE_TYPE_WEB: + return SessionSourceTypeWeb + case livekit.EgressSourceType_EGRESS_SOURCE_TYPE_SDK: + return SessionSourceTypeSdk + default: + return SessionSourceTypeUndefined + } +} + +func GetRequestType(info *livekit.EgressInfo) EgressRequestType { + switch info.Request.(type) { + case *livekit.EgressInfo_RoomComposite: + return EgressRequestTypeRoomComposite + case *livekit.EgressInfo_Web: + return EgressRequestTypeWeb + case *livekit.EgressInfo_Participant: + return EgressRequestTypeParticipant + case *livekit.EgressInfo_TrackComposite: + return EgressRequestTypeTrackComposite + case *livekit.EgressInfo_Track: + return EgressRequestTypeTrack + default: + return EgressRequestTypeUndefined + } +} + +func GetStatus(info *livekit.EgressInfo) SessionStatus { + switch info.Status { + case livekit.EgressStatus_EGRESS_STARTING: + return SessionStatusStarting + case livekit.EgressStatus_EGRESS_ACTIVE: + return SessionStatusActive + case livekit.EgressStatus_EGRESS_ENDING: + return SessionStatusEnding + case livekit.EgressStatus_EGRESS_COMPLETE: + return SessionStatusComplete + case livekit.EgressStatus_EGRESS_ABORTED: + return SessionStatusAborted + case livekit.EgressStatus_EGRESS_LIMIT_REACHED: + return SessionStatusLimitReached + case livekit.EgressStatus_EGRESS_FAILED: + return SessionStatusFailed + default: + return SessionStatusUndefined + } +} + +func GetRequest(info *livekit.EgressInfo) (string, error) { + switch req := info.Request.(type) { + case *livekit.EgressInfo_RoomComposite: + b, err := protojson.Marshal(req.RoomComposite) + if err != nil { + return "", errors.Wrap(err, "failed serializing RoomComposite request") + } + return string(b), nil + case *livekit.EgressInfo_Web: + b, err := protojson.Marshal(req.Web) + if err != nil { + return "", errors.Wrap(err, "failed serializing Web request") + } + return string(b), nil + case *livekit.EgressInfo_Participant: + b, err := protojson.Marshal(req.Participant) + if err != nil { + return "", errors.Wrap(err, "failed serializing Participant request") + } + return string(b), nil + case *livekit.EgressInfo_TrackComposite: + b, err := protojson.Marshal(req.TrackComposite) + if err != nil { + return "", errors.Wrap(err, "failed serializing TrackComposite request") + } + return string(b), nil + case *livekit.EgressInfo_Track: + b, err := protojson.Marshal(req.Track) + if err != nil { + return "", errors.Wrap(err, "failed serializing Track request") + } + return string(b), nil + default: + return "", nil + } +} + +func GetResult(info *livekit.EgressInfo) (string, error) { + if file := info.GetFile(); file != nil { + b, err := protojson.Marshal(file) + if err != nil { + return "", errors.Wrap(err, "failed serializing File result") + } + return string(b), nil + } else if stream := info.GetStream(); stream != nil { + b, err := protojson.Marshal(stream) + if err != nil { + return "", errors.Wrap(err, "failed serializing Stream result") + } + return string(b), nil + } else if segments := info.GetSegments(); segments != nil { + b, err := protojson.Marshal(segments) + if err != nil { + return "", errors.Wrap(err, "failed serializing Segments result") + } + return string(b), nil + } else { + results := &EgressResults{ + FileResults: info.FileResults, + StreamResults: info.StreamResults, + SegmentResults: info.SegmentResults, + ImageResults: info.ImageResults, + } + b, err := json.Marshal(results) + if err != nil { + return "", errors.Wrap(err, "failed serializing Multiple result") + } + return string(b), nil + } +} + +func GetAudioOnly(info *livekit.EgressInfo) bool { + switch req := info.Request.(type) { + case *livekit.EgressInfo_RoomComposite: + return req.RoomComposite.AudioOnly + case *livekit.EgressInfo_Web: + return req.Web.AudioOnly + default: + return false + } +} diff --git a/observability/egressobs/egress_test.go b/observability/egressobs/egress_test.go new file mode 100644 index 000000000..533c9f25b --- /dev/null +++ b/observability/egressobs/egress_test.go @@ -0,0 +1,289 @@ +package egressobs + +import ( + "testing" + + "github.com/livekit/protocol/livekit" + "github.com/stretchr/testify/require" +) + +func TestGetSourceType(t *testing.T) { + tests := []struct { + sourceType livekit.EgressSourceType + expected string + }{ + {livekit.EgressSourceType_EGRESS_SOURCE_TYPE_WEB, "web"}, + {livekit.EgressSourceType_EGRESS_SOURCE_TYPE_SDK, "sdk"}, + {livekit.EgressSourceType(99), ""}, // Unknown value falls back to undefined (empty string) + } + + for _, tt := range tests { + t.Run(tt.sourceType.String(), func(t *testing.T) { + info := &livekit.EgressInfo{SourceType: tt.sourceType} + result := GetSourceType(info) + require.Equal(t, tt.expected, string(result)) + }) + } +} + +func TestGetRequestType(t *testing.T) { + tests := []struct { + name string + info *livekit.EgressInfo + expected string + }{ + { + name: "RoomComposite", + info: &livekit.EgressInfo{ + Request: &livekit.EgressInfo_RoomComposite{ + RoomComposite: &livekit.RoomCompositeEgressRequest{}, + }, + }, + expected: "room_composite", + }, + { + name: "Web", + info: &livekit.EgressInfo{ + Request: &livekit.EgressInfo_Web{ + Web: &livekit.WebEgressRequest{}, + }, + }, + expected: "web", + }, + { + name: "Participant", + info: &livekit.EgressInfo{ + Request: &livekit.EgressInfo_Participant{ + Participant: &livekit.ParticipantEgressRequest{}, + }, + }, + expected: "participant", + }, + { + name: "TrackComposite", + info: &livekit.EgressInfo{ + Request: &livekit.EgressInfo_TrackComposite{ + TrackComposite: &livekit.TrackCompositeEgressRequest{}, + }, + }, + expected: "track_composite", + }, + { + name: "Track", + info: &livekit.EgressInfo{ + Request: &livekit.EgressInfo_Track{ + Track: &livekit.TrackEgressRequest{}, + }, + }, + expected: "track", + }, + { + name: "Undefined", + info: &livekit.EgressInfo{}, + expected: "", // Undefined is an empty string + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := GetRequestType(tt.info) + require.Equal(t, tt.expected, string(result)) + }) + } +} + +func TestGetAudioOnly(t *testing.T) { + tests := []struct { + name string + info *livekit.EgressInfo + audioOnly bool + }{ + { + name: "RoomComposite audio only", + info: &livekit.EgressInfo{ + Request: &livekit.EgressInfo_RoomComposite{ + RoomComposite: &livekit.RoomCompositeEgressRequest{AudioOnly: true}, + }, + }, + audioOnly: true, + }, + { + name: "RoomComposite not audio only", + info: &livekit.EgressInfo{ + Request: &livekit.EgressInfo_RoomComposite{ + RoomComposite: &livekit.RoomCompositeEgressRequest{AudioOnly: false}, + }, + }, + audioOnly: false, + }, + { + name: "Web audio only", + info: &livekit.EgressInfo{ + Request: &livekit.EgressInfo_Web{ + Web: &livekit.WebEgressRequest{AudioOnly: true}, + }, + }, + audioOnly: true, + }, + { + name: "Track request returns false", + info: &livekit.EgressInfo{ + Request: &livekit.EgressInfo_Track{ + Track: &livekit.TrackEgressRequest{}, + }, + }, + audioOnly: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.audioOnly, GetAudioOnly(tt.info)) + }) + } +} + +func TestGetRequest(t *testing.T) { + tests := []struct { + name string + info *livekit.EgressInfo + }{ + { + name: "RoomComposite", + info: &livekit.EgressInfo{ + Request: &livekit.EgressInfo_RoomComposite{ + RoomComposite: &livekit.RoomCompositeEgressRequest{ + RoomName: "test-room", + }, + }, + }, + }, + { + name: "Web", + info: &livekit.EgressInfo{ + Request: &livekit.EgressInfo_Web{ + Web: &livekit.WebEgressRequest{ + Url: "https://example.com", + }, + }, + }, + }, + { + name: "Participant", + info: &livekit.EgressInfo{ + Request: &livekit.EgressInfo_Participant{ + Participant: &livekit.ParticipantEgressRequest{ + RoomName: "test-room", + }, + }, + }, + }, + { + name: "TrackComposite", + info: &livekit.EgressInfo{ + Request: &livekit.EgressInfo_TrackComposite{ + TrackComposite: &livekit.TrackCompositeEgressRequest{ + RoomName: "test-room", + }, + }, + }, + }, + { + name: "Track", + info: &livekit.EgressInfo{ + Request: &livekit.EgressInfo_Track{ + Track: &livekit.TrackEgressRequest{ + RoomName: "test-room", + }, + }, + }, + }, + { + name: "Undefined", + info: &livekit.EgressInfo{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := GetRequest(tt.info) + require.NoError(t, err) + if tt.info.Request == nil { + require.Empty(t, result) + } else { + require.NotEmpty(t, result) + } + }) + } +} + +func TestGetResult(t *testing.T) { + tests := []struct { + name string + info *livekit.EgressInfo + }{ + { + name: "FileResult", + info: &livekit.EgressInfo{ + Result: &livekit.EgressInfo_File{ + File: &livekit.FileInfo{Filename: "test.mp4"}, + }, + }, + }, + { + name: "StreamResult", + info: &livekit.EgressInfo{ + Result: &livekit.EgressInfo_Stream{ + Stream: &livekit.StreamInfoList{}, + }, + }, + }, + { + name: "SegmentResult", + info: &livekit.EgressInfo{ + Result: &livekit.EgressInfo_Segments{ + Segments: &livekit.SegmentsInfo{}, + }, + }, + }, + { + name: "MultipleResults", + info: &livekit.EgressInfo{ + FileResults: []*livekit.FileInfo{ + {Filename: "test.mp4"}, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := GetResult(tt.info) + require.NoError(t, err) + require.NotEmpty(t, result) + }) + } +} + +func TestGetStatus(t *testing.T) { + tests := []struct { + status livekit.EgressStatus + expected string + }{ + {livekit.EgressStatus_EGRESS_STARTING, "starting"}, + {livekit.EgressStatus_EGRESS_ACTIVE, "active"}, + {livekit.EgressStatus_EGRESS_ENDING, "ending"}, + {livekit.EgressStatus_EGRESS_COMPLETE, "complete"}, + {livekit.EgressStatus_EGRESS_ABORTED, "aborted"}, + {livekit.EgressStatus_EGRESS_LIMIT_REACHED, "limit_reached"}, + {livekit.EgressStatus_EGRESS_FAILED, "failed"}, + } + + for _, tt := range tests { + t.Run(tt.status.String(), func(t *testing.T) { + info := &livekit.EgressInfo{Status: tt.status} + result := GetStatus(info) + require.Equal(t, tt.expected, string(result)) + }) + } +} diff --git a/observability/egressobs/gen_reporter.go b/observability/egressobs/gen_reporter.go index d33fcda45..602a04dd2 100644 --- a/observability/egressobs/gen_reporter.go +++ b/observability/egressobs/gen_reporter.go @@ -6,7 +6,7 @@ import ( "time" ) -const Version_57DK1I8 = true +const Version_QRLBLU8 = true type KeyResolver interface { Resolve(string) @@ -30,29 +30,50 @@ type ProjectReporter interface { } type EgressTx interface { - ReportStartTime(v time.Time) - ReportEndTime(v time.Time) - ReportUpdateTime(v time.Time) - ReportDuration(v uint64) ReportRequestType(v EgressRequestType) - ReportSourceType(v EgressSourceType) - ReportRegion(v string) ReportRoomName(v string) - ReportRoomID(v string) + ReportRequest(v string) + ReportAudioOnly(v bool) + ReportStartTime(v time.Time) + ReportEndTime(v time.Time) ReportStatus(v EgressStatus) ReportDetails(v string) ReportError(v string) ReportErrorCode(v int32) - ReportManifestLocation(v string) - ReportBackupStorageUsed(v bool) ReportResult(v string) - ReportRequest(v string) - ReportAudioOnly(v bool) + ReportManifestLocation(v string) } type EgressReporter interface { RegisterFunc(func(ts time.Time, tx EgressTx) bool) Tx(func(tx EgressTx)) TxAt(time.Time, func(tx EgressTx)) + WithSession(id string) SessionReporter + WithDeferredSession() (SessionReporter, KeyResolver) EgressTx } + +type SessionTx interface { + ReportStartTime(v time.Time) + ReportEndTime(v time.Time) + ReportUpdateTime(v time.Time) + ReportDuration(v uint64) + ReportRetryCount(v uint32) + ReportSourceType(v SessionSourceType) + ReportRegion(v string) + ReportRoomID(v string) + ReportStatus(v SessionStatus) + ReportDetails(v string) + ReportError(v string) + ReportErrorCode(v int32) + ReportManifestLocation(v string) + ReportBackupStorageUsed(v bool) + ReportResult(v string) +} + +type SessionReporter interface { + RegisterFunc(func(ts time.Time, tx SessionTx) bool) + Tx(func(tx SessionTx)) + TxAt(time.Time, func(tx SessionTx)) + SessionTx +} diff --git a/observability/egressobs/gen_reporter_noop.go b/observability/egressobs/gen_reporter_noop.go index bdda1fcd0..5aca3867f 100644 --- a/observability/egressobs/gen_reporter_noop.go +++ b/observability/egressobs/gen_reporter_noop.go @@ -10,6 +10,7 @@ var ( _ Reporter = (*noopReporter)(nil) _ ProjectReporter = (*noopProjectReporter)(nil) _ EgressReporter = (*noopEgressReporter)(nil) + _ SessionReporter = (*noopSessionReporter)(nil) ) type noopKeyResolver struct{} @@ -56,21 +57,46 @@ func NewNoopEgressReporter() EgressReporter { func (r *noopEgressReporter) RegisterFunc(f func(ts time.Time, tx EgressTx) bool) {} func (r *noopEgressReporter) Tx(f func(EgressTx)) {} func (r *noopEgressReporter) TxAt(ts time.Time, f func(EgressTx)) {} -func (r *noopEgressReporter) ReportStartTime(v time.Time) {} -func (r *noopEgressReporter) ReportEndTime(v time.Time) {} -func (r *noopEgressReporter) ReportUpdateTime(v time.Time) {} -func (r *noopEgressReporter) ReportDuration(v uint64) {} func (r *noopEgressReporter) ReportRequestType(v EgressRequestType) {} -func (r *noopEgressReporter) ReportSourceType(v EgressSourceType) {} -func (r *noopEgressReporter) ReportRegion(v string) {} func (r *noopEgressReporter) ReportRoomName(v string) {} -func (r *noopEgressReporter) ReportRoomID(v string) {} +func (r *noopEgressReporter) ReportRequest(v string) {} +func (r *noopEgressReporter) ReportAudioOnly(v bool) {} +func (r *noopEgressReporter) ReportStartTime(v time.Time) {} +func (r *noopEgressReporter) ReportEndTime(v time.Time) {} func (r *noopEgressReporter) ReportStatus(v EgressStatus) {} func (r *noopEgressReporter) ReportDetails(v string) {} func (r *noopEgressReporter) ReportError(v string) {} func (r *noopEgressReporter) ReportErrorCode(v int32) {} -func (r *noopEgressReporter) ReportManifestLocation(v string) {} -func (r *noopEgressReporter) ReportBackupStorageUsed(v bool) {} func (r *noopEgressReporter) ReportResult(v string) {} -func (r *noopEgressReporter) ReportRequest(v string) {} -func (r *noopEgressReporter) ReportAudioOnly(v bool) {} +func (r *noopEgressReporter) ReportManifestLocation(v string) {} +func (r *noopEgressReporter) WithSession(id string) SessionReporter { + return &noopSessionReporter{} +} +func (r *noopEgressReporter) WithDeferredSession() (SessionReporter, KeyResolver) { + return &noopSessionReporter{}, noopKeyResolver{} +} + +type noopSessionReporter struct{} + +func NewNoopSessionReporter() SessionReporter { + return &noopSessionReporter{} +} + +func (r *noopSessionReporter) RegisterFunc(f func(ts time.Time, tx SessionTx) bool) {} +func (r *noopSessionReporter) Tx(f func(SessionTx)) {} +func (r *noopSessionReporter) TxAt(ts time.Time, f func(SessionTx)) {} +func (r *noopSessionReporter) ReportStartTime(v time.Time) {} +func (r *noopSessionReporter) ReportEndTime(v time.Time) {} +func (r *noopSessionReporter) ReportUpdateTime(v time.Time) {} +func (r *noopSessionReporter) ReportDuration(v uint64) {} +func (r *noopSessionReporter) ReportRetryCount(v uint32) {} +func (r *noopSessionReporter) ReportSourceType(v SessionSourceType) {} +func (r *noopSessionReporter) ReportRegion(v string) {} +func (r *noopSessionReporter) ReportRoomID(v string) {} +func (r *noopSessionReporter) ReportStatus(v SessionStatus) {} +func (r *noopSessionReporter) ReportDetails(v string) {} +func (r *noopSessionReporter) ReportError(v string) {} +func (r *noopSessionReporter) ReportErrorCode(v int32) {} +func (r *noopSessionReporter) ReportManifestLocation(v string) {} +func (r *noopSessionReporter) ReportBackupStorageUsed(v bool) {} +func (r *noopSessionReporter) ReportResult(v string) {} diff --git a/observability/egressobs/gen_source.go b/observability/egressobs/gen_source.go index 978cf535b..cee8b356a 100644 --- a/observability/egressobs/gen_source.go +++ b/observability/egressobs/gen_source.go @@ -12,14 +12,6 @@ const ( EgressRequestTypeWeb EgressRequestType = "web" ) -type EgressSourceType string - -const ( - EgressSourceTypeUndefined EgressSourceType = "" - EgressSourceTypeSdk EgressSourceType = "sdk" - EgressSourceTypeWeb EgressSourceType = "web" -) - type EgressStatus string const ( @@ -33,6 +25,27 @@ const ( EgressStatusLimitReached EgressStatus = "limit_reached" ) +type SessionSourceType string + +const ( + SessionSourceTypeUndefined SessionSourceType = "" + SessionSourceTypeSdk SessionSourceType = "sdk" + SessionSourceTypeWeb SessionSourceType = "web" +) + +type SessionStatus string + +const ( + SessionStatusUndefined SessionStatus = "" + SessionStatusStarting SessionStatus = "starting" + SessionStatusActive SessionStatus = "active" + SessionStatusEnding SessionStatus = "ending" + SessionStatusComplete SessionStatus = "complete" + SessionStatusFailed SessionStatus = "failed" + SessionStatusAborted SessionStatus = "aborted" + SessionStatusLimitReached SessionStatus = "limit_reached" +) + type Rollup string const ( @@ -42,4 +55,6 @@ const ( RollupEndTimeIndex Rollup = "end_time_index" RollupStartTimeIndex Rollup = "start_time_index" RollupRoomNameIndex Rollup = "room_name_index" + RollupEgressEgress Rollup = "egress_egress" + RollupSessionIndex Rollup = "session_index" ) diff --git a/observability/gatewayobs/gen_reporter.go b/observability/gatewayobs/gen_reporter.go index dfd3b4fc4..071937812 100644 --- a/observability/gatewayobs/gen_reporter.go +++ b/observability/gatewayobs/gen_reporter.go @@ -6,7 +6,7 @@ import ( "time" ) -const Version_J2LFGS0 = true +const Version_PKGHM18 = true type KeyResolver interface { Resolve(string) @@ -49,6 +49,8 @@ type ModelTx interface { ReportInferenceCacheReadTokens(v uint64) ReportSttDuration(v uint32) ReportTtsChars(v uint32) + ReportBargeInRequests(v uint64) + ReportBargeInRequestTypes(v ModelBargeInRequestTypes) } type ModelReporter interface { diff --git a/observability/gatewayobs/gen_reporter_noop.go b/observability/gatewayobs/gen_reporter_noop.go index ae96ff614..e8278b5c7 100644 --- a/observability/gatewayobs/gen_reporter_noop.go +++ b/observability/gatewayobs/gen_reporter_noop.go @@ -70,14 +70,16 @@ func NewNoopModelReporter() ModelReporter { return &noopModelReporter{} } -func (r *noopModelReporter) RegisterFunc(f func(ts time.Time, tx ModelTx) bool) {} -func (r *noopModelReporter) Tx(f func(ModelTx)) {} -func (r *noopModelReporter) TxAt(ts time.Time, f func(ModelTx)) {} -func (r *noopModelReporter) ReportInferencePromptTokens(v uint64) {} -func (r *noopModelReporter) ReportInferencePromptCacheTokens(v uint64) {} -func (r *noopModelReporter) ReportInferenceCompletionTokens(v uint64) {} -func (r *noopModelReporter) ReportInferenceTotalTokens(v uint64) {} -func (r *noopModelReporter) ReportInferenceCacheCreateTokens(v uint64) {} -func (r *noopModelReporter) ReportInferenceCacheReadTokens(v uint64) {} -func (r *noopModelReporter) ReportSttDuration(v uint32) {} -func (r *noopModelReporter) ReportTtsChars(v uint32) {} +func (r *noopModelReporter) RegisterFunc(f func(ts time.Time, tx ModelTx) bool) {} +func (r *noopModelReporter) Tx(f func(ModelTx)) {} +func (r *noopModelReporter) TxAt(ts time.Time, f func(ModelTx)) {} +func (r *noopModelReporter) ReportInferencePromptTokens(v uint64) {} +func (r *noopModelReporter) ReportInferencePromptCacheTokens(v uint64) {} +func (r *noopModelReporter) ReportInferenceCompletionTokens(v uint64) {} +func (r *noopModelReporter) ReportInferenceTotalTokens(v uint64) {} +func (r *noopModelReporter) ReportInferenceCacheCreateTokens(v uint64) {} +func (r *noopModelReporter) ReportInferenceCacheReadTokens(v uint64) {} +func (r *noopModelReporter) ReportSttDuration(v uint32) {} +func (r *noopModelReporter) ReportTtsChars(v uint32) {} +func (r *noopModelReporter) ReportBargeInRequests(v uint64) {} +func (r *noopModelReporter) ReportBargeInRequestTypes(v ModelBargeInRequestTypes) {} diff --git a/observability/gatewayobs/gen_source.go b/observability/gatewayobs/gen_source.go index 89a836c1b..0e5d9ab64 100644 --- a/observability/gatewayobs/gen_source.go +++ b/observability/gatewayobs/gen_source.go @@ -1,6 +1,14 @@ // Code generated; DO NOT EDIT. package gatewayobs +type ModelBargeInRequestTypes string + +const ( + ModelBargeInRequestTypesUndefined ModelBargeInRequestTypes = "" + ModelBargeInRequestTypesCloud ModelBargeInRequestTypes = "cloud" + ModelBargeInRequestTypesSelfHosted ModelBargeInRequestTypes = "self_hosted" +) + type Rollup string const (