Source code for pengutils.utils.util_events

import socket
from os import environ as env
from os.path import join
from typing import Optional
import json


[docs] def in_container() -> bool: return env.get("PENGUIN_PROJECT_DIR", None) is not None
[docs] def get_default_socket_path() -> str: if in_container(): return join(env["PENGUIN_PROJECT_DIR"], "results", "latest", "remotectrl.sock") else: raise Exception("Socket path required")
[docs] def send_command(data: dict = None, sock: str = None) -> Optional[dict]: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as c: c.connect(sock) c.sendall(json.dumps(data).encode()) c.shutdown(socket.SHUT_WR) data = b"" while True: chunk = c.recv(4096) if not chunk: break data += chunk if not data: return try: resp = json.loads(data.decode()) except json.JSONDecodeError: print("Error decoding response:", data.decode()) return return resp