From 6bcf39d1be3fab3b74f2a5116de61cc7e795310e Mon Sep 17 00:00:00 2001 From: Krish-vemula Date: Tue, 17 Feb 2026 14:00:28 -0800 Subject: [PATCH 1/9] add pollHealthChecker interface for optional RPC health checks Add optional interface for chain-specific RPC clients to run extra health checks during alive-loop polling. Failures count toward poll failure threshold. Enables chain integrations to detect issues like missing historical state. --- multinode/mock_rpc_client_test.go | 18 ++++++++++++++++++ multinode/node_lifecycle.go | 13 +++++++++++++ multinode/node_lifecycle_test.go | 25 +++++++++++++++++++++++++ 3 files changed, 56 insertions(+) diff --git a/multinode/mock_rpc_client_test.go b/multinode/mock_rpc_client_test.go index a90063e..a0ae372 100644 --- a/multinode/mock_rpc_client_test.go +++ b/multinode/mock_rpc_client_test.go @@ -296,6 +296,24 @@ func (_m *mockRPCClient[CHAIN_ID, HEAD]) IsSyncing(ctx context.Context) (bool, e return r0, r1 } +// PollHealthCheck provides a mock function with given fields: _a0 +func (_m *mockRPCClient[CHAIN_ID, HEAD]) PollHealthCheck(_a0 context.Context) error { + ret := _m.Called(_a0) + + if len(ret) == 0 { + panic("no return value specified for PollHealthCheck") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // mockRPCClient_IsSyncing_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsSyncing' type mockRPCClient_IsSyncing_Call[CHAIN_ID ID, HEAD Head] struct { *mock.Call diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index e2974c0..0e92cb7 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -29,6 +29,12 @@ const ( msgDegradedState = "Chainlink is now operating in a degraded state and urgent action is required to resolve the issue" ) +// pollHealthChecker is an optional RPC capability for running extra liveness checks during alive-loop polling. +// If implemented by a chain RPC client, failures are treated the same as ClientVersion polling failures. +type pollHealthChecker interface { + PollHealthCheck(context.Context) error +} + // Node is a FSM // Each state has a loop that goes with it, which monitors the node and moves it into another state as necessary. // Only one loop must run at a time. @@ -111,6 +117,13 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { lggr.Tracew("Pinging RPC", "nodeState", n.State(), "pollFailures", pollFailures) pollCtx, cancel := context.WithTimeout(ctx, pollInterval) version, pingErr := n.RPC().ClientVersion(pollCtx) + if pingErr == nil { + if healthChecker, ok := any(n.RPC()).(pollHealthChecker); ok { + if err := healthChecker.PollHealthCheck(pollCtx); err != nil { + pingErr = fmt.Errorf("poll health check failed: %w", err) + } + } + } cancel() if pingErr != nil { // prevent overflow diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index 684d0c7..e86261b 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -176,6 +176,31 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { return nodeStateUnreachable == node.State() }) }) + t.Run("optional poll health check failure counts as poll failure and transitions to unreachable", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + node := newSubscribedNode(t, testNodeOpts{ + config: testNodeConfig{ + pollFailureThreshold: 1, + pollInterval: tests.TestInterval, + }, + rpc: rpc, + lggr: lggr, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("ClientVersion", mock.Anything).Return("mock-version", nil) + rpc.On("PollHealthCheck", mock.Anything).Return(errors.New("health check failed")) + rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() + + node.declareAlive() + tests.AssertLogEventually(t, observedLogs, "poll health check failed: health check failed") + tests.AssertEventually(t, func() bool { + return nodeStateUnreachable == node.State() + }) + }) t.Run("with threshold poll failures, but we are the last node alive, forcibly keeps it alive", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) From 0d408ca691cc005aa10d60ddce7f6a45d27a7d8d Mon Sep 17 00:00:00 2001 From: Krish-vemula Date: Wed, 18 Feb 2026 11:46:57 -0800 Subject: [PATCH 2/9] added fixes for build and lint --- multinode/mock_rpc_client_test.go | 54 +++++++++++++++++++++++-------- multinode/node_lifecycle.go | 12 ++----- multinode/node_lifecycle_test.go | 8 ++++- multinode/rpc_client_base.go | 7 ++++ multinode/types.go | 4 +++ 5 files changed, 61 insertions(+), 24 deletions(-) diff --git a/multinode/mock_rpc_client_test.go b/multinode/mock_rpc_client_test.go index a0ae372..6e129e3 100644 --- a/multinode/mock_rpc_client_test.go +++ b/multinode/mock_rpc_client_test.go @@ -296,9 +296,37 @@ func (_m *mockRPCClient[CHAIN_ID, HEAD]) IsSyncing(ctx context.Context) (bool, e return r0, r1 } -// PollHealthCheck provides a mock function with given fields: _a0 -func (_m *mockRPCClient[CHAIN_ID, HEAD]) PollHealthCheck(_a0 context.Context) error { - ret := _m.Called(_a0) +// mockRPCClient_IsSyncing_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsSyncing' +type mockRPCClient_IsSyncing_Call[CHAIN_ID ID, HEAD Head] struct { + *mock.Call +} + +// IsSyncing is a helper method to define mock.On call +// - ctx context.Context +func (_e *mockRPCClient_Expecter[CHAIN_ID, HEAD]) IsSyncing(ctx interface{}) *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD] { + return &mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD]{Call: _e.mock.On("IsSyncing", ctx)} +} + +func (_c *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD]) Run(run func(ctx context.Context)) *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD] { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD]) Return(_a0 bool, _a1 error) *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD] { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(context.Context) (bool, error)) *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD] { + _c.Call.Return(run) + return _c +} + +// PollHealthCheck provides a mock function with given fields: ctx +func (_m *mockRPCClient[CHAIN_ID, HEAD]) PollHealthCheck(ctx context.Context) error { + ret := _m.Called(ctx) if len(ret) == 0 { panic("no return value specified for PollHealthCheck") @@ -306,7 +334,7 @@ func (_m *mockRPCClient[CHAIN_ID, HEAD]) PollHealthCheck(_a0 context.Context) er var r0 error if rf, ok := ret.Get(0).(func(context.Context) error); ok { - r0 = rf(_a0) + r0 = rf(ctx) } else { r0 = ret.Error(0) } @@ -314,30 +342,30 @@ func (_m *mockRPCClient[CHAIN_ID, HEAD]) PollHealthCheck(_a0 context.Context) er return r0 } -// mockRPCClient_IsSyncing_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsSyncing' -type mockRPCClient_IsSyncing_Call[CHAIN_ID ID, HEAD Head] struct { +// mockRPCClient_PollHealthCheck_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PollHealthCheck' +type mockRPCClient_PollHealthCheck_Call[CHAIN_ID ID, HEAD Head] struct { *mock.Call } -// IsSyncing is a helper method to define mock.On call +// PollHealthCheck is a helper method to define mock.On call // - ctx context.Context -func (_e *mockRPCClient_Expecter[CHAIN_ID, HEAD]) IsSyncing(ctx interface{}) *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD] { - return &mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD]{Call: _e.mock.On("IsSyncing", ctx)} +func (_e *mockRPCClient_Expecter[CHAIN_ID, HEAD]) PollHealthCheck(ctx interface{}) *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD] { + return &mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD]{Call: _e.mock.On("PollHealthCheck", ctx)} } -func (_c *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD]) Run(run func(ctx context.Context)) *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD] { +func (_c *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD]) Run(run func(ctx context.Context)) *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD] { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context)) }) return _c } -func (_c *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD]) Return(_a0 bool, _a1 error) *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD] { - _c.Call.Return(_a0, _a1) +func (_c *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD]) Return(_a0 error) *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD] { + _c.Call.Return(_a0) return _c } -func (_c *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(context.Context) (bool, error)) *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD] { +func (_c *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(context.Context) error) *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD] { _c.Call.Return(run) return _c } diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index 0e92cb7..dffdcf3 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -29,12 +29,6 @@ const ( msgDegradedState = "Chainlink is now operating in a degraded state and urgent action is required to resolve the issue" ) -// pollHealthChecker is an optional RPC capability for running extra liveness checks during alive-loop polling. -// If implemented by a chain RPC client, failures are treated the same as ClientVersion polling failures. -type pollHealthChecker interface { - PollHealthCheck(context.Context) error -} - // Node is a FSM // Each state has a loop that goes with it, which monitors the node and moves it into another state as necessary. // Only one loop must run at a time. @@ -118,10 +112,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { pollCtx, cancel := context.WithTimeout(ctx, pollInterval) version, pingErr := n.RPC().ClientVersion(pollCtx) if pingErr == nil { - if healthChecker, ok := any(n.RPC()).(pollHealthChecker); ok { - if err := healthChecker.PollHealthCheck(pollCtx); err != nil { - pingErr = fmt.Errorf("poll health check failed: %w", err) - } + if healthErr := n.RPC().PollHealthCheck(pollCtx); healthErr != nil { + pingErr = fmt.Errorf("poll health check failed: %w", healthErr) } } cancel() diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index e86261b..dca2531 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -147,6 +147,8 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }).Once() // redundant call to stay in alive state rpc.On("ClientVersion", mock.Anything).Return("", nil) + // PollHealthCheck is called after successful ClientVersion - return nil to pass + rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() node.declareAlive() tests.AssertLogCountEventually(t, observedLogs, fmt.Sprintf("Poll failure, RPC endpoint %s failed to respond properly", node.String()), pollFailureThreshold) tests.AssertLogCountEventually(t, observedLogs, "Ping successful", 2) @@ -196,7 +198,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() node.declareAlive() - tests.AssertLogEventually(t, observedLogs, "poll health check failed: health check failed") + tests.AssertLogCountEventually(t, observedLogs, fmt.Sprintf("Poll failure, RPC endpoint %s failed to respond properly", node.String()), 1) tests.AssertEventually(t, func() bool { return nodeStateUnreachable == node.State() }) @@ -272,6 +274,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) + rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}) poolInfo := newMockPoolChainInfoProvider(t) @@ -307,6 +310,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) + rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}) poolInfo := newMockPoolChainInfoProvider(t) @@ -335,6 +339,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) + rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}).Twice() poolInfo := newMockPoolChainInfoProvider(t) @@ -369,6 +374,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) + rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}) node.declareAlive() diff --git a/multinode/rpc_client_base.go b/multinode/rpc_client_base.go index b4a886c..a06200d 100644 --- a/multinode/rpc_client_base.go +++ b/multinode/rpc_client_base.go @@ -296,3 +296,10 @@ func (m *RPCClientBase[HEAD]) GetInterceptedChainInfo() (latest, highestUserObse defer m.chainInfoLock.RUnlock() return m.latestChainInfo, m.highestUserObservations } + +// PollHealthCheck provides a default no-op implementation for the RPCClient interface. +// Chain-specific RPC clients can override this method to perform additional health checks +// during polling (e.g., verifying historical state availability). +func (m *RPCClientBase[HEAD]) PollHealthCheck(ctx context.Context) error { + return nil +} diff --git a/multinode/types.go b/multinode/types.go index b31c6ca..e9aa954 100644 --- a/multinode/types.go +++ b/multinode/types.go @@ -77,6 +77,10 @@ type RPCClient[ // Ensure implementation does not have a race condition when values are reset before request completion and as // a result latest ChainInfo contains information from the previous cycle. GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo) + // PollHealthCheck - performs an optional additional health check during polling. + // Implementations can use this for chain-specific health verification (e.g., historical state availability). + // Return nil if the check passes or is not applicable, or an error if the check fails. + PollHealthCheck(ctx context.Context) error } // Head is the interface required by the NodeClient From 886da2b5ba70037891b290f68c42fd5b94d4851d Mon Sep 17 00:00:00 2001 From: Krish-vemula Date: Mon, 2 Mar 2026 09:52:25 -0600 Subject: [PATCH 3/9] Introduce nodeStateFinalizedStateNotAvailable and separate polling for finalized state availability with configurable threshold and regex-based error classification. --- multinode/config/config.go | 45 ++++++++++++++ multinode/node.go | 4 ++ multinode/node_fsm.go | 33 ++++++++++ multinode/node_lifecycle.go | 113 +++++++++++++++++++++++++++++++++++ multinode/rpc_client_base.go | 8 +++ multinode/types.go | 5 ++ 6 files changed, 208 insertions(+) diff --git a/multinode/config/config.go b/multinode/config/config.go index 4d7d8ca..345b230 100644 --- a/multinode/config/config.go +++ b/multinode/config/config.go @@ -34,6 +34,12 @@ type MultiNode struct { FinalityDepth *uint32 FinalityTagEnabled *bool FinalizedBlockOffset *uint32 + + // Finalized State Availability Check + FinalizedStateCheckEnabled *bool + FinalizedStateCheckAddress *string + FinalizedStateCheckFailureThreshold *uint32 + FinalizedStateUnavailableRegex *string } func (c *MultiNodeConfig) Enabled() bool { @@ -94,6 +100,31 @@ func (c *MultiNodeConfig) FinalityTagEnabled() bool { return *c.MultiNode.Finali func (c *MultiNodeConfig) FinalizedBlockOffset() uint32 { return *c.MultiNode.FinalizedBlockOffset } +func (c *MultiNodeConfig) FinalizedStateCheckEnabled() bool { + return c.MultiNode.FinalizedStateCheckEnabled != nil && *c.MultiNode.FinalizedStateCheckEnabled +} + +func (c *MultiNodeConfig) FinalizedStateCheckAddress() string { + if c.MultiNode.FinalizedStateCheckAddress == nil { + return "" + } + return *c.MultiNode.FinalizedStateCheckAddress +} + +func (c *MultiNodeConfig) FinalizedStateCheckFailureThreshold() uint32 { + if c.MultiNode.FinalizedStateCheckFailureThreshold == nil { + return 0 + } + return *c.MultiNode.FinalizedStateCheckFailureThreshold +} + +func (c *MultiNodeConfig) FinalizedStateUnavailableRegex() string { + if c.MultiNode.FinalizedStateUnavailableRegex == nil { + return "" + } + return *c.MultiNode.FinalizedStateUnavailableRegex +} + func (c *MultiNodeConfig) SetFrom(f *MultiNodeConfig) { if f.MultiNode.Enabled != nil { c.MultiNode.Enabled = f.MultiNode.Enabled @@ -150,4 +181,18 @@ func (c *MultiNodeConfig) SetFrom(f *MultiNodeConfig) { if f.MultiNode.FinalizedBlockOffset != nil { c.MultiNode.FinalizedBlockOffset = f.MultiNode.FinalizedBlockOffset } + + // Finalized State Availability Check + if f.MultiNode.FinalizedStateCheckEnabled != nil { + c.MultiNode.FinalizedStateCheckEnabled = f.MultiNode.FinalizedStateCheckEnabled + } + if f.MultiNode.FinalizedStateCheckAddress != nil { + c.MultiNode.FinalizedStateCheckAddress = f.MultiNode.FinalizedStateCheckAddress + } + if f.MultiNode.FinalizedStateCheckFailureThreshold != nil { + c.MultiNode.FinalizedStateCheckFailureThreshold = f.MultiNode.FinalizedStateCheckFailureThreshold + } + if f.MultiNode.FinalizedStateUnavailableRegex != nil { + c.MultiNode.FinalizedStateUnavailableRegex = f.MultiNode.FinalizedStateUnavailableRegex + } } diff --git a/multinode/node.go b/multinode/node.go index 6729459..a5eee22 100644 --- a/multinode/node.go +++ b/multinode/node.go @@ -27,6 +27,10 @@ type NodeConfig interface { DeathDeclarationDelay() time.Duration NewHeadsPollInterval() time.Duration VerifyChainID() bool + FinalizedStateCheckEnabled() bool + FinalizedStateCheckFailureThreshold() uint32 + FinalizedStateCheckAddress() string + FinalizedStateUnavailableRegex() string } type ChainConfig interface { diff --git a/multinode/node_fsm.go b/multinode/node_fsm.go index 818363e..0299050 100644 --- a/multinode/node_fsm.go +++ b/multinode/node_fsm.go @@ -35,6 +35,8 @@ func (n nodeState) String() string { return "Syncing" case nodeStateFinalizedBlockOutOfSync: return "FinalizedBlockOutOfSync" + case nodeStateFinalizedStateNotAvailable: + return "FinalizedStateNotAvailable" default: return fmt.Sprintf("nodeState(%d)", n) } @@ -72,6 +74,8 @@ const ( nodeStateSyncing // nodeStateFinalizedBlockOutOfSync - node is lagging behind on latest finalized block nodeStateFinalizedBlockOutOfSync + // nodeStateFinalizedStateNotAvailable - node cannot serve historical state at finalized block + nodeStateFinalizedStateNotAvailable // nodeStateLen tracks the number of states nodeStateLen ) @@ -288,6 +292,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) declareState(state nodeState) { n.declareSyncing() case nodeStateAlive: n.declareAlive() + case nodeStateFinalizedStateNotAvailable: + n.declareFinalizedStateNotAvailable() default: panic(fmt.Sprintf("%#v state declaration is not implemented", state)) } @@ -351,6 +357,33 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToSyncing(fn func()) { fn() } +func (n *node[CHAIN_ID, HEAD, RPC]) declareFinalizedStateNotAvailable() { + n.transitionToFinalizedStateNotAvailable(func() { + n.lfcLog.Errorw("RPC Node cannot serve finalized state", "nodeState", n.state) + n.wg.Add(1) + go n.finalizedStateNotAvailableLoop() + }) +} + +func (n *node[CHAIN_ID, HEAD, RPC]) transitionToFinalizedStateNotAvailable(fn func()) { + ctx, cancel := n.stopCh.NewCtx() + defer cancel() + n.metrics.IncrementNodeTransitionsToUnreachable(ctx, n.name) + n.stateMu.Lock() + defer n.stateMu.Unlock() + if n.state == nodeStateClosed { + return + } + switch n.state { + case nodeStateAlive: + n.rpc.Close() + n.state = nodeStateFinalizedStateNotAvailable + default: + panic(transitionFail(n.state, nodeStateFinalizedStateNotAvailable)) + } + fn() +} + func transitionFail(from nodeState, to nodeState) string { return fmt.Sprintf("cannot transition from %#v to %#v", from, to) } diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index dffdcf3..3553f1f 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "math/big" + "regexp" "time" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -102,6 +103,13 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { localHighestChainInfo, _ := n.rpc.GetInterceptedChainInfo() var pollFailures uint32 + // Finalized state availability check config + finalizedStateCheckEnabled := n.nodePoolCfg.FinalizedStateCheckEnabled() + finalizedStateCheckFailureThreshold := n.nodePoolCfg.FinalizedStateCheckFailureThreshold() + finalizedStateCheckAddress := n.nodePoolCfg.FinalizedStateCheckAddress() + finalizedStateUnavailableRegex := n.nodePoolCfg.FinalizedStateUnavailableRegex() + var finalizedStateFailures uint32 + for { select { case <-ctx.Done(): @@ -150,6 +158,40 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { n.declareOutOfSync(syncStatusNotInSyncWithPool) return } + // Separate finalized state availability check + if finalizedStateCheckEnabled { + stateCheckCtx, stateCheckCancel := context.WithTimeout(ctx, pollInterval) + stateErr := n.RPC().CheckFinalizedStateAvailability(stateCheckCtx, finalizedStateCheckAddress) + stateCheckCancel() + if stateErr != nil { + if isFinalizedStateUnavailableError(stateErr, finalizedStateUnavailableRegex) { + if finalizedStateFailures < math.MaxUint32 { + finalizedStateFailures++ + } + lggr.Warnw("Finalized state not available", "err", stateErr, "failures", finalizedStateFailures, "threshold", finalizedStateCheckFailureThreshold) + if finalizedStateCheckFailureThreshold > 0 && finalizedStateFailures >= finalizedStateCheckFailureThreshold { + lggr.Errorw("RPC node cannot serve finalized state after consecutive failures", "failures", finalizedStateFailures) + if n.poolInfoProvider != nil { + if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC { + lggr.Criticalf("RPC endpoint cannot serve finalized state; %s %s", msgCannotDisable, msgDegradedState) + continue + } + } + n.declareFinalizedStateNotAvailable() + return + } + } else { + // Treat as RPC reachability error + if pollFailures < math.MaxUint32 { + n.metrics.IncrementPollsFailed(ctx, n.name) + pollFailures++ + } + lggr.Warnw("Finalized state check failed with RPC error", "err", stateErr, "pollFailures", pollFailures) + } + } else { + finalizedStateFailures = 0 + } + } case bh, open := <-headsSub.Heads: if !open { lggr.Errorw("Subscription channel unexpectedly closed", "nodeState", n.getCachedState()) @@ -684,3 +726,74 @@ func (n *node[CHAIN_ID, HEAD, RPC]) syncingLoop() { } } } + +func (n *node[CHAIN_ID, HEAD, RPC]) finalizedStateNotAvailableLoop() { + defer n.wg.Done() + ctx, cancel := n.newCtx() + defer cancel() + + { + state := n.getCachedState() + switch state { + case nodeStateFinalizedStateNotAvailable: + case nodeStateClosed: + return + default: + panic(fmt.Sprintf("finalizedStateNotAvailableLoop can only run for node in FinalizedStateNotAvailable state, got: %s", state)) + } + } + + unavailableAt := time.Now() + + lggr := logger.Sugared(logger.Named(n.lfcLog, "FinalizedStateNotAvailable")) + lggr.Debugw("Trying to revive RPC node with unavailable finalized state", "nodeState", n.getCachedState()) + + dialRetryBackoff := NewRedialBackoff() + + for { + select { + case <-ctx.Done(): + return + case <-time.After(dialRetryBackoff.Duration()): + lggr.Tracew("Trying to re-dial RPC node", "nodeState", n.getCachedState()) + + err := n.rpc.Dial(ctx) + if err != nil { + lggr.Errorw(fmt.Sprintf("Failed to redial RPC node: %v", err), "err", err, "nodeState", n.getCachedState()) + continue + } + + n.setState(nodeStateDialed) + + state := n.verifyConn(ctx, lggr) + switch state { + case nodeStateUnreachable: + n.setState(nodeStateFinalizedStateNotAvailable) + continue + case nodeStateAlive: + lggr.Infow(fmt.Sprintf("Successfully redialled and verified RPC node %s. Finalized state was unavailable for %s", n.String(), time.Since(unavailableAt)), "nodeState", n.getCachedState()) + fallthrough + default: + n.declareState(state) + return + } + } + } +} + +// isFinalizedStateUnavailableError checks if the error indicates that the RPC cannot serve +// historical state (as opposed to an RPC reachability issue). +// If regexPattern is empty, all errors are treated as state unavailable errors. +func isFinalizedStateUnavailableError(err error, regexPattern string) bool { + if err == nil { + return false + } + if regexPattern == "" { + return true + } + re, compileErr := regexp.Compile(regexPattern) + if compileErr != nil { + return true + } + return re.MatchString(err.Error()) +} diff --git a/multinode/rpc_client_base.go b/multinode/rpc_client_base.go index a06200d..6ac4414 100644 --- a/multinode/rpc_client_base.go +++ b/multinode/rpc_client_base.go @@ -303,3 +303,11 @@ func (m *RPCClientBase[HEAD]) GetInterceptedChainInfo() (latest, highestUserObse func (m *RPCClientBase[HEAD]) PollHealthCheck(ctx context.Context) error { return nil } + +// CheckFinalizedStateAvailability provides a default no-op implementation for the RPCClient interface. +// Chain-specific RPC clients can override this method to verify that the RPC can serve +// historical state at the finalized block (e.g., by calling eth_getBalance at the finalized block). +// The probeAddress parameter is used to perform a state query at the finalized block. +func (m *RPCClientBase[HEAD]) CheckFinalizedStateAvailability(ctx context.Context, probeAddress string) error { + return nil +} diff --git a/multinode/types.go b/multinode/types.go index e9aa954..4cb9c04 100644 --- a/multinode/types.go +++ b/multinode/types.go @@ -81,6 +81,11 @@ type RPCClient[ // Implementations can use this for chain-specific health verification (e.g., historical state availability). // Return nil if the check passes or is not applicable, or an error if the check fails. PollHealthCheck(ctx context.Context) error + // CheckFinalizedStateAvailability - verifies if the RPC can serve historical state at the finalized block. + // This is used to detect non-archive nodes that cannot serve state queries for older blocks. + // The probeAddress is used to call a state query (e.g., eth_getBalance) at the finalized block. + // Return nil if the check passes or is not applicable, or an error if the check fails. + CheckFinalizedStateAvailability(ctx context.Context, probeAddress string) error } // Head is the interface required by the NodeClient From 77b0b057a81e7a295b1aa0e2be2b423172edeaa5 Mon Sep 17 00:00:00 2001 From: Krish-vemula Date: Mon, 2 Mar 2026 10:20:49 -0600 Subject: [PATCH 4/9] added fixes for lint and mock --- multinode/mock_rpc_client_test.go | 47 +++++++++++++++++++++++++++++++ multinode/node_test.go | 16 +++++++++++ 2 files changed, 63 insertions(+) diff --git a/multinode/mock_rpc_client_test.go b/multinode/mock_rpc_client_test.go index 6e129e3..9fa4198 100644 --- a/multinode/mock_rpc_client_test.go +++ b/multinode/mock_rpc_client_test.go @@ -79,6 +79,53 @@ func (_c *mockRPCClient_ChainID_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(cont return _c } +// CheckFinalizedStateAvailability provides a mock function with given fields: ctx, probeAddress +func (_m *mockRPCClient[CHAIN_ID, HEAD]) CheckFinalizedStateAvailability(ctx context.Context, probeAddress string) error { + ret := _m.Called(ctx, probeAddress) + + if len(ret) == 0 { + panic("no return value specified for CheckFinalizedStateAvailability") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, probeAddress) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// mockRPCClient_CheckFinalizedStateAvailability_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckFinalizedStateAvailability' +type mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID ID, HEAD Head] struct { + *mock.Call +} + +// CheckFinalizedStateAvailability is a helper method to define mock.On call +// - ctx context.Context +// - probeAddress string +func (_e *mockRPCClient_Expecter[CHAIN_ID, HEAD]) CheckFinalizedStateAvailability(ctx interface{}, probeAddress interface{}) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { + return &mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]{Call: _e.mock.On("CheckFinalizedStateAvailability", ctx, probeAddress)} +} + +func (_c *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]) Run(run func(ctx context.Context, probeAddress string)) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]) Return(_a0 error) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { + _c.Call.Return(_a0) + return _c +} + +func (_c *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(context.Context, string) error) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { + _c.Call.Return(run) + return _c +} + // ClientVersion provides a mock function with given fields: _a0 func (_m *mockRPCClient[CHAIN_ID, HEAD]) ClientVersion(_a0 context.Context) (string, error) { ret := _m.Called(_a0) diff --git a/multinode/node_test.go b/multinode/node_test.go index e3c8d71..cf4af6c 100644 --- a/multinode/node_test.go +++ b/multinode/node_test.go @@ -66,6 +66,22 @@ func (n testNodeConfig) VerifyChainID() bool { return true } +func (n testNodeConfig) FinalizedStateCheckEnabled() bool { + return false +} + +func (n testNodeConfig) FinalizedStateCheckFailureThreshold() uint32 { + return 0 +} + +func (n testNodeConfig) FinalizedStateCheckAddress() string { + return "" +} + +func (n testNodeConfig) FinalizedStateUnavailableRegex() string { + return "" +} + type testNode struct { *node[ID, Head, RPCClient[ID, Head]] } From 8cb2cbb6682efe6dbc1957e4b56dfb606788f677 Mon Sep 17 00:00:00 2001 From: Krish-vemula Date: Wed, 4 Mar 2026 23:35:29 -0600 Subject: [PATCH 5/9] Add FinalizedStateUnavailable to ClientErrors --- multinode/config/config.go | 35 +------- multinode/mock_node_metrics_test.go | 68 ++++++++++++++ multinode/mock_rpc_client_test.go | 21 +++-- multinode/node.go | 7 +- multinode/node_fsm.go | 4 +- multinode/node_lifecycle.go | 105 +++++++++------------- multinode/node_lifecycle_test.go | 134 ++++++++++++++++++++++++++++ multinode/node_test.go | 35 +++----- multinode/rpc_client_base.go | 3 +- multinode/types.go | 11 ++- 10 files changed, 284 insertions(+), 139 deletions(-) diff --git a/multinode/config/config.go b/multinode/config/config.go index 345b230..2f09905 100644 --- a/multinode/config/config.go +++ b/multinode/config/config.go @@ -33,13 +33,10 @@ type MultiNode struct { NoNewFinalizedHeadsThreshold *config.Duration FinalityDepth *uint32 FinalityTagEnabled *bool - FinalizedBlockOffset *uint32 + FinalizedBlockOffset *uint32 // Finalized State Availability Check - FinalizedStateCheckEnabled *bool - FinalizedStateCheckAddress *string FinalizedStateCheckFailureThreshold *uint32 - FinalizedStateUnavailableRegex *string } func (c *MultiNodeConfig) Enabled() bool { @@ -100,31 +97,10 @@ func (c *MultiNodeConfig) FinalityTagEnabled() bool { return *c.MultiNode.Finali func (c *MultiNodeConfig) FinalizedBlockOffset() uint32 { return *c.MultiNode.FinalizedBlockOffset } -func (c *MultiNodeConfig) FinalizedStateCheckEnabled() bool { - return c.MultiNode.FinalizedStateCheckEnabled != nil && *c.MultiNode.FinalizedStateCheckEnabled -} - -func (c *MultiNodeConfig) FinalizedStateCheckAddress() string { - if c.MultiNode.FinalizedStateCheckAddress == nil { - return "" - } - return *c.MultiNode.FinalizedStateCheckAddress -} - func (c *MultiNodeConfig) FinalizedStateCheckFailureThreshold() uint32 { - if c.MultiNode.FinalizedStateCheckFailureThreshold == nil { - return 0 - } return *c.MultiNode.FinalizedStateCheckFailureThreshold } -func (c *MultiNodeConfig) FinalizedStateUnavailableRegex() string { - if c.MultiNode.FinalizedStateUnavailableRegex == nil { - return "" - } - return *c.MultiNode.FinalizedStateUnavailableRegex -} - func (c *MultiNodeConfig) SetFrom(f *MultiNodeConfig) { if f.MultiNode.Enabled != nil { c.MultiNode.Enabled = f.MultiNode.Enabled @@ -183,16 +159,7 @@ func (c *MultiNodeConfig) SetFrom(f *MultiNodeConfig) { } // Finalized State Availability Check - if f.MultiNode.FinalizedStateCheckEnabled != nil { - c.MultiNode.FinalizedStateCheckEnabled = f.MultiNode.FinalizedStateCheckEnabled - } - if f.MultiNode.FinalizedStateCheckAddress != nil { - c.MultiNode.FinalizedStateCheckAddress = f.MultiNode.FinalizedStateCheckAddress - } if f.MultiNode.FinalizedStateCheckFailureThreshold != nil { c.MultiNode.FinalizedStateCheckFailureThreshold = f.MultiNode.FinalizedStateCheckFailureThreshold } - if f.MultiNode.FinalizedStateUnavailableRegex != nil { - c.MultiNode.FinalizedStateUnavailableRegex = f.MultiNode.FinalizedStateUnavailableRegex - } } diff --git a/multinode/mock_node_metrics_test.go b/multinode/mock_node_metrics_test.go index 261d7cf..8f5829a 100644 --- a/multinode/mock_node_metrics_test.go +++ b/multinode/mock_node_metrics_test.go @@ -21,6 +21,40 @@ func (_m *mockNodeMetrics) EXPECT() *mockNodeMetrics_Expecter { return &mockNodeMetrics_Expecter{mock: &_m.Mock} } +// IncrementFinalizedStateFailed provides a mock function with given fields: ctx, nodeName +func (_m *mockNodeMetrics) IncrementFinalizedStateFailed(ctx context.Context, nodeName string) { + _m.Called(ctx, nodeName) +} + +// mockNodeMetrics_IncrementFinalizedStateFailed_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IncrementFinalizedStateFailed' +type mockNodeMetrics_IncrementFinalizedStateFailed_Call struct { + *mock.Call +} + +// IncrementFinalizedStateFailed is a helper method to define mock.On call +// - ctx context.Context +// - nodeName string +func (_e *mockNodeMetrics_Expecter) IncrementFinalizedStateFailed(ctx interface{}, nodeName interface{}) *mockNodeMetrics_IncrementFinalizedStateFailed_Call { + return &mockNodeMetrics_IncrementFinalizedStateFailed_Call{Call: _e.mock.On("IncrementFinalizedStateFailed", ctx, nodeName)} +} + +func (_c *mockNodeMetrics_IncrementFinalizedStateFailed_Call) Run(run func(ctx context.Context, nodeName string)) *mockNodeMetrics_IncrementFinalizedStateFailed_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *mockNodeMetrics_IncrementFinalizedStateFailed_Call) Return() *mockNodeMetrics_IncrementFinalizedStateFailed_Call { + _c.Call.Return() + return _c +} + +func (_c *mockNodeMetrics_IncrementFinalizedStateFailed_Call) RunAndReturn(run func(context.Context, string)) *mockNodeMetrics_IncrementFinalizedStateFailed_Call { + _c.Run(run) + return _c +} + // IncrementNodeTransitionsToAlive provides a mock function with given fields: ctx, nodeName func (_m *mockNodeMetrics) IncrementNodeTransitionsToAlive(ctx context.Context, nodeName string) { _m.Called(ctx, nodeName) @@ -55,6 +89,40 @@ func (_c *mockNodeMetrics_IncrementNodeTransitionsToAlive_Call) RunAndReturn(run return _c } +// IncrementNodeTransitionsToFinalizedStateNotAvailable provides a mock function with given fields: ctx, nodeName +func (_m *mockNodeMetrics) IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx context.Context, nodeName string) { + _m.Called(ctx, nodeName) +} + +// mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IncrementNodeTransitionsToFinalizedStateNotAvailable' +type mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call struct { + *mock.Call +} + +// IncrementNodeTransitionsToFinalizedStateNotAvailable is a helper method to define mock.On call +// - ctx context.Context +// - nodeName string +func (_e *mockNodeMetrics_Expecter) IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx interface{}, nodeName interface{}) *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call { + return &mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call{Call: _e.mock.On("IncrementNodeTransitionsToFinalizedStateNotAvailable", ctx, nodeName)} +} + +func (_c *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call) Run(run func(ctx context.Context, nodeName string)) *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call) Return() *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call { + _c.Call.Return() + return _c +} + +func (_c *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call) RunAndReturn(run func(context.Context, string)) *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call { + _c.Run(run) + return _c +} + // IncrementNodeTransitionsToInSync provides a mock function with given fields: ctx, nodeName func (_m *mockNodeMetrics) IncrementNodeTransitionsToInSync(ctx context.Context, nodeName string) { _m.Called(ctx, nodeName) diff --git a/multinode/mock_rpc_client_test.go b/multinode/mock_rpc_client_test.go index 9fa4198..4274e5b 100644 --- a/multinode/mock_rpc_client_test.go +++ b/multinode/mock_rpc_client_test.go @@ -79,17 +79,17 @@ func (_c *mockRPCClient_ChainID_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(cont return _c } -// CheckFinalizedStateAvailability provides a mock function with given fields: ctx, probeAddress -func (_m *mockRPCClient[CHAIN_ID, HEAD]) CheckFinalizedStateAvailability(ctx context.Context, probeAddress string) error { - ret := _m.Called(ctx, probeAddress) +// CheckFinalizedStateAvailability provides a mock function with given fields: ctx +func (_m *mockRPCClient[CHAIN_ID, HEAD]) CheckFinalizedStateAvailability(ctx context.Context) error { + ret := _m.Called(ctx) if len(ret) == 0 { panic("no return value specified for CheckFinalizedStateAvailability") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { - r0 = rf(ctx, probeAddress) + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) } else { r0 = ret.Error(0) } @@ -104,14 +104,13 @@ type mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID ID, HEAD Head] // CheckFinalizedStateAvailability is a helper method to define mock.On call // - ctx context.Context -// - probeAddress string -func (_e *mockRPCClient_Expecter[CHAIN_ID, HEAD]) CheckFinalizedStateAvailability(ctx interface{}, probeAddress interface{}) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { - return &mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]{Call: _e.mock.On("CheckFinalizedStateAvailability", ctx, probeAddress)} +func (_e *mockRPCClient_Expecter[CHAIN_ID, HEAD]) CheckFinalizedStateAvailability(ctx interface{}) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { + return &mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]{Call: _e.mock.On("CheckFinalizedStateAvailability", ctx)} } -func (_c *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]) Run(run func(ctx context.Context, probeAddress string)) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { +func (_c *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]) Run(run func(ctx context.Context)) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) + run(args[0].(context.Context)) }) return _c } @@ -121,7 +120,7 @@ func (_c *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]) Re return _c } -func (_c *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(context.Context, string) error) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { +func (_c *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(context.Context) error) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { _c.Call.Return(run) return _c } diff --git a/multinode/node.go b/multinode/node.go index a5eee22..9611829 100644 --- a/multinode/node.go +++ b/multinode/node.go @@ -27,10 +27,7 @@ type NodeConfig interface { DeathDeclarationDelay() time.Duration NewHeadsPollInterval() time.Duration VerifyChainID() bool - FinalizedStateCheckEnabled() bool FinalizedStateCheckFailureThreshold() uint32 - FinalizedStateCheckAddress() string - FinalizedStateUnavailableRegex() string } type ChainConfig interface { @@ -52,6 +49,7 @@ type nodeMetrics interface { IncrementNodeTransitionsToInvalidChainID(ctx context.Context, nodeName string) IncrementNodeTransitionsToUnusable(ctx context.Context, nodeName string) IncrementNodeTransitionsToSyncing(ctx context.Context, nodeName string) + IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx context.Context, nodeName string) RecordNodeClientVersion(ctx context.Context, nodeName string, version string) SetHighestSeenBlock(ctx context.Context, nodeName string, blockNumber int64) SetHighestFinalizedBlock(ctx context.Context, nodeName string, blockNumber int64) @@ -59,6 +57,7 @@ type nodeMetrics interface { IncrementPolls(ctx context.Context, nodeName string) IncrementPollsFailed(ctx context.Context, nodeName string) IncrementPollsSuccess(ctx context.Context, nodeName string) + IncrementFinalizedStateFailed(ctx context.Context, nodeName string) } type Node[ @@ -277,7 +276,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) verifyChainID(callerCtx context.Context, lgg // The node is already closed, and any subsequent transition is invalid. // To make spotting such transitions a bit easier, return the invalid node state. return nodeStateLen - case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing: + case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable: default: panic(fmt.Sprintf("cannot verify node in state %v", st)) } diff --git a/multinode/node_fsm.go b/multinode/node_fsm.go index 0299050..52c31c0 100644 --- a/multinode/node_fsm.go +++ b/multinode/node_fsm.go @@ -186,7 +186,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToAlive(fn func()) { return } switch n.state { - case nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing: + case nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable: n.state = nodeStateAlive default: panic(transitionFail(n.state, nodeStateAlive)) @@ -368,7 +368,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) declareFinalizedStateNotAvailable() { func (n *node[CHAIN_ID, HEAD, RPC]) transitionToFinalizedStateNotAvailable(fn func()) { ctx, cancel := n.stopCh.NewCtx() defer cancel() - n.metrics.IncrementNodeTransitionsToUnreachable(ctx, n.name) + n.metrics.IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx, n.name) n.stateMu.Lock() defer n.stateMu.Unlock() if n.state == nodeStateClosed { diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index 3553f1f..bc093b6 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -2,10 +2,10 @@ package multinode import ( "context" + "errors" "fmt" "math" "math/big" - "regexp" "time" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -104,10 +104,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { var pollFailures uint32 // Finalized state availability check config - finalizedStateCheckEnabled := n.nodePoolCfg.FinalizedStateCheckEnabled() finalizedStateCheckFailureThreshold := n.nodePoolCfg.FinalizedStateCheckFailureThreshold() - finalizedStateCheckAddress := n.nodePoolCfg.FinalizedStateCheckAddress() - finalizedStateUnavailableRegex := n.nodePoolCfg.FinalizedStateUnavailableRegex() var finalizedStateFailures uint32 for { @@ -159,38 +156,37 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { return } // Separate finalized state availability check - if finalizedStateCheckEnabled { - stateCheckCtx, stateCheckCancel := context.WithTimeout(ctx, pollInterval) - stateErr := n.RPC().CheckFinalizedStateAvailability(stateCheckCtx, finalizedStateCheckAddress) - stateCheckCancel() - if stateErr != nil { - if isFinalizedStateUnavailableError(stateErr, finalizedStateUnavailableRegex) { - if finalizedStateFailures < math.MaxUint32 { - finalizedStateFailures++ - } - lggr.Warnw("Finalized state not available", "err", stateErr, "failures", finalizedStateFailures, "threshold", finalizedStateCheckFailureThreshold) - if finalizedStateCheckFailureThreshold > 0 && finalizedStateFailures >= finalizedStateCheckFailureThreshold { - lggr.Errorw("RPC node cannot serve finalized state after consecutive failures", "failures", finalizedStateFailures) - if n.poolInfoProvider != nil { - if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC { - lggr.Criticalf("RPC endpoint cannot serve finalized state; %s %s", msgCannotDisable, msgDegradedState) - continue - } + stateCheckCtx, stateCheckCancel := context.WithTimeout(ctx, pollInterval) + stateErr := n.RPC().CheckFinalizedStateAvailability(stateCheckCtx) + stateCheckCancel() + if stateErr != nil { + if errors.Is(stateErr, ErrFinalizedStateUnavailable) { + if finalizedStateFailures < math.MaxUint32 { + n.metrics.IncrementFinalizedStateFailed(ctx, n.name) + finalizedStateFailures++ + } + lggr.Warnw("Finalized state not available", "err", stateErr, "failures", finalizedStateFailures, "threshold", finalizedStateCheckFailureThreshold) + if finalizedStateFailures >= finalizedStateCheckFailureThreshold { + lggr.Errorw("RPC node cannot serve finalized state after consecutive failures", "failures", finalizedStateFailures) + if n.poolInfoProvider != nil { + if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC { + lggr.Criticalf("RPC endpoint cannot serve finalized state; %s %s", msgCannotDisable, msgDegradedState) + continue } - n.declareFinalizedStateNotAvailable() - return } - } else { - // Treat as RPC reachability error - if pollFailures < math.MaxUint32 { - n.metrics.IncrementPollsFailed(ctx, n.name) - pollFailures++ - } - lggr.Warnw("Finalized state check failed with RPC error", "err", stateErr, "pollFailures", pollFailures) + n.declareFinalizedStateNotAvailable() + return } } else { - finalizedStateFailures = 0 + // Treat as RPC reachability error + if pollFailures < math.MaxUint32 { + n.metrics.IncrementPollsFailed(ctx, n.name) + pollFailures++ + } + lggr.Warnw("Finalized state check failed with RPC error", "err", stateErr, "pollFailures", pollFailures) } + } else { + finalizedStateFailures = 0 } case bh, open := <-headsSub.Heads: if !open { @@ -757,43 +753,30 @@ func (n *node[CHAIN_ID, HEAD, RPC]) finalizedStateNotAvailableLoop() { case <-time.After(dialRetryBackoff.Duration()): lggr.Tracew("Trying to re-dial RPC node", "nodeState", n.getCachedState()) - err := n.rpc.Dial(ctx) - if err != nil { - lggr.Errorw(fmt.Sprintf("Failed to redial RPC node: %v", err), "err", err, "nodeState", n.getCachedState()) + state := n.createVerifiedConn(ctx, lggr) + if state != nodeStateAlive { + n.setState(nodeStateFinalizedStateNotAvailable) continue } - n.setState(nodeStateDialed) - - state := n.verifyConn(ctx, lggr) - switch state { - case nodeStateUnreachable: + stateCheckCtx, stateCheckCancel := context.WithTimeout(ctx, n.nodePoolCfg.PollInterval()) + stateErr := n.RPC().CheckFinalizedStateAvailability(stateCheckCtx) + stateCheckCancel() + if stateErr != nil { + if errors.Is(stateErr, ErrFinalizedStateUnavailable) { + lggr.Warnw("Finalized state still not available", "err", stateErr) + n.setState(nodeStateFinalizedStateNotAvailable) + continue + } + lggr.Warnw("Finalized state check failed with RPC error", "err", stateErr) n.setState(nodeStateFinalizedStateNotAvailable) continue - case nodeStateAlive: - lggr.Infow(fmt.Sprintf("Successfully redialled and verified RPC node %s. Finalized state was unavailable for %s", n.String(), time.Since(unavailableAt)), "nodeState", n.getCachedState()) - fallthrough - default: - n.declareState(state) - return } + + lggr.Infow(fmt.Sprintf("Successfully redialled and verified RPC node %s. Finalized state was unavailable for %s", n.String(), time.Since(unavailableAt)), "nodeState", n.getCachedState()) + n.declareState(nodeStateAlive) + return } } } -// isFinalizedStateUnavailableError checks if the error indicates that the RPC cannot serve -// historical state (as opposed to an RPC reachability issue). -// If regexPattern is empty, all errors are treated as state unavailable errors. -func isFinalizedStateUnavailableError(err error, regexPattern string) bool { - if err == nil { - return false - } - if regexPattern == "" { - return true - } - re, compileErr := regexp.Compile(regexPattern) - if compileErr != nil { - return true - } - return re.MatchString(err.Error()) -} diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index dca2531..01cf0c2 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -1,6 +1,7 @@ package multinode import ( + "context" "errors" "fmt" "math/big" @@ -9,6 +10,7 @@ import ( "sync" "sync/atomic" "testing" + "time" prom "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" @@ -2177,3 +2179,135 @@ func TestNode_State(t *testing.T) { }) } } + +func TestUnit_NodeLifecycle_finalizedStateNotAvailableLoop(t *testing.T) { + t.Parallel() + + newFinalizedStateNotAvailableNode := func(t *testing.T, opts testNodeOpts) testNode { + node := newTestNode(t, opts) + opts.rpc.On("Close").Return(nil) + node.setState(nodeStateFinalizedStateNotAvailable) + return node + } + + t.Run("returns on closed", func(t *testing.T) { + t.Parallel() + node := newTestNode(t, testNodeOpts{}) + node.setState(nodeStateClosed) + node.wg.Add(1) + node.finalizedStateNotAvailableLoop() + }) + + t.Run("on failed dial keeps trying", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + nodeChainID := RandomID() + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + node := newFinalizedStateNotAvailableNode(t, testNodeOpts{ + rpc: rpc, + chainID: nodeChainID, + lggr: lggr, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")) + node.wg.Add(1) + go node.finalizedStateNotAvailableLoop() + tests.AssertLogCountEventually(t, observedLogs, "Node is unreachable", 2) + }) + + t.Run("on finalized state still unavailable keeps trying", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + nodeChainID := RandomID() + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + node := newFinalizedStateNotAvailableNode(t, testNodeOpts{ + rpc: rpc, + chainID: nodeChainID, + lggr: lggr, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("Dial", mock.Anything).Return(nil) + rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil) + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(fmt.Errorf("%w: missing trie node", ErrFinalizedStateUnavailable)) + + node.wg.Add(1) + go node.finalizedStateNotAvailableLoop() + tests.AssertLogCountEventually(t, observedLogs, "Finalized state still not available", 2) + }) + + t.Run("on successful verification and state check becomes alive", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + nodeChainID := RandomID() + node := newFinalizedStateNotAvailableNode(t, testNodeOpts{ + rpc: rpc, + chainID: nodeChainID, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("Dial", mock.Anything).Return(nil) + rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil) + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil) + + setupRPCForAliveLoop(t, rpc) + + node.wg.Add(1) + go node.finalizedStateNotAvailableLoop() + tests.AssertEventually(t, func() bool { + return node.State() == nodeStateAlive + }) + }) + + t.Run("transitions from alive to finalizedStateNotAvailable and back", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + nodeChainID := RandomID() + node := newTestNode(t, testNodeOpts{ + rpc: rpc, + chainID: nodeChainID, + config: testNodeConfig{ + pollInterval: 10 * time.Millisecond, + finalizedStateCheckFailureThreshold: 2, + }, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("Close").Return(nil) + rpc.On("Dial", mock.Anything).Return(nil) + rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil) + + sub := newMockSubscription(t) + sub.On("Err").Return(nil).Maybe() + sub.On("Unsubscribe").Maybe() + headsCh := make(chan Head) + rpc.On("SubscribeToHeads", mock.Anything).Return((<-chan Head)(headsCh), sub, nil).Maybe() + rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), sub, nil).Maybe() + rpc.On("SetAliveLoopSub", mock.Anything).Maybe() + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Maybe() + rpc.On("ClientVersion", mock.Anything).Return("test-version", nil).Maybe() + rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() + + var stateCheckCallCount int32 + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(func(ctx context.Context) error { + count := atomic.AddInt32(&stateCheckCallCount, 1) + if count <= 2 { + return fmt.Errorf("%w: missing trie node", ErrFinalizedStateUnavailable) + } + return nil + }).Maybe() + + node.setState(nodeStateAlive) + node.wg.Add(1) + go node.aliveLoop() + + tests.AssertEventually(t, func() bool { + return node.State() == nodeStateFinalizedStateNotAvailable + }) + + tests.AssertEventually(t, func() bool { + return node.State() == nodeStateAlive + }) + }) +} diff --git a/multinode/node_test.go b/multinode/node_test.go index cf4af6c..440209c 100644 --- a/multinode/node_test.go +++ b/multinode/node_test.go @@ -15,15 +15,16 @@ import ( ) type testNodeConfig struct { - pollFailureThreshold uint32 - pollInterval time.Duration - selectionMode string - syncThreshold uint32 - nodeIsSyncingEnabled bool - enforceRepeatableRead bool - finalizedBlockPollInterval time.Duration - deathDeclarationDelay time.Duration - newHeadsPollInterval time.Duration + pollFailureThreshold uint32 + pollInterval time.Duration + selectionMode string + syncThreshold uint32 + nodeIsSyncingEnabled bool + enforceRepeatableRead bool + finalizedBlockPollInterval time.Duration + deathDeclarationDelay time.Duration + newHeadsPollInterval time.Duration + finalizedStateCheckFailureThreshold uint32 } func (n testNodeConfig) NewHeadsPollInterval() time.Duration { @@ -66,20 +67,8 @@ func (n testNodeConfig) VerifyChainID() bool { return true } -func (n testNodeConfig) FinalizedStateCheckEnabled() bool { - return false -} - func (n testNodeConfig) FinalizedStateCheckFailureThreshold() uint32 { - return 0 -} - -func (n testNodeConfig) FinalizedStateCheckAddress() string { - return "" -} - -func (n testNodeConfig) FinalizedStateUnavailableRegex() string { - return "" + return n.finalizedStateCheckFailureThreshold } type testNode struct { @@ -145,12 +134,14 @@ func makeMockNodeMetrics(t *testing.T) *mockNodeMetrics { mockMetrics.On("IncrementNodeTransitionsToInvalidChainID", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementNodeTransitionsToUnusable", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementNodeTransitionsToSyncing", mock.Anything, mock.Anything).Maybe() + mockMetrics.On("IncrementNodeTransitionsToFinalizedStateNotAvailable", mock.Anything, mock.Anything).Maybe() mockMetrics.On("SetHighestSeenBlock", mock.Anything, mock.Anything, mock.Anything).Maybe() mockMetrics.On("SetHighestFinalizedBlock", mock.Anything, mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementSeenBlocks", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementPolls", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementPollsFailed", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementPollsSuccess", mock.Anything, mock.Anything).Maybe() + mockMetrics.On("IncrementFinalizedStateFailed", mock.Anything, mock.Anything).Maybe() mockMetrics.On("RecordNodeClientVersion", mock.Anything, mock.Anything, mock.Anything).Maybe() return mockMetrics } diff --git a/multinode/rpc_client_base.go b/multinode/rpc_client_base.go index 6ac4414..c900483 100644 --- a/multinode/rpc_client_base.go +++ b/multinode/rpc_client_base.go @@ -307,7 +307,6 @@ func (m *RPCClientBase[HEAD]) PollHealthCheck(ctx context.Context) error { // CheckFinalizedStateAvailability provides a default no-op implementation for the RPCClient interface. // Chain-specific RPC clients can override this method to verify that the RPC can serve // historical state at the finalized block (e.g., by calling eth_getBalance at the finalized block). -// The probeAddress parameter is used to perform a state query at the finalized block. -func (m *RPCClientBase[HEAD]) CheckFinalizedStateAvailability(ctx context.Context, probeAddress string) error { +func (m *RPCClientBase[HEAD]) CheckFinalizedStateAvailability(ctx context.Context) error { return nil } diff --git a/multinode/types.go b/multinode/types.go index 4cb9c04..beebd22 100644 --- a/multinode/types.go +++ b/multinode/types.go @@ -2,10 +2,15 @@ package multinode import ( "context" + "errors" "fmt" "math/big" ) +// ErrFinalizedStateUnavailable is returned by CheckFinalizedStateAvailability when the RPC +// cannot serve historical state at the finalized block (e.g., pruned/non-archive node). +var ErrFinalizedStateUnavailable = errors.New("finalized state unavailable") + // ID represents the base type, for any chain's ID. // It should be convertible to a string, that can uniquely identify this chain type ID fmt.Stringer @@ -83,9 +88,9 @@ type RPCClient[ PollHealthCheck(ctx context.Context) error // CheckFinalizedStateAvailability - verifies if the RPC can serve historical state at the finalized block. // This is used to detect non-archive nodes that cannot serve state queries for older blocks. - // The probeAddress is used to call a state query (e.g., eth_getBalance) at the finalized block. - // Return nil if the check passes or is not applicable, or an error if the check fails. - CheckFinalizedStateAvailability(ctx context.Context, probeAddress string) error + // Returns ErrFinalizedStateUnavailable if the RPC cannot serve historical state. + // Returns nil if the check passes or is not applicable, or another error for RPC issues. + CheckFinalizedStateAvailability(ctx context.Context) error } // Head is the interface required by the NodeClient From 61b3d253c3f4869f780c1c0803ac93847caec575 Mon Sep 17 00:00:00 2001 From: Krish-vemula Date: Tue, 10 Mar 2026 13:09:10 -0500 Subject: [PATCH 6/9] Update metrics dependency --- multinode/go.mod | 2 +- multinode/go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/multinode/go.mod b/multinode/go.mod index c18efc1..02b8bb6 100644 --- a/multinode/go.mod +++ b/multinode/go.mod @@ -7,7 +7,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_model v0.6.2 github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bfc7 - github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2 + github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260305141558-e2fed5909312 github.com/stretchr/testify v1.11.1 go.uber.org/zap v1.27.1 ) diff --git a/multinode/go.sum b/multinode/go.sum index 459485f..6435a86 100644 --- a/multinode/go.sum +++ b/multinode/go.sum @@ -80,8 +80,8 @@ github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bf github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bfc7/go.mod h1:0ghbAr7tRO0tT5ZqBXhOyzgUO37tNNe33Yn0hskauVM= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2 h1:ysZjKH+BpWlQhF93kr/Lc668UlCvT9NjfcsGdZT19I8= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2/go.mod h1:jo+cUqNcHwN8IF7SInQNXDZ8qzBsyMpnLdYbDswviFc= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260305141558-e2fed5909312 h1:encVG6/5zxbnKOyHDkYD/aiYS9vOfFP2FgSj0PVhdXg= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260305141558-e2fed5909312/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e h1:Hv9Mww35LrufCdM9wtS9yVi/rEWGI1UnjHbcKKU0nVY= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e/go.mod h1:T4zH9R8R8lVWKfU7tUvYz2o2jMv1OpGCdpY2j2QZXzU= github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d h1:LokA9PoCNb8mm8mDT52c3RECPMRsGz1eCQORq+J3n74= From c11a9a38af98c02f87aa200df319b0af5d02d25e Mon Sep 17 00:00:00 2001 From: Krish-vemula Date: Tue, 10 Mar 2026 13:15:26 -0500 Subject: [PATCH 7/9] Update metrics dependency to include IncrementFinalizedStateFailed --- multinode/go.mod | 2 +- multinode/go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/multinode/go.mod b/multinode/go.mod index 02b8bb6..902ee85 100644 --- a/multinode/go.mod +++ b/multinode/go.mod @@ -7,7 +7,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_model v0.6.2 github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bfc7 - github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260305141558-e2fed5909312 + github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260310180305-3ee91a6d9ae9 github.com/stretchr/testify v1.11.1 go.uber.org/zap v1.27.1 ) diff --git a/multinode/go.sum b/multinode/go.sum index 6435a86..5c9709e 100644 --- a/multinode/go.sum +++ b/multinode/go.sum @@ -80,8 +80,8 @@ github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bf github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bfc7/go.mod h1:0ghbAr7tRO0tT5ZqBXhOyzgUO37tNNe33Yn0hskauVM= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260305141558-e2fed5909312 h1:encVG6/5zxbnKOyHDkYD/aiYS9vOfFP2FgSj0PVhdXg= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260305141558-e2fed5909312/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260310180305-3ee91a6d9ae9 h1:GK+2aFpW/Z5ZnMGCa9NU6o7LKHQ/9xJVZx2yMAMudnc= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260310180305-3ee91a6d9ae9/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e h1:Hv9Mww35LrufCdM9wtS9yVi/rEWGI1UnjHbcKKU0nVY= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e/go.mod h1:T4zH9R8R8lVWKfU7tUvYz2o2jMv1OpGCdpY2j2QZXzU= github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d h1:LokA9PoCNb8mm8mDT52c3RECPMRsGz1eCQORq+J3n74= From de1061f7187b1a5a737341cb2026520c5f30b0bf Mon Sep 17 00:00:00 2001 From: Krish-vemula Date: Tue, 10 Mar 2026 13:19:00 -0500 Subject: [PATCH 8/9] Fix goimports formatting --- multinode/config/config.go | 2 +- multinode/node_lifecycle.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/multinode/config/config.go b/multinode/config/config.go index 2f09905..b218b73 100644 --- a/multinode/config/config.go +++ b/multinode/config/config.go @@ -33,7 +33,7 @@ type MultiNode struct { NoNewFinalizedHeadsThreshold *config.Duration FinalityDepth *uint32 FinalityTagEnabled *bool - FinalizedBlockOffset *uint32 + FinalizedBlockOffset *uint32 // Finalized State Availability Check FinalizedStateCheckFailureThreshold *uint32 diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index bc093b6..025a0eb 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -779,4 +779,3 @@ func (n *node[CHAIN_ID, HEAD, RPC]) finalizedStateNotAvailableLoop() { } } } - From 056829250029d17a53519dc0b9de9252c344bc73 Mon Sep 17 00:00:00 2001 From: Krish-vemula Date: Tue, 10 Mar 2026 13:38:37 -0500 Subject: [PATCH 9/9] Add CheckFinalizedStateAvailability mock expectations to tests --- multinode/node_fsm_test.go | 2 +- multinode/node_lifecycle_test.go | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/multinode/node_fsm_test.go b/multinode/node_fsm_test.go index 17d312c..6701a23 100644 --- a/multinode/node_fsm_test.go +++ b/multinode/node_fsm_test.go @@ -36,7 +36,7 @@ func TestUnit_Node_StateTransitions(t *testing.T) { t.Run("transitionToAlive", func(t *testing.T) { const destinationState = nodeStateAlive - allowedStates := []nodeState{nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing} + allowedStates := []nodeState{nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable} rpc := newMockRPCClient[ID, Head](t) testTransition(t, rpc, testNode.transitionToAlive, destinationState, allowedStates...) }) diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index 01cf0c2..b2943f2 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -151,6 +151,8 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { rpc.On("ClientVersion", mock.Anything).Return("", nil) // PollHealthCheck is called after successful ClientVersion - return nil to pass rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() + // CheckFinalizedStateAvailability is called after successful polling + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil).Maybe() node.declareAlive() tests.AssertLogCountEventually(t, observedLogs, fmt.Sprintf("Poll failure, RPC endpoint %s failed to respond properly", node.String()), pollFailureThreshold) tests.AssertLogCountEventually(t, observedLogs, "Ping successful", 2) @@ -174,6 +176,8 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { pollError := errors.New("failed to get ClientVersion") rpc.On("ClientVersion", mock.Anything).Return("", pollError) rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() + // CheckFinalizedStateAvailability may be called + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil).Maybe() node.declareAlive() tests.AssertLogCountEventually(t, observedLogs, fmt.Sprintf("Poll failure, RPC endpoint %s failed to respond properly", node.String()), pollFailureThreshold) tests.AssertEventually(t, func() bool { @@ -227,6 +231,8 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: 20}, ChainInfo{BlockNumber: 20}) pollError := errors.New("failed to get ClientVersion") rpc.On("ClientVersion", mock.Anything).Return("", pollError) + // CheckFinalizedStateAvailability may be called + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil).Maybe() node.declareAlive() tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailureThreshold)) assert.Equal(t, nodeStateAlive, node.State()) @@ -254,6 +260,8 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: 20}, ChainInfo{BlockNumber: 20}) pollError := errors.New("failed to get ClientVersion") rpc.On("ClientVersion", mock.Anything).Return("", pollError) + // CheckFinalizedStateAvailability may be called + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil).Maybe() node.declareAlive() tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailureThreshold)) tests.AssertEventually(t, func() bool { @@ -377,6 +385,8 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() + // CheckFinalizedStateAvailability is called after successful polling + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil).Maybe() const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}) node.declareAlive()