forked from AFPy/afpy.org
122 lines
3.6 KiB
Python
122 lines
3.6 KiB
Python
import email
|
||
import time
|
||
from pathlib import Path
|
||
from xml.etree import ElementTree
|
||
|
||
|
||
# Post categories. Each key doubles as the category's folder name under the
# storage root and is mapped to its French label.
POST_ACTUALITIES = 'actualites'
POST_JOBS = 'emplois'
POSTS = {
    POST_ACTUALITIES: "Actualités",
    POST_JOBS: "Offres d’emploi",
}

# Publication states. Each key doubles as a folder name inside a category.
# The key order matters: code iterating STATES looks at "waiting" first.
STATE_WAITING = 'waiting'
STATE_PUBLISHED = 'published'
STATES = {
    STATE_WAITING: "En attente",
    STATE_PUBLISHED: "Publié",
}

# Moderation actions, looked up as keys of the submitted form.
ACTION_PUBLISH = 'publish'
ACTION_UNPUBLISH = 'unpublish'
ACTIONS = {
    ACTION_PUBLISH: "Publier",
    ACTION_UNPUBLISH: "Dépublier",
}

# Extra keys added to a loaded post dict; the leading underscore keeps them
# apart from the XML tags, and form keys starting with "_" are never saved.
TIMESTAMP = '_timestamp'
STATE = '_state'
PATH = '_path'
DIR = '_dir'

# Storage layout: <module dir>/BASE_DIR/<category>/<state>/<timestamp>/BASE_FILE
BASE_DIR = 'posts'
BASE_FILE = 'post.xml'
|
||
|
||
|
||
class DataException(Exception):
    """Error raised by the data layer, optionally carrying an HTTP status.

    The keyword-only ``http_code`` argument is stored on the instance
    (``None`` when omitted); every other argument is forwarded unchanged to
    ``Exception``.
    """

    def __init__(self, *args, http_code=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.http_code = http_code
|
||
|
||
|
||
# Storage root: the BASE_DIR folder sitting next to this module.
root = Path(__file__).parent / BASE_DIR

# Create every <category>/<state> folder at import time so later directory
# listings and moves never hit a missing parent.
for category in POSTS:
    for state in STATES:
        root.joinpath(category, state).mkdir(parents=True, exist_ok=True)
|
||
|
||
|
||
def get_path(category, state, timestamp, *args, create_dir=False):
    """Return a path inside the folder of one post.

    The post folder is ``root / category / state / timestamp``; any extra
    positional ``args`` are appended to it as further path components.

    When ``create_dir`` is true, the post folder itself (not the extra
    components) is created if it does not exist yet. Its parent is not
    created here.
    """
    post_dir = root.joinpath(category, state, timestamp)
    if create_dir:
        post_dir.mkdir(exist_ok=True)
    return post_dir.joinpath(*args)
|
||
|
||
|
||
def count_posts(category, state=STATE_PUBLISHED):
    """Return how many entries the ``category``/``state`` folder contains."""
    listing = root / category / state
    return sum(1 for _ in listing.iterdir())
|
||
|
||
|
||
def get_posts(category, state=STATE_PUBLISHED, start=0, end=None):
    """Yield the posts stored under ``category``/``state``.

    Entries of the folder are sorted by path in descending order (newest
    first, assuming the folder names are equal-width numeric timestamps),
    then sliced with ``[start:end]``.

    ``end=None`` means "no upper bound". ``end=0`` selects nothing, like any
    ordinary slice would.

    Each selected entry is loaded with ``get_post``, so a yielded item is
    ``None`` when the entry holds no post file.
    """
    folder = root / category / state
    newest_first = sorted(folder.iterdir(), reverse=True)
    # A plain slice already handles end=None; testing the truthiness of
    # ``end`` would wrongly turn end=0 into "everything from start".
    for entry in newest_first[start:end]:
        yield get_post(category, entry.name, state)
|
||
|
||
|
||
def get_post(category, timestamp, states=None):
    """Load one post as a dict, or return ``None`` if it cannot be found.

    ``states`` selects where to look: a tuple/list of state names is used
    as-is, a single string is treated as a one-item list, and anything else
    (including ``None``) means every state in ``STATES``, in that order.
    The first state whose folder contains the post file wins.

    The returned dict maps each XML tag of the file (root element included)
    to its stripped text, plus the ``TIMESTAMP`` (as int), ``STATE``,
    ``DIR`` and ``PATH`` bookkeeping keys.
    """
    if isinstance(states, (tuple, list)):
        candidates = states
    elif isinstance(states, str):
        candidates = [states]
    else:
        candidates = STATES

    for state in candidates:
        post_dir = root / category / state / timestamp
        post_file = post_dir / BASE_FILE
        if post_file.is_file():
            break
    else:
        # No candidate state holds this post.
        return None

    post = {
        node.tag: (node.text or '').strip()
        for node in ElementTree.parse(post_file).iter()
    }
    post.update({
        TIMESTAMP: int(timestamp),
        STATE: state,
        DIR: post_dir,
        PATH: post_file,
    })
    return post
|
||
|
||
|
||
def save_post(category, timestamp, admin, form):
    """Create or overwrite a post, then optionally publish/unpublish it.

    Parameters:
        category: folder name of the post category.
        timestamp: identifier of an existing post (string), or ``None`` to
            create a new post named after the current Unix time.
        admin: truthy when the caller may edit published posts and move
            posts between states.
        form: mapping of field name to text. Keys starting with ``_`` are
            skipped; every other key becomes an XML element of the entry.

    Returns:
        The saved post as loaded by ``get_post`` (searched in every state).

    Raises:
        DataException: with ``http_code=404`` when ``timestamp`` is given but
            no post file exists for it in either state, and with
            ``http_code=401`` when a non-admin tries to save a published post.
    """
    # ``import email`` alone does not load the ``email.utils`` submodule, so
    # import the function explicitly instead of relying on another module
    # having imported it first.
    from email.utils import formatdate

    if timestamp is None:
        # New posts always start in the waiting state.
        status = STATE_WAITING
        timestamp = str(int(time.time()))
    elif get_path(category, STATE_WAITING, timestamp, BASE_FILE).is_file():
        status = STATE_WAITING
    elif get_path(category, STATE_PUBLISHED, timestamp, BASE_FILE).is_file():
        status = STATE_PUBLISHED
    else:
        raise DataException(http_code=404)
    if status == STATE_PUBLISHED and not admin:
        raise DataException(http_code=401)

    post = get_path(category, status, timestamp, BASE_FILE, create_dir=True)
    entry = ElementTree.Element('entry')
    for key, value in form.items():
        if key.startswith('_'):
            continue
        ElementTree.SubElement(entry, key).text = value
    # The date element is named after STATE_PUBLISHED and is derived from the
    # post's timestamp, which is always set by this point.
    ElementTree.SubElement(entry, STATE_PUBLISHED).text = formatdate(
        int(timestamp)
    )
    ElementTree.ElementTree(entry).write(post)

    if admin:
        # Moving the whole post folder is what changes its state.
        if ACTION_PUBLISH in form and status == STATE_WAITING:
            target = STATE_PUBLISHED
        elif ACTION_UNPUBLISH in form and status == STATE_PUBLISHED:
            target = STATE_WAITING
        else:
            target = None
        if target is not None:
            get_path(category, status, timestamp).rename(
                get_path(category, target, timestamp)
            )

    return get_post(category, timestamp)
|