"""
Minimal De-identification Benchmark Runner for HuggingFace Publication

This script evaluates a de-identification model's performance on key metrics:
- PII Detection Rate: How well it identifies personal identifiers
- Completeness: Whether all PII is successfully masked
- Semantic Preservation: How well meaning is preserved
- Latency: Response time performance
- Domain Performance: reserved in the results schema (all examples are
  currently evaluated together, without domain separation)
"""

import json
import os
import re
import sys
import time
from datetime import datetime
from typing import Dict, List, Tuple

import requests
import yaml
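
# A minimal example config for this runner (illustrative values only; the key
# layout mirrors exactly what the code below reads -- a sketch, not a shipped file):
#
#   model:
#     base_url: "http://localhost:8080"   # server exposing a /completion endpoint
#     max_tokens: 256
#     temperature: 0.1
#     timeout: 30
#   datasets:
#     benchmark_dataset:
#       file_path: "benchmark.jsonl"      # JSONL: one example object per line
#       sample_size: 100
#       instruction_field: "instruction"
#       input_field: "input"
#       expected_output_field: "output"
#   output:
#     results_file: "benchmark_results.md"
#     detailed_results_file: "benchmark_results.json"
#     max_examples: 10
#     include_examples: true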


class DeIdBenchmarkRunner:
    def __init__(self, config_path: str):
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)

        self.results = {
            "metadata": {
                "timestamp": datetime.now().isoformat(),
                "model": "Minibase-DeId-Small",
                "dataset": self.config["datasets"]["benchmark_dataset"]["file_path"],
                "sample_size": self.config["datasets"]["benchmark_dataset"]["sample_size"]
            },
            "metrics": {},
            "domain_performance": {},  # reserved; not populated by this runner
            "examples": []
        }

    def load_dataset(self) -> List[Dict]:
        """Load and sample the benchmark dataset (JSONL, one example per line)"""
        dataset_path = self.config["datasets"]["benchmark_dataset"]["file_path"]
        sample_size = self.config["datasets"]["benchmark_dataset"]["sample_size"]

        examples = []
        with open(dataset_path, 'r') as f:
            for line in f:
                if len(examples) >= sample_size:
                    break
                line = line.strip()
                if not line:
                    continue  # tolerate stray blank lines in the JSONL file
                examples.append(json.loads(line))

        print(f"✅ Loaded {len(examples)} examples from {dataset_path}")
        return examples

    def extract_placeholders(self, text: str) -> List[str]:
        """Extract all placeholder tags from text (e.g., [NAME_1], [DOB_1])"""
        pattern = r'\[([A-Z_]+_\d+)\]'
        return re.findall(pattern, text)
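    # e.g. extract_placeholders("Call [NAME_1] at [PHONE_1].") -> ["NAME_1", "PHONE_1"]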

    def calculate_pii_detection_rate(self, input_text: str, predicted: str) -> float:
        """Calculate PII detection rate - if input has PII and output has placeholders, count as success"""
        input_has_pii = self._input_contains_pii(input_text)

        # No detectable PII in the input: nothing to detect, score as success.
        if not input_has_pii:
            return 1.0

        # Lenient criterion: any placeholder in the output counts as a detection,
        # without requiring placeholder types or counts to match exactly.
        predicted_placeholders = self.extract_placeholders(predicted)
        output_has_placeholders = len(predicted_placeholders) > 0

        return 1.0 if output_has_placeholders else 0.0

    def _input_contains_pii(self, input_text: str) -> bool:
        """Check if input text contains personal identifiable information (regex heuristics)"""
        pii_patterns = [
            r'\b\d{4}-\d{2}-\d{2}\b',                 # ISO dates (e.g. 1990-05-17)
            r'\b\d{1,2}/\d{1,2}/\d{4}\b',             # slash dates (e.g. 5/17/1990)
            r'\b\d{1,3}\s+[A-Z][a-z]+\s+(?:St|Street|Ave|Avenue|Rd|Road|Blvd|Boulevard)\b',  # street addresses
            r'\(\d{3}\)\s*\d{3}-\d{4}\b',             # US phone numbers, (555) 123-4567
            r'\+?\d{1,3}[-.\s]?\d{3}[-.\s]?\d{4}\b',  # other phone formats
            r'\b[A-Z][a-z]+\s+[A-Z][a-z]+\b',         # two capitalized words (likely full names)
            r'\b[A-Z][a-z]+\s+[A-Z]\.\s*[A-Z][a-z]+\b',  # names with a middle initial
            r'\b\d+@\w+\.\w+\b',                      # email-like strings
            r'\b[A-Z]{2,}\d+\b',                      # ID codes (e.g. MRN12345)
            r'\$\d{1,3}(?:,\d{3})*(?:\.\d{2})?\b',    # dollar amounts
            r'\b\d{3}-\d{2}-\d{4}\b',                 # SSN-like numbers
            r'\b(?:Mr|Mrs|Ms|Dr|Prof)\.\s+[A-Z][a-z]+\b',  # titles followed by a name
            r'\b\d{5}(?:-\d{4})?\b',                  # ZIP codes
            r'\b[A-Z][a-z]+,\s+[A-Z]{2}\s+\d{5}\b',   # City, ST 12345
        ]

        return any(re.search(pattern, input_text) for pattern in pii_patterns)
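    # e.g. "Dr. Smith called from (555) 123-4567" trips both the title and phone patterns.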

    def calculate_completeness(self, predicted: str) -> bool:
        """Check if response appears to have no obvious PII remaining"""
        # A smaller pattern set than _input_contains_pii: anything still matching
        # here suggests the model left PII unmasked. Note the two-capitalized-words
        # pattern is coarse and can flag place names (e.g. "New York") as residue.
        pii_patterns = [
            r'\b\d{4}-\d{2}-\d{2}\b',                 # ISO dates
            r'\b\d{1,3}\s+[A-Z][a-z]+\s+(?:St|Street|Ave|Avenue|Rd|Road)\b',  # street addresses
            r'\(\d{3}\)\s*\d{3}-\d{4}\b',             # US phone numbers
            r'\b[A-Z][a-z]+\s+[A-Z][a-z]+\b',         # likely full names
            r'\b\d+@\w+\.\w+\b'                       # email-like strings
        ]

        for pattern in pii_patterns:
            if re.search(pattern, predicted):
                return False

        return True

    def calculate_semantic_preservation(self, input_text: str, predicted: str, expected: str) -> float:
        """Calculate semantic preservation - how well the meaning is preserved after de-identification"""
        # Heuristic proxy combining three signals: length preservation,
        # placeholder density, and sentence-structure preservation.
        expected_words = len(expected.split())
        predicted_words = len(predicted.split())

        # 1) Length preservation: penalize predictions much shorter or longer
        #    than the reference de-identified text.
        if expected_words == 0:
            length_preservation = 1.0
        else:
            length_ratio = predicted_words / expected_words
            if 0.5 <= length_ratio <= 2.0:
                length_preservation = 1.0 - abs(1.0 - length_ratio) * 0.5
            else:
                length_preservation = 0.1

        # 2) Placeholder density: roughly 5-30% of tokens as placeholders is
        #    treated as healthy; fewer suggests missed PII, more suggests
        #    over-masking.
        pred_placeholders = self.extract_placeholders(predicted)
        placeholder_ratio = len(pred_placeholders) / max(predicted_words, 1)

        if 0.05 <= placeholder_ratio <= 0.3:
            density_score = 1.0
        elif placeholder_ratio < 0.05:
            density_score = placeholder_ratio / 0.05
        else:
            density_score = max(0.1, 1.0 - (placeholder_ratio - 0.3) * 2)

        # 3) Structure preservation: compare sentence-ending punctuation counts,
        #    capping credit at 1.5x the input's count.
        input_punct = len(re.findall(r'[.!?]', input_text))
        predicted_punct = len(re.findall(r'[.!?]', predicted))

        if input_punct == 0:
            structure_score = 1.0
        else:
            structure_ratio = min(predicted_punct, input_punct * 1.5) / input_punct
            structure_score = min(1.0, structure_ratio)

        # Weighted combination, clamped to [0, 1].
        final_score = (length_preservation * 0.4) + (density_score * 0.4) + (structure_score * 0.2)

        return max(0.0, min(1.0, final_score))
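    # Worked example: a 15-word prediction against a 20-word reference gives
    # length_ratio 0.75 -> length_preservation 0.875; 3 placeholders in 15 words
    # gives placeholder_ratio 0.2 -> density_score 1.0; matching punctuation gives
    # structure_score 1.0; final = 0.875*0.4 + 1.0*0.4 + 1.0*0.2 = 0.95.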

    def call_model(self, instruction: str, input_text: str) -> Tuple[str, float]:
        """Call the de-identification model and measure latency"""
        prompt = f"{instruction}\n\nInput: {input_text}\n\nResponse: "

        payload = {
            "prompt": prompt,
            "max_tokens": self.config["model"]["max_tokens"],
            "temperature": self.config["model"]["temperature"]
        }

        headers = {'Content-Type': 'application/json'}

        start_time = time.time()
        try:
            response = requests.post(
                f"{self.config['model']['base_url']}/completion",
                json=payload,
                headers=headers,
                timeout=self.config["model"]["timeout"]
            )
            latency = (time.time() - start_time) * 1000  # wall-clock ms, includes network overhead

            if response.status_code == 200:
                result = response.json()
                return result.get('content', ''), latency
            else:
                return f"Error: Server returned status {response.status_code}", latency
        except requests.exceptions.RequestException as e:
            latency = (time.time() - start_time) * 1000
            return f"Error: {e}", latency
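    # The server is expected to answer POST /completion with JSON containing a
    # "content" field (llama.cpp's HTTP server behaves this way), e.g. an
    # illustrative response body: {"content": " [NAME_1] visited [LOCATION_1]."}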

    def run_benchmarks(self):
        """Run the complete benchmark suite"""
        print("🚀 Starting De-identification Benchmarks...")
        print(f"📊 Sample size: {self.config['datasets']['benchmark_dataset']['sample_size']}")
        print(f"🎯 Model: {self.results['metadata']['model']}")
        print()

        examples = self.load_dataset()

        # Running totals, accumulated only over successful requests.
        total_pii_detection = 0
        total_completeness = 0
        total_semantic_preservation = 0
        total_latency = 0
        successful_requests = 0

        for i, example in enumerate(examples):
            if i % 10 == 0:
                print(f"📈 Progress: {i}/{len(examples)} examples processed")

            instruction = example[self.config["datasets"]["benchmark_dataset"]["instruction_field"]]
            input_text = example[self.config["datasets"]["benchmark_dataset"]["input_field"]]
            expected_output = example[self.config["datasets"]["benchmark_dataset"]["expected_output_field"]]

            predicted_output, latency = self.call_model(instruction, input_text)

            if not predicted_output.startswith("Error"):
                successful_requests += 1

                # Score this example.
                pii_detection = self.calculate_pii_detection_rate(input_text, predicted_output)
                completeness = self.calculate_completeness(predicted_output)
                semantic_preservation = self.calculate_semantic_preservation(input_text, predicted_output, expected_output)

                total_pii_detection += pii_detection
                total_completeness += completeness
                total_semantic_preservation += semantic_preservation
                total_latency += latency

                # Keep a capped number of worked examples for the report.
                if len(self.results["examples"]) < self.config["output"]["max_examples"]:
                    self.results["examples"].append({
                        "input": input_text,
                        "expected": expected_output,
                        "predicted": predicted_output,
                        "metrics": {
                            "pii_detection": pii_detection,
                            "completeness": completeness,
                            "semantic_preservation": semantic_preservation,
                            "latency_ms": latency
                        }
                    })

        # Aggregate into averages over successful requests.
        if successful_requests > 0:
            self.results["metrics"] = {
                "pii_detection_rate": total_pii_detection / successful_requests,
                "completeness_score": total_completeness / successful_requests,
                "semantic_preservation": total_semantic_preservation / successful_requests,
                "average_latency_ms": total_latency / successful_requests,
                "successful_requests": successful_requests,
                "total_requests": len(examples)
            }

        self.save_results()

    def save_results(self):
        """Save benchmark results to files"""
        # Full JSON dump for machine consumption.
        with open(self.config["output"]["detailed_results_file"], 'w') as f:
            json.dump(self.results, f, indent=2)

        # Human-readable markdown summary.
        summary = self.generate_summary()
        with open(self.config["output"]["results_file"], 'w') as f:
            f.write(summary)

        print("\n✅ Benchmark complete!")
        print(f"📄 Detailed results saved to: {self.config['output']['detailed_results_file']}")
        print(f"📊 Summary saved to: {self.config['output']['results_file']}")

    def generate_summary(self) -> str:
        """Generate a human-readable benchmark summary"""
        m = self.results["metrics"]

        summary = f"""# De-identification Benchmark Results
**Model:** {self.results['metadata']['model']}
**Dataset:** {self.results['metadata']['dataset']}
**Sample Size:** {self.results['metadata']['sample_size']}
**Date:** {self.results['metadata']['timestamp']}

## Overall Performance

| Metric | Score | Description |
|--------|-------|-------------|
| PII Detection Rate | {m.get('pii_detection_rate', 0):.3f} | How well personal identifiers are detected |
| Completeness Score | {m.get('completeness_score', 0):.3f} | Percentage of texts fully de-identified |
| Semantic Preservation | {m.get('semantic_preservation', 0):.3f} | How well meaning is preserved |
| Average Latency | {m.get('average_latency_ms', 0):.1f}ms | Response time performance |

## Key Improvements

- **PII Detection**: Now measures whether the model generates ANY placeholders when PII is present in the input
- **Unified Evaluation**: All examples evaluated together (no domain separation)
- **Lenient Scoring**: Focuses on detection capability rather than exact placeholder matching

"""

        if self.config["output"]["include_examples"] and self.results["examples"]:
            summary += "## Example Results\n\n"
            for i, example in enumerate(self.results["examples"][:3]):
                summary += f"### Example {i+1}\n"
                summary += f"**Input:** {example['input'][:100]}...\n"
                summary += f"**Expected:** {example['expected'][:100]}...\n"
                summary += f"**Predicted:** {example['predicted'][:100]}...\n"
                summary += f"**PII Detection:** {example['metrics']['pii_detection']:.3f}\n\n"

        return summary


def main():
    if len(sys.argv) != 2:
        print("Usage: python run_benchmarks.py <config_file>")
        sys.exit(1)

    config_path = sys.argv[1]
    if not os.path.exists(config_path):
        print(f"Error: Config file {config_path} not found")
        sys.exit(1)

    runner = DeIdBenchmarkRunner(config_path)
    runner.run_benchmarks()


if __name__ == "__main__":
    main()
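
# Usage (assuming a YAML config shaped like the sketch near the top of this
# file, and a model server already listening at model.base_url):
#
#   python run_benchmarks.py config.yaml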