Add ContextValue abstraction for ScopedValue support on JDK 25+#20702
Add ContextValue abstraction for ScopedValue support on JDK 25+#20702
Conversation
|
🌟 Thank you for your contribution to the Apache Camel project! 🌟 🤖 CI automation will test this PR automatically. 🐫 Apache Camel Committers, please review the following items:
|
|
I think its best to wait this kind of changes in core until after 4.18 LTS next month. Then we have an open path to do more bigger changes leading up for SB v4, Jackson 3, JUnit 6, Java 25 and other tasks that are more impactful. Camel 4.18 LTS is expected to be similar to 4.14.x but as the last release supporting SB 3 that end users can use as a stable place. |
1cc11ea to
f52c900
Compare
e1a75a4 to
5b8a237
Compare
f1c50b3 to
5eb1377
Compare
This commit introduces a new ContextValue abstraction that provides a unified API for thread-scoped data sharing, with implementations using either ThreadLocal (JDK 17+) or ScopedValue (JDK 25+ with virtual threads). Key changes: - Add ContextValue interface in camel-util with factory methods for creating context values and executing operations within scoped contexts - Add ContextValueFactory with ThreadLocal implementation for base JDK - Add Java 25 multi-release JAR variant using ScopedValue when available - Deprecate NamedThreadLocal in favor of ContextValue.newThreadLocal() - Add new scoped API methods to ExtendedCamelContext: - setupRoutes(Runnable) and setupRoutes(Callable) - createRoute(String, Runnable) and createRoute(String, Callable) - createProcessor(String, Runnable) and createProcessor(String, Callable) - Deprecate the old boolean/void signaling methods (setupRoutes(boolean), createRoute(String), createProcessor(String)) - Update DefaultCamelContextExtension to use ContextValue.where() for scoped execution, enabling proper ScopedValue support on virtual threads - Update DefaultReactiveExecutor to use ContextValue instead of NamedThreadLocal - Simplify Worker class by removing cached stats field The ContextValue abstraction allows Camel to leverage ScopedValue on JDK 25+ when virtual threads are enabled, providing better performance characteristics for virtual thread workloads while maintaining backward compatibility with ThreadLocal on older JDK versions. Documentation added to ContextValue explaining that ThreadLocal variants should hold lightweight objects to avoid memory leaks with pooled threads.
Add two disabled load test classes that can be run manually to compare
performance between platform threads and virtual threads:
- VirtualThreadsLoadTest: Uses SEDA with concurrent consumers to test
throughput with simulated I/O delays
- VirtualThreadsWithThreadsDSLLoadTest: Uses threads() DSL to exercise
the ContextValue/ScopedValue code paths
Tests are disabled by default and configurable via system properties:
- loadtest.messages: Number of messages to process (default: 5000)
- loadtest.producers: Number of producer threads (default: 50)
- loadtest.consumers: Number of concurrent consumers (default: 100)
- loadtest.delay: Simulated I/O delay in ms (default: 5-10)
Run with:
mvn test -Dtest=VirtualThreadsLoadTest \
-Djunit.jupiter.conditions.deactivate='org.junit.*DisabledCondition' \
-Dcamel.threads.virtual.enabled=true
Extract template method hooks in SedaConsumer to allow subclasses to customize polling behavior without duplicating the entire doRun() loop: - beforePoll(): Called before polling, returns true to proceed or false to skip this iteration. Allows acquiring resources like permits. - afterPollEmpty(): Called when poll returns no message. Allows releasing resources. - processPolledExchange(Exchange): Processes the polled exchange. Default is inline processing; can be overridden to dispatch to another thread. Also made these methods protected for subclass access: - createExecutor(int poolSize): Creates the executor service - setupTasks(): Sets up thread pool and tasks - shutdownExecutor(): Shuts down executors - isShutdownPending()/setShutdownPending(): Access shutdown state - pollTimeout field: Made protected ThreadPerTaskSedaConsumer now simply overrides these hooks instead of duplicating the entire polling loop, reducing code from 223 to 158 lines and improving maintainability.
Add a new 'virtualThreadPerTask' option to SedaEndpoint that enables the ThreadPerTaskSedaConsumer. When enabled, spawns a new thread for each message instead of using a fixed pool of consumer threads. This model is optimized for virtual threads (JDK 21+) where thread creation is very cheap, making it ideal for I/O-bound workloads. The concurrentConsumers option becomes a limit on max concurrent tasks (0 means unlimited). Changes: - Add virtualThreadPerTask property to SedaEndpoint with getter/setter - Update createNewConsumer() to return ThreadPerTaskSedaConsumer when virtualThreadPerTask is enabled - Update VirtualThreadsLoadTest to support testing with the new mode - Add ThreadPerTaskSedaConsumerTest for unit testing the feature - Regenerate endpoint configurers and metadata files
Rethrow RuntimeException (and subclasses like IllegalArgumentException) directly without wrapping. Only wrap checked exceptions in RuntimeException. This fixes test failures where tests expected IllegalArgumentException but were receiving RuntimeException wrapping IllegalArgumentException.
This commit adds a new documentation page covering virtual threads support in Apache Camel, including: - Introduction to virtual threads and why they matter for integration - How to enable virtual threads globally in Camel - Components with virtual thread support (SEDA, Jetty, Platform HTTP, etc.) - SEDA deep dive with two execution models comparison (traditional vs virtualThreadPerTask) including a Mermaid diagram - Backpressure and flow control mechanisms - Context propagation with ContextValue (ThreadLocal vs ScopedValue) - Best practices and performance considerations - Complete code examples for common use cases The article is added to the navigation under Architecture, after Threading Model.
…ilation The ContextValue abstraction properly scopes createRoute/createProcessor ThreadLocals, but KameletEndpoint.doInit() relied on a leaked ThreadLocal value from a bug in the old createRoute(null) which cleared isSetupRoutes instead of isCreateRoute. Fix by capturing the route/processor context in KameletReifier (within the createProcessor scope) and passing them to KameletProcessor, which restores them during doInit() so the endpoint can inherit error handlers correctly. Also fix JDK 25 ScopedValue.Carrier API: use call() instead of get(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Fix KameletProcessor to use scoped API (createRoute/createProcessor with Runnable) instead of deprecated set/remove pattern that would break on JDK 25+ with ScopedValue - Fix ThreadPerTaskSedaConsumer to check prepared exchange for exceptions (not original) and call onProcessingDone properly - Add default implementations for new ExtendedCamelContext methods to avoid breaking third-party implementations - Fix deprecation version from 4.17.0 to 4.19.0 - Fix Javadoc: ScopedValue requires JDK 25+, not JDK 21+ - Fix @see tag to avoid broken link on JDK 17 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… docs - Extract ThreadLocalContextValue from inner class (duplicated in base and JDK 25 ContextValueFactory) into a shared top-level class - Default concurrentConsumers to 0 (unlimited) when virtualThreadPerTask is enabled, since the traditional default of 1 defeats the purpose - Fix Example 3 in virtual-threads.adoc: ContextValue doesn't propagate across SEDA boundaries, use exchange properties instead Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…Task Update @UriParam description and Javadoc to document that concurrentConsumers defaults to 0 (unlimited) when virtualThreadPerTask is enabled, instead of the traditional default of 1. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
6a15a41 to
13ddebb
Compare
Summary
Introduces a
ContextValueabstraction that provides a unified API for thread-scoped data sharing, with implementations using eitherThreadLocal(JDK 17+) orScopedValue(JDK 25+ with virtual threads). Also adds avirtualThreadPerTaskmode to SEDA, refactors SedaConsumer for extensibility, and includes comprehensive virtual threads documentation.Related to: https://issues.apache.org/jira/browse/CAMEL-20199
Key Changes
ContextValue API (
camel-util)ContextValueinterface with factory methods for creating context values and executing operations within scoped contextsContextValueFactorywithThreadLocalimplementation for base JDKScopedValuewhen virtual threads are enabledwhere()API for safe, nestable context bindingNamedThreadLocalin favor ofContextValue.newThreadLocal()ExtendedCamelContext API improvements
setupRoutes(Runnable)/setupRoutes(Callable<T>)createRoute(String, Runnable)/createRoute(String, Callable<T>)createProcessor(String, Runnable)/createProcessor(String, Callable<T>)setupRoutes(boolean),createRoute(String),createProcessor(String))createRoute(null)was clearingisSetupRoutesinstead ofisCreateRouteSedaConsumer template method refactoring
beforePoll(),afterPollEmpty(),processPolledExchange()hookscreateExecutor,setupTasks,shutdownExecutor) protectedThreadPerTaskSedaConsumernow overrides hooks instead of duplicating the entire polling loopSEDA
virtualThreadPerTaskoptionvirtualThreadPerTaskproperty onSedaEndpointThreadPerTaskSedaConsumerwith single coordinator thread + task executorconcurrentConsumersbecomes a max concurrency limit (0 = unlimited)KameletProcessor context propagation
KameletReifier(within createProcessor scope)KameletProcessorfor proper error handler resolution duringdoInit()createRoute(id, Runnable)) for ScopedValue compatibility on JDK 25+DefaultReactiveExecutor
ContextValueinstead ofNamedThreadLocalfor worker thread-local stateWorkerby readingstatisticsEnableddirectly instead of cachingDocumentation
docs/user-manual/modules/ROOT/pages/virtual-threads.adoc)Benefits
ScopedValueon JDK 25+: better performance for virtual thread workloads (no pinning), immutable valuesThreadLocalon JDK 17-24, default implementations in interfaceTest plan
ContextValueTest: unit tests for ThreadLocal-based implementationThreadPerTaskSedaConsumerTest: SEDA virtualThreadPerTask modeVirtualThreadsLoadTest/VirtualThreadsWithThreadsDSLLoadTest: manual benchmarks (disabled by default)