fix: use cookie jar session to bypass WAF, fix probe logic for 403 vs 404

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-17 16:17:37 +08:00
parent e9d7fdf7cc
commit 5df8c9f1ac

View File

@@ -14,6 +14,7 @@ import json
import time import time
import urllib.request import urllib.request
import urllib.error import urllib.error
import http.cookiejar
from datetime import datetime from datetime import datetime
# ── Config (set via environment variables / Gitea secrets) ────────────────── # ── Config (set via environment variables / Gitea secrets) ──────────────────
@@ -48,11 +49,26 @@ HEADERS = {
# ── Helpers ───────────────────────────────────────────────────────────────── # ── Helpers ─────────────────────────────────────────────────────────────────
# One CookieJar is shared by every request made through `_opener`, so cookies
# set by an earlier response are sent back on later requests.
_cookie_jar = http.cookiejar.CookieJar()
_opener = urllib.request.build_opener(
    urllib.request.HTTPCookieProcessor(cookiejar=_cookie_jar)
)
def warm_session():
    """Request the homepage once so the shared cookie jar can collect cookies.

    The request goes through the module-level ``_opener``; any cookies the
    response sets are stored in ``_cookie_jar``. A failure is reported on
    stderr and otherwise ignored, so the caller carries on without cookies.
    """
    print("Warming session via homepage …")
    homepage_request = urllib.request.Request(
        "https://www.zipair.net/",
        headers=HEADERS,
    )
    try:
        # The body is not needed; opening and closing the response is enough
        # for the cookie processor to record any Set-Cookie headers.
        response = _opener.open(homepage_request, timeout=15)
        response.close()
        cookie_count = sum(1 for _ in _cookie_jar)
        print(f" Cookies acquired: {cookie_count}")
    except Exception as exc:
        # Deliberately broad: warming the session is best-effort.
        print(f" Homepage fetch failed (non-fatal): {exc}", file=sys.stderr)
def fetch(url: str, timeout: int = 15) -> str: def fetch(url: str, timeout: int = 15) -> str:
"""Fetch a URL and return the decoded body, or empty string on error.""" """Fetch a URL and return the decoded body, or empty string on error."""
req = urllib.request.Request(url, headers=HEADERS) req = urllib.request.Request(url, headers=HEADERS)
try: try:
with urllib.request.urlopen(req, timeout=timeout) as resp: with _opener.open(req, timeout=timeout) as resp:
raw = resp.read() raw = resp.read()
try: try:
import gzip import gzip
@@ -90,18 +106,22 @@ def get_notification_ids_from_sitemap() -> list[int]:
def probe_for_new_ids(last_seen: int) -> list[int]:
    """Probe notification pages with IDs just above ``last_seen``.

    Used when the sitemap is unavailable. IDs ``last_seen + 1`` through
    ``last_seen + PROBE_AHEAD`` are fetched in order; probing stops at the
    first ID for which ``fetch`` returns an empty body.

    Args:
        last_seen: Highest notification ID already processed. A value of 0
            or below means there is no anchor to probe from.

    Returns:
        The consecutive IDs above ``last_seen`` whose pages returned a
        non-empty body, in ascending order. Empty when there is no anchor
        or the first probe comes back empty.
    """
    # Without a positive anchor the probe range would start at ID 1 (or at
    # non-positive IDs for a negative value), so skip probing entirely.
    if last_seen <= 0:
        print(" last_seen=0 and no sitemap — cannot probe without an anchor ID.")
        return []

    print(f" Probing IDs {last_seen+1} to {last_seen+PROBE_AHEAD}")
    found = []
    for nid in range(last_seen + 1, last_seen + PROBE_AHEAD + 1):
        url = ZIPAIR_NOTIF.format(id=nid)
        html = fetch(url)
        # Pause after every request to keep the probe rate low.
        time.sleep(0.5)
        if html:
            print(f" ID {nid} exists.")
            found.append(nid)
        else:
            # An empty body does not say whether the page is missing or the
            # request was blocked; either way, stop at the first gap.
            print(f" ID {nid} not found (or blocked), stopping probe.")
            break
    return found
@@ -175,6 +195,9 @@ def main():
print(f" Keywords : {TRIGGER_KEYWORDS}") print(f" Keywords : {TRIGGER_KEYWORDS}")
print(f" ntfy URL : {NTFY_URL}") print(f" ntfy URL : {NTFY_URL}")
warm_session()
time.sleep(1)
last_seen = read_last_seen() last_seen = read_last_seen()
print(f" Last seen notification ID: {last_seen}") print(f" Last seen notification ID: {last_seen}")