From a117da20b19f0379b0061c18e74b8857e66e3e1f Mon Sep 17 00:00:00 2001 From: Stephane Bortzmeyer Date: Wed, 9 Nov 2022 15:01:28 +0000 Subject: [PATCH] New ianardap module, with support for several RDAP servers --- check_expire | 38 +++++++++++++++------- ianardap.py | 89 ++++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 92 insertions(+), 35 deletions(-) diff --git a/check_expire b/check_expire index f651817..8e1ab29 100755 --- a/check_expire +++ b/check_expire @@ -99,7 +99,7 @@ try: usage() sys.exit(STATE_OK) elif option == "--hostname" or option == "-H": - domain = value.lower() + domain = value elif option == "--timeout" or option == "-t": timeout = int(value) elif option == "--unixtime" or option == "-u": @@ -128,17 +128,33 @@ try: database = ianardap.IanaRDAPDatabase() except Exception as e: unknown("Exception when retrieving the IANA database: \"%s\"" % e) -server = database.find(domain) -if server is None: +servers = database.find(domain) +if servers is None: unknown("No RDAP server found for %s" % domain) -if server.endswith("/"): - server = server[:-1] # Donuts RDAP server balks when there are two slashes and reply 404 -try: - response = requests.get("%s/domain/%s" % (server, domain), timeout=timeout) -except requests.exceptions.Timeout: - unknown("Timeout when trying to reach %s" % server) -if response.status_code != 200: - error("Invalid RDAP return code: %s" % response.status_code) +# find may return several RDAP servers +found = False +unknowns= "" +errors = "" +for server in servers: + if server.endswith("/"): + server = server[:-1] # Donuts RDAP server balks when there are two slashes and reply 404 + try: + response = requests.get("%s/domain/%s" % (server, domain), timeout=timeout) + except requests.exceptions.Timeout: + unknowns += "Timeout when trying to reach %s " % server + if response.status_code != 200: + errors += "Invalid RDAP return code at %s: %s " % \ + (server, response.status_code) + else: + found = True + break +if not found: + if errors != "": + error(errors) + elif unknowns != "": + unknown(unknowns) + else: + unknown("No working server found and no error messages") rdap = json.loads(response.content) for event in rdap["events"]: if event["eventAction"] == "expiration": diff --git a/ianardap.py b/ianardap.py index 8e0cb6e..2bb401f 100755 --- a/ianardap.py +++ b/ianardap.py @@ -1,8 +1,7 @@ #!/usr/bin/env python3 -"""A simple module to get the RDAP server for a given domain name, -from the IANA database specified in RFC 9224. - +"""A simple module to get the RDAP server for a given domain name, IP +prefix or object, from the IANA databases specified in RFC 9224/8521. """ # http://python-requests.org/ for easier HTTPS retrieval @@ -17,8 +16,12 @@ import fcntl import pickle import pathlib -IANABASE = "https://data.iana.org/rdap/dns.json" -CACHE = os.environ["HOME"] + "/.ianardapcache" +IANABASES = {"domains": "https://data.iana.org/rdap/dns.json", + "v4prefixes": "https://data.iana.org/rdap/ipv4.json", + "v6prefixes": "https://data.iana.org/rdap/ipv6.json", + "as": "https://data.iana.org/rdap/asn.json", + "objects": "https://data.iana.org/rdap/object-tags.json"} +CACHE = os.environ["HOME"] + "/.ianardapcaches" MAXAGE = 24 # Hours. Used only if the server no longer gives the information. IANATIMEOUT = 10 # Seconds MAXTESTS = 3 # Maximum attempts to get the database @@ -46,13 +49,20 @@ def parse_expires(h): class IanaRDAPDatabase(): - def __init__(self, maxage=MAXAGE, cachefile=CACHE, pickleformat=False): + def __init__(self, category="domains", maxage=MAXAGE, cachedir=CACHE, + pickleformat=False): """Retrieves the IANA database, if not already cached. maxage is in -hours. The cache file argument should not have an extension (it will -be added automatically). pickleformat is not the default because it is -not really faster *and* it introduces security risks if someone can -write in the file (see the documentation of the module).""" +hours. The cachedir is a directory (it will be created if not already +existant). pickleformat is not the default because it is not really +faster *and* it introduces security risks if someone can write in the +file (see the documentation of the module). + """ + cache_valid = False + if not os.path.exists(cachedir): + os.mkdir(cachedir) + self.category = category + cachefile = os.path.join(cachedir, category) if pickleformat: self.cachefile = cachefile + ".pickle" else: @@ -98,7 +108,7 @@ write in the file (see the documentation of the module).""" continue else: self.unlock() - response = requests.get(IANABASE, timeout=IANATIMEOUT) + response = requests.get(IANABASES[category], timeout=IANATIMEOUT) expirationtime = None if "cache-control" in response.headers: directives = parse_cachecontrol(response.headers["cache-control"]) @@ -135,10 +145,27 @@ write in the file (see the documentation of the module).""" self.publication = database["publication"] self.version = database["version"] self.services = {} - for service in database["services"]: - for tld in service[0]: - for server in service[1]: - self.services[tld] = server + if self.category == "domains": + for service in database["services"]: + for tld in service[0]: + if tld.lower() not in self.services: + self.services[tld.lower()] =[] + for server in service[1]: + # server is an URL so case-sensitive. + self.services[tld.lower()].append(server) + elif self.category == "objects": + for service in database["services"]: + maintainer = service[0] + for registry in service[1]: + if registry.upper() not in self.services: + self.services[registry.upper()] =[] + for server in service[2]: + self.services[registry.upper()] + for server in service[2]: + self.services[registry.upper()].append(server) + else: # IP addresses will be complicated, because of the + # longest prefix rule. + raise Exception("Unsupported category %s" % self.category) if not cache_valid: self.lock() cache = open(self.cachefile, "wb") @@ -157,16 +184,30 @@ write in the file (see the documentation of the module).""" fcntl.lockf(self.lockhandle, fcntl.LOCK_UN) self.lockhandle.close() - def find(self, domain): - """ Get the RDAP server for a given domain name. None if there is none.""" - if domain.endswith("."): - domain = domain[:-1] - labels = domain.split(".") - tld = labels[len(labels)-1] - if tld in self.services: - return self.services[tld] + def find(self, id): + """Get the RDAP server(s), as an array, for a given identifier. None +if there is none. TODO port check_expire on that""" + if self.category == "domains": + domain = id + if domain.endswith("."): + domain = domain[:-1] + labels = domain.lower().split(".") + tld = labels[len(labels)-1] + if tld in self.services: + return self.services[tld] + else: + return None + elif self.category == "objects": + try: + (handle, registry) = id.rsplit("-", maxsplit=1) + except ValueError: + raise Exception("Not a valid RFC 8521 identifier: \"%s\"" % id) + if registry.upper() in self.services: + return self.services[registry.upper()] + else: + return None else: - return None + raise Exception("Unsupported category %s" % self.category) if __name__ == "__main__": rdap = IanaRDAPDatabase(maxage=1)