from gpiozero import LED from gpiozero import Button from threading import Thread import socket import yaml import time import subprocess from call_manager import ( CallManager, NO_CALL, INCOMING_CALL, OUTGOING_CALL, OP_CALL, OP_HANG, OP_PING, OP_OK, OP_ERROR, ) CHECK_STATUS_INTERVAL = 120 config = yaml.safe_load(open("intercom.yml")) name = config["name"] port = config["port"] stations = config["stations"] callManager = CallManager(config) def getstationbyip(ip): for s in stations: if s["ip"] == ip: return s return None # Escuchamos en port para peticiones de operaciones. serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def listen(call_manager): serversocket.bind(("0.0.0.0", port)) print("Listening address", socket.gethostname(), "on port", port) serversocket.listen(5) while True: # accept connections from outside (clientsocket, ip_address) = serversocket.accept() data = clientsocket.recv(1024) print( "Connection from", ip_address, "host=", clientsocket.getpeername(), "data=", data.decode(), ) station = getstationbyip(ip_address[0]) if data == OP_HANG: call_manager.hang() clientsocket.send(OP_OK) elif data == OP_CALL: if callManager.state == OUTGOING_CALL or callManager.state == INCOMING_CALL: callManager.hang() call_manager.incoming_call(station) clientsocket.send(OP_OK) elif data == OP_PING: station["green"].on() clientsocket.send(OP_OK) else: clientsocket.send(OP_ERROR) clientsocket.close() # Comprueba que el otro intercom esta activo. def check_status(station): while True: s = socket.socket() try: s.connect((station["host"], station["port"])) s.send(OP_PING) if s.recv(1024) == OP_OK: station["green"].on() else: station["red"].off() station["green"].off() except socket.error: station["red"].off() station["green"].off() print("Check status failed! host=", station["host"]) finally: s.close() time.sleep(CHECK_STATUS_INTERVAL) # Start mumble client subprocess.Popen(["xvfb-run", "-n", "1", "mumble", "mumble://" + name + "@miki"]) # Mute mumble callManager.deaf_mumble() # Start thread to wait for incoming hang petitions. thread_listen = Thread(target=listen, args=(callManager,)) thread_listen.daemon = True thread_listen.start() # Power off leds and start threads to check other intercoms status. for _station in stations: _station["ip"] = socket.gethostbyname(_station["host"]) _station["red"] = LED(_station["red led"]) _station["red"].off() _station["green"] = LED(_station["green led"]) _station["green"].off() _station["btn"] = Button(_station["button"]) _station["old button state"] = _station["btn"].is_pressed _station["button state changed time"] = time.clock_gettime(time.CLOCK_MONOTONIC) _station["button pressed"] = False thread_check = Thread(target=check_status, args=(_station,)) thread_check.daemon = True thread_check.start() # Listen for swith changes. try: while True: for _station in stations: new_input_state = _station["btn"].is_pressed # Al pulsar el interruptor cambia el estado. elapsed_time = ( time.clock_gettime(time.CLOCK_MONOTONIC) - _station["button state changed time"] ) if new_input_state != _station["old button state"] and elapsed_time > 0.2: if not _station["toogle button"]: _station["button pressed"] = not _station["button pressed"] if _station["button pressed"]: _station["old button state"] = new_input_state _station["button state changed time"] = time.clock_gettime( time.CLOCK_MONOTONIC ) continue print( "Interruptor pulsado callManager.state=", callManager.state, "elapsed_time", elapsed_time, ) # Si no hay llamada en curso la hacemos. if callManager.state == NO_CALL: callManager.call(_station) elif callManager.state == OUTGOING_CALL: callManager.notify_hang() callManager.hang() elif callManager.state == INCOMING_CALL: callManager.notify_hang() callManager.hang() _station["old button state"] = new_input_state _station["button state changed time"] = time.clock_gettime( time.CLOCK_MONOTONIC ) time.sleep(0.02) finally: print("Close listeing port ", port) serversocket.close() for _station in stations: _station["red"].close() _station["green"].close() _station["btn"].close()