helloasso-to-discourse/helloasso_to_discourse.py
2023-03-18 00:47:31 +01:00

192 lines
5.9 KiB
Python

import argparse
import re
from itertools import count, groupby
import json
from pathlib import Path
from urllib.parse import urljoin
from time import sleep
from tabulate import tabulate
import requests
from helloasso_api import HaApiV5
def parse_args():
    """Parse the command line and return the resulting namespace.

    Three subcommands are declared: ``fetch``, ``sync`` and ``list-forms``.
    Each one stores its handler function in the ``func`` attribute of the
    namespace via ``set_defaults``.

    Returns:
        argparse.Namespace: the parsed arguments; ``func`` is always set
        because a subcommand is mandatory.

    Raises:
        SystemExit: raised by argparse on invalid usage, including when no
        subcommand is given.
    """
    parser = argparse.ArgumentParser(description="Hello backup to Discourse badge")
    # A subcommand is mandatory: without one no ``func`` default is set and
    # the caller would fail with an AttributeError instead of a usage message.
    # ``dest`` gives argparse a name to print in that error message.
    subparsers = parser.add_subparsers(
        help="Choose a command", dest="command", required=True
    )

    fetch_parser = subparsers.add_parser(
        "fetch", help="Fetch HelloAsso data from the HelloAsso API to a file."
    )
    fetch_parser.add_argument("client_id")
    fetch_parser.add_argument("client_secret")
    fetch_parser.add_argument("org")
    fetch_parser.set_defaults(func=main_fetch)

    sync_parser = subparsers.add_parser(
        "sync", help="Sync the backup file to a given Discourse instance"
    )
    sync_parser.add_argument("discourse_url")
    sync_parser.add_argument("discourse_api_key")
    sync_parser.add_argument("helloasso_backup_file")
    sync_parser.add_argument(
        "form_slug",
        help="See the `list-forms` subcommand to learn which one you can use.",
    )
    sync_parser.add_argument("badge", type=int)
    sync_parser.set_defaults(func=main_sync)

    list_form_parser = subparsers.add_parser(
        "list-forms", help="List HelloAsso forms, to use with `sync`"
    )
    list_form_parser.set_defaults(func=main_list_form)
    list_form_parser.add_argument("helloasso_backup_file")

    return parser.parse_args()
class Discourse:
    """Small client for the Discourse HTTP endpoints used by this script.

    Meant to be used as a context manager: the ``requests.Session`` is
    created on entry (with the ``Api-Key`` header set) and closed on exit.

    Args:
        url: Base URL of the Discourse instance; request paths are joined
            onto it with ``urljoin``.
        api_key: Value sent in the ``Api-Key`` header of every request.
        timeout: Default timeout, in seconds, applied to every request
            unless the caller passes its own ``timeout`` keyword.
    """

    def __init__(self, url, api_key, timeout=60):
        self.url = url
        self.api_key = api_key
        self.timeout = timeout
        self.session = None

    def __enter__(self):
        self.session = requests.Session()
        self.session.headers.update({"Api-Key": self.api_key})
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.session.close()

    def post(self, url, data=None, json=None, **kwargs):
        """POST to ``url`` (relative to the base URL) and return the decoded JSON.

        Raises whatever ``response.raise_for_status()`` raises on an HTTP
        error status.
        """
        # Without a timeout a stalled server would block the script forever.
        kwargs.setdefault("timeout", self.timeout)
        response = self.session.post(
            urljoin(self.url, url), data=data, json=json, **kwargs
        )
        response.raise_for_status()
        return response.json()

    def get(self, url, **kwargs):
        """GET ``url`` (relative to the base URL) and return the decoded JSON.

        Raises whatever ``response.raise_for_status()`` raises on an HTTP
        error status.
        """
        kwargs.setdefault("timeout", self.timeout)
        response = self.session.get(urljoin(self.url, url), **kwargs)
        response.raise_for_status()
        return response.json()

    def users(self, flag="active"):
        """Return every user of the ``flag`` admin list, with emails.

        Pages are requested starting at 1 until an empty page is returned.
        """
        all_users = []
        for page in count(1):
            users = self.get(
                f"/admin/users/list/{flag}.json",
                params={"page": page, "show_emails": "true"},
            )
            if not users:
                break
            all_users.extend(users)
        return all_users

    def user_by_email(self, email):
        """Return the active user whose email matches ``email``, or ``None``.

        An exact match (ignoring case) is tried first. If none is found, a
        second search ignores any ``+tag`` part on both sides, so that
        ``john+forum@example.com`` matches ``john@example.com``.
        """
        wanted = email.lower()
        users = self.get(
            "/admin/users/list/active.json",
            params={"filter": email, "show_emails": "true"},
        )
        for user in users:
            # Compare case-insensitively: the same mailbox is commonly typed
            # with different capitalisation on different services.
            if user["email"].lower() == wanted:
                return user

        # The address may carry a "+tag": search on the bare local part and
        # compare with tags stripped on both sides.
        untagged = remove_email_tag(email)
        users = self.get(
            "/admin/users/list/active.json",
            params={
                "show_emails": "true",
                "filter": re.sub("@.*$", "", untagged),
            },
        )
        for user in users:
            if remove_email_tag(user["email"]).lower() == untagged.lower():
                return user
        return None

    def assign_badge(self, badge_id, username):
        """Grant badge ``badge_id`` to ``username`` and return the API reply."""
        return self.post(
            "/user_badges", data={"badge_id": badge_id, "username": username}
        )
def remove_email_tag(email):
    """Return ``email`` with everything from the first ``+`` up to ``@`` removed.

    Addresses without a ``+`` before the ``@`` are returned unchanged.
    """
    tag_pattern = re.compile(r"\+.*@")
    return tag_pattern.sub("@", email)
def main():
    """Parse the command line and run the handler of the chosen subcommand.

    Returns whatever the handler stored in the namespace's ``func``
    attribute returns.
    """
    namespace = parse_args()
    handler = namespace.func
    return handler(namespace)
def main_list_form(args):
    """Print the HelloAsso forms found in a backup file, with member counts.

    Only items whose state is ``"Processed"`` and that carry a payer email
    are counted. An example ``sync`` command line using the first listed
    form is printed when at least one form is found.

    Args:
        args: parsed command line; ``args.helloasso_backup_file`` is the path
            of the JSON backup to read.
    """
    helloasso_backup = json.loads(
        Path(args.helloasso_backup_file).read_text(encoding="UTF-8")
    )
    forms = [
        (item["order"]["formType"], item["order"]["formSlug"])
        for item in helloasso_backup
        if item["state"] == "Processed" and "payer" in item and "email" in item["payer"]
    ]
    # groupby only merges adjacent equal keys, hence the sort beforehand.
    table = [key + (len(list(group)),) for key, group in groupby(sorted(forms))]
    print(
        "Here are the available HelloAsso forms you can use with the `sync` command ",
        "to link a form to a badge:\n",
        sep="\n",
    )
    print(tabulate(table, headers=("Type", "Name", "Members")))
    print()
    if table:
        print("Use the `name` for the `sync` command, like:")
        print(
            f'helloasso-to-discourse sync https://discuss.afpy.org "$(pass discuss.afpy.org-api-key)" ./afpy {table[0][1]} 114'
        )
def main_sync(args):
    """Grant a Discourse badge to every forum user found in a HelloAsso form.

    Payer emails of ``"Processed"`` items belonging to ``args.form_slug`` are
    read from the backup file and matched against the emails of the Discourse
    users. Matching ignores ``+tag`` suffixes and letter case. The API reply
    of each badge assignment is printed.

    Args:
        args: parsed command line, providing ``helloasso_backup_file``,
            ``discourse_url``, ``discourse_api_key``, ``form_slug`` and
            ``badge``.
    """
    helloasso_backup = json.loads(
        Path(args.helloasso_backup_file).read_text(encoding="UTF-8")
    )
    with Discourse(args.discourse_url, args.discourse_api_key) as discourse:
        # Lowercase both sides: the same mailbox is often typed with different
        # capitalisation on HelloAsso and on the forum, and a case-sensitive
        # comparison would silently skip those members.
        from_helloasso = {
            remove_email_tag(item["payer"]["email"]).lower()
            for item in helloasso_backup
            if item["order"]["formSlug"] == args.form_slug
            and item["state"] == "Processed"
            and "payer" in item
            and "email" in item["payer"]
        }
        discourse_users_by_email = {
            remove_email_tag(user["email"]).lower(): user
            for user in discourse.users()
        }
        # Sorted so that successive runs print in the same order.
        for email in sorted(set(discourse_users_by_email) & from_helloasso):
            print(
                discourse.assign_badge(
                    args.badge, discourse_users_by_email[email]["username"]
                )
            )
def main_fetch(args):
    """Download every item of a HelloAsso organization into a JSON file.

    Pages of 100 items are requested until a page with empty ``data`` comes
    back; each request after the first reuses the continuation token of the
    previous page. The result is written, indented, to a file named after
    ``args.org``.

    Args:
        args: parsed command line, providing ``client_id``, ``client_secret``
            and ``org``.
    """
    client = HaApiV5(
        api_base="api.helloasso.com",
        client_id=args.client_id,
        client_secret=args.client_secret,
        timeout=60,
    )
    url = f"/v5/organizations/{args.org}/items"
    query = {"pageSize": 100}
    collected = []
    while True:
        page = client.call(url, params=query).json()
        if not page["data"]:
            break
        collected.extend(page["data"])
        query["continuationToken"] = page["pagination"]["continuationToken"]
    serialized = json.dumps(collected, indent=4, ensure_ascii=False)
    Path(args.org).write_text(serialized, encoding="UTF-8")
if __name__ == "__main__":
    # Run the command-line interface only when executed as a script.
    main()