
refactor(extractors/solana): in-memory CAR file parsing#1940

Open
sistemd wants to merge 5 commits into main from sistemd/in-memory-car-file-parsing

Conversation

@sistemd
Contributor

@sistemd sistemd commented Mar 8, 2026

Parse CAR data in memory as it is downloaded, never writing CAR files to disk. This reduces disk space requirements and eliminates the time spent writing to disk while the data could already be processed.

Closes #1630.
Obsoletes and therefore closes #1408.

@sistemd sistemd force-pushed the sistemd/in-memory-car-file-parsing branch 2 times, most recently from c9b28db to 3e42aec Compare March 8, 2026 16:58

@claude claude bot left a comment


Code Review: In-memory CAR file parsing refactor

Overall this is a well-structured refactoring that cleanly removes the file-based CAR download/mmap approach in favor of streaming HTTP reads. The removal of wait_for_cleanup from the BlockStreamer trait is a nice simplification across the codebase. The CarReader implementing AsyncRead with range-request resume and exponential backoff is a solid design.

Key findings (by priority):

Potential Bug:

  • HTTP 404 infinite retry — The CarReader retries all non-success HTTP statuses indefinitely, but 404 means the epoch doesn't exist and should not be retried. The CarReaderError::Http variant exists but is never constructed by the CarReader, making the downstream 404 handling in stream() dead code. See inline comment for suggested fix.
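The suggested fix can be sketched as a status classifier that treats 404 as terminal while retrying other failures. This is an illustrative standalone sketch, not PR code: the enum, function name, and bare `u16` status codes are assumptions made to keep it free of the `reqwest` dependency.

```rust
/// Hypothetical retry classification for an HTTP status, mirroring the
/// suggested fix: 404 surfaces an error, other failures retry.
#[derive(Debug, PartialEq)]
enum RetryDecision {
    Proceed, // success, start streaming
    Fail,    // terminal: surface CarReaderError::Http to the caller
    Retry,   // transient: schedule backoff and reconnect
}

fn classify_status(status: u16) -> RetryDecision {
    match status {
        // The epoch does not exist; retrying can never succeed.
        404 => RetryDecision::Fail,
        // Any 2xx response means we can start reading the byte stream.
        s if (200..300).contains(&s) => RetryDecision::Proceed,
        // Everything else (503, 429, ...) is treated as transient.
        _ => RetryDecision::Retry,
    }
}
```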

Robustness:

  • Fragile unreachable! — The catch-all match arm in the IO error downcasting relies on internal knowledge of what CarReader surfaces. If CarReader is later modified (e.g. to fix the 404 issue), this will panic in production.
  • No upper bound on reconnect attempts — Persistent non-transient errors (403, misconfigured URL) will retry at 30s intervals forever.
  • Metrics lost on reconnect — Partial byte counts from interrupted connections are not flushed before the MonitoredByteStream is replaced.

Cleanup:

  • Dead dependency memmap2 — Still in Cargo.toml but no longer used after the mmap removal.
  • fs-err may be demotable — Only used in the example, which can use the dev-dependency.
  • Typo — "indefinetly" in the unreachable message.


@claude claude bot left a comment


Submitting stale pending review to clear state.


@claude claude bot left a comment


Additional finding from guideline review (logging).

@sistemd sistemd force-pushed the sistemd/in-memory-car-file-parsing branch from 3e42aec to 0fddf29 Compare March 9, 2026 12:14

@claude claude bot left a comment


Code Review: In-Memory CAR File Parsing

Overall this is a well-structured refactor that cleanly removes disk I/O from the CAR download pipeline. The CarReader state machine (Connect/Stream/Backoff) with AsyncRead implementation is a solid approach for streaming HTTP data with automatic retry. Good removal of the wait_for_cleanup trait method from BlockStreamer — it simplifies the interface.

Key findings (by priority):

Potential Bugs / Robustness:

  1. Unbounded retry in CarReader — The reader retries indefinitely on non-404 HTTP errors with no maximum attempt count. Persistent server issues could cause the stream to hang.
  2. Unbounded prev_blockhash lookup loop — No upper bound on slot skipping when searching for the previous blockhash via RPC.
  3. expect calls on external data — Multiple expect calls in read_next_slot operate on data from CAR files (entries, blockhash length, blocktime conversion). Malformed CAR data would cause panics instead of returning errors.
  4. expect on RPC-returned blockhash — Lines 96-100 use expect on RPC data that could be malformed.

Performance:
5. Overflow buffer growth — the Vec<u8> overflow buffer can accumulate retained capacity over long downloads. Consider BytesMut or periodic shrink_to_fit.

Metrics accuracy:
6. Per-segment vs total download duration — ByteStreamMonitor is recreated on each reconnect, so record_car_download reflects segment time, not total download time.

Dead code:
7. fs-err dependency — Appears unused after disk I/O removal.
8. bytes dependency — May be unnecessary if only used via reqwest's re-export.

Minor:
9. compute_backoff u64 to u32 cast — Currently safe due to .min(30) clamp, but fragile to future edits.

fn compute_backoff(base: Duration, cap: Duration, attempt: u32) -> Duration {
    // attempt=1 => base, attempt=2 => 2*base, attempt=3 => 4*base, ...
    let factor = 1u64 << attempt.saturating_sub(1).min(30);
    let backoff = base.saturating_mul(factor as u32);


The compute_backoff function has a subtle truncation issue. factor is computed as u64 and then cast to u32 via factor as u32. Because the shift is clamped by .min(30), factor is at most 2^30 = 1_073_741_824, which fits in u32, so the cast is currently safe. However, as u32 is a silent truncation for any value above u32::MAX.

While the .min(30) clamp on the shift prevents factor itself from overflowing (max is 2^30), the intent would be clearer and safer by computing entirely in u32 or using try_into(). As-is, a future edit that changes .min(30) to a larger value would silently introduce a bug.

Consider:

let factor = 1u32.checked_shl(attempt.saturating_sub(1).min(30)).unwrap_or(u32::MAX);
let backoff = base.saturating_mul(factor);
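Putting the suggestion together, a complete, hedged version of the function might look like this. The final clamp to `cap` is an assumption: the truncated excerpt above does not show how the original function ends.

```rust
use std::time::Duration;

/// Exponential backoff computed entirely in u32, so no truncating cast
/// is needed: attempt=1 => base, attempt=2 => 2*base, attempt=3 => 4*base, ...
/// The result is clamped to `cap` (assumed behavior, not shown in the PR).
fn compute_backoff(base: Duration, cap: Duration, attempt: u32) -> Duration {
    let factor = 1u32
        .checked_shl(attempt.saturating_sub(1).min(30))
        .unwrap_or(u32::MAX);
    base.saturating_mul(factor).min(cap)
}
```

With this shape, changing the `.min(30)` bound later degrades gracefully to `u32::MAX` instead of silently truncating.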

Comment on lines +619 to +692
loop {
match &mut this.state {
ReaderState::Connect(fut) => match fut.as_mut().poll(cx) {
std::task::Poll::Ready(Ok(resp)) => {
let status = resp.status();
// Handle error codes.
match status {
reqwest::StatusCode::NOT_FOUND => {
let err = std::io::Error::other(CarReaderError::Http(status));
return std::task::Poll::Ready(Err(err));
}
status if !status.is_success() => {
this.schedule_backoff(CarReaderError::Http(status));
continue;
}
_ => {}
}

// Handle partial content.
if this.bytes_read_total > 0
&& status != reqwest::StatusCode::PARTIAL_CONTENT
{
let e = std::io::Error::other(CarReaderError::RangeRequestUnsupported);
return std::task::Poll::Ready(Err(e));
}

// Initial connection succeeded, start reading the byte stream.
this.reconnect_attempt = 0;
let stream = MonitoredByteStream::new(
resp.bytes_stream(),
this.epoch,
this.metrics.clone(),
);
this.state = ReaderState::Stream(stream);
}
std::task::Poll::Ready(Err(e)) => {
this.schedule_backoff(CarReaderError::Reqwest(e));
}
std::task::Poll::Pending => return std::task::Poll::Pending,
},
ReaderState::Stream(stream) => match stream.poll_next_unpin(cx) {
// Reached EOF.
std::task::Poll::Ready(None) => {
return std::task::Poll::Ready(Ok(()));
}
// Read some bytes, account for possible overflow.
std::task::Poll::Ready(Some(Ok(bytes))) => {
let n_read = bytes.len();
let to_copy = n_read.min(buf.remaining());

buf.put_slice(&bytes[..to_copy]);
this.overflow.extend_from_slice(&bytes[to_copy..]);
this.bytes_read_total += n_read as u64;

return std::task::Poll::Ready(Ok(()));
}
std::task::Poll::Ready(Some(Err(e))) => {
this.schedule_backoff(CarReaderError::Reqwest(e));
}
std::task::Poll::Pending => return std::task::Poll::Pending,
},
ReaderState::Backoff(fut) => match fut.poll_unpin(cx) {
std::task::Poll::Ready(()) => {
let fut = get_with_range_header(
this.reqwest.clone(),
this.url.clone(),
this.bytes_read_total,
);
this.state = ReaderState::Connect(Box::pin(fut));
}
std::task::Poll::Pending => return std::task::Poll::Pending,
},
}
}


When schedule_backoff is called from within the Connect or Stream match arms, the loop continues back to match &mut this.state which will now be Backoff. This is correct behavior but there's a concern: schedule_backoff is called on every non-success HTTP status (except 404) without any limit on retry attempts. The reconnect_attempt counter is u32 with saturating_add, so it won't overflow, but the CarReader will retry indefinitely for persistent non-404 HTTP errors (e.g., 503, 429).

Is this intentional? The outer BlockStreamerWithRetry handles retries for Recoverable errors, but since CarReader never surfaces non-404 HTTP errors as Err (it just keeps retrying internally), persistent server issues could cause the reader to hang indefinitely. Consider adding a maximum retry count that surfaces a CarReaderError::Http after exhausting retries, allowing the higher-level retry machinery to handle it.
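The suggested maximum retry count can be reduced to a standalone helper for illustration. The constant, function name, and Option-based signature are assumptions, not PR code; in the real CarReader the None case would surface CarReaderError::Http instead.

```rust
use std::time::Duration;

/// Assumed bound; the PR currently has no limit.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;

/// Returns Some(backoff) while retries remain, or None once the budget is
/// exhausted, signalling the caller to surface the error so the outer
/// retry machinery (BlockStreamerWithRetry) can decide what to do.
fn next_backoff(attempt: u32, base: Duration, cap: Duration) -> Option<Duration> {
    if attempt >= MAX_RECONNECT_ATTEMPTS {
        return None;
    }
    let factor = 1u32.checked_shl(attempt.min(30)).unwrap_or(u32::MAX);
    Some(base.saturating_mul(factor).min(cap))
}
```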

Comment on lines +647 to +651
let stream = MonitoredByteStream::new(
resp.bytes_stream(),
this.epoch,
this.metrics.clone(),
);


On reconnect, a new MonitoredByteStream is created with a fresh ByteStreamMonitor (new started_at, zeroed bytes_read_chunk). This means each reconnect records a separate record_car_download metric call when the stream ends, and the duration only reflects the time since the last reconnect, not the total download time.

Is this the intended behavior? If you want the metrics to reflect the total time spent downloading the full CAR file (including retries), the monitor's started_at should persist across reconnections. If per-segment metrics are intended, a comment clarifying this would help.
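A minimal sketch of the persistent-monitor alternative, with illustrative names (DownloadMonitor is not a type in the PR): the monitor is created once per epoch and survives reconnects, so the final duration spans the whole download.

```rust
use std::time::{Duration, Instant};

/// Hypothetical monitor whose started_at persists across reconnects,
/// unlike the per-connection ByteStreamMonitor described above.
struct DownloadMonitor {
    started_at: Instant,
    bytes_total: u64,
}

impl DownloadMonitor {
    fn new() -> Self {
        Self { started_at: Instant::now(), bytes_total: 0 }
    }

    /// Called for every chunk, from whichever connection produced it.
    fn record_chunk(&mut self, n: u64) {
        self.bytes_total += n;
    }

    /// Called once at EOF; the duration covers all reconnects and backoffs.
    fn finish(&self) -> (u64, Duration) {
        (self.bytes_total, self.started_at.elapsed())
    }
}
```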

Comment on lines +610 to +616
// Drain overflow first.
if !this.overflow.is_empty() {
let to_copy = this.overflow.len().min(buf.remaining());
buf.put_slice(&this.overflow[..to_copy]);
this.overflow.drain(..to_copy);
return std::task::Poll::Ready(Ok(()));
}


The overflow Vec<u8> can grow without bound. If the HTTP response yields very large chunks (some CDNs send multi-megabyte chunks) and the caller provides a small ReadBuf, the overflow buffer accumulates the difference. While drain(..to_copy) removes consumed bytes, it doesn't shrink the allocation.

For long-lived downloads of multi-GB CAR files, this could lead to memory fragmentation or unnecessary retained capacity. Consider periodically calling self.overflow.shrink_to_fit() (e.g., after draining to empty), or using a VecDeque<u8> / bytes::BytesMut which handles this pattern more efficiently.
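The drain-then-shrink option can be sketched as a free function over the buffer; the threshold value and function name are assumed for illustration.

```rust
/// Assumed tuning value: only bother shrinking large allocations.
const SHRINK_THRESHOLD: usize = 1 << 20; // 1 MiB

/// Copies as much of `overflow` as fits into `out`, removes the consumed
/// prefix, and releases retained capacity once the buffer drains empty.
fn drain_overflow(overflow: &mut Vec<u8>, out: &mut [u8]) -> usize {
    let n = overflow.len().min(out.len());
    out[..n].copy_from_slice(&overflow[..n]);
    overflow.drain(..n);
    // drain() keeps the allocation; shrink it when fully consumed so a
    // single huge chunk does not pin memory for the rest of the download.
    if overflow.is_empty() && overflow.capacity() > SHRINK_THRESHOLD {
        overflow.shrink_to_fit();
    }
    n
}
```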

Comment on lines 87 to 109
@@ -274,7 +98,7 @@ pub fn stream(
.map(TryInto::try_into)
.expect("invalid base-58 string")
.expect("blockhash is 32 bytes");
},
}
Err(e) if rpc_client::is_block_missing_err(&e) => slot += 1,
Err(e) => {
yield Err(Of1StreamError::RpcClient(e));
@@ -284,67 +108,51 @@ pub fn stream(
}
};


The prev_blockhash lookup loop increments slot without an upper bound. If the RPC consistently returns "block missing" for a long range of skipped slots (or due to a bug), this loop runs indefinitely. Consider adding a maximum number of attempts or an upper bound check (e.g., slot < start + MAX_SKIP_SLOTS) to prevent unbounded looping.

const MAX_SKIP: u64 = 1000;
// ...
Err(e) if rpc_client::is_block_missing_err(&e) => {
    slot += 1;
    if slot > start + MAX_SKIP {
        yield Err(Of1StreamError::RpcClient(e));
        return;
    }
}
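Reduced to a standalone, testable form, the bounded loop might look like this. The function name, closure-based RPC stand-in, and error modeling are all illustrative; the real code is an async generator yielding Of1StreamError.

```rust
/// Assumed bound on how many missing slots to skip past.
const MAX_SKIP_SLOTS: u64 = 1000;

/// Walks forward from `start` until `fetch` succeeds, giving up after
/// MAX_SKIP_SLOTS so a persistent "block missing" answer cannot loop
/// forever. `fetch` stands in for the RPC lookup; Err(()) = block missing.
fn find_prev_slot<F>(start: u64, mut fetch: F) -> Option<u64>
where
    F: FnMut(u64) -> Result<(), ()>,
{
    let mut slot = start;
    while slot <= start + MAX_SKIP_SLOTS {
        if fetch(slot).is_ok() {
            return Some(slot);
        }
        slot += 1;
    }
    None // bound exhausted; the caller surfaces an error instead
}
```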

Comment on lines +543 to +557
struct CarReader {
url: String,
epoch: solana_clock::Epoch,
reqwest: Arc<reqwest::Client>,
state: ReaderState,
overflow: Vec<u8>,
bytes_read_total: u64,

// Backoff control
reconnect_attempt: u32,
max_backoff: Duration,
base_backoff: Duration,

metrics: Option<MetricsContext>,
}


CarReader holds significant state and responsibility: HTTP connection management with a 3-state machine, retry/backoff logic, overflow buffering, metrics tracking, and URL construction. Per the Single Responsibility Principle, the retry/backoff logic (state transitions between Connect/Stream/Backoff, reconnect counting, backoff computation) could be separated from the byte-level AsyncRead implementation.

This isn't blocking, but worth considering for testability — currently the retry behavior can only be tested by standing up an HTTP server, whereas a separated retry policy could be unit-tested independently.

}

type ConnectFuture = Pin<Box<dyn Future<Output = reqwest::Result<reqwest::Response>> + Send>>;
type ByteStream = Pin<Box<dyn Stream<Item = reqwest::Result<bytes::Bytes>> + Send>>;


Nit: The bytes dependency was added to Cargo.toml but bytes::Bytes is only used here indirectly via reqwest::Result<bytes::Bytes> in the ByteStream type alias (line 419). Since reqwest re-exports bytes::Bytes in its public API, is the explicit bytes dependency in Cargo.toml still needed? If it's only used through reqwest's re-export, the direct dependency could be removed to reduce the dependency surface.

Of1StreamError::ChannelClosed(_)
| Of1StreamError::FileOpen(_)
| Of1StreamError::Mmap(_)
Of1StreamError::FileStream(of1_client::CarReaderError::Io(_))


CarReaderError::Io is mapped as Fatal, but IO errors during CAR streaming (e.g., connection reset by peer, unexpected EOF) are arguably transient/recoverable. The CarReader handles most transient IO errors internally via retry, but if an IO error somehow escapes the reader (e.g., from the node parser layer wrapping a non-CarReaderError IO error), classifying it as fatal means it won't be retried by BlockStreamerWithRetry.

Is this intentional? The comment at of1_client.rs:134-136 says "There should be no retry logic here because the CarReader should handle all retry logic internally", which makes sense, but IO errors from the node parser (not the CarReader) could still be transient. Consider whether CarReaderError::Io should be Recoverable to be safe.

datasets-common = { path = "../../core/datasets-common" }
datasets-raw = { path = "../../core/datasets-raw" }
futures.workspace = true
fs-err.workspace = true


fs-err is still listed as a dependency (line 22) but no source file in crates/extractors/solana/src/ imports it. Since this PR removes the disk-based CAR file writing, fs-err may now be unused. If so, it should be removed to keep the dependency surface clean.


@claude claude bot left a comment


Follow-up: logging guideline violations found during automated pattern review.

Comment on lines +587 to +595
tracing::warn!(
epoch = self.epoch,
bytes_read = self.bytes_read_total,
attempt = self.reconnect_attempt,
error = ?err,
error_source = monitoring::logging::error_source(&err),
backoff = %backoff_str,
"CAR reader failed; scheduled retry"
);


Three logging guideline violations here:

  1. error = ?err should be error = %err — Project guidelines require Display formatting (%) for the top-level error field, not Debug (?). error_source already captures the chain via monitoring::logging::error_source().

  2. Field ordering — error and error_source must be the last fields before the message string. backoff should be moved before them.

  3. Semicolon in message — Project guidelines prohibit punctuation in log messages. "CAR reader failed; scheduled retry" should use a comma or be rephrased.

Suggested fix:

tracing::warn!(
    epoch = self.epoch,
    bytes_read = self.bytes_read_total,
    attempt = self.reconnect_attempt,
    backoff = %backoff_str,
    error = %err,
    error_source = monitoring::logging::error_source(&err),
    "CAR reader failed, scheduled retry"
);


@claude claude bot left a comment


Follow-up: error handling guideline violations found during automated pattern review.

Comment on lines 147 to 172
@@ -176,6 +162,12 @@ impl From<Of1StreamError> for BlockStreamError {
| Of1StreamError::DecodeField { .. }
| Of1StreamError::NodeParse(_)
| Of1StreamError::DataframeReassembly(_) => BlockStreamError::Fatal(value.into()),

Of1StreamError::RpcClient(_)
| Of1StreamError::FileStream(of1_client::CarReaderError::Http(_))
| Of1StreamError::FileStream(of1_client::CarReaderError::Reqwest(_)) => {
BlockStreamError::Recoverable(value.into())
}
}
}


Per the error handling guidelines ("Avoid #[from] Attribute and From Implementations"), this manual From implementation enables implicit conversions via ? and .into() at call sites (e.g., client.rs:304 uses .map_err(Into::into)), making error flow less visible.

The guideline recommends explicit .map_err() at each call site instead. That said, the exhaustive match here (no catch-all) with deliberate recoverable/fatal classification is valuable and well-documented. If you keep the From impl, consider adding a comment explaining why it's preferred here (centralized error classification).
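The call-site style the guideline prefers can be sketched with toy types; StreamError, rpc_call, and next_block are illustrative stand-ins, not names from the PR.

```rust
#[derive(Debug, PartialEq)]
enum StreamError {
    Recoverable(String),
    Fatal(String),
}

/// Stand-in for an RPC call that can fail transiently.
fn rpc_call() -> Result<u64, String> {
    Err("connection timed out".into())
}

/// Explicit classification at the call site, per the guideline: the
/// reader sees immediately that RPC failures are treated as recoverable,
/// with no blanket From impl hiding the decision.
fn next_block() -> Result<u64, StreamError> {
    rpc_call().map_err(StreamError::Recoverable)
}
```

The trade-off the comment describes is visible here: the explicit style repeats the classification at every call site, while the From impl centralizes it in one exhaustive match.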



Development

Successfully merging this pull request may close these issues.

Solana performance improvement
Solana: use ObjectStore instead of the file system for Old Faithful files

2 participants