#!/usr/bin/env python
# Print i3 workspaces on every change.
#
# Format:
#   For every workspace (x = workspace name)
#   - "FOCx" -> Focused workspace
#   - "INAx" -> Inactive workspace
#   - "ACTx" -> Active workspace (visible on its output, but not focused)
#   - "URGx" -> Urgent workspace
#
# Based on wsbar.py in the examples dir
#
# 16 feb 2015 - Electro7

import logging
import os
import signal
import sys
import time
from subprocess import call

import i3ipc

# When True, i3ws.display() prints to stdout. Enabled only when this file is
# run as a script; stays False when the module is imported.
print_stdout = False


class State(object):
    """Maps a workspace to the three-letter state prefix printed for it."""

    # workspace states
    focused = 'FOC'
    active = 'ACT'
    inactive = 'INA'
    urgent = 'URG'

    def get_state(self, workspace, output):
        """Return the state prefix for ``workspace`` relative to ``output``.

        ``workspace`` must expose ``focused``, ``urgent`` and ``name``;
        ``output`` must expose ``current_workspace``.

        Precedence: focused, then urgent, then active (the workspace is the
        one currently shown on ``output`` without having focus), otherwise
        inactive.
        """
        if workspace.focused:
            if output.current_workspace == workspace.name:
                return self.focused
            return self.active
        if workspace.urgent:
            return self.urgent
        # Previously every unfocused, non-urgent workspace was reported as
        # inactive, so the documented "ACT" state was never emitted for a
        # workspace that is visible on another output.
        if output.current_workspace == workspace.name:
            return self.active
        return self.inactive


# Create the i3ws object, then call locking function work()
# Another implementation would be to create a thread in __init__
# and implement a locking join() function
class i3ws(object):
    """Listens to i3 IPC events and reports workspace / window changes.

    Every change is formatted and passed to display(); registered callbacks
    are then invoked with this object as their only argument.
    """

    ws_format = '%s%s '    # (state prefix, workspace name) for one workspace
    end_format = 'WSP%s'   # wraps the concatenated workspace items

    def __init__(self, logger, state=None):
        """Store the logger and state mapper; no connection is opened yet.

        logger -- object with a logging.Logger-like debug() method
        state  -- optional replacement for the default State() mapper
        """
        self.state = state if state else State()
        self.outputs = []
        self.workspaces = []
        self.focused_window = None
        self.running = True
        self.logger = logger
        # Set by resetConn(); None until then so quit() can be called safely
        self.conn = None

        # Callback functions have argument i3ws
        self.change_callbacks = []
        self.focus_callbacks = []

    def work(self):
        """Blocking loop: (re)connect and process events until quit()."""
        # While loop to restart connection as long as there is an i3 ipc socket
        while self.running:
            self.resetConn()
            self.logger.debug('Started i3 workspaces manager')
            self.enterMain()
            self.logger.debug('Finished i3 workspaces manager')
            # Back off before reconnecting, but do not delay a requested quit
            if self.running:
                time.sleep(0.5)
            # TODO max number of attempts + catch exception instead

    def resetConn(self):
        """Open a fresh i3ipc connection and subscribe the event handlers."""
        self.conn = i3ipc.Connection()

        # Run call backs once
        for cb in self.change_callbacks:
            cb(self)
        for cb in self.focus_callbacks:
            cb(self)

        self.conn.on('workspace::focus', self.change)
        self.conn.on('workspace::init', self.change)
        self.conn.on('workspace::empty', self.change)
        # Without this subscription the URG state would only show up once
        # some unrelated workspace event happened to trigger a refresh.
        self.conn.on('workspace::urgent', self.change)
        self.conn.on('window::focus', self.win_focused)
        self.conn.on('shutdown', self.shutdown)

    def enterMain(self):
        """Run the i3ipc main loop, logging (not raising) ordinary errors."""
        try:
            # Locking call
            self.conn.main()
        except BrokenPipeError:
            self.logger.debug('Broken pipe in i3 workspaces thread, exiting')
        except Exception:
            # Deliberately not a bare ``except``: SystemExit raised by the
            # SIGINT handler (and KeyboardInterrupt) must propagate instead
            # of being swallowed here.
            self.logger.debug(
                'Unknown exception in i3 workspaces thread, exiting',
                exc_info=True)

    def shutdown(self, i3, e):
        """Handler for the 'shutdown' event; only logs it."""
        self.logger.debug('Shut down i3ipc')

    def change(self, i3, e):
        """Handler for workspace events: refresh and display current state."""
        # Receives event and workspace data
        self.outputs = i3.get_outputs()
        active_names = [output.name for output in self.outputs if output.active]
        self.display(':'.join(['DISP'] + active_names))

        self.workspaces = i3.get_workspaces()
        text = self.format(self.workspaces, self.outputs)
        self.display(text)

        # Callbacks
        for cb in self.change_callbacks:
            cb(self)

    def win_focused(self, i3, e):
        """Handler for window focus events: display the focused window name."""
        window = i3.get_tree().find_focused()
        self.focused_window = window

        # Nothing focused, or a container without a name: show an empty
        # title rather than crashing or printing the literal "None".
        name = window.name if window is not None else None
        if name is None:
            name = ''
        self.display('WIN{}'.format(name))

        # Callbacks
        for cb in self.focus_callbacks:
            cb(self)

    def format(self, workspaces, outputs):
        """Return the 'WSP...' line describing all given workspaces.

        Workspaces whose output is not present in ``outputs`` are skipped.
        Spaces in workspace names are replaced by '___'.
        """
        # Formats the text according to the workspace data given.
        # Only important when running in free standing mode

        # First output wins when several share a name
        outputs_by_name = {}
        for output in outputs:
            outputs_by_name.setdefault(output.name, output)

        items = []
        for workspace in workspaces:
            output = outputs_by_name.get(workspace.output)
            if not output:
                continue

            st = self.state.get_state(workspace, output)
            name = workspace.name.replace(" ", "___")
            items.append(self.ws_format % (st, name))

        return self.end_format % ''.join(items)

    def display(self, text):
        """Print ``text`` to stdout when the module-level flag is enabled."""
        if print_stdout:
            # Displays the text in stdout
            print(text)
            sys.stdout.flush()

    def quit(self):
        """Stop the work() loop and leave the i3ipc main loop, if any."""
        self.logger.debug('Quitting i3 workspace script')
        self.running = False
        # conn is None when quit() is called before the first resetConn()
        if self.conn is not None:
            self.conn.main_quit()


def handle_exit(signal, frame):
    """SIGINT handler: stop the global ``ws`` instance and exit with code 1.

    The first parameter shadows the ``signal`` module inside this function;
    the module is not used here.
    """
    global ws
    print("Recieved Keyboard Interrupt from user")
    ws.quit()
    sys.exit(1)


if __name__ == '__main__':
    # Run as stand-alone
    print_stdout = True

    # Setup logger to stdout with debug log level
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)
    logger = logging.getLogger('Normal logger')
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)

    # Capture Keyboard Interrupt (i3ipc captures this internally)
    signal.signal(signal.SIGINT, handle_exit)

    # Start main
    ws = i3ws(logger=logger)
    ws.work()