dos2unix crlf -> lf
parent
eadc6248df
commit
68abdf508b
|
@ -1,82 +1,82 @@
|
|||
import logging
|
||||
|
||||
import pykka
|
||||
from mopidy import core
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RaspberryGPIOFrontend(pykka.ThreadingActor, core.CoreListener):
|
||||
def __init__(self, config, core):
|
||||
super().__init__()
|
||||
import RPi.GPIO as GPIO
|
||||
|
||||
self.core = core
|
||||
self.config = config["raspberry-gpio"]
|
||||
self.pin_settings = {}
|
||||
|
||||
GPIO.setwarnings(False)
|
||||
GPIO.setmode(GPIO.BCM)
|
||||
|
||||
# Iterate through any bcmN pins in the config
|
||||
# and set them up as inputs with edge detection
|
||||
for key in self.config:
|
||||
if key.startswith("bcm"):
|
||||
pin = int(key.replace("bcm", ""))
|
||||
settings = self.config[key]
|
||||
if settings is None:
|
||||
continue
|
||||
|
||||
pull = GPIO.PUD_UP
|
||||
edge = GPIO.FALLING
|
||||
if settings.active == "active_high":
|
||||
pull = GPIO.PUD_DOWN
|
||||
edge = GPIO.RISING
|
||||
|
||||
GPIO.setup(pin, GPIO.IN, pull_up_down=pull)
|
||||
|
||||
GPIO.add_event_detect(
|
||||
pin,
|
||||
edge,
|
||||
callback=self.gpio_event,
|
||||
bouncetime=settings.bouncetime,
|
||||
)
|
||||
|
||||
self.pin_settings[pin] = settings
|
||||
|
||||
def gpio_event(self, pin):
|
||||
settings = self.pin_settings[pin]
|
||||
self.dispatch_input(settings.event)
|
||||
|
||||
def dispatch_input(self, event):
|
||||
handler_name = f"handle_{event}"
|
||||
try:
|
||||
getattr(self, handler_name)()
|
||||
except AttributeError:
|
||||
raise RuntimeError(
|
||||
f"Could not find input handler for event: {event}"
|
||||
)
|
||||
|
||||
def handle_play_pause(self):
|
||||
if self.core.playback.state.get() == core.PlaybackState.PLAYING:
|
||||
self.core.playback.pause()
|
||||
else:
|
||||
self.core.playback.play()
|
||||
|
||||
def handle_next(self):
|
||||
self.core.playback.next()
|
||||
|
||||
def handle_prev(self):
|
||||
self.core.playback.previous()
|
||||
|
||||
def handle_volume_up(self):
|
||||
volume = self.core.playback.volume.get()
|
||||
volume += 5
|
||||
volume = min(volume, 100)
|
||||
self.core.playback.volume = volume
|
||||
|
||||
def handle_volume_down(self):
|
||||
volume = self.core.playback.volume.get()
|
||||
volume -= 5
|
||||
volume = max(volume, 0)
|
||||
self.core.playback.volume = volume
|
||||
import logging
|
||||
|
||||
import pykka
|
||||
from mopidy import core
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RaspberryGPIOFrontend(pykka.ThreadingActor, core.CoreListener):
|
||||
def __init__(self, config, core):
|
||||
super().__init__()
|
||||
import RPi.GPIO as GPIO
|
||||
|
||||
self.core = core
|
||||
self.config = config["raspberry-gpio"]
|
||||
self.pin_settings = {}
|
||||
|
||||
GPIO.setwarnings(False)
|
||||
GPIO.setmode(GPIO.BCM)
|
||||
|
||||
# Iterate through any bcmN pins in the config
|
||||
# and set them up as inputs with edge detection
|
||||
for key in self.config:
|
||||
if key.startswith("bcm"):
|
||||
pin = int(key.replace("bcm", ""))
|
||||
settings = self.config[key]
|
||||
if settings is None:
|
||||
continue
|
||||
|
||||
pull = GPIO.PUD_UP
|
||||
edge = GPIO.FALLING
|
||||
if settings.active == "active_high":
|
||||
pull = GPIO.PUD_DOWN
|
||||
edge = GPIO.RISING
|
||||
|
||||
GPIO.setup(pin, GPIO.IN, pull_up_down=pull)
|
||||
|
||||
GPIO.add_event_detect(
|
||||
pin,
|
||||
edge,
|
||||
callback=self.gpio_event,
|
||||
bouncetime=settings.bouncetime,
|
||||
)
|
||||
|
||||
self.pin_settings[pin] = settings
|
||||
|
||||
def gpio_event(self, pin):
|
||||
settings = self.pin_settings[pin]
|
||||
self.dispatch_input(settings.event)
|
||||
|
||||
def dispatch_input(self, event):
|
||||
handler_name = f"handle_{event}"
|
||||
try:
|
||||
getattr(self, handler_name)()
|
||||
except AttributeError:
|
||||
raise RuntimeError(
|
||||
f"Could not find input handler for event: {event}"
|
||||
)
|
||||
|
||||
def handle_play_pause(self):
|
||||
if self.core.playback.state.get() == core.PlaybackState.PLAYING:
|
||||
self.core.playback.pause()
|
||||
else:
|
||||
self.core.playback.play()
|
||||
|
||||
def handle_next(self):
|
||||
self.core.playback.next()
|
||||
|
||||
def handle_prev(self):
|
||||
self.core.playback.previous()
|
||||
|
||||
def handle_volume_up(self):
|
||||
volume = self.core.playback.volume.get()
|
||||
volume += 5
|
||||
volume = min(volume, 100)
|
||||
self.core.playback.volume = volume
|
||||
|
||||
def handle_volume_down(self):
|
||||
volume = self.core.playback.volume.get()
|
||||
volume -= 5
|
||||
volume = max(volume, 0)
|
||||
self.core.playback.volume = volume
|
||||
|
|
|
@ -1,59 +1,59 @@
|
|||
|
||||
from collections import namedtuple
|
||||
|
||||
from mopidy import config
|
||||
from mopidy.config import types
|
||||
|
||||
|
||||
class ValidList(list):
|
||||
def __format__(self, format_string=None):
|
||||
if format_string is None:
|
||||
format_string = ", "
|
||||
return format_string.join(self)
|
||||
|
||||
|
||||
class PinConfig(config.ConfigValue):
|
||||
tuple_pinconfig = namedtuple("PinConfig", ("event", "active", "bouncetime"))
|
||||
|
||||
valid_events = ValidList(["play_pause", "prev", "next", "volume_up", "volume_down"])
|
||||
|
||||
valid_modes = ValidList(["active_low", "active_high"])
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def deserialize(self, value):
|
||||
if value is None:
|
||||
return None
|
||||
|
||||
value = types.decode(value).strip()
|
||||
|
||||
try:
|
||||
event, active, bouncetime = value.split(",")
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
if event not in self.valid_events:
|
||||
raise ValueError(
|
||||
f"invalid event for pin config {event} (Must be {valid_events})"
|
||||
)
|
||||
|
||||
if active not in self.valid_modes:
|
||||
raise ValueError(
|
||||
f"invalid event for pin config {active} (Must be one of {valid_modes})"
|
||||
)
|
||||
|
||||
try:
|
||||
bouncetime = int(bouncetime)
|
||||
except ValueError:
|
||||
raise ValueError(
|
||||
f"invalid bouncetime value for pin config {bouncetime}"
|
||||
)
|
||||
|
||||
return self.tuple_pinconfig(event, active, bouncetime)
|
||||
|
||||
def serialize(self, value, display=False):
|
||||
if value is None:
|
||||
return ""
|
||||
value = f"{value.event},{value.active},{value.bouncetime}"
|
||||
return types.encode(value)
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
from mopidy import config
|
||||
from mopidy.config import types
|
||||
|
||||
|
||||
class ValidList(list):
|
||||
def __format__(self, format_string=None):
|
||||
if format_string is None:
|
||||
format_string = ", "
|
||||
return format_string.join(self)
|
||||
|
||||
|
||||
class PinConfig(config.ConfigValue):
|
||||
tuple_pinconfig = namedtuple("PinConfig", ("event", "active", "bouncetime"))
|
||||
|
||||
valid_events = ValidList(["play_pause", "prev", "next", "volume_up", "volume_down"])
|
||||
|
||||
valid_modes = ValidList(["active_low", "active_high"])
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def deserialize(self, value):
|
||||
if value is None:
|
||||
return None
|
||||
|
||||
value = types.decode(value).strip()
|
||||
|
||||
try:
|
||||
event, active, bouncetime = value.split(",")
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
if event not in self.valid_events:
|
||||
raise ValueError(
|
||||
f"invalid event for pin config {event} (Must be {valid_events})"
|
||||
)
|
||||
|
||||
if active not in self.valid_modes:
|
||||
raise ValueError(
|
||||
f"invalid event for pin config {active} (Must be one of {valid_modes})"
|
||||
)
|
||||
|
||||
try:
|
||||
bouncetime = int(bouncetime)
|
||||
except ValueError:
|
||||
raise ValueError(
|
||||
f"invalid bouncetime value for pin config {bouncetime}"
|
||||
)
|
||||
|
||||
return self.tuple_pinconfig(event, active, bouncetime)
|
||||
|
||||
def serialize(self, value, display=False):
|
||||
if value is None:
|
||||
return ""
|
||||
value = f"{value.event},{value.active},{value.bouncetime}"
|
||||
return types.encode(value)
|
||||
|
|
|
@ -1,40 +1,40 @@
|
|||
import sys
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from mopidy_raspberry_gpio import Extension
|
||||
from mopidy_raspberry_gpio import frontend as frontend_lib
|
||||
from mopidy_raspberry_gpio import pinconfig
|
||||
|
||||
deserialize = pinconfig.PinConfig().deserialize
|
||||
|
||||
dummy_config = {
|
||||
"raspberry-gpio": {
|
||||
# Plugins expect settings to be deserialized
|
||||
"bcm1": deserialize("play_pause,active_low,30")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def test_get_frontend_classes():
|
||||
sys.modules["RPi"] = mock.Mock()
|
||||
sys.modules["RPi.GPIO"] = mock.Mock()
|
||||
|
||||
ext = Extension()
|
||||
registry = mock.Mock()
|
||||
|
||||
ext.setup(registry)
|
||||
|
||||
registry.add.assert_called_once_with(
|
||||
"frontend", frontend_lib.RaspberryGPIOFrontend
|
||||
)
|
||||
|
||||
|
||||
def test_frontend_handler_dispatch():
|
||||
sys.modules["RPi"] = mock.Mock()
|
||||
sys.modules["RPi.GPIO"] = mock.Mock()
|
||||
|
||||
frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, mock.Mock())
|
||||
|
||||
with pytest.raises(RuntimeError):
|
||||
frontend.dispatch_input("tomato")
|
||||
import sys
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from mopidy_raspberry_gpio import Extension
|
||||
from mopidy_raspberry_gpio import frontend as frontend_lib
|
||||
from mopidy_raspberry_gpio import pinconfig
|
||||
|
||||
deserialize = pinconfig.PinConfig().deserialize
|
||||
|
||||
dummy_config = {
|
||||
"raspberry-gpio": {
|
||||
# Plugins expect settings to be deserialized
|
||||
"bcm1": deserialize("play_pause,active_low,30")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def test_get_frontend_classes():
|
||||
sys.modules["RPi"] = mock.Mock()
|
||||
sys.modules["RPi.GPIO"] = mock.Mock()
|
||||
|
||||
ext = Extension()
|
||||
registry = mock.Mock()
|
||||
|
||||
ext.setup(registry)
|
||||
|
||||
registry.add.assert_called_once_with(
|
||||
"frontend", frontend_lib.RaspberryGPIOFrontend
|
||||
)
|
||||
|
||||
|
||||
def test_frontend_handler_dispatch():
|
||||
sys.modules["RPi"] = mock.Mock()
|
||||
sys.modules["RPi.GPIO"] = mock.Mock()
|
||||
|
||||
frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, mock.Mock())
|
||||
|
||||
with pytest.raises(RuntimeError):
|
||||
frontend.dispatch_input("tomato")
|
||||
|
|
Loading…
Reference in New Issue