Created
February 4, 2014 22:01
-
-
Save cirrusUK/8813208 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
pms-youtube. | |
https://github.com/np1/pms-youtube | |
Copyright (C) 2014 nagev | |
This program is free software: you can redistribute it and/or modify | |
it under the terms of the GNU General Public License as published by | |
the Free Software Foundation, either version 3 of the License, or | |
(at your option) any later version. | |
This program is distributed in the hope that it will be useful, | |
but WITHOUT ANY WARRANTY; without even the implied warranty of | |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
GNU General Public License for more details. | |
You should have received a copy of the GNU General Public License | |
along with this program. If not, see <http://www.gnu.org/licenses/>. | |
""" | |
from __future__ import print_function | |
__version__ = "0.01.05" | |
__author__ = "nagev" | |
__license__ = "GPLv3" | |
import subprocess | |
import logging | |
import random | |
import socket | |
import time | |
import pafy | |
import json | |
import sys | |
import re | |
import os | |
try: | |
# pylint: disable=F0401 | |
from colorama import init as init_colorama, Fore, Style | |
has_colorama = True | |
except ImportError: | |
has_colorama = False | |
# Python 3 compatibility hack | |
if sys.version_info[:2] >= (3, 0): | |
# pylint: disable=E0611,F0401 | |
import pickle | |
from urllib.request import build_opener | |
from urllib.error import HTTPError, URLError | |
from urllib.parse import urlencode | |
py2utf8_encode = lambda x: x | |
py2utf8_decode = lambda x: x | |
compat_input = input | |
else: | |
from urllib2 import build_opener, HTTPError, URLError | |
import cPickle as pickle | |
from urllib import urlencode | |
py2utf8_encode = lambda x: x.encode("utf8") | |
py2utf8_decode = lambda x: x.decode("utf8") | |
compat_input = raw_input | |
mswin = os.name == "nt" | |
member_var = lambda x: not(x.startswith("__") or callable(x)) | |
def mswinenc(txt): | |
""" Encoding for Windows. """ | |
if mswin: | |
sse = sys.stdout.encoding | |
txt = txt.encode(sse, "replace").decode("utf8", "ignore") | |
return txt | |
def mswinfn(filename): | |
""" Fix filename for Windows. """ | |
if mswin: | |
filename = mswinenc(filename) | |
allowed = re.compile(r'[^\\/?*$\'"%&:<>|]') | |
filename = "".join(x if allowed.match(x) else "_" for x in filename) | |
return filename | |
def get_default_ddir(): | |
""" Get system default Download directory, append PMS dir. """ | |
try: | |
# pylint: disable=E0611 | |
from gi.repository import GLib | |
ddir = GLib.get_user_special_dir(GLib.USER_DIRECTORY_DOWNLOAD) | |
ddir = ddir or os.path.join(os.path.expanduser("~"), "Downloads") | |
ddir = py2utf8_decode(ddir) | |
except (AttributeError, ImportError): | |
ddir = os.path.join(os.path.expanduser("~"), "Downloads") | |
return os.path.join(ddir, "PMS") | |
def get_config_dir(): | |
""" Get user's configuration directory. """ | |
if mswin: | |
confdir = os.environ["APPDATA"] | |
else: | |
if 'XDG_CONFIG_HOME' in os.environ: | |
confdir = os.environ['XDG_CONFIG_HOME'] | |
else: | |
print("XDG not found") | |
confdir = os.path.join(os.path.expanduser("~"), '.config') | |
if not os.path.exists(confdir): | |
os.mkdir(confdir) | |
confdir = os.path.join(confdir, "pms-youtube") | |
if not os.path.exists(confdir): | |
os.mkdir(confdir) | |
return confdir | |
class Config(object): | |
""" Holds various configuration values. """ | |
PLAYER = "mplayer" | |
PLAYERARGS = "-nolirc -nocache -prefer-ipv4 -really-quiet" | |
COLOURS = False if mswin and not has_colorama else True | |
CHECKUPDATE = True | |
SHOW_MPLAYER_KEYS = True | |
SHOW_STATUS = True | |
DDIR = get_default_ddir() | |
SHOW_VIDEO = False | |
SEARCH_MUSIC = True | |
if os.path.exists(os.path.join(os.path.expanduser("~"), ".pmsyt-debug")): | |
logging.basicConfig(level=logging.DEBUG) | |
if not mswin: | |
try: | |
import readline # import readline if not running on windows | |
readline.get_history_length() # redundant, prevents unused import warn | |
except ImportError: | |
pass # no biggie | |
opener = build_opener() | |
ua = "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)" | |
opener.addheaders = [("User-Agent", ua)] | |
urlopen = opener.open | |
class Playlist(object): | |
""" Representation of a playist, has list of songs. """ | |
def __init__(self, name=None, songs=None): | |
self.name = name | |
self.creation = time.time() | |
self.songs = songs or [] | |
@property | |
def is_empty(self): | |
""" Return True / False if songs are populated or not. """ | |
return bool(not self.songs) | |
@property | |
def size(self): | |
""" Return number of tracks. """ | |
return len(self.songs) | |
@property | |
def duration(self): | |
""" Sum duration of the playlist. """ | |
duration = 0 | |
for song in self.songs: | |
duration += int(song['duration']) | |
duration = time.strftime('%H:%M:%S', time.gmtime(int(duration))) | |
return duration | |
class g(object): | |
""" Class for holding globals that are needed throught the module. """ | |
max_results = 19 | |
url_memo = {} | |
model = Playlist(name="model") | |
last_search_query = "" | |
current_page = 1 | |
active = Playlist(name="active") | |
noblank = False | |
text = {} | |
userpl = {} | |
last_opened = message = content = "" | |
config = [x for x in sorted(dir(Config)) if member_var(x)] | |
configbool = [x for x in config if type(getattr(Config, x)) is bool] | |
defaults = {setting: getattr(Config, setting) for setting in config} | |
CFFILE = os.path.join(get_config_dir(), "config") | |
PLFILE = os.path.join(get_config_dir(), "playlist") | |
def showconfig(_): | |
""" Dump config data. """ | |
s = " %s%-17s%s : \"%s\"\n" | |
out = " %s%-17s %s%s%s\n" % (c.ul, "Key", "Value", " " * 40, c.w) | |
for setting in g.config: | |
out += s % (c.g, setting.lower(), c.w, getattr(Config, setting)) | |
g.content = out | |
g.message = "Enter %sset <key> <value>%s to change\n" % (c.g, c.w) | |
g.message += "Enter %sset all default%s to reset all" % (c.g, c.w) | |
def saveconfig(): | |
""" Save current config to file. """ | |
config = {setting: getattr(Config, setting) for setting in g.config} | |
pickle.dump(config, open(g.CFFILE, "wb"), protocol=2) | |
# override config if config file exists | |
if os.path.exists(g.CFFILE): | |
# load and set config | |
saved_config = pickle.load(open(g.CFFILE, "rb")) | |
for kk, vv in saved_config.items(): | |
setattr(Config, kk, vv) | |
class c(object): | |
""" Class for holding colour code values. """ | |
if mswin and has_colorama: | |
white = Style.RESET_ALL | |
ul = Style.DIM + Fore.YELLOW | |
red, green, yellow = Fore.RED, Fore.GREEN, Fore.YELLOW | |
blue, pink = Fore.CYAN, Fore.MAGENTA | |
elif mswin: | |
Config.COLOURS = False | |
else: | |
white = "\x1b[%sm" % 0 | |
ul = "\x1b[%sm" * 3 % (2, 4, 33) | |
cols = ["\x1b[%sm" % n for n in range(91, 96)] | |
red, green, yellow, blue, pink = cols | |
if not Config.COLOURS: | |
ul = red = green = yellow = blue = pink = white = "" | |
r, g, y, b, p, w = red, green, yellow, blue, pink, white | |
def setconfig(key, val): | |
""" Set configuration variable. """ | |
# pylint: disable=R0912 | |
success_msg = fail_msg = "" | |
key = key.upper() | |
if key == "ALL" and val.upper() == "DEFAULT": | |
for k, v in g.defaults.items(): | |
setattr(Config, k, v) | |
success_msg = "Default configuration reinstated" | |
elif key == "DDIR" and not val.upper() == "DEFAULT": | |
valid = os.path.exists(val) and os.path.isdir(val) | |
if valid: | |
setattr(Config, key, val) | |
success_msg = "Downloads will be saved to %s%s%s" % (c.y, val, c.w) | |
else: | |
fail_msg = "Invalid path: %s%s%s" % (c.r, val, c.w) | |
elif key in g.configbool and not val.upper() == "DEFAULT": | |
if val.upper() in "0 FALSE OFF NO".split(): | |
setattr(Config, key, False) | |
success_msg = "%s set to disabled (restart may be required)" % key | |
elif key == "COLOURS" and mswin and not has_colorama: | |
fail_msg = "Can't enable colours, colorama not found" | |
else: | |
setattr(Config, key, True) | |
success_msg = "%s set to enabled (restart may be required)" % key | |
elif key in g.config: | |
if val.upper() == "DEFAULT": | |
val = g.defaults[key] | |
setattr(Config, key, val) | |
success_msg = "%s has been set to %s" % (key.upper(), val) | |
else: | |
fail_msg = "Unknown config item: %s%s%s" % (c.r, key, c.w) | |
showconfig(1) | |
if success_msg: | |
saveconfig() | |
g.message = success_msg | |
elif fail_msg: | |
g.message = fail_msg | |
HELP = """ | |
Note: More documentation is available at {3}https://github.com/np1/pms-youtube\ | |
{1} | |
{0}Searching{1} | |
You can enter a search term to search whenever the program is expecting | |
text input. Searches must be prefixed with either a {2}.{1} or {2}/{1} \ | |
character. | |
When a list of items is displayed, you can use the following commands: | |
{0}Downloading{1} | |
{2}d 3{1} to download item 3 | |
{0}Selecting Items{1} | |
{2}all{1} to play all | |
{2}1 2 3{1} to play items 1 2 and 3 | |
{2}2-4 6 7-3{1} to play items 2 3 4 6 7 6 5 4 3 | |
{2}3-6 9-12 shuffle{1} to play selected items in random order | |
{2}3-6 9-12 repeat{1} to play selected items continuously | |
{0}Manipulating Items{1} | |
{2}rm 1 3{1} to remove items 1 and 3. Also use rm 1 2 5-7 to remove a range | |
{2}rm all{1} to remove all items | |
{2}sw 1 3{1} to swap the position of items 1 and 3 | |
{2}mv 1 3{1} to move items 1 to postion 3 | |
{0}Playlist commands{1} | |
{2}add 1 2 5-7{1} to add items 1 2 5 6 and 7 to the current playlist. | |
{2}add 1 2 3 <playlist_name>{1} to add items 1,2,3 to a saved playlist. A new | |
playlist will be created if the given name doesn't already exist. | |
{2}vp{1} to view the current playlist (then use rm, mv and sw to modify it) | |
{2}ls{1} to list your saved playlists | |
{2}open <playlist_name or ID>{1} to open a saved playlist as the current \ | |
playlist | |
{2}view <playlist name or ID>{1} to view a playlist (leaves current playlist \ | |
intact) | |
{2}play <playlist name or ID>{1} to play a saved playlist directly | |
{2}save{1} or {2}save <playlist_name>{1} to save the currently displayed items | |
as a stored playlist on disk | |
{2}mv <old_name or ID> <new_name>{1} to rename a playlist | |
{2}rmp <playlist_name or ID>{1} to delete a playlist from disk | |
{2}q{1} to quit | |
""".format(c.ul, c.w, c.g, c.r) | |
def F(key, nb=0, na=0, percent=r"\*", nums=r"\*\*", textlib=None): | |
"""Format text. | |
nb, na indicate newlines before and after to return | |
percent is the delimter for %s | |
nums is the delimiter for the str.format command (**1 will become {1}) | |
textlib is the dictionary to use (defaults to g.text if not given) | |
""" | |
textlib = textlib or g.text | |
assert key in textlib | |
text = textlib[key] | |
percent_fmt = textlib.get(key + "_") | |
number_fmt = textlib.get("_" + key) | |
if number_fmt: | |
text = re.sub(r"(%s(\d))" % nums, "{\\2}", text) | |
text = text.format(*number_fmt) | |
if percent_fmt: | |
text = re.sub(r"%s" % percent, r"%s", text) | |
text = text % percent_fmt | |
text = re.sub(r"&&", r"%s", text) | |
return "\n" * nb + text + c.w + "\n" * na | |
g.text = { | |
"exitmsg": """\ | |
**0pms - **1http://github.com/np1/pms-youtube**0 | |
Released under the GPLv3 license | |
(c) 2014 nagev**2\n""", | |
"_exitmsg": (c.r, c.b, c.w), | |
# Error / Warning messages | |
'no playlists': "*No saved playlists found!*", | |
'no playlists_': (c.r, c.w), | |
'pl bad name': '*&&* is not valid a valid name. Ensure it starts with a ' | |
'letter or _', | |
'pl bad name_': (c.r, c.w), | |
'pl not found': 'Playlist *&&* unknown. Saved playlists are shown above', | |
'pl not found_': (c.r, c.w), | |
'pl not found advise ls': 'Playlist "*&&*" not found. Use *ls* to list', | |
'pl not found advise ls_': (c.y, c.w, c.g, c.w), | |
'pl empty': 'Playlist is empty!', | |
'advise add': 'Use *add N* to add a track', | |
'advise add_': (c.g, c.w), | |
'advise search': 'Search for items and then use *add* to add them', | |
'advise search_': (c.g, c.w), | |
'no data': 'Error fetching data. Perhaps http://pleer.com is down.\n*&&*', | |
'no data_': (c.r, c.w), | |
'use dot': 'Start your query with a *.* to perform a search', | |
'use dot_': (c.g, c.w), | |
'cant get track': 'Problem fetching this track: *&&*', | |
'cant get track_': (c.r, c.w), | |
'track unresolved': 'Sorry, this track is not available', | |
'no player': '*&&* was not found on this system', | |
'no player_': (c.y, c.w), | |
'no pl match for rename': '*Couldn\'t find matching playlist to rename*', | |
'no pl match for rename_': (c.r, c.w), | |
'invalid range': "*Invalid item / range entered!*", | |
'invalid range_': (c.r, c.w), | |
# Info messages | |
'pl renamed': 'Playlist *&&* renamed to *&&*', | |
'pl renamed_': (c.y, c.w, c.y, c.w), | |
'pl saved': 'Playlist saved as *&&*. Use *ls* to list playlists', | |
'pl saved_': (c.y, c.w, c.g, c.w), | |
'pl loaded': 'Loaded playlist *&&* as current playlist', | |
'pl loaded_': (c.y, c.w), | |
'pl viewed': 'Showing playlist *&&*', | |
'pl viewed_': (c.y, c.w), | |
'pl help': 'Enter *open <name or ID>* to load a playlist', | |
'pl help_': (c.g, c.w), | |
'added to pl': '*&&* tracks added (*&&* total [*&&*]). Use *vp* to view', | |
'added to pl_': (c.y, c.w, c.y, c.w, c.y, c.w, c.g, c.w), | |
'added to saved pl': '*&&* tracks added to *&&* (*&&* total [*&&*])', | |
'added to saved pl_': (c.y, c.w, c.y, c.w, c.y, c.w, c.y, c.w), | |
'song move': 'Moved *&&* to position *&&*', | |
'song move_': (c.y, c.w, c.y, c.w), | |
'song sw': ("Switched item *&&* with *&&*"), | |
'song sw_': (c.y, c.w, c.y, c.w), | |
'current pl': "This is the current playlist. Use *save <name>* to save it", | |
'current pl_': (c.g, c.w), | |
'songs rm': '*&&* tracks removed &&', | |
'songs rm_': (c.y, c.w) | |
} | |
def save_to_file(): | |
""" Save playlists. Called each time a playlist is saved or deleted. """ | |
f = open(g.PLFILE, "wb") | |
pickle.dump(g.userpl, f, protocol=2) | |
def open_from_file(): | |
""" Open playlists. Called once on script invocation. """ | |
try: | |
f = open(g.PLFILE, "rb") | |
g.userpl = pickle.load(f) | |
except IOError: | |
g.userpl = {} | |
save_to_file() | |
def logo(col=None, version=""): | |
""" Return text logo. """ | |
col = col if col else random.choice((c.g, c.r, c.y, c.b, c.p, c.w)) | |
LOGO = col + """\ | |
8888888b. 888b d888 .d8888b. | |
888 Y88b 8888b d8888 d88P Y88b | |
888 888 88888b.d88888 Y88b. | |
888 d88P 888Y88888P888 "Y888b. | |
8888888P" 888 Y888P 888 "Y88b. | |
888 888 Y8P 888 "888 | |
888 888 " 888 Y88b d88P | |
888 888 888 "Y8888P" %s | |
""" % (c.w + "v" + version + " (YouTube)" if version else "") | |
return LOGO + c.w | |
def playlists_display(): | |
""" Produce a list of all playlists. """ | |
if not g.userpl: | |
g.message = F("no playlists") | |
return logo(c.y) + "\n\n" if g.model.is_empty else \ | |
generate_songlist_display() | |
maxname = max(len(a) for a in g.userpl) | |
out = " {0}Saved Playlists{1}\n".format(c.ul, c.w) | |
start = " " | |
fmt = "%s%s%-3s %-" + str(maxname + 3) + "s%s %s%-7s%s %-5s%s" | |
head = (start, c.b, "ID", "Name", c.b, c.b, "Count", c.b, "Duration", c.w) | |
out += "\n" + fmt % head + "\n\n" | |
for v, z in enumerate(sorted(g.userpl)): | |
n, p = z, g.userpl[z] | |
l = fmt % (start, c.g, v + 1, n, c.w, c.y, str(p.size), c.y, | |
p.duration, c.w) + "\n" | |
out += l | |
return out | |
def mplayer_help(short=True): | |
""" Mplayer help. """ | |
volume = "[{0}9{1}] volume [{0}0{1}]" | |
volume = volume if short else volume + " [{0}ctrl-c{1}] return" | |
seek = u"[{0}\u2190{1}] seek [{0}\u2192{1}]" | |
pause = u"[{0}\u2193{1}] SEEK [{0}\u2191{1}] [{0}space{1}] pause" | |
if mswin: | |
seek = "[{0}<-{1}] seek [{0}->{1}]" | |
pause = "[{0}DN{1}] SEEK [{0}UP{1}] [{0}space{1}] pause" | |
ret = "[{0}q{1}] %s" % ("return" if short else "next track") | |
fmt = " %-20s %-20s" | |
lines = fmt % (seek, volume) + "\n" + fmt % (pause, ret) | |
return lines.format(c.g, c.w) | |
def tidy(raw, field): | |
""" Tidy HTML entities, format songlength if field is duration. """ | |
if field == "duration": | |
raw = time.strftime('%M:%S', time.gmtime(int(raw))) | |
else: | |
for r in (("'", "'"), ("&#039;", "'"), ("&amp;", "&"), | |
(" ", " "), ("&", "&"), (""", '"')): | |
raw = raw.replace(r[0], r[1]) | |
return raw | |
def get_tracks_from_json(json): | |
""" Get search results from web page. """ | |
try: | |
items = json['data']['items'] | |
except KeyError: | |
items = [] | |
songs = [] | |
for item in items: | |
cursong = dict( | |
title=item['title'].strip(), | |
duration=item['duration'], | |
length=tidy(item['duration'], "duration"), | |
link=item['id'], | |
rating=item.get('rating') | |
) | |
songs.append(cursong) | |
if not items: | |
logging.debug("got unexpected data or no search results") | |
return False | |
return songs | |
def screen_update(): | |
""" Display content, show message, blank screen.""" | |
if not g.noblank: | |
print("\n" * 200) | |
if g.content: | |
g.content = mswinenc(g.content) | |
print(py2utf8_encode(g.content)) | |
if g.message: | |
print(g.message) | |
g.message = g.content = False | |
g.noblank = False | |
def playback_progress(idx, allsongs, repeat=False): | |
""" Generate string to show selected tracks, indicate current track. """ | |
# pylint: disable=R0914 | |
# too many local variables | |
out = " %s%-66s %s %s\n" % (c.ul, "Title", "Time", c.w) | |
show_key_help = (Config.PLAYER == "mplayer" or Config.PLAYER == "mpv")\ | |
and Config.SHOW_MPLAYER_KEYS | |
multi = len(allsongs) > 1 | |
for n, song in enumerate(allsongs): | |
i = song['title'][:64], song['length'] | |
fmt = (c.w, " ", c.b, i[0], c.w, c.y, i[1], c.w) | |
if n == idx: | |
fmt = (c.y, "> ", c.p, i[0], c.w, c.p, i[1], c.w) | |
cur = i | |
out += "%s%s%s%-66s%s [%s%s%s]\n" % fmt | |
out += "\n" * (3 - len(allsongs)) | |
pos = 8 * " ", c.y, idx + 1, c.w, c.y, len(allsongs), c.w | |
playing = "{}{}{}{} of {}{}{}\n\n".format(*pos) if multi else "\n\n" | |
keys = mplayer_help(short=(not multi and not repeat)) | |
out = out if multi else generate_songlist_display(song=allsongs[0]) | |
if show_key_help: | |
out += "\n" + keys | |
else: | |
playing = "{}{}{}{} of {}{}{}\n".format(*pos) if multi else "\n" | |
out += "\n" + " " * 58 if multi else "" | |
fmt = playing, c.r, cur[0], c.w, c.w, cur[1], c.w | |
out += "%s %s%s%s %s[%s]%s" % fmt | |
out += " REPEAT MODE" if repeat else "" | |
return out | |
def generate_songlist_display(song=False): | |
""" Generate list of choices from a song list.""" | |
songs = g.model.songs or [] | |
if not songs: | |
return logo(c.g) + "\n\n" | |
fmtrow = "%s%-5s %-63s %-7s%s\n" | |
head = (c.ul, "Item", "Title", "Length", c.w) | |
out = "\n" + fmtrow % head | |
for n, x in enumerate(songs): | |
col = (c.r if n % 2 == 0 else c.p) if not song else c.b | |
length = x.get('length') or 0 | |
title = x.get('title') or "unknown title" | |
if not song or song != songs[n]: | |
out += (fmtrow % (col, str(n + 1), title[:63], str(length), c.w)) | |
else: | |
out += (fmtrow % (c.p, str(n + 1), title[:63], str(length), c.w)) | |
return out + "\n" * (5 - len(songs)) if not song else out | |
def writestatus(text): | |
""" Update status line """ | |
if Config.SHOW_STATUS: | |
writeline(text) | |
def writeline(text): | |
""" Print text on same line. """ | |
spaces = 75 - len(text) | |
sys.stdout.write(text + (" " * spaces) + "\r") | |
sys.stdout.flush() | |
def get_stream(song, force=True): | |
""" Return the url for a song. """ | |
if not "track_url" in song or force: | |
statusline = "getting stream url for %s..." % song['link'] | |
writestatus(statusline) | |
p = pafy.new(song['link'], callback=writestatus) # if not "pafy" in song else song['pafy'] | |
try: | |
track = p.getbest() if Config.SHOW_VIDEO else p.getbestaudio() | |
except ValueError: | |
track = p.allstreams[0] | |
finally: | |
track = p.allstreams[0] if not track else track | |
extension = track.extension | |
filesize = track.get_filesize() | |
quality = track.rawbitrate | |
l = song['link'] | |
filesize = int(filesize / (1024 ** 2)) | |
writestatus("%s : %s : %s Mb" % (l, extension, filesize)) | |
return track.url | |
else: | |
return song['track_url'] | |
def playsong(song, failcount=0): | |
""" Play song using config.PLAYER called with args config.PLAYERARGS.""" | |
try: | |
track_url = get_stream(song) | |
song['track_url'] = track_url | |
except (URLError, HTTPError, socket.timeout) as e: | |
g.message = F('cant get track') % str(e) | |
return | |
except ValueError: | |
g.message = F('track unresolved') | |
return | |
try: | |
cmd = [Config.PLAYER] + Config.PLAYERARGS.split() + [song['track_url']] | |
logging.debug("starting player with " + song['track_url']) | |
stdout = stderr = None | |
with open(os.devnull, "w") as fnull: | |
if "mpv" in Config.PLAYER or "mplayer" in Config.PLAYER: | |
stderr = fnull | |
if mswin: | |
stdout = stderr = fnull | |
now = time.time() | |
subprocess.call(cmd, stdout=stdout, stderr=stderr) | |
fin = time.time() | |
failed = fin - now < 1 and song['duration'] > 10 | |
if failed and failcount < 4: | |
writestatus("trying again (attempt %s)" % (2 + failcount)) | |
time.sleep(1) | |
failcount += 1 | |
playsong(song, failcount=failcount) | |
except OSError: | |
g.message = F('no player') % Config.PLAYER | |
def search(term, page=1, splash=True): | |
""" Perform search. """ | |
if not term or len(term) < 2: | |
g.message = c.r + "Not enough input" + c.w | |
g.content = generate_songlist_display() | |
else: | |
original_term = term | |
url = "https://gdata.youtube.com/feeds/api/videos" | |
term = term.replace(" ", "+") | |
query = { | |
'q': term, | |
'v': 2, | |
'alt': 'jsonc', | |
'start-index': ((page - 1)* g.max_results + 1) or 1, | |
#'category': "Music" if Config.SEARCH_MUSIC else "All", | |
'safeSearch': "none", | |
'max-results': g.max_results, | |
'orderby': 'relevance' | |
} | |
if Config.SEARCH_MUSIC: | |
query['category'] = "Music" | |
g.message = "Searching for '%s%s%s'" % (c.y, original_term, c.w) | |
url = url + "?" + urlencode(query) | |
logging.debug(url) | |
memokey = str(Config.SEARCH_MUSIC) + url | |
if memokey in g.url_memo: | |
songs = g.url_memo[memokey] | |
else: | |
if splash: | |
g.content = logo(c.b) + "\n\n" | |
screen_update() | |
try: | |
wdata = urlopen(url).read().decode("utf8") | |
wdata = json.loads(wdata) | |
songs = get_tracks_from_json(wdata) | |
except (URLError, HTTPError) as e: | |
g.message = F('no data') % e | |
g.content = logo(c.r) | |
return | |
if songs: | |
g.url_memo[memokey] = songs | |
g.model.songs = songs | |
g.message = "Search results for %s%s%s" % (c.y, original_term, c.w) | |
g.last_opened = "" | |
g.last_search_query = original_term | |
g.current_page = page | |
g.content = generate_songlist_display() | |
else: | |
g.message = "Found nothing for %s%s%s" % (c.y, term, c.w) | |
g.content = logo(c.r) | |
g.current_page = 1 | |
g.last_search_query = "" | |
def _make_fname(song): | |
"""" Create download directory, generate filename. """ | |
if not os.path.exists(Config.DDIR): | |
os.makedirs(Config.DDIR) | |
p = pafy.new(song['link']) # if not 'pafy' in song else song['pafy'] | |
if not Config.SHOW_VIDEO: | |
extension = p.getbestaudio().extension | |
else: | |
extension = p.getbest().extension | |
filename = song['title'][:59] + "." + extension | |
filename = os.path.join(Config.DDIR, mswinfn(filename.replace("/", "-"))) | |
return filename | |
def _download(song, filename): | |
""" Download file, show status, return filename. """ | |
print("Downloading %s%s%s ..\n" % (c.g, filename, c.w)) | |
status_string = (' {0}{1:,}{2} Bytes [{0}{3:.2%}{2}] received. Rate: ' | |
'[{0}{4:4.0f} kbps{2}]. ETA: [{0}{5:.0f} secs{2}]') | |
song['track_url'] = get_stream(song) | |
logging.debug("[4] fetching url " + song['track_url']) | |
resp = urlopen(song['track_url']) | |
logging.debug("fetched url " + song['track_url']) | |
total = int(resp.info()['Content-Length'].strip()) | |
chunksize, bytesdone, t0 = 16384, 0, time.time() | |
outfh = open(filename, 'wb') | |
while True: | |
chunk = resp.read(chunksize) | |
outfh.write(chunk) | |
elapsed = time.time() - t0 | |
bytesdone += len(chunk) | |
rate = (bytesdone / 1024) / elapsed | |
eta = (total - bytesdone) / (rate * 1024) | |
stats = (c.y, bytesdone, c.w, bytesdone * 1.0 / total, rate, eta) | |
if not chunk: | |
outfh.close() | |
break | |
status = status_string.format(*stats) | |
sys.stdout.write("\r" + status + ' ' * 4 + "\r") | |
sys.stdout.flush() | |
return filename | |
def _bi_range(start, end): | |
""" | |
Inclusive range function, works for reverse ranges. | |
eg. 5,2 returns [5,4,3,2] and 2, 4 returns [2,3,4] | |
""" | |
if start == end: | |
return (start,) | |
elif end < start: | |
return reversed(range(end, start + 1)) | |
else: | |
return range(start, end + 1) | |
def _parse_multi(choice, end=None): | |
""" Handle ranges like 5-9, 9-5, 5- and -5. Return list of ints. """ | |
end = end or str(g.model.size) | |
pattern = r'(?<![-\d])(\d+-\d+|-\d+|\d+-|\d+)(?![-\d])' | |
items = re.findall(pattern, choice) | |
alltracks = [] | |
for x in items: | |
if x.startswith("-"): | |
x = "1" + x | |
elif x.endswith("-"): | |
x = x + str(end) | |
if "-" in x: | |
nrange = x.split("-") | |
startend = map(int, nrange) | |
alltracks += _bi_range(*startend) | |
else: | |
alltracks.append(int(x)) | |
return alltracks | |
def _get_near_plname(begin): | |
""" Return the closest matching playlist name that starts with begin. """ | |
for name in sorted(g.userpl): | |
if name.lower().startswith(begin.lower()): | |
break | |
else: | |
return begin | |
return name | |
def play_pl(name): | |
""" Play a playlist by name. """ | |
if name.isdigit(): | |
name = int(name) | |
name = sorted(g.userpl)[name - 1] | |
saved = g.userpl.get(name) | |
if not saved: | |
name = _get_near_plname(name) | |
saved = g.userpl.get(name) | |
if saved: | |
g.model.songs = list(saved.songs) | |
play_all("", "", "") | |
else: | |
g.message = F("pl not found") % name | |
g.content = playlists_display() | |
#return | |
def save_last(args=None): | |
""" Save command with no playlist name. """ | |
if g.last_opened: | |
open_save_view("save", g.last_opened) | |
else: | |
saveas = "" | |
#save using artist name in postion 1 | |
if not g.model.is_empty: | |
saveas = g.model.songs[0]['title'][:18].strip() | |
saveas = re.sub(r"[^-\w]", "-", saveas, re.UNICODE) | |
# loop to find next available name | |
post = 0 | |
while g.userpl.get(saveas): | |
post += 1 | |
saveas = g.model.songs[0]['title'][:18].strip() + "-" + str(post) | |
open_save_view("save", saveas) | |
def open_save_view(action, name): | |
""" Open, save or view a playlist by name. Get closest name match. """ | |
if action == "open" or action == "view": | |
saved = g.userpl.get(name) | |
if not saved: | |
name = _get_near_plname(name) | |
saved = g.userpl.get(name) | |
if saved and action == "open": | |
g.model.songs = g.active.songs = list(saved.songs) | |
g.message = F("pl loaded") % name | |
g.last_opened = name | |
elif saved and action == "view": | |
g.model.songs = list(saved.songs) | |
g.message = F("pl viewed") % name | |
g.last_opened = "" | |
elif not saved and action in "view open".split(): | |
g.message = F("pl not found") % name | |
g.content = playlists_display() | |
elif action == "save": | |
if not g.model.songs: | |
g.message = "Nothing to save. " + F('advise search') | |
else: | |
name = name.replace(" ", "-") | |
g.userpl[name] = Playlist(name, list(g.model.songs)) | |
g.message = F('pl saved') % name | |
save_to_file() | |
g.content = generate_songlist_display() | |
def open_view_bynum(action, num): | |
""" Open or view a saved playlist by number. """ | |
srt = sorted(g.userpl) | |
name = srt[int(num) - 1] | |
open_save_view(action, name) | |
def songlist_rm_add(action, songrange): | |
""" Remove or add tracks. works directly on user input. """ | |
selection = _parse_multi(songrange) | |
if action == "add": | |
for songnum in selection: | |
g.active.songs.append(g.model.songs[songnum - 1]) | |
d = g.active.duration | |
g.message = F('added to pl') % (len(selection), g.active.size, d) | |
elif action == "rm": | |
selection = list(reversed(sorted(list(set(selection))))) | |
removed = str(tuple(reversed(selection))).replace(",", "") | |
for x in selection: | |
g.model.songs.pop(x - 1) | |
g.message = F('songs rm') % (len(selection), removed) | |
g.content = generate_songlist_display() | |
def play(pre, choice, post=""): | |
""" Play choice. Use repeat/random if appears in pre/post. """ | |
if not g.model.songs: | |
g.message = c.r + "There are no tracks to select" + c.w | |
g.content = g.content or generate_songlist_display() | |
else: | |
shuffle = "shuffle" in pre + post | |
repeat = "repeat" in pre + post | |
selection = _parse_multi(choice) | |
debug = ("shuffle=" + str(shuffle) + " : repeat=" + | |
str(repeat) + " : " + str(selection)) | |
logging.debug(debug) | |
songlist = [g.model.songs[x - 1] for x in selection] | |
play_range(songlist, shuffle, repeat) | |
def play_all(pre, choice, post=""): | |
""" Play all tracks in model (last displayed). shuffle/repeat if req'd.""" | |
options = pre + choice + post | |
play(options, "1-" + str(len(g.model.songs))) | |
def ls(): | |
""" List user saved playlists. """ | |
if not g.userpl: | |
g.message = F('no playlists') | |
g.content = g.content or generate_songlist_display() | |
else: | |
g.content = playlists_display() | |
g.message = F('pl help') | |
def vp(): | |
""" View current working playlist. """ | |
if g.active.is_empty: | |
txt = F('advise search') if g.model.is_empty else F('advise add') | |
g.message = F('pl empty') + " " + txt | |
else: | |
g.model.songs = g.active.songs | |
g.message = F('current pl') | |
g.content = generate_songlist_display() | |
def play_range(songlist, shuffle=False, repeat=False): | |
""" Play a range of songs, exit cleanly on keyboard interrupt. """ | |
if shuffle: | |
random.shuffle(songlist) | |
if not repeat: | |
for n, song in enumerate(songlist): | |
g.content = playback_progress(n, songlist, repeat=False) | |
screen_update() | |
try: | |
playsong(song) | |
except KeyboardInterrupt: | |
print("Stopping...") | |
time.sleep(1) | |
g.message = c.y + "Playback halted" + c.w | |
break | |
elif repeat: | |
while True: | |
try: | |
for n, song in enumerate(songlist): | |
g.content = playback_progress(n, songlist, repeat=True) | |
screen_update() | |
playsong(song) | |
g.content = generate_songlist_display() | |
except KeyboardInterrupt: | |
print("Stopping...") | |
time.sleep(2) | |
g.message = c.y + "Playback halted" + c.w | |
break | |
g.content = generate_songlist_display() | |
def show_help(helpname=None): | |
""" Print help message. """ | |
print("\n" * 200) | |
print(HELP) | |
print("Press Enter to continue", end="") | |
try: | |
compat_input("") | |
except (KeyboardInterrupt, EOFError): | |
prompt_for_exit() | |
g.content = generate_songlist_display() | |
def quits(showlogo=True): | |
""" Exit the program. """ | |
msg = ("\n" * 200) + logo(c.r, version=__version__) if showlogo else "" | |
vermsg = "" | |
print(msg + F("exitmsg", 2)) | |
if Config.CHECKUPDATE and showlogo: | |
try: | |
url = "https://github.com/np1/pms-youtube/raw/master/VERSION" | |
v = urlopen(url).read().decode("utf8") | |
v = re.search(r"^version\s*([\d\.]+)\s*$", v, re.MULTILINE) | |
if v: | |
v = v.group(1) | |
if v > __version__: | |
vermsg += "\nA newer version is available (%s)\n" % v | |
except (URLError, HTTPError, socket.timeout): | |
pass | |
sys.exit(vermsg) | |
def download(num): | |
""" Download a track. """ | |
song = (g.model.songs[int(num) - 1]) | |
filename = _make_fname(song) | |
try: | |
f = _download(song, filename) | |
g.message = "Downloaded " + c.g + f + c.w | |
except IndexError: | |
g.message = c.r + "Invalid index" + c.w | |
except KeyboardInterrupt: | |
g.message = c.r + "Download halted!" + c.w | |
try: | |
os.remove(filename) | |
except IOError: | |
pass | |
finally: | |
g.content = generate_songlist_display() | |
def prompt_for_exit(): | |
""" Ask for exit confirmation. """ | |
g.message = c.r + "Press ctrl-c again to exit" + c.w | |
g.content = generate_songlist_display() | |
screen_update() | |
try: | |
userinput = compat_input(c.r + " > " + c.w) | |
except (KeyboardInterrupt, EOFError): | |
quits(showlogo=False) | |
return userinput | |
def playlist_remove(name): | |
""" Delete a saved playlist by name - or purge working playlist if *all.""" | |
if name.isdigit() or g.userpl.get(name): | |
if name.isdigit(): | |
name = int(name) - 1 | |
name = sorted(g.userpl)[name] | |
del g.userpl[name] | |
g.message = "Deleted playlist %s%s%s" % (c.y, name, c.w) | |
g.content = playlists_display() | |
save_to_file() | |
else: | |
g.message = F('pl not found advise ls') % name | |
g.content = playlists_display() | |
def songlist_mv_sw(action, a, b): | |
""" Move a song or swap two songs. """ | |
i, j = int(a) - 1, int(b) - 1 | |
if action == "mv": | |
g.model.songs.insert(j, g.model.songs.pop(i)) | |
g.message = F('song move') % (g.model.songs[j]['title'], b) | |
elif action == "sw": | |
g.model.songs[i], g.model.songs[j] = g.model.songs[j], g.model.songs[i] | |
g.message = F('song sw') % (min(a, b), max(a, b)) | |
g.content = generate_songlist_display() | |
def playlist_add(nums, playlist): | |
""" Add selected song nums to saved playlist. """ | |
nums = _parse_multi(nums) | |
if not g.userpl.get(playlist): | |
playlist = playlist.replace(" ", "-") | |
g.userpl[playlist] = Playlist(playlist) | |
for songnum in nums: | |
g.userpl[playlist].songs.append(g.model.songs[songnum - 1]) | |
dur = g.userpl[playlist].duration | |
f = (len(nums), playlist, g.userpl[playlist].size, dur) | |
g.message = F('added to saved pl') % f | |
save_to_file() | |
g.content = generate_songlist_display() | |
def playlist_rename_idx(_id, name): | |
""" Rename a playlist by ID. """ | |
_id = int(_id) - 1 | |
playlist_rename(sorted(g.userpl)[_id] + " " + name) | |
def playlist_rename(playlists): | |
""" Rename a playlist using mv command. """ | |
# Deal with old playlist names that permitted spaces | |
a, b = "", playlists.split(" ") | |
while a not in g.userpl: | |
a = (a + " " + (b.pop(0))).strip() | |
if not b and not a in g.userpl: | |
g.message = F('no pl match for rename') | |
g.content = g.content or playlists_display() | |
return | |
b = "-".join(b) | |
g.userpl[b] = Playlist(b) | |
g.userpl[b].songs = list(g.userpl[a].songs) | |
playlist_remove(a) | |
g.message = F('pl renamed') % (a, b) | |
save_to_file() | |
def add_rm_all(action): | |
""" Add all displayed songs to current playlist. | |
remove all displayed songs from view. | |
""" | |
if action == "rm": | |
for n in reversed(range(0, len(g.model.songs))): | |
g.model.songs.pop(n) | |
g.message = c.b + "Cleared all songs" + c.w | |
g.content = generate_songlist_display() | |
elif action == "add": | |
size = g.model.size | |
songlist_rm_add("add", "-" + str(size)) | |
def nextprev(np): | |
""" Get next / previous search results. """ | |
if np == "n": | |
if len(g.model.songs) == g.max_results and g.last_search_query: | |
g.current_page += 1 | |
search(g.last_search_query, g.current_page, splash=False) | |
g.message += " : page %s" % g.current_page | |
else: | |
g.message = "No more items to display" | |
elif np == "p": | |
if g.current_page > 1 and g.last_search_query: | |
g.current_page -= 1 | |
search(g.last_search_query, g.current_page, splash=False) | |
g.message += " : page %s" % g.current_page | |
else: | |
g.message = "No previous items to display" | |
g.content = generate_songlist_display() | |
def info(num): | |
""" Get video description """ | |
item = (g.model.songs[int(num) - 1]) | |
writestatus("Fetching Description..") | |
#if not "pafy" in item: | |
item['pafy'] = pafy.new(item['link']) | |
p = item['pafy'] | |
out = c.ul + "Video Info" + c.w + "\n\n" | |
out += p.description or "" | |
out += "\n\nAuthor: " + py2utf8_decode(p.author) | |
out += "\nViewcount: " + str(p.viewcount) | |
out += "\nRating: " + str(p.rating)[:4] | |
out += "\nCategory: " + p.category | |
out += "\n\n%s[%sPress enter to go back%s]%s" % (c.y, c.w, c.y,c.w) | |
g.content = out | |
def plist(parturl): | |
""" Import playlist created on website. """ | |
url = "http://pleer.com/en/" + parturl | |
try: | |
page = urlopen(url).read().decode("utf8") | |
logging.debug(page) | |
songs = get_tracks_from_page(page) | |
g.model.songs = songs | |
g.active.songs = songs | |
g.message = c.y + "Playlist imported" + c.w | |
except IOError: | |
g.message = "Problem fetching that list." | |
g.content = generate_songlist_display() | |
def main(): | |
""" Main control loop. """ | |
g.content = generate_songlist_display() | |
g.content = logo(col=c.g, version=__version__) + "\n" | |
g.message = "Enter .artist/song name to search or [h]elp" | |
screen_update() | |
# open playlists from file | |
open_from_file() | |
# get cmd line input | |
inp = " ".join(sys.argv[1:]) | |
# input types | |
word = r'[^\W\d][-\w\s]{,100}' | |
rs = r'(?:repeat\s*|shuffle\s*)' | |
regx = { | |
'ls': r'ls$', | |
'vp': r'vp$', | |
'top': r'top(|3m|6m|year|all)\s*$', | |
'plist': r'.*(list[\da-zA-Z]{8,14})$', | |
'play': r'(%s{0,3})([-,\d\s]{1,250})\s*(%s{0,2})$' % (rs, rs), | |
'info': r'i\s*(\d{1,4})$', | |
'quits': r'(?:q|quit|exit)$', | |
'search': r'(?:search|\.|/)\s*(.{2,500})', | |
'play_pl': r'play\s*(%s|\d+)$' % word, | |
'download': r'(?:d|dl|download)\s*(\d{1,4})$', | |
'nextprev': r'(n|p)$', | |
'play_all': r'(%s{0,3})all\s*(%s{0,3})$' % (rs, rs), | |
'save_last': r'(save)\s*$', | |
#'setconfig': r'set\s*(\w+)\s*"([^"]*)"\s*$', | |
'setconfig': r'set\s*(\w+)\s*"?([^"]*)"?\s*$', | |
'show_help': r'(?:help|h)$', | |
'add_rm_all': r'(rm|add)\s*all$', | |
#'showconfig': r'(showconfig)', | |
'showconfig': r'(set|showconfig)\s*$', | |
'playlist_add': r'add\s*(-?\d[-,\d\s]{1,250})(%s)$' % word, | |
'open_save_view': r'(open|save|view)\s*(%s)$' % word, | |
'songlist_mv_sw': r'(mv|sw)\s*(\d{1,4})\s*[\s,]\s*(\d{1,4})$', | |
'songlist_rm_add': r'(rm|add)\s*(-?\d[-,\d\s]{,250})$', | |
'playlist_rename': r'mv\s*(%s\s+%s)$' % (word, word), | |
'playlist_remove': r'rmp\s*(\d+|%s)$' % word, | |
'open_view_bynum': r'(open|view)\s*(\d{1,4})$', | |
'playlist_rename_idx': r'mv\s*(\d{1,3})\s*(%s)\s*$' % word | |
} | |
# compile regexp's | |
regx = {name: re.compile(val, re.UNICODE) for name, val in regx.items()} | |
prompt = "> " + c.y if not mswin else "> " | |
while True: | |
try: | |
# get user input | |
userinput = inp or compat_input(prompt) | |
userinput = userinput.strip() | |
print(c.w) | |
except (KeyboardInterrupt, EOFError): | |
userinput = prompt_for_exit() | |
inp = None | |
for k, v in regx.items(): | |
if v.match(userinput): | |
func, matches = k, v.match(userinput).groups() | |
try: | |
globals()[func](*matches) | |
except IndexError: | |
g.message = F('invalid range') | |
g.content = g.content or generate_songlist_display() | |
break | |
else: | |
g.content = g.content or generate_songlist_display() | |
if userinput: | |
g.message = c.b + "Bad syntax. Enter h for help" + c.w | |
screen_update() | |
if __name__ == "__main__": | |
if has_colorama: | |
init_colorama() | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment