import bge # Bibliothèque Blender Game Engine (UPBGE) import threading # Multithreading import trace import sys import time ############################################################################### # twin_threading.py # @title: Gestion des tâches (threads) # @project: Blender-EduTech # @lang: fr # @authors: Philippe Roy # @copyright: Copyright (C) 2020-2023 Philippe Roy # @license: GNU GPL ############################################################################### scene = bge.logic.getCurrentScene() # Threads threads_cmd=[] debug_thread = scene.objects['System']['debug_thread'] ############################################################################### # Méthode kill pour les tâches (threads) ############################################################################### class thread_with_trace(threading.Thread): def __init__(self, *args, **keywords): threading.Thread.__init__(self, *args, **keywords) self.killed = False def start(self): self.__run_backup = self.run self.run = self.__run threading.Thread.start(self) def __run(self): sys.settrace(self.globaltrace) self.__run_backup() self.run = self.__run_backup def globaltrace(self, frame, event, arg): if event == 'call': return self.localtrace else: return None def localtrace(self, frame, event, arg): if self.killed: if event == 'line': raise SystemExit() return self.localtrace def kill(self): self.killed = True ############################################################################### # Start et stop des tâches (threads) ############################################################################### def thread_start(threads, type_txt, fct): threads.append(thread_with_trace(target = fct)) threads[len(threads)-1].start() if (debug_thread): print ("Thread",type_txt, "#", len(threads)-1, "open.") # Stop des threads def thread_stop(threads, type_txt): i=0 zombie_flag=False for t in threads: if not t.is_alive(): if (debug_thread): print ("Thread",type_txt, "#",i,"closed.") else: if (debug_thread): print ("Thread",type_txt, "#",i,"still open ...") t.kill() t.join() if not t.is_alive(): if (debug_thread): print ("Thread",type_txt, "#",i,"killed.") else: if (debug_thread): print ("Thread",type_txt, "#",i,"zombie...") zombie_flag=True i +=1 if zombie_flag==False: if (debug_thread): print ("All threads",type_txt, "are closed.") scene.objects['System']['thread_cmd']=False return True else: if (debug_thread): print ("There are zombies threads",type_txt, ".") return False ############################################################################### # Fonctions utilisateur ############################################################################### def thread_cmd_start(fct): thread_start(threads_cmd, "commands", fct) def thread_cmd_stop(): thread_stop(threads_cmd, "commands") def thread_cmd_end(): if (debug_thread): print ("Thread commands is arrived.") time.sleep(0.125) scene.objects['System']['thread_cmd']=False time.sleep(0.125)