Files
dotfiles/.i3/lemonbar/i3_lemonbar_modules.py
Jakub Fojt c6f48faff0 lemonbar: Cleaning up
Add connection to i3ws object. Log exceptions with debugger
2019-12-21 10:21:56 +01:00

569 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import threading
import subprocess
import contextlib
import os, signal, getpass
import i3_lemonbar_config as config
import i3_lemonbar_common as common
import i3_lemonbar_parser as parser
import i3_workspaces as wspaces
class LemonModule(threading.Thread):
# Module spawns process that generates some status message, which the module parses and
# sends to lemonbar queue. On click actions are registered with the module.
# Every module runs in its own thread
def __init__(self):
self.actions = {}
super().__init__()
self._start_module()
def run(self):
if self.status_handle is None:
return
common.logger.info('Started module {}'.format(self.__class__.__name__))
common.health_logger.info('Module {} up'.format(self.__class__.__name__))
while True:
line = self.status_handle.readline()
if not line:
common.logger.info('Reached end of module {}'.format(self.__class__.__name__))
common.health_logger.info('Module {} down'.format(self.__class__.__name__))
break
parsed = self.parse(line)
common.write_queue.put(parsed)
def stop(self):
self._stop_module()
def parse(self, line):
data = line.split()
self._parse_data(data) # Update correct field
formatted_line = parser.format_line() # Construct entire line
return formatted_line
def register_action(self, keyword, do_action):
self.actions[keyword] = do_action
def do_action(self, command):
action_arr = command.split()
action = action_arr[0]
if (action in self.actions):
func = self.actions[action]
func(*action_arr[1:]) # Unpack arguments, if any
def format_load(data, module, alert):
# Helper function to format network modules
# Changes colors scheme to inactive when interfaces are down, or to alert when
# alert level is reached
# Returns tuple (down, up)
if data[0] == 'down':
module.alt_scheme = parser.COLOR_SCHEME.INA
return ('x', 'x')
else:
(d_v, u_v) = (data[0],data[1])
if max(float(d_v), float(u_v)) > float(alert):
module.alt_scheme = parser.COLOR_SCHEME.NET_ALERT
else:
# Reset to default
module.alt_scheme = None
return (d_v, u_v)
class ConkyFastModule(LemonModule):
def __init__(self):
self.show_secs = False
super().__init__()
def _start_module(self):
self.p_handle = subprocess.Popen(['conky', '-c', config.path+'conky_fast'],
stdout=subprocess.PIPE, text=True)
self.status_handle = self.p_handle.stdout
self.wlan_load = parser.IconTextUnit('wlan_load', action='wlan', order=13)
self.eth_load = parser.IconTextUnit('eth_load', action='eth', order=14)
self.volume = parser.IconTextUnit('volume', action='pavu', order=20)
self.date = parser.IconTextUnit('date', action='date', order=40)
self.time = parser.IconTextUnit('time', action='toggle_secs', order=41
, alt_scheme=parser.COLOR_SCHEME.SPECIAL)
self.time.modes = [mode for mode in common.bar_mode]
parser.g_parser.register_unit(self.wlan_load)
parser.g_parser.register_unit(self.eth_load)
parser.g_parser.register_unit(self.volume)
parser.g_parser.register_unit(self.date)
parser.g_parser.register_unit(self.time)
self.register_action('wlan' , self.nmtui)
self.register_action('pavu' , self.pavu)
self.register_action('date' , self.date_comm)
self.register_action('toggle_secs' , self.toggle_secs)
def _stop_module(self):
if self.p_handle is not None:
self.p_handle.terminate()
parser.g_parser.remove_unit(self.wlan_load)
parser.g_parser.remove_unit(self.eth_load)
parser.g_parser.remove_unit(self.volume)
parser.g_parser.remove_unit(self.date)
parser.g_parser.remove_unit(self.time)
def _parse_data(self, data):
# wlan and eth
(wland_v, wlanu_v) = format_load(data[5:7], self.wlan_load, config.net_alert)
self.wlan_load.items = [(config.icon_wlan + config.icon_dl, wland_v)
,(config.icon_ul, wlanu_v)]
(ethd_v, ethu_v) = format_load(data[7:9], self.eth_load, config.net_alert)
self.eth_load.items = [(config.icon_eth + config.icon_dl, ethd_v)
,(config.icon_ul, ethu_v)]
# Volume
mute = data[4] == 'MUTE' or data[4] == 'NONE'
(vol,vols) = (-1,'×') if mute else (int(data[4]), data[4]+'%')
icon_v = config.icon_vol_mute if vol == 0 else \
config.icon_vol_low if vol < 50 else config.icon_vol
self.volume.items = [(icon_v, vols)]
# Date and time
self.date.items = [(config.icon_clock, ' '.join(data[0:3]))]
self.time.items = [('', data[3] if self.show_secs else data[3][:-3])]
def nmtui(self):
p = subprocess.Popen(['xterm', '-class', 'FLOAT_TERM', '-e', 'nmtui'])
common.kill_on_unfocus.append(p.pid)
def pavu(self):
p = subprocess.Popen(['pavucontrol', '--class=FLOAT_PAVU'])
common.kill_on_unfocus.append(p.pid)
def date_comm(self):
subprocess.Popen(['yad', '--no-buttons', '--calendar', '--sticky'
, '--on-top' , '--class' , '"YADWIN"', '--posx=1650', '--posy=24'
, '--close-on-unfocus'])
def toggle_secs(self):
self.show_secs = not self.show_secs
class ConkySlowModule(LemonModule):
def __init__(self):
super().__init__()
def _start_module(self):
self.p_handle = subprocess.Popen(['conky', '-c', config.path+'conky_slow'],
stdout=subprocess.PIPE, text=True)
self.status_handle = self.p_handle.stdout
self.sys_load = parser.IconTextUnit('sys_load', action='load', order=10)
self.disk = parser.IconTextUnit('disk', order=11)
self.brightness = parser.IconTextUnit('brightness', action='adj_br', order=21
, external={'BRIGHT': self.parse_brightness})
self.battery = parser.IconTextUnit('battery', action='dpms', order=22)
self.language = parser.IconTextUnit('language', action='lang', order=32
, external={'LANG': self.parse_language})
parser.g_parser.register_unit(self.sys_load)
parser.g_parser.register_unit(self.disk)
parser.g_parser.register_unit(self.brightness)
parser.g_parser.register_unit(self.battery)
parser.g_parser.register_unit(self.language)
self.register_action('load' , self.htop)
self.register_action('lang' , self.lang_comm)
self.register_action('adj_br' , self.adj_br)
self.register_action('dpms' , self.dpms_comm)
def _stop_module(self):
parser.g_parser.remove_unit(self.sys_load)
parser.g_parser.remove_unit(self.disk)
parser.g_parser.remove_unit(self.brightness)
parser.g_parser.remove_unit(self.battery)
parser.g_parser.remove_unit(self.language)
if self.p_handle is not None:
self.p_handle.terminate()
def _parse_data(self, data):
self.parse_sys_load (data[0:2]) # System load
self.parse_disk (data[2:4]) # Disk usage
self.parse_battery (data[4:5]) # Battery
self.parse_brightness (data[5:6]) # Screen brightness
self.parse_language (data[6:7]) # Language
def parse_sys_load(self, data):
if int(data[0]) > int(config.cpu_alert):
self.sys_load.alt_scheme = parser.COLOR_SCHEME.CPU_ALERT
else:
self.sys_load.alt_scheme = None
self.sys_load.items = [(config.icon_cpu, data[0] + '%')
,(config.icon_ul, data[1])]
def parse_disk(self, data):
self.disk.items = [(config.icon_hd , data[0] + '%')
,(config.icon_home, data[1] + '%')]
def parse_battery(self, data):
(batt_stat, batt) = (data[0][0], data[0][1:])
batt_i = int(batt)
icon_batt = config.icon_charging if batt_stat == 'C' else \
config.icon_charged if batt_stat == 'F' else \
config.icon_batt_0 if batt_i < 20 else \
config.icon_batt_1 if batt_i < 40 else \
config.icon_batt_2 if batt_i < 60 else \
config.icon_batt_3 if batt_i < 80 else \
config.icon_batt_4
self.battery.items = [(icon_batt, batt+'%')]
def parse_brightness(self, data):
brtxt = str(int(float(data[0])))
self.brightness.items = [(config.icon_bright, brtxt+'%')]
def parse_language(self, data):
self.language.items = [(config.icon_lang, data[0])]
def htop(self):
p = subprocess.Popen(['xterm', '-class', 'FLOAT_TERM', '-e', 'htop'])
common.kill_on_unfocus.append(p.pid)
def lang_comm(self):
# TODO Connect to i3Module (or let i3Module connect here to update the keymap)
subprocess.Popen(['sh', '/home/kuba/.i3/scripts/lang.sh', 'next'])
def dpms_comm(self):
subprocess.Popen(['sh', '/home/kuba/.i3/scripts/dpmsctl.sh'])
def adj_br(self):
subprocess.Popen(['/home/kuba/.i3/scripts/adjbr.sh', '-b', config.fifo_file_executor])
class i3Module(LemonModule):
# Handles outputs (displays), workspaces and active window
# TODO trigger formatting
def __init__(self):
super().__init__()
self.displays = ''
self.win_title = ''
self.workspaces = ''
self.def_keymap = 'pl'
self.keymaps = {'firefox': 'se'}
self.cur_class = ''
def _start_module(self):
self.i3_ws_obj = wspaces.i3ws(logger=common.logger)
self.displays = parser.CustomUnit('displays', format_function = self.format_displays, order=-30)
self.workspaces = parser.CustomUnit('workspaces', format_function = self.format_workspaces, order=-20)
self.title = parser.CustomUnit('title', format_function = self.format_title, order=-10)
self.displays.modes.append(common.bar_mode.control)
self.workspaces.modes.append(common.bar_mode.control)
parser.g_parser.register_unit(self.displays)
parser.g_parser.register_unit(self.workspaces)
parser.g_parser.register_unit(self.title)
self.register_action('i3-msg' , self.i3msg_comm)
self.register_action('mode' , self.set_mode)
# Add callbacks for parsing
self.i3_ws_obj.change_callbacks.append(self.parse_displays)
self.i3_ws_obj.change_callbacks.append(self.parse_workspaces)
self.i3_ws_obj.focus_callbacks.append(self.parse_title)
# Add callbacks for special actions
self.i3_ws_obj.change_callbacks.append(self.set_bg)
self.i3_ws_obj.focus_callbacks.append(self.set_keymap)
self.i3_ws_obj.focus_callbacks.append(self.kill_floating_windows)
def _stop_module(self):
parser.g_parser.remove_unit(self.displays)
parser.g_parser.remove_unit(self.workspaces)
parser.g_parser.remove_unit(self.title)
if self.i3_ws_obj is not None:
self.i3_ws_obj.quit()
# Overload run as i3_ws_obj.work() is a blocking command
# Parsing is instead done through callbacks
def run(self):
common.logger.info('Started module {}'.format(self.__class__.__name__))
common.health_logger.info('Module {} up'.format(self.__class__.__name__))
self.i3_ws_obj.work() # This is a blocking command
common.logger.info('Reached end of module {}'.format(self.__class__.__name__))
common.health_logger.info('Module {} down'.format(self.__class__.__name__))
def parse_displays(self, i3ws):
parsed_list = [parser.block(click='displays', font='2')]
for output in i3ws.outputs:
output_name = output.name
if output.active:
if output_name == 'eDP1':
col_head = config.color_head
elif output_name == 'DP1':
col_head = config.color_vga
elif output_name == 'HDMI2':
col_head = config.color_hdmi
else:
col_head = '#00000000' # Undefined
parsed_list.append(parser.block(fg=config.color_back, bg=col_head))
parsed_list.append(config.icon_wsp)
parsed_list.append(parser.block(click=''))
self.displays = ' '.join(parsed_list)
def parse_workspaces(self, i3ws):
prefix = parser.block(font='1', fg=config.color_back, bg=config.color_head)
prefix_foc = ''.join([parser.block(fg = config.color_head, bg=config.color_wsp)
, ' ', config.sep_right, ' '
, parser.block(fg=config.color_back, bg=config.color_wsp, font='1')])
prefix_ina = parser.block(fg=config.color_back, bg=config.color_head, font='1', append=' ')
wspces = []
for workspace in i3ws.workspaces:
# Find out which output the workspace is on
output = None # TODO actually use this information
for output_ in i3ws.outputs:
if output_.name == workspace.output:
output = output_
break
if not output:
continue
status = i3ws.state.get_state(workspace, output) # FOC or INA
name = workspace.name # e.g. 5 terms
current = ''.join([parser.block(click=('i3-msg workspace' + name))
, name, parser.block(click='')])
if status == "FOC":
wspces.append(''.join([prefix_foc, current]))
else:
wspces.append(''.join([prefix_ina, current]))
self.workspaces = ''.join([prefix, ' '.join(wspces)])
def parse_title(self, i3ws):
if i3ws.focused_window is None:
return
self.win_title = ' '.join([parser.block(fg=config.color_head, bg=config.color_sec_b2)
, config.sep_right, parser.block(fg=config.color_head, bg=config.color_sec_b2, click='mode cycle')
, config.icon_prog
, parser.block(fg=config.color_sec_b2, bg='-')
, i3ws.focused_window.name])
def format_displays(self):
return self.displays
def format_workspaces(self):
return self.workspaces
def format_title(self):
return self.win_title
def img_path(num):
dir = '/home/kuba/Obrazy/Wallpapers/'
return dir + {
1: '1_main',
2: '2_web',
3: '3_music',
4: '4_work',
5: '5_terms',
6: '6_stats',
7: '7',
8: '8',
9: '9',
}.get(int(num), 'default')
def set_bg(self, i3ws):
cmd_args = ['sh', '/home/kuba/scripts/set_bg.sh']
for output in i3ws.outputs:
if output.active:
bg = i3Module.img_path(output.current_workspace.partition(' ')[0])
cmd_args.append(bg)
subprocess.call(cmd_args)
def kill_floating_windows(self, i3ws):
if i3ws.focused_window is None:
return
wclass = i3ws.focused_window.window_class
if wclass != 'FLOAT_TERM' and wclass != 'FLOAT_PAVU' and wclass != 'YADWINBR':
# Is there a window that the bar has opened?
for pid in common.kill_on_unfocus:
try:
os.kill(pid, signal.SIGTERM)
except ProcessLookupError:
common.logger.debug('Tried killing process {} but it doesn\'t exist'.format(pid))
common.kill_on_unfocus = []
def set_keymap(self, i3ws):
if i3ws.focused_window is None:
return
wclass = i3ws.focused_window.window_class
self.cur_class = wclass
if wclass in self.keymaps:
new_km = self.keymaps[wclass]
common.logger.debug('Setting {} as keymap for {}'.format(new_km, wclass))
else:
new_km = self.def_keymap
common.logger.debug('Setting default keymap {} for {}'.format(new_km, wclass))
subprocess.call(['/home/kuba/.i3/scripts/lang.sh', 'qset', new_km])
def i3msg_comm(self, *cmd):
self.i3_ws_obj.command(' '.join(cmd))
def set_mode(self, new_mode):
if new_mode == "cycle":
common.mode = common.mode.cycle()
for m in common.bar_mode:
if new_mode == m.name:
common.mode = m
break
class ScreenModule(LemonModule):
# Start, stop and send commands to screen instance
# Start detached, in UTF-8 mode. Log to fifo
start_flags = ['-d', '-m', '-U', '-L', '-Logfile', config.fifo_screen_log]
def __init__(self):
super().__init__()
def _start_module(self):
self.identifier = 'lemonbar_{}_{}'.format(getpass.getuser(), os.getpid())
self.empty_count = 0
common.create_new_fifo(config.fifo_screen_log)
self.send(self.start_flags) # Start screen
self.send_cmd('stty -echo') # Do not echo what is written to screen terminal
self.send_colon('logfile flush 0.1') # Log quickly
self.status_handle = open(config.fifo_screen_log, 'r', buffering=1)
self.controls = parser.ButtonsUnit('controls', order=-10)
self.response = parser.IconTextUnit('response', order=10)
self.controls.items = [('', config.icon_prog, 'mode cycle')
,('', '', None)
,('', 'on', 'bluetooth power on')
,('', 'off', 'bluetooth power off')
,('PXC 550', '', None)
,('', 'conn.', 'bluetooth connect pxc550')
,('', 'disc.', 'bluetooth disconnect pxc550')
]
self.response.modes = [common.bar_mode.control]
self.controls.modes = [common.bar_mode.control]
parser.g_parser.register_unit(self.response)
parser.g_parser.register_unit(self.controls)
self.register_action('bluetooth', self.bluetooth)
def _stop_module(self):
self.send_colon('kill')
self.status_handle.close()
with contextlib.suppress(FileNotFoundError):
os.remove(config.fifo_screen_log)
parser.g_parser.remove_unit(self.response)
parser.g_parser.remove_unit(self.controls)
def _parse_data(self, data):
line = ' '.join(data)
line = common.strip_ansi_unicode(line)
common.logger.debug('Screen read line {}'.format(line))
if (not line.startswith('[CHG]')
and not line.isspace()):
self.response.items = [('', line)]
if not line:
# End loop if many empty lines in a row
self.empty_count = self.empty_count + 1
if self.empty_count > 3:
common.logger.error('Too many empty lines, aborting')
# TODO actually abort
else:
self.empty_count = 0
def send(self, args):
# Send something to our screen instance
subprocess.call(['screen', '-S', self.identifier] + args)
def send_cmd(self, cmd):
# Send terminal input to our screen instance
self.send(['-X', 'stuff', cmd + '\n'])
def send_colon(self, cmd):
# Send colon command to our screen instance
self.send(['-X', 'colon', cmd + '\n'])
def bluetooth(self, *args):
btcargs = [a.replace('pxc550', '00:16:94:22:29:0E') for a in args]
inp = ' '.join(btcargs)
self.send_cmd('bluetoothctl ' + inp)
class PowerOptionsModule(LemonModule):
def __init__(self):
super().__init__()
def _start_module(self):
# No external commands needed
self.status_handle = None
self.power_opts = parser.CustomUnit('power', format_function=self.format_power_opts, order=-1)
self.power_opts.modes = [common.bar_mode.power]
parser.g_parser.register_unit(self.power_opts)
def _stop_module(self):
parser.g_parser.remove_unit(self.power_opts)
def _parse_data(self, data):
pass
def format_power_opts(self):
return ''.join([parser.block(fg=config.color_fore, bg=config.color_poweropts)
, ' Abort (Esc) | System (l) lock, (e) logout, (s) suspend, (h) hibernate'
, ', (r) reboot, (Shift+s) shutdown'])
def do_action(keyword):
global m_conky_fast, m_conky_slow, m_i3ws
m_i3ws.do_action(keyword)
m_conky_slow.do_action(keyword)
m_conky_fast.do_action(keyword)
m_screen.do_action(keyword)
m_power.do_action(keyword)
def start_all():
global m_conky_fast, m_conky_slow, m_i3ws, m_screen, m_power
m_i3ws = i3Module()
m_conky_slow = ConkySlowModule()
m_conky_fast = ConkyFastModule()
m_screen = ScreenModule()
m_power = PowerOptionsModule()
m_i3ws.start()
m_conky_slow.start()
m_conky_fast.start()
m_screen.start()
m_power.start()
def stop_all():
global m_conky_fast, m_conky_slow, m_i3ws, m_screen, m_power
m_i3ws.stop()
m_conky_slow.stop()
m_conky_fast.stop()
m_screen.stop()
m_power.stop()