Compare commits

..

No commits in common. "04fb72e408a2e16c320a6287d09f1d94ea3a746a" and "79032c1021773f374eacea008ed7108f709f2e93" have entirely different histories.

5 changed files with 35 additions and 192 deletions

View File

@ -11,7 +11,6 @@ check_expire follows the usual Nagios rules. The options are:
* -w: warning threshold in days * -w: warning threshold in days
* -v: verbose details in output * -v: verbose details in output
* -u: unixtime output * -u: unixtime output
* -t: timeout for RDAP requetss (in seconds)
## Installation ## Installation
@ -31,7 +30,6 @@ object CheckCommand "expiration" {
"-H" = "$address$", "-H" = "$address$",
"-c" = "$expiration_critical$", "-c" = "$expiration_critical$",
"-w" = "$expiration_warning$", "-w" = "$expiration_warning$",
"-t" = "$expiration_timeout$",
"-v" = { set_if = "$expiration_verbose$" } "-v" = { set_if = "$expiration_verbose$" }
} }

View File

@ -43,10 +43,10 @@ domain = None
critical_t = datetime.timedelta(days=2) critical_t = datetime.timedelta(days=2)
warning_t = datetime.timedelta(days=7) warning_t = datetime.timedelta(days=7)
unixtime = False unixtime = False
timeout = 20 # Seconds # TODO implement timeout for HTTPS requests. -t option. Does Requests allow it?
def usage(msg=None): def usage(msg=None):
print("Usage: %s -H domain-name [-c critical -w warning -u -t timeout]" % sys.argv[0], end="") print("Usage: %s -H domain-name [-c critical -w warning -u]" % sys.argv[0], end="")
if msg is not None and msg != "": if msg is not None and msg != "":
print(" (%s)" % msg) print(" (%s)" % msg)
else: else:
@ -54,8 +54,8 @@ def usage(msg=None):
def details(): def details():
if verbose: if verbose:
print(" RDAP database \"%s\", version %s published on %s, retrieved on %s, RDAP server is %s" % \ print(" RDAP database \"%s\", version %s published on %s, RDAP server is %s" % \
(database.description, database.version, database.publication, database.retrieved, server)) (database.description, database.version, database.publication, server))
else: else:
print("") print("")
@ -73,13 +73,6 @@ def warning(msg=None):
details() details()
sys.exit(STATE_WARNING) sys.exit(STATE_WARNING)
def unknown(msg=None):
if msg is None:
msg = "Unknown"
print("%s UNKNOWN: %s" % (domain, msg), end="")
details()
sys.exit(STATE_UNKNOWN)
def ok(msg=None): def ok(msg=None):
if msg is None: if msg is None:
msg = "Unknown message but everything is OK" msg = "Unknown message but everything is OK"
@ -88,10 +81,8 @@ def ok(msg=None):
sys.exit(STATE_OK) sys.exit(STATE_OK)
try: try:
optlist, args = getopt.getopt (sys.argv[1:], "c:hH:t:uvVw:", optlist, args = getopt.getopt (sys.argv[1:], "c:hH:uvVw:",
["critical=", "expiration", "help", ["critical=", "expiration", "help", "unixtime", "verbose", "version", "warning="])
"timeout=", "unixtime", "verbose",
"version", "warning="])
for option, value in optlist: for option, value in optlist:
if option == "--critical" or option == "-c": if option == "--critical" or option == "-c":
critical_t = datetime.timedelta(days=int(value)) critical_t = datetime.timedelta(days=int(value))
@ -100,10 +91,6 @@ try:
sys.exit(STATE_OK) sys.exit(STATE_OK)
elif option == "--hostname" or option == "-H": elif option == "--hostname" or option == "-H":
domain = value domain = value
elif option == "--timeout" or option == "-t":
timeout = int(value)
elif option == "--unixtime" or option == "-u":
unixtime = True
elif option == "--verbose" or option == "-v": elif option == "--verbose" or option == "-v":
verbose = True verbose = True
elif option == "--version" or option == "-V": elif option == "--version" or option == "-V":
@ -111,6 +98,8 @@ try:
sys.exit(STATE_OK) sys.exit(STATE_OK)
elif option == "--warning" or option == "-w": elif option == "--warning" or option == "-w":
warning_t = datetime.timedelta(days=int(value)) warning_t = datetime.timedelta(days=int(value))
elif option == "--unixtime" or option == "-u":
unixtime = True
else: else:
# Should never occur, it is trapped by getopt # Should never occur, it is trapped by getopt
print("Unknown option %s" % option) print("Unknown option %s" % option)
@ -124,19 +113,13 @@ if len(args) != 0:
if domain is None: if domain is None:
usage("-H is mandatory") usage("-H is mandatory")
sys.exit(STATE_UNKNOWN) sys.exit(STATE_UNKNOWN)
try: database = ianardap.IanaRDAPDatabase()
database = ianardap.IanaRDAPDatabase()
except Exception as e:
unknown("Exception when retrieving the IANA database: \"%s\"" % e)
server = database.find(domain) server = database.find(domain)
if server is None: if server is None:
unknown("No RDAP server found for %s" % domain) error("No RDAP server found for %s" % domain)
if server.endswith("/"): if server.endswith("/"):
server = server[:-1] # Donuts RDAP server balks when there are two slashes and reply 404 server = server[:-1] # Donuts RDAP server balks when there are two slashes and reply 404
try: response = requests.get("%s/domain/%s" % (server, domain))
response = requests.get("%s/domain/%s" % (server, domain), timeout=timeout)
except requests.exceptions.Timeout:
unknown("Timeout when trying to reach %s" % server)
if response.status_code != 200: if response.status_code != 200:
error("Invalid RDAP return code: %s" % response.status_code) error("Invalid RDAP return code: %s" % response.status_code)
rdap = json.loads(response.content) rdap = json.loads(response.content)
@ -153,7 +136,8 @@ for event in rdap["events"]:
dt = re.sub(r"\+([0-9]{2}):([0-9]{2})$", r"+\1\2", event["eventDate"]) dt = re.sub(r"\+([0-9]{2}):([0-9]{2})$", r"+\1\2", event["eventDate"])
expiration = datetime.datetime.strptime(dt, RFC3339TZ) expiration = datetime.datetime.strptime(dt, RFC3339TZ)
else: else:
unknown("No recognized format for datetime \"%s\"" % event["eventDate"]) print("No recognized format for datetime \"%s\"" % event["eventDate"])
sys.exit(STATE_UNKNOWN)
now = datetime.datetime.now(tz=UTCINFO) now = datetime.datetime.now(tz=UTCINFO)
if unixtime == True: if unixtime == True:
print(int(expiration.strftime("%s"))) print(int(expiration.strftime("%s")))

98
ianardap.py Executable file → Normal file
View File

@ -12,86 +12,30 @@ import datetime
import json import json
import os import os
import sys import sys
import time
import fcntl
import pickle
IANABASE = "https://data.iana.org/rdap/dns.json" IANABASE = "https://data.iana.org/rdap/dns.json"
CACHE = os.environ["HOME"] + "/.ianardapcache" CACHE = os.environ["HOME"] + "/.ianardapcache.json"
MAXAGE = 24 # Hours MAXAGE = 24 # Hours
IANATIMEOUT = 10 # Seconds
MAXTESTS = 3 # Maximum attempts to get the database
class IanaRDAPDatabase(): class IanaRDAPDatabase():
def __init__(self, maxage=MAXAGE, cachefile=CACHE, pickleformat=False): def __init__(self, maxage=MAXAGE, cachefile=CACHE):
"""Retrieves the IANA database, if not already cached. maxage is in """ Retrieves the IANA databse, if not already cached. maxage is in hours. """
hours. The cache file argument should not have an extension (it will
be added automatically). pickleformat is not the default because it is
not really faster *and* it introduces security risks if someone can
write in the file (see the documentation of the module)."""
cache_valid = False cache_valid = False
if pickleformat: # TODO we should lock it
self.cachefile = cachefile + ".pickle" if os.path.exists(cachefile) and \
else: datetime.datetime.fromtimestamp(os.path.getmtime(cachefile)) >= \
self.cachefile = cachefile + ".json" (datetime.datetime.utcnow() - datetime.timedelta(hours = maxage)):
self.lockname = self.cachefile + ".lock" cache = open(cachefile, "rb")
loaded = False
tests = 0
errmsg = "No error"
while not loaded and tests < MAXTESTS:
self.lock()
if os.path.exists(self.cachefile) and \
datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile)) > \
(datetime.datetime.now() - datetime.timedelta(hours = maxage)):
cache = open(self.cachefile, "rb")
content = cache.read() content = cache.read()
cache.close() cache.close()
self.unlock()
if pickleformat:
try:
database = pickle.loads(content)
loaded = True
self.retrieved = datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile))
cache_valid = True cache_valid = True
except (pickle.UnpicklingError, EOFError):
tests += 1
errmsg = "Invalid pickle content in %s" % self.cachefile
# Delete it without mercy
os.remove(self.cachefile)
continue
else: else:
try: response = requests.get(IANABASE)
database = json.loads(content)
loaded = True
self.retrieved = datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile))
cache_valid = True
except json.decoder.JSONDecodeError:
tests += 1
errmsg = "Invalid JSON content in %s" % self.cachefile
# Delete it without mercy
os.remove(self.cachefile)
continue
else:
self.unlock()
response = requests.get(IANABASE, timeout=IANATIMEOUT)
if response.status_code != 200: if response.status_code != 200:
time.sleep(2) raise Exception("Invalid HTTPS return code when trying to get %s: %s" % (IANABASE, response.status_code))
tests += 1
errmsg = "Invalid HTTPS return code when trying to get %s: %s" % (IANABASE, response.status_code)
continue
else:
loaded = True
self.retrieved = datetime.datetime.now()
try:
content = response.content content = response.content
database = json.loads(content) database = json.loads(content)
except json.decoder.JSONDecodeError:
tests += 1
errmsg = "Invalid JSON retrieved from %s" % IANABASE
continue
if not loaded:
raise Exception("Cannot read IANA database: %s" % errmsg)
self.description = database["description"] self.description = database["description"]
self.publication = database["publication"] self.publication = database["publication"]
self.version = database["version"] self.version = database["version"]
@ -101,27 +45,13 @@ write in the file (see the documentation of the module)."""
for server in service[1]: for server in service[1]:
self.services[tld] = server self.services[tld] = server
if not cache_valid: if not cache_valid:
self.lock() # TODO we should lock it
cache = open(self.cachefile, "wb") cache = open(cachefile, "wb")
if pickleformat:
cache.write(pickle.dumps(database))
else:
cache.write(content) cache.write(content)
cache.close() cache.close()
self.unlock()
def lock(self):
self.lockhandle = open(self.lockname, 'w')
fcntl.lockf(self.lockhandle, fcntl.LOCK_EX)
def unlock(self):
fcntl.lockf(self.lockhandle, fcntl.LOCK_UN)
self.lockhandle.close()
def find(self, domain): def find(self, domain):
""" Get the RDAP server for a given domain name. None if there is none.""" """ Get the RDAP server for a given domain name. None if there is none."""
if domain.endswith("."):
domain = domain[:-1]
labels = domain.split(".") labels = domain.split(".")
tld = labels[len(labels)-1] tld = labels[len(labels)-1]
if tld in self.services: if tld in self.services:
@ -131,8 +61,8 @@ write in the file (see the documentation of the module)."""
if __name__ == "__main__": if __name__ == "__main__":
rdap = IanaRDAPDatabase(maxage=1) rdap = IanaRDAPDatabase(maxage=1)
print("Database \"%s\", version %s published on %s, retrieved on %s, %i services" % \ print("Database \"%s\", version %s published on %s, %i services" % \
(rdap.description, rdap.version, rdap.publication, rdap.retrieved, len(rdap.services))) (rdap.description, rdap.version, rdap.publication, len(rdap.services)))
for domain in sys.argv[1:]: for domain in sys.argv[1:]:
print("%s -> %s" % (domain, rdap.find(domain))) print("%s -> %s" % (domain, rdap.find(domain)))

View File

@ -1,50 +0,0 @@
#!/usr/bin/env python3
"""Test by executing 'pytest' https://docs.pytest.org/
Don't forget to set PYTHONPATH if you want to test the development
tree and not the installed package. """
import ianardap
import tempfile
import os.path
import datetime
def test_basic():
database = ianardap.IanaRDAPDatabase()
assert database.description.startswith("RDAP bootstrap file") and database.version == "1.0" and \
len(database.services) > 1000
def test_alternative_cache():
tmpfile = tempfile.NamedTemporaryFile(suffix=".testianacache", delete=False)
database = ianardap.IanaRDAPDatabase(cachefile=tmpfile.name, maxage=0)
assert os.path.exists(tmpfile.name) and \
datetime.datetime.fromtimestamp(os.path.getmtime(tmpfile.name)) > \
(datetime.datetime.now() - datetime.timedelta(minutes=1))
os.remove(tmpfile.name)
os.remove(tmpfile.name + ".json.lock")
def test_refresh():
# Force a resfresh
database = ianardap.IanaRDAPDatabase(maxage=0)
assert (database.retrieved > (datetime.datetime.now() - datetime.timedelta(minutes=1))) and \
(datetime.datetime.fromtimestamp(os.path.getmtime(database.cachefile)) > \
(datetime.datetime.now() - datetime.timedelta(minutes=1)))
def test_find_exists():
database = ianardap.IanaRDAPDatabase()
server = database.find("www.foobar.ar")
assert server == "https://rdap.nic.ar/"
def test_find_not_exists():
database = ianardap.IanaRDAPDatabase()
server = database.find("www.foobar.example")
assert server is None
def test_pickle():
database = ianardap.IanaRDAPDatabase(pickleformat=True)
assert database.description.startswith("RDAP bootstrap file") and database.version == "1.0" and \
len(database.services) > 1000

View File

@ -33,7 +33,7 @@ tests:
args: args:
- '-H' - '-H'
- 'bie.re' - 'bie.re'
retcode: 3 retcode: 2
partstdout: 'No RDAP server' partstdout: 'No RDAP server'
# 2021-07-05: no expiration in RDAP for this TLD (but there is one # 2021-07-05: no expiration in RDAP for this TLD (but there is one
@ -45,16 +45,6 @@ tests:
retcode: 2 retcode: 2
partstdout: 'No expiration found' partstdout: 'No expiration found'
# Far away and slow, timeout is expected
- exe: './check_expire'
args:
- '-t'
- '1'
- '-H'
- 'nic.ar'
retcode: 3
partstdout: 'Timeout'
- exe: './check_expire' - exe: './check_expire'
args: args:
- '-H' - '-H'
@ -72,15 +62,6 @@ tests:
stderr: '' stderr: ''
partstdout: 'RDAP server is' partstdout: 'RDAP server is'
- exe: './check_expire'
args:
- '-u'
- '-H'
- 'bortzmeyer.org'
retcode: 0
stderr: ''
# TODO how to test we get an integer?
- exe: './check_expire' - exe: './check_expire'
args: args:
- '-c' - '-c'