jumeaux-numeriques/twin_daq.py

187 lines
7.7 KiB
Python

import bge # Bibliothèque Blender Game Engine (UPBGE)
import os
import sys
import importlib
import subprocess # Multiprocessus
import time
import csv
###############################################################################
# twin_daq.py
# @title: Enregistement et visualisation des données
# @project: Blender-EduTech
# @lang: fr
# @authors: Philippe Roy <philippe.roy@ac-grenoble.fr>
# @copyright: Copyright (C) 2023 Philippe Roy
# @license: GNU GPL
###############################################################################
# UPBGE scene
scene = bge.logic.getCurrentScene()
# Récupérer la configuration du graphique
system=importlib.import_module(scene.objects['System']['system']) # Système
daq_config = system.get_public_vars()
# CSV
daq_data=[]
###############################################################################
# Accès aux variables publiques du système
###############################################################################
def get(data):
if data in daq_config:
if len (daq_config[data][0])>2: # Echelle à prendre en compte
return(scene.objects[daq_config[data][0][0]][daq_config[data][0][1]]*daq_config[data][0][2])
else:
return(scene.objects[daq_config[data][0][0]][daq_config[data][0][1]])
else:
print ("Erreur sur l'accès aux variables par get("+data+"), la variable '"+data+"' absente du système.")
return None
###############################################################################
# Enregistrement des données
###############################################################################
# Activation du mode DAQ
def daq(data):
daq_data.clear()
scene.objects['System']['daq_var'] =[]
data_line = ['t']
for var in data:
scene.objects['System']['daq_var'].append([daq_config[var][0][0], daq_config[var][0][1]])
data_line.append(var)
print ("Acquisition des données :", scene.objects['System']['daq_var'])
daq_data.append(data_line)
scene.objects['System']['daq']=True
# Ajout des données (fps = 60)
def daq_add(cont):
data_line = [round(scene.objects['System']['time'], 3)]
for var in scene.objects['System']['daq_var']:
data_line.append(round(scene.objects[var[0]][var[1]], 3))
daq_data.append(data_line)
###############################################################################
# Tableau CSV
###############################################################################
# Format français des décimaux
def localize_floats(row):
return [
str(el).replace('.', ',') if isinstance(el, float) else el
for el in row
]
# Génération du fichier
def csv_generate():
scene.objects['System']['daq']=False
fichier_csv=scene.objects['System']['system']+'.csv'
print ("Génération du fichier :", fichier_csv)
scene.objects['Cmd-text']['modal']= True
scene.objects['Cmd-text']['Text']= "Génération du fichier : "+fichier_csv
scene.objects['Cmd-text'].setVisible(True,False)
with open(fichier_csv, 'w') as csvfile:
writer = csv.writer(csvfile, delimiter = ';', lineterminator = '\n')
for t_data in daq_data:
writer.writerow(localize_floats(t_data))
###############################################################################
# Graphique statique
###############################################################################
# Activation du mode Plot
def plot():
scene.objects['System']['plot']=True
# Fermer le processus du graphique
def plot_close():
if scene.objects['System']['plot_proc'] is not None:
if scene.objects['System']['plot_proc'].poll()==None:
scene.objects['System']['plot_proc'].terminate()
# Génération du graphique
def plot_generate():
plot_close()
fichier_csv=scene.objects['System']['system']+'.csv'
scene.objects['System']['plot_proc'] = subprocess.Popen([sys.executable, os.path.join(os.getcwd(), "twin_plot_qt.py"), fichier_csv])
scene.objects['System']['plot']=False
###############################################################################
# Graphique interactif
# FIXME : ne marche pas
###############################################################################
# Mise en forme de décimaux
def truncate(n, decimals=0):
multiplier = 10**decimals
return int(n* multiplier)/multiplier
# Création du graphique interactif
def plot_start(data):
# subprocess.run([sys.executable, os.path.join(os.getcwd(), "twin_plot.py")], , stdin=subprocess.PIPE) # Process bloquant
# Terminer le processus précédent
if scene.objects['System']['plot_proc'] is not None:
if scene.objects['System']['plot_proc'].poll()==None:
scene.objects['System']['plot_proc'].terminate()
scene.objects['System']['plot_draw'] = True
scene.objects['System']['plot_time'] = 0
# Configuration des données
scene.objects['System']['plot_data'] =[]
for obj in data:
scene.objects['System']['plot_data'].append(daq_config[obj][0][0])
# Démarrer le processus
scene.objects['System']['plot_proc'] = subprocess.Popen([sys.executable, os.path.join(os.getcwd(), "twin_plot_qt.py")], stdin=subprocess.PIPE, encoding = 'utf8', universal_newlines=True)
# # scene.objects['System']['plot_proc'] = subprocess.Popen([sys.executable, os.path.join(os.getcwd(), "twin_plot_qt.py")], stdin=subprocess.PIPE, stdout=subprocess.PIPE, encoding = 'utf8')
# # stout = scene.objects['System']['plot_proc'].communicate() # FIXME : attente du message retour pour lancer l'acquisition
# # print("Blender stout : ", stout)
scene.objects['System']['plot']=True
# Ajout des données (fps = 60)
def plot_add(cont):
if cont.sensors['Plot'].positive :
# Affichage du graphique
if scene.objects['System']['plot_draw']:
if scene.objects['System']['plot_time'] != truncate(scene.objects['System']['time'], 0) or True:
# Préparation du message
# FIXME : ajouter les valeurs réelles et valeurs numériques ('activated_real')
scene.objects['System']['plot_time'] = truncate(scene.objects['System']['time'], 0)
# msg=str(round(scene.objects['System']['plot_time'], 1))
# msg=format(scene.objects['System']['plot_time'],".2f")
msg=format(scene.objects['System']['plot_time'],".2f")
for obj in scene.objects['System']['plot_data']:
if scene.objects[obj]['activated']:
msg = msg+",1"
else:
msg = msg+",0"
msg = msg+"\n"
# Envoi (Pipe)
if scene.objects['System']['plot_proc'].poll()==None:
# # scene.objects['System']['plot_proc'].communicate(input=time_send.encode())[0] # Communication bloquante
# scene.objects['System']['plot_proc'].communicate(input=msg.encode())[0] # Communication bloquante
scene.objects['System']['plot_proc'].communicate(input=msg.encode()) # Communication bloquante
# scene.objects['System']['plot_proc'].stdin.write(msg)
print ("twin : ", msg)
else:
# Arrêt
print ("Stop")
scene.objects['System']['plot']=False
scene.objects['System']['plot_draw'] =False
scene.objects['System']['plot_proc'].terminate()
# Arrêt de l'affichage du graphique
if scene.objects['System']['plot_proc'].poll()==None:
pass
else:
print ("Stop")
scene.objects['System']['plot']=False
scene.objects['System']['plot_draw'] =False
scene.objects['System']['plot_proc'].terminate()