scrubber/4_delete.py
2024-11-20 06:52:59 -08:00

155 lines
4.6 KiB
Python

import sys
import time
from pathlib import Path
import httpx
import psutil
import psycopg
from com import FilterAction, eval_config, parse_graph, progressbar
# Runtime configuration: eval_config() supplies the database connector,
# the API token and the API base URL read below.
config = eval_config()
conn: psycopg.Connection = config["connect"]()
token: str = config["token"]
api: str = config["api"]

# Note graph, indexed by note id further down.
graph = parse_graph()

print("reading filterlist")
# Every line of filtered.list is split on single spaces; the seeding loop
# below expects exactly two fields per line (note id, action).
filter_lines = Path("filtered.list").read_text().strip().splitlines()
filtered = [line.split(' ') for line in filter_lines]

print("building queue")
# Ordered list of (note id, action) pairs to delete.
queue = []
def enqueue(note, action):
    """Append *note* and everything hanging off it to the deletion queue.

    The walk is post-order: all replies of a note (and their descendants)
    come first, then all of its quotes, and only then the note itself, so
    children are always queued before their parent. Only notes carrying
    the "self" flag are queued; other notes are traversed but skipped.

    The traversal uses an explicit stack instead of recursion so that very
    long reply/quote chains cannot hit the interpreter's recursion limit.

    Args:
        note: A note record from ``graph`` with "id", "replies", "quotes"
            and "flags" entries.
        action: The action to store alongside each queued note id.

    Raises:
        KeyError: If a reply or quote id is missing from ``graph``.
    """
    # Each entry is (note, children_already_pushed).
    stack = [(note, False)]
    while stack:
        current, expanded = stack.pop()
        if expanded:
            # All descendants have been handled; now queue the note itself.
            if "self" in current["flags"]:
                queue.append((current["id"], action))
            continue
        stack.append((current, True))
        children = [*current["replies"], *current["quotes"]]
        # Push in reverse so that children pop in their original order.
        for child_id in reversed(children):
            stack.append((graph[child_id], False))
# Seed the queue from every filtered note; enqueue() also walks the
# replies and quotes hanging off each one.
for note_id, action_name in filtered:
    enqueue(graph[note_id], FilterAction(action_name))
class CustomETA(progressbar.ETA):
    """ETA widget that averages over the most recent progress changes only.

    The estimate is based on the elapsed-time stamps of the last (up to)
    ten changes of the progress value, so it follows the current pace
    rather than the average since the start of the run.
    """

    def __init__(self, *args, **kwargs):
        # Elapsed-seconds stamps of the most recent value changes (max 10).
        self.history = []
        # Progress value seen on the previous call; None before the first.
        self.lastval = None
        super().__init__(*args, **kwargs)

    def _calculate_eta(self, progress, data, value, elapsed):
        """Return the estimated number of seconds remaining (never negative).

        Args:
            progress: The progress bar; only ``max_value`` is read.
            data: Unused here.
            value: Current progress value.
            elapsed: Time since the bar started, as a timedelta.
        """
        now = elapsed.total_seconds()
        if self.lastval != value:
            # Record a stamp only when the value actually moved.
            self.history = [*self.history[-9:], now]
            self.lastval = value
        # N stamps delimit N - 1 completed steps; with a single stamp there
        # is no pace information yet, so the per-item time is taken as 0.
        intervals = len(self.history) - 1
        if intervals:
            per_item = (self.history[-1] - self.history[0]) / intervals
        else:
            per_item = 0
        remaining = (progress.max_value - value) * per_item
        # Time already spent on the item currently in flight.
        spent = now - self.history[-1]
        return max(remaining - spent, 0)
# Progress display: status message, percentage, bar, done/total and ETA.
pb = progressbar.ProgressBar(
    0,
    len(queue),
    widgets=[
        progressbar.Variable("message", format="{formatted_value}"),
        " ",
        progressbar.Percentage(),
        " ",
        progressbar.Bar(),
        " ",
        progressbar.SimpleProgress("%(value_s)s/%(max_value_s)s"),
        " ",
        CustomETA(),
    ],
    # The displayed widget reads the "message" variable, so the initial
    # text has to be stored under that name to be visible. "status" stays
    # registered because update() rejects unknown variable names and some
    # callers may still pass status=.
    variables={"message": "work", "status": "work"},
)
pb.update(0)  # force initial display
# Minimum number of seconds between two successful delete requests.
MIN_REQUEST_INTERVAL = 30
# Unexpected API errors tolerated for a single note before aborting.
MAX_ERRORS = 3

# True while skipping over notes that are already gone on the server.
seeking = False
# Wall-clock time of the last successful delete request (0 = none yet).
last_req = 0

# The context manager closes the connection pool on every exit path.
with httpx.Client(timeout=60) as client:
    for note, action in queue:
        # seek through queue
        # helps prevent rate limits on resumed deletions
        if seeking:
            while True:
                resp = client.post(f"{api}/notes/show", json={
                    "i": token,
                    "noteId": note,
                })
                if resp.status_code == 502:
                    pb.update(message="down")
                    time.sleep(1)
                    continue
                break
            if resp.status_code == 404:
                # already deleted: count it and move on without deleting
                pb.increment(message="seeking")
                continue
            seeking = False

        # wait for queue to empty
        while True:
            resp = client.post(f"{api}/admin/queue/stats", json={"i": token})
            if resp.status_code == 502:
                pb.update(message="down")
                time.sleep(1)
                continue
            # Keep the latest response on disk (presumably for debugging
            # unexpected payloads).
            Path('queue-stats.dump').write_text(f"status:{resp.status_code}\nbody:\n{resp.text}")
            stats = resp.json()
            deliver_waiting = stats["deliver"]["waiting"]
            obliterate_waiting = stats["obliterate"]["waiting"]
            if deliver_waiting < 100 and obliterate_waiting < 50000:
                break
            pb.update(message=f"queue ({deliver_waiting}/{obliterate_waiting})")
            time.sleep(10)

        # make sure there's enough memory for new jobs
        while True:
            vmem = psutil.virtual_memory()
            if vmem.available > (512 * 1024 * 1024):
                break
            pb.update(message="memory")
            time.sleep(10)

        # prevent api rate limiting
        req_delay = time.time() - last_req
        if req_delay < MIN_REQUEST_INTERVAL:
            pb.update(message="delaying")
            # Sleep only the remainder of the interval; sleeping for the
            # elapsed time would not enforce the minimum spacing.
            time.sleep(MIN_REQUEST_INTERVAL - req_delay)

        # queue new deletions
        err = 0
        while True:
            resp = client.post(f"{api}/notes/delete", json={
                "i": token,
                "noteId": note,
                "obliterate": action == FilterAction.Obliterate,
            })
            if resp.status_code == 429:
                # The displayed widget is "message"; updating any other
                # variable would not show up in the bar.
                pb.update(message="limit")
                time.sleep(1)
                continue
            if resp.status_code == 502:
                pb.update(message="down")
                time.sleep(1)
                continue
            if resp.status_code >= 400:
                body = resp.json()
                code = body["error"]["code"]
                if code == "NO_SUCH_NOTE":
                    # Note is already gone; switch to seek mode for the
                    # following notes.
                    pb.increment(message="seeking")
                    seeking = True
                    break
                if code == "QUEUE_FULL":
                    print("\nobliterate queue overflowed, exiting to save server")
                    # Actually stop: leaving only the retry loop would go
                    # on to submit the next note to the full queue.
                    sys.exit(1)
                err += 1
                if err > MAX_ERRORS:
                    raise Exception(f"{code}: {body['error']['message']}")
                sys.stdout.write("\r")
                print(f"err {code} {body['error']['message']} ")
                time.sleep(30)
                # Retry the same note instead of counting it as deleted.
                continue
            pb.increment(message="deleting")
            last_req = time.time()
            break

pb.finish()