trafficlights/period-test.py

107 lines
4.7 KiB
Python

#!/usr/bin/env python3
import sys
import json
import datetime
import time
def get_period_index(periods, test_time: datetime.time) -> int:
    """Return the index of the period that is in effect at ``test_time``.

    A period is in effect from its own ``"timestart"`` up to, but not
    including, the ``"timestart"`` of the period that follows it in the list.
    The last period is the fallback: it is returned for every time that no
    earlier period claims, which also covers times before the first period's
    start.

    Args:
        periods: Sequence of period dicts ordered by ascending ``"timestart"``;
            each ``"timestart"`` must be comparable with ``test_time``.
        test_time: Time of day to look up.

    Returns:
        Index into ``periods`` of the matching period.

    Raises:
        ValueError: If ``periods`` is empty, since no index can be returned.
    """
    if not periods:
        raise ValueError("periods must contain at least one period")

    # Walk adjacent (period, next period) pairs; the last period has no
    # successor and is therefore handled by the fallback below.
    for index, (current, following) in enumerate(zip(periods, periods[1:])):
        if current["timestart"] <= test_time < following["timestart"]:
            return index

    return len(periods) - 1
def simulate(periods, start_time, run_time=90, time_factor=1.0):
    """Simulate the light cycle in scaled real time, printing every transition.

    The simulation starts at the simulated time of day ``start_time``, in the
    first state of whichever period contains that time. It then steps through
    that period's states, holding each one for its ``"duration"`` (simulated
    seconds). A switch to the period matching the current simulated time is
    only made when the last state of the current period ends, so a cycle is
    never cut short; the new period begins at its first state. While waiting,
    one ``.`` is printed per simulated second.

    Args:
        periods: Period dicts ordered by ascending ``"timestart"``; each also
            has a ``"name"`` and a ``"states"`` list whose items carry a
            numeric ``"duration"``.
        start_time: Simulated time of day (``datetime.time``) to start at.
        run_time: Real seconds to run for, or ``None`` to run until
            interrupted.
        time_factor: Simulated seconds that elapse per real second; must be
            greater than zero.

    Raises:
        ValueError: If ``time_factor`` is not positive.
    """
    if time_factor <= 0:
        raise ValueError("time_factor must be positive, got {!r}".format(time_factor))

    # Intervals are measured on the monotonic clock so that adjustments of the
    # system clock cannot stretch, shrink or reverse the simulation.
    clock = time.monotonic
    real_start = clock()

    # Anchor the simulated clock on an arbitrary date so timedelta arithmetic
    # works; only the time-of-day part is ever read back.
    sim_start = datetime.datetime.combine(datetime.date(1900, 1, 1), start_time)

    period_index = get_period_index(periods, sim_start.time())
    state_index = 0
    initial_state = periods[period_index]["states"][state_index]
    next_changeover = real_start + initial_state["duration"] / time_factor

    print("Starting with simulation time {}".format(sim_start.time()))
    print("Initial data: {}".format(initial_state))
    print("Starting simulation of {} second(s) with time factor {}".format(run_time, time_factor))

    while run_time is None or clock() < real_start + run_time:
        now = clock()
        elapsed_sim_seconds = int((now - real_start) * time_factor)
        sim_now = sim_start + datetime.timedelta(seconds=elapsed_sim_seconds)

        if now >= next_changeover:
            period = periods[period_index]
            states = period["states"]
            timed_index = get_period_index(periods, sim_now.time())

            if timed_index != period_index and state_index == len(states) - 1:
                # The current cycle just finished its last state, so it is
                # safe to move on to the period the clock says we are in.
                print()
                print("{} : Changing periods from {} to {}".format(
                    sim_now.time(), period["name"], periods[timed_index]["name"]))
                print(" Old cycle data: {}".format(states[state_index]))
                period_index = timed_index
                state_index = 0
                new_state = periods[period_index]["states"][state_index]
                print(" New cycle data: {}".format(new_state))
                print(" Next changeover in {} second(s)".format(new_state["duration"]))
            else:
                # Advance within the current period, wrapping to its first state.
                next_index = (state_index + 1) % len(states)
                new_state = states[next_index]
                print()
                print("{} : Changing cycle from {} to {}".format(
                    sim_now.time(), state_index, next_index))
                if period_index != timed_index:
                    # A period change is due but must wait for the cycle to end.
                    print(" [state change pending]")
                print(" Old cycle data: {}".format(states[state_index]))
                print(" New cycle data: {}".format(new_state))
                print(" Next changeover in {} second(s)".format(new_state["duration"]))
                state_index = next_index

            # Schedule relative to the previous deadline (not ``now``) so that
            # late wake-ups do not accumulate as drift.
            next_changeover += new_state["duration"] / time_factor
        else:
            print(".", end="")

        sys.stdout.flush()
        # One simulated second per tick.
        time.sleep(1.0 / time_factor)
def main():
    """Load ``periods.json``, print period lookups, then run a demo simulation.

    Steps:
      1. Read the period definitions from ``periods.json`` in the current
         working directory.
      2. Convert every ``"timestart"`` string to a ``datetime.time`` and sort
         the periods by it.
      3. Print a summary of each period.
      4. Print which period matches each of a fixed list of boundary times.
      5. Run ``simulate`` from 07:59 for 60 real seconds at 4x speed.
    """
    # Use a context manager so the file handle is closed deterministically.
    with open("periods.json", encoding="utf-8") as periods_file:
        periods = json.load(periods_file)

    # Parse the start times once, up front, so the sort below compares real
    # times of day rather than their string spellings.
    for period in periods:
        period["timestart"] = datetime.time.fromisoformat(period["timestart"])

    # Essential due to assumed ordering further below
    periods.sort(key=lambda period: period["timestart"])

    for period in periods:
        print("Name: {}\t\tTime start: {}".format(period["name"], period["timestart"]))
        print(" Number of states: {}".format(len(period["states"])))

    # Times just before and exactly on several hour marks, plus both ends of
    # the day, to exercise the lookup at its boundaries.
    test_times = [
        "00:00:00",
        "07:59:59",
        "08:00:00",
        "09:59:59",
        "10:00:00",
        "16:59:59",
        "17:00:00",
        "18:59:59",
        "19:00:00",
        "23:59:59",
    ]
    for test_time in test_times:
        test_time_obj = datetime.time.fromisoformat(test_time)
        matched_period = periods[get_period_index(periods, test_time_obj)]
        print("Period matching time {} is '{}'".format(test_time, matched_period["name"]))

    simulate(periods, datetime.time.fromisoformat("07:59"), 60, 4.0)
# Run the lookup checks and the demo simulation only when this file is
# executed as a script, not when it is imported as a module.
if __name__ == '__main__':
    main()