Skip to content

Conversation

@DeStefaniAndrei
Copy link
Contributor

Summary

Add Kafka consumer for reading UserOp audit events, following the existing KafkaAuditLogReader pattern for bundle events.

Changes

New Types (reader.rs)

  • UserOpEventWrapper: Wraps UserOpEvent with Kafka metadata (key, timestamp)
  • UserOpEventReader trait: Defines the interface for reading UserOp events
  • KafkaUserOpAuditLogReader: Kafka consumer implementation

Integration Test (integration_tests.rs)

  • test_userop_kafka_publisher_reader_integration: End-to-end test that:
    1. Spins up Kafka in Docker (via testcontainers)
    2. Publishes a UserOpEvent::AddedToMempool
    3. Reads it back with KafkaUserOpAuditLogReader
    4. Verifies all fields match
    5. Tests commit functionality

Testing

All pass, including the new integration test.

Next Steps

  • UserOp archiver (reader → S3 storage)

- Add UserOpEventWrapper struct with key, event, timestamp
- Add UserOpEventReader trait with read_event() and commit()
- Add KafkaUserOpAuditLogReader implementation
- Add integration test verifying real Kafka publish/read flow
Comment on lines +165 to +167
pub fn topic(&self) -> &str {
&self.topic
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qq: seems like KafkaUserOpAuditLogReader::topic() isn't used. should we remove?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants