pleroma-ebooks/functions.py
2021-06-16 03:49:34 +00:00

135 lines
4.4 KiB
Python
Executable file

# SPDX-License-Identifier: EUPL-1.2
import argparse
import html
import json
import multiprocessing
import os
import re
import shutil
import sqlite3
import sys
from random import randint

import markovify
import pytomlpp as toml
from bs4 import BeautifulSoup


def arg_parser_factory(*, description):
    """Build an argument parser that knows the shared ``-c/--cfg`` option.

    Args:
        description: Text shown as the program description in ``--help``.

    Returns:
        An ``argparse.ArgumentParser``; callers may add further arguments
        before parsing.
    """
    default_cfg = 'config.toml'
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        '-c', '--cfg', dest='cfg', default=default_cfg, nargs='?',
        # With nargs='?', a bare `-c` stores `const`; without an explicit
        # const that is None, which is not a usable path. Fall back to the
        # default location instead.
        const=default_cfg,
        help='Specify a custom location for the config file.'
    )
    return parser
def parse_args(*, description):
    """Parse the process's command line with the parser from arg_parser_factory.

    Args:
        description: Program description forwarded to the parser factory.

    Returns:
        The namespace produced by ``ArgumentParser.parse_args()``.
    """
    parser = arg_parser_factory(description=description)
    return parser.parse_args()
def load_config(cfg_path):
    """Load the configuration: JSON defaults overlaid with the user's TOML file.

    Defaults are read from ``config.defaults.json`` in the current working
    directory, then updated with the top-level keys of the TOML file at
    *cfg_path*.

    Args:
        cfg_path: Path of the TOML config file.

    Returns:
        The merged configuration dict.

    Exits the process with status 1 (after printing an explanation to stderr)
    when ``site`` does not start with an http(s) scheme or when
    ``access_token`` is missing.
    """
    # TOML doesn't support null here so we have to use JSON 😒
    # Both formats are UTF-8; don't depend on the platform's locale encoding.
    with open('config.defaults.json', encoding='utf-8') as f:
        cfg = json.load(f)

    with open(cfg_path, encoding='utf-8') as f:
        cfg.update(toml.load(f))

    if not cfg['site'].startswith(('https://', 'http://')):
        print("Site must begin with 'https://' or 'http://'. Value '{0}' is invalid - try 'https://{0}' instead.".format(cfg['site']), file=sys.stderr)
        sys.exit(1)

    if 'access_token' not in cfg:
        print('No authentication info', file=sys.stderr)
        print('Get a client id, client secret, and access token here: https://tinysubversions.com/notes/mastodon-bot/', file=sys.stderr)
        print('Then put `access_token` in your config file.', file=sys.stderr)
        sys.exit(1)

    return cfg
def make_sentence(output, cfg):
    """Generate one sentence from the toot database and send it via *output*.

    Exactly one object is sent on every non-raising path: the generated
    sentence, ``None`` if no sentence could be produced, or an explanatory
    message when the database holds no usable toots.

    Args:
        output: Object with a ``send()`` method (e.g. the writable end of a
            ``multiprocessing.Pipe``).
        cfg: Configuration mapping. Reads ``learn_from_cw``, ``ignored_cws``,
            ``overlap_ratio_enabled``, ``overlap_ratio``, ``limit_length``,
            ``length_lower_limit``, ``length_upper_limit`` and
            ``mention_handling``.
    """
    # create a copy of the database because reply.py will be using the main one
    shutil.copyfile("toots.db", "toots-copy.db")
    try:
        db = sqlite3.connect("toots-copy.db")
        try:
            db.text_factory = str
            c = db.cursor()
            if cfg['learn_from_cw']:
                # Only "?" placeholders are interpolated into the SQL text;
                # the CW values themselves are passed as bound parameters.
                ignored_cws_query_params = "(" + ",".join("?" * len(cfg["ignored_cws"])) + ")"
                toots = c.execute(f"SELECT content FROM `toots` WHERE cw IS NULL OR CW NOT IN {ignored_cws_query_params} ORDER BY RANDOM() LIMIT 10000", cfg["ignored_cws"]).fetchall()
            else:
                toots = c.execute("SELECT content FROM `toots` WHERE cw IS NULL ORDER BY RANDOM() LIMIT 10000").fetchall()
        finally:
            # Close before deleting; rows are already fetched into memory.
            db.close()
    finally:
        # Always remove the scratch copy, including on the empty-database
        # and error paths.
        os.remove("toots-copy.db")

    if not toots:
        output.send("Database is empty! Try running main.py.")
        return

    class nlt_fixed(markovify.NewlineText):  # modified version of NewlineText that never rejects sentences
        def test_sentence_input(self, sentence):
            return True  # all sentences are valid <3

    nlt = markovify.NewlineText if cfg['overlap_ratio_enabled'] else nlt_fixed
    model = nlt("\n".join(toot[0] for toot in toots))

    max_words = None
    if cfg['limit_length']:
        max_words = randint(cfg['length_lower_limit'], cfg['length_upper_limit'])
    max_overlap_ratio = cfg['overlap_ratio'] if cfg['overlap_ratio_enabled'] else 0.7

    sentence = None
    for _ in range(10):
        sentence = model.make_short_sentence(
            max_chars=500,
            tries=10000,
            max_overlap_ratio=max_overlap_ratio,
            max_words=max_words
        )
        if sentence is not None:
            break

    # optionally remove mentions; skipped when generation failed so that None
    # is still sent instead of crashing in re.sub
    if sentence is not None:
        if cfg['mention_handling'] == 1:
            # strip only a mention at the very start of the sentence
            sentence = re.sub(r"^\S*@\u200B\S*\s?", "", sentence)
        elif cfg['mention_handling'] == 0:
            # strip every mention
            sentence = re.sub(r"\S*@\u200B\S*\s?", "", sentence)

    output.send(sentence)
def make_toot(cfg):
    """Generate a toot in a worker process, giving up after 5 seconds.

    Args:
        cfg: Configuration mapping, passed through to ``make_sentence``.

    Returns:
        The string sent back by ``make_sentence``, or a fixed failure message
        when the worker timed out, sent ``None``, or exited without sending
        anything.
    """
    toot = None
    pin, pout = multiprocessing.Pipe(False)
    p = multiprocessing.Process(target=make_sentence, args=[pout, cfg])
    p.start()
    p.join(5)  # wait 5 seconds to get something
    if p.is_alive():  # if it's still trying to make a toot after 5 seconds
        p.terminate()
        p.join()
    elif pin.poll():
        # Only read when something was actually sent: this process still
        # holds the write end, so recv() on an empty pipe would block forever
        # if the worker died before sending.
        toot = pin.recv()
    pin.close()
    pout.close()

    if toot is None:
        toot = 'Toot generation failed! Contact io@csdisaster.club for assistance.'
    return toot
def extract_toot(toot):
    """Convert a post's HTML content to plain text.

    ``<br>`` and ``<p>`` become line breaks, hashtag links are reduced to
    their text, other links with an ``href`` are replaced by that URL, and
    profile URLs are rewritten to ``@user@domain`` mentions.

    Args:
        toot: HTML content of the post.

    Returns:
        The plain-text content with trailing newlines removed.
    """
    # NOTE(review): BeautifulSoup also decodes entities, so escaped markup
    # such as "&lt;b&gt;" is parsed as a real tag after this step - confirm
    # that this is intended.
    toot = html.unescape(toot)  # convert HTML escape codes to text
    soup = BeautifulSoup(toot, "html.parser")

    # Renaming a tag does not add anything to get_text(); real newline
    # strings have to be inserted into the tree.
    for lb in soup.select("br"):  # replace <br> with linebreak
        lb.replace_with("\n")

    for p in soup.select("p"):  # ditto for <p>, keeping the paragraph's contents
        p.insert_after("\n")
        p.unwrap()

    for ht in soup.select("a.hashtag"):  # convert hashtags from links to text
        ht.unwrap()

    for link in soup.select("a"):  # convert <a href='https://example.com>example.com</a> to just https://example.com
        # `'href' in link` would test the tag's children, not its attributes.
        if link.has_attr("href"):
            # apparently not all a tags have a href, which is understandable if you're doing normal web stuff, but on a social media platform??
            link.replace_with(link["href"])

    text = soup.get_text()
    text = re.sub(r"https://([^/]+)/(@[^\s]+)", r"\2@\1", text)  # put mastodon-style mentions back in
    text = re.sub(r"https://([^/]+)/users/([^\s/]+)", r"@\2@\1", text)  # put pleroma-style mentions back in
    text = text.rstrip("\n")  # remove trailing newline(s)
    return text