#!/usr/bin/env python3
"""Upload script for the Nostradamus session-based upload/import API.

The local tool only:
- scans existing .torrent files and/or local data folders
- creates a new import session on Nostradamus
- uploads raw import candidates to that session
- prints the web URL where the uploader should review the items
"""

from __future__ import annotations

import argparse
import base64
import hashlib
import http.client
import json
import re
import secrets
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, NoReturn, Optional, Sequence, Tuple

# Kept for backward compatibility; only ".nfo" files are actually loaded.
TEXT_EXTS = {".nfo", ".txt"}
DEFAULT_TIMEOUT = 30
DEFAULT_STATE_FILE = ".nostradamus-import-state.json"

# Hosts for which plain HTTP is accepted without --allow-insecure-http.
LOCAL_HTTP_HOSTS = {"127.0.0.1", "localhost", "::1"}
# Upper bound (in characters) on the NFO text attached to an item.
MAX_NFO_CHARS = 500_000

_NON_ALNUM_RE = re.compile(r"[^a-z0-9]+")


def _as_text(value: Any) -> str:
    """Return *value* as text, decoding raw bytes as UTF-8 with replacement.

    The bencode decoder leaves byte strings that are not valid UTF-8 as
    ``bytes``; calling ``str()`` on those would yield a ``"b'...'"`` repr.
    """
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return str(value)


@dataclass(frozen=True)
class RawBencoded:
    """Wrapper marking bytes that are already bencoded and must be emitted as-is."""

    value: bytes


@dataclass
class ParsedTorrent:
    """Facts extracted from (or generated for) a single torrent."""

    name: str
    info_hash: str  # hex SHA-1 of the bencoded info dictionary
    size: int  # sum of all file lengths
    files: Dict[str, int]  # "/"-joined path -> length
    data: Dict[str, Any]  # decoded top-level torrent dictionary
    binary: bytes  # complete bencoded torrent
    info_raw: bytes  # bencoded info dictionary


class Bencode:
    """Minimal bencode decoder/encoder used for .torrent payloads."""

    @staticmethod
    def decode_torrent(data: bytes) -> Tuple[Dict[str, Any], bytes]:
        """Decode a torrent payload.

        Returns the decoded top-level dictionary together with the exact raw
        bytes of its ``info`` value, so the info hash can be computed over the
        original encoding.

        Raises:
            ValueError: if the payload is not a bencoded dictionary, is
                malformed, or has no ``info`` entry.
        """
        if not data.startswith(b"d"):
            raise ValueError("torrent payload must be a bencoded dictionary")

        index = 1
        result: Dict[str, Any] = {}
        info_raw: Optional[bytes] = None
        while data[index:index + 1] != b"e":
            key_raw, index = Bencode._decode_bytes_at(data, index)
            value_start = index
            value, index = Bencode._decode_at(data, index)
            if key_raw == b"info":
                info_raw = data[value_start:index]
            result[key_raw.decode("utf-8", errors="replace")] = value

        if info_raw is None:
            raise ValueError("torrent missing info dictionary")
        return result, info_raw

    @staticmethod
    def _decode_bytes_at(data: bytes, index: int) -> Tuple[bytes, int]:
        """Decode the byte string starting at *index*; return (bytes, next index)."""
        token = data[index:index + 1]
        if not token.isdigit():
            raise ValueError(f"invalid byte string token at {index}: {token!r}")
        colon = data.index(b":", index)
        length = int(data[index:colon])
        start = colon + 1
        end = start + length
        if end > len(data):
            # Slicing would silently return a short string otherwise.
            raise ValueError(f"truncated byte string at {index}")
        return data[start:end], end

    @staticmethod
    def _decode_at(data: bytes, index: int) -> Tuple[Any, int]:
        """Decode the value starting at *index*; return (value, next index).

        Byte strings that are valid UTF-8 are returned as ``str``; others are
        returned as ``bytes``. Dictionary keys are always text.
        """
        token = data[index:index + 1]

        if token == b"i":
            end = data.index(b"e", index)
            return int(data[index + 1:end]), end + 1

        if token == b"l":
            index += 1
            items = []
            while data[index:index + 1] != b"e":
                item, index = Bencode._decode_at(data, index)
                items.append(item)
            return items, index + 1

        if token == b"d":
            index += 1
            result = {}
            while data[index:index + 1] != b"e":
                key, index = Bencode._decode_at(data, index)
                value, index = Bencode._decode_at(data, index)
                if isinstance(key, bytes):
                    key = key.decode("utf-8", errors="replace")
                result[key] = value
            return result, index + 1

        if token.isdigit():
            raw, end = Bencode._decode_bytes_at(data, index)
            try:
                return raw.decode("utf-8"), end
            except UnicodeDecodeError:
                return raw, end

        raise ValueError(f"invalid bencode token at {index}: {token!r}")

    @staticmethod
    def encode(value: Any) -> bytes:
        """Bencode *value*.

        Supports ``RawBencoded`` (emitted verbatim), ``bool``/``int``,
        ``bytes``, ``str`` (UTF-8), ``list`` and ``dict`` (keys sorted by
        their UTF-8 bytes).

        Raises:
            TypeError: for any other type.
        """
        if isinstance(value, RawBencoded):
            return value.value
        if isinstance(value, bool):
            value = int(value)
        if isinstance(value, int):
            return f"i{value}e".encode("ascii")
        if isinstance(value, bytes):
            return str(len(value)).encode("ascii") + b":" + value
        if isinstance(value, str):
            raw = value.encode("utf-8")
            return str(len(raw)).encode("ascii") + b":" + raw
        if isinstance(value, list):
            return b"l" + b"".join(Bencode.encode(item) for item in value) + b"e"
        if isinstance(value, dict):
            items = []
            ordered_keys = sorted(
                value.keys(),
                key=lambda item: item if isinstance(item, bytes) else str(item).encode("utf-8"),
            )
            for key in ordered_keys:
                encoded_key = key if isinstance(key, bytes) else str(key)
                items.append(Bencode.encode(encoded_key))
                items.append(Bencode.encode(value[key]))
            return b"d" + b"".join(items) + b"e"
        raise TypeError(f"unsupported bencode type: {type(value)!r}")


class ImportUploader:
    """Scans local inputs and uploads them as items of a new import session."""

    def __init__(self, args: argparse.Namespace):
        self.args = args
        self.base_url = validate_base_url(args.base_url, args.allow_insecure_http)
        self.importer_token = args.importer_token
        self.timeout = args.timeout
        self.state_path = Path(args.state_file)
        self.state: Dict[str, Any] = {}

    def run(self) -> int:
        """Collect candidates, create a session, upload every item.

        The state file is rewritten after the session is created and after
        every item. Returns 0 when all items uploaded, 1 when at least one
        failed; exits via ``fatal`` when there is nothing to upload or the
        session cannot be created.
        """
        candidates = self.collect_items()
        if not candidates:
            fatal("No import candidates found. Provide --torrent-root, --torrent, --data-root, or --path.")

        mode = self.infer_mode(candidates)
        client_job_id = self.args.client_job_id or default_client_job_id()

        info(f"Creating import session ({mode})")
        try:
            session = self.create_session(client_job_id, mode)
        except RuntimeError as error:
            fatal(f"Could not create import session: {error}")
        session_id = session["id"]
        session_url = f"{self.base_url}/imports/{session_id}"

        self.state = {
            "client_job_id": client_job_id,
            "session_id": session_id,
            "mode": mode,
            "items": [],
        }
        self.write_state()

        print_scan_summary(candidates, mode)

        uploaded = []
        failures = 0
        total = len(candidates)
        for index, candidate in enumerate(candidates, start=1):
            label = candidate["original_name"]
            info(f"[{index}/{total}] Uploading {label}")
            record = {
                "client_item_id": candidate["client_item_id"],
                "info_hash": candidate.get("info_hash"),
                "name": candidate["original_name"],
                "source_kind": candidate["source_kind"],
            }
            try:
                item = self.create_item(session_id, candidate).get("item")
                if not isinstance(item, dict) or "id" not in item or "status" not in item:
                    raise RuntimeError("unexpected server response: missing item id/status")
            except RuntimeError as error:
                failures += 1
                warn(f"{label}: {error}")
                record["status"] = "failed"
                record["error"] = str(error)
            else:
                record["item_id"] = item["id"]
                record["status"] = item["status"]
            uploaded.append(record)
            self.state["items"] = uploaded
            self.write_state()

        print("")
        print("Import session created.")
        print(f"Session ID : {session_id}")
        print(f"Review URL : {session_url}")
        print(f"Uploaded : {total - failures}/{total} item(s)")
        if failures:
            print(f"Failed : {failures}")
        print(f"State file : {self.state_path}")
        print("")
        print("Next steps:")
        print("1. Open the review URL in Nostradamus")
        print("2. Review the imported items on the website")
        print("3. Finalize the items you want to send into pending moderation")
        print("4. After approval, download the prepared .torrent from the site")
        return 0 if failures == 0 else 1

    @staticmethod
    def _response_data(response: dict, required_key: Optional[str] = None) -> dict:
        """Return ``response["data"]``, raising RuntimeError if it is malformed."""
        data = response.get("data")
        if not isinstance(data, dict):
            raise RuntimeError("unexpected server response: missing data object")
        if required_key is not None and required_key not in data:
            raise RuntimeError(f"unexpected server response: missing data.{required_key}")
        return data

    def create_session(self, client_job_id: str, mode: str) -> dict:
        """POST a new import session and return the response's ``data`` object.

        Raises:
            RuntimeError: if the request fails or the response has no ``data.id``.
        """
        response = self.request("POST", "/api/import/sessions", {
            "client_job_id": client_job_id,
            "mode": mode,
        })
        return self._response_data(response, "id")

    def create_item(self, session_id: str, payload: dict) -> dict:
        """POST one import candidate to the session and return the ``data`` object.

        Raises:
            RuntimeError: if the request fails or the response has no ``data`` object.
        """
        # The id comes from the server; quote it so it cannot alter the path.
        quoted_id = urllib.parse.quote(str(session_id), safe="")
        response = self.request("POST", f"/api/import/sessions/{quoted_id}/items", payload)
        return self._response_data(response)

    def request(self, method: str, path: str, payload: Optional[dict] = None) -> dict:
        """Send a JSON request to the API and return the decoded JSON object.

        An empty response body yields ``{}``.

        Raises:
            RuntimeError: on HTTP error statuses, transport errors (including
                timeouts), undecodable bodies, invalid JSON, or a JSON body
                that is not an object.
        """
        url = f"{self.base_url}{path}"
        body = None
        headers = {"x-importer-token": self.importer_token, "accept": "application/json"}
        if payload is not None:
            body = json.dumps(payload).encode("utf-8")
            headers["content-type"] = "application/json"

        request = urllib.request.Request(url, data=body, method=method, headers=headers)
        try:
            with urllib.request.urlopen(request, timeout=self.timeout) as response:
                raw = response.read().decode("utf-8")
        except urllib.error.HTTPError as error:
            body_text = error.read().decode("utf-8", errors="replace")
            message = body_text
            try:
                parsed_error = json.loads(body_text)
            except json.JSONDecodeError:
                parsed_error = None
            if isinstance(parsed_error, dict):
                message = parsed_error.get("error", body_text)
            raise RuntimeError(f"{method} {path} failed with {error.code}: {message}") from error
        except (OSError, http.client.HTTPException, UnicodeDecodeError) as error:
            # URLError is an OSError; this also covers timeouts raised while
            # reading the body, which urlopen does not wrap.
            raise RuntimeError(f"{method} {path} failed: {error}") from error

        if not raw:
            return {}
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError as error:
            raise RuntimeError(f"{method} {path} returned invalid JSON: {error}") from error
        if not isinstance(parsed, dict):
            raise RuntimeError(f"{method} {path} returned a non-object JSON response")
        return parsed

    def collect_items(self) -> List[dict]:
        """Build the list of import candidates from all configured inputs.

        Each parseable torrent becomes a cross-seed item (paired with a data
        path when one matches by name); every data path not claimed by a
        torrent becomes a data-only item. Items sharing an info hash are
        collapsed, the later one winning.
        """
        torrent_paths = self.expand_torrent_paths()
        data_paths = self.expand_data_paths()
        data_matcher = self.build_data_path_matcher(data_paths)
        matched_data_paths = set()
        items: List[dict] = []

        for torrent_path in torrent_paths:
            parsed = self.parse_torrent_file(torrent_path)
            if parsed is None:
                continue
            matched_data_path = self.match_data_path(parsed.name, data_matcher)
            if matched_data_path is not None:
                matched_data_paths.add(matched_data_path)
            item = self.build_cross_seed_item(torrent_path, parsed, matched_data_path)
            if item:
                items.append(item)

        for data_path in data_paths:
            if data_path in matched_data_paths:
                continue
            item = self.build_data_only_item(data_path)
            if item:
                items.append(item)

        deduped: Dict[str, dict] = {}
        unnamed: List[dict] = []
        for item in items:
            info_hash = item.get("info_hash")
            if info_hash:
                deduped[info_hash] = item
            else:
                unnamed.append(item)
        return list(deduped.values()) + unnamed

    def expand_torrent_paths(self) -> List[Path]:
        """Return the sorted, de-duplicated .torrent files named by the arguments."""
        paths: List[Path] = []
        for raw in self.args.torrent or []:
            path = Path(raw).expanduser()
            if path.is_file():
                paths.append(path)
            else:
                warn(f"Ignoring --torrent {raw}: not a file")
        for raw in self.args.torrent_root or []:
            root = Path(raw).expanduser()
            if root.is_dir():
                paths.extend(sorted(root.rglob("*.torrent")))
            else:
                warn(f"Ignoring --torrent-root {raw}: not a directory")
        return sorted(set(paths))

    def expand_data_paths(self) -> List[Path]:
        """Return the sorted, de-duplicated data paths named by the arguments.

        ``--path`` entries are taken as-is; ``--data-root`` contributes its
        direct, non-hidden children. Paths ending in ``.torrent`` are dropped.
        """
        paths: List[Path] = []
        for raw in self.args.path or []:
            path = Path(raw).expanduser()
            if path.exists():
                paths.append(path)
            else:
                warn(f"Ignoring --path {raw}: does not exist")
        for raw in self.args.data_root or []:
            root = Path(raw).expanduser()
            if not root.is_dir():
                warn(f"Ignoring --data-root {raw}: not a directory")
                continue
            paths.extend(
                child for child in sorted(root.iterdir())
                if not child.name.startswith(".")
            )
        return [path for path in sorted(set(paths)) if path.suffix.lower() != ".torrent"]

    def infer_mode(self, items: Sequence[dict]) -> str:
        """Return "cross_seed" or "data_only" when uniform, otherwise "mixed"."""
        kinds = {item["source_kind"] for item in items}
        if kinds == {"cross_seed"}:
            return "cross_seed"
        if kinds == {"data_only"}:
            return "data_only"
        return "mixed"

    def build_data_path_matcher(self, paths: Sequence[Path]) -> Dict[str, List[Path]]:
        """Index data paths under every match key derived from their names."""
        matcher: Dict[str, List[Path]] = {}
        for path in paths:
            for key in data_match_keys(path.name):
                matcher.setdefault(key, []).append(path)
        return matcher

    def match_data_path(self, torrent_name: str, matcher: Dict[str, List[Path]]) -> Optional[Path]:
        """Return the first existing data path matching *torrent_name*, if any.

        Keys are tried in the order produced by ``data_match_keys``.
        Candidates are removed from the per-key list as they are examined
        (this mutates *matcher*); a path stays listed under its other keys.
        """
        for key in data_match_keys(torrent_name):
            candidates = matcher.get(key) or []
            while candidates:
                candidate = candidates.pop(0)
                if candidate.exists():
                    return candidate
        return None

    def build_cross_seed_item(
        self,
        torrent_path: Path,
        parsed: ParsedTorrent,
        matched_data_path: Optional[Path],
    ) -> dict:
        """Build the upload payload for an existing .torrent file."""
        nfo_content = load_adjacent_nfo([matched_data_path, torrent_path])
        local_path_hint = str(matched_data_path or torrent_path.parent)
        return {
            "client_item_id": stable_client_item_id(parsed.info_hash),
            "source_kind": "cross_seed",
            "original_name": parsed.name,
            "local_path_hint": local_path_hint,
            "info_hash": parsed.info_hash,
            "size": parsed.size,
            "raw_torrent": base64.b64encode(parsed.binary).decode("ascii"),
            "raw_nfo": nfo_content,
            "file_list": [{"path": path, "size": size} for path, size in parsed.files.items()],
            "detected_facts": {
                "source_path": str(torrent_path),
                "matched_data_path": str(matched_data_path) if matched_data_path else None,
                "has_nfo": bool(nfo_content),
                "file_count": len(parsed.files),
            },
        }

    def build_data_only_item(self, data_path: Path) -> Optional[dict]:
        """Hash *data_path* into a new torrent and build its upload payload.

        Returns ``None`` (after a warning) when the data cannot be read or
        contains no bytes, since a torrent without pieces is not usable.
        """
        info(f"Hashing data for {data_path.name}")
        try:
            torrent_binary, parsed = self.create_torrent_from_path(data_path)
        except OSError as error:
            warn(f"Failed to hash {data_path}: {error}")
            return None
        if parsed.size == 0:
            warn(f"Skipping {data_path}: no data to import")
            return None

        nfo_content = load_adjacent_nfo([data_path])
        return {
            "client_item_id": stable_client_item_id(parsed.info_hash),
            "source_kind": "data_only",
            "original_name": parsed.name,
            "local_path_hint": str(data_path),
            "info_hash": parsed.info_hash,
            "size": parsed.size,
            "raw_torrent": base64.b64encode(torrent_binary).decode("ascii"),
            "raw_nfo": nfo_content,
            "file_list": [{"path": path, "size": size} for path, size in parsed.files.items()],
            "detected_facts": {
                "source_path": str(data_path),
                "has_nfo": bool(nfo_content),
                "file_count": len(parsed.files),
            },
        }

    def parse_torrent_file(self, path: Path) -> Optional[ParsedTorrent]:
        """Read and parse a .torrent file; warn and return ``None`` on any failure."""
        try:
            binary = path.read_bytes()
            data, info_raw = Bencode.decode_torrent(binary)
            info_dict = data["info"]
            files = torrent_files(info_dict)
            return ParsedTorrent(
                name=_as_text(info_dict["name"]),
                info_hash=hashlib.sha1(info_raw).hexdigest(),
                size=sum(files.values()),
                files=files,
                data=data,
                binary=binary,
                info_raw=info_raw,
            )
        except Exception as error:
            # Deliberately broad: a malformed torrent can fail in many ways
            # (I/O, bencode syntax, missing keys, wrong types, recursion
            # depth) and one bad file must not abort the whole scan.
            warn(f"Failed to parse {path}: {error}")
            return None

    def create_torrent_from_path(self, path: Path) -> Tuple[bytes, ParsedTorrent]:
        """Create a torrent for *path* and return (torrent bytes, parsed facts).

        Raises:
            OSError: if the data cannot be read while hashing.
        """
        info_dict = build_info_dict(path)
        # Encode the info dictionary once and embed those exact bytes, so the
        # hash and the torrent always agree.
        info_raw = Bencode.encode(info_dict)
        torrent_data = {
            "created by": "Nostradamus",
            "creation date": int(time.time()),
            "comment": "Proudly generated for Nostradamus",
            "info": info_dict,
        }
        torrent_binary = Bencode.encode({**torrent_data, "info": RawBencoded(info_raw)})
        files = torrent_files(info_dict)
        parsed = ParsedTorrent(
            name=str(info_dict["name"]),
            info_hash=hashlib.sha1(info_raw).hexdigest(),
            size=sum(files.values()),
            files=files,
            data=torrent_data,
            binary=torrent_binary,
            info_raw=info_raw,
        )
        return torrent_binary, parsed

    def write_state(self) -> None:
        """Write the current state as JSON; a write failure only warns."""
        try:
            self.state_path.write_text(
                json.dumps(self.state, indent=2, sort_keys=True),
                encoding="utf-8",
            )
        except OSError as error:
            warn(f"Could not write state file {self.state_path}: {error}")


def validate_base_url(url: str, allow_insecure_http: bool) -> str:
    """Validate the base URL and return it without trailing slashes.

    Exits via ``fatal`` when the URL is not an absolute http(s) URL, or when
    it uses plain HTTP for a non-loopback host without *allow_insecure_http*.
    """
    parsed = urllib.parse.urlparse(url)
    if parsed.scheme not in {"http", "https"} or not parsed.netloc:
        fatal("--base-url must be a full URL like https://nostradamus.foo")
    if parsed.scheme == "http" and not allow_insecure_http and parsed.hostname not in LOCAL_HTTP_HOSTS:
        fatal("Refusing insecure HTTP for a non-local host. Use HTTPS or pass --allow-insecure-http.")
    return parsed.geturl().rstrip("/")


def load_adjacent_nfo(candidates: Sequence[Optional[Path]]) -> Optional[str]:
    """Return the text of the first readable ``.nfo`` file next to a candidate.

    For a directory candidate the directory itself is searched; for any other
    existing path its parent is searched. Entries are tried in sorted order
    and the text is capped at ``MAX_NFO_CHARS`` characters.
    """
    visited = set()
    for candidate in candidates:
        if candidate is None:
            continue
        candidate = candidate.expanduser()
        if candidate.is_dir():
            root = candidate
        elif candidate.exists():
            root = candidate.parent
        else:
            continue
        if root in visited:
            continue
        visited.add(root)

        try:
            entries = sorted(root.iterdir())
        except OSError:
            # Unreadable directory: the NFO is optional, so just move on.
            continue
        for entry in entries:
            if entry.suffix.lower() != ".nfo":
                continue
            try:
                return entry.read_text(encoding="utf-8", errors="replace")[:MAX_NFO_CHARS]
            except OSError:
                continue
    return None


def torrent_files(info_dict: dict) -> Dict[str, int]:
    """Map each file path in an info dictionary to its length.

    Multi-file torrents yield "/"-joined paths; single-file torrents yield
    one entry keyed by the torrent name. Raw byte names are decoded as UTF-8
    with replacement characters.
    """
    if "files" in info_dict:
        return {
            "/".join(_as_text(part) for part in file_entry["path"]): int(file_entry["length"])
            for file_entry in info_dict["files"]
        }
    return {_as_text(info_dict["name"]): int(info_dict["length"])}


def build_info_dict(path: Path) -> dict:
    """Build the info dictionary (private, source "Nostradamus") for *path*."""
    piece_length = choose_piece_length(path)
    pieces = compute_pieces(path, piece_length)
    info_dict: Dict[str, Any] = {
        "name": path.name,
        "piece length": piece_length,
        "pieces": pieces,
        "private": 1,
        "source": "Nostradamus",
    }

    if path.is_file():
        info_dict["length"] = path.stat().st_size
    else:
        # Same ordering as iter_path_chunks, so pieces line up with the list.
        info_dict["files"] = [
            {
                "length": file_path.stat().st_size,
                "path": list(file_path.relative_to(path).parts),
            }
            for file_path in sorted(p for p in path.rglob("*") if p.is_file())
        ]
    return info_dict


def choose_piece_length(path: Path) -> int:
    """Pick a piece length (256 KiB to 2 MiB) from the total size of *path*."""
    total_size = total_path_size(path)
    if total_size < 1 * 1024**3:
        return 256 * 1024
    if total_size < 4 * 1024**3:
        return 512 * 1024
    if total_size < 16 * 1024**3:
        return 1024 * 1024
    return 2 * 1024 * 1024


def compute_pieces(path: Path, piece_length: int) -> bytes:
    """Return the concatenated SHA-1 digests of every piece of *path*'s data."""
    digests = []
    buffer = bytearray()
    for chunk in iter_path_chunks(path):
        buffer.extend(chunk)
        while len(buffer) >= piece_length:
            digests.append(hashlib.sha1(bytes(buffer[:piece_length])).digest())
            del buffer[:piece_length]
    if buffer:
        # Final, shorter piece.
        digests.append(hashlib.sha1(bytes(buffer)).digest())
    return b"".join(digests)


def iter_path_chunks(path: Path, chunk_size: int = 1024 * 1024) -> Iterable[bytes]:
    """Yield the data of *path* (a file, or all files under a directory in sorted order)."""
    files = [path] if path.is_file() else sorted(p for p in path.rglob("*") if p.is_file())
    for file_path in files:
        with file_path.open("rb") as handle:
            while True:
                chunk = handle.read(chunk_size)
                if not chunk:
                    break
                yield chunk


def total_path_size(path: Path) -> int:
    """Return the size of a file, or the summed size of all files under a directory."""
    if path.is_file():
        return path.stat().st_size
    return sum(file_path.stat().st_size for file_path in path.rglob("*") if file_path.is_file())


def data_match_keys(name: str) -> List[str]:
    """Return the ordered, de-duplicated keys used to pair torrents with data.

    Keys are: the name, its ``Path.stem``, and the normalized form of each.
    NOTE(review): ``Path.stem`` drops everything after the last dot, so
    dotted names can share a stem key with differently named paths.
    """
    raw = [name, Path(name).stem]
    normalized = [normalize_key(value) for value in raw]
    return [key for key in dict.fromkeys(raw + normalized) if key]


def normalize_key(value: str) -> str:
    """Lower-case *value* and strip every character outside ``a-z0-9``."""
    return _NON_ALNUM_RE.sub("", value.lower())


def stable_client_item_id(info_hash: str) -> str:
    """Derive the client item id from the first 16 characters of the info hash."""
    return f"item-{info_hash[:16]}"


def default_client_job_id() -> str:
    """Generate a job id from the current time and a random hex suffix."""
    return f"import-{int(time.time())}-{secrets.token_hex(4)}"


def print_scan_summary(candidates: Sequence[dict], mode: str) -> None:
    """Print the number of candidates found, broken down by source kind."""
    counts = {"cross_seed": 0, "data_only": 0}
    for candidate in candidates:
        counts[candidate["source_kind"]] += 1

    print("")
    print("Local scan complete.")
    print(f"Mode : {mode}")
    print(f"Candidates : {len(candidates)}")
    print(f"Cross-seed : {counts['cross_seed']}")
    print(f"Data-only : {counts['data_only']}")
    print("")


def info(message: str) -> None:
    """Print a progress message to stdout."""
    print(f"==> {message}")


def warn(message: str) -> None:
    """Print a warning to stderr."""
    print(f"[warn] {message}", file=sys.stderr)


def fatal(message: str) -> NoReturn:
    """Print an error to stderr and exit with status 1."""
    print(f"[error] {message}", file=sys.stderr)
    raise SystemExit(1)


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser."""
    parser = argparse.ArgumentParser(
        description="Official client for the Nostradamus upload API"
    )
    parser.add_argument("--base-url", required=True, help="Nostradamus base URL, for example https://nostradamus.foo")
    parser.add_argument("--importer-token", required=True, help="Dedicated importer token from Nostradamus settings")
    parser.add_argument("--torrent-root", action="append", help="Directory containing existing .torrent files")
    parser.add_argument("--torrent", action="append", help="Single .torrent file to import")
    parser.add_argument(
        "--data-root",
        action="append",
        help="Directory whose direct children should be imported as separate data-only items",
    )
    parser.add_argument(
        "--path",
        action="append",
        help="Single file or directory to import as exactly one data-only item",
    )
    parser.add_argument("--client-job-id", help="Optional stable client job id for the session")
    parser.add_argument("--state-file", default=DEFAULT_STATE_FILE, help=f"Local state file path (default: {DEFAULT_STATE_FILE})")
    parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help=f"HTTP timeout in seconds (default: {DEFAULT_TIMEOUT})")
    parser.add_argument("--allow-insecure-http", action="store_true", help="Allow plain HTTP for non-local hosts")
    return parser


def main(argv: Optional[Sequence[str]] = None) -> int:
    """Parse arguments, run the uploader, and return its exit status."""
    parser = build_parser()
    args = parser.parse_args(argv)
    uploader = ImportUploader(args)
    return uploader.run()


if __name__ == "__main__":
    raise SystemExit(main())