Migrate lemonbar to own repo

This commit is contained in:
2021-01-19 20:57:09 +01:00
parent c70ddcf2a2
commit 1b0c9b6e6a
14 changed files with 4 additions and 1547 deletions

3
.gitmodules vendored Normal file
View File

@@ -0,0 +1,3 @@
[submodule ".i3/lemonbar"]
path = .i3/lemonbar
url = https://isabeljake.duckdns.org/gitea/kuba/lemonbar.git

1
.i3/lemonbar Submodule

Submodule .i3/lemonbar added at 7d48ad9543

View File

@@ -1 +0,0 @@
{"username":"kuben-","auth_type":1,"auth_data":"QVFDYzhMY0tnZ2g2M2N1czFhcXI4c0xRMkQzc3pRdXlFZVVVeEVzN0xRRE1VSnItVmw3Z05XWWhRVUNCX2pqS25mSXhGcVRyV0RFcGtTWVFMR3pzSG1jZVdTY0JnNnJFT0hDdFptdzM="}

View File

@@ -1 +0,0 @@
49151

View File

@@ -1,5 +0,0 @@
#!/bin/bash
acpi | head -n 1 | awk '{pct = substr($4, 1, length($4)-2);\
if($3 ~ "Dis") print "D"pct;\
else if($3 ~ "Cha") printf "C"pct;\
else print "F"substr($5, 1, length($5)-1);}'

View File

@@ -1,8 +0,0 @@
#!/bin/bash
RTN=$(amixer get Master | grep "Front Left: Playback" | \
awk -F'[]%[]' '{if ($5 == "off") {print "MUTE"} else {printf "%d", $2}}')
if [[ -z $RTN ]]; then
echo "NONE"
else
echo "$RTN"
fi

View File

@@ -1,77 +0,0 @@
import threading
import subprocess, os
import re # regexp
from enum import Enum
import i3_lemonbar_config as config
kill_on_unfocus = []
# Loggers, initialized in main function
logger = None
health_logger = None
# TODO bar mode can be moved to i3Module
class bar_mode(Enum):
power, normal, control = range(-1,2) # Don't cycle through power
def cycle(self):
try:
return bar_mode(self.value + 1)
except ValueError:
return bar_mode(0)
mode = bar_mode.normal
# Helper functions
def hostname():
return os.uname()[1]
ansi_escape = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]')
def strip_ansi_unicode(s):
# ANSI escape sequences are for colors in terminal and similar
strip_ansi = ansi_escape.sub('', s)
strip_unicode = (strip_ansi.encode('ascii', 'ignore')).decode('utf-8')
return strip_unicode
def create_new_fifo(fifo_file):
"""
Create new fifo file, removing old one if it exists
"""
try:
os.remove(fifo_file)
logger.debug('''Removed old fifo file''')
except OSError:
logger.debug('''No old fifo file, good''')
try:
os.mkfifo(fifo_file)
except OSError:
logger.error('''Failed, couldn't create fifo file''')
os.remove(config.pid_file) # Clean up own PID file
sys.exit(0)
class Shelf():
# A one-slot queue implemented with locks and condition objects
def __init__(self, item = None):
self.lock = threading.Lock()
self.item_ready = threading.Condition(self.lock)
self.item = item
def get(self):
# Consumes item
self.item_ready.acquire()
if self.item is None:
# Wait until becomes ready
self.item_ready.wait()
ret = self.item
self.item = None
self.item_ready.release()
return ret
def put(self, item):
# Discards previous item
self.item_ready.acquire()
self.item = item
self.item_ready.notify()
self.item_ready.release()

View File

@@ -1,86 +0,0 @@
import subprocess
import getpass
# File locations
pid_file = '/tmp/i3_lemonbar_launcher.pid'
fifo_file_status = '/tmp/i3_lemonbar1_{}'.format(getpass.getuser())
fifo_screen_log = '/tmp/i3_screen_{}'.format(getpass.getuser())
health_file = '/tmp/i3_lemonbar_health.info'
# color definitions
color_back = "#FF1D1F21" # Default background
color_fore = "#FFC5C8C6" # Default foreground
color_poweropts = "#FF620A00"
color_head = "#FFB5BD68" # Background for first element
color_sec_b1 = "#FF282A2E" # Background for section 1
color_sec_b2 = "#FF454A4F" # Background for section 2
color_sec_b3 = "#FF60676E" # Background for section 3
color_vga = "#FF3F8C0F"
color_hdmi = "#FFDD5435"
color_icon = "#FF979997" # For icons
color_cpu = "#FF5F819D" # Background color for cpu alert
color_net = "#FF5E8D87" # Background color for net alert
color_disable = "#FF1D1F21" # Foreground for disable elements
color_wsp = "#FF8C9440" # Background for selected workspace
# Lemonbar settings
geometry="x24"
fonts = ["Hack:pixelsize=12" #":style=Bold"
,"Font Awesome 5 Free Solid:pixelsize=16"
,"Font Awesome 5 Brands:pixelsize=16"]
lemonbar_args = ['lemonbar', '-p', '-g', geometry, '-B', color_back
, '-F', color_fore, '-a' '20']
for font in fonts:
lemonbar_args.append('-f')
lemonbar_args.append(font)
# Misc. settings
cpu_alert = 75
net_alert = 5
#default space between sections
#if [ ${res_w} -gt 1024 ]; then
# stab=' '
#else
# stab=' '
#fi
# Icon definitions
sep_left = "" # Powerline separator left alt. 
sep_right = "" # Powerline separator right
sep_l_left = "" # Powerline light separator left
sep_l_right = "" # Powerline light sepatator right
# Icon glyphs from Terminusicons2
icon_clock = "" # Clock icon
icon_cpu = "" # CPU icon
icon_mem = "" # MEM icon
icon_dl = "" # Download icon
icon_ul = "" # Upload icon
icon_vol = "" # Volume icon
icon_vol_low = "" # Volume icon
icon_vol_mute = "" # Volume icon
icon_hd = " [/]" # HD / icon
icon_home = " [/home]" # HD /home icon
icon_mail = "" # Mail icon
icon_chat = "Ò" # IRC/Chat icon
icon_music = "Î" # Music icon
icon_prog = "" # Window icon alt. 
icon_contact = "Á" # Contact icon
icon_wsp = "" # Workspace icon
icon_wlan = ""
icon_eth = ""
icon_lang = ""
icon_keyset = ""
icon_bright = ""
icon_brightness = ''
icon_charged = ""
icon_charging = ""
icon_batt_0 = ""
icon_batt_1 = ""
icon_batt_2 = ""
icon_batt_3 = ""
icon_batt_4 = ""
icon_bluetooth = ""

View File

@@ -1,220 +0,0 @@
import fcntl, sys, os, time, logging
import queue
import signal, atexit
import subprocess
import contextlib
from threading import Thread
import argparse
import i3_lemonbar_config as config
import i3_lemonbar_common as common
import i3_lemonbar_modules as modules
import i3_lemonbar_parser as lemonparser
def assert_only_instance():
"""
If PID file exists:
Look for process with given PID
If found:
Exit program
If not found:
Delete PID file and continue
Look for fifo file
If exists:
Delete it
"""
try:
pid = None
with open(config.pid_file, 'r') as fp:
pid = int(fp.read())
except IOError:
common.logger.debug('Could not open PID file. Assuming non existent')
except ValueError:
common.logger.debug('''PID file contents broken''')
try:
os.remove(config.pid_file)
common.logger.debug('''Deleted old PID file, continuing as usual''')
except OSError:
common.logger.debug('''Failed deleting old PID file.''')
os._exit(1)
if pid is not None:
try:
common.logger.debug('''Found old PID file. Looking for owner''')
os.kill(pid, 0)
common.logger.debug('''Owner exists''')
common.logger.debug('''Failed, another instance of the launcher is running
(PID {})'''.format(pid))
os._exit(1)
except ProcessLookupError:
common.logger.debug('''Owner does not exist''')
try:
os.remove(config.pid_file)
common.logger.debug('''Deleted old PID file, continuing as usual''')
except OSError:
common.logger.debug('''Failed deleting old PID file.''')
os._exit(1)
with open(config.pid_file, 'w+') as fp:
fp.write('{:d}'.format(os.getpid()))
common.logger.debug('''Created and wrote to PID file''')
common.create_new_fifo(config.fifo_file_status)
def handle_exit(signum, frame):
common.logger.info('Signal handler called with signal {}'.format(signum))
common.logger.info('Calling os._exit(0)')
os._exit(0)
# Terminates process p nicely
def nice_term(p):
if p is not None:
p.terminate()
def nice_delete(f):
with contextlib.suppress(FileNotFoundError):
os.remove(f)
def clean_up():
common.logger.debug('Cleaning up')
nice_delete(config.pid_file)
nice_delete(config.fifo_file_status)
modules.stop_all()
os._exit(0)
def keep_fifo_open():
with open(config.fifo_file_status, 'w', buffering=1) as fifo_write:
while True:
fifo_write.write('HEARTBEAT\n')
time.sleep(30)
class LemonbarWrapper:
def __init__(self, only_instance = True):
assert only_instance, 'Multiple instances not supported' # TODO
assert_only_instance() # Creates pid file and fifo file
self.buffer_in = common.Shelf()
self.all_threads = []
# Start writing and reading threads
# Create readers before writers
self.p_lemonbar = subprocess.Popen(config.lemonbar_args, stdin=subprocess.PIPE
, stdout=subprocess.PIPE, text=True)
self.start_thread(target = self.exec_commands, desc='Execute commands thread')
self.start_thread(target = self.write_parsed, desc='Write parsed status thread')
self.start_thread(target = keep_fifo_open, desc='Keep fifo open thread')
self.start_thread(target = self.put_fifo_in_queue, desc='Put fifo entries in queue thread')
modules.start_all(self)
common.logger.debug('Threads started')
# Helpers to start threads
def start_thread(self, target, desc):
# Wrapper around arbitrary target
def run_thread(target, desc):
target()
common.health_logger.info('"%s" reached end', desc)
thread = Thread(target = run_thread, args=(target, desc))
self.all_threads.append(thread)
common.health_logger.info('"%s" starting', desc)
thread.start()
def join(self):
for thread in self.all_threads:
thread.join()
# Thread targets
def write_parsed(self):
# Write parse entries in queue to lemonbar
while True:
data = self.buffer_in.get() # Blocking read
self.p_lemonbar.stdin.write(data + '\n')
self.p_lemonbar.stdin.flush()
#common.logger.debug('Read: "{0}"'.format(data))
#common.logger.debug('Parsed "{0}"'.format(psd))
def exec_commands(self):
while True:
data = self.p_lemonbar.stdout.readline()
if not data:
common.logger.debug('Lemonbar closed, exiting')
common.health_logger.info('Lemonbar closed, exiting')
clean_up()
break
common.logger.debug('Trying reading: "{0}"'.format(data.strip('\n')))
try:
modules.do_action(data)
except Exception as e:
common.logger.debug('Exception occured executing command\n Line in: {}'.format(data))
common.logger.debug(str(e))
if len(data) == 0:
common.logger.debug("Lemonbar output closed")
break
common.logger.debug('Read: "{0}"'.format(data.strip('\n')))
def put_fifo_in_queue(self):
with open(config.fifo_file_status, 'r', buffering=1) as fifo_read:
# Let parser read from fifo
common.logger.debug("FIFO {} opened for reading".format(config.fifo_file_status))
while True:
try:
data = fifo_read.readline() # Blocking read
if len(data) == 0:
common.logger.debug("Writer closed")
break
data = data.rstrip() # Remove trailing newlines
self.buffer_in.put(lemonparser.parse_line(data))
except BrokenPipeError:
common.logger.debug('Broken pipe in parse status thread, exiting')
common.health_logger.info('Broken pipe in parse status thread, exiting')
clean_up()
except Exception as e:
common.logger.debug('Unknown exception in parse status thread, exiting')
common.health_logger.info('Unknown exception in parse status thread, exiting')
raise
common.logger.debug(str(e))
clean_up()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--debug', action='store_true')
args = parser.parse_args()
if(args.debug):
debuglvl = logging.DEBUG
else:
debuglvl = logging.INFO
# Setup logger to stdout
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
common.logger = logging.getLogger('Normal logger')
common.logger.setLevel(debuglvl)
common.logger.addHandler(handler)
# Setup health logger to file in tmp
formatter = logging.Formatter('%(asctime)s %(message)s')
handler = logging.FileHandler(config.health_file)
handler.setFormatter(formatter)
common.health_logger = logging.getLogger('Health logger')
common.health_logger.addHandler(handler)
# Ensure clean up on exit
atexit.register(clean_up)
signal.signal(signal.SIGTERM, handle_exit)
signal.signal(signal.SIGINT, handle_exit)
# Start the lemonbar wrapper
LemonbarWrapper().join()
common.logger.debug('Reached end')

View File

@@ -1,719 +0,0 @@
import threading
import subprocess
import contextlib
import time, os, signal, getpass
import sys, inspect, re
import i3_lemonbar_config as config
import i3_lemonbar_common as common
import i3_lemonbar_parser as parser
import i3_workspaces as wspaces
class LemonModule(threading.Thread):
# Module spawns process that generates some status message, which the module parses and
# sends to lemonbar queue. On click actions are registered with the module.
# Every module runs in its own thread
def units(self):
# Iterate over units
for _, attr_instance in self.__dict__.items():
if isinstance(attr_instance, parser.LemonUnit):
yield attr_instance
def __init__(self, lemonbar_wrapper):
self.actions = {}
super().__init__()
self.lemonbar_wrapper = lemonbar_wrapper
self._start_module()
for unit in self.units():
parser.g_parser.register_unit(unit)
def run(self):
if self.status_handle is None:
return
common.logger.info('Started module {}'.format(self.__class__.__name__))
common.health_logger.info('Module {} up'.format(self.__class__.__name__))
while True:
line = self.status_handle.readline()
if not line:
common.logger.info('Reached end of module {}'.format(self.__class__.__name__))
common.health_logger.info('Module {} down'.format(self.__class__.__name__))
break
parsed = self.parse(line)
self.lemonbar_wrapper.buffer_in.put(parsed)
def stop(self):
self._stop_module()
for unit in self.units():
parser.g_parser.remove_unit(unit)
def parse(self, line):
data = line.split()
self._parse_data(data) # Update correct field
formatted_line = parser.format_line() # Construct entire line
return formatted_line
def register_action(self, keyword, do_action):
self.actions[keyword] = do_action
def do_action(self, command):
action_arr = command.split()
action = action_arr[0]
if (action in self.actions):
func = self.actions[action]
func(*action_arr[1:]) # Unpack arguments, if any
def format_load(data, module, alert):
# Helper function to format network modules
# Changes colors scheme to inactive when interfaces are down, or to alert when
# alert level is reached
# Returns tuple (down, up)
if data[0] == 'down':
module.alt_scheme = parser.COLOR_SCHEME.INA
return ('x', 'x')
else:
(d_v, u_v) = (data[0],data[1])
if max(float(d_v), float(u_v)) > float(alert):
module.alt_scheme = parser.COLOR_SCHEME.NET_ALERT
else:
# Reset to default
module.alt_scheme = None
return (d_v, u_v)
class DateTimeModule(LemonModule):
def __init__(self, lemonbar_wrapper):
super().__init__(lemonbar_wrapper)
def _start_module(self):
class DateTimeThread(threading.Thread):
def __init__(self, secs_mode = False):
self.secs_mode = secs_mode
self.last_output = ''
self.shelf = common.Shelf()
self.readline = self.shelf.get
self.clock_tick = threading.Condition(self.shelf.lock) # Needed, because sleep may be
# interrupted when user changes H:M to H:M:S
super().__init__()
def run(self):
while True:
fmt = "%a %d %b %H:%M:%S" if self.secs_mode else "%a %d %b %H:%M"
timestr = time.strftime(fmt, time.localtime())
if timestr != self.last_output:
self.last_output = timestr
self.shelf.put(timestr)
self.sleep_until_change()
def sleep_until_change(self):
# Sleep for an appropriate amount of time (depending on if showing seconds or not)
# with the possibility of being woken early
if self.secs_mode:
rem = 0.1 # TODO fiddle with this, benchmarking it
else:
rem = time.time() % 5.0 # 5 seconds, 60 is unnecessarily long
self.clock_tick.acquire()
self.clock_tick.wait(rem)
self.clock_tick.release()
def wake(self):
self.clock_tick.acquire()
self.clock_tick.notify()
self.clock_tick.release()
self.dt_thread = DateTimeThread()
self.dt_thread.start()
self.status_handle = self.dt_thread # Only readline() is used
self.date = parser.IconTextUnit('date', action='date', order=40)
self.time = parser.IconTextUnit('time', action='toggle_secs', order=41
, alt_scheme=parser.COLOR_SCHEME.SPECIAL)
self.time.modes = [mode for mode in common.bar_mode]
self.register_action('date' , self.date_comm)
self.register_action('toggle_secs' , self.toggle_secs)
def _stop_module(self):
if self.p_handle is not None:
self.p_handle.terminate()
def _parse_data(self, data):
# Date and time
self.date.items = [(config.icon_clock, ' '.join(data[0:3]))]
self.time.items = [('', data[3])]
def date_comm(self):
subprocess.Popen(['yad', '--no-buttons', '--calendar', '--sticky'
, '--on-top' , '--class' , '"YADWIN"', '--posx=1650', '--posy=24'
, '--close-on-unfocus'])
def toggle_secs(self):
self.dt_thread.secs_mode = not self.dt_thread.secs_mode
self.dt_thread.wake()
def conky_config(update_interval=5):
return """
conky.config = {{
background=false,
update_interval={},
total_run_times=0,
override_utf8_locale=true,
short_units=true,
uppercase=false,
out_to_console=true,
out_to_x=false,
if_up_strictness='address',
format_human_readable=true,
}}
""".format(update_interval)
def conky_net(iface):
return """\\
${{if_up {iface}}}${{downspeedf {iface}}} ${{upspeedf {iface}}}\\
${{else}}down down${{endif}} """.format(iface=iface)
conky_begin_body = """
conky.text = [["""
conky_end_body = """
]]"""
class ConkyFastModule(LemonModule):
def __init__(self, lemonbar_wrapper):
self.conky_config = (conky_config(update_interval=1)
+ conky_begin_body
+ """\\
${exec ~/.i3/lemonbar/get_vol.sh} """
+ ( conky_net('enp5s0') if common.hostname() == 'kubaDesktop' else
(conky_net('wlp3s0')
+ conky_net('enp2s0')))
+ conky_end_body)
super().__init__(lemonbar_wrapper)
def _start_module(self):
self.p_handle = subprocess.Popen(['conky', '-c', '-'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
self.p_handle.stdin.write(self.conky_config)
self.p_handle.stdin.close() # Otherwise conky doesn't load
self.status_handle = self.p_handle.stdout
self.eth_load = parser.IconTextUnit('eth_load', action='eth', order=14)
self.volume = parser.IconTextUnit('volume', action='pavu', order=20)
self.register_action('pavu', self.pavu)
if common.hostname() != 'kubaDesktop':
self.wlan_load = parser.IconTextUnit('wlan_load', action='wlan', order=13)
self.register_action('wlan', self.nmtui)
self._parse_data = self._parse_data_w_wlan
else:
self._parse_data = self._parse_data_wo_wlan
def _stop_module(self):
if self.p_handle is not None:
self.p_handle.terminate()
def _parse_data_w_wlan(self, data):
self.parse_vol (data[0])
self.parse_wlan (data[1:3])
self.parse_eth (data[3:5])
def _parse_data_wo_wlan(self, data):
self.parse_vol (data[0])
self.parse_eth (data[1:3])
def parse_wlan(self, data):
(wland_v, wlanu_v) = format_load(data, self.wlan_load, config.net_alert)
self.wlan_load.items = [(config.icon_wlan + config.icon_dl, wland_v)
,(config.icon_ul, wlanu_v)]
def parse_eth(self, data):
(ethd_v, ethu_v) = format_load(data, self.eth_load, config.net_alert)
self.eth_load.items = [(config.icon_eth + config.icon_dl, ethd_v)
,(config.icon_ul, ethu_v)]
def parse_vol(self, data):
mute = data == 'MUTE' or data == 'NONE'
(vol,vols) = (-1,'×') if mute else (int(data), data+'%')
icon_v = (config.icon_vol_mute if vol == 0 else
config.icon_vol_low if vol < 50 else config.icon_vol)
self.volume.items = [(icon_v, vols)]
def nmtui(self):
p = subprocess.Popen(['xterm', '-class', 'FLOAT_TERM', '-e', 'nmtui'])
common.kill_on_unfocus.append(p.pid)
def pavu(self):
p = subprocess.Popen(['pavucontrol', '--class=FLOAT_PAVU'])
common.kill_on_unfocus.append(p.pid)
class ConkySlowModule(LemonModule):
def __init__(self, lemonbar_wrapper):
self.conky_config = (conky_config()
+ conky_begin_body
+ """\\
${cpu} \\
${mem} \\
${fs_used_perc /} \\
${fs_used_perc /home} """
+ ("""\\
${exec ~/.i3/lemonbar/get_bat.sh} \\
${exec brillo} """ if common.hostname() != 'kubaDesktop' else '')
+ """\\
${exec /home/kuba/.i3/scripts/lang.sh show} \\
${exec ~/.i3/lemonbar/get_vol.sh}"""
+ conky_end_body)
super().__init__(lemonbar_wrapper)
def _start_module(self):
self.p_handle = subprocess.Popen(['conky', '-c', '-'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
self.p_handle.stdin.write(self.conky_config)
self.p_handle.stdin.close() # Otherwise conky doesn't load
self.status_handle = self.p_handle.stdout
self.sys_load = parser.IconTextUnit('sys_load', action='load', order=10)
self.disk = parser.IconTextUnit('disk', order=11)
self.language = parser.IconTextUnit('language', action='lang', order=32
, external={'LANG': self.parse_language})
self.register_action('load', self.htop)
self.register_action('lang', self.lang_comm)
if common.hostname() != 'kubaDesktop':
self.brightness = parser.IconTextUnit('brightness', action='adj_br', order=21
, external={'BRIGHT': self.parse_brightness})
self.battery = parser.IconTextUnit('battery', action='dpms', order=22)
self.register_action('adj_br' , self.adj_br)
self.register_action('dpms' , self.dpms_comm)
self._parse_data = self._parse_data_w_batt
else:
self._parse_data = self._parse_data_wo_batt
def _stop_module(self):
if self.p_handle is not None:
self.p_handle.terminate()
def _parse_data_w_batt(self, data):
self.parse_sys_load (data[0:2]) # System load
self.parse_disk (data[2:4]) # Disk usage
self.parse_battery (data[4:5]) # Battery
self.parse_brightness (data[5:6]) # Screen brightness
self.parse_language (data[6:7]) # Language
def _parse_data_wo_batt(self, data):
self.parse_sys_load (data[0:2]) # System load
self.parse_disk (data[2:4]) # Disk usage
self.parse_language (data[4:5]) # Language
def parse_sys_load(self, data):
if int(data[0]) > int(config.cpu_alert):
self.sys_load.alt_scheme = parser.COLOR_SCHEME.CPU_ALERT
else:
self.sys_load.alt_scheme = None
self.sys_load.items = [(config.icon_cpu, data[0] + '%')
,(config.icon_ul, data[1])]
def parse_disk(self, data):
self.disk.items = [(config.icon_hd , data[0] + '%')
,(config.icon_home, data[1] + '%')]
def parse_battery(self, data):
(batt_stat, batt) = (data[0][0], data[0][1:])
batt_i = int(batt)
icon_batt = config.icon_charging if batt_stat == 'C' else \
config.icon_charged if batt_stat == 'F' else \
config.icon_batt_0 if batt_i < 20 else \
config.icon_batt_1 if batt_i < 40 else \
config.icon_batt_2 if batt_i < 60 else \
config.icon_batt_3 if batt_i < 80 else \
config.icon_batt_4
self.battery.items = [(icon_batt, batt+'%')]
def parse_brightness(self, data):
brtxt = str(int(float(data[0])))
self.brightness.items = [(config.icon_bright, brtxt+'%')]
def parse_language(self, data):
self.language.items = [(config.icon_lang, data[0])]
def htop(self):
p = subprocess.Popen(['xterm', '-class', 'FLOAT_TERM', '-e', 'htop'])
common.kill_on_unfocus.append(p.pid)
def lang_comm(self):
# TODO Connect to i3Module (or let i3Module connect here to update the keymap)
subprocess.Popen(['sh', '/home/kuba/.i3/scripts/lang.sh', 'next'])
def dpms_comm(self):
subprocess.Popen(['sh', '/home/kuba/.i3/scripts/dpmsctl.sh'])
def adj_br(self):
subprocess.Popen(['/home/kuba/.i3/scripts/adjbr.sh', '-b', config.fifo_file_executor])
class i3Module(LemonModule):
# Handles outputs (displays), workspaces and active window
def __init__(self, lemonbar_wrapper):
super().__init__(lemonbar_wrapper)
self.displays = ''
self.win_title = ''
self.workspaces = ''
self.def_keymap = 'pl'
self.keymaps = {'firefox': 'se'}
self.cur_class = ''
def _start_module(self):
self.i3_ws_obj = wspaces.i3ws(logger=common.logger)
self.displays = parser.CustomUnit('displays', format_function = self.format_displays, order=-30)
self.workspaces = parser.CustomUnit('workspaces', format_function = self.format_workspaces, order=-20)
self.title = parser.CustomUnit('title', format_function = self.format_title, order=-10)
self.displays.modes.append(common.bar_mode.control)
self.workspaces.modes.append(common.bar_mode.control)
self.register_action('i3-msg' , self.i3msg_comm)
self.register_action('mode' , self.set_mode)
# Add callbacks for parsing
self.i3_ws_obj.change_callbacks.append(self.parse_displays)
self.i3_ws_obj.change_callbacks.append(self.parse_workspaces)
self.i3_ws_obj.focus_callbacks.append(self.parse_title)
# Callback after all other actions should flush the bar
self.i3_ws_obj.final_callback = self.update_bar
# Add callbacks for special actions
self.i3_ws_obj.change_callbacks.append(self.set_bg)
self.i3_ws_obj.focus_callbacks.append(self.set_keymap)
self.i3_ws_obj.focus_callbacks.append(self.kill_floating_windows)
def _stop_module(self):
if self.i3_ws_obj is not None:
self.i3_ws_obj.quit()
# Overload run as i3_ws_obj.work() is a blocking command
# Parsing is instead done through callbacks
def run(self):
common.logger.info('Started module {}'.format(self.__class__.__name__))
common.health_logger.info('Module {} up'.format(self.__class__.__name__))
try:
self.i3_ws_obj.work() # This is a blocking command
except ConnectionError:
common.logger.info('i3ws failed connecting to i3 socket')
common.logger.info('Reached end of module {}'.format(self.__class__.__name__))
common.health_logger.info('Module {} down'.format(self.__class__.__name__))
def parse_displays(self, i3ws):
parsed_list = [parser.block(click='displays', font='2')]
for output in i3ws.outputs:
output_name = output.name
if output.active:
if output_name == 'eDP1':
col_head = config.color_head
elif output_name == 'DP1':
col_head = config.color_vga
elif output_name == 'HDMI2':
col_head = config.color_hdmi
else:
col_head = '#00000000' # Undefined
parsed_list.append(parser.block(fg=config.color_back, bg=col_head))
parsed_list.append(config.icon_wsp)
parsed_list.append(parser.block(click=''))
self.displays = ' '.join(parsed_list)
def parse_workspaces(self, i3ws):
prefix = parser.block(font='1', fg=config.color_back, bg=config.color_head)
prefix_foc = ''.join([parser.block(fg = config.color_head, bg=config.color_wsp)
, ' ', config.sep_right, ' '
, parser.block(fg=config.color_back, bg=config.color_wsp, font='1')])
prefix_ina = parser.block(fg=config.color_back, bg=config.color_head, font='1', append=' ')
wspces = []
for workspace in i3ws.workspaces:
# Find out which output the workspace is on
output = None # TODO actually use this information
for output_ in i3ws.outputs:
if output_.name == workspace.output:
output = output_
break
if not output:
continue
status = i3ws.state.get_state(workspace, output) # FOC or INA
name = workspace.name # e.g. 5 terms
current = ''.join([parser.block(click=('i3-msg workspace' + name))
, name, parser.block(click='')])
if status == "FOC":
wspces.append(''.join([prefix_foc, current]))
else:
wspces.append(''.join([prefix_ina, current]))
self.workspaces = ''.join([prefix, ' '.join(wspces)])
def parse_title(self, i3ws):
if i3ws.focused_window is None:
return
self.win_title = ' '.join([parser.block(fg=config.color_head, bg=config.color_sec_b2)
, config.sep_right, parser.block(fg=config.color_head, bg=config.color_sec_b2, click='mode cycle')
, config.icon_prog
, parser.block(fg=config.color_sec_b2, bg='-')
, i3ws.focused_window.name])
def update_bar(self, i3ws):
parsed = parser.format_line() # Construct entire line
self.lemonbar_wrapper.buffer_in.put(parsed)
def format_displays(self):
return self.displays
def format_workspaces(self):
return self.workspaces
def format_title(self):
return self.win_title
def img_path(num):
dir = '/home/kuba/Obrazy/Wallpapers/'
return dir + {
1: '1_main',
2: '2_web',
3: '3_music',
4: '4_work',
5: '5_terms',
6: '6_stats',
7: '7',
8: '8',
9: '9',
}.get(int(num), 'default')
def set_bg(self, i3ws):
cmd_args = ['sh', '/home/kuba/scripts/set_bg.sh']
for output in i3ws.outputs:
if output.active:
bg = i3Module.img_path(output.current_workspace.partition(' ')[0])
cmd_args.append(bg)
subprocess.call(cmd_args)
def kill_floating_windows(self, i3ws):
if i3ws.focused_window is None:
return
wclass = i3ws.focused_window.window_class
if wclass != 'FLOAT_TERM' and wclass != 'FLOAT_PAVU' and wclass != 'YADWINBR':
# Is there a window that the bar has opened?
for pid in common.kill_on_unfocus:
try:
os.kill(pid, signal.SIGTERM)
except ProcessLookupError:
common.logger.debug('Tried killing process {} but it doesn\'t exist'.format(pid))
common.kill_on_unfocus = []
def set_keymap(self, i3ws):
if i3ws.focused_window is None:
return
wclass = i3ws.focused_window.window_class
self.cur_class = wclass
if wclass in self.keymaps:
new_km = self.keymaps[wclass]
common.logger.debug('Setting {} as keymap for {}'.format(new_km, wclass))
else:
new_km = self.def_keymap
common.logger.debug('Setting default keymap {} for {}'.format(new_km, wclass))
subprocess.call(['/home/kuba/.i3/scripts/lang.sh', 'qset', new_km]) # TODO rework lang script (after fifo for executing commands)
def i3msg_comm(self, *cmd):
self.i3_ws_obj.command(' '.join(cmd))
def set_mode(self, new_mode):
if new_mode == "cycle":
common.mode = common.mode.cycle()
for m in common.bar_mode:
if new_mode == m.name:
common.mode = m
break
parsed = parser.format_line()
self.lemonbar_wrapper.buffer_in.put(parsed)
class ScreenModule(LemonModule):
"""
@ignore host kubaArch-Desktop
"""
# Start, stop and send commands to screen instance
# Start detached, in UTF-8 mode. Log to fifo
start_flags = ['-d', '-m', '-U', '-L', '-Logfile', config.fifo_screen_log]
def __init__(self, lemonbar_wrapper):
super().__init__(lemonbar_wrapper)
def _start_module(self):
self.identifier = 'lemonbar_{}_{}'.format(getpass.getuser(), os.getpid())
self.empty_count = 0
common.create_new_fifo(config.fifo_screen_log)
self.send(self.start_flags) # Start screen
self.send_cmd('stty -echo') # Do not echo what is written to screen terminal
self.send_colon('logfile flush 0.1') # Log quickly
self.status_handle = open(config.fifo_screen_log, 'r', buffering=1)
self.controls = parser.ButtonsUnit('controls', order=-10)
self.response = parser.IconTextUnit('response', order=10)
self.controls.items = [('', config.icon_prog, 'mode cycle')
,('', '', None)
,('', 'on', 'bluetooth power on')
,('', 'off', 'bluetooth power off')
,('PXC 550', '', None)
,('', 'conn.', 'bluetooth connect pxc550')
,('', 'disc.', 'bluetooth disconnect pxc550')
]
self.response.modes = [common.bar_mode.control]
self.controls.modes = [common.bar_mode.control]
self.register_action('bluetooth', self.bluetooth)
def _stop_module(self):
self.send_colon('kill')
self.status_handle.close()
with contextlib.suppress(FileNotFoundError):
os.remove(config.fifo_screen_log)
def _parse_data(self, data):
line = ' '.join(data)
line = common.strip_ansi_unicode(line)
common.logger.debug('Screen read line {}'.format(line))
if (not line.startswith('[CHG]')
and not line.isspace()):
self.response.items = [('', line)]
if not line:
# End loop if many empty lines in a row
self.empty_count = self.empty_count + 1
if self.empty_count > 3:
common.logger.error('Too many empty lines, aborting')
# TODO actually abort
else:
self.empty_count = 0
def send(self, args):
# Send something to our screen instance
subprocess.call(['screen', '-S', self.identifier] + args)
def send_cmd(self, cmd):
# Send terminal input to our screen instance
self.send(['-X', 'stuff', cmd + '\n'])
def send_colon(self, cmd):
# Send colon command to our screen instance
self.send(['-X', 'colon', cmd + '\n'])
def bluetooth(self, *args):
btcargs = [a.replace('pxc550', '00:16:94:22:29:0E') for a in args]
inp = ' '.join(btcargs)
self.send_cmd('bluetoothctl ' + inp)
class PowerOptionsModule(LemonModule):
def __init__(self, lemonbar_wrapper):
super().__init__(lemonbar_wrapper)
def _start_module(self):
# No external commands needed
self.status_handle = None
self.power_opts = parser.CustomUnit('power', format_function=self.format_power_opts, order=-1)
self.power_opts.modes = [common.bar_mode.power]
def _stop_module(self):
pass
def _parse_data(self, data):
pass
def format_power_opts(self):
return ''.join([parser.block(fg=config.color_fore, bg=config.color_poweropts)
, ' Abort (Esc) | System (l) lock, (e) logout, (s) suspend, (h) hibernate'
, ', (r) reboot, (Shift+s) shutdown'])
def filter_ignored_modules(module_classes):
re_ignore = re.compile(r'\s*@ignore host (\S+)')
hostname = common.hostname()
filtered = module_classes.copy()
for module in module_classes:
doc = module.__doc__
# Search docstring for exceptions based on hostname
if not doc:
continue
for line in doc.split('\n'):
m = re_ignore.match(line)
if m:
ignored = m.group(1)
if ignored == hostname:
common.logger.debug('Ignoring module {} on host {}'.format(module.__name__, hostname))
filtered.remove(module)
continue
return filtered
def get_active_modules():
clsmembers = inspect.getmembers(sys.modules[__name__], inspect.isclass)
modules = []
for name, cls in clsmembers:
if LemonModule in inspect.getmro(cls) and cls is not LemonModule:
modules.append(cls)
modules = filter_ignored_modules(modules)
return modules
def do_action(keyword):
global all_modules
for module in all_modules:
module.do_action(keyword)
def start_all(lemonbar_wrapper):
global all_modules
all_modules = []
for cls in get_active_modules():
all_modules.append(cls(lemonbar_wrapper))
for module in all_modules:
module.start()
def stop_all():
global all_modules
for module in all_modules:
module.stop()

View File

@@ -1,200 +0,0 @@
import sys
import bisect # Sorting of list
from enum import Enum
import i3_lemonbar_config as config
import i3_lemonbar_common as common
sr = config.sep_right
slr = config.sep_l_right
sl = config.sep_left
sll = config.sep_l_left
class LemonUnit:
def __lt__(self, other):
return self.order < other.order
class IconTextUnit(LemonUnit):
def __init__(self, name, order, action = None, alt_scheme=None, external=None):
self.name = name
self.action = action
self.order = order
self.alt_scheme = alt_scheme # None means default
self.external = external
self.items = [] # List of tuples (icon, text)
self.modes = [common.bar_mode.normal]
def format(self):
close = block(click='') if self.action is not None else ''
b_color = self.alt_scheme.back_color
i_color = self.alt_scheme.icon_color
t_color = self.alt_scheme.text_color
# Start with a major separator. Keep previous background color, set foreground color
# of separator to background color of this unit
blocks = []
blocks.append(block(fg=b_color, append=sl))
blocks.append(' ') # TODO perhaps more nice solution than manual spacing
if len(self.items) >= 1:
(icon, text) = self.items[0]
blocks.append(block(click=self.action, fg=i_color, bg=b_color, font='2'
, append=' ' + icon))
blocks.append(' ')
blocks.append(block(fg=t_color, font='1', append=text))
if len(self.items) >= 2:
for item in self.items[1:]:
(icon, text) = item
# Append minor separator
blocks.append(' ')
blocks.append(sll)
blocks.append(block(fg=i_color, bg=b_color, font='2', append=' ' + icon))
blocks.append(' ')
blocks.append(block(fg=t_color, font='1', append=text))
if len(self.items) >= 1:
blocks.append(close)
return ''.join(blocks)
class ButtonsUnit(LemonUnit):
def __init__(self, name, order, alt_scheme=None, external=None):
self.name = name
self.order = order
self.alt_scheme = alt_scheme # None means default
self.external = external
self.items = [] # List of tuples (icon, text, action)
self.modes = [common.bar_mode.normal]
def format(self):
b_color = self.alt_scheme.back_color
i_color = self.alt_scheme.icon_color
t_color = self.alt_scheme.text_color
# Start with a major separator. Keep previous background color, set foreground color
# of separator to background color of this unit
blocks = []
blocks.append(block(fg=b_color, append=sl))
blocks.append(' ')
for item in self.items:
(icon, text, action) = item
close = block(click='') if action is not None else ''
blocks.append(block(click=action, fg=i_color, bg=b_color, font='2'
, append=' ' + icon))
blocks.append(' ')
blocks.append(block(fg=t_color, font='1', append=text))
blocks.append(close)
return ''.join(blocks)
class CustomUnit(LemonUnit):
def __init__(self, name, format_function, order, action = None, alt_scheme=None, external=None):
self.name = name
self.action = action
self.order = order
self.alt_scheme = alt_scheme # None means default
self.external = external
self.format = format_function
self.items = [] # List of tuples (icon, text)
self.modes = [common.bar_mode.normal]
class LemonParser:
# Handle parsing of units
# Apply alternating colors
# Contains list of units
def __init__(self):
self.units = []
def register_unit(self, unit):
# Keep the list sorted
bisect.insort_left(self.units, unit)
def remove_unit(self, unit):
self.units.remove(unit)
def format(self):
formatted = ['%{l}'] # Start left-justified
previously_left_justified = True
# Iterate over all units. Apply alternating color schemes
alt = 1
for unit in self.units:
# Show only units appropriate for the current mode
if common.mode not in unit.modes:
continue
# Negative order means left justified, positive means right justified
if previously_left_justified and unit.order > 0:
# Switch to right justified
formatted.append(block(fg='-', bg='-')) # Reset so the color doesn't fill
formatted.append('%{r}')
previously_left_justified = False
if unit.alt_scheme is None:
# Apply default color scheme, which is alternating
unit.alt_scheme = COLOR_SCHEME.A1 if alt == 1 else COLOR_SCHEME.A2
else:
# Keep alternate color scheme
# INA is a special case
if unit.alt_scheme == COLOR_SCHEME.INA:
unit.alt_scheme = COLOR_SCHEME.A1_INA if alt == 1 else COLOR_SCHEME.A2_INA
formatted.append(unit.format()) # TODO cache last format
alt = alt + 1 if alt + 1 <= 2 else 1
formatted.append(block(fg='-', bg='-')) # Not sure if needed
return ' '.join(formatted)
g_parser = LemonParser()
def block(fg = None, bg = None, font = None, click = None, append=''):
# Remeber that clickable blocks need to be closed
rtn = []
if click is not None:
if click == '':
rtn.append('A')
else:
rtn.append('A:'+click+':')
if fg is not None:
rtn.append('F'+fg)
if bg is not None:
rtn.append('B'+bg)
if font is not None:
rtn.append('T'+font)
return ''.join(['%{', ' '.join(rtn), '}', append])
class COLOR_SCHEME(Enum):
A1 = (config.color_sec_b1, config.color_fore, config.color_icon)
A2 = (config.color_sec_b2, config.color_fore, config.color_icon)
INA = (None, None, None) # This one is special, dynamically changed to A1_INA or A2_INA
A1_INA = (config.color_sec_b1, config.color_disable, config.color_disable)
A2_INA = (config.color_sec_b2, config.color_disable, config.color_disable)
CPU_ALERT= (config.color_cpu, config.color_back, config.color_back)
NET_ALERT= (config.color_net, config.color_back, config.color_back)
SPECIAL = (config.color_head, config.color_back, config.color_back)
def __init__(self, back_color, text_color, icon_color):
self.back_color = back_color
self.text_color = text_color
self.icon_color = icon_color
def format_line():
return g_parser.format()
def parse_line(line_in):
# Parse lines that external programs have written to fifo
for unit in g_parser.units:
if unit.external is not None:
for key,func in unit.external.items():
l = len(key)
if line_in[:l] == key:
func(line_in[l:].split())
break
formatted_line = format_line()
return formatted_line
if __name__ == "__main__":
for line in sys.stdin:
print(parse_line(line))

View File

@@ -1,201 +0,0 @@
#!/usr/bin/env python
# Print i3 workspaces on every change.
#
# Format:
# For every workspace (x = workspace name)
# - "FOCx" -> Focused workspace
# - "INAx" -> Inactive workspace
# - "ACTx" -> Ative workspace
# - "URGx" -> Urgent workspace
#
# Based in wsbar.py en examples dir
#
# 16 feb 2015 - Electro7
import sys, os, signal
import logging
import time
from subprocess import call
import i3ipc
print_stdout = False
class State(object):
# workspace states
focused = 'FOC'
active = 'ACT'
inactive = 'INA'
urgent = 'URG'
def get_state(self, workspace, output):
if workspace.focused:
if output.current_workspace == workspace.name:
return self.focused
else:
return self.active
if workspace.urgent:
return self.urgent
else:
return self.inactive
# Create the i3ws object, then call locking function work()
# Another implementation would be to create a thread in __init__
# and implement a locking join() function
class i3ws(object):
ws_format = '%s%s '
end_format = 'WSP%s'
def __init__(self, logger, state=None):
self.state = State()
self.outputs = []
self.workspaces = []
self.focused_window = None
self.running = True
if state:
self.state = state
self.logger = logger
# Callback functions have argument i3ws
self.change_callbacks = []
self.focus_callbacks = []
self.final_callback = None # Called after all other callbacks
def work(self):
# While loop to restart connection as long as there is an i3 ipc socket
while self.running:
self.resetConn()
self.logger.debug('Started i3 workspaces manager')
self.enterMain()
self.logger.debug('Finished i3 workspaces manager')
def resetConn(self):
def attempt_connection(attempt):
try:
self.conn = i3ipc.Connection()
except FileNotFoundError:
# Thrown when i3 is not up. Should try again in the case of a restart
self.logger.debug('Attempt {}: Failed connecting to i3 socket'.format(attempt))
if attempt > 5:
raise ConnectionError('Could not connect to i3 socked, quitting')
time.sleep(0.1 if attempt < 3 else 0.5)
attempt_connection(attempt + 1)
attempt_connection(attempt=1)
self.outputs = self.conn.get_outputs()
self.conn.on('workspace::focus' , self.change)
self.conn.on('workspace::init' , self.change)
self.conn.on('workspace::empty' , self.change)
self.conn.on('window::focus' , self.win_focused)
self.conn.on('shutdown' , self.shutdown)
# Run call backs once by calling these functions
self.change(self.conn, None)
self.win_focused(self.conn, None)
def enterMain(self):
try:
# Locking call
self.conn.main()
except BrokenPipeError:
self.logger.debug('Broken pipe in i3 workspaces thread, exiting')
except:
self.logger.debug('Unknown exception in i3 workspaces thread, exiting')
def shutdown(self, i3, e):
self.logger.debug('Shut down i3ipc')
def change(self, i3, e):
# Receives event and workspace data
self.outputs = i3.get_outputs()
fmt_outputs = ['DISP']
for output in self.outputs:
if output.active:
fmt_outputs.append(output.name)
self.display(':'.join(fmt_outputs))
self.workspaces = i3.get_workspaces()
text = self.format(self.workspaces, self.outputs)
self.display(text)
# Callbacks
for cb in self.change_callbacks:
cb(self)
if self.final_callback: # TODO ideally this would not be called on change if win_focused comes right after
self.final_callback(self)
def win_focused(self, i3, e):
self.focused_window = i3.get_tree().find_focused()
name = self.focused_window.name
text = 'WIN{}'.format(name)
self.display(text)
# Callbacks
for cb in self.focus_callbacks:
cb(self)
if self.final_callback:
self.final_callback(self)
def format(self, workspaces, outputs):
# Formats the text according to the workspace data given.
# Only important when running in free standing mode
out = ''
for workspace in workspaces:
output = None
for output_ in outputs:
if output_.name == workspace.output:
output = output_
break
if not output:
continue
st = self.state.get_state(workspace, output)
name = workspace.name.replace(" ","___")
item= self.ws_format % (st, name)
out += item
return self.end_format % out
def command(self, cmd):
self.conn.command(cmd)
def display(self, text):
global print_stdout
if print_stdout:
# Displays the text in stout
print(text)
sys.stdout.flush()
def quit(self):
self.logger.debug('Quitting i3 workspace script')
self.running = False
self.conn.main_quit()
def handle_exit(signal, frame):
global ws
print("Recieved Keyboard Interrupt from user")
ws.quit()
sys.exit(1)
if __name__ == '__main__':
# Run as stand-alone
print_stdout = True
# Setup logger to stdout with debug log level
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
logger = logging.getLogger('Normal logger')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
# Capture Keyboard Interrupt (i3ipc captures this internally)
signal.signal(signal.SIGINT, handle_exit)
# Start main
ws = i3ws(logger = logger)
ws.work()

View File

@@ -1,24 +0,0 @@
#!/bin/bash
function get_index(){
for i in "${!xra_out[@]}"; do
if [[ "${xra_out[$i]}" = "$value" ]]; then
echo "${i}";
break
fi
done
}
xra_out=($(xrandr | grep ' connected' | awk '{print $1}'))
in=($@)
for i in `seq 0 2 ${#in[@]}`; do
value="${in[$i]}"
idx=$(get_index)
if [ ! -z "$idx" ]; then
screen_idx="$idx"
#"${xra_out[$idx-1]}"
paths[${screen_idx%:}]="--bg-scale ${in[$i+1]}"
fi
done
#echo "${paths[@]}"
str=`printf -v var "%s\n" "${System[@]}"`
sh -c "feh ${paths[@]}"

View File

@@ -1,5 +0,0 @@
#!/bin/bash
if [[ -p /tmp/i3_lemonbar2_$USER ]]; then
echo "$@" > /tmp/i3_lemonbar2_$USER
fi