77 lines
2.0 KiB
Python
77 lines
2.0 KiB
Python
import subprocess
|
|
import socket
|
|
|
|
# Call states; CallManager.state always holds one of these.
NO_CALL, INCOMING_CALL, OUTGOING_CALL = 0, 1, 2

# Opcodes exchanged with a peer station, as raw bytes ready for the socket.
# CallManager sends OP_CALL and OP_HANG; the remaining opcodes are presumably
# used by the listening side of the protocol (not visible in this file).
OP_CALL = b"CALL"
OP_HANG = b"HANG"
OP_PING = b"PING"
OP_OK = b"OK"
OP_ERROR = b"ERROR"
|
|
|
|
|
|
def exec_mumble(deaf: bool):
    """Send a hearing command and a microphone command to Mumble.

    Two ``mumble rpc`` invocations are run one after the other, each wrapped
    in ``xvfb-run``: first ``deaf``/``undeaf`` with ``-n 2``, then
    ``mute``/``unmute`` with ``-n 3``.

    Args:
        deaf: True selects ``deaf`` + ``mute``; False selects
            ``undeaf`` + ``unmute``.

    The exit status of the spawned commands is not checked.
    """
    if deaf:
        hearing_action, mic_action = "deaf", "mute"
    else:
        hearing_action, mic_action = "undeaf", "unmute"

    # Each invocation gets its own xvfb-run "-n" value, in this fixed order.
    for server_num, action in (("2", hearing_action), ("3", mic_action)):
        subprocess.run(["xvfb-run", "-n", server_num, "mumble", "rpc", action])
|
|
|
|
|
|
def deaf_mumble():
    """Deafen and mute Mumble (shorthand for ``exec_mumble(True)``)."""
    exec_mumble(deaf=True)
|
|
|
|
|
|
def undeaf_mumble():
    """Undeafen and unmute Mumble (shorthand for ``exec_mumble(False)``)."""
    exec_mumble(deaf=False)
|
|
|
|
|
|
class CallManager:
    """Track the current call and exchange call/hang notices with a peer station.

    A *station* is a mapping read with these keys:

    * ``"host"`` / ``"port"`` -- address passed to ``socket.connect``.
    * ``"red"`` -- object with ``on()`` and ``off()``; switched on while a
      call is active and off on hang-up.
    * ``"green"`` -- object with ``off()``; switched off when an outgoing
      call attempt fails.
    """

    def __init__(self, config):
        """Create an idle manager.

        Args:
            config: Stored on the instance as ``self.config``; not read by
                any method of this class.
        """
        self.config = config
        self.state = NO_CALL
        self.station = None

    def call(self, station):
        """Notify ``station`` of an outgoing call and switch to call mode.

        Sends ``OP_CALL`` to the station, prints its reply, then unmutes
        Mumble, records the station and turns its red indicator on.

        Any exception raised along the way is caught and printed (best
        effort): the call state is left untouched and the station's green
        indicator is switched off instead.

        Args:
            station: Station mapping (see class docstring).
        """
        try:
            # The context manager closes the socket even when connect, send,
            # recv or decode raises, so a failed attempt leaks no descriptor.
            with socket.socket() as s:
                print("host=", station["host"])
                print("port=", station["port"])
                s.connect((station["host"], station["port"]))
                # sendall() keeps writing until the whole opcode is out;
                # plain send() may transmit only part of the buffer.
                s.sendall(OP_CALL)
                response = s.recv(1024)
                # NOTE(review): the reply is only printed; the call proceeds
                # whatever the peer answered -- confirm this is intended.
                print("Call notice sent", "response=", response.decode())
            undeaf_mumble()
            self.state = OUTGOING_CALL
            self.station = station
            station["red"].on()
        except Exception as e:
            print("Error al realizar la llamada: ", e)
            # NOTE(review): assumed to run only on failure -- confirm.
            station["green"].off()

    def incoming_call(self, station):
        """Switch to call mode for a call initiated by ``station``.

        Unmutes Mumble, records the station as the current peer and turns
        its red indicator on. Nothing is sent over the network.

        Args:
            station: Station mapping (see class docstring).
        """
        undeaf_mumble()
        self.state = INCOMING_CALL
        self.station = station
        station["red"].on()

    def hang(self):
        """End the current call locally and return to the idle state.

        Turns the current station's red indicator off (when a station is
        recorded), mutes Mumble again and clears the call state. The peer is
        not notified here; see ``notify_hang``.
        """
        print("Hang! self.station", self.station)
        if self.station is not None:
            self.station["red"].off()
        # NOTE(review): muting and the state reset are assumed to run even
        # when no station is recorded -- confirm.
        deaf_mumble()
        self.state = NO_CALL
        self.station = None

    def notify_hang(self):
        """Send ``OP_HANG`` to the current station, if there is one.

        Does nothing when no station is recorded. Connection and protocol
        errors propagate to the caller; the socket is closed in every case.
        """
        if self.station is None:
            return
        with socket.socket() as s:
            s.connect((self.station["host"], self.station["port"]))
            s.sendall(OP_HANG)
            response = s.recv(1024)
            print("Hang notify sent", "response=", response.decode())
|