#!/usr/bin/env python3 """A simple module to get the RDAP server for a given domain name, from the IANA database specified in RFC 7484. """ # http://python-requests.org/ for easier HTTPS retrieval import requests import datetime import json import os import sys import time import fcntl IANABASE = "https://data.iana.org/rdap/dns.json" CACHE = os.environ["HOME"] + "/.ianardapcache.json" MAXAGE = 24 # Hours IANATIMEOUT = 10 # Seconds MAXTESTS = 3 # Maximum attempts to get the database class IanaRDAPDatabase(): def __init__(self, maxage=MAXAGE, cachefile=CACHE): """ Retrieves the IANA databse, if not already cached. maxage is in hours. """ cache_valid = False self.cachefile = cachefile self.lockname = self.cachefile + ".lock" loaded = False tests = 0 errmsg = "No error" while not loaded and tests < MAXTESTS: self.lock() if os.path.exists(cachefile) and \ datetime.datetime.fromtimestamp(os.path.getmtime(cachefile)) > \ (datetime.datetime.now() - datetime.timedelta(hours = maxage)): cache = open(cachefile, "rb") content = cache.read() cache.close() self.unlock() try: database = json.loads(content) loaded = True self.retrieved = datetime.datetime.fromtimestamp(os.path.getmtime(cachefile)) cache_valid = True except json.decoder.JSONDecodeError: tests += 1 errmsg = "Invalid JSON content in %s" % cachefile # Delete it without mercy os.remove(cachefile) continue else: self.unlock() response = requests.get(IANABASE, timeout=IANATIMEOUT) if response.status_code != 200: time.sleep(2) tests += 1 errmsg = "Invalid HTTPS return code when trying to get %s: %s" % (IANABASE, response.status_code) continue else: loaded = True self.retrieved = datetime.datetime.now() try: content = response.content database = json.loads(content) except json.decoder.JSONDecodeError: tests += 1 errmsg = "Invalid JSON retrieved from %s" % IANABASE continue if not loaded: raise Exception("Cannot read IANA database: %s" % errmsg) self.description = database["description"] self.publication = database["publication"] self.version = database["version"] self.services = {} for service in database["services"]: for tld in service[0]: for server in service[1]: self.services[tld] = server if not cache_valid: self.lock() cache = open(cachefile, "wb") cache.write(content) cache.close() self.unlock() def lock(self): self.lockhandle = open(self.lockname, 'w') fcntl.lockf(self.lockhandle, fcntl.LOCK_EX) def unlock(self): fcntl.lockf(self.lockhandle, fcntl.LOCK_UN) self.lockhandle.close() def find(self, domain): """ Get the RDAP server for a given domain name. None if there is none.""" if domain.endswith("."): domain = domain[:-1] labels = domain.split(".") tld = labels[len(labels)-1] if tld in self.services: return self.services[tld] else: return None if __name__ == "__main__": rdap = IanaRDAPDatabase(maxage=1) print("Database \"%s\", version %s published on %s, retrieved on %s, %i services" % \ (rdap.description, rdap.version, rdap.publication, rdap.retrieved, len(rdap.services))) for domain in sys.argv[1:]: print("%s -> %s" % (domain, rdap.find(domain)))