Files
dotfiles/.i3/lemonbar/i3_workspaces.py

181 lines
5.2 KiB
Python
Executable File

#!/usr/bin/env python
# Print i3 workspaces on every change.
#
# Format:
# For every workspace (x = workspace name)
# - "FOCx" -> Focused workspace
# - "INAx" -> Inactive workspace
# - "ACTx" -> Ative workspace
# - "URGx" -> Urgent workspace
#
# Based in wsbar.py en examples dir
#
# 16 feb 2015 - Electro7
import sys, os, signal
import logging
import time
from subprocess import call
import i3ipc
print_stdout = False
class State(object):
# workspace states
focused = 'FOC'
active = 'ACT'
inactive = 'INA'
urgent = 'URG'
def get_state(self, workspace, output):
if workspace.focused:
if output.current_workspace == workspace.name:
return self.focused
else:
return self.active
if workspace.urgent:
return self.urgent
else:
return self.inactive
# Create the i3ws object, then call locking function work()
# Another implementation would be to create a thread in __init__
# and implement a locking join() function
class i3ws(object):
ws_format = '%s%s '
end_format = 'WSP%s'
def __init__(self, logger, state=None):
self.state = State()
self.outputs = []
self.workspaces = []
self.focused_window = None
self.running = True
if state:
self.state = state
self.logger = logger
# Callback functions have argument i3ws
self.change_callbacks = []
self.focus_callbacks = []
def work(self):
# While loop to restart connection as long as there is an i3 ipc socket
while self.running:
self.resetConn()
self.logger.debug('Started i3 workspaces manager')
self.enterMain()
self.logger.debug('Finished i3 workspaces manager')
time.sleep(0.5) # TODO max number of attempts + catch exception instead
def resetConn(self):
self.conn = i3ipc.Connection()
self.outputs = self.conn.get_outputs()
self.conn.on('workspace::focus' , self.change)
self.conn.on('workspace::init' , self.change)
self.conn.on('workspace::empty' , self.change)
self.conn.on('window::focus' , self.win_focused)
self.conn.on('shutdown' , self.shutdown)
# Run call backs once by calling these functions
self.change(self.conn, None)
self.win_focused(self.conn, None)
def enterMain(self):
try:
# Locking call
self.conn.main()
except BrokenPipeError:
self.logger.debug('Broken pipe in i3 workspaces thread, exiting')
except:
self.logger.debug('Unknown exception in i3 workspaces thread, exiting')
def shutdown(self, i3, e):
self.logger.debug('Shut down i3ipc')
def change(self, i3, e):
# Receives event and workspace data
self.outputs = i3.get_outputs()
fmt_outputs = ['DISP']
for output in self.outputs:
if output.active:
fmt_outputs.append(output.name)
self.display(':'.join(fmt_outputs))
self.workspaces = i3.get_workspaces()
text = self.format(self.workspaces, self.outputs)
self.display(text)
# Callbacks
for cb in self.change_callbacks:
cb(self)
def win_focused(self, i3, e):
self.focused_window = i3.get_tree().find_focused()
name = self.focused_window.name
text = 'WIN{}'.format(name)
self.display(text)
# Callbacks
for cb in self.focus_callbacks:
cb(self)
def format(self, workspaces, outputs):
# Formats the text according to the workspace data given.
# Only important when running in free standing mode
out = ''
for workspace in workspaces:
output = None
for output_ in outputs:
if output_.name == workspace.output:
output = output_
break
if not output:
continue
st = self.state.get_state(workspace, output)
name = workspace.name.replace(" ","___")
item= self.ws_format % (st, name)
out += item
return self.end_format % out
def display(self, text):
global print_stdout
if print_stdout:
# Displays the text in stout
print(text)
sys.stdout.flush()
def quit(self):
self.logger.debug('Quitting i3 workspace script')
self.running = False
self.conn.main_quit()
def handle_exit(signal, frame):
global ws
print("Recieved Keyboard Interrupt from user")
ws.quit()
sys.exit(1)
if __name__ == '__main__':
# Run as stand-alone
print_stdout = True
# Setup logger to stdout with debug log level
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
logger = logging.getLogger('Normal logger')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
# Capture Keyboard Interrupt (i3ipc captures this internally)
signal.signal(signal.SIGINT, handle_exit)
# Start main
ws = i3ws(logger = logger)
ws.work()