diff --git a/mopidy_raspberry_gpio/frontend.py b/mopidy_raspberry_gpio/frontend.py index c3dd98d..9e2a3db 100644 --- a/mopidy_raspberry_gpio/frontend.py +++ b/mopidy_raspberry_gpio/frontend.py @@ -1,82 +1,82 @@ -import logging - -import pykka -from mopidy import core - -logger = logging.getLogger(__name__) - - -class RaspberryGPIOFrontend(pykka.ThreadingActor, core.CoreListener): - def __init__(self, config, core): - super().__init__() - import RPi.GPIO as GPIO - - self.core = core - self.config = config["raspberry-gpio"] - self.pin_settings = {} - - GPIO.setwarnings(False) - GPIO.setmode(GPIO.BCM) - - # Iterate through any bcmN pins in the config - # and set them up as inputs with edge detection - for key in self.config: - if key.startswith("bcm"): - pin = int(key.replace("bcm", "")) - settings = self.config[key] - if settings is None: - continue - - pull = GPIO.PUD_UP - edge = GPIO.FALLING - if settings.active == "active_high": - pull = GPIO.PUD_DOWN - edge = GPIO.RISING - - GPIO.setup(pin, GPIO.IN, pull_up_down=pull) - - GPIO.add_event_detect( - pin, - edge, - callback=self.gpio_event, - bouncetime=settings.bouncetime, - ) - - self.pin_settings[pin] = settings - - def gpio_event(self, pin): - settings = self.pin_settings[pin] - self.dispatch_input(settings.event) - - def dispatch_input(self, event): - handler_name = f"handle_{event}" - try: - getattr(self, handler_name)() - except AttributeError: - raise RuntimeError( - f"Could not find input handler for event: {event}" - ) - - def handle_play_pause(self): - if self.core.playback.state.get() == core.PlaybackState.PLAYING: - self.core.playback.pause() - else: - self.core.playback.play() - - def handle_next(self): - self.core.playback.next() - - def handle_prev(self): - self.core.playback.previous() - - def handle_volume_up(self): - volume = self.core.playback.volume.get() - volume += 5 - volume = min(volume, 100) - self.core.playback.volume = volume - - def handle_volume_down(self): - volume = self.core.playback.volume.get() - volume -= 5 - volume = max(volume, 0) - self.core.playback.volume = volume +import logging + +import pykka +from mopidy import core + +logger = logging.getLogger(__name__) + + +class RaspberryGPIOFrontend(pykka.ThreadingActor, core.CoreListener): + def __init__(self, config, core): + super().__init__() + import RPi.GPIO as GPIO + + self.core = core + self.config = config["raspberry-gpio"] + self.pin_settings = {} + + GPIO.setwarnings(False) + GPIO.setmode(GPIO.BCM) + + # Iterate through any bcmN pins in the config + # and set them up as inputs with edge detection + for key in self.config: + if key.startswith("bcm"): + pin = int(key.replace("bcm", "")) + settings = self.config[key] + if settings is None: + continue + + pull = GPIO.PUD_UP + edge = GPIO.FALLING + if settings.active == "active_high": + pull = GPIO.PUD_DOWN + edge = GPIO.RISING + + GPIO.setup(pin, GPIO.IN, pull_up_down=pull) + + GPIO.add_event_detect( + pin, + edge, + callback=self.gpio_event, + bouncetime=settings.bouncetime, + ) + + self.pin_settings[pin] = settings + + def gpio_event(self, pin): + settings = self.pin_settings[pin] + self.dispatch_input(settings.event) + + def dispatch_input(self, event): + handler_name = f"handle_{event}" + try: + getattr(self, handler_name)() + except AttributeError: + raise RuntimeError( + f"Could not find input handler for event: {event}" + ) + + def handle_play_pause(self): + if self.core.playback.state.get() == core.PlaybackState.PLAYING: + self.core.playback.pause() + else: + self.core.playback.play() + + def handle_next(self): + self.core.playback.next() + + def handle_prev(self): + self.core.playback.previous() + + def handle_volume_up(self): + volume = self.core.playback.volume.get() + volume += 5 + volume = min(volume, 100) + self.core.playback.volume = volume + + def handle_volume_down(self): + volume = self.core.playback.volume.get() + volume -= 5 + volume = max(volume, 0) + self.core.playback.volume = volume diff --git a/mopidy_raspberry_gpio/pinconfig.py b/mopidy_raspberry_gpio/pinconfig.py index 100248d..8e5c93e 100644 --- a/mopidy_raspberry_gpio/pinconfig.py +++ b/mopidy_raspberry_gpio/pinconfig.py @@ -1,59 +1,59 @@ - -from collections import namedtuple - -from mopidy import config -from mopidy.config import types - - -class ValidList(list): - def __format__(self, format_string=None): - if format_string is None: - format_string = ", " - return format_string.join(self) - - -class PinConfig(config.ConfigValue): - tuple_pinconfig = namedtuple("PinConfig", ("event", "active", "bouncetime")) - - valid_events = ValidList(["play_pause", "prev", "next", "volume_up", "volume_down"]) - - valid_modes = ValidList(["active_low", "active_high"]) - - def __init__(self): - pass - - def deserialize(self, value): - if value is None: - return None - - value = types.decode(value).strip() - - try: - event, active, bouncetime = value.split(",") - except ValueError: - return None - - if event not in self.valid_events: - raise ValueError( - f"invalid event for pin config {event} (Must be {valid_events})" - ) - - if active not in self.valid_modes: - raise ValueError( - f"invalid event for pin config {active} (Must be one of {valid_modes})" - ) - - try: - bouncetime = int(bouncetime) - except ValueError: - raise ValueError( - f"invalid bouncetime value for pin config {bouncetime}" - ) - - return self.tuple_pinconfig(event, active, bouncetime) - - def serialize(self, value, display=False): - if value is None: - return "" - value = f"{value.event},{value.active},{value.bouncetime}" - return types.encode(value) + +from collections import namedtuple + +from mopidy import config +from mopidy.config import types + + +class ValidList(list): + def __format__(self, format_string=None): + if format_string is None: + format_string = ", " + return format_string.join(self) + + +class PinConfig(config.ConfigValue): + tuple_pinconfig = namedtuple("PinConfig", ("event", "active", "bouncetime")) + + valid_events = ValidList(["play_pause", "prev", "next", "volume_up", "volume_down"]) + + valid_modes = ValidList(["active_low", "active_high"]) + + def __init__(self): + pass + + def deserialize(self, value): + if value is None: + return None + + value = types.decode(value).strip() + + try: + event, active, bouncetime = value.split(",") + except ValueError: + return None + + if event not in self.valid_events: + raise ValueError( + f"invalid event for pin config {event} (Must be {valid_events})" + ) + + if active not in self.valid_modes: + raise ValueError( + f"invalid event for pin config {active} (Must be one of {valid_modes})" + ) + + try: + bouncetime = int(bouncetime) + except ValueError: + raise ValueError( + f"invalid bouncetime value for pin config {bouncetime}" + ) + + return self.tuple_pinconfig(event, active, bouncetime) + + def serialize(self, value, display=False): + if value is None: + return "" + value = f"{value.event},{value.active},{value.bouncetime}" + return types.encode(value) diff --git a/tests/test_frontend.py b/tests/test_frontend.py index ef9f65a..c2834ce 100644 --- a/tests/test_frontend.py +++ b/tests/test_frontend.py @@ -1,40 +1,40 @@ -import sys -from unittest import mock - -import pytest -from mopidy_raspberry_gpio import Extension -from mopidy_raspberry_gpio import frontend as frontend_lib -from mopidy_raspberry_gpio import pinconfig - -deserialize = pinconfig.PinConfig().deserialize - -dummy_config = { - "raspberry-gpio": { - # Plugins expect settings to be deserialized - "bcm1": deserialize("play_pause,active_low,30") - } -} - - -def test_get_frontend_classes(): - sys.modules["RPi"] = mock.Mock() - sys.modules["RPi.GPIO"] = mock.Mock() - - ext = Extension() - registry = mock.Mock() - - ext.setup(registry) - - registry.add.assert_called_once_with( - "frontend", frontend_lib.RaspberryGPIOFrontend - ) - - -def test_frontend_handler_dispatch(): - sys.modules["RPi"] = mock.Mock() - sys.modules["RPi.GPIO"] = mock.Mock() - - frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, mock.Mock()) - - with pytest.raises(RuntimeError): - frontend.dispatch_input("tomato") +import sys +from unittest import mock + +import pytest +from mopidy_raspberry_gpio import Extension +from mopidy_raspberry_gpio import frontend as frontend_lib +from mopidy_raspberry_gpio import pinconfig + +deserialize = pinconfig.PinConfig().deserialize + +dummy_config = { + "raspberry-gpio": { + # Plugins expect settings to be deserialized + "bcm1": deserialize("play_pause,active_low,30") + } +} + + +def test_get_frontend_classes(): + sys.modules["RPi"] = mock.Mock() + sys.modules["RPi.GPIO"] = mock.Mock() + + ext = Extension() + registry = mock.Mock() + + ext.setup(registry) + + registry.add.assert_called_once_with( + "frontend", frontend_lib.RaspberryGPIOFrontend + ) + + +def test_frontend_handler_dispatch(): + sys.modules["RPi"] = mock.Mock() + sys.modules["RPi.GPIO"] = mock.Mock() + + frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, mock.Mock()) + + with pytest.raises(RuntimeError): + frontend.dispatch_input("tomato")