Commit 54d0026

Linting fixes
1 parent 9bce014 commit 54d0026

File tree

8 files changed: +31 -28 lines changed


src/amp/config/label_manager.py

Lines changed: 1 addition & 1 deletion
@@ -123,7 +123,7 @@ def hex_to_binary(v):
             )
 
         except FileNotFoundError:
-            raise FileNotFoundError(f'Label CSV file not found: {csv_path}')
+            raise FileNotFoundError(f'Label CSV file not found: {csv_path}') from None
         except Exception as e:
             raise ValueError(f"Failed to load label CSV '{csv_path}': {e}") from e
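
For context, the `from None` additions in this commit address the lint rule that a `raise` inside an `except` block should state its cause explicitly (flake8-bugbear/ruff B904); `from None` suppresses the implicit exception chain so the caller only sees the intended message. A minimal sketch of the behavior, using an illustrative loader function rather than the project's real one:

def load_labels(csv_path):
    """Illustrative stand-in for the real CSV loader."""
    try:
        with open(csv_path) as f:
            return f.read().splitlines()
    except FileNotFoundError:
        # Without "from None", Python would chain the original FileNotFoundError
        # into the traceback ("During handling of the above exception, ...").
        raise FileNotFoundError(f'Label CSV file not found: {csv_path}') from None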

src/amp/loaders/implementations/deltalake_loader.py

Lines changed: 2 additions & 1 deletion
@@ -707,7 +707,8 @@ def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str,
         # Overwrite the table with filtered data
         self.logger.info(
             f'Executing blockchain reorg deletion for {len(invalidation_ranges)} networks '
-            f'in Delta Lake table. Deleting {deleted_count} rows affected by {len(all_affected_batch_ids)} batches.'
+            f'in Delta Lake table. Deleting {deleted_count} rows affected by '
+            f'{len(all_affected_batch_ids)} batches.'
         )
 
         # Use overwrite mode to replace table contents
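
The reflowed log message above is a pure line-length fix: adjacent f-string literals are concatenated at compile time, so the split form emits exactly the same text. A small illustrative example (the variable names here are made up):

deleted_count, num_batches = 1200, 4

one_line = f'Deleting {deleted_count} rows affected by {num_batches} batches.'
wrapped = (
    f'Deleting {deleted_count} rows affected by '
    f'{num_batches} batches.'
)
assert one_line == wrapped  # implicit concatenation preserves the message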

src/amp/loaders/implementations/snowflake_loader.py

Lines changed: 20 additions & 15 deletions
@@ -659,7 +659,7 @@ def acquire(self, timeout: Optional[float] = 30.0) -> SnowflakeConnection:
 
             return connection
         except Empty:
-            raise RuntimeError(f'Failed to acquire connection from pool within {timeout}s')
+            raise RuntimeError(f'Failed to acquire connection from pool within {timeout}s') from None
 
     def release(self, connection: SnowflakeConnection) -> None:
         """
@@ -772,13 +772,6 @@ def connect(self) -> None:
         else:
             # Create dedicated connection (legacy behavior)
             # Set defaults for connection parameters
-            default_params = {
-                'login_timeout': 60,
-                'network_timeout': 300,
-                'socket_timeout': 300,
-                'validate_default_parameters': True,
-                'paramstyle': 'qmark',
-            }
 
             conn_params = {
                 'account': self.config.account,
@@ -920,7 +913,7 @@ def _init_streaming_client(self, table_name: str) -> None:
             raise ImportError(
                 'snowpipe-streaming package required for Snowpipe Streaming. '
                 'Install with: pip install snowpipe-streaming'
-            )
+            ) from None
         except Exception as e:
             self.logger.error(f'Failed to initialize Snowpipe Streaming client for {table_name}: {e}')
             raise
@@ -971,7 +964,8 @@ def _create_streaming_pipe(self, pipe_name: str, table_name: str) -> None:
             """
             self.cursor.execute(create_pipe_sql)
             self.logger.info(
-                f"Created or verified Snowpipe Streaming pipe '{pipe_name}' for table {table_name} with {len(column_info)} columns"
+                f"Created or verified Snowpipe Streaming pipe '{pipe_name}' for table {table_name} "
+                f'with {len(column_info)} columns'
             )
         except Exception as e:
             # Pipe creation might fail if it already exists or if we don't have permissions
@@ -1237,7 +1231,8 @@ def _load_via_stage(self, batch: pa.RecordBatch, table_name: str) -> int:
 
         t_end = time.time()
         self.logger.info(
-            f'Total _load_via_stage took {t_end - t_start:.2f}s for {rows_loaded} rows ({rows_loaded / (t_end - t_start):.0f} rows/sec)'
+            f'Total _load_via_stage took {t_end - t_start:.2f}s for {rows_loaded} rows '
+            f'({rows_loaded / (t_end - t_start):.0f} rows/sec)'
         )
 
         return rows_loaded
@@ -1334,7 +1329,7 @@ def _load_via_pandas(self, batch: pa.RecordBatch, table_name: str) -> int:
             raise ImportError(
                 'pandas and snowflake.connector.pandas_tools are required for pandas loading. '
                 'Install with: pip install pandas'
-            )
+            ) from None
 
         t_start = time.time()
         max_retries = 3  # Retry on transient errors
@@ -1575,7 +1570,10 @@ def _append_with_retry(self, channel: Any, rows: List[Dict[str, Any]]) -> None:
             import sys
 
             append_time_ms = (t_append_end - t_append_start) * 1000
-            timing_msg = f'⏱️ Snowpipe append: {len(rows)} rows in {append_time_ms:.2f}ms ({len(rows) / append_time_ms * 1000:.0f} rows/sec)\n'
+            rows_per_sec = len(rows) / append_time_ms * 1000
+            timing_msg = (
+                f'⏱️ Snowpipe append: {len(rows)} rows in {append_time_ms:.2f}ms ({rows_per_sec:.0f} rows/sec)\n'
+            )
             sys.stderr.write(timing_msg)
             sys.stderr.flush()
 
@@ -1649,7 +1647,11 @@ def _load_via_streaming(self, batch: pa.RecordBatch, table_name: str, **kwargs)
             t_batch_end = time.perf_counter()
             batch_time_ms = (t_batch_end - t_batch_start) * 1000
             num_chunks = (batch.num_rows + MAX_ROWS_PER_CHUNK - 1) // MAX_ROWS_PER_CHUNK
-            timing_msg = f'⏱️ Batch load complete: {total_loaded} rows in {batch_time_ms:.2f}ms ({total_loaded / batch_time_ms * 1000:.0f} rows/sec) [{num_chunks} chunks]\n'
+            rows_per_sec = total_loaded / batch_time_ms * 1000
+            timing_msg = (
+                f'⏱️ Batch load complete: {total_loaded} rows in {batch_time_ms:.2f}ms '
+                f'({rows_per_sec:.0f} rows/sec) [{num_chunks} chunks]\n'
+            )
             sys.stderr.write(timing_msg)
             sys.stderr.flush()
 
@@ -1661,7 +1663,10 @@ def _load_via_streaming(self, batch: pa.RecordBatch, table_name: str, **kwargs)
 
             t_batch_end = time.perf_counter()
             batch_time_ms = (t_batch_end - t_batch_start) * 1000
-            timing_msg = f'⏱️ Batch load complete: {len(rows)} rows in {batch_time_ms:.2f}ms ({len(rows) / batch_time_ms * 1000:.0f} rows/sec)\n'
+            rows_per_sec = len(rows) / batch_time_ms * 1000
+            timing_msg = (
+                f'⏱️ Batch load complete: {len(rows)} rows in {batch_time_ms:.2f}ms ({rows_per_sec:.0f} rows/sec)\n'
+            )
             sys.stderr.write(timing_msg)
             sys.stderr.flush()
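
The timing-message edits in this file all follow one pattern: hoist the rows-per-second arithmetic into a named local so the f-string fits within the line limit, then wrap the literal in parentheses. A self-contained sketch of that refactor (the timing numbers are synthetic, not from the loader):

import sys
import time

t_start = time.perf_counter()
rows = list(range(10_000))            # stand-in for a loaded batch
t_end = time.perf_counter()

batch_time_ms = (t_end - t_start) * 1000
rows_per_sec = len(rows) / batch_time_ms * 1000    # same arithmetic as the diff
timing_msg = (
    f'Batch load complete: {len(rows)} rows in {batch_time_ms:.2f}ms ({rows_per_sec:.0f} rows/sec)\n'
)
sys.stderr.write(timing_msg)
sys.stderr.flush()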

src/amp/streaming/parallel.py

Lines changed: 3 additions & 2 deletions
@@ -408,8 +408,9 @@ def _get_resume_adjusted_config(
         total_gap_blocks = sum(br.end - br.start + 1 for br in gap_ranges)
 
         log_message = (
-            f'Resume optimization: Detected {len(gap_ranges)} gap(s) totaling {total_gap_blocks:,} blocks. '
-            f'Will prioritize gap filling before processing remaining historical range.'
+            f'Resume optimization: Detected {len(gap_ranges)} gap(s) totaling '
+            f'{total_gap_blocks:,} blocks. Will prioritize gap filling before '
+            f'processing remaining historical range.'
         )
 
         return config, resume_watermark, log_message

tests/integration/test_checkpoint_resume.py

Lines changed: 1 addition & 1 deletion
@@ -406,7 +406,7 @@ def test_processed_ranges_table_creation(self, checkpoint_db_connection):
             enabled=True,
             table_prefix='test_amp_',
         )
-        store = DatabaseProcessedRangesStore(config, checkpoint_db_connection)
+        DatabaseProcessedRangesStore(config, checkpoint_db_connection)
 
         cursor = checkpoint_db_connection.cursor()
         cursor.execute("""
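
Dropping the `store =` binding (like the unused `default_params` dict removed earlier) satisfies the unused-variable rule (F841): the constructor still runs for its side effect of creating the checkpoint tables, but nothing reads the returned object, so it no longer needs a name. An illustrative sketch, not the project's real store class:

class TableCreator:
    """Hypothetical stand-in whose constructor has a useful side effect."""
    created = []

    def __init__(self, name):
        TableCreator.created.append(name)   # the side effect under test

TableCreator('test_amp_processed_ranges')   # no unused "store = ..." binding
assert TableCreator.created == ['test_amp_processed_ranges']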

tests/integration/test_resilient_streaming.py

Lines changed: 1 addition & 1 deletion
@@ -362,7 +362,7 @@ def test_all_resilience_features_together(self):
         batch = pa.record_batch([[1]], schema=schema)
 
         # Multiple successful loads with retries
-        for i in range(3):
+        for _i in range(3):
             # Reset failure mode for each iteration
             loader.current_attempt = 0
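
Renaming the counter to `_i` resolves the unused-loop-variable warning (ruff B007) without changing behavior; the leading underscore marks the index as intentionally ignored. For illustration:

attempts = 0
for _i in range(3):        # underscore: the body never reads the index
    attempts += 1
assert attempts == 3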

tests/integration/test_snowflake_loader.py

Lines changed: 2 additions & 1 deletion
@@ -1075,7 +1075,8 @@ def test_streaming_performance(
         print(f' Throughput: {rows_per_second:,.0f} rows/sec')
         print(f' Loading method: {result.metadata.get("loading_method")}')
 
-        # Wait for Snowpipe streaming data to become queryable (eventual consistency, larger dataset may take longer)
+        # Wait for Snowpipe streaming data to become queryable
+        # (eventual consistency, larger dataset may take longer)
         count = wait_for_snowpipe_data(loader, test_table_name, performance_test_data.num_rows, max_wait=60)
         assert count == performance_test_data.num_rows

tests/unit/test_streaming_types.py

Lines changed: 1 addition & 6 deletions
@@ -276,7 +276,7 @@ def test_from_flight_data_ranges_complete_default(self):
 
 @pytest.mark.unit
 class TestResponseBatch:
-    """Test ResponseBatch properties"""
+    """Test ResponseBatch factory methods and properties"""
 
     def test_num_rows_property(self):
         """Test num_rows property delegates to data"""
@@ -305,11 +305,6 @@ def test_networks_property(self):
         assert len(networks) == 2
         assert set(networks) == {'ethereum', 'polygon'}
 
-
-@pytest.mark.unit
-class TestResponseBatch:
-    """Test ResponseBatch factory methods and properties"""
-
     def test_data_batch_creation(self):
         """Test creating a data batch response"""
         data = pa.record_batch([pa.array([1])], names=['id'])
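
The last change folds a duplicated `class TestResponseBatch` back into a single definition (the redefinition rule, F811). In Python the second `class` statement simply rebinds the name, so the first class and its tests were silently shadowed and pytest would collect only the later definition. A short sketch of the hazard, using a hypothetical test class:

class TestThing:
    def test_a(self):
        assert True

class TestThing:  # rebinds the name; the first TestThing (and test_a) is lost
    def test_b(self):
        assert True

assert [m for m in dir(TestThing) if m.startswith('test_')] == ['test_b']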
