mySpindel/LittleBock/extract.py

121 lines
4.1 KiB
Python
Executable File

#!/usr/bin/python3
"""
iSpindle data extractor for Little Bock HTML documents
"""
import csv
import datetime
import json
import pathlib
import sys
try:
import lxml.html as LX
except ModuleNotFoundError:
sys.exit("please install the 'lxml' module.\n\thttp://pypi.org/lxml")
OUTPUT = ".json" if (
len(sys.argv) > 1 and sys.argv[1] == "json"
) else ".csv"
class Frame(object):
"""a data frame"""
def __init__(self,data: json):
"""constructor"""
self.data = data
self.names = tuple(self._names())
self.keys = tuple(self._keys())
self.rows = tuple(self._rows())
@classmethod
def from_html(csl, pn: pathlib.Path):
"""extract JSON data from an HTML document"""
def stamp2date(i: int) -> str:
"""make a UNIX date stamp (Epoch) human-readable"""
dt = str(i)
if len(dt)>10: dt = dt[:10]
sdt = datetime.datetime.fromtimestamp(int(dt))
return str(sdt).replace(" ","_")
html, raw_js, js = LX.parse(str(pn)), None, []
raw_js = html.xpath("//*[@id='fermentation_log_chart']")
if raw_js:
raw_js = raw_js[0].get('data-chart-options')
if raw_js:
for i in json.loads(raw_js).pop('series'):
if i.get("data"): #clean up data
for unwanted in (
"color","opacity","yAxis","dashStyle","tooltip",):
if unwanted in i.keys():
if unwanted == "tooltip":
#append an unit to its matching value
tt = i.get(unwanted)
suffix = tt.get("valueSuffix")
if suffix: i.update(
{"name": " ".join([i["name"], suffix])})
del(i[unwanted]) #remove unwanted data
js.append(i)
if len(js) > 0:
#prepend human readable dates
ids = tuple(i.get("x") for i in js[0]["data"])
dates = [{"x": i,"y": stamp2date(i)} for i in ids]
dates_dict = {"name":"date_time", "data":[i for i in dates]}
js.insert(0,dates_dict)
return csl(js)
def _names(self):
"""generate names"""
for group in self.data:
if group.get("name"): yield group["name"]
def _keys(self):
"""generate keys"""
if len(self.data) and self.data[0].get("data"):
for i in self.data[0]["data"]:
yield i["x"]
def _rows(self):
"""generate table rows"""
def row(key):
"""generate a row"""
for name in self.names:
yield self.query(name,key)
for k in self.keys:
yield tuple(row(k))
def query(self,name,key):
"""query a value depending on an entry name and key"""
for group in self.data:
if group["name"] == name:
for value in group["data"]:
if value.get("x") == key:
return value.get("y")
def dump(self, path_out):
"""dump data as either JSON or CSV depending on the output extension"""
print(f"INFO: >> {path_out}")
if path_out.suffix == ".json":
with open(path_out,'w') as f:
f.write(json.dumps(self.data,
sort_keys=False, ensure_ascii=False, indent=2))
elif path_out.suffix == ".csv":
with open(path_out,'w', newline='') as f:
writer = csv.writer(f,dialect='unix')
writer.writerow(self.names)
writer.writerows(self.rows)
if __name__ == "__main__":
here = pathlib.Path.cwd()
docs = tuple(here.rglob("*.html"))
for i in docs:
if i.exists() and i.stat().st_size > 0:
print(f"INFO: {i}")
frame = Frame.from_html(i)
if len(frame.keys):
frame.dump(i.with_suffix(OUTPUT))
else:
print("INFO: no data: no output")