# 25f9b8c8e6 — Fix #46
# (243 lines | 7.4 KiB | Python | Executable File)
#!/usr/bin/python3
# coding: utf-8

import json
import os
import urllib.error
import urllib.request
from abc import ABC, abstractmethod
from datetime import datetime, date
from urllib.parse import urlparse

import icalendar
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
class Downloader(ABC):
    """Abstract base class for URL content downloaders."""

    def __init__(self):
        """Initialise the downloader; the base class keeps no state."""
        pass

    @abstractmethod
    def download(self, url):
        """Fetch the resource at *url* and return its content as text."""
        pass
class SimpleDownloader(Downloader):
    """Downloader that fetches a URL with urllib, without a browser.

    Returns the decoded body on success and None on failure, so callers
    can treat downloads as best-effort.
    """

    def __init__(self):
        super().__init__()

    def download(self, url):
        """Download *url* and return its body as text, or None on failure.

        The charset from the response headers is used when present;
        otherwise UTF-8 is assumed (headers may carry no charset, e.g.
        for file:// URLs — the original crashed on decode(None) there).
        """
        print("Downloading {}".format(url))

        try:
            # Context manager closes the response even on error paths
            # (the original leaked the connection).
            with urllib.request.urlopen(url) as resource:
                charset = resource.headers.get_content_charset() or "utf-8"
                return resource.read().decode(charset)
        except (urllib.error.URLError, ValueError, UnicodeDecodeError) as e:
            # Keep the best-effort contract (return None) but no longer
            # swallow unrelated exceptions with a bare `except:`.
            print("Download failed for {}: {}".format(url, e))
            return None
class ChromiumHeadlessDownloader(Downloader):
    """Downloader that renders pages with headless Chromium via Selenium.

    Needed for pages that build their content with JavaScript, which a
    plain urllib fetch cannot see.
    """

    def __init__(self):
        super().__init__()
        options = Options()
        options.add_argument("--headless=new")
        # NOTE(review): chromedriver path is hard-coded; confirm it matches
        # the deployment environment.
        service = Service("/usr/bin/chromedriver")
        self.driver = webdriver.Chrome(service=service, options=options)

    def download(self, url):
        """Load *url* in the headless browser and return the rendered HTML."""
        # Message aligned with SimpleDownloader ("Downloading ...").
        print("Downloading {}".format(url))

        self.driver.get(url)
        # Bug fix: the original returned the bare name `driver`, which is
        # undefined here (NameError); the webdriver lives on self.driver.
        return self.driver.page_source
class Extractor(ABC):
    """Abstract base for event extractors.

    An extractor parses downloaded content into a header (source
    metadata) and a list of event dictionaries.
    """

    def __init__(self):
        self.header = {}   # source metadata (url, extraction date)
        self.events = []   # accumulated event dictionaries

    @abstractmethod
    def extract(self, content, url, url_human = None):
        """Parse *content* fetched from *url* and return the structure."""
        pass

    def set_header(self, url):
        """Record the source URL and the extraction timestamp."""
        self.header["url"] = url
        self.header["date"] = datetime.now()

    def clear_events(self):
        """Drop any events collected so far."""
        self.events = []

    def add_event(self, title, category, start_day, location, description, tags, uuid, url_human=None, start_time=None, end_day=None, end_time=None, last_modified=None, published=False):
        """Append one event; *title* and *start_day* are mandatory."""
        if title is None:
            print("ERROR: cannot import an event without name")
            return
        if start_day is None:
            print("ERROR: cannot import an event without start day")
            return

        event = {
            "title": title,
            "category": category,
            "start_day": start_day,
            "uuid": uuid,
            "location": location,
            "description": description,
            "tags": tags,
            "published": published,
        }
        # Optional fields are only present when a value was supplied;
        # insertion order matches the original key order.
        optional_fields = [
            ("url_human", url_human),
            ("start_time", start_time),
            ("end_day", end_day),
            ("end_time", end_time),
            ("last_modified", last_modified),
        ]
        for key, value in optional_fields:
            if value is not None:
                event[key] = value

        self.events.append(event)

    def default_value_if_exists(self, default_values, key):
        """Return default_values[key] when available, else None."""
        if default_values is None:
            return None
        return default_values.get(key)

    def get_structure(self):
        """Return the collected header and events as one dictionary."""
        return {"header": self.header, "events": self.events}
class ICALExtractor(Extractor):
    """Extractor for iCalendar (RFC 5545) content."""

    def __init__(self):
        super().__init__()

    def get_item_from_vevent(self, event, name, raw = False):
        """Return property *name* of a VEVENT, decoded unless *raw* is set.

        Missing or undecodable properties yield None (best effort).
        """
        try:
            r = event.decoded(name)
            if raw:
                return r
            else:
                return r.decode()
        except Exception:
            # Property absent or not decodable: treat as missing.
            return None

    def get_dt_item_from_vevent(self, event, name):
        """Split a date/datetime property into a (day, time) pair.

        Whole-day entries (plain date) get time=None; a missing property
        yields (None, None).
        """
        item = self.get_item_from_vevent(event, name, raw = True)

        day = None
        time = None

        if item is not None:
            # Check datetime before date: datetime is a subclass of date.
            if isinstance(item, datetime):
                day = item.date()
                time = item.time()
            elif isinstance(item, date):
                day = item
                time = None

        return day, time

    def extract(self, content, url, url_human = None, default_values = None, published = False):
        """Parse iCalendar *content* from *url* into the event structure.

        default_values supplies fallbacks for category/location/tags;
        published is stored as-is on each event.
        """
        print("Extracting ical events from {}".format(url))
        self.set_header(url)
        self.clear_events()
        # Seen UIDs, so calendars reusing a UID get a per-occurrence suffix.
        self.uuids = {}

        calendar = icalendar.Calendar.from_ical(content)

        for event in calendar.walk('VEVENT'):
            title = self.get_item_from_vevent(event, "SUMMARY")
            category = self.default_value_if_exists(default_values, "category")

            start_day, start_time = self.get_dt_item_from_vevent(event, "DTSTART")
            end_day, end_time = self.get_dt_item_from_vevent(event, "DTEND")

            location = self.get_item_from_vevent(event, "LOCATION")
            if location is None:
                location = self.default_value_if_exists(default_values, "location")

            description = self.get_item_from_vevent(event, "DESCRIPTION")
            if description is not None:
                # Convert embedded HTML to text, keeping <br> as newlines.
                # Parser named explicitly: the bare BeautifulSoup(...) call
                # warned and picked whatever parser was installed.
                soup = BeautifulSoup(description, "html.parser")
                delimiter = '\n'
                for line_break in soup.findAll('br'):
                    line_break.replaceWith(delimiter)
                description = soup.get_text()

            uuid = self.get_item_from_vevent(event, "UID")

            # Bug fix: the original left event_url undefined (NameError)
            # whenever a VEVENT carried no UID.
            event_url = url
            if uuid is not None:
                if uuid in self.uuids:
                    self.uuids[uuid] += 1
                    uuid += ":{:04}".format(self.uuids[uuid] - 1)
                else:
                    self.uuids[uuid] = 1
                event_url = url + "#" + uuid

            tags = self.default_value_if_exists(default_values, "tags")

            # Bug fix: the property name is hyphenated ("LAST-MODIFIED");
            # the original also fetched "LAST_MODIFIED" (always None) and
            # immediately overwrote it — that dead fetch is removed.
            last_modified = self.get_item_from_vevent(event, "LAST-MODIFIED", raw = True)

            rrule = self.get_item_from_vevent(event, "RRULE", raw = True)
            if rrule is not None:
                print("Recurrent event not yet supported", rrule)

            self.add_event(title, category, start_day, location, description, tags, uuid=event_url, url_human=url_human, start_time=start_time, end_day=end_day, end_time=end_time, last_modified=last_modified, published=published)

        return self.get_structure()
class URL2Events:
    """Pipeline tying a Downloader and an Extractor together, with a
    simple file cache for the downloaded content."""

    def __init__(self, downloader, extractor):
        self.downloader = downloader
        self.extractor = extractor

    def process(self, url, url_human = None, cache = None, default_values = None, published = False):
        """Fetch *url* (or load *cache*) and extract its events.

        When *cache* names an existing file it is used instead of the
        network; otherwise the content is downloaded and, if a cache path
        was given, written back for next time.
        """
        if cache and os.path.exists(cache):
            print("Loading cache ({})".format(cache))
            with open(cache) as f:
                # Bug fix: '\n'.join(f.readlines()) doubled every newline,
                # because readlines() keeps the line terminators.
                content = f.read()
        else:
            content = self.downloader.download(url)

            # Only write the cache for freshly downloaded content: the
            # original also rewrote the file it had just loaded, and
            # crashed on write(None) when the download had failed.
            if cache and content is not None:
                print("Saving cache ({})".format(cache))
                dir = os.path.dirname(cache)
                if dir != "" and not os.path.exists(dir):
                    os.makedirs(dir)
                with open(cache, "w") as text_file:
                    text_file.write(content)

        return self.extractor.extract(content, url, url_human, default_values, published)
if __name__ == "__main__":
    # Scrape the Café les Augustes programme (public Google calendar)
    # and dump the extracted events to a JSON file.
    source_url = "https://calendar.google.com/calendar/ical/programmation.lesaugustes%40gmail.com/public/basic.ics"
    human_url = "https://www.cafelesaugustes.fr/la-programmation/"
    defaults = {"category": "Autre", "location": "Café lecture les Augustes"}

    pipeline = URL2Events(SimpleDownloader(), ICALExtractor())
    result = pipeline.process(source_url, human_url, cache = "cache-augustes.ical", default_values = defaults, published = True)

    output_path = "events-augustes.json"
    print("Saving events to file {}".format(output_path))
    with open(output_path, "w") as out:
        # default=str stringifies the date/datetime values in the structure.
        json.dump(result, out, indent=4, default=str)