#!/usr/bin/python3
# coding: utf-8
"""Download a calendar from a URL and convert its events to plain dicts.

The module is organised in three layers:

* ``Downloader`` subclasses fetch the raw content of a URL;
* ``Extractor`` subclasses turn that content into a ``{"header", "events"}``
  structure;
* ``URL2Events`` glues one downloader and one extractor together, with an
  optional on-disk cache of the downloaded content.
"""

from abc import ABC, abstractmethod
from datetime import datetime, date
import http.client
import os
from urllib.parse import urlparse
import urllib.request

import icalendar
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service


class Downloader(ABC):
    """Interface of the objects able to fetch the content of a URL."""

    def __init__(self):
        pass

    @abstractmethod
    def download(self, url):
        """Return the content found at ``url``."""


class SimpleDownloader(Downloader):
    """Downloader based on ``urllib.request`` (no JavaScript execution)."""

    def __init__(self):
        super().__init__()

    def download(self, url):
        """Return the decoded body found at ``url``, or ``None`` on failure.

        The body is decoded with the charset announced in the response
        headers, falling back to UTF-8 when none is announced.
        """
        print("Downloading {}".format(url))
        try:
            # The context manager closes the response even if read() fails.
            with urllib.request.urlopen(url) as resource:
                charset = resource.headers.get_content_charset() or "utf-8"
                return resource.read().decode(charset)
        except (OSError, ValueError, LookupError,
                http.client.HTTPException) as exc:
            # OSError covers URLError/HTTPError, ValueError covers malformed
            # URLs and undecodable bytes, LookupError an unknown charset.
            print("ERROR: cannot download {}: {}".format(url, exc))
            return None


class ChromiumHeadlessDownloader(Downloader):
    """Downloader rendering the page in a headless Chrome/Chromium browser."""

    def __init__(self):
        super().__init__()
        options = Options()
        options.add_argument("--headless=new")
        service = Service("/usr/bin/chromedriver")
        self.driver = webdriver.Chrome(service=service, options=options)

    def download(self, url):
        """Load ``url`` in the browser and return the resulting page source."""
        print("Download {}".format(url))
        self.driver.get(url)
        return self.driver.page_source

    def close(self):
        """Shut the browser down; the downloader cannot be used afterwards."""
        self.driver.quit()


class Extractor(ABC):
    """Base class collecting events into a ``{"header", "events"}`` structure."""

    def __init__(self):
        self.header = {}
        self.events = []

    @abstractmethod
    def extract(self, content, url, url_human=None, default_values=None,
                published=False):
        """Parse ``content`` and return the structure of extracted events.

        ``url`` is the source of ``content``, ``url_human`` an optional
        human-readable page attached to every event, ``default_values`` an
        optional dict of fallback values and ``published`` the publication
        flag stored on every event.
        """

    def set_header(self, url):
        """Record the source URL and the current date in the header."""
        self.header["url"] = url
        self.header["date"] = datetime.now()

    def clear_events(self):
        """Forget every event collected so far."""
        self.events = []

    def add_event(self, title, category, start_day, location, description,
                  tags, url=None, url_human=None, start_time=None,
                  end_day=None, end_time=None, last_modified=None,
                  published=False):
        """Append one event to ``self.events``.

        Events without a title or without a start day are rejected with an
        error message. Optional fields are only stored when they are not
        ``None``.
        """
        if title is None:
            print("ERROR: cannot import an event without name")
            return
        if start_day is None:
            print("ERROR: cannot import an event without start day")
            return

        event = {
            "title": title,
            "category": category,
            "start_day": start_day,
            "location": location,
            # NOTE(review): the key is misspelled ("descritpion"); it is kept
            # as is because consumers of the structure may rely on it.
            "descritpion": description,
            "tags": tags,
            "published": published,
        }
        optional_fields = {
            "url": url,
            "url_human": url_human,
            "start_time": start_time,
            "end_day": end_day,
            "end_time": end_time,
            "last_modified": last_modified,
        }
        event.update(
            {key: value for key, value in optional_fields.items()
             if value is not None}
        )
        self.events.append(event)

    def default_value_if_exists(self, default_values, key):
        """Return ``default_values[key]``, or ``None`` when unavailable."""
        if default_values is not None and key in default_values:
            return default_values[key]
        return None

    def get_structure(self):
        """Return the header and the events collected so far."""
        return {"header": self.header, "events": self.events}


class ICALExtractor(Extractor):
    """Extractor reading the VEVENT components of an iCalendar document."""

    def __init__(self):
        super().__init__()

    def get_item_from_vevent(self, event, name, raw=False):
        """Return the decoded property ``name`` of ``event``, or ``None``.

        With ``raw=True`` the value is returned as produced by
        ``event.decoded``; otherwise its ``decode()`` method is called on it
        (presumably bytes to str -- confirm against the icalendar version in
        use). ``None`` is returned when the property is missing or cannot be
        converted.
        """
        try:
            value = event.decoded(name)
            return value if raw else value.decode()
        except (KeyError, AttributeError, ValueError, TypeError):
            return None

    def get_dt_item_from_vevent(self, event, name):
        """Return the ``(day, time)`` pair stored in property ``name``.

        ``time`` is ``None`` for all-day values; both are ``None`` when the
        property is missing or is not a date.
        """
        item = self.get_item_from_vevent(event, name, raw=True)
        # datetime is tested first because it is a subclass of date.
        if isinstance(item, datetime):
            return item.date(), item.time()
        if isinstance(item, date):
            return item, None
        return None, None

    def extract(self, content, url, url_human=None, default_values=None,
                published=False):
        """Convert every VEVENT of ``content`` and return the structure.

        ``default_values`` may provide ``"category"``, ``"tags"`` and a
        fallback ``"location"``. Recurrence rules are reported but not
        expanded.
        """
        print("Extracting ical events from {}".format(url))
        self.set_header(url)
        self.clear_events()

        calendar = icalendar.Calendar.from_ical(content)
        # These defaults do not depend on the event: compute them once.
        category = self.default_value_if_exists(default_values, "category")
        tags = self.default_value_if_exists(default_values, "tags")

        for event in calendar.walk('VEVENT'):
            title = self.get_item_from_vevent(event, "SUMMARY")
            start_day, start_time = self.get_dt_item_from_vevent(
                event, "DTSTART")
            end_day, end_time = self.get_dt_item_from_vevent(event, "DTEND")

            location = self.get_item_from_vevent(event, "LOCATION")
            if location is None:
                location = self.default_value_if_exists(
                    default_values, "location")

            description = self.get_item_from_vevent(event, "DESCRIPTION")

            # Without a UID no per-event URL can be built: leave it unset
            # rather than reusing the URL of a previous event.
            uuid = self.get_item_from_vevent(event, "UID")
            event_url = url + "#" + uuid if uuid is not None else None

            last_modified = self.get_item_from_vevent(
                event, "LAST-MODIFIED", raw=True)

            rrule = self.get_item_from_vevent(event, "RRULE", raw=True)
            if rrule is not None:
                print("Recurrent event not yet supported", rrule)

            self.add_event(title, category, start_day, location, description,
                           tags, url=event_url, url_human=url_human,
                           start_time=start_time, end_day=end_day,
                           end_time=end_time, last_modified=last_modified,
                           published=published)

        return self.get_structure()


class URL2Events:
    """Fetch a URL with a downloader and parse it with an extractor."""

    def __init__(self, downloader, extractor):
        self.downloader = downloader
        self.extractor = extractor

    def process(self, url, url_human=None, cache=None, default_values=None,
                published=False):
        """Return the events found at ``url``, or ``None`` if unavailable.

        When ``cache`` names an existing file its content is used instead of
        downloading; otherwise the downloaded content is saved there.
        ``url_human``, ``default_values`` and ``published`` are forwarded to
        the extractor.
        """
        if cache and os.path.exists(cache):
            print("Loading cache ({})".format(cache))
            with open(cache, encoding="utf-8") as f:
                # read() keeps the content unchanged; joining readlines()
                # with "\n" would double every line break.
                content = f.read()
        else:
            content = self.downloader.download(url)
            if content is None:
                print("ERROR: no content available for {}".format(url))
                return None

            if cache:
                print("Saving cache ({})".format(cache))
                cache_dir = os.path.dirname(cache)
                if cache_dir:
                    os.makedirs(cache_dir, exist_ok=True)
                with open(cache, "w", encoding="utf-8") as text_file:
                    text_file.write(content)

        return self.extractor.extract(content, url, url_human,
                                      default_values, published)


if __name__ == "__main__":

    u2e = URL2Events(SimpleDownloader(), ICALExtractor())
    url = "https://calendar.google.com/calendar/ical/programmation.lesaugustes%40gmail.com/public/basic.ics"
    url_human = "https://www.cafelesaugustes.fr/la-programmation/"

    events = u2e.process(url, url_human, cache = "cache-augustes.ical", default_values = {"category": "Autre", "location": "Café lecture les Augustes"}, published = True)

    #print(events)