import bge # Bibliothèque Blender Game Engine (UPBGE) import threading # Multithreading import trace import sys import time import serial # Liaison série import pyfirmata # Protocole Firmata from serial.tools.list_ports import comports # Détection du port automatique ############################################################################### # volrou_lib.py # @title: Bibliothèque utilisateur du volet roulant # @project: Blender-EduTech # @lang: fr # @authors: Philippe Roy # @copyright: Copyright (C) 2022 Philippe Roy # @license: GNU GPL ############################################################################### scene = bge.logic.getCurrentScene() # Threads threads_cmd=[] debug_thread = scene.objects['System']['debug_thread'] # Jumeau numérique board = None board_it = None # Iterator (input) bp_int_pin = None bp_ext_pin = None fdc_o_pin = None fdc_f_pin = None ir_emett_pin = None ir_recept_pin = None mot_o_pin = None mot_f_pin = None gyr_pin = None # UPBGE constants JUST_ACTIVATED = bge.logic.KX_INPUT_JUST_ACTIVATED JUST_RELEASED = bge.logic.KX_INPUT_JUST_RELEASED ACTIVATE = bge.logic.KX_INPUT_ACTIVE # JUST_DEACTIVATED = bge.logic.KX_SENSOR_JUST_DEACTIVATED ############################################################################### # Méthode kill pour les tâches (threads) ############################################################################### class thread_with_trace(threading.Thread): def __init__(self, *args, **keywords): threading.Thread.__init__(self, *args, **keywords) self.killed = False def start(self): self.__run_backup = self.run self.run = self.__run threading.Thread.start(self) def __run(self): sys.settrace(self.globaltrace) self.__run_backup() self.run = self.__run_backup def globaltrace(self, frame, event, arg): if event == 'call': return self.localtrace else: return None def localtrace(self, frame, event, arg): if self.killed: if event == 'line': raise SystemExit() return self.localtrace def kill(self): self.killed = True ############################################################################### # Start et stop des tâches (threads) ############################################################################### def thread_start(threads, type_txt, fct): threads.append(thread_with_trace(target = fct)) threads[len(threads)-1].start() if (debug_thread): print ("Thread",type_txt, "#", len(threads)-1, "open.") def thread_stop(threads, type_txt): i=0 zombie_flag=False for t in threads: if not t.is_alive(): if (debug_thread): print ("Thread",type_txt, "#",i,"closed.") else: if (debug_thread): print ("Thread",type_txt, "#",i,"still open ...") t.kill() t.join() if not t.is_alive(): if (debug_thread): print ("Thread",type_txt, "#",i,"killed.") else: if (debug_thread): print ("Thread",type_txt, "#",i,"zombie...") zombie_flag=True i +=1 if zombie_flag==False: if (debug_thread): print ("All threads",type_txt, "are closed.") scene.objects['System']['thread_cmd']=False return True else: if (debug_thread): print ("There are zombies threads",type_txt, ".") return False def thread_cmd_start(fct): thread_start(threads_cmd, "commands", fct) def thread_cmd_stop(): thread_stop(threads_cmd, "commands") def end(): # Jumeau numérique if scene.objects['System']['twins']: # serial_msg = "FI\n" # twins_serial.write(serial_msg.encode()) # Communication série : modele 3d -> carte communication ( arduino | micro:bit ) jumeau_close() # Thread if (debug_thread): print ("Thread commands is arrived.") time.sleep(0.125) scene.objects['System']['thread_cmd']=False time.sleep(0.125) def fin(): end() def quit(): end() ############################################################################### # Actionneurs ############################################################################### # Ordres utilisateur du clignotant def gyr (ordre): global gyr_pin scene.objects['Module led']['actif']=ordre if scene.objects['System']['twins'] : if ordre : gyr_pin.write(1) else: gyr_pin.write(0) # Ordres utilisateur du moteur def mot_o (ordre): scene.objects['Ensemble moteur']['actif_ouvrir']=ordre def mot_f (ordre): scene.objects['Ensemble moteur']['actif_fermer']=ordre # Ordre utilisateur du capteur barrage IR def ir_emet(ordre): scene.objects['Module emetteur IR']['actif']=ordre ############################################################################### # Capteurs ############################################################################### # Compte-rendu utilisateur du capteur fin de course portail ouvert def fdc_o (): return scene.objects['Capteur fdc ouvert']['actif'] # Compte-rendu utilisateur du capteur fin de course portail ouvert def fdc_f (): return scene.objects['Capteur fdc ferme']['actif'] # Compte-rendu utilisateur du capteur barrage IR def ir_recep (): if scene.objects['Module recepteur IR']['actif'] : return False else: return True ############################################################################### # Boutons poussoirs ############################################################################### # Compte-rendu utilisateur du bouton pousssoir coté rue def bp_ext (): return scene.objects['Module bouton cote rue']['actif'] # Compte-rendu utilisateur du bouton pousssoir coté cour def bp_int (): return scene.objects['Module bouton cote cour']['actif'] ############################################################################### # Temporisation ############################################################################### def tempo (duree): time.sleep(duree) ############################################################################### # Jumeau numérique ############################################################################### ## # Recherche automatique du port ## def serial_autoget_port(): # USB Vendor ID, USB Product ID board_dict={'microbit' :[3368, 516], 'uno' :[9025, 67], 'mega' :[9025, 66]} for com in comports(): # Arduino Uno if com.vid == board_dict["uno"][0] and com.pid == board_dict["uno"][1]: return [com.device,"Arduino Uno"] for com in comports(): # Arduino Mega if com.vid == board_dict["mega"][0] and com.pid == board_dict["mega"][1]: return [com.device,"Arduino Mega"] return [None,""] ## # Création de l'objet carte (protocole Firmata) ## def board_init(port): try: return pyfirmata.Arduino(port) except: return None ## # Création de l'objet serial (communication série) ## def serial_init(port,speed): try: return serial.Serial(port,speed) except: return None ## # Affiche la liste des cartes (communication série) ## def serial_devices(): for com in comports(): print ("Name : "+str(com.name)+"\n" +" Device : "+str(com.device)+"\n" +" Hardware ID : "+str(com.hwid)+"\n" +" USB Vendor ID : "+str(com.vid)+"\n" +" USB Product ID : "+str(com.pid)+"\n" +" USB device location : "+str(com.location)+"\n" +" USB manufacturer : "+str(com.manufacturer)+"\n" +" USB product : "+str(com.product)+"\n" +" Interface-specific : "+str(com.interface)) ## # Activation de la communication avec la carte de communication (Arduino, Micro:bit) # Vitesse : 115200 -> 7 fps, 38400 -> 6 fps, 9600 -> 2 fps # pyserial : baudrate=115200 # pyfirmata : baudrate=57600 ## def jumeau(pins): global board # global gyr_pin # UI : étape 1 scene.objects['Twins-icon'].setVisible(True,True) scene.objects['Twins-text']['Text'] = "Connection en cours ..." scene.objects['Twins-text'].setVisible(True,False) # Mise en place de la carte speed = 57600 [device,board_name] =serial_autoget_port() # Recherche automatique du port if device is None: scene.objects['System']['twins'] = False scene.objects['Twins-text']['Text'] = "Aucune connection disponible : jumeau réel débranché." return False board = board_init(device) if board is None: scene.objects['System']['twins'] = False scene.objects['Twins-text']['Text'] = "Aucune connection disponible : port "+device+" pas prêt" return False scene.objects['System']['twins'] = True # scene.objects['System']['twins_close'] = False scene.objects['System']['twins_port'] = device scene.objects['System']['twins_speed'] = speed # scene.objects['System']['twins_readline'] = "" board_it = pyfirmata.util.Iterator(board) # Itérateur pour les entrées board_it.start() # UI : étape 2 if board =="": scene.objects['Twins-text']['Text'] = "Connection ouverte : "+device+" - "+str(speed)+" baud" else: scene.objects['Twins-text']['Text'] = "Connection ouverte : "+board_name+" sur "+device+" à "+str(speed)+" baud" tempo (0.1) # Déclaration des entrées - sorties for pin in pins: print (pin) # if # bp_ext_pin = board_io('d:'+str(es_dict['bp_ext'])+':i') # Bouton poussoir coté rue # bp_int_pin = board_io('d:'+str(es_dict['bp_int'])+':i') # Bouton poussoir coté cour # fdc_o_pin = board_io('d:'+str(es_dict['fdc_o'])+':i') # Capteur fin de course portail ouvert # fdc_f_pin = board_io('d:'+str(es_dict['fdc_f'])+':i') # Capteur fin de course portail fermé # ir_recept_pin = board_io('d:'+str(es_dict['ir_recept'])+':i') # Recepteur pour le capteur barrage IR # gyr_pin = board_io('d:'+str(es_dict['gyr'])+':o') # Gyrophare # mot_o_pin = board_io('d:'+str(es_dict['mot_o'])+':o') # Ouvrir le portail (moteur sens trigo) # mot_f_pin = board_io('d:'+str(es_dict['mot_f'])+':o') # Fermer le portail (moteur sens horaire # ir_emett_pin = board_io('d:'+str(es_dict['ir_emett'])+':o') # Emetteur pour le capteur barrage IR return True # def board_io(da,pin,io): # if pin_def is not None: # return board.get_pin(da+':'+pin_def) # else: # print ("Définition entrée-sortie non trouvée : "+pin_def) ## # Fermeture de la communication série ## def jumeau_close(): global board # twins_serial.close() # Fermer proprement le port série board.exit() # Fermer proprement la communication avec la carte scene.objects['System']['twins'] = False scene.objects['Twins-text']['Text'] = "Connection fermée" # Configuration du port # FIXME def jumeau_config(port, speed): # global board pass # global twins_serial # if scene.objects['System']['twins']: # serial_msg1 = "CF\n" # twins_serial.write(serial_msg1.encode()) # tempo (1) # serial_msg2 = str(speed)+"\n" # twins_serial.write(serial_msg2.encode()) # tempo (1) # serial_msg3 = str(temps_avancer)+"\n" # twins_serial.write(serial_msg3.encode()) # tempo (1) # serial_msg4 = str(temps_tourner)+"\n" # twins_serial.write(serial_msg4.encode()) # tempo (1) # serial_msg5 = "FC\n" # twins_serial.write(serial_msg5.encode()) ## # Envoi d'un message vers la communication série ## # def serie_msg(text): # global twins_serial # text2= text+"\n" # scene.objects['Twins-text']['Text'] = "Communication : envoi message : "+text # twins_serial.write(text2.encode()) ## # Mise en écoute de jumeau numérique (figeage de la scène) ## # def twins_listen(cont): # global twins_serial # if scene.objects['System']['twins']: # if scene.objects['System']['twins_readline'] != "": # scene.objects['Twins-text']['Text'] = "Écoute de la connection figeage de la scène.... Message reçu : "+scene.objects['System']['twins_readline'] # else: # scene.objects['Twins-text']['Text'] = "Écoute de la connection figeage de la scène..." # if cont.sensors['Property'].positive: # if scene.objects['System']['twins_listen'] : # serial_msg = twins_serial.readline() # if serial_msg is not None: # scene.objects['System']['twins_readline'] = str(serial_msg) # # scene.objects['Twins-text']['Text'] = "Message reçu : "+str(serial_msg) # scene.objects['System']['twins_listen'] = False ## # Réception d'un message de la communication série ## # def serie_rcpt(): # # scene.objects['Twins-text']['Text'] = "Écoute de la \nconnection\n figeage de \n la scène" # scene.objects['System']['twins_readline'] = "" # scene.objects['System']['twins_listen'] = True # while scene.objects['System']['twins_readline'] == "": # if scene.objects['System']['twins_readline'] != "": # break # # scene.objects['Twins-text']['Text'] = "Connection\nouverte :\n"+scene.objects['System']['twins_port']+"\n"+str(scene.objects['System']['twins_speed'])+" baud" # return scene.objects['System']['twins_readline']