import bge # Bibliothèque Blender Game Engine (UPBGE) import threading # Multithreading import trace import sys import time # import serial # Liaison série (jumeau numérique) # from serial.tools.list_ports import comports # Détection du port automatique ############################################################################### # porcou_lib.py # @title: Bibliothèque utilisateur du portail coulissant (pcl_*) # @project: Blender-EduTech # @lang: fr # @authors: Philippe Roy # @copyright: Copyright (C) 2020-2022 Philippe Roy # @license: GNU GPL # ############################################################################### scene = bge.logic.getCurrentScene() debug_mvt = scene.objects['System']['debug_mvt'] # Threads threads_cmd=[] threads_gostore=[] debug_thread = scene.objects['System']['debug_thread'] # Jumeau numérique # twins_serial = None # UPBGE constants JUST_ACTIVATED = bge.logic.KX_INPUT_JUST_ACTIVATED JUST_RELEASED = bge.logic.KX_INPUT_JUST_RELEASED ACTIVATE = bge.logic.KX_INPUT_ACTIVE # JUST_DEACTIVATED = bge.logic.KX_SENSOR_JUST_DEACTIVATED ############################################################################### # Méthode kill pour les tâches (threads) ############################################################################### class thread_with_trace(threading.Thread): def __init__(self, *args, **keywords): threading.Thread.__init__(self, *args, **keywords) self.killed = False def start(self): self.__run_backup = self.run self.run = self.__run threading.Thread.start(self) def __run(self): sys.settrace(self.globaltrace) self.__run_backup() self.run = self.__run_backup def globaltrace(self, frame, event, arg): if event == 'call': return self.localtrace else: return None def localtrace(self, frame, event, arg): if self.killed: if event == 'line': raise SystemExit() return self.localtrace def kill(self): self.killed = True ############################################################################### # Start et stop des tâches (threads) ############################################################################### def thread_start(threads, type_txt, fct): threads.append(thread_with_trace(target = fct)) threads[len(threads)-1].start() if (debug_thread): print ("Thread",type_txt, "#", len(threads)-1, "open.") def thread_stop(threads, type_txt): i=0 zombie_flag=False for t in threads: if not t.is_alive(): if (debug_thread): print ("Thread",type_txt, "#",i,"closed.") else: if (debug_thread): print ("Thread",type_txt, "#",i,"still open ...") t.kill() t.join() if not t.is_alive(): if (debug_thread): print ("Thread",type_txt, "#",i,"killed.") else: if (debug_thread): print ("Thread",type_txt, "#",i,"zombie...") zombie_flag=True i +=1 if zombie_flag==False: if (debug_thread): print ("All threads",type_txt, "are closed.") scene.objects['System']['thread_cmd']=False return True else: if (debug_thread): print ("There are zombies threads",type_txt, ".") return False def thread_cmd_start(fct): thread_start(threads_cmd, "commands", fct) def thread_cmd_stop(): thread_stop(threads_cmd, "commands") def porcou_end(): # Jumeau numérique # if scene.objects['Commands']['twins']: # serial_msg = "FI\n" # twins_serial.write(serial_msg.encode()) # Communication série : modele 3d -> carte communication ( arduino | micro:bit ) # rp_jumeau_close() # Thread if (debug_thread): print ("Thread commands is arrived.") time.sleep(0.125) scene.objects['System']['thread_cmd']=False time.sleep(0.125) def porcou_fin(): porcou_end() def porcou_quit(): porcou_end() ############################################################################### # Actionneurs ############################################################################### # Ordres utilisateur du clignotant def gyr (ordre): scene.objects['Module led']['actif']=ordre # Ordres utilisateur du moteur def mot_o (ordre): scene.objects['Ensemble moteur']['actif_ouvrir']=ordre def mot_f (ordre): scene.objects['Ensemble moteur']['actif_fermer']=ordre # Ordre utilisateur du capteur barrage IR def ir_emet(ordre): scene.objects['Module emetteur IR']['actif']=ordre ############################################################################### # Capteurs ############################################################################### # Compte-rendu utilisateur du capteur fin de course portail ouvert def fc_o (): return scene.objects['Capteur fdc ouvert']['actif'] # Compte-rendu utilisateur du capteur fin de course portail ouvert def fc_f (): return scene.objects['Capteur fdc ferme']['actif'] # Compte-rendu utilisateur du capteur barrage IR def ir_recep (): if scene.objects['Module recepteur IR']['actif'] : return False else: return True ############################################################################### # Boutons poussoirs ############################################################################### # Compte-rendu utilisateur du bouton pousssoir coté rue def bp_ext (): return scene.objects['Module bouton cote rue']['actif'] # Compte-rendu utilisateur du bouton pousssoir coté cour def bp_int (): return scene.objects['Module bouton cote cour']['actif'] ############################################################################### # Temporisation ############################################################################### def tempo (duree): time.sleep(duree)