refactor(extractors/solana): in-memory CAR file parsing #1940
Force-pushed from c9b28db to 3e42aec
Code Review: In-memory CAR file parsing refactor
Overall this is a well-structured refactoring that cleanly removes the file-based CAR download/mmap approach in favor of streaming HTTP reads. The removal of wait_for_cleanup from the BlockStreamer trait is a nice simplification across the codebase. The CarReader implementing AsyncRead with range-request resume and exponential backoff is a solid design.
Key findings (by priority):
Potential Bug:
- HTTP 404 infinite retry — The `CarReader` retries all non-success HTTP statuses indefinitely, but 404 means the epoch doesn't exist and should not be retried. The `CarReaderError::Http` variant exists but is never constructed by the `CarReader`, making the downstream 404 handling in `stream()` dead code. See inline comment for suggested fix.
Robustness:
- Fragile `unreachable!` — The catch-all match arm in the IO error downcasting relies on internal knowledge of what `CarReader` surfaces. If `CarReader` is later modified (e.g. to fix the 404 issue), this will panic in production.
- No upper bound on reconnect attempts — Persistent non-transient errors (403, misconfigured URL) will retry at 30s intervals forever.
- Metrics lost on reconnect — Partial byte counts from interrupted connections are not flushed before the `MonitoredByteStream` is replaced.
Cleanup:
- Dead dependency `memmap2` — Still in `Cargo.toml` but no longer used after the mmap removal.
- `fs-err` may be demotable — Only used in the example, which can use the dev-dependency.
- Typo — "indefinetly" in the `unreachable!` message.
Force-pushed from 3e42aec to 0fddf29
Code Review: In-Memory CAR File Parsing
Overall this is a well-structured refactor that cleanly removes disk I/O from the CAR download pipeline. The CarReader state machine (Connect/Stream/Backoff) with AsyncRead implementation is a solid approach for streaming HTTP data with automatic retry. Good removal of the wait_for_cleanup trait method from BlockStreamer — it simplifies the interface.
Key findings (by priority):
Potential Bugs / Robustness:
- Unbounded retry in `CarReader` — The reader retries indefinitely on non-404 HTTP errors with no maximum attempt count. Persistent server issues could cause the stream to hang.
- Unbounded `prev_blockhash` lookup loop — No upper bound on slot skipping when searching for the previous blockhash via RPC.
- `expect` calls on external data — Multiple `expect` calls in `read_next_slot` operate on data from CAR files (entries, blockhash length, blocktime conversion). Malformed CAR data would cause panics instead of returning errors.
- `expect` on RPC-returned blockhash — Lines 96-100 use `expect` on RPC data that could be malformed.
Performance:
5. Overflow buffer growth — The `Vec<u8>` overflow buffer can accumulate retained capacity over long downloads. Consider `BytesMut` or periodic `shrink_to_fit`.
Metrics accuracy:
6. Per-segment vs total download duration — `ByteStreamMonitor` is recreated on each reconnect, so `record_car_download` reflects segment time, not total download time.
Dead code:
7. `fs-err` dependency — Appears unused after disk I/O removal.
8. `bytes` dependency — May be unnecessary if only used via reqwest's re-export.
Minor:
9. `compute_backoff` u64 to u32 cast — Currently safe due to the `.min(30)` clamp, but fragile to future edits.
```rust
fn compute_backoff(base: Duration, cap: Duration, attempt: u32) -> Duration {
    // attempt=1 => base, attempt=2 => 2*base, attempt=3 => 4*base, ...
    let factor = 1u64 << attempt.saturating_sub(1).min(30);
    let backoff = base.saturating_mul(factor as u32);
```
The `compute_backoff` function has a subtle truncation issue: `factor` is computed as `u64` and then cast to `u32` via `factor as u32`. Today the `.min(30)` clamp keeps `factor` at most 2^30 = 1_073_741_824, which fits in `u32`, so the cast happens to be safe. But `as u32` silently truncates any value above `u32::MAX`, so the safety is implicit rather than enforced — a future edit that raises the `.min(30)` bound would silently introduce a bug. The intent would be clearer and safer computed entirely in `u32` or via `try_into()`.
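To make the concern concrete, here is a runnable sketch of the `u32`-only computation; the final `.min(cap)` clamp is an assumption based on the function's `cap` parameter, since the original body is truncated in the diff:

```rust
use std::time::Duration;

// Sketch of the suggested fix: compute the factor entirely in u32 so no
// silent `as` truncation is possible. The `.min(cap)` body is assumed.
fn compute_backoff(base: Duration, cap: Duration, attempt: u32) -> Duration {
    // attempt=1 => base, attempt=2 => 2*base, attempt=3 => 4*base, ...
    let factor = 1u32
        .checked_shl(attempt.saturating_sub(1).min(30))
        .unwrap_or(u32::MAX);
    base.saturating_mul(factor).min(cap)
}

fn main() {
    let base = Duration::from_secs(1);
    let cap = Duration::from_secs(30);
    assert_eq!(compute_backoff(base, cap, 1), Duration::from_secs(1));
    assert_eq!(compute_backoff(base, cap, 3), Duration::from_secs(4));
    // Very large attempts saturate at the cap instead of overflowing.
    assert_eq!(compute_backoff(base, cap, 100), Duration::from_secs(30));
    println!("ok");
}
```

If the `.min(30)` bound is ever raised past 31, `checked_shl` returns `None` and the factor saturates at `u32::MAX` rather than wrapping.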
Consider:
```rust
let factor = 1u32.checked_shl(attempt.saturating_sub(1).min(30)).unwrap_or(u32::MAX);
let backoff = base.saturating_mul(factor);
```

```rust
loop {
    match &mut this.state {
        ReaderState::Connect(fut) => match fut.as_mut().poll(cx) {
            std::task::Poll::Ready(Ok(resp)) => {
                let status = resp.status();
                // Handle error codes.
                match status {
                    reqwest::StatusCode::NOT_FOUND => {
                        let err = std::io::Error::other(CarReaderError::Http(status));
                        return std::task::Poll::Ready(Err(err));
                    }
                    status if !status.is_success() => {
                        this.schedule_backoff(CarReaderError::Http(status));
                        continue;
                    }
                    _ => {}
                }

                // Handle partial content.
                if this.bytes_read_total > 0
                    && status != reqwest::StatusCode::PARTIAL_CONTENT
                {
                    let e = std::io::Error::other(CarReaderError::RangeRequestUnsupported);
                    return std::task::Poll::Ready(Err(e));
                }

                // Initial connection succeeded, start reading the byte stream.
                this.reconnect_attempt = 0;
                let stream = MonitoredByteStream::new(
                    resp.bytes_stream(),
                    this.epoch,
                    this.metrics.clone(),
                );
                this.state = ReaderState::Stream(stream);
            }
            std::task::Poll::Ready(Err(e)) => {
                this.schedule_backoff(CarReaderError::Reqwest(e));
            }
            std::task::Poll::Pending => return std::task::Poll::Pending,
        },
        ReaderState::Stream(stream) => match stream.poll_next_unpin(cx) {
            // Reached EOF.
            std::task::Poll::Ready(None) => {
                return std::task::Poll::Ready(Ok(()));
            }
            // Read some bytes, account for possible overflow.
            std::task::Poll::Ready(Some(Ok(bytes))) => {
                let n_read = bytes.len();
                let to_copy = n_read.min(buf.remaining());

                buf.put_slice(&bytes[..to_copy]);
                this.overflow.extend_from_slice(&bytes[to_copy..]);
                this.bytes_read_total += n_read as u64;

                return std::task::Poll::Ready(Ok(()));
            }
            std::task::Poll::Ready(Some(Err(e))) => {
                this.schedule_backoff(CarReaderError::Reqwest(e));
            }
            std::task::Poll::Pending => return std::task::Poll::Pending,
        },
        ReaderState::Backoff(fut) => match fut.poll_unpin(cx) {
            std::task::Poll::Ready(()) => {
                let fut = get_with_range_header(
                    this.reqwest.clone(),
                    this.url.clone(),
                    this.bytes_read_total,
                );
                this.state = ReaderState::Connect(Box::pin(fut));
            }
            std::task::Poll::Pending => return std::task::Poll::Pending,
        },
    }
}
```
When schedule_backoff is called from within the Connect or Stream match arms, the loop continues back to match &mut this.state which will now be Backoff. This is correct behavior but there's a concern: schedule_backoff is called on every non-success HTTP status (except 404) without any limit on retry attempts. The reconnect_attempt counter is u32 with saturating_add, so it won't overflow, but the CarReader will retry indefinitely for persistent non-404 HTTP errors (e.g., 503, 429).
Is this intentional? The outer BlockStreamerWithRetry handles retries for Recoverable errors, but since CarReader never surfaces non-404 HTTP errors as Err (it just keeps retrying internally), persistent server issues could cause the reader to hang indefinitely. Consider adding a maximum retry count that surfaces a CarReaderError::Http after exhausting retries, allowing the higher-level retry machinery to handle it.
```rust
let stream = MonitoredByteStream::new(
    resp.bytes_stream(),
    this.epoch,
    this.metrics.clone(),
);
```
On reconnect, a new MonitoredByteStream is created with a fresh ByteStreamMonitor (new started_at, zeroed bytes_read_chunk). This means each reconnect records a separate record_car_download metric call when the stream ends, and the duration only reflects the time since the last reconnect, not the total download time.
Is this the intended behavior? If you want the metrics to reflect the total time spent downloading the full CAR file (including retries), the monitor's started_at should persist across reconnections. If per-segment metrics are intended, a comment clarifying this would help.
```rust
// Drain overflow first.
if !this.overflow.is_empty() {
    let to_copy = this.overflow.len().min(buf.remaining());
    buf.put_slice(&this.overflow[..to_copy]);
    this.overflow.drain(..to_copy);
    return std::task::Poll::Ready(Ok(()));
}
```
The overflow Vec<u8> can grow without bound. If the HTTP response yields very large chunks (some CDNs send multi-megabyte chunks) and the caller provides a small ReadBuf, the overflow buffer accumulates the difference. While drain(..to_copy) removes consumed bytes, it doesn't shrink the allocation.
For long-lived downloads of multi-GB CAR files, this could lead to memory fragmentation or unnecessary retained capacity. Consider periodically calling self.overflow.shrink_to_fit() (e.g., after draining to empty), or using a VecDeque<u8> / bytes::BytesMut which handles this pattern more efficiently.
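A minimal sketch of the shrink-after-drain idea, outside the `AsyncRead` context; the 64 KiB threshold and the standalone function shape are illustrative, not from the PR:

```rust
// Drain the overflow buffer into `out`, then release retained capacity once
// the buffer is empty. Threshold and signature are hypothetical.
fn drain_overflow(overflow: &mut Vec<u8>, out: &mut Vec<u8>, remaining: usize) {
    let to_copy = overflow.len().min(remaining);
    out.extend_from_slice(&overflow[..to_copy]);
    overflow.drain(..to_copy);

    const SHRINK_THRESHOLD: usize = 64 * 1024;
    if overflow.is_empty() && overflow.capacity() > SHRINK_THRESHOLD {
        // Without this, capacity from a single huge HTTP chunk is retained
        // for the rest of a multi-GB download.
        overflow.shrink_to_fit();
    }
}

fn main() {
    // Simulate a large chunk having inflated the buffer's capacity.
    let mut overflow = Vec::with_capacity(1 << 20);
    overflow.extend_from_slice(&[1u8; 100]);
    let mut out = Vec::new();

    drain_overflow(&mut overflow, &mut out, 200);
    assert_eq!(out.len(), 100);
    assert!(overflow.is_empty());
    assert!(overflow.capacity() <= 64 * 1024); // capacity released
    println!("ok");
}
```

`bytes::BytesMut` avoids the explicit shrink by reclaiming or reallocating internally, at the cost of one more dependency touchpoint.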
```diff
@@ -274,7 +98,7 @@ pub fn stream(
             .map(TryInto::try_into)
             .expect("invalid base-58 string")
             .expect("blockhash is 32 bytes");
-        },
+        }
         Err(e) if rpc_client::is_block_missing_err(&e) => slot += 1,
         Err(e) => {
             yield Err(Of1StreamError::RpcClient(e));
@@ -284,67 +108,51 @@ pub fn stream(
     }
 };
```
The prev_blockhash lookup loop increments slot without an upper bound. If the RPC consistently returns "block missing" for a long range of skipped slots (or due to a bug), this loop runs indefinitely. Consider adding a maximum number of attempts or an upper bound check (e.g., slot < start + MAX_SKIP_SLOTS) to prevent unbounded looping.
```rust
const MAX_SKIP: u64 = 1000;
// ...
Err(e) if rpc_client::is_block_missing_err(&e) => {
    slot += 1;
    if slot > start + MAX_SKIP {
        yield Err(Of1StreamError::RpcClient(e));
        return;
    }
}
```

```rust
struct CarReader {
    url: String,
    epoch: solana_clock::Epoch,
    reqwest: Arc<reqwest::Client>,
    state: ReaderState,
    overflow: Vec<u8>,
    bytes_read_total: u64,

    // Backoff control
    reconnect_attempt: u32,
    max_backoff: Duration,
    base_backoff: Duration,

    metrics: Option<MetricsContext>,
}
```
CarReader holds significant state and responsibility: HTTP connection management with a 3-state machine, retry/backoff logic, overflow buffering, metrics tracking, and URL construction. Per the Single Responsibility Principle, the retry/backoff logic (state transitions between Connect/Stream/Backoff, reconnect counting, backoff computation) could be separated from the byte-level AsyncRead implementation.
This isn't blocking, but worth considering for testability — currently the retry behavior can only be tested by standing up an HTTP server, whereas a separated retry policy could be unit-tested independently.
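As a sketch of what that separation could look like — a pure retry-policy type whose transitions are unit-testable without any HTTP server; all names and the attempt cap are hypothetical, not the PR's actual API:

```rust
#[derive(Debug, PartialEq)]
enum Decision {
    Retry { attempt: u32 },
    GiveUp,
}

// Retry logic extracted from the AsyncRead state machine. The reader would
// consult this on each error instead of tracking reconnect_attempt inline.
struct RetryPolicy {
    attempt: u32,
    max_attempts: u32,
}

impl RetryPolicy {
    // Called when a connect or stream step fails.
    fn on_error(&mut self) -> Decision {
        self.attempt = self.attempt.saturating_add(1);
        if self.attempt > self.max_attempts {
            Decision::GiveUp
        } else {
            Decision::Retry { attempt: self.attempt }
        }
    }

    // Called when a connection succeeds (mirrors `reconnect_attempt = 0`).
    fn on_success(&mut self) {
        self.attempt = 0;
    }
}

fn main() {
    let mut policy = RetryPolicy { attempt: 0, max_attempts: 2 };
    assert_eq!(policy.on_error(), Decision::Retry { attempt: 1 });
    assert_eq!(policy.on_error(), Decision::Retry { attempt: 2 });
    assert_eq!(policy.on_error(), Decision::GiveUp);
    policy.on_success();
    assert_eq!(policy.on_error(), Decision::Retry { attempt: 1 });
    println!("ok");
}
```

A bounded policy like this would also address the unbounded-retry finding above: `GiveUp` maps naturally onto surfacing a `CarReaderError::Http` to the outer retry machinery.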
```rust
}

type ConnectFuture = Pin<Box<dyn Future<Output = reqwest::Result<reqwest::Response>> + Send>>;
type ByteStream = Pin<Box<dyn Stream<Item = reqwest::Result<bytes::Bytes>> + Send>>;
```
Nit: The bytes dependency was added to Cargo.toml but bytes::Bytes is only used here indirectly via reqwest::Result<bytes::Bytes> in the ByteStream type alias (line 419). Since reqwest re-exports bytes::Bytes in its public API, is the explicit bytes dependency in Cargo.toml still needed? If it's only used through reqwest's re-export, the direct dependency could be removed to reduce the dependency surface.
```diff
 Of1StreamError::ChannelClosed(_)
-| Of1StreamError::FileOpen(_)
-| Of1StreamError::Mmap(_)
+| Of1StreamError::FileStream(of1_client::CarReaderError::Io(_))
```
CarReaderError::Io is mapped as Fatal, but IO errors during CAR streaming (e.g., connection reset by peer, unexpected EOF) are arguably transient/recoverable. The CarReader handles most transient IO errors internally via retry, but if an IO error somehow escapes the reader (e.g., from the node parser layer wrapping a non-CarReaderError IO error), classifying it as fatal means it won't be retried by BlockStreamerWithRetry.
Is this intentional? The comment at of1_client.rs:134-136 says "There should be no retry logic here because the CarReader should handle all retry logic internally", which makes sense, but IO errors from the node parser (not the CarReader) could still be transient. Consider whether CarReaderError::Io should be Recoverable to be safe.
```toml
datasets-common = { path = "../../core/datasets-common" }
datasets-raw = { path = "../../core/datasets-raw" }
futures.workspace = true
fs-err.workspace = true
```
fs-err is still listed as a dependency (line 22) but no source file in crates/extractors/solana/src/ imports it. Since this PR removes the disk-based CAR file writing, fs-err may now be unused. If so, it should be removed to keep the dependency surface clean.
```rust
tracing::warn!(
    epoch = self.epoch,
    bytes_read = self.bytes_read_total,
    attempt = self.reconnect_attempt,
    error = ?err,
    error_source = monitoring::logging::error_source(&err),
    backoff = %backoff_str,
    "CAR reader failed; scheduled retry"
);
```
Three logging guideline violations here:
- `error = ?err` should be `error = %err` — Project guidelines require Display formatting (`%`) for the top-level error field, not Debug (`?`). `error_source` already captures the chain via `monitoring::logging::error_source()`.
- Field ordering — `error` and `error_source` must be the last fields before the message string. `backoff` should be moved before them.
- Semicolon in message — Project guidelines prohibit punctuation in log messages. `"CAR reader failed; scheduled retry"` should use a comma or be rephrased.
Suggested fix:
```rust
tracing::warn!(
    epoch = self.epoch,
    bytes_read = self.bytes_read_total,
    attempt = self.reconnect_attempt,
    backoff = %backoff_str,
    error = %err,
    error_source = monitoring::logging::error_source(&err),
    "CAR reader failed, scheduled retry"
);
```

```diff
@@ -176,6 +162,12 @@ impl From<Of1StreamError> for BlockStreamError {
             | Of1StreamError::DecodeField { .. }
             | Of1StreamError::NodeParse(_)
             | Of1StreamError::DataframeReassembly(_) => BlockStreamError::Fatal(value.into()),
+
+            Of1StreamError::RpcClient(_)
+            | Of1StreamError::FileStream(of1_client::CarReaderError::Http(_))
+            | Of1StreamError::FileStream(of1_client::CarReaderError::Reqwest(_)) => {
+                BlockStreamError::Recoverable(value.into())
+            }
         }
     }
```
Per the error handling guidelines ("Avoid #[from] Attribute and From Implementations"), this manual From implementation enables implicit conversions via ? and .into() at call sites (e.g., client.rs:304 uses .map_err(Into::into)), making error flow less visible.
The guideline recommends explicit .map_err() at each call site instead. That said, the exhaustive match here (no catch-all) with deliberate recoverable/fatal classification is valuable and well-documented. If you keep the From impl, consider adding a comment explaining why it's preferred here (centralized error classification).
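For comparison, a toy sketch of the guideline's explicit-classification style, with simplified stand-in enums (not the crate's real `Of1StreamError`/`BlockStreamError`):

```rust
#[derive(Debug)]
enum StreamErr {
    RpcClient(String),
    NodeParse(String),
}

#[derive(Debug)]
enum BlockErr {
    Recoverable(StreamErr),
    Fatal(StreamErr),
}

// Named classification function used at each call site via map_err,
// instead of an implicit From conversion hidden behind `?` / `.into()`.
fn classify(e: StreamErr) -> BlockErr {
    match &e {
        StreamErr::RpcClient(_) => BlockErr::Recoverable(e),
        StreamErr::NodeParse(_) => BlockErr::Fatal(e),
    }
}

fn main() {
    let res: Result<(), StreamErr> = Err(StreamErr::RpcClient("timeout".into()));
    // The recoverable/fatal decision stays visible at the call site.
    let mapped = res.map_err(classify);
    assert!(matches!(mapped, Err(BlockErr::Recoverable(_))));
    println!("ok");
}
```

The classification is still centralized in one function, so the exhaustive-match benefit the review notes is preserved without a `From` impl.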
Parse CAR data in memory as it is downloaded, never write CAR files to disk. This reduces disk size requirements and completely removes the time wasted on writing to disk when the data could already be getting processed instead.
Closes #1630.
Obsoletes and therefore closes #1408.