
PSv2: Track and display image count progress and state #1121

Open
carlosgjs wants to merge 27 commits into RolnickLab:main from uw-ssec:carlos/trackcounts

Conversation

@carlosgjs
Collaborator

@carlosgjs carlosgjs commented Feb 6, 2026

Summary

This pull request introduces progress tracking of various counts for async image processing jobs, bringing the async path to parity with the sync path. This includes setting the JobState according to the ratio of failed images.

  • Added tracking of failed images and cumulative counts for detections, classifications, and captures in TaskStateManager and TaskProgress, including new Redis keys and logic for updating and cleaning up these metrics.
  • Updated process_nats_pipeline_result in ami/jobs/tasks.py to calculate failed image IDs and detection/classification/capture counts, pass them to TaskStateManager, and reflect them in job progress updates.
  • Modified _update_job_progress to support custom completion states (success/failure) based on the proportion of failed images, and to propagate the new progress metrics to the job model (see the sketch after this list).
  • Added logging of error results to the job log.
  • Expanded unit tests in ami/ml/tests.py to validate the new progress fields, including detections, classifications, and failed image counts.
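
As a rough orientation, here is a minimal sketch of the failure-ratio logic described above, assuming a progress snapshot (called progress below) returned by the state manager; the keyword arguments of _update_job_progress are illustrative rather than the actual signature.

```python
# Hedged sketch, not the implementation: derive the completion state from the
# proportion of failed images and forward the cumulative counts to the job.
failure_ratio = progress.failed / progress.total if progress.total else 0.0
complete_state = JobState.FAILURE if failure_ratio > FAILURE_THRESHOLD else JobState.SUCCESS

_update_job_progress(
    job_id,
    stage="results",
    progress_percentage=progress.percentage,
    complete_state=complete_state,
    detections=progress.detections,          # cumulative counts propagated to the job model
    classifications=progress.classifications,
    captures=progress.captures,
    failed=progress.failed,
)
```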

Screenshots

Success case:
[screenshots]

Failure case (some images were artificially failed by the test; the job state is set to Failure):
[screenshots]

Checklist

  • I have tested these changes appropriately.
  • I have added and/or modified relevant tests.
  • I updated relevant documentation or comments.
  • I have verified that this PR follows the project's coding standards.
  • Any dependent changes have already been merged to main.

Summary by CodeRabbit

  • New Features

    • Enhanced job progress tracking: per-stage counts for detections, classifications, captures, plus cumulative failed-image tracking and propagated counts across stages.
  • Bug Fixes

    • Improved error handling and failure-state determination during pipeline processing and finalization; progress reflects failure ratios.
  • Tests

    • Tests updated to verify cumulative failure tracking, retry behavior, cleanup of failed-image tracking, and new progress semantics.
  • Documentation

    • Added NATS infrastructure notes and planning docs for reliability and cleanup improvements.

@netlify

netlify bot commented Feb 6, 2026

👷 Deploy request for antenna-ssec pending review.

Visit the deploys page to approve it

🔨 Latest commit: b15024f

@netlify

netlify bot commented Feb 6, 2026

Deploy Preview for antenna-preview canceled.

🔨 Latest commit: b15024f
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-preview/deploys/698f7fb71944b30008f06bb7

@coderabbitai
Contributor

coderabbitai bot commented Feb 6, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Replaces TaskStateManager with AsyncJobStateManager, adds failed-image tracking and cumulative counts (detections, classifications, captures) through NATS pipeline result handling, and updates _update_job_progress to accept a complete_state and propagate computed completion and counts into job progress and status.
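
To make the walkthrough concrete, here is a hedged sketch of what the Redis-backed snapshot could look like; the field names are inferred from this summary and the sequence diagram below, not copied from the source.

```python
from dataclasses import dataclass


# Hypothetical shape of the snapshot AsyncJobStateManager returns from Redis;
# field names are inferred from the walkthrough, not taken from the PR diff.
@dataclass
class JobStateProgress:
    processed: int = 0        # images completed for the current stage
    remaining: int = 0        # images still pending for the stage
    total: int = 0            # total images in the job
    failed: int = 0           # unique failed image IDs seen so far
    percentage: float = 0.0   # processed / total
    detections: int = 0       # cumulative detection count
    classifications: int = 0  # cumulative classification count
    captures: int = 0         # cumulative capture count
```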

Changes

  • State manager implementation (ami/ml/orchestration/async_job_state.py): Renamed TaskStateManager → AsyncJobStateManager; introduced the JobStateProgress dataclass, _failed_key tracking, and _commit_update for atomic updates; added failed-count handling; updated get_progress/update_state return types and cleanup.
  • Pipeline & job progress (ami/jobs/tasks.py): Replaced TaskStateManager usage with AsyncJobStateManager; process_nats_pipeline_result now computes and propagates failed_image_ids and detection/classification/capture counts, derives complete_state from the failure ratio, and calls the updated _update_job_progress(job_id, stage, pct, complete_state, **state_params).
  • Tests (ami/ml/tests.py, ami/jobs/test_tasks.py, ami/ml/orchestration/tests/test_cleanup.py): Updated imports and usages to AsyncJobStateManager; replaced _get_progress usage with _commit_update semantics; added and updated tests for failed-image tracking, retries, cumulative counts, and cleanup of failed keys.
  • Integration / jobs orchestration (ami/ml/orchestration/jobs.py, ami/jobs/models.py): Swapped TaskStateManager → AsyncJobStateManager in cleanup/initialization paths; moved FAILURE_THRESHOLD usage to ami/jobs/tasks.py and imported it where needed.
  • Docs / planning (docs/claude/nats-todo.md, docs/claude/planning/pr-trackcounts-next-session.md): Added a NATS reliability/cleanup TODO and planning notes documenting design decisions, next steps, and the interaction between the Redis-based JobStateProgress and the Django JobProgress.

Sequence Diagram

sequenceDiagram
    participant NATS
    participant Pipeline as process_nats_pipeline_result
    participant StateMgr as AsyncJobStateManager
    participant Redis
    participant Django as Job Model

    NATS->>Pipeline: deliver result (success / error)
    Pipeline->>Pipeline: extract processed_image_ids, error_result, counts
    Pipeline->>StateMgr: _commit_update(processed_ids, stage, failed_image_ids?)
    StateMgr->>Redis: add/remove failed ids, update counters (processed, total, failed)
    StateMgr-->>Pipeline: JobStateProgress (processed, remaining, failed, percentage)
    Pipeline->>Pipeline: compute complete_state from failure ratio and FAILURE_THRESHOLD
    Pipeline->>Pipeline: call _update_job_progress(job_id, stage, pct, complete_state, detections..., classifications..., captures...)
    Pipeline->>Django: persist job.status and progress.summary.status when complete

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes


Suggested labels

backend, ml

Suggested reviewers

  • mihow

Poem

🐰 I tally hops and image counts,
failed seeds I gently fence,
detections, captures, classifications—
I carry sums through every tense.
Hop on, progress, make it dense! 🥕

🚥 Pre-merge checks: ✅ 4 passed
  • Title check ✅ Passed: The title accurately captures the main objective of the PR: tracking and displaying image count progress and state for async jobs (PSv2).
  • Description check ✅ Passed: The description covers all required sections: summary, list of changes, screenshots demonstrating success and failure cases, and a completed checklist.
  • Docstring Coverage ✅ Passed: Docstring coverage is 83.33%, which meets the required threshold of 80.00%.
  • Merge Conflict Detection ✅ Passed: No merge conflicts detected when merging into main.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

carlosgjs and others added 7 commits February 10, 2026 08:50
* fix: update date picker version and tweak layout logic

* feat: set start month based on selected date
* merge

* fix: Properly handle async job state with celery tasks

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Delete implemented plan

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* merge

* feat: PSv2 - Queue/redis clean-up upon job completion

* fix: catch specific exception

* chore: move tests to a subdir

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
…kLab#1118)

Introduces the dispatch_mode field on the Job model to track how each job dispatches its workload. 


This allows API clients (including the AMI worker) to filter jobs by dispatch mode — for example, fetching only async_api jobs so workers don't pull synchronous or internal jobs.

JobDispatchMode enum (ami/jobs/models.py):

internal — work handled entirely within the platform (Celery worker, no external calls). Default for all jobs.
sync_api — worker calls an external processing service API synchronously and waits for each response.
async_api — worker publishes items to NATS for external processing service workers to pick up independently.
Database and Model Changes:

Added dispatch_mode CharField with TextChoices, defaulting to internal, with the migration in ami/jobs/migrations/0019_job_dispatch_mode.py.
ML jobs set dispatch_mode = async_api when the project's async_pipeline_workers feature flag is enabled.
ML jobs set dispatch_mode = sync_api on the synchronous processing path (previously unset).
API and Filtering:

dispatch_mode is exposed (read-only) in job list and detail serializers.
Filterable via query parameter: ?dispatch_mode=async_api
The /tasks endpoint now returns 400 for non-async_api jobs, since only those have NATS tasks to fetch.
Architecture doc: docs/claude/job-dispatch-modes.md documents the three modes, naming decisions, and per-job-type mapping.
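
For reference, a minimal sketch of the field described above, assuming Django TextChoices as stated; the human-readable labels and max_length are illustrative.

```python
from django.db import models


class JobDispatchMode(models.TextChoices):
    # Work handled entirely within the platform (Celery worker, no external calls); the default.
    INTERNAL = "internal", "Internal"
    # Worker calls an external processing service API synchronously and waits for each response.
    SYNC_API = "sync_api", "Synchronous API"
    # Worker publishes items to NATS for external processing workers to pick up independently.
    ASYNC_API = "async_api", "Asynchronous API (NATS)"


# On the Job model (shown as a snippet; max_length is an assumption):
# dispatch_mode = models.CharField(
#     max_length=20, choices=JobDispatchMode.choices, default=JobDispatchMode.INTERNAL
# )
```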

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
…dler (RolnickLab#1125)

* refactor: use is_complete() and dispatch_mode in job progress handler

Replace hardcoded `stage == "results"` check with `job.progress.is_complete()`
which verifies ALL stages are done, making it work for any job type.

Replace feature flag check in cleanup with `dispatch_mode == ASYNC_API`
which is immutable for the job's lifetime and more correct than re-reading
a mutable flag that could change between job creation and completion.

Co-Authored-By: Claude <noreply@anthropic.com>

* test: update cleanup tests for is_complete() and dispatch_mode checks

Set dispatch_mode=ASYNC_API on test jobs to match the new cleanup guard.
Complete all stages (collect, process, results) in the completion test
since is_complete() correctly requires all stages to be done.

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
@carlosgjs carlosgjs changed the title Carlos/trackcounts PSv2: Track and display image count progress and state Feb 11, 2026
@carlosgjs carlosgjs requested a review from mihow February 11, 2026 00:51
@carlosgjs carlosgjs marked this pull request as ready for review February 11, 2026 00:55
Copilot AI review requested due to automatic review settings February 11, 2026 00:55
Contributor

Copilot AI left a comment

Pull request overview

Adds richer progress tracking for async ML image processing jobs (detections/classifications/captures/failed) and uses failure ratio to determine final job state, bringing async behavior closer to the existing sync path.

Changes:

  • Extend TaskStateManager/TaskProgress to track cumulative detections/classifications/captures and a unique failed-image set in Redis.
  • Update async result handling (process_nats_pipeline_result) to record failed images and per-result counts, and to propagate these into job progress updates (including final success/failure state).
  • Expand TaskStateManager unit tests to validate the new progress fields and cleanup behavior (partial coverage).

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

File Description
ami/ml/orchestration/task_state.py Adds Redis-backed cumulative counters + failed-image tracking and surfaces them via TaskProgress.
ami/jobs/tasks.py Plumbs new progress metrics through async pipeline result processing and modifies job completion state handling.
ami/ml/tests.py Adds/updates tests for new progress fields and failed-image tracking.


Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@ami/jobs/tasks.py`:
- Around line 92-95: complete_state currently computed once using
FAILURE_THRESHOLD, JobState and progress_info and then reused later for the
"results" stage, which can be stale; before calling _update_job_progress for the
"results" stage (and before the state_manager.update_state that sets results to
100%), re-read the latest progress counts (e.g., fetch updated
progress_info.failed and progress_info.total from the state manager or from the
second state_manager.update_state response) and recompute complete_state =
JobState.FAILURE if (failed/total) >= FAILURE_THRESHOLD else JobState.SUCCESS so
the final job.status reflects the most recent state.
- Around line 92-95: The code currently divides progress_info.failed by
progress_info.total without guarding total==0, causing ZeroDivisionError; update
the logic around FAILURE_THRESHOLD, complete_state and JobState so that if
progress_info.total == 0 you short-circuit (set complete_state =
JobState.SUCCESS) and only compute (progress_info.failed / progress_info.total)
>= FAILURE_THRESHOLD when total > 0, mirroring the guard used in
_get_progress/task_state.py.
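
Taken together, the two prompts above suggest re-reading the freshest counts just before the final "results" update and guarding the division. A sketch under those assumptions (get_progress() and its return shape are taken from the walkthrough, not the diff):

```python
# Sketch only: recompute the completion state from the latest Redis counts
# just before the final "results" update.
progress_info = state_manager.get_progress()
if progress_info is None or progress_info.total == 0:
    complete_state = JobState.SUCCESS  # nothing to process, or no snapshot yet
else:
    failure_ratio = progress_info.failed / progress_info.total
    # A later commit in this PR switches the boundary from >= to > to match the sync path.
    complete_state = JobState.FAILURE if failure_ratio > FAILURE_THRESHOLD else JobState.SUCCESS
```
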
🧹 Nitpick comments (4)
ami/jobs/tasks.py (2)

133-140: Redundant if pipeline_result checks inside if pipeline_result: block.

Lines 134 and 137 guard with if pipeline_result else 0, but this code is already inside the if pipeline_result: block at line 123. The checks are dead branches.

♻️ Simplification
             # Calculate detection and classification counts from this result
-            detections_count = len(pipeline_result.detections) if pipeline_result else 0
-            classifications_count = (
-                sum(len(detection.classifications) for detection in pipeline_result.detections)
-                if pipeline_result
-                else 0
-            )
+            detections_count = len(pipeline_result.detections)
+            classifications_count = sum(len(detection.classifications) for detection in pipeline_result.detections)
             captures_count = len(pipeline_result.source_images)

195-197: complete_state typed as Any — prefer a narrower type.

Using Any weakens type safety. Since this always receives a JobState value, type it accordingly.

♻️ Tighten the type
+from ami.jobs.models import JobState  # if not already imported at module level
+
 def _update_job_progress(
-    job_id: int, stage: str, progress_percentage: float, complete_state: Any, **state_params
+    job_id: int, stage: str, progress_percentage: float, complete_state: "JobState", **state_params
 ) -> None:

Since JobState is imported locally to avoid circular imports, you can use a string literal ("JobState") or TYPE_CHECKING guard for the annotation.

ami/ml/orchestration/task_state.py (2)

146-169: Read-modify-write on counters is safe only under the lock — consider a comment.

The cumulative counting (lines 148–158) and failed-set union (lines 162–164) use non-atomic cache.get + cache.set. This is safe because the caller update_state holds a per-job lock, but _get_progress is also called directly in tests. A brief inline comment noting the lock invariant would help future maintainers.

📝 Suggested comment
+        # NOTE: The read-modify-write below is safe because callers must hold
+        # the per-job lock (acquired in update_state). Do not call _get_progress
+        # without the lock in production code.
+
         # Update cumulative detection, classification, and capture counts
         current_detections = cache.get(self._detections_key, 0)

139-141: O(n) filtering of pending images on every update.

remaining_images is computed with a list comprehension iterating all pending images (line 139), which grows as job size grows. For large jobs (tens of thousands of images), this linear scan on every progress update could become a bottleneck. A Redis set with SREM would make removals O(1) per image, but this may be acceptable for current workloads.
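
For illustration, a hedged sketch of the Redis-set alternative floated above, using redis-py directly rather than Django's cache (an assumption), with made-up key names:

```python
import redis

# Assumption: a direct redis-py client; the current code goes through Django's cache.
r = redis.Redis(host="localhost", port=6379, db=0)


def mark_images_processed(job_id: int, stage: str, image_ids: set[int]) -> int:
    """Remove processed images from the pending set and return how many remain."""
    pending_key = f"job:{job_id}:{stage}:pending"  # illustrative key name
    if image_ids:
        r.srem(pending_key, *image_ids)  # O(1) per removed member
    return r.scard(pending_key)          # remaining pending images
```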

Contributor

@coderabbitai coderabbitai bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
ami/jobs/tasks.py (1)

124-183: ⚠️ Potential issue | 🔴 Critical

self.retry() at line 163 will be swallowed by the broad except Exception at line 180.

celery.exceptions.Retry is a subclass of Exception. When state_manager.update_state returns None and self.retry(...) is raised at line 163, it's caught by line 180, logged as an error, and silently discarded — the task will never actually retry.

Either narrow the except clause or re-raise Retry:

Proposed fix (option A — let Retry propagate)
+    from celery.exceptions import Retry
+
     try:
         # Save to database (this is the slow operation)
         ...
-    except Exception as e:
+    except Retry:
+        raise
+    except Exception as e:
         job.logger.error(
             f"Failed to process pipeline result for job {job_id}: {e}. NATS will redeliver the task message."
         )
🧹 Nitpick comments (4)
ami/ml/tests.py (2)

883-886: Consider also asserting captures count in the init helper.

_init_and_verify checks detections, classifications, and failed but omits captures. While captures is tested in test_cumulative_counting, adding it here keeps the initial-state verification comprehensive and consistent.

Proposed fix
         self.assertEqual(progress.detections, 0)
         self.assertEqual(progress.classifications, 0)
         self.assertEqual(progress.failed, 0)
+        self.assertEqual(progress.captures, 0)

1104-1122: cache.get on a Redis set key may return None or an unexpected type depending on the cache backend.

Line 1114 assumes cache.get(self.manager._failed_key) returns a Python set that supports len(). This holds with Django's locmem cache in tests, and the test is valid as long as _get_progress stores a Python set via cache.get/cache.set. Just be aware this would break if the underlying storage changes (e.g., raw Redis commands instead of Django cache).

ami/jobs/tasks.py (2)

137-144: Redundant if pipeline_result guards inside an if pipeline_result: block.

Lines 138 and 141 check if pipeline_result again, but they're already inside the if pipeline_result: block starting at line 127.

Simplified
-            detections_count = len(pipeline_result.detections) if pipeline_result else 0
-            classifications_count = (
-                sum(len(detection.classifications) for detection in pipeline_result.detections)
-                if pipeline_result
-                else 0
-            )
+            detections_count = len(pipeline_result.detections)
+            classifications_count = sum(len(d.classifications) for d in pipeline_result.detections)
             captures_count = len(pipeline_result.source_images)

204-206: Consider using JobState instead of Any for complete_state type hint.

Any obscures the expected type. Since JobState is already imported inside the function body, you could use a string literal "JobState" or import it at module level under TYPE_CHECKING to get a proper type hint without circular import issues.
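
A short sketch of the TYPE_CHECKING pattern suggested here; the function signature mirrors the one quoted in the earlier comment and is illustrative only:

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Imported only for type checking, so there is no circular import at runtime.
    from ami.jobs.models import JobState


def _update_job_progress(
    job_id: int, stage: str, progress_percentage: float, complete_state: JobState, **state_params
) -> None:
    ...
```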

carlosgjs and others added 6 commits February 11, 2026 12:09
# Conflicts:
#	ami/ml/orchestration/task_state.py
Clarify naming to distinguish mutating vs read-only methods:
- _commit_update(): private, writes mutations to Redis, returns progress
- get_progress(): public, read-only snapshot (added in RolnickLab#1129)
- update_state(): public API, acquires lock, calls _commit_update()

Co-Authored-By: Claude <noreply@anthropic.com>
- Single FAILURE_THRESHOLD constant in tasks.py, imported by models.py
- Fix async path to use `> FAILURE_THRESHOLD` (was `>=`) to match
  the sync path's boundary behavior at exactly 50%
- Convert TaskProgress from namedtuple to dataclass with defaults,
  so new fields don't break existing callers

Co-Authored-By: Claude <noreply@anthropic.com>
Contributor

@coderabbitai coderabbitai bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
ami/jobs/tasks.py (2)

159-184: ⚠️ Potential issue | 🔴 Critical

raise self.retry() on line 164 will be swallowed by the except Exception on line 181.

Celery's self.retry() raises celery.exceptions.Retry, which is a subclass of Exception. The broad except Exception at line 181 will catch it, log it as a processing error, and silently discard the retry. This means if the lock isn't acquired for the "results" stage update, the progress will never be updated for that image, and the job may never complete.

Note that the identical pattern at line 95 works correctly because it's outside any try/except block.

🐛 Proposed fix — exclude Retry from the catch-all
+    from celery.exceptions import Retry as CeleryRetry
+
     try:
         # Save to database (this is the slow operation)
         detections_count, classifications_count, captures_count = 0, 0, 0
         ...
+    except CeleryRetry:
+        raise
     except Exception as e:
         job.logger.error(
             f"Failed to process pipeline result for job {job_id}: {e}. NATS will redeliver the task message."
         )

129-184: ⚠️ Potential issue | 🟠 Major

NATS ack at line 147 precedes the results-stage progress update — if the latter fails, the message is lost.

After _ack_task_via_nats succeeds (line 147), any exception in the results-stage update_state or _update_job_progress (lines 150–179) is caught by the except at line 181 without re-raising. The error message claims "NATS will redeliver the task message," but the message has already been acknowledged. This could leave the job stuck with a results stage that never reaches 100%.

Consider either moving the ack after the results-stage progress update, or re-raising the exception so Celery retries the task (NATS redelivery would then handle it since ack didn't happen).

🧹 Nitpick comments (1)
ami/ml/orchestration/task_state.py (1)

144-208: Cumulative counters are shared across stages — works but is implicit.

The detection/classification/capture counters use a single set of Redis keys (not per-stage), so the "process" stage always contributes 0 and the "results" stage contributes actual counts. This works correctly given the current call sites in tasks.py, but it's fragile if a future caller passes non-zero counts for the "process" stage — they'd be silently accumulated.

A brief comment noting that counters are only expected to be incremented during the "results" stage would help future maintainers.

mihow and others added 2 commits February 12, 2026 16:57
Clarify that this dataclass tracks job-level progress in Redis,
not individual task/image progress. Aligns with the naming of
JobProgress (the Django/Pydantic model equivalent).

Co-Authored-By: Claude <noreply@anthropic.com>
Mark connection handling as done (PR RolnickLab#1130), add worktree/remote
mapping and docker testing notes for future sessions.

Co-Authored-By: Claude <noreply@anthropic.com>
Collaborator

@mihow mihow left a comment

@carlosgjs what do you think about using the existing Job model to track the progress & failure counts directly, instead of adding counts in Redis and then copying them to the Job model? Also I am a bit confused about what the "Tasks" are in Redis now, since they have counts for the Job's total captures, failures, etc. Or are those counts for a single batch? The current job model counts the progress & failures per stage type, so there isn't a single count for all failures. If there is a reason to use Redis I am open to that! But maybe we can update the naming & docstrings.

Also, I am thinking we should simplify the logic that determines whether a Job is in the FAILURE state. Let's just show the counts. Really we need a new state like "COMPLETED" ("Completed with errors") instead of Celery's SUCCESS & FAILURE states. Then we can remove a number of checks related to the stage status & overall status.
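
To illustrate the suggestion (this enum does not exist in the codebase in this form), a terminal state that records completion while the counts convey the failures might look like:

```python
from django.db import models


# Hypothetical sketch of the reviewer's proposal; the real JobState mirrors
# Celery's states and is not defined like this.
class JobState(models.TextChoices):
    SUCCESS = "SUCCESS", "Success"
    FAILURE = "FAILURE", "Failure"
    COMPLETED = "COMPLETED", "Completed with errors"  # proposed terminal state
```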

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
ami/ml/orchestration/jobs.py (1)

15-21: ⚠️ Potential issue | 🟡 Minor

Stale docstring reference to TaskStateManager.

Line 19 still says "Redis state (via TaskStateManager.cleanup)" but the class has been renamed to AsyncJobStateManager.

📝 Proposed fix
-    1. Redis state (via TaskStateManager.cleanup):
+    1. Redis state (via AsyncJobStateManager.cleanup):
ami/ml/tests.py (1)

860-861: ⚠️ Potential issue | 🟡 Minor

Stale class docstring.

Line 861 says "Test TaskStateManager" but the class under test is now AsyncJobStateManager.

📝 Proposed fix
 class TestTaskStateManager(TestCase):
-    """Test TaskStateManager for job progress tracking."""
+    """Test AsyncJobStateManager for job progress tracking."""
ami/jobs/test_tasks.py (1)

89-93: ⚠️ Potential issue | 🟡 Minor

Stale docstring reference.

Line 92 still says "Assert TaskStateManager state is correct" — should reference AsyncJobStateManager.

📝 Proposed fix
     def _assert_progress_updated(
         self, job_id: int, expected_processed: int, expected_total: int, stage: str = "process"
     ):
-        """Assert TaskStateManager state is correct."""
+        """Assert AsyncJobStateManager state is correct."""
🤖 Fix all issues with AI agents
In `@docs/claude/nats-todo.md`:
- Around line 132-159: Fix three minor typos in the notes: replace the word
"prevous" with "previous", change "hbs" to "has", and correct "SLUM" to "SLURM"
in the nats-todo notes (search for the exact tokens "prevous", "hbs", and "SLUM"
to locate the spots to update).
- Around line 86-93: The doc exposes an internal IP (192.168.123.176) in the
NATS monitoring instructions; update the text to avoid leaking internal network
addresses by replacing the IP with the hostname (e.g., ami-redis-1) or a
placeholder like <internal-host> and/or remove the IP entirely, and ensure the
example line that reads "http://192.168.123.176:8222" is changed to use the
hostname or placeholder so the SSH tunnel instructions and the dashboard server
URL examples remain accurate and non-sensitive.
- Around line 10-16: The doc contains sensitive infrastructure topology
(hostnames like ami-cc/ami-worker-2 and SSH commands) that should be removed;
replace those concrete hostnames/SSH commands with a brief, non-sensitive
operational note and/or move detailed steps to the private ops repo or wiki, and
redact the specific commands from the document while keeping the relevant config
detail (the NATS_URL change from the old default to nats://nats:4222) documented
generically; search for occurrences of NATS_URL and the nats://nats:4222 default
in the docs entry and replace the explicit SSH/host examples with a short,
environment-agnostic remediation note pointing to the private ops location.

In `@docs/claude/planning/pr-trackcounts-next-session.md`:
- Around line 39-51: Update the stale references in the document: replace any
mention of the old class name TaskStateManager with AsyncJobStateManager and
update the file reference ami/ml/orchestration/task_state.py to
ami/ml/orchestration/async_job_state.py (also check nearby mentions of
TaskStateManager/ task_state.py and adjust them to AsyncJobStateManager/
async_job_state.py so all references match the renamed class and file).
🧹 Nitpick comments (1)
ami/ml/orchestration/async_job_state.py (1)

144-208: _commit_update relies on the caller holding the lock — document and enforce this.

The read-modify-write cycle on counters (lines 171–181) and the failed set (lines 184–190) is safe only because update_state serializes access via the Redis lock. However, _commit_update is callable directly (and tests do so), and has no guard of its own.

The docstring on line 154 says "Must be called under lock," which is good. Consider raising or asserting if the lock is not held, or at minimum note in the docstring that direct calls outside tests are unsafe.
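
One possible way to enforce that invariant, sketched with an assumed self._lock attribute (the PR only states that update_state acquires a per-job Redis lock before delegating to _commit_update):

```python
class AsyncJobStateManager:
    # ... other methods elided ...

    def _commit_update(self, processed_image_ids, stage, failed_image_ids=None):
        # Assumption: update_state stores the lock it acquires on self._lock before
        # delegating here; redis-py Lock objects expose owned() for this check.
        if getattr(self, "_lock", None) is None or not self._lock.owned():
            raise RuntimeError("_commit_update must be called while holding the per-job lock")
        # ... existing read-modify-write logic runs here under the lock ...
```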

Comment on lines +10 to +16
- **Root cause of job 2226 failure:** worker-2 was missing `NATS_URL` in `.envs/.production/.django`, so it defaulted to `nats://localhost:4222`. Every NATS ack from worker-2 failed with `Connect call failed ('127.0.0.1', 4222)`.
- **Fix applied in code:** Changed default in `config/settings/base.py:268` from `nats://localhost:4222` to `nats://nats:4222` (matches the hostname mapped via `extra_hosts` in all compose files).
- **Still needed on server:**
```bash
ssh ami-cc "ssh ami-worker-2 'echo NATS_URL=nats://nats:4222 >> ~/ami-platform/.envs/.production/.django'"
ssh ami-cc "ssh ami-worker-2 'cd ~/ami-platform && docker compose -f docker-compose.worker.yml restart celeryworker'"
```
Contributor

⚠️ Potential issue | 🟡 Minor

Internal infrastructure details committed to the repository.

These lines contain server hostnames (ami-cc, ami-worker-2), env file paths, and SSH commands specific to your production/staging environment. While no actual secrets (passwords/tokens) are present, this is infrastructure topology information that may be better suited for a private ops repo or wiki rather than the main codebase.


Comment on lines +86 to +93
### Expose NATS monitoring for dashboard access

- **Port 8222 is already exposed** on ami-redis-1, so `http://192.168.123.176:8222` should work from the VPN
- **For browser dashboard** (https://natsdashboard.com/): Needs the monitoring endpoint reachable from your browser. Use SSH tunnel if not on VPN:
```bash
ssh -L 8222:localhost:8222 ami-cc -t "ssh -L 8222:localhost:8222 ami-redis-1"
```
Then open https://natsdashboard.com/ with server URL `http://localhost:8222`
Contributor

⚠️ Potential issue | 🟡 Minor

Internal IP address in documentation.

Line 88 contains the internal IP 192.168.123.176. Consider referencing by hostname or removing if this doc is in a public or semi-public repository.


Comment on lines +132 to +159
- Processing service failing on batches with different image sizes
- How can we mark an image/task as failed and say don't retry?
- Processing service still needs to batch classifications (like prevous methods)
- Nats jobs appear stuck if there are any task failures: https://antenna.insectai.org/projects/18/jobs/2228
- If a task crashes, the whole worker seems to reset
- Then no tasks are found remaining for the job in NATS
2026-02-09 18:23:49 [info ] No jobs found, sleeping for 5 seconds
2026-02-09 18:23:54 [info ] Checking for jobs for pipeline panama_moths_2023
2026-02-09 18:23:55 [info ] Checking for jobs for pipeline panama_moths_2024
2026-02-09 18:23:55 [info ] Checking for jobs for pipeline quebec_vermont_moths_2023
2026-02-09 18:23:55 [info ] Processing job 2229 with pipeline quebec_vermont_moths_2023
2026-02-09 18:23:55 [info ] Worker 0/2 starting iteration for job 2229
2026-02-09 18:23:55 [info ] Worker 1/2 starting iteration for job 2229
2026-02-09 18:23:59 [info ] Worker 0: No more tasks for job 2229
2026-02-09 18:23:59 [info ] Worker 0: Iterator finished
2026-02-09 18:24:03 [info ] Worker 1: No more tasks for job 2229
2026-02-09 18:24:03 [info ] Worker 1: Iterator finished
2026-02-09 18:24:03 [info ] Done, detections: 0. Detecting time: 0.0, classification time: 0.0, dl time: 0.0, save time: 0.0

- Would love some logs like "no task has been picked up in X minutes" or "last seen", etc.
- Skip jobs that hbs no tasks in the initial query

- test in a SLUM job! yeah! in Victoria?

- jumps around between jobs - good thing? annoying? what about when there is only one job open?
- time for time estimates

- bring back vectors asap
Contributor

⚠️ Potential issue | 🟡 Minor

Minor typos in the notes section.

  • Line 134: "prevous" → "previous"
  • Line 152: "hbs" → "has"
  • Line 154: "SLUM" → likely "SLURM"

Comment on lines +39 to +51
- **Redis side:** `TaskStateManager` → `JobStateProgress` (dataclass, ephemeral in Redis)
- Tracks pending image IDs per stage (process, results)
- Tracks cumulative counts: detections, classifications, captures, failed
- Single flat object — no per-stage breakdown of counts

The disconnect: Redis tracks **per-stage pending images** (separate pending lists for "process" and "results" stages) but returns **job-wide cumulative counts** (one detections counter, one failed set). So `JobStateProgress` is a hybrid — stage-scoped for image completion, but job-scoped for counts.

**Should counts be per-stage?** For example, "failed" in the process stage means images that errored during ML inference. But could there be failures in the results stage too (failed to save)? The sync path tracks `request_failed_images` (process failures) separately from `failed_save_tasks` (results failures). The async path currently lumps all failures into one set.

**Key files to read:**
- `ami/ml/orchestration/task_state.py` — `TaskStateManager` and `JobStateProgress`
- `ami/jobs/tasks.py:62-185` — `process_nats_pipeline_result` (async path, uses TaskStateManager)
- `ami/jobs/models.py:466-582` — `MLJob.process_images` (sync path, tracks counts locally)
Contributor

⚠️ Potential issue | 🟡 Minor

Stale references to old names.

  • Line 39: TaskStateManager → should be AsyncJobStateManager
  • Line 49: ami/ml/orchestration/task_state.py → should be ami/ml/orchestration/async_job_state.py

These references are inconsistent with the rename performed in this PR.


Contributor

@coderabbitai coderabbitai bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
ami/ml/orchestration/async_job_state.py (1)

69-86: ⚠️ Potential issue | 🟡 Minor

Stale docstring: references non-existent parameters.

The docstring for update_state documents detections_count, classifications_count, and captures_count parameters (Lines 83–85) that don't exist in the method signature. These appear to be leftovers from an earlier iteration.

✏️ Proposed fix
         Args:
             processed_image_ids: Set of image IDs that have just been processed
             stage: The processing stage ("process" or "results")
             request_id: Unique identifier for this processing request
-            detections_count: Number of detections to add to cumulative count
-            classifications_count: Number of classifications to add to cumulative count
-            captures_count: Number of captures to add to cumulative count
             failed_image_ids: Set of image IDs that failed processing (optional)
ami/jobs/tasks.py (1)

129-181: ⚠️ Potential issue | 🟠 Major

If an error occurs after the NATS ACK (Line 147), the results-stage progress will never be updated, potentially leaving the job stuck.

The except on Line 178 catches all exceptions and logs them, but doesn't re-raise. Since the NATS message was already acknowledged, it won't be redelivered. If state_manager.update_state (Line 150) or _update_job_progress (Line 168) fails, the "results" stage will never reach 100% and the job will remain incomplete.

Also, the error message on Line 180 says "NATS will redeliver the task message," which is incorrect for failures after the ACK.

Consider either:

  • Moving the ACK after the results-stage update, or
  • Re-raising after logging so the Celery task can be retried.
🧹 Nitpick comments (2)
ami/ml/tests.py (2)

860-861: Stale class name in test class docstring.

The docstring still references TaskStateManager but the class was renamed to AsyncJobStateManager.

 class TestTaskStateManager(TestCase):
-    """Test TaskStateManager for job progress tracking."""
+    """Test AsyncJobStateManager for job progress tracking."""

1018-1036: Direct access to _failed_key is fine for whitebox testing, but cache.get on Line 1028 could return None if the test setup changes.

Currently this is safe because the preceding _commit_update call guarantees the key exists. Just be aware this is a fragile coupling—if _commit_update's caching behavior changes, len(failed_set) on Line 1029 would raise TypeError.
