Add configurable LRU+TTL caching for API server DAG retrieval #60804
Conversation
The API server's `DBDagBag` previously used an unbounded dict cache that never expired, causing memory growth in long-running processes. This adds configurable LRU+TTL caching controlled by the `[api] dag_cache_size` and `dag_cache_ttl` settings.

- Add `cachetools` dependency for LRU/TTL cache implementations
- `DBDagBag` accepts optional `cache_size` and `cache_ttl` parameters
- API server uses cached `DBDagBag`; scheduler unchanged (no caching)
- Prevent cache thrashing in `iter_all_latest_version_dags`
- Add metrics: `dag_bag.cache_hit`, `dag_bag.cache_miss`, `dag_bag.cache_clear`
- Thread-safe cache access with `RLock`
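The combination described above (a bounded LRU cache whose entries also expire after a TTL, guarded by an `RLock`) can be sketched with the standard library alone; the PR itself uses `cachetools` for this, and the class and method names below are illustrative, not Airflow code.

```python
import threading
import time
from collections import OrderedDict


class LruTtlCache:
    """Minimal LRU+TTL cache sketch (stdlib only; the PR uses cachetools)."""

    def __init__(self, maxsize: int, ttl: float) -> None:
        self.maxsize = maxsize
        self.ttl = ttl
        # Maps key -> (expiry timestamp, value); insertion order tracks recency.
        self._data: "OrderedDict[str, tuple]" = OrderedDict()
        self._lock = threading.RLock()

    def get(self, key: str):
        with self._lock:
            item = self._data.get(key)
            if item is None:
                return None
            expires_at, value = item
            if expires_at < time.monotonic():
                del self._data[key]      # entry expired: drop it, report a miss
                return None
            self._data.move_to_end(key)  # mark as most recently used
            return value

    def put(self, key: str, value) -> None:
        with self._lock:
            self._data[key] = (time.monotonic() + self.ttl, value)
            self._data.move_to_end(key)
            while len(self._data) > self.maxsize:
                self._data.popitem(last=False)  # evict least recently used
```

With `maxsize=2`, inserting a third entry evicts the least recently touched one, while `ttl` bounds how stale a served `SerializedDAG` can be even when the cache is not full.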
Force-pushed from 795bfb1 to 8a1f7fd
jason810496 left a comment:
Nice improvement! LGTM overall.
> With 100+ DAGs updating daily, memory grows ~500 MB/day, eventually causing OOM.

It's surprising that the API server, without cache eviction, can cause system instability.
```python
self._dags: MutableMapping[str, SerializedDAG] = {}

# Initialize cache if cache_size is provided
if cache_size and cache_size > 0:
```
I'm not sure, but would it be better to use the existing `_disable_cache` as the condition?
```diff
- if cache_size and cache_size > 0:
+ if not self._disable_cache:
```
```python
if self._use_cache:
    Stats.incr("api_server.dag_bag.cache_hit")
    return dag
```
It seems we could consolidate `_disable_cache` and `_use_cache` into a single variable.
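The consolidation suggested here could look like the sketch below: derive one flag at construction time and check the same condition everywhere, instead of keeping both `_disable_cache` and `_use_cache`. The constructor signature is an assumption for illustration, not the actual `DBDagBag` API.

```python
class DagBagSketch:
    """Toy stand-in showing a single derived flag instead of two."""

    def __init__(self, cache_size: int = 0, cache_ttl: float = 0.0) -> None:
        # One flag, computed once; every call site checks the same condition.
        self._use_cache = bool(cache_size and cache_size > 0)
```

Every place that currently tests either flag would then read `if self._use_cache:` (or its negation), removing the risk of the two flags drifting apart.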
```python
if self._lock and not self._disable_cache:
    with self._lock:
        if dag := self._dags.get(version_id):
            return dag
```
If I understand correctly, we already handle retrieving from the cache here, before fetching `dag_version.serialized_dag`.
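The lookup order being discussed, cache first under the lock, database only on a miss, can be sketched as follows. The `fetch` callable stands in for the real query that loads `dag_version.serialized_dag`; all names here are illustrative.

```python
import threading


class DagVersionLookup:
    """Illustrative cache-before-DB lookup, guarded by an RLock."""

    def __init__(self, fetch):
        self._dags: dict = {}
        self._lock = threading.RLock()
        self._fetch = fetch  # stand-in for querying dag_version.serialized_dag

    def get_dag(self, version_id: str):
        with self._lock:
            if (dag := self._dags.get(version_id)) is not None:
                return dag  # cache hit: skip the database entirely
        dag = self._fetch(version_id)  # cache miss: fetch outside the lock
        if dag is not None:
            with self._lock:
                self._dags[version_id] = dag
        return dag
```

Fetching outside the lock keeps the critical section short, so a slow database round-trip does not block concurrent cache hits.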
```python
class TestCreateDagBag:
    """Tests for create_dag_bag() function."""
```
Although not necessary, we could consolidate these test methods using `pytest.mark.parametrize` with `dag_cache_size`, `dag_cache_ttl`, and `expected_class`.
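The suggested consolidation could look like this. The `create_dag_bag`, `DBDagBag`, and `CachedDBDagBag` definitions below are toy stand-ins, kept only so the example is self-contained; the parameter values are illustrative.

```python
import pytest


class DBDagBag: ...
class CachedDBDagBag(DBDagBag): ...


def create_dag_bag(cache_size: int = 0, cache_ttl: float = 0.0) -> DBDagBag:
    # Toy factory mirroring the behavior under test: caching only when a
    # positive cache_size is configured.
    return CachedDBDagBag() if cache_size and cache_size > 0 else DBDagBag()


@pytest.mark.parametrize(
    ("dag_cache_size", "dag_cache_ttl", "expected_class"),
    [
        (0, 0, DBDagBag),             # caching disabled
        (1000, 300, CachedDBDagBag),  # LRU+TTL caching enabled
    ],
)
def test_create_dag_bag(dag_cache_size, dag_cache_ttl, expected_class):
    dag_bag = create_dag_bag(cache_size=dag_cache_size, cache_ttl=dag_cache_ttl)
    assert type(dag_bag) is expected_class
```

One parametrized test then replaces a separate method per configuration, and adding a new size/TTL combination is a one-line change.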
```python
assert result == mock_sdm.dag
assert "test_version" in dag_bag._dags

def test_read_dag_without_caching(self):
```
The behavior of these test cases doesn't seem to match their names (`test_read_dag_caches_with_lock` and `test_read_dag_without_caching`): regardless of whether the cache is enabled, `self._dags[serdag.dag_version_id] = dag` is executed in the `_read_dag` method.
One question. In Airflow 2, with gunicorn, we had a much simpler solution: the workers simply restarted every few (tens of?) minutes, or every N requests, effectively clearing the cache and also getting rid of some other side effects (for example, reloading UI plugins). Since the api-server is essentially stateless (except for the cache), this had almost no negative side effects, apart from some load at startup and the database refreshing that happens then; but that's not much different from what the caching implemented here provides.

Additionally, that approach was far more "resilient" to any kind of accumulation-type bug. Yes, it was hiding them as well, but the overall stability and resilience to any kind of mistakes made with memory usage, side effects of imports, or global state sharing was ultimately high. This approach is named "software rejuvenation" (https://ieeexplore.ieee.org/document/466961); there are some studies and recommendations to use it, as it is effectively far more resilient, and in complex systems it allows handling a much wider range of issues.

Maybe we should explore that as well (or instead). I am not sure whether FastAPI/Starlette has a similar concept, but for all kinds of stateless webservers, the technique of restarting them gracefully while load-balancing requests has a long, proven history. Should we possibly do it instead of LRU/TTL caching? That seems far more robust, if it is easy and supported by FastAPI.
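The worker-recycling approach described above is supported out of the box by gunicorn's real `--max-requests` and `--max-requests-jitter` options, which gracefully restart a worker after it has served a given number of requests. The app path, worker count, and thresholds below are illustrative, not Airflow's actual settings:

```shell
gunicorn myapp:app \
  --workers 4 \
  --worker-class uvicorn.workers.UvicornWorker \
  --max-requests 1000 \
  --max-requests-jitter 100
```

The jitter randomizes each worker's restart threshold so all workers do not recycle at the same moment, which keeps the service available while individual workers are "rejuvenated".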
Good idea, worth trying that out too. Marking this as draft to play around with it.
@potiuk The alternate approach is here in #60919, which uses pure uvicorn signals to increment/decrement workers. Limitations:
The LIFO thing is worth noting, since it's a non-obvious difference between
Another alternative using gunicorn is in #60940
Worth comparing them side-by-side now; I will let others review all 3 of them. Will check back next week.
Fixes memory growth in long-running API servers by adding bounded LRU+TTL caching to `DBDagBag`. Previously, the cache was an unbounded dict that never expired, causing memory to grow indefinitely as DAG versions accumulated.

### Problem

The API server's `DBDagBag` uses an internal dict to cache `SerializedDAG` objects (5-50 MB each). This cache never expires entries and has no size bound. With 100+ DAGs updating daily, memory grows ~500 MB/day, eventually causing OOM.
### Solution

Add optional LRU+TTL caching controlled by new `[api]` configuration options:

- `dag_cache_size`
- `dag_cache_ttl`

### Key Design Decisions

- `iter_all_latest_version_dags()` bypasses the cache

### Configuration
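The new `[api]` options would be set in `airflow.cfg`; the values below are illustrative examples, not recommended defaults:

```ini
[api]
# Maximum number of cached SerializedDAG entries (0 disables caching)
dag_cache_size = 1000
# Seconds before a cached entry expires
dag_cache_ttl = 300
```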
### Metrics

- `api_server.dag_bag.cache_hit`
- `api_server.dag_bag.cache_miss`
- `api_server.dag_bag.cache_clear`
- `api_server.dag_bag.cache_size`

### Backward Compatibility
- Caching is disabled by default (`dag_cache_size = 0`)

Was generative AI tooling used to co-author this PR?
`{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in `airflow-core/newsfragments`.