@@ -305,71 +305,207 @@ def _get_table_metadata(self, table: pa.Table, duration: float, batch_count: int
305305
306306## Testing
307307
308- ### Integration Test Structure
308+ ### Generalized Test Infrastructure
309309
310- Create integration tests in `tests/integration/test_{system}_loader.py`:
310+ The project uses a generalized test infrastructure that eliminates code duplication across loader tests. Instead of writing standalone tests for each loader, you inherit from shared base test classes.
311+
312+ ### Architecture
313+
314+ ```
315+ tests/integration/loaders/
316+ ├── conftest.py # Base classes and fixtures
317+ ├── test_base_loader.py # 7 core tests (all loaders inherit)
318+ ├── test_base_streaming.py # 5 streaming tests (for loaders with reorg support)
319+ └── backends/
320+ ├── test_postgresql.py # PostgreSQL-specific config + tests
321+ ├── test_redis.py # Redis-specific config + tests
322+ └── test_example.py # Your loader tests here
323+ ```
324+
325+ ### Step 1: Create Configuration Fixture
326+
327+ Add your loader's configuration fixture to `tests/conftest.py`:
328+
329+ ```python
330+ @pytest.fixture(scope='session')
331+ def example_test_config(request):
332+ """Example loader configuration from testcontainer or environment"""
333+ # Use testcontainers for CI, or fall back to environment variables
334+ if TESTCONTAINERS_AVAILABLE and USE_TESTCONTAINERS:
335+ # Set up testcontainer (if applicable)
336+ example_container = request.getfixturevalue('example_container')
337+ return {
338+ 'host': example_container.get_container_host_ip(),
339+ 'port': example_container.get_exposed_port(5432),
340+ 'database': 'test_db',
341+ 'user': 'test_user',
342+ 'password': 'test_pass',
343+ }
344+ else:
345+ # Fall back to environment variables
346+ return {
347+ 'host': os.getenv('EXAMPLE_HOST', 'localhost'),
348+ 'port': int(os.getenv('EXAMPLE_PORT', '5432')),
349+ 'database': os.getenv('EXAMPLE_DB', 'test_db'),
350+ 'user': os.getenv('EXAMPLE_USER', 'test_user'),
351+ 'password': os.getenv('EXAMPLE_PASSWORD', 'test_pass'),
352+ }
353+ ```
354+
355+ ### Step 2: Create Test Configuration Class
356+
357+ Create `tests/integration/loaders/backends/test_example.py`:
311358
312359```python
313- # tests/integration/test_example_loader.py
360+ """
361+ Example loader integration tests using generalized test infrastructure.
362+ """
314363
364+ from typing import Any, Dict, List, Optional
315365import pytest
316- import pyarrow as pa
317- from src.amp.loaders.base import LoadMode
366+
318367from src.amp.loaders.implementations.example_loader import ExampleLoader
368+ from tests.integration.loaders.conftest import LoaderTestConfig
369+ from tests.integration.loaders.test_base_loader import BaseLoaderTests
370+ from tests.integration.loaders.test_base_streaming import BaseStreamingTests
371+
372+
373+ class ExampleTestConfig(LoaderTestConfig):
374+ """Example-specific test configuration"""
375+
376+ loader_class = ExampleLoader
377+ config_fixture_name = 'example_test_config'
378+
379+ # Declare loader capabilities
380+ supports_overwrite = True
381+ supports_streaming = True # Set to False if no streaming support
382+ supports_multi_network = True # For blockchain loaders with reorg
383+ supports_null_values = True
384+
385+ def get_row_count(self, loader: ExampleLoader, table_name: str) -> int:
386+ """Get row count from table"""
387+ # Implement using your loader's API
388+ return loader._connection.query(f"SELECT COUNT(*) FROM {table_name}")[0]['count']
389+
390+ def query_rows(
391+ self,
392+ loader: ExampleLoader,
393+ table_name: str,
394+ where: Optional[str] = None,
395+ order_by: Optional[str] = None
396+ ) -> List[Dict[str, Any]]:
397+ """Query rows from table"""
398+ query = f"SELECT * FROM {table_name}"
399+ if where:
400+ query += f" WHERE {where}"
401+ if order_by:
402+ query += f" ORDER BY {order_by}"
403+ return loader._connection.query(query)
404+
405+ def cleanup_table(self, loader: ExampleLoader, table_name: str) -> None:
406+ """Drop table"""
407+ loader._connection.execute(f"DROP TABLE IF EXISTS {table_name}")
408+
409+ def get_column_names(self, loader: ExampleLoader, table_name: str) -> List[str]:
410+ """Get column names from table"""
411+ result = loader._connection.query(
412+ f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table_name}'"
413+ )
414+ return [row['column_name'] for row in result]
319415
320- @pytest.fixture
321- def example_config():
322- return {
323- 'host': 'localhost',
324- 'port': 5432,
325- 'database': 'test_db',
326- 'user': 'test_user',
327- 'password': 'test_pass'
328- }
329416
330- @pytest.fixture
331- def test_data():
332- return pa.Table.from_pydict({
333- 'id': [1, 2, 3],
334- 'name': ['a', 'b', 'c'],
335- 'value': [1.0, 2.0, 3.0]
336- })
417+ # Core tests - ALL loaders must inherit these
418+ class TestExampleCore(BaseLoaderTests):
419+ """Inherits 7 core tests: connection, context manager, batch loading, append mode, overwrite mode, null handling, error handling"""
420+ config = ExampleTestConfig()
421+
337422
423+ # Streaming tests - Only for loaders with streaming/reorg support
424+ class TestExampleStreaming(BaseStreamingTests):
425+ """Inherits 5 streaming tests: metadata columns, reorg deletion, overlapping ranges, multi-network, microbatch dedup"""
426+ config = ExampleTestConfig()
427+
428+
429+ # Loader-specific tests
338430@pytest.mark.integration
339431@pytest.mark.example
340- class TestExampleLoaderIntegration:
341- def test_connection(self, example_config):
342- loader = ExampleLoader(example_config)
343-
344- loader.connect()
345- assert loader.is_connected
346-
347- loader.disconnect()
348- assert not loader.is_connected
349-
350- def test_basic_loading(self, example_config, test_data):
351- loader = ExampleLoader(example_config)
352-
432+ class TestExampleSpecific:
433+ """Example-specific functionality tests"""
434+ config = ExampleTestConfig()
435+
436+ def test_custom_feature(self, loader, test_table_name, cleanup_tables):
437+ """Test example-specific functionality"""
438+ cleanup_tables.append(test_table_name)
439+
353440 with loader:
354- result = loader.load_table(test_data, 'test_table')
355-
441+ # Test your loader's unique features
442+ result = loader.some_custom_method(test_table_name)
356443 assert result.success
357- assert result.rows_loaded == 3
358- assert result.metadata['operation'] == 'load_table'
359- assert result.metadata['batches_processed'] > 0
444+ ```
445+
446+ ### What You Get Automatically
447+
448+ By inheriting from the base test classes, you automatically get:
449+
450+ **From `BaseLoaderTests` (7 core tests):**
451+ - `test_connection` - Connection establishment and disconnection
452+ - `test_context_manager` - Context manager functionality
453+ - `test_batch_loading` - Basic batch loading
454+ - `test_append_mode` - Append mode operations
455+ - `test_overwrite_mode` - Overwrite mode operations
456+ - `test_null_handling` - Null value handling
457+ - `test_error_handling` - Error scenarios
458+
459+ **From `BaseStreamingTests` (5 streaming tests):**
460+ - `test_streaming_metadata_columns` - Metadata column creation
461+ - `test_reorg_deletion` - Blockchain reorganization handling
462+ - `test_reorg_overlapping_ranges` - Overlapping range invalidation
463+ - `test_reorg_multi_network` - Multi-network reorg isolation
464+ - `test_microbatch_deduplication` - Microbatch duplicate detection
465+
466+ ### Required LoaderTestConfig Methods
467+
468+ You must implement these four methods in your `LoaderTestConfig` subclass:
469+
470+ ```python
471+ def get_row_count(self, loader, table_name: str) -> int:
472+ """Return number of rows in table"""
473+
474+ def query_rows(self, loader, table_name: str, where=None, order_by=None) -> List[Dict]:
475+ """Query and return rows as list of dicts"""
476+
477+ def cleanup_table(self, loader, table_name: str) -> None:
478+ """Drop/delete the table"""
479+
480+ def get_column_names(self, loader, table_name: str) -> List[str]:
481+ """Return list of column names"""
482+ ```
483+
484+ ### Capability Flags
485+
486+ Set these flags in your `LoaderTestConfig` to control which tests run:
487+
488+ ```python
489+ supports_overwrite = True # Can overwrite existing data
490+ supports_streaming = True # Supports streaming with metadata
491+ supports_multi_network = True # Supports multi-network isolation (blockchain loaders)
492+ supports_null_values = True # Handles NULL values correctly
360493```
361494
362495### Running Tests
363496
364497```bash
365- # Run all integration tests
366- make test-integration
498+ # Run all tests for your loader
499+ uv run pytest tests/integration/loaders/backends/test_example.py -v
500+
501+ # Run only core tests
502+ uv run pytest tests/integration/loaders/backends/test_example.py::TestExampleCore -v
367503
368- # Run specific loader tests
369- make test-example
504+ # Run only streaming tests
505+ uv run pytest tests/integration/loaders/backends/test_example.py::TestExampleStreaming -v
370506
371- # Run with environment variables
372- uv run --env-file .test.env pytest tests/integration/test_example_loader.py -v
507+ # Run specific test
508+ uv run pytest tests/integration/loaders/backends/test_example.py::TestExampleCore::test_connection -v
373509```
374510
375511## Best Practices
0 commit comments