mirror of
https://forge.apps.education.fr/blender-edutech/jumeaux-numeriques.git
synced 2024-01-27 06:56:18 +01:00
Gestion de la liaison série factorisée
This commit is contained in:
parent
48336b248c
commit
9db25dc27f
@ -1,9 +1,7 @@
|
|||||||
import bge # Bibliothèque Blender Game Engine (UPBGE)
|
import bge # Bibliothèque Blender Game Engine (UPBGE)
|
||||||
from twin_threading import * # Multithreading
|
from twin_threading import thread_cmd_start, thread_cmd_stop # Multithreading
|
||||||
|
from twin_serial import jumeau # Liaison série
|
||||||
import serial # Liaison série
|
import time
|
||||||
import pyfirmata # Protocole Firmata
|
|
||||||
from serial.tools.list_ports import comports # Détection du port automatique
|
|
||||||
|
|
||||||
###############################################################################
|
###############################################################################
|
||||||
# montchg_lib.py
|
# montchg_lib.py
|
||||||
@ -84,199 +82,3 @@ def ba_1 ():
|
|||||||
|
|
||||||
def tempo (duree):
|
def tempo (duree):
|
||||||
time.sleep(duree)
|
time.sleep(duree)
|
||||||
|
|
||||||
###############################################################################
|
|
||||||
# Jumeau numérique
|
|
||||||
###############################################################################
|
|
||||||
|
|
||||||
##
|
|
||||||
# Recherche automatique du port
|
|
||||||
##
|
|
||||||
|
|
||||||
def serial_autoget_port():
|
|
||||||
# USB Vendor ID, USB Product ID
|
|
||||||
board_dict={'microbit' :[3368, 516],
|
|
||||||
'uno' :[9025, 67],
|
|
||||||
'mega' :[9025, 66]}
|
|
||||||
for com in comports(): # Arduino Uno
|
|
||||||
if com.vid == board_dict["uno"][0] and com.pid == board_dict["uno"][1]:
|
|
||||||
return [com.device,"Arduino Uno"]
|
|
||||||
for com in comports(): # Arduino Mega
|
|
||||||
if com.vid == board_dict["mega"][0] and com.pid == board_dict["mega"][1]:
|
|
||||||
return [com.device,"Arduino Mega"]
|
|
||||||
return [None,""]
|
|
||||||
|
|
||||||
##
|
|
||||||
# Création de l'objet carte (protocole Firmata)
|
|
||||||
##
|
|
||||||
|
|
||||||
def board_init(port):
|
|
||||||
try:
|
|
||||||
return pyfirmata.Arduino(port)
|
|
||||||
except:
|
|
||||||
return None
|
|
||||||
|
|
||||||
##
|
|
||||||
# Création de l'objet serial (communication série)
|
|
||||||
##
|
|
||||||
|
|
||||||
def serial_init(port,speed):
|
|
||||||
try:
|
|
||||||
return serial.Serial(port,speed)
|
|
||||||
except:
|
|
||||||
return None
|
|
||||||
|
|
||||||
##
|
|
||||||
# Affiche la liste des cartes (communication série)
|
|
||||||
##
|
|
||||||
|
|
||||||
def serial_devices():
|
|
||||||
for com in comports():
|
|
||||||
print ("Name : "+str(com.name)+"\n"
|
|
||||||
+" Device : "+str(com.device)+"\n"
|
|
||||||
+" Hardware ID : "+str(com.hwid)+"\n"
|
|
||||||
+" USB Vendor ID : "+str(com.vid)+"\n"
|
|
||||||
+" USB Product ID : "+str(com.pid)+"\n"
|
|
||||||
+" USB device location : "+str(com.location)+"\n"
|
|
||||||
+" USB manufacturer : "+str(com.manufacturer)+"\n"
|
|
||||||
+" USB product : "+str(com.product)+"\n"
|
|
||||||
+" Interface-specific : "+str(com.interface))
|
|
||||||
|
|
||||||
##
|
|
||||||
# Activation de la communication avec la carte de communication (Arduino, Micro:bit)
|
|
||||||
# Vitesse : 115200 -> 7 fps, 38400 -> 6 fps, 9600 -> 2 fps
|
|
||||||
# pyserial : baudrate=115200
|
|
||||||
# pyfirmata : baudrate=57600
|
|
||||||
##
|
|
||||||
|
|
||||||
def jumeau(pins):
|
|
||||||
global board
|
|
||||||
# global gyr_pin
|
|
||||||
|
|
||||||
# UI : étape 1
|
|
||||||
scene.objects['Twins-text']['Text'] = "Connection en cours ..."
|
|
||||||
|
|
||||||
# Mise en place de la carte
|
|
||||||
speed = 57600
|
|
||||||
[device,board_name] =serial_autoget_port() # Recherche automatique du port
|
|
||||||
if device is None:
|
|
||||||
scene.objects['System']['twins'] = False
|
|
||||||
scene.objects['Twins-text']['Text'] = "Aucune connection disponible : jumeau réel débranché."
|
|
||||||
return False
|
|
||||||
board = board_init(device)
|
|
||||||
if board is None:
|
|
||||||
scene.objects['System']['twins'] = False
|
|
||||||
scene.objects['Twins-text']['Text'] = "Aucune connection disponible : port "+device+" pas prêt."
|
|
||||||
return False
|
|
||||||
scene.objects['System']['twins'] = True
|
|
||||||
# scene.objects['System']['twins_close'] = False
|
|
||||||
scene.objects['System']['twins_port'] = device
|
|
||||||
scene.objects['System']['twins_speed'] = speed
|
|
||||||
# scene.objects['System']['twins_readline'] = ""
|
|
||||||
board_it = pyfirmata.util.Iterator(board) # Itérateur pour les entrées
|
|
||||||
board_it.start()
|
|
||||||
|
|
||||||
# UI : étape 2
|
|
||||||
if board =="":
|
|
||||||
scene.objects['Twins-text']['Text'] = "Connection ouverte : "+device+" - "+str(speed)+" baud."
|
|
||||||
else:
|
|
||||||
scene.objects['Twins-text']['Text'] = "Connection ouverte : "+board_name+" sur "+device+" à "+str(speed)+" baud."
|
|
||||||
tempo (0.1)
|
|
||||||
|
|
||||||
# Déclaration des entrées - sorties
|
|
||||||
# for pin in pins:
|
|
||||||
# print (pin)
|
|
||||||
# if
|
|
||||||
# bp_ext_pin = board_io('d:'+str(es_dict['bp_ext'])+':i') # Bouton poussoir coté rue
|
|
||||||
# bp_int_pin = board_io('d:'+str(es_dict['bp_int'])+':i') # Bouton poussoir coté cour
|
|
||||||
# fdc_o_pin = board_io('d:'+str(es_dict['fdc_o'])+':i') # Capteur fin de course portail ouvert
|
|
||||||
# fdc_f_pin = board_io('d:'+str(es_dict['fdc_f'])+':i') # Capteur fin de course portail fermé
|
|
||||||
# ir_recept_pin = board_io('d:'+str(es_dict['ir_recept'])+':i') # Recepteur pour le capteur barrage IR
|
|
||||||
|
|
||||||
# gyr_pin = board_io('d:'+str(es_dict['gyr'])+':o') # Gyrophare
|
|
||||||
# mot_o_pin = board_io('d:'+str(es_dict['mot_o'])+':o') # Ouvrir le portail (moteur sens trigo)
|
|
||||||
# mot_f_pin = board_io('d:'+str(es_dict['mot_f'])+':o') # Fermer le portail (moteur sens horaire
|
|
||||||
# ir_emett_pin = board_io('d:'+str(es_dict['ir_emett'])+':o') # Emetteur pour le capteur barrage IR
|
|
||||||
return True
|
|
||||||
|
|
||||||
# def board_io(da,pin,io):
|
|
||||||
# if pin_def is not None:
|
|
||||||
# return board.get_pin(da+':'+pin_def)
|
|
||||||
# else:
|
|
||||||
# print ("Définition entrée-sortie non trouvée : "+pin_def)
|
|
||||||
|
|
||||||
##
|
|
||||||
# Fermeture de la communication série
|
|
||||||
##
|
|
||||||
|
|
||||||
def jumeau_close():
|
|
||||||
global board
|
|
||||||
# twins_serial.close() # Fermer proprement le port série
|
|
||||||
board.exit() # Fermer proprement la communication avec la carte
|
|
||||||
scene.objects['System']['twins'] = False
|
|
||||||
scene.objects['Twins-text']['Text'] = "Connection fermée."
|
|
||||||
|
|
||||||
# Configuration manuelle du port
|
|
||||||
# FIXME
|
|
||||||
def jumeau_config(port, speed):
|
|
||||||
pass
|
|
||||||
# global board
|
|
||||||
# global twins_serial
|
|
||||||
# if scene.objects['System']['twins']:
|
|
||||||
# serial_msg1 = "CF\n"
|
|
||||||
# twins_serial.write(serial_msg1.encode())
|
|
||||||
# tempo (1)
|
|
||||||
# serial_msg2 = str(speed)+"\n"
|
|
||||||
# twins_serial.write(serial_msg2.encode())
|
|
||||||
# tempo (1)
|
|
||||||
# serial_msg3 = str(temps_avancer)+"\n"
|
|
||||||
# twins_serial.write(serial_msg3.encode())
|
|
||||||
# tempo (1)
|
|
||||||
# serial_msg4 = str(temps_tourner)+"\n"
|
|
||||||
# twins_serial.write(serial_msg4.encode())
|
|
||||||
# tempo (1)
|
|
||||||
# serial_msg5 = "FC\n"
|
|
||||||
# twins_serial.write(serial_msg5.encode())
|
|
||||||
|
|
||||||
##
|
|
||||||
# Envoi d'un message vers la communication série
|
|
||||||
##
|
|
||||||
|
|
||||||
# def serie_msg(text):
|
|
||||||
# global twins_serial
|
|
||||||
# text2= text+"\n"
|
|
||||||
# scene.objects['Twins-text']['Text'] = "Communication : envoi message : "+text
|
|
||||||
# twins_serial.write(text2.encode())
|
|
||||||
|
|
||||||
##
|
|
||||||
# Mise en écoute de jumeau numérique (figeage de la scène)
|
|
||||||
##
|
|
||||||
|
|
||||||
# def twins_listen(cont):
|
|
||||||
# global twins_serial
|
|
||||||
# if scene.objects['System']['twins']:
|
|
||||||
# if scene.objects['System']['twins_readline'] != "":
|
|
||||||
# scene.objects['Twins-text']['Text'] = "Écoute de la connection figeage de la scène.... Message reçu : "+scene.objects['System']['twins_readline']
|
|
||||||
# else:
|
|
||||||
# scene.objects['Twins-text']['Text'] = "Écoute de la connection figeage de la scène..."
|
|
||||||
# if cont.sensors['Property'].positive:
|
|
||||||
# if scene.objects['System']['twins_listen'] :
|
|
||||||
# serial_msg = twins_serial.readline()
|
|
||||||
# if serial_msg is not None:
|
|
||||||
# scene.objects['System']['twins_readline'] = str(serial_msg)
|
|
||||||
# # scene.objects['Twins-text']['Text'] = "Message reçu : "+str(serial_msg)
|
|
||||||
# scene.objects['System']['twins_listen'] = False
|
|
||||||
|
|
||||||
##
|
|
||||||
# Réception d'un message de la communication série
|
|
||||||
##
|
|
||||||
|
|
||||||
# def serie_rcpt():
|
|
||||||
# # scene.objects['Twins-text']['Text'] = "Écoute de la \nconnection\n figeage de \n la scène"
|
|
||||||
# scene.objects['System']['twins_readline'] = ""
|
|
||||||
# scene.objects['System']['twins_listen'] = True
|
|
||||||
# while scene.objects['System']['twins_readline'] == "":
|
|
||||||
# if scene.objects['System']['twins_readline'] != "":
|
|
||||||
# break
|
|
||||||
# # scene.objects['Twins-text']['Text'] = "Connection\nouverte :\n"+scene.objects['System']['twins_port']+"\n"+str(scene.objects['System']['twins_speed'])+" baud"
|
|
||||||
# return scene.objects['System']['twins_readline']
|
|
||||||
|
1
monte_charge/twin_serial.py
Symbolic link
1
monte_charge/twin_serial.py
Symbolic link
@ -0,0 +1 @@
|
|||||||
|
/home/phroy/Bureau/seriousgames/blender-edutech/git/digital_twin/twin_serial.py
|
1
poppy_ergo_jr/twin_serial.py
Symbolic link
1
poppy_ergo_jr/twin_serial.py
Symbolic link
@ -0,0 +1 @@
|
|||||||
|
/home/phroy/Bureau/seriousgames/blender-edutech/git/digital_twin/twin_serial.py
|
@ -1,11 +1,8 @@
|
|||||||
import bge # Bibliothèque Blender Game Engine (UPBGE)
|
import bge # Bibliothèque Blender Game Engine (UPBGE)
|
||||||
from twin_threading import * # Multithreading
|
from twin_threading import thread_cmd_start, thread_cmd_stop # Multithreading
|
||||||
|
from twin_serial import jumeau # Liaison série
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import serial # Liaison série
|
|
||||||
import pyfirmata # Protocole Firmata
|
|
||||||
from serial.tools.list_ports import comports # Détection du port automatique
|
|
||||||
|
|
||||||
###############################################################################
|
###############################################################################
|
||||||
# porcou_lib.py
|
# porcou_lib.py
|
||||||
# @title: Bibliothèque utilisateur du portail coulissant
|
# @title: Bibliothèque utilisateur du portail coulissant
|
||||||
@ -103,201 +100,3 @@ def bp_int ():
|
|||||||
|
|
||||||
def tempo (duree):
|
def tempo (duree):
|
||||||
time.sleep(duree)
|
time.sleep(duree)
|
||||||
|
|
||||||
###############################################################################
|
|
||||||
# Jumeau numérique
|
|
||||||
###############################################################################
|
|
||||||
|
|
||||||
##
|
|
||||||
# Recherche automatique du port
|
|
||||||
##
|
|
||||||
|
|
||||||
def serial_autoget_port():
|
|
||||||
# USB Vendor ID, USB Product ID
|
|
||||||
board_dict={'microbit' :[3368, 516],
|
|
||||||
'uno' :[9025, 67],
|
|
||||||
'mega' :[9025, 66]}
|
|
||||||
for com in comports(): # Arduino Uno
|
|
||||||
if com.vid == board_dict["uno"][0] and com.pid == board_dict["uno"][1]:
|
|
||||||
return [com.device,"Arduino Uno"]
|
|
||||||
for com in comports(): # Arduino Mega
|
|
||||||
if com.vid == board_dict["mega"][0] and com.pid == board_dict["mega"][1]:
|
|
||||||
return [com.device,"Arduino Mega"]
|
|
||||||
return [None,""]
|
|
||||||
|
|
||||||
##
|
|
||||||
# Création de l'objet carte (protocole Firmata)
|
|
||||||
##
|
|
||||||
|
|
||||||
def board_init(port):
|
|
||||||
try:
|
|
||||||
return pyfirmata.Arduino(port)
|
|
||||||
except:
|
|
||||||
return None
|
|
||||||
|
|
||||||
##
|
|
||||||
# Création de l'objet serial (communication série)
|
|
||||||
##
|
|
||||||
|
|
||||||
def serial_init(port,speed):
|
|
||||||
try:
|
|
||||||
return serial.Serial(port,speed)
|
|
||||||
except:
|
|
||||||
return None
|
|
||||||
|
|
||||||
##
|
|
||||||
# Affiche la liste des cartes (communication série)
|
|
||||||
##
|
|
||||||
|
|
||||||
def serial_devices():
|
|
||||||
for com in comports():
|
|
||||||
print ("Name : "+str(com.name)+"\n"
|
|
||||||
+" Device : "+str(com.device)+"\n"
|
|
||||||
+" Hardware ID : "+str(com.hwid)+"\n"
|
|
||||||
+" USB Vendor ID : "+str(com.vid)+"\n"
|
|
||||||
+" USB Product ID : "+str(com.pid)+"\n"
|
|
||||||
+" USB device location : "+str(com.location)+"\n"
|
|
||||||
+" USB manufacturer : "+str(com.manufacturer)+"\n"
|
|
||||||
+" USB product : "+str(com.product)+"\n"
|
|
||||||
+" Interface-specific : "+str(com.interface))
|
|
||||||
|
|
||||||
##
|
|
||||||
# Activation de la communication avec la carte de communication (Arduino, Micro:bit)
|
|
||||||
# Vitesse : 115200 -> 7 fps, 38400 -> 6 fps, 9600 -> 2 fps
|
|
||||||
# pyserial : baudrate=115200
|
|
||||||
# pyfirmata : baudrate=57600
|
|
||||||
##
|
|
||||||
|
|
||||||
def jumeau(pins):
|
|
||||||
global board
|
|
||||||
# global gyr_pin
|
|
||||||
|
|
||||||
# UI : étape 1
|
|
||||||
scene.objects['Twins-icon'].setVisible(True,True)
|
|
||||||
scene.objects['Twins-text']['Text'] = "Connection en cours ..."
|
|
||||||
scene.objects['Twins-text'].setVisible(True,False)
|
|
||||||
|
|
||||||
# Mise en place de la carte
|
|
||||||
speed = 57600
|
|
||||||
[device,board_name] =serial_autoget_port() # Recherche automatique du port
|
|
||||||
if device is None:
|
|
||||||
scene.objects['System']['twins'] = False
|
|
||||||
scene.objects['Twins-text']['Text'] = "Aucune connection disponible : jumeau réel débranché."
|
|
||||||
return False
|
|
||||||
board = board_init(device)
|
|
||||||
if board is None:
|
|
||||||
scene.objects['System']['twins'] = False
|
|
||||||
scene.objects['Twins-text']['Text'] = "Aucune connection disponible : port "+device+" pas prêt"
|
|
||||||
return False
|
|
||||||
scene.objects['System']['twins'] = True
|
|
||||||
# scene.objects['System']['twins_close'] = False
|
|
||||||
scene.objects['System']['twins_port'] = device
|
|
||||||
scene.objects['System']['twins_speed'] = speed
|
|
||||||
# scene.objects['System']['twins_readline'] = ""
|
|
||||||
board_it = pyfirmata.util.Iterator(board) # Itérateur pour les entrées
|
|
||||||
board_it.start()
|
|
||||||
|
|
||||||
# UI : étape 2
|
|
||||||
if board =="":
|
|
||||||
scene.objects['Twins-text']['Text'] = "Connection ouverte : "+device+" - "+str(speed)+" baud."
|
|
||||||
else:
|
|
||||||
scene.objects['Twins-text']['Text'] = "Connection ouverte : "+board_name+" sur "+device+" à "+str(speed)+" baud."
|
|
||||||
tempo (0.1)
|
|
||||||
|
|
||||||
# Déclaration des entrées - sorties
|
|
||||||
for pin in pins:
|
|
||||||
print (pin)
|
|
||||||
# if
|
|
||||||
# bp_ext_pin = board_io('d:'+str(es_dict['bp_ext'])+':i') # Bouton poussoir coté rue
|
|
||||||
# bp_int_pin = board_io('d:'+str(es_dict['bp_int'])+':i') # Bouton poussoir coté cour
|
|
||||||
# fdc_o_pin = board_io('d:'+str(es_dict['fdc_o'])+':i') # Capteur fin de course portail ouvert
|
|
||||||
# fdc_f_pin = board_io('d:'+str(es_dict['fdc_f'])+':i') # Capteur fin de course portail fermé
|
|
||||||
# ir_recept_pin = board_io('d:'+str(es_dict['ir_recept'])+':i') # Recepteur pour le capteur barrage IR
|
|
||||||
|
|
||||||
# gyr_pin = board_io('d:'+str(es_dict['gyr'])+':o') # Gyrophare
|
|
||||||
# mot_o_pin = board_io('d:'+str(es_dict['mot_o'])+':o') # Ouvrir le portail (moteur sens trigo)
|
|
||||||
# mot_f_pin = board_io('d:'+str(es_dict['mot_f'])+':o') # Fermer le portail (moteur sens horaire
|
|
||||||
# ir_emett_pin = board_io('d:'+str(es_dict['ir_emett'])+':o') # Emetteur pour le capteur barrage IR
|
|
||||||
return True
|
|
||||||
|
|
||||||
# def board_io(da,pin,io):
|
|
||||||
# if pin_def is not None:
|
|
||||||
# return board.get_pin(da+':'+pin_def)
|
|
||||||
# else:
|
|
||||||
# print ("Définition entrée-sortie non trouvée : "+pin_def)
|
|
||||||
|
|
||||||
##
|
|
||||||
# Fermeture de la communication série
|
|
||||||
##
|
|
||||||
|
|
||||||
def jumeau_close():
|
|
||||||
global board
|
|
||||||
# twins_serial.close() # Fermer proprement le port série
|
|
||||||
board.exit() # Fermer proprement la communication avec la carte
|
|
||||||
scene.objects['System']['twins'] = False
|
|
||||||
scene.objects['Twins-text']['Text'] = "Connection fermée."
|
|
||||||
|
|
||||||
# Configuration du port
|
|
||||||
# FIXME
|
|
||||||
def jumeau_config(port, speed):
|
|
||||||
# global board
|
|
||||||
pass
|
|
||||||
# global twins_serial
|
|
||||||
# if scene.objects['System']['twins']:
|
|
||||||
# serial_msg1 = "CF\n"
|
|
||||||
# twins_serial.write(serial_msg1.encode())
|
|
||||||
# tempo (1)
|
|
||||||
# serial_msg2 = str(speed)+"\n"
|
|
||||||
# twins_serial.write(serial_msg2.encode())
|
|
||||||
# tempo (1)
|
|
||||||
# serial_msg3 = str(temps_avancer)+"\n"
|
|
||||||
# twins_serial.write(serial_msg3.encode())
|
|
||||||
# tempo (1)
|
|
||||||
# serial_msg4 = str(temps_tourner)+"\n"
|
|
||||||
# twins_serial.write(serial_msg4.encode())
|
|
||||||
# tempo (1)
|
|
||||||
# serial_msg5 = "FC\n"
|
|
||||||
# twins_serial.write(serial_msg5.encode())
|
|
||||||
|
|
||||||
##
|
|
||||||
# Envoi d'un message vers la communication série
|
|
||||||
##
|
|
||||||
|
|
||||||
# def serie_msg(text):
|
|
||||||
# global twins_serial
|
|
||||||
# text2= text+"\n"
|
|
||||||
# scene.objects['Twins-text']['Text'] = "Communication : envoi message : "+text
|
|
||||||
# twins_serial.write(text2.encode())
|
|
||||||
|
|
||||||
##
|
|
||||||
# Mise en écoute de jumeau numérique (figeage de la scène)
|
|
||||||
##
|
|
||||||
|
|
||||||
# def twins_listen(cont):
|
|
||||||
# global twins_serial
|
|
||||||
# if scene.objects['System']['twins']:
|
|
||||||
# if scene.objects['System']['twins_readline'] != "":
|
|
||||||
# scene.objects['Twins-text']['Text'] = "Écoute de la connection figeage de la scène.... Message reçu : "+scene.objects['System']['twins_readline']
|
|
||||||
# else:
|
|
||||||
# scene.objects['Twins-text']['Text'] = "Écoute de la connection figeage de la scène..."
|
|
||||||
# if cont.sensors['Property'].positive:
|
|
||||||
# if scene.objects['System']['twins_listen'] :
|
|
||||||
# serial_msg = twins_serial.readline()
|
|
||||||
# if serial_msg is not None:
|
|
||||||
# scene.objects['System']['twins_readline'] = str(serial_msg)
|
|
||||||
# # scene.objects['Twins-text']['Text'] = "Message reçu : "+str(serial_msg)
|
|
||||||
# scene.objects['System']['twins_listen'] = False
|
|
||||||
|
|
||||||
##
|
|
||||||
# Réception d'un message de la communication série
|
|
||||||
##
|
|
||||||
|
|
||||||
# def serie_rcpt():
|
|
||||||
# # scene.objects['Twins-text']['Text'] = "Écoute de la \nconnection\n figeage de \n la scène"
|
|
||||||
# scene.objects['System']['twins_readline'] = ""
|
|
||||||
# scene.objects['System']['twins_listen'] = True
|
|
||||||
# while scene.objects['System']['twins_readline'] == "":
|
|
||||||
# if scene.objects['System']['twins_readline'] != "":
|
|
||||||
# break
|
|
||||||
# # scene.objects['Twins-text']['Text'] = "Connection\nouverte :\n"+scene.objects['System']['twins_port']+"\n"+str(scene.objects['System']['twins_speed'])+" baud"
|
|
||||||
# return scene.objects['System']['twins_readline']
|
|
||||||
|
1
portail_coulissant/twin_serial.py
Symbolic link
1
portail_coulissant/twin_serial.py
Symbolic link
@ -0,0 +1 @@
|
|||||||
|
/home/phroy/Bureau/seriousgames/blender-edutech/git/digital_twin/twin_serial.py
|
@ -1,6 +1,10 @@
|
|||||||
import bge # Bibliothèque Blender Game Engine (UPBGE)
|
import bge # Bibliothèque Blender Game Engine (UPBGE)
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
|
import serial # Liaison série
|
||||||
|
import pyfirmata # Protocole Firmata
|
||||||
|
from serial.tools.list_ports import comports # Détection du port automatique
|
||||||
|
|
||||||
###############################################################################
|
###############################################################################
|
||||||
# twin_serial.py
|
# twin_serial.py
|
||||||
# @title: Gestion de la laison série
|
# @title: Gestion de la laison série
|
||||||
|
1
volet_roulant/twin_serial.py
Symbolic link
1
volet_roulant/twin_serial.py
Symbolic link
@ -0,0 +1 @@
|
|||||||
|
/home/phroy/Bureau/seriousgames/blender-edutech/git/digital_twin/twin_serial.py
|
@ -1,11 +1,8 @@
|
|||||||
import bge # Bibliothèque Blender Game Engine (UPBGE)
|
import bge # Bibliothèque Blender Game Engine (UPBGE)
|
||||||
from twin_threading import * # Multithreading
|
from twin_threading import thread_cmd_start, thread_cmd_stop # Multithreading
|
||||||
|
from twin_serial import jumeau # Liaison série
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import serial # Liaison série
|
|
||||||
import pyfirmata # Protocole Firmata
|
|
||||||
from serial.tools.list_ports import comports # Détection du port automatique
|
|
||||||
|
|
||||||
###############################################################################
|
###############################################################################
|
||||||
# volrou_lib.py
|
# volrou_lib.py
|
||||||
# @title: Bibliothèque utilisateur du volet roulant
|
# @title: Bibliothèque utilisateur du volet roulant
|
||||||
@ -97,201 +94,3 @@ def bp_auto ():
|
|||||||
|
|
||||||
def tempo (duree):
|
def tempo (duree):
|
||||||
time.sleep(duree)
|
time.sleep(duree)
|
||||||
|
|
||||||
###############################################################################
|
|
||||||
# Jumeau numérique
|
|
||||||
###############################################################################
|
|
||||||
|
|
||||||
##
|
|
||||||
# Recherche automatique du port
|
|
||||||
##
|
|
||||||
|
|
||||||
def serial_autoget_port():
|
|
||||||
# USB Vendor ID, USB Product ID
|
|
||||||
board_dict={'microbit' :[3368, 516],
|
|
||||||
'uno' :[9025, 67],
|
|
||||||
'mega' :[9025, 66]}
|
|
||||||
for com in comports(): # Arduino Uno
|
|
||||||
if com.vid == board_dict["uno"][0] and com.pid == board_dict["uno"][1]:
|
|
||||||
return [com.device,"Arduino Uno"]
|
|
||||||
for com in comports(): # Arduino Mega
|
|
||||||
if com.vid == board_dict["mega"][0] and com.pid == board_dict["mega"][1]:
|
|
||||||
return [com.device,"Arduino Mega"]
|
|
||||||
return [None,""]
|
|
||||||
|
|
||||||
##
|
|
||||||
# Création de l'objet carte (protocole Firmata)
|
|
||||||
##
|
|
||||||
|
|
||||||
def board_init(port):
|
|
||||||
try:
|
|
||||||
return pyfirmata.Arduino(port)
|
|
||||||
except:
|
|
||||||
return None
|
|
||||||
|
|
||||||
##
|
|
||||||
# Création de l'objet serial (communication série)
|
|
||||||
##
|
|
||||||
|
|
||||||
def serial_init(port,speed):
|
|
||||||
try:
|
|
||||||
return serial.Serial(port,speed)
|
|
||||||
except:
|
|
||||||
return None
|
|
||||||
|
|
||||||
##
|
|
||||||
# Affiche la liste des cartes (communication série)
|
|
||||||
##
|
|
||||||
|
|
||||||
def serial_devices():
|
|
||||||
for com in comports():
|
|
||||||
print ("Name : "+str(com.name)+"\n"
|
|
||||||
+" Device : "+str(com.device)+"\n"
|
|
||||||
+" Hardware ID : "+str(com.hwid)+"\n"
|
|
||||||
+" USB Vendor ID : "+str(com.vid)+"\n"
|
|
||||||
+" USB Product ID : "+str(com.pid)+"\n"
|
|
||||||
+" USB device location : "+str(com.location)+"\n"
|
|
||||||
+" USB manufacturer : "+str(com.manufacturer)+"\n"
|
|
||||||
+" USB product : "+str(com.product)+"\n"
|
|
||||||
+" Interface-specific : "+str(com.interface))
|
|
||||||
|
|
||||||
##
|
|
||||||
# Activation de la communication avec la carte de communication (Arduino, Micro:bit)
|
|
||||||
# Vitesse : 115200 -> 7 fps, 38400 -> 6 fps, 9600 -> 2 fps
|
|
||||||
# pyserial : baudrate=115200
|
|
||||||
# pyfirmata : baudrate=57600
|
|
||||||
##
|
|
||||||
|
|
||||||
def jumeau(pins):
|
|
||||||
global board
|
|
||||||
# global gyr_pin
|
|
||||||
|
|
||||||
# UI : étape 1
|
|
||||||
scene.objects['Twins-icon'].setVisible(True,True)
|
|
||||||
scene.objects['Twins-text']['Text'] = "Connection en cours ..."
|
|
||||||
scene.objects['Twins-text'].setVisible(True,False)
|
|
||||||
|
|
||||||
# Mise en place de la carte
|
|
||||||
speed = 57600
|
|
||||||
[device,board_name] =serial_autoget_port() # Recherche automatique du port
|
|
||||||
if device is None:
|
|
||||||
scene.objects['System']['twins'] = False
|
|
||||||
scene.objects['Twins-text']['Text'] = "Aucune connection disponible : jumeau réel débranché."
|
|
||||||
return False
|
|
||||||
board = board_init(device)
|
|
||||||
if board is None:
|
|
||||||
scene.objects['System']['twins'] = False
|
|
||||||
scene.objects['Twins-text']['Text'] = "Aucune connection disponible : port "+device+" pas prêt"
|
|
||||||
return False
|
|
||||||
scene.objects['System']['twins'] = True
|
|
||||||
# scene.objects['System']['twins_close'] = False
|
|
||||||
scene.objects['System']['twins_port'] = device
|
|
||||||
scene.objects['System']['twins_speed'] = speed
|
|
||||||
# scene.objects['System']['twins_readline'] = ""
|
|
||||||
board_it = pyfirmata.util.Iterator(board) # Itérateur pour les entrées
|
|
||||||
board_it.start()
|
|
||||||
|
|
||||||
# UI : étape 2
|
|
||||||
if board =="":
|
|
||||||
scene.objects['Twins-text']['Text'] = "Connection ouverte : "+device+" - "+str(speed)+" baud."
|
|
||||||
else:
|
|
||||||
scene.objects['Twins-text']['Text'] = "Connection ouverte : "+board_name+" sur "+device+" à "+str(speed)+" baud."
|
|
||||||
tempo (0.1)
|
|
||||||
|
|
||||||
# Déclaration des entrées - sorties
|
|
||||||
for pin in pins:
|
|
||||||
print (pin)
|
|
||||||
# if
|
|
||||||
# bp_ext_pin = board_io('d:'+str(es_dict['bp_ext'])+':i') # Bouton poussoir coté rue
|
|
||||||
# bp_int_pin = board_io('d:'+str(es_dict['bp_int'])+':i') # Bouton poussoir coté cour
|
|
||||||
# fdc_o_pin = board_io('d:'+str(es_dict['fdc_o'])+':i') # Capteur fin de course portail ouvert
|
|
||||||
# fdc_f_pin = board_io('d:'+str(es_dict['fdc_f'])+':i') # Capteur fin de course portail fermé
|
|
||||||
# ir_recept_pin = board_io('d:'+str(es_dict['ir_recept'])+':i') # Recepteur pour le capteur barrage IR
|
|
||||||
|
|
||||||
# gyr_pin = board_io('d:'+str(es_dict['gyr'])+':o') # Gyrophare
|
|
||||||
# mot_o_pin = board_io('d:'+str(es_dict['mot_o'])+':o') # Ouvrir le portail (moteur sens trigo)
|
|
||||||
# mot_f_pin = board_io('d:'+str(es_dict['mot_f'])+':o') # Fermer le portail (moteur sens horaire
|
|
||||||
# ir_emett_pin = board_io('d:'+str(es_dict['ir_emett'])+':o') # Emetteur pour le capteur barrage IR
|
|
||||||
return True
|
|
||||||
|
|
||||||
# def board_io(da,pin,io):
|
|
||||||
# if pin_def is not None:
|
|
||||||
# return board.get_pin(da+':'+pin_def)
|
|
||||||
# else:
|
|
||||||
# print ("Définition entrée-sortie non trouvée : "+pin_def)
|
|
||||||
|
|
||||||
##
|
|
||||||
# Fermeture de la communication série
|
|
||||||
##
|
|
||||||
|
|
||||||
def jumeau_close():
|
|
||||||
global board
|
|
||||||
# twins_serial.close() # Fermer proprement le port série
|
|
||||||
board.exit() # Fermer proprement la communication avec la carte
|
|
||||||
scene.objects['System']['twins'] = False
|
|
||||||
scene.objects['Twins-text']['Text'] = "Connection fermée."
|
|
||||||
|
|
||||||
# Configuration du port
|
|
||||||
# FIXME
|
|
||||||
def jumeau_config(port, speed):
|
|
||||||
# global board
|
|
||||||
pass
|
|
||||||
# global twins_serial
|
|
||||||
# if scene.objects['System']['twins']:
|
|
||||||
# serial_msg1 = "CF\n"
|
|
||||||
# twins_serial.write(serial_msg1.encode())
|
|
||||||
# tempo (1)
|
|
||||||
# serial_msg2 = str(speed)+"\n"
|
|
||||||
# twins_serial.write(serial_msg2.encode())
|
|
||||||
# tempo (1)
|
|
||||||
# serial_msg3 = str(temps_avancer)+"\n"
|
|
||||||
# twins_serial.write(serial_msg3.encode())
|
|
||||||
# tempo (1)
|
|
||||||
# serial_msg4 = str(temps_tourner)+"\n"
|
|
||||||
# twins_serial.write(serial_msg4.encode())
|
|
||||||
# tempo (1)
|
|
||||||
# serial_msg5 = "FC\n"
|
|
||||||
# twins_serial.write(serial_msg5.encode())
|
|
||||||
|
|
||||||
##
|
|
||||||
# Envoi d'un message vers la communication série
|
|
||||||
##
|
|
||||||
|
|
||||||
# def serie_msg(text):
|
|
||||||
# global twins_serial
|
|
||||||
# text2= text+"\n"
|
|
||||||
# scene.objects['Twins-text']['Text'] = "Communication : envoi message : "+text
|
|
||||||
# twins_serial.write(text2.encode())
|
|
||||||
|
|
||||||
##
|
|
||||||
# Mise en écoute de jumeau numérique (figeage de la scène)
|
|
||||||
##
|
|
||||||
|
|
||||||
# def twins_listen(cont):
|
|
||||||
# global twins_serial
|
|
||||||
# if scene.objects['System']['twins']:
|
|
||||||
# if scene.objects['System']['twins_readline'] != "":
|
|
||||||
# scene.objects['Twins-text']['Text'] = "Écoute de la connection figeage de la scène.... Message reçu : "+scene.objects['System']['twins_readline']
|
|
||||||
# else:
|
|
||||||
# scene.objects['Twins-text']['Text'] = "Écoute de la connection figeage de la scène..."
|
|
||||||
# if cont.sensors['Property'].positive:
|
|
||||||
# if scene.objects['System']['twins_listen'] :
|
|
||||||
# serial_msg = twins_serial.readline()
|
|
||||||
# if serial_msg is not None:
|
|
||||||
# scene.objects['System']['twins_readline'] = str(serial_msg)
|
|
||||||
# # scene.objects['Twins-text']['Text'] = "Message reçu : "+str(serial_msg)
|
|
||||||
# scene.objects['System']['twins_listen'] = False
|
|
||||||
|
|
||||||
##
|
|
||||||
# Réception d'un message de la communication série
|
|
||||||
##
|
|
||||||
|
|
||||||
# def serie_rcpt():
|
|
||||||
# # scene.objects['Twins-text']['Text'] = "Écoute de la \nconnection\n figeage de \n la scène"
|
|
||||||
# scene.objects['System']['twins_readline'] = ""
|
|
||||||
# scene.objects['System']['twins_listen'] = True
|
|
||||||
# while scene.objects['System']['twins_readline'] == "":
|
|
||||||
# if scene.objects['System']['twins_readline'] != "":
|
|
||||||
# break
|
|
||||||
# # scene.objects['Twins-text']['Text'] = "Connection\nouverte :\n"+scene.objects['System']['twins_port']+"\n"+str(scene.objects['System']['twins_speed'])+" baud"
|
|
||||||
# return scene.objects['System']['twins_readline']
|
|
||||||
|
Loading…
Reference in New Issue
Block a user