#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Generate a code-master sync SQL script from the basic-code CSV.

Parses 종량제_개발목록_20260127(기본코드 종류).csv and writes
writable/database/code_master_sync_from_csv.sql, which inserts any
``code_kind`` / ``code_detail`` rows that are not yet present.
"""
from __future__ import annotations

import csv
import re
import sys
from pathlib import Path

# Code kinds handled by this tool: the single letters A through Y.
KIND_LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXY"

# Where each group of kinds lives in the CSV: (row slice, kind letters).
# Within a data row the first code sits at column 3 and each kind occupies
# three columns (code, name, one more column that is not read).
DETAIL_BLOCKS: tuple[tuple[slice, str], ...] = (
    # Block A-I: from the 4th CSV row (rows[3]) up to just before the
    # dong / memo rows.
    (slice(3, 32), "ABCDEFGHI"),
    # Block J-R: data rows[37:64].
    (slice(37, 64), "JKLMNOPQR"),
    # Block S-Y: rows[68:] through the end of the file.
    (slice(68, None), "STUVWXY"),
)

# Header / explanatory cell text that must never be treated as a detail row.
SKIP_NAME = re.compile(r"코드\s*[ABCD]|순번\s*두자리|등록되는구의", re.I)


def read_csv_rows(path: Path) -> list[list[str]]:
    """Read *path* as UTF-8 (a leading BOM is tolerated) and return its CSV rows."""
    raw = path.read_text(encoding="utf-8-sig")
    return list(csv.reader(raw.splitlines()))


def extract_pairs(
    fields: list[str], first_code_idx: int, ncols: int, step: int = 3
) -> list[tuple[str, str]]:
    """Return *ncols* stripped ``(code, name)`` pairs taken from one CSV row.

    Pair ``k`` is read from ``fields[first_code_idx + k * step]`` and the cell
    right after it. When the row is too short to hold both cells the pair is
    ``("", "")`` so that the result always has exactly *ncols* entries.
    """
    pairs: list[tuple[str, str]] = []
    for k in range(ncols):
        i = first_code_idx + k * step
        if i + 1 >= len(fields):
            pairs.append(("", ""))
            continue
        pairs.append((fields[i].strip(), fields[i + 1].strip()))
    return pairs


def valid_detail(cd_code: str, cd_name: str) -> bool:
    """Return True when ``(cd_code, cd_name)`` looks like a real detail row.

    Rejects pairs with an empty side, the column-header cells, and any cell
    matching ``SKIP_NAME``.
    """
    if not cd_code or not cd_name:
        return False
    if cd_code in ("세부코드", "코드명"):
        return False
    if SKIP_NAME.search(cd_name) or SKIP_NAME.search(cd_code):
        return False
    return True


def dedup_pairs(pairs: list[tuple[str, str]]) -> list[tuple[str, str]]:
    """Drop pairs whose code was already seen, keeping the first occurrence."""
    seen: set[str] = set()
    out: list[tuple[str, str]] = []
    for c, n in pairs:
        if c in seen:
            continue
        seen.add(c)
        out.append((c, n))
    return out


def escape_sql_literal(value: str) -> str:
    """Escape *value* for use inside a single-quoted MySQL string literal.

    Backslashes are doubled first, then single quotes. The backslash step
    matters because MySQL treats ``\\`` as an escape character in string
    literals unless ``NO_BACKSLASH_ESCAPES`` is enabled; this assumes the
    default SQL mode.
    """
    return value.replace("\\", "\\\\").replace("'", "''")


def parse_kind_names(rows: list[list[str]]) -> dict[str, str]:
    """Map each kind letter to the name found next to it in the CSV.

    A row contributes when its first cell is a single letter A-Y and its
    second cell is non-empty and does not contain the "세부코드" header text.
    Later rows overwrite earlier ones for the same letter.
    """
    names: dict[str, str] = {}
    for fields in rows:
        if len(fields) < 2:
            continue
        letter, name = fields[0].strip(), fields[1].strip()
        if len(letter) == 1 and "A" <= letter <= "Y" and name and "세부코드" not in name:
            names[letter] = name
    return names


def collect_block(
    rows: list[list[str]], letters: str, first_code_idx: int = 3, step: int = 3
) -> dict[str, list[tuple[str, str]]]:
    """Collect the valid detail pairs of one CSV block, grouped by kind letter.

    Column ``k`` of the block belongs to ``letters[k]``. Rows with fewer than
    five cells are skipped. Every letter is present in the result, possibly
    with an empty list.
    """
    found: dict[str, list[tuple[str, str]]] = {letter: [] for letter in letters}
    for fields in rows:
        if len(fields) < 5:
            continue
        pairs = extract_pairs(fields, first_code_idx, len(letters), step=step)
        for letter, (code, name) in zip(letters, pairs):
            if valid_detail(code, name):
                found[letter].append((code, name))
    return found


def build_sql(
    kind_names: dict[str, str], details: dict[str, list[tuple[str, str]]]
) -> list[str]:
    """Render the sync script as a list of lines (without trailing newlines).

    Emits one insert-if-missing statement per kind letter, a blank line, then
    one insert-if-missing statement per detail. ``cd_sort`` runs 10, 20, 30,
    ... within each kind. A kind without a known name uses its letter.
    """
    sql: list[str] = [
        "-- Sync missing rows from 종량제_개발목록_20260127(기본코드 종류).csv",
        "-- Generated by writable/tools/sync_basic_codes_from_csv.py",
        "SET NAMES utf8mb4;",
        "",
    ]
    for letter in KIND_LETTERS:
        name = escape_sql_literal(kind_names.get(letter, letter))
        sql.append(
            f"INSERT INTO `code_kind` (`ck_code`, `ck_name`, `ck_state`, `ck_regdate`) "
            f"SELECT '{letter}', '{name}', 1, NOW() FROM DUAL "
            f"WHERE NOT EXISTS (SELECT 1 FROM `code_kind` c WHERE c.ck_code = '{letter}');"
        )
    sql.append("")
    for letter in KIND_LETTERS:
        for position, (code, name) in enumerate(details.get(letter, ()), start=1):
            ce = escape_sql_literal(code)
            ne = escape_sql_literal(name)
            sql.append(
                "INSERT INTO `code_detail` (`cd_ck_idx`, `cd_code`, `cd_name`, `cd_sort`, `cd_state`, `cd_regdate`) "
                f"SELECT k.ck_idx, '{ce}', '{ne}', {position * 10}, 1, NOW() FROM `code_kind` k "
                f"WHERE k.ck_code = '{letter}' AND NOT EXISTS ("
                "SELECT 1 FROM `code_detail` d WHERE d.cd_ck_idx = k.ck_idx AND d.cd_code = "
                f"'{ce}'"
                ");"
            )
    return sql


def main() -> int:
    """Read the CSV, write the sync SQL file, and print a per-kind summary.

    Returns 0 on success and 1 when the CSV file is missing.
    """
    # The project root is two directories above the one holding this script.
    root = Path(__file__).resolve().parents[2]
    csv_path = root / "docs/종량제 관련 자료/종량제 개발목록/종량제_개발목록_20260127(기본코드 종류).csv"
    if not csv_path.exists():
        print("CSV not found:", csv_path, file=sys.stderr)
        return 1

    rows = read_csv_rows(csv_path)
    kind_names = parse_kind_names(rows)

    details: dict[str, list[tuple[str, str]]] = {letter: [] for letter in KIND_LETTERS}
    for row_slice, letters in DETAIL_BLOCKS:
        for letter, pairs in collect_block(rows[row_slice], letters).items():
            details[letter] = dedup_pairs(pairs)

    out_path = root / "writable/database/code_master_sync_from_csv.sql"
    # Create the target directory on a fresh checkout instead of failing.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text("\n".join(build_sql(kind_names, details)) + "\n", encoding="utf-8")

    print("Wrote", out_path)
    for letter in KIND_LETTERS:
        print(f" {letter}: {len(details[letter])} details (unique cd_code)")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())