Files
zipair-monitor/check_zipair.py
2026-05-17 16:45:05 +08:00

166 lines
6.2 KiB
Python

#!/usr/bin/env python3
"""
ZIPAIR Singapore Winter Sale Monitor
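Uses FlareSolverr to bypass the WAF, scrapes the EN notification listing,
and fires an ntfy push for every notification whose slug is not yet recorded
in the state file (no keyword filtering is applied, so alerts are not limited
to Singapore/winter-sale posts).
State is persisted as a JSON list of slugs in STATE_FILE (default
last_seen.txt, committed back to the repo by the workflow).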
"""
import os
import re
import sys
import json
import time
import urllib.request
import urllib.error
from datetime import datetime
# ── Config ───────────────────────────────────────────────────────────────────
# Every deployment-specific value can be overridden through the environment.
# NTFY_URL uses `or` (not a getenv default) so that an empty variable also
# falls back to the built-in topic URL.
NTFY_URL = os.getenv("NTFY_URL") or "https://ntfy.isky-homelab.com/zipair"
# Optional bearer token for the ntfy server; empty string means "no auth header".
NTFY_TOKEN = os.getenv("NTFY_TOKEN", "")
# File holding the JSON list of notification slugs that were already seen.
STATE_FILE = os.getenv("STATE_FILE", "last_seen.txt")
# Base URL of the FlareSolverr instance (the "/v1" endpoint is appended at call time).
FLARESOLVERR_URL = os.getenv("FLARESOLVERR_URL", "http://192.168.10.76:8191")
# Site root, prepended to the relative notification paths scraped from the listing.
ZIPAIR_NOTIF_BASE = "https://www.zipair.net"
# Listing page that is scraped for notification links.
ZIPAIR_NOTIF_LIST = "https://www.zipair.net/en/notification/"
# ── FlareSolverr fetch ───────────────────────────────────────────────────────
def fs_fetch(url: str, timeout_ms: int = 60000) -> str:
    """Retrieve *url* through FlareSolverr and hand back the page body.

    Args:
        url: Page to load.
        timeout_ms: Value sent to FlareSolverr as ``maxTimeout`` (milliseconds).

    Returns:
        The ``solution.response`` field of FlareSolverr's reply. Any exception
        (connection failure, timeout, malformed reply) is logged to stderr and
        turned into an empty string. The upstream HTTP status is only logged;
        it does not affect the return value.
    """
    request_body = json.dumps(
        {"cmd": "request.get", "url": url, "maxTimeout": timeout_ms}
    ).encode()
    req = urllib.request.Request(
        f"{FLARESOLVERR_URL.rstrip('/')}/v1",
        data=request_body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # The socket timeout is FlareSolverr's own budget plus 10 s, so the proxy
    # gets the chance to report its timeout before this side gives up.
    socket_timeout = timeout_ms // 1000 + 10
    try:
        with urllib.request.urlopen(req, timeout=socket_timeout) as resp:
            data = json.loads(resp.read())
            solution = data.get("solution", {})
            status = solution.get("status", 0)
            html = solution.get("response", "")
            print(f" FlareSolverr: {data.get('status')} | HTTP {status} | {len(html)} bytes")
            return html
    except Exception as e:
        print(f" FlareSolverr error for {url}: {e}", file=sys.stderr)
        return ""
# ── State (slug-based) ───────────────────────────────────────────────────────
def read_seen_slugs() -> set[str]:
    """Load the set of already-seen notification slugs from STATE_FILE.

    The file is expected to contain a JSON list of slug strings. A missing or
    empty file is treated as "no state yet". Content that cannot be read or
    is not a JSON list (for example an older, non-JSON state format) is
    reported on stderr and ignored, so the monitor keeps running.

    Returns:
        The stored slugs, or an empty set when no usable state exists.
    """
    try:
        with open(STATE_FILE, encoding="utf-8") as f:
            raw = f.read().strip()
    except FileNotFoundError:
        # Expected on the very first run.
        return set()
    except (OSError, ValueError) as e:
        # ValueError covers UnicodeDecodeError for a non-UTF-8 file.
        print(f"Could not read state file {STATE_FILE}: {e}", file=sys.stderr)
        return set()

    if not raw:
        return set()

    try:
        data = json.loads(raw)
    except ValueError as e:
        # json.JSONDecodeError is a ValueError subclass.
        print(f"Ignoring state file {STATE_FILE}: not valid JSON ({e})", file=sys.stderr)
        return set()

    if not isinstance(data, list):
        print(f"Ignoring state file {STATE_FILE}: expected a JSON list", file=sys.stderr)
        return set()

    # Keep only strings: one stray nested value must not discard the valid
    # slugs (set() would raise on an unhashable entry).
    return {slug for slug in data if isinstance(slug, str)}
def write_seen_slugs(slugs: set):
    """Overwrite STATE_FILE with *slugs* as a sorted JSON list and log the count."""
    with open(STATE_FILE, "w") as f:
        # Sorting keeps the committed state file stable between runs.
        ordered = sorted(slugs)
        f.write(json.dumps(ordered))
    print(f"State updated: {len(slugs)} slug(s) tracked.")
# ── Notification discovery ───────────────────────────────────────────────────
def get_notification_slugs() -> list[tuple[str, str]]:
    """Fetch the EN notification listing and return the notifications it links to.

    Returns:
        A list of ``(slug, full_url)`` tuples in page order, de-duplicated by
        slug (the first occurrence wins). The list is empty when the listing
        could not be fetched or contains no notification links.
    """
    print("Fetching notification listing via FlareSolverr …")
    html = fs_fetch(ZIPAIR_NOTIF_LIST)
    if not html:
        return []

    # Links inside inline JSON may be written with escaped slashes
    # (\/en\/notification\/x); normalise them so one pattern covers both forms.
    html = html.replace("\\/", "/")

    # Match a quoted /<lang>/notification/<slug> path. The slug stops at a
    # quote, '?', '#', '/' or backslash; the optional backslash before the
    # closing quote accepts JSON-escaped quotes (\") without letting the
    # backslash leak into the slug or the URL.
    matches = re.findall(
        r'["\'](/(?:en|ja|ko|th|zh-tw|zh-cn)/notification/([^"\'?#/\\]+))\\?["\']',
        html,
    )

    # De-duplicate by slug while preserving page order.
    seen = set()
    result = []
    for path, slug in matches:
        if slug not in seen:
            seen.add(slug)
            result.append((slug, ZIPAIR_NOTIF_BASE + path))
    print(f" Found {len(result)} notification(s) on listing page.")
    return result
# ── ntfy ─────────────────────────────────────────────────────────────────────
def send_ntfy(slug: str, url: str):
    """Publish a priority-5 ntfy message announcing notification *slug*.

    Args:
        slug: Notification slug, shown in the message text.
        url: Full notification URL, used as click target and "view" action.

    The message is published as JSON to the server root. If the request fails
    for any reason, the error is logged to stderr and the process exits with
    status 1.
    """
    # NTFY_URL has the form "<server>/<topic>": the last path segment is the
    # topic, everything before it is the server base URL.
    parts = NTFY_URL.rstrip("/").rsplit("/", 1)
    topic = parts[-1]
    base_url = parts[0]

    body = {
        "topic": topic,
        "title": "✈️ New ZIPAIR Notification",
        "message": f"New announcement posted: {slug}\n{url}",
        "priority": 5,
        "tags": ["airplane", "moneybag"],
        "click": url,
        "actions": [{"action": "view", "label": "Open ZIPAIR", "url": url}],
    }

    headers = {"Content-Type": "application/json"}
    if NTFY_TOKEN:
        # Only authenticate when a token is configured.
        headers["Authorization"] = f"Bearer {NTFY_TOKEN}"

    req = urllib.request.Request(
        f"{base_url}/",
        data=json.dumps(body).encode(),
        headers=headers,
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            print(f"ntfy response: {resp.status} {resp.reason}")
    except Exception as e:
        print(f"Failed to send ntfy: {e}", file=sys.stderr)
        sys.exit(1)
# ── Main ─────────────────────────────────────────────────────────────────────
def main():
    """Run one monitoring pass: fetch the listing, push new entries, save state.

    Exit codes:
        0 - finished normally, including the case where the listing could not
            be retrieved (treated as a soft failure).
        1 - raised by send_ntfy when a push could not be delivered.
    """
    # Local import: the file header only imports `datetime` from this module.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC
    # timestamp with "+00:00" rewritten to "Z" prints the same text.
    started = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    print(f"\n[{started}] ZIPAIR monitor starting …")
    print(f" ntfy URL : {NTFY_URL}")
    print(f" FlareSolverr : {FLARESOLVERR_URL}")

    seen_slugs = read_seen_slugs()
    print(f" Known slugs : {len(seen_slugs)}")

    notifications = get_notification_slugs()
    if not notifications:
        # Leave the state file alone: an empty result most likely means the
        # fetch failed, not that every notification disappeared.
        print("Could not retrieve notification list; exiting.")
        sys.exit(0)

    all_slugs = {slug for slug, _ in notifications}
    new_entries = [(slug, url) for slug, url in notifications if slug not in seen_slugs]
    if not new_entries:
        print("No new notifications since last check. All good.")
        write_seen_slugs(all_slugs)
        sys.exit(0)

    print(f" {len(new_entries)} new notification(s): {[s for s,_ in new_entries]}")
    delivered = set()
    try:
        for slug, url in new_entries:
            print(f" 🚨 New notification: {slug} — sending ntfy …")
            send_ntfy(slug, url)  # exits the process with status 1 on failure
            delivered.add(slug)
    finally:
        # Persist state even when a push fails part-way through. Slugs that
        # were already delivered are recorded, so they are not pushed again;
        # undelivered ones stay out of the state and are retried next run.
        pending = {slug for slug, _ in new_entries} - delivered
        write_seen_slugs(all_slugs - pending)
    print("Done.")
    sys.exit(0)


if __name__ == "__main__":
    main()