Json Split 當 Json 檔案太大無法開啟的分割方案

2025-11-29

實用的 Python Script 透過 ijson 將大型 Json 檔案分割成多個小檔案,方便處理與分析。

logo

說明

適合處理無法透過 Visual Studio Code 開啟檢視的 JSON 陣列格式大型檔案:

[ {...}, {...}, ... ]

import json
from decimal import Decimal

def _json_default(value):
    """提供 json.dump 需要的序列化 fallback。"""
    if isinstance(value, Decimal):
        if value == value.to_integral_value():
            return int(value)
        return float(value)
    raise TypeError(f"無法序列化的型別: {type(value)!r}")

def split_json(input_file, output_prefix="part_", chunk_size=1000):
    """
    將大 JSON 陣列切成多個小檔案,每個檔案最多 chunk_size 筆資料
    """
    count = 0
    file_index = 0
    buffer = []

    with open(input_file, "r", encoding="utf-8") as f:
        # 我們用 json.load 逐筆讀取會吃太多記憶體,所以改用流式讀取
        try:
            import ijson
        except ImportError:
            print("請先安裝 ijson: pip install ijson")
            return

        try:
            objects = ijson.items(f, "item", use_float=True)  # 逐筆讀取陣列元素
        except TypeError:
            objects = ijson.items(f, "item")
        for obj in objects:
            buffer.append(obj)
            count += 1

            if count >= chunk_size:
                out_file = f"{output_prefix}{file_index}.json"
                with open(out_file, "w", encoding="utf-8") as out:
                    json.dump(buffer, out, ensure_ascii=False, indent=2, default=_json_default)
                print(f"寫入 {out_file} ({len(buffer)} 筆資料)")
                buffer = []
                count = 0
                file_index += 1

        # 處理剩餘資料
        if buffer:
            out_file = f"{output_prefix}{file_index}.json"
            with open(out_file, "w", encoding="utf-8") as out:
                json.dump(buffer, out, ensure_ascii=False, indent=2, default=_json_default)
            print(f"寫入 {out_file} ({len(buffer)} 筆資料)")

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="分割大 JSON 檔案")
    parser.add_argument("input_file", help="輸入 JSON 檔案(陣列格式)")
    parser.add_argument("--prefix", default="part_", help="輸出檔名前綴")
    parser.add_argument("--size", type=int, default=1, help="每個檔案最大筆數")
    args = parser.parse_args()

    split_json(args.input_file, args.prefix, args.size)