111 lines
3.8 KiB
Python
111 lines
3.8 KiB
Python
# coding: utf-8
|
|
import os
|
|
import shutil
|
|
from pathlib import Path
|
|
from xml.etree import ElementTree
|
|
|
|
from dateutil.parser import parse
|
|
from html2text import html2text
|
|
|
|
from afpy.models.AdminUser import AdminUser
|
|
from afpy.models.NewsEntry import NewsEntry
|
|
from afpy.models.Slug import Slug
|
|
|
|
PAGINATION = 12
|
|
|
|
CATEGORY_ACTUALITIES = "actualites"
|
|
|
|
CATEGORIES = {CATEGORY_ACTUALITIES: "Actualités"}
|
|
|
|
STATE_WAITING = "waiting"
|
|
STATE_PUBLISHED = "published"
|
|
STATE_TRASHED = "trashed"
|
|
STATES = {STATE_WAITING: "En attente", STATE_PUBLISHED: "Publié", STATE_TRASHED: "Supprimé"}
|
|
|
|
FIELD_IMAGE = "_image"
|
|
FIELD_TIMESTAMP = "_timestamp"
|
|
FIELD_STATE = "_state"
|
|
FIELD_PATH = "_path"
|
|
FIELD_DIR = "_dir"
|
|
|
|
BASE_DIR = "posts"
|
|
BASE_FILE = "post.xml"
|
|
BASE_IMAGE = "post.jpg"
|
|
|
|
ROOT_DIR = Path(__file__).parent
|
|
POSTS_DIR = ROOT_DIR / BASE_DIR
|
|
IMAGE_DIR = ROOT_DIR / "images"
|
|
IMAGE_DIR.mkdir(exist_ok=True)
|
|
|
|
|
|
def get_posts(category, state=STATE_PUBLISHED, page=None, end=None):
|
|
start = 0
|
|
if page and not end:
|
|
end = page * PAGINATION
|
|
start = end - PAGINATION
|
|
path = POSTS_DIR / category / state
|
|
timestamps = sorted(path.iterdir(), reverse=True)
|
|
timestamps = timestamps[start:end] if end else timestamps[start:]
|
|
for timestamp in timestamps:
|
|
post = get_post(category, timestamp.name, state)
|
|
if post:
|
|
yield post
|
|
|
|
|
|
def get_post(category, timestamp, states=None):
|
|
states = tuple(
|
|
states if isinstance(states, (tuple, list)) else [states] if isinstance(states, str) else STATES.keys()
|
|
)
|
|
for state in states:
|
|
dir = POSTS_DIR / category / state / timestamp
|
|
path = dir / BASE_FILE
|
|
if path.is_file():
|
|
break
|
|
else:
|
|
return None
|
|
tree = ElementTree.parse(path)
|
|
post = {item.tag: (item.text or "").strip() for item in tree.iter()}
|
|
|
|
# Calculated fields
|
|
image = post.get("image") or post.get("old_image") or BASE_IMAGE
|
|
if (dir / image).is_file():
|
|
post[FIELD_IMAGE] = "/".join((category, state, timestamp, image))
|
|
post[FIELD_TIMESTAMP] = timestamp
|
|
post[FIELD_STATE] = state
|
|
post[FIELD_DIR] = dir
|
|
post[FIELD_PATH] = path
|
|
return post
|
|
|
|
|
|
if __name__ == "__main__":
|
|
admin_1 = AdminUser.get_by_id(1)
|
|
for category in CATEGORIES:
|
|
for state in STATES:
|
|
for post in get_posts(category, state):
|
|
timestamp = post.get(FIELD_TIMESTAMP)
|
|
if post.get(FIELD_IMAGE):
|
|
image = POSTS_DIR / post.get(FIELD_IMAGE)
|
|
name, ext = os.path.splitext(post.get(FIELD_IMAGE))
|
|
post["image"] = f"{category}.{timestamp}{ext}"
|
|
shutil.copy(str(image), str(IMAGE_DIR / post["image"]))
|
|
if category == "actualites":
|
|
new_post = NewsEntry.create(
|
|
title=post.get("title", "(untitled)"),
|
|
summary=post.get("summary"),
|
|
content=html2text(post.get("content", "")),
|
|
author="Admin",
|
|
author_email=post.get("email"),
|
|
image_path=post.get("image"),
|
|
dt_published=parse(post.get("published")).replace(tzinfo=None)
|
|
if state == "published"
|
|
else None,
|
|
dt_submitted=parse(post.get("published")).replace(tzinfo=None),
|
|
dt_updated=parse(post.get("published")).replace(tzinfo=None),
|
|
state=state,
|
|
approved_by=admin_1 if state == "published" or state == "rejected" else None,
|
|
)
|
|
Slug.create(url=f"/posts/actualites/{post.get(FIELD_TIMESTAMP)}", newsentry=new_post)
|
|
post_id = post.get("id")
|
|
if post_id:
|
|
Slug.create(url=post_id.split("afpy.org")[-1], newsentry=new_post)
|