software/raspi: kivy code dump

This commit is contained in:
2022-06-17 19:57:37 +02:00
parent 011c0b3a58
commit ba3e9a19b4
12 changed files with 373 additions and 0 deletions

2
software/.gitignore vendored
View File

@@ -1,2 +0,0 @@
.pioenvs
.piolibdeps

View File

@@ -1,36 +0,0 @@
This directory is intended for the project specific (private) libraries.
PlatformIO will compile them to static libraries and link to executable file.
The source code of each library should be placed in separate directory, like
"lib/private_lib/[here are source files]".
For example, see how can be organized `Foo` and `Bar` libraries:
|--lib
| |--Bar
| | |--docs
| | |--examples
| | |--src
| | |- Bar.c
| | |- Bar.h
| |--Foo
| | |- Foo.c
| | |- Foo.h
| |- readme.txt --> THIS FILE
|- platformio.ini
|--src
|- main.c
Then in `src/main.c` you should use:
#include <Foo.h>
#include <Bar.h>
// rest H/C/CPP code
PlatformIO will find your libraries automatically, configure preprocessor's
include paths and build them.
More information about PlatformIO Library Dependency Finder
- http://docs.platformio.org/page/librarymanager/ldf.html

View File

@@ -1,21 +0,0 @@
; PlatformIO Project Configuration File
;
; Build options: build flags, source filter
; Upload options: custom upload port, speed and extra flags
; Library options: dependencies, extra library storages
; Advanced options: extra scripting
;
; Please visit documentation for the other options and examples
; http://docs.platformio.org/page/projectconf.html
[env:uno]
platform = atmelavr
board = uno
framework = arduino
monitor_port = /dev/serial/by-id/usb-FTDI_TTL232R_FTF48YOZ-if00-port0
upload_port = /dev/serial/by-id/usb-FTDI_TTL232R_FTF48YOZ-if00-port0
targets = upload, monitor
monitor_baud = 115200
lib_deps =
https://github.com/yaacov/ArduinoModbusSlave/archive/03e0182795fee5e7025c14c5817cac835f760eae.zip

12
software/raspi/config.yml Normal file
View File

@@ -0,0 +1,12 @@
tasks:
- module: key # klucz
- module: code # potencjometr
- module: key # kulka
- module: key # korba
- module: code # szuflada mała
- module: barcode # kody paskowe
- module: snake
speed: 6
- module: microphone # ??
- module: nfc # ??
- module: final_microphone

13
software/raspi/sejf.kv Normal file
View File

@@ -0,0 +1,13 @@
<Sejf>:
orientation: "vertical"
BoxLayout:
id: progress
size_hint_y: 0.1
Label:
text: "hello"
ScreenManager:
id: sm
Screen:
name: "splashscreen"
Label:
text: "Splash screen"

89
software/raspi/sejf.py Normal file
View File

@@ -0,0 +1,89 @@
import os
import logging
os.environ["KIVY_NO_CONSOLELOG"] = "1"
logging.basicConfig(level=logging.INFO)
import kivy
kivy.require("1.0.7")
from kivy.app import App
from kivy.properties import ObjectProperty
from kivy.uix.button import Button
from kivy.uix.label import Label
from kivy.uix.boxlayout import BoxLayout
logging.getLogger("kivy").setLevel(level=logging.WARN)
logger = logging.getLogger("core")
from sejf.modules.base import BaseModule
from sejf.modules.snake import SnakeModule
from sejf.modules.barcode import BarcodeModule
modules = {
"snake": SnakeModule,
"barcode": BarcodeModule,
"input": InputModule,
}
class Sejf(BoxLayout):
pass
class ProgressReport(Label):
task = ObjectProperty(None)
def on_task(self, obj, v: BaseModule = None):
v.bind(finished=lambda o, v: self.update_text())
self.update_text()
def update_text(self):
if self.task.finished:
self.text = "%s: done" % (self.task.__class__.__name__,)
self.color = (0.0, 1.0, 0.0, 1.0)
elif self.task.enabled:
self.text = "%s: inprogress" % (self.task.__class__.__name__,)
self.color = (1.0, 1.0, 0.0, 1.0)
else:
self.text = "%s: nope" % (self.task.__class__.__name__,)
self.text.color = (0.3, 0.3, 0.3, 1.0)
class SejfApp(App):
def __init__(self, config, *args, **kwargs):
logger.info("Loading modules...")
logger.info("%r", config)
self.tasks = []
for i, task in enumerate(config["tasks"]):
if task["module"] in modules:
logger.info("%d: %r", i, task)
conf = task.copy()
conf.pop("module")
self.tasks.append(modules[task["module"]](self, conf))
else:
logger.warn("%d: %r - unsupported module", i, task)
super(SejfApp, self).__init__(*args, **kwargs)
def build(self):
main = Sejf()
for task in self.tasks:
task.build(main)
w = main.ids["progress"]
w.add_widget(ProgressReport(task=task))
return main
if __name__ == "__main__":
import yaml
with open("config.yml") as fd:
config = yaml.load(fd)
SejfApp(config).run()

View File

View File

View File

@@ -0,0 +1,25 @@
from sejf.modules.base import BaseModule, Schema, fields
from kivy.core.window import Window
class BarcodeModule(BaseModule):
text_buffer: str = ""
class ConfigSchema(Schema):
code = fields.String(missing="jp2gmd")
def __init__(self, *args, **kwargs):
super(BarcodeModule, self).__init__(*args, **kwargs)
self._keyboard = Window.request_keyboard(None, self, "text")
self._keyboard.bind(on_textinput=self._on_keyboard_down)
def build(self, parent):
return None
def _on_keyboard_down(self, widget, text):
self.text_buffer += text
self.text_buffer = self.text_buffer[-len(self.config["code"]) :]
if self.text_buffer == self.config["code"]:
self.finished = True

View File

@@ -0,0 +1,20 @@
from marshmallow import Schema, fields
from kivy.event import EventDispatcher
from kivy.properties import BooleanProperty
class BaseModule(EventDispatcher):
ConfigSchema = None
enabled = BooleanProperty(True)
finished = BooleanProperty(False)
def __init__(self, parent, config):
self.parent = parent
if self.ConfigSchema:
self.config = self.ConfigSchema().load(config)
print(self.config)
else:
self.config = config
def build(self, parent):
pass

View File

@@ -0,0 +1,214 @@
import logging
import random
from kivy.clock import Clock
from kivy.uix.label import Label
from kivy.core.window import Window
from kivy.graphics import Ellipse, Rectangle, Color
from kivy.uix.screenmanager import Screen
from kivy.properties import (
NumericProperty,
ListProperty,
StringProperty,
ObjectProperty,
BooleanProperty,
)
from .base import BaseModule, Schema, fields
logger = logging.getLogger(__name__)
class SnakeCell(Label):
type = StringProperty("head")
def __init__(self, *args, **kwargs):
super(SnakeCell, self).__init__(*args, **kwargs)
with self.canvas:
self.c = Color()
self.rect = Rectangle()
self.bind(
x=self.update_rect,
y=self.update_rect,
size=self.update_rect,
pos=self.update_rect,
)
def update_rect(self, *args):
if self.type == "head":
self.c.rgba = (1.0, 0, 0, 1.0)
elif self.type == "fruit":
self.c.rgba = (1.0, 1.0, 0, 1.0)
else:
self.c.rgba = (1.0, 1.0, 1.0, 1.0)
self.rect.size = sz = [15, 15]
self.rect.pos = [
(self.x + 0.5) * self.parent.width / self.parent.col_width - sz[0] / 2,
((self.parent.col_height - self.y - 1) + 0.5)
* self.parent.height
/ self.parent.col_height
- sz[0] / 2,
]
def clone(self, **kwargs):
return SnakeCell(x=self.x, y=self.y, **kwargs)
def collide(self, other):
return self.x == other.x and self.y == other.y
def __repr__(self):
return "<SnakeCell(%d,%d)>" % (self.x, self.y)
class SnakeWidget(Label):
interval = None
direction = ObjectProperty([1, 0])
head = ObjectProperty(None)
tail = ObjectProperty([])
fruits = ObjectProperty([])
tail_length = NumericProperty(5)
col_width = NumericProperty(16)
col_height = NumericProperty(10)
speed = NumericProperty(3)
running = BooleanProperty(False)
def __init__(self, *args, **kwargs):
super(SnakeWidget, self).__init__(*args, **kwargs)
self._keyboard = Window.request_keyboard(None, self, "text")
self._keyboard.bind(on_key_down=self._on_keyboard_down)
self.bind(
pos=self.update_rects, size=self.update_rects,
)
def reset(self):
self.remove_widget(self.head)
for t in self.tail:
self.remove_widget(t)
for f in self.fruits:
self.remove_widget(f)
self.tail = []
self.fruits = []
self.head = SnakeCell(x=8, y=5)
self.add_widget(self.head, index=0)
self.add_fruit()
def on_parent(self, *args):
self.reset()
def on_running(self, *args):
print("on_running", self.running, self.speed)
if self.running and not self.interval:
self.interval = Clock.schedule_interval(self.tick, 1 / self.speed)
elif not self.running and self.interval:
self.interval.cancel()
self.interval = None
def update_rects(self, *args):
for c in self.fruits + self.tail + [self.head]:
c.update_rect()
def tick(self, dt):
n = self.head.clone(type="tail")
self.tail.insert(0, n)
self.add_widget(n, index=1)
n.update_rect()
for to_rem in self.tail[self.tail_length :]:
self.remove_widget(to_rem)
self.tail = self.tail[: self.tail_length]
self.head.x = (self.head.x + self.direction[0]) % self.col_width
self.head.y = (self.head.y + self.direction[1]) % self.col_height
if any(self.head.collide(t) for t in self.tail):
self.text = "collision!"
self.running = False
# self.interval.cancel()
for f in self.fruits[::-1]:
if self.head.collide(f):
self.fruits.remove(f)
self.remove_widget(f)
self.tail_length += 1
self.add_fruit()
def _on_keyboard_down(self, widget, event, *args):
directions = {
"up": (0, -1),
"down": (0, 1),
"left": (-1, 0),
"right": (1, 0),
}
if event[1] in directions:
new_dir = directions[event[1]]
old_dir = self.direction
if new_dir[0] == -old_dir[0] or old_dir[1] == -new_dir[1]:
return
self.direction = new_dir
def add_fruit(self):
for n in range(5):
c = SnakeCell(
x=random.randint(0, self.col_width - 1),
y=random.randint(0, self.col_height - 1),
type="fruit",
)
if any(c.collide(o) for o in self.tail + self.fruits + [self.head]):
continue
self.fruits.append(c)
self.add_widget(c)
c.update_rect()
break
class SnakeModule(BaseModule):
class ConfigSchema(Schema):
speed = fields.Integer(missing=5)
target_score = fields.Integer(missing=8)
start_length = fields.Integer(missing=2)
def __init__(self, *args, **kwargs):
super(SnakeModule, self).__init__(*args, **kwargs)
logger.info("Initializing...")
def build(self, parent):
self.screen = Screen(name="snek")
print(self.config)
self.widget = SnakeWidget(tail_length=self.config["start_length"])
self.widget.speed = self.config["speed"]
# self.widget.running = True
self.update_running_state()
self.bind(
finished=lambda w, v: self.update_running_state(),
enabled=lambda w, v: self.update_running_state(),
)
self.widget.bind(tail_length=self.on_score)
self.screen.add_widget(self.widget)
parent.ids["sm"].add_widget(self.screen)
Clock.schedule_interval(self.tick, 0.2)
self.sm = parent.ids["sm"]
def tick(self, dt):
if self.enabled:
self.sm.switch_to(self.screen)
def on_score(self, widget, score):
print("on_score", score)
if score >= self.config["target_score"]:
self.finished = True
def update_running_state(self):
self.widget.running = self.enabled and not self.finished

View File

@@ -1,35 +0,0 @@
#include <ModbusSlave.h>
#include <EEPROM.h>
// explicitly set stream to use the Serial serialport
Modbus modbus(Serial, 1, 3); // stream = Serial, slave id = 1, rs485 control-pin = 8
void setup() {
pinMode(13, OUTPUT);
Serial.begin(115200);
Serial.print("Initializing! ");
for (int i = 0; i < 16; i++) {
Serial.print(EEPROM.read(1));
Serial.print(" ");
}
Serial.println();
modbus.cbVector[CB_READ_INPUT_REGISTERS] = ReadAnalogIn;
modbus.begin(115200);
}
void loop() {
modbus.poll();
digitalWrite(13, (millis() / 200) % 2);
}
// Handel Read Input Registers (FC=04)
uint8_t ReadAnalogIn(uint8_t fc, uint16_t address, uint16_t length) {
// write registers into the answer buffer
for (int i = 0; i < length; i++) {
modbus.writeRegisterToBuffer(i, analogRead(address + i));
}
return STATUS_OK;
}