-
Notifications
You must be signed in to change notification settings - Fork 34
Replace sorted sets with Redis Search index for session listing (fixes #121) #140
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c12a39a
75c6e03
3660545
0a5bdaa
db8c991
5545048
db7d820
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -77,4 +77,3 @@ deploy: | |
| announce: | ||
| slack: | ||
| active: NEVER | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,8 @@ | |
| from datetime import UTC, datetime | ||
|
|
||
| from redis.asyncio import Redis | ||
| from redisvl.query import FilterQuery | ||
| from redisvl.query.filter import Tag | ||
|
|
||
| from agent_memory_server.config import settings | ||
| from agent_memory_server.models import ( | ||
|
|
@@ -225,45 +227,96 @@ async def _migrate_string_to_json( | |
|
|
||
|
|
||
| async def list_sessions( | ||
| redis, | ||
| redis: Redis, | ||
| limit: int = 10, | ||
| offset: int = 0, | ||
| namespace: str | None = None, | ||
| user_id: str | None = None, | ||
| ) -> tuple[int, list[str]]: | ||
| """ | ||
| List sessions | ||
| List sessions using Redis Search index. | ||
|
|
||
| Uses RedisVL FilterQuery on the working memory index to list sessions. | ||
| This approach ensures that expired sessions (via TTL) are automatically | ||
| excluded since Redis Search removes deleted keys from the index. | ||
|
|
||
| Args: | ||
| redis: Redis client | ||
| limit: Maximum number of sessions to return | ||
| offset: Offset for pagination | ||
| namespace: Optional namespace filter | ||
| user_id: Optional user ID filter (not yet implemented - sessions are stored in sorted sets) | ||
| user_id: Optional user ID filter | ||
|
|
||
| Returns: | ||
| Tuple of (total_count, session_ids) | ||
|
|
||
| Note: | ||
| The user_id parameter is accepted for API compatibility but filtering by user_id | ||
| is not yet implemented. This would require changing how sessions are stored to | ||
| enable efficient user_id-based filtering. | ||
| """ | ||
| # Calculate start and end indices (0-indexed start, inclusive end) | ||
| start = offset | ||
| end = offset + limit - 1 | ||
| from agent_memory_server.working_memory_index import get_working_memory_index | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Would move this to the top |
||
|
|
||
| # TODO: This should take a user_id | ||
| sessions_key = Keys.sessions_key(namespace=namespace) | ||
| try: | ||
| # Get the search index | ||
| index = await get_working_memory_index(redis) | ||
|
|
||
| # Build filter expression using Tag filters | ||
| filters = [] | ||
| if namespace: | ||
| filters.append(Tag("namespace") == namespace) | ||
| if user_id: | ||
| filters.append(Tag("user_id") == user_id) | ||
|
|
||
| # Combine filters with AND logic, or use None for no filter | ||
| filter_expression = None | ||
| if filters: | ||
| if len(filters) == 1: | ||
| filter_expression = filters[0] | ||
| else: | ||
| # Combine multiple filters with AND | ||
| from functools import reduce | ||
|
|
||
| async with redis.pipeline() as pipe: | ||
| pipe.zcard(sessions_key) | ||
| pipe.zrange(sessions_key, start, end) | ||
| total, session_ids = await pipe.execute() | ||
| filter_expression = reduce(lambda x, y: x & y, filters) | ||
|
|
||
| return total, [ | ||
| s.decode("utf-8") if isinstance(s, bytes) else s for s in session_ids | ||
| ] | ||
| # Create FilterQuery | ||
| filter_query = FilterQuery( | ||
| filter_expression=filter_expression, | ||
| return_fields=["session_id"], | ||
| num_results=limit + offset, | ||
| ) | ||
|
|
||
| # Execute the query | ||
| raw_results = await index.search(filter_query) | ||
|
|
||
| # Parse results | ||
| docs = getattr(raw_results, "docs", raw_results) or [] | ||
| total = getattr(raw_results, "total", len(docs)) | ||
|
|
||
| # Extract session_ids from results, applying offset | ||
| session_ids = [] | ||
| for doc in docs[offset:]: | ||
| # Handle different doc formats (dict-like or object) | ||
| if hasattr(doc, "__dict__"): | ||
| session_id = getattr(doc, "session_id", None) | ||
| elif isinstance(doc, dict): | ||
| session_id = doc.get("session_id") | ||
| else: | ||
| session_id = dict(doc).get("session_id") | ||
|
|
||
| if session_id: | ||
| # Handle bytes if needed | ||
| if isinstance(session_id, bytes): | ||
| session_id = session_id.decode("utf-8") | ||
| # Remove JSON quotes if present | ||
| if session_id.startswith('"') and session_id.endswith('"'): | ||
| session_id = session_id[1:-1] | ||
| session_ids.append(session_id) | ||
|
|
||
| if len(session_ids) >= limit: | ||
| break | ||
|
|
||
| return total, session_ids | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Error listing sessions: {e}") | ||
| # Return empty results on error | ||
| return 0, [] | ||
|
Comment on lines
+316
to
+319
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Would prefer not to have silent errors. This can mask real issues; consider using logger.exception() to include the traceback, or re-raise the exception after logging. |
||
|
|
||
|
|
||
| async def get_working_memory( | ||
|
|
@@ -455,12 +508,10 @@ async def set_working_memory( | |
|
|
||
| try: | ||
| # Use Redis native JSON storage | ||
| # The working memory search index automatically indexes this document | ||
| # for session listing (no need for separate sorted set) | ||
| await redis_client.json().set(key, "$", data) | ||
|
|
||
| # Index session in sorted set for listing | ||
| sessions_key = Keys.sessions_key(namespace=working_memory.namespace) | ||
| await redis_client.zadd(sessions_key, {working_memory.session_id: time.time()}) | ||
|
|
||
| if working_memory.ttl_seconds is not None: | ||
| # Set TTL separately for JSON keys | ||
| await redis_client.expire(key, working_memory.ttl_seconds) | ||
|
|
@@ -501,10 +552,9 @@ async def delete_working_memory( | |
| ) | ||
|
|
||
| try: | ||
| # Delete the JSON key - the working memory search index automatically | ||
| # removes the document from the index when the key is deleted | ||
| await redis_client.delete(key) | ||
| # Remove session from sorted set index | ||
| sessions_key = Keys.sessions_key(namespace=namespace) | ||
| await redis_client.zrem(sessions_key, session_id) | ||
| logger.info(f"Deleted working memory for session {session_id}") | ||
|
|
||
| except Exception as e: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: raise a deprecation warning