import subprocess import socket NO_CALL = 0 INCOMING_CALL = 1 OUTGOING_CALL = 2 OP_CALL = "CALL".encode() OP_HANG = "HANG".encode() OP_PING = "PING".encode() OP_OK = "OK".encode() OP_ERROR = "ERROR".encode() def exec_mumble(deaf: bool): subprocess.run( ["xvfb-run", "-n", "2", "mumble", "rpc", "deaf" if deaf else "undeaf"] ) subprocess.run( ["xvfb-run", "-n", "3", "mumble", "rpc", "mute" if deaf else "unmute"] ) class CallManager: def __init__(self, config): self.config = config self.state = NO_CALL self.station = None def call(self, station): try: s = socket.socket() print("host=", station["host"]) print("port=", station["port"]) s.connect((station["host"], station["port"])) s.send(OP_CALL) response = s.recv(1024) print("Call notice sent", "response=", response.decode()) s.close() self.undeaf_mumble() self.state = OUTGOING_CALL self.station = station station["red"].on() except Exception as e: print("Error al realizar la llamada: ", e) station["green"].off() def incoming_call(self, station): self.undeaf_mumble() self.state = INCOMING_CALL self.station = station station["red"].on() def hang(self): print("Hang! self.station", self.station) if self.station is not None: self.station["red"].off() self.deaf_mumble() self.state = NO_CALL self.station = None def notify_hang(self): if self.station is not None: s = socket.socket() s.connect((self.station["host"], self.station["port"])) s.send(OP_HANG) response = s.recv(1024) print("Hang notify sent", "response=", response.decode()) s.close() def deaf_mumble(self): exec_mumble(True) def undeaf_mumble(self): exec_mumble(False)