From ae00a81f7998d49e0ae6b07068843d9281620919 Mon Sep 17 00:00:00 2001 From: Stephane Bortzmeyer Date: Wed, 7 Jul 2021 15:23:26 +0200 Subject: [PATCH] New code to retrieve the database, more robust --- check_expire | 4 +-- ianardap.py | 65 ++++++++++++++++++++++++++++++++++-------------- test_ianardap.py | 43 ++++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 20 deletions(-) create mode 100644 test_ianardap.py diff --git a/check_expire b/check_expire index f4ae0af..b2c1c43 100755 --- a/check_expire +++ b/check_expire @@ -54,8 +54,8 @@ def usage(msg=None): def details(): if verbose: - print(" RDAP database \"%s\", version %s published on %s, RDAP server is %s" % \ - (database.description, database.version, database.publication, server)) + print(" RDAP database \"%s\", version %s published on %s, retrieved on %s, RDAP server is %s" % \ + (database.description, database.version, database.publication, database.retrieved, server)) else: print("") diff --git a/ianardap.py b/ianardap.py index 1de37a6..c659849 100755 --- a/ianardap.py +++ b/ianardap.py @@ -12,12 +12,14 @@ import datetime import json import os import sys +import time import fcntl IANABASE = "https://data.iana.org/rdap/dns.json" CACHE = os.environ["HOME"] + "/.ianardapcache.json" MAXAGE = 24 # Hours IANATIMEOUT = 10 # Seconds +MAXTESTS = 3 # Maximum attempts to get the database class IanaRDAPDatabase(): @@ -26,22 +28,49 @@ class IanaRDAPDatabase(): cache_valid = False self.cachefile = cachefile self.lockname = self.cachefile + ".lock" - self.lock() - if os.path.exists(cachefile) and \ - datetime.datetime.fromtimestamp(os.path.getmtime(cachefile)) >= \ - (datetime.datetime.utcnow() - datetime.timedelta(hours = maxage)): - cache = open(cachefile, "rb") - content = cache.read() - cache.close() - cache_valid = True - self.unlock() - else: - self.unlock() - response = requests.get(IANABASE, timeout=IANATIMEOUT) - if response.status_code != 200: - raise Exception("Invalid HTTPS return code when trying to get %s: %s" % (IANABASE, response.status_code)) - content = response.content - database = json.loads(content) + loaded = False + tests = 0 + errmsg = "No error" + while not loaded and tests < MAXTESTS: + self.lock() + if os.path.exists(cachefile) and \ + datetime.datetime.fromtimestamp(os.path.getmtime(cachefile)) > \ + (datetime.datetime.now() - datetime.timedelta(hours = maxage)): + cache = open(cachefile, "rb") + content = cache.read() + cache.close() + self.unlock() + try: + database = json.loads(content) + loaded = True + self.retrieved = datetime.datetime.fromtimestamp(os.path.getmtime(cachefile)) + cache_valid = True + except json.decoder.JSONDecodeError: + tests += 1 + errmsg = "Invalid JSON content in %s" % cachefile + # Delete it without mercy + os.remove(cachefile) + continue + else: + self.unlock() + response = requests.get(IANABASE, timeout=IANATIMEOUT) + if response.status_code != 200: + time.sleep(2) + tests += 1 + errmsg = "Invalid HTTPS return code when trying to get %s: %s" % (IANABASE, response.status_code) + continue + else: + loaded = True + self.retrieved = datetime.datetime.now() + try: + content = response.content + database = json.loads(content) + except json.decoder.JSONDecodeError: + tests += 1 + errmsg = "Invalid JSON retrieved from %s" % IANABASE + continue + if not loaded: + raise Exception("Cannot read IANA database: %s" % errmsg) self.description = database["description"] self.publication = database["publication"] self.version = database["version"] @@ -78,8 +107,8 @@ class IanaRDAPDatabase(): if __name__ == "__main__": rdap = IanaRDAPDatabase(maxage=1) - print("Database \"%s\", version %s published on %s, %i services" % \ - (rdap.description, rdap.version, rdap.publication, len(rdap.services))) + print("Database \"%s\", version %s published on %s, retrieved on %s, %i services" % \ + (rdap.description, rdap.version, rdap.publication, rdap.retrieved, len(rdap.services))) for domain in sys.argv[1:]: print("%s -> %s" % (domain, rdap.find(domain))) diff --git a/test_ianardap.py b/test_ianardap.py new file mode 100644 index 0000000..7a2f449 --- /dev/null +++ b/test_ianardap.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 + +"""Test by executing 'pytest' https://docs.pytest.org/ + +Don't forget to set PYTHONPATH if you want to test the development +tree and not the installed package. """ + +import ianardap + +import tempfile +import os.path +import datetime + +def test_basic(): + database = ianardap.IanaRDAPDatabase() + assert database.description.startswith("RDAP bootstrap file") and database.version == "1.0" and \ + len(database.services) > 1000 + +def test_alternative_cache(): + tmpfile = tempfile.NamedTemporaryFile(suffix=".testianacache", delete=False) + database = ianardap.IanaRDAPDatabase(cachefile=tmpfile.name, maxage=0) + assert os.path.exists(tmpfile.name) and \ + datetime.datetime.fromtimestamp(os.path.getmtime(tmpfile.name)) > \ + (datetime.datetime.now() - datetime.timedelta(minutes=1)) + os.remove(tmpfile.name) + os.remove(tmpfile.name + ".lock") + +def test_refresh(): + # Force a resfresh + database = ianardap.IanaRDAPDatabase(maxage=0) + assert (database.retrieved > (datetime.datetime.now() - datetime.timedelta(minutes=1))) and \ + (datetime.datetime.fromtimestamp(os.path.getmtime(database.cachefile)) > \ + (datetime.datetime.now() - datetime.timedelta(minutes=1))) + +def test_find_exists(): + database = ianardap.IanaRDAPDatabase() + server = database.find("www.foobar.ar") + assert server == "https://rdap.nic.ar/" + +def test_find_not_exists(): + database = ianardap.IanaRDAPDatabase() + server = database.find("www.foobar.example") + assert server is None