diff --git a/cmd/p2p/sensor/rpc.go b/cmd/p2p/sensor/rpc.go index 1263dea1..e29c6d40 100644 --- a/cmd/p2p/sensor/rpc.go +++ b/cmd/p2p/sensor/rpc.go @@ -9,8 +9,10 @@ import ( "strings" "github.com/0xPolygon/polygon-cli/p2p" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/rs/zerolog/log" ) @@ -44,6 +46,7 @@ type rpcError struct { func handleRPC(conns *p2p.Conns, networkID uint64) { // Use network ID as chain ID for signature validation chainID := new(big.Int).SetUint64(networkID) + gpo := p2p.NewGasPriceOracle(conns) mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { @@ -67,7 +70,7 @@ func handleRPC(conns *p2p.Conns, networkID uint64) { trimmed := strings.TrimSpace(string(body)) if len(trimmed) > 0 && trimmed[0] == '[' { // Handle batch request - handleBatchRequest(w, body, conns, chainID) + handleBatchRequest(w, body, conns, chainID, gpo) return } @@ -78,14 +81,22 @@ func handleRPC(conns *p2p.Conns, networkID uint64) { return } - // Handle eth_sendRawTransaction - if req.Method == "eth_sendRawTransaction" { - handleSendRawTransaction(w, req, conns, chainID) - return + // Process request (reuse same logic as batch) + var txs types.Transactions + resp := processRequest(req, conns, chainID, gpo, &txs) + + // Broadcast any transactions + if len(txs) > 0 { + log.Info().Str("hash", txs[0].Hash().Hex()).Msg("Broadcasting transaction") + count := conns.BroadcastTxs(txs) + log.Info().Str("hash", txs[0].Hash().Hex()).Int("peers", count).Msg("Transaction broadcast complete") } - // Method not found - writeError(w, -32601, "Method not found", req.ID) + // Write response + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(resp); err != nil { + log.Error().Err(err).Msg("Failed to encode response") + } }) addr := fmt.Sprintf(":%d", inputSensorParams.RPCPort) @@ -111,23 +122,10 @@ func writeError(w http.ResponseWriter, code int, message string, id any) { } } -// writeResult writes a JSON-RPC 2.0 success response with the specified result and request ID. -func writeResult(w http.ResponseWriter, result any, id any) { - w.Header().Set("Content-Type", "application/json") - response := rpcResponse{ - JSONRPC: "2.0", - Result: result, - ID: id, - } - if err := json.NewEncoder(w).Encode(response); err != nil { - log.Error().Err(err).Msg("Failed to encode result response") - } -} - -// handleBatchRequest processes JSON-RPC 2.0 batch requests, validates all transactions, -// and broadcasts valid transactions to connected peers. Returns a batch response with -// results or errors for each request in the batch. -func handleBatchRequest(w http.ResponseWriter, body []byte, conns *p2p.Conns, chainID *big.Int) { +// handleBatchRequest processes JSON-RPC 2.0 batch requests. +// For eth_sendRawTransaction requests, it collects valid transactions for batch broadcasting. +// Returns a batch response with results or errors for each request. +func handleBatchRequest(w http.ResponseWriter, body []byte, conns *p2p.Conns, chainID *big.Int, gpo *p2p.GasPriceOracle) { // Parse batch of requests var requests []rpcRequest if err := json.Unmarshal(body, &requests); err != nil { @@ -141,35 +139,13 @@ func handleBatchRequest(w http.ResponseWriter, body []byte, conns *p2p.Conns, ch return } - // Process all requests and collect valid transactions for batch broadcasting + // Process all requests responses := make([]rpcResponse, 0, len(requests)) txs := make(types.Transactions, 0) for _, req := range requests { - if req.Method != "eth_sendRawTransaction" { - responses = append(responses, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32601, - Message: "Method not found", - }, - ID: req.ID, - }) - continue - } - - tx, response := validateTx(req, chainID) - if tx == nil { - responses = append(responses, response) - continue - } - - txs = append(txs, tx) - responses = append(responses, rpcResponse{ - JSONRPC: "2.0", - Result: tx.Hash().Hex(), - ID: req.ID, - }) + resp := processRequest(req, conns, chainID, gpo, &txs) + responses = append(responses, resp) } // Broadcast all valid transactions in a single batch if there are any @@ -194,76 +170,118 @@ func handleBatchRequest(w http.ResponseWriter, body []byte, conns *p2p.Conns, ch } } +// newResultResponse creates a success response. +func newResultResponse(result, id any) rpcResponse { + return rpcResponse{JSONRPC: "2.0", Result: result, ID: id} +} + +// newErrorResponse creates an error response. +func newErrorResponse(err *rpcError, id any) rpcResponse { + return rpcResponse{JSONRPC: "2.0", Error: err, ID: id} +} + +// processRequest handles a single RPC request and returns a response. +// For eth_sendRawTransaction, valid transactions are appended to txs for batch broadcasting. +func processRequest(req rpcRequest, conns *p2p.Conns, chainID *big.Int, gpo *p2p.GasPriceOracle, txs *types.Transactions) rpcResponse { + switch req.Method { + case "eth_sendRawTransaction": + tx, resp := validateTx(req, chainID) + if tx == nil { + return resp + } + *txs = append(*txs, tx) + return newResultResponse(tx.Hash().Hex(), req.ID) + + case "eth_chainId": + return newResultResponse(hexutil.EncodeBig(chainID), req.ID) + + case "eth_blockNumber": + head := conns.HeadBlock() + if head.Block == nil { + return newResultResponse(nil, req.ID) + } + return newResultResponse(hexutil.EncodeUint64(head.Block.NumberU64()), req.ID) + + case "eth_gasPrice": + return newResultResponse(hexutil.EncodeBig(gpo.SuggestGasPrice()), req.ID) + + case "eth_maxPriorityFeePerGas": + tip := gpo.SuggestGasTipCap() + if tip == nil { + tip = big.NewInt(1e9) // Default to 1 gwei + } + return newResultResponse(hexutil.EncodeBig(tip), req.ID) + + case "eth_getBlockByHash": + result, err := getBlockByHash(req, conns) + return handleMethodResult(result, err, req.ID) + + case "eth_getBlockByNumber": + result, err := getBlockByNumber(req, conns) + return handleMethodResult(result, err, req.ID) + + case "eth_getTransactionByHash": + result, err := getTransactionByHash(req, conns) + return handleMethodResult(result, err, req.ID) + + case "eth_getTransactionByBlockHashAndIndex": + result, err := getTransactionByBlockHashAndIndex(req, conns) + return handleMethodResult(result, err, req.ID) + + case "eth_getBlockTransactionCountByHash": + result, err := getBlockTransactionCountByHash(req, conns) + return handleMethodResult(result, err, req.ID) + + case "eth_getUncleCountByBlockHash": + result, err := getUncleCountByBlockHash(req, conns) + return handleMethodResult(result, err, req.ID) + + default: + return newErrorResponse(&rpcError{Code: -32601, Message: "Method not found"}, req.ID) + } +} + +// handleMethodResult converts a method's result and error into an rpcResponse. +func handleMethodResult(result any, err *rpcError, id any) rpcResponse { + if err != nil { + return newErrorResponse(err, id) + } + return newResultResponse(result, id) +} + // validateTx validates a transaction from a JSON-RPC request by decoding the raw // transaction hex, unmarshaling it, and verifying the signature. Returns the transaction if valid // (with an empty response), or nil transaction with an error response if validation fails. func validateTx(req rpcRequest, chainID *big.Int) (*types.Transaction, rpcResponse) { - // Check params + invalidParams := func(msg string) rpcResponse { + return newErrorResponse(&rpcError{Code: -32602, Message: msg}, req.ID) + } + if len(req.Params) == 0 { - return nil, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32602, - Message: "Invalid params: missing raw transaction", - }, - ID: req.ID, - } + return nil, invalidParams("Invalid params: missing raw transaction") } - // Extract raw transaction hex string hex, ok := req.Params[0].(string) if !ok { - return nil, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32602, - Message: "Invalid params: raw transaction must be a hex string", - }, - ID: req.ID, - } + return nil, invalidParams("Invalid params: raw transaction must be a hex string") } - // Decode hex string to bytes bytes, err := hexutil.Decode(hex) if err != nil { - return nil, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32602, - Message: fmt.Sprintf("Invalid transaction hex: %v", err), - }, - ID: req.ID, - } + return nil, invalidParams(fmt.Sprintf("Invalid transaction hex: %v", err)) } - // Unmarshal transaction tx := new(types.Transaction) if err = tx.UnmarshalBinary(bytes); err != nil { - return nil, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32602, - Message: fmt.Sprintf("Invalid transaction encoding: %v", err), - }, - ID: req.ID, - } + return nil, invalidParams(fmt.Sprintf("Invalid transaction encoding: %v", err)) } - // Validate transaction signature signer := types.LatestSignerForChainID(chainID) sender, err := types.Sender(signer, tx) if err != nil { - return nil, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32602, - Message: fmt.Sprintf("Invalid transaction signature: %v", err), - }, - ID: req.ID, - } + return nil, invalidParams(fmt.Sprintf("Invalid transaction signature: %v", err)) } - // Log the transaction to := "nil" if tx.To() != nil { to = tx.To().Hex() @@ -280,26 +298,359 @@ func validateTx(req rpcRequest, chainID *big.Int) (*types.Transaction, rpcRespon return tx, rpcResponse{} } -// handleSendRawTransaction processes eth_sendRawTransaction requests, validates the -// transaction, broadcasts it to all connected peers, and writes the transaction hash -// as a JSON-RPC response. -func handleSendRawTransaction(w http.ResponseWriter, req rpcRequest, conns *p2p.Conns, chainID *big.Int) { - tx, response := validateTx(req, chainID) - if tx == nil { - writeError(w, response.Error.Code, response.Error.Message, response.ID) - return +// parseFullTxParam extracts the fullTx boolean from params[1], defaulting to false. +func parseFullTxParam(params []any) bool { + if len(params) >= 2 { + if fullTx, ok := params[1].(bool); ok { + return fullTx + } + } + return false +} + +// getBlockByHash retrieves a block by its hash from the cache. +func getBlockByHash(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + if len(req.Params) < 1 { + return nil, &rpcError{Code: -32602, Message: "missing block hash parameter"} } - log.Info(). - Str("hash", tx.Hash().Hex()). - Msg("Broadcasting transaction") + hashStr, ok := req.Params[0].(string) + if !ok { + return nil, &rpcError{Code: -32602, Message: "invalid block hash parameter"} + } - count := conns.BroadcastTx(tx) + hash := common.HexToHash(hashStr) + cache, ok := conns.Blocks().Get(hash) + if !ok { + return nil, nil // Return null for not found (per spec) + } - log.Info(). - Str("hash", tx.Hash().Hex()). - Int("peers", count). - Msg("Transaction broadcast complete") + return formatBlockResponse(hash, cache, parseFullTxParam(req.Params)), nil +} + +// getBlockByNumber retrieves a block by its number from the cache. +func getBlockByNumber(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + if len(req.Params) < 1 { + return nil, &rpcError{Code: -32602, Message: "missing block number parameter"} + } + + blockNumParam, ok := req.Params[0].(string) + if !ok { + return nil, &rpcError{Code: -32602, Message: "invalid block number parameter"} + } + + var hash common.Hash + var cache p2p.BlockCache + var found bool + + switch blockNumParam { + case "latest", "pending": + head := conns.HeadBlock() + if head.Block == nil { + return nil, nil + } + hash = head.Block.Hash() + cache, found = conns.Blocks().Get(hash) + if !found { + // Construct cache from head block + cache = p2p.BlockCache{ + Header: head.Block.Header(), + Body: ð.BlockBody{ + Transactions: head.Block.Transactions(), + Uncles: head.Block.Uncles(), + }, + TD: head.TD, + } + found = true + } + case "earliest": + hash, cache, found = conns.GetBlockByNumber(0) + default: + num, err := hexutil.DecodeUint64(blockNumParam) + if err != nil { + return nil, &rpcError{Code: -32602, Message: "invalid block number: " + err.Error()} + } + hash, cache, found = conns.GetBlockByNumber(num) + } + + if !found { + return nil, nil + } + + return formatBlockResponse(hash, cache, parseFullTxParam(req.Params)), nil +} + +// getTransactionByHash retrieves a transaction by its hash from the cache. +func getTransactionByHash(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + if len(req.Params) < 1 { + return nil, &rpcError{Code: -32602, Message: "missing transaction hash parameter"} + } + + hashStr, ok := req.Params[0].(string) + if !ok { + return nil, &rpcError{Code: -32602, Message: "invalid transaction hash parameter"} + } + + hash := common.HexToHash(hashStr) + + // First check the transactions cache + tx, ok := conns.GetTx(hash) + if ok { + return formatTransactionResponse(tx, common.Hash{}, nil, 0), nil + } + + // Search in blocks for the transaction + for _, blockHash := range conns.Blocks().Keys() { + cache, ok := conns.Blocks().Peek(blockHash) + if !ok || cache.Body == nil { + continue + } + for i, tx := range cache.Body.Transactions { + if tx.Hash() == hash { + return formatTransactionResponse(tx, blockHash, cache.Header, uint64(i)), nil + } + } + } + + return nil, nil +} + +// getTransactionByBlockHashAndIndex retrieves a transaction by block hash and index. +func getTransactionByBlockHashAndIndex(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + if len(req.Params) < 2 { + return nil, &rpcError{Code: -32602, Message: "missing block hash or index parameter"} + } + + hashStr, ok := req.Params[0].(string) + if !ok { + return nil, &rpcError{Code: -32602, Message: "invalid block hash parameter"} + } + + indexStr, ok := req.Params[1].(string) + if !ok { + return nil, &rpcError{Code: -32602, Message: "invalid index parameter"} + } + + index, err := hexutil.DecodeUint64(indexStr) + if err != nil { + return nil, &rpcError{Code: -32602, Message: "invalid index: " + err.Error()} + } + + blockHash := common.HexToHash(hashStr) + cache, ok := conns.Blocks().Get(blockHash) + if !ok || cache.Body == nil { + return nil, nil + } + + if int(index) >= len(cache.Body.Transactions) { + return nil, nil + } + + tx := cache.Body.Transactions[index] + return formatTransactionResponse(tx, blockHash, cache.Header, index), nil +} + +// getBlockCacheByHashParam parses a block hash from params[0] and returns the block cache. +// Returns the cache and nil error on success, or nil cache and error on parse failure. +// If the block is not found, returns nil cache with nil error (per JSON-RPC spec). +func getBlockCacheByHashParam(req rpcRequest, conns *p2p.Conns) (p2p.BlockCache, *rpcError) { + if len(req.Params) < 1 { + return p2p.BlockCache{}, &rpcError{Code: -32602, Message: "missing block hash parameter"} + } + + hashStr, ok := req.Params[0].(string) + if !ok { + return p2p.BlockCache{}, &rpcError{Code: -32602, Message: "invalid block hash parameter"} + } + + hash := common.HexToHash(hashStr) + cache, ok := conns.Blocks().Get(hash) + if !ok || cache.Body == nil { + return p2p.BlockCache{}, nil + } + + return cache, nil +} + +// getBlockTransactionCountByHash returns the transaction count in a block. +func getBlockTransactionCountByHash(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + cache, err := getBlockCacheByHashParam(req, conns) + if err != nil || cache.Body == nil { + return nil, err + } + return hexutil.EncodeUint64(uint64(len(cache.Body.Transactions))), nil +} + +// getUncleCountByBlockHash returns the uncle count in a block. +func getUncleCountByBlockHash(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + cache, err := getBlockCacheByHashParam(req, conns) + if err != nil || cache.Body == nil { + return nil, err + } + return hexutil.EncodeUint64(uint64(len(cache.Body.Uncles))), nil +} + +// formatBlockResponse formats a block cache into the Ethereum JSON-RPC block format. +func formatBlockResponse(hash common.Hash, cache p2p.BlockCache, fullTx bool) map[string]any { + header := cache.Header + if header == nil { + return nil + } + + result := map[string]any{ + "hash": hash.Hex(), + "number": hexutil.EncodeUint64(header.Number.Uint64()), + "parentHash": header.ParentHash.Hex(), + "nonce": hexutil.EncodeUint64(header.Nonce.Uint64()), + "sha3Uncles": header.UncleHash.Hex(), + "logsBloom": hexutil.Encode(header.Bloom.Bytes()), + "transactionsRoot": header.TxHash.Hex(), + "stateRoot": header.Root.Hex(), + "receiptsRoot": header.ReceiptHash.Hex(), + "miner": header.Coinbase.Hex(), + "difficulty": hexutil.EncodeBig(header.Difficulty), + "extraData": hexutil.Encode(header.Extra), + "gasLimit": hexutil.EncodeUint64(header.GasLimit), + "gasUsed": hexutil.EncodeUint64(header.GasUsed), + "timestamp": hexutil.EncodeUint64(header.Time), + "mixHash": header.MixDigest.Hex(), + } + + if header.BaseFee != nil { + result["baseFeePerGas"] = hexutil.EncodeBig(header.BaseFee) + } + + if header.WithdrawalsHash != nil { + result["withdrawalsRoot"] = header.WithdrawalsHash.Hex() + } + + if header.BlobGasUsed != nil { + result["blobGasUsed"] = hexutil.EncodeUint64(*header.BlobGasUsed) + } + + if header.ExcessBlobGas != nil { + result["excessBlobGas"] = hexutil.EncodeUint64(*header.ExcessBlobGas) + } + + if header.ParentBeaconRoot != nil { + result["parentBeaconBlockRoot"] = header.ParentBeaconRoot.Hex() + } + + // Add total difficulty if available + if cache.TD != nil { + result["totalDifficulty"] = hexutil.EncodeBig(cache.TD) + } + + // Add transactions + if cache.Body != nil && cache.Body.Transactions != nil { + if fullTx { + txs := make([]map[string]any, len(cache.Body.Transactions)) + for i, tx := range cache.Body.Transactions { + txs[i] = formatTransactionResponse(tx, hash, header, uint64(i)) + } + result["transactions"] = txs + } else { + txHashes := make([]string, len(cache.Body.Transactions)) + for i, tx := range cache.Body.Transactions { + txHashes[i] = tx.Hash().Hex() + } + result["transactions"] = txHashes + } + } else { + result["transactions"] = []string{} + } + + // Add uncles + if cache.Body != nil && cache.Body.Uncles != nil { + uncleHashes := make([]string, len(cache.Body.Uncles)) + for i, uncle := range cache.Body.Uncles { + uncleHashes[i] = uncle.Hash().Hex() + } + result["uncles"] = uncleHashes + } else { + result["uncles"] = []string{} + } + + // Add size (approximate based on header + body) + result["size"] = hexutil.EncodeUint64(0) // We don't have exact size; use 0 + + return result +} + +// formatTransactionResponse formats a transaction into the Ethereum JSON-RPC format. +// If blockHash is empty, the transaction is considered pending. +func formatTransactionResponse(tx *types.Transaction, blockHash common.Hash, header *types.Header, index uint64) map[string]any { + v, r, s := tx.RawSignatureValues() + + result := map[string]any{ + "hash": tx.Hash().Hex(), + "nonce": hexutil.EncodeUint64(tx.Nonce()), + "gas": hexutil.EncodeUint64(tx.Gas()), + "value": hexutil.EncodeBig(tx.Value()), + "input": hexutil.Encode(tx.Data()), + "v": hexutil.EncodeBig(v), + "r": hexutil.EncodeBig(r), + "s": hexutil.EncodeBig(s), + "type": hexutil.EncodeUint64(uint64(tx.Type())), + } + + if tx.To() != nil { + result["to"] = tx.To().Hex() + } else { + result["to"] = nil + } + + // Add from address if we can derive it + signer := types.LatestSignerForChainID(tx.ChainId()) + if from, err := types.Sender(signer, tx); err == nil { + result["from"] = from.Hex() + } + + // Set gas price fields based on transaction type + switch tx.Type() { + case types.LegacyTxType, types.AccessListTxType: + result["gasPrice"] = hexutil.EncodeBig(tx.GasPrice()) + case types.DynamicFeeTxType, types.BlobTxType: + result["maxFeePerGas"] = hexutil.EncodeBig(tx.GasFeeCap()) + result["maxPriorityFeePerGas"] = hexutil.EncodeBig(tx.GasTipCap()) + // For EIP-1559 txs, also set gasPrice to effective gas price if in a block + if header != nil && header.BaseFee != nil { + effectiveGasPrice := new(big.Int).Add(header.BaseFee, tx.GasTipCap()) + if effectiveGasPrice.Cmp(tx.GasFeeCap()) > 0 { + effectiveGasPrice = tx.GasFeeCap() + } + result["gasPrice"] = hexutil.EncodeBig(effectiveGasPrice) + } else { + result["gasPrice"] = hexutil.EncodeBig(tx.GasFeeCap()) + } + } + + // Add chain ID if present + if tx.ChainId() != nil { + result["chainId"] = hexutil.EncodeBig(tx.ChainId()) + } + + // Add access list if present + if tx.AccessList() != nil { + result["accessList"] = tx.AccessList() + } + + // Add blob-specific fields + if tx.Type() == types.BlobTxType { + result["maxFeePerBlobGas"] = hexutil.EncodeBig(tx.BlobGasFeeCap()) + result["blobVersionedHashes"] = tx.BlobHashes() + } + + // Add block info if transaction is in a block + if blockHash != (common.Hash{}) && header != nil { + result["blockHash"] = blockHash.Hex() + result["blockNumber"] = hexutil.EncodeUint64(header.Number.Uint64()) + result["transactionIndex"] = hexutil.EncodeUint64(index) + } else { + result["blockHash"] = nil + result["blockNumber"] = nil + result["transactionIndex"] = nil + } - writeResult(w, tx.Hash().Hex(), req.ID) + return result } diff --git a/cmd/p2p/sensor/usage.md b/cmd/p2p/sensor/usage.md index c45a77c5..37109552 100644 --- a/cmd/p2p/sensor/usage.md +++ b/cmd/p2p/sensor/usage.md @@ -9,6 +9,34 @@ created automatically. The bootnodes may change, so refer to the [Polygon Knowledge Layer][bootnodes] if the sensor is not discovering peers. +## JSON-RPC Server + +The sensor runs a JSON-RPC server on port 8545 (configurable via `--rpc-port`) +that supports a subset of Ethereum JSON-RPC methods using cached data. + +### Supported Methods + +| Method | Description | +|--------|-------------| +| `eth_chainId` | Returns the chain ID | +| `eth_blockNumber` | Returns the current head block number | +| `eth_gasPrice` | Returns suggested gas price based on recent blocks | +| `eth_getBlockByHash` | Returns block by hash | +| `eth_getBlockByNumber` | Returns block by number (if cached) | +| `eth_getTransactionByHash` | Returns transaction by hash | +| `eth_getTransactionByBlockHashAndIndex` | Returns transaction at index in block | +| `eth_getBlockTransactionCountByHash` | Returns transaction count in block | +| `eth_getUncleCountByBlockHash` | Returns uncle count in block | +| `eth_sendRawTransaction` | Broadcasts signed transaction to peers | + +### Limitations + +Methods requiring state or receipts are not supported: +- `eth_getBalance`, `eth_getCode`, `eth_call`, `eth_estimateGas` +- `eth_getTransactionReceipt`, `eth_getLogs` + +Data is served from an LRU cache, so older blocks/transactions may not be available. + ## Metrics The sensor exposes Prometheus metrics at `http://localhost:2112/metrics` diff --git a/doc/polycli_p2p_sensor.md b/doc/polycli_p2p_sensor.md index 0561950e..e7c58fca 100644 --- a/doc/polycli_p2p_sensor.md +++ b/doc/polycli_p2p_sensor.md @@ -30,6 +30,34 @@ created automatically. The bootnodes may change, so refer to the [Polygon Knowledge Layer][bootnodes] if the sensor is not discovering peers. +## JSON-RPC Server + +The sensor runs a JSON-RPC server on port 8545 (configurable via `--rpc-port`) +that supports a subset of Ethereum JSON-RPC methods using cached data. + +### Supported Methods + +| Method | Description | +|--------|-------------| +| `eth_chainId` | Returns the chain ID | +| `eth_blockNumber` | Returns the current head block number | +| `eth_gasPrice` | Returns suggested gas price based on recent blocks | +| `eth_getBlockByHash` | Returns block by hash | +| `eth_getBlockByNumber` | Returns block by number (if cached) | +| `eth_getTransactionByHash` | Returns transaction by hash | +| `eth_getTransactionByBlockHashAndIndex` | Returns transaction at index in block | +| `eth_getBlockTransactionCountByHash` | Returns transaction count in block | +| `eth_getUncleCountByBlockHash` | Returns uncle count in block | +| `eth_sendRawTransaction` | Broadcasts signed transaction to peers | + +### Limitations + +Methods requiring state or receipts are not supported: +- `eth_getBalance`, `eth_getCode`, `eth_call`, `eth_estimateGas` +- `eth_getTransactionReceipt`, `eth_getLogs` + +Data is served from an LRU cache, so older blocks/transactions may not be available. + ## Metrics The sensor exposes Prometheus metrics at `http://localhost:2112/metrics` diff --git a/p2p/conns.go b/p2p/conns.go index 7690cd62..803c3e71 100644 --- a/p2p/conns.go +++ b/p2p/conns.go @@ -334,6 +334,11 @@ func (c *Conns) AddTxs(txs []*types.Transaction) []common.Hash { return hashes } +// GetTx retrieves a transaction from the shared cache and updates LRU ordering. +func (c *Conns) GetTx(hash common.Hash) (*types.Transaction, bool) { + return c.txs.Get(hash) +} + // PeekTxs retrieves multiple transactions from the shared cache without updating LRU ordering. // Uses a single read lock for better concurrency when LRU ordering is not needed. func (c *Conns) PeekTxs(hashes []common.Hash) []*types.Transaction { @@ -431,6 +436,19 @@ func (c *Conns) GetPeerName(peerID string) string { return "" } +// GetBlockByNumber iterates through the cache to find a block by its number. +// Returns the hash, block cache, and true if found; empty values and false otherwise. +func (c *Conns) GetBlockByNumber(number uint64) (common.Hash, BlockCache, bool) { + for _, hash := range c.blocks.Keys() { + if cache, ok := c.blocks.Peek(hash); ok && cache.Header != nil { + if cache.Header.Number.Uint64() == number { + return hash, cache, true + } + } + } + return common.Hash{}, BlockCache{}, false +} + // GetPeerVersion returns the negotiated eth protocol version for a specific peer. // Returns 0 if the peer is not found. func (c *Conns) GetPeerVersion(peerID string) uint { diff --git a/p2p/datastructures/lru.go b/p2p/datastructures/lru.go index f39edd1c..d0730c1e 100644 --- a/p2p/datastructures/lru.go +++ b/p2p/datastructures/lru.go @@ -257,6 +257,19 @@ func (c *LRU[K, V]) Remove(key K) (V, bool) { return zero, false } +// Keys returns all keys in the cache in LRU order (most recent first). +func (c *LRU[K, V]) Keys() []K { + c.mu.RLock() + defer c.mu.RUnlock() + + keys := make([]K, 0, c.list.Len()) + for elem := c.list.Front(); elem != nil; elem = elem.Next() { + e := elem.Value.(*entry[K, V]) + keys = append(keys, e.key) + } + return keys +} + // AddBatch adds multiple key-value pairs to the cache. // Uses a single write lock for all additions, reducing lock contention // compared to calling Add in a loop. Keys and values must have the same length. diff --git a/p2p/gasprice.go b/p2p/gasprice.go new file mode 100644 index 00000000..b7c34906 --- /dev/null +++ b/p2p/gasprice.go @@ -0,0 +1,256 @@ +package p2p + +import ( + "math/big" + "sort" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// Gas price oracle constants (matching Bor/geth defaults) +const ( + // gpoSampleNumber is the number of transactions to sample per block + gpoSampleNumber = 3 + // gpoCheckBlocks is the number of blocks to check for gas price estimation + gpoCheckBlocks = 20 + // gpoPercentile is the percentile to use for gas price estimation + gpoPercentile = 60 +) + +var ( + // gpoMaxPrice is the maximum gas price to suggest (500 gwei) + gpoMaxPrice = big.NewInt(500_000_000_000) + // gpoIgnorePrice is the minimum tip to consider (2 gwei, lower than Bor's 25 gwei for broader network compatibility) + gpoIgnorePrice = big.NewInt(2_000_000_000) + // gpoDefaultPrice is the default gas price when no data is available (1 gwei) + gpoDefaultPrice = big.NewInt(1_000_000_000) +) + +// GasPriceOracle estimates gas prices based on recent block data. +// It follows Bor/geth's gas price oracle approach. +type GasPriceOracle struct { + conns *Conns + + mu sync.RWMutex + lastHead common.Hash + lastTip *big.Int +} + +// NewGasPriceOracle creates a new gas price oracle that uses the given Conns for block data. +func NewGasPriceOracle(conns *Conns) *GasPriceOracle { + return &GasPriceOracle{ + conns: conns, + } +} + +// SuggestGasPrice estimates the gas price based on recent blocks. +// For EIP-1559 networks, this returns baseFee + suggestedTip. +// For legacy networks, this returns the 60th percentile of gas prices. +func (o *GasPriceOracle) SuggestGasPrice() *big.Int { + head := o.conns.HeadBlock() + if head.Block == nil { + return gpoDefaultPrice + } + + // For EIP-1559: return baseFee + suggested tip + if baseFee := head.Block.BaseFee(); baseFee != nil { + tip := o.SuggestGasTipCap() + if tip == nil { + tip = gpoDefaultPrice + } + return new(big.Int).Add(baseFee, tip) + } + + // Legacy: return percentile of gas prices + return o.suggestLegacyGasPrice() +} + +// suggestLegacyGasPrice estimates gas price for pre-EIP-1559 networks. +func (o *GasPriceOracle) suggestLegacyGasPrice() *big.Int { + keys := o.conns.blocks.Keys() + if len(keys) == 0 { + return gpoDefaultPrice + } + + if len(keys) > gpoCheckBlocks { + keys = keys[:gpoCheckBlocks] + } + + var prices []*big.Int + for _, hash := range keys { + cache, ok := o.conns.blocks.Peek(hash) + if !ok || cache.Body == nil { + continue + } + + for _, tx := range cache.Body.Transactions { + if price := tx.GasPrice(); price != nil && price.Sign() > 0 { + prices = append(prices, new(big.Int).Set(price)) + } + } + } + + if len(prices) == 0 { + return gpoDefaultPrice + } + + sort.Slice(prices, func(i, j int) bool { + return prices[i].Cmp(prices[j]) < 0 + }) + + price := prices[(len(prices)-1)*gpoPercentile/100] + if price.Cmp(gpoMaxPrice) > 0 { + return new(big.Int).Set(gpoMaxPrice) + } + return price +} + +// SuggestGasTipCap estimates a gas tip cap (priority fee) based on recent blocks. +// This implementation follows Bor/geth's gas price oracle approach: +// - Samples the lowest N tips from each of the last M blocks +// - Ignores tips below a threshold +// - Returns the configured percentile of collected tips +// - Caches results until head changes +func (o *GasPriceOracle) SuggestGasTipCap() *big.Int { + head := o.conns.HeadBlock() + if head.Block == nil { + return nil + } + headHash := head.Block.Hash() + + // Check cache first + o.mu.RLock() + if headHash == o.lastHead && o.lastTip != nil { + tip := new(big.Int).Set(o.lastTip) + o.mu.RUnlock() + return tip + } + lastTip := o.lastTip + o.mu.RUnlock() + + // Collect tips from recent blocks + keys := o.conns.blocks.Keys() + if len(keys) == 0 { + return lastTip + } + + // Limit to checkBlocks most recent + if len(keys) > gpoCheckBlocks { + keys = keys[:gpoCheckBlocks] + } + + var results []*big.Int + for _, hash := range keys { + tips := o.getBlockTips(hash, gpoSampleNumber, gpoIgnorePrice) + if len(tips) == 0 && lastTip != nil { + // Empty block or all tips below threshold, use last tip + tips = []*big.Int{lastTip} + } + results = append(results, tips...) + } + + if len(results) == 0 { + return lastTip + } + + // Sort and get percentile + sort.Slice(results, func(i, j int) bool { + return results[i].Cmp(results[j]) < 0 + }) + tip := results[(len(results)-1)*gpoPercentile/100] + + // Apply max price cap + if tip.Cmp(gpoMaxPrice) > 0 { + tip = new(big.Int).Set(gpoMaxPrice) + } + + // Cache result + o.mu.Lock() + o.lastHead = headHash + o.lastTip = tip + o.mu.Unlock() + + return new(big.Int).Set(tip) +} + +// getBlockTips returns the lowest N tips from a block that are above the ignore threshold. +// Transactions are sorted by effective tip ascending, and the first N valid tips are returned. +func (o *GasPriceOracle) getBlockTips(hash common.Hash, limit int, ignoreUnder *big.Int) []*big.Int { + cache, ok := o.conns.blocks.Peek(hash) + if !ok || cache.Body == nil || cache.Header == nil { + return nil + } + + baseFee := cache.Header.BaseFee + if baseFee == nil { + return nil // Pre-EIP-1559 block + } + + // Calculate tips for all transactions + var allTips []*big.Int + for _, tx := range cache.Body.Transactions { + tip := effectiveGasTip(tx, baseFee) + if tip != nil && tip.Sign() > 0 { + allTips = append(allTips, tip) + } + } + + if len(allTips) == 0 { + return nil + } + + // Sort by tip ascending (lowest first, like Bor) + sort.Slice(allTips, func(i, j int) bool { + return allTips[i].Cmp(allTips[j]) < 0 + }) + + // Collect tips above threshold, up to limit + var tips []*big.Int + for _, tip := range allTips { + if ignoreUnder != nil && tip.Cmp(ignoreUnder) < 0 { + continue + } + tips = append(tips, tip) + if len(tips) >= limit { + break + } + } + + return tips +} + +// effectiveGasTip returns the effective tip (priority fee) for a transaction. +// For EIP-1559 transactions: min(maxPriorityFeePerGas, maxFeePerGas - baseFee) +// For legacy transactions: gasPrice - baseFee (the implicit tip) +// Returns nil if the tip cannot be determined or is negative. +func effectiveGasTip(tx *types.Transaction, baseFee *big.Int) *big.Int { + switch tx.Type() { + case types.DynamicFeeTxType, types.BlobTxType: + tip := tx.GasTipCap() + if tip == nil { + return nil + } + // Effective tip is min(maxPriorityFeePerGas, maxFeePerGas - baseFee) + if tx.GasFeeCap() != nil { + effectiveTip := new(big.Int).Sub(tx.GasFeeCap(), baseFee) + if effectiveTip.Cmp(tip) < 0 { + tip = effectiveTip + } + } + if tip.Sign() <= 0 { + return nil + } + return new(big.Int).Set(tip) + default: + // Legacy/AccessList transactions: tip is gasPrice - baseFee + if price := tx.GasPrice(); price != nil { + tip := new(big.Int).Sub(price, baseFee) + if tip.Sign() > 0 { + return tip + } + } + return nil + } +} diff --git a/p2p/log.go b/p2p/log.go index bfb26b8e..4ba2caba 100644 --- a/p2p/log.go +++ b/p2p/log.go @@ -89,6 +89,7 @@ func (c *MessageCount) IsEmpty() bool { return sum( c.BlockHeaders, c.BlockBodies, + c.Blocks, c.BlockHashes, c.BlockHeaderRequests, c.BlockBodiesRequests, @@ -98,16 +99,19 @@ func (c *MessageCount) IsEmpty() bool { c.Pings, c.Errors, c.Disconnects, + c.NewWitness, + c.NewWitnessHashes, + c.GetWitnessRequest, + c.Witness, ) == 0 } func sum(ints ...int64) int64 { - var sum int64 = 0 + var total int64 for _, i := range ints { - sum += i + total += i } - - return sum + return total } // IncrementByName increments the appropriate field based on message name. diff --git a/p2p/nodeset.go b/p2p/nodeset.go index 9572f853..aa25870f 100644 --- a/p2p/nodeset.go +++ b/p2p/nodeset.go @@ -97,7 +97,7 @@ func WriteURLs(file string, ns NodeSet) error { } } - urls := []string{} + var urls []string for url := range m { urls = append(urls, url) } @@ -130,7 +130,7 @@ func WritePeers(file string, urls []string) error { } func WriteDNSTreeNodes(file string, tree *dnsdisc.Tree) error { - urls := []string{} + var urls []string for _, node := range tree.Nodes() { urls = append(urls, node.URLv4()) }