407 lines
13 KiB
Python
Raw Normal View History

import bge # Bibliothèque Blender Game Engine (UPBGE)
import threading # Multithreading
import trace
import sys
import time
import serial # Liaison série
import pyfirmata # Protocole Firmata
from serial.tools.list_ports import comports # Détection du port automatique
###############################################################################
# volrou_lib.py
# @title: Bibliothèque utilisateur du volet roulant
# @project: Blender-EduTech
# @lang: fr
# @authors: Philippe Roy <philippe.roy@ac-grenoble.fr>
# @copyright: Copyright (C) 2022 Philippe Roy
# @license: GNU GPL
###############################################################################
scene = bge.logic.getCurrentScene()
# Threads
threads_cmd=[]
debug_thread = scene.objects['System']['debug_thread']
# Jumeau numérique
board = None
board_it = None # Iterator (input)
bp_int_pin = None
bp_ext_pin = None
fdc_o_pin = None
fdc_f_pin = None
ir_emett_pin = None
ir_recept_pin = None
mot_o_pin = None
mot_f_pin = None
gyr_pin = None
# UPBGE constants
JUST_ACTIVATED = bge.logic.KX_INPUT_JUST_ACTIVATED
JUST_RELEASED = bge.logic.KX_INPUT_JUST_RELEASED
ACTIVATE = bge.logic.KX_INPUT_ACTIVE
# JUST_DEACTIVATED = bge.logic.KX_SENSOR_JUST_DEACTIVATED
###############################################################################
# Méthode kill pour les tâches (threads)
###############################################################################
class thread_with_trace(threading.Thread):
def __init__(self, *args, **keywords):
threading.Thread.__init__(self, *args, **keywords)
self.killed = False
def start(self):
self.__run_backup = self.run
self.run = self.__run
threading.Thread.start(self)
def __run(self):
sys.settrace(self.globaltrace)
self.__run_backup()
self.run = self.__run_backup
def globaltrace(self, frame, event, arg):
if event == 'call':
return self.localtrace
else:
return None
def localtrace(self, frame, event, arg):
if self.killed:
if event == 'line':
raise SystemExit()
return self.localtrace
def kill(self):
self.killed = True
###############################################################################
# Start et stop des tâches (threads)
###############################################################################
def thread_start(threads, type_txt, fct):
threads.append(thread_with_trace(target = fct))
threads[len(threads)-1].start()
if (debug_thread):
print ("Thread",type_txt, "#", len(threads)-1, "open.")
def thread_stop(threads, type_txt):
i=0
zombie_flag=False
for t in threads:
if not t.is_alive():
if (debug_thread):
print ("Thread",type_txt, "#",i,"closed.")
else:
if (debug_thread):
print ("Thread",type_txt, "#",i,"still open ...")
t.kill()
t.join()
if not t.is_alive():
if (debug_thread):
print ("Thread",type_txt, "#",i,"killed.")
else:
if (debug_thread):
print ("Thread",type_txt, "#",i,"zombie...")
zombie_flag=True
i +=1
if zombie_flag==False:
if (debug_thread):
print ("All threads",type_txt, "are closed.")
scene.objects['System']['thread_cmd']=False
return True
else:
if (debug_thread):
print ("There are zombies threads",type_txt, ".")
return False
def thread_cmd_start(fct):
thread_start(threads_cmd, "commands", fct)
def thread_cmd_stop():
thread_stop(threads_cmd, "commands")
def end():
# Jumeau numérique
if scene.objects['System']['twins']:
# serial_msg = "FI\n"
# twins_serial.write(serial_msg.encode()) # Communication série : modele 3d -> carte communication ( arduino | micro:bit )
jumeau_close()
# Thread
if (debug_thread):
print ("Thread commands is arrived.")
time.sleep(0.125)
scene.objects['System']['thread_cmd']=False
time.sleep(0.125)
def fin():
end()
def quit():
end()
###############################################################################
# Actionneurs
###############################################################################
# Ordres utilisateur du clignotant
def gyr (ordre):
global gyr_pin
scene.objects['Module led']['actif']=ordre
if scene.objects['System']['twins'] :
if ordre :
gyr_pin.write(1)
else:
gyr_pin.write(0)
# Ordres utilisateur du moteur
def mot_o (ordre):
scene.objects['Ensemble moteur']['actif_ouvrir']=ordre
def mot_f (ordre):
scene.objects['Ensemble moteur']['actif_fermer']=ordre
# Ordre utilisateur du capteur barrage IR
def ir_emet(ordre):
scene.objects['Module emetteur IR']['actif']=ordre
###############################################################################
# Capteurs
###############################################################################
# Compte-rendu utilisateur du capteur fin de course portail ouvert
def fdc_o ():
return scene.objects['Capteur fdc ouvert']['actif']
# Compte-rendu utilisateur du capteur fin de course portail ouvert
def fdc_f ():
return scene.objects['Capteur fdc ferme']['actif']
# Compte-rendu utilisateur du capteur barrage IR
def ir_recep ():
if scene.objects['Module recepteur IR']['actif'] :
return False
else:
return True
###############################################################################
# Boutons poussoirs
###############################################################################
# Compte-rendu utilisateur du bouton pousssoir coté rue
def bp_ext ():
return scene.objects['Module bouton cote rue']['actif']
# Compte-rendu utilisateur du bouton pousssoir coté cour
def bp_int ():
return scene.objects['Module bouton cote cour']['actif']
###############################################################################
# Temporisation
###############################################################################
def tempo (duree):
time.sleep(duree)
###############################################################################
# Jumeau numérique
###############################################################################
##
# Recherche automatique du port
##
def serial_autoget_port():
# USB Vendor ID, USB Product ID
board_dict={'microbit' :[3368, 516],
'uno' :[9025, 67],
'mega' :[9025, 66]}
for com in comports(): # Arduino Uno
if com.vid == board_dict["uno"][0] and com.pid == board_dict["uno"][1]:
return [com.device,"Arduino Uno"]
for com in comports(): # Arduino Mega
if com.vid == board_dict["mega"][0] and com.pid == board_dict["mega"][1]:
return [com.device,"Arduino Mega"]
return [None,""]
##
# Création de l'objet carte (protocole Firmata)
##
def board_init(port):
try:
return pyfirmata.Arduino(port)
except:
return None
##
# Création de l'objet serial (communication série)
##
def serial_init(port,speed):
try:
return serial.Serial(port,speed)
except:
return None
##
# Affiche la liste des cartes (communication série)
##
def serial_devices():
for com in comports():
print ("Name : "+str(com.name)+"\n"
+" Device : "+str(com.device)+"\n"
+" Hardware ID : "+str(com.hwid)+"\n"
+" USB Vendor ID : "+str(com.vid)+"\n"
+" USB Product ID : "+str(com.pid)+"\n"
+" USB device location : "+str(com.location)+"\n"
+" USB manufacturer : "+str(com.manufacturer)+"\n"
+" USB product : "+str(com.product)+"\n"
+" Interface-specific : "+str(com.interface))
##
# Activation de la communication avec la carte de communication (Arduino, Micro:bit)
# Vitesse : 115200 -> 7 fps, 38400 -> 6 fps, 9600 -> 2 fps
# pyserial : baudrate=115200
# pyfirmata : baudrate=57600
##
def jumeau(pins):
global board
# global gyr_pin
# UI : étape 1
scene.objects['Twins-icon'].setVisible(True,True)
scene.objects['Twins-text']['Text'] = "Connection en cours ..."
scene.objects['Twins-text'].setVisible(True,False)
# Mise en place de la carte
speed = 57600
[device,board_name] =serial_autoget_port() # Recherche automatique du port
if device is None:
scene.objects['System']['twins'] = False
scene.objects['Twins-text']['Text'] = "Aucune connection disponible : jumeau réel débranché."
return False
board = board_init(device)
if board is None:
scene.objects['System']['twins'] = False
scene.objects['Twins-text']['Text'] = "Aucune connection disponible : port "+device+" pas prêt"
return False
scene.objects['System']['twins'] = True
# scene.objects['System']['twins_close'] = False
scene.objects['System']['twins_port'] = device
scene.objects['System']['twins_speed'] = speed
# scene.objects['System']['twins_readline'] = ""
board_it = pyfirmata.util.Iterator(board) # Itérateur pour les entrées
board_it.start()
# UI : étape 2
if board =="":
scene.objects['Twins-text']['Text'] = "Connection ouverte : "+device+" - "+str(speed)+" baud"
else:
scene.objects['Twins-text']['Text'] = "Connection ouverte : "+board_name+" sur "+device+" à "+str(speed)+" baud"
tempo (0.1)
# Déclaration des entrées - sorties
for pin in pins:
print (pin)
# if
# bp_ext_pin = board_io('d:'+str(es_dict['bp_ext'])+':i') # Bouton poussoir coté rue
# bp_int_pin = board_io('d:'+str(es_dict['bp_int'])+':i') # Bouton poussoir coté cour
# fdc_o_pin = board_io('d:'+str(es_dict['fdc_o'])+':i') # Capteur fin de course portail ouvert
# fdc_f_pin = board_io('d:'+str(es_dict['fdc_f'])+':i') # Capteur fin de course portail fermé
# ir_recept_pin = board_io('d:'+str(es_dict['ir_recept'])+':i') # Recepteur pour le capteur barrage IR
# gyr_pin = board_io('d:'+str(es_dict['gyr'])+':o') # Gyrophare
# mot_o_pin = board_io('d:'+str(es_dict['mot_o'])+':o') # Ouvrir le portail (moteur sens trigo)
# mot_f_pin = board_io('d:'+str(es_dict['mot_f'])+':o') # Fermer le portail (moteur sens horaire
# ir_emett_pin = board_io('d:'+str(es_dict['ir_emett'])+':o') # Emetteur pour le capteur barrage IR
return True
# def board_io(da,pin,io):
# if pin_def is not None:
# return board.get_pin(da+':'+pin_def)
# else:
# print ("Définition entrée-sortie non trouvée : "+pin_def)
##
# Fermeture de la communication série
##
def jumeau_close():
global board
# twins_serial.close() # Fermer proprement le port série
board.exit() # Fermer proprement la communication avec la carte
scene.objects['System']['twins'] = False
scene.objects['Twins-text']['Text'] = "Connection fermée"
# Configuration du port
# FIXME
def jumeau_config(port, speed):
# global board
pass
# global twins_serial
# if scene.objects['System']['twins']:
# serial_msg1 = "CF\n"
# twins_serial.write(serial_msg1.encode())
# tempo (1)
# serial_msg2 = str(speed)+"\n"
# twins_serial.write(serial_msg2.encode())
# tempo (1)
# serial_msg3 = str(temps_avancer)+"\n"
# twins_serial.write(serial_msg3.encode())
# tempo (1)
# serial_msg4 = str(temps_tourner)+"\n"
# twins_serial.write(serial_msg4.encode())
# tempo (1)
# serial_msg5 = "FC\n"
# twins_serial.write(serial_msg5.encode())
##
# Envoi d'un message vers la communication série
##
# def serie_msg(text):
# global twins_serial
# text2= text+"\n"
# scene.objects['Twins-text']['Text'] = "Communication : envoi message : "+text
# twins_serial.write(text2.encode())
##
# Mise en écoute de jumeau numérique (figeage de la scène)
##
# def twins_listen(cont):
# global twins_serial
# if scene.objects['System']['twins']:
# if scene.objects['System']['twins_readline'] != "":
# scene.objects['Twins-text']['Text'] = "Écoute de la connection figeage de la scène.... Message reçu : "+scene.objects['System']['twins_readline']
# else:
# scene.objects['Twins-text']['Text'] = "Écoute de la connection figeage de la scène..."
# if cont.sensors['Property'].positive:
# if scene.objects['System']['twins_listen'] :
# serial_msg = twins_serial.readline()
# if serial_msg is not None:
# scene.objects['System']['twins_readline'] = str(serial_msg)
# # scene.objects['Twins-text']['Text'] = "Message reçu : "+str(serial_msg)
# scene.objects['System']['twins_listen'] = False
##
# Réception d'un message de la communication série
##
# def serie_rcpt():
# # scene.objects['Twins-text']['Text'] = "Écoute de la \nconnection\n figeage de \n la scène"
# scene.objects['System']['twins_readline'] = ""
# scene.objects['System']['twins_listen'] = True
# while scene.objects['System']['twins_readline'] == "":
# if scene.objects['System']['twins_readline'] != "":
# break
# # scene.objects['Twins-text']['Text'] = "Connection\nouverte :\n"+scene.objects['System']['twins_port']+"\n"+str(scene.objects['System']['twins_speed'])+" baud"
# return scene.objects['System']['twins_readline']