import sys import time from pathlib import Path import httpx import psutil import psycopg from com import FilterAction, eval_config, parse_graph, progressbar config = eval_config() conn: psycopg.Connection = config["connect"]() token: str = config["token"] api: str = config["api"] graph = parse_graph() print("reading filterlist") filtered = Path("filtered.list").read_text().strip().splitlines() filtered = list(map(lambda line: line.split(' '), filtered)) print("building queue") queue = [] def enqueue(note, action): for reply in note["replies"]: enqueue(graph[reply], action) for quote in note["quotes"]: enqueue(graph[quote], action) if "self" in note["flags"]: queue.append((note["id"], action)) for id, action in filtered: enqueue(graph[id], FilterAction(action)) class CustomETA(progressbar.ETA): def __init__(self, *args, **kwargs): self.history = [] self.lastval = None progressbar.ETA.__init__(self, *args, **kwargs) def _calculate_eta(self, progress, data, value, elapsed): if self.lastval != value: self.history = [*self.history[-9:], elapsed.total_seconds()] self.lastval = value per_item = (self.history[-1] - self.history[0]) / len(self.history) remaining = (progress.max_value - value) * per_item spent = elapsed.total_seconds() - self.history[-1] return max(remaining - spent, 0) pb = progressbar.ProgressBar( 0, len(queue), widgets=[ progressbar.Variable("message", format="{formatted_value}"), " ", progressbar.Percentage(), " ", progressbar.Bar(), " ", progressbar.SimpleProgress("%(value_s)s/%(max_value_s)s"), " ", CustomETA(), ], variables={"status": "work"} ) pb.update(0) # force initial display client = httpx.Client(timeout=60) seeking = False last_req = 0 for note, action in queue: # seek through queue # helps prevent rate limits on resumed deletions if seeking: while True: resp = client.post(f"{api}/notes/show", json={ "i": token, "noteId": note, }) if resp.status_code == 502: pb.update(message="down") time.sleep(1) continue break if resp.status_code == 404: pb.increment(message="seeking") continue seeking = False # wait for queue to empty while True: resp = client.post(f"{api}/admin/queue/stats", json={"i": token}) if resp.status_code == 502: pb.update(message="down") time.sleep(1) continue Path('queue-stats.dump').write_text(f"status:{resp.status_code}\nbody:\n{resp.text}") deliver_waiting = resp.json()["deliver"]["waiting"] obliterate_waiting = resp.json()["obliterate"]["waiting"] if deliver_waiting < 100 and obliterate_waiting < 50000: break pb.update(message=f"queue ({deliver_waiting}/{obliterate_waiting})") time.sleep(10) # make sure there's enough memory for new jobs while True: vmem = psutil.virtual_memory() if vmem.available > (512 * 1024 * 1024): break pb.update(message="memory") time.sleep(10) # prevent api rate limiting req_delay = time.time() - last_req if req_delay < 30: pb.update(message="delaying") time.sleep(req_delay) # queue new deletions err = 0 while True: resp = client.post(f"{api}/notes/delete", json={ "i": token, "noteId": note, "obliterate": action == FilterAction.Obliterate, }) if resp.status_code == 429: pb.update(status="limit") time.sleep(1) continue elif resp.status_code == 502: pb.update(status="down") time.sleep(1) continue elif resp.status_code >= 400: body = resp.json() if body["error"]["code"] == "NO_SUCH_NOTE": pb.increment(message="seeking") seeking = True break elif body["error"]["code"] == "QUEUE_FULL": print("\nobliterate queue overflowed, exiting to save server") break err += 1 if err > 3: raise Exception(f"{body['error']['code']}: {body['error']['message']}") sys.stdout.write("\r") print(f"err {body['error']['code']} {body['error']['message']} ") time.sleep(30) pb.increment(message="deleting") last_req = time.time() break pb.finish()