import threading import subprocess, os import re # regexp from enum import Enum import i3_lemonbar_config as config kill_on_unfocus = [] # Loggers, initialized in main function logger = None health_logger = None bar_in_shelf = None # TODO bar mode can be moved to i3Module class bar_mode(Enum): power, normal, control = range(-1,2) # Don't cycle through power def cycle(self): try: return bar_mode(self.value + 1) except ValueError: return bar_mode(0) mode = bar_mode.normal # Helper functions def hostname(): return os.uname()[1] ansi_escape = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]') def strip_ansi_unicode(s): # ANSI escape sequences are for colors in terminal and similar strip_ansi = ansi_escape.sub('', s) strip_unicode = (strip_ansi.encode('ascii', 'ignore')).decode('utf-8') return strip_unicode def create_new_fifo(fifo_file): """ Create new fifo file, removing old one if it exists """ try: os.remove(fifo_file) logger.debug('''Removed old fifo file''') except OSError: logger.debug('''No old fifo file, good''') try: os.mkfifo(fifo_file) except OSError: logger.error('''Failed, couldn't create fifo file''') os.remove(config.pid_file) # Clean up own PID file sys.exit(0) class Shelf(): # A one-slot queue implemented with locks and condition objects def __init__(self, item = None): self.lock = threading.Lock() self.item_ready = threading.Condition(self.lock) self.item = item def get(self): # Consumes item self.item_ready.acquire() if self.item is None: # Wait until becomes ready self.item_ready.wait() ret = self.item self.item = None self.item_ready.release() return ret def put(self, item): # Discards previous item self.item_ready.acquire() self.item = item self.item_ready.notify() self.item_ready.release()