Files
intercom/call_manager.py

68 lines
1.8 KiB
Python

import subprocess
import socket
# Call states stored in CallManager.state.
NO_CALL = 0
INCOMING_CALL = 1
OUTGOING_CALL = 2

# Opcodes exchanged with a peer station, kept as bytes so they can be
# handed straight to a socket.
OP_CALL = b'CALL'
OP_HANG = b'HANG'
OP_PING = b'PING'
OP_OK = b'OK'
OP_ERROR = b'ERROR'
def exec_mumble(deaf: bool):
    """Launch ``mumble rpc deaf`` or ``mumble rpc undeaf`` without waiting.

    Args:
        deaf: ``True`` selects the ``deaf`` action, ``False`` selects
            ``undeaf``.

    The child process is started and not waited on; its exit status is
    never inspected.
    """
    action = 'deaf' if deaf else 'undeaf'
    subprocess.Popen(['mumble', 'rpc', action])
def deaf_mumble():
    """Ask the local Mumble client to deafen itself (``mumble rpc deaf``)."""
    # Python's boolean literal is ``True``; a lowercase ``true`` is an
    # undefined name and raises NameError on every call.
    exec_mumble(True)
def undeaf_mumble():
    """Ask the local Mumble client to undeafen itself (``mumble rpc undeaf``)."""
    # Python's boolean literal is ``False``; a lowercase ``false`` is an
    # undefined name and raises NameError on every call.
    exec_mumble(False)
class CallManager:
    """Track the current intercom call and notify the peer station.

    Attributes set by this class:
        config: the object passed to the constructor (stored, not read by
            any method of this class).
        state: one of ``NO_CALL``, ``INCOMING_CALL`` or ``OUTGOING_CALL``.
        station: the station involved in the current call, or ``None``.

    A station is accessed as a mapping with ``'host'``, ``'port'``,
    ``'red'`` and ``'green'`` entries; ``'red'`` and ``'green'`` must offer
    ``on()`` / ``off()`` (presumably indicator LEDs -- TODO confirm).
    """

    def __init__(self, config):
        """Store ``config`` and start with no call in progress."""
        self.config = config
        self.state = NO_CALL
        self.station = None

    def call(self, station):
        """Place an outgoing call to ``station``.

        Undeafens Mumble, sends ``OP_CALL`` to the station's host/port and
        prints the reply. On success the manager switches to
        ``OUTGOING_CALL``, remembers the station and turns its ``'red'``
        entry on.

        Any exception raised along the way is caught and printed, the
        station's ``'green'`` entry is turned off, and the manager's state
        is left unchanged.
        """
        try:
            undeaf_mumble()
            print('host=', station['host'])
            print('port=', station['port'])
            # The context manager closes the socket even when connect/recv
            # fail, so a failed call no longer leaks a file descriptor.
            with socket.socket() as s:
                s.connect((station['host'], station['port']))
                # sendall() retries until the whole opcode has been written.
                s.sendall(OP_CALL)
                response = s.recv(1024)
            print('Call notice sent', 'response=', response.decode())
            self.state = OUTGOING_CALL
            self.station = station
            station['red'].on()
        except Exception as e:
            # Runtime message is Spanish for "Error while placing the call".
            print('Error al realizar la llamada: ', e)
            # NOTE(review): 'green' is switched off only on failure here --
            # confirm this matches the intended nesting.
            station['green'].off()

    def incoming_call(self, station):
        """Register a call initiated by ``station``.

        Undeafens Mumble, switches to ``INCOMING_CALL``, remembers the
        station and turns its ``'red'`` entry on.
        """
        undeaf_mumble()
        self.state = INCOMING_CALL
        self.station = station
        station['red'].on()

    def hang(self):
        """End the current call locally and return to ``NO_CALL``.

        Turns the active station's ``'red'`` entry off (when there is an
        active station), deafens Mumble and clears ``state``/``station``.
        The peer is not contacted; see ``notify_hang`` for that.
        """
        print('Hang! self.station', self.station)
        if self.station is not None:
            self.station['red'].off()
        # NOTE(review): the reset below also runs when no call is active --
        # confirm this matches the intended nesting.
        deaf_mumble()
        self.state = NO_CALL
        self.station = None

    def notify_hang(self):
        """Send ``OP_HANG`` to the active station and print its reply.

        Does nothing when there is no active station. Socket errors are not
        caught here and propagate to the caller.
        """
        if self.station is None:
            return
        # Context-managed so the socket is closed even if the peer is
        # unreachable or the exchange fails midway.
        with socket.socket() as s:
            s.connect((self.station['host'], self.station['port']))
            s.sendall(OP_HANG)
            response = s.recv(1024)
        print('Hang notify sent', 'response=', response.decode())