import time
import sys
import datetime
import threading


class SimulationThread(threading.Thread):
    """Daemon thread that cycles through time-of-day periods and their states.

    ``periods`` is a list of dicts. The code reads these keys from each one:

    * ``"name"``      - label used in log output and snapshots,
    * ``"timestart"`` - ISO time string (or an already-parsed
      ``datetime.time``) at which the period begins,
    * ``"states"``    - list of dicts, each with a ``"duration"`` that is
      divided by ``time_factor`` to obtain the real delay until the next
      changeover.

    While running, the thread cycles through the states of the current
    period. It only moves to the period selected by the simulated clock
    once the last state of the current period has finished.

    All mutable simulation state is guarded by ``self.mutex``.
    """

    # Process-wide instance handed out by get_instance().
    instance = None

    @classmethod
    def get_instance(cls, periods=None, start_time=None, run_time=None, time_factor=1.0):
        """Return the shared instance, creating it on the first call.

        The arguments are only used when the instance is created; later
        calls return the existing instance and ignore them.

        Raises:
            ValueError: if no instance exists yet and ``periods`` is None.
        """
        if cls.instance is not None:
            return cls.instance
        if periods is None:
            raise ValueError("Periods parameter must be specified on first call")
        cls.instance = SimulationThread(periods, start_time, run_time, time_factor)
        return cls.instance

    def __init__(self, periods, start_time, run_time, time_factor):
        """Prepare the simulation; the thread is not started here.

        Args:
            periods: list of period dicts (see the class docstring). The
                list is sorted in place and each ``"timestart"`` string is
                replaced in place by a ``datetime.time``.
            start_time: ``datetime.time`` at which the simulated clock
                starts, or None to start at the current wall-clock time.
            run_time: limit in real seconds, or None for no limit.
            time_factor: time compression factor; simulated seconds that
                elapse per real second.
        """
        super().__init__(daemon=True)
        self.mutex = threading.Lock()
        self.running = True
        self.periods = periods
        self.start_time = start_time
        self.run_time = run_time
        self.time_factor = time_factor
        self.current_period_index = 0
        self.current_state_index = 0
        self.next_changeover_time = None
        self.simulation_start_datetime = None
        # Set before run() so get_snapshot() never hits a missing attribute.
        self.current_simulation_datetime = None

        for period in self.periods:
            timestart = period["timestart"]
            # Accept already-parsed values so the same periods list can be
            # passed to more than one instance.
            if not isinstance(timestart, datetime.time):
                period["timestart"] = datetime.time.fromisoformat(timestart)

        # Essential - periods are assumed to be in chronological order
        self.periods.sort(key=lambda x: x["timestart"])

    def _current_state(self):
        """Return the state dict selected by the current period/state indices."""
        return self.periods[self.current_period_index]["states"][self.current_state_index]

    def signal_stop(self):
        """Ask the run loop to exit at its next iteration."""
        with self.mutex:
            self.running = False

    def force_to_time(self, new_time):
        """Jump to the first state of the period that contains ``new_time``.

        Args:
            new_time: ``datetime.time`` used to select the period.

        NOTE(review): only the period/state indices and the next changeover
        are updated. run() keeps deriving the simulated clock from
        ``simulation_start_datetime``, so it may switch back to the clock's
        own period once the forced period reaches its last state. Confirm
        whether the clock itself was meant to be rebased.
        """
        with self.mutex:
            if self.simulation_start_datetime is None:
                # run() has not initialised the clock yet.
                previous = "(not started)"
            else:
                previous = self.simulation_start_datetime.time().isoformat()
            print()
            print("Forcing internal time change from {} to {}".format(previous, new_time.isoformat()))
            # Period start times are naive, so compare against a naive time.
            self.current_period_index = self._get_period_index(
                self.periods, new_time.replace(tzinfo=None))
            self.current_state_index = 0
            self.next_changeover_time = time.time() + (
                self._current_state()["duration"] / self.time_factor)

    def get_snapshot(self):
        """Return a dict describing the current simulation state.

        Keys: ``"current_period"`` (period name), ``"current_state"`` (the
        state dict itself, not a copy), ``"next_changeover_time"`` (a
        ``time.time()`` value, or None before run() has set it) and
        ``"simulation_time"`` (``datetime.time``, or None before run() has
        initialised the clock).
        """
        with self.mutex:
            period = self.periods[self.current_period_index]
            sim_now = self.current_simulation_datetime
            return {
                "current_period": period["name"],
                "current_state": period["states"][self.current_state_index],
                "next_changeover_time": self.next_changeover_time,
                "simulation_time": None if sim_now is None else sim_now.time(),
            }

    def _get_period_index(self, states, test_time: datetime.time):
        """Return the index of the entry whose time window contains ``test_time``.

        ``states`` must be sorted by ``"timestart"``. The last entry is
        treated as open-ended, so it is also returned for times earlier
        than the first entry's start. Returns None only for an empty list.
        """
        last_index = len(states) - 1
        for i, state in enumerate(states):
            if i == last_index:
                return i
            if state["timestart"] <= test_time < states[i + 1]["timestart"]:
                return i
        # Only reached when ``states`` is empty.
        # TODO: Raise exception?
        return None

    def _tick(self, now, real_start_time):
        """Advance the simulation by one loop iteration.

        Must be called with ``self.mutex`` held. Updates the simulated
        clock and, once ``next_changeover_time`` has passed, moves to the
        next state or period and schedules the following changeover.
        """
        elapsed = int((now - real_start_time) * self.time_factor)
        self.current_simulation_datetime = (
            self.simulation_start_datetime + datetime.timedelta(seconds=elapsed))

        if now < self.next_changeover_time:
            # Nothing due yet: emit a progress dot.
            print(".", end="")
            sys.stdout.flush()
            return

        sim_time = self.current_simulation_datetime.time()
        timed_period_index = self._get_period_index(self.periods, sim_time)
        period = self.periods[self.current_period_index]
        states = period["states"]
        old_state = states[self.current_state_index]
        on_last_state = self.current_state_index == len(states) - 1

        if timed_period_index != self.current_period_index and on_last_state:
            # Safe to change period: the current one finished its last state.
            new_period = self.periods[timed_period_index]
            new_state = new_period["states"][0]
            print()
            print("{} : Changing PERIOD from {} to {}".format(
                sim_time, period["name"], new_period["name"]))
            print(" Old state data: {}".format(old_state))
            self.current_period_index = timed_period_index
            self.current_state_index = 0
            print(" New state data: {}".format(new_state))
            print(" Next changeover in {} second(s)".format(new_state["duration"]))
        else:
            next_cycle_index = (self.current_state_index + 1) % len(states)
            new_state = states[next_cycle_index]
            print()
            print("{} : {} : Changing state from {} to {}".format(
                sim_time, period["name"], self.current_state_index, next_cycle_index))
            if self.current_period_index != timed_period_index:
                print(" [period change pending]")
            print(" Old state data: {}".format(old_state))
            print(" New state data: {}".format(new_state))
            print(" Next changeover in {} second(s)".format(new_state["duration"]))
            self.current_state_index = next_cycle_index

        # Schedule relative to the previous deadline so loop latency does
        # not accumulate as drift.
        self.next_changeover_time = self.next_changeover_time + (
            self._current_state()["duration"] / self.time_factor)

    def run(self):
        """Thread body: run until stopped or until ``run_time`` has elapsed."""
        real_start_time = time.time()
        if self.start_time is None:
            start_datetime = datetime.datetime.now()
        else:
            # Anchor the requested time of day to a fixed date.
            start_datetime = datetime.datetime.combine(
                datetime.date(1900, 1, 1), self.start_time)

        with self.mutex:
            self.simulation_start_datetime = start_datetime
            self.current_simulation_datetime = start_datetime
            self.current_period_index = self._get_period_index(
                self.periods, start_datetime.time())
            self.current_state_index = 0
            initial_state = self._current_state()
            self.next_changeover_time = real_start_time + (
                initial_state["duration"] / self.time_factor)

        print("Starting with simulation time {}".format(start_datetime.time()))
        print("Initial state data: {}".format(initial_state))
        if self.run_time is None:
            print("No simulation time limit.")
        else:
            print("Simulation duration (in real second(s)): {}".format(self.run_time))
        print("Time compression factor: {}".format(self.time_factor))

        while self.run_time is None or time.time() < real_start_time + self.run_time:
            with self.mutex:
                if not self.running:
                    break
                self._tick(time.time(), real_start_time)
            # Sleep outside the lock so other threads can take snapshots.
            time.sleep(1.0 / self.time_factor)