diff --git a/tests/dummy_audio.py b/tests/dummy_audio.py new file mode 100644 index 0000000..144f1a4 --- /dev/null +++ b/tests/dummy_audio.py @@ -0,0 +1,138 @@ +"""A dummy audio actor for use in tests. + +This class implements the audio API in the simplest way possible. It is used in +tests of the core and backends. +""" + + +import pykka + +from mopidy import audio + + +def create_proxy(config=None, mixer=None): + return DummyAudio.start(config, mixer).proxy() + + +# TODO: reset position on track change? +class DummyAudio(pykka.ThreadingActor): + def __init__(self, config=None, mixer=None): + super().__init__() + self.state = audio.PlaybackState.STOPPED + self._volume = 0 + self._position = 0 + self._callback = None + self._uri = None + self._stream_changed = False + self._live_stream = False + self._tags = {} + self._bad_uris = set() + + def set_uri(self, uri, live_stream=False): + assert self._uri is None, "prepare change not called before set" + self._position = 0 + self._uri = uri + self._stream_changed = True + self._live_stream = live_stream + self._tags = {} + + def set_appsrc(self, *args, **kwargs): + pass + + def emit_data(self, buffer_): + pass + + def get_position(self): + return self._position + + def set_position(self, position): + self._position = position + audio.AudioListener.send("position_changed", position=position) + return True + + def start_playback(self): + return self._change_state(audio.PlaybackState.PLAYING) + + def pause_playback(self): + return self._change_state(audio.PlaybackState.PAUSED) + + def prepare_change(self): + self._uri = None + return True + + def stop_playback(self): + return self._change_state(audio.PlaybackState.STOPPED) + + def get_volume(self): + return self._volume + + def set_volume(self, volume): + self._volume = volume + return True + + def set_metadata(self, track): + pass + + def get_current_tags(self): + return self._tags + + def set_about_to_finish_callback(self, callback): + self._callback = callback + + def enable_sync_handler(self): + pass + + def wait_for_state_change(self): + pass + + def _change_state(self, new_state): + if not self._uri: + return False + + if new_state == audio.PlaybackState.STOPPED and self._uri: + self._stream_changed = True + self._uri = None + + if self._uri is not None: + audio.AudioListener.send("position_changed", position=0) + + if self._stream_changed: + self._stream_changed = False + audio.AudioListener.send("stream_changed", uri=self._uri) + + old_state, self.state = self.state, new_state + audio.AudioListener.send( + "state_changed", + old_state=old_state, + new_state=new_state, + target_state=None, + ) + + if new_state == audio.PlaybackState.PLAYING: + self._tags["audio-codec"] = ["fake info..."] + audio.AudioListener.send("tags_changed", tags=["audio-codec"]) + + return self._uri not in self._bad_uris + + def trigger_fake_playback_failure(self, uri): + self._bad_uris.add(uri) + + def trigger_fake_tags_changed(self, tags): + self._tags.update(tags) + audio.AudioListener.send("tags_changed", tags=self._tags.keys()) + + def get_about_to_finish_callback(self): + # This needs to be called from outside the actor or we lock up. + def wrapper(): + if self._callback: + self.prepare_change() + self._callback() + + if not self._uri or not self._callback: + self._tags = {} + audio.AudioListener.send("reached_end_of_stream") + else: + audio.AudioListener.send("position_changed", position=0) + audio.AudioListener.send("stream_changed", uri=self._uri) + + return wrapper diff --git a/tests/dummy_backend.py b/tests/dummy_backend.py new file mode 100644 index 0000000..0ee3d2d --- /dev/null +++ b/tests/dummy_backend.py @@ -0,0 +1,153 @@ +"""A dummy backend for use in tests. + +This backend implements the backend API in the simplest way possible. It is +used in tests of the frontends. +""" + + +import pykka + +from mopidy import backend +from mopidy.models import Playlist, Ref, SearchResult + + +def create_proxy(config=None, audio=None): + return DummyBackend.start(config=config, audio=audio).proxy() + + +class DummyBackend(pykka.ThreadingActor, backend.Backend): + def __init__(self, config, audio): + super().__init__() + + self.library = DummyLibraryProvider(backend=self) + if audio: + self.playback = backend.PlaybackProvider(audio=audio, backend=self) + else: + self.playback = DummyPlaybackProvider(audio=audio, backend=self) + self.playlists = DummyPlaylistsProvider(backend=self) + + self.uri_schemes = ["dummy"] + + +class DummyLibraryProvider(backend.LibraryProvider): + root_directory = Ref.directory(uri="dummy:/", name="dummy") + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.dummy_library = [] + self.dummy_get_distinct_result = {} + self.dummy_browse_result = {} + self.dummy_find_exact_result = SearchResult() + self.dummy_search_result = SearchResult() + + def browse(self, path): + return self.dummy_browse_result.get(path, []) + + def get_distinct(self, field, query=None): + return self.dummy_get_distinct_result.get(field, set()) + + def lookup(self, uri): + uri = Ref.track(uri=uri).uri + return [t for t in self.dummy_library if uri == t.uri] + + def refresh(self, uri=None): + pass + + def search(self, query=None, uris=None, exact=False): + if exact: # TODO: remove uses of dummy_find_exact_result + return self.dummy_find_exact_result + return self.dummy_search_result + + +class DummyPlaybackProvider(backend.PlaybackProvider): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._uri = None + self._time_position = 0 + + def pause(self): + return True + + def play(self): + return self._uri and self._uri != "dummy:error" + + def change_track(self, track): + """Pass a track with URI 'dummy:error' to force failure""" + self._uri = track.uri + self._time_position = 0 + return True + + def prepare_change(self): + pass + + def resume(self): + return True + + def seek(self, time_position): + self._time_position = time_position + return True + + def stop(self): + self._uri = None + return True + + def get_time_position(self): + return self._time_position + + +class DummyPlaylistsProvider(backend.PlaylistsProvider): + def __init__(self, backend): + super().__init__(backend) + self._playlists = [] + self._allow_save = True + + def set_dummy_playlists(self, playlists): + """For tests using the dummy provider through an actor proxy.""" + self._playlists = playlists + + def set_allow_save(self, enabled): + self._allow_save = enabled + + def as_list(self): + return [ + Ref.playlist(uri=pl.uri, name=pl.name) for pl in self._playlists + ] + + def get_items(self, uri): + playlist = self.lookup(uri) + if playlist is None: + return + return [Ref.track(uri=t.uri, name=t.name) for t in playlist.tracks] + + def lookup(self, uri): + uri = Ref.playlist(uri=uri).uri + for playlist in self._playlists: + if playlist.uri == uri: + return playlist + + def refresh(self): + pass + + def create(self, name): + playlist = Playlist(name=name, uri=f"dummy:{name}") + self._playlists.append(playlist) + return playlist + + def delete(self, uri): + playlist = self.lookup(uri) + if playlist: + self._playlists.remove(playlist) + + def save(self, playlist): + if not self._allow_save: + return None + + old_playlist = self.lookup(playlist.uri) + + if old_playlist is not None: + index = self._playlists.index(old_playlist) + self._playlists[index] = playlist + else: + self._playlists.append(playlist) + + return playlist diff --git a/tests/dummy_mixer.py b/tests/dummy_mixer.py new file mode 100644 index 0000000..f030ca2 --- /dev/null +++ b/tests/dummy_mixer.py @@ -0,0 +1,30 @@ +import pykka + +from mopidy import mixer + + +def create_proxy(config=None): + return DummyMixer.start(config=None).proxy() + + +class DummyMixer(pykka.ThreadingActor, mixer.Mixer): + def __init__(self, config): + super().__init__() + self._volume = 50 # Had to be initialised to avoid none type error in tests + self._mute = False # Ditto + + def get_volume(self): + return self._volume + + def set_volume(self, volume): + self._volume = volume + self.trigger_volume_changed(volume=volume) + return True + + def get_mute(self): + return self._mute + + def set_mute(self, mute): + self._mute = mute + self.trigger_mute_changed(mute=mute) + return True diff --git a/tests/test_frontend.py b/tests/test_frontend.py index 427d4e6..08c3442 100644 --- a/tests/test_frontend.py +++ b/tests/test_frontend.py @@ -6,6 +6,11 @@ from mopidy_raspberry_gpio import Extension from mopidy_raspberry_gpio import frontend as frontend_lib from mopidy_raspberry_gpio import pinconfig +from mopidy import core +from . import dummy_mixer +from . import dummy_backend +from . import dummy_audio + deserialize = pinconfig.PinConfig().deserialize dummy_config = { @@ -18,6 +23,17 @@ dummy_config = { } +def dummy_mopidy_core(): + mixer = dummy_mixer.create_proxy() + audio = dummy_audio.create_proxy() + backend = dummy_backend.create_proxy(audio=audio) + return core.Core.start( + audio=audio, + mixer=mixer, + backends=[backend] + ).proxy() + + def test_get_frontend_classes(): sys.modules["RPi"] = mock.Mock() sys.modules["RPi.GPIO"] = mock.Mock() @@ -36,7 +52,7 @@ def test_frontend_handler_dispatch_play_pause(): sys.modules["RPi"] = mock.Mock() sys.modules["RPi.GPIO"] = mock.Mock() - frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, mock.Mock()) + frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, dummy_mopidy_core()) frontend.dispatch_input("play_pause") @@ -45,7 +61,7 @@ def test_frontend_handler_dispatch_next(): sys.modules["RPi"] = mock.Mock() sys.modules["RPi.GPIO"] = mock.Mock() - frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, mock.Mock()) + frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, dummy_mopidy_core()) frontend.dispatch_input("next") @@ -54,7 +70,7 @@ def test_frontend_handler_dispatch_prev(): sys.modules["RPi"] = mock.Mock() sys.modules["RPi.GPIO"] = mock.Mock() - frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, mock.Mock()) + frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, dummy_mopidy_core()) frontend.dispatch_input("prev") @@ -63,7 +79,7 @@ def test_frontend_handler_dispatch_volume_up(): sys.modules["RPi"] = mock.Mock() sys.modules["RPi.GPIO"] = mock.Mock() - frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, mock.Mock()) + frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, dummy_mopidy_core()) frontend.dispatch_input("volume_up") @@ -72,7 +88,7 @@ def test_frontend_handler_dispatch_volume_down(): sys.modules["RPi"] = mock.Mock() sys.modules["RPi.GPIO"] = mock.Mock() - frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, mock.Mock()) + frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, dummy_mopidy_core()) frontend.dispatch_input("volume_down") @@ -81,7 +97,7 @@ def test_frontend_handler_dispatch_invalid_event(): sys.modules["RPi"] = mock.Mock() sys.modules["RPi.GPIO"] = mock.Mock() - frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, mock.Mock()) + frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, dummy_mopidy_core()) with pytest.raises(RuntimeError): frontend.dispatch_input("tomato") @@ -91,6 +107,6 @@ def test_frontend_gpio_event(): sys.modules["RPi"] = mock.Mock() sys.modules["RPi.GPIO"] = mock.Mock() - frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, mock.Mock()) + frontend = frontend_lib.RaspberryGPIOFrontend(dummy_config, dummy_mopidy_core()) frontend.gpio_event(3) diff --git a/tox.ini b/tox.ini index d57a105..d92eb3d 100644 --- a/tox.ini +++ b/tox.ini @@ -20,4 +20,4 @@ commands = python -m check_manifest [testenv:flake8] deps = .[lint] -commands = python -m flake8 --show-source --statistics \ No newline at end of file +commands = python -m flake8 --ignore=B950 --show-source --statistics