Compare commits

...

5 Commits

Author SHA1 Message Date
Stephane Bortzmeyer 9dc05e9c7b Error code outside of its visibility 2023-03-15 19:53:12 +00:00
Stephane Bortzmeyer b3709c64e9 TODO task done 2022-11-09 15:17:35 +00:00
Stephane Bortzmeyer 46538fa620 * Tests updated
* Correct handling of forced refresh
2022-11-09 15:17:03 +00:00
Stephane Bortzmeyer a117da20b1 New ianardap module, with support for several RDAP servers 2022-11-09 15:01:28 +00:00
Stephane Bortzmeyer 42f67a7e64 Domain names are case-insensitive 2022-10-24 18:11:32 +02:00
3 changed files with 105 additions and 41 deletions

View File

@ -128,17 +128,33 @@ try:
database = ianardap.IanaRDAPDatabase() database = ianardap.IanaRDAPDatabase()
except Exception as e: except Exception as e:
unknown("Exception when retrieving the IANA database: \"%s\"" % e) unknown("Exception when retrieving the IANA database: \"%s\"" % e)
server = database.find(domain) servers = database.find(domain)
if server is None: if servers is None:
unknown("No RDAP server found for %s" % domain) unknown("No RDAP server found for %s" % domain)
if server.endswith("/"): # find may return several RDAP servers
server = server[:-1] # Donuts RDAP server balks when there are two slashes and reply 404 found = False
try: unknowns= ""
response = requests.get("%s/domain/%s" % (server, domain), timeout=timeout) errors = ""
except requests.exceptions.Timeout: for server in servers:
unknown("Timeout when trying to reach %s" % server) if server.endswith("/"):
if response.status_code != 200: server = server[:-1] # Donuts RDAP server balks when there are two slashes and reply 404
error("Invalid RDAP return code: %s" % response.status_code) try:
response = requests.get("%s/domain/%s" % (server, domain), timeout=timeout)
if response.status_code != 200:
errors += "Invalid RDAP return code at %s: %s " % \
(server, response.status_code)
else:
found = True
break
except requests.exceptions.Timeout:
unknowns += "Timeout when trying to reach %s " % server
if not found:
if errors != "":
error(errors)
elif unknowns != "":
unknown(unknowns)
else:
unknown("No working server found and no error messages")
rdap = json.loads(response.content) rdap = json.loads(response.content)
for event in rdap["events"]: for event in rdap["events"]:
if event["eventAction"] == "expiration": if event["eventAction"] == "expiration":

View File

@ -1,8 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""A simple module to get the RDAP server for a given domain name, """A simple module to get the RDAP server for a given domain name, IP
from the IANA database specified in RFC 9224. prefix or object, from the IANA databases specified in RFC 9224/8521.
""" """
# http://python-requests.org/ for easier HTTPS retrieval # http://python-requests.org/ for easier HTTPS retrieval
@ -17,8 +16,12 @@ import fcntl
import pickle import pickle
import pathlib import pathlib
IANABASE = "https://data.iana.org/rdap/dns.json" IANABASES = {"domains": "https://data.iana.org/rdap/dns.json",
CACHE = os.environ["HOME"] + "/.ianardapcache" "v4prefixes": "https://data.iana.org/rdap/ipv4.json",
"v6prefixes": "https://data.iana.org/rdap/ipv6.json",
"as": "https://data.iana.org/rdap/asn.json",
"objects": "https://data.iana.org/rdap/object-tags.json"}
CACHE = os.environ["HOME"] + "/.ianardapcaches"
MAXAGE = 24 # Hours. Used only if the server no longer gives the information. MAXAGE = 24 # Hours. Used only if the server no longer gives the information.
IANATIMEOUT = 10 # Seconds IANATIMEOUT = 10 # Seconds
MAXTESTS = 3 # Maximum attempts to get the database MAXTESTS = 3 # Maximum attempts to get the database
@ -46,19 +49,33 @@ def parse_expires(h):
class IanaRDAPDatabase(): class IanaRDAPDatabase():
def __init__(self, maxage=MAXAGE, cachefile=CACHE, pickleformat=False): def __init__(self, category="domains", maxage=None, cachedir=CACHE,
pickleformat=False):
"""Retrieves the IANA database, if not already cached. maxage is in """Retrieves the IANA database, if not already cached. maxage is in
hours. The cache file argument should not have an extension (it will hours. The cachedir is a directory (it will be created if not already
be added automatically). pickleformat is not the default because it is existant). pickleformat is not the default because it is not really
not really faster *and* it introduces security risks if someone can faster *and* it introduces security risks if someone can write in the
write in the file (see the documentation of the module).""" file (see the documentation of the module).
"""
cache_valid = False cache_valid = False
if not os.path.exists(cachedir):
os.mkdir(cachedir)
self.category = category
cachefile = os.path.join(cachedir, category)
if pickleformat: if pickleformat:
self.cachefile = cachefile + ".pickle" self.cachefile = cachefile + ".pickle"
else: else:
self.cachefile = cachefile + ".json" self.cachefile = cachefile + ".json"
self.lockname = self.cachefile + ".lock" self.lockname = self.cachefile + ".lock"
self.expirationfile = self.cachefile + ".expires" self.expirationfile = self.cachefile + ".expires"
if maxage is not None:
with open(self.expirationfile, 'w'):
self.expirationtime = time.mktime((datetime.datetime.now() + \
datetime.timedelta(hours=maxage)).timetuple())
os.utime(self.expirationfile,
times = (self.expirationtime, self.expirationtime))
loaded = False loaded = False
tests = 0 tests = 0
errmsg = "No error" errmsg = "No error"
@ -98,7 +115,7 @@ write in the file (see the documentation of the module)."""
continue continue
else: else:
self.unlock() self.unlock()
response = requests.get(IANABASE, timeout=IANATIMEOUT) response = requests.get(IANABASES[category], timeout=IANATIMEOUT)
expirationtime = None expirationtime = None
if "cache-control" in response.headers: if "cache-control" in response.headers:
directives = parse_cachecontrol(response.headers["cache-control"]) directives = parse_cachecontrol(response.headers["cache-control"])
@ -135,10 +152,27 @@ write in the file (see the documentation of the module)."""
self.publication = database["publication"] self.publication = database["publication"]
self.version = database["version"] self.version = database["version"]
self.services = {} self.services = {}
for service in database["services"]: if self.category == "domains":
for tld in service[0]: for service in database["services"]:
for server in service[1]: for tld in service[0]:
self.services[tld] = server if tld.lower() not in self.services:
self.services[tld.lower()] =[]
for server in service[1]:
# server is an URL so case-sensitive.
self.services[tld.lower()].append(server)
elif self.category == "objects":
for service in database["services"]:
maintainer = service[0]
for registry in service[1]:
if registry.upper() not in self.services:
self.services[registry.upper()] =[]
for server in service[2]:
self.services[registry.upper()]
for server in service[2]:
self.services[registry.upper()].append(server)
else: # IP addresses will be complicated, because of the
# longest prefix rule.
raise Exception("Unsupported category %s" % self.category)
if not cache_valid: if not cache_valid:
self.lock() self.lock()
cache = open(self.cachefile, "wb") cache = open(self.cachefile, "wb")
@ -157,16 +191,30 @@ write in the file (see the documentation of the module)."""
fcntl.lockf(self.lockhandle, fcntl.LOCK_UN) fcntl.lockf(self.lockhandle, fcntl.LOCK_UN)
self.lockhandle.close() self.lockhandle.close()
def find(self, domain): def find(self, id):
""" Get the RDAP server for a given domain name. None if there is none.""" """Get the RDAP server(s), as an array, for a given identifier. None
if domain.endswith("."): if there is none."""
domain = domain[:-1] if self.category == "domains":
labels = domain.split(".") domain = id
tld = labels[len(labels)-1] if domain.endswith("."):
if tld in self.services: domain = domain[:-1]
return self.services[tld] labels = domain.lower().split(".")
tld = labels[len(labels)-1]
if tld in self.services:
return self.services[tld]
else:
return None
elif self.category == "objects":
try:
(handle, registry) = id.rsplit("-", maxsplit=1)
except ValueError:
raise Exception("Not a valid RFC 8521 identifier: \"%s\"" % id)
if registry.upper() in self.services:
return self.services[registry.upper()]
else:
return None
else: else:
return None raise Exception("Unsupported category %s" % self.category)
if __name__ == "__main__": if __name__ == "__main__":
rdap = IanaRDAPDatabase(maxage=1) rdap = IanaRDAPDatabase(maxage=1)

View File

@ -17,13 +17,13 @@ def test_basic():
len(database.services) > 1000 len(database.services) > 1000
def test_alternative_cache(): def test_alternative_cache():
tmpfile = tempfile.NamedTemporaryFile(suffix=".testianacache", delete=False) tmpdir = tempfile.TemporaryDirectory(suffix=".testianacaches")
database = ianardap.IanaRDAPDatabase(cachefile=tmpfile.name, maxage=0) database = ianardap.IanaRDAPDatabase(cachedir=tmpdir.name, maxage=0)
assert os.path.exists(tmpfile.name) and \ assert os.path.exists(database.cachefile) and \
datetime.datetime.fromtimestamp(os.path.getmtime(tmpfile.name)) > \ datetime.datetime.fromtimestamp(os.path.getmtime(database.cachefile)) > \
(datetime.datetime.now() - datetime.timedelta(minutes=1)) (datetime.datetime.now() - datetime.timedelta(minutes=1))
os.remove(tmpfile.name) os.remove(database.cachefile)
os.remove(tmpfile.name + ".json.lock") os.remove(database.cachefile + ".lock")
def test_refresh(): def test_refresh():
# Force a resfresh # Force a resfresh
@ -35,7 +35,7 @@ def test_refresh():
def test_find_exists(): def test_find_exists():
database = ianardap.IanaRDAPDatabase() database = ianardap.IanaRDAPDatabase()
server = database.find("www.foobar.ar") server = database.find("www.foobar.ar")
assert server == "https://rdap.nic.ar/" assert server == ["https://rdap.nic.ar/"]
def test_find_not_exists(): def test_find_not_exists():
database = ianardap.IanaRDAPDatabase() database = ianardap.IanaRDAPDatabase()