Compare commits

...

6 Commits

Author SHA1 Message Date
Stephane Bortzmeyer 04fb72e408 Pickle implemented (but not activated by default). Closes #1 2021-07-07 15:50:47 +02:00
Stephane Bortzmeyer ae00a81f79 New code to retrieve the database, more robust 2021-07-07 15:23:26 +02:00
Stephane Bortzmeyer 09f5ccc3e4 Small cleanups 2021-07-07 12:08:37 +02:00
Stephane Bortzmeyer 65d8deb569 Documentation of timeout 2021-07-07 11:59:25 +02:00
Stephane Bortzmeyer 6b4f5b228e Timeouts in HTTP requests 2021-07-07 11:57:27 +02:00
Stephane Bortzmeyer a3a1f8c0d4 Lock the cache file 2021-07-07 11:13:36 +02:00
5 changed files with 192 additions and 35 deletions

View File

@ -11,6 +11,7 @@ check_expire follows the usual Nagios rules. The options are:
* -w: warning threshold in days
* -v: verbose details in output
* -u: unixtime output
* -t: timeout for RDAP requetss (in seconds)
## Installation
@ -30,6 +31,7 @@ object CheckCommand "expiration" {
"-H" = "$address$",
"-c" = "$expiration_critical$",
"-w" = "$expiration_warning$",
"-t" = "$expiration_timeout$",
"-v" = { set_if = "$expiration_verbose$" }
}

View File

@ -43,10 +43,10 @@ domain = None
critical_t = datetime.timedelta(days=2)
warning_t = datetime.timedelta(days=7)
unixtime = False
# TODO implement timeout for HTTPS requests. -t option. Does Requests allow it?
timeout = 20 # Seconds
def usage(msg=None):
print("Usage: %s -H domain-name [-c critical -w warning -u]" % sys.argv[0], end="")
print("Usage: %s -H domain-name [-c critical -w warning -u -t timeout]" % sys.argv[0], end="")
if msg is not None and msg != "":
print(" (%s)" % msg)
else:
@ -54,8 +54,8 @@ def usage(msg=None):
def details():
if verbose:
print(" RDAP database \"%s\", version %s published on %s, RDAP server is %s" % \
(database.description, database.version, database.publication, server))
print(" RDAP database \"%s\", version %s published on %s, retrieved on %s, RDAP server is %s" % \
(database.description, database.version, database.publication, database.retrieved, server))
else:
print("")
@ -73,6 +73,13 @@ def warning(msg=None):
details()
sys.exit(STATE_WARNING)
def unknown(msg=None):
if msg is None:
msg = "Unknown"
print("%s UNKNOWN: %s" % (domain, msg), end="")
details()
sys.exit(STATE_UNKNOWN)
def ok(msg=None):
if msg is None:
msg = "Unknown message but everything is OK"
@ -81,8 +88,10 @@ def ok(msg=None):
sys.exit(STATE_OK)
try:
optlist, args = getopt.getopt (sys.argv[1:], "c:hH:uvVw:",
["critical=", "expiration", "help", "unixtime", "verbose", "version", "warning="])
optlist, args = getopt.getopt (sys.argv[1:], "c:hH:t:uvVw:",
["critical=", "expiration", "help",
"timeout=", "unixtime", "verbose",
"version", "warning="])
for option, value in optlist:
if option == "--critical" or option == "-c":
critical_t = datetime.timedelta(days=int(value))
@ -91,6 +100,10 @@ try:
sys.exit(STATE_OK)
elif option == "--hostname" or option == "-H":
domain = value
elif option == "--timeout" or option == "-t":
timeout = int(value)
elif option == "--unixtime" or option == "-u":
unixtime = True
elif option == "--verbose" or option == "-v":
verbose = True
elif option == "--version" or option == "-V":
@ -98,8 +111,6 @@ try:
sys.exit(STATE_OK)
elif option == "--warning" or option == "-w":
warning_t = datetime.timedelta(days=int(value))
elif option == "--unixtime" or option == "-u":
unixtime = True
else:
# Should never occur, it is trapped by getopt
print("Unknown option %s" % option)
@ -113,13 +124,19 @@ if len(args) != 0:
if domain is None:
usage("-H is mandatory")
sys.exit(STATE_UNKNOWN)
database = ianardap.IanaRDAPDatabase()
try:
database = ianardap.IanaRDAPDatabase()
except Exception as e:
unknown("Exception when retrieving the IANA database: \"%s\"" % e)
server = database.find(domain)
if server is None:
error("No RDAP server found for %s" % domain)
unknown("No RDAP server found for %s" % domain)
if server.endswith("/"):
server = server[:-1] # Donuts RDAP server balks when there are two slashes and reply 404
response = requests.get("%s/domain/%s" % (server, domain))
try:
response = requests.get("%s/domain/%s" % (server, domain), timeout=timeout)
except requests.exceptions.Timeout:
unknown("Timeout when trying to reach %s" % server)
if response.status_code != 200:
error("Invalid RDAP return code: %s" % response.status_code)
rdap = json.loads(response.content)
@ -136,8 +153,7 @@ for event in rdap["events"]:
dt = re.sub(r"\+([0-9]{2}):([0-9]{2})$", r"+\1\2", event["eventDate"])
expiration = datetime.datetime.strptime(dt, RFC3339TZ)
else:
print("No recognized format for datetime \"%s\"" % event["eventDate"])
sys.exit(STATE_UNKNOWN)
unknown("No recognized format for datetime \"%s\"" % event["eventDate"])
now = datetime.datetime.now(tz=UTCINFO)
if unixtime == True:
print(int(expiration.strftime("%s")))

112
ianardap.py Normal file → Executable file
View File

@ -12,30 +12,86 @@ import datetime
import json
import os
import sys
import time
import fcntl
import pickle
IANABASE = "https://data.iana.org/rdap/dns.json"
CACHE = os.environ["HOME"] + "/.ianardapcache.json"
CACHE = os.environ["HOME"] + "/.ianardapcache"
MAXAGE = 24 # Hours
IANATIMEOUT = 10 # Seconds
MAXTESTS = 3 # Maximum attempts to get the database
class IanaRDAPDatabase():
def __init__(self, maxage=MAXAGE, cachefile=CACHE):
""" Retrieves the IANA databse, if not already cached. maxage is in hours. """
def __init__(self, maxage=MAXAGE, cachefile=CACHE, pickleformat=False):
"""Retrieves the IANA database, if not already cached. maxage is in
hours. The cache file argument should not have an extension (it will
be added automatically). pickleformat is not the default because it is
not really faster *and* it introduces security risks if someone can
write in the file (see the documentation of the module)."""
cache_valid = False
# TODO we should lock it
if os.path.exists(cachefile) and \
datetime.datetime.fromtimestamp(os.path.getmtime(cachefile)) >= \
(datetime.datetime.utcnow() - datetime.timedelta(hours = maxage)):
cache = open(cachefile, "rb")
content = cache.read()
cache.close()
cache_valid = True
if pickleformat:
self.cachefile = cachefile + ".pickle"
else:
response = requests.get(IANABASE)
if response.status_code != 200:
raise Exception("Invalid HTTPS return code when trying to get %s: %s" % (IANABASE, response.status_code))
content = response.content
database = json.loads(content)
self.cachefile = cachefile + ".json"
self.lockname = self.cachefile + ".lock"
loaded = False
tests = 0
errmsg = "No error"
while not loaded and tests < MAXTESTS:
self.lock()
if os.path.exists(self.cachefile) and \
datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile)) > \
(datetime.datetime.now() - datetime.timedelta(hours = maxage)):
cache = open(self.cachefile, "rb")
content = cache.read()
cache.close()
self.unlock()
if pickleformat:
try:
database = pickle.loads(content)
loaded = True
self.retrieved = datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile))
cache_valid = True
except (pickle.UnpicklingError, EOFError):
tests += 1
errmsg = "Invalid pickle content in %s" % self.cachefile
# Delete it without mercy
os.remove(self.cachefile)
continue
else:
try:
database = json.loads(content)
loaded = True
self.retrieved = datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile))
cache_valid = True
except json.decoder.JSONDecodeError:
tests += 1
errmsg = "Invalid JSON content in %s" % self.cachefile
# Delete it without mercy
os.remove(self.cachefile)
continue
else:
self.unlock()
response = requests.get(IANABASE, timeout=IANATIMEOUT)
if response.status_code != 200:
time.sleep(2)
tests += 1
errmsg = "Invalid HTTPS return code when trying to get %s: %s" % (IANABASE, response.status_code)
continue
else:
loaded = True
self.retrieved = datetime.datetime.now()
try:
content = response.content
database = json.loads(content)
except json.decoder.JSONDecodeError:
tests += 1
errmsg = "Invalid JSON retrieved from %s" % IANABASE
continue
if not loaded:
raise Exception("Cannot read IANA database: %s" % errmsg)
self.description = database["description"]
self.publication = database["publication"]
self.version = database["version"]
@ -45,13 +101,27 @@ class IanaRDAPDatabase():
for server in service[1]:
self.services[tld] = server
if not cache_valid:
# TODO we should lock it
cache = open(cachefile, "wb")
cache.write(content)
self.lock()
cache = open(self.cachefile, "wb")
if pickleformat:
cache.write(pickle.dumps(database))
else:
cache.write(content)
cache.close()
self.unlock()
def lock(self):
self.lockhandle = open(self.lockname, 'w')
fcntl.lockf(self.lockhandle, fcntl.LOCK_EX)
def unlock(self):
fcntl.lockf(self.lockhandle, fcntl.LOCK_UN)
self.lockhandle.close()
def find(self, domain):
""" Get the RDAP server for a given domain name. None if there is none."""
if domain.endswith("."):
domain = domain[:-1]
labels = domain.split(".")
tld = labels[len(labels)-1]
if tld in self.services:
@ -61,8 +131,8 @@ class IanaRDAPDatabase():
if __name__ == "__main__":
rdap = IanaRDAPDatabase(maxage=1)
print("Database \"%s\", version %s published on %s, %i services" % \
(rdap.description, rdap.version, rdap.publication, len(rdap.services)))
print("Database \"%s\", version %s published on %s, retrieved on %s, %i services" % \
(rdap.description, rdap.version, rdap.publication, rdap.retrieved, len(rdap.services)))
for domain in sys.argv[1:]:
print("%s -> %s" % (domain, rdap.find(domain)))

50
test_ianardap.py Normal file
View File

@ -0,0 +1,50 @@
#!/usr/bin/env python3
"""Test by executing 'pytest' https://docs.pytest.org/
Don't forget to set PYTHONPATH if you want to test the development
tree and not the installed package. """
import ianardap
import tempfile
import os.path
import datetime
def test_basic():
database = ianardap.IanaRDAPDatabase()
assert database.description.startswith("RDAP bootstrap file") and database.version == "1.0" and \
len(database.services) > 1000
def test_alternative_cache():
tmpfile = tempfile.NamedTemporaryFile(suffix=".testianacache", delete=False)
database = ianardap.IanaRDAPDatabase(cachefile=tmpfile.name, maxage=0)
assert os.path.exists(tmpfile.name) and \
datetime.datetime.fromtimestamp(os.path.getmtime(tmpfile.name)) > \
(datetime.datetime.now() - datetime.timedelta(minutes=1))
os.remove(tmpfile.name)
os.remove(tmpfile.name + ".json.lock")
def test_refresh():
# Force a resfresh
database = ianardap.IanaRDAPDatabase(maxage=0)
assert (database.retrieved > (datetime.datetime.now() - datetime.timedelta(minutes=1))) and \
(datetime.datetime.fromtimestamp(os.path.getmtime(database.cachefile)) > \
(datetime.datetime.now() - datetime.timedelta(minutes=1)))
def test_find_exists():
database = ianardap.IanaRDAPDatabase()
server = database.find("www.foobar.ar")
assert server == "https://rdap.nic.ar/"
def test_find_not_exists():
database = ianardap.IanaRDAPDatabase()
server = database.find("www.foobar.example")
assert server is None
def test_pickle():
database = ianardap.IanaRDAPDatabase(pickleformat=True)
assert database.description.startswith("RDAP bootstrap file") and database.version == "1.0" and \
len(database.services) > 1000

View File

@ -33,7 +33,7 @@ tests:
args:
- '-H'
- 'bie.re'
retcode: 2
retcode: 3
partstdout: 'No RDAP server'
# 2021-07-05: no expiration in RDAP for this TLD (but there is one
@ -45,6 +45,16 @@ tests:
retcode: 2
partstdout: 'No expiration found'
# Far away and slow, timeout is expected
- exe: './check_expire'
args:
- '-t'
- '1'
- '-H'
- 'nic.ar'
retcode: 3
partstdout: 'Timeout'
- exe: './check_expire'
args:
- '-H'
@ -62,6 +72,15 @@ tests:
stderr: ''
partstdout: 'RDAP server is'
- exe: './check_expire'
args:
- '-u'
- '-H'
- 'bortzmeyer.org'
retcode: 0
stderr: ''
# TODO how to test we get an integer?
- exe: './check_expire'
args:
- '-c'