#!/usr/bin/python
# hella curses interface for http://www.hellanzb.com/
# by Ashok Argent-Katwala
# No rights reserved. Go crazy.
# v1.18

import curses
import time
import xmlrpclib
import curses.wrapper
import pprint
import threading
import socket
import sys
import os
import statvfs
import imp
import getpass
import urllib


def _read_password_file():
    """Return the stored password, or None when no password file is readable.

    Looks for ``.hellapass`` in the current directory first (the historical
    location) and then in the user's home directory.  Only the trailing line
    terminator is removed, so the value can be embedded in a URL.
    """
    for candidate in ('.hellapass', '~/.hellapass'):
        try:
            f = open(os.path.expanduser(candidate))
            try:
                text = f.read()
            finally:
                f.close()
        except (IOError, OSError):
            continue
        return text.rstrip("\r\n")
    return None


pw = _read_password_file()
if pw is None:
    try:
        pw = getpass.getpass("HellaNZB Password: ")
    except (Exception, KeyboardInterrupt):
        # No way to obtain a password (no terminal, EOF, Ctrl-C): give up.
        sys.exit(-1)
if not pw:
    sys.exit(-2)

# The password is percent-quoted so characters such as '/', '?' or '%' cannot
# break the URL; xmlrpclib unquotes the credentials before sending them.
SERVER = "http://hellanzb:" + urllib.quote(pw, "") + "@localhost:8760"
REFRESH_TIME = 2  # seconds between automatic redraws
SHOW_DIRECTORY_COMMAND = None

if os.uname()[0] == "Darwin" and not SHOW_DIRECTORY_COMMAND:
    SHOW_DIRECTORY_COMMAND = "open"


def format_duration(secs):
    """Format a number of seconds as HH:MM:SS."""
    mins, secs = divmod(secs, 60)
    hours, mins = divmod(mins, 60)
    return '%02d:%02d:%02d' % (hours, mins, secs)


def format_megabytes(meg):
    """Format a size given in megabytes as MB, GB or TB."""
    if meg < 1060:
        return "%dMB" % meg
    if meg < 1060 * 1024:
        return "%.2fGB" % (meg / 1024.)
    return "%.3fTB" % (meg / 1024. / 1024.)


def format_kilobytes(kilo):
    """Format a size given in kilobytes as KB, MB, GB or TB."""
    if kilo < 1060:
        return "%dKB" % kilo
    if kilo < 1060 * 1024:
        return "%.2fMB" % (kilo / 1024.)
    if kilo < 1060 * 1024 * 1024:
        return "%.2fGB" % (kilo / 1024. / 1024.)
    return "%.3fTB" % (kilo / 1024. / 1024. / 1024.)


def clip_fill(t, size):
    """Return ``t`` clipped or space-padded to exactly ``size`` columns.

    Text longer than ``size`` is cut and ends in "..." when there is room
    for the ellipsis.  A zero or negative ``size`` (very narrow terminal)
    yields an empty string instead of over-long text.
    """
    if size <= 0:
        return ""
    if len(t) <= size:
        return t.ljust(size)
    if size <= 3:
        # Too narrow for an ellipsis; a hard cut keeps the width exact.
        return t[:size]
    return t[:size - 3] + "..."


def ord_list(c):
    """Return the key codes of ``c`` and of its upper-case form."""
    return [ord(c), ord(c.upper())]


# These are just here so that we can exec the config file.
def defineServer(**kwargs): pass def defineMusicType(x,y,z): pass class ShowHella: selected = -1 server = False screen = None list = None current = None width = None height = None starting_up = False DEST_DIR = False def __init__(self): self.pp = pprint.PrettyPrinter(indent=4) try: self.server = xmlrpclib.ServerProxy(SERVER) self.run_config() except socket.error: pass # should catch and report error properly #self.pp.pprint(self.server.status()) #self.pp.pprint(self.server.list()) curses.wrapper(self.main) def auto_refresh(self): time.sleep(3*REFRESH_TIME) while 1: self.draw_screen() time.sleep(REFRESH_TIME) def update_list(self): status = self.server.status() self.screen.addstr(self.height-2,0,("Processing: %d; Waiting: %d; Downloading: %d" % (len(status["currently_processing"]),len(status["queued"]),len(status["currently_downloading"]))).rjust(self.width-1)) if self.DEST_DIR: f = os.statvfs(self.DEST_DIR) self.screen.addstr(self.height-2,1,("Free: %s" % format_kilobytes(int(f[statvfs.F_BAVAIL])*int(f[statvfs.F_FRSIZE])/1024))) try: dl_status = status["currently_downloading"][0] except: return if status["is_paused"]: state = "[=Paused=]" supplemental = " e.t.a --:--:--" else: state = "".join(("[",(format_kilobytes(status["rate"])+"/s").rjust(8),"]")) supplemental = " e.t.a "+format_duration(status["eta"]) if self.selected==-1: marker = "=> " color = curses.color_pair(3) hicolor = curses.color_pair(4) else: marker = " " color = curses.color_pair(0) hicolor = curses.color_pair(2) self.screen.addstr(2,1,marker) self.screen.addstr(state+": ",color) self.screen.addstr(clip_fill(dl_status["nzbName"],self.width-37),curses.A_BOLD|hicolor) self.screen.addstr(supplemental,color) bar_width = self.width-28 parts_done = status["percent_complete"]*bar_width/100 progress = "#"*parts_done + "-"*(bar_width-parts_done) try: remain = " "+format_megabytes(status["queued_mb"])+" left" except KeyError: pass 
self.screen.addstr(3,1,marker+"["+progress+"]"+str(status["percent_complete"]).rjust(3)+"%"+remain) self.list = status["queued"] i = 0 for l in self.list: if i==self.selected: marker = "=> " color = curses.color_pair(3) else: marker = " " color = curses.color_pair(0) try: self.screen.addstr(i+4,1,marker) self.screen.addstr(clip_fill(l["nzbName"],self.width-24),color) self.screen.addstr((" ("+format_megabytes(l["total_mb"])+")").rjust(10),color) except Exception: pass i+=1 x = 0 y = self.height - 3 self.screen.addstr(y,x," ") def up(self): self.selected = max(-1,self.selected-1) self.update_list() def down(self): self.selected = min(len(self.server.list())-1,self.selected+1) self.update_list() def toggle_pause(self): if self.server.status()["is_paused"]: getattr(self.server, 'continue')() else: self.server.pause() self.draw_screen() def init_screen(self,scr): self.screen = scr if curses.has_colors(): curses.init_pair(8, curses.COLOR_BLACK, curses.COLOR_WHITE) curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_RED) curses.init_pair(2, curses.COLOR_RED, curses.COLOR_WHITE) curses.init_pair(3, curses.COLOR_WHITE, curses.COLOR_BLUE) curses.init_pair(4, curses.COLOR_RED, curses.COLOR_BLUE) self.screen.bkgd(' ',curses.color_pair(8)) def draw_screen(self): self.screen.clear() self.height, self.width = self.screen.getmaxyx() menu = ["Quit","Pause","eXpel","Up","Down","Shutdown"] if self.DEST_DIR and SHOW_DIRECTORY_COMMAND: menu.append("showFiles") try: if self.starting_up: title_status = "Starting..." 
else: status = self.server.status() if not len(status["currently_downloading"]): if len(status["currently_processing"]): title_status = "Processing" else: title_status = "Waiting" menu[1] = " " elif status["is_paused"]: title_status = "Paused" menu[1] = "unPause" else: title_status = "Downloading" except socket.error: menu[5] = "Startup" title_status = "Not running (press S to start)" self.screen.addstr(0, 0, ("HellaNZB: "+title_status).center(self.width), curses.A_BOLD|curses.color_pair(1)) bar = " "+" ".join(menu) self.screen.addstr(self.height-1, 1, bar.ljust(self.width-2), curses.color_pair(1)) try: self.screen.addstr(self.height-7,1,status["log_entries"][-3]["ERROR"].ljust(self.width-2)) self.screen.addstr(self.height-6,1,status["log_entries"][-2]["ERROR"].ljust(self.width-2)) self.screen.addstr(self.height-5,1,status["log_entries"][-1]["ERROR"].ljust(self.width-2)) except: pass try: self.update_list() except socket.error: pass self.screen.move(self.height-3,0) self.screen.refresh() def showfiles(self): if self.DEST_DIR and SHOW_DIRECTORY_COMMAND: os.system(" ".join([SHOW_DIRECTORY_COMMAND,self.DEST_DIR])) def expel(self): if self.selected==-1 or len(self.list)==0: self.server.cancel() else: self.server.dequeue(self.list[self.selected]["id"]) if len(self.list)==self.selected+1: self.up() self.draw_screen() def move_down(self): if self.selected==-1: if len(self.list)>1: self.server.force(self.list[0]["id"]) self.selected = 0 else: self.server.down(self.list[self.selected]["id"]) self.down() self.draw_screen() def move_up(self): if self.selected==-1: pass elif self.selected==0: self.server.force(self.list[0]["id"]) self.selected=-1 else: self.server.up(self.list[self.selected]["id"]) self.up() self.draw_screen() def run_config(self): config_file = self.server.status()['config_file'] Hellanzb = self execfile(config_file) def startup_or_shutdown(self): try: self.server.shutdown() except socket.error: self.starting_up = True self.draw_screen() 
os.system("hellanzb.py -D") self.starting_up = False self.run_config() self.draw_screen() def main(self,scr): self.init_screen(scr) self.draw_screen() refresher = threading.Thread(target=self.auto_refresh) refresher.setDaemon(True) refresher.start() chars = ord_list while 1: c = self.screen.getch() if c in chars('q'): break # Exit the while() elif c in chars('p'): self.toggle_pause() elif c in [curses.KEY_UP,65]: self.up() elif c in [curses.KEY_DOWN,66]: self.down() elif c == curses.KEY_RESIZE: self.draw_screen() elif c in chars('s'): self.startup_or_shutdown() elif c in chars('u'): self.move_up() elif c in chars('d'): self.move_down() elif c in chars('x'): self.expel() elif c in chars('f'): self.showfiles() try: x = ShowHella() except socket.error, e: sys.stderr.write(e[1]+"\n") sys.exit(2)