#!/usr/bin/env python3

"""A simple module to get the RDAP server for a given domain name, IP
prefix or object, from the IANA databases specified in RFC 9224/8521.

"""
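# A minimal usage sketch (the module file name, here ianardap, is an
# assumption; the URLs returned depend on the current IANA registry):
#
#   from ianardap import IanaRDAPDatabase
#   base = IanaRDAPDatabase(category="domains")
#   servers = base.find("example.org")   # list of RDAP base URLs, or None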
# http://python-requests.org/ for easier HTTPS retrieval
import requests

import datetime
import json
import os
import sys
import time
import fcntl
import pickle
import pathlib
IANABASES = {"domains": "https://data.iana.org/rdap/dns.json",
             "v4prefixes": "https://data.iana.org/rdap/ipv4.json",
             "v6prefixes": "https://data.iana.org/rdap/ipv6.json",
             "as": "https://data.iana.org/rdap/asn.json",
             "objects": "https://data.iana.org/rdap/object-tags.json"}
CACHE = os.environ["HOME"] + "/.ianardapcaches"
MAXAGE = 24 # Hours. Used only if the server does not provide expiration information.
IANATIMEOUT = 10 # Seconds
MAXTESTS = 3 # Maximum attempts to get the database

# Don't touch
HTTP_DATE_FORMAT = "%a, %d %b %Y %H:%M:%S %Z"
# RFC 9111, section 5.2
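# For instance, parse_cachecontrol("public, max-age=3600") returns
# {"public": None, "max-age": "3600"}: keys are lower-cased, values stay
# strings, and valueless directives map to None.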
def parse_cachecontrol(h):
    result = {}
    directives = h.split(",")
    for directive in directives:
        directive = directive.strip()
        if "=" in directive:
            (key, value) = directive.split("=")
        else:
            key = directive
            value = None
        result[key.lower()] = value
    return result
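# For instance, parse_expires("Mon, 01 Jan 2024 00:00:00 GMT") returns the
# corresponding (naive) datetime object.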
def parse_expires(h):
    d = datetime.datetime.strptime(h, HTTP_DATE_FORMAT)
    return d
class IanaRDAPDatabase():

    def __init__(self, category="domains", maxage=None, cachedir=CACHE,
                 pickleformat=False):
        """Retrieves the IANA database, if not already cached. maxage is in
        hours. The cachedir is a directory (it will be created if it does not
        already exist). pickleformat is not the default because it is not
        really faster *and* it introduces security risks if someone can write
        in the file (see the documentation of the module).

        """
        cache_valid = False
        if not os.path.exists(cachedir):
            os.mkdir(cachedir)
        self.category = category
        cachefile = os.path.join(cachedir, category)
        if pickleformat:
            self.cachefile = cachefile + ".pickle"
        else:
            self.cachefile = cachefile + ".json"
        self.lockname = self.cachefile + ".lock"
        self.expirationfile = self.cachefile + ".expires"
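        # Three files per category: the database itself (self.cachefile), a
        # lock file for inter-process locking (self.lockname) and a file whose
        # modification time records when the cache expires (self.expirationfile).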
        if maxage is not None:
            with open(self.expirationfile, 'w'):
                self.expirationtime = time.mktime((datetime.datetime.now() + \
                                                   datetime.timedelta(hours=maxage)).timetuple())
                os.utime(self.expirationfile,
                         times = (self.expirationtime, self.expirationtime))
        loaded = False
        tests = 0
        errmsg = "No error"
        while not loaded and tests < MAXTESTS:
            self.lock()
            if os.path.exists(self.cachefile) and \
               (pathlib.Path(self.expirationfile).exists() and \
                datetime.datetime.fromtimestamp(os.path.getmtime(self.expirationfile)) > \
                datetime.datetime.now()):
                cache = open(self.cachefile, "rb")
                content = cache.read()
                cache.close()
                self.unlock()
                if pickleformat:
                    try:
                        database = pickle.loads(content)
                        loaded = True
                        self.retrieved = datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile))
                        cache_valid = True
                    except (pickle.UnpicklingError, EOFError):
                        tests += 1
                        errmsg = "Invalid pickle content in %s" % self.cachefile
                        # Delete it without mercy
                        os.remove(self.cachefile)
                        continue
                else:
                    try:
                        database = json.loads(content)
                        loaded = True
                        self.retrieved = datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile))
                        cache_valid = True
                    except json.decoder.JSONDecodeError:
                        tests += 1
                        errmsg = "Invalid JSON content in %s" % self.cachefile
                        # Delete it without mercy
                        os.remove(self.cachefile)
                        continue
            else:
                self.unlock()
                response = requests.get(IANABASES[category], timeout=IANATIMEOUT)
                expirationtime = None
                if "cache-control" in response.headers:
                    directives = parse_cachecontrol(response.headers["cache-control"])
                    if "max-age" in directives:
                        maxage = int(directives["max-age"])
                        expirationtime = datetime.datetime.now() + datetime.timedelta(seconds=maxage)
                if not expirationtime:
                    if "expires" in response.headers:
                        expirationtime = parse_expires(response.headers["expires"])
                    else:
                        expirationtime = datetime.datetime.now() + datetime.timedelta(hours=MAXAGE)
                self.expirationtime = time.mktime(expirationtime.timetuple())
                if response.status_code != 200:
                    time.sleep(2)
                    tests += 1
                    errmsg = "Invalid HTTPS return code when trying to get %s: %s" % \
                             (IANABASES[category], response.status_code)
                    continue
                else:
                    loaded = True
                    self.retrieved = datetime.datetime.now()
                try:
                    content = response.content
                    database = json.loads(content)
                    with open(self.expirationfile, 'w'):
                        os.utime(self.expirationfile,
                                 times = (self.expirationtime, self.expirationtime))
                except json.decoder.JSONDecodeError:
                    tests += 1
                    errmsg = "Invalid JSON retrieved from %s" % IANABASES[category]
                    continue
        if not loaded:
            raise Exception("Cannot read IANA database: %s" % errmsg)
        self.description = database["description"]
        self.publication = database["publication"]
        self.version = database["version"]
        self.services = {}
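        # In the IANA files, "services" is an array of entries: for "domains",
        # each entry is [list of TLDs, list of RDAP base URLs] (RFC 9224); for
        # "objects", each entry is [maintainer contact, list of service tags,
        # list of RDAP base URLs] (RFC 8521).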
        if self.category == "domains":
            for service in database["services"]:
                for tld in service[0]:
                    if tld.lower() not in self.services:
                        self.services[tld.lower()] = []
                    for server in service[1]:
                        # server is a URL, so it is case-sensitive.
                        self.services[tld.lower()].append(server)
        elif self.category == "objects":
            for service in database["services"]:
                maintainer = service[0]
                for registry in service[1]:
                    if registry.upper() not in self.services:
                        self.services[registry.upper()] = []
                    for server in service[2]:
                        self.services[registry.upper()].append(server)
        else: # IP addresses will be complicated, because of the
              # longest prefix rule.
            raise Exception("Unsupported category %s" % self.category)
        if not cache_valid:
            self.lock()
            cache = open(self.cachefile, "wb")
            if pickleformat:
                cache.write(pickle.dumps(database))
            else:
                cache.write(content)
            cache.close()
            self.unlock()
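    # lock()/unlock() serialise access to the cache between processes, using
    # fcntl advisory locking on a separate lock file.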
    def lock(self):
        self.lockhandle = open(self.lockname, 'w')
        fcntl.lockf(self.lockhandle, fcntl.LOCK_EX)

    def unlock(self):
        fcntl.lockf(self.lockhandle, fcntl.LOCK_UN)
        self.lockhandle.close()
    def find(self, id):
        """Get the RDAP server(s), as an array, for a given identifier. None
        if there is none."""
        if self.category == "domains":
            domain = id
            if domain.endswith("."):
                domain = domain[:-1]
            labels = domain.lower().split(".")
            tld = labels[-1]
            if tld in self.services:
                return self.services[tld]
            else:
                return None
        elif self.category == "objects":
            try:
                (handle, registry) = id.rsplit("-", maxsplit=1)
            except ValueError:
                raise Exception("Not a valid RFC 8521 identifier: \"%s\"" % id)
            if registry.upper() in self.services:
                return self.services[registry.upper()]
            else:
                return None
        else:
            raise Exception("Unsupported category %s" % self.category)
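# find() expects a domain name for the "domains" category and an RFC 8521
# "handle-TAG" identifier for the "objects" category (for instance something
# like "FOO-RIPE", an illustrative value).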
if __name__ == "__main__":
    rdap = IanaRDAPDatabase(maxage=1)
    print("Database \"%s\", version %s published on %s, retrieved on %s, %i services" % \
          (rdap.description, rdap.version, rdap.publication, rdap.retrieved, len(rdap.services)))
    for domain in sys.argv[1:]:
        print("%s -> %s" % (domain, rdap.find(domain)))
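# Example invocation from a shell (the file name is an assumption):
#   ./ianardap.py example.org example.fr
# prints the RDAP server(s) for each domain given on the command line.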