710 lines
26 KiB
Python
710 lines
26 KiB
Python
import threading
|
||
import subprocess
|
||
import contextlib
|
||
import time, os, signal, getpass
|
||
import sys, inspect, re
|
||
|
||
import i3_lemonbar_config as config
|
||
import i3_lemonbar_common as common
|
||
import i3_lemonbar_parser as parser
|
||
import i3_workspaces as wspaces
|
||
|
||
class LemonModule(threading.Thread):
|
||
# Module spawns process that generates some status message, which the module parses and
|
||
# sends to lemonbar queue. On click actions are registered with the module.
|
||
# Every module runs in its own thread
|
||
|
||
def units(self):
|
||
# Iterate over units
|
||
for _, attr_instance in self.__dict__.items():
|
||
if isinstance(attr_instance, parser.LemonUnit):
|
||
yield attr_instance
|
||
|
||
def __init__(self, lemonbar_wrapper):
|
||
self.actions = {}
|
||
super().__init__()
|
||
self.lemonbar_wrapper = lemonbar_wrapper
|
||
self._start_module()
|
||
|
||
for unit in self.units():
|
||
parser.g_parser.register_unit(unit)
|
||
|
||
|
||
def run(self):
|
||
if self.status_handle is None:
|
||
return
|
||
|
||
common.logger.info('Started module {}'.format(self.__class__.__name__))
|
||
common.health_logger.info('Module {} up'.format(self.__class__.__name__))
|
||
while True:
|
||
line = self.status_handle.readline()
|
||
if not line:
|
||
common.logger.info('Reached end of module {}'.format(self.__class__.__name__))
|
||
common.health_logger.info('Module {} down'.format(self.__class__.__name__))
|
||
break
|
||
|
||
parsed = self.parse(line)
|
||
self.lemonbar_wrapper.buffer_in.put(parsed)
|
||
|
||
def stop(self):
|
||
self._stop_module()
|
||
|
||
for unit in self.units():
|
||
parser.g_parser.remove_unit(unit)
|
||
|
||
def parse(self, line):
|
||
data = line.split()
|
||
self._parse_data(data) # Update correct field
|
||
|
||
formatted_line = parser.format_line() # Construct entire line
|
||
return formatted_line
|
||
|
||
def register_action(self, keyword, do_action):
|
||
self.actions[keyword] = do_action
|
||
|
||
def do_action(self, command):
|
||
action_arr = command.split()
|
||
action = action_arr[0]
|
||
if (action in self.actions):
|
||
func = self.actions[action]
|
||
func(*action_arr[1:]) # Unpack arguments, if any
|
||
|
||
def format_load(data, module, alert):
|
||
# Helper function to format network modules
|
||
# Changes colors scheme to inactive when interfaces are down, or to alert when
|
||
# alert level is reached
|
||
# Returns tuple (down, up)
|
||
if data[0] == 'down':
|
||
module.alt_scheme = parser.COLOR_SCHEME.INA
|
||
return ('x', 'x')
|
||
else:
|
||
(d_v, u_v) = (data[0],data[1])
|
||
if max(float(d_v), float(u_v)) > float(alert):
|
||
module.alt_scheme = parser.COLOR_SCHEME.NET_ALERT
|
||
else:
|
||
# Reset to default
|
||
module.alt_scheme = None
|
||
return (d_v, u_v)
|
||
|
||
class DateTimeModule(LemonModule):
|
||
|
||
def __init__(self, lemonbar_wrapper):
|
||
super().__init__(lemonbar_wrapper)
|
||
|
||
def _start_module(self):
|
||
class DateTimeThread(threading.Thread):
|
||
def __init__(self, secs_mode = False):
|
||
self.secs_mode = secs_mode
|
||
self.last_output = ''
|
||
|
||
self.shelf = common.Shelf()
|
||
self.readline = self.shelf.get
|
||
self.clock_tick = threading.Condition(self.shelf.lock) # Needed, because sleep may be
|
||
# interrupted when user changes H:M to H:M:S
|
||
super().__init__()
|
||
|
||
def run(self):
|
||
while True:
|
||
fmt = "%a %d %b %H:%M:%S" if self.secs_mode else "%a %d %b %H:%M"
|
||
timestr = time.strftime(fmt, time.localtime())
|
||
if timestr != self.last_output:
|
||
self.last_output = timestr
|
||
self.shelf.put(timestr)
|
||
self.sleep_until_change()
|
||
|
||
def sleep_until_change(self):
|
||
# Sleep for an appropriate amount of time (depending on if showing seconds or not)
|
||
# with the possibility of being woken early
|
||
if self.secs_mode:
|
||
rem = 0.1 # TODO fiddle with this, benchmarking it
|
||
else:
|
||
rem = time.time() % 5.0 # 5 seconds, 60 is unnecessarily long
|
||
self.clock_tick.acquire()
|
||
self.clock_tick.wait(rem)
|
||
self.clock_tick.release()
|
||
|
||
def wake(self):
|
||
self.clock_tick.acquire()
|
||
self.clock_tick.notify()
|
||
self.clock_tick.release()
|
||
|
||
self.dt_thread = DateTimeThread()
|
||
self.dt_thread.start()
|
||
self.status_handle = self.dt_thread # Only readline() is used
|
||
|
||
self.date = parser.IconTextUnit('date', action='date', order=40)
|
||
self.time = parser.IconTextUnit('time', action='toggle_secs', order=41
|
||
, alt_scheme=parser.COLOR_SCHEME.SPECIAL)
|
||
self.time.modes = [mode for mode in common.bar_mode]
|
||
|
||
|
||
self.register_action('date' , self.date_comm)
|
||
self.register_action('toggle_secs' , self.toggle_secs)
|
||
|
||
def _stop_module(self):
|
||
if self.p_handle is not None:
|
||
self.p_handle.terminate()
|
||
|
||
|
||
def _parse_data(self, data):
|
||
# Date and time
|
||
self.date.items = [(config.icon_clock, ' '.join(data[0:3]))]
|
||
self.time.items = [('', data[3])]
|
||
|
||
def date_comm(self):
|
||
subprocess.Popen(['yad', '--no-buttons', '--calendar', '--sticky'
|
||
, '--on-top' , '--class' , '"YADWIN"', '--posx=1650', '--posy=24'
|
||
, '--close-on-unfocus'])
|
||
|
||
def toggle_secs(self):
|
||
self.dt_thread.secs_mode = not self.dt_thread.secs_mode
|
||
self.dt_thread.wake()
|
||
|
||
def conky_config(update_interval=5):
|
||
return """
|
||
conky.config = {{
|
||
background=false,
|
||
update_interval={},
|
||
total_run_times=0,
|
||
override_utf8_locale=true,
|
||
short_units=true,
|
||
uppercase=false,
|
||
out_to_console=true,
|
||
out_to_x=false,
|
||
if_up_strictness='address',
|
||
format_human_readable=true,
|
||
}}
|
||
""".format(update_interval)
|
||
|
||
def conky_net(iface):
|
||
return """\\
|
||
${{if_up {iface}}}${{downspeedf {iface}}} ${{upspeedf {iface}}}\\
|
||
${{else}}down down${{endif}} """.format(iface=iface)
|
||
|
||
conky_begin_body = """
|
||
conky.text = [["""
|
||
|
||
conky_end_body = """
|
||
]]"""
|
||
class ConkyFastModule(LemonModule):
|
||
|
||
def __init__(self, lemonbar_wrapper):
|
||
self.conky_config = (conky_config(update_interval=1)
|
||
+ conky_begin_body
|
||
+ """\\
|
||
${exec ~/.i3/lemonbar/get_vol.sh} """
|
||
+ ( conky_net('enp5s0') if common.hostname() == 'kubaDesktop' else
|
||
(conky_net('wlp3s0')
|
||
+ conky_net('enp2s0')))
|
||
+ conky_end_body)
|
||
super().__init__(lemonbar_wrapper)
|
||
|
||
def _start_module(self):
|
||
self.p_handle = subprocess.Popen(['conky', '-c', '-'],
|
||
stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
|
||
self.p_handle.stdin.write(self.conky_config)
|
||
self.p_handle.stdin.close() # Otherwise conky doesn't load
|
||
self.status_handle = self.p_handle.stdout
|
||
|
||
self.eth_load = parser.IconTextUnit('eth_load', action='eth', order=14)
|
||
self.volume = parser.IconTextUnit('volume', action='pavu', order=20)
|
||
|
||
|
||
self.register_action('pavu', self.pavu)
|
||
|
||
if common.hostname() != 'kubaDesktop':
|
||
self.wlan_load = parser.IconTextUnit('wlan_load', action='wlan', order=13)
|
||
self.register_action('wlan', self.nmtui)
|
||
self._parse_data = self._parse_data_w_wlan
|
||
else:
|
||
self._parse_data = self._parse_data_wo_wlan
|
||
|
||
|
||
def _stop_module(self):
|
||
if self.p_handle is not None:
|
||
self.p_handle.terminate()
|
||
|
||
|
||
def _parse_data_w_wlan(self, data):
|
||
self.parse_vol (data[0])
|
||
self.parse_wlan (data[1:3])
|
||
self.parse_eth (data[3:5])
|
||
|
||
def _parse_data_wo_wlan(self, data):
|
||
self.parse_vol (data[0])
|
||
self.parse_eth (data[1:3])
|
||
|
||
def parse_wlan(self, data):
|
||
(wland_v, wlanu_v) = format_load(data, self.wlan_load, config.net_alert)
|
||
self.wlan_load.items = [(config.icon_wlan + config.icon_dl, wland_v)
|
||
,(config.icon_ul, wlanu_v)]
|
||
|
||
def parse_eth(self, data):
|
||
(ethd_v, ethu_v) = format_load(data, self.eth_load, config.net_alert)
|
||
self.eth_load.items = [(config.icon_eth + config.icon_dl, ethd_v)
|
||
,(config.icon_ul, ethu_v)]
|
||
|
||
def parse_vol(self, data):
|
||
mute = data == 'MUTE' or data == 'NONE'
|
||
(vol,vols) = (-1,'×') if mute else (int(data), data+'%')
|
||
icon_v = (config.icon_vol_mute if vol == 0 else
|
||
config.icon_vol_low if vol < 50 else config.icon_vol)
|
||
self.volume.items = [(icon_v, vols)]
|
||
|
||
def nmtui(self):
|
||
p = subprocess.Popen(['xterm', '-class', 'FLOAT_TERM', '-e', 'nmtui'])
|
||
common.kill_on_unfocus.append(p.pid)
|
||
|
||
def pavu(self):
|
||
p = subprocess.Popen(['pavucontrol', '--class=FLOAT_PAVU'])
|
||
common.kill_on_unfocus.append(p.pid)
|
||
|
||
class ConkySlowModule(LemonModule):
|
||
|
||
def __init__(self, lemonbar_wrapper):
|
||
self.conky_config = (conky_config()
|
||
+ conky_begin_body
|
||
+ """\\
|
||
${cpu} \\
|
||
${mem} \\
|
||
${fs_used_perc /} \\
|
||
${fs_used_perc /home} """
|
||
+ ("""\\
|
||
${exec ~/.i3/lemonbar/get_bat.sh} \\
|
||
${exec brillo} """ if common.hostname() != 'kubaDesktop' else '')
|
||
+ """\\
|
||
${exec /home/kuba/.i3/scripts/lang.sh show} \\
|
||
${exec ~/.i3/lemonbar/get_vol.sh}"""
|
||
+ conky_end_body)
|
||
super().__init__(lemonbar_wrapper)
|
||
|
||
def _start_module(self):
|
||
self.p_handle = subprocess.Popen(['conky', '-c', '-'],
|
||
stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
|
||
self.p_handle.stdin.write(self.conky_config)
|
||
self.p_handle.stdin.close() # Otherwise conky doesn't load
|
||
self.status_handle = self.p_handle.stdout
|
||
|
||
self.sys_load = parser.IconTextUnit('sys_load', action='load', order=10)
|
||
self.disk = parser.IconTextUnit('disk', order=11)
|
||
self.language = parser.IconTextUnit('language', action='lang', order=32
|
||
, external={'LANG': self.parse_language})
|
||
|
||
|
||
self.register_action('load', self.htop)
|
||
self.register_action('lang', self.lang_comm)
|
||
|
||
if common.hostname() != 'kubaDesktop':
|
||
self.brightness = parser.IconTextUnit('brightness', action='adj_br', order=21
|
||
, external={'BRIGHT': self.parse_brightness})
|
||
self.battery = parser.IconTextUnit('battery', action='dpms', order=22)
|
||
self.register_action('adj_br' , self.adj_br)
|
||
self.register_action('dpms' , self.dpms_comm)
|
||
self._parse_data = self._parse_data_w_batt
|
||
else:
|
||
self._parse_data = self._parse_data_wo_batt
|
||
|
||
def _stop_module(self):
|
||
|
||
if self.p_handle is not None:
|
||
self.p_handle.terminate()
|
||
|
||
def _parse_data_w_batt(self, data):
|
||
self.parse_sys_load (data[0:2]) # System load
|
||
self.parse_disk (data[2:4]) # Disk usage
|
||
self.parse_battery (data[4:5]) # Battery
|
||
self.parse_brightness (data[5:6]) # Screen brightness
|
||
self.parse_language (data[6:7]) # Language
|
||
|
||
def _parse_data_wo_batt(self, data):
|
||
self.parse_sys_load (data[0:2]) # System load
|
||
self.parse_disk (data[2:4]) # Disk usage
|
||
self.parse_language (data[4:5]) # Language
|
||
|
||
def parse_sys_load(self, data):
|
||
if int(data[0]) > int(config.cpu_alert):
|
||
self.sys_load.alt_scheme = parser.COLOR_SCHEME.CPU_ALERT
|
||
else:
|
||
self.sys_load.alt_scheme = None
|
||
|
||
self.sys_load.items = [(config.icon_cpu, data[0] + '%')
|
||
,(config.icon_ul, data[1])]
|
||
|
||
def parse_disk(self, data):
|
||
self.disk.items = [(config.icon_hd , data[0] + '%')
|
||
,(config.icon_home, data[1] + '%')]
|
||
def parse_battery(self, data):
|
||
(batt_stat, batt) = (data[0][0], data[0][1:])
|
||
batt_i = int(batt)
|
||
icon_batt = config.icon_charging if batt_stat == 'C' else \
|
||
config.icon_charged if batt_stat == 'F' else \
|
||
config.icon_batt_0 if batt_i < 20 else \
|
||
config.icon_batt_1 if batt_i < 40 else \
|
||
config.icon_batt_2 if batt_i < 60 else \
|
||
config.icon_batt_3 if batt_i < 80 else \
|
||
config.icon_batt_4
|
||
self.battery.items = [(icon_batt, batt+'%')]
|
||
|
||
def parse_brightness(self, data):
|
||
brtxt = str(int(float(data[0])))
|
||
self.brightness.items = [(config.icon_bright, brtxt+'%')]
|
||
|
||
def parse_language(self, data):
|
||
self.language.items = [(config.icon_lang, data[0])]
|
||
|
||
def htop(self):
|
||
p = subprocess.Popen(['xterm', '-class', 'FLOAT_TERM', '-e', 'htop'])
|
||
common.kill_on_unfocus.append(p.pid)
|
||
|
||
def lang_comm(self):
|
||
# TODO Connect to i3Module (or let i3Module connect here to update the keymap)
|
||
subprocess.Popen(['sh', '/home/kuba/.i3/scripts/lang.sh', 'next'])
|
||
|
||
def dpms_comm(self):
|
||
subprocess.Popen(['sh', '/home/kuba/.i3/scripts/dpmsctl.sh'])
|
||
|
||
def adj_br(self):
|
||
subprocess.Popen(['/home/kuba/.i3/scripts/adjbr.sh', '-b', config.fifo_file_executor])
|
||
|
||
class i3Module(LemonModule):
|
||
# Handles outputs (displays), workspaces and active window
|
||
# TODO trigger formatting
|
||
|
||
def __init__(self, lemonbar_wrapper):
|
||
super().__init__(lemonbar_wrapper)
|
||
self.displays = ''
|
||
self.win_title = ''
|
||
self.workspaces = ''
|
||
|
||
self.def_keymap = 'pl'
|
||
self.keymaps = {'firefox': 'se'}
|
||
self.cur_class = ''
|
||
|
||
def _start_module(self):
|
||
self.i3_ws_obj = wspaces.i3ws(logger=common.logger)
|
||
|
||
self.displays = parser.CustomUnit('displays', format_function = self.format_displays, order=-30)
|
||
self.workspaces = parser.CustomUnit('workspaces', format_function = self.format_workspaces, order=-20)
|
||
self.title = parser.CustomUnit('title', format_function = self.format_title, order=-10)
|
||
self.displays.modes.append(common.bar_mode.control)
|
||
self.workspaces.modes.append(common.bar_mode.control)
|
||
|
||
|
||
self.register_action('i3-msg' , self.i3msg_comm)
|
||
self.register_action('mode' , self.set_mode)
|
||
|
||
# Add callbacks for parsing
|
||
self.i3_ws_obj.change_callbacks.append(self.parse_displays)
|
||
self.i3_ws_obj.change_callbacks.append(self.parse_workspaces)
|
||
self.i3_ws_obj.focus_callbacks.append(self.parse_title)
|
||
|
||
# Add callbacks for special actions
|
||
self.i3_ws_obj.change_callbacks.append(self.set_bg)
|
||
self.i3_ws_obj.focus_callbacks.append(self.set_keymap)
|
||
self.i3_ws_obj.focus_callbacks.append(self.kill_floating_windows)
|
||
|
||
def _stop_module(self):
|
||
|
||
if self.i3_ws_obj is not None:
|
||
self.i3_ws_obj.quit()
|
||
|
||
# Overload run as i3_ws_obj.work() is a blocking command
|
||
# Parsing is instead done through callbacks
|
||
def run(self):
|
||
common.logger.info('Started module {}'.format(self.__class__.__name__))
|
||
common.health_logger.info('Module {} up'.format(self.__class__.__name__))
|
||
|
||
try:
|
||
self.i3_ws_obj.work() # This is a blocking command
|
||
except ConnectionError:
|
||
common.logger.info('i3ws failed connecting to i3 socket')
|
||
|
||
common.logger.info('Reached end of module {}'.format(self.__class__.__name__))
|
||
common.health_logger.info('Module {} down'.format(self.__class__.__name__))
|
||
|
||
def parse_displays(self, i3ws):
|
||
parsed_list = [parser.block(click='displays', font='2')]
|
||
for output in i3ws.outputs:
|
||
output_name = output.name
|
||
if output.active:
|
||
if output_name == 'eDP1':
|
||
col_head = config.color_head
|
||
elif output_name == 'DP1':
|
||
col_head = config.color_vga
|
||
elif output_name == 'HDMI2':
|
||
col_head = config.color_hdmi
|
||
else:
|
||
col_head = '#00000000' # Undefined
|
||
parsed_list.append(parser.block(fg=config.color_back, bg=col_head))
|
||
parsed_list.append(config.icon_wsp)
|
||
parsed_list.append(parser.block(click=''))
|
||
|
||
self.displays = ' '.join(parsed_list)
|
||
|
||
def parse_workspaces(self, i3ws):
|
||
prefix = parser.block(font='1', fg=config.color_back, bg=config.color_head)
|
||
prefix_foc = ''.join([parser.block(fg = config.color_head, bg=config.color_wsp)
|
||
, ' ', config.sep_right, ' '
|
||
, parser.block(fg=config.color_back, bg=config.color_wsp, font='1')])
|
||
prefix_ina = parser.block(fg=config.color_back, bg=config.color_head, font='1', append=' ')
|
||
wspces = []
|
||
|
||
for workspace in i3ws.workspaces:
|
||
# Find out which output the workspace is on
|
||
output = None # TODO actually use this information
|
||
for output_ in i3ws.outputs:
|
||
if output_.name == workspace.output:
|
||
output = output_
|
||
break
|
||
if not output:
|
||
continue
|
||
status = i3ws.state.get_state(workspace, output) # FOC or INA
|
||
name = workspace.name # e.g. 5 terms
|
||
current = ''.join([parser.block(click=('i3-msg workspace' + name))
|
||
, name, parser.block(click='')])
|
||
if status == "FOC":
|
||
wspces.append(''.join([prefix_foc, current]))
|
||
else:
|
||
wspces.append(''.join([prefix_ina, current]))
|
||
|
||
self.workspaces = ''.join([prefix, ' '.join(wspces)])
|
||
|
||
def parse_title(self, i3ws):
|
||
if i3ws.focused_window is None:
|
||
return
|
||
self.win_title = ' '.join([parser.block(fg=config.color_head, bg=config.color_sec_b2)
|
||
, config.sep_right, parser.block(fg=config.color_head, bg=config.color_sec_b2, click='mode cycle')
|
||
, config.icon_prog
|
||
, parser.block(fg=config.color_sec_b2, bg='-')
|
||
, i3ws.focused_window.name])
|
||
|
||
def format_displays(self):
|
||
return self.displays
|
||
|
||
def format_workspaces(self):
|
||
return self.workspaces
|
||
|
||
def format_title(self):
|
||
return self.win_title
|
||
|
||
def img_path(num):
|
||
dir = '/home/kuba/Obrazy/Wallpapers/'
|
||
return dir + {
|
||
1: '1_main',
|
||
2: '2_web',
|
||
3: '3_music',
|
||
4: '4_work',
|
||
5: '5_terms',
|
||
6: '6_stats',
|
||
7: '7',
|
||
8: '8',
|
||
9: '9',
|
||
}.get(int(num), 'default')
|
||
|
||
def set_bg(self, i3ws):
|
||
cmd_args = ['sh', '/home/kuba/scripts/set_bg.sh']
|
||
for output in i3ws.outputs:
|
||
if output.active:
|
||
bg = i3Module.img_path(output.current_workspace.partition(' ')[0])
|
||
cmd_args.append(bg)
|
||
subprocess.call(cmd_args)
|
||
|
||
def kill_floating_windows(self, i3ws):
|
||
if i3ws.focused_window is None:
|
||
return
|
||
|
||
wclass = i3ws.focused_window.window_class
|
||
|
||
if wclass != 'FLOAT_TERM' and wclass != 'FLOAT_PAVU' and wclass != 'YADWINBR':
|
||
# Is there a window that the bar has opened?
|
||
for pid in common.kill_on_unfocus:
|
||
try:
|
||
os.kill(pid, signal.SIGTERM)
|
||
except ProcessLookupError:
|
||
common.logger.debug('Tried killing process {} but it doesn\'t exist'.format(pid))
|
||
common.kill_on_unfocus = []
|
||
|
||
def set_keymap(self, i3ws):
|
||
if i3ws.focused_window is None:
|
||
return
|
||
wclass = i3ws.focused_window.window_class
|
||
self.cur_class = wclass
|
||
|
||
if wclass in self.keymaps:
|
||
new_km = self.keymaps[wclass]
|
||
common.logger.debug('Setting {} as keymap for {}'.format(new_km, wclass))
|
||
else:
|
||
new_km = self.def_keymap
|
||
common.logger.debug('Setting default keymap {} for {}'.format(new_km, wclass))
|
||
subprocess.call(['/home/kuba/.i3/scripts/lang.sh', 'qset', new_km]) # TODO it is this script that writes to fifo, triggering the formatting
|
||
|
||
def i3msg_comm(self, *cmd):
|
||
self.i3_ws_obj.command(' '.join(cmd))
|
||
|
||
def set_mode(self, new_mode):
|
||
if new_mode == "cycle":
|
||
common.mode = common.mode.cycle()
|
||
|
||
for m in common.bar_mode:
|
||
if new_mode == m.name:
|
||
common.mode = m
|
||
break
|
||
|
||
class ScreenModule(LemonModule):
|
||
"""
|
||
@ignore host kubaArch-Desktop
|
||
"""
|
||
# Start, stop and send commands to screen instance
|
||
|
||
# Start detached, in UTF-8 mode. Log to fifo
|
||
start_flags = ['-d', '-m', '-U', '-L', '-Logfile', config.fifo_screen_log]
|
||
|
||
def __init__(self, lemonbar_wrapper):
|
||
super().__init__(lemonbar_wrapper)
|
||
|
||
def _start_module(self):
|
||
self.identifier = 'lemonbar_{}_{}'.format(getpass.getuser(), os.getpid())
|
||
self.empty_count = 0
|
||
common.create_new_fifo(config.fifo_screen_log)
|
||
self.send(self.start_flags) # Start screen
|
||
self.send_cmd('stty -echo') # Do not echo what is written to screen terminal
|
||
self.send_colon('logfile flush 0.1') # Log quickly
|
||
|
||
self.status_handle = open(config.fifo_screen_log, 'r', buffering=1)
|
||
|
||
self.controls = parser.ButtonsUnit('controls', order=-10)
|
||
self.response = parser.IconTextUnit('response', order=10)
|
||
|
||
self.controls.items = [('', config.icon_prog, 'mode cycle')
|
||
,('', '', None)
|
||
,('', 'on', 'bluetooth power on')
|
||
,('', 'off', 'bluetooth power off')
|
||
,('PXC 550', '', None)
|
||
,('', 'conn.', 'bluetooth connect pxc550')
|
||
,('', 'disc.', 'bluetooth disconnect pxc550')
|
||
]
|
||
|
||
self.response.modes = [common.bar_mode.control]
|
||
self.controls.modes = [common.bar_mode.control]
|
||
|
||
|
||
self.register_action('bluetooth', self.bluetooth)
|
||
|
||
def _stop_module(self):
|
||
self.send_colon('kill')
|
||
self.status_handle.close()
|
||
with contextlib.suppress(FileNotFoundError):
|
||
os.remove(config.fifo_screen_log)
|
||
|
||
def _parse_data(self, data):
|
||
line = ' '.join(data)
|
||
line = common.strip_ansi_unicode(line)
|
||
|
||
common.logger.debug('Screen read line {}'.format(line))
|
||
if (not line.startswith('[CHG]')
|
||
and not line.isspace()):
|
||
self.response.items = [('', line)]
|
||
|
||
if not line:
|
||
# End loop if many empty lines in a row
|
||
self.empty_count = self.empty_count + 1
|
||
if self.empty_count > 3:
|
||
common.logger.error('Too many empty lines, aborting')
|
||
# TODO actually abort
|
||
else:
|
||
self.empty_count = 0
|
||
|
||
def send(self, args):
|
||
# Send something to our screen instance
|
||
subprocess.call(['screen', '-S', self.identifier] + args)
|
||
|
||
def send_cmd(self, cmd):
|
||
# Send terminal input to our screen instance
|
||
self.send(['-X', 'stuff', cmd + '\n'])
|
||
|
||
def send_colon(self, cmd):
|
||
# Send colon command to our screen instance
|
||
self.send(['-X', 'colon', cmd + '\n'])
|
||
|
||
def bluetooth(self, *args):
|
||
btcargs = [a.replace('pxc550', '00:16:94:22:29:0E') for a in args]
|
||
inp = ' '.join(btcargs)
|
||
self.send_cmd('bluetoothctl ' + inp)
|
||
|
||
class PowerOptionsModule(LemonModule):
|
||
|
||
def __init__(self, lemonbar_wrapper):
|
||
super().__init__(lemonbar_wrapper)
|
||
|
||
def _start_module(self):
|
||
# No external commands needed
|
||
self.status_handle = None
|
||
|
||
self.power_opts = parser.CustomUnit('power', format_function=self.format_power_opts, order=-1)
|
||
|
||
self.power_opts.modes = [common.bar_mode.power]
|
||
|
||
|
||
def _stop_module(self):
|
||
pass
|
||
|
||
def _parse_data(self, data):
|
||
pass
|
||
|
||
def format_power_opts(self):
|
||
return ''.join([parser.block(fg=config.color_fore, bg=config.color_poweropts)
|
||
, ' Abort (Esc) | System (l) lock, (e) logout, (s) suspend, (h) hibernate'
|
||
, ', (r) reboot, (Shift+s) shutdown'])
|
||
|
||
def filter_ignored_modules(module_classes):
|
||
re_ignore = re.compile(r'\s*@ignore host (\S+)')
|
||
hostname = common.hostname()
|
||
|
||
filtered = module_classes.copy()
|
||
for module in module_classes:
|
||
doc = module.__doc__
|
||
# Search docstring for exceptions based on hostname
|
||
if not doc:
|
||
continue
|
||
|
||
for line in doc.split('\n'):
|
||
m = re_ignore.match(line)
|
||
if m:
|
||
ignored = m.group(1)
|
||
if ignored == hostname:
|
||
common.logger.debug('Ignoring module {} on host {}'.format(module.__name__, hostname))
|
||
filtered.remove(module)
|
||
continue
|
||
|
||
return filtered
|
||
|
||
def get_active_modules():
|
||
clsmembers = inspect.getmembers(sys.modules[__name__], inspect.isclass)
|
||
modules = []
|
||
for name, cls in clsmembers:
|
||
if LemonModule in inspect.getmro(cls) and cls is not LemonModule:
|
||
modules.append(cls)
|
||
|
||
modules = filter_ignored_modules(modules)
|
||
return modules
|
||
|
||
def do_action(keyword):
|
||
global all_modules
|
||
for module in all_modules:
|
||
module.do_action(keyword)
|
||
|
||
def start_all(lemonbar_wrapper):
|
||
global all_modules
|
||
|
||
all_modules = []
|
||
for cls in get_active_modules():
|
||
all_modules.append(cls(lemonbar_wrapper))
|
||
|
||
for module in all_modules:
|
||
module.start()
|
||
|
||
def stop_all():
|
||
global all_modules
|
||
for module in all_modules:
|
||
module.stop()
|