# helloasso-to-discourse/helloasso_to_discourse.py
#
# 249 lines
# 8.5 KiB
# Python

import argparse
import datetime as dt
import json
import re
import sys
from itertools import count, groupby
from pathlib import Path
from typing import Any, Dict, Set
from urllib.parse import urljoin
import requests
from helloasso_api import HaApiV5
from tabulate import tabulate
def parse_args():
    """Build the command-line interface and parse the process arguments.

    Four sub-commands are declared: ``fetch``, ``sync``, ``list-forms`` and
    ``list-badges``.  Each one stores its handler function in the ``func``
    attribute of the returned namespace.  When no sub-command is given, the
    help text is printed and the process exits with status 1.
    """
    parser = argparse.ArgumentParser(description="Hello backup to Discourse badge")
    commands = parser.add_subparsers(help="Choose a command")

    # fetch: download the HelloAsso data to a local file.
    fetch_cmd = commands.add_parser(
        "fetch", help="Fetch HelloAsso data from the HelloAsso API to a file."
    )
    for positional in ("client_id", "client_secret", "org"):
        fetch_cmd.add_argument(positional)
    fetch_cmd.set_defaults(func=main_fetch)

    # sync: grant a Discourse badge from the local HelloAsso data.
    sync_cmd = commands.add_parser(
        "sync", help="Sync the backup file to a given Discourse instance"
    )
    for positional in ("discourse_url", "discourse_api_key", "org"):
        sync_cmd.add_argument(positional)
    sync_cmd.add_argument(
        "form_slug",
        help="See the `list-forms` subcommand to learn which one you can use.",
    )
    sync_cmd.add_argument("badge_slug")
    sync_cmd.set_defaults(func=main_sync)

    # list-forms: show the forms found in the local HelloAsso data.
    list_forms_cmd = commands.add_parser(
        "list-forms", help="List HelloAsso forms, to use with `sync`"
    )
    list_forms_cmd.set_defaults(func=main_list_form)
    list_forms_cmd.add_argument("org")

    # list-badges: show the badges defined on the Discourse instance.
    list_badges_cmd = commands.add_parser(
        "list-badges", help="List Discourse badges, to use with `sync`"
    )
    list_badges_cmd.set_defaults(func=main_list_badges)
    for positional in ("discourse_url", "discourse_api_key"):
        list_badges_cmd.add_argument(positional)

    args = parser.parse_args()
    # `func` is only set by a sub-command; without one there is nothing to run.
    if "func" not in vars(args):
        parser.print_help()
        sys.exit(1)
    return args
class Discourse:
    """Small client for the Discourse HTTP endpoints used by this script.

    Use it as a context manager: the underlying ``requests.Session`` is
    created on ``__enter__`` and closed on ``__exit__``.  ``get`` and ``post``
    need that session, so they must be called inside the ``with`` block.
    """

    def __init__(self, url, api_key):
        """Store the instance base URL and API key; no request is made here."""
        self.url = url
        self.api_key = api_key
        self.session = None
        # Lazily-built cache for `email_to_user_map`; None means "not built yet".
        self._email_to_user_map = None

    def __enter__(self):
        """Open the HTTP session, authenticating every request as `system`."""
        self.session = requests.Session()
        self.session.headers.update({"Api-Key": self.api_key, "Api-Username": "system"})
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the HTTP session."""
        self.session.close()

    def post(self, url, data=None, json=None, **kwargs):
        """POST to `url` (resolved against the base URL) and return the decoded JSON.

        Raises whatever ``response.raise_for_status()`` raises on an HTTP
        error status.
        """
        response = self.session.post(
            urljoin(self.url, url), data=data, json=json, **kwargs
        )
        response.raise_for_status()
        return response.json()

    def get(self, url, **kwargs):
        """GET `url` (resolved against the base URL) and return the decoded JSON.

        Raises whatever ``response.raise_for_status()`` raises on an HTTP
        error status.
        """
        response = self.session.get(urljoin(self.url, url), **kwargs)
        response.raise_for_status()
        return response.json()

    def get_badge(self, badge_id):
        """Return the `/user_badges.json` payload for the given badge id."""
        return self.get(f"/user_badges.json?badge_id={badge_id}")

    def get_badges(self):
        """Return all badges of the instance, keyed by their slug.

        Each badge dict is enriched in place with a ``type`` and a ``group``
        key holding the matching badge type and badge grouping dicts.
        """
        badges_info = self.get("/badges.json")
        badge_types = {
            badge_type["id"]: badge_type for badge_type in badges_info["badge_types"]
        }
        badge_groups = {
            badge_group["id"]: badge_group
            for badge_group in badges_info["badge_groupings"]
        }
        badges = badges_info["badges"]
        for badge in badges:
            badge["type"] = badge_types[badge["badge_type_id"]]
            badge["group"] = badge_groups[badge["badge_grouping_id"]]
        return {badge["slug"]: badge for badge in badges}

    def users(self, flag="active"):
        """Return every user of the admin list `flag`, emails included.

        Pages are requested one after the other, starting at page 1, until
        an empty page is returned.
        """
        all_users = []
        for page in count(1):
            users = self.get(
                f"/admin/users/list/{flag}.json",
                params={"page": page, "show_emails": "true"},
            )
            if not users:
                break
            all_users.extend(users)
        return all_users

    @property
    def email_to_user_map(self) -> Dict[str, Dict[str, Any]]:
        """Map each user email (primary and secondary, tag removed) to its user.

        The map is built on first access and cached.  When two entries
        normalize to the same email, the last one seen wins.
        """
        # Compare against None rather than testing truthiness: an empty map
        # is a valid cached result and must not trigger a full re-download
        # of the user list on every access.
        if self._email_to_user_map is not None:
            return self._email_to_user_map
        email_to_user = {}
        for user in self.users():
            email_to_user[remove_email_tag(user["email"])] = user
            for secondary_email in user["secondary_emails"]:
                email_to_user[remove_email_tag(secondary_email)] = user
        self._email_to_user_map = email_to_user
        return email_to_user

    def assign_badge(self, badge_id, username):
        """Grant the badge `badge_id` to `username` and return the API response."""
        return self.post(
            "/user_badges", data={"badge_id": badge_id, "username": username}
        )
def remove_email_tag(email):
    """Return `email` with everything from a ``+`` up to the ``@`` removed.

    For example ``john+newsletter@example.com`` becomes ``john@example.com``.
    An address without a ``+`` before the ``@`` is returned unchanged.
    """
    tag_pattern = re.compile(r"\+.*@")
    return tag_pattern.sub("@", email)
def main():
    """Parse the command line and run the handler of the chosen sub-command.

    Returns whatever the handler returns.
    """
    parsed = parse_args()
    handler = parsed.func
    return handler(parsed)
def main_list_form(args):
    """Print the HelloAsso forms found in the local backup file `args.org`.

    A warning is printed first when the file was last modified more than
    100 days ago.  Only items in the ``Processed`` state that carry a payer
    email are counted; the table shows, per (form type, form slug) pair,
    how many such items exist.

    Raises whatever ``Path.stat`` / ``Path.read_text`` / ``json.loads``
    raise when the file is missing or is not valid JSON.
    """
    backup_path = Path(args.org)
    mtime = dt.datetime.fromtimestamp(backup_path.stat().st_mtime)
    if dt.datetime.now() - mtime > dt.timedelta(days=100):
        print("\nBeware, local copy of Helloasso data is outdated, maybe run:")
        print(
            """ helloasso-to-discourse fetch "$(pass helloasso-clientid)" """
            f""""$(pass helloasso-clientsecret)" {args.org}\n"""
        )
    helloasso_backup = json.loads(backup_path.read_text(encoding="UTF-8"))
    forms = [
        (item["order"]["formType"], item["order"]["formSlug"])
        for item in helloasso_backup
        if item["state"] == "Processed" and "payer" in item and "email" in item["payer"]
    ]
    # groupby only merges adjacent equal keys, hence the sort beforehand.
    table = [key + (len(list(group)),) for key, group in groupby(sorted(forms))]
    print(
        # Fixed typo in the user-facing text: "you can you with" -> "you can use with".
        "Here are the available HelloAsso forms you can use with the `sync` command ",
        "to link a form to a badge:\n",
        sep="\n",
    )
    print(tabulate(table, headers=("Type", "Name", "Members")))
    print()
    print("Use the `name` for the `sync` command, like:")
    print(
        'helloasso-to-discourse sync https://discuss.afpy.org "$(pass discuss.afpy.org-api-key)" afpy form-slug badge-slug'
    )
def main_list_badges(args):
    """Print a sorted table of the badges defined on the Discourse instance.

    Each row shows the badge group name, badge type name, slug and grant
    count, followed by an example `sync` invocation.
    """
    with Discourse(args.discourse_url, args.discourse_api_key) as forum:
        rows = sorted(
            (
                info["group"]["name"],
                info["type"]["name"],
                badge_slug,
                info["grant_count"],
            )
            for badge_slug, info in forum.get_badges().items()
        )
    column_names = ("Group", "Type", "Slug", "Grant count")
    print(tabulate(rows, headers=column_names))
    print()
    print("Use the tag `slug` for the `sync` command, like:")
    example = 'helloasso-to-discourse sync https://discuss.afpy.org "$(pass discuss.afpy.org-api-key)" ./afpy form-slug badge-slug'
    print(example)
def main_sync(args):
    """Grant a Discourse badge to the forum users who paid through a HelloAsso form.

    Payer emails are read from the local backup file `args.org`, keeping
    only ``Processed`` items of the form `args.form_slug` that carry a payer
    email.  Every Discourse user whose primary or secondary email matches
    (after removing ``+tag`` parts) and who does not hold the badge
    `args.badge_slug` yet is granted it.

    Exits with an error message when `args.badge_slug` is not a badge of the
    Discourse instance.
    """
    helloasso_backup = json.loads(Path(args.org).read_text(encoding="UTF-8"))
    discourse = Discourse(args.discourse_url, args.discourse_api_key)
    with discourse:
        # NOTE(review): emails are compared case-sensitively on both sides;
        # confirm whether HelloAsso payer emails need lower-casing to match.
        from_helloasso = {
            remove_email_tag(item["payer"]["email"])
            for item in helloasso_backup
            if item["order"]["formSlug"] == args.form_slug
            and item["state"] == "Processed"
            and "payer" in item
            and "email" in item["payer"]
        }
        print(f"Found {len(from_helloasso)} emails in HelloAsso")
        badges = discourse.get_badges()
        if args.badge_slug not in badges:
            # A mistyped slug used to surface as a bare KeyError traceback.
            sys.exit(
                f"Unknown badge slug {args.badge_slug!r}, "
                "use the `list-badges` command to see the available ones."
            )
        badge = badges[args.badge_slug]
        badge_users = discourse.get_badge(badge["id"])["users"]
        already_assigned = {user["username"] for user in badge_users}
        # Read the property once so the user list is fetched at most once here.
        email_to_user = discourse.email_to_user_map
        print(f"Found {len(email_to_user)} emails in Discourse")
        common_emails = set(email_to_user) & from_helloasso
        print(f"Found {len(common_emails)} in common")
        already_assigned_count = 0
        # One user can match through several emails (primary and secondary):
        # handle each username once so the badge is neither granted twice in
        # the same run nor counted twice in the summary.
        handled_usernames = set()
        # Sorted so the output order is the same from one run to the next.
        for email in sorted(common_emails):
            username = email_to_user[email]["username"]
            if username in handled_usernames:
                continue
            handled_usernames.add(username)
            if username in already_assigned:
                already_assigned_count += 1
                continue
            print(f"Assigning {badge['name']!r} to {username!r}")
            discourse.assign_badge(badge["id"], username)
        print(
            f"{already_assigned_count} Discourse users already have the badge {badge['name']!r}"
        )
def main_fetch(args):
    """Download every item of the HelloAsso organization `args.org` to a file.

    Items are requested 100 per page, following the continuation token of
    each response until a page comes back with no data.  The collected
    items are written as indented JSON to a file named after `args.org`.
    """
    client = HaApiV5(
        api_base="api.helloasso.com",
        client_id=args.client_id,
        client_secret=args.client_secret,
        timeout=60,
    )
    endpoint = f"/v5/organizations/{args.org}/items"
    query = {"pageSize": 100}
    collected = []
    while True:
        page = client.call(endpoint, params=query).json()
        if not page["data"]:
            break
        collected.extend(page["data"])
        # The next request resumes where this page stopped.
        query["continuationToken"] = page["pagination"]["continuationToken"]
    serialized = json.dumps(collected, indent=4, ensure_ascii=False)
    Path(args.org).write_text(serialized, encoding="UTF-8")
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()