diff --git a/avise/cli.py b/avise/cli.py
index f823bef..f5b1264 100644
--- a/avise/cli.py
+++ b/avise/cli.py
@@ -186,9 +186,13 @@ def main(arguments=[]) -> None:
         print(f"\nSecurity Evaluation Test completed!")
         print(f"  Format: {report_format.value.upper()}")
         print(f"  Total: {report.summary['total_sets']}")
-        print(f"  Passed: {report.summary['passed']} ({report.summary['pass_rate']}%)")
-        print(f"  Failed: {report.summary['failed']} ({report.summary['fail_rate']}%)")
-        print(f"  Errors: {report.summary['error']}")
+        print(
+            f"  Passed: {report.summary['total_passed']} ({report.summary['total_pass_rate']}%)"
+        )
+        print(
+            f"  Failed: {report.summary['total_failed']} ({report.summary['total_fail_rate']}%)"
+        )
+        print(f"  Errors: {report.summary['total_error']}")

     except Exception as e:
         logger.error(
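For orientation between hunks: the CLI above now prints the renamed `total_*` keys that `calculate_passrates` (changed below) emits. A minimal sketch of the summary dict shape, with illustrative values; the CI bounds come from `_calculate_confidence_interval`, which this diff only touches in passing:

```python
# Illustrative shape only -- values are made up; keys follow the
# renamed fields in calculate_passrates() in this diff.
summary = {
    "total_sets": 50,
    "total_passed": 42,
    "total_failed": 6,
    "total_error": 2,
    "total_pass_rate": 84.0,
    "total_fail_rate": 12.0,
    "ci_lower_bound": 0.71,  # from _calculate_confidence_interval
    "ci_upper_bound": 0.92,
}
print(f"Passed: {summary['total_passed']} ({summary['total_pass_rate']}%)")
```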
+ """ + grouped = defaultdict(list) + + for result in results: + group_value = result.metadata.get(key, "Unknown") + grouped[group_value].append(result) + + grouped_reports: List[SubcategoryReport] = [] + + for group_name, group_results in grouped.items(): + total_runs = len(group_results) + passed = sum(1 for r in group_results if r.status == "passed") + failed = sum(1 for r in group_results if r.status == "failed") + error = sum(1 for r in group_results if r.status == "error") + + pass_rate = (passed / total_runs * 100) if total_runs else 0.0 + fail_rate = (failed / total_runs * 100) if total_runs else 0.0 + + grouped_reports.append( + SubcategoryReport( + subcategory_name=group_name, + total_runs=total_runs, + passed=passed, + failed=failed, + error=error, + pass_rate=round(pass_rate, 2), + fail_rate=round(fail_rate, 2), + recommended_remediation="", + sets=group_results, + ) + ) + + return grouped_reports + + def _prepare_report_results(self, results: List[EvaluationResult]): + """Returns either flat results or grouped results + depending on group_by_metadata_key. + """ + if self.group_by_metadata_key: + return self._group_results_by_metadata_key( + results, self.group_by_metadata_key + ) + + return results diff --git a/avise/pipelines/languagemodel/schema.py b/avise/pipelines/languagemodel/schema.py index 72feeeb..cf5078b 100644 --- a/avise/pipelines/languagemodel/schema.py +++ b/avise/pipelines/languagemodel/schema.py @@ -104,6 +104,32 @@ def to_dict(self) -> Dict[str, Any]: return result +@dataclass +class SubcategoryReport: + subcategory_name: str + total_runs: int + passed: int + failed: int + error: int + pass_rate: float + fail_rate: float + recommended_remediation: str + sets: List[EvaluationResult] + + def to_dict(self) -> Dict[str, Any]: + return { + "subcategory_name": self.subcategory_name, + "total_runs": self.total_runs, + "passed": self.passed, + "failed": self.failed, + "error": self.error, + "pass_rate": self.pass_rate, + "fail_rate": self.fail_rate, + "recommended_remediation": self.recommended_remediation, + "SETs": [s.to_dict() for s in self.sets], + } + + @dataclass class ReportData: """Output of the report phase / function. @@ -115,7 +141,9 @@ class ReportData: timestamp: str execution_time_seconds: Optional[float] summary: Dict[str, Any] # total tests ran, passed%, failed%, error% rates - results: List[EvaluationResult] # All evaluation results + results: Optional[List[SubcategoryReport]] = field( + default_factory=list + ) # All analysis results, optional configuration: Dict[str, Any] = field(default_factory=dict) # Test config def to_dict(self) -> Dict[str, Any]: @@ -125,5 +153,5 @@ def to_dict(self) -> Dict[str, Any]: "execution_time_seconds": self.execution_time_seconds, "configuration": self.configuration, "summary": self.summary, - "results": [result.to_dict() for result in self.results], + "results": [r.to_dict() for r in self.results] if self.results else [], } diff --git a/avise/reportgen/reporters/html_reporter.py b/avise/reportgen/reporters/html_reporter.py index f1fd1fa..b7a4fa7 100644 --- a/avise/reportgen/reporters/html_reporter.py +++ b/avise/reportgen/reporters/html_reporter.py @@ -172,47 +172,57 @@ def _get_summary_section(self, report_data: ReportData) -> str:
diff --git a/avise/reportgen/reporters/html_reporter.py b/avise/reportgen/reporters/html_reporter.py
index f1fd1fa..b7a4fa7 100644
--- a/avise/reportgen/reporters/html_reporter.py
+++ b/avise/reportgen/reporters/html_reporter.py
@@ -172,47 +172,57 @@ def _get_summary_section(self, report_data: ReportData) -> str:
                 <div>Total Security Evaluation Tests</div>
-                <div>{summary["passed"]}</div>
-                <div>Passed ({summary["pass_rate"]}%)</div>
+                <div>{summary["total_passed"]}</div>
+                <div>Passed ({summary["total_pass_rate"]}%)</div>
-                <div>{summary["failed"]}</div>
-                <div>Failed ({summary["fail_rate"]}%)</div>
+                <div>{summary["total_failed"]}</div>
+                <div>Failed ({summary["total_fail_rate"]}%)</div>
-                <div>{summary["error"]}</div>
+                <div>{summary["total_error"]}</div>
                 <div>Inconclusive</div>
         """

     def _get_results(self, results: list) -> str:
-        """Generate list of results."""
-        html = """
-        <div>
-            <div>Security Evaluation Test Results</div>
-            <div>
-        """
-        for result in results:
-            if isinstance(result, EvaluationResult):
-                set_ = {
-                    "set_id": result.set_id,
-                    "prompt": result.prompt,
-                    "response": result.response,
-                    "status": result.status,
-                    "reason": result.reason,
-                    "attack_type": result.metadata.get("attack_type", ""),
-                    "detections": result.detections,
-                    "full_conversation": result.metadata.get("full_conversation", []),
-                    "description": result.metadata.get("description", ""),
-                }
-                if result.elm_evaluation:
-                    set_["elm_evaluation"] = result.elm_evaluation
-            else:
-                set_ = result
-            html += self._get_set_item(set_)
-        html += "</div>\n"
+        """Generate HTML for grouped subcategory results."""
+        html = ""
+        for group in results:
+            # Access dataclass attributes instead of dictionary keys
+            subcategory_name = getattr(group, "subcategory_name", "")
+            total_passed = getattr(group, "passed", 0)
+            total_failed = getattr(group, "failed", 0)
+            total_error = getattr(group, "error", 0)
+
+            html += f"""
+        <div>
+            <div>{self.escape_html(subcategory_name)}</div>
+            <div>Passed: {total_passed}, Failed: {total_failed}, Inconclusive: {total_error}</div>
+        """
+
+            # Loop through the SETs in this subcategory (SubcategoryReport.sets)
+            for set_ in getattr(group, "sets", []):
+                # Build a dict to pass to _get_set_item
+                set_dict = {
+                    "set_id": getattr(set_, "set_id", ""),
+                    "prompt": getattr(set_, "prompt", ""),
+                    "response": getattr(set_, "response", ""),
+                    "status": getattr(set_, "status", ""),
+                    "reason": getattr(set_, "reason", ""),
+                    "detections": getattr(set_, "detections", {}),
+                    "metadata": getattr(set_, "metadata", {}),
+                    "elm_evaluation": getattr(set_, "elm_evaluation", ""),
+                }
+                # attack_type and full_conversation live in metadata,
+                # mirroring the old flat-results path
+                set_dict["attack_type"] = set_dict["metadata"].get("attack_type", "")
+                set_dict["full_conversation"] = set_dict["metadata"].get(
+                    "full_conversation", []
+                )
+                html += self._get_set_item(set_dict)
+
+            # Close this subcategory's block before the next group
+            html += "</div>\n"
         return html

     def _get_set_item(self, set_: Dict[str, Any]) -> str:
@@ -222,11 +232,16 @@ def _get_set_item(self, set_: Dict[str, Any]) -> str:
             set_label = f" - {set_label}"

         elm_html = ""
-        if "elm_evaluation" in set_:
+        elm_eval = set_.get("elm_evaluation") or ""
+        if elm_eval:
             elm_html = f"""
                 <div>ELM Evaluation</div>
-                {self.escape_html(set_["elm_evaluation"])}
+                {self.escape_html(elm_eval)}
             """

         # Check for conversation format (memory test)
diff --git a/avise/reportgen/reporters/json_reporter.py b/avise/reportgen/reporters/json_reporter.py
index 8d4195e..651740d 100644
--- a/avise/reportgen/reporters/json_reporter.py
+++ b/avise/reportgen/reporters/json_reporter.py
@@ -20,5 +20,11 @@ def write(self, report_data: ReportData, output_path: Path) -> None:
             report_data: The report data to write
             output_path: Path to the output file / directory
         """
-        with open(output_path, "w") as f:
-            json.dump(report_data.to_dict(), f, indent=2)
+        data = report_data.to_dict()
+
+        # Pass through pre-serialized grouped results if a pipeline set them
+        if hasattr(report_data, "grouped_results") and report_data.grouped_results:
+            data["grouped_results"] = report_data.grouped_results
+
+        with open(output_path, "w", encoding="utf-8") as f:
+            json.dump(data, f, indent=2)