123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- #!./bin/python3
- import os
- import sys
- import threading
- import time
- import traceback
- from typing import Dict
- import pygame # type: ignore
- import tornado.ioloop, tornado.web
- import asyncio
# Use the real GPIO driver when it is usable, otherwise the local mock so the
# program can also run on a development machine.
try:
    import RPi.GPIO as GPIO  # Support for Raspberry PI GPIO input
except (ImportError, RuntimeError):
    # ImportError: the RPi.GPIO package is not installed.
    # RuntimeError: RPi.GPIO refuses to load (it raises this on import when
    # it cannot be used on the current machine).
    # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
    import mock_rpi_gpio as GPIO
- from display import Display
- from player import Player
class WebHandler(tornado.web.RequestHandler):
    '''
    Request handler for the "/api/..." URLs of the web interface.

    The part of the URL after "/api/" is passed to get() and selects the
    action to perform on the Musikautomat instance.
    '''

    def initialize(self, musikautomat):
        '''
        Remember the application object supplied through the URL spec.

        musikautomat -- object whose play(item, start=...) method is called
                        to start playback
        '''
        self.musikautomat = musikautomat

    def get(self, a):
        '''
        Execute the API action named by the URL remainder "a".

        Supported actions:
          play/dir -- play a directory. Query arguments: "path" (required;
                      tornado answers with a 400 error when it is missing)
                      and "name" (optional, defaults to the path).

        Any other action is answered with HTTP 404 instead of an empty
        success response, so clients can tell an unknown call from a
        successful one.
        '''
        if a == "play/dir":
            # NOTE(review): "path" is taken from the request unchecked and
            # ends up in os.listdir(); confirm the service is only reachable
            # from a trusted network or restrict it to a media root.
            path = self.get_argument('path')
            self.musikautomat.play({
                "type" : "dir",
                "name" : self.get_argument('name', path),
                "path" : path
            }, start=True)
            return
        raise tornado.web.HTTPError(404)
class Musikautomat:
    '''
    Application controller of the music box.

    Owns the display (item selection) and the player, routes button and
    keyboard input to whichever of the two is currently the event sink and
    makes playback available to the web interface.
    '''

    # Physical (BOARD numbering) pins of the four buttons. A button counts
    # as pressed while its input reads 0.
    _PIN_UP = 3
    _PIN_LEFT = 5
    _PIN_RIGHT = 7
    _PIN_DOWN = 11

    def __init__(self, pydsp, config: Dict):
        '''
        Create display and player, start the web server thread and set up
        the button inputs.

        pydsp  -- drawing surface handed to Display and Player; the error
                  screen in runDisplay() is drawn on it as well
        config -- settings dictionary; "autostart" and "dimx" (default 128)
                  are read here, the dictionary itself is passed on to
                  Display and Player
        '''
        # Initialise state objects
        self._autostart = config.get("autostart")
        self._pydsp = pydsp
        self._display = Display(config, self._pydsp)
        self._display.update()
        self._eventSink = self._display
        self._player = Player(config, self._pydsp)
        self._dx = config.get("dimx", 128)

        # The web interface runs in its own daemon thread so it does not
        # keep the process alive once the display loop exits.
        threading.Thread(target=self.runWebServer, daemon=True).start()

        GPIO.setmode(GPIO.BOARD)   # Use physical pin numbering
        GPIO.setwarnings(False)    # Disable warnings
        # NOTE(review): pins 3 and 5 are configured with a pull-down although
        # all four buttons are read as active-low in runDisplay(); presumably
        # the hardware provides the pull-up for these two - confirm.
        GPIO.setup(self._PIN_UP, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
        GPIO.setup(self._PIN_LEFT, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
        GPIO.setup(self._PIN_RIGHT, GPIO.IN, pull_up_down=GPIO.PUD_UP)
        GPIO.setup(self._PIN_DOWN, GPIO.IN, pull_up_down=GPIO.PUD_UP)

    def runWebServer(self):
        '''
        Run event loop for web interface.

        Serves the API under "/api/..." and the static files from "./web"
        on port 8080. Blocks for as long as the IO loop runs, so it is meant
        to be used as a thread target.
        '''
        urls = [
            (r"/api/(.*)", WebHandler, dict(musikautomat=self)),
            (r"/(.*)", tornado.web.StaticFileHandler, {
                "path" : "./web",
                "default_filename" : "index.html",
            }),
        ]
        # This method runs outside the main thread, where no asyncio event
        # loop exists yet; create one before tornado needs it.
        asyncio.set_event_loop(asyncio.new_event_loop())
        app = tornado.web.Application(urls, debug=False)
        app.listen(8080)
        # IOLoop.instance() is only a deprecated alias of IOLoop.current().
        tornado.ioloop.IOLoop.current().start()

    def eventUp(self):
        '''
        Forward an "up" action to the current event sink.
        '''
        self._eventSink.action("up")

    def eventDown(self):
        '''
        Forward a "down" action to the current event sink.
        '''
        self._eventSink.action("down")

    def eventLeft(self):
        '''
        Get back to the display and redraw it.
        '''
        self._eventSink = self._display
        self._display.update()

    def eventRight(self):
        '''
        Toggle playback while the player is active, otherwise play the item
        currently selected on the display.
        '''
        if self._eventSink is self._player:
            self._player.togglePlay()
        else:
            self.play(self._display.currentItem())

    def runDisplay(self):
        '''
        Run event loop for display and buttons.

        Never returns normally; the loop ends through sys.exit() when pygame
        reports a QUIT event. Exceptions raised while handling a button or
        key press are printed and shown on the display instead of ending
        the loop.
        '''
        clock = pygame.time.Clock()
        # Loop iterations since the last accepted button press. Buttons are
        # only polled once this exceeds 1, which spaces out repeats while a
        # button is held down.
        idle_ticks = 0

        if self._autostart:
            # First call loads the selected item, second call toggles play.
            self.eventRight()
            self.eventRight()

        while True:
            clock.tick(10)  # We only need 10 FPS
            try:
                if idle_ticks > 1:
                    handler = self._pressedButtonHandler()
                    if handler is not None:
                        # Reset before dispatching so a failing handler is
                        # not retried on every frame while the button is held.
                        idle_ticks = 0
                        handler()
                else:
                    idle_ticks += 1
                self._handlePygameEvents()
            except Exception as e:
                self._showError(e)
            pygame.display.update()

    def _pressedButtonHandler(self):
        '''
        Return the event method of the first pressed button, or None.

        Buttons are checked in the order up, left, right, down.
        '''
        buttons = (
            (self._PIN_UP, self.eventUp),
            (self._PIN_LEFT, self.eventLeft),
            (self._PIN_RIGHT, self.eventRight),
            (self._PIN_DOWN, self.eventDown),
        )
        for pin, handler in buttons:
            if GPIO.input(pin) == 0:
                return handler
        return None

    def _handlePygameEvents(self):
        '''
        Process pending pygame events: quit and the four arrow keys.
        '''
        for event in pygame.event.get():
            # Handle exit event
            if event.type == pygame.QUIT:
                pygame.quit()
                sys.exit()
            # Handle key events
            if event.type == pygame.KEYDOWN:
                if event.key == pygame.K_UP:
                    self.eventUp()
                elif event.key == pygame.K_LEFT:
                    self.eventLeft()
                elif event.key == pygame.K_RIGHT:
                    self.eventRight()
                elif event.key == pygame.K_DOWN:
                    self.eventDown()

    def _showError(self, error):
        '''
        Print the active traceback, draw the error text on the display and
        hand input back to the display.

        Must be called from inside an "except" block, because the traceback
        of the exception currently being handled is printed.
        '''
        traceback.print_exc()
        self._pydsp.fill((0, 0, 0))
        font = pygame.font.Font('freesansbold.ttf', 12)
        message = str(error)
        # Wrap the text into rows of 12 pixels height; presumably about
        # 6 pixels per character at this font size. At least one character
        # per row, otherwise range() would be given a step of 0.
        width = max(1, int(self._dx / 6))
        for row, start in enumerate(range(0, len(message), width)):
            text = font.render(message[start:start + width], True,
                               (255, 0, 0), (0, 0, 0))
            self._pydsp.blit(text, (0, row * 12))
        self._eventSink = self._display

    def toDisplayName(self, s):
        '''
        Turn a file name into a display name: the extension is removed,
        underscores become spaces and the words are title-cased.
        '''
        s = os.path.splitext(s)[0]
        return s.replace("_", " ").title()

    def play(self, item, start=False):
        '''
        Play an item.

        item  -- dictionary with a "type" of "dir", "m3u" or "stream" plus
                 the keys the matching play... method reads
        start -- when true, togglePlay() is called on the player after the
                 playlist has been loaded

        Raises ValueError for an unknown type. The event sink only stays
        switched to the player when loading succeeded; on any error it is
        left as it was before the call.
        '''
        print("Player data:", item)
        kind = item.get("type")
        loaders = {
            "dir": self.playDir,
            "m3u": self.playM3U,
            "stream": self.playStream,
        }
        loader = loaders.get(kind)
        if loader is None:
            raise ValueError("Unknown type: %s" % kind)

        previous_sink = self._eventSink
        self._eventSink = self._player
        try:
            loader(item)
        except Exception:
            # Nothing new was loaded, so do not leave input on the player.
            self._eventSink = previous_sink
            raise
        if start:
            self._player.togglePlay()

    def playStream(self, item):
        '''
        Play a stream.

        item -- dictionary with the keys "name" and "url"
        '''
        self._player.setPlaylist([{
            "name" : item["name"],
            "path" : item["url"]
        }])
        self._player.update()

    def playDir(self, item):
        '''
        Play all files in a directory.

        item -- dictionary whose "path" names the directory. Every directory
                entry becomes a playlist entry; the playlist is sorted by
                display name.
        '''
        path = item.get("path")
        entries = [{
            "name" : self.toDisplayName(f),
            "path" : os.path.join(path, f)
        } for f in os.listdir(path)]
        entries.sort(key=lambda entry: entry["name"])
        self._player.setPlaylist(entries)
        self._player.update()

    def playM3U(self, item):
        '''
        Play all files of a M3U playlist.

        item -- dictionary whose "path" names the playlist file. Blank lines
                and lines starting with "#" are skipped. Entries containing
                "://" are used unchanged; all other entries are resolved
                relative to the directory of the playlist file.
        '''
        path = item.get("path")
        base_dir = os.path.dirname(path)
        entries = []
        with open(path) as f:
            for line in f:
                entry = line.strip()
                if not entry or entry.startswith("#"):
                    continue
                if "://" in entry:
                    # A URL must not be joined onto the playlist directory.
                    location = entry
                else:
                    location = os.path.join(base_dir, entry)
                entries.append({
                    "name" : self.toDisplayName(os.path.basename(entry)),
                    "path" : location
                })
        self._player.setPlaylist(entries)
        self._player.update()
|