import bge # Bibliothèque Blender Game Engine (UPBGE) from twin_threading import * # Multithreading import time import serial # Liaison série import pyfirmata # Protocole Firmata from serial.tools.list_ports import comports # Détection du port automatique ############################################################################### # porcou_lib.py # @title: Bibliothèque utilisateur du portail coulissant # @project: Blender-EduTech # @lang: fr # @authors: Philippe Roy # @copyright: Copyright (C) 2020-2022 Philippe Roy # @license: GNU GPL ############################################################################### scene = bge.logic.getCurrentScene() # Carte du jumeau numérique board = None board_it = None # Iterator (input) # Brochage du jumeau numérique bp_int_pin = None bp_ext_pin = None fdc_o_pin = None fdc_f_pin = None ir_emett_pin = None ir_recept_pin = None mot_o_pin = None mot_f_pin = None gyr_pin = None # UPBGE constants JUST_ACTIVATED = bge.logic.KX_INPUT_JUST_ACTIVATED JUST_RELEASED = bge.logic.KX_INPUT_JUST_RELEASED ACTIVATE = bge.logic.KX_INPUT_ACTIVE # JUST_DEACTIVATED = bge.logic.KX_SENSOR_JUST_DEACTIVATED ############################################################################### # Gyrophare ############################################################################### # Ordre pour allumer le gyrophare def gyr (order): # global gyr_pin scene.objects['Led']['activated']=order # if scene.objects['System']['twins'] : # if ordre : # gyr_pin.write(1) # else: # gyr_pin.write(0) ############################################################################### # Actionneurs ############################################################################### # Ordre pour le moteur phase ouvrir def mot_o (order): scene.objects['Moteur']['open']=order # Ordre pour le moteur phase fermer def mot_f (order): scene.objects['Moteur']['close']=order # Ordre pour le capteur barrage IR def ir_emet(order): scene.objects['Emetteur IR']['active']=order ############################################################################### # Capteurs ############################################################################### # Compte-rendu du capteur fin de course portail ouvert def fdc_o (): return scene.objects['Microrupteur fdc ouvert']['activated'] # Compte-rendu du capteur fin de course portail ouvert def fdc_f (): return scene.objects['Microrupteur fdc ferme']['activated'] # Compte-rendu du capteur barrage IR def ir_recep (): return scene.objects['Recepteur IR']['activated'] ############################################################################### # Boutons poussoirs ############################################################################### # Compte-rendu du bouton pousssoir coté rue def bp_ext (): return scene.objects['Bp cote rue']['activated'] # Compte-rendu du bouton pousssoir coté cour def bp_int (): return scene.objects['Bp cote cour']['activated'] ############################################################################### # Temporisation ############################################################################### def tempo (duree): time.sleep(duree) ############################################################################### # Jumeau numérique ############################################################################### ## # Recherche automatique du port ## def serial_autoget_port(): # USB Vendor ID, USB Product ID board_dict={'microbit' :[3368, 516], 'uno' :[9025, 67], 'mega' :[9025, 66]} for com in comports(): # Arduino Uno if com.vid == board_dict["uno"][0] and com.pid == board_dict["uno"][1]: return [com.device,"Arduino Uno"] for com in comports(): # Arduino Mega if com.vid == board_dict["mega"][0] and com.pid == board_dict["mega"][1]: return [com.device,"Arduino Mega"] return [None,""] ## # Création de l'objet carte (protocole Firmata) ## def board_init(port): try: return pyfirmata.Arduino(port) except: return None ## # Création de l'objet serial (communication série) ## def serial_init(port,speed): try: return serial.Serial(port,speed) except: return None ## # Affiche la liste des cartes (communication série) ## def serial_devices(): for com in comports(): print ("Name : "+str(com.name)+"\n" +" Device : "+str(com.device)+"\n" +" Hardware ID : "+str(com.hwid)+"\n" +" USB Vendor ID : "+str(com.vid)+"\n" +" USB Product ID : "+str(com.pid)+"\n" +" USB device location : "+str(com.location)+"\n" +" USB manufacturer : "+str(com.manufacturer)+"\n" +" USB product : "+str(com.product)+"\n" +" Interface-specific : "+str(com.interface)) ## # Activation de la communication avec la carte de communication (Arduino, Micro:bit) # Vitesse : 115200 -> 7 fps, 38400 -> 6 fps, 9600 -> 2 fps # pyserial : baudrate=115200 # pyfirmata : baudrate=57600 ## def jumeau(pins): global board # global gyr_pin # UI : étape 1 scene.objects['Twins-icon'].setVisible(True,True) scene.objects['Twins-text']['Text'] = "Connection en cours ..." scene.objects['Twins-text'].setVisible(True,False) # Mise en place de la carte speed = 57600 [device,board_name] =serial_autoget_port() # Recherche automatique du port if device is None: scene.objects['System']['twins'] = False scene.objects['Twins-text']['Text'] = "Aucune connection disponible : jumeau réel débranché." return False board = board_init(device) if board is None: scene.objects['System']['twins'] = False scene.objects['Twins-text']['Text'] = "Aucune connection disponible : port "+device+" pas prêt" return False scene.objects['System']['twins'] = True # scene.objects['System']['twins_close'] = False scene.objects['System']['twins_port'] = device scene.objects['System']['twins_speed'] = speed # scene.objects['System']['twins_readline'] = "" board_it = pyfirmata.util.Iterator(board) # Itérateur pour les entrées board_it.start() # UI : étape 2 if board =="": scene.objects['Twins-text']['Text'] = "Connection ouverte : "+device+" - "+str(speed)+" baud." else: scene.objects['Twins-text']['Text'] = "Connection ouverte : "+board_name+" sur "+device+" à "+str(speed)+" baud." tempo (0.1) # Déclaration des entrées - sorties for pin in pins: print (pin) # if # bp_ext_pin = board_io('d:'+str(es_dict['bp_ext'])+':i') # Bouton poussoir coté rue # bp_int_pin = board_io('d:'+str(es_dict['bp_int'])+':i') # Bouton poussoir coté cour # fdc_o_pin = board_io('d:'+str(es_dict['fdc_o'])+':i') # Capteur fin de course portail ouvert # fdc_f_pin = board_io('d:'+str(es_dict['fdc_f'])+':i') # Capteur fin de course portail fermé # ir_recept_pin = board_io('d:'+str(es_dict['ir_recept'])+':i') # Recepteur pour le capteur barrage IR # gyr_pin = board_io('d:'+str(es_dict['gyr'])+':o') # Gyrophare # mot_o_pin = board_io('d:'+str(es_dict['mot_o'])+':o') # Ouvrir le portail (moteur sens trigo) # mot_f_pin = board_io('d:'+str(es_dict['mot_f'])+':o') # Fermer le portail (moteur sens horaire # ir_emett_pin = board_io('d:'+str(es_dict['ir_emett'])+':o') # Emetteur pour le capteur barrage IR return True # def board_io(da,pin,io): # if pin_def is not None: # return board.get_pin(da+':'+pin_def) # else: # print ("Définition entrée-sortie non trouvée : "+pin_def) ## # Fermeture de la communication série ## def jumeau_close(): global board # twins_serial.close() # Fermer proprement le port série board.exit() # Fermer proprement la communication avec la carte scene.objects['System']['twins'] = False scene.objects['Twins-text']['Text'] = "Connection fermée." # Configuration du port # FIXME def jumeau_config(port, speed): # global board pass # global twins_serial # if scene.objects['System']['twins']: # serial_msg1 = "CF\n" # twins_serial.write(serial_msg1.encode()) # tempo (1) # serial_msg2 = str(speed)+"\n" # twins_serial.write(serial_msg2.encode()) # tempo (1) # serial_msg3 = str(temps_avancer)+"\n" # twins_serial.write(serial_msg3.encode()) # tempo (1) # serial_msg4 = str(temps_tourner)+"\n" # twins_serial.write(serial_msg4.encode()) # tempo (1) # serial_msg5 = "FC\n" # twins_serial.write(serial_msg5.encode()) ## # Envoi d'un message vers la communication série ## # def serie_msg(text): # global twins_serial # text2= text+"\n" # scene.objects['Twins-text']['Text'] = "Communication : envoi message : "+text # twins_serial.write(text2.encode()) ## # Mise en écoute de jumeau numérique (figeage de la scène) ## # def twins_listen(cont): # global twins_serial # if scene.objects['System']['twins']: # if scene.objects['System']['twins_readline'] != "": # scene.objects['Twins-text']['Text'] = "Écoute de la connection figeage de la scène.... Message reçu : "+scene.objects['System']['twins_readline'] # else: # scene.objects['Twins-text']['Text'] = "Écoute de la connection figeage de la scène..." # if cont.sensors['Property'].positive: # if scene.objects['System']['twins_listen'] : # serial_msg = twins_serial.readline() # if serial_msg is not None: # scene.objects['System']['twins_readline'] = str(serial_msg) # # scene.objects['Twins-text']['Text'] = "Message reçu : "+str(serial_msg) # scene.objects['System']['twins_listen'] = False ## # Réception d'un message de la communication série ## # def serie_rcpt(): # # scene.objects['Twins-text']['Text'] = "Écoute de la \nconnection\n figeage de \n la scène" # scene.objects['System']['twins_readline'] = "" # scene.objects['System']['twins_listen'] = True # while scene.objects['System']['twins_readline'] == "": # if scene.objects['System']['twins_readline'] != "": # break # # scene.objects['Twins-text']['Text'] = "Connection\nouverte :\n"+scene.objects['System']['twins_port']+"\n"+str(scene.objects['System']['twins_speed'])+" baud" # return scene.objects['System']['twins_readline']