import bge # Bibliothèque Blender Game Engine (UPBGE) import os import sys import importlib import threading # Multithreading import subprocess # Multiprocessus import time import csv import xml.etree.ElementTree as ET # Creating/parsing XML file ############################################################################### # twin_daq.py # @title: Enregistement et visualisation des données # @project: Blender-EduTech # @lang: fr # @authors: Philippe Roy # @copyright: Copyright (C) 2023 Philippe Roy # @license: GNU GPL ############################################################################### # UPBGE scene scene = bge.logic.getCurrentScene() # Récupérer la configuration du graphique system=importlib.import_module(scene.objects['System']['system']) # Système daq_config = system.get_public_vars() # Données de l'acquisition daq_data=[] ############################################################################### # Accès aux variables publiques du système ############################################################################### def get(data): if data in daq_config: if len (daq_config[data][0])>3: # Echelle à prendre en compte return(scene.objects[daq_config[data][0][0]][daq_config[data][0][1]]*daq_config[data][0][3]) else: return(scene.objects[daq_config[data][0][0]][daq_config[data][0][1]]) else: print ("Erreur sur l'accès aux variables par get("+data+"), la variable '"+data+"' absente du système.") return None ############################################################################### # Enregistrement des données ############################################################################### ## # Activation du mode DAQ ## def daq(data): daq_data.clear() scene.objects['System']['daq_var'] =[] row = ['t'] for var in data: scene.objects['System']['daq_var'].append([daq_config[var][0][0], daq_config[var][0][1]]) row.append(var) # print ("Acquisition des données :", scene.objects['System']['daq_var']) daq_data.append(row) scene.objects['System']['daq']=True ## # Ajout des données (lecture/s = fps = 60) ## def daq_add(cont): row = [round(scene.objects['System']['time'], 3)] for var in scene.objects['System']['daq_var']: row.append(round(scene.objects[var[0]][var[1]], 3)) daq_data.append(row) ############################################################################### # Tableau CSV ############################################################################### # Format français des décimaux def localize_floats(row): return [ str(el).replace('.', ',') if isinstance(el, float) else el for el in row ] ## # Génération du fichier ## def csv_generate(): scene.objects['System']['daq']=False fichier_csv=os.path.join(os.path.split((scene.objects['System']['script']))[0], scene.objects['System']['system']+'.csv') scene.objects['Cmd-text']['modal']= True scene.objects['Cmd-text']['Text']= "Génération du fichier : "+fichier_csv scene.objects['Cmd-text'].setVisible(True,False) with open(fichier_csv, 'w') as csvfile: writer = csv.writer(csvfile, delimiter = ';', lineterminator = '\n') for t_data in daq_data: writer.writerow(localize_floats(t_data)) ############################################################################### # Graphique statique ############################################################################### ## # Création du fichier twin_plot.xml (configuration du graphique) ## def plot_config_generate(data_groups): # Création du XML daq_config_list=list(daq_config) xml_data = ET.Element('data') xml_figure = ET.SubElement(xml_data, 'figure') xml_plot = ET.SubElement(xml_figure, 'plot') for var in daq_config_list: xml_var = ET.SubElement(xml_plot, 'var') xml_var.text=var # Configuration graphique de ma série de donnée if len(daq_config[var][2]) >=4: xml_mark = ET.SubElement(xml_var, 'marker') xml_mark.text=daq_config[var][2][0] xml_line = ET.SubElement(xml_var, 'linestyle') xml_line.text=daq_config[var][2][1] xml_color = ET.SubElement(xml_var, 'color') xml_color.text=daq_config[var][2][2] xml_size = ET.SubElement(xml_var, 'linewidth') xml_size.text=str(daq_config[var][2][3]) xml_type = ET.SubElement(xml_var, 'type') xml_type.text=str(daq_config[var][0][2]) # Détection de groupe de graphique data_group_i=0 xml_group = ET.SubElement(xml_var, 'group') xml_group.text="-1" # Pas de groupe par défaut for data_group in data_groups: if isinstance(data_group, list): data_group_i+=1 for data_subgroup in data_group: # Scan du sous-groupe if data_subgroup == var: xml_group.text=str(data_group_i) else: if data_group == var: # Pas de groupe xml_group.text="0" # Ecriture fichier plot_config = ET.ElementTree(xml_data) with open("plot_config.xml", "wb") as f: # XML généré plat (not pretty print) plot_config.write(f) ## # Activation du mode Plot ## def plot(data_groups): plot_config_generate(data_groups) scene.objects['System']['plot']=True ## # Génération du graphique # Threading -> reste actif après le Stop ## def plot_generate_thread(csv_file): if sys.platform=="linux": # wxPython ne s'installe pas bien sur GNU/linux -> Qt5 command = sys.executable + " " + os.path.join(sys.executable, os.getcwd(), "twin_plot_qt.py") + " "+ csv_file else: # Qt5 ne s'installe pas bien sur Windows -> wxPython command = sys.executable + " " + os.path.join(sys.executable, os.getcwd(), "twin_plot_wx.py") + " "+ csv_file os.system(command) def plot_generate(threads): csv_file=os.path.join(os.path.split((scene.objects['System']['script']))[0], scene.objects['System']['system']+'.csv') threads.append(threading.Thread(target=plot_generate_thread, args=(csv_file,))) threads[len(threads)-1].start() ############################################################################### # Graphique interactif # FIXME : ne marche pas ############################################################################### ## # Mise en forme de décimaux ## def truncate(n, decimals=0): multiplier = 10**decimals return int(n* multiplier)/multiplier ## # Création du graphique interactif ## def plot_start(data): # subprocess.run([sys.executable, os.path.join(os.getcwd(), "twin_plot.py")], , stdin=subprocess.PIPE) # Process bloquant # Terminer le processus plot précédent if ('plot_proc' in scene.objects['System']): if scene.objects['System']['plot_proc'].poll()==None: scene.objects['System']['plot_proc'].terminate() scene.objects['System']['plot_draw'] = True scene.objects['System']['plot_time'] = 0 # Configuration des données scene.objects['System']['plot_data'] =[] for obj in data: scene.objects['System']['plot_data'].append(daq_config[obj][0][0]) # Démarrer le processus plot scene.objects['System']['plot_proc'] = subprocess.Popen([sys.executable, os.path.join(os.getcwd(), "twin_plot_qt.py")], stdin=subprocess.PIPE, encoding = 'utf8', universal_newlines=True) # # scene.objects['System']['plot_proc'] = subprocess.Popen([sys.executable, os.path.join(os.getcwd(), "twin_plot_qt.py")], stdin=subprocess.PIPE, stdout=subprocess.PIPE, encoding = 'utf8') # # stout = scene.objects['System']['plot_proc'].communicate() # FIXME : attente du message retour pour lancer l'acquisition # # print("Blender stout : ", stout) scene.objects['System']['plot']=True ## # Ajout des données (lecture/s = fps = 60) ## def plot_add(cont): if cont.sensors['Plot'].positive : # Affichage du graphique if scene.objects['System']['plot_draw']: if scene.objects['System']['plot_time'] != truncate(scene.objects['System']['time'], 0) or True: # Préparation du message # FIXME : ajouter les valeurs réelles et valeurs numériques ('activated_real') scene.objects['System']['plot_time'] = truncate(scene.objects['System']['time'], 0) # msg=str(round(scene.objects['System']['plot_time'], 1)) # msg=format(scene.objects['System']['plot_time'],".2f") msg=format(scene.objects['System']['plot_time'],".2f") for obj in scene.objects['System']['plot_data']: if scene.objects[obj]['activated']: msg = msg+",1" else: msg = msg+",0" msg = msg+"\n" # Envoi (Pipe) if scene.objects['System']['plot_proc'].poll()==None: # # scene.objects['System']['plot_proc'].communicate(input=time_send.encode())[0] # Communication bloquante # scene.objects['System']['plot_proc'].communicate(input=msg.encode())[0] # Communication bloquante scene.objects['System']['plot_proc'].communicate(input=msg.encode()) # Communication bloquante # scene.objects['System']['plot_proc'].stdin.write(msg) print ("twin : ", msg) else: # Arrêt print ("Stop") scene.objects['System']['plot']=False scene.objects['System']['plot_draw'] =False scene.objects['System']['plot_proc'].terminate() # Arrêt de l'affichage du graphique if scene.objects['System']['plot_proc'].poll()==None: pass else: print ("Stop") scene.objects['System']['plot']=False scene.objects['System']['plot_draw'] =False scene.objects['System']['plot_proc'].terminate()