#!/usr/bin/env python # # Print i3 workspaces on every change. # # Format: # For every workspace (x = workspace name) # - "FOCx" -> Focused workspace # - "INAx" -> Inactive workspace # - "ACTx" -> Ative workspace # - "URGx" -> Urgent workspace # # Based in wsbar.py en examples dir # # 16 feb 2015 - Electro7 import sys, os, signal import logging import time from subprocess import call import i3ipc import i3_lemonbar_config as config import i3_lemonbar_common as common def img_path(num): dir = '/home/kuba/Obrazy/Wallpapers/' return dir + { 1: '1_main', 2: '2_web', 3: '3_music', 4: '4_work', 5: '5_terms', 6: '6_stats', 7: '7', 8: '8', 9: '9', }.get(int(num), 'default') class State(object): # workspace states focused = 'FOC' active = 'ACT' inactive = 'INA' urgent = 'URG' def get_state(self, workspace, output): if workspace['focused']: if output['current_workspace'] == workspace['name']: return self.focused else: return self.active if workspace['urgent']: return self.urgent else: return self.inactive # Create the i3ws object, then call locking function work() # Another implementation would be to create a thread in __init__ # and implement a locking join() function class i3ws(object): ws_format = '%s%s ' end_format = 'WSP%s' state = State() backgrounds = {} running = True fifo = None def __init__(self, state=None, fifo_file=None): if state: self.state = state if fifo_file: self.fifo = open(fifo_file, 'w') def work(self): # While loop to restart connection as long as there is an i3 ipc socket while self.running: self.resetConn() common.logger.debug('Started i3 workspaces manager') self.enterMain() time.sleep(0.5) # TODO max number of attempts + catch exception instead def resetConn(self): # conn self.conn = i3ipc.Connection() # Run call backs once self.change(self.conn, None) self.win_focused(self.conn, None) self.conn.on('workspace::focus' , self.change) self.conn.on('workspace::init' , self.change) self.conn.on('workspace::empty' , self.change) self.conn.on('window::focus' , self.win_focused) self.conn.on('shutdown' , self.shutdown) def enterMain(self): try: # Locking call self.conn.main() except BrokenPipeError: common.logger.debug('Broken pipe in i3 workspaces thread, exiting') except: common.logger.debug('Unknown exception in i3 workspaces thread, exiting') def shutdown(self, i3, e): common.logger.debug('Shut down i3ipc') def change(self, i3, e): # Receives event and workspace data outputs = i3.get_outputs() active = ['DISP'] for output in outputs: if output.name != 'xroot-0': active.append(output.name) self.backgrounds[output.name] = img_path(1) self.display(':'.join(active)) workspaces = i3.get_workspaces() text = self.format(workspaces, outputs) self.display(text) self.set_bg() def win_focused(self, i3, e): win = i3.get_tree().find_focused() name = win.name role = win.window_role wclass = win.window_class text = 'WIN{}'.format(name) self.display(text) common.cur_class = wclass # Kill floating windows if role != 'FLOAT_TERM' and wclass != 'FLOAT_PAVU' and wclass != 'YADWINBR': # Is there a window that the bar has opened? for pid in common.kill_on_unfocus: try: os.kill(pid, signal.SIGTERM) except ProcessLookupError: common.logger.debug('Tried killing process {} but it doesn\'t exist'.format(pid)) common.kill_on_unfocus = [] # Set correct keymap if wclass in common.keymaps: new_km = common.keymaps[wclass] common.logger.debug('Setting {} as keymap for {}'.format(new_km, wclass)) else: new_km = common.def_keymap common.logger.debug('Setting default keymap {} for {}'.format(common.def_keymap, wclass)) call(['/home/kuba/.i3/scripts/lang.sh', 'qset', new_km]) def format(self, workspaces, outputs): # Formats the text according to the workspace data given. out = '' for workspace in workspaces: output = None for output_ in outputs: if output_['name'] == workspace['output']: output = output_ break if not output: continue st = self.state.get_state(workspace, output) if st == 'FOC': self.backgrounds[output['name']] = img_path(workspace['name'].partition(' ')[0]) name = workspace['name'].replace(" ","___") item= self.ws_format % (st, name) out += item return self.end_format % out def display(self, text): if self.fifo is not None: self.fifo.write(text + '\n') self.fifo.flush() else: # Displays the text in stout print(text) sys.stdout.flush() def set_bg(self): args = '' for key in self.backgrounds.keys(): args += ' ' + key + ' ' + self.backgrounds[key] call(['sh', '/home/kuba/.i3/lemonbar/set_bg.sh', args]) def quit(self): common.logger.debug('Quitting i3 workspace script') self.running = False self.conn.main_quit() if self.fifo is not None: try: self.fifo.close() except BrokenPipeError: pass def handle_exit(signal, frame): global ws print("Recieved Keyboard Interrupt from user") ws.quit() sys.exit(1) if __name__ == '__main__': # Setup logger to stdout with debug log level formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') handler = logging.StreamHandler(sys.stdout) handler.setFormatter(formatter) common.logger = logging.getLogger('Normal logger') common.logger.setLevel(logging.DEBUG) common.logger.addHandler(handler) # Capture Keyboard Interrupt (i3ipc captures this internally) signal.signal(signal.SIGINT, handle_exit) # Start main ws = i3ws() ws.work()