forked from bortzmeyer/check_expire
Compare commits
No commits in common. "04fb72e408a2e16c320a6287d09f1d94ea3a746a" and "79032c1021773f374eacea008ed7108f709f2e93" have entirely different histories.
04fb72e408
...
79032c1021
|
@ -11,7 +11,6 @@ check_expire follows the usual Nagios rules. The options are:
|
|||
* -w: warning threshold in days
|
||||
* -v: verbose details in output
|
||||
* -u: unixtime output
|
||||
* -t: timeout for RDAP requetss (in seconds)
|
||||
|
||||
## Installation
|
||||
|
||||
|
@ -31,7 +30,6 @@ object CheckCommand "expiration" {
|
|||
"-H" = "$address$",
|
||||
"-c" = "$expiration_critical$",
|
||||
"-w" = "$expiration_warning$",
|
||||
"-t" = "$expiration_timeout$",
|
||||
"-v" = { set_if = "$expiration_verbose$" }
|
||||
}
|
||||
|
||||
|
|
42
check_expire
42
check_expire
|
@ -43,10 +43,10 @@ domain = None
|
|||
critical_t = datetime.timedelta(days=2)
|
||||
warning_t = datetime.timedelta(days=7)
|
||||
unixtime = False
|
||||
timeout = 20 # Seconds
|
||||
# TODO implement timeout for HTTPS requests. -t option. Does Requests allow it?
|
||||
|
||||
def usage(msg=None):
|
||||
print("Usage: %s -H domain-name [-c critical -w warning -u -t timeout]" % sys.argv[0], end="")
|
||||
print("Usage: %s -H domain-name [-c critical -w warning -u]" % sys.argv[0], end="")
|
||||
if msg is not None and msg != "":
|
||||
print(" (%s)" % msg)
|
||||
else:
|
||||
|
@ -54,8 +54,8 @@ def usage(msg=None):
|
|||
|
||||
def details():
|
||||
if verbose:
|
||||
print(" RDAP database \"%s\", version %s published on %s, retrieved on %s, RDAP server is %s" % \
|
||||
(database.description, database.version, database.publication, database.retrieved, server))
|
||||
print(" RDAP database \"%s\", version %s published on %s, RDAP server is %s" % \
|
||||
(database.description, database.version, database.publication, server))
|
||||
else:
|
||||
print("")
|
||||
|
||||
|
@ -73,13 +73,6 @@ def warning(msg=None):
|
|||
details()
|
||||
sys.exit(STATE_WARNING)
|
||||
|
||||
def unknown(msg=None):
|
||||
if msg is None:
|
||||
msg = "Unknown"
|
||||
print("%s UNKNOWN: %s" % (domain, msg), end="")
|
||||
details()
|
||||
sys.exit(STATE_UNKNOWN)
|
||||
|
||||
def ok(msg=None):
|
||||
if msg is None:
|
||||
msg = "Unknown message but everything is OK"
|
||||
|
@ -88,10 +81,8 @@ def ok(msg=None):
|
|||
sys.exit(STATE_OK)
|
||||
|
||||
try:
|
||||
optlist, args = getopt.getopt (sys.argv[1:], "c:hH:t:uvVw:",
|
||||
["critical=", "expiration", "help",
|
||||
"timeout=", "unixtime", "verbose",
|
||||
"version", "warning="])
|
||||
optlist, args = getopt.getopt (sys.argv[1:], "c:hH:uvVw:",
|
||||
["critical=", "expiration", "help", "unixtime", "verbose", "version", "warning="])
|
||||
for option, value in optlist:
|
||||
if option == "--critical" or option == "-c":
|
||||
critical_t = datetime.timedelta(days=int(value))
|
||||
|
@ -100,10 +91,6 @@ try:
|
|||
sys.exit(STATE_OK)
|
||||
elif option == "--hostname" or option == "-H":
|
||||
domain = value
|
||||
elif option == "--timeout" or option == "-t":
|
||||
timeout = int(value)
|
||||
elif option == "--unixtime" or option == "-u":
|
||||
unixtime = True
|
||||
elif option == "--verbose" or option == "-v":
|
||||
verbose = True
|
||||
elif option == "--version" or option == "-V":
|
||||
|
@ -111,6 +98,8 @@ try:
|
|||
sys.exit(STATE_OK)
|
||||
elif option == "--warning" or option == "-w":
|
||||
warning_t = datetime.timedelta(days=int(value))
|
||||
elif option == "--unixtime" or option == "-u":
|
||||
unixtime = True
|
||||
else:
|
||||
# Should never occur, it is trapped by getopt
|
||||
print("Unknown option %s" % option)
|
||||
|
@ -124,19 +113,13 @@ if len(args) != 0:
|
|||
if domain is None:
|
||||
usage("-H is mandatory")
|
||||
sys.exit(STATE_UNKNOWN)
|
||||
try:
|
||||
database = ianardap.IanaRDAPDatabase()
|
||||
except Exception as e:
|
||||
unknown("Exception when retrieving the IANA database: \"%s\"" % e)
|
||||
database = ianardap.IanaRDAPDatabase()
|
||||
server = database.find(domain)
|
||||
if server is None:
|
||||
unknown("No RDAP server found for %s" % domain)
|
||||
error("No RDAP server found for %s" % domain)
|
||||
if server.endswith("/"):
|
||||
server = server[:-1] # Donuts RDAP server balks when there are two slashes and reply 404
|
||||
try:
|
||||
response = requests.get("%s/domain/%s" % (server, domain), timeout=timeout)
|
||||
except requests.exceptions.Timeout:
|
||||
unknown("Timeout when trying to reach %s" % server)
|
||||
response = requests.get("%s/domain/%s" % (server, domain))
|
||||
if response.status_code != 200:
|
||||
error("Invalid RDAP return code: %s" % response.status_code)
|
||||
rdap = json.loads(response.content)
|
||||
|
@ -153,7 +136,8 @@ for event in rdap["events"]:
|
|||
dt = re.sub(r"\+([0-9]{2}):([0-9]{2})$", r"+\1\2", event["eventDate"])
|
||||
expiration = datetime.datetime.strptime(dt, RFC3339TZ)
|
||||
else:
|
||||
unknown("No recognized format for datetime \"%s\"" % event["eventDate"])
|
||||
print("No recognized format for datetime \"%s\"" % event["eventDate"])
|
||||
sys.exit(STATE_UNKNOWN)
|
||||
now = datetime.datetime.now(tz=UTCINFO)
|
||||
if unixtime == True:
|
||||
print(int(expiration.strftime("%s")))
|
||||
|
|
|
@ -12,86 +12,30 @@ import datetime
|
|||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import fcntl
|
||||
import pickle
|
||||
|
||||
IANABASE = "https://data.iana.org/rdap/dns.json"
|
||||
CACHE = os.environ["HOME"] + "/.ianardapcache"
|
||||
CACHE = os.environ["HOME"] + "/.ianardapcache.json"
|
||||
MAXAGE = 24 # Hours
|
||||
IANATIMEOUT = 10 # Seconds
|
||||
MAXTESTS = 3 # Maximum attempts to get the database
|
||||
|
||||
class IanaRDAPDatabase():
|
||||
|
||||
def __init__(self, maxage=MAXAGE, cachefile=CACHE, pickleformat=False):
|
||||
"""Retrieves the IANA database, if not already cached. maxage is in
|
||||
hours. The cache file argument should not have an extension (it will
|
||||
be added automatically). pickleformat is not the default because it is
|
||||
not really faster *and* it introduces security risks if someone can
|
||||
write in the file (see the documentation of the module)."""
|
||||
def __init__(self, maxage=MAXAGE, cachefile=CACHE):
|
||||
""" Retrieves the IANA databse, if not already cached. maxage is in hours. """
|
||||
cache_valid = False
|
||||
if pickleformat:
|
||||
self.cachefile = cachefile + ".pickle"
|
||||
# TODO we should lock it
|
||||
if os.path.exists(cachefile) and \
|
||||
datetime.datetime.fromtimestamp(os.path.getmtime(cachefile)) >= \
|
||||
(datetime.datetime.utcnow() - datetime.timedelta(hours = maxage)):
|
||||
cache = open(cachefile, "rb")
|
||||
content = cache.read()
|
||||
cache.close()
|
||||
cache_valid = True
|
||||
else:
|
||||
self.cachefile = cachefile + ".json"
|
||||
self.lockname = self.cachefile + ".lock"
|
||||
loaded = False
|
||||
tests = 0
|
||||
errmsg = "No error"
|
||||
while not loaded and tests < MAXTESTS:
|
||||
self.lock()
|
||||
if os.path.exists(self.cachefile) and \
|
||||
datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile)) > \
|
||||
(datetime.datetime.now() - datetime.timedelta(hours = maxage)):
|
||||
cache = open(self.cachefile, "rb")
|
||||
content = cache.read()
|
||||
cache.close()
|
||||
self.unlock()
|
||||
if pickleformat:
|
||||
try:
|
||||
database = pickle.loads(content)
|
||||
loaded = True
|
||||
self.retrieved = datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile))
|
||||
cache_valid = True
|
||||
except (pickle.UnpicklingError, EOFError):
|
||||
tests += 1
|
||||
errmsg = "Invalid pickle content in %s" % self.cachefile
|
||||
# Delete it without mercy
|
||||
os.remove(self.cachefile)
|
||||
continue
|
||||
else:
|
||||
try:
|
||||
database = json.loads(content)
|
||||
loaded = True
|
||||
self.retrieved = datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile))
|
||||
cache_valid = True
|
||||
except json.decoder.JSONDecodeError:
|
||||
tests += 1
|
||||
errmsg = "Invalid JSON content in %s" % self.cachefile
|
||||
# Delete it without mercy
|
||||
os.remove(self.cachefile)
|
||||
continue
|
||||
else:
|
||||
self.unlock()
|
||||
response = requests.get(IANABASE, timeout=IANATIMEOUT)
|
||||
if response.status_code != 200:
|
||||
time.sleep(2)
|
||||
tests += 1
|
||||
errmsg = "Invalid HTTPS return code when trying to get %s: %s" % (IANABASE, response.status_code)
|
||||
continue
|
||||
else:
|
||||
loaded = True
|
||||
self.retrieved = datetime.datetime.now()
|
||||
try:
|
||||
content = response.content
|
||||
database = json.loads(content)
|
||||
except json.decoder.JSONDecodeError:
|
||||
tests += 1
|
||||
errmsg = "Invalid JSON retrieved from %s" % IANABASE
|
||||
continue
|
||||
if not loaded:
|
||||
raise Exception("Cannot read IANA database: %s" % errmsg)
|
||||
response = requests.get(IANABASE)
|
||||
if response.status_code != 200:
|
||||
raise Exception("Invalid HTTPS return code when trying to get %s: %s" % (IANABASE, response.status_code))
|
||||
content = response.content
|
||||
database = json.loads(content)
|
||||
self.description = database["description"]
|
||||
self.publication = database["publication"]
|
||||
self.version = database["version"]
|
||||
|
@ -101,27 +45,13 @@ write in the file (see the documentation of the module)."""
|
|||
for server in service[1]:
|
||||
self.services[tld] = server
|
||||
if not cache_valid:
|
||||
self.lock()
|
||||
cache = open(self.cachefile, "wb")
|
||||
if pickleformat:
|
||||
cache.write(pickle.dumps(database))
|
||||
else:
|
||||
cache.write(content)
|
||||
# TODO we should lock it
|
||||
cache = open(cachefile, "wb")
|
||||
cache.write(content)
|
||||
cache.close()
|
||||
self.unlock()
|
||||
|
||||
def lock(self):
|
||||
self.lockhandle = open(self.lockname, 'w')
|
||||
fcntl.lockf(self.lockhandle, fcntl.LOCK_EX)
|
||||
|
||||
def unlock(self):
|
||||
fcntl.lockf(self.lockhandle, fcntl.LOCK_UN)
|
||||
self.lockhandle.close()
|
||||
|
||||
def find(self, domain):
|
||||
""" Get the RDAP server for a given domain name. None if there is none."""
|
||||
if domain.endswith("."):
|
||||
domain = domain[:-1]
|
||||
labels = domain.split(".")
|
||||
tld = labels[len(labels)-1]
|
||||
if tld in self.services:
|
||||
|
@ -131,8 +61,8 @@ write in the file (see the documentation of the module)."""
|
|||
|
||||
if __name__ == "__main__":
|
||||
rdap = IanaRDAPDatabase(maxage=1)
|
||||
print("Database \"%s\", version %s published on %s, retrieved on %s, %i services" % \
|
||||
(rdap.description, rdap.version, rdap.publication, rdap.retrieved, len(rdap.services)))
|
||||
print("Database \"%s\", version %s published on %s, %i services" % \
|
||||
(rdap.description, rdap.version, rdap.publication, len(rdap.services)))
|
||||
for domain in sys.argv[1:]:
|
||||
print("%s -> %s" % (domain, rdap.find(domain)))
|
||||
|
||||
|
|
|
@ -1,50 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
"""Test by executing 'pytest' https://docs.pytest.org/
|
||||
|
||||
Don't forget to set PYTHONPATH if you want to test the development
|
||||
tree and not the installed package. """
|
||||
|
||||
import ianardap
|
||||
|
||||
import tempfile
|
||||
import os.path
|
||||
import datetime
|
||||
|
||||
def test_basic():
|
||||
database = ianardap.IanaRDAPDatabase()
|
||||
assert database.description.startswith("RDAP bootstrap file") and database.version == "1.0" and \
|
||||
len(database.services) > 1000
|
||||
|
||||
def test_alternative_cache():
|
||||
tmpfile = tempfile.NamedTemporaryFile(suffix=".testianacache", delete=False)
|
||||
database = ianardap.IanaRDAPDatabase(cachefile=tmpfile.name, maxage=0)
|
||||
assert os.path.exists(tmpfile.name) and \
|
||||
datetime.datetime.fromtimestamp(os.path.getmtime(tmpfile.name)) > \
|
||||
(datetime.datetime.now() - datetime.timedelta(minutes=1))
|
||||
os.remove(tmpfile.name)
|
||||
os.remove(tmpfile.name + ".json.lock")
|
||||
|
||||
def test_refresh():
|
||||
# Force a resfresh
|
||||
database = ianardap.IanaRDAPDatabase(maxage=0)
|
||||
assert (database.retrieved > (datetime.datetime.now() - datetime.timedelta(minutes=1))) and \
|
||||
(datetime.datetime.fromtimestamp(os.path.getmtime(database.cachefile)) > \
|
||||
(datetime.datetime.now() - datetime.timedelta(minutes=1)))
|
||||
|
||||
def test_find_exists():
|
||||
database = ianardap.IanaRDAPDatabase()
|
||||
server = database.find("www.foobar.ar")
|
||||
assert server == "https://rdap.nic.ar/"
|
||||
|
||||
def test_find_not_exists():
|
||||
database = ianardap.IanaRDAPDatabase()
|
||||
server = database.find("www.foobar.example")
|
||||
assert server is None
|
||||
|
||||
def test_pickle():
|
||||
database = ianardap.IanaRDAPDatabase(pickleformat=True)
|
||||
assert database.description.startswith("RDAP bootstrap file") and database.version == "1.0" and \
|
||||
len(database.services) > 1000
|
||||
|
||||
|
21
tests.yaml
21
tests.yaml
|
@ -33,7 +33,7 @@ tests:
|
|||
args:
|
||||
- '-H'
|
||||
- 'bie.re'
|
||||
retcode: 3
|
||||
retcode: 2
|
||||
partstdout: 'No RDAP server'
|
||||
|
||||
# 2021-07-05: no expiration in RDAP for this TLD (but there is one
|
||||
|
@ -45,16 +45,6 @@ tests:
|
|||
retcode: 2
|
||||
partstdout: 'No expiration found'
|
||||
|
||||
# Far away and slow, timeout is expected
|
||||
- exe: './check_expire'
|
||||
args:
|
||||
- '-t'
|
||||
- '1'
|
||||
- '-H'
|
||||
- 'nic.ar'
|
||||
retcode: 3
|
||||
partstdout: 'Timeout'
|
||||
|
||||
- exe: './check_expire'
|
||||
args:
|
||||
- '-H'
|
||||
|
@ -72,15 +62,6 @@ tests:
|
|||
stderr: ''
|
||||
partstdout: 'RDAP server is'
|
||||
|
||||
- exe: './check_expire'
|
||||
args:
|
||||
- '-u'
|
||||
- '-H'
|
||||
- 'bortzmeyer.org'
|
||||
retcode: 0
|
||||
stderr: ''
|
||||
# TODO how to test we get an integer?
|
||||
|
||||
- exe: './check_expire'
|
||||
args:
|
||||
- '-c'
|
||||
|
|
Loading…
Reference in New Issue