feat: switch to BFF JSON API for notification discovery
All checks were successful
ZIPAIR Singapore Sale Monitor / check (push) Successful in 22s

Replace HTML scraping with a direct call to bff.zipair.net/v1/information
via FlareSolverr. The BFF returns clean JSON (id, title, category,
publishedAt) — no regex parsing of rendered pages. Also adds first-run
seeding to avoid alerting on all historical notifications.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-17 19:41:13 +08:00
parent 2801757e34
commit 2ce66aba10
3 changed files with 77 additions and 85 deletions

3
.gitignore vendored
View File

@@ -1 +1,4 @@
.claude/
dump.html
debug_fetch.py
dump_page.py

View File

@@ -1,8 +1,8 @@
#!/usr/bin/env python3
"""
ZIPAIR Singapore Winter Sale Monitor
Uses FlareSolverr to bypass WAF, scrapes the EN notification listing,
and fires an ntfy push when a Singapore/winter sale is detected.
ZIPAIR Notification Monitor
Calls the ZIPAIR BFF API via FlareSolverr, compares notification IDs against
last_seen state, and fires an ntfy push for each new entry.
State is persisted in last_seen.txt (committed back to repo by the workflow).
"""
@@ -11,25 +11,24 @@ import os
import re
import sys
import json
import time
import urllib.request
import urllib.error
from datetime import datetime
from datetime import datetime, timezone
# ── Config ───────────────────────────────────────────────────────────────────
NTFY_URL = os.environ.get("NTFY_URL") or "https://ntfy.isky-homelab.com/zipair"
NTFY_TOKEN = os.environ.get("NTFY_TOKEN", "")
STATE_FILE = os.environ.get("STATE_FILE", "last_seen.txt")
FLARESOLVERR_URL = os.environ.get("FLARESOLVERR_URL", "http://192.168.10.76:8191")
NTFY_URL = os.environ.get("NTFY_URL") or "https://ntfy.isky-homelab.com/zipair"
NTFY_TOKEN = os.environ.get("NTFY_TOKEN", "")
STATE_FILE = os.environ.get("STATE_FILE", "last_seen.txt")
FLARESOLVERR_URL = os.environ.get("FLARESOLVERR_URL", "http://192.168.10.76:8191")
ZIPAIR_NOTIF_LIST = "https://www.zipair.net/en/notification/"
ZIPAIR_NOTIF_BASE = "https://www.zipair.net"
BFF_API = "https://bff.zipair.net/v1/information"
NOTIF_BASE = "https://www.zipair.net/en/notification"
# ── FlareSolverr fetch ───────────────────────────────────────────────────────
def fs_fetch(url: str, timeout_ms: int = 60000) -> str:
"""Fetch a URL via FlareSolverr and return the HTML, or empty string on error."""
"""Fetch a URL via FlareSolverr and return the response body, or '' on error."""
payload = json.dumps({
"cmd": "request.get",
"url": url,
@@ -44,22 +43,36 @@ def fs_fetch(url: str, timeout_ms: int = 60000) -> str:
try:
with urllib.request.urlopen(req, timeout=timeout_ms // 1000 + 10) as resp:
data = json.loads(resp.read())
status = data.get("solution", {}).get("status", 0)
html = data.get("solution", {}).get("response", "")
print(f" FlareSolverr: {data.get('status')} | HTTP {status} | {len(html)} bytes")
return html
http_status = data.get("solution", {}).get("status", 0)
body = data.get("solution", {}).get("response", "")
print(f" FlareSolverr: {data.get('status')} | HTTP {http_status} | {len(body)} bytes")
return body
except Exception as e:
print(f" FlareSolverr error for {url}: {e}", file=sys.stderr)
return ""
# ── State (slug-based) ───────────────────────────────────────────────────────
def read_seen_slugs() -> set:
def fs_fetch_json(url: str) -> dict | None:
    """Fetch a JSON endpoint via FlareSolverr and return the decoded object.

    The body handed back by fs_fetch() may be the page source as rendered by
    Chromium, which wraps a raw JSON response as
    ``<html><body><pre>{...}</pre></body></html>``. When that wrapper is
    present it is stripped and HTML entities are decoded before parsing.

    Args:
        url: Absolute URL of the JSON endpoint to request.

    Returns:
        The decoded JSON object, or None when the fetch produced no body, the
        body is not valid JSON, or the top-level JSON value is not an object.
    """
    import html  # local import: only needed for the entity decoding below

    body = fs_fetch(url)
    if not body:
        return None

    # Chromium renders raw JSON as <html><body><pre>{...}</pre></body></html>
    m = re.search(r"<pre[^>]*>(.*)</pre>", body, re.DOTALL)
    if m:
        # Serialised page source entity-escapes &, < and > inside text nodes,
        # so "A & B" arrives as "A &amp; B". Decode only on this branch: an
        # unwrapped body is already raw JSON and must not be altered.
        raw = html.unescape(m.group(1))
    else:
        raw = body

    try:
        data = json.loads(raw)
    except ValueError as e:  # json.JSONDecodeError is a ValueError subclass
        print(f"  JSON parse error: {e}", file=sys.stderr)
        return None

    # Honour the declared return type: callers test `"key" in data`, which
    # would raise TypeError on a scalar and silently misbehave on a list.
    if not isinstance(data, dict):
        print(
            f"  JSON parse error: expected an object, got {type(data).__name__}",
            file=sys.stderr,
        )
        return None
    return data
# ── State ────────────────────────────────────────────────────────────────────
def read_seen_ids() -> set:
try:
with open(STATE_FILE) as f:
raw = f.read().strip()
# new format: JSON list of slugs
data = json.loads(raw)
data = json.loads(f.read().strip())
if isinstance(data, list):
return set(data)
except Exception:
@@ -67,57 +80,22 @@ def read_seen_slugs() -> set:
return set()
def write_seen_slugs(slugs: set):
def write_seen_ids(ids: set):
with open(STATE_FILE, "w") as f:
json.dump(sorted(slugs), f)
print(f"State updated: {len(slugs)} slug(s) tracked.")
json.dump(sorted(ids), f)
print(f"State updated: {len(ids)} notification ID(s) tracked.")
# ── Notification discovery ───────────────────────────────────────────────────
def get_notification_slugs() -> list[str]:
"""Fetch the EN notification listing and return all notification slugs."""
print(f"Fetching notification listing via FlareSolverr …")
html = fs_fetch(ZIPAIR_NOTIF_LIST)
if not html:
return []
# Strategy 1: parse __NEXT_DATA__ JSON (Next.js SSR)
paths = []
m = re.search(r'<script id="__NEXT_DATA__"[^>]*>(.*?)</script>', html, re.DOTALL)
if m:
try:
nd = json.loads(m.group(1))
# flatten all string values and grep for /notification/ paths
raw_json = json.dumps(nd)
paths = re.findall(r'(?:\\?/(?:en|ja|ko|th|zh-tw|zh-cn)\\?/notification\\?/([^"\\/?#]+))', raw_json)
except Exception as e:
print(f" __NEXT_DATA__ parse error: {e}", file=sys.stderr)
# Strategy 2: any href / quoted path in raw HTML
if not paths:
paths = [s for _, s in re.findall(
r'["\'](/(?:en|ja|ko|th|zh-tw|zh-cn)/notification/([^"\'?#/]+))["\']', html
)]
# dedupe, preserving order
seen = set()
result = []
for slug in paths:
if slug and slug not in seen:
seen.add(slug)
result.append((slug, f"{ZIPAIR_NOTIF_BASE}/en/notification/{slug}"))
print(f" Found {len(result)} notification(s) on listing page.")
return result # list of (slug, full_url)
# ── ntfy ─────────────────────────────────────────────────────────────────────
def send_ntfy(slug: str, url: str):
title = "✈️ New ZIPAIR Notification"
message = f"New announcement posted: {slug}\n{url}"
payload = json.dumps({
def send_ntfy(notif: dict):
notif_id = notif["id"]
title = notif.get("title", f"Notification #{notif_id}")
url = f"{NOTIF_BASE}/{notif_id}"
payload = json.dumps({
"topic": NTFY_URL.rstrip("/").rsplit("/", 1)[-1],
"title": title,
"message": message,
"title": f"✈️ New ZIPAIR Notification",
"message": title,
"priority": 5,
"tags": ["airplane", "moneybag"],
"click": url,
@@ -135,43 +113,54 @@ def send_ntfy(slug: str, url: str):
req.add_header("Authorization", f"Bearer {NTFY_TOKEN}")
try:
with urllib.request.urlopen(req, timeout=10) as resp:
print(f"ntfy response: {resp.status} {resp.reason}")
print(f" ntfy: {resp.status} {resp.reason}")
except Exception as e:
print(f"Failed to send ntfy: {e}", file=sys.stderr)
print(f" Failed to send ntfy: {e}", file=sys.stderr)
sys.exit(1)
# ── Main ─────────────────────────────────────────────────────────────────────
def main():
print(f"\n[{datetime.utcnow().isoformat()}Z] ZIPAIR monitor starting ")
print(f"\n[{datetime.now(timezone.utc).isoformat()}] ZIPAIR monitor starting ...")
print(f" ntfy URL : {NTFY_URL}")
print(f" FlareSolverr : {FLARESOLVERR_URL}")
seen_slugs = read_seen_slugs()
print(f" Known slugs : {len(seen_slugs)}")
seen_ids = read_seen_ids()
print(f" Known IDs : {len(seen_ids)}")
notifications = get_notification_slugs()
if not notifications:
api_url = f"{BFF_API}?language=en&page=1"
print(f"Fetching {api_url} via FlareSolverr ...")
data = fs_fetch_json(api_url)
if not data or "information" not in data:
print("Could not retrieve notification list; exiting.")
sys.exit(0)
all_slugs = {slug for slug, _ in notifications}
new_entries = [(slug, url) for slug, url in notifications if slug not in seen_slugs]
notifications = data["information"]
total = data.get("informationTotal", len(notifications))
all_ids = {n["id"] for n in notifications}
new_notifications = [n for n in notifications if n["id"] not in seen_ids]
if not new_entries:
print("No new notifications since last check. All good.")
write_seen_slugs(all_slugs)
print(f" API total: {total} | Page 1: {len(notifications)} | New: {len(new_notifications)}")
if not new_notifications:
print("No new notifications. All good.")
write_seen_ids(all_ids | seen_ids)
sys.exit(0)
print(f" {len(new_entries)} new notification(s): {[s for s,_ in new_entries]}")
# First run: no prior state — seed without alerting to avoid a flood
if not seen_ids:
print(f"First run: seeding state with {len(all_ids)} notification ID(s), no alerts sent.")
write_seen_ids(all_ids)
sys.exit(0)
for slug, url in new_entries:
print(f" 🚨 New notification: {slug} sending ntfy ")
send_ntfy(slug, url)
for notif in new_notifications:
print(f" NEW: [{notif['id']}] {notif['title']} -- sending ntfy ...")
send_ntfy(notif)
write_seen_slugs(all_slugs)
write_seen_ids(all_ids | seen_ids)
print("Done.")
sys.exit(0)

View File

@@ -1 +1 @@
[]
[297, 298, 300, 307, 308, 309, 313, 316, 317, 318, 319, 320, 321, 327, 329, 330, 335, 336, 340, 344, 345, 346, 349, 350, 352, 355, 356, 357, 359, 360, 361, 362, 364, 365, 369, 371, 372, 375, 377, 379, 381, 387, 390, 391, 392, 393, 395, 397, 399, 400]