Open
Description
Expected behavior
When an error occurs and the partition consumer is closed, subsequent retries or partition expansion logic should recreate the closed partition consumer to ensure no partitions are permanently skipped.
Actual behavior
Once a partition consumer is closed due to an error, it is never recreated. This results in missed consumption on that partition.
Steps to reproduce
- Create a consumer on a partitioned topic.
- Trigger an error at consumer_impl.go#L418.
- Observe that the partition consumer is closed.
- Wait for retry or trigger partition expansion.
- Verify that the closed consumer is not recreated, and messages on that partition are no longer consumed.
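The steps above can be modeled without a broker. The following is a minimal sketch, not the client's actual code: `fakeConsumer`, `fakePartitionConsumer`, `subscribe`, and `errSimulated` are hypothetical names, and the control flow is an assumption based on the snippets quoted later in this issue (early return when the partition count is unchanged, carry-over of old instances, close-all on failure).

```go
package main

import (
	"errors"
	"fmt"
)

// errSimulated is a hypothetical error used to force a failure.
var errSimulated = errors.New("simulated subscribe failure")

// fakePartitionConsumer is a hypothetical stand-in for a partition consumer.
type fakePartitionConsumer struct{ closed bool }

func (p *fakePartitionConsumer) Close() { p.closed = true }

// fakeConsumer models only the bookkeeping discussed in this issue.
type fakeConsumer struct {
	consumers []*fakePartitionConsumer
}

// subscribe mimics the reported flow. failPartition is the index whose
// creation fails (-1 means no failure).
func (c *fakeConsumer) subscribe(newNum, failPartition int) error {
	oldNum := len(c.consumers)
	if oldNum == newNum {
		return nil // early return: closed consumers are never inspected
	}

	old := c.consumers
	c.consumers = make([]*fakePartitionConsumer, newNum)
	start := 0
	if old != nil && oldNum < newNum {
		copy(c.consumers, old) // existing instances are reused as-is
		start = oldNum
	}

	var err error
	for i := start; i < newNum; i++ {
		if i == failPartition {
			err = errSimulated
			continue
		}
		c.consumers[i] = &fakePartitionConsumer{}
	}
	if err != nil {
		// On failure, every partition consumer is closed, old ones included.
		for _, pc := range c.consumers {
			if pc != nil {
				pc.Close()
			}
		}
		return err
	}
	return nil
}

func main() {
	c := &fakeConsumer{}
	_ = c.subscribe(2, -1)                               // initial subscription succeeds
	fmt.Println("expansion error:", c.subscribe(3, 2))  // expansion fails on partition 2
	fmt.Println("retry error:", c.subscribe(3, -1))     // count unchanged, returns early
	for i, pc := range c.consumers {
		fmt.Printf("partition %d usable: %v\n", i, pc != nil && !pc.closed)
	}
}
```

In this model the retry reports no error, yet no partition has a usable consumer afterwards, which matches the behavior described under "Actual behavior".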
System configuration
Pulsar version: 0.14.0, 0.15.1, 0.16.0
Relevant Code Snippets with Bug Notes
- consumer_impl.go#L309-L333 ignores the error returned by internalTopicSubscribeToPartitions, which it calls on every ticker tick.
func (c *consumer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
	var wg sync.WaitGroup
	stopDiscoveryCh := make(chan struct{})
	ticker := time.NewTicker(period)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-stopDiscoveryCh:
				return
			case <-ticker.C:
				c.log.Debug("Auto discovering new partitions")
				c.internalTopicSubscribeToPartitions() // Uncaught error
			}
		}
	}()
	return func() {
		ticker.Stop()
		close(stopDiscoveryCh)
		wg.Wait()
	}
}
- consumer_impl.go#L418-L427 closes all partition consumers when it encounters an error.
if err != nil {
	// Since there were some failures,
	// cleanup all the partitions that succeeded in creating the consumer
	for _, c := range c.consumers {
		if c != nil {
			c.Close()
		}
	}
	return err
}
- consumer_impl.go#L351-L354 returns early when the number of partitions has not changed, even though the old consumers are closed.
if oldNumPartitions == newNumPartitions {
	c.log.Debug("Number of partitions in topic has not changed")
	return nil
}
- consumer_impl.go#L363-L370 reuses the existing closed consumer instances.
// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions > newNumPartitions,
// we need to rebuild the cache of new consumers, otherwise the array will be out of bounds.
if oldConsumers != nil && oldNumPartitions < newNumPartitions {
	// Copy over the existing consumer instances
	for i := 0; i < oldNumPartitions; i++ {
		c.consumers[i] = oldConsumers[i]
	}
}
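One possible direction, given the four notes above, is to make the discovery pass reconcile every partition slot instead of returning early, recreating any slot that is nil or closed. The following is a minimal sketch under stated assumptions, not a patch against the client: `partitionConsumer`, `consumer`, `create`, `reconcile`, and `errSimulated` are hypothetical names, and unlike the quoted code it keeps healthy consumers on failure rather than closing all of them.

```go
package main

import (
	"errors"
	"fmt"
)

// errSimulated is a hypothetical error used to force a failure.
var errSimulated = errors.New("simulated subscribe failure")

// partitionConsumer is a hypothetical stand-in for the per-partition consumer.
type partitionConsumer struct {
	partition int
	closed    bool
}

func (p *partitionConsumer) Close() { p.closed = true }

// consumer is a hypothetical model of the parent consumer.
type consumer struct {
	consumers []*partitionConsumer
	// create is an injected factory so that failures can be simulated.
	create func(partition int) (*partitionConsumer, error)
}

// reconcile brings c.consumers in line with numPartitions. It does not
// return early when the count is unchanged: every slot that is nil or
// closed is recreated, and healthy consumers are kept.
func (c *consumer) reconcile(numPartitions int) error {
	next := make([]*partitionConsumer, numPartitions)
	var firstErr error
	for i := range next {
		if i < len(c.consumers) {
			if old := c.consumers[i]; old != nil && !old.closed {
				next[i] = old
				continue
			}
		}
		pc, err := c.create(i)
		if err != nil {
			if firstErr == nil {
				firstErr = fmt.Errorf("partition %d: %w", i, err)
			}
			continue // slot stays nil and is retried on the next pass
		}
		next[i] = pc
	}
	// Close consumers for partitions that no longer exist.
	for i := numPartitions; i < len(c.consumers); i++ {
		if old := c.consumers[i]; old != nil {
			old.Close()
		}
	}
	c.consumers = next
	return firstErr
}

func main() {
	failing := true
	c := &consumer{}
	c.create = func(p int) (*partitionConsumer, error) {
		if failing && p == 1 {
			return nil, errSimulated
		}
		return &partitionConsumer{partition: p}, nil
	}

	fmt.Println("first pass:", c.reconcile(2)) // partition 1 fails
	failing = false
	fmt.Println("second pass:", c.reconcile(2)) // partition 1 is recreated
	for i, pc := range c.consumers {
		fmt.Printf("partition %d usable: %v\n", i, pc != nil && !pc.closed)
	}
}
```

With this shape, the periodic discovery loop would also need to log or otherwise surface the error returned by each pass instead of discarding it, so that a persistently failing partition stays visible.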