Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/release-java-client.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,3 @@ jobs:
gpg-secret-key: ${{ secrets.GPG_SECRET_KEY }}
sonatype-username: ${{ secrets.SONATYPE_USERNAME }}
sonatype-password: ${{ secrets.SONATYPE_PASSWORD }}

3 changes: 1 addition & 2 deletions agent-memory-client/agent-memory-client-java/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ The workflow will:
3. **Publish**: Publish artifacts to the staging repository
4. **Sign**: Sign all artifacts with GPG
5. **Assemble**: Prepare release artifacts with JReleaser
6. **Release**:
6. **Release**:
- Create a GitHub release with changelog
- Deploy to Maven Central
- Tag the repository
Expand Down Expand Up @@ -157,4 +157,3 @@ export JRELEASER_MAVENCENTRAL_PASSWORD=your_password
- [JReleaser Documentation](https://jreleaser.org/)
- [Maven Central Publishing Guide](https://central.sonatype.org/publish/)
- [GPG Signing Guide](https://central.sonatype.org/publish/requirements/gpg/)

1 change: 0 additions & 1 deletion agent-memory-client/agent-memory-client-java/jreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,3 @@ deploy:
announce:
slack:
active: NEVER

5 changes: 5 additions & 0 deletions agent_memory_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,11 @@ class Settings(BaseSettings):
redisvl_index_prefix: str = "memory_idx"
redisvl_indexing_algorithm: str = "HNSW"

# Working Memory Index Settings
# Used for listing sessions via Redis Search instead of sorted sets
working_memory_index_name: str = "working_memory_idx"
working_memory_index_prefix: str = "working_memory:"

# Deduplication Settings (Store-Time)
# Distance threshold for semantic similarity when deduplicating at store time
# 0.35 works well for catching paraphrased content while avoiding false positives
Expand Down
7 changes: 5 additions & 2 deletions agent_memory_server/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
_redis_pool as connection_pool,
get_redis_conn,
)
from agent_memory_server.working_memory import check_and_set_migration_status
from agent_memory_server.working_memory_index import ensure_working_memory_index


logger = get_logger(__name__)
Expand Down Expand Up @@ -82,10 +84,11 @@ async def lifespan(app: FastAPI):
redis_conn = await get_redis_conn()

# Check if any working memory keys need migration from string to JSON format
from agent_memory_server.working_memory import check_and_set_migration_status

await check_and_set_migration_status(redis_conn)

# Ensure working memory search index exists for session listing
await ensure_working_memory_index(redis_conn)

# Initialize Docket for background tasks if enabled
if settings.use_docket:
logger.info("Attempting to initialize Docket for background tasks.")
Expand Down
19 changes: 18 additions & 1 deletion agent_memory_server/utils/keys.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Redis key utilities."""

import logging
import warnings

from agent_memory_server.config import settings

Expand Down Expand Up @@ -38,9 +39,25 @@ def messages_key(session_id: str, namespace: str | None = None) -> str:

@staticmethod
def sessions_key(namespace: str | None = None) -> str:
"""Get the sessions key for a namespace."""
"""Get the sessions key for a namespace.

DEPRECATED: This method is deprecated. Session listing now uses
Redis Search index on working memory JSON documents instead of
sorted sets. The index automatically handles TTL expiration.
"""
warnings.warn(
"sessions_key() is deprecated. Session listing now uses Redis Search "
"index on working memory JSON documents instead of sorted sets.",
DeprecationWarning,
stacklevel=2,
)
return f"sessions:{namespace}" if namespace else "sessions"
Comment on lines 42 to 54
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: raise a deprecation warning


@staticmethod
def working_memory_index_name() -> str:
    """Name of the Redis Search index backing working-memory session listing.

    Delegates to configuration so deployments can rename the index via
    settings without touching key utilities.
    """
    index_name: str = settings.working_memory_index_name
    return index_name

@staticmethod
def memory_key(id: str) -> str:
"""Get the memory key for an ID."""
Expand Down
104 changes: 77 additions & 27 deletions agent_memory_server/working_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from datetime import UTC, datetime

from redis.asyncio import Redis
from redisvl.query import FilterQuery
from redisvl.query.filter import Tag

from agent_memory_server.config import settings
from agent_memory_server.models import (
Expand Down Expand Up @@ -225,45 +227,96 @@ async def _migrate_string_to_json(


def _extract_session_id(doc) -> str | None:
    """Pull the ``session_id`` field out of one search-result document.

    Search results may arrive as attribute-style objects, plain dicts, or
    other mappings depending on the client version, so all three shapes are
    handled. Bytes are decoded as UTF-8 and surrounding JSON quotes (an
    artifact of JSON-document indexing) are stripped.

    Returns None when the document has no session_id field.
    """
    if hasattr(doc, "__dict__"):
        session_id = getattr(doc, "session_id", None)
    elif isinstance(doc, dict):
        session_id = doc.get("session_id")
    else:
        session_id = dict(doc).get("session_id")

    if session_id is None:
        return None
    if isinstance(session_id, bytes):
        session_id = session_id.decode("utf-8")
    # Guard on str before stripping quotes so a non-string field value
    # cannot raise AttributeError here.
    if (
        isinstance(session_id, str)
        and session_id.startswith('"')
        and session_id.endswith('"')
    ):
        session_id = session_id[1:-1]
    return session_id


async def list_sessions(
    redis: Redis,
    limit: int = 10,
    offset: int = 0,
    namespace: str | None = None,
    user_id: str | None = None,
) -> tuple[int, list[str]]:
    """
    List sessions using the Redis Search index over working memory.

    Queries the working-memory index with RedisVL's FilterQuery, so sessions
    whose keys expired via TTL are automatically excluded (Redis Search
    removes deleted keys from the index).

    Args:
        redis: Redis client
        limit: Maximum number of sessions to return
        offset: Offset for pagination
        namespace: Optional namespace filter
        user_id: Optional user ID filter

    Returns:
        Tuple of (total_count, session_ids). On query failure this returns
        (0, []) so listing stays best-effort; the error is logged with a
        traceback.
    """
    # Imported here rather than at module top level, presumably to avoid a
    # circular import between working_memory and working_memory_index.
    # NOTE(review): confirm the cycle exists; otherwise hoist to the top.
    from agent_memory_server.working_memory_index import get_working_memory_index

    try:
        index = await get_working_memory_index(redis)

        # Build the optional Tag filters, then fold them together with AND.
        # A single filter is used as-is; no filters leaves the query open.
        filters = []
        if namespace:
            filters.append(Tag("namespace") == namespace)
        if user_id:
            filters.append(Tag("user_id") == user_id)

        filter_expression = None
        for tag_filter in filters:
            if filter_expression is None:
                filter_expression = tag_filter
            else:
                filter_expression = filter_expression & tag_filter

        # Fetch enough results to cover the requested page; the offset is
        # applied client-side below when slicing the result docs.
        filter_query = FilterQuery(
            filter_expression=filter_expression,
            return_fields=["session_id"],
            num_results=limit + offset,
        )

        raw_results = await index.search(filter_query)

        # Some client versions return an object with .docs/.total, others a
        # bare sequence; degrade gracefully in both cases.
        docs = getattr(raw_results, "docs", raw_results) or []
        total = getattr(raw_results, "total", len(docs))

        session_ids: list[str] = []
        for doc in docs[offset:]:
            session_id = _extract_session_id(doc)
            if session_id:
                session_ids.append(session_id)
                if len(session_ids) >= limit:
                    break

        return total, session_ids

    except Exception:
        # Log with the full traceback (not just the message) so index
        # problems are diagnosable, while keeping listing best-effort for
        # callers by returning an empty page.
        logger.exception("Error listing sessions")
        return 0, []
Comment on lines +316 to +319
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would prefer not to have silent errors. This can mask real issues, consider using logger.exception() to include the traceback, or re-raise the exception after logging.



async def get_working_memory(
Expand Down Expand Up @@ -455,12 +508,10 @@ async def set_working_memory(

try:
# Use Redis native JSON storage
# The working memory search index automatically indexes this document
# for session listing (no need for separate sorted set)
await redis_client.json().set(key, "$", data)

# Index session in sorted set for listing
sessions_key = Keys.sessions_key(namespace=working_memory.namespace)
await redis_client.zadd(sessions_key, {working_memory.session_id: time.time()})

if working_memory.ttl_seconds is not None:
# Set TTL separately for JSON keys
await redis_client.expire(key, working_memory.ttl_seconds)
Expand Down Expand Up @@ -501,10 +552,9 @@ async def delete_working_memory(
)

try:
# Delete the JSON key - the working memory search index automatically
# removes the document from the index when the key is deleted
await redis_client.delete(key)
# Remove session from sorted set index
sessions_key = Keys.sessions_key(namespace=namespace)
await redis_client.zrem(sessions_key, session_id)
logger.info(f"Deleted working memory for session {session_id}")

except Exception as e:
Expand Down
Loading
Loading