#!./bin/python3 import os import sys import threading import time import traceback from typing import Dict import pygame # type: ignore import tornado.ioloop, tornado.web import asyncio try: import RPi.GPIO as GPIO # Support for Raspberry PI GPIO input except: import mock_rpi_gpio as GPIO from display import Display from player import Player class WebHandler(tornado.web.RequestHandler): def initialize(self, musikautomat): self.musikautomat = musikautomat def get(self, a): if a.startswith("play"): if a == "play/dir": path = self.get_argument('path') self.musikautomat.play({ "type" : "dir", "name" : self.get_argument('name', path), "path" : path }, start=True) class Musikautomat: def __init__(self, pydsp, config: Dict): # Initialise state objects self._autostart = config.get("autostart") self._pydsp = pydsp self._display = Display(config, self._pydsp) self._display.update() self._eventSink = self._display self._player = Player(config, self._pydsp) self._dx = config.get("dimx", 128) threading.Thread(target=self.runWebServer, daemon=True).start() GPIO.setmode(GPIO.BOARD) # Use physical pin numbering GPIO.setwarnings(False) # Disable warnings GPIO.setup(3, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) GPIO.setup(5, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) GPIO.setup(7, GPIO.IN, pull_up_down=GPIO.PUD_UP) GPIO.setup(11, GPIO.IN, pull_up_down=GPIO.PUD_UP) def runWebServer(self): ''' Run event loop for web interface. ''' urls = [ (r"/api/(.*)", WebHandler, dict(musikautomat=self)), (r"/(.*)", tornado.web.StaticFileHandler, {"path" : "./web", "default_filename" : "index.html" }), ] asyncio.set_event_loop(asyncio.new_event_loop()) app = tornado.web.Application(urls, debug=False) app.listen(8080) tornado.ioloop.IOLoop.instance().start() def eventUp(self): self._eventSink.action("up") def eventDown(self): self._eventSink.action("down") def eventLeft(self): # Get back to display self._eventSink = self._display self._display.update() def eventRight(self): if self._eventSink == self._player: self._player.togglePlay() else: self.play(self._display.currentItem()) def runDisplay(self): ''' Run event loop for display and buttons. ''' clk = pygame.time.Clock() gpio_event_timer = 0 if self._autostart: self.eventRight() self.eventRight() while True: clk.tick(10) # We only need 10 FPS if gpio_event_timer > 1: if GPIO.input(3) == 0: # Up gpio_event_timer=0 self.eventUp() elif GPIO.input(5) == 0: # LEFT gpio_event_timer=0 self.eventLeft() elif GPIO.input(7) == 0: # RIGHT gpio_event_timer=0 self.eventRight() elif GPIO.input(11) == 0: # DOWN gpio_event_timer=0 self.eventDown() else: gpio_event_timer+=1 try: for event in pygame.event.get(): # Handle exit event if event.type == pygame.QUIT: pygame.quit() sys.exit() # Handle key events if event.type == pygame.KEYDOWN: if event.key == pygame.K_UP: self.eventUp() elif event.key == pygame.K_LEFT: self.eventLeft() elif event.key == pygame.K_RIGHT: self.eventRight() elif event.key == pygame.K_DOWN: self.eventDown() except Exception as e: # Display error traceback.print_exc() self._pydsp.fill((0,0,0)) font = pygame.font.Font('freesansbold.ttf', 12) s = str(e) n = int(self._dx / 6) for i, start in enumerate(range(0, len(s), n)): text = font.render(s[start:start+n], True, (255, 0, 0) , (0, 0, 0)) self._pydsp.blit(text, (0, i * 12)) self._eventSink = self._display pygame.display.update() def toDisplayName(self, s): s = os.path.splitext(s)[0] return s.replace("_", " ").title() def play(self, item, start=False): ''' Play an item. ''' print("Player data:", item) self._eventSink = self._player t = item.get("type") if t == "dir": self.playDir(item) elif t == "m3u": self.playM3U(item) elif t == "stream": self.playStream(item) else: raise Exception("Unknown type: %s" % t) if start: self._player.togglePlay() def playStream(self, item): ''' Play a stream. ''' self._player.setPlaylist([{ "name" : item["name"], "path" : item["url"] }]) self._player.update() def playDir(self, item): ''' Play all files in a directory. ''' path = item.get("path") files = os.listdir(path) self._player.setPlaylist(sorted([{ "name" : self.toDisplayName(f), "path" : os.path.join(path, f) } for f in files], key=lambda i: i["name"])) self._player.update() def playM3U(self, item): ''' Play all files of a M3U playlist. ''' path = item.get("path") items = [] with open(path) as f: for line in f: item_path = str(line).strip() if not item_path.startswith("#"): items.append({ "name" : self.toDisplayName(os.path.basename(item_path)), "path" : os.path.join(os.path.dirname(path), item_path) }) self._player.setPlaylist(items) self._player.update()