230 lines
7.4 KiB
Python
230 lines
7.4 KiB
Python
"""AFPy's bot to tweet about PyConFr."""
|
||
|
||
import argparse
|
||
import logging
|
||
import re
|
||
import sqlite3
|
||
import locale
|
||
from datetime import datetime, timedelta, timezone
|
||
from time import sleep
|
||
from pathlib import Path
|
||
from zoneinfo import ZoneInfo
|
||
from contextlib import suppress
|
||
|
||
from mastodon import Mastodon
|
||
from jinja2 import Environment, FileSystemLoader
|
||
from html2image import Html2Image
|
||
|
||
|
||
def generate_file(output_path, title, author, type, date, salle):
|
||
environment = Environment(loader=FileSystemLoader("template/"))
|
||
template = environment.get_template("template.html")
|
||
|
||
rend = template.render(
|
||
title=title, author=author, type=type, date=date, salle=salle
|
||
)
|
||
|
||
hti = Html2Image()
|
||
hti.load_file("template/illustration_1.png")
|
||
hti.load_file("template/logo_pyconfr_1.png")
|
||
hti.screenshot(
|
||
html_str=rend,
|
||
css_file="template/styles.css",
|
||
save_as=output_path,
|
||
size=(1200, 675),
|
||
)
|
||
|
||
|
||
def convert_datetime(val):
|
||
"""Convert ISO 8601 datetime to datetime.datetime object."""
|
||
return datetime.fromisoformat(val.decode() + "+00:00")
|
||
|
||
|
||
sqlite3.register_converter("datetime", convert_datetime)
|
||
|
||
|
||
def to_twitter_handle(twitter):
|
||
"""Convert a twitter URL to a twitter handle."""
|
||
username = re.sub("https://(www.)?twitter.com/", "", twitter, flags=re.I).lstrip(
|
||
"@"
|
||
)
|
||
return f"@{username}@twitter.com"
|
||
|
||
|
||
def to_mastodon_handle(mastodon):
|
||
"""Convert a mastodon URL to a mastodon handle."""
|
||
if match := re.match("https?://([^/]+)/@([^/]*)/?$", mastodon):
|
||
return f"@{match[2]}@{match[1]}"
|
||
return mastodon
|
||
|
||
|
||
class Talk:
|
||
def __init__(self, cursor, row):
|
||
fields = [column[0] for column in cursor.description]
|
||
for key, value in zip(fields, row):
|
||
setattr(self, key, value)
|
||
self.participants = self.participant.split("\x00")
|
||
self.twitters = self.twitter.split("\x00")
|
||
self.mastodons = self.mastodon.split("\x00")
|
||
|
||
@property
|
||
def author(self):
|
||
"""If there's multiple authors, mention them all, regardless how."""
|
||
participants = []
|
||
for participant, twitter, mastodon in zip(
|
||
self.participants, self.twitters, self.mastodons
|
||
):
|
||
if mastodon:
|
||
participants.append(to_mastodon_handle(mastodon))
|
||
elif twitter:
|
||
participants.append(to_twitter_handle(twitter))
|
||
else:
|
||
participants.append(participant)
|
||
return ", ".join(participants)
|
||
|
||
|
||
def get_talks(django_site_id=4):
|
||
con = sqlite3.connect("db.sqlite3", detect_types=sqlite3.PARSE_DECLTYPES)
|
||
con.row_factory = Talk
|
||
with con:
|
||
res = con.execute(
|
||
f"""
|
||
SELECT cfp_talk.id,
|
||
cfp_talk.start_date,
|
||
cfp_room.name room,
|
||
GROUP_CONCAT(cfp_participant.name, x'00') participant,
|
||
GROUP_CONCAT(cfp_participant.mastodon, x'00') mastodon,
|
||
GROUP_CONCAT(cfp_participant.twitter, x'00') twitter,
|
||
cfp_talk.title,
|
||
cfp_talkcategory.name category
|
||
FROM cfp_talk
|
||
JOIN cfp_room ON cfp_room.id = cfp_talk.room_id
|
||
JOIN cfp_talk_speakers ON cfp_talk_speakers.talk_id = cfp_talk.id
|
||
JOIN cfp_participant ON cfp_participant.id = cfp_talk_speakers.participant_id
|
||
JOIN cfp_talkcategory ON cfp_talkcategory.id = cfp_talk.category_id
|
||
WHERE cfp_talk.site_id={django_site_id}
|
||
AND cfp_talk.accepted = 1
|
||
AND cfp_talk.confirmed = 1
|
||
GROUP BY cfp_talk.id
|
||
ORDER BY cfp_talk.start_date
|
||
"""
|
||
)
|
||
return res.fetchall()
|
||
|
||
|
||
def parse_args():
|
||
parser = argparse.ArgumentParser(description=__doc__)
|
||
parser.add_argument("--django-site-id", type=int, default=4)
|
||
parser.add_argument("client_id")
|
||
parser.add_argument("client_secret")
|
||
parser.add_argument("access_token")
|
||
parser.add_argument(
|
||
"--dry-run", action="store_true", help="Do not actually toot, just print."
|
||
)
|
||
parser.add_argument(
|
||
"--ahead-days",
|
||
type=int,
|
||
default=0,
|
||
help="How many days before the talk we should pouet about it.",
|
||
)
|
||
return parser.parse_args()
|
||
|
||
|
||
salle2livelink = {
|
||
"Rosalind Franklin" : "https://indymotion.fr/w/dm3DZdr1FQGQJjyvfm419M",
|
||
"Charles Darwin" : "https://indymotion.fr/w/p7yGkZtpHeatdAaxwktweC",
|
||
"Thomas Edison": "https://indymotion.fr/w/fBgSz6w6qPuseoN6p2VDA2",
|
||
"Alfred Wegener": "https://indymotion.fr/w/kNpt3HaqJZBctY3v2AZGR2",
|
||
"Henri Poincaré": "https://indymotion.fr/w/bzkxJBMBcz2ucxXV8N5U1a"
|
||
}
|
||
|
||
def toot_for(talk, ahead_days=0):
|
||
"""Redact a toot for a given talk."""
|
||
if ahead_days:
|
||
msg = f"Dans {ahead_days} jours exactement, salle {talk.room} : « {talk.title} » présenté par {talk.author}"
|
||
else:
|
||
msg = f"Salle {talk.room} : « {talk.title} » présenté par {talk.author}, c'est maintenant !"
|
||
|
||
msg += f"\n\nRetrouvez la conférence en direct sur {salle2livelink[talk.room]}"
|
||
return msg
|
||
|
||
|
||
|
||
|
||
def live_toot(talks, mastodon: Mastodon, ahead_days=0):
|
||
for talk in talks:
|
||
if talk.category == "Sprint" or talk.category.startswith("Atelier"):
|
||
continue # We don't toot sprints.
|
||
if talk.room not in salle2livelink.keys():
|
||
print("Missing room live", talk.room)
|
||
continue
|
||
toot = toot_for(talk, ahead_days)
|
||
png_name = f"talk-{talk.id}.png"
|
||
with suppress(FileNotFoundError):
|
||
Path(png_name).unlink()
|
||
generate_file(
|
||
output_path=png_name,
|
||
title=talk.title,
|
||
author=", ".join(talk.participants),
|
||
type=talk.category.split()[0],
|
||
date=talk.start_date.astimezone(ZoneInfo("Europe/Paris")).strftime(
|
||
"%A %d %B %HH%M"
|
||
),
|
||
salle="Salle " + talk.room.split("/")[0],
|
||
)
|
||
delta = (
|
||
talk.start_date - datetime.now(timezone.utc) - timedelta(days=ahead_days)
|
||
)
|
||
if delta.total_seconds() < -300:
|
||
logging.info(
|
||
"Skipping %s with image %s (already happened).", toot, png_name
|
||
)
|
||
continue
|
||
if delta.total_seconds() > 10:
|
||
logging.info("Waiting %s for %s to start.", delta, talk.title)
|
||
sleep(delta.total_seconds() - 2) # Wait for the talk to start.
|
||
sleep(2) # This is just a fool guard so we can Ctrl-C easily anytime.
|
||
media = mastodon.media_post(png_name)
|
||
mastodon.status_post(toot, media_ids=[media])
|
||
|
||
|
||
class DryRunMastodon:
|
||
"""For the --dry-run option."""
|
||
|
||
def __init__(*args, **kwargs):
|
||
pass
|
||
|
||
def account_verify_credentials(self):
|
||
return {"username": "(Dry run mode)"}
|
||
|
||
def toot(self, message):
|
||
print(message)
|
||
|
||
def media_post(self, *args, **kwargs):
|
||
print("Would upload media to Mastodon.")
|
||
|
||
def status_post(self, *args, **kwargs):
|
||
print("Would tooting", args, kwargs)
|
||
|
||
|
||
def main():
|
||
args = parse_args()
|
||
locale.setlocale(locale.LC_TIME, "fr_FR.UTF-8")
|
||
logging.basicConfig(level=logging.INFO)
|
||
talks = get_talks(args.django_site_id)
|
||
mastodon_instance = DryRunMastodon if args.dry_run else Mastodon
|
||
mastodon = mastodon_instance(
|
||
api_base_url="https://mamot.fr",
|
||
client_id=args.client_id,
|
||
client_secret=args.client_secret,
|
||
access_token=args.access_token,
|
||
)
|
||
logging.info(
|
||
"Mastodon connected as %s", mastodon.account_verify_credentials()["username"]
|
||
)
|
||
live_toot(talks, mastodon, args.ahead_days)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|