Make magic happen
parent
2aa3e5b28b
commit
cfdf07e3b6
|
@ -5,10 +5,11 @@ import os
|
|||
|
||||
from mopidy import config, ext
|
||||
|
||||
from .pinconfig import PinConfig
|
||||
|
||||
__version__ = "0.0.1"
|
||||
|
||||
# TODO: If you need to log, use loggers named after the current Python module
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -24,14 +25,10 @@ class Extension(ext.Extension):
|
|||
|
||||
def get_config_schema(self):
|
||||
schema = super(Extension, self).get_config_schema()
|
||||
# TODO: Comment in and edit, or remove entirely
|
||||
#schema["username"] = config.String()
|
||||
#schema["password"] = config.Secret()
|
||||
for pin in range(28):
|
||||
schema["bcm{:d}".format(pin)] = PinConfig()
|
||||
return schema
|
||||
|
||||
def setup(self, registry):
|
||||
# You will typically only implement one of the following things
|
||||
# in a single extension.
|
||||
|
||||
from .frontend import RaspberryGPIO
|
||||
registry.add("frontend", RaspberryGPIO)
|
||||
from .frontend import RaspberryGPIOFrontend
|
||||
registry.add("frontend", RaspberryGPIOFrontend)
|
||||
|
|
|
@ -1,17 +1,83 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
from mopidy import core
|
||||
|
||||
|
||||
import pykka
|
||||
|
||||
from.pinconfig import PinConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RaspberryGPIO(pykka.ThreadingActor, core.CoreListener):
|
||||
class RaspberryGPIOFrontend(pykka.ThreadingActor, core.CoreListener):
|
||||
def __init__(self, config, core):
|
||||
pass
|
||||
import RPi.GPIO as GPIO
|
||||
self.core = core
|
||||
self.config = config["raspberry-gpio"]
|
||||
self.pin_settings = {}
|
||||
|
||||
GPIO.setwarnings(False)
|
||||
GPIO.setmode(GPIO.BCM)
|
||||
|
||||
# Iterate through any bcmN pins in the config
|
||||
# and set them up as inputs with edge detection
|
||||
for key in self.config:
|
||||
if key.startswith("bcm"):
|
||||
pin = int(key.replace("bcm", ""))
|
||||
self.pin_settings[pin] = PinConfig().deserialize(
|
||||
self.config[key]
|
||||
)
|
||||
|
||||
pull = GPIO.PUD_UP
|
||||
edge = GPIO.FALLING
|
||||
if self.pin_settings[pin].active == 'active_high':
|
||||
pull = GPIO.PUD_DOWN
|
||||
edge = GPIO.RISING
|
||||
|
||||
GPIO.setup(
|
||||
pin,
|
||||
GPIO.IN,
|
||||
pull_up_down=pull)
|
||||
|
||||
GPIO.add_event_detect(
|
||||
pin,
|
||||
edge,
|
||||
callback=self.gpio_event,
|
||||
bouncetime=self.pin_settings[pin].bouncetime)
|
||||
|
||||
def gpio_event(self, pin):
|
||||
settings = self.pin_settings[pin]
|
||||
self.dispatch_input(settings.event)
|
||||
|
||||
def dispatch_input(self, event):
|
||||
handler_name = "handle_{}".format(event)
|
||||
try:
|
||||
getattr(self, handler_name)(self)
|
||||
except AttributeError:
|
||||
raise RuntimeError(
|
||||
"Could not find input handler for event: {}".format(event)
|
||||
)
|
||||
|
||||
def handle_play_pause(self):
|
||||
if self.core.playback.state.get() == core.PlaybackState.PLAYING:
|
||||
self.core.playback.pause()
|
||||
else:
|
||||
self.core.playback.play()
|
||||
|
||||
def handle_next(self):
|
||||
self.core.playback.next()
|
||||
|
||||
def handle_prev(self):
|
||||
self.core.playback.previous()
|
||||
|
||||
def handle_volume_up(self):
|
||||
volume = self.core.playback.volume.get()
|
||||
volume += 10
|
||||
self.core.playback.volume = volume
|
||||
|
||||
def handle_volume_down(self):
|
||||
volume = self.core.playback.volume.get()
|
||||
volume -= 10
|
||||
self.core.playback.volume = volume
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
from collections import namedtuple
|
||||
|
||||
from mopidy import config
|
||||
|
||||
|
||||
class PinConfig(config.String):
|
||||
tuple_pinconfig = namedtuple("PinConfig",
|
||||
("event", "active", "bouncetime"))
|
||||
|
||||
valid_events = "play_pause", "prev", "next", "volume_up", "volume_down"
|
||||
|
||||
valid_modes = "active_low", "active_high"
|
||||
|
||||
def __init__(self):
|
||||
config.String.__init__(self, optional=True)
|
||||
|
||||
def deserialize(self, value):
|
||||
value = config.String.deserialize(self, value)
|
||||
event, active, bouncetime = value.split(',')
|
||||
|
||||
if event not in self.valid_events:
|
||||
raise ValueError(
|
||||
"invalid event for pin config {:s} (Must be {})".format(
|
||||
event, ", ".join(self.valid_events)
|
||||
)
|
||||
)
|
||||
|
||||
if active not in self.valid_modes:
|
||||
raise ValueError(
|
||||
"invalid mode for pin config {:s} (Must be {})".format(
|
||||
event, ", ".join(self.valid_events)
|
||||
)
|
||||
)
|
||||
|
||||
try:
|
||||
bouncetime = int(bouncetime)
|
||||
except ValueError:
|
||||
raise ValueError(
|
||||
"invalid bouncetime value for pin config {}".format(bouncetime)
|
||||
)
|
||||
|
||||
return self.tuple_pinconfig(event, active, bouncetime)
|
||||
|
||||
def serialize(self, value, display=False):
|
||||
value = "{:s},{:s},{:d}".format(
|
||||
value.event, value.active, value.bouncetime)
|
||||
value = config.String.serialize(self, value, display)
|
||||
return value
|
|
@ -0,0 +1,64 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
|
||||
from mopidy_raspberry_gpio import Extension, PinConfig
|
||||
|
||||
|
||||
def test_get_default_config():
|
||||
ext = Extension()
|
||||
|
||||
config = ext.get_default_config()
|
||||
|
||||
assert "[raspberry-gpio]" in config
|
||||
assert "enabled = true" in config
|
||||
|
||||
|
||||
def test_get_config_schema():
|
||||
ext = Extension()
|
||||
|
||||
schema = ext.get_config_schema()
|
||||
|
||||
# Test the content of config schema
|
||||
assert "bcm0" in schema
|
||||
assert "bcm27" in schema
|
||||
|
||||
|
||||
def test_pinconfig():
|
||||
ext = Extension()
|
||||
|
||||
schema = ext.get_config_schema()
|
||||
bcm0 = schema["bcm0"].deserialize("play_pause,active_low,30")
|
||||
|
||||
assert type(bcm0) == PinConfig.tuple_pinconfig
|
||||
assert type(bcm0.bouncetime) == int
|
||||
|
||||
|
||||
def test_pinconfig_invalid_event_raises_valueerror():
|
||||
ext = Extension()
|
||||
|
||||
schema = ext.get_config_schema()
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
bcm1 = schema["bcm1"].deserialize("tomato,active_low,30")
|
||||
del bcm1
|
||||
|
||||
|
||||
def test_pinconfig_invalid_mode_raises_valueerror():
|
||||
ext = Extension()
|
||||
|
||||
schema = ext.get_config_schema()
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
bcm1 = schema["bcm1"].deserialize("play_pause,tomato,30")
|
||||
del bcm1
|
||||
|
||||
|
||||
def test_pinconfig_invalid_bouncetime_raises_valueerror():
|
||||
ext = Extension()
|
||||
|
||||
schema = ext.get_config_schema()
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
bcm1 = schema["bcm1"].deserialize("play_pause,active_low,tomato")
|
||||
del bcm1
|
|
@ -1,25 +0,0 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
from mopidy_raspberry_gpio import Extension, frontend as frontend_lib
|
||||
|
||||
|
||||
def test_get_default_config():
|
||||
ext = Extension()
|
||||
|
||||
config = ext.get_default_config()
|
||||
|
||||
assert "[raspberry-gpio]" in config
|
||||
assert "enabled = true" in config
|
||||
|
||||
|
||||
def test_get_config_schema():
|
||||
ext = Extension()
|
||||
|
||||
schema = ext.get_config_schema()
|
||||
|
||||
# TODO Test the content of your config schema
|
||||
#assert "username" in schema
|
||||
#assert "password" in schema
|
||||
|
||||
|
||||
# TODO Write more tests
|
|
@ -0,0 +1,39 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import sys
|
||||
|
||||
import mock
|
||||
|
||||
import pytest
|
||||
|
||||
from mopidy_raspberry_gpio import Extension, frontend as frontend_lib
|
||||
|
||||
|
||||
dummy_config = {
|
||||
"raspberry-gpio": {
|
||||
"bcm1": "play_pause,active_low,30"
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def test_get_frontend_classes():
|
||||
sys.modules['RPi'] = mock.Mock()
|
||||
sys.modules['RPi.GPIO'] = mock.Mock()
|
||||
|
||||
ext = Extension()
|
||||
registry = mock.Mock()
|
||||
|
||||
ext.setup(registry)
|
||||
|
||||
registry.add.assert_called_once_with(
|
||||
'frontend', frontend_lib.RaspberryGPIOFrontend)
|
||||
|
||||
|
||||
def test_frontend_handler_dispatch():
|
||||
sys.modules['RPi'] = mock.Mock()
|
||||
sys.modules['RPi.GPIO'] = mock.Mock()
|
||||
|
||||
frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, mock.Mock())
|
||||
|
||||
with pytest.raises(RuntimeError):
|
||||
frontend.dispatch_input('tomato')
|
4
tox.ini
4
tox.ini
|
@ -10,6 +10,7 @@ deps =
|
|||
pytest-xdist
|
||||
commands =
|
||||
pytest \
|
||||
-v -r wsx \
|
||||
--basetemp={envtmpdir} \
|
||||
--cov=mopidy_raspberry_gpio --cov-report=term-missing \
|
||||
{posargs}
|
||||
|
@ -23,4 +24,5 @@ commands = python -m flake8
|
|||
|
||||
[testenv:check-manifest]
|
||||
deps = check-manifest
|
||||
commands = check-manifest
|
||||
skip_install = true
|
||||
commands = python -m check-manifest
|
||||
|
|
Loading…
Reference in New Issue