forked from AFPy/afpy.org
140 lines
5.4 KiB
Python
140 lines
5.4 KiB
Python
# coding: utf-8
|
||
import os
|
||
import shutil
|
||
from pathlib import Path
|
||
from xml.etree import ElementTree
|
||
|
||
from dateutil.parser import parse
|
||
from html2text import html2text
|
||
|
||
from afpy.models.AdminUser import AdminUser
|
||
from afpy.models.JobPost import JobPost
|
||
from afpy.models.NewsEntry import NewsEntry
|
||
from afpy.models.Slug import Slug
|
||
|
||
PAGINATION = 12
|
||
|
||
CATEGORY_ACTUALITIES = "actualites"
|
||
CATEGORY_JOBS = "emplois"
|
||
|
||
CATEGORIES = {CATEGORY_ACTUALITIES: "Actualités", CATEGORY_JOBS: "Offres d’emploi"}
|
||
|
||
STATE_WAITING = "waiting"
|
||
STATE_PUBLISHED = "published"
|
||
STATE_TRASHED = "trashed"
|
||
STATES = {STATE_WAITING: "En attente", STATE_PUBLISHED: "Publié", STATE_TRASHED: "Supprimé"}
|
||
|
||
FIELD_IMAGE = "_image"
|
||
FIELD_TIMESTAMP = "_timestamp"
|
||
FIELD_STATE = "_state"
|
||
FIELD_PATH = "_path"
|
||
FIELD_DIR = "_dir"
|
||
|
||
BASE_DIR = "posts"
|
||
BASE_FILE = "post.xml"
|
||
BASE_IMAGE = "post.jpg"
|
||
|
||
ROOT_DIR = Path(__file__).parent
|
||
POSTS_DIR = ROOT_DIR / BASE_DIR
|
||
IMAGE_DIR = ROOT_DIR / "images"
|
||
IMAGE_DIR.mkdir(exist_ok=True)
|
||
|
||
|
||
def get_posts(category, state=STATE_PUBLISHED, page=None, end=None):
|
||
start = 0
|
||
if page and not end:
|
||
end = page * PAGINATION
|
||
start = end - PAGINATION
|
||
path = POSTS_DIR / category / state
|
||
timestamps = sorted(path.iterdir(), reverse=True)
|
||
timestamps = timestamps[start:end] if end else timestamps[start:]
|
||
for timestamp in timestamps:
|
||
post = get_post(category, timestamp.name, state)
|
||
if post:
|
||
yield post
|
||
|
||
|
||
def get_post(category, timestamp, states=None):
|
||
states = tuple(
|
||
states if isinstance(states, (tuple, list)) else [states] if isinstance(states, str) else STATES.keys()
|
||
)
|
||
for state in states:
|
||
dir = POSTS_DIR / category / state / timestamp
|
||
path = dir / BASE_FILE
|
||
if path.is_file():
|
||
break
|
||
else:
|
||
return None
|
||
tree = ElementTree.parse(path)
|
||
post = {item.tag: (item.text or "").strip() for item in tree.iter()}
|
||
|
||
# Calculated fields
|
||
image = post.get("image") or post.get("old_image") or BASE_IMAGE
|
||
if (dir / image).is_file():
|
||
post[FIELD_IMAGE] = "/".join((category, state, timestamp, image))
|
||
post[FIELD_TIMESTAMP] = timestamp
|
||
post[FIELD_STATE] = state
|
||
post[FIELD_DIR] = dir
|
||
post[FIELD_PATH] = path
|
||
return post
|
||
|
||
|
||
if __name__ == "__main__":
|
||
admin_1 = AdminUser.get_by_id(1)
|
||
for category in CATEGORIES:
|
||
for state in STATES:
|
||
for post in get_posts(category, state):
|
||
timestamp = post.get(FIELD_TIMESTAMP)
|
||
if post.get(FIELD_IMAGE):
|
||
image = POSTS_DIR / post.get(FIELD_IMAGE)
|
||
name, ext = os.path.splitext(post.get(FIELD_IMAGE))
|
||
post["image"] = f"{category}.{timestamp}{ext}"
|
||
shutil.copy(str(image), str(IMAGE_DIR / post["image"]))
|
||
if category == "actualites":
|
||
new_post = NewsEntry.create(
|
||
title=post.get("title", "(untitled)"),
|
||
summary=post.get("summary"),
|
||
content=html2text(post.get("content", "")),
|
||
author="Admin",
|
||
author_email=post.get("email"),
|
||
image_path=post.get("image"),
|
||
dt_published=parse(post.get("published")).replace(tzinfo=None)
|
||
if state == "published"
|
||
else None,
|
||
dt_submitted=parse(post.get("published")).replace(tzinfo=None),
|
||
dt_updated=parse(post.get("published")).replace(tzinfo=None),
|
||
state=state,
|
||
approved_by=admin_1 if state == "published" or state == "rejected" else None,
|
||
)
|
||
Slug.create(url=f"/posts/actualites/{post.get(FIELD_TIMESTAMP)}", newsentry=new_post)
|
||
post_id = post.get("id")
|
||
if post_id:
|
||
Slug.create(url=post_id.split("afpy.org")[-1], newsentry=new_post)
|
||
else:
|
||
email = post.get("email", "")
|
||
phone = post.get("phone", "")
|
||
if not email and not phone:
|
||
phone = "(no phone)"
|
||
new_job = JobPost.create(
|
||
title=post.get("title", "(untitled)"),
|
||
summary=post.get("summary"),
|
||
content=html2text(post.get("content", "")),
|
||
company=post.get("company", ""),
|
||
email=email,
|
||
phone=phone,
|
||
location=post.get("address", ""),
|
||
contact_info=post.get("contact", ""),
|
||
dt_published=parse(post.get("published")).replace(tzinfo=None)
|
||
if state == "published"
|
||
else None,
|
||
dt_submitted=parse(post.get("published")).replace(tzinfo=None),
|
||
dt_updated=parse(post.get("published")).replace(tzinfo=None),
|
||
state=state,
|
||
approved_by=admin_1 if state == "published" or state == "rejected" else None,
|
||
image_path=post.get("image"),
|
||
)
|
||
Slug.create(url=f"/posts/emplois/{post.get(FIELD_TIMESTAMP)}", jobpost=new_job)
|
||
post_id = post.get("id")
|
||
if post_id:
|
||
Slug.create(url=post_id.split("afpy.org")[-1], jobpost=new_job)
|