feat: replaced RPi.GPIO library with gpiozero for Raspberry 5 compatibility

This commit is contained in:
2024-01-21 13:11:38 +01:00
parent b4a24e3956
commit 98afafd0e4
2 changed files with 33 additions and 20 deletions

View File

@@ -24,4 +24,6 @@ Comunicación VoIP entre dos raspberrys.
6. Añadir intercom en /etc/hosts
7. Copiar intercom.py e intercom.yml en la home. Editar intercom.yml
CONFIGURAR ALSA?
## Avoid PulseAudio for using parole directly
`pasuspender -- parole -c rba -d plughw:0,0``

View File

@@ -1,4 +1,6 @@
import RPi.GPIO as GPIO
from gpiozero import LED
from gpiozero import Button
import subprocess
from threading import Thread
import socket
@@ -32,22 +34,22 @@ class CallManager:
s.close()
self.state = OUTGOING_CALL
self.station = station
GPIO.output(station['red led'], GPIO.HIGH)
station['red'].on()
except Exception as e:
print('Error al realizar la llamada: ', e)
GPIO.output(station['green led'], GPIO.LOW)
station['green'].off()
self.call_process.terminate()
self.call_process.communicate()
def incoming_call(self, station):
self.state = INCOMING_CALL
self.station = station
GPIO.output(station['red led'], GPIO.HIGH)
station['red'].on()
def hang(self):
print('Hang! self.station', self.station)
if self.station is not None:
GPIO.output(self.station['red led'], GPIO.LOW)
self.station['red'].off()
if self.call_process is not None:
self.call_process.terminate()
self.call_process.communicate()
@@ -96,7 +98,7 @@ def listen(call_manager):
call_manager.incoming_call(station)
clientsocket.send(OP_OK)
elif data == OP_PING:
GPIO.output(station['green led'], GPIO.HIGH)
station['green'].on()
clientsocket.send(OP_OK)
else:
clientsocket.send(OP_ERROR)
@@ -111,13 +113,13 @@ def check_status(station):
s.connect((station['host'], station['port']))
s.send(OP_PING)
if s.recv(1024) == OP_OK:
GPIO.output(station['green led'], GPIO.HIGH)
station['green'].on()
else:
GPIO.output(station['green led'], GPIO.LOW)
GPIO.output(station['red led'], GPIO.LOW)
station['red'].off()
station['green'].off()
except socket.error:
GPIO.output(station['green led'], GPIO.LOW)
GPIO.output(station['red led'], GPIO.LOW)
station['red'].off()
station['green'].off()
print('Check status failed! host=', station['host'])
finally:
s.close()
@@ -127,7 +129,6 @@ def check_status(station):
config = yaml.safe_load(open("intercom.yml"))
port = config['port']
stations = config['stations']
GPIO.setmode(GPIO.BCM)
# Start thread to wait for incoming hang petitions.
thread_listen = Thread(target=listen, args=(callManager,))
@@ -137,12 +138,18 @@ thread_listen.start()
# Power off leds and start threads to check other intercoms status.
for _station in stations:
_station['ip'] = socket.gethostbyname(_station['host'])
GPIO.setup(_station['green led'], GPIO.OUT)
GPIO.setup(_station['red led'], GPIO.OUT)
GPIO.setup(_station['button'], GPIO.IN, pull_up_down=GPIO.PUD_UP)
_station['old button state'] = GPIO.input(_station['button'])
_station['red'] = LED(_station['red led'])
_station['red'].off()
_station['green'] = LED(_station['green led'])
_station['green'].off()
_station['btn'] = Button(_station['button'])
_station['old button state'] = _station['btn'].is_pressed
_station['button state changed time'] = time.clock_gettime(time.CLOCK_MONOTONIC)
_station['button pressed'] = False
thread_check = Thread(target=check_status, args=(_station,))
thread_check.daemon = True
thread_check.start()
@@ -151,10 +158,11 @@ for _station in stations:
try:
while True:
for _station in stations:
new_input_state = GPIO.input(_station['button'])
# new_input_state = GPIO.input(_station['button'])
new_input_state = station['btn'].is_pressed
# Al pulsar el interruptor cambia el estado.
elapsed_time = time.clock_gettime(time.CLOCK_MONOTONIC) - _station['button state changed time']
if new_input_state != _station['old button state'] and elapsed_time > 0.05:
if new_input_state != _station['old button state'] and elapsed_time > 0.2:
if not _station['toogle button']:
_station['button pressed'] = not _station['button pressed']
if _station['button pressed']:
@@ -176,4 +184,7 @@ try:
_station['button state changed time'] = time.clock_gettime(time.CLOCK_MONOTONIC)
time.sleep(0.02)
finally:
GPIO.cleanup()
for _station in stations:
_station['red'].close()
_station['green'].close()
_station['btn'].close()