Merge pull request #4 from pimoroni/extra-event-options

Add support for additional per pin event options
remotes/origin/HEAD
Philip Howard 2020-03-25 13:41:53 +00:00 committed by GitHub
commit d2f4003970
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 109 additions and 27 deletions

View File

@ -58,6 +58,17 @@ Supported modes:
- active_low - configures the pin with a pull-up and triggers when it reads 0/low (RECOMMENDED)
- active_high - configures the pin as a pull-down and triggers when it reads 1/high
Events volume_up and volume_down both support an (optional) "step" option, which controls the amount (in percent) that the volume is adjusted with each button press.
Eg::
[raspberry-gpio]
enabled = true
bcm5 = play_pause,active_low,250
bcm6 = volume_down,active_low,250,step=1
bcm16 = next,active_low,250
bcm20 = volume_up,active_low,250,step=1
Project resources
=================

View File

@ -46,37 +46,39 @@ class RaspberryGPIOFrontend(pykka.ThreadingActor, core.CoreListener):
def gpio_event(self, pin):
settings = self.pin_settings[pin]
self.dispatch_input(settings.event)
self.dispatch_input(settings)
def dispatch_input(self, event):
handler_name = f"handle_{event}"
def dispatch_input(self, settings):
handler_name = f"handle_{settings.event}"
try:
getattr(self, handler_name)()
getattr(self, handler_name)(settings.options)
except AttributeError:
raise RuntimeError(
f"Could not find input handler for event: {event}"
f"Could not find input handler for event: {settings.event}"
)
def handle_play_pause(self):
def handle_play_pause(self, config):
if self.core.playback.get_state().get() == core.PlaybackState.PLAYING:
self.core.playback.pause()
else:
self.core.playback.play()
def handle_next(self):
def handle_next(self, config):
self.core.playback.next()
def handle_prev(self):
def handle_prev(self, config):
self.core.playback.previous()
def handle_volume_up(self):
def handle_volume_up(self, config):
step = int(config.get("step", 5))
volume = self.core.mixer.get_volume().get()
volume += 5
volume += step
volume = min(volume, 100)
self.core.mixer.set_volume(volume)
def handle_volume_down(self):
def handle_volume_down(self, config):
step = int(config.get("step", 5))
volume = self.core.mixer.get_volume().get()
volume -= 5
volume -= step
volume = max(volume, 0)
self.core.mixer.set_volume(volume)

View File

@ -12,7 +12,9 @@ class ValidList(list):
class PinConfig(config.ConfigValue):
tuple_pinconfig = namedtuple("PinConfig", ("event", "active", "bouncetime"))
tuple_pinconfig = namedtuple(
"PinConfig", ("event", "active", "bouncetime", "options")
)
valid_events = ValidList(
["play_pause", "prev", "next", "volume_up", "volume_down"]
@ -29,11 +31,14 @@ class PinConfig(config.ConfigValue):
value = types.decode(value).strip()
try:
event, active, bouncetime = value.split(",")
except ValueError:
value = value.split(",")
# At least Event, Active and Bouncetime settings required
if len(value) < 3:
return None
event, active, bouncetime = value[0:3]
if event not in self.valid_events:
raise ValueError(
f"invalid event for pin config {event} (Must be {self.valid_events})"
@ -51,10 +56,17 @@ class PinConfig(config.ConfigValue):
f"invalid bouncetime value for pin config {bouncetime}"
)
return self.tuple_pinconfig(event, active, bouncetime)
options = {}
for option in value[3:]:
key, value = option.split("=")
options[key] = value
return self.tuple_pinconfig(event, active, bouncetime, options)
def serialize(self, value, display=False):
if value is None:
return ""
value = f"{value.event},{value.active},{value.bouncetime}"
options = ",".join({f"{k}={v}" for k, v in value.options.items()})
value = f"{value.event},{value.active},{value.bouncetime},{options}"
return types.encode(value)

View File

@ -59,3 +59,21 @@ def test_pinconfig_invalid_bouncetime_raises_valueerror():
with pytest.raises(ValueError):
bcm1 = schema["bcm1"].deserialize("play_pause,active_low,tomato")
del bcm1
def test_pinconfig_additional_options():
ext = Extension()
schema = ext.get_config_schema()
bcm1 = schema["bcm1"].deserialize("volume_up,active_low,30,steps=1")
del bcm1
def test_pinconfig_serialize():
ext = Extension()
schema = ext.get_config_schema()
bcm1 = schema["bcm1"].deserialize("volume_up,active_low,30,steps=1")
assert schema["bcm1"].serialize(bcm1) == "volume_up,active_low,30,steps=1"

View File

@ -4,7 +4,6 @@ from unittest import mock
import pykka
from mopidy import core
import pytest
from mopidy_raspberry_gpio import Extension
from mopidy_raspberry_gpio import frontend as frontend_lib
from mopidy_raspberry_gpio import pinconfig
@ -58,7 +57,11 @@ def test_frontend_handler_dispatch_play_pause():
dummy_config, dummy_mopidy_core()
)
frontend.dispatch_input("play_pause")
ext = Extension()
schema = ext.get_config_schema()
settings = schema["bcm1"].deserialize("play_pause,active_low,30")
frontend.dispatch_input(settings)
stop_mopidy_core()
@ -71,7 +74,11 @@ def test_frontend_handler_dispatch_next():
dummy_config, dummy_mopidy_core()
)
frontend.dispatch_input("next")
ext = Extension()
schema = ext.get_config_schema()
settings = schema["bcm1"].deserialize("next,active_low,30")
frontend.dispatch_input(settings)
stop_mopidy_core()
@ -84,7 +91,11 @@ def test_frontend_handler_dispatch_prev():
dummy_config, dummy_mopidy_core()
)
frontend.dispatch_input("prev")
ext = Extension()
schema = ext.get_config_schema()
settings = schema["bcm1"].deserialize("prev,active_low,30")
frontend.dispatch_input(settings)
stop_mopidy_core()
@ -97,7 +108,11 @@ def test_frontend_handler_dispatch_volume_up():
dummy_config, dummy_mopidy_core()
)
frontend.dispatch_input("volume_up")
ext = Extension()
schema = ext.get_config_schema()
settings = schema["bcm1"].deserialize("volume_up,active_low,30")
frontend.dispatch_input(settings)
stop_mopidy_core()
@ -110,12 +125,16 @@ def test_frontend_handler_dispatch_volume_down():
dummy_config, dummy_mopidy_core()
)
frontend.dispatch_input("volume_down")
ext = Extension()
schema = ext.get_config_schema()
settings = schema["bcm1"].deserialize("volume_down,active_low,30")
frontend.dispatch_input(settings)
stop_mopidy_core()
def test_frontend_handler_dispatch_invalid_event():
def test_frontend_handler_dispatch_volume_up_custom_step():
sys.modules["RPi"] = mock.Mock()
sys.modules["RPi.GPIO"] = mock.Mock()
@ -123,8 +142,28 @@ def test_frontend_handler_dispatch_invalid_event():
dummy_config, dummy_mopidy_core()
)
with pytest.raises(RuntimeError):
frontend.dispatch_input("tomato")
ext = Extension()
schema = ext.get_config_schema()
settings = schema["bcm1"].deserialize("volume_up,active_low,30,step=1")
frontend.dispatch_input(settings)
stop_mopidy_core()
def test_frontend_handler_dispatch_volume_down_custom_step():
sys.modules["RPi"] = mock.Mock()
sys.modules["RPi.GPIO"] = mock.Mock()
frontend = frontend_lib.RaspberryGPIOFrontend(
dummy_config, dummy_mopidy_core()
)
ext = Extension()
schema = ext.get_config_schema()
settings = schema["bcm1"].deserialize("volume_down,active_low,30,step=1")
frontend.dispatch_input(settings)
stop_mopidy_core()