Files
dotfiles/.i3/lemonbar/i3_lemonbar_modules.py

512 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import threading
import subprocess
import contextlib
import os, getpass
import i3_lemonbar_config as config
import i3_lemonbar_common as common
import i3_lemonbar_parser as parser
import i3_workspaces as wspaces
class LemonModule(threading.Thread):
# Module spawns process that generates some status message, which the module parses and
# sends to lemonbar queue. On click actions are registered with the module.
# Every module runs in its own thread
def __init__(self):
self.actions = {}
super().__init__()
self._start_module()
def run(self):
if self.status_handle is None:
return
common.logger.info('Started module {}'.format(self.__class__.__name__))
common.health_logger.info('Module {} up'.format(self.__class__.__name__))
while True:
line = self.status_handle.readline()
if not line:
common.logger.info('Reached end of module {}'.format(self.__class__.__name__))
common.health_logger.info('Module {} down'.format(self.__class__.__name__))
break
parsed = self.parse(line)
common.write_queue.put(parsed)
def stop(self):
self._stop_module()
def parse(self, line):
# TODO get rid of prefix
data = line[len(self.prefix):].split()
self._parse_data(data) # Update correct field
formatted_line = parser.format_line() # Construct entire line
return formatted_line
def register_action(self, keyword, do_action):
self.actions[keyword] = do_action
def do_action(self, keyword):
if (keyword in self.actions):
func = self.actions[keyword]
func()
# TODO If this is a floating window action, save handle to thread, so we may kill it
def format_load(data, module, alert):
# Helper function to format network modules
# Changes colors scheme to inactive when interfaces are down, or to alert when
# alert level is reached
# Returns tuple (down, up)
if data[0] == 'down':
module.alt_scheme = parser.COLOR_SCHEME.INA
return ('x', 'x')
else:
(d_v, u_v) = (data[0],data[1])
if max(float(d_v), float(u_v)) > float(alert):
module.alt_scheme = parser.COLOR_SCHEME.NET_ALERT
else:
# Reset to default
module.alt_scheme = None
return (d_v, u_v)
class ConkyFastModule(LemonModule):
def __init__(self):
self.prefix = 'CNK_FAST'
super().__init__()
def _start_module(self):
self.p_handle = subprocess.Popen(['conky', '-c', config.path+'conky_fast'],
stdout=subprocess.PIPE, text=True)
self.status_handle = self.p_handle.stdout
self.wlan_load = parser.IconTextUnit('wlan_load', action='wlan', order=13)
self.eth_load = parser.IconTextUnit('eth_load', action='eth', order=14)
self.volume = parser.IconTextUnit('volume', action='pavu', order=20)
self.date = parser.IconTextUnit('date', action='date', order=40)
self.time = parser.IconTextUnit('time', action='toggle_secs', order=41
, alt_scheme=parser.COLOR_SCHEME.SPECIAL)
self.time.modes = [mode for mode in common.bar_mode]
parser.g_parser.register_unit(self.wlan_load)
parser.g_parser.register_unit(self.eth_load)
parser.g_parser.register_unit(self.volume)
parser.g_parser.register_unit(self.date)
parser.g_parser.register_unit(self.time)
self.register_action('wlan', self.nmtui)
def _stop_module(self):
if self.p_handle is not None:
self.p_handle.terminate()
parser.g_parser.remove_unit(self.wlan_load)
parser.g_parser.remove_unit(self.eth_load)
parser.g_parser.remove_unit(self.volume)
parser.g_parser.remove_unit(self.date)
parser.g_parser.remove_unit(self.time)
def _parse_data(self, data):
# wlan and eth
(wland_v, wlanu_v) = format_load(data[5:7], self.wlan_load, config.net_alert)
self.wlan_load.items = [(config.icon_wlan + config.icon_dl, wland_v)
,(config.icon_ul, wlanu_v)]
(ethd_v, ethu_v) = format_load(data[7:9], self.eth_load, config.net_alert)
self.eth_load.items = [(config.icon_eth + config.icon_dl, ethd_v)
,(config.icon_ul, ethu_v)]
# Volume
mute = data[4] == 'MUTE' or data[4] == 'NONE'
(vol,vols) = (-1,'×') if mute else (int(data[4]), data[4]+'%')
icon_v = config.icon_vol_mute if vol == 0 else \
config.icon_vol_low if vol < 50 else config.icon_vol
self.volume.items = [(icon_v, vols)]
# Date and time
self.date.items = [(config.icon_clock, ' '.join(data[0:3]))]
self.time.items = [('', data[3] if common.show_secs else data[3][:-3])]
def nmtui(self):
subprocess.Popen(['xterm', '-class', 'FLOAT_TERM', '-e', 'nmtui'])
class ConkySlowModule(LemonModule):
def __init__(self):
self.prefix = 'CNK_SLOW'
super().__init__()
def _start_module(self):
self.p_handle = subprocess.Popen(['conky', '-c', config.path+'conky_slow'],
stdout=subprocess.PIPE, text=True)
self.status_handle = self.p_handle.stdout
self.sys_load = parser.IconTextUnit('sys_load', action='load', order=10)
self.disk = parser.IconTextUnit('disk', order=11)
self.brightness = parser.IconTextUnit('brightness', action='adj_br', order=21
, external={'BRIGHT': self.parse_brightness})
self.battery = parser.IconTextUnit('battery', action='dpms', order=22)
self.language = parser.IconTextUnit('language', action='lang', order=32
, external={'LANG': self.parse_language})
parser.g_parser.register_unit(self.sys_load)
parser.g_parser.register_unit(self.disk)
parser.g_parser.register_unit(self.brightness)
parser.g_parser.register_unit(self.battery)
parser.g_parser.register_unit(self.language)
def _stop_module(self):
parser.g_parser.remove_unit(self.sys_load)
parser.g_parser.remove_unit(self.disk)
parser.g_parser.remove_unit(self.brightness)
parser.g_parser.remove_unit(self.battery)
parser.g_parser.remove_unit(self.language)
if self.p_handle is not None:
self.p_handle.terminate()
def _parse_data(self, data):
self.parse_sys_load (data[0:2]) # System load
self.parse_disk (data[2:4]) # Disk usage
self.parse_battery (data[4:5]) # Battery
self.parse_brightness (data[5:6]) # Screen brightness
self.parse_language (data[6:7]) # Language
def parse_sys_load(self, data):
if int(data[0]) > int(config.cpu_alert):
self.sys_load.alt_scheme = parser.COLOR_SCHEME.CPU_ALERT
else:
self.sys_load.alt_scheme = None
self.sys_load.items = [(config.icon_cpu, data[0] + '%')
,(config.icon_ul, data[1])]
def parse_disk(self, data):
self.disk.items = [(config.icon_hd , data[0] + '%')
,(config.icon_home, data[1] + '%')]
def parse_battery(self, data):
(batt_stat, batt) = (data[0][0], data[0][1:])
batt_i = int(batt)
icon_batt = config.icon_charging if batt_stat == 'C' else \
config.icon_charged if batt_stat == 'F' else \
config.icon_batt_0 if batt_i < 20 else \
config.icon_batt_1 if batt_i < 40 else \
config.icon_batt_2 if batt_i < 60 else \
config.icon_batt_3 if batt_i < 80 else \
config.icon_batt_4
self.battery.items = [(icon_batt, batt+'%')]
def parse_brightness(self, data):
brtxt = str(int(float(data[0])))
self.brightness.items = [(config.icon_bright, brtxt+'%')]
def parse_language(self, data):
self.language.items = [(config.icon_lang, data[0])]
class i3Module(LemonModule):
# Handles outputs (displays), workspaces and active window
# TODO trigger formatting
def __init__(self):
super().__init__()
self.displays = ''
self.win_title = ''
self.workspaces = ''
def _start_module(self):
self.i3_ws_obj = wspaces.i3ws(logger=common.logger)
self.displays = parser.CustomUnit('displays', format_function = self.format_displays, order=-30)
self.workspaces = parser.CustomUnit('workspaces', format_function = self.format_workspaces, order=-20)
self.title = parser.CustomUnit('title', format_function = self.format_title, order=-10)
self.displays.modes.append(common.bar_mode.control)
self.workspaces.modes.append(common.bar_mode.control)
parser.g_parser.register_unit(self.displays)
parser.g_parser.register_unit(self.workspaces)
parser.g_parser.register_unit(self.title)
# Add callbacks for parsing
self.i3_ws_obj.change_callbacks.append(self.parse_displays)
self.i3_ws_obj.change_callbacks.append(self.parse_workspaces)
self.i3_ws_obj.focus_callbacks.append(self.parse_title)
# Add callbacks for special actions
self.i3_ws_obj.change_callbacks.append(i3Module.set_bg)
self.i3_ws_obj.focus_callbacks.append(i3Module.set_keymap)
self.i3_ws_obj.focus_callbacks.append(i3Module.kill_floating_windows)
def _stop_module(self):
parser.g_parser.remove_unit(self.displays)
parser.g_parser.remove_unit(self.workspaces)
parser.g_parser.remove_unit(self.title)
if self.i3_ws_obj is not None:
self.i3_ws_obj.quit()
# Overload run as i3_ws_obj.work() is a blocking command
# Parsing is instead done through callbacks
def run(self):
common.logger.info('Started module {}'.format(self.__class__.__name__))
common.health_logger.info('Module {} up'.format(self.__class__.__name__))
self.i3_ws_obj.work() # This is a blocking command
common.logger.info('Reached end of module {}'.format(self.__class__.__name__))
common.health_logger.info('Module {} down'.format(self.__class__.__name__))
def parse_displays(self, i3ws):
parsed_list = [parser.block(click='displays', font='2')]
for output in i3ws.outputs:
output_name = output.name
if output.active:
if output_name == 'eDP1':
col_head = config.color_head
elif output_name == 'DP1':
col_head = config.color_vga
elif output_name == 'HDMI2':
col_head = config.color_hdmi
else:
col_head = '#00000000' # Undefined
parsed_list.append(parser.block(fg=config.color_back, bg=col_head))
parsed_list.append(config.icon_wsp)
parsed_list.append(parser.block(click=''))
self.displays = ' '.join(parsed_list)
def parse_workspaces(self, i3ws):
prefix = parser.block(font='1', fg=config.color_back, bg=config.color_head)
prefix_foc = ''.join([parser.block(fg = config.color_head, bg=config.color_wsp)
, ' ', config.sep_right, ' '
, parser.block(fg=config.color_back, bg=config.color_wsp, font='1')])
prefix_ina = parser.block(fg=config.color_back, bg=config.color_head, font='1', append=' ')
wspces = []
for workspace in i3ws.workspaces:
# Find out which output the workspace is on
output = None # TODO actually use this information
for output_ in i3ws.outputs:
if output_.name == workspace.output:
output = output_
break
if not output:
continue
status = i3ws.state.get_state(workspace, output) # FOC or INA
name = workspace.name # e.g. 5 terms
current = ''.join([parser.block(click=('i3-msg workspace' + name))
, name, parser.block(click='')])
if status == "FOC":
wspces.append(''.join([prefix_foc, current]))
else:
wspces.append(''.join([prefix_ina, current]))
self.workspaces = ''.join([prefix, ' '.join(wspces)])
def parse_title(self, i3ws):
if i3ws.focused_window is None:
return
self.win_title = ' '.join([parser.block(fg=config.color_head, bg=config.color_sec_b2)
, config.sep_right, parser.block(fg=config.color_head, bg=config.color_sec_b2, click='mode cycle')
, config.icon_prog
, parser.block(fg=config.color_sec_b2, bg='-')
, i3ws.focused_window.name])
def format_displays(self):
return self.displays
def format_workspaces(self):
return self.workspaces
def format_title(self):
return self.win_title
def img_path(num):
dir = '/home/kuba/Obrazy/Wallpapers/'
return dir + {
1: '1_main',
2: '2_web',
3: '3_music',
4: '4_work',
5: '5_terms',
6: '6_stats',
7: '7',
8: '8',
9: '9',
}.get(int(num), 'default')
def set_bg(i3ws):
cmd_args = ['sh', '/home/kuba/scripts/set_bg.sh']
for output in i3ws.outputs:
if output.active:
bg = i3Module.img_path(output.current_workspace.partition(' ')[0])
cmd_args.append(bg)
subprocess.call(cmd_args)
def kill_floating_windows(i3ws):
if i3ws.focused_window is None:
return
role = i3ws.focused_window.window_role
wclass = i3ws.focused_window.window_class
if role != 'FLOAT_TERM' and wclass != 'FLOAT_PAVU' and wclass != 'YADWINBR':
# Is there a window that the bar has opened?
for pid in common.kill_on_unfocus:
try:
os.kill(pid, signal.SIGTERM)
except ProcessLookupError:
common.logger.debug('Tried killing process {} but it doesn\'t exist'.format(pid))
common.kill_on_unfocus = []
def set_keymap(i3ws):
if i3ws.focused_window is None:
return
wclass = i3ws.focused_window.window_class
common.cur_class = wclass
if wclass in common.keymaps:
new_km = common.keymaps[wclass]
common.logger.debug('Setting {} as keymap for {}'.format(new_km, wclass))
else:
new_km = common.def_keymap
common.logger.debug('Setting default keymap {} for {}'.format(new_km, wclass))
subprocess.call(['/home/kuba/.i3/scripts/lang.sh', 'qset', new_km])
class ScreenModule(LemonModule):
# Start, stop and send commands to screen instance
# Start detached, in UTF-8 mode. Log to fifo
start_flags = ['-d', '-m', '-U', '-L', '-Logfile', config.fifo_screen_log]
def __init__(self):
self.prefix = ''
super().__init__()
def _start_module(self):
self.identifier = 'lemonbar_{}_{}'.format(getpass.getuser(), os.getpid())
self.empty_count = 0
common.screen_send_cmd = self.send_cmd
common.create_new_fifo(config.fifo_screen_log)
self.send(self.start_flags) # Start screen
self.send_cmd('stty -echo') # Do not echo what is written to screen terminal
self.send_colon('logfile flush 0.1') # Log quickly
self.status_handle = open(config.fifo_screen_log, 'r', buffering=1)
self.controls = parser.ButtonsUnit('controls', order=-10)
self.response = parser.IconTextUnit('response', order=10)
self.controls.items = [('', config.icon_prog, 'mode cycle')
,('', '', None)
,('', 'on', 'bluetooth power on')
,('', 'off', 'bluetooth power off')
,('PXC 550', '', None)
,('', 'conn.', 'bluetooth connect pxc550')
,('', 'disc.', 'bluetooth disconnect pxc550')
]
self.response.modes = [common.bar_mode.control]
self.controls.modes = [common.bar_mode.control]
parser.g_parser.register_unit(self.response)
parser.g_parser.register_unit(self.controls)
def _stop_module(self):
self.send_colon('kill')
self.status_handle.close()
with contextlib.suppress(FileNotFoundError):
os.remove(config.fifo_screen_log)
parser.g_parser.remove_unit(self.response)
parser.g_parser.remove_unit(self.controls)
def _parse_data(self, data):
line = ' '.join(data)
line = common.strip_ansi_unicode(line)
common.logger.debug('Screen read line {}'.format(line))
if (not line.startswith('[CHG]')
and not line.isspace()):
self.response.items = [('', line)]
if not line:
# End loop if many empty lines in a row
self.empty_count = self.empty_count + 1
if self.empty_count > 3:
common.logger.error('Too many empty lines, aborting')
# TODO actually abort
else:
self.empty_count = 0
def send(self, args):
# Send something to our screen instance
subprocess.call(['screen', '-S', self.identifier] + args)
def send_cmd(self, cmd):
# Send terminal input to our screen instance
self.send(['-X', 'stuff', cmd + '\n'])
def send_colon(self, cmd):
# Send colon command to our screen instance
self.send(['-X', 'colon', cmd + '\n'])
class PowerOptionsModule(LemonModule):
def __init__(self):
self.prefix = ''
super().__init__()
def _start_module(self):
# No external commands needed
self.status_handle = None
self.power_opts = parser.CustomUnit('power', format_function=self.format_power_opts, order=-1)
self.power_opts.modes = [common.bar_mode.power]
parser.g_parser.register_unit(self.power_opts)
def _stop_module(self):
parser.g_parser.remove_unit(self.power_opts)
def _parse_data(self, data):
pass
def format_power_opts(self):
return ''.join([parser.block(fg=config.color_fore, bg=config.color_poweropts)
, ' Abort (Esc) | System (l) lock, (e) logout, (s) suspend, (h) hibernate'
, ', (r) reboot, (Shift+s) shutdown'])
def do_action(keyword):
global m_conky_fast, m_conky_slow, m_i3ws
m_i3ws.do_action(keyword)
m_conky_slow.do_action(keyword)
m_conky_fast.do_action(keyword)
m_screen.do_action(keyword)
m_power.do_action(keyword)
def start_all():
global m_conky_fast, m_conky_slow, m_i3ws, m_screen, m_power
m_i3ws = i3Module()
m_conky_slow = ConkySlowModule()
m_conky_fast = ConkyFastModule()
m_screen = ScreenModule()
m_power = PowerOptionsModule()
m_i3ws.start()
m_conky_slow.start()
m_conky_fast.start()
m_screen.start()
m_power.start()
def stop_all():
global m_conky_fast, m_conky_slow, m_i3ws, m_screen, m_power
m_i3ws.stop()
m_conky_slow.stop()
m_conky_fast.stop()
m_screen.stop()
m_power.stop()