#!/usr/bin/env python3
"""
GCS Restore Script

Restores all objects in a GCS bucket to their state as of a target date.
By default runs in DRY-RUN mode. Pass --execute to actually perform the restore.

Requires object versioning to be enabled on the bucket.

Requirements:
    pip install google-cloud-storage

Usage:
    # Dry run (no changes)
    python gcs_restore.py my-bucket --target="2026-01-15T23:59:00"

    # Execute restore
    python gcs_restore.py my-bucket --target="2026-01-15T23:59:00" --execute

    # Also delete files created after the target date
    python gcs_restore.py my-bucket --target="2026-01-15T23:59:00" --execute --delete-new
"""

import argparse
import sys
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone, timedelta

from google.cloud import storage


def parse_args():
    """Parse and return the command-line arguments for the restore run."""
    parser = argparse.ArgumentParser(
        description="Restore GCS objects to their state at a target date."
    )
    parser.add_argument("bucket", help="GCS bucket name (without gs:// prefix)")
    parser.add_argument(
        "--prefix",
        default="",
        help="Only restore objects under this prefix",
    )
    parser.add_argument(
        "--target",
        required=True,
        help="Target datetime in ISO 8601 format (e.g. '2026-01-15T23:59:00')",
    )
    parser.add_argument(
        "--tz-offset",
        default="+00:00",
        help="UTC offset for target datetime (default: +00:00 UTC)",
    )
    parser.add_argument(
        "--delete-new",
        action="store_true",
        help="Delete objects that were created after the target date",
    )
    parser.add_argument(
        "--execute",
        action="store_true",
        help="Actually perform the restore. Without this flag, only a dry run is shown.",
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=10,
        help="Number of parallel workers for restore operations (default: 10)",
    )
    return parser.parse_args()


def build_target_dt(target_str, tz_offset_str):
    """Return a timezone-aware datetime for *target_str*.

    If *target_str* itself carries timezone info, it wins and
    *tz_offset_str* is ignored. Otherwise *tz_offset_str* (e.g.
    "+00:00", "-05:00", "0530") is applied to the naive datetime.

    Raises ValueError if either string is malformed.
    """
    parsed = datetime.fromisoformat(target_str)
    if parsed.tzinfo is not None:
        return parsed
    # An offset without an explicit sign is treated as positive (east of
    # UTC), matching ISO 8601 rather than silently negating it.
    sign = -1 if tz_offset_str.startswith("-") else 1
    digits = tz_offset_str.lstrip("+-")
    parts = digits.split(":")
    hours = int(parts[0])
    minutes = int(parts[1]) if len(parts) > 1 else 0
    tz = timezone(timedelta(hours=sign * hours, minutes=sign * minutes))
    return parsed.replace(tzinfo=tz)


def restore_version(client, bucket_name, object_name, generation):
    """Copy a specific generation of an object over the current live version.

    Uses the rewrite API, looping on the continuation token: for large
    objects a single rewrite() call may copy only a chunk and return a
    token, so stopping after one call would silently truncate the restore.
    """
    bucket = client.bucket(bucket_name)
    source_blob = bucket.blob(object_name, generation=generation)
    dest_blob = bucket.blob(object_name)
    token = None
    while True:
        token, _bytes_rewritten, _total_bytes = dest_blob.rewrite(
            source_blob, token=token
        )
        if token is None:
            break
    return object_name, generation, True


def delete_object(client, bucket_name, object_name):
    """Delete the current live version of an object.

    Raises google.api_core.exceptions.NotFound if there is no live
    version; callers should only schedule deletes for live objects.
    """
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(object_name)
    blob.delete()
    return object_name, None, True


def main():
    args = parse_args()
    target_dt = build_target_dt(args.target, args.tz_offset)

    mode = "EXECUTE" if args.execute else "DRY RUN"

    print("=" * 70)
    print(f"GCS RESTORE ({mode})")
    print("=" * 70)
    print(f"  Bucket:      {args.bucket}")
    print(f"  Prefix:      {args.prefix or '(entire bucket)'}")
    print(f"  Target date: {target_dt.isoformat()}")
    print(f"  Delete new:  {args.delete_new}")
    print(f"  Workers:     {args.workers}")
    print("=" * 70)
    print()

    client = storage.Client()
    bucket = client.bucket(args.bucket)
    bucket.reload()

    if not bucket.versioning_enabled:
        print("ERROR: Object versioning is not enabled on this bucket.")
        print("       Cannot restore without versioned objects.")
        sys.exit(1)

    # Collect every version (live and noncurrent) of every object.
    print("Scanning object versions...")
    versions_by_name = defaultdict(list)
    total_versions = 0

    blobs = bucket.list_blobs(prefix=args.prefix or None, versions=True)
    for blob in blobs:
        versions_by_name[blob.name].append(
            {
                "generation": blob.generation,
                "updated": blob.updated,
                "size": blob.size,
                # Live versions have no deletion timestamp.
                "is_live": not blob.time_deleted,
            }
        )
        total_versions += 1
        if total_versions % 1000 == 0:
            print(f"  ...scanned {total_versions} versions...")

    print(f"  Done. {total_versions} versions across {len(versions_by_name)} objects.")
    print()

    # Build the action plan.
    to_restore = []  # (object_name, target_generation, size)
    to_delete = []   # object names with a live version created after target
    already_ok = 0

    for name, vers in sorted(versions_by_name.items()):
        # Most recent first, so the first version <= target is the winner.
        vers.sort(key=lambda v: v["updated"], reverse=True)

        live = [v for v in vers if v["is_live"]]
        current = live[0] if live else None

        # NOTE(review): selection keys off `updated`, which also advances on
        # metadata-only changes — confirm `time_created` isn't a better fit
        # for strict point-in-time semantics.
        best_at_target = None
        for v in vers:
            if v["updated"] <= target_dt:
                best_at_target = v
                break

        if best_at_target is None:
            # Object did not exist at the target date. Only schedule a
            # delete when a live version actually exists — deleting an
            # already-deleted object would 404 and show up as a failure.
            if args.delete_new and current is not None:
                to_delete.append(name)
        elif current and current["generation"] == best_at_target["generation"]:
            already_ok += 1
        else:
            # Either the live version differs from the target generation,
            # or the object was deleted after the target date.
            to_restore.append((name, best_at_target["generation"], best_at_target["size"]))

    # Summary ("or 0" guards against a version with no reported size).
    restore_gb = sum((s or 0) for _, _, s in to_restore) / (1024 ** 3)
    print(f"  Restore: {len(to_restore)} objects ({restore_gb:.2f} GB)")
    print(f"  Delete:  {len(to_delete)} objects")
    print(f"  No-op:   {already_ok} objects already at correct version")
    print()

    if not to_restore and not to_delete:
        print("Nothing to do!")
        return

    if not args.execute:
        print("=" * 70)
        print("DRY RUN — no changes made. Pass --execute to perform the restore.")
        print("=" * 70)
        return

    # Interactive confirmation before any destructive work.
    print("!" * 70)
    print(f"ABOUT TO RESTORE {len(to_restore)} objects and DELETE {len(to_delete)} objects.")
    response = input("Type 'yes' to proceed: ")
    if response.strip().lower() != "yes":
        print("Aborted.")
        sys.exit(0)
    print()

    # Execute restores and deletes in parallel.
    succeeded = 0
    failed = 0
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        futures = {}

        for name, generation, size in to_restore:
            f = executor.submit(restore_version, client, args.bucket, name, generation)
            futures[f] = ("restore", name)

        for name in to_delete:
            f = executor.submit(delete_object, client, args.bucket, name)
            futures[f] = ("delete", name)

        total = len(futures)
        for i, future in enumerate(as_completed(futures), 1):
            action, name = futures[future]
            try:
                future.result()
                succeeded += 1
            except Exception as e:
                failed += 1
                print(f"  FAILED ({action}) {name}: {e}")

            if i % 50 == 0 or i == total:
                elapsed = time.time() - start_time
                print(f"  Progress: {i}/{total} ({elapsed:.0f}s elapsed)")

    elapsed = time.time() - start_time
    print()
    print("=" * 70)
    print(f"RESTORE COMPLETE in {elapsed:.1f}s")
    print(f"  Succeeded: {succeeded}")
    print(f"  Failed:    {failed}")
    print("=" * 70)


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
"""
GCS Version Audit Script (READ-ONLY)

Lists all object versions in a GCS bucket and shows what action would be
needed to revert each file to its state as of a target date.

THIS SCRIPT DOES NOT MODIFY ANYTHING. It only reads and reports.

Requirements:
    pip install google-cloud-storage

Usage:
    python gcs_version_audit.py my-bucket --target="2026-01-15T23:59:00"
    python gcs_version_audit.py my-bucket --target="2026-01-15T23:59:00" --tz-offset="-05:00"
    python gcs_version_audit.py my-bucket --target="2026-01-15T23:59:00" --limit=10
    python gcs_version_audit.py my-bucket --target="2026-01-15T23:59:00" --show-all-versions
"""

import argparse
import sys
from collections import defaultdict
from datetime import datetime, timezone, timedelta

from google.cloud import storage


def parse_args():
    """Parse and return the command-line arguments for the audit run."""
    parser = argparse.ArgumentParser(
        description="Audit GCS object versions and plan a revert (read-only)."
    )
    parser.add_argument("bucket", help="GCS bucket name (without gs:// prefix)")
    parser.add_argument(
        "--prefix",
        default="",
        help="Only examine objects under this prefix (e.g. 'data/')",
    )
    parser.add_argument(
        "--target",
        required=True,
        help="Target datetime in ISO 8601 format (e.g. '2026-01-15T23:59:00')",
    )
    parser.add_argument(
        "--tz-offset",
        default="+00:00",
        help="UTC offset for target datetime, e.g. '-05:00' for EST (default: +00:00 UTC)",
    )
    parser.add_argument(
        "--show-all-versions",
        action="store_true",
        help="Print every version of every object (verbose)",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=0,
        help="Stop after examining this many objects (0 = no limit, useful for testing)",
    )
    return parser.parse_args()


def build_target_dt(target_str, tz_offset_str):
    """Return a timezone-aware datetime for *target_str*.

    If *target_str* itself carries timezone info, it wins and
    *tz_offset_str* is ignored. Otherwise *tz_offset_str* (e.g.
    "+00:00", "-05:00", "0530") is applied to the naive datetime.

    Raises ValueError if either string is malformed.
    """
    parsed = datetime.fromisoformat(target_str)
    if parsed.tzinfo is not None:
        return parsed
    # An offset without an explicit sign is treated as positive (east of
    # UTC), matching ISO 8601 rather than silently negating it.
    sign = -1 if tz_offset_str.startswith("-") else 1
    digits = tz_offset_str.lstrip("+-")
    parts = digits.split(":")
    hours = int(parts[0])
    minutes = int(parts[1]) if len(parts) > 1 else 0
    tz = timezone(timedelta(hours=sign * hours, minutes=sign * minutes))
    return parsed.replace(tzinfo=tz)


def main():
    args = parse_args()
    target_dt = build_target_dt(args.target, args.tz_offset)

    print("=" * 70)
    print("GCS VERSION AUDIT (READ-ONLY)")
    print("=" * 70)
    print(f"  Bucket:      {args.bucket}")
    print(f"  Prefix:      {args.prefix or '(entire bucket)'}")
    print(f"  Target date: {target_dt.isoformat()}")
    print(f"  Limit:       {args.limit or 'none'}")
    print("=" * 70)
    print()

    client = storage.Client()
    bucket = client.bucket(args.bucket)

    # Versioning may have been disabled after versions accumulated, so a
    # disabled bucket is only a warning here, not a hard error.
    bucket.reload()
    if not bucket.versioning_enabled:
        print("WARNING: Object versioning is NOT currently enabled on this bucket.")
        print("         Noncurrent versions may still exist if versioning was enabled previously.")
        print()

    # Collect all versions grouped by object name.
    print("Fetching object versions (this may take a while for large buckets)...")
    versions_by_name = defaultdict(list)
    total_versions = 0

    blobs = bucket.list_blobs(prefix=args.prefix or None, versions=True)
    limit_reached = False
    for blob in blobs:
        # Once the limit is hit, keep consuming versions of objects we have
        # already started (the listing groups versions by name) and stop at
        # the first unseen object name.
        if limit_reached and blob.name not in versions_by_name:
            break

        versions_by_name[blob.name].append(
            {
                "generation": blob.generation,
                "updated": blob.updated,
                "size": blob.size,
                "is_live": not blob.time_deleted,  # live versions have no deletion time
                "metageneration": blob.metageneration,
            }
        )
        total_versions += 1

        if total_versions % 1000 == 0:
            print(f"  ...scanned {total_versions} versions so far...")

        if args.limit and len(versions_by_name) >= args.limit:
            limit_reached = True

    print(f"  Done. Found {total_versions} total versions across {len(versions_by_name)} objects.")
    print()

    # Classify each object by the action a revert would require.
    actions = {"restore": [], "already_correct": [], "delete_new": []}

    for name, vers in sorted(versions_by_name.items()):
        # Most recent first, so the first version <= target is the winner.
        vers.sort(key=lambda v: v["updated"], reverse=True)

        # Find the live (current) version, if any.
        live = [v for v in vers if v["is_live"]]
        current = live[0] if live else None

        # NOTE(review): selection keys off `updated`, which also advances on
        # metadata-only changes — confirm `time_created` isn't a better fit
        # for strict point-in-time semantics.
        best_at_target = None
        for v in vers:
            if v["updated"] <= target_dt:
                best_at_target = v
                break  # first match is the most recent one <= target

        if args.show_all_versions:
            print(f"  {name}")
            for v in vers:
                marker = " <-- LIVE" if v["is_live"] else ""
                target_marker = " <-- TARGET" if v is best_at_target else ""
                print(
                    f"    gen={v['generation']} updated={v['updated'].isoformat()}"
                    f" size={v['size']}{marker}{target_marker}"
                )
            print()

        if best_at_target is None:
            # Object didn't exist at target date — it was created after.
            actions["delete_new"].append(
                {
                    "name": name,
                    "current": current,
                    "earliest_version": vers[-1] if vers else None,
                }
            )
        elif current and current["generation"] == best_at_target["generation"]:
            # Current version IS the target version — no action needed.
            actions["already_correct"].append({"name": name, "version": current})
        elif current:
            # Current version differs from target — would need to restore.
            actions["restore"].append(
                {
                    "name": name,
                    "current": current,
                    "target_version": best_at_target,
                }
            )
        else:
            # Deleted after target date but existed at target — would need to restore.
            actions["restore"].append(
                {
                    "name": name,
                    "current": None,
                    "target_version": best_at_target,
                }
            )

    # Print summary ("or 0" guards against a version with no reported size).
    print("=" * 70)
    print("REVERT PLAN SUMMARY")
    print("=" * 70)
    restore_bytes = sum(
        (item["target_version"]["size"] or 0) for item in actions["restore"]
    )
    restore_gb = restore_bytes / (1024 ** 3)
    coldline_cost = restore_gb * 0.02

    print(f"  Already at correct version: {len(actions['already_correct']):>6}")
    print(f"  Would need to restore:      {len(actions['restore']):>6}")
    print(f"  Created after target (new): {len(actions['delete_new']):>6}")
    print(f"  Total objects:              {len(versions_by_name):>6}")
    print()
    print(f"  Total restore size:           {restore_gb:>9.2f} GB")
    print(f"  Est. Coldline retrieval cost: ${coldline_cost:>8.2f} (at $0.02/GB)")
    print()

    if actions["restore"]:
        print("-" * 70)
        print("FILES THAT WOULD BE RESTORED TO AN OLDER VERSION:")
        print("-" * 70)
        for item in actions["restore"]:
            name = item["name"]
            tv = item["target_version"]
            cur = item["current"]
            if cur:
                print(f"  {name}")
                print(f"    Current:   gen={cur['generation']} updated={cur['updated'].isoformat()} size={cur['size']}")
                print(f"    Revert to: gen={tv['generation']} updated={tv['updated'].isoformat()} size={tv['size']}")
            else:
                print(f"  {name} (DELETED — would be restored)")
                print(f"    Revert to: gen={tv['generation']} updated={tv['updated'].isoformat()} size={tv['size']}")
        print()

    if actions["delete_new"]:
        print("-" * 70)
        print("FILES CREATED AFTER TARGET DATE (did not exist at target time):")
        print("-" * 70)
        for item in actions["delete_new"]:
            name = item["name"]
            ev = item["earliest_version"]
            print(f"  {name}")
            if ev:
                print(f"    Earliest version: updated={ev['updated'].isoformat()}")
        print()

    if actions["already_correct"]:
        print("-" * 70)
        print(f"FILES ALREADY AT CORRECT VERSION: {len(actions['already_correct'])}")
        print("-" * 70)
        if len(actions["already_correct"]) <= 20:
            for item in actions["already_correct"]:
                print(f"  {item['name']}")
        else:
            for item in actions["already_correct"][:10]:
                print(f"  {item['name']}")
            print(f"  ... and {len(actions['already_correct']) - 10} more")
        print()

    print("=" * 70)
    print("NO CHANGES WERE MADE. This is a read-only audit.")
    print("=" * 70)


if __name__ == "__main__":
    main()