import threading
import subprocess
import contextlib
import time
import os
import signal
import getpass
import queue

import i3_lemonbar_config as config
import i3_lemonbar_common as common
import i3_lemonbar_parser as parser
import i3_workspaces as wspaces


class LemonModule(threading.Thread):
    """Base class for status modules; every module runs in its own thread.

    A module reads status lines from ``self.status_handle`` (anything with a
    ``readline()`` method), parses them and pushes the formatted bar line to
    ``common.write_queue``. On-click actions are registered with the module.

    Subclasses provide ``_start_module()`` (must set ``self.status_handle``,
    or ``None`` when there is nothing to read), ``_stop_module()`` and
    ``_parse_data(data)``.
    """

    def __init__(self):
        self.actions = {}
        super().__init__()
        self._start_module()

    def run(self):
        """Read lines until the handle returns an empty value, queueing each parsed line."""
        if self.status_handle is None:
            return
        name = self.__class__.__name__
        common.logger.info('Started module {}'.format(name))
        common.health_logger.info('Module {} up'.format(name))
        while True:
            line = self.status_handle.readline()
            if not line:
                common.logger.info('Reached end of module {}'.format(name))
                common.health_logger.info('Module {} down'.format(name))
                break
            parsed = self.parse(line)
            common.write_queue.put(parsed)

    def stop(self):
        """Stop the module by delegating to the subclass' ``_stop_module()``."""
        self._stop_module()

    def parse(self, line):
        """Split ``line`` on whitespace, update the units and return the full bar line."""
        data = line.split()
        self._parse_data(data)  # Update correct field
        formatted_line = parser.format_line()  # Construct entire line
        return formatted_line

    def register_action(self, keyword, do_action):
        """Register callable ``do_action`` to be run for click keyword ``keyword``."""
        self.actions[keyword] = do_action

    def do_action(self, command):
        """Run the action named by the first word of ``command``.

        The remaining words are passed to the action as positional arguments.
        Commands that are empty or name an unregistered action are ignored.
        """
        action_arr = command.split()
        if not action_arr:
            # Nothing to dispatch; indexing [0] would raise IndexError
            return
        action, *args = action_arr
        func = self.actions.get(action)
        if func is not None:
            func(*args)  # Unpack arguments, if any


def format_load(data, module, alert):
    """Helper function to format network modules.

    Changes the color scheme of ``module`` to inactive when the interface is
    down (``data[0] == 'down'``), or to alert when either value in ``data``
    exceeds ``alert``; otherwise resets it to the default.

    Returns tuple (down, up).
    """
    if data[0] == 'down':
        module.alt_scheme = parser.COLOR_SCHEME.INA
        return ('x', 'x')

    (d_v, u_v) = (data[0], data[1])
    if max(float(d_v), float(u_v)) > float(alert):
        module.alt_scheme = parser.COLOR_SCHEME.NET_ALERT
    else:
        # Reset to default
        module.alt_scheme = None
    return (d_v, u_v)


class DateTimeModule(LemonModule):
    """Date and time units, fed by an internal clock thread instead of a process."""

    def __init__(self):
        # Must exist before super().__init__(), which calls _start_module()
        self.show_secs = False
        super().__init__()

    def _start_module(self):
        class DateTimeThread(threading.Thread):
            """Puts a new time string on ``out_queue`` whenever the formatted time changes."""

            def __init__(self):
                self.last_out = ''
                self.out_queue = queue.Queue()
                # Let the queue act as a file-like handle for LemonModule.run()
                self.out_queue.readline = self.out_queue.get
                self.stop_event = threading.Event()
                super().__init__()

            def run(self):
                while not self.stop_event.is_set():
                    # NOTE(review): gmtime() yields UTC -- confirm local time is not intended
                    timestr = time.strftime("%a %d %b %H:%M:%S", time.gmtime())
                    if timestr != self.last_out:
                        self.last_out = timestr
                        self.out_queue.put('{}\n'.format(timestr))
                    # Same 0.1 s polling period as before, but wakes up at once on stop
                    self.stop_event.wait(0.1)
                # Empty string is the end-of-stream marker for LemonModule.run()
                self.out_queue.put('')

        self.dt_thread = DateTimeThread()
        self.dt_thread.start()
        self.status_handle = self.dt_thread.out_queue

        self.date = parser.IconTextUnit('date', action='date', order=40)
        self.time = parser.IconTextUnit('time', action='toggle_secs', order=41,
                                        alt_scheme=parser.COLOR_SCHEME.SPECIAL)
        self.time.modes = list(common.bar_mode)
        parser.g_parser.register_unit(self.date)
        parser.g_parser.register_unit(self.time)

        self.register_action('date', self.date_comm)
        self.register_action('toggle_secs', self.toggle_secs)

    def _stop_module(self):
        # This module has no child process (no p_handle); stop the clock thread instead
        self.dt_thread.stop_event.set()
        parser.g_parser.remove_unit(self.date)
        parser.g_parser.remove_unit(self.time)

    def _parse_data(self, data):
        # Date and time; data is the split "%a %d %b %H:%M:%S" string
        self.date.items = [(config.icon_clock, ' '.join(data[0:3]))]
        # Dropping the last three characters removes ":SS"
        self.time.items = [('', data[3] if self.show_secs else data[3][:-3])]

    def date_comm(self):
        """Open a yad calendar window."""
        subprocess.Popen(['yad', '--no-buttons', '--calendar', '--sticky',
                          '--on-top', '--class', '"YADWIN"', '--posx=1650', '--posy=24',
                          '--close-on-unfocus'])

    def toggle_secs(self):
        """Toggle display of seconds in the time unit."""
        self.show_secs = not self.show_secs


class ConkyFastModule(LemonModule):
    """Volume and network load units, fed by a conky instance updating every second."""

    conky_config_str = """
conky.config = {
    background=false,
    update_interval=1,
    total_run_times=0,
    override_utf8_locale=true,
    short_units=true,
    uppercase=false,
    out_to_console=true,
    out_to_x=false,
    if_up_strictness='address',
    format_human_readable=true,
}
"""
    # Backslash-newline inside these (non-raw) literals joins the lines, so the
    # template conky receives is a single line of space-separated fields.
    if os.uname()[1] == 'kubaDesktop':
        conky_config = conky_config_str + """
conky.text = [[
${exec ~/.i3/lemonbar/get_vol.sh} \
down down\
 \
${if_up enp5s0}${downspeedf enp5s0} ${upspeedf enp5s0}\
${else}down down${endif}\
 \
]]
"""
    else:
        conky_config = conky_config_str + """
conky.text = [[
${exec ~/.i3/lemonbar/get_vol.sh} \
${if_up wlp3s0}${downspeedf wlp3s0} ${upspeedf wlp3s0}\
${else}down down${endif}\
 \
${if_up enp2s0}${downspeedf enp2s0} ${upspeedf enp2s0}\
${else}down down${endif}\
 \
]]
"""

    def __init__(self):
        super().__init__()

    def _start_module(self):
        self.p_handle = subprocess.Popen(['conky', '-c', '-'], stdin=subprocess.PIPE,
                                         stdout=subprocess.PIPE, text=True)
        self.p_handle.stdin.write(self.conky_config)
        self.p_handle.stdin.close()  # Otherwise conky doesn't load
        self.status_handle = self.p_handle.stdout

        self.wlan_load = parser.IconTextUnit('wlan_load', action='wlan', order=13)
        self.eth_load = parser.IconTextUnit('eth_load', action='eth', order=14)
        self.volume = parser.IconTextUnit('volume', action='pavu', order=20)
        parser.g_parser.register_unit(self.wlan_load)
        parser.g_parser.register_unit(self.eth_load)
        parser.g_parser.register_unit(self.volume)

        self.register_action('wlan', self.nmtui)
        self.register_action('pavu', self.pavu)

    def _stop_module(self):
        if self.p_handle is not None:
            self.p_handle.terminate()
        parser.g_parser.remove_unit(self.wlan_load)
        parser.g_parser.remove_unit(self.eth_load)
        parser.g_parser.remove_unit(self.volume)

    def _parse_data(self, data):
        # Field layout: [volume, wlan down, wlan up, eth down, eth up]
        # wlan and eth
        (wland_v, wlanu_v) = format_load(data[1:3], self.wlan_load, config.net_alert)
        self.wlan_load.items = [(config.icon_wlan + config.icon_dl, wland_v),
                                (config.icon_ul, wlanu_v)]
        (ethd_v, ethu_v) = format_load(data[3:5], self.eth_load, config.net_alert)
        self.eth_load.items = [(config.icon_eth + config.icon_dl, ethd_v),
                               (config.icon_ul, ethu_v)]

        # Volume
        mute = data[0] == 'MUTE' or data[0] == 'NONE'
        (vol, vols) = (-1, '×') if mute else (int(data[0]), data[0] + '%')
        if vol == 0:
            icon_v = config.icon_vol_mute
        elif vol < 50:
            icon_v = config.icon_vol_low
        else:
            icon_v = config.icon_vol
        self.volume.items = [(icon_v, vols)]

    def nmtui(self):
        """Open nmtui in a floating terminal that is killed when it loses focus."""
        p = subprocess.Popen(['xterm', '-class', 'FLOAT_TERM', '-e', 'nmtui'])
        common.kill_on_unfocus.append(p.pid)

    def pavu(self):
        """Open pavucontrol in a floating window that is killed when it loses focus."""
        p = subprocess.Popen(['pavucontrol', '--class=FLOAT_PAVU'])
        common.kill_on_unfocus.append(p.pid)


class ConkySlowModule(LemonModule):
    """System load, disk, battery, brightness and language units (conky, 5 s interval)."""

    conky_config_str = """
conky.config = {
    background=false,
    update_interval=5,
    total_run_times=0,
    override_utf8_locale=true,
    short_units=true,
    uppercase=false,
    out_to_console=true,
    out_to_x=false,
    if_up_strictness='address',
    format_human_readable=true,
}
"""
    if os.uname()[1] == 'kubaDesktop':
        # Battery and brightness are fixed placeholder values on this host
        conky_config = conky_config_str + """
conky.text = [[
${cpu} \
${mem} \
${fs_used_perc /} \
${fs_used_perc /home} \
F100 \
100.00 \
${exec /home/kuba/.i3/scripts/lang.sh show}
]]
"""
    else:
        conky_config = conky_config_str + """
conky.text = [[
${cpu} \
${mem} \
${fs_used_perc /} \
${fs_used_perc /home} \
${exec ~/.i3/lemonbar/get_bat.sh} \
${exec brillo} \
${exec /home/kuba/.i3/scripts/lang.sh show}
]]
"""

    def __init__(self):
        super().__init__()

    def _start_module(self):
        self.p_handle = subprocess.Popen(['conky', '-c', '-'], stdin=subprocess.PIPE,
                                         stdout=subprocess.PIPE, text=True)
        self.p_handle.stdin.write(self.conky_config)
        self.p_handle.stdin.close()  # Otherwise conky doesn't load
        self.status_handle = self.p_handle.stdout

        self.sys_load = parser.IconTextUnit('sys_load', action='load', order=10)
        self.disk = parser.IconTextUnit('disk', order=11)
        self.brightness = parser.IconTextUnit('brightness', action='adj_br', order=21,
                                              external={'BRIGHT': self.parse_brightness})
        self.battery = parser.IconTextUnit('battery', action='dpms', order=22)
        self.language = parser.IconTextUnit('language', action='lang', order=32,
                                            external={'LANG': self.parse_language})
        for unit in (self.sys_load, self.disk, self.brightness, self.battery, self.language):
            parser.g_parser.register_unit(unit)

        self.register_action('load', self.htop)
        self.register_action('lang', self.lang_comm)
        self.register_action('adj_br', self.adj_br)
        self.register_action('dpms', self.dpms_comm)

    def _stop_module(self):
        for unit in (self.sys_load, self.disk, self.brightness, self.battery, self.language):
            parser.g_parser.remove_unit(unit)
        if self.p_handle is not None:
            self.p_handle.terminate()

    def _parse_data(self, data):
        self.parse_sys_load(data[0:2])    # System load
        self.parse_disk(data[2:4])        # Disk usage
        self.parse_battery(data[4:5])     # Battery
        self.parse_brightness(data[5:6])  # Screen brightness
        self.parse_language(data[6:7])    # Language

    def parse_sys_load(self, data):
        """Update the load unit from [cpu, mem]; alert when cpu exceeds config.cpu_alert."""
        if int(data[0]) > int(config.cpu_alert):
            self.sys_load.alt_scheme = parser.COLOR_SCHEME.CPU_ALERT
        else:
            self.sys_load.alt_scheme = None
        self.sys_load.items = [(config.icon_cpu, data[0] + '%'),
                               (config.icon_ul, data[1])]

    def parse_disk(self, data):
        """Update the disk unit from [root usage, home usage] percentages."""
        self.disk.items = [(config.icon_hd, data[0] + '%'),
                           (config.icon_home, data[1] + '%')]

    def parse_battery(self, data):
        """Update the battery unit from one token: a status letter followed by the level."""
        (batt_stat, batt) = (data[0][0], data[0][1:])
        batt_i = int(batt)
        if batt_stat == 'C':
            icon_batt = config.icon_charging
        elif batt_stat == 'F':
            icon_batt = config.icon_charged
        elif batt_i < 20:
            icon_batt = config.icon_batt_0
        elif batt_i < 40:
            icon_batt = config.icon_batt_1
        elif batt_i < 60:
            icon_batt = config.icon_batt_2
        elif batt_i < 80:
            icon_batt = config.icon_batt_3
        else:
            icon_batt = config.icon_batt_4
        self.battery.items = [(icon_batt, batt + '%')]

    def parse_brightness(self, data):
        """Update the brightness unit; the value is truncated to a whole number."""
        brtxt = str(int(float(data[0])))
        self.brightness.items = [(config.icon_bright, brtxt + '%')]

    def parse_language(self, data):
        """Update the language unit with the first token of ``data``."""
        self.language.items = [(config.icon_lang, data[0])]

    def htop(self):
        """Open htop in a floating terminal that is killed when it loses focus."""
        p = subprocess.Popen(['xterm', '-class', 'FLOAT_TERM', '-e', 'htop'])
        common.kill_on_unfocus.append(p.pid)

    def lang_comm(self):
        # TODO Connect to i3Module (or let i3Module connect here to update the keymap)
        subprocess.Popen(['sh', '/home/kuba/.i3/scripts/lang.sh', 'next'])

    def dpms_comm(self):
        subprocess.Popen(['sh', '/home/kuba/.i3/scripts/dpmsctl.sh'])

    def adj_br(self):
        subprocess.Popen(['/home/kuba/.i3/scripts/adjbr.sh', '-b', config.fifo_file_executor])


class i3Module(LemonModule):
    """Handles outputs (displays), workspaces and active window.

    ``displays``, ``workspaces`` and ``win_title`` hold the rendered strings;
    the registered parser units are kept in ``displays_unit``,
    ``workspaces_unit`` and ``title``.
    """
    # TODO trigger formatting

    def __init__(self):
        # Initialise state before super().__init__(): it calls _start_module(),
        # and assigning afterwards would overwrite what _start_module() set up.
        self.displays = ''
        self.win_title = ''
        self.workspaces = ''
        self.def_keymap = 'pl'
        self.keymaps = {'firefox': 'se'}
        self.cur_class = ''
        super().__init__()

    def _start_module(self):
        self.i3_ws_obj = wspaces.i3ws(logger=common.logger)

        # Units live in their own attributes so the rendered strings in
        # self.displays / self.workspaces never replace them.
        self.displays_unit = parser.CustomUnit('displays',
                                               format_function=self.format_displays, order=-30)
        self.workspaces_unit = parser.CustomUnit('workspaces',
                                                 format_function=self.format_workspaces,
                                                 order=-20)
        self.title = parser.CustomUnit('title', format_function=self.format_title, order=-10)
        self.displays_unit.modes.append(common.bar_mode.control)
        self.workspaces_unit.modes.append(common.bar_mode.control)
        parser.g_parser.register_unit(self.displays_unit)
        parser.g_parser.register_unit(self.workspaces_unit)
        parser.g_parser.register_unit(self.title)

        self.register_action('i3-msg', self.i3msg_comm)
        self.register_action('mode', self.set_mode)

        # Add callbacks for parsing
        self.i3_ws_obj.change_callbacks.append(self.parse_displays)
        self.i3_ws_obj.change_callbacks.append(self.parse_workspaces)
        self.i3_ws_obj.focus_callbacks.append(self.parse_title)
        # Add callbacks for special actions
        self.i3_ws_obj.change_callbacks.append(self.set_bg)
        self.i3_ws_obj.focus_callbacks.append(self.set_keymap)
        self.i3_ws_obj.focus_callbacks.append(self.kill_floating_windows)

    def _stop_module(self):
        parser.g_parser.remove_unit(self.displays_unit)
        parser.g_parser.remove_unit(self.workspaces_unit)
        parser.g_parser.remove_unit(self.title)
        if self.i3_ws_obj is not None:
            self.i3_ws_obj.quit()

    # Overload run as i3_ws_obj.work() is a blocking command
    # Parsing is instead done through callbacks
    def run(self):
        name = self.__class__.__name__
        common.logger.info('Started module {}'.format(name))
        common.health_logger.info('Module {} up'.format(name))
        self.i3_ws_obj.work()  # This is a blocking command
        common.logger.info('Reached end of module {}'.format(name))
        common.health_logger.info('Module {} down'.format(name))

    def parse_displays(self, i3ws):
        """Render one colored icon per active output into ``self.displays``."""
        parsed_list = [parser.block(click='displays', font='2')]
        for output in i3ws.outputs:
            if not output.active:
                continue
            output_name = output.name
            if output_name == 'eDP1':
                col_head = config.color_head
            elif output_name == 'DP1':
                col_head = config.color_vga
            elif output_name == 'HDMI2':
                col_head = config.color_hdmi
            else:
                col_head = '#00000000'  # Undefined
            parsed_list.append(parser.block(fg=config.color_back, bg=col_head))
            parsed_list.append(config.icon_wsp)
        parsed_list.append(parser.block(click=''))
        self.displays = ' '.join(parsed_list)

    def parse_workspaces(self, i3ws):
        """Render the clickable workspace list into ``self.workspaces``."""
        prefix = parser.block(font='1', fg=config.color_back, bg=config.color_head)
        prefix_foc = ''.join([parser.block(fg=config.color_head, bg=config.color_wsp),
                              ' ', config.sep_right, ' ',
                              parser.block(fg=config.color_back, bg=config.color_wsp, font='1')])
        prefix_ina = parser.block(fg=config.color_back, bg=config.color_head, font='1',
                                  append=' ')
        wspces = []
        for workspace in i3ws.workspaces:
            # Find out which output the workspace is on
            # TODO actually use this information
            output = next((candidate for candidate in i3ws.outputs
                           if candidate.name == workspace.output), None)
            if not output:
                continue
            status = i3ws.state.get_state(workspace, output)  # FOC or INA
            name = workspace.name  # e.g. 5 terms
            # NOTE(review): no space between 'workspace' and the name -- confirm i3 accepts it
            current = ''.join([parser.block(click=('i3-msg workspace' + name)),
                               name, parser.block(click='')])
            lead = prefix_foc if status == "FOC" else prefix_ina
            wspces.append(''.join([lead, current]))
        self.workspaces = ''.join([prefix, ' '.join(wspces)])

    def parse_title(self, i3ws):
        """Render the focused window's name into ``self.win_title``."""
        if i3ws.focused_window is None:
            return
        self.win_title = ' '.join([
            parser.block(fg=config.color_head, bg=config.color_sec_b2),
            config.sep_right,
            parser.block(fg=config.color_head, bg=config.color_sec_b2, click='mode cycle'),
            config.icon_prog,
            parser.block(fg=config.color_sec_b2, bg='-'),
            i3ws.focused_window.name,
        ])

    def format_displays(self):
        return self.displays

    def format_workspaces(self):
        return self.workspaces

    def format_title(self):
        return self.win_title

    @staticmethod
    def img_path(num):
        """Return the wallpaper path for workspace number ``num``.

        Numbers without an entry, and values that are not integers, map to
        the 'default' wallpaper.
        """
        wallpaper_dir = '/home/kuba/Obrazy/Wallpapers/'
        names = {
            1: '1_main',
            2: '2_web',
            3: '3_music',
            4: '4_work',
            5: '5_terms',
            6: '6_stats',
            7: '7',
            8: '8',
            9: '9',
        }
        try:
            key = int(num)
        except (TypeError, ValueError):
            # Workspace name does not start with a number
            key = None
        return wallpaper_dir + names.get(key, 'default')

    def set_bg(self, i3ws):
        """Run set_bg.sh with one wallpaper path per active output."""
        cmd_args = ['sh', '/home/kuba/scripts/set_bg.sh']
        for output in i3ws.outputs:
            if output.active:
                # First word of the workspace name is expected to be its number
                bg = i3Module.img_path(output.current_workspace.partition(' ')[0])
                cmd_args.append(bg)
        subprocess.call(cmd_args)

    def kill_floating_windows(self, i3ws):
        """Terminate the bar's helper windows once focus moves to another window class."""
        if i3ws.focused_window is None:
            return
        wclass = i3ws.focused_window.window_class
        if wclass not in ('FLOAT_TERM', 'FLOAT_PAVU', 'YADWINBR'):
            # Is there a window that the bar has opened?
            for pid in common.kill_on_unfocus:
                try:
                    os.kill(pid, signal.SIGTERM)
                except ProcessLookupError:
                    common.logger.debug(
                        'Tried killing process {} but it doesn\'t exist'.format(pid))
            common.kill_on_unfocus = []

    def set_keymap(self, i3ws):
        """Set the keymap configured for the focused window class, else the default."""
        if i3ws.focused_window is None:
            return
        wclass = i3ws.focused_window.window_class
        self.cur_class = wclass
        if wclass in self.keymaps:
            new_km = self.keymaps[wclass]
            common.logger.debug('Setting {} as keymap for {}'.format(new_km, wclass))
        else:
            new_km = self.def_keymap
            common.logger.debug('Setting default keymap {} for {}'.format(new_km, wclass))
        subprocess.call(['/home/kuba/.i3/scripts/lang.sh', 'qset', new_km])

    def i3msg_comm(self, *cmd):
        """Send the words of ``cmd``, joined by spaces, to i3 as one command."""
        self.i3_ws_obj.command(' '.join(cmd))

    def set_mode(self, new_mode):
        """Switch the bar mode: "cycle" advances it, a mode name selects that mode."""
        if new_mode == "cycle":
            common.mode = common.mode.cycle()
        for m in common.bar_mode:
            if new_mode == m.name:
                common.mode = m
                break


class ScreenModule(LemonModule):
    """Start, stop and send commands to a screen instance."""

    # Start detached, in UTF-8 mode. Log to fifo
    start_flags = ['-d', '-m', '-U', '-L', '-Logfile', config.fifo_screen_log]

    def __init__(self):
        super().__init__()

    def _start_module(self):
        self.identifier = 'lemonbar_{}_{}'.format(getpass.getuser(), os.getpid())
        self.empty_count = 0
        common.create_new_fifo(config.fifo_screen_log)
        self.send(self.start_flags)            # Start screen
        self.send_cmd('stty -echo')            # Do not echo what is written to screen terminal
        self.send_colon('logfile flush 0.1')   # Log quickly
        self.status_handle = open(config.fifo_screen_log, 'r', buffering=1)

        self.controls = parser.ButtonsUnit('controls', order=-10)
        self.response = parser.IconTextUnit('response', order=10)
        self.controls.items = [
            ('', config.icon_prog, 'mode cycle'),
            ('', '', None),
            ('', 'on', 'bluetooth power on'),
            ('', 'off', 'bluetooth power off'),
            ('PXC 550', '', None),
            ('', 'conn.', 'bluetooth connect pxc550'),
            ('', 'disc.', 'bluetooth disconnect pxc550'),
        ]
        self.response.modes = [common.bar_mode.control]
        self.controls.modes = [common.bar_mode.control]
        parser.g_parser.register_unit(self.response)
        parser.g_parser.register_unit(self.controls)

        self.register_action('bluetooth', self.bluetooth)

    def _stop_module(self):
        self.send_colon('kill')
        self.status_handle.close()
        with contextlib.suppress(FileNotFoundError):
            os.remove(config.fifo_screen_log)
        parser.g_parser.remove_unit(self.response)
        parser.g_parser.remove_unit(self.controls)

    def _parse_data(self, data):
        line = ' '.join(data)
        line = common.strip_ansi_unicode(line)
        common.logger.debug('Screen read line {}'.format(line))
        if (not line.startswith('[CHG]') and not line.isspace()):
            self.response.items = [('', line)]
        if not line:
            # End loop if many empty lines in a row
            self.empty_count = self.empty_count + 1
            if self.empty_count > 3:
                common.logger.error('Too many empty lines, aborting')
                # TODO actually abort
        else:
            self.empty_count = 0

    def send(self, args):
        # Send something to our screen instance
        subprocess.call(['screen', '-S', self.identifier] + args)

    def send_cmd(self, cmd):
        # Send terminal input to our screen instance
        self.send(['-X', 'stuff', cmd + '\n'])

    def send_colon(self, cmd):
        # Send colon command to our screen instance
        self.send(['-X', 'colon', cmd + '\n'])

    def bluetooth(self, *args):
        """Run ``bluetoothctl`` inside screen, replacing the 'pxc550' alias with its address."""
        btcargs = [a.replace('pxc550', '00:16:94:22:29:0E') for a in args]
        inp = ' '.join(btcargs)
        self.send_cmd('bluetoothctl ' + inp)


class PowerOptionsModule(LemonModule):
    """Static unit listing the power options; shown in the power bar mode."""

    def __init__(self):
        super().__init__()

    def _start_module(self):
        # No external commands needed
        self.status_handle = None

        self.power_opts = parser.CustomUnit('power', format_function=self.format_power_opts,
                                            order=-1)
        self.power_opts.modes = [common.bar_mode.power]
        parser.g_parser.register_unit(self.power_opts)

    def _stop_module(self):
        parser.g_parser.remove_unit(self.power_opts)

    def _parse_data(self, data):
        pass

    def format_power_opts(self):
        return ''.join([parser.block(fg=config.color_fore, bg=config.color_poweropts),
                        ' Abort (Esc) | System (l) lock, (e) logout, (s) suspend, (h) hibernate',
                        ', (r) reboot, (Shift+s) shutdown'])


def do_action(keyword):
    """Offer ``keyword`` to every module; each module ignores actions it does not know."""
    global m_datetime, m_conky_fast, m_conky_slow, m_i3ws, m_screen, m_power
    for module in (m_i3ws, m_datetime, m_conky_slow, m_conky_fast, m_screen, m_power):
        module.do_action(keyword)


def start_all():
    """Create all modules, then start their threads."""
    global m_datetime, m_conky_fast, m_conky_slow, m_i3ws, m_screen, m_power
    m_i3ws = i3Module()
    m_datetime = DateTimeModule()
    m_conky_slow = ConkySlowModule()
    m_conky_fast = ConkyFastModule()
    m_screen = ScreenModule()
    m_power = PowerOptionsModule()

    for module in (m_i3ws, m_datetime, m_conky_slow, m_conky_fast, m_screen, m_power):
        module.start()


def stop_all():
    """Stop all modules created by start_all()."""
    global m_datetime, m_conky_fast, m_conky_slow, m_i3ws, m_screen, m_power
    for module in (m_i3ws, m_datetime, m_conky_slow, m_conky_fast, m_screen, m_power):
        module.stop()