trafficlights/simulation/simulation_thread.py

145 lines
7.1 KiB
Python

import time
import sys
import datetime
import threading
class SimulationThread(threading.Thread):
instance = None
@classmethod
def initialize(cls, periods, start_time=None, run_time=None, time_factor=1.0):
cls.instance = SimulationThread(periods, start_time, run_time, time_factor)
@classmethod
def get_instance(cls, periods=None, start_time=None, run_time=None, time_factor=1.0):
return cls.instance
def __init__(self, periods, start_time, run_time, time_factor):
super().__init__(daemon=True)
self.mutex = threading.Lock()
self.running = True
self.periods = periods
self.start_time = start_time
self.run_time = run_time
self.time_factor = time_factor
self.current_period_index = 0
self.current_state_index = 0
self.next_changeover_time = None
self.simulation_start_datetime = None
for period in self.periods:
period["timestart"] = datetime.time.fromisoformat(period["timestart"])
# Essential - periods are assumed to be in chronological order
self.periods.sort(key=lambda x: x["timestart"])
def signal_stop(self):
self.mutex.acquire()
self.running = False
self.mutex.release()
def force_to_time(self, new_time):
new_datetime = datetime.datetime.fromisoformat("1900-01-01 " + new_time.isoformat())
self.mutex.acquire()
print()
print("Forcing internal time change from {} to {}".format(self.simulation_start_datetime.time().isoformat(), new_time.isoformat()))
self.current_period_index = self._get_period_index(self.periods, new_datetime.time())
self.current_state_index = 0
self.next_changeover_time = time.time() + (self.periods[self.current_period_index]["states"][self.current_state_index]["duration"] / self.time_factor)
self.mutex.release()
def get_snapshot(self):
self.mutex.acquire()
return_data = {
"current_period_name": self.periods[self.current_period_index]["name"],
"current_period_verbose_name": self.periods[self.current_period_index]["verbose-name"],
"current_state": self.periods[self.current_period_index]["states"][self.current_state_index],
"next_changeover_time": self.next_changeover_time,
"simulation_time": self.current_simulation_datetime.time(),
}
self.mutex.release()
return return_data
def _get_period_index(self, states, test_time: datetime.time):
for i, state in enumerate(states):
if i == len(states) - 1:
return i
if test_time >= state["timestart"] and \
test_time < states[i+1]["timestart"]:
return i
# Should NOT be reached since last state assumed to last until midnight
# TODO: Raise exception?
return None
def run(self):
real_start_time = time.time()
self.simulation_start_datetime = None
if self.start_time is None:
self.simulation_start_datetime = datetime.datetime.now()
else:
self.simulation_start_datetime = datetime.datetime.fromisoformat(
"1900-01-01 " + self.start_time.isoformat())
self.current_period_index = self._get_period_index(self.periods, self.simulation_start_datetime.time())
self.current_state_index = 0
self.next_changeover_time = real_start_time + (self.periods[self.current_period_index]["states"][self.current_state_index]["duration"] / self.time_factor)
print("Starting with simulation time {}".format(self.simulation_start_datetime.time()))
print("Initial state data: {}".format(self.periods[self.current_period_index]["states"][self.current_state_index]))
if self.run_time is None:
print("No simulation time limit.")
else:
print("Simulation duration (in real second(s)): {}".format(self.run_time))
print("Time compression factor: {}".format(self.run_time, self.time_factor))
while (self.run_time is None) or (self.run_time is not None and time.time() < real_start_time + self.run_time):
self.mutex.acquire()
if not self.running:
self.mutex.release()
break
now = time.time()
self.current_simulation_datetime = self.simulation_start_datetime + datetime.timedelta(seconds=int((now - real_start_time) * self.time_factor))
timed_period_index = self._get_period_index(self.periods, self.current_simulation_datetime.time())
if now >= self.next_changeover_time:
timed_period_index = self._get_period_index(self.periods, self.current_simulation_datetime.time())
if timed_period_index != self.current_period_index and self.current_state_index == len(self.periods[self.current_period_index]["states"])-1:
# Safe to change states
print()
print("{} : Changing PERIOD from {} to {}".format(self.current_simulation_datetime.time(), self.periods[self.current_period_index]["name"], self.periods[timed_period_index]["name"]))
print(" Old state data: {}".format(self.periods[self.current_period_index]["states"][self.current_state_index]))
self.current_period_index = timed_period_index
self.current_state_index = 0
print(" New state data: {}".format(self.periods[self.current_period_index]["states"][self.current_state_index]))
print(" Next changeover in {} second(s)".format(self.periods[self.current_period_index]["states"][self.current_state_index]["duration"]))
else:
next_cycle_index = (self.current_state_index + 1) % len(self.periods[self.current_period_index]["states"])
print()
print("{} : {} : Changing state from {} to {}".format(self.current_simulation_datetime.time(), self.periods[self.current_period_index]["name"], self.current_state_index, next_cycle_index))
if self.current_period_index != timed_period_index:
print(" [period change pending]")
print(" Old state data: {}".format(self.periods[self.current_period_index]["states"][self.current_state_index]))
print(" New state data: {}".format(self.periods[self.current_period_index]["states"][next_cycle_index]))
print(" Next changeover in {} second(s)".format(self.periods[self.current_period_index]["states"][next_cycle_index]["duration"]))
self.current_state_index = next_cycle_index
self.next_changeover_time = self.next_changeover_time + (self.periods[self.current_period_index]["states"][self.current_state_index]["duration"] / self.time_factor)
else:
print(".", end="")
sys.stdout.flush()
self.mutex.release()
time.sleep(1.0 / self.time_factor)