Compare commits

..

No commits in common. "master" and "master" have entirely different histories.

3 changed files with 41 additions and 105 deletions

View File

@ -128,33 +128,17 @@ try:
database = ianardap.IanaRDAPDatabase()
except Exception as e:
unknown("Exception when retrieving the IANA database: \"%s\"" % e)
servers = database.find(domain)
if servers is None:
server = database.find(domain)
if server is None:
unknown("No RDAP server found for %s" % domain)
# find may return several RDAP servers
found = False
unknowns= ""
errors = ""
for server in servers:
if server.endswith("/"):
server = server[:-1] # Donuts RDAP server balks when there are two slashes and reply 404
try:
response = requests.get("%s/domain/%s" % (server, domain), timeout=timeout)
if response.status_code != 200:
errors += "Invalid RDAP return code at %s: %s " % \
(server, response.status_code)
else:
found = True
break
except requests.exceptions.Timeout:
unknowns += "Timeout when trying to reach %s " % server
if not found:
if errors != "":
error(errors)
elif unknowns != "":
unknown(unknowns)
else:
unknown("No working server found and no error messages")
if server.endswith("/"):
server = server[:-1] # Donuts RDAP server balks when there are two slashes and reply 404
try:
response = requests.get("%s/domain/%s" % (server, domain), timeout=timeout)
except requests.exceptions.Timeout:
unknown("Timeout when trying to reach %s" % server)
if response.status_code != 200:
error("Invalid RDAP return code: %s" % response.status_code)
rdap = json.loads(response.content)
for event in rdap["events"]:
if event["eventAction"] == "expiration":

View File

@ -1,7 +1,8 @@
#!/usr/bin/env python3
"""A simple module to get the RDAP server for a given domain name, IP
prefix or object, from the IANA databases specified in RFC 9224/8521.
"""A simple module to get the RDAP server for a given domain name,
from the IANA database specified in RFC 9224.
"""
# http://python-requests.org/ for easier HTTPS retrieval
@ -16,12 +17,8 @@ import fcntl
import pickle
import pathlib
IANABASES = {"domains": "https://data.iana.org/rdap/dns.json",
"v4prefixes": "https://data.iana.org/rdap/ipv4.json",
"v6prefixes": "https://data.iana.org/rdap/ipv6.json",
"as": "https://data.iana.org/rdap/asn.json",
"objects": "https://data.iana.org/rdap/object-tags.json"}
CACHE = os.environ["HOME"] + "/.ianardapcaches"
IANABASE = "https://data.iana.org/rdap/dns.json"
CACHE = os.environ["HOME"] + "/.ianardapcache"
MAXAGE = 24 # Hours. Used only if the server no longer gives the information.
IANATIMEOUT = 10 # Seconds
MAXTESTS = 3 # Maximum attempts to get the database
@ -49,33 +46,19 @@ def parse_expires(h):
class IanaRDAPDatabase():
def __init__(self, category="domains", maxage=None, cachedir=CACHE,
pickleformat=False):
def __init__(self, maxage=MAXAGE, cachefile=CACHE, pickleformat=False):
"""Retrieves the IANA database, if not already cached. maxage is in
hours. The cachedir is a directory (it will be created if not already
existant). pickleformat is not the default because it is not really
faster *and* it introduces security risks if someone can write in the
file (see the documentation of the module).
"""
hours. The cache file argument should not have an extension (it will
be added automatically). pickleformat is not the default because it is
not really faster *and* it introduces security risks if someone can
write in the file (see the documentation of the module)."""
cache_valid = False
if not os.path.exists(cachedir):
os.mkdir(cachedir)
self.category = category
cachefile = os.path.join(cachedir, category)
if pickleformat:
self.cachefile = cachefile + ".pickle"
else:
self.cachefile = cachefile + ".json"
self.lockname = self.cachefile + ".lock"
self.expirationfile = self.cachefile + ".expires"
if maxage is not None:
with open(self.expirationfile, 'w'):
self.expirationtime = time.mktime((datetime.datetime.now() + \
datetime.timedelta(hours=maxage)).timetuple())
os.utime(self.expirationfile,
times = (self.expirationtime, self.expirationtime))
loaded = False
tests = 0
errmsg = "No error"
@ -115,7 +98,7 @@ file (see the documentation of the module).
continue
else:
self.unlock()
response = requests.get(IANABASES[category], timeout=IANATIMEOUT)
response = requests.get(IANABASE, timeout=IANATIMEOUT)
expirationtime = None
if "cache-control" in response.headers:
directives = parse_cachecontrol(response.headers["cache-control"])
@ -152,27 +135,10 @@ file (see the documentation of the module).
self.publication = database["publication"]
self.version = database["version"]
self.services = {}
if self.category == "domains":
for service in database["services"]:
for tld in service[0]:
if tld.lower() not in self.services:
self.services[tld.lower()] =[]
for server in service[1]:
# server is an URL so case-sensitive.
self.services[tld.lower()].append(server)
elif self.category == "objects":
for service in database["services"]:
maintainer = service[0]
for registry in service[1]:
if registry.upper() not in self.services:
self.services[registry.upper()] =[]
for server in service[2]:
self.services[registry.upper()]
for server in service[2]:
self.services[registry.upper()].append(server)
else: # IP addresses will be complicated, because of the
# longest prefix rule.
raise Exception("Unsupported category %s" % self.category)
for service in database["services"]:
for tld in service[0]:
for server in service[1]:
self.services[tld] = server
if not cache_valid:
self.lock()
cache = open(self.cachefile, "wb")
@ -191,30 +157,16 @@ file (see the documentation of the module).
fcntl.lockf(self.lockhandle, fcntl.LOCK_UN)
self.lockhandle.close()
def find(self, id):
"""Get the RDAP server(s), as an array, for a given identifier. None
if there is none."""
if self.category == "domains":
domain = id
if domain.endswith("."):
domain = domain[:-1]
labels = domain.lower().split(".")
tld = labels[len(labels)-1]
if tld in self.services:
return self.services[tld]
else:
return None
elif self.category == "objects":
try:
(handle, registry) = id.rsplit("-", maxsplit=1)
except ValueError:
raise Exception("Not a valid RFC 8521 identifier: \"%s\"" % id)
if registry.upper() in self.services:
return self.services[registry.upper()]
else:
return None
def find(self, domain):
""" Get the RDAP server for a given domain name. None if there is none."""
if domain.endswith("."):
domain = domain[:-1]
labels = domain.split(".")
tld = labels[len(labels)-1]
if tld in self.services:
return self.services[tld]
else:
raise Exception("Unsupported category %s" % self.category)
return None
if __name__ == "__main__":
rdap = IanaRDAPDatabase(maxage=1)

View File

@ -17,13 +17,13 @@ def test_basic():
len(database.services) > 1000
def test_alternative_cache():
tmpdir = tempfile.TemporaryDirectory(suffix=".testianacaches")
database = ianardap.IanaRDAPDatabase(cachedir=tmpdir.name, maxage=0)
assert os.path.exists(database.cachefile) and \
datetime.datetime.fromtimestamp(os.path.getmtime(database.cachefile)) > \
tmpfile = tempfile.NamedTemporaryFile(suffix=".testianacache", delete=False)
database = ianardap.IanaRDAPDatabase(cachefile=tmpfile.name, maxage=0)
assert os.path.exists(tmpfile.name) and \
datetime.datetime.fromtimestamp(os.path.getmtime(tmpfile.name)) > \
(datetime.datetime.now() - datetime.timedelta(minutes=1))
os.remove(database.cachefile)
os.remove(database.cachefile + ".lock")
os.remove(tmpfile.name)
os.remove(tmpfile.name + ".json.lock")
def test_refresh():
# Force a resfresh
@ -35,7 +35,7 @@ def test_refresh():
def test_find_exists():
database = ianardap.IanaRDAPDatabase()
server = database.find("www.foobar.ar")
assert server == ["https://rdap.nic.ar/"]
assert server == "https://rdap.nic.ar/"
def test_find_not_exists():
database = ianardap.IanaRDAPDatabase()