A Nagios (and compatible) plugin to check the impending expiration of a domain name. It relies on RDAP exclusively (no whois) and works with every top-level domain with RDAP (which includes all the ICANN ones).
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

139 lines
5.2 KiB

1 year ago
#!/usr/bin/env python3
"""A simple module to get the RDAP server for a given domain name,
from the IANA database specified in RFC 7484.
"""
# http://python-requests.org/ for easier HTTPS retrieval
import requests
import datetime
import json
import os
import sys
import time
import fcntl
import pickle
1 year ago
IANABASE = "https://data.iana.org/rdap/dns.json"
CACHE = os.environ["HOME"] + "/.ianardapcache"
1 year ago
MAXAGE = 24 # Hours
1 year ago
IANATIMEOUT = 10 # Seconds
MAXTESTS = 3 # Maximum attempts to get the database
1 year ago
class IanaRDAPDatabase():
def __init__(self, maxage=MAXAGE, cachefile=CACHE, pickleformat=False):
"""Retrieves the IANA database, if not already cached. maxage is in
hours. The cache file argument should not have an extension (it will
be added automatically). pickleformat is not the default because it is
not really faster *and* it introduces security risks if someone can
write in the file (see the documentation of the module)."""
1 year ago
cache_valid = False
if pickleformat:
self.cachefile = cachefile + ".pickle"
else:
self.cachefile = cachefile + ".json"
self.lockname = self.cachefile + ".lock"
loaded = False
tests = 0
errmsg = "No error"
while not loaded and tests < MAXTESTS:
self.lock()
if os.path.exists(self.cachefile) and \
datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile)) > \
(datetime.datetime.now() - datetime.timedelta(hours = maxage)):
cache = open(self.cachefile, "rb")
content = cache.read()
cache.close()
self.unlock()
if pickleformat:
try:
database = pickle.loads(content)
loaded = True
self.retrieved = datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile))
cache_valid = True
except (pickle.UnpicklingError, EOFError):
tests += 1
errmsg = "Invalid pickle content in %s" % self.cachefile
# Delete it without mercy
os.remove(self.cachefile)
continue
else:
try:
database = json.loads(content)
loaded = True
self.retrieved = datetime.datetime.fromtimestamp(os.path.getmtime(self.cachefile))
cache_valid = True
except json.decoder.JSONDecodeError:
tests += 1
errmsg = "Invalid JSON content in %s" % self.cachefile
# Delete it without mercy
os.remove(self.cachefile)
continue
else:
self.unlock()
response = requests.get(IANABASE, timeout=IANATIMEOUT)
if response.status_code != 200:
time.sleep(2)
tests += 1
errmsg = "Invalid HTTPS return code when trying to get %s: %s" % (IANABASE, response.status_code)
continue
else:
loaded = True
self.retrieved = datetime.datetime.now()
try:
content = response.content
database = json.loads(content)
except json.decoder.JSONDecodeError:
tests += 1
errmsg = "Invalid JSON retrieved from %s" % IANABASE
continue
if not loaded:
raise Exception("Cannot read IANA database: %s" % errmsg)
1 year ago
self.description = database["description"]
self.publication = database["publication"]
self.version = database["version"]
self.services = {}
for service in database["services"]:
for tld in service[0]:
for server in service[1]:
self.services[tld] = server
if not cache_valid:
self.lock()
cache = open(self.cachefile, "wb")
if pickleformat:
cache.write(pickle.dumps(database))
else:
cache.write(content)
1 year ago
cache.close()
self.unlock()
1 year ago
def lock(self):
self.lockhandle = open(self.lockname, 'w')
fcntl.lockf(self.lockhandle, fcntl.LOCK_EX)
def unlock(self):
fcntl.lockf(self.lockhandle, fcntl.LOCK_UN)
self.lockhandle.close()
1 year ago
def find(self, domain):
""" Get the RDAP server for a given domain name. None if there is none."""
1 year ago
if domain.endswith("."):
domain = domain[:-1]
1 year ago
labels = domain.split(".")
tld = labels[len(labels)-1]
if tld in self.services:
return self.services[tld]
else:
return None
if __name__ == "__main__":
rdap = IanaRDAPDatabase(maxage=1)
print("Database \"%s\", version %s published on %s, retrieved on %s, %i services" % \
(rdap.description, rdap.version, rdap.publication, rdap.retrieved, len(rdap.services)))
1 year ago
for domain in sys.argv[1:]:
print("%s -> %s" % (domain, rdap.find(domain)))