import bge # Bibliothèque Blender Game Engine (UPBGE) import os import sys import importlib import subprocess # Multiprocessus import time import csv ############################################################################### # twin_plot.py # @title: Visualisation des données # @project: Blender-EduTech # @lang: fr # @authors: Philippe Roy # @copyright: Copyright (C) 2023 Philippe Roy # @license: GNU GPL ############################################################################### # UPBGE scene scene = bge.logic.getCurrentScene() # Récupérer la configuration du graphique system=importlib.import_module(scene.objects['System']['system']) # Système plot_config = system.get_public_vars() # CSV csv_data=[] ############################################################################### # Accès aux variables publiques du système ############################################################################### def get(data): if data in plot_config: if len (plot_config[data][0])>2: # Echelle à prendre en compte return(scene.objects[plot_config[data][0][0]][plot_config[data][0][1]]*plot_config[data][0][2]) else: return(scene.objects[plot_config[data][0][0]][plot_config[data][0][1]]) else: print ("Erreur sur l'accès aux variables par get("+data+"), la variable '"+data+"' absente du système.") return None ############################################################################### # Rapport CSV ############################################################################### # Démarage et configuration des variables def csv_start(data): print ("Acquisition des données CSV") scene.objects['System']['csv_var'] =[] data_line = ['t'] for var in data: scene.objects['System']['csv_var'].append([plot_config[var][0][0], plot_config[var][0][1]]) data_line.append(var) print (scene.objects['System']['csv_var']) csv_data.append(data_line) scene.objects['System']['csv']=True # Ajout des données (fps = 60) def csv_maj(cont): data_line = [round(scene.objects['System']['time'], 3)] for var in scene.objects['System']['csv_var']: data_line.append(round(scene.objects[var[0]][var[1]], 3)) csv_data.append(data_line) # Génération du fichier def localize_floats(row): return [ str(el).replace('.', ',') if isinstance(el, float) else el for el in row ] def csv_generate(): fichier_csv=scene.objects['System']['system']+'.csv' print ("Génération du fichier :", fichier_csv) scene.objects['Cmd-text']['Text']= "Génération du fichier :", fichier_csv scene.objects['System']['csv']=False with open(fichier_csv, 'w') as csvfile: writer = csv.writer(csvfile, delimiter = ';', lineterminator = '\n') for t_data in csv_data: writer.writerow(localize_floats(t_data)) ############################################################################### # Création du graphique ############################################################################### def plot(data): # subprocess.run([sys.executable, os.path.join(os.getcwd(), "twin_plot.py")], , stdin=subprocess.PIPE) # Process bloquant # Terminer le processus précédent if scene.objects['System']['plot_proc'] is not None: if scene.objects['System']['plot_proc'].poll()==None: scene.objects['System']['plot_proc'].terminate() scene.objects['System']['plot_draw'] = True scene.objects['System']['plot_time'] = 0 # Configuration des données scene.objects['System']['plot_data'] =[] for obj in data: scene.objects['System']['plot_data'].append(plot_config[obj][0][0]) # Démarrer le processus scene.objects['System']['plot_proc'] = subprocess.Popen([sys.executable, os.path.join(os.getcwd(), "twin_plot_qt.py")], stdin=subprocess.PIPE, encoding = 'utf8', universal_newlines=True) # # scene.objects['System']['plot_proc'] = subprocess.Popen([sys.executable, os.path.join(os.getcwd(), "twin_plot_qt.py")], stdin=subprocess.PIPE, stdout=subprocess.PIPE, encoding = 'utf8') # # stout = scene.objects['System']['plot_proc'].communicate() # FIXME : attente du message retour pour lancer l'acquisition # # print("Blender stout : ", stout) scene.objects['System']['plot']=True ############################################################################### # Mise à jour du graphique ############################################################################### def truncate(n, decimals=0): multiplier = 10**decimals return int(n* multiplier)/multiplier def plot_maj(cont): if cont.sensors['Plot'].positive : # Affichage du graphique if scene.objects['System']['plot_draw']: if scene.objects['System']['plot_time'] != truncate(scene.objects['System']['time'], 0) or True: # Préparation du message # FIXME : ajouter les valeurs réelles et valeurs numériques ('activated_real') scene.objects['System']['plot_time'] = truncate(scene.objects['System']['time'], 0) # msg=str(round(scene.objects['System']['plot_time'], 1)) # msg=format(scene.objects['System']['plot_time'],".2f") msg=format(scene.objects['System']['plot_time'],".2f") for obj in scene.objects['System']['plot_data']: if scene.objects[obj]['activated']: msg = msg+",1" else: msg = msg+",0" msg = msg+"\n" # Envoi (Pipe) if scene.objects['System']['plot_proc'].poll()==None: # # scene.objects['System']['plot_proc'].communicate(input=time_send.encode())[0] # Communication bloquante # scene.objects['System']['plot_proc'].communicate(input=msg.encode())[0] # Communication bloquante scene.objects['System']['plot_proc'].communicate(input=msg.encode()) # Communication bloquante # scene.objects['System']['plot_proc'].stdin.write(msg) print ("twin : ", msg) else: # Arrêt print ("Stop") scene.objects['System']['plot']=False scene.objects['System']['plot_draw'] =False scene.objects['System']['plot_proc'].terminate() # Arrêt de l'affichage du graphique if scene.objects['System']['plot_proc'].poll()==None: pass else: print ("Stop") scene.objects['System']['plot']=False scene.objects['System']['plot_draw'] =False scene.objects['System']['plot_proc'].terminate()