import os import re import json import pandas as pd import shutil from notion_client import Client from pathlib import Path from datetime import datetime # ===== 설정 ===== NOTION_API_KEY = "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" notion = Client(auth=NOTION_API_KEY) if NOTION_API_KEY else None SCHEMA_PARENT_PAGE_ID = "2f494d45b1a3818fa9fceb4f9e17d905" SCRIPT_DIR = Path(__file__).parent GAMEDATA_DIR = SCRIPT_DIR.parent / "GameData" BACKUP_DIR = GAMEDATA_DIR / "Backups" BACKUP_DIR.mkdir(exist_ok=True) # ===== 유틸리티 함수 ===== def clean_page_title(title): cleaned = re.sub(r'[^\w\s가-힣]', '', title).strip() cleaned = re.sub(r'\s*(스키마|Schema)\s*', '', cleaned, flags=re.IGNORECASE).strip() cleaned = cleaned.replace(' ', '') return cleaned if cleaned else title.replace(' ', '') def check_page_has_table(page_id): try: blocks_response = notion.blocks.children.list(block_id=page_id) return any(block.get('type') == 'table' for block in blocks_response.get('results', [])) except: return False def discover_schema_pages(parent_id=None, depth=0, max_depth=3): if not notion: raise ValueError("API Key missing") parent_id = parent_id or SCHEMA_PARENT_PAGE_ID if depth > max_depth: return {} indent = " " * depth if depth == 0: print("🔍 스키마 페이지 자동 발견 중...") try: children = notion.blocks.children.list(block_id=parent_id) schemas = {} for block in children['results']: if block['type'] == 'child_page': page_id = block['id'] page_title = block['child_page']['title'] schema_name = clean_page_title(page_title) print(f"{indent}📋 발견: '{page_title}'", end="") if check_page_has_table(page_id): schemas[schema_name] = page_id print(f" → {schema_name} ✅") else: print(f" (폴더)") schemas.update(discover_schema_pages(page_id, depth + 1, max_depth)) return schemas except Exception as e: print(f"{indent}❌ 탐색 실패: {e}") return {} def parse_condition(condition_str): if not condition_str or condition_str.strip() == "": return None match = re.match(r'(\w+)\s*(=|!=|>|<|>=|<=)\s*(.+)', condition_str.strip()) if match: return {'field': match.group(1), 'op': match.group(2), 'value': match.group(3).strip()} return None def parse_notion_table(page_id): blocks = notion.blocks.children.list(block_id=page_id).get('results', []) table_block = next((b for b in blocks if b['type'] == 'table'), None) if not table_block: raise ValueError("테이블을 찾을 수 없습니다.") rows = notion.blocks.children.list(block_id=table_block['id']).get('results', []) schema = [] def extract_text(cell, preserve_newlines=False): if not cell: return "" text = "".join([c.get('text', {}).get('content', '') for c in cell if c.get('type') == 'text']) return text.replace('\n', '\\n').strip() if preserve_newlines else text.strip() for row in rows[1:]: # 헤더 제외 if row.get('type') != 'table_row': continue cells = row['table_row']['cells'] field_name = extract_text(cells[0]) if not field_name: continue # 필드 파싱 field_type = extract_text(cells[1]).lower() condition_str = extract_text(cells[2]) description = extract_text(cells[3], preserve_newlines=True) if len(cells) > 3 else "" schema.append({ 'name': field_name, 'type': field_type, # "list:int" 형태 그대로 보존 'condition': parse_condition(condition_str), 'description': description }) return schema def get_default_value(field_type, has_condition): """기본값 결정 (List 대응)""" if has_condition: return None f_type = field_type.lower() if 'list' in f_type: return [] # 리스트는 빈 리스트 객체 if f_type == "int": return 0 if f_type in ["float", "number"]: return 0.0 if f_type in ["bool", "boolean"]: return False return "" def merge_schema_and_data(schema, existing_data): if existing_data is None or existing_data.empty: example_row = {f['name']: get_default_value(f['type'], f.get('condition') is not None) for f in schema} # CSV 저장을 위해 리스트는 문자열로 변환 (빈 값) for k, v in example_row.items(): if isinstance(v, list): example_row[k] = "" return pd.DataFrame([example_row]) new_df = pd.DataFrame() for field in schema: col = field['name'] if col in existing_data.columns: new_df[col] = existing_data[col] else: val = get_default_value(field['type'], field.get('condition') is not None) new_df[col] = "" if isinstance(val, list) else val return new_df def sync_single_schema(data_name, page_id): print(f"\n🔄 {data_name} 동기화 시작...") try: schema = parse_notion_table(page_id) # 1. 스키마 JSON 저장 (generate_all_classes.py가 읽을 파일) schema_path = GAMEDATA_DIR / f".{data_name}_schema.json" with open(schema_path, 'w', encoding='utf-8') as f: json.dump(schema, f, ensure_ascii=False, indent=2) # 2. CSV 업데이트 csv_path = GAMEDATA_DIR / f"{data_name}.csv" existing_data = pd.read_csv(csv_path) if csv_path.exists() else None if existing_data is not None: shutil.copy2(csv_path, BACKUP_DIR / f"{data_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv") merged_df = merge_schema_and_data(schema, existing_data) merged_df.to_csv(csv_path, index=False, encoding='utf-8-sig') print(f" ✅ 완료: {data_name} (스키마 및 CSV 업데이트)") return True except Exception as e: print(f" ❌ 실패: {e}") return False def main(): print("🚀 Notion Schema Sync Start") try: page_ids = discover_schema_pages() if not page_ids: return schemas = list(page_ids.keys()) for idx, name in enumerate(schemas, 1): print(f" {idx}. {name}") print(f" {len(schemas) + 1}. 전체") choice = input("\n번호 선택: ").strip() selected = schemas if choice == str(len(schemas) + 1) else [schemas[int(choice)-1]] for name in selected: sync_single_schema(name, page_ids[name]) except Exception as e: print(f"오류 발생: {e}") if __name__ == "__main__": main()