mirror of
https://forge.apps.education.fr/blender-edutech/jumeaux-numeriques.git
synced 2024-01-27 06:56:18 +01:00
261 lines
10 KiB
Python
261 lines
10 KiB
Python
import bge # Bibliothèque Blender Game Engine (UPBGE)
|
|
import os
|
|
import sys
|
|
import importlib
|
|
import threading # Multithreading
|
|
import subprocess # Multiprocessus
|
|
import time
|
|
import csv
|
|
import xml.etree.ElementTree as ET # Creating/parsing XML file
|
|
|
|
###############################################################################
|
|
# twin_daq.py
|
|
# @title: Enregistement et visualisation des données
|
|
# @project: Blender-EduTech
|
|
# @lang: fr
|
|
# @authors: Philippe Roy <philippe.roy@ac-grenoble.fr>
|
|
# @copyright: Copyright (C) 2023 Philippe Roy
|
|
# @license: GNU GPL
|
|
###############################################################################
|
|
|
|
# UPBGE scene
|
|
scene = bge.logic.getCurrentScene()
|
|
|
|
# Récupérer la configuration du graphique
|
|
system=importlib.import_module(scene.objects['System']['system']) # Système
|
|
daq_config = system.get_public_vars()
|
|
|
|
# Données de l'acquisition
|
|
daq_data=[]
|
|
|
|
###############################################################################
|
|
# Accès aux variables publiques du système
|
|
###############################################################################
|
|
|
|
def get(data):
|
|
if data in daq_config:
|
|
if len (daq_config[data][0])>3: # Echelle à prendre en compte
|
|
return(scene.objects[daq_config[data][0][0]][daq_config[data][0][1]]*daq_config[data][0][3])
|
|
else:
|
|
return(scene.objects[daq_config[data][0][0]][daq_config[data][0][1]])
|
|
else:
|
|
print ("Erreur sur l'accès aux variables par get("+data+"), la variable '"+data+"' absente du système.")
|
|
return None
|
|
|
|
###############################################################################
|
|
# Enregistrement des données
|
|
###############################################################################
|
|
|
|
##
|
|
# Activation du mode DAQ
|
|
##
|
|
|
|
def daq(data):
|
|
daq_data.clear()
|
|
scene.objects['System']['daq_var'] =[]
|
|
row = ['t']
|
|
for var in data:
|
|
scene.objects['System']['daq_var'].append([daq_config[var][0][0], daq_config[var][0][1]])
|
|
row.append(var)
|
|
# print ("Acquisition des données :", scene.objects['System']['daq_var'])
|
|
daq_data.append(row)
|
|
scene.objects['System']['daq']=True
|
|
|
|
##
|
|
# Ajout des données (lecture/s = fps = 60)
|
|
##
|
|
|
|
def daq_add(cont):
|
|
row = [round(scene.objects['System']['time'], 3)]
|
|
for var in scene.objects['System']['daq_var']:
|
|
row.append(round(scene.objects[var[0]][var[1]], 3))
|
|
daq_data.append(row)
|
|
|
|
###############################################################################
|
|
# Tableau CSV
|
|
###############################################################################
|
|
|
|
# Format français des décimaux
|
|
def localize_floats(row):
|
|
return [
|
|
str(el).replace('.', ',') if isinstance(el, float) else el
|
|
for el in row
|
|
]
|
|
|
|
##
|
|
# Génération du fichier
|
|
##
|
|
|
|
def csv_generate():
|
|
scene.objects['System']['daq']=False
|
|
fichier_csv=os.path.join(os.path.split((scene.objects['System']['script']))[0], scene.objects['System']['system']+'.csv')
|
|
scene.objects['Cmd-text']['modal']= True
|
|
scene.objects['Cmd-text']['Text']= "Génération du fichier : "+fichier_csv
|
|
scene.objects['Cmd-text'].setVisible(True,False)
|
|
with open(fichier_csv, 'w') as csvfile:
|
|
writer = csv.writer(csvfile, delimiter = ';', lineterminator = '\n')
|
|
for t_data in daq_data:
|
|
writer.writerow(localize_floats(t_data))
|
|
|
|
###############################################################################
|
|
# Graphique statique
|
|
###############################################################################
|
|
|
|
##
|
|
# Création du fichier twin_plot.xml (configuration du graphique)
|
|
##
|
|
|
|
def plot_config_generate(data_groups):
|
|
|
|
# Création du XML
|
|
daq_config_list=list(daq_config)
|
|
xml_data = ET.Element('data')
|
|
xml_figure = ET.SubElement(xml_data, 'figure')
|
|
xml_plot = ET.SubElement(xml_figure, 'plot')
|
|
for var in daq_config_list:
|
|
xml_var = ET.SubElement(xml_plot, 'var')
|
|
xml_var.text=var
|
|
|
|
# Configuration graphique de ma série de donnée
|
|
if len(daq_config[var][2]) >=4:
|
|
xml_mark = ET.SubElement(xml_var, 'marker')
|
|
xml_mark.text=daq_config[var][2][0]
|
|
xml_line = ET.SubElement(xml_var, 'linestyle')
|
|
xml_line.text=daq_config[var][2][1]
|
|
xml_color = ET.SubElement(xml_var, 'color')
|
|
xml_color.text=daq_config[var][2][2]
|
|
xml_size = ET.SubElement(xml_var, 'linewidth')
|
|
xml_size.text=str(daq_config[var][2][3])
|
|
xml_type = ET.SubElement(xml_var, 'type') # Binaire, analogique ou numérique
|
|
xml_type.text=str(daq_config[var][0][2])
|
|
|
|
# Détection de groupe de graphique
|
|
data_group_i=0
|
|
xml_group = ET.SubElement(xml_var, 'group')
|
|
xml_group.text="-1" # Pas de groupe par défaut
|
|
for data_group in data_groups:
|
|
if isinstance(data_group, list):
|
|
data_group_i+=1
|
|
for data_subgroup in data_group: # Scan du sous-groupe
|
|
if data_subgroup == var:
|
|
xml_group.text=str(data_group_i)
|
|
else:
|
|
if data_group == var: # Pas de groupe
|
|
xml_group.text="0"
|
|
|
|
# Ecriture fichier
|
|
plot_config = ET.ElementTree(xml_data)
|
|
with open("plot_config.xml", "wb") as f: # XML généré plat (not pretty print)
|
|
plot_config.write(f)
|
|
|
|
##
|
|
# Activation du mode Plot
|
|
##
|
|
|
|
def plot(data_groups):
|
|
plot_config_generate(data_groups)
|
|
scene.objects['System']['plot']=True
|
|
|
|
##
|
|
# Génération du graphique
|
|
# Threading -> reste actif après le Stop
|
|
##
|
|
|
|
def plot_generate_thread(csv_file):
|
|
if sys.platform=="linux": # wxPython ne s'installe pas bien sur GNU/linux -> Qt5
|
|
command = sys.executable + " " + os.path.join(sys.executable, os.getcwd(), "twin_plot_qt.py") + " "+ csv_file
|
|
else: # Qt5 ne s'installe pas bien sur Windows -> wxPython
|
|
command = sys.executable + " " + os.path.join(sys.executable, os.getcwd(), "twin_plot_wx.py") + " "+ csv_file
|
|
os.system(command)
|
|
|
|
def plot_generate(threads):
|
|
csv_file=os.path.join(os.path.split((scene.objects['System']['script']))[0], scene.objects['System']['system']+'.csv')
|
|
threads.append(threading.Thread(target=plot_generate_thread, args=(csv_file,)))
|
|
threads[len(threads)-1].start()
|
|
|
|
###############################################################################
|
|
# Graphique interactif
|
|
# FIXME : ne marche pas
|
|
###############################################################################
|
|
|
|
##
|
|
# Mise en forme de décimaux
|
|
##
|
|
|
|
def truncate(n, decimals=0):
|
|
multiplier = 10**decimals
|
|
return int(n* multiplier)/multiplier
|
|
|
|
##
|
|
# Création du graphique interactif
|
|
##
|
|
|
|
def plot_start(data):
|
|
# subprocess.run([sys.executable, os.path.join(os.getcwd(), "twin_plot.py")], , stdin=subprocess.PIPE) # Process bloquant
|
|
|
|
# Terminer le processus plot précédent
|
|
if ('plot_proc' in scene.objects['System']):
|
|
if scene.objects['System']['plot_proc'].poll()==None:
|
|
scene.objects['System']['plot_proc'].terminate()
|
|
scene.objects['System']['plot_draw'] = True
|
|
scene.objects['System']['plot_time'] = 0
|
|
|
|
# Configuration des données
|
|
scene.objects['System']['plot_data'] =[]
|
|
for obj in data:
|
|
scene.objects['System']['plot_data'].append(daq_config[obj][0][0])
|
|
|
|
# Démarrer le processus plot
|
|
scene.objects['System']['plot_proc'] = subprocess.Popen([sys.executable, os.path.join(os.getcwd(), "twin_plot_qt.py")], stdin=subprocess.PIPE, encoding = 'utf8', universal_newlines=True)
|
|
# # scene.objects['System']['plot_proc'] = subprocess.Popen([sys.executable, os.path.join(os.getcwd(), "twin_plot_qt.py")], stdin=subprocess.PIPE, stdout=subprocess.PIPE, encoding = 'utf8')
|
|
# # stout = scene.objects['System']['plot_proc'].communicate() # FIXME : attente du message retour pour lancer l'acquisition
|
|
# # print("Blender stout : ", stout)
|
|
scene.objects['System']['plot']=True
|
|
|
|
##
|
|
# Ajout des données (lecture/s = fps = 60)
|
|
##
|
|
|
|
def plot_add(cont):
|
|
if cont.sensors['Plot'].positive :
|
|
|
|
# Affichage du graphique
|
|
if scene.objects['System']['plot_draw']:
|
|
if scene.objects['System']['plot_time'] != truncate(scene.objects['System']['time'], 0) or True:
|
|
|
|
# Préparation du message
|
|
# FIXME : ajouter les valeurs réelles et valeurs numériques ('activated_real')
|
|
scene.objects['System']['plot_time'] = truncate(scene.objects['System']['time'], 0)
|
|
# msg=str(round(scene.objects['System']['plot_time'], 1))
|
|
# msg=format(scene.objects['System']['plot_time'],".2f")
|
|
msg=format(scene.objects['System']['plot_time'],".2f")
|
|
for obj in scene.objects['System']['plot_data']:
|
|
if scene.objects[obj]['activated']:
|
|
msg = msg+",1"
|
|
else:
|
|
msg = msg+",0"
|
|
msg = msg+"\n"
|
|
|
|
# Envoi (Pipe)
|
|
if scene.objects['System']['plot_proc'].poll()==None:
|
|
# # scene.objects['System']['plot_proc'].communicate(input=time_send.encode())[0] # Communication bloquante
|
|
# scene.objects['System']['plot_proc'].communicate(input=msg.encode())[0] # Communication bloquante
|
|
scene.objects['System']['plot_proc'].communicate(input=msg.encode()) # Communication bloquante
|
|
# scene.objects['System']['plot_proc'].stdin.write(msg)
|
|
print ("twin : ", msg)
|
|
else:
|
|
# Arrêt
|
|
print ("Stop")
|
|
scene.objects['System']['plot']=False
|
|
scene.objects['System']['plot_draw'] =False
|
|
scene.objects['System']['plot_proc'].terminate()
|
|
|
|
# Arrêt de l'affichage du graphique
|
|
if scene.objects['System']['plot_proc'].poll()==None:
|
|
pass
|
|
else:
|
|
print ("Stop")
|
|
scene.objects['System']['plot']=False
|
|
scene.objects['System']['plot_draw'] =False
|
|
scene.objects['System']['plot_proc'].terminate()
|