import time
import sys
import datetime
import threading


class SimulationThread(threading.Thread):
    """
    Singleton background thread that simulates a traffic light sequence.

    Usage: call the classmethod ``initialize()`` first, then obtain the
    object with ``get_instance()`` and call the inherited ``start()``.

    All state shared between the simulation loop and callers is guarded by
    ``self.mutex``.
    """

    instance = None

    # Arbitrary date used to turn a bare datetime.time into a datetime so
    # that timedelta arithmetic can be applied to the simulated clock.
    _BASE_DATE = datetime.date(1900, 1, 1)

    @classmethod
    def initialize(cls, periods, start_time=None, run_time=None,
                   time_factor=1.0, verbose=False):
        """
        Initializes the singleton thread with the given values.

        The given arguments are passed directly to the instance constructor.
        See the `__init__` method for argument details. Any previously
        stored instance is replaced (it is not stopped).
        """
        cls.instance = SimulationThread(periods, start_time, run_time,
                                        time_factor, verbose)

    @classmethod
    def get_instance(cls):
        """
        Returns the simulation thread object, or None if initialize() has
        not been called.
        """
        return cls.instance

    def __init__(self, periods, start_time, run_time, time_factor, verbose):
        """
        Creates a simulation with the provided data and settings.

        Args:
            periods (list): A list of dictionaries that represent the
                traffic light periods (peak/off-peak etc). Each dictionary
                is read for the keys "name", "verbose-name", "timestart"
                (ISO format time string, or a datetime.time) and "states"
                (a list of dictionaries each holding a "duration" in
                simulated seconds). The list and its dictionaries are NOT
                modified; shallow copies are stored instead.
            start_time (datetime.time): The time at which the simulation
                should start. If None, the current wall-clock time is used.
            run_time (int): The amount of (real) time in seconds the
                simulation should run for before terminating. If None, run
                indefinitely or until signal_stop() is called.
            time_factor (float): The time "acceleration" factor at which
                the simulation should run. A value of 1.0 indicates
                identity with real time, and higher values run faster
                (e.g. 4.0 = four simulated seconds for each real second).
            verbose (bool): If True, outputs progress information to stdout
                (for testing purposes only).
        """
        # Parent constructor call (required). daemon=True ensures the thread
        # shuts down when the parent program finishes executing.
        # See https://docs.python.org/3/library/threading.html
        super().__init__(daemon=True)

        self.mutex = threading.Lock()

        self.start_simulation_time = start_time
        self.run_real_time = run_time
        self.time_factor = time_factor
        self.verbose = verbose

        # NOT a status variable, instead indicates whether signal_stop()
        # has been called.
        self.running = True

        self.current_period_index = 0
        self.current_state_index = 0
        self.next_changeover_real_time = None
        self.start_simulation_datetime = None
        self.current_simulation_datetime = None

        # The simulated clock is computed as
        #   anchor_simulation_datetime + (now - anchor_real_time) * factor
        # so that force_to_time() can move the clock by re-anchoring it.
        self._anchor_real_time = None
        self._anchor_simulation_datetime = None

        # Work on copies so the caller's data is left untouched; converting
        # in place would make a second construction from the same list fail.
        # Essential - periods are kept in chronological order.
        self.periods = sorted(
            (self._normalise_period(period) for period in periods),
            key=lambda period: period["timestart"],
        )

    @staticmethod
    def _normalise_period(period):
        """Return a shallow copy of `period` whose "timestart" is a datetime.time."""
        normalised = dict(period)
        timestart = normalised["timestart"]
        if not isinstance(timestart, datetime.time):
            normalised["timestart"] = datetime.time.fromisoformat(timestart)
        return normalised

    def signal_stop(self):
        """
        Signals the thread to stop, regardless of run time. The loop exits
        the next time it checks the flag.
        """
        with self.mutex:
            self.running = False

    def force_to_time(self, new_time):
        """
        Resets the simulation time to the provided time.

        This is for testing purposes; the current state is IGNORED and the
        sequence immediately resets to the first state of the period that
        contains `new_time`. Note that run() re-initialises the clock when
        the thread starts, so a call made before start() is overridden.

        Args:
            new_time (datetime.time): The simulated time to jump to.
        """
        new_simulation_datetime = datetime.datetime.combine(self._BASE_DATE, new_time)
        with self.mutex:
            now = time.time()
            # Re-anchor the simulated clock; otherwise the next loop
            # iteration would still derive the period from the old clock.
            self._anchor_real_time = now
            self._anchor_simulation_datetime = new_simulation_datetime
            self.current_simulation_datetime = new_simulation_datetime

            self.current_period_index = self._get_period_index(new_simulation_datetime.time())
            self.current_state_index = 0
            first_state = self.periods[self.current_period_index]["states"][0]
            self.next_changeover_real_time = now + (first_state["duration"] / self.time_factor)

    def get_snapshot(self):
        """
        Returns a dictionary of the current simulated state.

        "simulation_time" and "time_remaining" are None if the simulated
        clock has not been set yet (i.e. neither run() nor force_to_time()
        has executed).
        """
        with self.mutex:
            period = self.periods[self.current_period_index]
            changeover = self.next_changeover_real_time
            simulated = self.current_simulation_datetime
            return {
                "current_period_name": period["name"],
                "current_period_verbose_name": period["verbose-name"],
                "current_period_index": self.current_period_index,
                "current_state_index": self.current_state_index,
                "current_state_data": period["states"][self.current_state_index],
                "next_changeover_real_time": changeover,
                "simulation_time": None if simulated is None else simulated.time(),
                "time_remaining": (
                    None if changeover is None
                    else (changeover - time.time()) * self.time_factor
                ),
            }

    def _get_period_index(self, test_time: datetime.time):
        """
        Returns the index of the period corresponding to the given time.

        A period covers the span from its own "timestart" up to (but not
        including) the next period's "timestart". Any time not covered by
        such a span - including times earlier than the first period - maps
        to the last period, which is assumed to last past midnight.

        Raises:
            RuntimeError: If no periods are configured.
        """
        if not self.periods:
            raise RuntimeError("No periods configured for the simulation")
        last_index = len(self.periods) - 1
        for i in range(last_index):
            if self.periods[i]["timestart"] <= test_time < self.periods[i + 1]["timestart"]:
                return i
        return last_index

    def run(self):
        """
        Main thread loop. Note that this method should NOT be called
        directly: as a subclass of threading.Thread, the inherited start()
        method should be used instead.

        The loop executes until one of the following conditions is met:
        - The given run_time is exceeded (in real seconds), or
        - signal_stop() has been called
        - The executing program finishes (daemon=True)
        """
        start_real_time = time.time()

        with self.mutex:
            if self.start_simulation_time is None:
                self.start_simulation_datetime = datetime.datetime.now()
            else:
                self.start_simulation_datetime = datetime.datetime.combine(
                    self._BASE_DATE, self.start_simulation_time)
            self._anchor_real_time = start_real_time
            self._anchor_simulation_datetime = self.start_simulation_datetime

            self.current_period_index = self._get_period_index(self.start_simulation_datetime.time())
            self.current_state_index = 0
            initial_state = self.periods[self.current_period_index]["states"][0]
            self.next_changeover_real_time = start_real_time + (initial_state["duration"] / self.time_factor)

            if self.verbose:
                print("Starting with simulation time {}".format(self.start_simulation_datetime.time()))
                print("Initial state data: {}".format(initial_state))
                if self.run_real_time is None:
                    print("No simulation time limit.")
                else:
                    print("Simulation duration (in real second(s)): {}".format(self.run_real_time))
                print("Time compression factor: {}".format(self.time_factor))

        while self.run_real_time is None or time.time() < start_real_time + self.run_real_time:
            with self.mutex:
                if not self.running:
                    break
                self._step(time.time())
            # Sleep outside the lock so other threads can take snapshots.
            time.sleep(1.0 / self.time_factor)

    def _step(self, now):
        """
        Advance the simulation by one loop iteration. Must be called with
        self.mutex held.

        Args:
            now (float): The current real time as returned by time.time().
        """
        elapsed_simulated = int((now - self._anchor_real_time) * self.time_factor)
        self.current_simulation_datetime = (
            self._anchor_simulation_datetime + datetime.timedelta(seconds=elapsed_simulated))

        if now < self.next_changeover_real_time:
            if self.verbose:
                print(".", end="", flush=True)
            return

        simulated_time = self.current_simulation_datetime.time()
        timed_period_index = self._get_period_index(simulated_time)
        period = self.periods[self.current_period_index]
        states = period["states"]
        old_state = states[self.current_state_index]
        period_change_due = timed_period_index != self.current_period_index

        if period_change_due and self.current_state_index == len(states) - 1:
            # Current sequence has completed, so it is safe to change period
            new_period = self.periods[timed_period_index]
            if self.verbose:
                print()
                print("{} : Changing PERIOD from {} to {}".format(
                    simulated_time, period["name"], new_period["name"]))
                print(" Old state data: {}".format(old_state))
            self.current_period_index = timed_period_index
            self.current_state_index = 0
            if self.verbose:
                print(" New state data: {}".format(new_period["states"][0]))
                print(" Next changeover in {} second(s)".format(new_period["states"][0]["duration"]))
        else:
            # Current sequence still completing, not safe to change to next period
            next_cycle_index = (self.current_state_index + 1) % len(states)
            if self.verbose:
                print()
                print("{} : {} : Changing state from {} to {}".format(
                    simulated_time, period["name"], self.current_state_index, next_cycle_index))
                if period_change_due:
                    print(" [period change pending]")
                print(" Old state data: {}".format(old_state))
                print(" New state data: {}".format(states[next_cycle_index]))
                print(" Next changeover in {} second(s)".format(states[next_cycle_index]["duration"]))
            self.current_state_index = next_cycle_index

        # Ensure this calculation is done based on the PREVIOUS scheduled
        # time to avoid "timing drift"
        new_state = self.periods[self.current_period_index]["states"][self.current_state_index]
        self.next_changeover_real_time += new_state["duration"] / self.time_factor