| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658 |
- #!/usr/bin/env python3
- """Upload script for the Nostradamus session-based upload/import API.
- The local tool only:
- - scans existing .torrent files and/or local data folders
- - creates a new import session on Nostradamus
- - uploads raw import candidates to that session
- - prints the web URL where the uploader should review the items
- """
from __future__ import annotations

import argparse
import base64
import hashlib
import json
import re
import secrets
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
- TEXT_EXTS = {".nfo", ".txt"}
- DEFAULT_TIMEOUT = 30
- DEFAULT_STATE_FILE = ".nostradamus-import-state.json"
@dataclass(frozen=True)
class RawBencoded:
    """Wrapper marking bytes that are already bencoded.

    ``Bencode.encode`` emits the wrapped bytes verbatim instead of
    re-encoding them, which preserves the exact on-disk byte sequence.
    """

    value: bytes  # pre-encoded bencode payload, emitted as-is
@dataclass
class ParsedTorrent:
    """Parsed view of a .torrent payload plus the raw bytes needed later."""

    name: str  # display name from the info dictionary
    info_hash: str  # hex SHA-1 of the raw bencoded info dictionary
    size: int  # total payload size in bytes (sum of all file sizes)
    files: Dict[str, int]  # relative file path -> size in bytes
    data: Dict[str, Any]  # fully decoded top-level torrent dictionary
    binary: bytes  # complete raw .torrent file bytes
    info_raw: bytes  # exact byte slice of the bencoded info dictionary
class Bencode:
    """Minimal bencode codec.

    Decoding is specialised for torrent files: ``decode_torrent`` also
    captures the exact byte range of the ``info`` dictionary so callers
    can compute the canonical SHA-1 info-hash. Encoding supports ints,
    bools, byte strings, text strings, lists, dicts, and ``RawBencoded``
    pass-through values.
    """

    @staticmethod
    def decode_torrent(data: bytes) -> Tuple[Dict[str, Any], bytes]:
        """Decode a top-level torrent dictionary.

        Returns the decoded dict plus the untouched byte slice of the
        bencoded ``info`` dictionary. Raises ValueError when the payload
        is not a dictionary or lacks an ``info`` key.
        """
        if data[:1] != b"d":
            raise ValueError("torrent payload must be a bencoded dictionary")
        cursor = 1
        decoded: Dict[str, Any] = {}
        raw_info: Optional[bytes] = None
        while data[cursor:cursor + 1] != b"e":
            raw_key, cursor = Bencode._decode_bytes_at(data, cursor)
            value_begin = cursor
            value, cursor = Bencode._decode_at(data, cursor)
            if raw_key == b"info":
                # Keep the original byte range; re-encoding could reorder
                # keys and silently change the SHA-1 info-hash.
                raw_info = data[value_begin:cursor]
            decoded[raw_key.decode("utf-8", errors="replace")] = value
        if raw_info is None:
            raise ValueError("torrent missing info dictionary")
        return decoded, raw_info

    @staticmethod
    def _decode_bytes_at(data: bytes, index: int) -> Tuple[bytes, int]:
        """Decode a length-prefixed byte string starting at *index*."""
        lead = data[index:index + 1]
        if not lead.isdigit():
            raise ValueError(f"invalid byte string token at {index}: {lead!r}")
        sep = data.index(b":", index)
        size = int(data[index:sep])
        begin = sep + 1
        return data[begin:begin + size], begin + size

    @staticmethod
    def _decode_at(data: bytes, index: int) -> Tuple[Any, int]:
        """Decode any bencode value at *index*; returns (value, next_index)."""
        lead = data[index:index + 1]
        if lead == b"i":
            close = data.index(b"e", index)
            return int(data[index + 1:close]), close + 1
        if lead == b"l":
            elements: List[Any] = []
            cursor = index + 1
            while data[cursor:cursor + 1] != b"e":
                element, cursor = Bencode._decode_at(data, cursor)
                elements.append(element)
            return elements, cursor + 1
        if lead == b"d":
            mapping: Dict[Any, Any] = {}
            cursor = index + 1
            while data[cursor:cursor + 1] != b"e":
                key, cursor = Bencode._decode_at(data, cursor)
                value, cursor = Bencode._decode_at(data, cursor)
                if isinstance(key, bytes):
                    key = key.decode("utf-8", errors="replace")
                mapping[key] = value
            return mapping, cursor + 1
        if lead.isdigit():
            sep = data.index(b":", index)
            size = int(data[index:sep])
            begin = sep + 1
            chunk = data[begin:begin + size]
            try:
                # Prefer text; fall back to raw bytes (e.g. "pieces").
                return chunk.decode("utf-8"), begin + size
            except UnicodeDecodeError:
                return chunk, begin + size
        raise ValueError(f"invalid bencode token at {index}: {lead!r}")

    @staticmethod
    def encode(value: Any) -> bytes:
        """Encode *value* to bencode; dict keys are sorted as raw bytes."""
        if isinstance(value, RawBencoded):
            return value.value
        if isinstance(value, bool):
            # bool subclasses int; normalise before the int branch.
            value = int(value)
        if isinstance(value, int):
            return b"i%de" % value
        if isinstance(value, (bytes, str)):
            raw = value if isinstance(value, bytes) else value.encode("utf-8")
            return b"%d:%s" % (len(raw), raw)
        if isinstance(value, list):
            parts = [b"l"]
            for element in value:
                parts.append(Bencode.encode(element))
            parts.append(b"e")
            return b"".join(parts)
        if isinstance(value, dict):
            def byte_key(key: Any) -> bytes:
                return key if isinstance(key, bytes) else str(key).encode("utf-8")

            parts = [b"d"]
            for key in sorted(value, key=byte_key):
                parts.append(Bencode.encode(key if isinstance(key, bytes) else str(key)))
                parts.append(Bencode.encode(value[key]))
            parts.append(b"e")
            return b"".join(parts)
        raise TypeError(f"unsupported bencode type: {type(value)!r}")
class ImportUploader:
    """Drives one import run against the Nostradamus session API.

    Scans local .torrent files and data folders, creates an import session
    over HTTP, uploads one item per candidate, and records progress in a
    local JSON state file so an interrupted run leaves a usable trail.
    """

    def __init__(self, args: argparse.Namespace):
        self.args = args
        # May exit the process via fatal() on an invalid/insecure URL.
        self.base_url = validate_base_url(args.base_url, args.allow_insecure_http)
        self.importer_token = args.importer_token
        self.timeout = args.timeout
        # Local JSON file tracking the session id and per-item status.
        self.state_path = Path(args.state_file)
        self.state: Dict[str, Any] = {}

    def run(self) -> int:
        """Execute the scan/upload flow; returns 0 on full success, 1 otherwise."""
        candidates = self.collect_items()
        if not candidates:
            fatal("No import candidates found. Provide --torrent-root, --torrent, --data-root, or --path.")
        mode = self.infer_mode(candidates)
        client_job_id = self.args.client_job_id or default_client_job_id()
        info(f"Creating import session ({mode})")
        session = self.create_session(client_job_id, mode)
        session_id = session["id"]
        session_url = f"{self.base_url}/imports/{session_id}"
        self.state = {
            "client_job_id": client_job_id,
            "session_id": session_id,
            "mode": mode,
            "items": [],
        }
        # Persist early so a crash mid-upload still leaves the session id on disk.
        self.write_state()
        print_scan_summary(candidates, mode)
        uploaded = []
        failures = 0
        for index, candidate in enumerate(candidates, start=1):
            label = candidate["original_name"]
            info(f"[{index}/{len(candidates)}] Uploading {label}")
            try:
                response = self.create_item(session_id, candidate)
                item = response["item"]
                uploaded.append({
                    "client_item_id": candidate["client_item_id"],
                    "item_id": item["id"],
                    "info_hash": candidate.get("info_hash"),
                    "status": item["status"],
                    "name": candidate["original_name"],
                    "source_kind": candidate["source_kind"],
                })
            except RuntimeError as error:
                # request() wraps HTTP/network failures in RuntimeError;
                # record the failure and continue with the next candidate.
                failures += 1
                warn(f"{label}: {error}")
                uploaded.append({
                    "client_item_id": candidate["client_item_id"],
                    "info_hash": candidate.get("info_hash"),
                    "status": "failed",
                    "name": candidate["original_name"],
                    "source_kind": candidate["source_kind"],
                    "error": str(error),
                })
            # Re-write state after every item so progress survives interruption.
            self.state["items"] = uploaded
            self.write_state()
        print("")
        print("Import session created.")
        print(f"Session ID : {session_id}")
        print(f"Review URL : {session_url}")
        print(f"Uploaded : {len(candidates) - failures}/{len(candidates)} item(s)")
        if failures:
            print(f"Failed : {failures}")
        print(f"State file : {self.state_path}")
        print("")
        print("Next steps:")
        print("1. Open the review URL in Nostradamus")
        print("2. Review the imported items on the website")
        print("3. Finalize the items you want to send into pending moderation")
        print("4. After approval, download the prepared .torrent from the site")
        return 0 if failures == 0 else 1

    def create_session(self, client_job_id: str, mode: str) -> dict:
        """POST a new import session; returns the response ``data`` payload
        (expected to contain the session ``id`` — see run())."""
        response = self.request("POST", "/api/import/sessions", {
            "client_job_id": client_job_id,
            "mode": mode,
        })
        return response["data"]

    def create_item(self, session_id: str, payload: dict) -> dict:
        """POST one raw import candidate into the session; returns ``data``."""
        response = self.request("POST", f"/api/import/sessions/{session_id}/items", payload)
        return response["data"]

    def request(self, method: str, path: str, payload: Optional[dict] = None) -> dict:
        """Perform an authenticated JSON HTTP request against the API.

        Returns the parsed JSON body ({} for an empty body). Wraps
        HTTPError/URLError in RuntimeError carrying the server's error
        message so run() can treat all transport failures uniformly.
        """
        url = f"{self.base_url}{path}"
        body = None
        headers = {"x-importer-token": self.importer_token, "accept": "application/json"}
        if payload is not None:
            body = json.dumps(payload).encode("utf-8")
            headers["content-type"] = "application/json"
        request = urllib.request.Request(url, data=body, method=method, headers=headers)
        try:
            with urllib.request.urlopen(request, timeout=self.timeout) as response:
                raw = response.read().decode("utf-8")
                return json.loads(raw) if raw else {}
        except urllib.error.HTTPError as error:
            body_text = error.read().decode("utf-8", errors="replace")
            try:
                # Prefer the server's structured "error" field when present.
                parsed = json.loads(body_text)
                message = parsed.get("error", body_text)
            except json.JSONDecodeError:
                message = body_text
            raise RuntimeError(f"{method} {path} failed with {error.code}: {message}") from error
        except urllib.error.URLError as error:
            raise RuntimeError(f"{method} {path} failed: {error}") from error

    def collect_items(self) -> List[dict]:
        """Scan local inputs and build the deduplicated upload payload list.

        Torrents (optionally matched to a local data folder) become
        cross_seed items; unmatched data paths become data_only items,
        hashed locally. Items sharing an info hash are deduplicated
        (last one wins); items without a hash are kept as-is.
        """
        torrent_paths = self.expand_torrent_paths()
        data_paths = self.expand_data_paths()
        data_matcher = self.build_data_path_matcher(data_paths)
        matched_data_paths = set()
        items: List[dict] = []
        for torrent_path in torrent_paths:
            parsed = self.parse_torrent_file(torrent_path)
            if parsed is None:
                continue
            matched_data_path = self.match_data_path(parsed.name, data_matcher)
            if matched_data_path is not None:
                matched_data_paths.add(matched_data_path)
            item = self.build_cross_seed_item(torrent_path, parsed, matched_data_path)
            if item:
                items.append(item)
        for data_path in data_paths:
            # Paths already claimed by a torrent are not uploaded twice.
            if data_path in matched_data_paths:
                continue
            item = self.build_data_only_item(data_path)
            if item:
                items.append(item)
        deduped: Dict[str, dict] = {}
        unnamed: List[dict] = []
        for item in items:
            info_hash = item.get("info_hash")
            if info_hash:
                deduped[info_hash] = item
            else:
                unnamed.append(item)
        return list(deduped.values()) + unnamed

    def expand_torrent_paths(self) -> List[Path]:
        """Collect .torrent files from --torrent and (recursively) --torrent-root."""
        paths: List[Path] = []
        for raw in self.args.torrent or []:
            path = Path(raw).expanduser()
            if path.is_file():
                paths.append(path)
        for raw in self.args.torrent_root or []:
            root = Path(raw).expanduser()
            if root.is_dir():
                paths.extend(sorted(root.rglob("*.torrent")))
        return sorted(set(paths))

    def expand_data_paths(self) -> List[Path]:
        """Collect data paths from --path plus direct children of --data-root.

        Hidden entries (dot-prefixed) and .torrent files are excluded.
        """
        paths: List[Path] = []
        for raw in self.args.path or []:
            path = Path(raw).expanduser()
            if path.exists():
                paths.append(path)
        for raw in self.args.data_root or []:
            root = Path(raw).expanduser()
            if root.is_dir():
                for child in sorted(root.iterdir()):
                    if child.name.startswith("."):
                        continue
                    paths.append(child)
        return [path for path in sorted(set(paths)) if path.suffix.lower() != ".torrent"]

    def infer_mode(self, items: Sequence[dict]) -> str:
        """Session mode: cross_seed, data_only, or mixed when both kinds exist."""
        kinds = {item["source_kind"] for item in items}
        if kinds == {"cross_seed"}:
            return "cross_seed"
        if kinds == {"data_only"}:
            return "data_only"
        return "mixed"

    def build_data_path_matcher(self, paths: Sequence[Path]) -> Dict[str, List[Path]]:
        """Index data paths by match key; a path may appear under several keys."""
        matcher: Dict[str, List[Path]] = {}
        for path in paths:
            for key in data_match_keys(path.name):
                matcher.setdefault(key, []).append(path)
        return matcher

    def match_data_path(self, torrent_name: str, matcher: Dict[str, List[Path]]) -> Optional[Path]:
        """Find an existing data path matching the torrent name, or None.

        Candidates are popped from the matcher's lists so each data path
        is claimed by at most one torrent.
        """
        for key in data_match_keys(torrent_name):
            candidates = matcher.get(key) or []
            while candidates:
                candidate = candidates.pop(0)
                if candidate.exists():
                    return candidate
        return None

    def build_cross_seed_item(
        self,
        torrent_path: Path,
        parsed: ParsedTorrent,
        matched_data_path: Optional[Path],
    ) -> dict:
        """Build the upload payload for an existing .torrent (cross-seed)."""
        nfo_content = load_adjacent_nfo([matched_data_path, torrent_path])
        local_path_hint = str(matched_data_path or torrent_path.parent)
        return {
            "client_item_id": stable_client_item_id(parsed.info_hash),
            "source_kind": "cross_seed",
            "original_name": parsed.name,
            "local_path_hint": local_path_hint,
            "info_hash": parsed.info_hash,
            "size": parsed.size,
            # Raw torrent bytes travel base64-encoded inside the JSON body.
            "raw_torrent": base64.b64encode(parsed.binary).decode("ascii"),
            "raw_nfo": nfo_content,
            "file_list": [{"path": path, "size": size} for path, size in parsed.files.items()],
            "detected_facts": {
                "source_path": str(torrent_path),
                "matched_data_path": str(matched_data_path) if matched_data_path else None,
                "has_nfo": bool(nfo_content),
                "file_count": len(parsed.files),
            },
        }

    def build_data_only_item(self, data_path: Path) -> Optional[dict]:
        """Build the upload payload for a local path with no existing torrent.

        Hashes the data locally to create a fresh torrent (can be slow for
        large payloads).
        """
        info(f"Hashing data for {data_path.name}")
        torrent_binary, parsed = self.create_torrent_from_path(data_path)
        nfo_content = load_adjacent_nfo([data_path])
        return {
            "client_item_id": stable_client_item_id(parsed.info_hash),
            "source_kind": "data_only",
            "original_name": parsed.name,
            "local_path_hint": str(data_path),
            "info_hash": parsed.info_hash,
            "size": parsed.size,
            "raw_torrent": base64.b64encode(torrent_binary).decode("ascii"),
            "raw_nfo": nfo_content,
            "file_list": [{"path": path, "size": size} for path, size in parsed.files.items()],
            "detected_facts": {
                "source_path": str(data_path),
                "has_nfo": bool(nfo_content),
                "file_count": len(parsed.files),
            },
        }

    def parse_torrent_file(self, path: Path) -> Optional[ParsedTorrent]:
        """Read and decode one .torrent file; None (plus a warning) on failure.

        The broad except is deliberate best-effort: one corrupt torrent
        must not abort the whole scan.
        """
        try:
            binary = path.read_bytes()
            data, info_raw = Bencode.decode_torrent(binary)
            info_dict = data["info"]
            # Info-hash is the SHA-1 of the raw (un-re-encoded) info dict.
            info_hash = hashlib.sha1(info_raw).hexdigest()
            files = torrent_files(info_dict)
            size = sum(files.values())
            return ParsedTorrent(
                name=str(info_dict["name"]),
                info_hash=info_hash,
                size=size,
                files=files,
                data=data,
                binary=binary,
                info_raw=info_raw,
            )
        except Exception as error:
            warn(f"Failed to parse {path}: {error}")
            return None

    def create_torrent_from_path(self, path: Path) -> Tuple[bytes, ParsedTorrent]:
        """Create a brand-new torrent for *path* (hashes all file content)."""
        info_dict = build_info_dict(path)
        torrent_data = {
            "created by": "Nostradamus",
            "creation date": int(time.time()),
            "comment": "Proudly generated for Nostradamus",
            "info": info_dict,
        }
        torrent_binary = Bencode.encode(torrent_data)
        parsed = ParsedTorrent(
            name=str(info_dict["name"]),
            info_hash=hashlib.sha1(Bencode.encode(info_dict)).hexdigest(),
            size=sum(torrent_files(info_dict).values()),
            files=torrent_files(info_dict),
            data=torrent_data,
            binary=torrent_binary,
            info_raw=Bencode.encode(info_dict),
        )
        return torrent_binary, parsed

    def write_state(self) -> None:
        """Persist self.state as pretty-printed JSON; warn (not fail) on I/O errors."""
        try:
            self.state_path.write_text(json.dumps(self.state, indent=2, sort_keys=True), encoding="utf-8")
        except OSError as error:
            warn(f"Could not write state file {self.state_path}: {error}")
def validate_base_url(url: str, allow_insecure_http: bool) -> str:
    """Validate and normalise the API base URL (trailing slashes stripped).

    Exits the process via fatal() when the URL is malformed, or when it
    uses plain HTTP toward a non-local host without the explicit opt-in.
    """
    parts = urllib.parse.urlparse(url)
    if not parts.netloc or parts.scheme not in {"http", "https"}:
        fatal("--base-url must be a full URL like https://nostradamus.foo")
    local_hosts = {"127.0.0.1", "localhost"}
    if parts.scheme == "http" and parts.hostname not in local_hosts and not allow_insecure_http:
        fatal("Refusing insecure HTTP for a non-local host. Use HTTPS or pass --allow-insecure-http.")
    return parts.geturl().rstrip("/")
def load_adjacent_nfo(candidates: Sequence[Optional[Path]]) -> Optional[str]:
    """Return the text of the first .nfo file found next to any candidate.

    For a directory candidate the directory itself is searched; for an
    existing file its parent directory is searched; missing/None candidates
    are skipped. Entries are scanned in sorted order and at most the first
    500 kB of the winning file are returned. Returns None when no readable
    .nfo exists.
    """
    visited = set()
    for candidate in candidates:
        if candidate is None:
            continue
        candidate = candidate.expanduser()
        if candidate.is_dir():
            root = candidate
        elif candidate.exists():
            root = candidate.parent
        else:
            continue
        if root in visited:
            continue
        visited.add(root)
        for entry in sorted(root.iterdir()):
            # Fix: the original also tested membership in TEXT_EXTS, which is
            # redundant (".nfo" is the only accepted extension here) and
            # called entry.suffix.lower() twice per entry.
            if entry.suffix.lower() == ".nfo":
                try:
                    return entry.read_text(encoding="utf-8", errors="replace")[:500_000]
                except OSError:
                    # Unreadable file: keep looking at later entries/roots.
                    continue
    return None
def torrent_files(info_dict: dict) -> Dict[str, int]:
    """Map relative file paths to sizes for a decoded torrent info dict.

    Multi-file torrents use the ``files`` list (path segments joined with
    "/"); single-file torrents map the torrent name to ``length``.
    """
    if "files" not in info_dict:
        return {str(info_dict["name"]): int(info_dict["length"])}
    mapping: Dict[str, int] = {}
    for entry in info_dict["files"]:
        rel_path = "/".join(str(part) for part in entry["path"])
        mapping[rel_path] = int(entry["length"])
    return mapping
def build_info_dict(path: Path) -> dict:
    """Create a bencode-ready ``info`` dictionary for a file or directory.

    Hashes all content via compute_pieces(). Directories produce a
    multi-file layout with deterministically sorted entries; files produce
    the single-file ``length`` layout.
    """
    piece_size = choose_piece_length(path)
    meta: Dict[str, Any] = {
        "name": path.name,
        "piece length": piece_size,
        "pieces": compute_pieces(path, piece_size),
        "private": 1,
        "source": "Nostradamus",
    }
    if path.is_file():
        meta["length"] = path.stat().st_size
        return meta
    entries = []
    for file_path in sorted(p for p in path.rglob("*") if p.is_file()):
        entries.append({
            "length": file_path.stat().st_size,
            "path": list(file_path.relative_to(path).parts),
        })
    meta["files"] = entries
    return meta
def choose_piece_length(path: Path) -> int:
    """Pick a piece size from the total payload size (256 KiB up to 2 MiB)."""
    size = total_path_size(path)
    gib = 1024 ** 3
    # (upper bound, piece size) — first matching bound wins.
    ladder = (
        (1 * gib, 256 * 1024),
        (4 * gib, 512 * 1024),
        (16 * gib, 1024 * 1024),
    )
    for bound, piece in ladder:
        if size < bound:
            return piece
    return 2 * 1024 * 1024
def compute_pieces(path: Path, piece_length: int) -> bytes:
    """Concatenated SHA-1 digests of each fixed-size piece of *path*'s data."""
    hashes: List[bytes] = []
    pending = bytearray()
    for chunk in iter_path_chunks(path):
        pending += chunk
        while len(pending) >= piece_length:
            hashes.append(hashlib.sha1(bytes(pending[:piece_length])).digest())
            del pending[:piece_length]
    if pending:
        # Final short piece, if the data is not an exact multiple.
        hashes.append(hashlib.sha1(bytes(pending)).digest())
    return b"".join(hashes)
def iter_path_chunks(path: Path, chunk_size: int = 1024 * 1024) -> Iterable[bytes]:
    """Yield the raw bytes of *path* (or of every file under it) in chunks.

    Directories are walked recursively in sorted order so the concatenated
    byte stream is deterministic.
    """
    if path.is_file():
        sources = [path]
    else:
        sources = sorted(p for p in path.rglob("*") if p.is_file())
    for source in sources:
        with source.open("rb") as stream:
            while True:
                block = stream.read(chunk_size)
                if not block:
                    break
                yield block
def total_path_size(path: Path) -> int:
    """Total size in bytes of a file, or of all files under a directory."""
    if path.is_file():
        return path.stat().st_size
    total = 0
    for child in path.rglob("*"):
        if child.is_file():
            total += child.stat().st_size
    return total
def data_match_keys(name: str) -> List[str]:
    """Candidate lookup keys for matching a torrent name to a data path.

    Yields the raw name, its stem, and their normalised variants —
    first-seen order preserved, duplicates and empty strings dropped.
    """
    stem = Path(name).stem
    raw_keys = [name, stem]
    all_keys = raw_keys + [normalize_key(value) for value in raw_keys]
    ordered = dict.fromkeys(all_keys)
    return [key for key in ordered if key]
def normalize_key(value: str) -> str:
    """Lowercase *value* and strip every character outside [a-z0-9].

    Fix: the original performed ``import re`` inside the function body on
    every call; the import is hoisted to the module's import block.
    """
    return re.sub(r"[^a-z0-9]+", "", value.lower())
def stable_client_item_id(info_hash: str) -> str:
    """Deterministic client item id derived from the torrent info-hash."""
    prefix = info_hash[:16]
    return f"item-{prefix}"
def default_client_job_id() -> str:
    """Fresh job id: unix timestamp plus a short random hex suffix."""
    timestamp = int(time.time())
    suffix = secrets.token_hex(4)
    return f"import-{timestamp}-{suffix}"
def print_scan_summary(candidates: Sequence[dict], mode: str) -> None:
    """Print a human-readable summary of the local scan results."""
    counts = {"cross_seed": 0, "data_only": 0}
    for candidate in candidates:
        counts[candidate["source_kind"]] += 1
    summary_lines = [
        "",
        "Local scan complete.",
        f"Mode : {mode}",
        f"Candidates : {len(candidates)}",
        f"Cross-seed : {counts['cross_seed']}",
        f"Data-only : {counts['data_only']}",
        "",
    ]
    for line in summary_lines:
        print(line)
def info(message: str) -> None:
    """Emit a progress line to stdout."""
    print("==> " + message)
def warn(message: str) -> None:
    """Emit a non-fatal warning to stderr."""
    print("[warn] " + message, file=sys.stderr)
def fatal(message: str) -> None:
    """Print an error to stderr and abort the process with exit code 1."""
    print("[error] " + message, file=sys.stderr)
    raise SystemExit(1)
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line interface for the Nostradamus upload client."""
    cli = argparse.ArgumentParser(
        description="Official client for the Nostradamus upload API",
    )
    # Connection settings.
    cli.add_argument(
        "--base-url",
        required=True,
        help="Nostradamus base URL, for example https://nostradamus.foo",
    )
    cli.add_argument(
        "--importer-token",
        required=True,
        help="Dedicated importer token from Nostradamus settings",
    )
    # Input selection (all repeatable).
    cli.add_argument(
        "--torrent-root",
        action="append",
        help="Directory containing existing .torrent files",
    )
    cli.add_argument(
        "--torrent",
        action="append",
        help="Single .torrent file to import",
    )
    cli.add_argument(
        "--data-root",
        action="append",
        help="Directory whose direct children should be imported as separate data-only items",
    )
    cli.add_argument(
        "--path",
        action="append",
        help="Single file or directory to import as exactly one data-only item",
    )
    # Session/behaviour tuning.
    cli.add_argument(
        "--client-job-id",
        help="Optional stable client job id for the session",
    )
    cli.add_argument(
        "--state-file",
        default=DEFAULT_STATE_FILE,
        help=f"Local state file path (default: {DEFAULT_STATE_FILE})",
    )
    cli.add_argument(
        "--timeout",
        type=int,
        default=DEFAULT_TIMEOUT,
        help=f"HTTP timeout in seconds (default: {DEFAULT_TIMEOUT})",
    )
    cli.add_argument(
        "--allow-insecure-http",
        action="store_true",
        help="Allow plain HTTP for non-local hosts",
    )
    return cli
def main(argv: Optional[Sequence[str]] = None) -> int:
    """CLI entry point; returns the process exit code."""
    options = build_parser().parse_args(argv)
    return ImportUploader(options).run()
if __name__ == "__main__":
    # Script entry point: process exit code mirrors main()'s return value.
    raise SystemExit(main())
|