# DataTools/sync_from_notion.py """노션 스키마 → CSV 동기화 (자동 발견)""" import os import re import pandas as pd from notion_client import Client from pathlib import Path from datetime import datetime # ===== 설정 ===== NOTION_API_KEY = "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" notion = Client(auth=NOTION_API_KEY) if NOTION_API_KEY else None # ⭐ 부모 페이지 ID만 설정 (1회) SCHEMA_PARENT_PAGE_ID = "2f494d45b1a3818fa9fceb4f9e17d905" SCRIPT_DIR = Path(__file__).parent GAMEDATA_DIR = SCRIPT_DIR.parent / "GameData" BACKUP_DIR = GAMEDATA_DIR / "Backups" BACKUP_DIR.mkdir(exist_ok=True) # ===== 유틸리티 함수 ===== def clean_page_title(title): """ 노션 페이지 제목 → 스키마 이름 변환 규칙: 1. 이모지, 특수문자 제거 2. "스키마", "Schema" 제거 3. 공백 제거 예시: - "🏰 타워 스키마" → "타워" - "Tower Schema" → "Tower" - "Enemies" → "Enemies" """ # 1. 이모지 및 특수문자 제거 cleaned = re.sub(r'[^\w\s가-힣]', '', title).strip() # 2. "스키마", "Schema" 제거 cleaned = re.sub(r'\s*(스키마|Schema)\s*', '', cleaned, flags=re.IGNORECASE).strip() # 3. 공백 제거 cleaned = cleaned.replace(' ', '') # 4. 비어있으면 원본 반환 if not cleaned: return title.replace(' ', '') return cleaned def discover_schema_pages(parent_id=None, depth=0, max_depth=3): """ 부모 페이지의 하위 페이지들을 재귀적으로 탐색하여 스키마 발견 Args: parent_id: 탐색할 부모 페이지 ID (None이면 SCHEMA_PARENT_PAGE_ID 사용) depth: 현재 깊이 (0부터 시작) max_depth: 최대 탐색 깊이 (기본 3단계) 반환: { "타워": "page_id_1", "적유닛": "page_id_2", ... } """ if not notion: raise ValueError("Notion API 클라이언트가 초기화되지 않았습니다.") if parent_id is None: if not SCHEMA_PARENT_PAGE_ID or SCHEMA_PARENT_PAGE_ID == "노션_데이터_스키마_정의_페이지_ID": raise ValueError( "SCHEMA_PARENT_PAGE_ID가 설정되지 않았습니다.\n" "sync_from_notion.py 파일에서 부모 페이지 ID를 설정하세요." ) parent_id = SCHEMA_PARENT_PAGE_ID # 최대 깊이 체크 if depth > max_depth: return {} indent = " " * depth if depth == 0: print("🔍 스키마 페이지 자동 발견 중...") try: # 부모 페이지의 자식 블록 가져오기 children = notion.blocks.children.list(block_id=parent_id) schemas = {} for block in children['results']: if block['type'] == 'child_page': page_id = block['id'] page_title = block['child_page']['title'] # 제목 정리 schema_name = clean_page_title(page_title) print(f"{indent}📋 발견: '{page_title}'", end="") # 이 페이지에 테이블이 있는지 확인 has_table = check_page_has_table(page_id) if has_table: # 테이블이 있으면 스키마로 등록 schemas[schema_name] = page_id print(f" → {schema_name} ✅") else: # 테이블이 없으면 하위 페이지 탐색 print(f" (폴더)") child_schemas = discover_schema_pages(page_id, depth + 1, max_depth) schemas.update(child_schemas) if depth == 0 and not schemas: print(" ⚠️ 하위 페이지를 찾을 수 없습니다.") print(f" 💡 노션에서 부모 페이지 하위에 스키마 페이지를 추가하세요.") return schemas except Exception as e: print(f"{indent}❌ 탐색 실패: {e}") import traceback traceback.print_exc() return {} def check_page_has_table(page_id): """ 페이지에 테이블 블록이 있는지 확인 Args: page_id: 확인할 페이지 ID 반환: True: 테이블 있음 False: 테이블 없음 """ try: blocks_response = notion.blocks.children.list(block_id=page_id) blocks = blocks_response.get('results', []) for block in blocks: if block.get('type') == 'table': return True return False except Exception as e: # 에러 발생 시 테이블 없음으로 간주 return False def parse_condition(condition_str): """ 사용 조건 파싱 빈 문자열 → None (항상 사용) "tower_type=attack" → {'field': 'tower_type', 'op': '=', 'value': 'attack'} """ if not condition_str or condition_str.strip() == "": return None condition_str = condition_str.strip() # 단순 조건: "tower_type=attack" match = re.match(r'(\w+)\s*(=|!=|>|<|>=|<=)\s*(.+)', condition_str) if match: return { 'field': match.group(1), 'op': match.group(2), 'value': match.group(3).strip() } return None def parse_notion_table(page_id): """노션 테이블 파싱""" if not notion: raise ValueError("Notion API 클라이언트가 초기화되지 않았습니다.") try: # 1. 블록 가져오기 blocks_response = notion.blocks.children.list(block_id=page_id) blocks = blocks_response.get('results', []) # 2. 테이블 찾기 table_block = None for block in blocks: if block.get('type') == 'table': table_block = block break if not table_block: raise ValueError(f"테이블을 찾을 수 없습니다.") print(f" 📋 테이블 발견") # 3. 행 가져오기 table_id = table_block['id'] rows_response = notion.blocks.children.list(block_id=table_id) rows = rows_response.get('results', []) if len(rows) < 2: raise ValueError("테이블에 데이터가 없습니다.") # 4. 파싱 schema = [] def extract_text(cell, preserve_newlines=False): """ 셀에서 텍스트 추출 Args: cell: 노션 셀 데이터 preserve_newlines: True면 줄바꿈 보존, False면 공백으로 변환 """ if not cell or len(cell) == 0: return "" text_parts = [] for content in cell: if content.get('type') == 'text': text_content = content.get('text', {}).get('content', '') text_parts.append(text_content) if preserve_newlines: # 줄바꿈 보존 (\\n으로 이스케이프) result = ''.join(text_parts) # CSV에서 안전하게 저장하기 위해 실제 줄바꿈을 \\n으로 변환 result = result.replace('\n', '\\n') return result.strip() else: # 줄바꿈을 공백으로 변환 return ''.join(text_parts).strip() for row_idx, row in enumerate(rows[1:], start=2): if row.get('type') != 'table_row': continue cells = row['table_row']['cells'] # 4개 컬럼: 필드명, 타입, 사용 조건, 설명 field_name = extract_text(cells[0]) if len(cells) > 0 else "" field_type = extract_text(cells[1]) if len(cells) > 1 else "string" condition_str = extract_text(cells[2]) if len(cells) > 2 else "" # ⭐ 설명 컬럼만 줄바꿈 보존 description = extract_text(cells[3], preserve_newlines=True) if len(cells) > 3 else "" if not field_name: continue # 조건 파싱 condition = parse_condition(condition_str) if condition: print(f" 📌 {field_name}: {condition['field']}={condition['value']}일 때 사용") schema.append({ 'name': field_name, 'type': field_type.lower(), 'condition': condition, 'description': description }) if len(schema) == 0: raise ValueError("파싱된 스키마가 비어있습니다.") return schema except Exception as e: print(f" ❌ 파싱 오류: {e}") raise def get_default_value(field_type, has_condition): """ 기본값 결정 조건부 필드 → None (빈 칸) 공통 필드 → 타입별 기본값 """ # 조건부 필드는 빈 칸 if has_condition: return None # 공통 필드는 타입별 기본값 if field_type == "int": return 0 elif field_type in ["float", "number"]: return 0.0 elif field_type in ["bool", "boolean"]: return False elif field_type == "string": return "" else: return None def merge_schema_and_data(schema, existing_data): """스키마와 데이터 병합""" schema_columns = [f['name'] for f in schema] if existing_data is None or existing_data.empty: print(" 새 파일 생성") example_row = {} for field in schema: has_condition = field.get('condition') is not None example_row[field['name']] = get_default_value(field['type'], has_condition) return pd.DataFrame([example_row]) print(f" 기존 데이터: {len(existing_data)}행") new_df = pd.DataFrame() for field in schema: col_name = field['name'] if col_name in existing_data.columns: print(f" ✓ {col_name}: 유지") new_df[col_name] = existing_data[col_name] else: has_condition = field.get('condition') is not None default_val = get_default_value(field['type'], has_condition) if default_val is None: print(f" + {col_name}: 추가 (조건부 필드, 빈 칸)") else: print(f" + {col_name}: 추가 (기본값: {default_val})") new_df[col_name] = default_val return new_df def sync_single_schema(data_name, page_id): """단일 스키마 동기화 (CSV 버전)""" print(f"\n{'='*60}") print(f"📋 {data_name} 동기화") print(f"{'='*60}") try: # 1. 스키마 읽기 print("1️⃣ 스키마 읽기...") schema = parse_notion_table(page_id) print(f" ✅ {len(schema)}개 필드") # 2. 스키마를 JSON으로 저장 (검증용) import json schema_json_path = GAMEDATA_DIR / f".{data_name}_schema.json" with open(schema_json_path, 'w', encoding='utf-8') as f: json.dump(schema, f, ensure_ascii=False, indent=2) print(f" 💾 스키마 저장: {schema_json_path.name}") # 3. 기존 파일 확인 csv_path = GAMEDATA_DIR / f"{data_name}.csv" print(f"\n2️⃣ 기존 파일: {csv_path}") existing_data = None if csv_path.exists(): # 백업 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_path = BACKUP_DIR / f"{data_name}_{timestamp}.csv" import shutil shutil.copy2(csv_path, backup_path) print(f" 💾 백업: {backup_path.name}") existing_data = pd.read_csv(csv_path) # 4. 병합 print(f"\n3️⃣ 병합 중...") merged_df = merge_schema_and_data(schema, existing_data) # 5. 저장 (CSV) print(f"\n4️⃣ 저장...") merged_df.to_csv(csv_path, index=False, encoding='utf-8-sig') print(f" ✅ 완료: {csv_path}") return True except Exception as e: print(f"\n❌ 오류: {e}") import traceback traceback.print_exc() return False def main(): print("=" * 60) print("🔄 Notion → CSV 동기화 (자동 발견)") print("=" * 60) print() if not NOTION_API_KEY: print("❌ NOTION_API_KEY 환경변수가 없습니다") print("💡 설정 방법:") print(' $env:NOTION_API_KEY = "your_key"') return print(f"📂 데이터 폴더: {GAMEDATA_DIR}") print(f"💾 백업 폴더: {BACKUP_DIR}") print() # ⭐ 스키마 자동 발견 try: SCHEMA_PAGE_IDS = discover_schema_pages() except Exception as e: print(f"\n❌ 스키마 발견 실패: {e}") return if not SCHEMA_PAGE_IDS: print("\n❌ 발견된 스키마 페이지가 없습니다.") return print() print("=" * 60) print("동기화할 스키마를 선택하세요:") schemas = list(SCHEMA_PAGE_IDS.keys()) for idx, name in enumerate(schemas, 1): print(f" {idx}. {name}") print(f" {len(schemas) + 1}. 전체") print() try: choice = input("선택 (번호 입력): ").strip() if choice == str(len(schemas) + 1): selected = schemas else: idx = int(choice) - 1 if 0 <= idx < len(schemas): selected = [schemas[idx]] else: print("❌ 잘못된 선택입니다.") return except (ValueError, KeyboardInterrupt): print("\n⚠️ 취소되었습니다.") return # 동기화 실행 print() success_count = 0 for schema_name in selected: page_id = SCHEMA_PAGE_IDS[schema_name] if sync_single_schema(schema_name, page_id): success_count += 1 # 최종 결과 print() print("=" * 60) print(f"✅ 완료: {success_count}/{len(selected)} 성공") print("=" * 60) if success_count > 0: print() print("💡 다음 단계:") print(" 1. GameData 폴더에서 CSV 파일 확인") print(" 2. 데이터 수정") print(" 3. Git 커밋:") print(" git add GameData/*.csv") print(' git commit -m "Update data from Notion"') if __name__ == "__main__": main()