
Conversation

@sanjams2
Contributor

Summary

Add a histogram metric s3_object_download_duration_seconds to the AWS S3 source to track how long S3 object download, decompression, framing, and decoding take. This helps operators identify slow buckets and debug S3 performance issues.

The metric includes two labels:

  • bucket: The S3 bucket name, to identify which buckets are slow
  • status: Either "success" or "error", to distinguish failed downloads

The timing captures the full download lifecycle from the GetObject API call through complete ByteStream body consumption (including decompression).
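
For reference, a minimal sketch of how the event emission could look, modeled on the histogram pattern referenced below. The S3ObjectDownloadCompleted name, its fields, and the vector_lib::internal_event import path are assumptions for illustration, not the exact code in this PR:

    use std::time::Duration;

    use metrics::histogram;
    use vector_lib::internal_event::InternalEvent;

    // Hypothetical event name and fields, for illustration only.
    #[derive(Debug)]
    pub struct S3ObjectDownloadCompleted<'a> {
        pub bucket: &'a str,
        pub duration: Duration,
        pub status: &'static str, // "success" or "error"
    }

    impl InternalEvent for S3ObjectDownloadCompleted<'_> {
        fn emit(self) {
            // Record the full download lifecycle duration, labeled by bucket and status.
            histogram!(
                "s3_object_download_duration_seconds",
                "bucket" => self.bucket.to_string(),
                "status" => self.status,
            )
            .record(self.duration);
        }
    }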

Vector configuration

[sources.s3]
type = "aws_s3"
region = "us-east-1"
sqs.queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"

How did you test this PR?

  • cargo test
  • running locally now

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

  • Histogram metric pattern followed:
    #[derive(Debug)]
    pub struct GotHttpResponse<'a, T> {
        pub response: &'a Response<T>,
        pub roundtrip: Duration,
    }

    impl<T: HttpBody> InternalEvent for GotHttpResponse<'_, T> {
        fn emit(self) {
            debug!(
                message = "HTTP response.",
                status = %self.response.status(),
                version = ?self.response.version(),
                headers = ?remove_sensitive(self.response.headers()),
                body = %FormatBody(self.response.body()),
            );
            counter!(
                "http_client_responses_total",
                "status" => self.response.status().as_u16().to_string(),
            )
            .increment(1);
            histogram!("http_client_rtt_seconds").record(self.roundtrip);
            histogram!(
                "http_client_response_rtt_seconds",
                "status" => self.response.status().as_u16().to_string(),
            )
            .record(self.roundtrip);
        }
    }
  • Existing S3/SQS internal events (the SqsMessageProcessingError pattern we followed):

    mod s3 {
        use aws_sdk_sqs::types::{
            BatchResultErrorEntry, DeleteMessageBatchRequestEntry, DeleteMessageBatchResultEntry,
            SendMessageBatchRequestEntry, SendMessageBatchResultEntry,
        };

        use super::*;
        use crate::sources::aws_s3::sqs::ProcessingError;

        #[derive(Debug)]
        pub struct SqsMessageProcessingError<'a> {
            pub message_id: &'a str,
            pub error: &'a ProcessingError,
        }

        impl InternalEvent for SqsMessageProcessingError<'_> {
            fn emit(self) {
                error!(
                    message = "Failed to process SQS message.",
                    message_id = %self.message_id,
                    error = %self.error,
                    error_code = "failed_processing_sqs_message",
                    error_type = error_type::PARSER_FAILED,
                    stage = error_stage::PROCESSING,
                );
                counter!(
                    "component_errors_total",
                    "error_code" => "failed_processing_sqs_message",
                    "error_type" => error_type::PARSER_FAILED,
                    "stage" => error_stage::PROCESSING,
                )
                .increment(1);
            }
        }
    }

Add histogram metric `s3_object_processing_duration_seconds` to track
how long S3 object download and processing take. The metric includes
`bucket` and `status` labels to help identify slow buckets and
distinguish successful vs failed downloads.

The timing captures the full download lifecycle from the GetObject
API call through complete ByteStream body consumption.
@sanjams2 sanjams2 requested a review from a team as a code owner November 20, 2025 16:01
@github-actions github-actions bot added the domain: sources Anything related to the Vector's sources label Nov 20, 2025
@github-actions

github-actions bot commented Nov 20, 2025

All contributors have signed the CLA ✍️ ✅
Posted by the CLA Assistant Lite bot.

@sanjams2
Contributor Author

I have read the CLA Document and I hereby sign the CLA

@sanjams2
Contributor Author

recheck

Contributor

@thomasqueirozb thomasqueirozb left a comment

This will be a nice addition! I just think we need to redesign the metrics a bit. Additionally new metrics need to be documented in website/cue/reference/components/sources/internal_metrics.cue.

Comment on lines 799 to 808
let processing_status = if read_error.is_some() {
    "error"
} else {
    "success"
};
emit!(S3ObjectProcessingCompleted {
    bucket: &s3_event.s3.bucket.name,
    duration: download_start.elapsed(),
    status: processing_status,
});
Contributor

I think we should emit different events here: one on success and another on failure. I don't think any other metric/histogram in Vector uses a status label to track success/failure; we instead opt to emit different metrics.

Contributor Author

Yeah I debated on this a bit, but SGTM. Will fix
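
To illustrate the suggestion above, a rough sketch of what splitting into separate success and failure events could look like, in the style of the patterns referenced in the description. The event and metric names, and the vector_lib import path, are hypothetical, not the final implementation:

    use std::time::Duration;

    use metrics::histogram;
    use vector_lib::internal_event::InternalEvent;

    // Hypothetical success event: records download duration per bucket.
    #[derive(Debug)]
    pub struct S3ObjectDownloadSucceeded<'a> {
        pub bucket: &'a str,
        pub duration: Duration,
    }

    impl InternalEvent for S3ObjectDownloadSucceeded<'_> {
        fn emit(self) {
            histogram!(
                "s3_object_download_duration_seconds",
                "bucket" => self.bucket.to_string(),
            )
            .record(self.duration);
        }
    }

    // Hypothetical failure event: a separate metric name instead of a status label.
    #[derive(Debug)]
    pub struct S3ObjectDownloadFailed<'a> {
        pub bucket: &'a str,
        pub duration: Duration,
    }

    impl InternalEvent for S3ObjectDownloadFailed<'_> {
        fn emit(self) {
            histogram!(
                "s3_object_download_error_duration_seconds",
                "bucket" => self.bucket.to_string(),
            )
            .record(self.duration);
        }
    }

With that split, the branch on read_error in the snippet above would emit one event or the other rather than passing a status string.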

@sanjams2 sanjams2 requested a review from a team as a code owner December 3, 2025 03:33
@github-actions github-actions bot added domain: sinks Anything related to the Vector's sinks domain: external docs Anything related to Vector's external, public documentation labels Dec 3, 2025
@github-actions github-actions bot removed the domain: sinks Anything related to the Vector's sinks label Dec 3, 2025
@sanjams2
Contributor Author

sanjams2 commented Dec 3, 2025

Sorry, I kinda screwed up the git history here because I was juggling a few PRs for this package. This is what changed, if that helps: e49004b
