import os
from glob import glob

import pyarrow.parquet as pq
from tqdm import tqdm
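
# Source directories, scanned recursively for *.parquet files.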
INPUT_DIRS = [
    "books",
    "fineweb",
    "wikipedia",
]

OUTPUT_DIR = "merged_text"
os.makedirs(OUTPUT_DIR, exist_ok=True)
OUT_FILE = os.path.join(OUTPUT_DIR, "corpus.txt")


def extract_text_from_parquet(path):
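    """Return the rows of the first text-like column as strings, or [] on failure."""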
    try:
        table = pq.read_table(path)
        df = table.to_pandas()
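
        # Prefer well-known text column names first.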
        for col in ["text", "content", "document", "article", "source"]:
            if col in df.columns:
                return df[col].astype(str).tolist()
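
        # Otherwise fall back to the first string-typed (object dtype) column.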
        for col in df.columns:
            if df[col].dtype == object:
                return df[col].astype(str).tolist()

        return []
    except Exception as e:
        print(f"Error reading {path}: {e}")
        return []
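

# Gather every parquet file under the input directories.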
all_parquet_files = []
for d in INPUT_DIRS:
    all_parquet_files.extend(glob(f"{d}/**/*.parquet", recursive=True))
print("Total parquet files found:", len(all_parquet_files)) |
|
|
|
|
|
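
# Stream every row into a single corpus file, keeping only rows that are
# at least 50 characters long and contain at least one letter.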
with open(OUT_FILE, "w", encoding="utf-8") as fout:
    for file in tqdm(all_parquet_files, desc="Extracting text"):
        texts = extract_text_from_parquet(file)
        for t in texts:
            t = t.strip()
            if len(t) < 50:
                continue
            if not any(c.isalpha() for c in t):
                continue
            fout.write(t + "\n\n")
print("DONE! Saved merged corpus →", OUT_FILE) |
|
|
|