feat: intercom for Raspberry 4 and below (GPIO library)

This commit is contained in:
2024-01-21 12:43:57 +01:00
commit b4a24e3956
6 changed files with 250 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
.idea/

27
README.md Normal file
View File

@@ -0,0 +1,27 @@
# Intercom
Comunicación VoIP entre dos raspberrys.
## How to
1. Copiar Raspbian Lite en el SD usando Balena Etcher.
2. Crear un archivo en la partición boot con el nombre ssh para activar SSHD.
3. Loguear en el raspberry (pass por defecto: raspberry) y ejecutar:
sudo systemctl enable ssh
sudo apt install libopus-dev build-essential libasound2-dev libogg-dev python3-gpiozero python3-pip
sudo pip3 install pyyaml
wget http://holdenc.altervista.org/parole/downloads/parole-010beta4.tgz
tar xf parole-010beta4.tgz
cd parole-010beta4/
make
sudo cp parole /usr/bin/
4. Crear /etc/systemd/system/parole.service y activarlo.
sudo chown root:root /etc/systemd/system/parole.service
sudo systemctl enable parole
5. Crear /etc/systemd/system/intercom.service y activarlo.
sudo chown root:root /etc/systemd/system/intercom.service
sudo systemctl enable intercom
6. Añadir intercom en /etc/hosts
7. Copiar intercom.py e intercom.yml en la home. Editar intercom.yml
CONFIGURAR ALSA?

179
intercom.py Normal file
View File

@@ -0,0 +1,179 @@
import RPi.GPIO as GPIO
import subprocess
from threading import Thread
import socket
import yaml
import time
NO_CALL = 0
INCOMING_CALL = 1
OUTGOING_CALL = 2
OP_CALL = 'CALL'.encode()
OP_HANG = 'HANG'.encode()
OP_PING = 'PING'.encode()
OP_OK = 'OK'.encode()
OP_ERROR = 'ERROR'.encode()
CHECK_STATUS_INTERVAL = 120
class CallManager:
state = NO_CALL
station = None
call_process = None
def call(self, station):
try:
self.call_process = subprocess.Popen(['parole', '-c', station['host'], '-d', config['audio device']])
s = socket.socket()
s.connect((station['host'], station['port']))
s.send(OP_CALL)
response = s.recv(1024)
print('Call notice sent', 'response=', response.decode())
s.close()
self.state = OUTGOING_CALL
self.station = station
GPIO.output(station['red led'], GPIO.HIGH)
except Exception as e:
print('Error al realizar la llamada: ', e)
GPIO.output(station['green led'], GPIO.LOW)
self.call_process.terminate()
self.call_process.communicate()
def incoming_call(self, station):
self.state = INCOMING_CALL
self.station = station
GPIO.output(station['red led'], GPIO.HIGH)
def hang(self):
print('Hang! self.station', self.station)
if self.station is not None:
GPIO.output(self.station['red led'], GPIO.LOW)
if self.call_process is not None:
self.call_process.terminate()
self.call_process.communicate()
self.state = NO_CALL
self.station = None
def notify_hang(self):
if self.station is not None:
s = socket.socket()
s.connect((self.station['host'], self.station['port']))
s.send(OP_HANG)
response = s.recv(1024)
print('Hang notify sent', 'response=', response.decode())
s.close()
callManager = CallManager()
def getstationbyip(ip):
for s in stations:
if s['ip'] == ip:
return s
return None
# Escuchamos en port para peticiones de operaciones.
def listen(call_manager):
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind(('0.0.0.0', port))
print('Listening address', socket.gethostname(), 'on port', port)
serversocket.listen(5)
while True:
# accept connections from outside
(clientsocket, ip_address) = serversocket.accept()
data = clientsocket.recv(1024)
print('Connection from', ip_address, 'host=', clientsocket.getpeername(), 'data=', data.decode())
station = getstationbyip(ip_address[0])
if data == OP_HANG:
call_manager.hang()
clientsocket.send(OP_OK)
elif data == OP_CALL:
if callManager.state == OUTGOING_CALL or callManager.state == INCOMING_CALL:
callManager.hang()
call_manager.incoming_call(station)
clientsocket.send(OP_OK)
elif data == OP_PING:
GPIO.output(station['green led'], GPIO.HIGH)
clientsocket.send(OP_OK)
else:
clientsocket.send(OP_ERROR)
clientsocket.close()
# Comprueba que el otro intercom esta activo.
def check_status(station):
while True:
s = socket.socket()
try:
s.connect((station['host'], station['port']))
s.send(OP_PING)
if s.recv(1024) == OP_OK:
GPIO.output(station['green led'], GPIO.HIGH)
else:
GPIO.output(station['green led'], GPIO.LOW)
GPIO.output(station['red led'], GPIO.LOW)
except socket.error:
GPIO.output(station['green led'], GPIO.LOW)
GPIO.output(station['red led'], GPIO.LOW)
print('Check status failed! host=', station['host'])
finally:
s.close()
time.sleep(CHECK_STATUS_INTERVAL)
config = yaml.safe_load(open("intercom.yml"))
port = config['port']
stations = config['stations']
GPIO.setmode(GPIO.BCM)
# Start thread to wait for incoming hang petitions.
thread_listen = Thread(target=listen, args=(callManager,))
thread_listen.daemon = True
thread_listen.start()
# Power off leds and start threads to check other intercoms status.
for _station in stations:
_station['ip'] = socket.gethostbyname(_station['host'])
GPIO.setup(_station['green led'], GPIO.OUT)
GPIO.setup(_station['red led'], GPIO.OUT)
GPIO.setup(_station['button'], GPIO.IN, pull_up_down=GPIO.PUD_UP)
_station['old button state'] = GPIO.input(_station['button'])
_station['button state changed time'] = time.clock_gettime(time.CLOCK_MONOTONIC)
_station['button pressed'] = False
thread_check = Thread(target=check_status, args=(_station,))
thread_check.daemon = True
thread_check.start()
# Listen for swith changes.
try:
while True:
for _station in stations:
new_input_state = GPIO.input(_station['button'])
# Al pulsar el interruptor cambia el estado.
elapsed_time = time.clock_gettime(time.CLOCK_MONOTONIC) - _station['button state changed time']
if new_input_state != _station['old button state'] and elapsed_time > 0.05:
if not _station['toogle button']:
_station['button pressed'] = not _station['button pressed']
if _station['button pressed']:
_station['old button state'] = new_input_state
_station['button state changed time'] = time.clock_gettime(time.CLOCK_MONOTONIC)
continue
print('Interruptor pulsado callManager.state=', callManager.state, 'elapsed_time', elapsed_time)
# Si no hay llamada en curso la hacemos.
if callManager.state == NO_CALL:
callManager.call(_station)
elif callManager.state == OUTGOING_CALL:
callManager.notify_hang()
callManager.hang()
elif callManager.state == INCOMING_CALL:
callManager.notify_hang()
callManager.hang()
_station['old button state'] = new_input_state
_station['button state changed time'] = time.clock_gettime(time.CLOCK_MONOTONIC)
time.sleep(0.02)
finally:
GPIO.cleanup()

14
intercom.service Normal file
View File

@@ -0,0 +1,14 @@
[Unit]
Description=Intercom
After=network.target
[Service]
ExecStart=/usr/bin/python3 -u intercom.py
WorkingDirectory=/home/pi
StandardOutput=inherit
StandardError=inherit
Restart=always
User=root
[Install]
WantedBy=multi-user.target

15
intercom.yml Normal file
View File

@@ -0,0 +1,15 @@
port: 8111
audio device: plughw:1,0
stations:
- host: rba
port: 8111
button: 25
toogle button: False
red led: 26
green led: 12
- host: kuku
port: 8111
button: 18
toogle button: False
red led: 26
green led: 2

14
parole.service Normal file
View File

@@ -0,0 +1,14 @@
[Unit]
Description=Parole
After=network.target
[Service]
ExecStart=/usr/bin/parole -la -d plughw:1,0
WorkingDirectory=/home/pi
StandardOutput=inherit
StandardError=inherit
Restart=always
User=pi
[Install]
WantedBy=multi-user.target