69 lines
1.9 KiB
Python
69 lines
1.9 KiB
Python
import subprocess
|
|
import socket
|
|
|
|
NO_CALL = 0
|
|
INCOMING_CALL = 1
|
|
OUTGOING_CALL = 2
|
|
OP_CALL = 'CALL'.encode()
|
|
OP_HANG = 'HANG'.encode()
|
|
OP_PING = 'PING'.encode()
|
|
OP_OK = 'OK'.encode()
|
|
OP_ERROR = 'ERROR'.encode()
|
|
|
|
def exec_mumble(deaf: bool):
|
|
subprocess.run(['xvfb-run', 'mumble', 'rpc', 'deaf' if deaf else 'undeaf'])
|
|
subprocess.run(['xvfb-run', 'mumble', 'rpc', 'mute' if deaf else 'unmute'])
|
|
|
|
def deaf_mumble():
|
|
exec_mumble(True)
|
|
|
|
def undeaf_mumble():
|
|
exec_mumble(False)
|
|
|
|
class CallManager:
|
|
def __init__(self, config):
|
|
self.config = config
|
|
self.state = NO_CALL
|
|
self.station = None
|
|
|
|
def call(self, station):
|
|
try:
|
|
undeaf_mumble()
|
|
s = socket.socket()
|
|
print('host=', station['host'])
|
|
print('port=', station['port'])
|
|
s.connect((station['host'], station['port']))
|
|
s.send(OP_CALL)
|
|
response = s.recv(1024)
|
|
print('Call notice sent', 'response=', response.decode())
|
|
s.close()
|
|
self.state = OUTGOING_CALL
|
|
self.station = station
|
|
station['red'].on()
|
|
except Exception as e:
|
|
print('Error al realizar la llamada: ', e)
|
|
station['green'].off()
|
|
|
|
def incoming_call(self, station):
|
|
undeaf_mumble()
|
|
self.state = INCOMING_CALL
|
|
self.station = station
|
|
station['red'].on()
|
|
|
|
def hang(self):
|
|
print('Hang! self.station', self.station)
|
|
if self.station is not None:
|
|
self.station['red'].off()
|
|
deaf_mumble()
|
|
self.state = NO_CALL
|
|
self.station = None
|
|
|
|
def notify_hang(self):
|
|
if self.station is not None:
|
|
s = socket.socket()
|
|
s.connect((self.station['host'], self.station['port']))
|
|
s.send(OP_HANG)
|
|
response = s.recv(1024)
|
|
print('Hang notify sent', 'response=', response.decode())
|
|
s.close()
|