o2o-infinith-demo/scripts/archive-screenshots.py

185 lines
7.4 KiB
Python

#!/usr/bin/env python3
"""
archive-screenshots.py
GCS 임시 URL(7일 만료)로 저장된 스크린샷을 Supabase Storage에 영구 아카이브.
실행: python3 scripts/archive-screenshots.py
환경: SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY (.env 또는 환경변수)
"""
import os, json, re, urllib.request, urllib.parse
# ── 환경변수 로드 ──────────────────────────────────────────────────────────────
def load_env():
    """Collect settings from the project's ``.env`` file and the process environment.

    The ``.env`` file is looked up one directory above this script. Blank
    lines, ``#`` comment lines and lines without ``=`` are ignored. Every
    other line is split on the first ``=``; whitespace and surrounding
    quote characters are stripped from the value. A missing file is not an
    error.

    Returns:
        dict: Variable names mapped to string values. For ``SUPABASE_URL``,
        ``SUPABASE_SERVICE_ROLE_KEY`` and ``VITE_SUPABASE_URL`` a non-empty
        process environment variable overrides the value read from the file.
    """
    env = {}
    env_path = os.path.join(os.path.dirname(__file__), '..', '.env')
    try:
        # Decode explicitly instead of relying on the locale-dependent
        # default. 'utf-8-sig' also drops a leading BOM that would otherwise
        # become part of the first key, and errors='replace' keeps stray
        # bytes (e.g. in comments) from aborting the load.
        with open(env_path, encoding='utf-8-sig', errors='replace') as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#') or '=' not in line:
                    continue
                k, v = line.split('=', 1)
                env[k.strip()] = v.strip().strip('"').strip("'")
    except FileNotFoundError:
        pass
    # Process environment takes precedence over the file.
    for k in ('SUPABASE_URL', 'SUPABASE_SERVICE_ROLE_KEY', 'VITE_SUPABASE_URL'):
        if os.environ.get(k):
            env[k] = os.environ[k]
    return env
# Resolve configuration once at import time; nothing below can work without
# both the project URL and the service-role key.
env = load_env()
# Fall back to the Vite-prefixed variable when SUPABASE_URL is not set. The
# value is used as-is (only the variable *name* carries the VITE_ prefix), and
# a trailing slash is dropped so the f-string endpoints below never contain
# a double slash.
SUPABASE_URL = (env.get('SUPABASE_URL') or env.get('VITE_SUPABASE_URL', '')).rstrip('/')
SERVICE_KEY = env.get('SUPABASE_SERVICE_ROLE_KEY', '')
BUCKET = 'screenshots'
DB_DIR = os.path.join(os.path.dirname(__file__), '..', 'src', 'data', 'db')

if not SUPABASE_URL or not SERVICE_KEY:
    print('❌ SUPABASE_URL 또는 SUPABASE_SERVICE_ROLE_KEY 환경변수가 없습니다.')
    print(' .env 파일에 추가하거나 환경변수로 설정해주세요.')
    raise SystemExit(1)
# ── Supabase Storage 업로드 ────────────────────────────────────────────────────
def upload_to_storage(storage_path: str, image_bytes: bytes) -> str:
    """Upload image bytes to Supabase Storage and return the object's public URL.

    Args:
        storage_path: Object key inside ``BUCKET``. It is percent-encoded
            here, keeping ``/`` as the path separator.
        image_bytes: Raw request body; sent with ``Content-Type: image/png``.

    Returns:
        The ``/storage/v1/object/public/...`` URL built from ``SUPABASE_URL``,
        ``BUCKET`` and the encoded path. The URL is constructed, not verified
        (assumes the bucket allows public reads — TODO confirm).

    Raises:
        RuntimeError: The server answered with an HTTP error status; the
            message carries the status code and response body, and the
            original ``HTTPError`` is chained as ``__cause__``.
    """
    encoded_path = urllib.parse.quote(storage_path, safe='/')
    url = f'{SUPABASE_URL}/storage/v1/object/{BUCKET}/{encoded_path}'
    req = urllib.request.Request(
        url,
        data=image_bytes,
        method='POST',
        headers={
            'Authorization': f'Bearer {SERVICE_KEY}',
            'Content-Type': 'image/png',
            'x-upsert': 'true',  # overwrite when the object already exists
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            r.read()
    except urllib.error.HTTPError as e:
        # Decode leniently so a non-UTF-8 error body cannot mask the HTTP
        # failure with a UnicodeDecodeError.
        body = e.read().decode('utf-8', errors='replace')
        raise RuntimeError(f'Upload failed {e.code}: {body}') from e
    return f'{SUPABASE_URL}/storage/v1/object/public/{BUCKET}/{encoded_path}'
# ── GCS URL에서 이미지 다운로드 ────────────────────────────────────────────────
def fetch_image(gcs_url: str) -> bytes:
    """Download ``gcs_url`` and return the raw response body.

    The request carries a browser-like ``User-Agent`` header and uses a
    20-second timeout. Errors raised by ``urlopen`` propagate to the caller.
    """
    request = urllib.request.Request(
        gcs_url,
        headers={'User-Agent': 'Mozilla/5.0'},
    )
    response = urllib.request.urlopen(request, timeout=20)
    with response:
        payload = response.read()
    return payload
# ── Supabase DB에서 모든 리포트 조회 + URL 업데이트 ───────────────────────────
def fetch_reports():
    """Query the ``marketing_reports`` table over the REST API.

    Selects the ``id``, ``clinic_name``, ``url``, ``channel_data`` and
    ``report`` columns, authenticating with the service key, and returns the
    decoded JSON response.
    """
    endpoint = (
        f'{SUPABASE_URL}/rest/v1/marketing_reports'
        '?select=id,clinic_name,url,channel_data,report'
    )
    auth_headers = {
        'Authorization': f'Bearer {SERVICE_KEY}',
        'apikey': SERVICE_KEY,
        'Accept': 'application/json',
    }
    request = urllib.request.Request(endpoint, headers=auth_headers)
    with urllib.request.urlopen(request, timeout=30) as response:
        raw = response.read()
    return json.loads(raw)
def update_report_screenshots(report_id: str, channel_data: dict, report: dict):
    """Write the updated ``channel_data`` and ``report`` values back to the row.

    Sends a PATCH to ``marketing_reports`` filtered by ``id=eq.<report_id>``.

    Args:
        report_id: Value of the row's ``id`` column.
        channel_data: Full replacement value for the ``channel_data`` column.
        report: Full replacement value for the ``report`` column.

    Raises:
        urllib.error.HTTPError: The server answered with an error status.
    """
    payload = json.dumps({'channel_data': channel_data, 'report': report}).encode()
    # Percent-encode the id so characters such as '&', '#' or spaces cannot
    # break out of, or alter, the filter expression in the query string.
    encoded_id = urllib.parse.quote(str(report_id), safe='')
    url = f'{SUPABASE_URL}/rest/v1/marketing_reports?id=eq.{encoded_id}'
    req = urllib.request.Request(url, data=payload, method='PATCH', headers={
        'Authorization': f'Bearer {SERVICE_KEY}',
        'apikey': SERVICE_KEY,
        'Content-Type': 'application/json',
        'Prefer': 'return=minimal',
    })
    with urllib.request.urlopen(req, timeout=30) as r:
        r.read()
# ── 메인 ───────────────────────────────────────────────────────────────────────
def get_domain(site_url: str) -> str:
    """Return the host of ``site_url`` without a leading ``www.``.

    Args:
        site_url: Absolute URL, e.g. ``https://www.example.com/page``.

    Returns:
        The lower-cased hostname with one leading ``www.`` removed, or
        ``'unknown'`` when the input is empty, not a string, cannot be
        parsed, or has no hostname (for example when the scheme is missing).
    """
    if not isinstance(site_url, str) or not site_url:
        return 'unknown'
    try:
        host = urllib.parse.urlparse(site_url).hostname
    except ValueError:
        # urlparse rejects some malformed inputs, e.g. a broken IPv6 literal.
        return 'unknown'
    if not host:
        return 'unknown'
    # Strip only a *leading* "www."; the same substring elsewhere in the
    # host (e.g. "newwww.example.com") is part of the name.
    if host.startswith('www.'):
        host = host[len('www.'):]
    return host or 'unknown'
def archive_screenshots_for_report(row: dict) -> int:
    """Archive one report's temporary screenshots and persist the new URLs.

    Screenshot entries are read from ``row['channel_data']['screenshots']``
    and ``row['report']['screenshots']``. Entries whose URL contains
    ``googleapis.com`` or ``firecrawl`` are downloaded, uploaded to
    ``clinics/<domain>/<report id>/screenshots/<screenshot id>.png`` and
    their ``url`` is replaced in place, in both lists, with the uploaded
    object's URL. If at least one screenshot was archived, the modified
    ``channel_data`` and ``report`` are written back to the database.

    Entries that are not dicts, have no ``id``, or have no string ``url``
    are skipped. A failed download or upload is reported and skipped; an
    error from the database update propagates to the caller.

    Args:
        row: Report row containing at least ``id``; ``clinic_name``, ``url``,
            ``channel_data`` and ``report`` are optional.

    Returns:
        Number of screenshots archived for this report.
    """
    report_id = row['id']
    clinic_name = row.get('clinic_name', '?')
    channel_data = row.get('channel_data') or {}
    report = row.get('report') or {}
    # A JSON null under "screenshots" arrives as None; treat it as empty.
    ss_list = channel_data.get('screenshots') or []
    rss_list = report.get('screenshots') or []

    # De-duplicate by screenshot id; when both lists carry the same id the
    # later (report) entry supplies the URL to download.
    all_ss = {
        s['id']: s
        for s in ss_list + rss_list
        if isinstance(s, dict)
        and s.get('id') is not None
        and isinstance(s.get('url'), str)
        and s['url']
    }
    # Anything else is already a Supabase URL or some other URL.
    pending = {
        ss_id: ss
        for ss_id, ss in all_ss.items()
        if 'googleapis.com' in ss['url'] or 'firecrawl' in ss['url']
    }
    if not pending:
        return 0

    domain = get_domain(row.get('url', ''))
    archived = 0
    for ss_id, ss in pending.items():
        url = ss['url']
        storage_path = f'clinics/{domain}/{report_id}/screenshots/{ss_id}.png'
        try:
            print(f' → 다운로드: {ss_id} ({url[:60]}...)')
            image_bytes = fetch_image(url)
            public_url = upload_to_storage(storage_path, image_bytes)
        except Exception as e:
            # Best effort per screenshot: report the failure and continue
            # with the remaining ones.
            print(f' ❌ 실패: {e}')
            continue
        # Replace the URL in place on every entry with this id, in both lists.
        for s in ss_list + rss_list:
            if isinstance(s, dict) and s.get('id') == ss_id:
                s['url'] = public_url
        print(f' ✅ 아카이브 완료 → {public_url[:70]}...')
        archived += 1

    if archived > 0:
        # Persist the rewritten JSON values.
        update_report_screenshots(report_id, channel_data, report)
        print(f' 💾 DB 업데이트 완료 ({clinic_name})')
    return archived
def main():
    """Archive temporary screenshot URLs for every report in the database.

    Fetches all reports, skips those with no screenshot URL containing one
    of the temporary-host markers, runs ``archive_screenshots_for_report``
    on the rest, and prints per-report progress plus a final total.
    """
    print('=== Supabase Screenshot 영구 아카이브 ===')
    print(f'대상: {SUPABASE_URL}')
    print()
    print('DB에서 리포트 목록 조회 중...')
    reports = fetch_reports()
    print(f'{len(reports)}개 리포트')
    print()

    # Same substrings archive_screenshots_for_report treats as archivable;
    # checking only one of them here would skip reports it could process.
    temp_markers = ('googleapis.com', 'firecrawl')
    total_archived = 0
    for row in reports:
        name = row.get('clinic_name', '?')
        # "or []" / "or {}" tolerate JSON nulls as well as missing keys.
        ss_list = (row.get('channel_data') or {}).get('screenshots') or []
        rss_list = (row.get('report') or {}).get('screenshots') or []
        needs_archive = any(
            marker in (s.get('url') or '')
            for s in ss_list + rss_list
            if isinstance(s, dict)
            for marker in temp_markers
        )
        if not needs_archive:
            continue
        print(f'[{name}] channel_data={len(ss_list)}개 / report={len(rss_list)}개 스크린샷')
        total_archived += archive_screenshots_for_report(row)
        print()
    print(f'=== 완료: 총 {total_archived}개 스크린샷 영구 저장 ===')


if __name__ == '__main__':
    main()