forked from AFPy/afpy.org
Merge pull request #37 from debnet/data_xml
XML data-source refactoring
This commit is contained in:
commit
45e244a98e
263
afpy.py
263
afpy.py
|
@ -2,8 +2,6 @@ import email
|
|||
import locale
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
from xml.etree import ElementTree
|
||||
|
||||
import docutils.core
|
||||
import docutils.writers.html5_polyglot
|
||||
|
@ -15,6 +13,8 @@ from flask_cache import Cache
|
|||
from itsdangerous import BadSignature, URLSafeSerializer
|
||||
from jinja2 import TemplateNotFound
|
||||
|
||||
import data_xml as data
|
||||
|
||||
locale.setlocale(locale.LC_ALL, 'fr_FR.UTF-8')
|
||||
|
||||
cache = Cache(config={'CACHE_TYPE': 'simple', 'CACHE_DEFAULT_TIMEOUT': 600})
|
||||
|
@ -22,6 +22,7 @@ signer = URLSafeSerializer(os.environ.get('SECRET', 'changeme!'))
|
|||
app = Flask(__name__)
|
||||
cache.init_app(app)
|
||||
|
||||
|
||||
PAGINATION = 12
|
||||
|
||||
PLANET = {
|
||||
|
@ -30,33 +31,21 @@ PLANET = {
|
|||
'Anybox': 'https://anybox.fr/site-feed/RSS?set_language=fr',
|
||||
'Ascendances': 'https://ascendances.wordpress.com/feed/',
|
||||
'Code en Seine': 'https://codeenseine.fr/feeds/all.atom.xml',
|
||||
'Yaal': 'https://www.yaal.fr/blog/feeds/all.atom.xml'
|
||||
'Yaal': 'https://www.yaal.fr/blog/feeds/all.atom.xml',
|
||||
}
|
||||
|
||||
MEETUPS = {
|
||||
'amiens': 'https://www.meetup.com/fr-FR/Python-Amiens',
|
||||
'bruxelles': (
|
||||
'https://www.meetup.com/fr-FR/'
|
||||
'Belgium-Python-Meetup-aka-AperoPythonBe/'),
|
||||
'grenoble': (
|
||||
'https://www.meetup.com/fr-FR/Groupe-dutilisateurs-Python-Grenoble/'),
|
||||
'bruxelles': 'https://www.meetup.com/fr-FR/'
|
||||
'Belgium-Python-Meetup-aka-AperoPythonBe/',
|
||||
'grenoble': 'https://www.meetup.com/fr-FR/'
|
||||
'Groupe-dutilisateurs-Python-Grenoble/',
|
||||
'lille': 'https://www.meetup.com/fr-FR/Lille-py/',
|
||||
'lyon': 'https://www.meetup.com/fr-FR/Python-AFPY-Lyon/',
|
||||
'nantes': 'https://www.meetup.com/fr-FR/Nantes-Python-Meetup/',
|
||||
'montpellier': 'https://www.meetup.com/fr-FR/Meetup-Python-Montpellier/',
|
||||
}
|
||||
|
||||
POSTS = {
|
||||
'actualites': 'Actualités',
|
||||
'emplois': 'Offres d’emploi',
|
||||
}
|
||||
|
||||
|
||||
root = Path(__file__).parent / 'posts'
|
||||
for category in POSTS:
|
||||
for status in ('waiting', 'published'):
|
||||
(root / category / status).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
@app.errorhandler(404)
|
||||
def page_not_found(e):
|
||||
|
@ -66,16 +55,16 @@ def page_not_found(e):
|
|||
@app.route('/')
|
||||
def index():
|
||||
posts = {}
|
||||
path = root / 'actualites' / 'published'
|
||||
timestamps = sorted(path.iterdir(), reverse=True)[:4]
|
||||
for timestamp in timestamps:
|
||||
tree = ElementTree.parse(timestamp / 'post.xml')
|
||||
posts[timestamp.name] = {item.tag: item.text for item in tree.iter()}
|
||||
if (timestamp / 'post.jpg').is_file():
|
||||
posts[timestamp.name]['image'] = '/'.join((
|
||||
'actualites', 'published', timestamp.name))
|
||||
for post in data.get_posts(data.POST_ACTUALITIES, end=4):
|
||||
timestamp = post[data.TIMESTAMP]
|
||||
posts[timestamp] = post
|
||||
if (post[data.DIR] / 'post.jpg').is_file():
|
||||
posts[timestamp]['image'] = '/'.join(
|
||||
(data.POST_ACTUALITIES, data.STATE_PUBLISHED, timestamp)
|
||||
)
|
||||
return render_template(
|
||||
'index.html', body_id='index', name='actualites', posts=posts)
|
||||
'index.html', body_id='index', name=data.POST_ACTUALITIES, posts=posts
|
||||
)
|
||||
|
||||
|
||||
@app.route('/<name>')
|
||||
|
@ -95,103 +84,57 @@ def rest(name):
|
|||
parts = docutils.core.publish_parts(
|
||||
source=fd.read(),
|
||||
writer=docutils.writers.html5_polyglot.Writer(),
|
||||
settings_overrides={'initial_header_level': 2})
|
||||
settings_overrides={'initial_header_level': 2},
|
||||
)
|
||||
except FileNotFoundError:
|
||||
abort(404)
|
||||
return render_template(
|
||||
'rst.html', body_id=name, html=parts['body'], title=parts['title'])
|
||||
|
||||
|
||||
def _get_post(name, timestamp):
|
||||
for state in ('waiting', 'published'):
|
||||
path = (root / name / state / timestamp / 'post.xml')
|
||||
if path.is_file():
|
||||
break
|
||||
else:
|
||||
return None
|
||||
tree = ElementTree.parse(path)
|
||||
post = {item.tag: (item.text or '').strip() for item in tree.iter()}
|
||||
post['state'] = state
|
||||
post['timestamp'] = timestamp
|
||||
return post
|
||||
'rst.html', body_id=name, html=parts['body'], title=parts['title']
|
||||
)
|
||||
|
||||
|
||||
@app.route('/post/edit/<name>')
|
||||
@app.route('/post/edit/<name>/token/<token>')
|
||||
def edit_post(name, token=None):
|
||||
if name not in POSTS:
|
||||
if name not in data.POSTS:
|
||||
abort(404)
|
||||
if token:
|
||||
try:
|
||||
timestamp = signer.loads(token)
|
||||
except BadSignature:
|
||||
abort(401)
|
||||
post = _get_post(name, timestamp)
|
||||
post = data.get_post(name, timestamp)
|
||||
if not post:
|
||||
abort(404)
|
||||
else:
|
||||
post = {'state': 'waiting'}
|
||||
if post['state'] != 'waiting':
|
||||
post = {data.STATE: data.STATE_WAITING}
|
||||
if post[data.STATE] != data.STATE_WAITING:
|
||||
return redirect(url_for('rest', name='already_published'))
|
||||
return render_template(
|
||||
'edit_post.html', body_id='edit-post', post=post, name=name,
|
||||
admin=False)
|
||||
'edit_post.html',
|
||||
body_id='edit-post',
|
||||
post=post,
|
||||
name=name,
|
||||
admin=False,
|
||||
)
|
||||
|
||||
|
||||
@app.route('/admin/post/edit/<name>/<timestamp>')
|
||||
def edit_post_admin(name, timestamp):
|
||||
if name not in POSTS:
|
||||
if name not in data.POSTS:
|
||||
abort(404)
|
||||
post = _get_post(name, timestamp)
|
||||
post = data.get_post(name, timestamp)
|
||||
if not post:
|
||||
abort(404)
|
||||
return render_template(
|
||||
'edit_post.html', body_id='edit-post', post=post, name=name,
|
||||
admin=True)
|
||||
|
||||
|
||||
def _save_post(name, timestamp, admin):
|
||||
if timestamp is None:
|
||||
timestamp = str(int(time.time()))
|
||||
status = 'waiting'
|
||||
folder = root / name / 'waiting' / timestamp
|
||||
folder.mkdir()
|
||||
post = folder / 'post.xml'
|
||||
elif (root / name / 'waiting' / timestamp / 'post.xml').is_file():
|
||||
status = 'waiting'
|
||||
elif (root / name / 'published' / timestamp / 'post.xml').is_file():
|
||||
status = 'published'
|
||||
else:
|
||||
abort(404)
|
||||
|
||||
if status == 'published' and not admin:
|
||||
abort(401)
|
||||
|
||||
post = root / name / status / timestamp / 'post.xml'
|
||||
tree = ElementTree.Element('entry')
|
||||
for key, value in request.form.items():
|
||||
element = ElementTree.SubElement(tree, key)
|
||||
element.text = value
|
||||
element = ElementTree.SubElement(tree, 'published')
|
||||
element.text = email.utils.formatdate(
|
||||
int(timestamp) if timestamp else time.time())
|
||||
ElementTree.ElementTree(tree).write(post)
|
||||
|
||||
if admin:
|
||||
if 'publish' in request.form and status == 'waiting':
|
||||
(root / name / 'waiting' / timestamp).rename(
|
||||
root / name / 'published' / timestamp)
|
||||
elif 'unpublish' in request.form and status == 'published':
|
||||
(root / name / 'published' / timestamp).rename(
|
||||
root / name / 'waiting' / timestamp)
|
||||
|
||||
return _get_post(name, timestamp)
|
||||
'edit_post.html', body_id='edit-post', post=post, name=name, admin=True
|
||||
)
|
||||
|
||||
|
||||
@app.route('/post/edit/<name>', methods=['post'])
|
||||
@app.route('/post/edit/<name>/token/<token>', methods=['post'])
|
||||
def save_post(name, token=None):
|
||||
if name not in POSTS:
|
||||
if name not in data.POSTS:
|
||||
abort(404)
|
||||
if token:
|
||||
try:
|
||||
|
@ -200,71 +143,90 @@ def save_post(name, token=None):
|
|||
abort(401)
|
||||
else:
|
||||
timestamp = None
|
||||
post = _save_post(name, timestamp=timestamp, admin=False)
|
||||
try:
|
||||
post = data.save_post(
|
||||
name, timestamp=timestamp, admin=False, form=request.form
|
||||
)
|
||||
except data.DataException as e:
|
||||
abort(e.http_code)
|
||||
edit_post_url = url_for(
|
||||
'edit_post', name=name, token=signer.dumps(post['timestamp']))
|
||||
'edit_post', name=name, token=signer.dumps(post['_timestamp'])
|
||||
)
|
||||
return render_template('confirmation.html', edit_post_url=edit_post_url)
|
||||
|
||||
|
||||
@app.route('/admin/post/edit/<name>/<timestamp>', methods=['post'])
|
||||
def save_post_admin(name, timestamp):
|
||||
if name not in POSTS:
|
||||
if name not in data.POSTS:
|
||||
abort(404)
|
||||
_save_post(name, timestamp=timestamp, admin=True)
|
||||
try:
|
||||
data.save_post(
|
||||
name, timestamp=timestamp, admin=True, form=request.form
|
||||
)
|
||||
except data.DataException as e:
|
||||
abort(e.http_code)
|
||||
return redirect(url_for('admin', name=name))
|
||||
|
||||
|
||||
@app.route('/posts/<name>')
|
||||
@app.route('/posts/<name>/page/<int:page>')
|
||||
def posts(name, page=1):
|
||||
if name not in POSTS:
|
||||
if name not in data.POSTS:
|
||||
abort(404)
|
||||
path = root / name / 'published'
|
||||
timestamps = sorted(path.iterdir(), reverse=True)
|
||||
end = page * PAGINATION
|
||||
start = end - PAGINATION
|
||||
total_pages = (len(timestamps) // PAGINATION) + 1
|
||||
total_pages = (
|
||||
data.count_posts(name, data.STATE_PUBLISHED) // PAGINATION
|
||||
) + 1
|
||||
posts = {}
|
||||
for timestamp in timestamps[start:end]:
|
||||
tree = ElementTree.parse(timestamp / 'post.xml')
|
||||
posts[timestamp.name] = {item.tag: item.text for item in tree.iter()}
|
||||
for post in data.get_posts(
|
||||
name, data.STATE_PUBLISHED, start=start, end=end
|
||||
):
|
||||
timestamp = post[data.TIMESTAMP]
|
||||
posts[timestamp] = post
|
||||
if (timestamp / 'post.jpg').is_file():
|
||||
posts[timestamp.name]['image'] = '/'.join((
|
||||
name, 'published', timestamp.name))
|
||||
posts[timestamp]['image'] = '/'.join(
|
||||
(name, data.STATE_PUBLISHED, timestamp)
|
||||
)
|
||||
return render_template(
|
||||
'posts.html', body_id=name, posts=posts, title=POSTS[name], name=name,
|
||||
page=page, total_pages=total_pages)
|
||||
'posts.html',
|
||||
body_id=name,
|
||||
posts=posts,
|
||||
title=data.POSTS[name],
|
||||
name=name,
|
||||
page=page,
|
||||
total_pages=total_pages,
|
||||
)
|
||||
|
||||
|
||||
@app.route('/admin/posts/<name>')
|
||||
def admin(name):
|
||||
if name not in POSTS:
|
||||
if name not in data.POSTS:
|
||||
abort(404)
|
||||
posts = {}
|
||||
for state in ('waiting', 'published'):
|
||||
for state in data.STATES:
|
||||
posts[state] = state_posts = {}
|
||||
timestamps = sorted((root / name / state).iterdir(), reverse=True)
|
||||
for timestamp in timestamps:
|
||||
tree = ElementTree.parse(timestamp / 'post.xml')
|
||||
state_posts[timestamp.name] = {
|
||||
item.tag: item.text for item in tree.iter()}
|
||||
for post in data.get_posts(name, state):
|
||||
timestamp = post[data.TIMESTAMP]
|
||||
state_posts[timestamp] = post
|
||||
return render_template(
|
||||
'admin.html', body_id='admin', posts=posts, title=POSTS[name],
|
||||
name=name)
|
||||
'admin.html',
|
||||
body_id='admin',
|
||||
posts=posts,
|
||||
title=data.POSTS[name],
|
||||
name=name,
|
||||
)
|
||||
|
||||
|
||||
@app.route('/posts/<name>/<timestamp>')
|
||||
def post(name, timestamp):
|
||||
if name not in POSTS:
|
||||
if name not in data.POSTS:
|
||||
abort(404)
|
||||
try:
|
||||
path = root / name / 'published' / timestamp
|
||||
tree = ElementTree.parse(path / 'post.xml')
|
||||
except Exception:
|
||||
post = data.get_post(name, timestamp, data.STATE_PUBLISHED)
|
||||
if not post:
|
||||
abort(404)
|
||||
post = {item.tag: item.text for item in tree.iter()}
|
||||
if (path / 'post.jpg').is_file():
|
||||
post['image'] = '/'.join((name, 'published', timestamp))
|
||||
if (post[data.DIR] / 'post.jpg').is_file():
|
||||
post['image'] = '/'.join((name, data.STATE_PUBLISHED, timestamp))
|
||||
return render_template('post.html', body_id='post', post=post, name=name)
|
||||
|
||||
|
||||
|
@ -273,32 +235,33 @@ def post_image(path):
|
|||
if path.count('/') != 2:
|
||||
abort(404)
|
||||
name, status, timestamp = path.split('/')
|
||||
if name not in POSTS:
|
||||
if name not in data.POSTS:
|
||||
abort(404)
|
||||
if status not in ('published', 'waiting'):
|
||||
if status not in data.STATES:
|
||||
abort(404)
|
||||
return send_from_directory(root / path, 'post.jpg')
|
||||
return send_from_directory(data.root / path, 'post.jpg')
|
||||
|
||||
|
||||
@app.route('/feed/<name>/rss.xml')
|
||||
@cache.cached()
|
||||
def feed(name):
|
||||
if name not in POSTS:
|
||||
if name not in data.POSTS:
|
||||
abort(404)
|
||||
path = root / name / 'published'
|
||||
timestamps = sorted(path.iterdir(), reverse=True)[:50]
|
||||
entries = []
|
||||
for timestamp in timestamps:
|
||||
tree = ElementTree.parse(timestamp / 'post.xml')
|
||||
entry = {item.tag: item.text for item in tree.iter()}
|
||||
entry['timestamp'] = int(timestamp.name)
|
||||
entry['link'] = url_for(
|
||||
'post', name=name, timestamp=timestamp.name, _external=True)
|
||||
entries.append({'content': entry})
|
||||
title = f'{POSTS[name]} AFPy.org'
|
||||
for post in data.get_posts(name, data.STATE_PUBLISHED, end=50):
|
||||
timestamp = post[data.TIMESTAMP]
|
||||
post['link'] = url_for(
|
||||
'post', name=name, timestamp=timestamp, _external=True
|
||||
)
|
||||
entries.append({'content': post})
|
||||
title = f'{data.POSTS[name]} AFPy.org'
|
||||
return render_template(
|
||||
'rss.xml', entries=entries, title=title, description=title,
|
||||
link=url_for('feed', name=name, _external=True))
|
||||
'rss.xml',
|
||||
entries=entries,
|
||||
title=title,
|
||||
description=title,
|
||||
link=url_for('feed', name=name, _external=True),
|
||||
)
|
||||
|
||||
|
||||
@app.route('/planet/')
|
||||
|
@ -313,9 +276,12 @@ def planet():
|
|||
entries.append({'feed': name, 'content': entry})
|
||||
entries.sort(reverse=True, key=lambda entry: entry['content']['timestamp'])
|
||||
return render_template(
|
||||
'rss.xml', entries=entries, title='Planet Python francophone',
|
||||
'rss.xml',
|
||||
entries=entries,
|
||||
title='Planet Python francophone',
|
||||
description='Nouvelles autour de Python en français',
|
||||
link=url_for('planet', _external=True))
|
||||
link=url_for('planet', _external=True),
|
||||
)
|
||||
|
||||
|
||||
@app.route('/rss-jobs/RSS')
|
||||
|
@ -326,11 +292,10 @@ def jobs():
|
|||
@app.route('/status')
|
||||
def status():
|
||||
stats = {}
|
||||
for category in POSTS:
|
||||
for category in data.POSTS:
|
||||
stats[category] = {}
|
||||
for status in ('waiting', 'published'):
|
||||
stats[category][status] = len(list(
|
||||
(root / category / status).iterdir()))
|
||||
for state in data.STATES:
|
||||
stats[category][state] = data.count_posts(category, state)
|
||||
|
||||
os_stats = os.statvfs(__file__)
|
||||
stats['disk_free'] = os_stats.f_bavail * os_stats.f_frsize
|
||||
|
@ -352,5 +317,7 @@ def parse_iso_datetime(iso_datetime, format_):
|
|||
|
||||
if app.env == 'development': # pragma: no cover
|
||||
from sassutils.wsgi import SassMiddleware
|
||||
|
||||
app.wsgi_app = SassMiddleware(
|
||||
app.wsgi_app, {'afpy': ('sass', 'static/css', '/static/css')})
|
||||
app.wsgi_app, {'afpy': ('sass', 'static/css', '/static/css')}
|
||||
)
|
||||
|
|
121
data_xml.py
Normal file
121
data_xml.py
Normal file
|
@ -0,0 +1,121 @@
|
|||
import email
|
||||
import time
|
||||
from pathlib import Path
|
||||
from xml.etree import ElementTree
|
||||
|
||||
|
||||
POST_ACTUALITIES = 'actualites'
|
||||
POST_JOBS = 'emplois'
|
||||
POSTS = {POST_ACTUALITIES: "Actualités", POST_JOBS: "Offres d’emploi"}
|
||||
|
||||
STATE_WAITING = 'waiting'
|
||||
STATE_PUBLISHED = 'published'
|
||||
STATES = {STATE_WAITING: "En attente", STATE_PUBLISHED: "Publié"}
|
||||
|
||||
ACTION_PUBLISH = 'publish'
|
||||
ACTION_UNPUBLISH = 'unpublish'
|
||||
ACTIONS = {ACTION_PUBLISH: "Publier", ACTION_UNPUBLISH: "Dépublier"}
|
||||
|
||||
TIMESTAMP = '_timestamp'
|
||||
STATE = '_state'
|
||||
PATH = '_path'
|
||||
DIR = '_dir'
|
||||
|
||||
BASE_DIR = 'posts'
|
||||
BASE_FILE = 'post.xml'
|
||||
|
||||
|
||||
class DataException(Exception):
|
||||
def __init__(self, *args, http_code=None, **kwargs):
|
||||
self.http_code = http_code
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
|
||||
root = Path(__file__).parent / BASE_DIR
|
||||
for category in POSTS:
|
||||
for state in STATES:
|
||||
(root / category / state).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
def get_path(category, state, timestamp, *args, create_dir=False):
|
||||
path = root / category / state / timestamp
|
||||
if create_dir:
|
||||
path.mkdir(exist_ok=True)
|
||||
for arg in args:
|
||||
path /= arg
|
||||
return path
|
||||
|
||||
|
||||
def count_posts(category, state=STATE_PUBLISHED):
|
||||
return len(tuple((root / category / state).iterdir()))
|
||||
|
||||
|
||||
def get_posts(category, state=STATE_PUBLISHED, start=0, end=None):
|
||||
path = root / category / state
|
||||
timestamps = sorted(path.iterdir(), reverse=True)
|
||||
timestamps = timestamps[start:end] if end else timestamps[start:]
|
||||
for timestamp in timestamps:
|
||||
yield get_post(category, timestamp.name, state)
|
||||
|
||||
|
||||
def get_post(category, timestamp, states=None):
|
||||
states = (
|
||||
states
|
||||
if isinstance(states, (tuple, list))
|
||||
else [states]
|
||||
if isinstance(states, str)
|
||||
else STATES
|
||||
)
|
||||
for state in states:
|
||||
dir = root / category / state / timestamp
|
||||
path = dir / BASE_FILE
|
||||
if path.is_file():
|
||||
break
|
||||
else:
|
||||
return None
|
||||
tree = ElementTree.parse(path)
|
||||
post = {item.tag: (item.text or '').strip() for item in tree.iter()}
|
||||
post[TIMESTAMP] = int(timestamp)
|
||||
post[STATE] = state
|
||||
post[DIR] = dir
|
||||
post[PATH] = path
|
||||
return post
|
||||
|
||||
|
||||
def save_post(category, timestamp, admin, form):
|
||||
if timestamp is None:
|
||||
status = STATE_WAITING
|
||||
timestamp = str(int(time.time()))
|
||||
elif get_path(category, STATE_WAITING, timestamp, BASE_FILE).is_file():
|
||||
status = STATE_WAITING
|
||||
elif get_path(category, STATE_PUBLISHED, timestamp, BASE_FILE).is_file():
|
||||
status = STATE_PUBLISHED
|
||||
else:
|
||||
raise DataException(http_code=404)
|
||||
if status == STATE_PUBLISHED and not admin:
|
||||
raise DataException(http_code=401)
|
||||
|
||||
post = get_path(category, status, timestamp, BASE_FILE, create_dir=True)
|
||||
tree = ElementTree.Element('entry')
|
||||
for key, value in form.items():
|
||||
if key.startswith('_'):
|
||||
continue
|
||||
element = ElementTree.SubElement(tree, key)
|
||||
element.text = value
|
||||
element = ElementTree.SubElement(tree, STATE_PUBLISHED)
|
||||
element.text = email.utils.formatdate(
|
||||
int(timestamp) if timestamp else time.time()
|
||||
)
|
||||
ElementTree.ElementTree(tree).write(post)
|
||||
|
||||
if admin:
|
||||
if ACTION_PUBLISH in form and status == STATE_WAITING:
|
||||
(root / category / STATE_WAITING / timestamp).rename(
|
||||
root / category / STATE_PUBLISHED / timestamp
|
||||
)
|
||||
elif ACTION_UNPUBLISH in form and status == STATE_PUBLISHED:
|
||||
(root / category / STATE_PUBLISHED / timestamp).rename(
|
||||
root / category / STATE_WAITING / timestamp
|
||||
)
|
||||
|
||||
return get_post(category, timestamp)
|
11
requirements.txt
Normal file
11
requirements.txt
Normal file
|
@ -0,0 +1,11 @@
|
|||
Flask<1.0.1
|
||||
Flask-Cache
|
||||
libsass
|
||||
docutils
|
||||
feedparser
|
||||
python-dateutil
|
||||
itsdangerous
|
||||
pytest
|
||||
pytest-isort
|
||||
pytest-flake8
|
||||
pytest-coverage
|
Loading…
Reference in New Issue
Block a user