195 lines
5.8 KiB
Python
Executable File
195 lines
5.8 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# Print i3 workspaces on every change.
|
|
#
|
|
# Format:
|
|
# For every workspace (x = workspace name)
|
|
# - "FOCx" -> Focused workspace
|
|
# - "INAx" -> Inactive workspace
|
|
# - "ACTx" -> Ative workspace
|
|
# - "URGx" -> Urgent workspace
|
|
#
|
|
# Based in wsbar.py en examples dir
|
|
#
|
|
# 16 feb 2015 - Electro7
|
|
|
|
|
|
import sys, os, signal
|
|
import logging
|
|
import time
|
|
from subprocess import call
|
|
import i3ipc
|
|
|
|
print_stdout = False
|
|
|
|
class State(object):
|
|
# workspace states
|
|
focused = 'FOC'
|
|
active = 'ACT'
|
|
inactive = 'INA'
|
|
urgent = 'URG'
|
|
|
|
def get_state(self, workspace, output):
|
|
if workspace.focused:
|
|
if output.current_workspace == workspace.name:
|
|
return self.focused
|
|
else:
|
|
return self.active
|
|
if workspace.urgent:
|
|
return self.urgent
|
|
else:
|
|
return self.inactive
|
|
|
|
|
|
# Create the i3ws object, then call locking function work()
|
|
# Another implementation would be to create a thread in __init__
|
|
# and implement a locking join() function
|
|
class i3ws(object):
|
|
ws_format = '%s%s '
|
|
end_format = 'WSP%s'
|
|
|
|
def __init__(self, logger, state=None):
|
|
self.state = State()
|
|
self.outputs = []
|
|
self.workspaces = []
|
|
self.focused_window = None
|
|
self.running = True
|
|
if state:
|
|
self.state = state
|
|
self.logger = logger
|
|
|
|
# Callback functions have argument i3ws
|
|
self.change_callbacks = []
|
|
self.focus_callbacks = []
|
|
|
|
def work(self):
|
|
# While loop to restart connection as long as there is an i3 ipc socket
|
|
while self.running:
|
|
self.resetConn()
|
|
self.logger.debug('Started i3 workspaces manager')
|
|
self.enterMain()
|
|
self.logger.debug('Finished i3 workspaces manager')
|
|
|
|
def resetConn(self):
|
|
|
|
def attempt_connection(attempt):
|
|
try:
|
|
self.conn = i3ipc.Connection()
|
|
except FileNotFoundError:
|
|
# Thrown when i3 is not up. Should try again in the case of a restart
|
|
self.logger.debug('Attempt {}: Failed connecting to i3 socket'.format(attempt))
|
|
if attempt > 5:
|
|
raise ConnectionError('Could not connect to i3 socked, quitting')
|
|
time.sleep(0.1 if attempt < 3 else 0.5)
|
|
attempt_connection(attempt + 1)
|
|
|
|
attempt_connection(attempt=1)
|
|
self.outputs = self.conn.get_outputs()
|
|
|
|
self.conn.on('workspace::focus' , self.change)
|
|
self.conn.on('workspace::init' , self.change)
|
|
self.conn.on('workspace::empty' , self.change)
|
|
self.conn.on('window::focus' , self.win_focused)
|
|
self.conn.on('shutdown' , self.shutdown)
|
|
|
|
# Run call backs once by calling these functions
|
|
self.change(self.conn, None)
|
|
self.win_focused(self.conn, None)
|
|
|
|
def enterMain(self):
|
|
try:
|
|
# Locking call
|
|
self.conn.main()
|
|
except BrokenPipeError:
|
|
self.logger.debug('Broken pipe in i3 workspaces thread, exiting')
|
|
except:
|
|
self.logger.debug('Unknown exception in i3 workspaces thread, exiting')
|
|
|
|
def shutdown(self, i3, e):
|
|
self.logger.debug('Shut down i3ipc')
|
|
|
|
def change(self, i3, e):
|
|
# Receives event and workspace data
|
|
self.outputs = i3.get_outputs()
|
|
fmt_outputs = ['DISP']
|
|
for output in self.outputs:
|
|
if output.active:
|
|
fmt_outputs.append(output.name)
|
|
self.display(':'.join(fmt_outputs))
|
|
|
|
self.workspaces = i3.get_workspaces()
|
|
text = self.format(self.workspaces, self.outputs)
|
|
self.display(text)
|
|
|
|
# Callbacks
|
|
for cb in self.change_callbacks:
|
|
cb(self)
|
|
|
|
def win_focused(self, i3, e):
|
|
self.focused_window = i3.get_tree().find_focused()
|
|
name = self.focused_window.name
|
|
text = 'WIN{}'.format(name)
|
|
self.display(text)
|
|
|
|
# Callbacks
|
|
for cb in self.focus_callbacks:
|
|
cb(self)
|
|
|
|
def format(self, workspaces, outputs):
|
|
# Formats the text according to the workspace data given.
|
|
# Only important when running in free standing mode
|
|
out = ''
|
|
for workspace in workspaces:
|
|
output = None
|
|
for output_ in outputs:
|
|
if output_.name == workspace.output:
|
|
output = output_
|
|
break
|
|
if not output:
|
|
continue
|
|
st = self.state.get_state(workspace, output)
|
|
name = workspace.name.replace(" ","___")
|
|
item= self.ws_format % (st, name)
|
|
out += item
|
|
return self.end_format % out
|
|
|
|
def command(self, cmd):
|
|
self.conn.command(cmd)
|
|
|
|
def display(self, text):
|
|
global print_stdout
|
|
if print_stdout:
|
|
# Displays the text in stout
|
|
print(text)
|
|
sys.stdout.flush()
|
|
|
|
def quit(self):
|
|
self.logger.debug('Quitting i3 workspace script')
|
|
self.running = False
|
|
self.conn.main_quit()
|
|
|
|
def handle_exit(signal, frame):
|
|
global ws
|
|
print("Recieved Keyboard Interrupt from user")
|
|
ws.quit()
|
|
sys.exit(1)
|
|
|
|
if __name__ == '__main__':
|
|
# Run as stand-alone
|
|
print_stdout = True
|
|
|
|
# Setup logger to stdout with debug log level
|
|
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
|
|
handler = logging.StreamHandler(sys.stdout)
|
|
handler.setFormatter(formatter)
|
|
|
|
logger = logging.getLogger('Normal logger')
|
|
logger.setLevel(logging.DEBUG)
|
|
logger.addHandler(handler)
|
|
|
|
# Capture Keyboard Interrupt (i3ipc captures this internally)
|
|
signal.signal(signal.SIGINT, handle_exit)
|
|
|
|
# Start main
|
|
ws = i3ws(logger = logger)
|
|
ws.work()
|