diff --git a/1_graph.py b/1_graph.py index 824d723..e2de90d 100644 --- a/1_graph.py +++ b/1_graph.py @@ -1,5 +1,6 @@ import json import sys +import time from collections import namedtuple from functools import cache from pathlib import Path @@ -26,6 +27,7 @@ cur = conn.execute( [user_id], ) while rows := cur.fetchmany(0xFF): + time.sleep(0.0001) for row in rows: note_ids.add(row[0]) if early_exit and len(note_ids) > early_exit: @@ -34,6 +36,7 @@ while rows := cur.fetchmany(0xFF): @cache def get_note(id: str) -> Note: + time.sleep(0.0001) return Note( *conn.execute( 'select "renoteId", "replyId", "userId" from note where id = %s', [id] @@ -102,6 +105,7 @@ def traverse(tree: Tree): def expand(tree: Tree): + time.sleep(0.0001) for row in conn.execute( "select id from note_replies(%s, 1, 1000)", [tree.id] ).fetchall(): diff --git a/2_filter.py b/2_filter.py index 8e77945..89311d2 100644 --- a/2_filter.py +++ b/2_filter.py @@ -1,19 +1,21 @@ +import time from dataclasses import dataclass from pathlib import Path from typing import Callable, List import psycopg -from com import FilterableNote, Visibility, eval_config, parse_graph, progressbar - +from com import (FilterableNote, Visibility, eval_config, parse_graph, + progressbar, FilterAction) config = eval_config() conn: psycopg.Connection = config["connect"]() -criteria: Callable[[FilterableNote], bool] = config["criteria"] +criteria: Callable[[FilterableNote], FilterAction] = config["criteria"] intermediate = parse_graph() def transform(entry: dict) -> FilterableNote: + time.sleep(0.0001) note = conn.execute( 'select "createdAt", reactions, "renoteCount", visibility from note where id = %s', [entry["id"]], @@ -57,10 +59,10 @@ for entry in intermediate.values(): transformed = transform(entry) if transformed is None: continue # we'll get to it next cycle - if criteria(transformed): - targets.append(entry["id"]) + action = criteria(transformed) + if action != FilterAction.Ignore: + targets.append(f"{entry['id']} {action.value}") pb.increment() pb.finish() - Path("filtered.list").write_text("\n".join(targets)) diff --git a/3_archive.py b/3_archive.py index 6eef0e1..39affdd 100644 --- a/3_archive.py +++ b/3_archive.py @@ -1,4 +1,5 @@ import json +import time from http.client import HTTPResponse from pathlib import Path from shutil import copyfileobj @@ -16,11 +17,13 @@ conn: psycopg.Connection = config["connect"]() graph = parse_graph() print("reading filterlist") filtered = Path("filtered.list").read_text().strip().splitlines() +filtered = list(map(lambda line: line.split(' ')[0], filtered)) collected_users = {} def collect_user(id: str): if id in collected_users: return + time.sleep(0.001) user = conn.execute('select username, host, "avatarUrl" from "user" where id = %s', [id]).fetchone() if user is None: return None @@ -44,10 +47,11 @@ def collect_note(id: str): output = {} output["id"] = id - note = conn.execute('select text, "userId", "createdAt", "updatedAt", reactions, "renoteCount", visibility, "fileIds" from note where id = %s', [id]).fetchone() + time.sleep(0.001) + note = conn.execute('select text, "userId", "createdAt", "updatedAt", reactions, "renoteCount", visibility, "fileIds", cw from note where id = %s', [id]).fetchone() if note is None: return None - text, user_id, created_at, updated_at, reactions, renotes, visibility, file_ids = note + text, user_id, created_at, updated_at, reactions, renotes, visibility, file_ids, cw = note collect_user(user_id) output["text"] = text @@ -59,6 +63,7 @@ def collect_note(id: str): output["reactions"] = reactions output["renotes"] = renotes output["visibility"] = Visibility.from_db(visibility).code() + output["cw"] = cw node = graph[id] replies = [collect_note(reply) for reply in node["replies"]] @@ -68,6 +73,7 @@ def collect_note(id: str): output["attachments"] = [] for file_id in file_ids: + time.sleep(0.0005) name, type_, comment, url = conn.execute('select name, type, comment, url from drive_file where id = %s', [file_id]).fetchone() attachment = { "id": file_id, @@ -117,13 +123,15 @@ pb = progressbar.ProgressBar( ) for id, note in collected_notes: - outfile = outdir / "note" / f"{id}.mpk.br" + outfile = outdir / "note" / id[:3] / f"{id[3:]}.mpk.br" + outfile.parent.mkdir(exist_ok=True) with outfile.open("wb") as f: f.write(brotli.compress(msgpack.dumps(note))) pb.increment() for id, user in collected_users.items(): - outfile = outdir / "user" / f"{id}.mpk.br" + outfile = outdir / "user" / id[:2] / f"{id[2:]}.mpk.br" + outfile.parent.mkdir(exist_ok=True) with outfile.open("wb") as f: f.write(brotli.compress(msgpack.dumps(note))) pb.increment() @@ -134,8 +142,9 @@ pb = progressbar.ProgressBar( len(files_to_collect), prefix="downloading attachments ", ) -for (id, url) in files_to_collect: - outfile = outdir / "file" / id +for (id, url) in files_to_collect: + outfile = outdir / "file" / id[:2] / id[2:] + outfile.parent.mkdir(exist_ok=True) response: HTTPResponse = urlopen(url) with outfile.open("wb") as f: copyfileobj(response, f) diff --git a/4_delete.py b/4_delete.py index 51e1ef3..615fbab 100644 --- a/4_delete.py +++ b/4_delete.py @@ -1,9 +1,11 @@ +import sys +import time from pathlib import Path import httpx import psycopg -from com import eval_config, parse_graph, progressbar +from com import eval_config, parse_graph, progressbar, FilterAction config = eval_config() conn: psycopg.Connection = config["connect"]() @@ -13,21 +15,129 @@ api: str = config["api"] graph = parse_graph() print("reading filterlist") filtered = Path("filtered.list").read_text().strip().splitlines() +filtered = list(map(lambda line: line.split(' '), filtered)) +print("building queue") queue = [] -def enqueue(note): +def enqueue(note, action): for reply in note["replies"]: - enqueue(graph[reply]) + enqueue(graph[reply], action) for quote in note["quotes"]: - enqueue(graph[quote]) + enqueue(graph[quote], action) if "self" in note["flags"]: - files = conn.execute('select "fileIds" from note where id = %s', [note["id"]]).fetchone()[0] - queue.append((note["id"], files)) + queue.append((note["id"], action)) -for id in filtered: - enqueue(graph[id]) +for id, action in filtered: + enqueue(graph[id], FilterAction(action)) -print(queue) +class CustomETA(progressbar.ETA): + def __init__(self, *args, **kwargs): + self.history = [] + self.lastval = None + progressbar.ETA.__init__(self, *args, **kwargs) -# client = httpx.Client() + def _calculate_eta(self, progress, data, value, elapsed): + if self.lastval != value: + self.history = [*self.history[-9:], elapsed.total_seconds()] + self.lastval = value + per_item = (self.history[-1] - self.history[0]) / len(self.history) + remaining = (progress.max_value - value) * per_item + spent = elapsed.total_seconds() - self.history[-1] + return max(remaining - spent, 0) + +pb = progressbar.ProgressBar( + 0, + len(queue), + widgets=[ + progressbar.Variable("message", format="{formatted_value}"), + " ", + progressbar.Percentage(), + " ", + progressbar.Bar(), + " ", + progressbar.SimpleProgress("%(value_s)s/%(max_value_s)s"), + " ", + CustomETA(), + ], + variables={"status": "work"} +) +pb.update(0) # force initial display +client = httpx.Client(timeout=60) +seeking = False +last_req = 0 + +for note, action in queue: + + # seek through queue + # helps prevent rate limits on resumed deletions + if seeking: + while True: + resp = client.post(f"{api}/notes/show", json={ + "i": token, + "noteId": note, + }) + if resp.status_code == 502: + pb.update(message="down") + time.sleep(1) + continue + break + if resp.status_code == 404: + pb.increment(message="seeking") + continue + seeking = False + + # wait for queue to empty + while True: + resp = client.post(f"{api}/admin/queue/stats", json={"i": token}) + if resp.status_code == 502: + pb.update(message="down") + time.sleep(1) + continue + deliver_waiting = resp.json()["deliver"]["waiting"] + obliterate_waiting = resp.json()["obliterate"]["waiting"] + obliterate_delayed = resp.json()["obliterate"]["delayed"] + if deliver_waiting < 100 and obliterate_waiting + obliterate_delayed< 50000: + break + pb.update(message=f"queue ({deliver_waiting}/{obliterate_waiting + obliterate_delayed})") + time.sleep(10) + + # prevent api rate limiting + req_delay = time.time() - last_req + if req_delay < 15: + pb.update(message="delaying") + time.sleep(req_delay) + + # queue new deletions + err = 0 + while True: + resp = client.post(f"{api}/notes/delete", json={ + "i": token, + "noteId": note, + "obliterate": action == FilterAction.Obliterate, + }) + if resp.status_code == 429: + pb.update(status="limit") + time.sleep(1) + continue + elif resp.status_code == 502: + pb.update(status="down") + continue + time.sleep(1) + elif resp.status_code >= 400: + body = resp.json() + if body["error"]["code"] == "NO_SUCH_NOTE": + pb.increment(message="seeking") + seeking = True + break + err += 1 + if err > 10: + raise Exception(f"{body['error']['code']}: {body['error']['message']}") + sys.stdout.write("\r") + print(f"err {body['error']['code']} {body['error']['message']} ") + time.sleep(30) + pb.increment(message="deleting") + last_req = time.time() + break + +pb.finish() diff --git a/com.py b/com.py index 4ceb849..3ebb948 100644 --- a/com.py +++ b/com.py @@ -25,7 +25,7 @@ class Visibility(Enum): case "followers": return cls.followers case "specified": return cls.direct case _: raise ValueError(f"unknown visibility `{raw}`") - + def code(self) -> str: match self: case self.public: return "p" @@ -76,6 +76,12 @@ class FilterableNote: } +class FilterAction(Enum): + Ignore = 'ignore' + Delete = 'delete' + Obliterate = 'obliterate' + + def eval_config() -> dict: print("configuring") config = {} diff --git a/conf_mia.py b/conf_mia.py index 6496e3b..a32255f 100644 --- a/conf_mia.py +++ b/conf_mia.py @@ -1,18 +1,18 @@ import math from datetime import UTC, datetime, timedelta -from com import FilterableNote, Visibility +from com import FilterableNote, Visibility, FilterAction from sec import connect, tokens user_id = "9gf2ev4ex5dflllo" token = tokens["mia"] -api = "https://void.rehab/api/" +api = "https://void.rehab/api" early_exit = 0xFFF now = datetime.now(UTC) threshold = 0.1 -def criteria(root: FilterableNote) -> bool: +def criteria(root: FilterableNote) -> FilterAction: thread = root.thread() thread_self = root.thread_self() @@ -24,13 +24,13 @@ def criteria(root: FilterableNote) -> bool: # ...and the dms are younger than two months... if now - most_recent_direct.when < timedelta(days=30 * 2): # ...do not delete the thread - return False + return FilterAction.Ignore # get the most recent post... others_recency = max(thread, key=lambda note: note.when) # ...and bail if it's too new if now - others_recency.when < timedelta(days=14): - return False + return FilterAction.Ignore # get my... most_recent_post = max(thread_self, key=lambda note: note.when) # ...most recent post... @@ -43,4 +43,13 @@ def criteria(root: FilterableNote) -> bool: # ...weigh it... weighted_score = high_score / math.sqrt(most_recent_age.days) # ...and check it against a threshold - return weighted_score < threshold + if weighted_score < threshold: + if any(map( + lambda note: note.visibility in [Visibility.public, Visibility.unlisted], + thread_self, + )): + return FilterAction.Obliterate + else: + return FilterAction.Delete + else: + return FilterAction.Ignore diff --git a/conf_pain.py b/conf_pain.py index 85e7095..9690de1 100644 --- a/conf_pain.py +++ b/conf_pain.py @@ -1,14 +1,17 @@ import math from datetime import UTC, datetime, timedelta -from com import FilterableNote +from com import FilterableNote, FilterAction from sec import connect, tokens user_id = "9gszslkcdfnomssj" token = tokens["pain"] -api = "https://void.rehab/api/" +api = "https://void.rehab/api" -def criteria(root: FilterableNote) -> bool: +def criteria(root: FilterableNote) -> (bool, FilterAction): # if it's more than two months old, delete - # return (datetime.now(UTC) - root.when).days > 60 - return (datetime.now(UTC) - root.when).days > (12 * 30) + latest = max(map(lambda note: note.when, root.thread_self())) + if (datetime.now(UTC) - latest).days > 60: + return FilterAction.Obliterate + else: + return FilterAction.Ignore diff --git a/go.sh b/go.sh index 39f3779..169d025 100755 --- a/go.sh +++ b/go.sh @@ -1,13 +1,19 @@ -#!/bin/sh +#!/bin/bash -set -ex +set -e + +if [[ "$1" = "" ]]; then + echo missing name + exit +fi -test -f graph.db && rm graph.db -test -f filtered.list && rm filtered.list test -d out && rm -r out + +set -x python3 1_graph.py conf_$1.py python3 2_filter.py conf_$1.py -# python3 3_archive.py conf_$1.py -# echo uploading to memorial -# rsync -r -e 'ssh -p23' --progress out/ memorial:fediverse/$1/ -# python3 4_delete.py conf_$1.py +python3 3_archive.py conf_$1.py +rsync -r -e 'ssh -p23' --progress out/file/ memorial:fediverse/$1/file/ +rsync -r -e 'ssh -p23' --progress --ignore-existing out/note/ memorial:fediverse/$1/note/ +rsync -r -e 'ssh -p23' --progress out/user/ memorial:fediverse/$1/user/ +python3 4_delete.py conf_$1.py diff --git a/proxy.sh b/proxy.sh index 9628fab..1689ab2 100755 --- a/proxy.sh +++ b/proxy.sh @@ -1,2 +1,2 @@ #!/bin/sh -exec ssh -NL 5432:localhost:5432 vr +exec ssh -NL 54321:localhost:5432 vr