import threading import subprocess import contextlib import time, os, signal, getpass import sys, inspect, re import i3_lemonbar_config as config import i3_lemonbar_common as common import i3_lemonbar_parser as parser import i3_workspaces as wspaces class LemonModule(threading.Thread): # Module spawns process that generates some status message, which the module parses and # sends to lemonbar queue. On click actions are registered with the module. # Every module runs in its own thread def units(self): # Iterate over units for _, attr_instance in self.__dict__.items(): if isinstance(attr_instance, parser.LemonUnit): yield attr_instance def __init__(self, lemonbar_wrapper): self.actions = {} super().__init__() self.lemonbar_wrapper = lemonbar_wrapper self._start_module() for unit in self.units(): parser.g_parser.register_unit(unit) def run(self): if self.status_handle is None: return common.logger.info('Started module {}'.format(self.__class__.__name__)) common.health_logger.info('Module {} up'.format(self.__class__.__name__)) while True: line = self.status_handle.readline() if not line: common.logger.info('Reached end of module {}'.format(self.__class__.__name__)) common.health_logger.info('Module {} down'.format(self.__class__.__name__)) break parsed = self.parse(line) self.lemonbar_wrapper.buffer_in.put(parsed) def stop(self): self._stop_module() for unit in self.units(): parser.g_parser.remove_unit(unit) def parse(self, line): data = line.split() self._parse_data(data) # Update correct field formatted_line = parser.format_line() # Construct entire line return formatted_line def register_action(self, keyword, do_action): self.actions[keyword] = do_action def do_action(self, command): action_arr = command.split() action = action_arr[0] if (action in self.actions): func = self.actions[action] func(*action_arr[1:]) # Unpack arguments, if any def format_load(data, module, alert): # Helper function to format network modules # Changes colors scheme to inactive when interfaces are down, or to alert when # alert level is reached # Returns tuple (down, up) if data[0] == 'down': module.alt_scheme = parser.COLOR_SCHEME.INA return ('x', 'x') else: (d_v, u_v) = (data[0],data[1]) if max(float(d_v), float(u_v)) > float(alert): module.alt_scheme = parser.COLOR_SCHEME.NET_ALERT else: # Reset to default module.alt_scheme = None return (d_v, u_v) class DateTimeModule(LemonModule): def __init__(self, lemonbar_wrapper): super().__init__(lemonbar_wrapper) def _start_module(self): class DateTimeThread(threading.Thread): def __init__(self, secs_mode = False): self.secs_mode = secs_mode self.last_output = '' self.shelf = common.Shelf() self.readline = self.shelf.get self.clock_tick = threading.Condition(self.shelf.lock) # Needed, because sleep may be # interrupted when user changes H:M to H:M:S super().__init__() def run(self): while True: fmt = "%a %d %b %H:%M:%S" if self.secs_mode else "%a %d %b %H:%M" timestr = time.strftime(fmt, time.localtime()) if timestr != self.last_output: self.last_output = timestr self.shelf.put(timestr) self.sleep_until_change() def sleep_until_change(self): # Sleep for an appropriate amount of time (depending on if showing seconds or not) # with the possibility of being woken early if self.secs_mode: rem = 0.1 # TODO fiddle with this, benchmarking it else: rem = time.time() % 5.0 # 5 seconds, 60 is unnecessarily long self.clock_tick.acquire() self.clock_tick.wait(rem) self.clock_tick.release() def wake(self): self.clock_tick.acquire() self.clock_tick.notify() self.clock_tick.release() self.dt_thread = DateTimeThread() self.dt_thread.start() self.status_handle = self.dt_thread # Only readline() is used self.date = parser.IconTextUnit('date', action='date', order=40) self.time = parser.IconTextUnit('time', action='toggle_secs', order=41 , alt_scheme=parser.COLOR_SCHEME.SPECIAL) self.time.modes = [mode for mode in common.bar_mode] self.register_action('date' , self.date_comm) self.register_action('toggle_secs' , self.toggle_secs) def _stop_module(self): if self.p_handle is not None: self.p_handle.terminate() def _parse_data(self, data): # Date and time self.date.items = [(config.icon_clock, ' '.join(data[0:3]))] self.time.items = [('', data[3])] def date_comm(self): subprocess.Popen(['yad', '--no-buttons', '--calendar', '--sticky' , '--on-top' , '--class' , '"YADWIN"', '--posx=1650', '--posy=24' , '--close-on-unfocus']) def toggle_secs(self): self.dt_thread.secs_mode = not self.dt_thread.secs_mode self.dt_thread.wake() def conky_config(update_interval=5): return """ conky.config = {{ background=false, update_interval={}, total_run_times=0, override_utf8_locale=true, short_units=true, uppercase=false, out_to_console=true, out_to_x=false, if_up_strictness='address', format_human_readable=true, }} """.format(update_interval) def conky_net(iface): return """\\ ${{if_up {iface}}}${{downspeedf {iface}}} ${{upspeedf {iface}}}\\ ${{else}}down down${{endif}} """.format(iface=iface) conky_begin_body = """ conky.text = [[""" conky_end_body = """ ]]""" class ConkyFastModule(LemonModule): def __init__(self, lemonbar_wrapper): self.conky_config = (conky_config(update_interval=1) + conky_begin_body + """\\ ${exec ~/.i3/lemonbar/get_vol.sh} """ + ( conky_net('enp5s0') if common.hostname() == 'kubaDesktop' else (conky_net('wlp3s0') + conky_net('enp2s0'))) + conky_end_body) super().__init__(lemonbar_wrapper) def _start_module(self): self.p_handle = subprocess.Popen(['conky', '-c', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True) self.p_handle.stdin.write(self.conky_config) self.p_handle.stdin.close() # Otherwise conky doesn't load self.status_handle = self.p_handle.stdout self.eth_load = parser.IconTextUnit('eth_load', action='eth', order=14) self.volume = parser.IconTextUnit('volume', action='pavu', order=20) self.register_action('pavu', self.pavu) if common.hostname() != 'kubaDesktop': self.wlan_load = parser.IconTextUnit('wlan_load', action='wlan', order=13) self.register_action('wlan', self.nmtui) self._parse_data = self._parse_data_w_wlan else: self._parse_data = self._parse_data_wo_wlan def _stop_module(self): if self.p_handle is not None: self.p_handle.terminate() def _parse_data_w_wlan(self, data): self.parse_vol (data[0]) self.parse_wlan (data[1:3]) self.parse_eth (data[3:5]) def _parse_data_wo_wlan(self, data): self.parse_vol (data[0]) self.parse_eth (data[1:3]) def parse_wlan(self, data): (wland_v, wlanu_v) = format_load(data, self.wlan_load, config.net_alert) self.wlan_load.items = [(config.icon_wlan + config.icon_dl, wland_v) ,(config.icon_ul, wlanu_v)] def parse_eth(self, data): (ethd_v, ethu_v) = format_load(data, self.eth_load, config.net_alert) self.eth_load.items = [(config.icon_eth + config.icon_dl, ethd_v) ,(config.icon_ul, ethu_v)] def parse_vol(self, data): mute = data == 'MUTE' or data == 'NONE' (vol,vols) = (-1,'×') if mute else (int(data), data+'%') icon_v = (config.icon_vol_mute if vol == 0 else config.icon_vol_low if vol < 50 else config.icon_vol) self.volume.items = [(icon_v, vols)] def nmtui(self): p = subprocess.Popen(['xterm', '-class', 'FLOAT_TERM', '-e', 'nmtui']) common.kill_on_unfocus.append(p.pid) def pavu(self): p = subprocess.Popen(['pavucontrol', '--class=FLOAT_PAVU']) common.kill_on_unfocus.append(p.pid) class ConkySlowModule(LemonModule): def __init__(self, lemonbar_wrapper): self.conky_config = (conky_config() + conky_begin_body + """\\ ${cpu} \\ ${mem} \\ ${fs_used_perc /} \\ ${fs_used_perc /home} """ + ("""\\ ${exec ~/.i3/lemonbar/get_bat.sh} \\ ${exec brillo} """ if common.hostname() != 'kubaDesktop' else '') + """\\ ${exec /home/kuba/.i3/scripts/lang.sh show} \\ ${exec ~/.i3/lemonbar/get_vol.sh}""" + conky_end_body) super().__init__(lemonbar_wrapper) def _start_module(self): self.p_handle = subprocess.Popen(['conky', '-c', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True) self.p_handle.stdin.write(self.conky_config) self.p_handle.stdin.close() # Otherwise conky doesn't load self.status_handle = self.p_handle.stdout self.sys_load = parser.IconTextUnit('sys_load', action='load', order=10) self.disk = parser.IconTextUnit('disk', order=11) self.language = parser.IconTextUnit('language', action='lang', order=32 , external={'LANG': self.parse_language}) self.register_action('load', self.htop) self.register_action('lang', self.lang_comm) if common.hostname() != 'kubaDesktop': self.brightness = parser.IconTextUnit('brightness', action='adj_br', order=21 , external={'BRIGHT': self.parse_brightness}) self.battery = parser.IconTextUnit('battery', action='dpms', order=22) self.register_action('adj_br' , self.adj_br) self.register_action('dpms' , self.dpms_comm) self._parse_data = self._parse_data_w_batt else: self._parse_data = self._parse_data_wo_batt def _stop_module(self): if self.p_handle is not None: self.p_handle.terminate() def _parse_data_w_batt(self, data): self.parse_sys_load (data[0:2]) # System load self.parse_disk (data[2:4]) # Disk usage self.parse_battery (data[4:5]) # Battery self.parse_brightness (data[5:6]) # Screen brightness self.parse_language (data[6:7]) # Language def _parse_data_wo_batt(self, data): self.parse_sys_load (data[0:2]) # System load self.parse_disk (data[2:4]) # Disk usage self.parse_language (data[4:5]) # Language def parse_sys_load(self, data): if int(data[0]) > int(config.cpu_alert): self.sys_load.alt_scheme = parser.COLOR_SCHEME.CPU_ALERT else: self.sys_load.alt_scheme = None self.sys_load.items = [(config.icon_cpu, data[0] + '%') ,(config.icon_ul, data[1])] def parse_disk(self, data): self.disk.items = [(config.icon_hd , data[0] + '%') ,(config.icon_home, data[1] + '%')] def parse_battery(self, data): (batt_stat, batt) = (data[0][0], data[0][1:]) batt_i = int(batt) icon_batt = config.icon_charging if batt_stat == 'C' else \ config.icon_charged if batt_stat == 'F' else \ config.icon_batt_0 if batt_i < 20 else \ config.icon_batt_1 if batt_i < 40 else \ config.icon_batt_2 if batt_i < 60 else \ config.icon_batt_3 if batt_i < 80 else \ config.icon_batt_4 self.battery.items = [(icon_batt, batt+'%')] def parse_brightness(self, data): brtxt = str(int(float(data[0]))) self.brightness.items = [(config.icon_bright, brtxt+'%')] def parse_language(self, data): self.language.items = [(config.icon_lang, data[0])] def htop(self): p = subprocess.Popen(['xterm', '-class', 'FLOAT_TERM', '-e', 'htop']) common.kill_on_unfocus.append(p.pid) def lang_comm(self): # TODO Connect to i3Module (or let i3Module connect here to update the keymap) subprocess.Popen(['sh', '/home/kuba/.i3/scripts/lang.sh', 'next']) def dpms_comm(self): subprocess.Popen(['sh', '/home/kuba/.i3/scripts/dpmsctl.sh']) def adj_br(self): subprocess.Popen(['/home/kuba/.i3/scripts/adjbr.sh', '-b', config.fifo_file_executor]) class i3Module(LemonModule): # Handles outputs (displays), workspaces and active window def __init__(self, lemonbar_wrapper): super().__init__(lemonbar_wrapper) self.displays = '' self.win_title = '' self.workspaces = '' self.def_keymap = 'pl' self.keymaps = {'firefox': 'se'} self.cur_class = '' def _start_module(self): self.i3_ws_obj = wspaces.i3ws(logger=common.logger) self.displays = parser.CustomUnit('displays', format_function = self.format_displays, order=-30) self.workspaces = parser.CustomUnit('workspaces', format_function = self.format_workspaces, order=-20) self.title = parser.CustomUnit('title', format_function = self.format_title, order=-10) self.displays.modes.append(common.bar_mode.control) self.workspaces.modes.append(common.bar_mode.control) self.register_action('i3-msg' , self.i3msg_comm) self.register_action('mode' , self.set_mode) # Add callbacks for parsing self.i3_ws_obj.change_callbacks.append(self.parse_displays) self.i3_ws_obj.change_callbacks.append(self.parse_workspaces) self.i3_ws_obj.focus_callbacks.append(self.parse_title) # Callback after all other actions should flush the bar self.i3_ws_obj.final_callback = self.update_bar # Add callbacks for special actions self.i3_ws_obj.change_callbacks.append(self.set_bg) self.i3_ws_obj.focus_callbacks.append(self.set_keymap) self.i3_ws_obj.focus_callbacks.append(self.kill_floating_windows) def _stop_module(self): if self.i3_ws_obj is not None: self.i3_ws_obj.quit() # Overload run as i3_ws_obj.work() is a blocking command # Parsing is instead done through callbacks def run(self): common.logger.info('Started module {}'.format(self.__class__.__name__)) common.health_logger.info('Module {} up'.format(self.__class__.__name__)) try: self.i3_ws_obj.work() # This is a blocking command except ConnectionError: common.logger.info('i3ws failed connecting to i3 socket') common.logger.info('Reached end of module {}'.format(self.__class__.__name__)) common.health_logger.info('Module {} down'.format(self.__class__.__name__)) def parse_displays(self, i3ws): parsed_list = [parser.block(click='displays', font='2')] for output in i3ws.outputs: output_name = output.name if output.active: if output_name == 'eDP1': col_head = config.color_head elif output_name == 'DP1': col_head = config.color_vga elif output_name == 'HDMI2': col_head = config.color_hdmi else: col_head = '#00000000' # Undefined parsed_list.append(parser.block(fg=config.color_back, bg=col_head)) parsed_list.append(config.icon_wsp) parsed_list.append(parser.block(click='')) self.displays = ' '.join(parsed_list) def parse_workspaces(self, i3ws): prefix = parser.block(font='1', fg=config.color_back, bg=config.color_head) prefix_foc = ''.join([parser.block(fg = config.color_head, bg=config.color_wsp) , ' ', config.sep_right, ' ' , parser.block(fg=config.color_back, bg=config.color_wsp, font='1')]) prefix_ina = parser.block(fg=config.color_back, bg=config.color_head, font='1', append=' ') wspces = [] for workspace in i3ws.workspaces: # Find out which output the workspace is on output = None # TODO actually use this information for output_ in i3ws.outputs: if output_.name == workspace.output: output = output_ break if not output: continue status = i3ws.state.get_state(workspace, output) # FOC or INA name = workspace.name # e.g. 5 terms current = ''.join([parser.block(click=('i3-msg workspace' + name)) , name, parser.block(click='')]) if status == "FOC": wspces.append(''.join([prefix_foc, current])) else: wspces.append(''.join([prefix_ina, current])) self.workspaces = ''.join([prefix, ' '.join(wspces)]) def parse_title(self, i3ws): if i3ws.focused_window is None: return self.win_title = ' '.join([parser.block(fg=config.color_head, bg=config.color_sec_b2) , config.sep_right, parser.block(fg=config.color_head, bg=config.color_sec_b2, click='mode cycle') , config.icon_prog , parser.block(fg=config.color_sec_b2, bg='-') , i3ws.focused_window.name]) def update_bar(self, i3ws): parsed = parser.format_line() # Construct entire line self.lemonbar_wrapper.buffer_in.put(parsed) def format_displays(self): return self.displays def format_workspaces(self): return self.workspaces def format_title(self): return self.win_title def img_path(num): dir = '/home/kuba/Obrazy/Wallpapers/' return dir + { 1: '1_main', 2: '2_web', 3: '3_music', 4: '4_work', 5: '5_terms', 6: '6_stats', 7: '7', 8: '8', 9: '9', }.get(int(num), 'default') def set_bg(self, i3ws): cmd_args = ['sh', '/home/kuba/scripts/set_bg.sh'] for output in i3ws.outputs: if output.active: bg = i3Module.img_path(output.current_workspace.partition(' ')[0]) cmd_args.append(bg) subprocess.call(cmd_args) def kill_floating_windows(self, i3ws): if i3ws.focused_window is None: return wclass = i3ws.focused_window.window_class if wclass != 'FLOAT_TERM' and wclass != 'FLOAT_PAVU' and wclass != 'YADWINBR': # Is there a window that the bar has opened? for pid in common.kill_on_unfocus: try: os.kill(pid, signal.SIGTERM) except ProcessLookupError: common.logger.debug('Tried killing process {} but it doesn\'t exist'.format(pid)) common.kill_on_unfocus = [] def set_keymap(self, i3ws): if i3ws.focused_window is None: return wclass = i3ws.focused_window.window_class self.cur_class = wclass if wclass in self.keymaps: new_km = self.keymaps[wclass] common.logger.debug('Setting {} as keymap for {}'.format(new_km, wclass)) else: new_km = self.def_keymap common.logger.debug('Setting default keymap {} for {}'.format(new_km, wclass)) subprocess.call(['/home/kuba/.i3/scripts/lang.sh', 'qset', new_km]) # TODO rework lang script (after fifo for executing commands) def i3msg_comm(self, *cmd): self.i3_ws_obj.command(' '.join(cmd)) def set_mode(self, new_mode): if new_mode == "cycle": common.mode = common.mode.cycle() for m in common.bar_mode: if new_mode == m.name: common.mode = m break parsed = parser.format_line() self.lemonbar_wrapper.buffer_in.put(parsed) class ScreenModule(LemonModule): """ @ignore host kubaArch-Desktop """ # Start, stop and send commands to screen instance # Start detached, in UTF-8 mode. Log to fifo start_flags = ['-d', '-m', '-U', '-L', '-Logfile', config.fifo_screen_log] def __init__(self, lemonbar_wrapper): super().__init__(lemonbar_wrapper) def _start_module(self): self.identifier = 'lemonbar_{}_{}'.format(getpass.getuser(), os.getpid()) self.empty_count = 0 common.create_new_fifo(config.fifo_screen_log) self.send(self.start_flags) # Start screen self.send_cmd('stty -echo') # Do not echo what is written to screen terminal self.send_colon('logfile flush 0.1') # Log quickly self.status_handle = open(config.fifo_screen_log, 'r', buffering=1) self.controls = parser.ButtonsUnit('controls', order=-10) self.response = parser.IconTextUnit('response', order=10) self.controls.items = [('', config.icon_prog, 'mode cycle') ,('', '', None) ,('', 'on', 'bluetooth power on') ,('', 'off', 'bluetooth power off') ,('PXC 550', '', None) ,('', 'conn.', 'bluetooth connect pxc550') ,('', 'disc.', 'bluetooth disconnect pxc550') ] self.response.modes = [common.bar_mode.control] self.controls.modes = [common.bar_mode.control] self.register_action('bluetooth', self.bluetooth) def _stop_module(self): self.send_colon('kill') self.status_handle.close() with contextlib.suppress(FileNotFoundError): os.remove(config.fifo_screen_log) def _parse_data(self, data): line = ' '.join(data) line = common.strip_ansi_unicode(line) common.logger.debug('Screen read line {}'.format(line)) if (not line.startswith('[CHG]') and not line.isspace()): self.response.items = [('', line)] if not line: # End loop if many empty lines in a row self.empty_count = self.empty_count + 1 if self.empty_count > 3: common.logger.error('Too many empty lines, aborting') # TODO actually abort else: self.empty_count = 0 def send(self, args): # Send something to our screen instance subprocess.call(['screen', '-S', self.identifier] + args) def send_cmd(self, cmd): # Send terminal input to our screen instance self.send(['-X', 'stuff', cmd + '\n']) def send_colon(self, cmd): # Send colon command to our screen instance self.send(['-X', 'colon', cmd + '\n']) def bluetooth(self, *args): btcargs = [a.replace('pxc550', '00:16:94:22:29:0E') for a in args] inp = ' '.join(btcargs) self.send_cmd('bluetoothctl ' + inp) class PowerOptionsModule(LemonModule): def __init__(self, lemonbar_wrapper): super().__init__(lemonbar_wrapper) def _start_module(self): # No external commands needed self.status_handle = None self.power_opts = parser.CustomUnit('power', format_function=self.format_power_opts, order=-1) self.power_opts.modes = [common.bar_mode.power] def _stop_module(self): pass def _parse_data(self, data): pass def format_power_opts(self): return ''.join([parser.block(fg=config.color_fore, bg=config.color_poweropts) , ' Abort (Esc) | System (l) lock, (e) logout, (s) suspend, (h) hibernate' , ', (r) reboot, (Shift+s) shutdown']) def filter_ignored_modules(module_classes): re_ignore = re.compile(r'\s*@ignore host (\S+)') hostname = common.hostname() filtered = module_classes.copy() for module in module_classes: doc = module.__doc__ # Search docstring for exceptions based on hostname if not doc: continue for line in doc.split('\n'): m = re_ignore.match(line) if m: ignored = m.group(1) if ignored == hostname: common.logger.debug('Ignoring module {} on host {}'.format(module.__name__, hostname)) filtered.remove(module) continue return filtered def get_active_modules(): clsmembers = inspect.getmembers(sys.modules[__name__], inspect.isclass) modules = [] for name, cls in clsmembers: if LemonModule in inspect.getmro(cls) and cls is not LemonModule: modules.append(cls) modules = filter_ignored_modules(modules) return modules def do_action(keyword): global all_modules for module in all_modules: module.do_action(keyword) def start_all(lemonbar_wrapper): global all_modules all_modules = [] for cls in get_active_modules(): all_modules.append(cls(lemonbar_wrapper)) for module in all_modules: module.start() def stop_all(): global all_modules for module in all_modules: module.stop()