#!/usr/bin/env python3
"""Simulate time-of-day "states", each of which loops through timed cycles.

The script loads a list of states from ``states.json``. Each state is read as
a mapping with the keys ``name``, ``timestart`` (an ISO time string) and
``cycles`` (a list of mappings that each carry a ``duration``).
"""
import sys
import json
import datetime
import time


def get_state_index(states, test_time : datetime.time):
    """Return the index of the state that is active at ``test_time``.

    ``states`` must be sorted by ascending ``"timestart"`` and every
    ``"timestart"`` must already be a ``datetime.time``. A state is active
    from its own start time up to (but excluding) the next state's start
    time. The last state matches everything else, so it also covers times
    earlier than the first state's start.

    Returns ``None`` when ``states`` is empty.
    """
    last_index = len(states) - 1
    for index, state in enumerate(states):
        if index == last_index:
            # The last state is the catch-all: it runs until midnight and
            # also absorbs any time before the first state's start.
            return index
        if state["timestart"] <= test_time < states[index + 1]["timestart"]:
            return index
    # Only reachable when ``states`` is empty.
    return None


def simulate(states, start_time, run_time=90, time_factor=1.0):
    """Run the state/cycle simulation and print every changeover.

    Args:
        states: State list as accepted by ``get_state_index``; each state
            also needs ``"name"`` and a non-empty ``"cycles"`` list whose
            items carry a ``"duration"`` in simulated seconds.
        start_time: ``datetime.time`` at which the simulated clock starts.
        run_time: Wall-clock seconds to run for, or ``None`` to run until
            interrupted.
        time_factor: Simulated seconds that elapse per wall-clock second.

    Raises:
        ValueError: If ``states`` is empty.
    """
    if not states:
        raise ValueError("states must contain at least one state")

    simulation_start_time = time.time()
    # The date is an arbitrary anchor so timedelta arithmetic works; only the
    # time-of-day part of the simulated clock is ever used.
    simulation_start_datetime = datetime.datetime.combine(
        datetime.date(1900, 1, 1), start_time
    )

    current_state_index = get_state_index(states, simulation_start_datetime.time())
    current_cycle_index = 0
    initial_cycle = states[current_state_index]["cycles"][current_cycle_index]
    next_changeover_time = simulation_start_time + (initial_cycle["duration"] / time_factor)

    print("Starting with simulation time {}".format(simulation_start_datetime.time()))
    print("Initial data: {}".format(initial_cycle))
    print("Starting simulation of {} second(s) with time factor {}".format(run_time, time_factor))

    end_time = None if run_time is None else simulation_start_time + run_time
    while end_time is None or time.time() < end_time:
        now = time.time()
        elapsed_simulated = int((now - simulation_start_time) * time_factor)
        current_simulation_datetime = simulation_start_datetime + datetime.timedelta(
            seconds=elapsed_simulated
        )
        simulated_clock = current_simulation_datetime.time()

        if now >= next_changeover_time:
            timed_state_index = get_state_index(states, simulated_clock)
            cycles = states[current_state_index]["cycles"]
            on_last_cycle = current_cycle_index == len(cycles) - 1

            if timed_state_index != current_state_index and on_last_cycle:
                # Safe to change states: the current state has just finished
                # its final cycle.
                print()
                print("{} : Changing STATES from {} to {}".format(
                    simulated_clock,
                    states[current_state_index]["name"],
                    states[timed_state_index]["name"]))
                print(" Old cycle data: {}".format(cycles[current_cycle_index]))
                current_state_index = timed_state_index
                current_cycle_index = 0
                new_cycle = states[current_state_index]["cycles"][current_cycle_index]
                print(" New cycle data: {}".format(new_cycle))
                print(" Next changeover in {} second(s)".format(new_cycle["duration"]))
            else:
                # Stay in the current state and advance (with wrap-around) to
                # its next cycle; a pending state change waits for the last
                # cycle to complete.
                next_cycle_index = (current_cycle_index + 1) % len(cycles)
                print()
                print("{} : Changing cycle from {} to {}".format(
                    simulated_clock, current_cycle_index, next_cycle_index))
                if current_state_index != timed_state_index:
                    print(" [state change pending]")
                print(" Old cycle data: {}".format(cycles[current_cycle_index]))
                print(" New cycle data: {}".format(cycles[next_cycle_index]))
                print(" Next changeover in {} second(s)".format(cycles[next_cycle_index]["duration"]))
                current_cycle_index = next_cycle_index

            # Schedule relative to the previous deadline rather than ``now``
            # so that loop latency does not accumulate.
            active_cycle = states[current_state_index]["cycles"][current_cycle_index]
            next_changeover_time = next_changeover_time + (active_cycle["duration"] / time_factor)
        else:
            print(".", end="")
            sys.stdout.flush()

        # One tick per simulated second.
        # NOTE(review): the collapsed source does not show whether this sleep
        # sat inside the ``else`` branch or at loop level — confirm.
        time.sleep(1.0 / time_factor)


def main():
    """Load ``states.json``, print a summary and lookup checks, then simulate."""
    with open("states.json", encoding="utf-8") as states_file:
        states = json.load(states_file)

    # Parse once, then sort on the parsed times; the ordering is essential
    # for get_state_index().
    for state in states:
        state["timestart"] = datetime.time.fromisoformat(state["timestart"])
    states.sort(key=lambda state: state["timestart"])

    for state in states:
        print("Name: {}\t\tTime start: {}".format(state["name"], state["timestart"]))
        print(" Number of cycles: {}".format(len(state["cycles"])))

    test_times = [
        "00:00:00",
        "07:59:59",
        "08:00:00",
        "09:59:59",
        "10:00:00",
        "16:59:59",
        "17:00:00",
        "18:59:59",
        "19:00:00",
        "23:59:59",
    ]

    for test_time in test_times:
        test_time_obj = datetime.time.fromisoformat(test_time)
        matched_state = states[get_state_index(states, test_time_obj)]
        print("State matching time {} is '{}'".format(test_time, matched_state["name"]))

    simulate(states, datetime.time.fromisoformat("07:59"), 60, 4.0)


if __name__ == '__main__':
    main()