#!/usr/bin/python3
# coding: utf-8

"""Download remote calendars (plain HTTP or a headless Chromium) and extract
their events into a plain header+events dict structure serializable to JSON."""

from abc import ABC, abstractmethod
from urllib.parse import urlparse
import urllib.request
import urllib.error
import os
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
import icalendar
from icalendar import vDatetime
from datetime import datetime, date
import json
from bs4 import BeautifulSoup
import pickle


class Downloader(ABC):
    """Abstract interface for fetching the raw content of a URL."""

    def __init__(self):
        pass

    @abstractmethod
    def download(self, url):
        """Return the content of *url* as text, or None on failure."""
        pass


class SimpleDownloader(Downloader):
    """urllib-based downloader, for pages that do not require JavaScript."""

    def __init__(self):
        super().__init__()

    def download(self, url):
        """Fetch *url* and return its decoded body, or None on failure.

        Best-effort: network and decoding errors are reported and turned
        into a None return value instead of propagating.
        """
        print("Downloading {}".format(url))
        try:
            with urllib.request.urlopen(url) as resource:
                # The server may omit the charset header entirely
                # (get_content_charset() then returns None): fall back to UTF-8
                # instead of crashing on decode(None).
                charset = resource.headers.get_content_charset() or "utf-8"
                return resource.read().decode(charset)
        except (urllib.error.URLError, UnicodeDecodeError) as e:
            # Narrowed from a bare `except:` which also swallowed
            # KeyboardInterrupt and programming errors.
            print("Download failed: {}".format(e))
            return None


class ChromiumHeadlessDownloader(Downloader):
    """Downloader driving a headless Chromium, for JavaScript-rendered pages."""

    def __init__(self):
        super().__init__()
        options = Options()
        options.add_argument("--headless=new")
        service = Service("/usr/bin/chromedriver")
        self.driver = webdriver.Chrome(service=service, options=options)

    def download(self, url):
        """Load *url* in the headless browser and return the rendered source."""
        print("Download {}".format(url))
        self.driver.get(url)
        # Bug fix: the original returned the undefined global `driver`
        # (NameError); the WebDriver instance lives on self.driver.
        return self.driver.page_source


class Extractor(ABC):
    """Abstract base that turns downloaded content into a header plus a list
    of event dicts (see add_event for the event schema)."""

    def __init__(self):
        self.header = {}   # metadata about the source (url, extraction date)
        self.events = []   # accumulated event dicts

    @abstractmethod
    def extract(self, content, url, url_human=None):
        """Parse *content* (fetched from *url*) and return the structure
        produced by get_structure()."""
        pass

    def set_header(self, url):
        """Record the source URL and the extraction timestamp."""
        self.header["url"] = url
        self.header["date"] = datetime.now()

    def clear_events(self):
        """Drop any events accumulated by a previous extraction."""
        self.events = []

    def add_event(self, title, category, start_day, location, description,
                  tags, uuid, recurrences=None, url_human=None,
                  start_time=None, end_day=None, end_time=None,
                  last_modified=None, published=False):
        """Append one event dict to self.events.

        *title* and *start_day* are mandatory: events missing either are
        reported and skipped. The keyword fields are only stored in the
        event dict when they are not None.
        """
        if title is None:
            print("ERROR: cannot import an event without name")
            return
        if start_day is None:
            print("ERROR: cannot import an event without start day")
            return
        event = {
            "title": title,
            "category": category,
            "start_day": start_day,
            "uuid": uuid,
            "location": location,
            "description": description,
            "tags": tags,
            "published": published,
        }
        # Optional fields: present in the dict only when a value was given.
        optional_fields = {
            "url_human": url_human,
            "start_time": start_time,
            "end_day": end_day,
            "end_time": end_time,
            "last_modified": last_modified,
            "recurrences": recurrences,
        }
        for key, value in optional_fields.items():
            if value is not None:
                event[key] = value
        self.events.append(event)

    def default_value_if_exists(self, default_values, key):
        """Return default_values[key] when available, else None."""
        if default_values is not None and key in default_values:
            return default_values[key]
        return None

    def get_structure(self):
        """Return the full extraction result."""
        return {"header": self.header, "events": self.events}


class ICALExtractor(Extractor):
    """Extractor for iCalendar (RFC 5545) documents."""

    def __init__(self):
        super().__init__()

    def get_item_from_vevent(self, event, name, raw=False):
        """Return property *name* of a VEVENT, or None when missing.

        With raw=False the decoded bytes are converted to str; with
        raw=True the icalendar-decoded object is returned as-is.
        """
        try:
            value = event.decoded(name)
            return value if raw else value.decode()
        except (KeyError, AttributeError, UnicodeDecodeError):
            # Narrowed from a bare `except:`; a missing property raises
            # KeyError, a non-bytes decoded value raises AttributeError.
            return None

    def get_dt_item_from_vevent(self, event, name):
        """Split a date or datetime property into a (day, time) pair.

        Either element may be None: all-day events carry a date with no
        time component, and a missing property yields (None, None).
        """
        item = self.get_item_from_vevent(event, name, raw=True)
        day = None
        time = None
        if item is not None:
            # Check datetime first: datetime is a subclass of date.
            if isinstance(item, datetime):
                day = item.date()
                time = item.time()
            elif isinstance(item, date):
                day = item
                time = None
        return day, time

    def extract(self, content, url, url_human=None, default_values=None,
                published=False):
        """Parse iCal *content* fetched from *url* into the event structure.

        *default_values* may supply fallback "category", "location" and
        "tags"; *published* is stored verbatim on every event.
        """
        print("Extracting ical events from {}".format(url))
        self.set_header(url)
        self.clear_events()
        self.uuids = {}  # per-UID occurrence counter to disambiguate duplicates
        calendar = icalendar.Calendar.from_ical(content)
        for event in calendar.walk('VEVENT'):
            title = self.get_item_from_vevent(event, "SUMMARY")
            category = self.default_value_if_exists(default_values, "category")
            start_day, start_time = self.get_dt_item_from_vevent(event, "DTSTART")
            end_day, end_time = self.get_dt_item_from_vevent(event, "DTEND")
            location = self.get_item_from_vevent(event, "LOCATION")
            if location is None:
                location = self.default_value_if_exists(default_values, "location")
            description = self.get_item_from_vevent(event, "DESCRIPTION")
            if description is not None:
                # Convert HTML descriptions to plain text, mapping <br> tags
                # to newlines. Parser pinned to the stdlib html.parser for
                # deterministic results across installs.
                soup = BeautifulSoup(description, "html.parser")
                for line_break in soup.find_all('br'):
                    line_break.replace_with('\n')
                description = soup.get_text()
            uuid = self.get_item_from_vevent(event, "UID")
            if uuid is not None:
                if uuid in self.uuids:
                    # Repeated UID: suffix an occurrence index (:0001, :0002, ...)
                    # so every stored event keeps a unique identifier.
                    self.uuids[uuid] += 1
                    uuid += ":{:04}".format(self.uuids[uuid] - 1)
                else:
                    self.uuids[uuid] = 1
            # Bug fix: the original crashed with TypeError on events that
            # carry no UID; fall back to the bare calendar URL.
            event_url = url + "#" + uuid if uuid is not None else url
            tags = self.default_value_if_exists(default_values, "tags")
            # RFC 5545 spells this property with a dash; the original also
            # queried a nonexistent "LAST_MODIFIED" whose (always-None)
            # result was dead code, overwritten right here.
            last_modified = self.get_item_from_vevent(event, "LAST-MODIFIED",
                                                      raw=True)
            recurrence_entries = {}
            for prop in ["RRULE", "EXRULE", "EXDATE", "RDATE"]:
                value = self.get_item_from_vevent(event, prop, raw=True)
                if value is not None:
                    recurrence_entries[prop] = value
            if start_day is not None and len(recurrence_entries) != 0:
                # Serialize the recurrence rules back to their iCal text form,
                # one "NAME:value" line per entry.
                recurrences = ""
                for key, rule in recurrence_entries.items():
                    if isinstance(rule, list):
                        recurrences += "\n".join(
                            key + ":" + r.to_ical().decode() for r in rule) + "\n"
                    else:
                        recurrences += key + ":" + rule.to_ical().decode() + "\n"
            else:
                recurrences = None
            self.add_event(title, category, start_day, location, description,
                           tags, recurrences=recurrences, uuid=event_url,
                           url_human=url_human, start_time=start_time,
                           end_day=end_day, end_time=end_time,
                           last_modified=last_modified, published=published)
        return self.get_structure()


class URL2Events:
    """Pipeline tying a Downloader and an Extractor together, with an
    optional on-disk cache of the downloaded content."""

    def __init__(self, downloader, extractor):
        self.downloader = downloader
        self.extractor = extractor

    def process(self, url, url_human=None, cache=None, default_values=None,
                published=False):
        """Obtain the content of *url* (from *cache* when it exists, else by
        downloading, then saving to *cache*) and run the extractor on it."""
        if cache and os.path.exists(cache):
            print("Loading cache ({})".format(cache))
            with open(cache) as f:
                # Bug fix: the original joined f.readlines() with "\n",
                # doubling every newline in the cached document.
                content = f.read()
        else:
            content = self.downloader.download(url)
            # Only cache successful downloads: writing None would crash.
            if cache and content is not None:
                print("Saving cache ({})".format(cache))
                cache_dir = os.path.dirname(cache)
                if cache_dir != "" and not os.path.exists(cache_dir):
                    os.makedirs(cache_dir)
                with open(cache, "w") as text_file:
                    text_file.write(content)
        return self.extractor.extract(content, url, url_human,
                                      default_values, published)


if __name__ == "__main__":
    u2e = URL2Events(SimpleDownloader(), ICALExtractor())
    url = "https://calendar.google.com/calendar/ical/programmation.lesaugustes%40gmail.com/public/basic.ics"
    url_human = "https://www.cafelesaugustes.fr/la-programmation/"
    events = u2e.process(url, url_human, cache="cache-augustes.ical",
                         default_values={"category": "Autre",
                                         "location": "Café lecture les Augustes"},
                         published=True)
    exportfile = "events-augustes.json"
    print("Saving events to file {}".format(exportfile))
    with open(exportfile, "w") as f:
        # default=str stringifies the date/datetime objects in the events.
        json.dump(events, f, indent=4, default=str)