| | import json |
| | import os |
| | import re |
| | import pandas as pd |
| | import nltk |
| | from nltk.corpus import stopwords |
| | from nltk.stem import WordNetLemmatizer |
| | from sklearn.feature_extraction.text import TfidfVectorizer |
| | from typing import List, Dict, Any |
| | from tqdm import tqdm |
| | import hashlib |
| |
|
| | |
# Ensure the NLTK resources used below are available, downloading on demand.
# 'punkt' lives under tokenizers/, the corpora under corpora/.
required_packages = ['punkt', 'stopwords', 'wordnet']
for package in required_packages:
    resource_path = f'tokenizers/{package}' if package == 'punkt' else f'corpora/{package}'
    try:
        nltk.data.find(resource_path)
    except LookupError:
        print(f"Downloading NLTK {package}...")
        nltk.download(package)
    else:
        print(f"Package {package} is already downloaded.")
| |
|
class QAProcessor:
    """Build a cleaned medical QA database and an inverted keyword index
    from two raw data sources: Healthline articles and MedQA CSV files.
    """

    # Precompiled patterns shared by all instances — hoisted out of the
    # per-row cleaning loop so they are compiled once, not per call.
    _TAG_RE = re.compile(r'<.*?>')      # HTML-ish tags
    _PUNCT_RE = re.compile(r'[^\w\s]')  # anything that is not word char / space
    _SPACE_RE = re.compile(r'\s+')      # whitespace runs
    _WORD_RE = re.compile(r'\b\w+\b')   # simple word tokens

    def __init__(self, data_dir: str, output_path: str):
        """
        Args:
            data_dir: Root directory holding the raw 'Healthline' and
                'MedQA' sub-directories.
            output_path: Directory under which the cleaned JSON files
                are written by save_results().
        """
        self.data_dir = data_dir
        self.output_path = output_path
        self.lemmatizer = WordNetLemmatizer()
        self.stop_words = set(stopwords.words('english'))
        # Domain words too frequent in medical text to be useful keywords.
        self.medical_stopwords = {'disease', 'patient', 'treatment', 'condition', 'symptom', 'doctor', 'health', 'may', 'also', 'one', 'use'}
        self.stop_words.update(self.medical_stopwords)

    def clean_text(self, text: str) -> str:
        """Strip HTML tags and punctuation, collapse whitespace, lowercase.

        Non-string input (e.g. NaN coming out of pandas) yields "".
        """
        if not isinstance(text, str):
            return ""
        text = self._TAG_RE.sub(' ', text)
        text = self._PUNCT_RE.sub(' ', text)
        return self._SPACE_RE.sub(' ', text).strip().lower()

    def simple_tokenize(self, text: str) -> List[str]:
        """Lowercase *text* and split it into word tokens (no NLTK needed)."""
        return self._WORD_RE.findall(text.lower().strip())

    def extract_keywords(self, text: str, top_n: int = 10) -> List[str]:
        """Return up to *top_n* keywords from *text*, ranked by TF-IDF.

        Tokens are lemmatized and filtered against the stop-word list.
        If the TF-IDF step fails, falls back to the first *top_n*
        filtered tokens (deliberate best-effort so the pipeline keeps going).
        """
        if not isinstance(text, str) or not text.strip():
            return []

        tokens = self.simple_tokenize(text)
        filtered_tokens = [self.lemmatizer.lemmatize(token)
                           for token in tokens
                           if token.isalpha() and token not in self.stop_words and len(token) > 2]
        if not filtered_tokens:
            return []

        vectorizer = TfidfVectorizer(max_features=top_n)
        try:
            tfidf_matrix = vectorizer.fit_transform([' '.join(filtered_tokens)])
            feature_names = vectorizer.get_feature_names_out()
            scores = zip(feature_names, tfidf_matrix.toarray()[0])
            sorted_scores = sorted(scores, key=lambda x: x[1], reverse=True)
            return [word for word, score in sorted_scores[:top_n] if score > 0]
        except Exception as e:
            print(f"TF-IDF提取失败: {e}")
            return filtered_tokens[:top_n]

    def _add_qa_pair(self, qa_database: List[Dict[str, Any]],
                     keyword_index: Dict[str, List[str]],
                     pair_id: str, source: str,
                     question: str, answer: str) -> None:
        """Clean one QA pair, append it to *qa_database*, and register its
        keywords in *keyword_index* (keyword -> list of QA ids).

        Shared by both data-source branches of process_data() — the original
        duplicated this construction/indexing code in each branch.
        """
        clean_question = self.clean_text(question)
        clean_answer = self.clean_text(answer)
        qa_pair = {
            'id': pair_id,
            'source': source,
            'question': clean_question,
            'answer': clean_answer,
            'keywords': self.extract_keywords(clean_question + " " + clean_answer)
        }
        qa_database.append(qa_pair)
        for keyword in qa_pair['keywords']:
            keyword_index.setdefault(keyword, []).append(pair_id)

    def process_data(self) -> tuple[List[Dict[str, Any]], Dict[str, List[str]]]:
        """Process all data sources found under self.data_dir.

        Returns:
            (qa_database, keyword_index): the list of cleaned QA dicts and
            the inverted index mapping keyword -> list of QA ids.
        """
        qa_database: List[Dict[str, Any]] = []
        keyword_index: Dict[str, List[str]] = {}

        # --- Healthline: article title is the question, body the answer ---
        healthline_path = os.path.join(self.data_dir, 'Healthline', 'healthline_articles_text.csv')
        if os.path.exists(healthline_path):
            print("处理Healthline数据...")
            healthline_df = pd.read_csv(healthline_path)
            for idx, row in tqdm(healthline_df.iterrows(), total=len(healthline_df)):
                title = row.get('title', '')
                content = row.get('content', '')
                # Skip rows where pandas produced NaN / non-string cells.
                if not isinstance(title, str) or not isinstance(content, str):
                    continue
                self._add_qa_pair(qa_database, keyword_index,
                                  f"healthline_{idx}", 'healthline', title, content)

        # --- MedQA: one CSV per dataset, with Question/Answer columns ---
        medqa_dir = os.path.join(self.data_dir, 'MedQA')
        if os.path.exists(medqa_dir):
            print("处理MedQA数据...")
            for file_name in os.listdir(medqa_dir):
                if not file_name.endswith('.csv'):
                    continue
                # splitext (not split('.')[0]) so dotted names like
                # 'med.qa.v2.csv' keep their full stem as the dataset name.
                dataset_name = os.path.splitext(file_name)[0]
                df = pd.read_csv(os.path.join(medqa_dir, file_name))

                # Accept either capitalization of the column headers.
                question_col = next((col for col in ['Question', 'question'] if col in df.columns), None)
                answer_col = next((col for col in ['Answer', 'answer'] if col in df.columns), None)
                if not question_col or not answer_col:
                    print(f"跳过 {dataset_name} - 缺少问题/答案列")
                    continue

                for idx, row in tqdm(df.iterrows(), total=len(df)):
                    question = row.get(question_col, '')
                    answer = row.get(answer_col, '')
                    if not isinstance(question, str) or not isinstance(answer, str):
                        continue
                    self._add_qa_pair(qa_database, keyword_index,
                                      f"{dataset_name}_{idx}", dataset_name, question, answer)

        return qa_database, keyword_index

    def save_results(self, qa_database: List[Dict[str, Any]], keyword_index: Dict[str, List[str]]):
        """Write the QA database and the keyword index as UTF-8 JSON files
        under self.output_path, creating the sub-directories as needed.
        """
        qa_output = os.path.join(self.output_path, 'cleaned_qa', 'qa_database.json')
        keyword_output = os.path.join(self.output_path, 'keywords', 'keyword_index.json')
        os.makedirs(os.path.dirname(qa_output), exist_ok=True)
        os.makedirs(os.path.dirname(keyword_output), exist_ok=True)

        with open(qa_output, 'w', encoding='utf-8') as f:
            json.dump(qa_database, f, ensure_ascii=False, indent=2)

        with open(keyword_output, 'w', encoding='utf-8') as f:
            json.dump(keyword_index, f, ensure_ascii=False, indent=2)

        print(f"处理完成的QA对数量: {len(qa_database)}")
        print(f"关键词索引中的关键词数量: {len(keyword_index)}")
        print(f"数据已保存到: {self.output_path}")
| |
|
| | if __name__ == "__main__": |
| | |
| | data_dir = './Data/raw' |
| | output_path = './Data/Processed/' |
| | |
| | |
| | processor = QAProcessor(data_dir, output_path) |
| | qa_database, keyword_index = processor.process_data() |
| | processor.save_results(qa_database, keyword_index) |