merge all processing steps into 'extract' script
This commit is contained in:
parent
5b233db286
commit
0616b4a329
@ -1,44 +0,0 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import json
|
||||
import pathlib
|
||||
import datetime
|
||||
|
||||
"""
|
||||
ajoute une valeur de date lisible par un humain pour chaque entrée dans le JSON
|
||||
|
||||
"""
|
||||
|
||||
def proc(path_in):
|
||||
"""traite un document JSON"""
|
||||
|
||||
def stamp2date(i: int) -> str:
|
||||
"""normalise une date UNIX"""
|
||||
dt = str(i)
|
||||
if len(dt)>10: dt = dt[:10]
|
||||
sdt = datetime.datetime.fromtimestamp(int(dt))
|
||||
return str(sdt).replace(" ","_")
|
||||
|
||||
_json = None
|
||||
with open(path_in,'r') as f:
|
||||
_json = json.load(f)
|
||||
ids = tuple(i.get("x") for i in _json[0]["data"])
|
||||
dates = [{"x": i,"y": stamp2date(i)} for i in ids]
|
||||
dates_dict = {"name":"date_time", "data":[i for i in dates]}
|
||||
_json.insert(0,dates_dict)
|
||||
return _json
|
||||
|
||||
if __name__ == "__main__":
|
||||
here = pathlib.Path.cwd()
|
||||
docs = tuple(here.rglob("*.json"))
|
||||
if len(docs) == 0:
|
||||
print("Aucun fichier JSON ('.json') trouvé.")
|
||||
for i in docs:
|
||||
if i.exists() and i.stat().st_size > 0:
|
||||
data = proc(i)
|
||||
if data:
|
||||
with open(i,'w') as f:
|
||||
f.write(json.dumps(data,
|
||||
sort_keys=False, ensure_ascii=False, indent=2))
|
||||
print(f"INFO: {i.name} modifié")
|
||||
|
@ -1,65 +0,0 @@
|
||||
#!/usr/bin/python3
|
||||
"""
|
||||
exportation de données iSpindel JSON vers données applaties CSV
|
||||
"""
|
||||
import csv
|
||||
import json
|
||||
import pathlib
|
||||
|
||||
class Frame(object):
|
||||
"""un cadre de données"""
|
||||
|
||||
def __init__(self,fname):
|
||||
"""constructeur"""
|
||||
with open(fname,'r') as f:
|
||||
self._json = json.load(f)
|
||||
self.names = tuple(self._names())
|
||||
self.keys = tuple(self._keys())
|
||||
self.rows = tuple(self._rows())
|
||||
|
||||
def _names(self):
|
||||
"""génère une liste de noms"""
|
||||
for group in self._json:
|
||||
yield group["name"]
|
||||
|
||||
def _keys(self):
|
||||
"""génère un liste de clés"""
|
||||
for i in self._json[0]["data"]:
|
||||
yield i["x"]
|
||||
|
||||
def _rows(self):
|
||||
"""génère les lignes d'un tableau"""
|
||||
|
||||
def row(key):
|
||||
"""génère une ligne d'un tableau"""
|
||||
for name in self.names:
|
||||
yield self.get(name,key)
|
||||
|
||||
for k in self.keys:
|
||||
yield tuple(row(k))
|
||||
|
||||
def get(self,name,key):
|
||||
"""récupère la valeur d'une entrée d'après son nom et sa clé"""
|
||||
for group in self._json:
|
||||
if group["name"] == name:
|
||||
for value in group["data"]:
|
||||
if value.get("x") == key:
|
||||
return value.get("y")
|
||||
|
||||
if __name__ == "__main__":
|
||||
here = pathlib.Path.cwd()
|
||||
docs = tuple(here.rglob("*.json"))
|
||||
if len(docs) == 0:
|
||||
print("Aucun fichier JSON ('.json') trouvé.")
|
||||
for doc_i in docs:
|
||||
frame = None
|
||||
if doc_i.exists() and doc_i.stat().st_size > 0:
|
||||
frame = Frame(doc_i)
|
||||
doc_o = doc_i.with_suffix(".csv")
|
||||
with open(doc_o,'w', newline='') as f:
|
||||
writer = csv.writer(f,dialect='unix')
|
||||
writer.writerow(frame.names)
|
||||
writer.writerows(frame.rows)
|
||||
print(f"INFO: {doc_i.name} >> {doc_o.name}")
|
||||
|
||||
|
@ -1,54 +1,120 @@
|
||||
#!/usr/bin/python3
|
||||
"""
|
||||
extracteur de données iSpindel JSON entreposées par Little Bock en HTML
|
||||
iSpindle data extractor for Little Bock HTML documents
|
||||
"""
|
||||
import json, pathlib
|
||||
import csv
|
||||
import datetime
|
||||
import json
|
||||
import pathlib
|
||||
import sys
|
||||
try:
|
||||
import lxml.html as LX
|
||||
except ModuleNotFoundError:
|
||||
import sys
|
||||
sys.exit("Le module 'lxml' est nécessaire.\n\thttp://pypi.org/lxml")
|
||||
sys.exit("please install the 'lxml' module.\n\thttp://pypi.org/lxml")
|
||||
|
||||
def jsproc(data):
|
||||
"""traite les données JSON"""
|
||||
if data.get("data"):
|
||||
#nettoyage des données
|
||||
unwanted = ("color","opacity","yAxis","dashStyle", "tooltip",)
|
||||
for u in unwanted:
|
||||
if u in data.keys():
|
||||
if u == "tooltip": #rattache une unité à la valeur concernée
|
||||
tt = data.get(u)
|
||||
OUTPUT = ".json" if (
|
||||
len(sys.argv) > 1 and sys.argv[1] == "json"
|
||||
) else ".csv"
|
||||
|
||||
class Frame(object):
|
||||
"""a data frame"""
|
||||
|
||||
def __init__(self,data: json):
|
||||
"""constructor"""
|
||||
self.data = data
|
||||
self.names = tuple(self._names())
|
||||
self.keys = tuple(self._keys())
|
||||
self.rows = tuple(self._rows())
|
||||
|
||||
@classmethod
|
||||
def from_html(csl, pn: pathlib.Path):
|
||||
"""extract JSON data from an HTML document"""
|
||||
|
||||
def stamp2date(i: int) -> str:
|
||||
"""make a UNIX date stamp (Epoch) human-readable"""
|
||||
dt = str(i)
|
||||
if len(dt)>10: dt = dt[:10]
|
||||
sdt = datetime.datetime.fromtimestamp(int(dt))
|
||||
return str(sdt).replace(" ","_")
|
||||
|
||||
html, raw_js, js = LX.parse(str(pn)), None, []
|
||||
raw_js = html.xpath("//*[@id='fermentation_log_chart']")
|
||||
if raw_js:
|
||||
raw_js = raw_js[0].get('data-chart-options')
|
||||
if raw_js:
|
||||
for i in json.loads(raw_js).pop('series'):
|
||||
if i.get("data"): #clean up data
|
||||
for unwanted in (
|
||||
"color","opacity","yAxis","dashStyle","tooltip",):
|
||||
if unwanted in i.keys():
|
||||
if unwanted == "tooltip":
|
||||
#append an unit to its matching value
|
||||
tt = i.get(unwanted)
|
||||
suffix = tt.get("valueSuffix")
|
||||
if suffix: data.update({
|
||||
"name": " ".join([data["name"], suffix])
|
||||
})
|
||||
del(data[u]) #supprime la donnée indésirable
|
||||
return data
|
||||
if suffix: i.update(
|
||||
{"name": " ".join([i["name"], suffix])})
|
||||
del(i[unwanted]) #remove unwanted data
|
||||
js.append(i)
|
||||
if len(js) > 0:
|
||||
#prepend human readable dates
|
||||
ids = tuple(i.get("x") for i in js[0]["data"])
|
||||
dates = [{"x": i,"y": stamp2date(i)} for i in ids]
|
||||
dates_dict = {"name":"date_time", "data":[i for i in dates]}
|
||||
js.insert(0,dates_dict)
|
||||
return csl(js)
|
||||
|
||||
def hproc(path_in):
|
||||
"""traite un document HTML"""
|
||||
data1, data2 = None, []
|
||||
h = LX.parse(str(path_in))
|
||||
x = h.xpath("//*[@id='fermentation_log_chart']")
|
||||
if x: data1 = x[0].get('data-chart-options')
|
||||
if data1:
|
||||
for i in json.loads(data1).pop('series'):
|
||||
data = jsproc(i)
|
||||
if data: data2.append(data)
|
||||
if len(data2) > 0:
|
||||
path_out = path_in.with_suffix('.json')
|
||||
def _names(self):
|
||||
"""generate names"""
|
||||
for group in self.data:
|
||||
if group.get("name"): yield group["name"]
|
||||
|
||||
def _keys(self):
|
||||
"""generate keys"""
|
||||
if len(self.data) and self.data[0].get("data"):
|
||||
for i in self.data[0]["data"]:
|
||||
yield i["x"]
|
||||
|
||||
def _rows(self):
|
||||
"""generate table rows"""
|
||||
|
||||
def row(key):
|
||||
"""generate a row"""
|
||||
for name in self.names:
|
||||
yield self.query(name,key)
|
||||
|
||||
for k in self.keys:
|
||||
yield tuple(row(k))
|
||||
|
||||
def query(self,name,key):
|
||||
"""query a value depending on an entry name and key"""
|
||||
for group in self.data:
|
||||
if group["name"] == name:
|
||||
for value in group["data"]:
|
||||
if value.get("x") == key:
|
||||
return value.get("y")
|
||||
|
||||
def dump(self, path_out):
|
||||
"""dump data as either JSON or CSV depending on the output extension"""
|
||||
print(f"INFO: >> {path_out}")
|
||||
if path_out.suffix == ".json":
|
||||
with open(path_out,'w') as f:
|
||||
f.write(json.dumps(data2,
|
||||
f.write(json.dumps(self.data,
|
||||
sort_keys=False, ensure_ascii=False, indent=2))
|
||||
print(f"INFO: {path_in.name} >> {path_out.name}")
|
||||
elif path_out.suffix == ".csv":
|
||||
with open(path_out,'w', newline='') as f:
|
||||
writer = csv.writer(f,dialect='unix')
|
||||
writer.writerow(self.names)
|
||||
writer.writerows(self.rows)
|
||||
|
||||
if __name__ == "__main__":
|
||||
here = pathlib.Path.cwd()
|
||||
hdocs = tuple(here.rglob("*.html"))
|
||||
if len(hdocs) == 0:
|
||||
print("Aucun fichier HTML ('.html') trouvé.")
|
||||
for i in hdocs:
|
||||
docs = tuple(here.rglob("*.html"))
|
||||
for i in docs:
|
||||
if i.exists() and i.stat().st_size > 0:
|
||||
hproc(i)
|
||||
|
||||
print(f"INFO: {i}")
|
||||
frame = Frame.from_html(i)
|
||||
if len(frame.keys):
|
||||
frame.dump(i.with_suffix(OUTPUT))
|
||||
else:
|
||||
print("INFO: no data: no output")
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user