-
Notifications
You must be signed in to change notification settings - Fork 1.8k
fix: close broken tcp connections #3384
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
4405310 to
6202ea6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for reporting and investigating this issue, as well as raising a PR!
A few thoughts:
| func (client *client) checkSeedBrokersHealth(brokers []*Broker) []*Broker { | ||
| if len(brokers) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| healthyBrokers := make([]*Broker, 0, len(brokers)) | ||
| for _, broker := range brokers { | ||
| if err := broker.getSockError(); err != nil { | ||
| Logger.Printf("client/seedbrokers skipped seed broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) | ||
| safeAsyncClose(broker) | ||
| continue | ||
| } | ||
|
|
||
| healthyBrokers = append(healthyBrokers, broker) | ||
| } | ||
|
|
||
| return healthyBrokers | ||
| } | ||
|
|
||
| func (client *client) checkBrokersHealth() { | ||
| for id, broker := range client.brokers { | ||
| if err := broker.getSockError(); err != nil { | ||
| Logger.Printf("client/brokers skipped broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) | ||
| safeAsyncClose(broker) | ||
| delete(client.brokers, id) | ||
| } | ||
| } | ||
|
|
||
| client.seedBrokers = client.checkSeedBrokersHealth(client.seedBrokers) | ||
| client.deadSeeds = client.checkSeedBrokersHealth(client.deadSeeds) | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we delete from seedBrokers (bootstraps) what re-populates them?
I don't really understand why this new health check is bolted on as a separate pass (client.checkBrokersHealth + checkSeedBrokersHealth) instead of piggy‑backing on existing metadata flows - was there a strong motivation for doing so?
During LeastLoadedBroker/Leader/RefreshMetadata, when a broker returns an error, we already close and deregister it so we could put a check for socket errors there I think? Similarly In updateBroker, while reconciling broker lists, we could probe each existing broker and skip any with SO_ERROR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your explanation.
If we delete from seedBrokers (bootstraps) what re-populates them?
In that case, should the seedBrokers be re-opened instead of being removed when we detect SO_ERROR?
why this new health check is bolted on as a separate pass
I'm not entirely sure where the most appropriate place is to perform this health check, as I'm not deeply familiar with all parts of this library. I initially attempted to implement it inside updateBrokers(), but the issue still occurred.
With this PR, broken TCP connections would at least be detected and closed every 10 minutes.
During LeastLoadedBroker/Leader/RefreshMetadata, when a broker returns an error, we already close and deregister it so we could put a check for socket errors there I think?
I'm not certain that covers all cases. For brokers that rarely get used and don't fetch data for a long time, their TCP connections may remain broken for an extended period without triggering any of those paths.
internal/sockopt/sockopt_other.go
Outdated
| //go:build !windows | ||
|
|
||
| package sockopt | ||
|
|
||
| import "syscall" | ||
|
|
||
| const SockoptError = syscall.SO_ERROR | ||
|
|
||
| func GetSockoptInt(fd uintptr, opt int) (int, error) { | ||
| return syscall.GetsockoptInt(int(fd), syscall.SOL_SOCKET, opt) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"other" is a bit vague, if we're going to have this then lets match Go's convention of calling it sockopt_unix.go or sockopt_posix.go
I also wonder if we should just have these in the single top-level sarama package but use unexported names for the funcs rather than adding an ./internal/ package?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The weird thing is that Windows does implement this part of the POSIX standard, Golang just doesn’t have the SO_ERROR definition. So _posix.go would be misleading.
Convention is to simply add no to the build tag suffix: sockopt_nowindows.go
internal/sockopt/sockopt_other.go
Outdated
| //go:build !windows | ||
|
|
||
| package sockopt | ||
|
|
||
| import "syscall" | ||
|
|
||
| const SockoptError = syscall.SO_ERROR | ||
|
|
||
| func GetSockoptInt(fd uintptr, opt int) (int, error) { | ||
| return syscall.GetsockoptInt(int(fd), syscall.SOL_SOCKET, opt) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The weird thing is that Windows does implement this part of the POSIX standard, Golang just doesn’t have the SO_ERROR definition. So _posix.go would be misleading.
Convention is to simply add no to the build tag suffix: sockopt_nowindows.go
internal/sockopt/sockopt_windows.go
Outdated
| @@ -0,0 +1,9 @@ | |||
| //go:build windows | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
plan9 also doesn’t have SO_ERROR, or GetSockoptInt. (Though, in this case, I don’t think we support it, as it already doesn’t build because go-metrics tries to use a non-existent syslog.Writer.)
Also, neither js/wasm nor wasip1/wasm work. These ones at least seem to compile prior to this change.
You may want:
| //go:build windows | |
| //go:build windows || wasm |
Though, this will require renaming the file, as the _windows.go suffix means this build tag for windows is currently redundant, and unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to rely on //go:build unix, since the issue has only been observed on Linux so far. From there, we can exclude the OSes that do not define SO_ERROR:
//go:build unix && !android && !illumos && !ios && !hurdWith this condition, the socket error check will apply to the following Unix-like systems:
aix, darwin, dragonfly, freebsd, linux, netbsd, openbsd, and solaris.
//go:build unix is short for the following OSes:
// https://github.com/golang/go/blob/e88be8a128d167c5ab91eabc998ab364370c702e/src/cmd/dist/build.go#L1063C4-L1063C10
// unixOS is the set of GOOS values matched by the "unix" build tag.
// This is the same list as in internal/syslist/syslist.go.
var unixOS = map[string]bool{
"aix": true,
"android": true,
"darwin": true,
"dragonfly": true,
"freebsd": true,
"hurd": true,
"illumos": true,
"ios": true,
"linux": true,
"netbsd": true,
"openbsd": true,
"solaris": true,
}184a7aa to
6a313e4
Compare
6a313e4 to
fbd1452
Compare
puellanivis
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good just one small possible improvement. (The code, as is, is fine though.)
| if b.conn == nil { | ||
| return nil | ||
| } | ||
|
|
||
| conn := b.conn | ||
| if c, ok := conn.(*tls.Conn); ok { | ||
| conn = c.NetConn() | ||
| } | ||
| if c, ok := conn.(*net.TCPConn); ok { | ||
| return getTCPConnSockError(c) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[improvement] We probably want to test to ensure that the values we’re getting out of the type assertion are not typed nil pointers.
This also means that testing for validity (non-nil) removes the need to explicitly test the success of the type assertion itself; as the result of a failed , ok type assertion is the zero value, which is here the typed nil pointer:
| if b.conn == nil { | |
| return nil | |
| } | |
| conn := b.conn | |
| if c, ok := conn.(*tls.Conn); ok { | |
| conn = c.NetConn() | |
| } | |
| if c, ok := conn.(*net.TCPConn); ok { | |
| return getTCPConnSockError(c) | |
| } | |
| conn := b.conn | |
| if c, _ := conn.(*tls.Conn); c != nil { | |
| conn = c.NetConn() | |
| } | |
| if c, _ := conn.(*net.TCPConn); c != nil { | |
| return getTCPConnSockError(c) | |
| } |
(Note, we can remove the test for b.conn == nil because if it be true, then all type assertions will fail, because as an interface, v == nil means that it is the untyped nil, and thus cannot be assigned to any type. Example: where a typed nil will not compare equal to nil if in an interface: any((*net.TCPConn)(nil)) != nil
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion! I appreciate the insight about typed nil pointers.
However, I prefer to keep the current implementation for the following reasons:
-
Explicit intent: The
if b.conn == nilcheck at the beginning makes it immediately clear that we're guarding against nil connections. While it's true that type assertions will fail on untyped nil, this explicit check documents the precondition and makes the code easier to understand at first glance. -
Readability: Using
okin type assertions is a common Go idiom that signals "we're checking if this conversion is safe." Testingc != nilafter discardingokis less conventional and may confuse readers who expect to see the standardif x, ok := ...pattern. -
Future-proofing: If
NetConn()or the connection wrapping logic changes in the future, the explicit checks make it easier to understand the control flow and add debugging.
While your suggestion is technically correct and slightly more concise, I believe the current version optimizes for clarity and maintainability, which is especially important for connection management code where bugs can be subtle and hard to debug.
What do you think? If there's a strong preference in the project for the more compact style, I'm happy to adjust.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I am already well aware of the ok pattern, and the nuances behind it, which is specifically why I have been bringing up this point.
Testing that the type-assertion succeeded is not always sufficient—as specifically in this case, a type-nil pointer being in the interface will result in a panic on the receiver method calls. So, we should also be testing for validity of the typed pointer c: c != nil. So, at the very minimum, we should have ok && c != nil, which is a test for success and validity.
I don’t see how my code would be any less future proof than yours. First, the go1 compatibility guarantee is in effect for crypto/tls, so the NetConn behavior shall not change radically enough to force either into breaking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't believe it's our responsibility to check whether b.conn is a typed-nil pointer here.
If b.conn were a typed-nil pointer—which could only originate from Config.Net.Proxy.Dialer—calling any method on it would panic when attempting to serve Kafka traffic. This is clearly not expected behavior and would indicate a bug in the user's custom Dialer implementation, not in sarama itself.
In other words, b.conn == nil (untyped nil) means the connection hasn't been established yet, which is a valid and expected state that we should handle.
| conn := b.conn | ||
| if c, ok := conn.(*tls.Conn); ok { | ||
| conn = c.NetConn() | ||
| } | ||
| if c, ok := conn.(*net.TCPConn); ok { | ||
| return getTCPConnSockError(c) | ||
| } | ||
| return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you tried this to see if it works in practice?
Looking at the code b.conn is stored as *bufConn so I feel like these are both just going to always get !ok and always return nil, so none of the other code will have run
elsewhere we unwrap from bufConn and grab the underlying Conn like this:
conn := b.conn
if bconn, ok := b.conn.(*bufConn); ok {
conn = bconn.Conn
}
if tc, ok := conn.(*tls.Conn); ok {
// etc.
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for pointing that out.
After adding c, ok := b.conn.(*bufConn); ok, it now works as expected:
[Sarama] 2025/12/01 11:59:32 client/seedbrokers close seed broker #-1 at xxx.io:9093 due to socket error: broken pipe
[Sarama] 2025/12/01 12:09:32 client/brokers close broker #1081 at 10.xx.xx.xx:9093 due to socket error: broken pipe
However, the current approach has become quite complicated:
conn := b.conn
if c, ok := conn.(*bufConn); ok {
conn = c.Conn
}
if c, ok := conn.(*tls.Conn); ok {
conn = c.NetConn()
}
if c, ok := conn.(*net.TCPConn); ok {
return getTCPConnSockError(c)
}
return nilI ended up paying the complexity cost, and it feels a bit painful—especially since this approach wasn't originally mine.
With a small change, such as adding a comment to the dedicated field b.connTCP, e.g. // used for getSockError, the solution would be much cleaner:
return getTCPConnSockError(b.connTCP)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don’t know. I don’t think it’s particularly all that complicated?
It’s basically just three c = maybeDeref(c) operations after all. It’s quite simple to track each operation individually, and in a simple series.
fbd1452 to
30f6594
Compare
Prevent TCP connection leaks that error occurred to kernel space's socket. Previously, when errors occurred on TCP sockets, connections would remain in TCP_CLOSE state indefinitely without being properly freed. The leaked connections caused a cascade of kernel-level issues: - TCP sockets remain in TCP_CLOSE state indefinitely - Their sk_receive_queues retain FINACK skb packets - These skbs hold pages allocated from page_pool - page_pool_release_retry() stalls because pages cannot be freed while still referenced by the TCP stack The fix ensures that TCP connections are properly closed when socket errors occur, preventing resource leaks at both the application and kernel levels. Signed-off-by: Leon Hwang <[email protected]>
30f6594 to
52b1d8e
Compare
Prevent TCP connection leaks that error occurred to kernel space's socket. Previously, when errors occurred on TCP sockets, connections would remain in TCP_CLOSE state indefinitely without being properly freed.
The leaked connections caused a cascade of kernel-level issues:
The fix ensures that TCP connections are properly closed when socket errors occur, preventing resource leaks at both the application and kernel levels.