Files
intercom/call_manager.py

77 lines
2.0 KiB
Python

import subprocess
import socket
# Call states stored in CallManager.state.
NO_CALL, INCOMING_CALL, OUTGOING_CALL = 0, 1, 2

# Opcodes as raw bytes. OP_CALL and OP_HANG are written to the peer socket by
# CallManager; the remaining ones are not referenced in this module
# (presumably used by the listening side -- verify against the server code).
OP_CALL = b"CALL"
OP_HANG = b"HANG"
OP_PING = b"PING"
OP_OK = b"OK"
OP_ERROR = b"ERROR"
def exec_mumble(deaf: bool):
    """Set the deafen and mute state of Mumble through its ``rpc`` command.

    Two commands are run one after the other, each wrapped in ``xvfb-run``:
    first ``deaf``/``undeaf``, then ``mute``/``unmute``. Exit codes are not
    checked.

    Args:
        deaf: True to deafen and mute; False to undeafen and unmute.
    """
    if deaf:
        hearing_action, mic_action = "deaf", "mute"
    else:
        hearing_action, mic_action = "undeaf", "unmute"

    # Each invocation gets its own xvfb-run server number (-n 1, then -n 2).
    for server_num, action in (("1", hearing_action), ("2", mic_action)):
        subprocess.run(["xvfb-run", "-n", server_num, "mumble", "rpc", action])
def deaf_mumble():
    """Deafen and mute Mumble (shorthand for ``exec_mumble(True)``)."""
    exec_mumble(deaf=True)
def undeaf_mumble():
    """Undeafen and unmute Mumble (shorthand for ``exec_mumble(False)``)."""
    exec_mumble(deaf=False)
class CallManager:
    """Track the single active intercom call and notify the remote station.

    A *station* is a mapping that provides ``"host"`` and ``"port"`` (the
    address the opcodes are sent to) plus ``"red"`` and ``"green"`` objects
    exposing ``on()``/``off()`` (presumably indicator LEDs -- confirm against
    the code that builds the stations).

    Attributes:
        config: The configuration object handed to the constructor; it is
            stored but not read by this class.
        state: One of ``NO_CALL``, ``INCOMING_CALL`` or ``OUTGOING_CALL``.
        station: The station of the current call, or ``None`` when idle.
    """

    def __init__(self, config):
        """Create an idle manager (no call, no station)."""
        self.config = config
        self.state = NO_CALL
        self.station = None

    @staticmethod
    def _send_op(station, opcode):
        """Send ``opcode`` to ``station`` on a fresh connection; return the reply text.

        The socket is closed on every path, including when connecting,
        sending or receiving raises. Socket errors propagate to the caller.
        """
        with socket.socket() as conn:
            conn.connect((station["host"], station["port"]))
            # sendall() retries until the whole opcode is written; a bare
            # send() is allowed to transmit only part of the buffer.
            conn.sendall(opcode)
            reply = conn.recv(1024)
        # The reply is only ever printed, so undecodable bytes must not abort
        # the operation after the peer has already been notified.
        return reply.decode(errors="replace")

    def call(self, station):
        """Place an outgoing call to ``station``.

        Sends ``OP_CALL`` to the station, then undeafens Mumble, records the
        call as ``OUTGOING_CALL`` and switches the station's ``"red"`` on.
        Any exception along the way is printed and the station's ``"green"``
        is switched off instead; the exception is not re-raised.
        """
        try:
            print("host=", station["host"])
            print("port=", station["port"])
            response = self._send_op(station, OP_CALL)
            # NOTE(review): the reply content is not inspected, so the call
            # proceeds whatever the peer answers -- confirm this is intended.
            print("Call notice sent", "response=", response)
            undeaf_mumble()
            self.state = OUTGOING_CALL
            self.station = station
            station["red"].on()
        except Exception as e:
            print("Error al realizar la llamada: ", e)
            station["green"].off()

    def incoming_call(self, station):
        """Accept a call from ``station``: undeafen, record it, switch ``"red"`` on."""
        undeaf_mumble()
        self.state = INCOMING_CALL
        self.station = station
        station["red"].on()

    def hang(self):
        """End the current call locally.

        Switches the active station's ``"red"`` off (if there is a station),
        deafens Mumble and resets the manager to idle. The remote side is not
        contacted here; that is done by ``notify_hang``.
        """
        print("Hang! self.station", self.station)
        if self.station is not None:
            self.station["red"].off()
        deaf_mumble()
        self.state = NO_CALL
        self.station = None

    def notify_hang(self):
        """Send ``OP_HANG`` to the active station; do nothing when idle.

        Local state is left untouched. Socket errors propagate to the caller.
        """
        if self.station is None:
            return
        response = self._send_op(self.station, OP_HANG)
        print("Hang notify sent", "response=", response)