Files
ProjectMD/DataTools/sync_from_notion.py

475 lines
15 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# DataTools/sync_from_notion.py
"""노션 스키마 → CSV 동기화 (자동 발견)"""
import os
import re
import pandas as pd
from notion_client import Client
from pathlib import Path
from datetime import datetime
# ===== 설정 =====
NOTION_API_KEY = "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"
notion = Client(auth=NOTION_API_KEY) if NOTION_API_KEY else None
# ⭐ 부모 페이지 ID만 설정 (1회)
SCHEMA_PARENT_PAGE_ID = "2ef94d45b1a380438d66fabc0c86b3d7"
SCRIPT_DIR = Path(__file__).parent
GAMEDATA_DIR = SCRIPT_DIR.parent / "GameData"
BACKUP_DIR = GAMEDATA_DIR / "Backups"
BACKUP_DIR.mkdir(exist_ok=True)
# ===== 유틸리티 함수 =====
def clean_page_title(title):
"""
노션 페이지 제목 → 스키마 이름 변환
규칙:
1. 이모지, 특수문자 제거
2. "스키마", "Schema" 제거
3. 공백 제거
예시:
- "🏰 타워 스키마""타워"
- "Tower Schema""Tower"
- "Enemies""Enemies"
"""
# 1. 이모지 및 특수문자 제거
cleaned = re.sub(r'[^\w\s가-힣]', '', title).strip()
# 2. "스키마", "Schema" 제거
cleaned = re.sub(r'\s*(스키마|Schema)\s*', '', cleaned, flags=re.IGNORECASE).strip()
# 3. 공백 제거
cleaned = cleaned.replace(' ', '')
# 4. 비어있으면 원본 반환
if not cleaned:
return title.replace(' ', '')
return cleaned
def discover_schema_pages(parent_id=None, depth=0, max_depth=3):
"""
부모 페이지의 하위 페이지들을 재귀적으로 탐색하여 스키마 발견
Args:
parent_id: 탐색할 부모 페이지 ID (None이면 SCHEMA_PARENT_PAGE_ID 사용)
depth: 현재 깊이 (0부터 시작)
max_depth: 최대 탐색 깊이 (기본 3단계)
반환:
{
"타워": "page_id_1",
"적유닛": "page_id_2",
...
}
"""
if not notion:
raise ValueError("Notion API 클라이언트가 초기화되지 않았습니다.")
if parent_id is None:
if not SCHEMA_PARENT_PAGE_ID or SCHEMA_PARENT_PAGE_ID == "노션_데이터_스키마_정의_페이지_ID":
raise ValueError(
"SCHEMA_PARENT_PAGE_ID가 설정되지 않았습니다.\n"
"sync_from_notion.py 파일에서 부모 페이지 ID를 설정하세요."
)
parent_id = SCHEMA_PARENT_PAGE_ID
# 최대 깊이 체크
if depth > max_depth:
return {}
indent = " " * depth
if depth == 0:
print("🔍 스키마 페이지 자동 발견 중...")
try:
# 부모 페이지의 자식 블록 가져오기
children = notion.blocks.children.list(block_id=parent_id)
schemas = {}
for block in children['results']:
if block['type'] == 'child_page':
page_id = block['id']
page_title = block['child_page']['title']
# 제목 정리
schema_name = clean_page_title(page_title)
print(f"{indent}📋 발견: '{page_title}'", end="")
# 이 페이지에 테이블이 있는지 확인
has_table = check_page_has_table(page_id)
if has_table:
# 테이블이 있으면 스키마로 등록
schemas[schema_name] = page_id
print(f"{schema_name}")
else:
# 테이블이 없으면 하위 페이지 탐색
print(f" (폴더)")
child_schemas = discover_schema_pages(page_id, depth + 1, max_depth)
schemas.update(child_schemas)
if depth == 0 and not schemas:
print(" ⚠️ 하위 페이지를 찾을 수 없습니다.")
print(f" 💡 노션에서 부모 페이지 하위에 스키마 페이지를 추가하세요.")
return schemas
except Exception as e:
print(f"{indent}❌ 탐색 실패: {e}")
import traceback
traceback.print_exc()
return {}
def check_page_has_table(page_id):
"""
페이지에 테이블 블록이 있는지 확인
Args:
page_id: 확인할 페이지 ID
반환:
True: 테이블 있음
False: 테이블 없음
"""
try:
blocks_response = notion.blocks.children.list(block_id=page_id)
blocks = blocks_response.get('results', [])
for block in blocks:
if block.get('type') == 'table':
return True
return False
except Exception as e:
# 에러 발생 시 테이블 없음으로 간주
return False
def parse_condition(condition_str):
"""
사용 조건 파싱
빈 문자열 → None (항상 사용)
"tower_type=attack"{'field': 'tower_type', 'op': '=', 'value': 'attack'}
"""
if not condition_str or condition_str.strip() == "":
return None
condition_str = condition_str.strip()
# 단순 조건: "tower_type=attack"
match = re.match(r'(\w+)\s*(=|!=|>|<|>=|<=)\s*(.+)', condition_str)
if match:
return {
'field': match.group(1),
'op': match.group(2),
'value': match.group(3).strip()
}
return None
def parse_notion_table(page_id):
"""노션 테이블 파싱"""
if not notion:
raise ValueError("Notion API 클라이언트가 초기화되지 않았습니다.")
try:
# 1. 블록 가져오기
blocks_response = notion.blocks.children.list(block_id=page_id)
blocks = blocks_response.get('results', [])
# 2. 테이블 찾기
table_block = None
for block in blocks:
if block.get('type') == 'table':
table_block = block
break
if not table_block:
raise ValueError(f"테이블을 찾을 수 없습니다.")
print(f" 📋 테이블 발견")
# 3. 행 가져오기
table_id = table_block['id']
rows_response = notion.blocks.children.list(block_id=table_id)
rows = rows_response.get('results', [])
if len(rows) < 2:
raise ValueError("테이블에 데이터가 없습니다.")
# 4. 파싱
schema = []
def extract_text(cell, preserve_newlines=False):
"""
셀에서 텍스트 추출
Args:
cell: 노션 셀 데이터
preserve_newlines: True면 줄바꿈 보존, False면 공백으로 변환
"""
if not cell or len(cell) == 0:
return ""
text_parts = []
for content in cell:
if content.get('type') == 'text':
text_content = content.get('text', {}).get('content', '')
text_parts.append(text_content)
if preserve_newlines:
# 줄바꿈 보존 (\\n으로 이스케이프)
result = ''.join(text_parts)
# CSV에서 안전하게 저장하기 위해 실제 줄바꿈을 \\n으로 변환
result = result.replace('\n', '\\n')
return result.strip()
else:
# 줄바꿈을 공백으로 변환
return ''.join(text_parts).strip()
for row_idx, row in enumerate(rows[1:], start=2):
if row.get('type') != 'table_row':
continue
cells = row['table_row']['cells']
# 4개 컬럼: 필드명, 타입, 사용 조건, 설명
field_name = extract_text(cells[0]) if len(cells) > 0 else ""
field_type = extract_text(cells[1]) if len(cells) > 1 else "string"
condition_str = extract_text(cells[2]) if len(cells) > 2 else ""
# ⭐ 설명 컬럼만 줄바꿈 보존
description = extract_text(cells[3], preserve_newlines=True) if len(cells) > 3 else ""
if not field_name:
continue
# 조건 파싱
condition = parse_condition(condition_str)
if condition:
print(f" 📌 {field_name}: {condition['field']}={condition['value']}일 때 사용")
schema.append({
'name': field_name,
'type': field_type.lower(),
'condition': condition,
'description': description
})
if len(schema) == 0:
raise ValueError("파싱된 스키마가 비어있습니다.")
return schema
except Exception as e:
print(f" ❌ 파싱 오류: {e}")
raise
def get_default_value(field_type, has_condition):
"""
기본값 결정
조건부 필드 → None (빈 칸)
공통 필드 → 타입별 기본값
"""
# 조건부 필드는 빈 칸
if has_condition:
return None
# 공통 필드는 타입별 기본값
if field_type == "int":
return 0
elif field_type in ["float", "number"]:
return 0.0
elif field_type in ["bool", "boolean"]:
return False
elif field_type == "string":
return ""
else:
return None
def merge_schema_and_data(schema, existing_data):
"""스키마와 데이터 병합"""
schema_columns = [f['name'] for f in schema]
if existing_data is None or existing_data.empty:
print(" 새 파일 생성")
example_row = {}
for field in schema:
has_condition = field.get('condition') is not None
example_row[field['name']] = get_default_value(field['type'], has_condition)
return pd.DataFrame([example_row])
print(f" 기존 데이터: {len(existing_data)}")
new_df = pd.DataFrame()
for field in schema:
col_name = field['name']
if col_name in existing_data.columns:
print(f"{col_name}: 유지")
new_df[col_name] = existing_data[col_name]
else:
has_condition = field.get('condition') is not None
default_val = get_default_value(field['type'], has_condition)
if default_val is None:
print(f" + {col_name}: 추가 (조건부 필드, 빈 칸)")
else:
print(f" + {col_name}: 추가 (기본값: {default_val})")
new_df[col_name] = default_val
return new_df
def sync_single_schema(data_name, page_id):
"""단일 스키마 동기화 (CSV 버전)"""
print(f"\n{'='*60}")
print(f"📋 {data_name} 동기화")
print(f"{'='*60}")
try:
# 1. 스키마 읽기
print("1⃣ 스키마 읽기...")
schema = parse_notion_table(page_id)
print(f"{len(schema)}개 필드")
# 2. 스키마를 JSON으로 저장 (검증용)
import json
schema_json_path = GAMEDATA_DIR / f".{data_name}_schema.json"
with open(schema_json_path, 'w', encoding='utf-8') as f:
json.dump(schema, f, ensure_ascii=False, indent=2)
print(f" 💾 스키마 저장: {schema_json_path.name}")
# 3. 기존 파일 확인
csv_path = GAMEDATA_DIR / f"{data_name}.csv"
print(f"\n2⃣ 기존 파일: {csv_path}")
existing_data = None
if csv_path.exists():
# 백업
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = BACKUP_DIR / f"{data_name}_{timestamp}.csv"
import shutil
shutil.copy2(csv_path, backup_path)
print(f" 💾 백업: {backup_path.name}")
existing_data = pd.read_csv(csv_path)
# 4. 병합
print(f"\n3⃣ 병합 중...")
merged_df = merge_schema_and_data(schema, existing_data)
# 5. 저장 (CSV)
print(f"\n4⃣ 저장...")
merged_df.to_csv(csv_path, index=False, encoding='utf-8-sig')
print(f" ✅ 완료: {csv_path}")
return True
except Exception as e:
print(f"\n❌ 오류: {e}")
import traceback
traceback.print_exc()
return False
def main():
print("=" * 60)
print("🔄 Notion → CSV 동기화 (자동 발견)")
print("=" * 60)
print()
if not NOTION_API_KEY:
print("❌ NOTION_API_KEY 환경변수가 없습니다")
print("💡 설정 방법:")
print(' $env:NOTION_API_KEY = "your_key"')
return
print(f"📂 데이터 폴더: {GAMEDATA_DIR}")
print(f"💾 백업 폴더: {BACKUP_DIR}")
print()
# ⭐ 스키마 자동 발견
try:
SCHEMA_PAGE_IDS = discover_schema_pages()
except Exception as e:
print(f"\n❌ 스키마 발견 실패: {e}")
return
if not SCHEMA_PAGE_IDS:
print("\n❌ 발견된 스키마 페이지가 없습니다.")
return
print()
print("=" * 60)
print("동기화할 스키마를 선택하세요:")
schemas = list(SCHEMA_PAGE_IDS.keys())
for idx, name in enumerate(schemas, 1):
print(f" {idx}. {name}")
print(f" {len(schemas) + 1}. 전체")
print()
try:
choice = input("선택 (번호 입력): ").strip()
if choice == str(len(schemas) + 1):
selected = schemas
else:
idx = int(choice) - 1
if 0 <= idx < len(schemas):
selected = [schemas[idx]]
else:
print("❌ 잘못된 선택입니다.")
return
except (ValueError, KeyboardInterrupt):
print("\n⚠️ 취소되었습니다.")
return
# 동기화 실행
print()
success_count = 0
for schema_name in selected:
page_id = SCHEMA_PAGE_IDS[schema_name]
if sync_single_schema(schema_name, page_id):
success_count += 1
# 최종 결과
print()
print("=" * 60)
print(f"✅ 완료: {success_count}/{len(selected)} 성공")
print("=" * 60)
if success_count > 0:
print()
print("💡 다음 단계:")
print(" 1. GameData 폴더에서 CSV 파일 확인")
print(" 2. 데이터 수정")
print(" 3. Git 커밋:")
print(" git add GameData/*.csv")
print(' git commit -m "Update data from Notion"')
if __name__ == "__main__":
main()