jumeaux-numeriques/portail_coulissant/porcou_lib.py

379 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import bge # Bibliothèque Blender Game Engine (UPBGE)
import threading # Multithreading
import trace
import sys
import time
import serial # Liaison série
import pyfirmata # Protocole Firmata
from serial.tools.list_ports import comports # Détection du port automatique
###############################################################################
# porcou_lib.py
# @title: Bibliothèque utilisateur du portail coulissant (pc_*)
# @project: Blender-EduTech
# @lang: fr
# @authors: Philippe Roy <philippe.roy@ac-grenoble.fr>
# @copyright: Copyright (C) 2020-2022 Philippe Roy
# @license: GNU GPL
###############################################################################
scene = bge.logic.getCurrentScene()
# Threads
threads_cmd=[]  # Threads registered by thread_cmd_start()
debug_thread = scene.objects['System']['debug_thread']  # When truthy, thread start/stop messages are printed
# Digital twin
# twins_serial = None
board = None  # Board object, assigned by jumeau() from board_init()
board_it = None # Iterator (input)
# board_io = None # Input - Output
gyr_pin = None  # Pin of the flashing light, assigned by jumeau()
# UPBGE constants
JUST_ACTIVATED = bge.logic.KX_INPUT_JUST_ACTIVATED
JUST_RELEASED = bge.logic.KX_INPUT_JUST_RELEASED
ACTIVATE = bge.logic.KX_INPUT_ACTIVE
# JUST_DEACTIVATED = bge.logic.KX_SENSOR_JUST_DEACTIVATED
###############################################################################
# Méthode kill pour les tâches (threads)
###############################################################################
class thread_with_trace(threading.Thread):
    """Thread that can be stopped from outside by calling kill().

    When the thread starts running, a trace function is installed in it.
    Once kill() has been called, the next traced 'line' event raises
    SystemExit inside the thread.
    """

    def __init__(self, *args, **keywords):
        super().__init__(*args, **keywords)
        self.killed = False

    def start(self):
        # Swap run() for the tracing wrapper before launching the thread.
        self.__run_backup = self.run
        self.run = self.__run
        super().start()

    def __run(self):
        # Runs inside the new thread: install the tracer, then the real body.
        sys.settrace(self.globaltrace)
        self.__run_backup()
        self.run = self.__run_backup

    def globaltrace(self, frame, event, arg):
        # Only function calls receive a local trace function.
        return self.localtrace if event == 'call' else None

    def localtrace(self, frame, event, arg):
        if self.killed and event == 'line':
            raise SystemExit()
        return self.localtrace

    def kill(self):
        """Request the thread to stop at its next traced line."""
        self.killed = True
###############################################################################
# Start et stop des tâches (threads)
###############################################################################
def thread_start(threads, type_txt, fct):
    """Create a killable thread running fct, register it in threads and start it.

    type_txt is only used as a label in the debug message.
    """
    new_thread = thread_with_trace(target=fct)
    threads.append(new_thread)
    new_thread.start()
    if debug_thread:
        print("Thread", type_txt, "#", len(threads) - 1, "open.")
def thread_stop(threads, type_txt):
    """Kill every thread of threads that is still alive and report the outcome.

    Returns True when no thread is left alive; in that case the 'thread_cmd'
    property of the 'System' object is reset to False. Returns False when at
    least one thread is still alive after kill() and join().
    type_txt is only used as a label in the debug messages.
    """
    has_zombie = False
    for index, task in enumerate(threads):
        if not task.is_alive():
            if debug_thread:
                print("Thread", type_txt, "#", index, "closed.")
            continue
        if debug_thread:
            print("Thread", type_txt, "#", index, "still open ...")
        task.kill()
        task.join()
        if task.is_alive():
            if debug_thread:
                print("Thread", type_txt, "#", index, "zombie...")
            has_zombie = True
        elif debug_thread:
            print("Thread", type_txt, "#", index, "killed.")
    if has_zombie:
        if debug_thread:
            print("There are zombies threads", type_txt, ".")
        return False
    if debug_thread:
        print("All threads", type_txt, "are closed.")
    scene.objects['System']['thread_cmd'] = False
    return True
def thread_cmd_start(fct):
    """Start fct in a new thread registered in the commands thread list."""
    thread_start(threads=threads_cmd, type_txt="commands", fct=fct)
def thread_cmd_stop():
    """Stop every thread of the commands thread list (result of thread_stop is discarded)."""
    thread_stop(threads=threads_cmd, type_txt="commands")
def end():
    """Finish the user commands: close the digital twin if active, then clear 'thread_cmd'.

    A 0.125 s pause is made before and after the 'thread_cmd' property of
    the 'System' object is set to False.
    """
    system = scene.objects['System']

    # Digital twin
    if system['twins']:
        jumeau_close()

    # Thread
    if debug_thread:
        print("Thread commands is arrived.")
    time.sleep(0.125)
    system['thread_cmd'] = False
    time.sleep(0.125)
def fin():
    """Alias of end()."""
    return end()
def quit():
    """Alias of end()."""
    return end()
###############################################################################
# Actionneurs
###############################################################################
# Ordres utilisateur du clignotant
def gyr(ordre):
    """User order for the flashing light.

    Stores ordre in the 'actif' property of 'Module led'. When the 'twins'
    property of 'System' is set, also writes 1 (ordre truthy) or 0 to gyr_pin.
    """
    scene.objects['Module led']['actif'] = ordre
    if not scene.objects['System']['twins']:
        return
    gyr_pin.write(1 if ordre else 0)
# Ordres utilisateur du moteur
def mot_o(ordre):
    """User order for the motor: store ordre in 'actif_ouvrir' of 'Ensemble moteur'."""
    motor = scene.objects['Ensemble moteur']
    motor['actif_ouvrir'] = ordre
def mot_f(ordre):
    """User order for the motor: store ordre in 'actif_fermer' of 'Ensemble moteur'."""
    motor = scene.objects['Ensemble moteur']
    motor['actif_fermer'] = ordre
# Ordre utilisateur du capteur barrage IR
def ir_emet(ordre):
    """User order for the IR barrier: store ordre in 'actif' of 'Module emetteur IR'."""
    emitter = scene.objects['Module emetteur IR']
    emitter['actif'] = ordre
###############################################################################
# Capteurs
###############################################################################
# Compte-rendu utilisateur du capteur fin de course portail ouvert
def fdc_o():
    """Return the 'actif' property of the 'Capteur fdc ouvert' object."""
    sensor = scene.objects['Capteur fdc ouvert']
    return sensor['actif']
# Compte-rendu utilisateur du capteur fin de course portail fermé
def fdc_f():
    """Return the 'actif' property of the 'Capteur fdc ferme' object."""
    sensor = scene.objects['Capteur fdc ferme']
    return sensor['actif']
# Compte-rendu utilisateur du capteur barrage IR
def ir_recep():
    """Return True when the 'actif' property of 'Module recepteur IR' is falsy, else False."""
    return not scene.objects['Module recepteur IR']['actif']
###############################################################################
# Boutons poussoirs
###############################################################################
# Compte-rendu utilisateur du bouton poussoir côté rue
def bp_ext():
    """Return the 'actif' property of the 'Module bouton cote rue' object."""
    button = scene.objects['Module bouton cote rue']
    return button['actif']
# Compte-rendu utilisateur du bouton poussoir côté cour
def bp_int():
    """Return the 'actif' property of the 'Module bouton cote cour' object."""
    button = scene.objects['Module bouton cote cour']
    return button['actif']
###############################################################################
# Temporisation
###############################################################################
def tempo(duree):
    """Block the calling thread for duree seconds."""
    pause = time.sleep
    pause(duree)
###############################################################################
# Jumeau numérique
###############################################################################
##
# Recherche automatique du port
##
def serial_autoget_port():
    """Look for a known board among the serial ports.

    Returns [device, board label] for the first matching port, an Arduino Uno
    being searched on all ports before an Arduino Mega, or None when no
    known board is found.
    """
    # ((USB Vendor ID, USB Product ID), label), in search order.
    # The micro:bit identifiers (3368, 516) are known but not searched.
    known_boards = (
        ((9025, 67), "Arduino Uno"),
        ((9025, 66), "Arduino Mega"),
    )
    for (vendor_id, product_id), label in known_boards:
        for port in comports():
            if port.vid == vendor_id and port.pid == product_id:
                return [port.device, label]
    return None
##
# Création de l'objet carte (protocole Firmata)
##
def board_init(port):
    """Create the board object (Firmata protocol) for the given port.

    Returns the pyfirmata.Arduino object, or None when it cannot be created.
    """
    try:
        return pyfirmata.Arduino(port)
    except Exception:
        # Not a bare except: SystemExit (raised by thread_with_trace to kill
        # a thread) and KeyboardInterrupt must not be swallowed here.
        return None
##
# Création de l'objet serial (communication série)
##
def serial_init(port, speed):
    """Create the serial object (serial communication) for port at the given speed.

    Returns the serial.Serial object, or None when it cannot be created.
    """
    try:
        return serial.Serial(port, speed)
    except Exception:
        # Not a bare except: SystemExit (raised by thread_with_trace to kill
        # a thread) and KeyboardInterrupt must not be swallowed here.
        return None
##
# Affiche la liste des cartes (communication série)
##
def serial_devices():
    """Print the description of every serial port reported by comports()."""
    for port in comports():
        description = (
            "Name : " + str(port.name),
            " Device : " + str(port.device),
            " Hardware ID : " + str(port.hwid),
            " USB Vendor ID : " + str(port.vid),
            " USB Product ID : " + str(port.pid),
            " USB device location : " + str(port.location),
            " USB manufacturer : " + str(port.manufacturer),
            " USB product : " + str(port.product),
            " Interface-specific : " + str(port.interface),
        )
        print("\n".join(description))
##
# Activation de la communication avec la carte de communication (Arduino, Micro:bit)
# Vitesse : 115200 -> 7 fps, 38400 -> 6 fps, 9600 -> 2 fps
# pyserial : baudrate=115200
# pyfirmata : baudrate=57600
##
def jumeau(es_dict):
    """Open the communication with the board of the digital twin.

    es_dict: mapping whose 'gyr' entry is the number of the digital pin
    driving the flashing light.

    Returns True when the board is connected, False when no known board is
    detected or when the board object cannot be created. The 'twins'
    property of 'System' and the 'Twins-text' text are updated accordingly.
    """
    global board
    global board_it  # Without this the module-level iterator was never set
    global gyr_pin
    system = scene.objects['System']
    twins_text = scene.objects['Twins-text']

    # UI: step 1
    scene.objects['Twins-icon'].setVisible(True, True)
    twins_text['Text'] = "Connection en cours ..."
    twins_text.setVisible(True, False)

    # Board setup
    speed = 57600
    found = serial_autoget_port()  # Automatic port detection
    if found is None:
        # No known board plugged in: unpacking None would raise TypeError.
        system['twins'] = False
        twins_text['Text'] = "Aucune carte détectée"
        return False
    device, board_name = found
    board = board_init(device)
    if board is None:
        system['twins'] = False
        twins_text['Text'] = "Port "+device+" pas prêt"
        return False
    system['twins'] = True
    system['twins_port'] = device
    system['twins_speed'] = speed
    board_it = pyfirmata.util.Iterator(board)  # Iterator for the inputs
    board_it.start()

    # UI: step 2 (the test is on the board name, not on the board object)
    if board_name == "":
        twins_text['Text'] = "Connection ouverte : "+device+" - "+str(speed)+" baud"
    else:
        twins_text['Text'] = "Connection ouverte : "+board_name+" sur "+device+" à "+str(speed)+" baud"
    tempo(0.1)

    # Inputs - outputs declaration
    gyr_pin = board.get_pin('d:'+str(es_dict['gyr'])+':o')  # Flashing light
    return True
##
# Fermeture de la communication série
##
def jumeau_close():
    """Close the communication with the board and update the twin state and text."""
    board.exit()  # Cleanly close the communication with the board
    objects = scene.objects
    objects['System']['twins'] = False
    objects['Twins-text']['Text'] = "Connection fermée"
# Configuration du port
# FIXME
def jumeau_config(port, speed):
    """Port configuration placeholder (FIXME): currently does nothing."""
    return None
# global twins_serial
# if scene.objects['System']['twins']:
# serial_msg1 = "CF\n"
# twins_serial.write(serial_msg1.encode())
# tempo (1)
# serial_msg2 = str(speed)+"\n"
# twins_serial.write(serial_msg2.encode())
# tempo (1)
# serial_msg3 = str(temps_avancer)+"\n"
# twins_serial.write(serial_msg3.encode())
# tempo (1)
# serial_msg4 = str(temps_tourner)+"\n"
# twins_serial.write(serial_msg4.encode())
# tempo (1)
# serial_msg5 = "FC\n"
# twins_serial.write(serial_msg5.encode())
##
# Envoi d'un message vers la communication série
##
# def serie_msg(text):
# global twins_serial
# text2= text+"\n"
# scene.objects['Twins-text']['Text'] = "Communication : envoi message : "+text
# twins_serial.write(text2.encode())
##
# Mise en écoute de jumeau numérique (figeage de la scène)
##
# def twins_listen(cont):
# global twins_serial
# if scene.objects['System']['twins']:
# if scene.objects['System']['twins_readline'] != "":
# scene.objects['Twins-text']['Text'] = "Écoute de la connection figeage de la scène.... Message reçu : "+scene.objects['System']['twins_readline']
# else:
# scene.objects['Twins-text']['Text'] = "Écoute de la connection figeage de la scène..."
# if cont.sensors['Property'].positive:
# if scene.objects['System']['twins_listen'] :
# serial_msg = twins_serial.readline()
# if serial_msg is not None:
# scene.objects['System']['twins_readline'] = str(serial_msg)
# # scene.objects['Twins-text']['Text'] = "Message reçu : "+str(serial_msg)
# scene.objects['System']['twins_listen'] = False
##
# Réception d'un message de la communication série
##
# def serie_rcpt():
# # scene.objects['Twins-text']['Text'] = "Écoute de la \nconnection\n figeage de \n la scène"
# scene.objects['System']['twins_readline'] = ""
# scene.objects['System']['twins_listen'] = True
# while scene.objects['System']['twins_readline'] == "":
# if scene.objects['System']['twins_readline'] != "":
# break
# # scene.objects['Twins-text']['Text'] = "Connection\nouverte :\n"+scene.objects['System']['twins_port']+"\n"+str(scene.objects['System']['twins_speed'])+" baud"
# return scene.objects['System']['twins_readline']