
Commit 028d16f

parallel streaming: various improvements
- Configurable reorg buffer
- Create table ahead of spinning up parallel workers to ensure it's ready for all of them and avoid complexity of thread locking
- SQL variables for string replacement
- Better docs, including limitations
1 parent 0297ae8 commit 028d16f

5 files changed: +141 -49 lines changed

docs/parallel_streaming_usage.md

Lines changed: 27 additions & 4 deletions
@@ -233,7 +233,23 @@ except KeyboardInterrupt:
     print("\nStopped by user")
 ```
 
-**Note on Reorg Buffer**: When transitioning from parallel catchup to continuous streaming, the system automatically starts continuous streaming from `detected_max_block - 200`. This 200-block overlap ensures that any reorgs that occurred during the parallel catchup phase are detected and handled properly. With reorg detection enabled, duplicate blocks are automatically handled correctly.
+**Note on Reorg Buffer**: When transitioning from parallel catchup to continuous streaming, the system automatically starts continuous streaming from `detected_max_block - reorg_buffer` (default: 200 blocks). This overlap ensures that any reorgs that occurred during the parallel catchup phase are detected and handled properly. With reorg detection enabled, duplicate blocks are automatically handled correctly. The `reorg_buffer` can be customized via `ParallelConfig(reorg_buffer=N)`.
+
+## Limitations
+
+Currently, parallel streaming has the following limitations:
+
+1. **Block-based partitioning only**: Only supports partitioning by block number columns (`block_num` or `_block_num`). Tables without block numbers cannot use parallel execution.
+
+2. **Schema detection requires data**: Pre-flight schema detection requires at least 1 row in the source table. Empty tables will skip pre-flight creation and let workers handle it.
+
+3. **Static partitioning**: Partitions are created upfront based on the block range. The system does not support dynamic repartitioning during execution.
+
+4. **Thread-level parallelism**: Uses Python threads (ThreadPoolExecutor), not processes. For CPU-bound transformations, performance may be limited by the GIL.
+
+5. **Single table queries**: The partitioning strategy works best with queries against a single table. Complex joins or unions may require careful query structuring.
+
+6. **Reorg buffer configuration**: The `reorg_buffer` parameter (default: 200 blocks) is configurable but applies uniformly. Per-chain customization requires separate `ParallelConfig` instances.
 
 ## Performance Characteristics
 
@@ -301,16 +317,23 @@ Result: Zero data gaps, all reorgs caught ✓
 ─────────────────────────────────────────────────────────────────────
 ```
 
-**Why 200 blocks?**
+**Why 200 blocks (default)?**
 - Ethereum average reorg depth: 1-5 blocks
 - 200 blocks = ~40 minutes of history
 - Provides safety margin for deep reorgs that occurred during catchup
 - Small performance cost (200 blocks re-loaded) vs high data integrity value
 
 **Customizing the Buffer:**
-Currently hardcoded to 200 blocks. To modify, edit `parallel.py`:
+The reorg buffer is fully configurable via `ParallelConfig`:
 ```python
-reorg_buffer = 200  # Increase for networks with deeper reorgs
+parallel_config = ParallelConfig(
+    num_workers=4,
+    table_name='eth_firehose.blocks',
+    min_block=0,
+    max_block=None,  # Hybrid mode
+    reorg_buffer=500,  # Increase for networks with deeper reorgs (e.g., testnets)
+    block_column='block_num'
+)
 ```
 
 ### Custom Partition Filters
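To make the catch-up-to-continuous hand-off concrete, here is a minimal sketch of the arithmetic described above, using illustrative block numbers; the `max(...)` guard mirrors what `execute_parallel_stream` does with `ParallelConfig.min_block` and `reorg_buffer`:

```python
# Illustrative values: detected_max_block comes from the MAX(block_num) probe,
# min_block and reorg_buffer from ParallelConfig.
min_block = 0
detected_max_block = 21_000_000
reorg_buffer = 200

# Continuous streaming resumes this far back so any reorg that landed during
# parallel catch-up is re-observed and handled by reorg detection.
continuous_start_block = max(min_block, detected_max_block - reorg_buffer)
assert continuous_start_block == 20_999_800
```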

src/amp/client.py

Lines changed: 10 additions & 5 deletions
@@ -9,7 +9,13 @@
 from .config.connection_manager import ConnectionManager
 from .loaders.registry import create_loader, get_available_loaders
 from .loaders.types import LoadConfig, LoadMode, LoadResult
-from .streaming import ParallelConfig, ParallelStreamExecutor, ReorgAwareStream, ResumeWatermark, StreamingResultIterator
+from .streaming import (
+    ParallelConfig,
+    ParallelStreamExecutor,
+    ReorgAwareStream,
+    ResumeWatermark,
+    StreamingResultIterator,
+)
 
 
 class QueryBuilder:
@@ -57,7 +63,7 @@ def load(
 
         # Validate that parallel_config is only used with stream=True
         if kwargs.get('parallel_config'):
-            raise ValueError("parallel_config requires stream=True")
+            raise ValueError('parallel_config requires stream=True')
 
         # Default to batch streaming (read_all=False) for memory efficiency
         kwargs.setdefault('read_all', False)
@@ -238,7 +244,7 @@ def _load_table(
                 table_name=table_name,
                 loader_type=loader,
                 success=False,
-                error=str(e)
+                error=str(e),
             )
 
     def _load_stream(
@@ -264,7 +270,7 @@ def _load_stream(
                 table_name=table_name,
                 loader_type=loader,
                 success=False,
-                error=str(e)
+                error=str(e),
             )
 
     def query_and_load_streaming(
@@ -389,4 +395,3 @@ def query_and_load_streaming(
                 error=str(e),
                 metadata={'streaming_error': True},
             )
-
src/amp/loaders/base.py

Lines changed: 0 additions & 1 deletion
@@ -14,7 +14,6 @@
 from ..streaming.types import BlockRange, ResponseBatchWithReorg
 from .types import LoadMode, LoadResult
 
-
 # Type variable for configuration classes
 TConfig = TypeVar('TConfig')
 
src/amp/loaders/implementations/snowflake_loader.py

Lines changed: 5 additions & 1 deletion
@@ -163,8 +163,12 @@ def _load_batch_impl(self, batch: pa.RecordBatch, table_name: str, **kwargs) ->
                 'Please use APPEND mode or manually truncate/drop the table before loading.'
             )
 
+        # Table creation is now handled by base class or pre-flight creation in parallel mode
+        # For pandas loading, we skip manual table creation and let write_pandas handle it
         if create_table and table_name.upper() not in self._created_tables:
-            self._create_table_from_schema(batch.schema, table_name)
+            # For pandas, skip table creation - write_pandas will handle it
+            if self.loading_method != 'pandas':
+                self._create_table_from_schema(batch.schema, table_name)
             self._created_tables.add(table_name.upper())
 
         if self.use_stage:
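For background on the pandas branch above: `write_pandas` in snowflake-connector-python can create the destination table itself, which is why the manual `_create_table_from_schema` call is skipped for that loading method. A hedged sketch, with the connection and DataFrame assumed:

```python
from snowflake.connector.pandas_tools import write_pandas

# When auto_create_table=True, write_pandas derives the table from the
# DataFrame's schema, so no separate CREATE TABLE statement is needed.
success, n_chunks, n_rows, _ = write_pandas(
    conn,  # an open snowflake.connector connection (assumed)
    df,  # pandas DataFrame holding the batch (assumed)
    table_name='MY_TABLE',
    auto_create_table=True,
)
```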

src/amp/streaming/parallel.py

Lines changed: 99 additions & 38 deletions
@@ -1,12 +1,10 @@
 """
 Parallel streaming implementation for high-throughput data loading.
 
-This module implements parallel query execution using ThreadPoolExecutor.
-It partitions streaming queries by block_num ranges using CTEs (Common Table Expressions)
-that DataFusion inlines efficiently.
+This module implements parallel query execution using ThreadPoolExecutor.
+It partitions streaming queries by block_num ranges.
 
 Key design decisions:
-- Uses CTEs to shadow table names with filtered versions for clean partitioning
 - Only supports streaming queries (not regular load operations)
 - Block range partitioning only (block_num or _block_num columns)
 """
2422
if TYPE_CHECKING:
2523
from ..client import Client
2624

25+
# SQL keyword constants for query parsing
26+
_WHERE = ' WHERE '
27+
_ORDER_BY = ' ORDER BY '
28+
_LIMIT = ' LIMIT '
29+
_GROUP_BY = ' GROUP BY '
30+
_SETTINGS = ' SETTINGS '
31+
_STREAM_TRUE = 'STREAM = TRUE'
32+
2733

2834
@dataclass
2935
class QueryPartition:
@@ -56,6 +62,7 @@ class ParallelConfig:
5662
partition_size: Optional[int] = None # Blocks per partition (auto-calculated if not set)
5763
block_column: str = 'block_num' # Column name to partition on
5864
stop_on_error: bool = False # Stop all workers on first error
65+
reorg_buffer: int = 200 # Block overlap when transitioning to continuous streaming (for reorg detection)
5966

6067
def __post_init__(self):
6168
if self.num_workers < 1:
@@ -175,24 +182,23 @@ def wrap_query_with_partition(self, user_query: str, partition: QueryPartition)
175182

176183
# Create partition filter
177184
partition_filter = (
178-
f"{partition.block_column} >= {partition.start_block} "
179-
f"AND {partition.block_column} < {partition.end_block}"
185+
f'{partition.block_column} >= {partition.start_block} AND {partition.block_column} < {partition.end_block}'
180186
)
181187

182188
# Check if query already has a WHERE clause (case-insensitive)
183189
# Look for WHERE before any ORDER BY, LIMIT, or SETTINGS clauses
184190
query_upper = user_query.upper()
185191

186192
# Find WHERE position
187-
where_pos = query_upper.find(' WHERE ')
193+
where_pos = query_upper.find(_WHERE)
188194

189195
if where_pos != -1:
190196
# Query has WHERE clause - append with AND
191197
# Need to insert before ORDER BY, LIMIT, GROUP BY, or SETTINGS if they exist
192-
insert_pos = where_pos + len(' WHERE ')
198+
insert_pos = where_pos + len(_WHERE)
193199

194200
# Find the end of the WHERE clause (before ORDER BY, LIMIT, GROUP BY, SETTINGS)
195-
end_keywords = [' ORDER BY ', ' LIMIT ', ' GROUP BY ', ' SETTINGS ']
201+
end_keywords = [_ORDER_BY, _LIMIT, _GROUP_BY, _SETTINGS]
196202
end_pos = len(user_query)
197203

198204
for keyword in end_keywords:
@@ -201,14 +207,10 @@ def wrap_query_with_partition(self, user_query: str, partition: QueryPartition)
201207
end_pos = keyword_pos
202208

203209
# Insert partition filter with AND
204-
partitioned_query = (
205-
user_query[:end_pos] +
206-
f" AND ({partition_filter})" +
207-
user_query[end_pos:]
208-
)
210+
partitioned_query = user_query[:end_pos] + f' AND ({partition_filter})' + user_query[end_pos:]
209211
else:
210212
# No WHERE clause - add one before ORDER BY, LIMIT, GROUP BY, or SETTINGS
211-
end_keywords = [' ORDER BY ', ' LIMIT ', ' GROUP BY ', ' SETTINGS ']
213+
end_keywords = [_ORDER_BY, _LIMIT, _GROUP_BY, _SETTINGS]
212214
insert_pos = len(user_query)
213215

214216
for keyword in end_keywords:
@@ -217,11 +219,7 @@ def wrap_query_with_partition(self, user_query: str, partition: QueryPartition)
                     insert_pos = keyword_pos
 
             # Insert WHERE clause with partition filter
-            partitioned_query = (
-                user_query[:insert_pos] +
-                f" WHERE {partition_filter}" +
-                user_query[insert_pos:]
-            )
+            partitioned_query = user_query[:insert_pos] + f' WHERE {partition_filter}' + user_query[insert_pos:]
 
         return partitioned_query
 
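An illustrative input/output pair for `wrap_query_with_partition`, assuming a partition covering blocks `[0, 1_000_000)` on `block_num` (the query text is hypothetical):

```python
user_query = 'SELECT * FROM eth_firehose.blocks ORDER BY block_num'

# No WHERE clause exists, so the filter is inserted just before ORDER BY:
# 'SELECT * FROM eth_firehose.blocks WHERE block_num >= 0 AND block_num < 1000000
#  ORDER BY block_num'
```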
@@ -270,7 +268,7 @@ def _detect_current_max_block(self) -> int:
         Raises:
             RuntimeError: If query fails or returns no results
         """
-        query = f"SELECT MAX({self.config.block_column}) as max_block FROM {self.config.table_name}"
+        query = f'SELECT MAX({self.config.block_column}) as max_block FROM {self.config.table_name}'
         self.logger.info(f'Detecting current max block with query: {query}')
 
         try:
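Rendered with the configuration used in the docs examples (`block_column='block_num'`, `table_name='eth_firehose.blocks'`), the probe would read:

```python
# Illustrative rendering of the f-string above:
query = 'SELECT MAX(block_num) as max_block FROM eth_firehose.blocks'
```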
@@ -290,7 +288,7 @@ def _detect_current_max_block(self) -> int:
 
         except Exception as e:
             self.logger.error(f'Failed to detect max block: {e}')
-            raise RuntimeError(f'Failed to detect current max block from {self.config.table_name}: {e}')
+            raise RuntimeError(f'Failed to detect current max block from {self.config.table_name}: {e}') from e
 
     def execute_parallel_stream(
         self, user_query: str, destination: str, connection_name: str, load_config: Optional[Dict[str, Any]] = None
@@ -377,15 +375,83 @@ def execute_parallel_stream(
             f'Starting parallel streaming with {len(partitions)} partitions across {self.config.num_workers} workers'
         )
 
-        # 2. Submit worker tasks
+        # 2. Pre-flight table creation (before workers start)
+        # Create table once to avoid locking complexity in parallel workers
+        try:
+            # Get connection info
+            connection_info = self.client.connection_manager.get_connection_info(connection_name)
+            loader_config = connection_info['config']
+            loader_type = connection_info['loader']
+
+            # Get sample schema by executing LIMIT 1 on original query
+            # We don't need partition filtering for schema detection, just need any row
+            sample_query = user_query.strip().rstrip(';')
+
+            # Remove SETTINGS clause (especially stream = true) to avoid streaming mode
+            sample_query_upper = sample_query.upper()
+            settings_pos = sample_query_upper.find(_SETTINGS)
+            if settings_pos != -1:
+                sample_query = sample_query[:settings_pos].rstrip()
+                sample_query_upper = sample_query.upper()
+
+            # Insert LIMIT 1 before ORDER BY, GROUP BY if present
+            end_keywords = [_ORDER_BY, _GROUP_BY]
+            insert_pos = len(sample_query)
+
+            for keyword in end_keywords:
+                keyword_pos = sample_query_upper.find(keyword)
+                if keyword_pos != -1 and keyword_pos < insert_pos:
+                    insert_pos = keyword_pos
+
+            # Insert LIMIT 1 at the correct position
+            sample_query = sample_query[:insert_pos].rstrip() + ' LIMIT 1' + sample_query[insert_pos:]
+
+            self.logger.debug(f'Fetching schema with sample query: {sample_query[:100]}...')
+            sample_table = self.client.get_sql(sample_query, read_all=True)
+
+            if sample_table.num_rows > 0:
+                # Create loader instance to get effective schema and create table
+                from ..loaders.registry import create_loader
+
+                loader_instance = create_loader(loader_type, loader_config)
+
+                try:
+                    loader_instance.connect()
+
+                    # Get schema from sample batch
+                    sample_batch = sample_table.to_batches()[0]
+                    effective_schema = sample_batch.schema
+
+                    # Create table once with schema
+                    if hasattr(loader_instance, '_create_table_from_schema'):
+                        loader_instance._create_table_from_schema(effective_schema, destination)
+                        loader_instance._created_tables.add(destination)
+                        self.logger.info(f"Pre-created table '{destination}' with {len(effective_schema)} columns")
+                    else:
+                        self.logger.warning('Loader does not support table creation, workers will handle it')
+                finally:
+                    loader_instance.disconnect()
+            else:
+                self.logger.warning('Sample query returned no rows, skipping pre-flight table creation')
+
+            # Update load_config to skip table creation in workers
+            load_config['create_table'] = False
+
+        except Exception as e:
+            self.logger.warning(
+                f'Pre-flight table creation failed: {e}. Workers will attempt table creation with locking.'
+            )
+            # Don't fail the entire job - let workers try to create the table
+
+        # 3. Submit worker tasks
         futures = {}
         for partition in partitions:
             future = self.executor.submit(
                 self._execute_partition, user_query, partition, destination, connection_name, load_config
             )
             futures[future] = partition
 
-        # 3. Stream results as they complete
+        # 4. Stream results as they complete
         try:
             for future in as_completed(futures):
                 partition = futures[future]
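A worked example of the schema-probe rewriting above, for a hypothetical streaming query; each step mirrors the code (strip `SETTINGS`, then add `LIMIT 1`):

```python
user_query = 'SELECT * FROM eth_firehose.blocks SETTINGS stream = true'

# 1. SETTINGS stripped so the probe runs as a one-shot query:
#    'SELECT * FROM eth_firehose.blocks'
# 2. LIMIT 1 appended (no ORDER BY / GROUP BY present):
#    'SELECT * FROM eth_firehose.blocks LIMIT 1'
```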
@@ -417,17 +483,16 @@ def execute_parallel_stream(
         self.executor.shutdown(wait=True)
         self._log_final_stats()
 
-        # 4. If in hybrid mode, transition to continuous streaming for live blocks
+        # 5. If in hybrid mode, transition to continuous streaming for live blocks
         if continue_streaming:
             # Start continuous streaming with a buffer for reorg overlap
             # This ensures we catch any reorgs that occurred during parallel catchup
-            reorg_buffer = 200
-            continuous_start_block = max(self.config.min_block, detected_max_block - reorg_buffer)
+            continuous_start_block = max(self.config.min_block, detected_max_block - self.config.reorg_buffer)
 
             self.logger.info(
                 f'Parallel catch-up complete (loaded up to block {detected_max_block:,}). '
                 f'Transitioning to continuous streaming from block {continuous_start_block:,} '
-                f'(with {reorg_buffer}-block reorg buffer)...'
+                f'(with {self.config.reorg_buffer}-block reorg buffer)...'
             )
 
             # Ensure query has streaming settings
@@ -443,20 +508,16 @@ def execute_parallel_stream(
             # Add block filter to start from (detected_max - buffer) to catch potential reorgs
             # Check if query already has WHERE clause
             where_pos = streaming_query_upper.find(' WHERE ')
-            block_filter = f"{self.config.block_column} >= {continuous_start_block}"
+            block_filter = f'{self.config.block_column} >= {continuous_start_block}'
 
             if where_pos != -1:
                 # Has WHERE clause - append with AND
                 # Find position after WHERE keyword
                 insert_pos = where_pos + len(' WHERE ')
-                streaming_query = (
-                    streaming_query[:insert_pos] +
-                    f"({block_filter}) AND " +
-                    streaming_query[insert_pos:]
-                )
+                streaming_query = streaming_query[:insert_pos] + f'({block_filter}) AND ' + streaming_query[insert_pos:]
             else:
                 # No WHERE clause - add one before SETTINGS if present
-                streaming_query += f" WHERE {block_filter}"
+                streaming_query += f' WHERE {block_filter}'
 
             # Now add streaming settings for continuous mode
             streaming_query += ' SETTINGS stream = true'
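Putting the hand-off together, an illustrative final continuous-mode query, assuming the hypothetical user query from earlier and `continuous_start_block = 20_999_800`:

```python
# Result of the rewrite above for 'SELECT * FROM eth_firehose.blocks':
streaming_query = (
    'SELECT * FROM eth_firehose.blocks '
    'WHERE block_num >= 20999800 SETTINGS stream = true'
)
```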
@@ -521,7 +582,7 @@ def _execute_partition(
             destination=destination,
             connection_name=connection_name,
             read_all=False,  # Stream batches for memory efficiency
-            **load_config
+            **load_config,
         )
 
         # Aggregate results from streaming iterator
@@ -543,7 +604,7 @@ def _execute_partition(
         self.logger.info(
             f'Worker {partition.partition_id} completed: '
             f'{total_rows:,} rows in {duration:.2f}s '
-            f'({batch_count} batches, {total_rows/duration:.0f} rows/sec)'
+            f'({batch_count} batches, {total_rows / duration:.0f} rows/sec)'
         )
 
         # Return aggregated result
@@ -603,4 +664,4 @@ def _log_final_stats(self):
                 f'avg throughput: {avg_throughput:,.0f} rows/sec per worker'
             )
         else:
-            self.logger.error(f'Parallel execution failed: all {self._stats.workers_failed} workers failed')
\ No newline at end of file
+            self.logger.error(f'Parallel execution failed: all {self._stats.workers_failed} workers failed')
