Defensive fixes #4546
base: main
Conversation
📝 Walkthrough
Nine files updated with small control-flow and cleanup changes: future registration timing in IPC, async stream consumption in STT adapters, channel error-check order, HTTP session recreation on closed sessions, concurrent MCP server teardown, cancelling pending speech tasks, and guarding empty Google STT results.

Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ❌ 2 failed (1 warning, 1 inconclusive) | ✅ 1 passed
📜 Recent review details
Configuration used: Organization UI · Review profile: CHILL · Plan: Pro
📒 Files selected for processing (4)
🚧 Files skipped from review as they are similar to previous changes (3)
🧰 Additional context used
📓 Path-based instructions (1): **/*.py (📄 CodeRabbit inference engine, AGENTS.md)
🧬 Code graph analysis (1): livekit-agents/livekit/agents/ipc/inference_proc_executor.py (2)
⏰ Context from checks skipped due to timeout of 90000ms (7). You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms).
🔇 Additional comments (1)
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@livekit-agents/livekit/agents/ipc/inference_proc_executor.py`:
- Around line 82-89: The code registers request_id/fut into
self._active_requests before awaiting channel.asend_message, but if
channel.asend_message raises the future remains leaked; wrap the await
channel.asend_message(...) call in a try/except (or try/finally) so that on any
exception you remove the entry from self._active_requests (e.g.
self._active_requests.pop(request_id, None)) and cancel the future
(fut.cancel()) before re-raising the exception; reference the existing symbols
request_id, fut, self._active_requests, and channel.asend_message/
proto.InferenceRequest when making the change.
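A minimal sketch of that cleanup, assuming the register-then-send flow described above (the class, `_send` helper, and signatures here are illustrative stand-ins, not the actual executor code):

```python
from __future__ import annotations

import asyncio
from typing import Optional


class _InferenceClient:
    """Illustrative stand-in for the executor's register/send/await flow."""

    def __init__(self) -> None:
        self._active_requests: dict[str, asyncio.Future[Optional[bytes]]] = {}

    async def do_inference(self, request_id: str, payload: bytes) -> Optional[bytes]:
        fut: asyncio.Future[Optional[bytes]] = asyncio.get_running_loop().create_future()
        self._active_requests[request_id] = fut
        try:
            # stand-in for `await channel.asend_message(self._pch, proto.InferenceRequest(...))`
            await self._send(request_id, payload)
        except Exception:
            # the request never went out, so no response will ever resolve this future:
            # unregister it and cancel before re-raising
            self._active_requests.pop(request_id, None)
            fut.cancel()
            raise
        return await fut

    async def _send(self, request_id: str, payload: bytes) -> None:
        raise NotImplementedError  # hypothetical transport call
```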
In `@livekit-agents/livekit/agents/ipc/job_proc_lazy_main.py`:
- Around line 132-138: The code registers a future in self._active_requests
before sending and may leave an orphaned entry if
self._client.send(InferenceRequest(...)) raises; update the block around
request_id/fut/_active_requests/_client.send so that you register request_id ->
fut, then perform the send inside a try/except where on any exception you remove
the entry from self._active_requests and cancel the fut (e.g., fut.cancel() or
fut.set_exception(...)) before re-raising the error; ensure this cleanup runs
for all send failures to avoid leaking orphaned futures.
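The same shape applies here; one variant (a hedged fragment reusing the `request_id`, `fut`, `_active_requests`, and `_client.send` names from the prompt, wrapped in a hypothetical helper) routes the send failure through the future instead of re-raising immediately:

```python
async def _send_and_await(
    self, request_id: str, fut: "asyncio.Future[bytes]", request: object
) -> bytes:
    # hypothetical helper showing only the register/send/cleanup block
    self._active_requests[request_id] = fut
    try:
        await self._client.send(request)  # InferenceRequest(...) in the real code
    except Exception as exc:
        # nothing will ever resolve a future for a request that was never sent
        self._active_requests.pop(request_id, None)
        if not fut.done():
            fut.set_exception(exc)
    return await fut  # surfaces the send failure to the caller, if any
```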
In `@livekit-agents/livekit/agents/voice/speech_handle.py`:
- Around line 174-176: The pending set currently may include the class-level
self._interrupt_fut which must not be cancelled because that corrupts the
interrupted property; change the cancellation so only the speech-processing
gather future(s) are cancelled—filter out self._interrupt_fut from pending (or
directly identify the gather future created for the speech task) and call
utils.aio.cancel_and_wait only with the remaining futures; ensure
self._interrupt_fut is left untouched so self.interrupted continues to reflect
the real interrupt state.
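Concretely, the filtering asked for above looks like this (a minimal sketch using the `pending` and `self._interrupt_fut` names from the prompt; the actual diff appears further down in this thread):

```python
# cancel only the speech-processing futures; leaving self._interrupt_fut untouched
# keeps `interrupted` reporting the real interrupt state rather than a cancellation
cancellable = [fut for fut in pending if fut is not self._interrupt_fut]
if cancellable:
    await utils.aio.cancel_and_wait(*cancellable)
```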
In `@livekit-plugins/livekit-plugins-google/livekit/plugins/google/stt.py`:
- Around line 779-781: process_stream currently indexes resp.results[0] before
calling _streaming_recognize_response_to_speech_data, which can raise IndexError
when resp.results is empty; add a guard in process_stream that checks if
resp.results is truthy (e.g., if not resp.results: continue/return/skip) before
accessing resp.results[0], and only call
_streaming_recognize_response_to_speech_data or perform further processing when
resp.results is non-empty so you never index into an empty list.
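A sketch of the guard, assuming the streaming loop iterates responses roughly as described (`resp`, `resp.results`, and `_streaming_recognize_response_to_speech_data` come from the prompt; the surrounding loop and call shape are illustrative):

```python
async for resp in stream:
    if not resp.results:
        # Google can emit streaming responses with an empty results list;
        # skip them instead of indexing resp.results[0] and raising IndexError
        continue

    result = resp.results[0]
    speech_data = _streaming_recognize_response_to_speech_data(resp)
    # ... continue processing result / speech_data as before
```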
🧹 Nitpick comments (1)
livekit-agents/livekit/agents/voice/agent_activity.py (1)
713-716: Good concurrent cleanup pattern, but suppressed exceptions could hide errors.

The concurrent `asyncio.gather` with `return_exceptions=True` is a sound approach for parallelized cleanup. However, any exceptions during MCP server closure are silently discarded. Consider logging errors for observability, similar to the pattern used in `_start_session` (lines 512-517).

♻️ Optional: Log any cleanup errors

```diff
         if self.mcp_servers:
-            await asyncio.gather(
+            results = await asyncio.gather(
                 *(mcp_server.aclose() for mcp_server in self.mcp_servers), return_exceptions=True
             )
+            for mcp_server, result in zip(self.mcp_servers, results):
+                if isinstance(result, BaseException):
+                    logger.warning(
+                        f"failed to close MCP server {mcp_server}",
+                        exc_info=result,
+                    )
```
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- livekit-agents/livekit/agents/ipc/inference_proc_executor.py
- livekit-agents/livekit/agents/ipc/job_proc_lazy_main.py
- livekit-agents/livekit/agents/stt/fallback_adapter.py
- livekit-agents/livekit/agents/stt/stream_adapter.py
- livekit-agents/livekit/agents/utils/aio/channel.py
- livekit-agents/livekit/agents/utils/http_context.py
- livekit-agents/livekit/agents/voice/agent_activity.py
- livekit-agents/livekit/agents/voice/speech_handle.py
- livekit-plugins/livekit-plugins-google/livekit/plugins/google/stt.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Format code with ruff
Run ruff linter and auto-fix issues
Run mypy type checker in strict mode
Maintain line length of 100 characters maximum
Ensure Python 3.9+ compatibility
Use Google-style docstrings
Files:
- livekit-agents/livekit/agents/voice/speech_handle.py
- livekit-plugins/livekit-plugins-google/livekit/plugins/google/stt.py
- livekit-agents/livekit/agents/stt/fallback_adapter.py
- livekit-agents/livekit/agents/voice/agent_activity.py
- livekit-agents/livekit/agents/utils/http_context.py
- livekit-agents/livekit/agents/ipc/job_proc_lazy_main.py
- livekit-agents/livekit/agents/ipc/inference_proc_executor.py
- livekit-agents/livekit/agents/utils/aio/channel.py
- livekit-agents/livekit/agents/stt/stream_adapter.py
🧬 Code graph analysis (3)
livekit-agents/livekit/agents/voice/speech_handle.py (2)
- livekit-agents/livekit/agents/utils/aio/wait_group.py (1): wait (29-30)
- livekit-agents/livekit/agents/utils/aio/utils.py (1): cancel_and_wait (6-23)

livekit-agents/livekit/agents/voice/agent_activity.py (2)
- livekit-agents/livekit/agents/voice/agent_session.py (1): mcp_servers (386-387)
- livekit-agents/livekit/agents/voice/agent.py (1): mcp_servers (547-557)

livekit-agents/livekit/agents/utils/http_context.py (2)
- livekit-agents/livekit/agents/utils/aio/channel.py (1): closed (152-153)
- livekit-agents/livekit/agents/voice/transcription/synchronizer.py (1): closed (163-164)
🔇 Additional comments (4)
livekit-agents/livekit/agents/stt/fallback_adapter.py (1)
- 435-437: LGTM — draining the metrics stream avoids backpressure.

livekit-agents/livekit/agents/utils/aio/channel.py (1)
- 90-96: LGTM — `ChanClosed` should take precedence.

livekit-agents/livekit/agents/stt/stream_adapter.py (1)
- 93-95: LGTM — draining the metrics stream is safer.

livekit-agents/livekit/agents/utils/http_context.py (1)
- 19-19: LGTM! Good defensive fix for handling closed sessions. This correctly handles the edge case where a session may have been closed externally or due to errors. The short-circuit evaluation ensures `g_session.closed` is only accessed when `g_session` is not `None`.
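As a rough sketch of that pattern (not the exact `http_context.py` source; assumes the helper is called from within a running event loop):

```python
from __future__ import annotations

import aiohttp

g_session: aiohttp.ClientSession | None = None


def http_session() -> aiohttp.ClientSession:
    global g_session
    # short-circuit: `.closed` is only evaluated when g_session is not None,
    # and a session that was closed externally is transparently recreated
    if g_session is None or g_session.closed:
        g_session = aiohttp.ClientSession()
    return g_session
```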
Outdated review comment on livekit-plugins/livekit-plugins-google/livekit/plugins/google/stt.py (resolved).
davidzhao left a comment:
these are solid fixes. lgtm
```diff
-        if pending:
-            await utils.aio.cancel_and_wait(*pending)
+        if cancellable := [fut for fut in pending if fut is not self._interrupt_fut]:
+            await utils.aio.cancel_and_wait(*cancellable)
```
I think we should let the caller manage the awaitables instead of cancelling them whenever the speech is interrupted?
Going through detail.dev issue reports and putting some fixes together. Most of them are clean-ups or defensive fixes.
Summary by CodeRabbit
Bug Fixes
Improvements