178 lines
6.7 KiB
Python
178 lines
6.7 KiB
Python
import os
|
|
import re
|
|
import json
|
|
import pandas as pd
|
|
import shutil
|
|
from notion_client import Client
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
# ===== Configuration =====

# The Notion integration token is taken from the NOTION_API_KEY environment
# variable so the secret is never stored in source control. Any token that was
# ever committed to this file should be treated as leaked and rotated.
NOTION_API_KEY = os.environ.get("NOTION_API_KEY", "")

# Left as None when no key is configured; discover_schema_pages() checks for
# this and raises before any API call is attempted.
notion = Client(auth=NOTION_API_KEY) if NOTION_API_KEY else None

# Page whose child pages are scanned for schema tables.
SCHEMA_PARENT_PAGE_ID = "2f494d45b1a3818fa9fceb4f9e17d905"

# Output locations, resolved relative to this script's directory.
SCRIPT_DIR = Path(__file__).parent
GAMEDATA_DIR = SCRIPT_DIR.parent / "GameData"
BACKUP_DIR = GAMEDATA_DIR / "Backups"

# parents=True also creates GameData itself, so importing the module does not
# fail on a fresh checkout where that directory does not exist yet.
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
# ===== Utility functions =====
|
|
|
|
def clean_page_title(title):
    """Turn a Notion page title into a compact schema name.

    Characters that are neither word characters, whitespace, nor Hangul
    syllables are dropped; the words "스키마" / "Schema" (any letter case) are
    removed together with the whitespace around them; remaining spaces are
    deleted. If nothing is left, the original title with its spaces removed
    is returned instead.
    """
    without_symbols = re.sub(r'[^\w\s가-힣]', '', title).strip()
    without_keyword = re.sub(
        r'\s*(스키마|Schema)\s*', '', without_symbols, flags=re.IGNORECASE
    ).strip()
    compact = without_keyword.replace(' ', '')
    if compact:
        return compact
    # Everything was stripped away: fall back to the raw title, minus spaces.
    return title.replace(' ', '')
|
|
|
|
def check_page_has_table(page_id):
    """Report whether a Notion page has a table block among its children.

    Args:
        page_id: ID of the page whose child blocks are listed.

    Returns:
        True if any block in the listing response has type 'table'.
        False otherwise, including when the lookup fails for any reason.
        Only the blocks returned by this single listing call are inspected.
    """
    try:
        blocks_response = notion.blocks.children.list(block_id=page_id)
        return any(
            block.get('type') == 'table'
            for block in blocks_response.get('results', [])
        )
    except Exception:
        # Best-effort probe: a failed lookup is reported as "no table".
        # Catching Exception rather than using a bare except lets
        # KeyboardInterrupt and SystemExit propagate, so Ctrl+C still stops
        # the script during a scan.
        return False
|
|
|
|
def discover_schema_pages(parent_id=None, depth=0, max_depth=3):
    """Walk child pages under a Notion page and collect those holding a table.

    Args:
        parent_id: Page to scan; falls back to SCHEMA_PARENT_PAGE_ID when
            falsy.
        depth: Current recursion depth (0 for the initial call).
        max_depth: Scanning stops once depth exceeds this value.

    Returns:
        A dict mapping the cleaned schema name to the page ID for every child
        page that contains a table. Child pages without a table are treated as
        folders and scanned recursively. If anything raises while scanning
        this level, the failure is printed and an empty dict is returned.

    Raises:
        ValueError: If the Notion client was not created (no API key).
    """
    if not notion:
        raise ValueError("API Key missing")
    if depth > max_depth:
        return {}

    root_id = parent_id if parent_id else SCHEMA_PARENT_PAGE_ID
    indent = " " * depth
    if depth == 0:
        print("🔍 스키마 페이지 자동 발견 중...")

    try:
        listing = notion.blocks.children.list(block_id=root_id)
        found = {}
        for block in listing['results']:
            # Only child pages can be schemas or folders; skip other blocks.
            if block['type'] != 'child_page':
                continue

            child_id = block['id']
            raw_title = block['child_page']['title']
            name = clean_page_title(raw_title)

            # The status suffix is printed on the same line once it is known.
            print(f"{indent}📋 발견: '{raw_title}'", end="")
            if not check_page_has_table(child_id):
                print(" (폴더)")
                nested = discover_schema_pages(child_id, depth + 1, max_depth)
                found.update(nested)
                continue

            found[name] = child_id
            print(f" → {name} ✅")
        return found
    except Exception as e:
        print(f"{indent}❌ 탐색 실패: {e}")
        return {}
|
|
|
|
def parse_condition(condition_str):
    """Parse a simple "<field> <op> <value>" condition string.

    Supported operators are =, !=, >, <, >= and <=.

    Args:
        condition_str: The raw condition text; may be None or blank.

    Returns:
        A dict with keys 'field', 'op' and 'value' (value stripped of
        surrounding whitespace), or None when the input is empty/blank or
        does not match the expected shape.
    """
    if not condition_str or condition_str.strip() == "":
        return None

    # Two-character operators must be listed before their one-character
    # prefixes: regex alternation takes the first branch that matches, so
    # with '>' ahead of '>=' the input "lv >= 5" would be parsed as
    # op '>' and value '= 5'.
    match = re.match(
        r'(\w+)\s*(!=|>=|<=|=|>|<)\s*(.+)', condition_str.strip()
    )
    if match:
        return {
            'field': match.group(1),
            'op': match.group(2),
            'value': match.group(3).strip(),
        }
    return None
|
|
|
|
def parse_notion_table(page_id):
    """Read the first table block on a Notion page as a list of field specs.

    The first row is skipped as the header. Every following table row is read
    positionally: column 0 is the field name, column 1 the type, column 2 the
    condition and column 3 the description. Columns that are absent from the
    table are read as empty text, and rows with an empty field name are
    skipped.

    Args:
        page_id: ID of the page whose child blocks are listed.

    Returns:
        A list of dicts with keys 'name', 'type' (lower-cased text, kept
        verbatim otherwise, e.g. "list:int"), 'condition' (the result of
        parse_condition) and 'description' (newlines escaped as a literal
        backslash-n).

    Raises:
        ValueError: If the page listing contains no table block.
    """
    blocks = notion.blocks.children.list(block_id=page_id).get('results', [])
    table_block = next((b for b in blocks if b['type'] == 'table'), None)
    if not table_block:
        raise ValueError("테이블을 찾을 수 없습니다.")

    rows = notion.blocks.children.list(block_id=table_block['id']).get('results', [])

    def extract_text(cell, preserve_newlines=False):
        """Join the content of the 'text'-typed items of one cell."""
        if not cell:
            return ""
        text = "".join(
            part.get('text', {}).get('content', '')
            for part in cell
            if part.get('type') == 'text'
        )
        if preserve_newlines:
            # Escape before stripping so that leading/trailing newlines
            # survive as a literal "\n" instead of being trimmed away.
            return text.replace('\n', '\\n').strip()
        return text.strip()

    def cell_text(cells, index, preserve_newlines=False):
        """Text of cells[index], or "" when the row has no such column."""
        if index >= len(cells):
            return ""
        return extract_text(cells[index], preserve_newlines)

    schema = []
    for row in rows[1:]:  # skip the header row
        if row.get('type') != 'table_row':
            continue
        cells = row['table_row']['cells']

        field_name = cell_text(cells, 0)
        if not field_name:
            continue

        # Every column is read through the bounds-checked accessor so a table
        # with fewer than four columns yields empty values rather than an
        # IndexError.
        schema.append({
            'name': field_name,
            'type': cell_text(cells, 1).lower(),
            'condition': parse_condition(cell_text(cells, 2)),
            'description': cell_text(cells, 3, preserve_newlines=True),
        })
    return schema
|
|
|
|
def get_default_value(field_type, has_condition):
    """Pick the default value for a field type (list types included).

    Conditional fields always default to None. Otherwise the type name is
    compared case-insensitively: any type containing "list" yields a new empty
    list, "int" yields 0, "float"/"number" yield 0.0, "bool"/"boolean" yield
    False, and every other type yields an empty string.
    """
    if has_condition:
        return None

    normalized = field_type.lower()
    if 'list' in normalized:
        # A fresh list object is returned on every call.
        return []

    scalar_defaults = {
        "int": 0,
        "float": 0.0,
        "number": 0.0,
        "bool": False,
        "boolean": False,
    }
    return scalar_defaults.get(normalized, "")
|
|
|
|
def merge_schema_and_data(schema, existing_data):
    """Build a DataFrame whose columns follow the schema, reusing old data.

    Args:
        schema: List of field dicts with at least 'name' and 'type', and
            optionally 'condition'.
        existing_data: Previously stored DataFrame, or None.

    Returns:
        When existing_data is None or empty: a one-row DataFrame holding each
        field's default value. Otherwise: a DataFrame with one column per
        schema field, in schema order, with the same rows as existing_data.
        Columns present in existing_data are copied; new columns are filled
        with the field's default. Columns of existing_data that are not in
        the schema are not carried over.
    """
    def csv_default(field):
        """Default for one field; list defaults become "" for CSV output."""
        value = get_default_value(
            field['type'], field.get('condition') is not None
        )
        return "" if isinstance(value, list) else value

    if existing_data is None or existing_data.empty:
        example_row = {field['name']: csv_default(field) for field in schema}
        return pd.DataFrame([example_row])

    # Start from the existing row index. Assigning a scalar to a frame that
    # has no index yet creates a zero-row column, and that column turns into
    # NaN as soon as a later column brings the real index in -- which silently
    # dropped the default of any new field listed before the first existing
    # column.
    new_df = pd.DataFrame(index=existing_data.index)
    for field in schema:
        col = field['name']
        if col in existing_data.columns:
            new_df[col] = existing_data[col]
        else:
            new_df[col] = csv_default(field)
    return new_df
|
|
|
|
def sync_single_schema(data_name, page_id):
    """Sync one schema: write its JSON description and update its CSV.

    Steps, in order: parse the Notion table, write
    GAMEDATA_DIR/.<data_name>_schema.json, copy any existing
    GAMEDATA_DIR/<data_name>.csv into BACKUP_DIR with a timestamp suffix, then
    rewrite the CSV with columns matching the schema.

    Args:
        data_name: Base name used for the schema JSON and CSV files.
        page_id: Notion page ID holding the schema table.

    Returns:
        True on success; False if any step raised (the error is printed).
    """
    print(f"\n🔄 {data_name} 동기화 시작...")
    try:
        schema = parse_notion_table(page_id)

        # 1. Save the schema as JSON (the file generate_all_classes.py reads).
        schema_path = GAMEDATA_DIR / f".{data_name}_schema.json"
        with open(schema_path, 'w', encoding='utf-8') as f:
            json.dump(schema, f, ensure_ascii=False, indent=2)

        # 2. Update the CSV.
        csv_path = GAMEDATA_DIR / f"{data_name}.csv"
        if csv_path.exists():
            # Read every cell as raw text. With pandas' default type
            # inference a round-trip rewrites existing data: an integer
            # column containing a blank is written back as 1.0, 2.0, ...,
            # and cells such as "NA" or "null" are written back empty.
            existing_data = pd.read_csv(
                csv_path, dtype=str, keep_default_na=False
            )
        else:
            existing_data = None

        if existing_data is not None:
            # Keep a timestamped copy of the file before overwriting it.
            stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            shutil.copy2(csv_path, BACKUP_DIR / f"{data_name}_{stamp}.csv")

        merged_df = merge_schema_and_data(schema, existing_data)
        merged_df.to_csv(csv_path, index=False, encoding='utf-8-sig')
        print(f" ✅ 완료: {data_name} (스키마 및 CSV 업데이트)")
        return True
    except Exception as e:
        print(f" ❌ 실패: {e}")
        return False
|
|
|
|
def main():
    """Interactive entry point: list discovered schemas and sync the choice.

    Prints a numbered menu of the discovered schemas plus a final entry that
    selects all of them, reads a number from standard input, and syncs the
    selected schema(s). Returns without prompting when nothing is discovered.
    Any error, including an invalid menu choice, is printed rather than
    raised.
    """
    print("🚀 Notion Schema Sync Start")
    try:
        page_ids = discover_schema_pages()
        if not page_ids:
            return

        schemas = list(page_ids)
        for idx, name in enumerate(schemas, 1):
            print(f" {idx}. {name}")
        all_option = len(schemas) + 1
        print(f" {all_option}. 전체")

        choice = input("\n번호 선택: ").strip()
        try:
            number = int(choice)
        except ValueError:
            raise ValueError(f"잘못된 번호입니다: {choice!r}") from None
        # Reject anything outside the menu. Without this check, 0 or a
        # negative number would silently pick an entry from the end of the
        # list through Python's negative indexing.
        if not 1 <= number <= all_option:
            raise ValueError(f"잘못된 번호입니다: {choice!r}")

        selected = schemas if number == all_option else [schemas[number - 1]]

        for name in selected:
            sync_single_schema(name, page_ids[name])

    except Exception as e:
        print(f"오류 발생: {e}")
|
|
|
|
# Run the interactive sync only when this file is executed as a script.
if __name__ == "__main__":
    main()