#!/usr/bin/env python3
"""
Enhanced DeepSeek Training Data Generator for Scientific Summarization
Generates high-quality training data with integrated cleanup and row slicing
"""

import requests
import json
import pandas as pd
import time
import csv
import os
import re
import hashlib
from pathlib import Path
from typing import List, Tuple, Dict, Optional
from datetime import datetime, timedelta


class EnhancedDeepSeekTrainingDataGenerator:
    """Generate training data using DeepSeek API with integrated cleanup and row slicing"""

    def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com/v1"):
        """
        Initialize DeepSeek API client

        Args:
            api_key: Your DeepSeek API key
            base_url: DeepSeek API base URL
        """
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.start_time = None
        self.processed_count = 0

    def clean_deepseek_output(self, text: str) -> str:
        """
        Clean up DeepSeek output to remove formatting artifacts

        Args:
            text: Raw text from DeepSeek API

        Returns:
            Cleaned text without formatting artifacts
        """
        if not text or pd.isna(text):
            return text

        text = str(text).strip()

        # Remove numbered prefixes (1., 2., 3.)
        text = re.sub(r'^\d+\.\s*', '', text)

        # Remove component labels
        text = re.sub(r'^(ABSTRACT[_\s]*SUMMARY:?|SHORT[_\s]*SUMMARY:?|TITLE:?)', '', text, flags=re.IGNORECASE)

        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)

        # Remove trailing colons or dashes
        text = re.sub(r'[:\-]+$', '', text)

        # Remove markdown formatting
        text = re.sub(r'\*+', '', text)

        # Remove quotes that sometimes wrap the entire response
        text = re.sub(r'^["\']+|["\']+$', '', text)

        return text.strip()

    def create_few_shot_prompt(self, concatenated_abstracts: str, keywords: str) -> str:
        """
        Create optimized few-shot prompt for DeepSeek with clean output formatting
        """
        prompt = (
            "You are an expert scientific summarization assistant. Generate exactly three components separated by '|||':\n"
            "1. ABSTRACT_SUMMARY: A detailed 4-6 sentence summary highlighting key findings, methods, and implications\n"
            "2. SHORT_SUMMARY: A concise 2-3 sentence summary capturing the core essence\n"
            "3. TITLE: A sophisticated, detailed title reflecting the research scope and methods\n\n"
            "CRITICAL: Respond ONLY with the three components separated by '|||'. Do not include conversational text, explanations, or markdown formatting.\n\n"
            "Format: ABSTRACT_SUMMARY|||SHORT_SUMMARY|||TITLE\n\n"
            "Focus on:\n"
            "- Specific computational methods, techniques, and approaches\n"
            "- Key biological processes and mechanisms\n"
            "- Research methodologies and experimental designs\n"
            "- Clinical or therapeutic implications\n"
            "- Be specific and detailed; avoid generic terms\n\n"
        )

        # Few-shot Example 1 - Immunology/Antimicrobial Research
        example1_text = (
            "Studies investigated mammary gland candidiasis models using immunocompetent and immunodeficient mice "
            "treated with amphotericin B. Complement activation analysis revealed tissue inflammation patterns. "
            "Research on antigen processing examined proteasome mutants lacking specific protease activities for "
            "peptide generation. Novel ankyrin-repeat family member MAIL was identified with nuclear localization "
            "potentiating IL-6 expression. Antimicrobial peptides pseudins 1-4 were isolated from frog skin showing "
            "activity against various pathogens."
        )
        example1_keywords = "MAIL; proteasome; antimicrobial peptides; complement activation; mammary glands"

        prompt += (
            f"INPUT: {example1_text}\n"
            f"KEYWORDS: {example1_keywords}\n"
            "OUTPUT: "
            "Comprehensive investigation of innate immune responses utilizing murine mammary gland candidiasis models "
            "with complement activation analysis and proteasome-mediated antigen processing pathways, complemented by "
            "characterization of novel antimicrobial peptides and nuclear transcription modulators. Research demonstrates "
            "the critical role of specific protease activities in MHC class I-restricted peptide generation while identifying "
            "MAIL as a nuclear factor potentiating cytokine expression and pseudins as promising therapeutic antimicrobials. "
            "These findings advance understanding of immunopathological mechanisms and provide validated experimental models "
            "for antifungal compound evaluation.|||"
            "Studies utilized murine models to investigate immune responses in candidiasis while characterizing novel "
            "antimicrobial compounds and antigen processing mechanisms. Research identified critical protease activities "
            "and nuclear factors regulating immune responses.|||"
            "Integrated Immunological Modeling and Antimicrobial Peptide Discovery: Proteasome-Mediated Antigen Processing "
            "and Complement-Dependent Host Defense Mechanisms\n\n"
        )

        # Few-shot Example 2 - Biotechnology/Tissue Engineering
        example2_text = (
            "Biotechnology development focused on hematopoietic stem cell expansion using cytokine combinations. "
            "Temperature-responsive polymers enabled designed cell sheet engineering for tissue applications. "
            "Vascular anastomosis techniques using titanium clips reduced neointimal hyperplasia. Endothelial cell "
            "seeding protocols for vascular grafts were optimized. Gene transfer therapies for therapeutic angiogenesis "
            "showed clinical promise in cardiovascular applications."
        )
        example2_keywords = "biotechnology; tissue engineering; vascular grafts; stem cells; angiogenesis"

        prompt += (
            f"INPUT: {example2_text}\n"
            f"KEYWORDS: {example2_keywords}\n"
            "OUTPUT: "
            "Advanced biotechnology approaches combining cytokine-mediated hematopoietic stem cell expansion protocols "
            "with temperature-responsive polymer systems for precision cell sheet engineering and vascular reconstruction. "
            "Integration of titanium clip anastomosis techniques and optimized endothelial cell seeding methodologies "
            "demonstrates significant reduction in neointimal hyperplasia while enhancing graft patency. Gene transfer "
            "strategies for therapeutic angiogenesis represent promising clinical interventions for cardiovascular disease "
            "treatment, establishing proof-of-concept for growth factor-mediated collateral vessel development.|||"
            "Research combines stem cell expansion technologies with polymer-based cell engineering and vascular "
            "reconstruction techniques. Gene therapy approaches show clinical promise for treating cardiovascular disease "
            "through enhanced angiogenesis.|||"
            "Multiscale Biotechnology Integration: Cytokine-Mediated Stem Cell Engineering and Polymer-Assisted "
            "Vascular Reconstruction with Gene Transfer-Enhanced Therapeutic Angiogenesis\n\n"
        )

        # User query
        prompt += (
            f"INPUT: {concatenated_abstracts}\n"
            f"KEYWORDS: {keywords}\n"
            "OUTPUT:"
        )

        return prompt

    def call_deepseek_api(self, prompt: str, max_retries: int = 3) -> str:
        """
        Call DeepSeek API with enhanced retry logic and timeout handling
        """
        for attempt in range(max_retries):
            try:
                payload = {
                    "model": "deepseek-chat",  # DeepSeek-V3 instruct model
                    "messages": [
                        {
                            "role": "user",
                            "content": prompt
                        }
                    ],
                    "max_tokens": 800,
                    "temperature": 0.7,
                    "top_p": 0.9,
                    "stream": False
                }

                # Enhanced timeout handling
                response = requests.post(
                    f"{self.base_url}/chat/completions",
                    headers=self.headers,
                    json=payload,
                    timeout=(10, 60)  # (connection timeout, read timeout)
                )

                if response.status_code == 200:
                    result = response.json()
                    return result['choices'][0]['message']['content'].strip()
                elif response.status_code == 429:  # Rate limit
                    wait_time = min(60, 2 ** attempt * 30)
                    print(f"Rate limit hit. Waiting {wait_time} seconds...")
                    time.sleep(wait_time)
                    continue
                elif response.status_code >= 500:  # Server errors
                    wait_time = min(30, 2 ** attempt * 5)
                    print(f"Server error {response.status_code}. Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                    continue
                else:
                    print(f"API Error {response.status_code}: {response.text}")
                    if attempt < max_retries - 1:
                        time.sleep(2 ** attempt)
                        continue
                    else:
                        return ""

            except requests.exceptions.Timeout as e:
                print(f"Timeout error on attempt {attempt + 1}: {e}")
                if attempt < max_retries - 1:
                    wait_time = min(30, 2 ** attempt * 10)
                    print(f"Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                    continue
                else:
                    print("Max retries exceeded due to timeout")
                    return ""
            except requests.exceptions.ConnectionError as e:
                print(f"Connection error on attempt {attempt + 1}: {e}")
                if attempt < max_retries - 1:
                    wait_time = min(30, 2 ** attempt * 10)
                    print(f"Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                    continue
                else:
                    print("Max retries exceeded due to connection error")
                    return ""
            except Exception as e:
                print(f"Attempt {attempt + 1} failed: {str(e)}")
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)
                    continue
                else:
                    return ""

        return ""

    def parse_response(self, response: str) -> Tuple[str, str, str]:
        """
        Enhanced parsing for DeepSeek responses with integrated cleanup
        """
        if not response:
            return "Failed to generate", "Failed to generate", "Failed to generate"

        # Clean the response first
        response = response.strip()

        # Remove common DeepSeek conversational elements
        conversational_starters = [
            "Here are the structured outputs",
            "Here's the structured output",
            "Based on the provided keywords",
            "Let me know if you'd like",
            "Would you like me to",
            "I can help you",
            "Here's my analysis"
        ]

        for starter in conversational_starters:
            if response.startswith(starter):
                # Find the actual content after conversational part
                lines = response.split('\n')
                content_lines = []
                found_content = False
                for line in lines:
                    if any(marker in line.upper() for marker in ['ABSTRACT_SUMMARY:', 'ABSTRACT:', '1.', '**1.']):
                        found_content = True
                    if found_content:
                        content_lines.append(line)
                if content_lines:
                    response = '\n'.join(content_lines)
                break

        # Remove markdown formatting
        response = re.sub(r'\*\*(\d+\.)\*\*', r'\1', response)  # **1.** -> 1.
        response = re.sub(r'\*\*(.*?)\*\*', r'\1', response)  # **text** -> text
        response = re.sub(r'^\s*---\s*$', '', response, flags=re.MULTILINE)  # Remove --- lines

        abstract_summary = ""
        short_summary = ""
        title = ""

        try:
            # Method 1: Look for standard ||| separator
            if '|||' in response:
                parts = [part.strip() for part in response.split('|||')]
                if len(parts) >= 3:
                    abstract_summary = parts[0]
                    short_summary = parts[1]
                    title = parts[2]
                elif len(parts) == 2:
                    abstract_summary = parts[0]
                    title = parts[1]
                    # Generate short summary from abstract
                    sentences = re.split(r'[.!?]+', abstract_summary)
                    short_summary = '. '.join(sentences[:2]).strip() + '.'

            # Method 2: Look for numbered sections (DeepSeek's preferred format)
            elif "1. ABSTRACT_SUMMARY:" in response or "1.ABSTRACT_SUMMARY:" in response:
                # Extract by numbered sections
                abstract_match = re.search(r'1\.?\s*ABSTRACT_SUMMARY:\s*(.*?)(?=2\.|3\.|$)', response, re.DOTALL | re.IGNORECASE)
                short_match = re.search(r'2\.?\s*SHORT_SUMMARY:\s*(.*?)(?=3\.|$)', response, re.DOTALL | re.IGNORECASE)
                title_match = re.search(r'3\.?\s*TITLE:\s*(.*?)(?=\n\n|$)', response, re.DOTALL | re.IGNORECASE)

                if abstract_match:
                    abstract_summary = abstract_match.group(1).strip()
                if short_match:
                    short_summary = short_match.group(1).strip()
                if title_match:
                    title = title_match.group(1).strip()

            # Method 3: Look for any mention of the three components
            else:
                # Try to find ABSTRACT_SUMMARY, SHORT_SUMMARY, TITLE anywhere
                abstract_match = re.search(r'ABSTRACT[_\s]*SUMMARY:?\s*(.*?)(?=SHORT|TITLE|$)', response, re.DOTALL | re.IGNORECASE)
                short_match = re.search(r'SHORT[_\s]*SUMMARY:?\s*(.*?)(?=TITLE|$)', response, re.DOTALL | re.IGNORECASE)
                title_match = re.search(r'TITLE:?\s*(.*?)(?=\n|$)', response, re.DOTALL | re.IGNORECASE)

                if abstract_match:
                    abstract_summary = abstract_match.group(1).strip()
                if short_match:
                    short_summary = short_match.group(1).strip()
                if title_match:
                    title = title_match.group(1).strip()

        except Exception as e:
            print(f"Error in enhanced parsing: {e}")

        # Fallback: if still no content, try to extract from the full response
        if not abstract_summary and not short_summary and not title:
            # Split response into sentences and distribute intelligently
            sentences = re.split(r'[.!?]+', response)
            sentences = [s.strip() for s in sentences if s.strip() and len(s.strip()) > 10]

            if len(sentences) >= 6:
                abstract_summary = '. '.join(sentences[:4]) + '.'
                short_summary = '. '.join(sentences[4:6]) + '.'
                title = sentences[6] if len(sentences) > 6 else "Advanced Scientific Research Analysis"
            elif len(sentences) >= 3:
                abstract_summary = '. '.join(sentences[:2]) + '.'
                short_summary = sentences[2] + '.'
                title = sentences[-1] if len(sentences) > 3 else "Scientific Research Study"
            elif len(sentences) >= 1:
                abstract_summary = sentences[0]
                short_summary = sentences[0][:100] + "..." if len(sentences[0]) > 100 else sentences[0]
                title = "Scientific Analysis"
            else:
                abstract_summary = response[:200] + "..." if len(response) > 200 else response
                short_summary = response[:100] + "..." if len(response) > 100 else response
                title = "Research Summary"

        # Apply integrated cleanup to all components
        abstract_summary = self.clean_deepseek_output(abstract_summary)
        short_summary = self.clean_deepseek_output(short_summary)
        title = self.clean_deepseek_output(title)

        # Ensure reasonable lengths after cleanup
        if len(abstract_summary.split()) > 150:
            abstract_summary = ' '.join(abstract_summary.split()[:150]) + "..."
        if len(short_summary.split()) > 75:
            short_summary = ' '.join(short_summary.split()[:75]) + "..."
        if len(title.split()) > 25:
            title = ' '.join(title.split()[:25]) + "..."

        # Final validation - ensure we have actual content
        if not abstract_summary or abstract_summary in ["", "Content not extracted", "Content not properly extracted"]:
            abstract_summary = "Content generation failed"
        if not short_summary or short_summary in ["", "Content not extracted", "Content not properly extracted"]:
            short_summary = "Content generation failed"
        if not title or title in ["", "Content not extracted", "Content not properly extracted"]:
            title = "Content generation failed"

        return abstract_summary, short_summary, title

    def load_checkpoint(self, checkpoint_file: str) -> Tuple[List[Dict], set]:
        """
        Load existing checkpoint data and return processed data + processed indices
        """
        if os.path.exists(checkpoint_file):
            try:
                df = pd.read_csv(checkpoint_file, sep='\t')
                processed_data = df.to_dict('records')
                processed_indices = set(df['OriginalIndex'].astype(str))
                print(f"✓ Loaded checkpoint with {len(processed_data)} processed entries")
                return processed_data, processed_indices
            except Exception as e:
                print(f"Error loading checkpoint: {e}")
                return [], set()
        return [], set()

    def save_checkpoint(self, output_data: List[Dict], checkpoint_file: str):
        """
        Save current progress to checkpoint file
        """
        try:
            df = pd.DataFrame(output_data)
            df.to_csv(checkpoint_file, sep='\t', index=False, quoting=csv.QUOTE_ALL)
            print(f"💾 Checkpoint saved: {len(output_data)} entries")
        except Exception as e:
            print(f"Error saving checkpoint: {e}")

    def estimate_time_remaining(self, current_progress: int, total_rows: int) -> str:
        """
        Estimate time remaining based on current progress
        """
        if self.start_time is None or current_progress == 0:
            return "Calculating..."

        elapsed = datetime.now() - self.start_time
        elapsed_seconds = elapsed.total_seconds()

        if current_progress > 0:
            avg_time_per_row = elapsed_seconds / current_progress
            remaining_rows = total_rows - current_progress
            remaining_seconds = remaining_rows * avg_time_per_row
            remaining_time = timedelta(seconds=int(remaining_seconds))
            return str(remaining_time)

        return "Calculating..."
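
    # Illustrative sketch only (not called by the pipeline above): shows the
    # happy-path three-part response shape that parse_response() expects and how
    # it is split on '|||' before cleanup. The sample text is invented for the
    # example and does not come from a real API response.
    @staticmethod
    def example_expected_response() -> Tuple[str, str, str]:
        """Return a minimal three-part response split the way parse_response() splits it."""
        sample = (
            "Detailed multi-sentence abstract summary of the cluster.|||"
            "Concise two-sentence summary.|||"
            "Example Title Reflecting Scope and Methods"
        )
        abstract_summary, short_summary, title = [part.strip() for part in sample.split('|||')]
        return abstract_summary, short_summary, title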

    def process_data_file(self, input_file: str, output_file: str, delay: float = 1.0,
                          save_every: int = 50, debug_first_n: int = 3,
                          start_row: int = 0, end_row: Optional[int] = None):
        """
        Process the input TSV file and generate training data with checkpointing and row slicing

        Args:
            input_file: Path to input TSV file
            output_file: Path to output TSV file
            delay: Delay between API calls to respect rate limits
            save_every: Save checkpoint every N processed rows
            debug_first_n: Print full input/output for first N generations for QC
            start_row: Starting row index (0-based)
            end_row: Ending row index (0-based, None for all remaining rows)
        """
        self.start_time = datetime.now()

        # Setup checkpoint file
        checkpoint_file = output_file.replace('.tsv', '_checkpoint.tsv')

        # Load existing checkpoint
        output_data, processed_indices = self.load_checkpoint(checkpoint_file)

        # Read input data
        try:
            df = pd.read_csv(input_file, sep='\t')
        except Exception as e:
            print(f"Error reading input file: {e}")
            return

        # Apply row slicing
        original_length = len(df)
        if end_row is None:
            end_row = original_length
        else:
            end_row = min(end_row, original_length)

        if start_row >= original_length:
            print(f"❌ Error: start_row {start_row} is >= total rows {original_length}")
            return

        df_slice = df.iloc[start_row:end_row].copy()
        total_rows = len(df_slice)
        initial_processed = len(output_data)

        print("📊 Processing Overview:")
        print(f"   Input file total rows: {original_length}")
        print(f"   Processing slice: rows {start_row} to {end_row - 1}")
        print(f"   Rows in slice: {total_rows}")
        print(f"   Already processed: {initial_processed}")
        print(f"   Remaining: {total_rows - initial_processed}")
        print(f"   Checkpoint saves every {save_every} rows")
        print(f"   Estimated cost: ~${total_rows * 0.0014:.2f}")
        print(f"   Estimated time: ~{total_rows * 1.5 / 3600:.1f} hours")
        print(f"   Debug mode: First {debug_first_n} generations will show detailed output")
        print("-" * 80)

        successful_processed = 0
        failed_processed = 0
        generations_count = 0
        processed_this_run = 0

        for index, row in df_slice.iterrows():
            original_index = str(row.get('Index', index))

            # Skip if already processed
            if original_index in processed_indices:
                continue

            concatenated_abstracts = str(row.get('ConcatenatedAbstracts', ''))
            keywords = str(row.get('TopKeywords', ''))

            # Skip if no content
            if not concatenated_abstracts or concatenated_abstracts == 'nan':
                print(f"[{processed_this_run + 1}/{total_rows}] Skipping empty cluster {original_index}")
                continue

            # The dataframe index is the row's position in the source file (default RangeIndex),
            # so it stays correct even when earlier rows are skipped after a checkpoint resume.
            actual_row_num = int(index)
            print(f"[{processed_this_run + 1}/{total_rows}] Processing row {actual_row_num} (cluster {original_index})...")

            # Create prompt
            prompt = self.create_few_shot_prompt(concatenated_abstracts, keywords)

            # DEBUG: Print detailed input/output for first few generations
            if generations_count < debug_first_n:
                print("\n" + "=" * 80)
                print(f"🔍 DEBUG OUTPUT FOR GENERATION #{generations_count + 1}")
                print("=" * 80)
                print(f"📋 CLUSTER INDEX: {original_index} (Row {actual_row_num})")
                print(f"🔑 KEYWORDS: {keywords}")
                print(f"📄 ABSTRACTS (first 500 chars): {concatenated_abstracts[:500]}...")
                print("\n" + "-" * 60)
                print("📤 FULL PROMPT BEING SENT TO API:")
                print("-" * 60)
                print(prompt)
                print("-" * 60)

            # Call API
            response = self.call_deepseek_api(prompt)

            # Continue debug printing
            if generations_count < debug_first_n:
                print("📥 RAW API RESPONSE:")
                print("-" * 60)
                print(response if response else "❌ NO RESPONSE / ERROR")
                print("-" * 60)

            if response:
                # Parse response (now includes integrated cleanup)
                abstract_summary, short_summary, title = self.parse_response(response)

                # Continue debug printing
                if generations_count < debug_first_n:
                    print("🔧 PARSED & CLEANED COMPONENTS:")
                    print("-" * 60)
                    print(f"📝 ABSTRACT SUMMARY:\n{abstract_summary}\n")
                    print(f"⚡ SHORT SUMMARY:\n{short_summary}\n")
                    print(f"🏷️ TITLE:\n{title}\n")
                    print("=" * 80 + "\n")

                # Add to output data
                output_data.append({
                    'OriginalIndex': original_index,
                    'SourceRow': actual_row_num,  # Track original row number
                    'AbstractSummary': abstract_summary,
                    'ShortSummary': short_summary,
                    'Title': title,
                    'OriginalKeywords': keywords,
                    'OriginalText': concatenated_abstracts[:1000] + "..." if len(concatenated_abstracts) > 1000 else concatenated_abstracts
                })
                successful_processed += 1
                print(f"✓ Success! ({successful_processed} total successes)")
            else:
                if generations_count < debug_first_n:
                    print("❌ FAILED TO PARSE OR GET RESPONSE")
                    print("=" * 80 + "\n")
                print(f"✗ Failed to process cluster {original_index}")
                # Add empty entry to maintain tracking
                output_data.append({
                    'OriginalIndex': original_index,
                    'SourceRow': actual_row_num,
                    'AbstractSummary': 'Failed to generate',
                    'ShortSummary': 'Failed to generate',
                    'Title': 'Failed to generate',
                    'OriginalKeywords': keywords,
                    'OriginalText': concatenated_abstracts[:1000] + "..." if len(concatenated_abstracts) > 1000 else concatenated_abstracts
                })
                failed_processed += 1

            generations_count += 1
            processed_this_run += 1

            # Update processed set
            processed_indices.add(original_index)

            # Save checkpoint periodically
            if len(output_data) % save_every == 0:
                self.save_checkpoint(output_data, checkpoint_file)
                time_remaining = self.estimate_time_remaining(processed_this_run, total_rows)
                print(f"📍 Checkpoint saved! Progress: {processed_this_run}/{total_rows} | ETA: {time_remaining}")

            # Rate limiting
            time.sleep(delay)

        # Final save
        try:
            output_df = pd.DataFrame(output_data)
            output_df.to_csv(output_file, sep='\t', index=False, quoting=csv.QUOTE_ALL)

            print("\n🎉 GENERATION COMPLETED!")
            print(f"✓ Successfully processed: {successful_processed}")
            print(f"✗ Failed: {failed_processed}")
            print(f"📄 Total entries saved: {len(output_data)}")
            print(f"💾 Final output saved to: {output_file}")
            print(f"💰 Estimated cost: ~${successful_processed * 0.0014:.2f}")
            print(f"📊 Processed rows {start_row} to {end_row - 1} from source file")

            # Clean up checkpoint file
            if os.path.exists(checkpoint_file):
                os.remove(checkpoint_file)
                print("🗑️ Checkpoint file cleaned up")

        except Exception as e:
            print(f"Error saving final output file: {e}")
            print(f"Your data is still safe in checkpoint: {checkpoint_file}")


def main():
    """
    Main function to run the training data generation with row slicing
    """
    # Configuration for processing all 30,000 examples
    API_KEY = "YOUR_DEEPSEEK_API_KEY"  # Replace with your actual API key
    INPUT_FILE = "/home/joneill/pubmed_clustered_data_sciner.tsv"  # Your input TSV file

    # Row slicing configuration - MODIFY THESE FOR YOUR BATCHES
    START_ROW = 0        # Starting row (0-based)
    END_ROW = 30000      # Ending row (None for all rows, or specify number)
    BATCH_NAME = "full"  # Used in output filename

    # You can also run in batches, e.g.:
    # Batch 1: START_ROW = 0, END_ROW = 5000, BATCH_NAME = "batch1"
    # Batch 2: START_ROW = 5000, END_ROW = 10000, BATCH_NAME = "batch2"
    # Batch 3: START_ROW = 10000, END_ROW = 15000, BATCH_NAME = "batch3"
    # etc.
OUTPUT_FILE = f"bsg_training_data_{BATCH_NAME}.tsv" # Output file for training data DELAY_BETWEEN_CALLS = 1.0 # Seconds between API calls SAVE_EVERY = 50 # Save checkpoint every N rows DEBUG_FIRST_N = 3 # Print full input/output for first N generations for QC # Initialize generator generator = EnhancedDeepSeekTrainingDataGenerator(API_KEY) # Calculate batch info total_rows_to_process = END_ROW - START_ROW if END_ROW else "all remaining" # Process data print("๐Ÿš€ Starting Enhanced DeepSeek Training Data Generation") print("="*80) print(f"๐ŸŽฏ Processing: {total_rows_to_process} rows (from row {START_ROW} to {END_ROW-1 if END_ROW else 'end'})") print(f"๐Ÿ’ฐ Estimated cost: ~${(END_ROW - START_ROW if END_ROW else 30000) * 0.0014:.2f}") print(f"โฑ๏ธ Estimated time: ~{(END_ROW - START_ROW if END_ROW else 30000) * 1.5 / 3600:.1f} hours") print(f"๐Ÿ” Debug mode: Will show detailed input/output for first {DEBUG_FIRST_N} generations") print(f"๐Ÿ’พ Automatic checkpointing every {SAVE_EVERY} rows") print(f"๐Ÿ”„ Auto-resume: Restart script to continue from checkpoint") print(f"๐Ÿงน Integrated cleanup: All outputs automatically cleaned of formatting artifacts") print("="*80) generator.process_data_file( INPUT_FILE, OUTPUT_FILE, DELAY_BETWEEN_CALLS, SAVE_EVERY, DEBUG_FIRST_N, START_ROW, END_ROW ) print("\n๐ŸŽ‰ Training data generation completed!") print(f"๐Ÿ“ Output file: {OUTPUT_FILE}") print("โœจ Data is automatically cleaned and ready for training! ๐Ÿงช") if __name__ == "__main__": main()