import threading from datetime import timedelta from requests_cache import CachedSession # Cache for "semi-static" data such as the locations of parks, CSVs of reference lists, etc. # This has an expiry time of 30 days, so will re-request from the source after that amount # of time has passed. This is used throughout Spothole to cache data that does not change # rapidly. The ThreadSafeSession construct here protects it against some multithreading # contention weirdness we sometimes used to see on startup where the cache was hammered # pretty hard. _session = CachedSession("cache/semi_static_url_data_cache", expire_after=timedelta(days=30)) _lock = threading.Lock() class _ThreadSafeSession: """Wraps CachedSession with a lock to prevent concurrent SQLite access across threads.""" def get(self, *args, **kwargs): with _lock: return _session.get(*args, **kwargs) SEMI_STATIC_URL_DATA_CACHE = _ThreadSafeSession()