Files
Northbound/DataTools/sync_from_notion.py
2026-01-30 13:40:31 +09:00

178 lines
6.7 KiB
Python

import os
import re
import json
import pandas as pd
import shutil
from notion_client import Client
from pathlib import Path
from datetime import datetime
# ===== Configuration =====
# SECURITY: a live Notion integration token is committed in this file.
# The NOTION_API_KEY environment variable now takes precedence; the literal is
# kept only as a fallback so existing setups keep working. Rotate this token
# and delete the fallback once every environment provides the variable.
NOTION_API_KEY = os.environ.get("NOTION_API_KEY") or "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"
# Stays None when no key is available; discover_schema_pages() checks for
# that and raises ValueError instead of failing inside the client.
notion = Client(auth=NOTION_API_KEY) if NOTION_API_KEY else None
# Root Notion page whose child pages are scanned for schema tables.
SCHEMA_PARENT_PAGE_ID = "2f494d45b1a3818fa9fceb4f9e17d905"
SCRIPT_DIR = Path(__file__).parent
GAMEDATA_DIR = SCRIPT_DIR.parent / "GameData"
BACKUP_DIR = GAMEDATA_DIR / "Backups"
# parents=True: a checkout without GameData/ must not fail at import time.
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
# ===== 유틸리티 함수 =====
def clean_page_title(title):
    """Turn a Notion page title into a compact schema name.

    Punctuation/symbol characters are removed, the word "스키마"/"Schema"
    (case-insensitive) is dropped together with the whitespace around it, and
    remaining spaces are removed. If nothing is left, the original title with
    its spaces removed is returned instead.
    """
    without_symbols = re.sub(r'[^\w\s가-힣]', '', title).strip()
    without_suffix = re.sub(
        r'\s*(스키마|Schema)\s*', '', without_symbols, flags=re.IGNORECASE
    ).strip()
    compact = without_suffix.replace(' ', '')
    if compact:
        return compact
    # Everything was stripped away: fall back to the raw title, minus spaces.
    return title.replace(' ', '')
def check_page_has_table(page_id):
    """Return True if any child block of the given page is a table.

    A single ``blocks.children.list`` call is made (no pagination cursor is
    followed). The check is best-effort: any error while fetching or
    inspecting the blocks is reported as False rather than raised.
    """
    try:
        blocks_response = notion.blocks.children.list(block_id=page_id)
        return any(
            block.get('type') == 'table'
            for block in blocks_response.get('results', [])
        )
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
        # still propagate while API and lookup errors keep meaning "no table".
        return False
def discover_schema_pages(parent_id=None, depth=0, max_depth=3):
    """Recursively collect child pages that contain a table.

    Args:
        parent_id: ID of the page/block to scan; defaults to
            SCHEMA_PARENT_PAGE_ID when falsy.
        depth: Current recursion depth (0 for the initial call).
        max_depth: Calls with ``depth`` greater than this return ``{}``.

    Returns:
        Dict mapping the cleaned schema name to its Notion page ID. Child
        pages without a table are treated as folders and scanned recursively.
        Returns ``{}`` if listing or processing the children raises.

    Raises:
        ValueError: If the Notion client was not created (no API key).
    """
    if not notion:
        raise ValueError("API Key missing")
    parent_id = parent_id or SCHEMA_PARENT_PAGE_ID
    if depth > max_depth:
        return {}
    indent = " " * depth
    if depth == 0:
        print("🔍 스키마 페이지 자동 발견 중...")
    try:
        children = notion.blocks.children.list(block_id=parent_id)
        schemas = {}
        for block in children['results']:
            if block['type'] != 'child_page':
                continue
            page_id = block['id']
            page_title = block['child_page']['title']
            schema_name = clean_page_title(page_title)
            # No newline yet: the verdict for this page is appended below.
            print(f"{indent}📋 발견: '{page_title}'", end="")
            if check_page_has_table(page_id):
                # NOTE: a later page with the same cleaned name replaces an
                # earlier one in this dict.
                schemas[schema_name] = page_id
                # Separator added: the name used to be printed fused to the
                # closing quote of the title.
                print(f" → {schema_name}")
            else:
                print(" (폴더)")
                schemas.update(discover_schema_pages(page_id, depth + 1, max_depth))
        return schemas
    except Exception as e:
        print(f"{indent}❌ 탐색 실패: {e}")
        return {}
# Two-character operators must come first: regex alternation is tried left to
# right, so listing ">" before ">=" would split "a >= 5" into op ">" and
# value "= 5".
_CONDITION_RE = re.compile(r'(\w+)\s*(!=|>=|<=|=|>|<)\s*(.+)')


def parse_condition(condition_str):
    """Parse a ``field <op> value`` condition string.

    Supported operators: ``=``, ``!=``, ``>``, ``<``, ``>=``, ``<=``.

    Returns:
        ``{'field': ..., 'op': ..., 'value': ...}`` with the value stripped of
        surrounding whitespace, or ``None`` when the input is empty, blank, or
        does not start with a recognisable condition.
    """
    if not condition_str or condition_str.strip() == "":
        return None
    match = _CONDITION_RE.match(condition_str.strip())
    if match is None:
        return None
    field, op, value = match.groups()
    return {'field': field, 'op': op, 'value': value.strip()}
def parse_notion_table(page_id):
    """Read the first table block on a Notion page into a list of field dicts.

    The first row is skipped as the header. For each remaining row the columns
    are read in order as: field name, type, condition, description. Rows with
    an empty first cell are ignored.

    Returns:
        List of dicts with keys ``name``, ``type`` (lower-cased),
        ``condition`` (result of ``parse_condition``) and ``description``
        (newlines escaped as a literal ``\\n``).

    Raises:
        ValueError: If the page has no table block among its listed children.
    """
    blocks = notion.blocks.children.list(block_id=page_id).get('results', [])
    table_block = next((b for b in blocks if b['type'] == 'table'), None)
    if not table_block:
        raise ValueError("테이블을 찾을 수 없습니다.")
    rows = notion.blocks.children.list(block_id=table_block['id']).get('results', [])

    def extract_text(cell, preserve_newlines=False):
        # Concatenate the content of the 'text'-typed fragments of one cell.
        if not cell:
            return ""
        text = "".join(
            [c.get('text', {}).get('content', '') for c in cell if c.get('type') == 'text']
        )
        return text.replace('\n', '\\n').strip() if preserve_newlines else text.strip()

    def cell_text(cells, index, preserve_newlines=False):
        # Missing columns read as "" so a narrow table does not raise
        # IndexError; previously only the description column was guarded.
        if index >= len(cells):
            return ""
        return extract_text(cells[index], preserve_newlines)

    schema = []
    for row in rows[1:]:  # skip the header row
        if row.get('type') != 'table_row':
            continue
        cells = row['table_row']['cells']
        field_name = cell_text(cells, 0)
        if not field_name:
            continue
        schema.append({
            'name': field_name,
            # Kept verbatim apart from lower-casing, e.g. "list:int".
            'type': cell_text(cells, 1).lower(),
            'condition': parse_condition(cell_text(cells, 2)),
            'description': cell_text(cells, 3, preserve_newlines=True),
        })
    return schema
def get_default_value(field_type, has_condition):
    """Pick the default value for a field of the given type.

    Conditional fields default to None. Otherwise the type name is matched
    case-insensitively: anything containing "list" gives an empty list,
    "int" gives 0, "float"/"number" give 0.0, "bool"/"boolean" give False,
    and every other type gives an empty string.
    """
    if has_condition:
        return None
    normalized = field_type.lower()
    if 'list' in normalized:
        return []  # fresh list object on every call
    scalar_defaults = {
        "int": 0,
        "float": 0.0,
        "number": 0.0,
        "bool": False,
        "boolean": False,
    }
    return scalar_defaults.get(normalized, "")
def merge_schema_and_data(schema, existing_data):
    """Build a DataFrame whose columns follow the schema order.

    Args:
        schema: List of field dicts with ``name``, ``type`` and optionally
            ``condition``.
        existing_data: Previously saved DataFrame, or None.

    Returns:
        With no existing rows: a one-row frame of per-field defaults.
        Otherwise: a frame with one column per schema field, copying existing
        columns and filling new ones with their default on every row. Columns
        that are not in the schema are dropped.
    """
    def csv_default(field):
        # List defaults are stored as an empty string because the frame is
        # destined for CSV.
        value = get_default_value(field['type'], field.get('condition') is not None)
        return "" if isinstance(value, list) else value

    if existing_data is None or existing_data.empty:
        return pd.DataFrame([{field['name']: csv_default(field) for field in schema}])

    # Seed the frame with the existing row index. Assigning a scalar to an
    # index-less DataFrame creates a zero-length column, which turned into NaN
    # as soon as an existing column was attached, losing the default for any
    # new field listed before the first existing one.
    new_df = pd.DataFrame(index=existing_data.index)
    for field in schema:
        col = field['name']
        if col in existing_data.columns:
            new_df[col] = existing_data[col]
        else:
            new_df[col] = csv_default(field)
    return new_df
def sync_single_schema(data_name, page_id):
    """Sync one schema: write its JSON description and update its CSV.

    Steps: parse the Notion table, write ``.{data_name}_schema.json`` into
    GAMEDATA_DIR, back up the existing ``{data_name}.csv`` (if any) into
    BACKUP_DIR with a timestamp, then rewrite the CSV to match the schema.

    Returns:
        True on success, False if any step raised (the error is printed).
    """
    print(f"\n🔄 {data_name} 동기화 시작...")
    try:
        schema = parse_notion_table(page_id)

        # 1. Save the schema JSON (the file generate_all_classes.py reads).
        schema_path = GAMEDATA_DIR / f".{data_name}_schema.json"
        with open(schema_path, 'w', encoding='utf-8') as f:
            json.dump(schema, f, ensure_ascii=False, indent=2)

        # 2. Update the CSV.
        csv_path = GAMEDATA_DIR / f"{data_name}.csv"
        existing_data = None
        if csv_path.exists():
            # Read every cell as text and keep blanks as "": with dtype/NA
            # inference, integer columns containing blanks came back as
            # floats ("5" -> "5.0") and strings such as "NA" became empty
            # when the file was rewritten.
            existing_data = pd.read_csv(
                csv_path, dtype=str, keep_default_na=False, encoding='utf-8-sig'
            )
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            shutil.copy2(csv_path, BACKUP_DIR / f"{data_name}_{timestamp}.csv")

        merged_df = merge_schema_and_data(schema, existing_data)
        merged_df.to_csv(csv_path, index=False, encoding='utf-8-sig')
        print(f" ✅ 완료: {data_name} (스키마 및 CSV 업데이트)")
        return True
    except Exception as e:
        print(f" ❌ 실패: {e}")
        return False
def main():
    """Interactive entry point: list discovered schemas and sync the choice.

    Prints a numbered menu of the discovered schemas plus a final "all"
    entry, reads one number from stdin and syncs the selected schema(s).
    Any exception is caught and printed.
    """
    print("🚀 Notion Schema Sync Start")
    try:
        page_ids = discover_schema_pages()
        if not page_ids:
            return
        schemas = list(page_ids.keys())
        for idx, name in enumerate(schemas, 1):
            print(f" {idx}. {name}")
        all_option = len(schemas) + 1
        print(f" {all_option}. 전체")
        choice = input("\n번호 선택: ").strip()
        try:
            number = int(choice)
        except ValueError:
            number = 0  # falls through to the range check below
        # Reject out-of-range input explicitly: "0" or a negative number used
        # to index from the end of the list and sync the wrong schema.
        if not 1 <= number <= all_option:
            print(f"잘못된 번호입니다: {choice}")
            return
        selected = schemas if number == all_option else [schemas[number - 1]]
        for name in selected:
            sync_single_schema(name, page_ids[name])
    except Exception as e:
        print(f"오류 발생: {e}")


# Run the interactive sync only when executed as a script, not on import.
if __name__ == "__main__":
    main()