she's goin

This commit is contained in:
mia 2024-10-04 15:43:40 -07:00
parent bb8a48fd4d
commit 7e060e5cf2
9 changed files with 192 additions and 43 deletions

View file

@ -1,5 +1,6 @@
import json import json
import sys import sys
import time
from collections import namedtuple from collections import namedtuple
from functools import cache from functools import cache
from pathlib import Path from pathlib import Path
@ -26,6 +27,7 @@ cur = conn.execute(
[user_id], [user_id],
) )
while rows := cur.fetchmany(0xFF): while rows := cur.fetchmany(0xFF):
time.sleep(0.0001)
for row in rows: for row in rows:
note_ids.add(row[0]) note_ids.add(row[0])
if early_exit and len(note_ids) > early_exit: if early_exit and len(note_ids) > early_exit:
@ -34,6 +36,7 @@ while rows := cur.fetchmany(0xFF):
@cache @cache
def get_note(id: str) -> Note: def get_note(id: str) -> Note:
time.sleep(0.0001)
return Note( return Note(
*conn.execute( *conn.execute(
'select "renoteId", "replyId", "userId" from note where id = %s', [id] 'select "renoteId", "replyId", "userId" from note where id = %s', [id]
@ -102,6 +105,7 @@ def traverse(tree: Tree):
def expand(tree: Tree): def expand(tree: Tree):
time.sleep(0.0001)
for row in conn.execute( for row in conn.execute(
"select id from note_replies(%s, 1, 1000)", [tree.id] "select id from note_replies(%s, 1, 1000)", [tree.id]
).fetchall(): ).fetchall():

View file

@ -1,19 +1,21 @@
import time
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import Callable, List from typing import Callable, List
import psycopg import psycopg
from com import FilterableNote, Visibility, eval_config, parse_graph, progressbar from com import (FilterableNote, Visibility, eval_config, parse_graph,
progressbar, FilterAction)
config = eval_config() config = eval_config()
conn: psycopg.Connection = config["connect"]() conn: psycopg.Connection = config["connect"]()
criteria: Callable[[FilterableNote], bool] = config["criteria"] criteria: Callable[[FilterableNote], FilterAction] = config["criteria"]
intermediate = parse_graph() intermediate = parse_graph()
def transform(entry: dict) -> FilterableNote: def transform(entry: dict) -> FilterableNote:
time.sleep(0.0001)
note = conn.execute( note = conn.execute(
'select "createdAt", reactions, "renoteCount", visibility from note where id = %s', 'select "createdAt", reactions, "renoteCount", visibility from note where id = %s',
[entry["id"]], [entry["id"]],
@ -57,10 +59,10 @@ for entry in intermediate.values():
transformed = transform(entry) transformed = transform(entry)
if transformed is None: if transformed is None:
continue # we'll get to it next cycle continue # we'll get to it next cycle
if criteria(transformed): action = criteria(transformed)
targets.append(entry["id"]) if action != FilterAction.Ignore:
targets.append(f"{entry['id']} {action.value}")
pb.increment() pb.increment()
pb.finish() pb.finish()
Path("filtered.list").write_text("\n".join(targets)) Path("filtered.list").write_text("\n".join(targets))

View file

@ -1,4 +1,5 @@
import json import json
import time
from http.client import HTTPResponse from http.client import HTTPResponse
from pathlib import Path from pathlib import Path
from shutil import copyfileobj from shutil import copyfileobj
@ -16,11 +17,13 @@ conn: psycopg.Connection = config["connect"]()
graph = parse_graph() graph = parse_graph()
print("reading filterlist") print("reading filterlist")
filtered = Path("filtered.list").read_text().strip().splitlines() filtered = Path("filtered.list").read_text().strip().splitlines()
filtered = list(map(lambda line: line.split(' ')[0], filtered))
collected_users = {} collected_users = {}
def collect_user(id: str): def collect_user(id: str):
if id in collected_users: if id in collected_users:
return return
time.sleep(0.001)
user = conn.execute('select username, host, "avatarUrl" from "user" where id = %s', [id]).fetchone() user = conn.execute('select username, host, "avatarUrl" from "user" where id = %s', [id]).fetchone()
if user is None: if user is None:
return None return None
@ -44,10 +47,11 @@ def collect_note(id: str):
output = {} output = {}
output["id"] = id output["id"] = id
note = conn.execute('select text, "userId", "createdAt", "updatedAt", reactions, "renoteCount", visibility, "fileIds" from note where id = %s', [id]).fetchone() time.sleep(0.001)
note = conn.execute('select text, "userId", "createdAt", "updatedAt", reactions, "renoteCount", visibility, "fileIds", cw from note where id = %s', [id]).fetchone()
if note is None: if note is None:
return None return None
text, user_id, created_at, updated_at, reactions, renotes, visibility, file_ids = note text, user_id, created_at, updated_at, reactions, renotes, visibility, file_ids, cw = note
collect_user(user_id) collect_user(user_id)
output["text"] = text output["text"] = text
@ -59,6 +63,7 @@ def collect_note(id: str):
output["reactions"] = reactions output["reactions"] = reactions
output["renotes"] = renotes output["renotes"] = renotes
output["visibility"] = Visibility.from_db(visibility).code() output["visibility"] = Visibility.from_db(visibility).code()
output["cw"] = cw
node = graph[id] node = graph[id]
replies = [collect_note(reply) for reply in node["replies"]] replies = [collect_note(reply) for reply in node["replies"]]
@ -68,6 +73,7 @@ def collect_note(id: str):
output["attachments"] = [] output["attachments"] = []
for file_id in file_ids: for file_id in file_ids:
time.sleep(0.0005)
name, type_, comment, url = conn.execute('select name, type, comment, url from drive_file where id = %s', [file_id]).fetchone() name, type_, comment, url = conn.execute('select name, type, comment, url from drive_file where id = %s', [file_id]).fetchone()
attachment = { attachment = {
"id": file_id, "id": file_id,
@ -117,13 +123,15 @@ pb = progressbar.ProgressBar(
) )
for id, note in collected_notes: for id, note in collected_notes:
outfile = outdir / "note" / f"{id}.mpk.br" outfile = outdir / "note" / id[:3] / f"{id[3:]}.mpk.br"
outfile.parent.mkdir(exist_ok=True)
with outfile.open("wb") as f: with outfile.open("wb") as f:
f.write(brotli.compress(msgpack.dumps(note))) f.write(brotli.compress(msgpack.dumps(note)))
pb.increment() pb.increment()
for id, user in collected_users.items(): for id, user in collected_users.items():
outfile = outdir / "user" / f"{id}.mpk.br" outfile = outdir / "user" / id[:2] / f"{id[2:]}.mpk.br"
outfile.parent.mkdir(exist_ok=True)
with outfile.open("wb") as f: with outfile.open("wb") as f:
f.write(brotli.compress(msgpack.dumps(note))) f.write(brotli.compress(msgpack.dumps(note)))
pb.increment() pb.increment()
@ -134,8 +142,9 @@ pb = progressbar.ProgressBar(
len(files_to_collect), len(files_to_collect),
prefix="downloading attachments ", prefix="downloading attachments ",
) )
for (id, url) in files_to_collect: for (id, url) in files_to_collect:
outfile = outdir / "file" / id outfile = outdir / "file" / id[:2] / id[2:]
outfile.parent.mkdir(exist_ok=True)
response: HTTPResponse = urlopen(url) response: HTTPResponse = urlopen(url)
with outfile.open("wb") as f: with outfile.open("wb") as f:
copyfileobj(response, f) copyfileobj(response, f)

View file

@ -1,9 +1,11 @@
import sys
import time
from pathlib import Path from pathlib import Path
import httpx import httpx
import psycopg import psycopg
from com import eval_config, parse_graph, progressbar from com import eval_config, parse_graph, progressbar, FilterAction
config = eval_config() config = eval_config()
conn: psycopg.Connection = config["connect"]() conn: psycopg.Connection = config["connect"]()
@ -13,21 +15,129 @@ api: str = config["api"]
graph = parse_graph() graph = parse_graph()
print("reading filterlist") print("reading filterlist")
filtered = Path("filtered.list").read_text().strip().splitlines() filtered = Path("filtered.list").read_text().strip().splitlines()
filtered = list(map(lambda line: line.split(' '), filtered))
print("building queue")
queue = [] queue = []
def enqueue(note): def enqueue(note, action):
for reply in note["replies"]: for reply in note["replies"]:
enqueue(graph[reply]) enqueue(graph[reply], action)
for quote in note["quotes"]: for quote in note["quotes"]:
enqueue(graph[quote]) enqueue(graph[quote], action)
if "self" in note["flags"]: if "self" in note["flags"]:
files = conn.execute('select "fileIds" from note where id = %s', [note["id"]]).fetchone()[0] queue.append((note["id"], action))
queue.append((note["id"], files))
for id in filtered: for id, action in filtered:
enqueue(graph[id]) enqueue(graph[id], FilterAction(action))
print(queue) class CustomETA(progressbar.ETA):
def __init__(self, *args, **kwargs):
self.history = []
self.lastval = None
progressbar.ETA.__init__(self, *args, **kwargs)
# client = httpx.Client() def _calculate_eta(self, progress, data, value, elapsed):
if self.lastval != value:
self.history = [*self.history[-9:], elapsed.total_seconds()]
self.lastval = value
per_item = (self.history[-1] - self.history[0]) / len(self.history)
remaining = (progress.max_value - value) * per_item
spent = elapsed.total_seconds() - self.history[-1]
return max(remaining - spent, 0)
pb = progressbar.ProgressBar(
0,
len(queue),
widgets=[
progressbar.Variable("message", format="{formatted_value}"),
" ",
progressbar.Percentage(),
" ",
progressbar.Bar(),
" ",
progressbar.SimpleProgress("%(value_s)s/%(max_value_s)s"),
" ",
CustomETA(),
],
variables={"status": "work"}
)
pb.update(0) # force initial display
client = httpx.Client(timeout=60)
seeking = False
last_req = 0
for note, action in queue:
# seek through queue
# helps prevent rate limits on resumed deletions
if seeking:
while True:
resp = client.post(f"{api}/notes/show", json={
"i": token,
"noteId": note,
})
if resp.status_code == 502:
pb.update(message="down")
time.sleep(1)
continue
break
if resp.status_code == 404:
pb.increment(message="seeking")
continue
seeking = False
# wait for queue to empty
while True:
resp = client.post(f"{api}/admin/queue/stats", json={"i": token})
if resp.status_code == 502:
pb.update(message="down")
time.sleep(1)
continue
deliver_waiting = resp.json()["deliver"]["waiting"]
obliterate_waiting = resp.json()["obliterate"]["waiting"]
obliterate_delayed = resp.json()["obliterate"]["delayed"]
if deliver_waiting < 100 and obliterate_waiting + obliterate_delayed< 50000:
break
pb.update(message=f"queue ({deliver_waiting}/{obliterate_waiting + obliterate_delayed})")
time.sleep(10)
# prevent api rate limiting
req_delay = time.time() - last_req
if req_delay < 15:
pb.update(message="delaying")
time.sleep(req_delay)
# queue new deletions
err = 0
while True:
resp = client.post(f"{api}/notes/delete", json={
"i": token,
"noteId": note,
"obliterate": action == FilterAction.Obliterate,
})
if resp.status_code == 429:
pb.update(status="limit")
time.sleep(1)
continue
elif resp.status_code == 502:
pb.update(status="down")
continue
time.sleep(1)
elif resp.status_code >= 400:
body = resp.json()
if body["error"]["code"] == "NO_SUCH_NOTE":
pb.increment(message="seeking")
seeking = True
break
err += 1
if err > 10:
raise Exception(f"{body['error']['code']}: {body['error']['message']}")
sys.stdout.write("\r")
print(f"err {body['error']['code']} {body['error']['message']} ")
time.sleep(30)
pb.increment(message="deleting")
last_req = time.time()
break
pb.finish()

8
com.py
View file

@ -25,7 +25,7 @@ class Visibility(Enum):
case "followers": return cls.followers case "followers": return cls.followers
case "specified": return cls.direct case "specified": return cls.direct
case _: raise ValueError(f"unknown visibility `{raw}`") case _: raise ValueError(f"unknown visibility `{raw}`")
def code(self) -> str: def code(self) -> str:
match self: match self:
case self.public: return "p" case self.public: return "p"
@ -76,6 +76,12 @@ class FilterableNote:
} }
class FilterAction(Enum):
Ignore = 'ignore'
Delete = 'delete'
Obliterate = 'obliterate'
def eval_config() -> dict: def eval_config() -> dict:
print("configuring") print("configuring")
config = {} config = {}

View file

@ -1,18 +1,18 @@
import math import math
from datetime import UTC, datetime, timedelta from datetime import UTC, datetime, timedelta
from com import FilterableNote, Visibility from com import FilterableNote, Visibility, FilterAction
from sec import connect, tokens from sec import connect, tokens
user_id = "9gf2ev4ex5dflllo" user_id = "9gf2ev4ex5dflllo"
token = tokens["mia"] token = tokens["mia"]
api = "https://void.rehab/api/" api = "https://void.rehab/api"
early_exit = 0xFFF early_exit = 0xFFF
now = datetime.now(UTC) now = datetime.now(UTC)
threshold = 0.1 threshold = 0.1
def criteria(root: FilterableNote) -> bool: def criteria(root: FilterableNote) -> FilterAction:
thread = root.thread() thread = root.thread()
thread_self = root.thread_self() thread_self = root.thread_self()
@ -24,13 +24,13 @@ def criteria(root: FilterableNote) -> bool:
# ...and the dms are younger than two months... # ...and the dms are younger than two months...
if now - most_recent_direct.when < timedelta(days=30 * 2): if now - most_recent_direct.when < timedelta(days=30 * 2):
# ...do not delete the thread # ...do not delete the thread
return False return FilterAction.Ignore
# get the most recent post... # get the most recent post...
others_recency = max(thread, key=lambda note: note.when) others_recency = max(thread, key=lambda note: note.when)
# ...and bail if it's too new # ...and bail if it's too new
if now - others_recency.when < timedelta(days=14): if now - others_recency.when < timedelta(days=14):
return False return FilterAction.Ignore
# get my... # get my...
most_recent_post = max(thread_self, key=lambda note: note.when) # ...most recent post... most_recent_post = max(thread_self, key=lambda note: note.when) # ...most recent post...
@ -43,4 +43,13 @@ def criteria(root: FilterableNote) -> bool:
# ...weigh it... # ...weigh it...
weighted_score = high_score / math.sqrt(most_recent_age.days) weighted_score = high_score / math.sqrt(most_recent_age.days)
# ...and check it against a threshold # ...and check it against a threshold
return weighted_score < threshold if weighted_score < threshold:
if any(map(
lambda note: note.visibility in [Visibility.public, Visibility.unlisted],
thread_self,
)):
return FilterAction.Obliterate
else:
return FilterAction.Delete
else:
return FilterAction.Ignore

View file

@ -1,14 +1,17 @@
import math import math
from datetime import UTC, datetime, timedelta from datetime import UTC, datetime, timedelta
from com import FilterableNote from com import FilterableNote, FilterAction
from sec import connect, tokens from sec import connect, tokens
user_id = "9gszslkcdfnomssj" user_id = "9gszslkcdfnomssj"
token = tokens["pain"] token = tokens["pain"]
api = "https://void.rehab/api/" api = "https://void.rehab/api"
def criteria(root: FilterableNote) -> bool: def criteria(root: FilterableNote) -> (bool, FilterAction):
# if it's more than two months old, delete # if it's more than two months old, delete
# return (datetime.now(UTC) - root.when).days > 60 latest = max(map(lambda note: note.when, root.thread_self()))
return (datetime.now(UTC) - root.when).days > (12 * 30) if (datetime.now(UTC) - latest).days > 60:
return FilterAction.Obliterate
else:
return FilterAction.Ignore

22
go.sh
View file

@ -1,13 +1,19 @@
#!/bin/sh #!/bin/bash
set -ex set -e
if [[ "$1" = "" ]]; then
echo missing name
exit
fi
test -f graph.db && rm graph.db
test -f filtered.list && rm filtered.list
test -d out && rm -r out test -d out && rm -r out
set -x
python3 1_graph.py conf_$1.py python3 1_graph.py conf_$1.py
python3 2_filter.py conf_$1.py python3 2_filter.py conf_$1.py
# python3 3_archive.py conf_$1.py python3 3_archive.py conf_$1.py
# echo uploading to memorial rsync -r -e 'ssh -p23' --progress out/file/ memorial:fediverse/$1/file/
# rsync -r -e 'ssh -p23' --progress out/ memorial:fediverse/$1/ rsync -r -e 'ssh -p23' --progress --ignore-existing out/note/ memorial:fediverse/$1/note/
# python3 4_delete.py conf_$1.py rsync -r -e 'ssh -p23' --progress out/user/ memorial:fediverse/$1/user/
python3 4_delete.py conf_$1.py

View file

@ -1,2 +1,2 @@
#!/bin/sh #!/bin/sh
exec ssh -NL 5432:localhost:5432 vr exec ssh -NL 54321:localhost:5432 vr