488 lines
18 KiB
Python
488 lines
18 KiB
Python
import threading
|
||
import subprocess
|
||
import contextlib
|
||
import os, getpass
|
||
|
||
import i3_lemonbar_config as config
|
||
import i3_lemonbar_common as common
|
||
import i3_lemonbar_parser as parser
|
||
import i3_workspaces as wspaces
|
||
|
||
class LemonModule(threading.Thread):
|
||
# Module generates some status message, parses it, and handles on click actions
|
||
# Every module runs in its own thread
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self._start_module()
|
||
|
||
def run(self):
|
||
if self.status_handle is None:
|
||
return
|
||
|
||
common.logger.info('Started module {}'.format(self.__class__.__name__))
|
||
common.health_logger.info('Module {} up'.format(self.__class__.__name__))
|
||
while True:
|
||
line = self.status_handle.readline()
|
||
if not line:
|
||
common.logger.info('Reached end of module {}'.format(self.__class__.__name__))
|
||
common.health_logger.info('Module {} down'.format(self.__class__.__name__))
|
||
break
|
||
|
||
parsed = self.parse(line)
|
||
common.write_queue.put(parsed)
|
||
|
||
def stop(self):
|
||
self._stop_module()
|
||
|
||
def parse(self, line):
|
||
# TODO get rid of prefix
|
||
data = line[len(self.prefix):].split()
|
||
self._parse_data(data) # Update correct field
|
||
|
||
formatted_line = parser.format_line() # Construct entire line
|
||
return formatted_line
|
||
|
||
def format_load(data, module, alert):
|
||
# Helper function to format network modules
|
||
# Changes colors scheme to inactive when interfaces are down, or to alert when
|
||
# alert level is reached
|
||
# Returns tuple (down, up)
|
||
if data[0] == 'down':
|
||
module.alt_scheme = parser.COLOR_SCHEME.INA
|
||
return ('x', 'x')
|
||
else:
|
||
(d_v, u_v) = (data[0],data[1])
|
||
if max(float(d_v), float(u_v)) > float(alert):
|
||
module.alt_scheme = parser.COLOR_SCHEME.NET_ALERT
|
||
else:
|
||
# Reset to default
|
||
module.alt_scheme = None
|
||
return (d_v, u_v)
|
||
|
||
class ConkyFastModule(LemonModule):
|
||
|
||
def __init__(self):
|
||
self.prefix = 'CNK_FAST'
|
||
super().__init__()
|
||
|
||
def _start_module(self):
|
||
self.p_handle = subprocess.Popen(['conky', '-c', config.path+'conky_fast'],
|
||
stdout=subprocess.PIPE, text=True)
|
||
self.status_handle = self.p_handle.stdout
|
||
|
||
self.wlan_load = parser.IconTextUnit('wlan_load', action='wlan', order=13)
|
||
self.eth_load = parser.IconTextUnit('eth_load', action='eth', order=14)
|
||
self.volume = parser.IconTextUnit('volume', action='pavu', order=20)
|
||
self.date = parser.IconTextUnit('date', action='date', order=40)
|
||
self.time = parser.IconTextUnit('time', action='toggle_secs', order=41
|
||
, alt_scheme=parser.COLOR_SCHEME.SPECIAL)
|
||
self.time.modes = [mode for mode in common.bar_mode]
|
||
|
||
parser.g_parser.register_unit(self.wlan_load)
|
||
parser.g_parser.register_unit(self.eth_load)
|
||
parser.g_parser.register_unit(self.volume)
|
||
parser.g_parser.register_unit(self.date)
|
||
parser.g_parser.register_unit(self.time)
|
||
|
||
def _stop_module(self):
|
||
if self.p_handle is not None:
|
||
self.p_handle.terminate()
|
||
|
||
parser.g_parser.remove_unit(self.wlan_load)
|
||
parser.g_parser.remove_unit(self.eth_load)
|
||
parser.g_parser.remove_unit(self.volume)
|
||
parser.g_parser.remove_unit(self.date)
|
||
parser.g_parser.remove_unit(self.time)
|
||
|
||
def _parse_data(self, data):
|
||
# wlan and eth
|
||
(wland_v, wlanu_v) = format_load(data[5:7], self.wlan_load, config.net_alert)
|
||
self.wlan_load.items = [(config.icon_wlan + config.icon_dl, wland_v)
|
||
,(config.icon_ul, wlanu_v)]
|
||
|
||
(ethd_v, ethu_v) = format_load(data[7:9], self.eth_load, config.net_alert)
|
||
self.eth_load.items = [(config.icon_eth + config.icon_dl, ethd_v)
|
||
,(config.icon_ul, ethu_v)]
|
||
|
||
# Volume
|
||
mute = data[4] == 'MUTE' or data[4] == 'NONE'
|
||
(vol,vols) = (-1,'×') if mute else (int(data[4]), data[4]+'%')
|
||
icon_v = config.icon_vol_mute if vol == 0 else \
|
||
config.icon_vol_low if vol < 50 else config.icon_vol
|
||
self.volume.items = [(icon_v, vols)]
|
||
|
||
# Date and time
|
||
self.date.items = [(config.icon_clock, ' '.join(data[0:3]))]
|
||
self.time.items = [('', data[3] if common.show_secs else data[3][:-3])]
|
||
|
||
class ConkySlowModule(LemonModule):
|
||
|
||
def __init__(self):
|
||
self.prefix = 'CNK_SLOW'
|
||
super().__init__()
|
||
|
||
def _start_module(self):
|
||
self.p_handle = subprocess.Popen(['conky', '-c', config.path+'conky_slow'],
|
||
stdout=subprocess.PIPE, text=True)
|
||
self.status_handle = self.p_handle.stdout
|
||
|
||
self.sys_load = parser.IconTextUnit('sys_load', action='load', order=10)
|
||
self.disk = parser.IconTextUnit('disk', order=11)
|
||
self.brightness = parser.IconTextUnit('brightness', action='adj_br', order=21
|
||
, external={'BRIGHT': self.parse_brightness})
|
||
self.battery = parser.IconTextUnit('battery', action='dpms', order=22)
|
||
self.language = parser.IconTextUnit('language', action='lang', order=32
|
||
, external={'LANG': self.parse_language})
|
||
|
||
parser.g_parser.register_unit(self.sys_load)
|
||
parser.g_parser.register_unit(self.disk)
|
||
parser.g_parser.register_unit(self.brightness)
|
||
parser.g_parser.register_unit(self.battery)
|
||
parser.g_parser.register_unit(self.language)
|
||
|
||
def _stop_module(self):
|
||
parser.g_parser.remove_unit(self.sys_load)
|
||
parser.g_parser.remove_unit(self.disk)
|
||
parser.g_parser.remove_unit(self.brightness)
|
||
parser.g_parser.remove_unit(self.battery)
|
||
parser.g_parser.remove_unit(self.language)
|
||
|
||
if self.p_handle is not None:
|
||
self.p_handle.terminate()
|
||
|
||
def _parse_data(self, data):
|
||
self.parse_sys_load (data[0:2]) # System load
|
||
self.parse_disk (data[2:4]) # Disk usage
|
||
self.parse_battery (data[4:5]) # Battery
|
||
self.parse_brightness (data[5:6]) # Screen brightness
|
||
self.parse_language (data[6:7]) # Language
|
||
|
||
def parse_sys_load(self, data):
|
||
if int(data[0]) > int(config.cpu_alert):
|
||
self.sys_load.alt_scheme = parser.COLOR_SCHEME.CPU_ALERT
|
||
else:
|
||
self.sys_load.alt_scheme = None
|
||
|
||
self.sys_load.items = [(config.icon_cpu, data[0] + '%')
|
||
,(config.icon_ul, data[1])]
|
||
|
||
def parse_disk(self, data):
|
||
self.disk.items = [(config.icon_hd , data[0] + '%')
|
||
,(config.icon_home, data[1] + '%')]
|
||
def parse_battery(self, data):
|
||
(batt_stat, batt) = (data[0][0], data[0][1:])
|
||
batt_i = int(batt)
|
||
icon_batt = config.icon_charging if batt_stat == 'C' else \
|
||
config.icon_charged if batt_stat == 'F' else \
|
||
config.icon_batt_0 if batt_i < 20 else \
|
||
config.icon_batt_1 if batt_i < 40 else \
|
||
config.icon_batt_2 if batt_i < 60 else \
|
||
config.icon_batt_3 if batt_i < 80 else \
|
||
config.icon_batt_4
|
||
self.battery.items = [(icon_batt, batt+'%')]
|
||
|
||
def parse_brightness(self, data):
|
||
brtxt = str(int(float(data[0])))
|
||
self.brightness.items = [(config.icon_bright, brtxt+'%')]
|
||
|
||
def parse_language(self, data):
|
||
self.language.items = [(config.icon_lang, data[0])]
|
||
|
||
class i3Module(LemonModule):
|
||
# Handles outputs (displays), workspaces and active window
|
||
# TODO trigger formatting
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.displays = ''
|
||
self.win_title = ''
|
||
self.workspaces = ''
|
||
|
||
def _start_module(self):
|
||
self.i3_ws_obj = wspaces.i3ws(logger=common.logger)
|
||
|
||
self.displays = parser.CustomUnit('displays', format_function = self.format_displays, order=-30)
|
||
self.workspaces = parser.CustomUnit('workspaces', format_function = self.format_workspaces, order=-20)
|
||
self.title = parser.CustomUnit('title', format_function = self.format_title, order=-10)
|
||
self.displays.modes.append(common.bar_mode.control)
|
||
self.workspaces.modes.append(common.bar_mode.control)
|
||
|
||
parser.g_parser.register_unit(self.displays)
|
||
parser.g_parser.register_unit(self.workspaces)
|
||
parser.g_parser.register_unit(self.title)
|
||
|
||
# Add callbacks for parsing
|
||
self.i3_ws_obj.change_callbacks.append(self.parse_displays)
|
||
self.i3_ws_obj.change_callbacks.append(self.parse_workspaces)
|
||
self.i3_ws_obj.focus_callbacks.append(self.parse_title)
|
||
|
||
# Add callbacks for special actions
|
||
self.i3_ws_obj.change_callbacks.append(i3Module.set_bg)
|
||
self.i3_ws_obj.focus_callbacks.append(i3Module.set_keymap)
|
||
self.i3_ws_obj.focus_callbacks.append(i3Module.kill_floating_windows)
|
||
|
||
def _stop_module(self):
|
||
parser.g_parser.remove_unit(self.displays)
|
||
parser.g_parser.remove_unit(self.workspaces)
|
||
parser.g_parser.remove_unit(self.title)
|
||
|
||
if self.i3_ws_obj is not None:
|
||
self.i3_ws_obj.quit()
|
||
|
||
# Overload run as i3_ws_obj.work() is a blocking command
|
||
# Parsing is instead done through callbacks
|
||
def run(self):
|
||
common.logger.info('Started module {}'.format(self.__class__.__name__))
|
||
common.health_logger.info('Module {} up'.format(self.__class__.__name__))
|
||
|
||
self.i3_ws_obj.work() # This is a blocking command
|
||
|
||
common.logger.info('Reached end of module {}'.format(self.__class__.__name__))
|
||
common.health_logger.info('Module {} down'.format(self.__class__.__name__))
|
||
|
||
def parse_displays(self, i3ws):
|
||
parsed_list = [parser.block(click='displays', font='2')]
|
||
for output in i3ws.outputs:
|
||
output_name = output.name
|
||
if output.active:
|
||
if output_name == 'eDP1':
|
||
col_head = config.color_head
|
||
elif output_name == 'DP1':
|
||
col_head = config.color_vga
|
||
elif output_name == 'HDMI2':
|
||
col_head = config.color_hdmi
|
||
else:
|
||
col_head = '#00000000' # Undefined
|
||
parsed_list.append(parser.block(fg=config.color_back, bg=col_head))
|
||
parsed_list.append(config.icon_wsp)
|
||
parsed_list.append(parser.block(click=''))
|
||
|
||
self.displays = ' '.join(parsed_list)
|
||
|
||
def parse_workspaces(self, i3ws):
|
||
prefix = parser.block(font='1', fg=config.color_back, bg=config.color_head)
|
||
prefix_foc = ''.join([parser.block(fg = config.color_head, bg=config.color_wsp)
|
||
, ' ', config.sep_right, ' '
|
||
, parser.block(fg=config.color_back, bg=config.color_wsp, font='1')])
|
||
prefix_ina = parser.block(fg=config.color_back, bg=config.color_head, font='1', append=' ')
|
||
wspces = []
|
||
|
||
for workspace in i3ws.workspaces:
|
||
# Find out which output the workspace is on
|
||
output = None # TODO actually use this information
|
||
for output_ in i3ws.outputs:
|
||
if output_.name == workspace.output:
|
||
output = output_
|
||
break
|
||
if not output:
|
||
continue
|
||
status = i3ws.state.get_state(workspace, output) # FOC or INA
|
||
name = workspace.name # e.g. 5 terms
|
||
current = ''.join([parser.block(click=('i3-msg workspace' + name))
|
||
, name, parser.block(click='')])
|
||
if status == "FOC":
|
||
wspces.append(''.join([prefix_foc, current]))
|
||
else:
|
||
wspces.append(''.join([prefix_ina, current]))
|
||
|
||
self.workspaces = ''.join([prefix, ' '.join(wspces)])
|
||
|
||
def parse_title(self, i3ws):
|
||
if i3ws.focused_window is None:
|
||
return
|
||
self.win_title = ' '.join([parser.block(fg=config.color_head, bg=config.color_sec_b2)
|
||
, config.sep_right, parser.block(fg=config.color_head, bg=config.color_sec_b2, click='mode cycle')
|
||
, config.icon_prog
|
||
, parser.block(fg=config.color_sec_b2, bg='-')
|
||
, i3ws.focused_window.name])
|
||
|
||
def format_displays(self):
|
||
return self.displays
|
||
|
||
def format_workspaces(self):
|
||
return self.workspaces
|
||
|
||
def format_title(self):
|
||
return self.win_title
|
||
|
||
def img_path(num):
|
||
dir = '/home/kuba/Obrazy/Wallpapers/'
|
||
return dir + {
|
||
1: '1_main',
|
||
2: '2_web',
|
||
3: '3_music',
|
||
4: '4_work',
|
||
5: '5_terms',
|
||
6: '6_stats',
|
||
7: '7',
|
||
8: '8',
|
||
9: '9',
|
||
}.get(int(num), 'default')
|
||
|
||
def set_bg(i3ws):
|
||
cmd_args = ['sh', '/home/kuba/scripts/set_bg.sh']
|
||
for output in i3ws.outputs:
|
||
if output.active:
|
||
bg = i3Module.img_path(output.current_workspace.partition(' ')[0])
|
||
cmd_args.append(bg)
|
||
subprocess.call(cmd_args)
|
||
|
||
def kill_floating_windows(i3ws):
|
||
if i3ws.focused_window is None:
|
||
return
|
||
|
||
role = i3ws.focused_window.window_role
|
||
wclass = i3ws.focused_window.window_class
|
||
|
||
if role != 'FLOAT_TERM' and wclass != 'FLOAT_PAVU' and wclass != 'YADWINBR':
|
||
# Is there a window that the bar has opened?
|
||
for pid in common.kill_on_unfocus:
|
||
try:
|
||
os.kill(pid, signal.SIGTERM)
|
||
except ProcessLookupError:
|
||
common.logger.debug('Tried killing process {} but it doesn\'t exist'.format(pid))
|
||
common.kill_on_unfocus = []
|
||
|
||
def set_keymap(i3ws):
|
||
if i3ws.focused_window is None:
|
||
return
|
||
wclass = i3ws.focused_window.window_class
|
||
common.cur_class = wclass
|
||
|
||
if wclass in common.keymaps:
|
||
new_km = common.keymaps[wclass]
|
||
common.logger.debug('Setting {} as keymap for {}'.format(new_km, wclass))
|
||
else:
|
||
new_km = common.def_keymap
|
||
common.logger.debug('Setting default keymap {} for {}'.format(new_km, wclass))
|
||
subprocess.call(['/home/kuba/.i3/scripts/lang.sh', 'qset', new_km])
|
||
|
||
class ScreenModule(LemonModule):
|
||
# Start, stop and send commands to screen instance
|
||
|
||
# Start detached, in UTF-8 mode. Log to fifo
|
||
start_flags = ['-d', '-m', '-U', '-L', '-Logfile', config.fifo_screen_log]
|
||
|
||
def __init__(self):
|
||
self.prefix = ''
|
||
super().__init__()
|
||
|
||
def _start_module(self):
|
||
self.identifier = 'lemonbar_{}_{}'.format(getpass.getuser(), os.getpid())
|
||
self.empty_count = 0
|
||
common.screen_send_cmd = self.send_cmd
|
||
common.create_new_fifo(config.fifo_screen_log)
|
||
self.send(self.start_flags) # Start screen
|
||
self.send_cmd('stty -echo') # Do not echo what is written to screen terminal
|
||
self.send_colon('logfile flush 0.1') # Log quickly
|
||
|
||
self.status_handle = open(config.fifo_screen_log, 'r', buffering=1)
|
||
|
||
self.controls = parser.ButtonsUnit('controls', order=-10)
|
||
self.response = parser.IconTextUnit('response', order=10)
|
||
|
||
self.controls.items = [('', config.icon_prog, 'mode cycle')
|
||
,('', '', None)
|
||
,('', 'on', 'bluetooth power on')
|
||
,('', 'off', 'bluetooth power off')
|
||
,('PXC 550', '', None)
|
||
,('', 'conn.', 'bluetooth connect pxc550')
|
||
,('', 'disc.', 'bluetooth disconnect pxc550')
|
||
]
|
||
|
||
self.response.modes = [common.bar_mode.control]
|
||
self.controls.modes = [common.bar_mode.control]
|
||
|
||
parser.g_parser.register_unit(self.response)
|
||
parser.g_parser.register_unit(self.controls)
|
||
|
||
def _stop_module(self):
|
||
self.send_colon('kill')
|
||
self.status_handle.close()
|
||
with contextlib.suppress(FileNotFoundError):
|
||
os.remove(config.fifo_screen_log)
|
||
parser.g_parser.remove_unit(self.response)
|
||
parser.g_parser.remove_unit(self.controls)
|
||
|
||
def _parse_data(self, data):
|
||
line = ' '.join(data)
|
||
line = common.strip_ansi_unicode(line)
|
||
|
||
common.logger.debug('Screen read line {}'.format(line))
|
||
if (not line.startswith('[CHG]')
|
||
and not line.isspace()):
|
||
self.response.items = [('', line)]
|
||
|
||
if not line:
|
||
# End loop if many empty lines in a row
|
||
self.empty_count = self.empty_count + 1
|
||
if self.empty_count > 3:
|
||
common.logger.error('Too many empty lines, aborting')
|
||
# TODO actually abort
|
||
else:
|
||
self.empty_count = 0
|
||
|
||
def send(self, args):
|
||
# Send something to our screen instance
|
||
subprocess.call(['screen', '-S', self.identifier] + args)
|
||
|
||
def send_cmd(self, cmd):
|
||
# Send terminal input to our screen instance
|
||
self.send(['-X', 'stuff', cmd + '\n'])
|
||
|
||
def send_colon(self, cmd):
|
||
# Send colon command to our screen instance
|
||
self.send(['-X', 'colon', cmd + '\n'])
|
||
|
||
class PowerOptionsModule(LemonModule):
|
||
|
||
def __init__(self):
|
||
self.prefix = ''
|
||
super().__init__()
|
||
|
||
def _start_module(self):
|
||
# No external commands needed
|
||
self.status_handle = None
|
||
|
||
self.power_opts = parser.CustomUnit('power', format_function=self.format_power_opts, order=-1)
|
||
|
||
self.power_opts.modes = [common.bar_mode.power]
|
||
|
||
parser.g_parser.register_unit(self.power_opts)
|
||
|
||
def _stop_module(self):
|
||
parser.g_parser.remove_unit(self.power_opts)
|
||
|
||
def _parse_data(self, data):
|
||
pass
|
||
|
||
def format_power_opts(self):
|
||
return ''.join([parser.block(fg=config.color_fore, bg=config.color_poweropts)
|
||
, ' Abort (Esc) | System (l) lock, (e) logout, (s) suspend, (h) hibernate'
|
||
, ', (r) reboot, (Shift+s) shutdown'])
|
||
|
||
def start_all():
|
||
global m_conky_fast, m_conky_slow, m_i3ws
|
||
|
||
m_i3ws = i3Module()
|
||
m_conky_slow = ConkySlowModule()
|
||
m_conky_fast = ConkyFastModule()
|
||
m_screen = ScreenModule()
|
||
m_power = PowerOptionsModule()
|
||
|
||
m_i3ws.start()
|
||
m_conky_slow.start()
|
||
m_conky_fast.start()
|
||
m_screen.start()
|
||
m_power.start()
|
||
|
||
def stop_all():
|
||
global m_conky_fast, m_conky_slow, m_i3ws
|
||
|
||
m_i3ws.stop()
|
||
m_conky_slow.stop()
|
||
m_conky_fast.stop()
|
||
m_screen.stop()
|
||
m_power.stop()
|