Skip to content

Commit 7e0b6d3

Browse files
committed
use lmdb stream state for kafka loader
1 parent 4f83204 commit 7e0b6d3

File tree

3 files changed

+18
-1
lines changed

3 files changed

+18
-1
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,4 @@ data/
6060
# Build artifacts
6161
*.tar.gz
6262
*.zip
63+
.amp_state/

apps/kafka_streaming_loader.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,15 @@ def main(
5757
else:
5858
label_config = None
5959

60-
client.configure_connection('kafka', 'kafka', {'bootstrap_servers': kafka_brokers, 'client_id': 'amp-kafka-loader'})
60+
client.configure_connection(
61+
'kafka',
62+
'kafka',
63+
{
64+
'bootstrap_servers': kafka_brokers,
65+
'client_id': 'amp-kafka-loader',
66+
'state': {'enabled': True, 'storage': 'lmdb'},
67+
},
68+
)
6169

6270
with open(query_file) as f:
6371
query = f.read()

src/amp/loaders/implementations/kafka_loader.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import pyarrow as pa
66
from kafka import KafkaProducer
77

8+
from ...streaming.lmdb_state import LMDBStreamStateStore
89
from ...streaming.types import BlockRange
910
from ..base import DataLoader, LoadMode
1011

@@ -47,6 +48,10 @@ def connect(self) -> None:
4748
self.logger.info(f'Connected to Kafka at {self.config.bootstrap_servers}')
4849
self.logger.info(f'Client ID: {self.config.client_id}')
4950

51+
if self.state_enabled and self.state_storage == 'lmdb':
52+
self.state_store = LMDBStreamStateStore(connection_name=self.config.client_id)
53+
self.logger.info(f'Initialized LMDB state store at {self.state_store.data_dir}')
54+
5055
self._is_connected = True
5156

5257
except Exception as e:
@@ -58,6 +63,9 @@ def disconnect(self) -> None:
5863
self._producer.close()
5964
self._producer = None
6065

66+
if isinstance(self.state_store, LMDBStreamStateStore):
67+
self.state_store.close()
68+
6169
self._is_connected = False
6270
self.logger.info('Disconnected from Kafka')
6371

0 commit comments

Comments
 (0)