publist

managing my list of publications, talks, reviews
git clone https://a3nm.net/git/publist/

commit 2358676c716d7f0d2f6886336c9152d912f115c8
parent 67d7ca3c6bdddebaf6bae53083a8a9fba1b235f2
Author: Antoine Amarilli <a3nm@a3nm.net>
Date:   Sat, 25 Oct 2025 13:48:25 +0200

fix_links script

Diffstat:
fix_links | 325+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 325 insertions(+), 0 deletions(-)

diff --git a/fix_links b/fix_links
@@ -0,0 +1,325 @@
+#!/usr/bin/env python3
+# chatgpt
+
+"""
+fix_links_inplace_verified.py
+
+In-place fixer:
+ - upgrades http:// -> https:// when the https version actually loads
+ - if a URL doesn't load (or redirects to a clearly-nonsensical location such as the site's root),
+   replaces it with the latest working Internet Archive (Wayback) snapshot that actually loads
+   (the script verifies each Wayback snapshot by requesting it)
+ - edits the file in-place (creates <file>.bak unless --no-backup)
+ - prints per-request progress (disable with --quiet)
+ - prints list of substitutions at the end and detailed counts
+
+Dependencies: requests
+    pip install requests
+"""
+from __future__ import annotations
+
+import argparse
+import concurrent.futures
+import csv
+import os
+import re
+import threading
+import time
+from collections import OrderedDict
+from typing import Optional, Tuple, List
+from urllib.parse import quote_plus, urlparse, urlunparse
+
+import requests
+
+# ---------------- Configuration ----------------
+DEFAULT_TIMEOUT = 12
+DEFAULT_CONCURRENCY = 8
+# A modern browser UA (Chrome on Windows) to appear like a regular browser
+USER_AGENT = (
+    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
+    "AppleWebKit/537.36 (KHTML, like Gecko) "
+    "Chrome/119.0.0.0 Safari/537.36"
+)
+URL_RE = re.compile(r"""(?P<url>https?://[^\s'"<>()\]\}]+)""", re.IGNORECASE)
+CDX_LIMIT = 12  # how many CDX candidates to inspect (we'll verify each)
+
+
+# ---------------- Utilities ----------------
+print_lock = threading.Lock()
+
+
+def log(msg: str, quiet: bool) -> None:
+    if quiet:
+        return
+    with print_lock:
+        print(msg, flush=True)
+
+
+def canonicalize_https(url: str) -> str:
+    p = urlparse(url)
+    if p.scheme.lower() == "http":
+        p = p._replace(scheme="https")
+        return urlunparse(p)
+    return url
+
+
+def get_response_info(session: requests.Session, url: str, timeout: int, quiet: bool) -> Tuple[bool, Optional[str], Optional[int], Optional[str]]:
+    """
+    Perform GET request (follow redirects).
+    Returns tuple:
+        (ok_bool, final_url_or_none, status_code_or_none, error_message_or_none)
+
+    ok_bool is True when response returned a status_code < 400.
+    final_url is resp.url (useful to detect redirects).
+    """
+    try:
+        log(f"[REQ] GET {url}", quiet)
+        resp = session.get(url, timeout=timeout, allow_redirects=True, headers={"User-Agent": USER_AGENT})
+        final = resp.url
+        log(f"[RESP] {url} -> {resp.status_code} final={final}", quiet)
+        return (resp.status_code < 400), final, resp.status_code, None
+    except requests.RequestException as e:
+        log(f"[ERR] {url} -> {e.__class__.__name__}: {e}", quiet)
+        return False, None, None, str(e)
+
+
+def is_bad_redirect(original_url: str, final_url: Optional[str]) -> bool:
+    """
+    Heuristic to detect 'clearly nonsensical' redirects such as:
+     - original had a non-root path but final is the root (path == '/' or empty)
+     - final netloc differs drastically (optional rule -- we only treat root-truncation as definite bad)
+    Returns True when the redirect should be considered 'broken' and cause us to prefer a Wayback snapshot.
+    """
+    if not final_url:
+        return False
+    if original_url == final_url:
+        return False
+    po = urlparse(original_url)
+    pf = urlparse(final_url)
+
+    # If original had a meaningful path (not root/index) and final is root -> bad
+    orig_path = po.path or "/"
+    final_path = pf.path or "/"
+    # consider common index filenames as not 'meaningful path'
+    index_paths = {"/", "/index.html", "/index.htm", "/index.php"}
+    if (orig_path.lower() not in index_paths) and (final_path in index_paths):
+        # Example: http://domain/some/page -> http://domain/
+        return True
+
+    # If final netloc differs and final is root of a different domain, treat as suspect.
+    if pf.netloc and (pf.netloc != po.netloc) and (final_path in index_paths):
+        return True
+
+    return False
+
+
+def query_wayback_candidates(session: requests.Session, url: str, timeout: int, quiet: bool, limit: int = CDX_LIMIT) -> List[Tuple[str, str]]:
+    """
+    Query CDX and return a list of (timestamp, original) candidates (newest first).
+    Will return an empty list on error.
+    """
+    cdx_url = (
+        "https://web.archive.org/cdx/search/cdx?"
+        "output=json"
+        "&fl=timestamp,original,statuscode"
+        "&filter=statuscode:200"
+        "&sort=reverse"
+        f"&limit={limit}"
+        f"&url={quote_plus(url)}"
+    )
+    try:
+        log(f"[REQ] CDX {cdx_url}", quiet)
+        r = session.get(cdx_url, timeout=timeout, headers={"User-Agent": USER_AGENT})
+        r.raise_for_status()
+        data = r.json()
+        if not isinstance(data, list) or len(data) < 2:
+            return []
+        # skip header if present
+        start_idx = 1 if isinstance(data[0], list) and any("timestamp" in str(x).lower() for x in data[0]) else 0
+        candidates = []
+        for row in data[start_idx:]:
+            if len(row) >= 2:
+                ts = row[0]
+                orig = row[1]
+                candidates.append((ts, orig))
+        log(f"[CDX] found {len(candidates)} candidates for {url}", quiet)
+        return candidates
+    except requests.RequestException as e:
+        log(f"[ERR] CDX {url} -> {e.__class__.__name__}: {e}", quiet)
+        return []
+    except ValueError as e:
+        log(f"[ERR] CDX JSON decode {url} -> {e}", quiet)
+        return []
+
+
+def verify_wayback_snapshot(session: requests.Session, timestamp: str, original: str, timeout: int, quiet: bool) -> Optional[str]:
+    """
+    Build a Wayback snapshot URL and GET it to verify it actually loads (status < 400).
+    Returns the working wayback URL (string) or None.
+    """
+    wb_url = f"https://web.archive.org/web/{timestamp}/{original}"
+    ok, final, status, err = get_response_info(session, wb_url, timeout, quiet)
+    # Accept when request returned status < 400 and final url is at web.archive.org
+    if ok and final and ("web.archive.org" in final):
+        return wb_url
+    # Some snapshots might redirect to a different archived representation,
+    # but we require final to be on web.archive.org and status < 400.
+    return None
+
+
+# ---------------- Core logic ----------------
+def process_single_url(url: str, session: requests.Session, timeout: int, quiet: bool) -> Tuple[str, Optional[str], str]:
+    """
+    Process one URL and return (original, replacement_or_None, note)
+    note is one of: "upgraded" (http->https), "ok" (left unchanged), "wayback" (replaced by wayback),
+    "no_snapshot" (broken, no wayback found)
+    """
+    parsed = urlparse(url)
+    scheme = parsed.scheme.lower()
+
+    # 1) If http: try https
+    if scheme == "http":
+        https_url = canonicalize_https(url)
+        log(f"[INFO] trying https for {url} -> {https_url}", quiet)
+        ok, final, status, err = get_response_info(session, https_url, timeout, quiet)
+        if ok:
+            # If it loaded but redirected to something meaningless, treat as failure
+            if is_bad_redirect(https_url, final):
+                log(f"[WARN] https for {url} redirected badly -> {final}; will try Wayback", quiet)
+                # fallthrough to next checks (do not accept https)
+            else:
+                return (url, https_url, "upgraded")
+
+    # 2) Try original
+    log(f"[INFO] checking original {url}", quiet)
+    ok, final, status, err = get_response_info(session, url, timeout, quiet)
+    if ok:
+        if is_bad_redirect(url, final):
+            log(f"[WARN] {url} redirected badly -> {final}; treating as broken and trying Wayback", quiet)
+            # treat as broken, go to wayback flow
+        else:
+            return (url, url, "ok")
+
+    # 3) Broken or bad redirect -> query Wayback CDX and verify candidates
+    log(f"[INFO] querying Wayback for {url}", quiet)
+    candidates = query_wayback_candidates(session, url, timeout, quiet, limit=CDX_LIMIT)
+    for ts, orig in candidates:
+        wb = verify_wayback_snapshot(session, ts, orig, timeout, quiet)
+        if wb:
+            log(f"[WAYBACK] selected {wb} for {url}", quiet)
+            return (url, wb, "wayback")
+    # nothing found
+    log(f"[INFO] no suitable Wayback snapshot found for {url}", quiet)
+    return (url, None, "no_snapshot")
+
+
+# ---------------- Main CLI ----------------
+def main() -> None:
+    import argparse
+
+    ap = argparse.ArgumentParser(description="In-place fix: upgrade http->https when possible, replace broken links with verified Wayback snapshots.")
+    ap.add_argument("file", help="Path to the file to edit in-place (backup saved as <file>.bak unless --no-backup).")
+    ap.add_argument("--map", help="Optional CSV mapping original_url -> replacement_url for audit.")
+    ap.add_argument("--concurrency", type=int, default=DEFAULT_CONCURRENCY, help="Number of worker threads (default %(default)s).")
+    ap.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help="Network timeout seconds (default %(default)s).")
+    ap.add_argument("--quiet", action="store_true", help="Suppress per-request progress output (still prints final summary and substitutions).")
+    ap.add_argument("--no-backup", action="store_true", help="Do not write <file>.bak backup (use with caution).")
+    args = ap.parse_args()
+
+    filepath = args.file
+    if not os.path.isfile(filepath):
+        print(f"Error: file not found: {filepath}")
+        raise SystemExit(1)
+
+    with open(filepath, "r", encoding="utf-8") as f:
+        text = f.read()
+
+    found = [m.group("url") for m in URL_RE.finditer(text)]
+    unique_urls = list(OrderedDict.fromkeys(found).keys())
+
+    if not unique_urls:
+        print("No URLs found in file. Nothing to do.")
+        return
+
+    print(f"Found {len(unique_urls)} unique URLs. Processing with concurrency={args.concurrency}...")
+
+    session = requests.Session()
+    session.headers.update({"User-Agent": USER_AGENT})
+
+    mapping = {}  # orig -> (replacement or None, note)
+    start = time.time()
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=args.concurrency) as ex:
+        futures = {ex.submit(process_single_url, url, session, args.timeout, args.quiet): url for url in unique_urls}
+        for fut in concurrent.futures.as_completed(futures):
+            orig = futures[fut]
+            try:
+                o, r, note = fut.result()
+            except Exception as exc:
+                log(f"[ERR] processing {orig} -> exception {exc}", args.quiet)
+                r = None
+                note = "no_snapshot"
+            mapping[orig] = (r, note)
+
+    elapsed = time.time() - start
+    print(f"Processing finished in {elapsed:.1f}s.")
+
+    # Apply replacements and collect substitution list
+    substitutions = []
+
+    def repl_func(m):
+        u = m.group("url")
+        repl, note = mapping.get(u, (None, None))
+        if repl and repl != u:
+            substitutions.append((u, repl, note))
+            return repl
+        return u
+
+    new_text = URL_RE.sub(repl_func, text)
+
+    # Backup and write
+    if not args.no_backup:
+        bak_path = filepath + ".bak"
+        with open(bak_path, "w", encoding="utf-8") as bf:
+            bf.write(text)
+        print(f"Backup written to: {bak_path}")
+    with open(filepath, "w", encoding="utf-8") as out:
+        out.write(new_text)
+    print(f"File updated in-place: {filepath}")
+
+    # Optional mapping CSV
+    if args.map:
+        try:
+            with open(args.map, "w", encoding="utf-8", newline="") as csvf:
+                w = csv.writer(csvf)
+                w.writerow(["original_url", "replacement_url", "note"])
+                for orig, (repl, note) in mapping.items():
+                    w.writerow([orig, repl or "", note])
+            print(f"Mapping CSV written to: {args.map}")
+        except Exception as e:
+            print(f"Warning: failed to write mapping CSV: {e}")
+
+    # Counts
+    n_total = len(unique_urls)
+    n_upgraded = sum(1 for o, (r, n) in mapping.items() if n == "upgraded")
+    n_wayback = sum(1 for o, (r, n) in mapping.items() if n == "wayback")
+    n_unchanged = sum(1 for o, (r, n) in mapping.items() if n == "ok")
+    n_no_snapshot = sum(1 for o, (r, n) in mapping.items() if n == "no_snapshot")
+
+    print("\nSummary:")
+    print(f" total_urls: {n_total}")
+    print(f" upgraded_http_to_https: {n_upgraded}")
+    print(f" replaced_with_wayback: {n_wayback}")
+    print(f" unchanged (working as-is): {n_unchanged}")
+    print(f" no_snapshot (broken, left unchanged): {n_no_snapshot}")
+
+    # Substitutions printed
+    if substitutions:
+        print("\nSubstitutions performed (original -> replacement) [note]:")
+        for orig, repl, note in substitutions:
+            print(f"- {orig} -> {repl} [{note}]")
+    else:
+        print("\nNo substitutions performed.")
+
+
+if __name__ == "__main__":
+    main()
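
For reference, the per-URL recovery path in the script comes down to one Wayback CDX lookup plus one verification GET. The sketch below is not part of the commit: it condenses that flow for a single hypothetical URL, reusing the same CDX parameters and snapshot URL format as query_wayback_candidates and verify_wayback_snapshot above, and leaving out the logging, error handling and redirect heuristic.

#!/usr/bin/env python3
# Minimal sketch of the Wayback lookup the script performs for a broken URL.
# "http://example.com/some/page" is a hypothetical example input.
import requests

url = "http://example.com/some/page"

# Ask the CDX API for 200-status captures, newest first (same parameters as
# the script's query_wayback_candidates).
rows = requests.get(
    "https://web.archive.org/cdx/search/cdx",
    params={
        "output": "json",
        "fl": "timestamp,original,statuscode",
        "filter": "statuscode:200",
        "sort": "reverse",
        "limit": 5,
        "url": url,
    },
    timeout=12,
).json()

# rows[0] is a header row when captures exist; the remaining rows are
# [timestamp, original, statuscode].
for timestamp, original, _status in rows[1:]:
    snapshot = f"https://web.archive.org/web/{timestamp}/{original}"
    resp = requests.get(snapshot, timeout=12, allow_redirects=True)
    # Mirror verify_wayback_snapshot: accept only snapshots that load
    # (status < 400) and end up on web.archive.org.
    if resp.status_code < 400 and "web.archive.org" in resp.url:
        print(f"{url} -> {snapshot}")
        break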