feat: switch to FlareSolverr + slug-based notification tracking

- All ZIPAIR fetches now go through FlareSolverr at 192.168.10.76:8191
- Dropped sitemap/probe approach; scrape EN notification listing directly
- State file now stores JSON list of seen slugs instead of last integer ID
- Matches keywords against slug and full page content

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-17 16:41:15 +08:00
parent 5cde054f71
commit 7487b5f630
2 changed files with 107 additions and 196 deletions

View File

@@ -1,8 +1,8 @@
#!/usr/bin/env python3
"""
ZIPAIR Singapore Winter Sale Monitor
Checks ZIPAIR's sitemap for new notifications about Singapore ticket sales.
Sends a push notification via ntfy when detected.
Uses FlareSolverr to bypass WAF, scrapes the EN notification listing,
and fires an ntfy push when a Singapore/winter sale is detected.
State is persisted in last_seen.txt (committed back to repo by the workflow).
"""
@@ -14,170 +14,99 @@ import json
import time
import urllib.request
import urllib.error
import http.cookiejar
from datetime import datetime
# ── Config (set via environment variables / Gitea secrets) ──────────────────
NTFY_URL = os.environ.get("NTFY_URL") or "https://ntfy.isky-homelab.com/zipair"
NTFY_TOKEN = os.environ.get("NTFY_TOKEN", "") # optional, if your ntfy requires auth
STATE_FILE = os.environ.get("STATE_FILE", "last_seen.txt")
# ── Config ───────────────────────────────────────────────────────────────────
NTFY_URL = os.environ.get("NTFY_URL") or "https://ntfy.isky-homelab.com/zipair"
NTFY_TOKEN = os.environ.get("NTFY_TOKEN", "")
STATE_FILE = os.environ.get("STATE_FILE", "last_seen.txt")
FLARESOLVERR_URL = os.environ.get("FLARESOLVERR_URL", "http://192.168.10.76:8191")
ZIPAIR_SITEMAP = "https://www.zipair.net/sitemap.xml"
ZIPAIR_SITEMAP_INDEX = "https://www.zipair.net/sitemap_index.xml"
ZIPAIR_NOTIF = "https://www.zipair.net/en/notification/{id}"
ZIPAIR_NOTIF_LIST = "https://www.zipair.net/en/notification/"
ZIPAIR_NOTIF_BASE = "https://www.zipair.net"
# How many IDs above last_seen to probe when sitemap is unavailable
PROBE_AHEAD = 20
TRIGGER_KEYWORDS = ["singapore", "winter"]
# Keywords that must ALL appear (case-insensitive) in a notification page
# to trigger an alert. Tune these as needed.
TRIGGER_KEYWORDS = ["singapore", "winter"]
# ── FlareSolverr fetch ───────────────────────────────────────────────────────
# Browser-like headers to avoid 403
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9,ja;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
}
# ── Helpers ─────────────────────────────────────────────────────────────────
# Shared cookie jar + opener so session cookies persist across requests
_cookie_jar = http.cookiejar.CookieJar()
_opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(_cookie_jar))
def warm_session():
"""Visit the homepage once to pick up any WAF/CDN session cookies."""
print("Warming session via homepage …")
req = urllib.request.Request("https://www.zipair.net/", headers=HEADERS)
def fs_fetch(url: str, timeout_ms: int = 60000) -> str:
"""Fetch a URL via FlareSolverr and return the HTML, or empty string on error."""
payload = json.dumps({
"cmd": "request.get",
"url": url,
"maxTimeout": timeout_ms,
}).encode()
req = urllib.request.Request(
f"{FLARESOLVERR_URL.rstrip('/')}/v1",
data=payload,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with _opener.open(req, timeout=15):
pass
print(f" Cookies acquired: {len(list(_cookie_jar))}")
with urllib.request.urlopen(req, timeout=timeout_ms // 1000 + 10) as resp:
data = json.loads(resp.read())
status = data.get("solution", {}).get("status", 0)
html = data.get("solution", {}).get("response", "")
print(f" FlareSolverr: {data.get('status')} | HTTP {status} | {len(html)} bytes")
return html
except Exception as e:
print(f" Homepage fetch failed (non-fatal): {e}", file=sys.stderr)
def fetch(url: str, timeout: int = 15) -> str:
"""Fetch a URL and return the decoded body, or empty string on error."""
req = urllib.request.Request(url, headers=HEADERS)
try:
with _opener.open(req, timeout=timeout) as resp:
raw = resp.read()
try:
import gzip
return gzip.decompress(raw).decode("utf-8", errors="replace")
except Exception:
return raw.decode("utf-8", errors="replace")
except urllib.error.HTTPError as e:
print(f" HTTP {e.code} for {url}", file=sys.stderr)
return ""
except Exception as e:
print(f" Error fetching {url}: {e}", file=sys.stderr)
print(f" FlareSolverr error for {url}: {e}", file=sys.stderr)
return ""
# ── State (slug-based) ───────────────────────────────────────────────────────
def get_notification_ids_from_sitemap() -> list[int]:
    """Collect numeric notification IDs from the sitemaps, then the listing page.

    Each of ZIPAIR_SITEMAP and ZIPAIR_SITEMAP_INDEX is fetched in turn. Any
    ``<loc>`` entry whose URL contains "sitemap" is fetched as well and its
    body appended before the text is searched for ``/notification/<digits>``.
    If neither sitemap yields an ID, the EN notification listing page is
    searched the same way.

    Returns:
        The de-duplicated IDs in ascending order, or an empty list when no
        source produced any.

    NOTE(review): this block was recovered from a flattened diff view; the
    nesting below is inferred from statement order — confirm against the
    original file.
    """
    for sitemap_url in (ZIPAIR_SITEMAP, ZIPAIR_SITEMAP_INDEX):
        print(f"Fetching {sitemap_url}")
        xml = fetch(sitemap_url)
        if xml:
            # Follow nested sitemaps and append their bodies to the same
            # buffer so a single regex pass below covers all of them.
            sub_sitemaps = re.findall(r"<loc>(https?://[^<]*sitemap[^<]*)</loc>", xml)
            for sub in sub_sitemaps:
                # Skip the two top-level sitemap URLs themselves.
                if sub not in (ZIPAIR_SITEMAP, ZIPAIR_SITEMAP_INDEX):
                    print(f" Fetching sub-sitemap {sub}")
                    chunk = fetch(sub)
                    print(f"{len(chunk)} bytes")
                    xml += chunk
                    # Short pause between consecutive sub-sitemap requests.
                    time.sleep(0.5)
            ids = [int(m) for m in re.findall(r"/notification/(\d+)", xml)]
            if ids:
                ids = sorted(set(ids))
                print(f" Found {len(ids)} notification IDs (max={ids[-1]})")
                return ids
            # Debug: show sample URLs from sitemap so we can see the real pattern
            sample_urls = re.findall(r"<loc>(https?://[^<]{10,})</loc>", xml)[:5]
            print(f" No notification IDs found. Sample URLs from sitemap:")
            for u in sample_urls:
                print(f" {u}")
    # Fallback: EN notification listing page (IDs may be in HTML even if JS-rendered)
    print("Trying EN notification listing page …")
    html = fetch("https://www.zipair.net/en/notification/")
    print(f"{len(html)} bytes")
    # Prefer links that carry a language prefix; fall back to any
    # /notification/<digits> occurrence if none are present.
    ids = [int(m) for m in re.findall(r"/(?:en|ja|ko|th|zh-tw|zh-cn)/notification/(\d+)", html)]
    if not ids:
        ids = [int(m) for m in re.findall(r"/notification/(\d+)", html)]
    if ids:
        ids = sorted(set(ids))
        print(f" Found {len(ids)} notification IDs from listing page (max={ids[-1]})")
        return ids
    # Debug: show a snippet around "notification" in the HTML
    lower = html.lower()
    pos = lower.find("notification")
    if pos != -1:
        print(f" Sample HTML around 'notification': {repr(html[pos:pos+300])}")
    return []
def probe_for_new_ids(last_seen: int) -> list[int]:
    """Probe notification pages directly above *last_seen*, one ID at a time.

    At most PROBE_AHEAD consecutive IDs are requested. Probing stops at the
    first ID whose fetch returns an empty body, so only an unbroken run of
    existing IDs is reported. With ``last_seen == 0`` there is no anchor to
    start from and nothing is probed.

    Returns:
        The IDs that returned a non-empty page, in ascending order.
    """
    # Guard: without an anchor ID there is no sensible range to walk.
    if last_seen == 0:
        print(" last_seen=0 and no sitemap — cannot probe without an anchor ID.")
        return []

    print(f" Probing IDs {last_seen+1} to {last_seen+PROBE_AHEAD}")
    existing = []
    final_id = last_seen + PROBE_AHEAD
    nid = last_seen + 1
    while nid <= final_id:
        page = fetch(ZIPAIR_NOTIF.format(id=nid))
        # Pause after every request, whether or not the page existed.
        time.sleep(0.5)
        if not page:
            print(f" ID {nid} not found (or blocked), stopping probe.")
            break
        print(f" ID {nid} exists.")
        existing.append(nid)
        nid += 1
    return existing
def read_last_seen() -> int:
"""Read the last-seen notification ID from the state file."""
def read_seen_slugs() -> set:
try:
with open(STATE_FILE) as f:
return int(f.read().strip())
raw = f.read().strip()
# new format: JSON list of slugs
data = json.loads(raw)
if isinstance(data, list):
return set(data)
except Exception:
return 0
pass
return set()
def write_last_seen(n: int):
"""Persist the last-seen notification ID."""
def write_seen_slugs(slugs: set):
with open(STATE_FILE, "w") as f:
f.write(str(n))
print(f"State updated: last_seen = {n}")
json.dump(sorted(slugs), f)
print(f"State updated: {len(slugs)} slug(s) tracked.")
# ── Notification discovery ───────────────────────────────────────────────────
def get_notification_slugs(
    html: "str | None" = None,
    base_url: "str | None" = None,
) -> list[tuple[str, str]]:
    """Return the notifications linked from the EN listing page.

    Args:
        html: Listing-page HTML to parse. When omitted, the page at
            ZIPAIR_NOTIF_LIST is fetched through ``fs_fetch``.
        base_url: Scheme and host prepended to each matched path. Defaults
            to ZIPAIR_NOTIF_BASE.

    Returns:
        A list of ``(slug, absolute_url)`` pairs in page order, one per
        distinct slug (the first link seen for a slug wins). Empty when the
        HTML is empty or contains no matching double-quoted ``href``.
    """
    if html is None:
        print("Fetching notification listing via FlareSolverr …")
        html = fs_fetch(ZIPAIR_NOTIF_LIST)
    if not html:
        return []
    base = ZIPAIR_NOTIF_BASE if base_url is None else base_url

    # Two capture groups -> findall yields (path, slug) tuples. The slug is
    # the final path segment (text or numeric) and is never empty because
    # the character class is quantified with "+".
    links = re.findall(
        r'href="(/(?:en|ja|ko|th|zh-tw|zh-cn)/notification/([^"?#/]+))"',
        html,
    )

    # dicts preserve insertion order, so setdefault keeps the first URL per
    # slug while retaining the order in which slugs appear on the page.
    url_by_slug = {}
    for path, slug in links:
        url_by_slug.setdefault(slug, base + path)

    result = list(url_by_slug.items())
    print(f" Found {len(result)} notification(s) on listing page.")
    return result
# ── Keyword check ────────────────────────────────────────────────────────────
def matches_keywords(text: str) -> bool:
    """Tell whether every entry of TRIGGER_KEYWORDS occurs in *text*.

    *text* is lower-cased before the substring checks; the keywords are
    compared exactly as configured.
    """
    haystack = text.lower()
    for keyword in TRIGGER_KEYWORDS:
        if keyword not in haystack:
            return False
    return True
# ── ntfy ─────────────────────────────────────────────────────────────────────
def send_ntfy(notif_id: int, snippet: str):
"""Fire a push notification via ntfy."""
def send_ntfy(slug: str, url: str):
title = "✈️ ZIPAIR SIN→TYO Tickets On Sale!"
message = (
f"A new ZIPAIR announcement about Singapore winter sales was detected "
f"(notification #{notif_id}). "
f"Check: https://www.zipair.net/en/notification/{notif_id}"
f"New ZIPAIR Singapore/winter announcement detected. "
f"Check: {url}"
)
payload = json.dumps({
"topic": NTFY_URL.rstrip("/").rsplit("/", 1)[-1],
@@ -185,12 +114,8 @@ def send_ntfy(notif_id: int, snippet: str):
"message": message,
"priority": 5,
"tags": ["airplane", "moneybag"],
"click": f"https://www.zipair.net/en/notification/{notif_id}",
"actions": [{
"action": "view",
"label": "Open ZIPAIR",
"url": f"https://www.zipair.net/en/notification/{notif_id}",
}],
"click": url,
"actions": [{"action": "view", "label": "Open ZIPAIR", "url": url}],
}).encode()
base_url = NTFY_URL.rstrip("/").rsplit("/", 1)[0]
@@ -202,7 +127,6 @@ def send_ntfy(notif_id: int, snippet: str):
)
if NTFY_TOKEN:
req.add_header("Authorization", f"Bearer {NTFY_TOKEN}")
try:
with urllib.request.urlopen(req, timeout=10) as resp:
print(f"ntfy response: {resp.status} {resp.reason}")
@@ -210,68 +134,55 @@ def send_ntfy(notif_id: int, snippet: str):
print(f"Failed to send ntfy: {e}", file=sys.stderr)
sys.exit(1)
# ── Main ────────────────────────────────────────────────────────────────────
# ── Main ─────────────────────────────────────────────────────────────────────
def main():
print(f"\n[{datetime.utcnow().isoformat()}Z] ZIPAIR monitor starting …")
print(f" Keywords : {TRIGGER_KEYWORDS}")
print(f" ntfy URL : {NTFY_URL}")
print(f" Keywords : {TRIGGER_KEYWORDS}")
print(f" ntfy URL : {NTFY_URL}")
print(f" FlareSolverr : {FLARESOLVERR_URL}")
warm_session()
time.sleep(1)
seen_slugs = read_seen_slugs()
print(f" Known slugs : {len(seen_slugs)}")
last_seen = read_last_seen()
print(f" Last seen notification ID: {last_seen}")
notifications = get_notification_slugs()
if not notifications:
print("Could not retrieve notification list; exiting.")
sys.exit(0)
ids = get_notification_ids_from_sitemap()
all_slugs = {slug for slug, _ in notifications}
new_entries = [(slug, url) for slug, url in notifications if slug not in seen_slugs]
if not ids:
# Sitemap completely blocked — probe directly
new_ids = probe_for_new_ids(last_seen)
if not new_ids:
print("No new notifications found via probe either.")
sys.exit(0)
else:
new_ids = [i for i in ids if i > last_seen]
if not new_ids:
print("No new notifications since last check. All good.")
write_last_seen(max(ids))
sys.exit(0)
if not new_entries:
print("No new notifications since last check. All good.")
write_seen_slugs(all_slugs)
sys.exit(0)
print(f" {len(new_ids)} new notification(s) to check: {new_ids}")
print(f" {len(new_entries)} new notification(s): {[s for s,_ in new_entries]}")
found_match = None
for nid in new_ids:
url = ZIPAIR_NOTIF.format(id=nid)
print(f" Fetching notification #{nid}")
text = fetch(url)
time.sleep(1)
if not text:
print(f" Could not fetch #{nid}, skipping.")
continue
if matches_keywords(text):
print(f" ✅ MATCH in notification #{nid}!")
lower = text.lower()
pos = lower.find("singapore")
snippet = text[max(0, pos - 50): pos + 200].strip()
found_match = (nid, snippet)
for slug, url in new_entries:
print(f" Checking {slug}")
# Check slug itself first (fast, no extra fetch needed)
if matches_keywords(slug):
print(f" ✅ MATCH in slug: {slug}")
found_match = (slug, url)
break
else:
print(f" No match in #{nid}.")
# Fetch full page and check content
text = fs_fetch(url)
time.sleep(1)
if matches_keywords(text):
print(f" ✅ MATCH in page content: {slug}")
found_match = (slug, url)
break
print(f" No match.")
# Advance state to highest ID we've confirmed exists
if ids:
write_last_seen(max(ids))
elif new_ids:
write_last_seen(max(new_ids))
write_seen_slugs(all_slugs)
if found_match:
nid, snippet = found_match
print(f"\n🚨 Sending ntfy push for notification #{nid}")
send_ntfy(nid, snippet)
slug, url = found_match
print(f"\n🚨 Sending ntfy push for {slug}")
send_ntfy(slug, url)
print("Done — notification sent!")
else:
print("\nNo Singapore winter sale announcement found yet.")

View File

@@ -1 +1 @@
0
[]