#!/usr/bin/env python3 """A simple module to get the RDAP server for a given domain name, from the IANA database specified in RFC 9224. """ # http://python-requests.org/ for easier HTTPS retrieval import requests import datetime import json import os import sys import time import fcntl import pickle import pathlib IANABASE = "https://data.iana.org/rdap/dns.json" CACHE = os.environ["HOME"] + "/.ianardapcache" MAXAGE = 24 # Hours. Used only if the server no longer gives the information. IANATIMEOUT = 10 # Seconds MAXTESTS = 3 # Maximum attempts to get the database # Don't touch HTTP_DATE_FORMAT = "%a, %d %b %Y %H:%M:%S %Z" # RFC 9111, section 5.2 def parse_cachecontrol(h): result = {} directives = h.split(",") for directive in directives: directive = directive.strip() if "=" in directive: (key, value) = directive.split("=") else: key = directive value = None result[key.lower()] = value return result def parse_expires(h): d = datetime.datetime.strptime(h, HTTP_DATE_FORMAT) return d class IanaRDAPDatabase(): def __init__(self, maxage=MAXAGE, cachefile=CACHE, pickleformat=False): """Retrieves the IANA database, if not already cached. maxage is in hours. The cache file argument should not have an extension (it will be added automatically). pickleformat is not the default because it is not really faster *and* it introduces security risks if someone can write in the file (see the documentation of the module).""" cache_valid = False if pickleformat: self.cachefile = cachefile + ".pickle" else: self.cachefile = cachefile + ".json" self.lockname = self.cachefile + ".lock" self.expirationfile = self.cachefile + ".expires" loaded = False tests = 0 errmsg = "No error" while not loaded and tests < MAXTESTS: self.lock() if os.path.exists(self.cachefile) and \ (pathlib.Path(self.expirationfile).exists() and \ datetime.datetime.fromtimestamp(os.path.getmtime(self.expirationfile)) > \ datetime.datetime.now()): cache = open(self.cachefile, "rb") content = cache.read() cache.close() self.unlock() if pickleformat: try: database = pickle.loads(content) loaded = True self.retrieved = datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile)) cache_valid = True except (pickle.UnpicklingError, EOFError): tests += 1 errmsg = "Invalid pickle content in %s" % self.cachefile # Delete it without mercy os.remove(self.cachefile) continue else: try: database = json.loads(content) loaded = True self.retrieved = datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile)) cache_valid = True except json.decoder.JSONDecodeError: tests += 1 errmsg = "Invalid JSON content in %s" % self.cachefile # Delete it without mercy os.remove(self.cachefile) continue else: self.unlock() response = requests.get(IANABASE, timeout=IANATIMEOUT) expirationtime = None if "cache-control" in response.headers: directives = parse_cachecontrol(response.headers["cache-control"]) if "max-age" in directives: maxage = int(directives["max-age"]) expirationtime = datetime.datetime.now() + datetime.timedelta(seconds=maxage) if not expirationtime: if "expires" in response.headers: expirationtime = parse_expires(response.headers["expires"]) else: expirationtime = datetime.datetime.now() + datetime.timedelta(hours=MAXAGE) self.expirationtime = time.mktime(expirationtime.timetuple()) if response.status_code != 200: time.sleep(2) tests += 1 errmsg = "Invalid HTTPS return code when trying to get %s: %s" % (IANABASE, response.status_code) continue else: loaded = True self.retrieved = datetime.datetime.now() try: content = response.content database = json.loads(content) with open(self.expirationfile, 'w'): os.utime(self.expirationfile, times = (self.expirationtime, self.expirationtime)) except json.decoder.JSONDecodeError: tests += 1 errmsg = "Invalid JSON retrieved from %s" % IANABASE continue if not loaded: raise Exception("Cannot read IANA database: %s" % errmsg) self.description = database["description"] self.publication = database["publication"] self.version = database["version"] self.services = {} for service in database["services"]: for tld in service[0]: for server in service[1]: self.services[tld] = server if not cache_valid: self.lock() cache = open(self.cachefile, "wb") if pickleformat: cache.write(pickle.dumps(database)) else: cache.write(content) cache.close() self.unlock() def lock(self): self.lockhandle = open(self.lockname, 'w') fcntl.lockf(self.lockhandle, fcntl.LOCK_EX) def unlock(self): fcntl.lockf(self.lockhandle, fcntl.LOCK_UN) self.lockhandle.close() def find(self, domain): """ Get the RDAP server for a given domain name. None if there is none.""" if domain.endswith("."): domain = domain[:-1] labels = domain.split(".") tld = labels[len(labels)-1] if tld in self.services: return self.services[tld] else: return None if __name__ == "__main__": rdap = IanaRDAPDatabase(maxage=1) print("Database \"%s\", version %s published on %s, retrieved on %s, %i services" % \ (rdap.description, rdap.version, rdap.publication, rdap.retrieved, len(rdap.services))) for domain in sys.argv[1:]: print("%s -> %s" % (domain, rdap.find(domain)))