Renaming of periods/states/cycles to something more appropriate. Basic

Django setup with threading added.
This commit is contained in:
Chris Davoren 2023-10-19 16:49:13 +10:00
parent eb897e18ae
commit 7bbd0cfd63
21 changed files with 510 additions and 113 deletions

2
.gitignore vendored
View File

@ -0,0 +1,2 @@
db.sqlite3
*.pyc

22
manage.py Normal file
View File

@ -0,0 +1,22 @@
#!/usr/bin/env python
"""Django's command-line utility for administrative tasks."""
import os
import sys
def main():
"""Run administrative tasks."""
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'trafficlights.settings')
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == '__main__':
main()

106
period-test.py Normal file
View File

@ -0,0 +1,106 @@
#!/usr/bin/env python3
import sys
import json
import datetime
import time
def get_period_index(periods, test_time : datetime.time):
for i, state in enumerate(periods):
if i == len(periods) - 1:
return(i)
if test_time >= state["timestart"] and test_time < periods[i+1]["timestart"]:
return(i)
# Should NOT be reached since last state assumed to last until midnight
# TODO: Raise exception?
return None
def simulate(periods, start_time, run_time=90, time_factor=1.0):
simulation_start_time = time.time()
simulation_start_datetime = datetime.datetime.fromisoformat("1900-01-01 " + start_time.isoformat())
current_period_index = get_period_index(periods, simulation_start_datetime.time())
current_state_index = 0
next_changeover_time = simulation_start_time + (periods[current_period_index]["states"][current_state_index]["duration"] / time_factor)
print("Starting with simulation time {}".format(simulation_start_datetime.time()))
print("Initial data: {}".format(periods[current_period_index]["states"][current_state_index]))
print("Starting simulation of {} second(s) with time factor {}".format(run_time, time_factor))
while (run_time is None) or (run_time is not None and time.time() < simulation_start_time + run_time):
now = time.time()
current_simulation_datetime = simulation_start_datetime + datetime.timedelta(seconds=int((now - simulation_start_time) * time_factor))
timed_state_index = get_period_index(periods, current_simulation_datetime.time())
if now >= next_changeover_time:
timed_state_index = get_period_index(periods, current_simulation_datetime.time())
if timed_state_index != current_period_index and current_state_index == len(periods[current_period_index]["states"])-1:
# Safe to change periods
print()
print("{} : Changing periods from {} to {}".format(current_simulation_datetime.time(), periods[current_period_index]["name"], periods[timed_state_index]["name"]))
print(" Old cycle data: {}".format(periods[current_period_index]["states"][current_state_index]))
current_period_index = timed_state_index
current_state_index = 0
print(" New cycle data: {}".format(periods[current_period_index]["states"][current_state_index]))
print(" Next changeover in {} second(s)".format(periods[current_period_index]["states"][current_state_index]["duration"]))
else:
next_cycle_index = (current_state_index + 1) % len(periods[current_period_index]["states"])
print()
print("{} : Changing cycle from {} to {}".format(current_simulation_datetime.time(), current_state_index, next_cycle_index))
if current_period_index != timed_state_index:
print(" [state change pending]")
print(" Old cycle data: {}".format(periods[current_period_index]["states"][current_state_index]))
print(" New cycle data: {}".format(periods[current_period_index]["states"][next_cycle_index]))
print(" Next changeover in {} second(s)".format(periods[current_period_index]["states"][next_cycle_index]["duration"]))
current_state_index = next_cycle_index
next_changeover_time = next_changeover_time + (periods[current_period_index]["states"][current_state_index]["duration"] / time_factor)
else:
print(".", end="")
sys.stdout.flush()
time.sleep(1.0 / time_factor)
def main():
periods = json.load(open("periods.json"))
# Essential due to assumed ordering further below
periods.sort(key=lambda x : x["timestart"])
for state in periods:
time = datetime.time.fromisoformat(state["timestart"])
print("Name: {}\t\tTime start: {}".format(state["name"], time))
print(" Number of states: {}".format(len(state["states"])))
state["timestart"] = datetime.time.fromisoformat(state["timestart"])
test_times = [
"00:00:00",
"07:59:59",
"08:00:00",
"09:59:59",
"10:00:00",
"16:59:59",
"17:00:00",
"18:59:59",
"19:00:00",
"23:59:59",
]
for test_time in test_times:
test_time_obj = datetime.time.fromisoformat(test_time)
print("Period matching time {} is '{}'".format(test_time, periods[get_period_index(periods, test_time_obj)]["name"]))
simulate(periods, datetime.time.fromisoformat("07:59"), 60, 4.0)
if __name__ == '__main__':
main()

View File

@ -2,7 +2,7 @@
{
"name" : "offpeak-morning",
"timestart" : "00:00",
"cycles" : [
"states" : [
{ "duration" : 20, "north" : "green", "south" : "green", "east" : "red", "west" : "red", "north-right" : "red" },
{ "duration" : 4, "north" : "green", "south" : "amber", "east" : "red", "west" : "red", "north-right" : "red" },
{ "duration" : 2, "north" : "green", "south" : "red", "east" : "red", "west" : "red", "north-right" : "red" },
@ -17,7 +17,7 @@
{
"name" : "peak-morning",
"timestart" : "08:00",
"cycles" : [
"states" : [
{ "duration" : 40, "north" : "green", "south" : "green", "east" : "red", "west" : "red", "north-right" : "red" },
{ "duration" : 4, "north" : "green", "south" : "amber", "east" : "red", "west" : "red", "north-right" : "red" },
{ "duration" : 2, "north" : "green", "south" : "red", "east" : "red", "west" : "red", "north-right" : "red" },
@ -32,7 +32,7 @@
{
"name" : "offpeak-middle",
"timestart" : "10:00",
"cycles" : [
"states" : [
{ "duration" : 20, "north" : "green", "south" : "green", "east" : "red", "west" : "red", "north-right" : "red" },
{ "duration" : 4, "north" : "green", "south" : "amber", "east" : "red", "west" : "red", "north-right" : "red" },
{ "duration" : 2, "north" : "green", "south" : "red", "east" : "red", "west" : "red", "north-right" : "red" },
@ -47,7 +47,7 @@
{
"name" : "peak-afternoon",
"timestart" : "17:00",
"cycles" : [
"states" : [
{ "duration" : 40, "north" : "green", "south" : "green", "east" : "red", "west" : "red", "north-right" : "red" },
{ "duration" : 4, "north" : "green", "south" : "amber", "east" : "red", "west" : "red", "north-right" : "red" },
{ "duration" : 2, "north" : "green", "south" : "red", "east" : "red", "west" : "red", "north-right" : "red" },
@ -62,7 +62,7 @@
{
"name" : "offpeak-evening",
"timestart" : "19:00",
"cycles" : [
"states" : [
{ "duration" : 20, "north" : "green", "south" : "green", "east" : "red", "west" : "red", "north-right" : "red" },
{ "duration" : 4, "north" : "green", "south" : "amber", "east" : "red", "west" : "red", "north-right" : "red" },
{ "duration" : 2, "north" : "green", "south" : "red", "east" : "red", "west" : "red", "north-right" : "red" },

BIN
requirements.txt Normal file

Binary file not shown.

1
simulation/__init__.py Normal file
View File

@ -0,0 +1 @@
from .simulation_thread import SimulationThread

View File

@ -0,0 +1,138 @@
import time
import sys
import datetime
import threading
class SimulationThread(threading.Thread):
instance = None
@classmethod
def get_instance(cls, periods=None, start_time=None, run_time=None, time_factor=1.0):
if cls.instance is not None:
return cls.instance
if periods is None:
raise ValueError("Periods parameter must be specified on first call")
cls.instance = SimulationThread(periods, start_time, run_time, time_factor)
return cls.instance
def __init__(self, periods, start_time, run_time, time_factor):
super().__init__(daemon=True)
self.mutex = threading.Lock()
self.running = True
self.periods = periods
self.start_time = start_time
self.run_time = run_time
self.time_factor = time_factor
self.current_period_index = 0
self.current_state_index = 0
self.next_changeover_time = None
self.simulation_start_datetime = None
for period in self.periods:
period["timestart"] = datetime.time.fromisoformat(period["timestart"])
def signal_stop(self):
self.mutex.acquire()
self.running = False
self.mutex.release()
def force_to_time(self, new_time):
new_datetime = datetime.datetime.fromisoformat("1900-01-01 " + new_time.isoformat())
self.mutex.acquire()
print()
print("Forcing internal time change from {} to {}".format(self.simulation_start_datetime.time().isoformat(), new_time.isoformat()))
self.current_period_index = self._get_period_index(self.periods, new_datetime.time())
self.current_state_index = 0
self.next_changeover_time = time.time() + (self.periods[self.current_period_index]["states"][self.current_state_index]["duration"] / self.time_factor)
self.mutex.release()
def get_snapshot(self):
self.mutex.acquire()
return_data = {"current_period": self.periods[self.current_period_index]["name"], "current_state": self.periods[self.current_period_index]["states"][self.current_state_index], "next_changeover_time": self.next_changeover_time, "simulation_time": self.current_simulation_datetime.time()}
self.mutex.release()
return return_data
def _get_period_index(self, states, test_time: datetime.time):
for i, state in enumerate(states):
if i == len(states) - 1:
return i
if test_time >= state["timestart"] and \
test_time < states[i+1]["timestart"]:
return i
# Should NOT be reached since last state assumed to last until midnight
# TODO: Raise exception?
return None
def run(self):
real_start_time = time.time()
self.simulation_start_datetime = None
if self.start_time is None:
self.simulation_start_datetime = datetime.datetime.now()
else:
self.simulation_start_datetime = datetime.datetime.fromisoformat(
"1900-01-01 " + self.start_time.isoformat())
self.current_period_index = self._get_period_index(self.periods, self.simulation_start_datetime.time())
self.current_state_index = 0
self.next_changeover_time = real_start_time + (self.periods[self.current_period_index]["states"][self.current_state_index]["duration"] / self.time_factor)
print("Starting with simulation time {}".format(self.simulation_start_datetime.time()))
print("Initial state data: {}".format(self.periods[self.current_period_index]["states"][self.current_state_index]))
if self.run_time is None:
print("No simulation time limit.")
else:
print("Simulation duration (in real second(s)): {}".format(self.run_time))
print("Time compression factor: {}".format(self.run_time, self.time_factor))
while (self.run_time is None) or (self.run_time is not None and time.time() < real_start_time + self.run_time):
self.mutex.acquire()
if not self.running:
self.mutex.release()
break
now = time.time()
self.current_simulation_datetime = self.simulation_start_datetime + datetime.timedelta(seconds=int((now - real_start_time) * self.time_factor))
timed_period_index = self._get_period_index(self.periods, self.current_simulation_datetime.time())
if now >= self.next_changeover_time:
timed_period_index = self._get_period_index(self.periods, self.current_simulation_datetime.time())
if timed_period_index != self.current_period_index and self.current_state_index == len(self.periods[self.current_period_index]["states"])-1:
# Safe to change states
print()
print("{} : Changing PERIOD from {} to {}".format(self.current_simulation_datetime.time(), self.periods[self.current_period_index]["name"], self.periods[timed_period_index]["name"]))
print(" Old state data: {}".format(self.periods[self.current_period_index]["states"][self.current_state_index]))
self.current_period_index = timed_period_index
self.current_state_index = 0
print(" New state data: {}".format(self.periods[self.current_period_index]["states"][self.current_state_index]))
print(" Next changeover in {} second(s)".format(self.periods[self.current_period_index]["states"][self.current_state_index]["duration"]))
else:
next_cycle_index = (self.current_state_index + 1) % len(self.periods[self.current_period_index]["states"])
print()
print("{} : {} : Changing state from {} to {}".format(self.current_simulation_datetime.time(), self.periods[self.current_period_index]["name"], self.current_state_index, next_cycle_index))
if self.current_period_index != timed_period_index:
print(" [period change pending]")
print(" Old state data: {}".format(self.periods[self.current_period_index]["states"][self.current_state_index]))
print(" New state data: {}".format(self.periods[self.current_period_index]["states"][next_cycle_index]))
print(" Next changeover in {} second(s)".format(self.periods[self.current_period_index]["states"][next_cycle_index]["duration"]))
self.current_state_index = next_cycle_index
self.next_changeover_time = self.next_changeover_time + (self.periods[self.current_period_index]["states"][self.current_state_index]["duration"] / self.time_factor)
else:
print(".", end="")
sys.stdout.flush()
self.mutex.release()
time.sleep(1.0 / self.time_factor)

View File

@ -1,108 +0,0 @@
#!/usr/bin/env python3
import sys
import json
import datetime
import time
def get_state_index(states, test_time : datetime.time):
state_result = None
for i, state in enumerate(states):
if i == len(states) - 1:
return(i)
if test_time >= state["timestart"] and test_time < states[i+1]["timestart"]:
return(i)
# Should NOT be reached since last state assumed to last until midnight
# TODO: Raise exception?
return None
def simulate(states, start_time, run_time=90, time_factor=1.0):
simulation_start_time = time.time()
simulation_start_datetime = datetime.datetime.fromisoformat("1900-01-01 " + start_time.isoformat())
current_state_index = get_state_index(states, simulation_start_datetime.time())
current_cycle_index = 0
next_changeover_time = simulation_start_time + (states[current_state_index]["cycles"][current_cycle_index]["duration"] / time_factor)
print("Starting with simulation time {}".format(simulation_start_datetime.time()))
print("Initial data: {}".format(states[current_state_index]["cycles"][current_cycle_index]))
print("Starting simulation of {} second(s) with time factor {}".format(run_time, time_factor))
while (run_time is None) or (run_time is not None and time.time() < simulation_start_time + run_time):
now = time.time()
current_simulation_datetime = simulation_start_datetime + datetime.timedelta(seconds=int((now - simulation_start_time) * time_factor))
timed_state_index = get_state_index(states, current_simulation_datetime.time())
if now >= next_changeover_time:
timed_state_index = get_state_index(states, current_simulation_datetime.time())
if timed_state_index != current_state_index and current_cycle_index == len(states[current_state_index]["cycles"])-1:
# Safe to change states
print()
print("{} : Changing STATES from {} to {}".format(current_simulation_datetime.time(), states[current_state_index]["name"], states[timed_state_index]["name"]))
print(" Old cycle data: {}".format(states[current_state_index]["cycles"][current_cycle_index]))
current_state_index = timed_state_index
current_cycle_index = 0
print(" New cycle data: {}".format(states[current_state_index]["cycles"][current_cycle_index]))
print(" Next changeover in {} second(s)".format(states[current_state_index]["cycles"][current_cycle_index]["duration"]))
else:
next_cycle_index = (current_cycle_index + 1) % len(states[current_state_index]["cycles"])
print()
print("{} : Changing cycle from {} to {}".format(current_simulation_datetime.time(), current_cycle_index, next_cycle_index))
if current_state_index != timed_state_index:
print(" [state change pending]")
print(" Old cycle data: {}".format(states[current_state_index]["cycles"][current_cycle_index]))
print(" New cycle data: {}".format(states[current_state_index]["cycles"][next_cycle_index]))
print(" Next changeover in {} second(s)".format(states[current_state_index]["cycles"][next_cycle_index]["duration"]))
current_cycle_index = next_cycle_index
next_changeover_time = next_changeover_time + (states[current_state_index]["cycles"][current_cycle_index]["duration"] / time_factor)
else:
print(".", end="")
sys.stdout.flush()
time.sleep(1.0 / time_factor)
def main():
states = json.load(open("states.json"))
# Essential due to assumed ordering further below
states.sort(key=lambda x : x["timestart"])
for state in states:
time = datetime.time.fromisoformat(state["timestart"])
print("Name: {}\t\tTime start: {}".format(state["name"], time))
print(" Number of cycles: {}".format(len(state["cycles"])))
state["timestart"] = datetime.time.fromisoformat(state["timestart"])
test_times = [
"00:00:00",
"07:59:59",
"08:00:00",
"09:59:59",
"10:00:00",
"16:59:59",
"17:00:00",
"18:59:59",
"19:00:00",
"23:59:59",
]
for test_time in test_times:
test_time_obj = datetime.time.fromisoformat(test_time)
print("State matching time {} is '{}'".format(test_time, states[get_state_index(states, test_time_obj)]["name"]))
simulate(states, datetime.time.fromisoformat("07:59"), 60, 4.0)
if __name__ == '__main__':
main()

View File

View File

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

View File

@ -0,0 +1,35 @@
import os
import datetime
import json
from django.apps import AppConfig
from django.conf import settings
from simulation import SimulationThread
def simulation():
while True:
print("Simulation: {}".format(time.time()))
time.sleep(1.0)
class TrafficlightfrontendConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'trafficlightfrontend'
initialized = False
simulation_thread = None
# Note as per documentation, may be called more than once
# Therefore needs to be idempotent or use flag
def ready(self):
# Ensure RUN_MAIN to avoid starting thread on runserver "change watchdog" instance
if TrafficlightfrontendConfig.initialized or os.environ.get("RUN_MAIN") is None:
return
print("Traffic Light Frontend: initialization.")
print("Current working directory: {}".format(os.getcwd()))
print(settings.BASE_DIR)
periods = json.load(open("states.json"))
SimulationThread.get_instance(periods, start_time=datetime.time.fromisoformat("07:59"), time_factor=4.0).start()
TrafficlightfrontendConfig.initialized = True

View File

@ -0,0 +1,3 @@
from django.db import models
# Create your models here.

View File

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

View File

@ -0,0 +1,7 @@
from django.urls import path
from . import views
urlpatterns = [
path("", views.index, name="index")
]

View File

@ -0,0 +1,6 @@
from django.http import HttpResponse
from django.shortcuts import render
# Create your views here.
def index(request):
return HttpResponse("Basic test response.")

View File

16
trafficlights/asgi.py Normal file
View File

@ -0,0 +1,16 @@
"""
ASGI config for trafficlights project.
It exposes the ASGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/4.2/howto/deployment/asgi/
"""
import os
from django.core.asgi import get_asgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'trafficlights.settings')
application = get_asgi_application()

124
trafficlights/settings.py Normal file
View File

@ -0,0 +1,124 @@
"""
Django settings for trafficlights project.
Generated by 'django-admin startproject' using Django 4.2.6.
For more information on this file, see
https://docs.djangoproject.com/en/4.2/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/4.2/ref/settings/
"""
from pathlib import Path
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/4.2/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'django-insecure-zin2hwf7t=w0@skuu3g(-9iyrp9h-c=myg8%7ifds%g-8*2)e5'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'trafficlightfrontend',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'trafficlights.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'trafficlights.wsgi.application'
# Database
# https://docs.djangoproject.com/en/4.2/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db.sqlite3',
}
}
# Password validation
# https://docs.djangoproject.com/en/4.2/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# Internationalization
# https://docs.djangoproject.com/en/4.2/topics/i18n/
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/4.2/howto/static-files/
STATIC_URL = 'static/'
# Default primary key field type
# https://docs.djangoproject.com/en/4.2/ref/settings/#default-auto-field
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'

23
trafficlights/urls.py Normal file
View File

@ -0,0 +1,23 @@
"""
URL configuration for trafficlights project.
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/4.2/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: path('', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: path('', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path('frontend/', include("trafficlightfrontend.urls")),
path('admin/', admin.site.urls),
]

16
trafficlights/wsgi.py Normal file
View File

@ -0,0 +1,16 @@
"""
WSGI config for trafficlights project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/4.2/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'trafficlights.settings')
application = get_wsgi_application()