nostradamus_importer.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658
  1. #!/usr/bin/env python3
  2. """Upload script for the Nostradamus session-based upload/import API.
  3. The local tool only:
  4. - scans existing .torrent files and/or local data folders
  5. - creates a new import session on Nostradamus
  6. - uploads raw import candidates to that session
  7. - prints the web URL where the uploader should review the items
  8. """
from __future__ import annotations

import argparse
import base64
import hashlib
import json
import re
import secrets
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
# Extensions treated as text companions of a release (currently only .nfo is
# actually loaded by load_adjacent_nfo; .txt is reserved).
TEXT_EXTS = {".nfo", ".txt"}
# Default HTTP timeout in seconds for API requests.
DEFAULT_TIMEOUT = 30
# Default path of the local JSON file that records upload progress.
DEFAULT_STATE_FILE = ".nostradamus-import-state.json"
@dataclass(frozen=True)
class RawBencoded:
    """Marker wrapper for bytes that are already bencoded.

    Bencode.encode emits the wrapped value verbatim instead of re-encoding,
    which lets callers splice a pre-serialized span (e.g. an info dict) into
    a larger structure without a second encoding pass.
    """
    value: bytes  # pre-encoded bencode payload, inserted as-is
@dataclass
class ParsedTorrent:
    """Normalized view of a single torrent's metadata."""
    name: str  # display name from the info dictionary
    info_hash: str  # hex SHA-1 of the raw bencoded info dictionary
    size: int  # total payload size in bytes (sum of all file sizes)
    files: Dict[str, int]  # slash-joined relative path -> size in bytes
    data: Dict[str, Any]  # fully decoded top-level torrent dictionary
    binary: bytes  # complete .torrent file bytes
    info_raw: bytes  # exact bencoded "info" span (source of info_hash)
  38. class Bencode:
  39. @staticmethod
  40. def decode_torrent(data: bytes) -> Tuple[Dict[str, Any], bytes]:
  41. if not data.startswith(b"d"):
  42. raise ValueError("torrent payload must be a bencoded dictionary")
  43. index = 1
  44. result: Dict[str, Any] = {}
  45. info_raw: Optional[bytes] = None
  46. while data[index:index + 1] != b"e":
  47. key_raw, index = Bencode._decode_bytes_at(data, index)
  48. value_start = index
  49. value, index = Bencode._decode_at(data, index)
  50. key = key_raw.decode("utf-8", errors="replace")
  51. if key_raw == b"info":
  52. info_raw = data[value_start:index]
  53. result[key] = value
  54. if info_raw is None:
  55. raise ValueError("torrent missing info dictionary")
  56. return result, info_raw
  57. @staticmethod
  58. def _decode_bytes_at(data: bytes, index: int) -> Tuple[bytes, int]:
  59. token = data[index:index + 1]
  60. if not token.isdigit():
  61. raise ValueError(f"invalid byte string token at {index}: {token!r}")
  62. colon = data.index(b":", index)
  63. length = int(data[index:colon])
  64. start = colon + 1
  65. end = start + length
  66. return data[start:end], end
  67. @staticmethod
  68. def _decode_at(data: bytes, index: int) -> Tuple[Any, int]:
  69. token = data[index:index + 1]
  70. if token == b"i":
  71. end = data.index(b"e", index)
  72. return int(data[index + 1:end]), end + 1
  73. if token == b"l":
  74. index += 1
  75. items = []
  76. while data[index:index + 1] != b"e":
  77. item, index = Bencode._decode_at(data, index)
  78. items.append(item)
  79. return items, index + 1
  80. if token == b"d":
  81. index += 1
  82. result = {}
  83. while data[index:index + 1] != b"e":
  84. key, index = Bencode._decode_at(data, index)
  85. value, index = Bencode._decode_at(data, index)
  86. if isinstance(key, bytes):
  87. key = key.decode("utf-8", errors="replace")
  88. result[key] = value
  89. return result, index + 1
  90. if token.isdigit():
  91. colon = data.index(b":", index)
  92. length = int(data[index:colon])
  93. start = colon + 1
  94. end = start + length
  95. raw = data[start:end]
  96. try:
  97. return raw.decode("utf-8"), end
  98. except UnicodeDecodeError:
  99. return raw, end
  100. raise ValueError(f"invalid bencode token at {index}: {token!r}")
  101. @staticmethod
  102. def encode(value: Any) -> bytes:
  103. if isinstance(value, RawBencoded):
  104. return value.value
  105. if isinstance(value, bool):
  106. value = int(value)
  107. if isinstance(value, int):
  108. return f"i{value}e".encode("ascii")
  109. if isinstance(value, bytes):
  110. return str(len(value)).encode("ascii") + b":" + value
  111. if isinstance(value, str):
  112. raw = value.encode("utf-8")
  113. return str(len(raw)).encode("ascii") + b":" + raw
  114. if isinstance(value, list):
  115. return b"l" + b"".join(Bencode.encode(item) for item in value) + b"e"
  116. if isinstance(value, dict):
  117. items = []
  118. for key in sorted(value.keys(), key=lambda item: item if isinstance(item, bytes) else str(item).encode("utf-8")):
  119. encoded_key = key if isinstance(key, bytes) else str(key)
  120. items.append(Bencode.encode(encoded_key))
  121. items.append(Bencode.encode(value[key]))
  122. return b"d" + b"".join(items) + b"e"
  123. raise TypeError(f"unsupported bencode type: {type(value)!r}")
  124. class ImportUploader:
  125. def __init__(self, args: argparse.Namespace):
  126. self.args = args
  127. self.base_url = validate_base_url(args.base_url, args.allow_insecure_http)
  128. self.importer_token = args.importer_token
  129. self.timeout = args.timeout
  130. self.state_path = Path(args.state_file)
  131. self.state: Dict[str, Any] = {}
  132. def run(self) -> int:
  133. candidates = self.collect_items()
  134. if not candidates:
  135. fatal("No import candidates found. Provide --torrent-root, --torrent, --data-root, or --path.")
  136. mode = self.infer_mode(candidates)
  137. client_job_id = self.args.client_job_id or default_client_job_id()
  138. info(f"Creating import session ({mode})")
  139. session = self.create_session(client_job_id, mode)
  140. session_id = session["id"]
  141. session_url = f"{self.base_url}/imports/{session_id}"
  142. self.state = {
  143. "client_job_id": client_job_id,
  144. "session_id": session_id,
  145. "mode": mode,
  146. "items": [],
  147. }
  148. self.write_state()
  149. print_scan_summary(candidates, mode)
  150. uploaded = []
  151. failures = 0
  152. for index, candidate in enumerate(candidates, start=1):
  153. label = candidate["original_name"]
  154. info(f"[{index}/{len(candidates)}] Uploading {label}")
  155. try:
  156. response = self.create_item(session_id, candidate)
  157. item = response["item"]
  158. uploaded.append({
  159. "client_item_id": candidate["client_item_id"],
  160. "item_id": item["id"],
  161. "info_hash": candidate.get("info_hash"),
  162. "status": item["status"],
  163. "name": candidate["original_name"],
  164. "source_kind": candidate["source_kind"],
  165. })
  166. except RuntimeError as error:
  167. failures += 1
  168. warn(f"{label}: {error}")
  169. uploaded.append({
  170. "client_item_id": candidate["client_item_id"],
  171. "info_hash": candidate.get("info_hash"),
  172. "status": "failed",
  173. "name": candidate["original_name"],
  174. "source_kind": candidate["source_kind"],
  175. "error": str(error),
  176. })
  177. self.state["items"] = uploaded
  178. self.write_state()
  179. print("")
  180. print("Import session created.")
  181. print(f"Session ID : {session_id}")
  182. print(f"Review URL : {session_url}")
  183. print(f"Uploaded : {len(candidates) - failures}/{len(candidates)} item(s)")
  184. if failures:
  185. print(f"Failed : {failures}")
  186. print(f"State file : {self.state_path}")
  187. print("")
  188. print("Next steps:")
  189. print("1. Open the review URL in Nostradamus")
  190. print("2. Review the imported items on the website")
  191. print("3. Finalize the items you want to send into pending moderation")
  192. print("4. After approval, download the prepared .torrent from the site")
  193. return 0 if failures == 0 else 1
  194. def create_session(self, client_job_id: str, mode: str) -> dict:
  195. response = self.request("POST", "/api/import/sessions", {
  196. "client_job_id": client_job_id,
  197. "mode": mode,
  198. })
  199. return response["data"]
  200. def create_item(self, session_id: str, payload: dict) -> dict:
  201. response = self.request("POST", f"/api/import/sessions/{session_id}/items", payload)
  202. return response["data"]
  203. def request(self, method: str, path: str, payload: Optional[dict] = None) -> dict:
  204. url = f"{self.base_url}{path}"
  205. body = None
  206. headers = {"x-importer-token": self.importer_token, "accept": "application/json"}
  207. if payload is not None:
  208. body = json.dumps(payload).encode("utf-8")
  209. headers["content-type"] = "application/json"
  210. request = urllib.request.Request(url, data=body, method=method, headers=headers)
  211. try:
  212. with urllib.request.urlopen(request, timeout=self.timeout) as response:
  213. raw = response.read().decode("utf-8")
  214. return json.loads(raw) if raw else {}
  215. except urllib.error.HTTPError as error:
  216. body_text = error.read().decode("utf-8", errors="replace")
  217. try:
  218. parsed = json.loads(body_text)
  219. message = parsed.get("error", body_text)
  220. except json.JSONDecodeError:
  221. message = body_text
  222. raise RuntimeError(f"{method} {path} failed with {error.code}: {message}") from error
  223. except urllib.error.URLError as error:
  224. raise RuntimeError(f"{method} {path} failed: {error}") from error
  225. def collect_items(self) -> List[dict]:
  226. torrent_paths = self.expand_torrent_paths()
  227. data_paths = self.expand_data_paths()
  228. data_matcher = self.build_data_path_matcher(data_paths)
  229. matched_data_paths = set()
  230. items: List[dict] = []
  231. for torrent_path in torrent_paths:
  232. parsed = self.parse_torrent_file(torrent_path)
  233. if parsed is None:
  234. continue
  235. matched_data_path = self.match_data_path(parsed.name, data_matcher)
  236. if matched_data_path is not None:
  237. matched_data_paths.add(matched_data_path)
  238. item = self.build_cross_seed_item(torrent_path, parsed, matched_data_path)
  239. if item:
  240. items.append(item)
  241. for data_path in data_paths:
  242. if data_path in matched_data_paths:
  243. continue
  244. item = self.build_data_only_item(data_path)
  245. if item:
  246. items.append(item)
  247. deduped: Dict[str, dict] = {}
  248. unnamed: List[dict] = []
  249. for item in items:
  250. info_hash = item.get("info_hash")
  251. if info_hash:
  252. deduped[info_hash] = item
  253. else:
  254. unnamed.append(item)
  255. return list(deduped.values()) + unnamed
  256. def expand_torrent_paths(self) -> List[Path]:
  257. paths: List[Path] = []
  258. for raw in self.args.torrent or []:
  259. path = Path(raw).expanduser()
  260. if path.is_file():
  261. paths.append(path)
  262. for raw in self.args.torrent_root or []:
  263. root = Path(raw).expanduser()
  264. if root.is_dir():
  265. paths.extend(sorted(root.rglob("*.torrent")))
  266. return sorted(set(paths))
  267. def expand_data_paths(self) -> List[Path]:
  268. paths: List[Path] = []
  269. for raw in self.args.path or []:
  270. path = Path(raw).expanduser()
  271. if path.exists():
  272. paths.append(path)
  273. for raw in self.args.data_root or []:
  274. root = Path(raw).expanduser()
  275. if root.is_dir():
  276. for child in sorted(root.iterdir()):
  277. if child.name.startswith("."):
  278. continue
  279. paths.append(child)
  280. return [path for path in sorted(set(paths)) if path.suffix.lower() != ".torrent"]
  281. def infer_mode(self, items: Sequence[dict]) -> str:
  282. kinds = {item["source_kind"] for item in items}
  283. if kinds == {"cross_seed"}:
  284. return "cross_seed"
  285. if kinds == {"data_only"}:
  286. return "data_only"
  287. return "mixed"
  288. def build_data_path_matcher(self, paths: Sequence[Path]) -> Dict[str, List[Path]]:
  289. matcher: Dict[str, List[Path]] = {}
  290. for path in paths:
  291. for key in data_match_keys(path.name):
  292. matcher.setdefault(key, []).append(path)
  293. return matcher
  294. def match_data_path(self, torrent_name: str, matcher: Dict[str, List[Path]]) -> Optional[Path]:
  295. for key in data_match_keys(torrent_name):
  296. candidates = matcher.get(key) or []
  297. while candidates:
  298. candidate = candidates.pop(0)
  299. if candidate.exists():
  300. return candidate
  301. return None
  302. def build_cross_seed_item(
  303. self,
  304. torrent_path: Path,
  305. parsed: ParsedTorrent,
  306. matched_data_path: Optional[Path],
  307. ) -> dict:
  308. nfo_content = load_adjacent_nfo([matched_data_path, torrent_path])
  309. local_path_hint = str(matched_data_path or torrent_path.parent)
  310. return {
  311. "client_item_id": stable_client_item_id(parsed.info_hash),
  312. "source_kind": "cross_seed",
  313. "original_name": parsed.name,
  314. "local_path_hint": local_path_hint,
  315. "info_hash": parsed.info_hash,
  316. "size": parsed.size,
  317. "raw_torrent": base64.b64encode(parsed.binary).decode("ascii"),
  318. "raw_nfo": nfo_content,
  319. "file_list": [{"path": path, "size": size} for path, size in parsed.files.items()],
  320. "detected_facts": {
  321. "source_path": str(torrent_path),
  322. "matched_data_path": str(matched_data_path) if matched_data_path else None,
  323. "has_nfo": bool(nfo_content),
  324. "file_count": len(parsed.files),
  325. },
  326. }
  327. def build_data_only_item(self, data_path: Path) -> Optional[dict]:
  328. info(f"Hashing data for {data_path.name}")
  329. torrent_binary, parsed = self.create_torrent_from_path(data_path)
  330. nfo_content = load_adjacent_nfo([data_path])
  331. return {
  332. "client_item_id": stable_client_item_id(parsed.info_hash),
  333. "source_kind": "data_only",
  334. "original_name": parsed.name,
  335. "local_path_hint": str(data_path),
  336. "info_hash": parsed.info_hash,
  337. "size": parsed.size,
  338. "raw_torrent": base64.b64encode(torrent_binary).decode("ascii"),
  339. "raw_nfo": nfo_content,
  340. "file_list": [{"path": path, "size": size} for path, size in parsed.files.items()],
  341. "detected_facts": {
  342. "source_path": str(data_path),
  343. "has_nfo": bool(nfo_content),
  344. "file_count": len(parsed.files),
  345. },
  346. }
  347. def parse_torrent_file(self, path: Path) -> Optional[ParsedTorrent]:
  348. try:
  349. binary = path.read_bytes()
  350. data, info_raw = Bencode.decode_torrent(binary)
  351. info_dict = data["info"]
  352. info_hash = hashlib.sha1(info_raw).hexdigest()
  353. files = torrent_files(info_dict)
  354. size = sum(files.values())
  355. return ParsedTorrent(
  356. name=str(info_dict["name"]),
  357. info_hash=info_hash,
  358. size=size,
  359. files=files,
  360. data=data,
  361. binary=binary,
  362. info_raw=info_raw,
  363. )
  364. except Exception as error:
  365. warn(f"Failed to parse {path}: {error}")
  366. return None
  367. def create_torrent_from_path(self, path: Path) -> Tuple[bytes, ParsedTorrent]:
  368. info_dict = build_info_dict(path)
  369. torrent_data = {
  370. "created by": "Nostradamus",
  371. "creation date": int(time.time()),
  372. "comment": "Proudly generated for Nostradamus",
  373. "info": info_dict,
  374. }
  375. torrent_binary = Bencode.encode(torrent_data)
  376. parsed = ParsedTorrent(
  377. name=str(info_dict["name"]),
  378. info_hash=hashlib.sha1(Bencode.encode(info_dict)).hexdigest(),
  379. size=sum(torrent_files(info_dict).values()),
  380. files=torrent_files(info_dict),
  381. data=torrent_data,
  382. binary=torrent_binary,
  383. info_raw=Bencode.encode(info_dict),
  384. )
  385. return torrent_binary, parsed
  386. def write_state(self) -> None:
  387. try:
  388. self.state_path.write_text(json.dumps(self.state, indent=2, sort_keys=True), encoding="utf-8")
  389. except OSError as error:
  390. warn(f"Could not write state file {self.state_path}: {error}")
  391. def validate_base_url(url: str, allow_insecure_http: bool) -> str:
  392. parsed = urllib.parse.urlparse(url)
  393. if parsed.scheme not in {"http", "https"} or not parsed.netloc:
  394. fatal("--base-url must be a full URL like https://nostradamus.foo")
  395. if parsed.scheme == "http" and not allow_insecure_http and parsed.hostname not in {"127.0.0.1", "localhost"}:
  396. fatal("Refusing insecure HTTP for a non-local host. Use HTTPS or pass --allow-insecure-http.")
  397. return parsed.geturl().rstrip("/")
  398. def load_adjacent_nfo(candidates: Sequence[Optional[Path]]) -> Optional[str]:
  399. visited = set()
  400. for candidate in candidates:
  401. if candidate is None:
  402. continue
  403. candidate = candidate.expanduser()
  404. search_roots = []
  405. if candidate.is_dir():
  406. search_roots.append(candidate)
  407. elif candidate.exists():
  408. search_roots.append(candidate.parent)
  409. for root in search_roots:
  410. if root in visited:
  411. continue
  412. visited.add(root)
  413. for entry in sorted(root.iterdir()):
  414. if entry.suffix.lower() in TEXT_EXTS and entry.suffix.lower() == ".nfo":
  415. try:
  416. return entry.read_text(encoding="utf-8", errors="replace")[:500_000]
  417. except OSError:
  418. continue
  419. return None
  420. def torrent_files(info_dict: dict) -> Dict[str, int]:
  421. if "files" in info_dict:
  422. return {
  423. "/".join(str(part) for part in file_entry["path"]): int(file_entry["length"])
  424. for file_entry in info_dict["files"]
  425. }
  426. return {str(info_dict["name"]): int(info_dict["length"])}
  427. def build_info_dict(path: Path) -> dict:
  428. piece_length = choose_piece_length(path)
  429. pieces = compute_pieces(path, piece_length)
  430. info_dict: Dict[str, Any] = {
  431. "name": path.name,
  432. "piece length": piece_length,
  433. "pieces": pieces,
  434. "private": 1,
  435. "source": "Nostradamus",
  436. }
  437. if path.is_file():
  438. info_dict["length"] = path.stat().st_size
  439. else:
  440. files = []
  441. for file_path in sorted(p for p in path.rglob("*") if p.is_file()):
  442. files.append({
  443. "length": file_path.stat().st_size,
  444. "path": list(file_path.relative_to(path).parts),
  445. })
  446. info_dict["files"] = files
  447. return info_dict
  448. def choose_piece_length(path: Path) -> int:
  449. total_size = total_path_size(path)
  450. if total_size < 1 * 1024**3:
  451. return 256 * 1024
  452. if total_size < 4 * 1024**3:
  453. return 512 * 1024
  454. if total_size < 16 * 1024**3:
  455. return 1024 * 1024
  456. return 2 * 1024 * 1024
  457. def compute_pieces(path: Path, piece_length: int) -> bytes:
  458. digest = []
  459. buffer = bytearray()
  460. for chunk in iter_path_chunks(path):
  461. buffer.extend(chunk)
  462. while len(buffer) >= piece_length:
  463. piece = bytes(buffer[:piece_length])
  464. digest.append(hashlib.sha1(piece).digest())
  465. del buffer[:piece_length]
  466. if buffer:
  467. digest.append(hashlib.sha1(bytes(buffer)).digest())
  468. return b"".join(digest)
  469. def iter_path_chunks(path: Path, chunk_size: int = 1024 * 1024) -> Iterable[bytes]:
  470. files = [path] if path.is_file() else sorted(p for p in path.rglob("*") if p.is_file())
  471. for file_path in files:
  472. with file_path.open("rb") as handle:
  473. while True:
  474. chunk = handle.read(chunk_size)
  475. if not chunk:
  476. break
  477. yield chunk
  478. def total_path_size(path: Path) -> int:
  479. if path.is_file():
  480. return path.stat().st_size
  481. return sum(file_path.stat().st_size for file_path in path.rglob("*") if file_path.is_file())
  482. def data_match_keys(name: str) -> List[str]:
  483. path = Path(name)
  484. raw = [name, path.stem]
  485. normalized = [normalize_key(value) for value in raw]
  486. return [key for key in dict.fromkeys(raw + normalized) if key]
  487. def normalize_key(value: str) -> str:
  488. import re
  489. return re.sub(r"[^a-z0-9]+", "", value.lower())
  490. def stable_client_item_id(info_hash: str) -> str:
  491. return f"item-{info_hash[:16]}"
  492. def default_client_job_id() -> str:
  493. return f"import-{int(time.time())}-{secrets.token_hex(4)}"
  494. def print_scan_summary(candidates: Sequence[dict], mode: str) -> None:
  495. counts = {"cross_seed": 0, "data_only": 0}
  496. for candidate in candidates:
  497. counts[candidate["source_kind"]] += 1
  498. print("")
  499. print("Local scan complete.")
  500. print(f"Mode : {mode}")
  501. print(f"Candidates : {len(candidates)}")
  502. print(f"Cross-seed : {counts['cross_seed']}")
  503. print(f"Data-only : {counts['data_only']}")
  504. print("")
  505. def info(message: str) -> None:
  506. print(f"==> {message}")
  507. def warn(message: str) -> None:
  508. print(f"[warn] {message}", file=sys.stderr)
  509. def fatal(message: str) -> None:
  510. print(f"[error] {message}", file=sys.stderr)
  511. raise SystemExit(1)
  512. def build_parser() -> argparse.ArgumentParser:
  513. parser = argparse.ArgumentParser(
  514. description="Official client for the Nostradamus upload API"
  515. )
  516. parser.add_argument("--base-url", required=True, help="Nostradamus base URL, for example https://nostradamus.foo")
  517. parser.add_argument("--importer-token", required=True, help="Dedicated importer token from Nostradamus settings")
  518. parser.add_argument("--torrent-root", action="append", help="Directory containing existing .torrent files")
  519. parser.add_argument("--torrent", action="append", help="Single .torrent file to import")
  520. parser.add_argument(
  521. "--data-root",
  522. action="append",
  523. help="Directory whose direct children should be imported as separate data-only items",
  524. )
  525. parser.add_argument(
  526. "--path",
  527. action="append",
  528. help="Single file or directory to import as exactly one data-only item",
  529. )
  530. parser.add_argument("--client-job-id", help="Optional stable client job id for the session")
  531. parser.add_argument("--state-file", default=DEFAULT_STATE_FILE, help=f"Local state file path (default: {DEFAULT_STATE_FILE})")
  532. parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help=f"HTTP timeout in seconds (default: {DEFAULT_TIMEOUT})")
  533. parser.add_argument("--allow-insecure-http", action="store_true", help="Allow plain HTTP for non-local hosts")
  534. return parser
  535. def main(argv: Optional[Sequence[str]] = None) -> int:
  536. parser = build_parser()
  537. args = parser.parse_args(argv)
  538. uploader = ImportUploader(args)
  539. return uploader.run()
# Script entry point: exit with the uploader's status code.
if __name__ == "__main__":
    raise SystemExit(main())