- /bag/code-kinds, /bag/code-details/{ck_idx} 조회 (LoginAuthFilter, Roles::canManageCodeMaster)
- admin에서는 종류·세부 목록 제거, 등록·수정·삭제만 유지 후 bag으로 리다이렉트
- 사이트 메뉴·기본코드 링크 SQL, CSV 동기화 스크립트·README 보강
- 관리자 대시보드: 발주·판매 테이블 미존재 시 통계 비활성화
- 회원 로그인 잠금(mb_login_fail_count, mb_locked_until) 및 관리자 잠금 해제
Made-with: Cursor
146 lines
4.7 KiB
Python
146 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""Parse 종량제_개발목록_20260127(기본코드 종류).csv → writable/database/code_master_sync_from_csv.sql"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import csv
|
||
import re
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
|
||
def read_csv_rows(path: Path) -> list[list[str]]:
|
||
raw = path.read_text(encoding="utf-8-sig")
|
||
return list(csv.reader(raw.splitlines()))
|
||
|
||
|
||
def extract_pairs(fields: list[str], first_code_idx: int, ncols: int, step: int = 3) -> list[tuple[str, str]]:
|
||
pairs: list[tuple[str, str]] = []
|
||
for k in range(ncols):
|
||
i = first_code_idx + k * step
|
||
if i + 1 >= len(fields):
|
||
pairs.append(("", ""))
|
||
continue
|
||
pairs.append((fields[i].strip(), fields[i + 1].strip()))
|
||
return pairs
|
||
|
||
|
||
SKIP_NAME = re.compile(r"코드\s*[ABCD]|순번\s*두자리|등록되는구의", re.I)
|
||
|
||
|
||
def valid_detail(cd_code: str, cd_name: str) -> bool:
|
||
if not cd_code or not cd_name:
|
||
return False
|
||
if cd_code in ("세부코드", "코드명"):
|
||
return False
|
||
if SKIP_NAME.search(cd_name) or SKIP_NAME.search(cd_code):
|
||
return False
|
||
return True
|
||
|
||
|
||
def dedup_pairs(pairs: list[tuple[str, str]]) -> list[tuple[str, str]]:
|
||
seen: set[str] = set()
|
||
out: list[tuple[str, str]] = []
|
||
for c, n in pairs:
|
||
if c in seen:
|
||
continue
|
||
seen.add(c)
|
||
out.append((c, n))
|
||
return out
|
||
|
||
|
||
def main() -> int:
|
||
root = Path(__file__).resolve().parents[2]
|
||
csv_path = root / "docs/종량제 관련 자료/종량제 개발목록/종량제_개발목록_20260127(기본코드 종류).csv"
|
||
if not csv_path.exists():
|
||
print("CSV not found:", csv_path, file=sys.stderr)
|
||
return 1
|
||
|
||
rows = read_csv_rows(csv_path)
|
||
details: dict[str, list[tuple[str, str]]] = {chr(65 + i): [] for i in range(25)}
|
||
|
||
KIND_NAMES: dict[str, str] = {}
|
||
for fields in rows:
|
||
if len(fields) >= 2:
|
||
a, b = fields[0].strip(), fields[1].strip()
|
||
if len(a) == 1 and a.isalpha() and "A" <= a <= "Y" and b and "세부코드" not in b:
|
||
KIND_NAMES[a] = b
|
||
|
||
# 블록 A–I: CSV 상 4행째~ ≈ rows[3]부터 동·메모 행 전까지 (rows[3:32])
|
||
for fields in rows[3:32]:
|
||
if len(fields) < 5:
|
||
continue
|
||
pairs = extract_pairs(fields, 3, 9, step=3)
|
||
for col, let in enumerate("ABCDEFGHI"):
|
||
c, n = pairs[col]
|
||
if valid_detail(c, n):
|
||
details[let].append((c, n))
|
||
|
||
# 블록 J–R: 데이터 rows[37:64]
|
||
for fields in rows[37:64]:
|
||
if len(fields) < 5:
|
||
continue
|
||
pairs = extract_pairs(fields, 3, 9, step=3)
|
||
for col, let in enumerate("JKLMNOPQR"):
|
||
c, n = pairs[col]
|
||
if valid_detail(c, n):
|
||
details[let].append((c, n))
|
||
|
||
# 블록 S–Y: rows[68:]
|
||
for fields in rows[68:]:
|
||
if len(fields) < 5:
|
||
continue
|
||
pairs = extract_pairs(fields, 3, 7, step=3)
|
||
for col, let in enumerate("STUVWXY"):
|
||
c, n = pairs[col]
|
||
if valid_detail(c, n):
|
||
details[let].append((c, n))
|
||
|
||
for L in details:
|
||
details[L] = dedup_pairs(details[L])
|
||
|
||
sql: list[str] = [
|
||
"-- Sync missing rows from 종량제_개발목록_20260127(기본코드 종류).csv",
|
||
"-- Generated by writable/tools/sync_basic_codes_from_csv.py",
|
||
"SET NAMES utf8mb4;",
|
||
"",
|
||
]
|
||
|
||
for L in "ABCDEFGHIJKLMNOPQRSTUVWXY":
|
||
name = KIND_NAMES.get(L, L)
|
||
ne = name.replace("'", "''")
|
||
sql.append(
|
||
f"INSERT INTO `code_kind` (`ck_code`, `ck_name`, `ck_state`, `ck_regdate`) "
|
||
f"SELECT '{L}', '{ne}', 1, NOW() FROM DUAL "
|
||
f"WHERE NOT EXISTS (SELECT 1 FROM `code_kind` c WHERE c.ck_code = '{L}');"
|
||
)
|
||
|
||
sql.append("")
|
||
|
||
for L in "ABCDEFGHIJKLMNOPQRSTUVWXY":
|
||
sort_i = 0
|
||
for c, n in details[L]:
|
||
sort_i += 10
|
||
ce = c.replace("'", "''")
|
||
ne = n.replace("'", "''")
|
||
sql.append(
|
||
"INSERT INTO `code_detail` (`cd_ck_idx`, `cd_code`, `cd_name`, `cd_sort`, `cd_state`, `cd_regdate`) "
|
||
f"SELECT k.ck_idx, '{ce}', '{ne}', {sort_i}, 1, NOW() FROM `code_kind` k "
|
||
f"WHERE k.ck_code = '{L}' AND NOT EXISTS ("
|
||
"SELECT 1 FROM `code_detail` d WHERE d.cd_ck_idx = k.ck_idx AND d.cd_code = "
|
||
f"'{ce}'"
|
||
");"
|
||
)
|
||
|
||
out_path = root / "writable/database/code_master_sync_from_csv.sql"
|
||
out_path.write_text("\n".join(sql) + "\n", encoding="utf-8")
|
||
print("Wrote", out_path)
|
||
for L in "ABCDEFGHIJKLMNOPQRSTUVWXY":
|
||
print(f" {L}: {len(details[L])} details (unique cd_code)")
|
||
return 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
raise SystemExit(main())
|