Compare commits

..

No commits in common. "04fb72e408a2e16c320a6287d09f1d94ea3a746a" and "79032c1021773f374eacea008ed7108f709f2e93" have entirely different histories.

5 changed files with 35 additions and 192 deletions

View File

@ -11,7 +11,6 @@ check_expire follows the usual Nagios rules. The options are:
* -w: warning threshold in days
* -v: verbose details in output
* -u: unixtime output
* -t: timeout for RDAP requetss (in seconds)
## Installation
@ -31,7 +30,6 @@ object CheckCommand "expiration" {
"-H" = "$address$",
"-c" = "$expiration_critical$",
"-w" = "$expiration_warning$",
"-t" = "$expiration_timeout$",
"-v" = { set_if = "$expiration_verbose$" }
}

View File

@ -43,10 +43,10 @@ domain = None
critical_t = datetime.timedelta(days=2)
warning_t = datetime.timedelta(days=7)
unixtime = False
timeout = 20 # Seconds
# TODO implement timeout for HTTPS requests. -t option. Does Requests allow it?
def usage(msg=None):
print("Usage: %s -H domain-name [-c critical -w warning -u -t timeout]" % sys.argv[0], end="")
print("Usage: %s -H domain-name [-c critical -w warning -u]" % sys.argv[0], end="")
if msg is not None and msg != "":
print(" (%s)" % msg)
else:
@ -54,8 +54,8 @@ def usage(msg=None):
def details():
if verbose:
print(" RDAP database \"%s\", version %s published on %s, retrieved on %s, RDAP server is %s" % \
(database.description, database.version, database.publication, database.retrieved, server))
print(" RDAP database \"%s\", version %s published on %s, RDAP server is %s" % \
(database.description, database.version, database.publication, server))
else:
print("")
@ -73,13 +73,6 @@ def warning(msg=None):
details()
sys.exit(STATE_WARNING)
def unknown(msg=None):
if msg is None:
msg = "Unknown"
print("%s UNKNOWN: %s" % (domain, msg), end="")
details()
sys.exit(STATE_UNKNOWN)
def ok(msg=None):
if msg is None:
msg = "Unknown message but everything is OK"
@ -88,10 +81,8 @@ def ok(msg=None):
sys.exit(STATE_OK)
try:
optlist, args = getopt.getopt (sys.argv[1:], "c:hH:t:uvVw:",
["critical=", "expiration", "help",
"timeout=", "unixtime", "verbose",
"version", "warning="])
optlist, args = getopt.getopt (sys.argv[1:], "c:hH:uvVw:",
["critical=", "expiration", "help", "unixtime", "verbose", "version", "warning="])
for option, value in optlist:
if option == "--critical" or option == "-c":
critical_t = datetime.timedelta(days=int(value))
@ -100,10 +91,6 @@ try:
sys.exit(STATE_OK)
elif option == "--hostname" or option == "-H":
domain = value
elif option == "--timeout" or option == "-t":
timeout = int(value)
elif option == "--unixtime" or option == "-u":
unixtime = True
elif option == "--verbose" or option == "-v":
verbose = True
elif option == "--version" or option == "-V":
@ -111,6 +98,8 @@ try:
sys.exit(STATE_OK)
elif option == "--warning" or option == "-w":
warning_t = datetime.timedelta(days=int(value))
elif option == "--unixtime" or option == "-u":
unixtime = True
else:
# Should never occur, it is trapped by getopt
print("Unknown option %s" % option)
@ -124,19 +113,13 @@ if len(args) != 0:
if domain is None:
usage("-H is mandatory")
sys.exit(STATE_UNKNOWN)
try:
database = ianardap.IanaRDAPDatabase()
except Exception as e:
unknown("Exception when retrieving the IANA database: \"%s\"" % e)
database = ianardap.IanaRDAPDatabase()
server = database.find(domain)
if server is None:
unknown("No RDAP server found for %s" % domain)
error("No RDAP server found for %s" % domain)
if server.endswith("/"):
server = server[:-1] # Donuts RDAP server balks when there are two slashes and reply 404
try:
response = requests.get("%s/domain/%s" % (server, domain), timeout=timeout)
except requests.exceptions.Timeout:
unknown("Timeout when trying to reach %s" % server)
response = requests.get("%s/domain/%s" % (server, domain))
if response.status_code != 200:
error("Invalid RDAP return code: %s" % response.status_code)
rdap = json.loads(response.content)
@ -153,7 +136,8 @@ for event in rdap["events"]:
dt = re.sub(r"\+([0-9]{2}):([0-9]{2})$", r"+\1\2", event["eventDate"])
expiration = datetime.datetime.strptime(dt, RFC3339TZ)
else:
unknown("No recognized format for datetime \"%s\"" % event["eventDate"])
print("No recognized format for datetime \"%s\"" % event["eventDate"])
sys.exit(STATE_UNKNOWN)
now = datetime.datetime.now(tz=UTCINFO)
if unixtime == True:
print(int(expiration.strftime("%s")))

112
ianardap.py Executable file → Normal file
View File

@ -12,86 +12,30 @@ import datetime
import json
import os
import sys
import time
import fcntl
import pickle
IANABASE = "https://data.iana.org/rdap/dns.json"
CACHE = os.environ["HOME"] + "/.ianardapcache"
CACHE = os.environ["HOME"] + "/.ianardapcache.json"
MAXAGE = 24 # Hours
IANATIMEOUT = 10 # Seconds
MAXTESTS = 3 # Maximum attempts to get the database
class IanaRDAPDatabase():
def __init__(self, maxage=MAXAGE, cachefile=CACHE, pickleformat=False):
"""Retrieves the IANA database, if not already cached. maxage is in
hours. The cache file argument should not have an extension (it will
be added automatically). pickleformat is not the default because it is
not really faster *and* it introduces security risks if someone can
write in the file (see the documentation of the module)."""
def __init__(self, maxage=MAXAGE, cachefile=CACHE):
""" Retrieves the IANA databse, if not already cached. maxage is in hours. """
cache_valid = False
if pickleformat:
self.cachefile = cachefile + ".pickle"
# TODO we should lock it
if os.path.exists(cachefile) and \
datetime.datetime.fromtimestamp(os.path.getmtime(cachefile)) >= \
(datetime.datetime.utcnow() - datetime.timedelta(hours = maxage)):
cache = open(cachefile, "rb")
content = cache.read()
cache.close()
cache_valid = True
else:
self.cachefile = cachefile + ".json"
self.lockname = self.cachefile + ".lock"
loaded = False
tests = 0
errmsg = "No error"
while not loaded and tests < MAXTESTS:
self.lock()
if os.path.exists(self.cachefile) and \
datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile)) > \
(datetime.datetime.now() - datetime.timedelta(hours = maxage)):
cache = open(self.cachefile, "rb")
content = cache.read()
cache.close()
self.unlock()
if pickleformat:
try:
database = pickle.loads(content)
loaded = True
self.retrieved = datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile))
cache_valid = True
except (pickle.UnpicklingError, EOFError):
tests += 1
errmsg = "Invalid pickle content in %s" % self.cachefile
# Delete it without mercy
os.remove(self.cachefile)
continue
else:
try:
database = json.loads(content)
loaded = True
self.retrieved = datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile))
cache_valid = True
except json.decoder.JSONDecodeError:
tests += 1
errmsg = "Invalid JSON content in %s" % self.cachefile
# Delete it without mercy
os.remove(self.cachefile)
continue
else:
self.unlock()
response = requests.get(IANABASE, timeout=IANATIMEOUT)
if response.status_code != 200:
time.sleep(2)
tests += 1
errmsg = "Invalid HTTPS return code when trying to get %s: %s" % (IANABASE, response.status_code)
continue
else:
loaded = True
self.retrieved = datetime.datetime.now()
try:
content = response.content
database = json.loads(content)
except json.decoder.JSONDecodeError:
tests += 1
errmsg = "Invalid JSON retrieved from %s" % IANABASE
continue
if not loaded:
raise Exception("Cannot read IANA database: %s" % errmsg)
response = requests.get(IANABASE)
if response.status_code != 200:
raise Exception("Invalid HTTPS return code when trying to get %s: %s" % (IANABASE, response.status_code))
content = response.content
database = json.loads(content)
self.description = database["description"]
self.publication = database["publication"]
self.version = database["version"]
@ -101,27 +45,13 @@ write in the file (see the documentation of the module)."""
for server in service[1]:
self.services[tld] = server
if not cache_valid:
self.lock()
cache = open(self.cachefile, "wb")
if pickleformat:
cache.write(pickle.dumps(database))
else:
cache.write(content)
# TODO we should lock it
cache = open(cachefile, "wb")
cache.write(content)
cache.close()
self.unlock()
def lock(self):
self.lockhandle = open(self.lockname, 'w')
fcntl.lockf(self.lockhandle, fcntl.LOCK_EX)
def unlock(self):
fcntl.lockf(self.lockhandle, fcntl.LOCK_UN)
self.lockhandle.close()
def find(self, domain):
""" Get the RDAP server for a given domain name. None if there is none."""
if domain.endswith("."):
domain = domain[:-1]
labels = domain.split(".")
tld = labels[len(labels)-1]
if tld in self.services:
@ -131,8 +61,8 @@ write in the file (see the documentation of the module)."""
if __name__ == "__main__":
rdap = IanaRDAPDatabase(maxage=1)
print("Database \"%s\", version %s published on %s, retrieved on %s, %i services" % \
(rdap.description, rdap.version, rdap.publication, rdap.retrieved, len(rdap.services)))
print("Database \"%s\", version %s published on %s, %i services" % \
(rdap.description, rdap.version, rdap.publication, len(rdap.services)))
for domain in sys.argv[1:]:
print("%s -> %s" % (domain, rdap.find(domain)))

View File

@ -1,50 +0,0 @@
#!/usr/bin/env python3
"""Test by executing 'pytest' https://docs.pytest.org/
Don't forget to set PYTHONPATH if you want to test the development
tree and not the installed package. """
import ianardap
import tempfile
import os.path
import datetime
def test_basic():
database = ianardap.IanaRDAPDatabase()
assert database.description.startswith("RDAP bootstrap file") and database.version == "1.0" and \
len(database.services) > 1000
def test_alternative_cache():
tmpfile = tempfile.NamedTemporaryFile(suffix=".testianacache", delete=False)
database = ianardap.IanaRDAPDatabase(cachefile=tmpfile.name, maxage=0)
assert os.path.exists(tmpfile.name) and \
datetime.datetime.fromtimestamp(os.path.getmtime(tmpfile.name)) > \
(datetime.datetime.now() - datetime.timedelta(minutes=1))
os.remove(tmpfile.name)
os.remove(tmpfile.name + ".json.lock")
def test_refresh():
# Force a resfresh
database = ianardap.IanaRDAPDatabase(maxage=0)
assert (database.retrieved > (datetime.datetime.now() - datetime.timedelta(minutes=1))) and \
(datetime.datetime.fromtimestamp(os.path.getmtime(database.cachefile)) > \
(datetime.datetime.now() - datetime.timedelta(minutes=1)))
def test_find_exists():
database = ianardap.IanaRDAPDatabase()
server = database.find("www.foobar.ar")
assert server == "https://rdap.nic.ar/"
def test_find_not_exists():
database = ianardap.IanaRDAPDatabase()
server = database.find("www.foobar.example")
assert server is None
def test_pickle():
database = ianardap.IanaRDAPDatabase(pickleformat=True)
assert database.description.startswith("RDAP bootstrap file") and database.version == "1.0" and \
len(database.services) > 1000

View File

@ -33,7 +33,7 @@ tests:
args:
- '-H'
- 'bie.re'
retcode: 3
retcode: 2
partstdout: 'No RDAP server'
# 2021-07-05: no expiration in RDAP for this TLD (but there is one
@ -45,16 +45,6 @@ tests:
retcode: 2
partstdout: 'No expiration found'
# Far away and slow, timeout is expected
- exe: './check_expire'
args:
- '-t'
- '1'
- '-H'
- 'nic.ar'
retcode: 3
partstdout: 'Timeout'
- exe: './check_expire'
args:
- '-H'
@ -72,15 +62,6 @@ tests:
stderr: ''
partstdout: 'RDAP server is'
- exe: './check_expire'
args:
- '-u'
- '-H'
- 'bortzmeyer.org'
retcode: 0
stderr: ''
# TODO how to test we get an integer?
- exe: './check_expire'
args:
- '-c'