.dotfiles/bin/sbar_async.py
2024-01-04 10:07:50 +01:00

178 lines
4.3 KiB
Python
Executable File

#!/usr/bin/env python3
import asyncio
import runpy
from datetime import datetime
from os import getenv, getpid
from signal import signal
from subprocess import CalledProcessError, check_output, run
from time import sleep, time
def get_stdout(command: list[str]) -> str:
try:
return check_output(command).decode().rstrip()
except CalledProcessError:
return ""
async def update_crypto():
crypto = runpy.run_path(getenv("HOME", "") + "/bin/crypto")
try:
fees = crypto["get_btc_fees"]()
coins = crypto["get_coins_values"]()
except:
return ""
return f"{fees} {coins}"
async def update_cpu():
loadavg: str
with open("/proc/loadavg") as f:
loadavg = f.read().split()[0]
return f"{loadavg}"
async def update_memory():
kbtot: int
kbavail: int
gbused: float
lines: list[str]
with open("/proc/meminfo") as f:
lines = f.read().split("\n")
kbtot = int(lines[0].split()[1])
kbavail = int(lines[2].split()[1])
gbused = (kbtot - kbavail) / 1024 / 1024
return f"{gbused:.1f}G"
async def update_bat():
dir = "/sys/class/power_supply/BAT0"
status: str
capacity: str
with open(dir + "/status") as f:
status = "" if f.read() == "Charging\n" else ""
with open(dir + "/capacity") as f:
capacity = f.read().rstrip()
return f"{status} {capacity}%"
async def update_vol():
vol = get_stdout(["wpctl", "get-volume", "@DEFAULT_AUDIO_SINK@"])
while not vol:
vol = get_stdout(["wpctl", "get-volume", "@DEFAULT_AUDIO_SINK@"])
sleep(1)
if "MUTED" in vol:
return "🔇"
return f"{int(float(vol.split()[1]) * 100)}"
async def update_wlp():
sig = 0
ssid = ""
lines: list[str]
with open("/proc/net/wireless") as f:
lines = f.read().split("\n")
for l in lines:
if l.startswith("wlp"):
sig = float(l.split()[2])
break
lines = get_stdout(
[
"nmcli",
"-t",
"-f",
"device,name",
"connection",
"show",
"--active",
]
).split("\n")
for l in lines:
if l.startswith("wlp"):
ssid = l.split(":")[1]
# https://gitlab.freedesktop.org/NetworkManager/NetworkManager/-/blob/d9b06a95/src/libnmc-base/nm-client-utils.c#L628
if sig > 56: # 70 is max
return f"▂▄▆█ {ssid}"
if sig > 38:
return f"▂▄▆_ {ssid}"
if sig > 21:
return f"▂▄__ {ssid}"
if sig > 3:
return f"▂___ {ssid}"
return f"____ {ssid}"
async def update_time():
return datetime.now().strftime("%a %m/%d %R")
async def display(*args):
res = []
for arg in args:
try:
res += arg.result()
except:
res += ""
print(res)
s = " | ".join(res)
if getenv("XDG_SESSION_TYPE") == "wayland":
print(s)
else:
run(["xsetroot", "-name", s])
def handler(signum: int, _):
if signum == 34:
asyncio.run(update_vol())
# elif signum == 35:
# pass
asyncio.run(display())
def setup_sig():
cache_dir = getenv("XDG_CACHE_HOME") or f"{getenv('HOME')}/.cache"
with open(cache_dir + "/pidofbar", "w") as f:
f.write(str(getpid()))
# kill -m "$(cat ~/.cache/pidofbar)"
signal(34, handler)
async def main():
sec = 0
crypto = asyncio.create_task(update_crypto())
date = asyncio.create_task(update_time())
cpu = asyncio.create_task(update_cpu())
mem = asyncio.create_task(update_memory())
vol = asyncio.create_task(update_vol())
bat = asyncio.create_task(update_bat())
wlp = asyncio.create_task(update_wlp())
setup_sig()
while True:
if not sec % 300:
crypto = asyncio.create_task(update_crypto())
if not sec % 10:
date = asyncio.create_task(update_time())
cpu = asyncio.create_task(update_cpu())
mem = asyncio.create_task(update_memory())
bat = asyncio.create_task(update_bat())
wlp = asyncio.create_task(update_wlp())
await display(crypto, cpu, mem, vol, wlp, bat, date)
sec += 10
sleep(10 - time() % 10)
if __name__ == "__main__":
asyncio.run(main())