데이터파이프 라인에서 리스트 타입 지원

list:int, list:bool 등
This commit is contained in:
2026-01-30 13:40:31 +09:00
parent 23f200348f
commit 6f358a4aef
3 changed files with 238 additions and 732 deletions

View File

@@ -1,9 +1,8 @@
# DataTools/sync_from_notion.py
"""노션 스키마 → CSV 동기화 (자동 발견)"""
import os
import re
import json
import pandas as pd
import shutil
from notion_client import Client
from pathlib import Path
from datetime import datetime
@@ -11,8 +10,6 @@ from datetime import datetime
# ===== 설정 =====
NOTION_API_KEY = "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"
notion = Client(auth=NOTION_API_KEY) if NOTION_API_KEY else None
# ⭐ 부모 페이지 ID만 설정 (1회)
SCHEMA_PARENT_PAGE_ID = "2f494d45b1a3818fa9fceb4f9e17d905"
SCRIPT_DIR = Path(__file__).parent
@@ -20,456 +17,162 @@ GAMEDATA_DIR = SCRIPT_DIR.parent / "GameData"
BACKUP_DIR = GAMEDATA_DIR / "Backups"
BACKUP_DIR.mkdir(exist_ok=True)
# ===== 유틸리티 함수 =====
def clean_page_title(title):
"""
노션 페이지 제목 → 스키마 이름 변환
규칙:
1. 이모지, 특수문자 제거
2. "스키마", "Schema" 제거
3. 공백 제거
예시:
- "🏰 타워 스키마""타워"
- "Tower Schema""Tower"
- "Enemies""Enemies"
"""
# 1. 이모지 및 특수문자 제거
cleaned = re.sub(r'[^\w\s가-힣]', '', title).strip()
# 2. "스키마", "Schema" 제거
cleaned = re.sub(r'\s*(스키마|Schema)\s*', '', cleaned, flags=re.IGNORECASE).strip()
# 3. 공백 제거
cleaned = cleaned.replace(' ', '')
# 4. 비어있으면 원본 반환
if not cleaned:
return title.replace(' ', '')
return cleaned
return cleaned if cleaned else title.replace(' ', '')
def check_page_has_table(page_id):
try:
blocks_response = notion.blocks.children.list(block_id=page_id)
return any(block.get('type') == 'table' for block in blocks_response.get('results', []))
except:
return False
def discover_schema_pages(parent_id=None, depth=0, max_depth=3):
"""
부모 페이지의 하위 페이지들을 재귀적으로 탐색하여 스키마 발견
Args:
parent_id: 탐색할 부모 페이지 ID (None이면 SCHEMA_PARENT_PAGE_ID 사용)
depth: 현재 깊이 (0부터 시작)
max_depth: 최대 탐색 깊이 (기본 3단계)
반환:
{
"타워": "page_id_1",
"적유닛": "page_id_2",
...
}
"""
if not notion:
raise ValueError("Notion API 클라이언트가 초기화되지 않았습니다.")
if parent_id is None:
if not SCHEMA_PARENT_PAGE_ID or SCHEMA_PARENT_PAGE_ID == "노션_데이터_스키마_정의_페이지_ID":
raise ValueError(
"SCHEMA_PARENT_PAGE_ID가 설정되지 않았습니다.\n"
"sync_from_notion.py 파일에서 부모 페이지 ID를 설정하세요."
)
parent_id = SCHEMA_PARENT_PAGE_ID
# 최대 깊이 체크
if depth > max_depth:
return {}
if not notion: raise ValueError("API Key missing")
parent_id = parent_id or SCHEMA_PARENT_PAGE_ID
if depth > max_depth: return {}
indent = " " * depth
if depth == 0:
print("🔍 스키마 페이지 자동 발견 중...")
if depth == 0: print("🔍 스키마 페이지 자동 발견 중...")
try:
# 부모 페이지의 자식 블록 가져오기
children = notion.blocks.children.list(block_id=parent_id)
schemas = {}
for block in children['results']:
if block['type'] == 'child_page':
page_id = block['id']
page_title = block['child_page']['title']
# 제목 정리
schema_name = clean_page_title(page_title)
print(f"{indent}📋 발견: '{page_title}'", end="")
# 이 페이지에 테이블이 있는지 확인
has_table = check_page_has_table(page_id)
if has_table:
# 테이블이 있으면 스키마로 등록
if check_page_has_table(page_id):
schemas[schema_name] = page_id
print(f"{schema_name}")
else:
# 테이블이 없으면 하위 페이지 탐색
print(f" (폴더)")
child_schemas = discover_schema_pages(page_id, depth + 1, max_depth)
schemas.update(child_schemas)
if depth == 0 and not schemas:
print(" ⚠️ 하위 페이지를 찾을 수 없습니다.")
print(f" 💡 노션에서 부모 페이지 하위에 스키마 페이지를 추가하세요.")
schemas.update(discover_schema_pages(page_id, depth + 1, max_depth))
return schemas
except Exception as e:
print(f"{indent}❌ 탐색 실패: {e}")
import traceback
traceback.print_exc()
return {}
def check_page_has_table(page_id):
"""
페이지에 테이블 블록이 있는지 확인
Args:
page_id: 확인할 페이지 ID
반환:
True: 테이블 있음
False: 테이블 없음
"""
try:
blocks_response = notion.blocks.children.list(block_id=page_id)
blocks = blocks_response.get('results', [])
for block in blocks:
if block.get('type') == 'table':
return True
return False
except Exception as e:
# 에러 발생 시 테이블 없음으로 간주
return False
def parse_condition(condition_str):
"""
사용 조건 파싱
빈 문자열 → None (항상 사용)
"tower_type=attack"{'field': 'tower_type', 'op': '=', 'value': 'attack'}
"""
if not condition_str or condition_str.strip() == "":
return None
condition_str = condition_str.strip()
# 단순 조건: "tower_type=attack"
match = re.match(r'(\w+)\s*(=|!=|>|<|>=|<=)\s*(.+)', condition_str)
if not condition_str or condition_str.strip() == "": return None
match = re.match(r'(\w+)\s*(=|!=|>|<|>=|<=)\s*(.+)', condition_str.strip())
if match:
return {
'field': match.group(1),
'op': match.group(2),
'value': match.group(3).strip()
}
return {'field': match.group(1), 'op': match.group(2), 'value': match.group(3).strip()}
return None
def parse_notion_table(page_id):
"""노션 테이블 파싱"""
blocks = notion.blocks.children.list(block_id=page_id).get('results', [])
table_block = next((b for b in blocks if b['type'] == 'table'), None)
if not table_block: raise ValueError("테이블을 찾을 수 없습니다.")
if not notion:
raise ValueError("Notion API 클라이언트가 초기화되지 않았습니다.")
try:
# 1. 블록 가져오기
blocks_response = notion.blocks.children.list(block_id=page_id)
blocks = blocks_response.get('results', [])
# 2. 테이블 찾기
table_block = None
for block in blocks:
if block.get('type') == 'table':
table_block = block
break
if not table_block:
raise ValueError(f"테이블을 찾을 수 없습니다.")
print(f" 📋 테이블 발견")
# 3. 행 가져오기
table_id = table_block['id']
rows_response = notion.blocks.children.list(block_id=table_id)
rows = rows_response.get('results', [])
if len(rows) < 2:
raise ValueError("테이블에 데이터가 없습니다.")
# 4. 파싱
schema = []
def extract_text(cell, preserve_newlines=False):
"""
셀에서 텍스트 추출
Args:
cell: 노션 셀 데이터
preserve_newlines: True면 줄바꿈 보존, False면 공백으로 변환
"""
if not cell or len(cell) == 0:
return ""
text_parts = []
for content in cell:
if content.get('type') == 'text':
text_content = content.get('text', {}).get('content', '')
text_parts.append(text_content)
if preserve_newlines:
# 줄바꿈 보존 (\\n으로 이스케이프)
result = ''.join(text_parts)
# CSV에서 안전하게 저장하기 위해 실제 줄바꿈을 \\n으로 변환
result = result.replace('\n', '\\n')
return result.strip()
else:
# 줄바꿈을 공백으로 변환
return ''.join(text_parts).strip()
for row_idx, row in enumerate(rows[1:], start=2):
if row.get('type') != 'table_row':
continue
cells = row['table_row']['cells']
# 4개 컬럼: 필드명, 타입, 사용 조건, 설명
field_name = extract_text(cells[0]) if len(cells) > 0 else ""
field_type = extract_text(cells[1]) if len(cells) > 1 else "string"
condition_str = extract_text(cells[2]) if len(cells) > 2 else ""
# ⭐ 설명 컬럼만 줄바꿈 보존
description = extract_text(cells[3], preserve_newlines=True) if len(cells) > 3 else ""
if not field_name:
continue
# 조건 파싱
condition = parse_condition(condition_str)
if condition:
print(f" 📌 {field_name}: {condition['field']}={condition['value']}일 때 사용")
schema.append({
'name': field_name,
'type': field_type.lower(),
'condition': condition,
'description': description
})
if len(schema) == 0:
raise ValueError("파싱된 스키마가 비어있습니다.")
return schema
except Exception as e:
print(f" ❌ 파싱 오류: {e}")
raise
rows = notion.blocks.children.list(block_id=table_block['id']).get('results', [])
schema = []
def extract_text(cell, preserve_newlines=False):
if not cell: return ""
text = "".join([c.get('text', {}).get('content', '') for c in cell if c.get('type') == 'text'])
return text.replace('\n', '\\n').strip() if preserve_newlines else text.strip()
for row in rows[1:]: # 헤더 제외
if row.get('type') != 'table_row': continue
cells = row['table_row']['cells']
field_name = extract_text(cells[0])
if not field_name: continue
# 필드 파싱
field_type = extract_text(cells[1]).lower()
condition_str = extract_text(cells[2])
description = extract_text(cells[3], preserve_newlines=True) if len(cells) > 3 else ""
schema.append({
'name': field_name,
'type': field_type, # "list:int" 형태 그대로 보존
'condition': parse_condition(condition_str),
'description': description
})
return schema
def get_default_value(field_type, has_condition):
"""
기본값 결정
"""기본값 결정 (List 대응)"""
if has_condition: return None
조건부 필드 → None (빈 칸)
공통 필드 → 타입별 기본값
"""
# 조건부 필드는 빈 칸
if has_condition:
return None
# 공통 필드는 타입별 기본값
if field_type == "int":
return 0
elif field_type in ["float", "number"]:
return 0.0
elif field_type in ["bool", "boolean"]:
return False
elif field_type == "string":
return ""
else:
return None
f_type = field_type.lower()
if 'list' in f_type: return [] # 리스트는 빈 리스트 객체
if f_type == "int": return 0
if f_type in ["float", "number"]: return 0.0
if f_type in ["bool", "boolean"]: return False
return ""
def merge_schema_and_data(schema, existing_data):
"""스키마와 데이터 병합"""
schema_columns = [f['name'] for f in schema]
if existing_data is None or existing_data.empty:
print(" 새 파일 생성")
example_row = {}
for field in schema:
has_condition = field.get('condition') is not None
example_row[field['name']] = get_default_value(field['type'], has_condition)
example_row = {f['name']: get_default_value(f['type'], f.get('condition') is not None) for f in schema}
# CSV 저장을 위해 리스트는 문자열로 변환 (빈 값)
for k, v in example_row.items():
if isinstance(v, list): example_row[k] = ""
return pd.DataFrame([example_row])
print(f" 기존 데이터: {len(existing_data)}")
new_df = pd.DataFrame()
for field in schema:
col_name = field['name']
if col_name in existing_data.columns:
print(f"{col_name}: 유지")
new_df[col_name] = existing_data[col_name]
col = field['name']
if col in existing_data.columns:
new_df[col] = existing_data[col]
else:
has_condition = field.get('condition') is not None
default_val = get_default_value(field['type'], has_condition)
if default_val is None:
print(f" + {col_name}: 추가 (조건부 필드, 빈 칸)")
else:
print(f" + {col_name}: 추가 (기본값: {default_val})")
new_df[col_name] = default_val
val = get_default_value(field['type'], field.get('condition') is not None)
new_df[col] = "" if isinstance(val, list) else val
return new_df
def sync_single_schema(data_name, page_id):
"""단일 스키마 동기화 (CSV 버전)"""
print(f"\n{'='*60}")
print(f"📋 {data_name} 동기화")
print(f"{'='*60}")
print(f"\n🔄 {data_name} 동기화 시작...")
try:
# 1. 스키마 읽기
print("1⃣ 스키마 읽기...")
schema = parse_notion_table(page_id)
print(f"{len(schema)}개 필드")
# 2. 스키마 JSON으로 저장 (검증용)
import json
schema_json_path = GAMEDATA_DIR / f".{data_name}_schema.json"
with open(schema_json_path, 'w', encoding='utf-8') as f:
# 1. 스키마 JSON 저장 (generate_all_classes.py가 읽을 파일)
schema_path = GAMEDATA_DIR / f".{data_name}_schema.json"
with open(schema_path, 'w', encoding='utf-8') as f:
json.dump(schema, f, ensure_ascii=False, indent=2)
print(f" 💾 스키마 저장: {schema_json_path.name}")
# 3. 기존 파일 확인
# 2. CSV 업데이트
csv_path = GAMEDATA_DIR / f"{data_name}.csv"
print(f"\n2⃣ 기존 파일: {csv_path}")
existing_data = pd.read_csv(csv_path) if csv_path.exists() else None
existing_data = None
if csv_path.exists():
# 백업
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = BACKUP_DIR / f"{data_name}_{timestamp}.csv"
import shutil
shutil.copy2(csv_path, backup_path)
print(f" 💾 백업: {backup_path.name}")
if existing_data is not None:
shutil.copy2(csv_path, BACKUP_DIR / f"{data_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
existing_data = pd.read_csv(csv_path)
# 4. 병합
print(f"\n3⃣ 병합 중...")
merged_df = merge_schema_and_data(schema, existing_data)
# 5. 저장 (CSV)
print(f"\n4⃣ 저장...")
merged_df.to_csv(csv_path, index=False, encoding='utf-8-sig')
print(f" ✅ 완료: {csv_path}")
print(f" ✅ 완료: {data_name} (스키마 및 CSV 업데이트)")
return True
except Exception as e:
print(f"\n❌ 오류: {e}")
import traceback
traceback.print_exc()
print(f" ❌ 실패: {e}")
return False
def main():
print("=" * 60)
print("🔄 Notion → CSV 동기화 (자동 발견)")
print("=" * 60)
print()
if not NOTION_API_KEY:
print("❌ NOTION_API_KEY 환경변수가 없습니다")
print("💡 설정 방법:")
print(' $env:NOTION_API_KEY = "your_key"')
return
print(f"📂 데이터 폴더: {GAMEDATA_DIR}")
print(f"💾 백업 폴더: {BACKUP_DIR}")
print()
# ⭐ 스키마 자동 발견
print("🚀 Notion Schema Sync Start")
try:
SCHEMA_PAGE_IDS = discover_schema_pages()
page_ids = discover_schema_pages()
if not page_ids: return
schemas = list(page_ids.keys())
for idx, name in enumerate(schemas, 1):
print(f" {idx}. {name}")
print(f" {len(schemas) + 1}. 전체")
choice = input("\n번호 선택: ").strip()
selected = schemas if choice == str(len(schemas) + 1) else [schemas[int(choice)-1]]
for name in selected:
sync_single_schema(name, page_ids[name])
except Exception as e:
print(f"\n❌ 스키마 발견 실패: {e}")
return
if not SCHEMA_PAGE_IDS:
print("\n❌ 발견된 스키마 페이지가 없습니다.")
return
print()
print("=" * 60)
print("동기화할 스키마를 선택하세요:")
schemas = list(SCHEMA_PAGE_IDS.keys())
for idx, name in enumerate(schemas, 1):
print(f" {idx}. {name}")
print(f" {len(schemas) + 1}. 전체")
print()
try:
choice = input("선택 (번호 입력): ").strip()
if choice == str(len(schemas) + 1):
selected = schemas
else:
idx = int(choice) - 1
if 0 <= idx < len(schemas):
selected = [schemas[idx]]
else:
print("❌ 잘못된 선택입니다.")
return
except (ValueError, KeyboardInterrupt):
print("\n⚠️ 취소되었습니다.")
return
# 동기화 실행
print()
success_count = 0
for schema_name in selected:
page_id = SCHEMA_PAGE_IDS[schema_name]
if sync_single_schema(schema_name, page_id):
success_count += 1
# 최종 결과
print()
print("=" * 60)
print(f"✅ 완료: {success_count}/{len(selected)} 성공")
print("=" * 60)
if success_count > 0:
print()
print("💡 다음 단계:")
print(" 1. GameData 폴더에서 CSV 파일 확인")
print(" 2. 데이터 수정")
print(" 3. Git 커밋:")
print(" git add GameData/*.csv")
print(' git commit -m "Update data from Notion"')
print(f"오류 발생: {e}")
if __name__ == "__main__":
main()