From 8172b5e23f1e8d952e4f0cd1ca1c47254af5ecee Mon Sep 17 00:00:00 2001 From: Volodymyr Nazarkevych Date: Mon, 1 Dec 2025 18:03:02 +0200 Subject: [PATCH 1/5] feat: create cache_interfaces.py and add it to __init__.py --- growthbook/__init__.py | 5 ++++ growthbook/cache_interfaces.py | 46 ++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) create mode 100644 growthbook/cache_interfaces.py diff --git a/growthbook/__init__.py b/growthbook/__init__.py index f6559aa..4569682 100644 --- a/growthbook/__init__.py +++ b/growthbook/__init__.py @@ -7,6 +7,11 @@ BackoffStrategy ) +from .cache_interfaces import ( + AbstractFeatureCache, + AbstractAsyncFeatureCache +) + # Plugin support from .plugins import ( GrowthBookTrackingPlugin, diff --git a/growthbook/cache_interfaces.py b/growthbook/cache_interfaces.py new file mode 100644 index 0000000..25a5124 --- /dev/null +++ b/growthbook/cache_interfaces.py @@ -0,0 +1,46 @@ +from abc import abstractmethod, ABC +from typing import Optional, Dict + +class AbstractFeatureCache(ABC): + @abstractmethod + def get(self, key: str) -> Optional[Dict]: + pass + + @abstractmethod + def set(self, key: str, value: Dict, ttl: int) -> None: + pass + + def clear(self) -> None: + pass + +class AbstractAsyncFeatureCache(ABC): + """Abstract base class for async feature caching implementations""" + + @abstractmethod + async def get(self, key: str) -> Optional[Dict]: + """ + Retrieve cached features by key. + + Args: + key: Cache key + + Returns: + Cached dictionary or None if not found/expired + """ + pass + + @abstractmethod + async def set(self, key: str, value: Dict, ttl: int) -> None: + """ + Store features in cache with TTL. + + Args: + key: Cache key + value: Features dictionary to cache + ttl: Time to live in seconds + """ + pass + + async def clear(self) -> None: + """Clear all cached entries (optional to override)""" + pass From 02ac096fd0160d73a06bbdcfdecf0913dfe382db Mon Sep 17 00:00:00 2001 From: Volodymyr Nazarkevych Date: Mon, 1 Dec 2025 18:04:35 +0200 Subject: [PATCH 2/5] feat: add cache and async_cache properties to Options class --- growthbook/common_types.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/growthbook/common_types.py b/growthbook/common_types.py index 916f4d3..97db9e1 100644 --- a/growthbook/common_types.py +++ b/growthbook/common_types.py @@ -12,6 +12,7 @@ from typing import Any, Callable, Dict, List, Optional, Union, Set, Tuple from enum import Enum from abc import ABC, abstractmethod +from .cache_interfaces import AbstractFeatureCache, AbstractAsyncFeatureCache class VariationMeta(TypedDict): key: str @@ -396,7 +397,7 @@ def get_all_assignments(self, attributes: Dict[str, str]) -> Dict[str, Dict]: return docs @dataclass -class StackContext: +class StackContext: id: Optional[str] = None evaluated_features: Set[str] = field(default_factory=set) @@ -431,6 +432,8 @@ class Options: on_experiment_viewed: Optional[Callable[[Experiment, Result, Optional[UserContext]], None]] = None on_feature_usage: Optional[Callable[[str, 'FeatureResult', UserContext], None]] = None tracking_plugins: Optional[List[Any]] = None + cache: Optional[AbstractFeatureCache] = None + async_cache: Optional[AbstractAsyncFeatureCache] = None @dataclass From 031a67faec9076dd5aeb7747333ed93e1cf8a28b Mon Sep 17 00:00:00 2001 From: Volodymyr Nazarkevych Date: Mon, 1 Dec 2025 18:06:46 +0200 Subject: [PATCH 3/5] feat: implement InMemoryAsyncFeatureCache, add set_async_cache in FeatureRepository, integrate async cache into load_features_async --- growthbook/growthbook.py | 176 +++++++++++++++++++++++---------------- 1 file changed, 106 insertions(+), 70 deletions(-) diff --git a/growthbook/growthbook.py b/growthbook/growthbook.py index 28f5929..59b26aa 100644 --- a/growthbook/growthbook.py +++ b/growthbook/growthbook.py @@ -14,17 +14,18 @@ from abc import ABC, abstractmethod from typing import Optional, Any, Set, Tuple, List, Dict, Callable -from .common_types import ( EvaluationContext, - Experiment, - FeatureResult, - Feature, - GlobalContext, - Options, - Result, StackContext, - UserContext, - AbstractStickyBucketService, - FeatureRule -) +from .cache_interfaces import AbstractFeatureCache, AbstractAsyncFeatureCache +from .common_types import (EvaluationContext, + Experiment, + FeatureResult, + Feature, + GlobalContext, + Options, + Result, StackContext, + UserContext, + AbstractStickyBucketService, + FeatureRule + ) # Only require typing_extensions if using Python 3.7 or earlier if sys.version_info >= (3, 8): @@ -63,19 +64,6 @@ def decrypt(encrypted_str: str, key_str: str) -> str: return bytestring.decode("utf-8") -class AbstractFeatureCache(ABC): - @abstractmethod - def get(self, key: str) -> Optional[Dict]: - pass - - @abstractmethod - def set(self, key: str, value: Dict, ttl: int) -> None: - pass - - def clear(self) -> None: - pass - - class CacheEntry(object): def __init__(self, value: Dict, ttl: int) -> None: self.value = value @@ -107,6 +95,35 @@ def set(self, key: str, value: Dict, ttl: int) -> None: def clear(self) -> None: self.cache.clear() +class InMemoryAsyncFeatureCache(AbstractAsyncFeatureCache): + """ + Async in-memory cache implementation. + Uses the same CacheEntry structure but with async interface. + """ + + def __init__(self) -> None: + self._cache: Dict[str, CacheEntry] = {} + self._lock = asyncio.Lock() + + async def get(self, key: str) -> Optional[Dict]: + async with self._lock: + if key in self._cache: + entry = self._cache[key] + if entry.expires >= time(): + return entry.value + return None + + async def set(self, key: str, value: Dict, ttl: int) -> None: + async with self._lock: + if key in self._cache: + self._cache[key].update(value) + else: + self._cache[key] = CacheEntry(value, ttl) + + async def clear(self) -> None: + async with self._lock: + self._cache.clear() + class InMemoryStickyBucketService(AbstractStickyBucketService): def __init__(self) -> None: self.docs: Dict[str, Dict] = {} @@ -157,7 +174,7 @@ def disconnect(self, timeout=10): """Gracefully disconnect with timeout""" logger.debug("Initiating SSE client disconnect") self.is_running = False - + if self._loop and self._loop.is_running(): future = asyncio.run_coroutine_threadsafe(self._stop_session(timeout), self._loop) try: @@ -188,11 +205,11 @@ def _get_sse_url(self, api_host: str, client_key: str) -> str: async def _init_session(self): url = self._get_sse_url(self.api_host, self.client_key) - + try: while self.is_running: try: - async with aiohttp.ClientSession(headers=self.headers, + async with aiohttp.ClientSession(headers=self.headers, timeout=aiohttp.ClientTimeout(connect=self.timeout)) as session: self._sse_session = session @@ -233,7 +250,7 @@ async def _process_response(self, response): if not self.is_running: logger.debug("SSE processing stopped - is_running is False") break - + decoded_line = line.decode('utf-8').strip() if decoded_line.startswith("event:"): event_data['type'] = decoded_line[len("event:"):].strip() @@ -246,7 +263,7 @@ async def _process_response(self, response): except Exception as e: logger.warning(f"Error in event handler: {e}") event_data = {} - + # Process any remaining event data if 'type' in event_data and 'data' in event_data: try: @@ -275,7 +292,7 @@ async def _close_session(self): def _run_sse_channel(self): self._loop = asyncio.new_event_loop() - + try: self._loop.run_until_complete(self._init_session()) except asyncio.CancelledError: @@ -287,7 +304,7 @@ def _run_sse_channel(self): async def _stop_session(self, timeout=10): """Stop the SSE session and cancel all tasks with timeout""" logger.debug("Stopping SSE session") - + # Close the session first if self._sse_session and not self._sse_session.closed: try: @@ -300,15 +317,15 @@ async def _stop_session(self, timeout=10): if self._loop and self._loop.is_running(): try: # Get all tasks for this specific loop - tasks = [task for task in asyncio.all_tasks(self._loop) + tasks = [task for task in asyncio.all_tasks(self._loop) if not task.done() and task is not asyncio.current_task(self._loop)] - + if tasks: logger.debug(f"Cancelling {len(tasks)} SSE tasks") # Cancel all tasks for task in tasks: task.cancel() - + # Wait for tasks to complete with timeout try: await asyncio.wait_for( @@ -326,10 +343,11 @@ async def _stop_session(self, timeout=10): class FeatureRepository(object): def __init__(self) -> None: self.cache: AbstractFeatureCache = InMemoryFeatureCache() + self.async_cache: Optional[AbstractAsyncFeatureCache] = None self.http: Optional[PoolManager] = None self.sse_client: Optional[SSEClient] = None self._feature_update_callbacks: List[Callable[[Dict], None]] = [] - + # Background refresh support self._refresh_thread: Optional[threading.Thread] = None self._refresh_stop_event = threading.Event() @@ -338,6 +356,13 @@ def __init__(self) -> None: def set_cache(self, cache: AbstractFeatureCache) -> None: self.cache = cache + def set_async_cache(self, cache: AbstractAsyncFeatureCache) -> None: + """ + Set asynchronous cache implementation. + When set, load_features_async() will use this instead of sync cache. + """ + self.async_cache = cache + def clear_cache(self): self.cache.clear() @@ -368,7 +393,7 @@ def load_features( ) -> Optional[Dict]: if not client_key: raise ValueError("Must specify `client_key` to refresh features") - + key = api_host + "::" + client_key cached = self.cache.get(key) @@ -381,20 +406,31 @@ def load_features( self._notify_feature_update_callbacks(res) return res return cached - - + async def load_features_async( self, api_host: str, client_key: str, decryption_key: str = "", ttl: int = 600 ) -> Optional[Dict]: + if not client_key: + raise ValueError("Must specify `client_key` to refresh features") + key = api_host + "::" + client_key - cached = self.cache.get(key) + # Use async cache if existed, unless fallback to sync + if self.async_cache: + cached = await self.async_cache.get(key) # Async + else: + cached = self.cache.get(key) # Fallback to sync cache + if not cached: res = await self._fetch_features_async(api_host, client_key, decryption_key) if res is not None: - self.cache.set(key, res, ttl) + # save in cache + if self.async_cache: + await self.async_cache.set(key, res, ttl) # Async! + else: + self.cache.set(key, res, ttl) + logger.debug("Fetched features from API, stored in cache") - # Notify callbacks about fresh features self._notify_feature_update_callbacks(res) return res return cached @@ -403,7 +439,7 @@ async def load_features_async( def _get(self, url: str): self.http = self.http or PoolManager() return self.http.request("GET", url) - + def _fetch_and_decode(self, api_host: str, client_key: str) -> Optional[Dict]: try: r = self._get(self._get_features_url(api_host, client_key)) @@ -417,7 +453,7 @@ def _fetch_and_decode(self, api_host: str, client_key: str) -> Optional[Dict]: except Exception: logger.warning("Failed to decode feature JSON from GrowthBook API") return None - + async def _fetch_and_decode_async(self, api_host: str, client_key: str) -> Optional[Dict]: try: url = self._get_features_url(api_host, client_key) @@ -434,7 +470,7 @@ async def _fetch_and_decode_async(self, api_host: str, client_key: str) -> Optio except Exception as e: logger.warning("Failed to decode feature JSON from GrowthBook API: %s", e) return None - + def decrypt_response(self, data, decryption_key: str): if "encryptedFeatures" in data: if not decryption_key: @@ -450,7 +486,7 @@ def decrypt_response(self, data, decryption_key: str): return None elif "features" not in data: logger.warning("GrowthBook API response missing features") - + if "encryptedSavedGroups" in data: if not decryption_key: raise ValueError("Must specify decryption_key") @@ -463,7 +499,7 @@ def decrypt_response(self, data, decryption_key: str): logger.warning( "Failed to decrypt saved groups from GrowthBook API response" ) - + return data # Fetch features from the GrowthBook API @@ -477,7 +513,7 @@ def _fetch_features( data = self.decrypt_response(decoded, decryption_key) return data # type: ignore[no-any-return] - + async def _fetch_features_async( self, api_host: str, client_key: str, decryption_key: str = "" ) -> Optional[Dict]: @@ -501,7 +537,7 @@ def stopAutoRefresh(self, timeout=10): if self.sse_client: self.sse_client.disconnect(timeout=timeout) self.sse_client = None - + def start_background_refresh(self, api_host: str, client_key: str, decryption_key: str, ttl: int = 600, refresh_interval: int = 300) -> None: """Start periodic background refresh task""" @@ -511,7 +547,7 @@ def start_background_refresh(self, api_host: str, client_key: str, decryption_ke with self._refresh_lock: if self._refresh_thread is not None: return # Already running - + self._refresh_stop_event.clear() self._refresh_thread = threading.Thread( target=self._background_refresh_worker, @@ -520,7 +556,7 @@ def start_background_refresh(self, api_host: str, client_key: str, decryption_ke ) self._refresh_thread.start() logger.debug("Started background refresh task") - + def _background_refresh_worker(self, api_host: str, client_key: str, decryption_key: str, ttl: int, refresh_interval: int) -> None: """Worker method for periodic background refresh""" while not self._refresh_stop_event.is_set(): @@ -528,7 +564,7 @@ def _background_refresh_worker(self, api_host: str, client_key: str, decryption_ # Wait for the refresh interval or stop event if self._refresh_stop_event.wait(refresh_interval): break # Stop event was set - + logger.debug("Background refresh for Features - started") res = self._fetch_features(api_host, client_key, decryption_key) if res is not None: @@ -541,11 +577,11 @@ def _background_refresh_worker(self, api_host: str, client_key: str, decryption_ logger.warning("Background refresh failed") except Exception as e: logger.warning(f"Background refresh error: {e}") - + def stop_background_refresh(self) -> None: """Stop background refresh task""" self._refresh_stop_event.set() - + with self._refresh_lock: if self._refresh_thread is not None: self._refresh_thread.join(timeout=1.0) # Wait up to 1 second @@ -647,7 +683,7 @@ def __init__( ), features={}, saved_groups=self._saved_groups - ) + ) # Create a user context for the current user self._user_ctx: UserContext = UserContext( url=self._url, @@ -675,7 +711,7 @@ def __init__( # Start background refresh task for stale-while-revalidate self.load_features() # Initial load feature_repo.start_background_refresh( - self._api_host, self._client_key, self._decryption_key, + self._api_host, self._client_key, self._decryption_key, self._cache_ttl, self._stale_ttl ) @@ -715,7 +751,7 @@ def _features_event_handler(self, features): decoded = json.loads(features) if not decoded: return None - + data = feature_repo.decrypt_response(decoded, self._decryption_key) if data is not None: @@ -737,9 +773,9 @@ def _dispatch_sse_event(self, event_data): def startAutoRefresh(self): if not self._client_key: raise ValueError("Must specify `client_key` to start features streaming") - + feature_repo.startAutoRefresh( - api_host=self._api_host, + api_host=self._api_host, client_key=self._client_key, cb=self._dispatch_sse_event, streaming_timeout=self._streaming_timeout @@ -804,34 +840,34 @@ def get_attributes(self) -> dict: def destroy(self, timeout=10) -> None: """Gracefully destroy the GrowthBook instance""" logger.debug("Starting GrowthBook destroy process") - + try: # Clean up plugins logger.debug("Cleaning up plugins") self._cleanup_plugins() except Exception as e: logger.warning(f"Error cleaning up plugins: {e}") - + try: logger.debug("Stopping auto refresh during destroy") self.stopAutoRefresh(timeout=timeout) except Exception as e: logger.warning(f"Error stopping auto refresh during destroy: {e}") - + try: # Stop background refresh operations if self._stale_while_revalidate and self._client_key: feature_repo.stop_background_refresh() except Exception as e: logger.warning(f"Error stopping background refresh during destroy: {e}") - + try: # Clean up feature update callback if self._client_key: feature_repo.remove_feature_update_callback(self._on_feature_update) except Exception as e: logger.warning(f"Error removing feature update callback: {e}") - + # Clear all internal state try: self._subscriptions.clear() @@ -873,14 +909,14 @@ def get_feature_value(self, key: str, fallback): def evalFeature(self, key: str) -> FeatureResult: warnings.warn("evalFeature is deprecated, use eval_feature instead", DeprecationWarning) return self.eval_feature(key) - + def _ensure_fresh_features(self) -> None: """Lazy refresh: Check cache expiry and refresh if needed, but only if client_key is provided""" - + # Prevent infinite recursion when updating features (e.g., during sticky bucket refresh) if self._is_updating_features: return - + if self._streaming or self._stale_while_revalidate or not self._client_key: return # Skip cache checks - SSE or background refresh handles freshness @@ -892,7 +928,7 @@ def _ensure_fresh_features(self) -> None: def _get_eval_context(self) -> EvaluationContext: # Lazy refresh: ensure features are fresh before evaluation self._ensure_fresh_features() - + # use the latest attributes for every evaluation. self._user_ctx.attributes = self._attributes self._user_ctx.url = self._url @@ -906,8 +942,8 @@ def _get_eval_context(self) -> EvaluationContext: ) def eval_feature(self, key: str) -> FeatureResult: - result = core_eval_feature(key=key, - evalContext=self._get_eval_context(), + result = core_eval_feature(key=key, + evalContext=self._get_eval_context(), callback_subscription=self._fireSubscriptions, tracking_cb=self._track ) @@ -946,7 +982,7 @@ def _fireSubscriptions(self, experiment: Experiment, result: Result): def run(self, experiment: Experiment) -> Result: # result = self._run(experiment) - result = run_experiment(experiment=experiment, + result = run_experiment(experiment=experiment, evalContext=self._get_eval_context(), tracking_cb=self._track ) From ba5fb98de05777d7bbd389c4282027c5b3f43ee6 Mon Sep 17 00:00:00 2001 From: Volodymyr Nazarkevych Date: Mon, 1 Dec 2025 18:07:31 +0200 Subject: [PATCH 4/5] feat: integrate async cache setting in GrowthBookClient init block --- growthbook/growthbook_client.py | 80 +++++++++++++++++++-------------- 1 file changed, 46 insertions(+), 34 deletions(-) diff --git a/growthbook/growthbook_client.py b/growthbook/growthbook_client.py index bdd2af9..9aed92d 100644 --- a/growthbook/growthbook_client.py +++ b/growthbook/growthbook_client.py @@ -43,9 +43,9 @@ def __call__(cls, *args, **kwargs): class BackoffStrategy: """Exponential backoff with jitter for failed requests""" def __init__( - self, - initial_delay: float = 1.0, - max_delay: float = 60.0, + self, + initial_delay: float = 1.0, + max_delay: float = 60.0, multiplier: float = 2.0, jitter: float = 0.1 ): @@ -59,7 +59,7 @@ def __init__( def next_delay(self) -> float: """Calculate next delay with jitter""" delay = min( - self.current_delay * (self.multiplier ** self.attempt), + self.current_delay * (self.multiplier ** self.attempt), self.max_delay ) # Add random jitter @@ -252,7 +252,7 @@ async def refresh_loop() -> None: async def start_feature_refresh(self, strategy: FeatureRefreshStrategy, callback=None): """Initialize feature refresh based on strategy""" self._refresh_callback = callback - + if strategy == FeatureRefreshStrategy.SERVER_SENT_EVENTS: await self._start_sse_refresh() else: @@ -281,7 +281,7 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): await self.stop_refresh() - + async def load_features_async( self, api_host: str, client_key: str, decryption_key: str = "", ttl: int = 60 ) -> Optional[Dict]: @@ -295,17 +295,17 @@ class GrowthBookClient: def __init__( self, options: Optional[Union[Dict[str, Any], Options]] = None - ): + ): self.options = ( options if isinstance(options, Options) else Options(**options) if options else Options() ) - + # Thread-safe tracking state self._tracked: Dict[str, bool] = {} # Access only within async context self._tracked_lock = threading.Lock() - + # Thread-safe subscription management self._subscriptions: Set[Callable[[Experiment, Result], None]] = set() self._subscriptions_lock = threading.Lock() @@ -316,25 +316,37 @@ def __init__( 'assignments': {} } self._sticky_bucket_cache_lock = False - + # Plugin support self._tracking_plugins: List[Any] = self.options.tracking_plugins or [] self._initialized_plugins: List[Any] = [] - + self._features_repository = ( EnhancedFeatureRepository( - self.options.api_host or "https://cdn.growthbook.io", - self.options.client_key or "", - self.options.decryption_key or "", + self.options.api_host or "https://cdn.growthbook.io", + self.options.client_key or "", + self.options.decryption_key or "", self.options.cache_ttl ) if self.options.client_key else None ) - + + # Check if repo was initialized + if self._features_repository is not None: + # 1. set sync cache + if self.options.cache is not None: + self._features_repository.set_cache(self.options.cache) + logger.debug("Custom sync cache set for FeatureRepository.") + + # 2. set async cache + if self.options.async_cache is not None: + self._features_repository.set_async_cache(self.options.async_cache) + logger.debug("Custom async cache set for FeatureRepository.") + self._global_context: Optional[GlobalContext] = None self._context_lock = asyncio.Lock() - + # Initialize plugins self._initialize_plugins() @@ -383,8 +395,8 @@ def _fire_subscriptions(self, experiment: Experiment, result: Result) -> None: async def set_features(self, features: dict) -> None: await self._feature_update_callback({"features": features}) - - + + async def _refresh_sticky_buckets(self, attributes: Dict[str, Any]) -> Dict[str, Any]: """Refresh sticky bucket assignments only if attributes have changed""" if not self.options.sticky_bucket_service: @@ -394,7 +406,7 @@ async def _refresh_sticky_buckets(self, attributes: Dict[str, Any]) -> Dict[str, while not self._sticky_bucket_cache_lock: if attributes == self._sticky_bucket_cache['attributes']: return self._sticky_bucket_cache['assignments'] - + self._sticky_bucket_cache_lock = True try: assignments = self.options.sticky_bucket_service.get_all_assignments(attributes) @@ -403,7 +415,7 @@ async def _refresh_sticky_buckets(self, attributes: Dict[str, Any]) -> Dict[str, return assignments finally: self._sticky_bucket_cache_lock = False - + # Fallback return for edge case where loop condition is never satisfied return {} @@ -416,9 +428,9 @@ async def initialize(self) -> bool: try: # Initial feature load initial_features = await self._features_repository.load_features_async( - self.options.api_host or "https://cdn.growthbook.io", - self.options.client_key or "", - self.options.decryption_key or "", + self.options.api_host or "https://cdn.growthbook.io", + self.options.client_key or "", + self.options.decryption_key or "", self.options.cache_ttl ) if not initial_features: @@ -427,15 +439,15 @@ async def initialize(self) -> bool: # Create global context with initial features await self._feature_update_callback(initial_features) - + # Set up callback for future updates self._features_repository.add_callback(self._feature_update_callback) - + # Start feature refresh refresh_strategy = self.options.refresh_strategy or FeatureRefreshStrategy.STALE_WHILE_REVALIDATE await self._features_repository.start_feature_refresh(refresh_strategy) return True - + except Exception as e: logger.error(f"Initialization failed: {str(e)}", exc_info=True) traceback.print_exc() @@ -482,10 +494,10 @@ async def create_evaluation_context(self, user_context: UserContext) -> Evaluati """Create evaluation context for feature evaluation""" if self._global_context is None: raise RuntimeError("GrowthBook client not properly initialized") - + # Get sticky bucket assignments if needed sticky_assignments = await self._refresh_sticky_buckets(user_context.attributes) - + # update user context with sticky bucket assignments user_context.sticky_bucket_assignment_docs = sticky_assignments @@ -520,7 +532,7 @@ async def is_on(self, key: str, user_context: UserContext) -> bool: except Exception: logger.exception("Error in feature usage callback") return result.on - + async def is_off(self, key: str, user_context: UserContext) -> bool: """Check if a feature is set to off with proper async context management""" async with self._context_lock: @@ -533,7 +545,7 @@ async def is_off(self, key: str, user_context: UserContext) -> bool: except Exception: logger.exception("Error in feature usage callback") return result.off - + async def get_feature_value(self, key: str, fallback: Any, user_context: UserContext) -> Any: async with self._context_lock: context = await self.create_evaluation_context(user_context) @@ -551,14 +563,14 @@ async def run(self, experiment: Experiment, user_context: UserContext) -> Result async with self._context_lock: context = await self.create_evaluation_context(user_context) result = run_experiment( - experiment=experiment, + experiment=experiment, evalContext=context, tracking_cb=self._track ) # Fire subscriptions synchronously self._fire_subscriptions(experiment, result) return result - + async def close(self) -> None: """Clean shutdown with proper cleanup""" if self._features_repository: @@ -573,7 +585,7 @@ async def close(self) -> None: # Clear context async with self._context_lock: self._global_context = None - + # Cleanup plugins self._cleanup_plugins() @@ -605,4 +617,4 @@ def _cleanup_plugins(self) -> None: logger.debug(f"Cleaned up plugin: {plugin.__class__.__name__}") except Exception as e: logger.error(f"Error cleaning up plugin {plugin}: {e}") - self._initialized_plugins.clear() \ No newline at end of file + self._initialized_plugins.clear() From 084c29ceab0d4e616c7dc08fec995a9f0f07572d Mon Sep 17 00:00:00 2001 From: Volodymyr Nazarkevych Date: Thu, 4 Dec 2025 11:10:08 +0200 Subject: [PATCH 5/5] refactor import --- growthbook/growthbook.py | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/growthbook/growthbook.py b/growthbook/growthbook.py index 91e015b..926e37e 100644 --- a/growthbook/growthbook.py +++ b/growthbook/growthbook.py @@ -13,7 +13,7 @@ from abc import ABC, abstractmethod from typing import Optional, Any, Set, Tuple, List, Dict, Callable - +from collections import OrderedDict from .cache_interfaces import AbstractFeatureCache, AbstractAsyncFeatureCache from .common_types import (EvaluationContext, Experiment, @@ -47,6 +47,7 @@ logger = logging.getLogger("growthbook") + def decrypt(encrypted_str: str, key_str: str) -> str: iv_str, ct_str = encrypted_str.split(".", 2) @@ -95,6 +96,7 @@ def set(self, key: str, value: Dict, ttl: int) -> None: def clear(self) -> None: self.cache.clear() + class InMemoryAsyncFeatureCache(AbstractAsyncFeatureCache): """ Async in-memory cache implementation. @@ -124,6 +126,7 @@ async def clear(self) -> None: async with self._lock: self._cache.clear() + class InMemoryStickyBucketService(AbstractStickyBucketService): def __init__(self) -> None: self.docs: Dict[str, Dict] = {} @@ -210,7 +213,7 @@ async def _init_session(self): while self.is_running: try: async with aiohttp.ClientSession(headers=self.headers, - timeout=aiohttp.ClientTimeout(connect=self.timeout)) as session: + timeout=aiohttp.ClientTimeout(connect=self.timeout)) as session: self._sse_session = session async with session.get(url) as response: @@ -318,7 +321,7 @@ async def _stop_session(self, timeout=10): try: # Get all tasks for this specific loop tasks = [task for task in asyncio.all_tasks(self._loop) - if not task.done() and task is not asyncio.current_task(self._loop)] + if not task.done() and task is not asyncio.current_task(self._loop)] if tasks: logger.debug(f"Cancelling {len(tasks)} SSE tasks") @@ -340,9 +343,6 @@ async def _stop_session(self, timeout=10): except Exception as e: logger.warning(f"Error during SSE task cleanup: {e}") -from collections import OrderedDict - -# ... (imports) class FeatureRepository(object): def __init__(self) -> None: @@ -533,7 +533,8 @@ async def _fetch_and_decode_async(self, api_host: str, client_key: str) -> Optio async with session.get(url, headers=headers) as response: # Handle 304 Not Modified - content hasn't changed if response.status == 304: - logger.debug(f"[Async] ETag match! Server returned 304 Not Modified - using cached data (saved bandwidth)") + logger.debug( + f"[Async] ETag match! Server returned 304 Not Modified - using cached data (saved bandwidth)") if cached_data is not None: logger.debug(f"[Async] Returning cached response ({len(str(cached_data))} bytes)") return cached_data @@ -559,7 +560,8 @@ async def _fetch_and_decode_async(self, api_host: str, client_key: str) -> Optio if cached_etag: logger.debug(f"[Async] ETag updated: {cached_etag[:20]}... -> {response_etag[:20]}...") else: - logger.debug(f"[Async] New ETag cached: {response_etag[:20]}... ({len(str(decoded))} bytes)") + logger.debug( + f"[Async] New ETag cached: {response_etag[:20]}... ({len(str(decoded))} bytes)") logger.debug(f"[Async] ETag cache now contains {len(self._etag_cache)} entries") else: logger.debug("[Async] No ETag header in response") @@ -626,11 +628,11 @@ async def _fetch_features_async( return data # type: ignore[no-any-return] - def startAutoRefresh(self, api_host, client_key, cb, streaming_timeout=30): if not client_key: raise ValueError("Must specify `client_key` to start features streaming") - self.sse_client = self.sse_client or SSEClient(api_host=api_host, client_key=client_key, on_event=cb, timeout=streaming_timeout) + self.sse_client = self.sse_client or SSEClient(api_host=api_host, client_key=client_key, on_event=cb, + timeout=streaming_timeout) self.sse_client.connect() def stopAutoRefresh(self, timeout=10): @@ -639,7 +641,8 @@ def stopAutoRefresh(self, timeout=10): self.sse_client.disconnect(timeout=timeout) self.sse_client = None - def start_background_refresh(self, api_host: str, client_key: str, decryption_key: str, ttl: int = 600, refresh_interval: int = 300) -> None: + def start_background_refresh(self, api_host: str, client_key: str, decryption_key: str, ttl: int = 600, + refresh_interval: int = 300) -> None: """Start periodic background refresh task""" if not client_key: @@ -658,7 +661,8 @@ def start_background_refresh(self, api_host: str, client_key: str, decryption_ke self._refresh_thread.start() logger.debug("Started background refresh task") - def _background_refresh_worker(self, api_host: str, client_key: str, decryption_key: str, ttl: int, refresh_interval: int) -> None: + def _background_refresh_worker(self, api_host: str, client_key: str, decryption_key: str, ttl: int, + refresh_interval: int) -> None: """Worker method for periodic background refresh""" while not self._refresh_stop_event.is_set(): try: @@ -698,6 +702,7 @@ def _get_features_url(api_host: str, client_key: str) -> str: # Singleton instance feature_repo = FeatureRepository() + class GrowthBook(object): def __init__( self, @@ -759,7 +764,8 @@ def __init__( self._user = user self._groups = groups self._overrides = overrides - self._forcedVariations = (forced_variations if forced_variations is not None else forcedVariations) if forced_variations is not None or forcedVariations else {} + self._forcedVariations = ( + forced_variations if forced_variations is not None else forcedVariations) if forced_variations is not None or forcedVariations else {} self._tracked: Dict[str, Any] = {} self._assigned: Dict[str, Any] = {} @@ -870,7 +876,6 @@ def _dispatch_sse_event(self, event_data): elif event_type == 'features': self._features_event_handler(data) - def startAutoRefresh(self): if not self._client_key: raise ValueError("Must specify `client_key` to start features streaming") @@ -1037,9 +1042,9 @@ def _get_eval_context(self) -> EvaluationContext: # set the url for every evaluation. (unlikely to change) self._global_ctx.options.url = self._url return EvaluationContext( - global_ctx = self._global_ctx, - user = self._user_ctx, - stack = StackContext(evaluated_features=set()) + global_ctx=self._global_ctx, + user=self._user_ctx, + stack=StackContext(evaluated_features=set()) ) def eval_feature(self, key: str) -> FeatureResult: