Merge pull request #3 from pimoroni/python3

Port to python3 for next major Mopidy release
remotes/origin/extra-event-options
Philip Howard 2020-01-30 10:15:33 +00:00 committed by GitHub
commit bcceb6001f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 802 additions and 310 deletions

52
.circleci/config.yml Normal file
View File

@ -0,0 +1,52 @@
version: 2.1
orbs:
codecov: codecov/codecov@1.0.5
workflows:
version: 2
test:
jobs:
- py38
- py37
- black
- check-manifest
- flake8
jobs:
py38: &test-template
docker:
- image: mopidy/ci-python:3.8
steps:
- checkout
- restore_cache:
name: Restoring tox cache
key: tox-v1-{{ .Environment.CIRCLE_JOB }}-{{ checksum "setup.cfg" }}
- run:
name: Run tests
command: |
tox -e $CIRCLE_JOB -- \
--junit-xml=test-results/pytest/results.xml \
--cov-report=xml
- save_cache:
name: Saving tox cache
key: tox-v1-{{ .Environment.CIRCLE_JOB }}-{{ checksum "setup.cfg" }}
paths:
- ./.tox
- ~/.cache/pip
- codecov/upload:
file: coverage.xml
- store_test_results:
path: test-results
py37:
<<: *test-template
docker:
- image: mopidy/ci-python:3.7
black: *test-template
check-manifest: *test-template
flake8: *test-template

16
.gitignore vendored
View File

@ -1,9 +1,9 @@
*.egg-info
*.pyc
.coverage
.pytest_cache/
.tox/
MANIFEST
build/
dist/
.vscode/
/.coverage
/.mypy_cache/
/.pytest_cache/
/.tox/
/*.egg-info
/build/
/dist/
/MANIFEST

View File

@ -1,28 +0,0 @@
language: python
python:
- "2.7"
virtualenv:
system_site_packages: true
addons:
apt:
sources:
- mopidy-stable
packages:
- mopidy
env:
- TOX_ENV=py27
- TOX_ENV=flake8
- TOX_ENV=check-manifest
install:
- pip install tox
script:
- tox -e $TOX_ENV
after_success:
- if [ $TOX_ENV == 'py27' ]; then pip install coveralls; coveralls; fi

View File

@ -1,9 +1,15 @@
include .travis.yml
include CHANGELOG.rst
include *.py
include *.rst
include .mailmap
include LICENSE
include MANIFEST.in
include README.rst
include mopidy_raspberry_gpio/ext.conf
include pyproject.toml
include tox.ini
recursive-include .circleci *
recursive-include .github *
include mopidy_*/ext.conf
recursive-include tests *.py
recursive-include tests/data *

View File

@ -2,17 +2,17 @@
Mopidy-Raspberry-GPIO
****************************
.. image:: https://img.shields.io/pypi/v/Mopidy-Raspberry-GPIO.svg?style=flat
.. image:: https://img.shields.io/pypi/v/Mopidy-Raspberry-GPIO.svg
:target: https://pypi.org/project/Mopidy-Raspberry-GPIO/
:alt: Latest PyPI version
.. image:: https://img.shields.io/travis/pimoroni/mopidy-raspberry-gpio/master.svg?style=flat
:target: https://travis-ci.org/pimoroni/mopidy-raspberry-gpio
:alt: Travis CI build status
.. image:: https://img.shields.io/circleci/build/gh/pimoroni/mopidy-raspberry-gpio
:target: https://circleci.com/gh/pimoroni/mopidy-raspberry-gpio
:alt: CircleCI build status
.. image:: https://img.shields.io/coveralls/pimoroni/mopidy-raspberry-gpio/master.svg?style=flat
:target: https://coveralls.io/r/pimoroni/mopidy-raspberry-gpio
:alt: Test coverage
.. image:: https://img.shields.io/codecov/c/gh/pimoroni/mopidy-raspberry-gpio
:target: https://codecov.io/gh/pimoroni/mopidy-raspberry-gpio
:alt: Test coverage
Mopidy extension for GPIO input on a Raspberry Pi
@ -22,7 +22,7 @@ Installation
Install by running::
pip install Mopidy-Raspberry-GPIO
python3 -m pip install Mopidy-Raspberry-GPIO
Or, if available, install the Debian/Ubuntu package from `apt.mopidy.com
<https://apt.mopidy.com/>`_.

View File

@ -1,7 +1,5 @@
from __future__ import unicode_literals
import logging
import os
import pathlib
from mopidy import config, ext
@ -20,15 +18,15 @@ class Extension(ext.Extension):
version = __version__
def get_default_config(self):
conf_file = os.path.join(os.path.dirname(__file__), "ext.conf")
return config.read(conf_file)
return config.read(pathlib.Path(__file__).parent / "ext.conf")
def get_config_schema(self):
schema = super(Extension, self).get_config_schema()
schema = super().get_config_schema()
for pin in range(28):
schema["bcm{:d}".format(pin)] = PinConfig()
schema[f"bcm{pin:d}"] = PinConfig()
return schema
def setup(self, registry):
from .frontend import RaspberryGPIOFrontend
registry.add("frontend", RaspberryGPIOFrontend)

View File

@ -1,87 +1,82 @@
from __future__ import unicode_literals
import logging
from mopidy import core
import pykka
logger = logging.getLogger(__name__)
class RaspberryGPIOFrontend(pykka.ThreadingActor, core.CoreListener):
def __init__(self, config, core):
super(RaspberryGPIOFrontend, self).__init__()
import RPi.GPIO as GPIO
self.core = core
self.config = config["raspberry-gpio"]
self.pin_settings = {}
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
# Iterate through any bcmN pins in the config
# and set them up as inputs with edge detection
for key in self.config:
if key.startswith("bcm"):
pin = int(key.replace("bcm", ""))
settings = self.config[key]
if settings is None:
continue
pull = GPIO.PUD_UP
edge = GPIO.FALLING
if settings.active == 'active_high':
pull = GPIO.PUD_DOWN
edge = GPIO.RISING
GPIO.setup(
pin,
GPIO.IN,
pull_up_down=pull)
GPIO.add_event_detect(
pin,
edge,
callback=self.gpio_event,
bouncetime=settings.bouncetime)
self.pin_settings[pin] = settings
def gpio_event(self, pin):
settings = self.pin_settings[pin]
self.dispatch_input(settings.event)
def dispatch_input(self, event):
handler_name = "handle_{}".format(event)
try:
getattr(self, handler_name)()
except AttributeError:
raise RuntimeError(
"Could not find input handler for event: {}".format(event)
)
def handle_play_pause(self):
if self.core.playback.state.get() == core.PlaybackState.PLAYING:
self.core.playback.pause()
else:
self.core.playback.play()
def handle_next(self):
self.core.playback.next()
def handle_prev(self):
self.core.playback.previous()
def handle_volume_up(self):
volume = self.core.playback.volume.get()
volume += 5
volume = min(volume, 100)
self.core.playback.volume = volume
def handle_volume_down(self):
volume = self.core.playback.volume.get()
volume -= 5
volume = max(volume, 0)
self.core.playback.volume = volume
import logging
import pykka
from mopidy import core
logger = logging.getLogger(__name__)
class RaspberryGPIOFrontend(pykka.ThreadingActor, core.CoreListener):
def __init__(self, config, core):
super().__init__()
import RPi.GPIO as GPIO
self.core = core
self.config = config["raspberry-gpio"]
self.pin_settings = {}
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
# Iterate through any bcmN pins in the config
# and set them up as inputs with edge detection
for key in self.config:
if key.startswith("bcm"):
pin = int(key.replace("bcm", ""))
settings = self.config[key]
if settings is None:
continue
pull = GPIO.PUD_UP
edge = GPIO.FALLING
if settings.active == "active_high":
pull = GPIO.PUD_DOWN
edge = GPIO.RISING
GPIO.setup(pin, GPIO.IN, pull_up_down=pull)
GPIO.add_event_detect(
pin,
edge,
callback=self.gpio_event,
bouncetime=settings.bouncetime,
)
self.pin_settings[pin] = settings
def gpio_event(self, pin):
settings = self.pin_settings[pin]
self.dispatch_input(settings.event)
def dispatch_input(self, event):
handler_name = f"handle_{event}"
try:
getattr(self, handler_name)()
except AttributeError:
raise RuntimeError(
f"Could not find input handler for event: {event}"
)
def handle_play_pause(self):
if self.core.playback.get_state().get() == core.PlaybackState.PLAYING:
self.core.playback.pause()
else:
self.core.playback.play()
def handle_next(self):
self.core.playback.next()
def handle_prev(self):
self.core.playback.previous()
def handle_volume_up(self):
volume = self.core.mixer.get_volume().get()
volume += 5
volume = min(volume, 100)
self.core.mixer.set_volume(volume)
def handle_volume_down(self):
volume = self.core.mixer.get_volume().get()
volume -= 5
volume = max(volume, 0)
self.core.mixer.set_volume(volume)

View File

@ -1,56 +1,60 @@
from collections import namedtuple
from mopidy import config
class PinConfig(config.ConfigValue):
tuple_pinconfig = namedtuple("PinConfig",
("event", "active", "bouncetime"))
valid_events = "play_pause", "prev", "next", "volume_up", "volume_down"
valid_modes = "active_low", "active_high"
def __init__(self):
pass
def deserialize(self, value):
if value is None:
return None
value = config.decode(value).strip()
try:
event, active, bouncetime = value.split(',')
except ValueError:
return None
if event not in self.valid_events:
raise ValueError(
"invalid event for pin config {:s} (Must be {})".format(
event, ", ".join(self.valid_events)
)
)
if active not in self.valid_modes:
raise ValueError(
"invalid mode for pin config {:s} (Must be {})".format(
event, ", ".join(self.valid_events)
)
)
try:
bouncetime = int(bouncetime)
except ValueError:
raise ValueError(
"invalid bouncetime value for pin config {}".format(bouncetime)
)
return self.tuple_pinconfig(event, active, bouncetime)
def serialize(self, value, display=False):
if value is None:
return ""
value = "{:s},{:s},{:d}".format(
value.event, value.active, value.bouncetime)
return config.encode(value)
from collections import namedtuple
from mopidy import config
from mopidy.config import types
class ValidList(list):
def __format__(self, format_string=None):
if format_string is None:
format_string = ", "
return format_string.join(self)
class PinConfig(config.ConfigValue):
tuple_pinconfig = namedtuple("PinConfig", ("event", "active", "bouncetime"))
valid_events = ValidList(
["play_pause", "prev", "next", "volume_up", "volume_down"]
)
valid_modes = ValidList(["active_low", "active_high"])
def __init__(self):
pass
def deserialize(self, value):
if value is None:
return None
value = types.decode(value).strip()
try:
event, active, bouncetime = value.split(",")
except ValueError:
return None
if event not in self.valid_events:
raise ValueError(
f"invalid event for pin config {event} (Must be {self.valid_events})"
)
if active not in self.valid_modes:
raise ValueError(
f"invalid event for pin config {active} (Must be one of {self.valid_modes})"
)
try:
bouncetime = int(bouncetime)
except ValueError:
raise ValueError(
f"invalid bouncetime value for pin config {bouncetime}"
)
return self.tuple_pinconfig(event, active, bouncetime)
def serialize(self, value, display=False):
if value is None:
return ""
value = f"{value.event},{value.active},{value.bouncetime}"
return types.encode(value)

17
pyproject.toml Normal file
View File

@ -0,0 +1,17 @@
[build-system]
requires = ["setuptools >= 30.3.0", "wheel"]
[tool.black]
target-version = ["py37", "py38"]
line-length = 80
[tool.isort]
multi_line_output = 3
include_trailing_comma = true
force_grid_wrap = 0
use_parentheses = true
line_length = 88
known_tests = "tests"
sections = "FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,TESTS,LOCALFOLDER"

View File

@ -1,6 +1,85 @@
[flake8]
application-import-names = mopidy_raspberry_gpio,tests
exclude = .git,.tox
[metadata]
name = mopidy-raspberry-gpio
version = 0.0.2
url = https://github.com/pimoroni/mopidy-raspberry-gpio
author = Phil Howard
author_email = phil@pimoroni.com
license = Apache License, Version 2.0
license_file = LICENSE
description = Mopidy extension for GPIO input on a Raspberry Pi
long_description = file: README.rst
classifiers =
Environment :: No Input/Output (Daemon)
Intended Audience :: End Users/Desktop
License :: OSI Approved :: Apache Software License
Operating System :: OS Independent
Programming Language :: Python :: 3
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Topic :: Multimedia :: Sound/Audio :: Players
[wheel]
universal = 1
[options]
zip_safe = False
include_package_data = True
packages = find:
python_requires = >= 3.7
install_requires =
Mopidy >= 3.0.0a4 # Change to >= 3.0 once final is released
Pykka >= 2.0.1
setuptools
[options.extras_require]
lint =
black
check-manifest
flake8
flake8-bugbear
flake8-import-order
isort[pyproject]
release =
twine
wheel
test =
pytest
pytest-cov
dev =
%(lint)s
%(release)s
%(test)s
[options.packages.find]
exclude =
tests
tests.*
[options.entry_points]
mopidy.ext =
raspberry-gpio = mopidy_raspberry_gpio:Extension
[flake8]
application-import-names = mopidy_raspberry_gpio, tests
max-line-length = 80
exclude = .git, .tox, build
select =
# Regular flake8 rules
C, E, F, W
# flake8-bugbear rules
B
# B950: line too long (soft speed limit)
B950
# pep8-naming rules
N
ignore =
# E203: whitespace before ':' (not PEP8 compliant)
E203
# E501: line too long (replaced by B950)
E501
# W503: line break before binary operator (not PEP8 compliant)
W503
# B305: .next() is not a thing on Python 3 (used by playback controller)
B305

View File

@ -1,45 +1,3 @@
from __future__ import unicode_literals
from setuptools import setup
import re
from setuptools import find_packages, setup
def get_version(filename):
with open(filename) as fh:
metadata = dict(re.findall('__([a-z]+)__ = "([^"]+)"', fh.read()))
return metadata["version"]
setup(
name="Mopidy-Raspberry-GPIO",
version=get_version("mopidy_raspberry_gpio/__init__.py"),
url="https://github.com/pimoroni/mopidy-raspberry-gpio",
license="Apache License, Version 2.0",
author="Phil Howard",
author_email="phil@pimoroni.com",
description="Mopidy extension for GPIO input on a Raspberry Pi",
long_description=open("README.rst").read(),
packages=find_packages(exclude=["tests", "tests.*"]),
zip_safe=False,
include_package_data=True,
python_requires="> 2.7, < 3",
install_requires=[
"setuptools",
"Mopidy >= 2.2",
"Pykka >= 2.0",
],
entry_points={
"mopidy.ext": [
"raspberry-gpio = mopidy_raspberry_gpio:Extension",
]
},
classifiers=[
"Environment :: No Input/Output (Daemon)",
"Intended Audience :: End Users/Desktop",
"License :: OSI Approved :: Apache Software License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 2.7",
"Topic :: Multimedia :: Sound/Audio :: Players",
],
)
setup()

137
tests/dummy_audio.py Normal file
View File

@ -0,0 +1,137 @@
"""A dummy audio actor for use in tests.
This class implements the audio API in the simplest way possible. It is used in
tests of the core and backends.
"""
import pykka
from mopidy import audio
def create_proxy(config=None, mixer=None):
return DummyAudio.start(config, mixer).proxy()
# TODO: reset position on track change?
class DummyAudio(pykka.ThreadingActor):
def __init__(self, config=None, mixer=None):
super().__init__()
self.state = audio.PlaybackState.STOPPED
self._volume = 0
self._position = 0
self._callback = None
self._uri = None
self._stream_changed = False
self._live_stream = False
self._tags = {}
self._bad_uris = set()
def set_uri(self, uri, live_stream=False):
assert self._uri is None, "prepare change not called before set"
self._position = 0
self._uri = uri
self._stream_changed = True
self._live_stream = live_stream
self._tags = {}
def set_appsrc(self, *args, **kwargs):
pass
def emit_data(self, buffer_):
pass
def get_position(self):
return self._position
def set_position(self, position):
self._position = position
audio.AudioListener.send("position_changed", position=position)
return True
def start_playback(self):
return self._change_state(audio.PlaybackState.PLAYING)
def pause_playback(self):
return self._change_state(audio.PlaybackState.PAUSED)
def prepare_change(self):
self._uri = None
return True
def stop_playback(self):
return self._change_state(audio.PlaybackState.STOPPED)
def get_volume(self):
return self._volume
def set_volume(self, volume):
self._volume = volume
return True
def set_metadata(self, track):
pass
def get_current_tags(self):
return self._tags
def set_about_to_finish_callback(self, callback):
self._callback = callback
def enable_sync_handler(self):
pass
def wait_for_state_change(self):
pass
def _change_state(self, new_state):
if not self._uri:
return False
if new_state == audio.PlaybackState.STOPPED and self._uri:
self._stream_changed = True
self._uri = None
if self._uri is not None:
audio.AudioListener.send("position_changed", position=0)
if self._stream_changed:
self._stream_changed = False
audio.AudioListener.send("stream_changed", uri=self._uri)
old_state, self.state = self.state, new_state
audio.AudioListener.send(
"state_changed",
old_state=old_state,
new_state=new_state,
target_state=None,
)
if new_state == audio.PlaybackState.PLAYING:
self._tags["audio-codec"] = ["fake info..."]
audio.AudioListener.send("tags_changed", tags=["audio-codec"])
return self._uri not in self._bad_uris
def trigger_fake_playback_failure(self, uri):
self._bad_uris.add(uri)
def trigger_fake_tags_changed(self, tags):
self._tags.update(tags)
audio.AudioListener.send("tags_changed", tags=self._tags.keys())
def get_about_to_finish_callback(self):
# This needs to be called from outside the actor or we lock up.
def wrapper():
if self._callback:
self.prepare_change()
self._callback()
if not self._uri or not self._callback:
self._tags = {}
audio.AudioListener.send("reached_end_of_stream")
else:
audio.AudioListener.send("position_changed", position=0)
audio.AudioListener.send("stream_changed", uri=self._uri)
return wrapper

152
tests/dummy_backend.py Normal file
View File

@ -0,0 +1,152 @@
"""A dummy backend for use in tests.
This backend implements the backend API in the simplest way possible. It is
used in tests of the frontends.
"""
import pykka
from mopidy import backend
from mopidy.models import Playlist, Ref, SearchResult
def create_proxy(config=None, audio=None):
return DummyBackend.start(config=config, audio=audio).proxy()
class DummyBackend(pykka.ThreadingActor, backend.Backend):
def __init__(self, config, audio):
super().__init__()
self.library = DummyLibraryProvider(backend=self)
if audio:
self.playback = backend.PlaybackProvider(audio=audio, backend=self)
else:
self.playback = DummyPlaybackProvider(audio=audio, backend=self)
self.playlists = DummyPlaylistsProvider(backend=self)
self.uri_schemes = ["dummy"]
class DummyLibraryProvider(backend.LibraryProvider):
root_directory = Ref.directory(uri="dummy:/", name="dummy")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.dummy_library = []
self.dummy_get_distinct_result = {}
self.dummy_browse_result = {}
self.dummy_find_exact_result = SearchResult()
self.dummy_search_result = SearchResult()
def browse(self, path):
return self.dummy_browse_result.get(path, [])
def get_distinct(self, field, query=None):
return self.dummy_get_distinct_result.get(field, set())
def lookup(self, uri):
uri = Ref.track(uri=uri).uri
return [t for t in self.dummy_library if uri == t.uri]
def refresh(self, uri=None):
pass
def search(self, query=None, uris=None, exact=False):
if exact: # TODO: remove uses of dummy_find_exact_result
return self.dummy_find_exact_result
return self.dummy_search_result
class DummyPlaybackProvider(backend.PlaybackProvider):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._uri = None
self._time_position = 0
def pause(self):
return True
def play(self):
return self._uri and self._uri != "dummy:error"
def change_track(self, track):
"""Pass a track with URI 'dummy:error' to force failure"""
self._uri = track.uri
self._time_position = 0
return True
def prepare_change(self):
pass
def resume(self):
return True
def seek(self, time_position):
self._time_position = time_position
return True
def stop(self):
self._uri = None
return True
def get_time_position(self):
return self._time_position
class DummyPlaylistsProvider(backend.PlaylistsProvider):
def __init__(self, backend):
super().__init__(backend)
self._playlists = []
self._allow_save = True
def set_dummy_playlists(self, playlists):
"""For tests using the dummy provider through an actor proxy."""
self._playlists = playlists
def set_allow_save(self, enabled):
self._allow_save = enabled
def as_list(self):
return [
Ref.playlist(uri=pl.uri, name=pl.name) for pl in self._playlists
]
def get_items(self, uri):
playlist = self.lookup(uri)
if playlist is None:
return
return [Ref.track(uri=t.uri, name=t.name) for t in playlist.tracks]
def lookup(self, uri):
uri = Ref.playlist(uri=uri).uri
for playlist in self._playlists:
if playlist.uri == uri:
return playlist
def refresh(self):
pass
def create(self, name):
playlist = Playlist(name=name, uri=f"dummy:{name}")
self._playlists.append(playlist)
return playlist
def delete(self, uri):
playlist = self.lookup(uri)
if playlist:
self._playlists.remove(playlist)
def save(self, playlist):
if not self._allow_save:
return None
old_playlist = self.lookup(playlist.uri)
if old_playlist is not None:
index = self._playlists.index(old_playlist)
self._playlists[index] = playlist
else:
self._playlists.append(playlist)
return playlist

30
tests/dummy_mixer.py Normal file
View File

@ -0,0 +1,30 @@
import pykka
from mopidy import mixer
def create_proxy(config=None):
return DummyMixer.start(config=None).proxy()
class DummyMixer(pykka.ThreadingActor, mixer.Mixer):
def __init__(self, config):
super().__init__()
# These must be initialised to avoid none type error in tests
self._volume = 50
self._mute = False
def get_volume(self):
return self._volume
def set_volume(self, volume):
self._volume = volume
self.trigger_volume_changed(volume=volume)
return True
def get_mute(self):
return self._mute
def set_mute(self, mute):
self._mute = mute
self.trigger_mute_changed(mute=mute)
return True

View File

@ -1,7 +1,4 @@
from __future__ import unicode_literals
import pytest
from mopidy_raspberry_gpio import Extension, PinConfig

View File

@ -1,43 +1,142 @@
from __future__ import unicode_literals
import sys
import mock
import pytest
from mopidy_raspberry_gpio import Extension, pinconfig
from mopidy_raspberry_gpio import frontend as frontend_lib
deserialize = pinconfig.PinConfig().deserialize
dummy_config = {
"raspberry-gpio": {
# Plugins expect settings to be deserialized
"bcm1": deserialize("play_pause,active_low,30")
}
}
def test_get_frontend_classes():
sys.modules['RPi'] = mock.Mock()
sys.modules['RPi.GPIO'] = mock.Mock()
ext = Extension()
registry = mock.Mock()
ext.setup(registry)
registry.add.assert_called_once_with(
'frontend', frontend_lib.RaspberryGPIOFrontend)
def test_frontend_handler_dispatch():
sys.modules['RPi'] = mock.Mock()
sys.modules['RPi.GPIO'] = mock.Mock()
frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, mock.Mock())
with pytest.raises(RuntimeError):
frontend.dispatch_input('tomato')
import sys
from unittest import mock
import pykka
from mopidy import core
import pytest
from mopidy_raspberry_gpio import Extension
from mopidy_raspberry_gpio import frontend as frontend_lib
from mopidy_raspberry_gpio import pinconfig
from . import dummy_audio, dummy_backend, dummy_mixer
deserialize = pinconfig.PinConfig().deserialize
dummy_config = {
"raspberry-gpio": {
# Plugins expect settings to be deserialized
"bcm1": deserialize("play_pause,active_low,30"),
"bcm2": deserialize("volume_up,active_high,30"),
"bcm3": deserialize("volume_down,active_high,30"),
}
}
def stop_mopidy_core():
pykka.ActorRegistry.stop_all()
def dummy_mopidy_core():
mixer = dummy_mixer.create_proxy()
audio = dummy_audio.create_proxy()
backend = dummy_backend.create_proxy(audio=audio)
return core.Core.start(audio=audio, mixer=mixer, backends=[backend]).proxy()
def test_get_frontend_classes():
sys.modules["RPi"] = mock.Mock()
sys.modules["RPi.GPIO"] = mock.Mock()
ext = Extension()
registry = mock.Mock()
ext.setup(registry)
registry.add.assert_called_once_with(
"frontend", frontend_lib.RaspberryGPIOFrontend
)
stop_mopidy_core()
def test_frontend_handler_dispatch_play_pause():
sys.modules["RPi"] = mock.Mock()
sys.modules["RPi.GPIO"] = mock.Mock()
frontend = frontend_lib.RaspberryGPIOFrontend(
dummy_config, dummy_mopidy_core()
)
frontend.dispatch_input("play_pause")
stop_mopidy_core()
def test_frontend_handler_dispatch_next():
sys.modules["RPi"] = mock.Mock()
sys.modules["RPi.GPIO"] = mock.Mock()
frontend = frontend_lib.RaspberryGPIOFrontend(
dummy_config, dummy_mopidy_core()
)
frontend.dispatch_input("next")
stop_mopidy_core()
def test_frontend_handler_dispatch_prev():
sys.modules["RPi"] = mock.Mock()
sys.modules["RPi.GPIO"] = mock.Mock()
frontend = frontend_lib.RaspberryGPIOFrontend(
dummy_config, dummy_mopidy_core()
)
frontend.dispatch_input("prev")
stop_mopidy_core()
def test_frontend_handler_dispatch_volume_up():
sys.modules["RPi"] = mock.Mock()
sys.modules["RPi.GPIO"] = mock.Mock()
frontend = frontend_lib.RaspberryGPIOFrontend(
dummy_config, dummy_mopidy_core()
)
frontend.dispatch_input("volume_up")
stop_mopidy_core()
def test_frontend_handler_dispatch_volume_down():
sys.modules["RPi"] = mock.Mock()
sys.modules["RPi.GPIO"] = mock.Mock()
frontend = frontend_lib.RaspberryGPIOFrontend(
dummy_config, dummy_mopidy_core()
)
frontend.dispatch_input("volume_down")
stop_mopidy_core()
def test_frontend_handler_dispatch_invalid_event():
sys.modules["RPi"] = mock.Mock()
sys.modules["RPi.GPIO"] = mock.Mock()
frontend = frontend_lib.RaspberryGPIOFrontend(
dummy_config, dummy_mopidy_core()
)
with pytest.raises(RuntimeError):
frontend.dispatch_input("tomato")
stop_mopidy_core()
def test_frontend_gpio_event():
sys.modules["RPi"] = mock.Mock()
sys.modules["RPi.GPIO"] = mock.Mock()
frontend = frontend_lib.RaspberryGPIOFrontend(
dummy_config, dummy_mopidy_core()
)
frontend.gpio_event(3)
stop_mopidy_core()

30
tox.ini
View File

@ -1,27 +1,23 @@
[tox]
envlist = py27, py27-flake8, py27-check-manifest
envlist = py37, py38, black, check-manifest, flake8
[testenv]
sitepackages = true
deps =
mock
pytest
pytest-cov
pytest-xdist
deps = .[test]
commands =
pytest \
-v -r wsx \
python -m pytest \
--basetemp={envtmpdir} \
--cov=mopidy_raspberry_gpio --cov-report=term-missing \
{posargs}
[testenv:py27-flake8]
deps =
flake8
flake8-import-order
skip_install = true
commands = python -m flake8
[testenv:black]
deps = .[lint]
commands = python -m black --check .
[testenv:py27-check-manifest]
deps = check-manifest
commands = check-manifest
[testenv:check-manifest]
deps = .[lint]
commands = python -m check_manifest
[testenv:flake8]
deps = .[lint]
commands = python -m flake8 --ignore=B950,B305,E501 --show-source --statistics