This commit is contained in:
2025-09-27 05:00:20 +02:00
parent 8f063ec9dc
commit d2c4ff9f0b
4 changed files with 422 additions and 0 deletions

52
check_ir_status.py Normal file
View File

@@ -0,0 +1,52 @@
#!/usr/bin/env python3
"""
Check IR listener status on remote system
"""
import subprocess
import sys
def check_ir_status():
"""Check if IR listener is running"""
try:
# Check if process is running
result = subprocess.run([
"ssh", "tulivision@192.168.1.137",
"ps aux | grep simple_ir_listener_polling | grep -v grep"
], capture_output=True, text=True)
if result.returncode == 0 and result.stdout.strip():
print("✅ IR Listener is RUNNING")
print("Process details:")
print(result.stdout.strip())
# Check GPIO status
gpio_result = subprocess.run([
"ssh", "tulivision@192.168.1.137",
"python3 -c 'import RPi.GPIO as GPIO; GPIO.setmode(GPIO.BCM); print(f\"GPIO 18 state: {GPIO.input(18)}\")'"
], capture_output=True, text=True)
if gpio_result.returncode == 0:
print(f"GPIO Status: {gpio_result.stdout.strip()}")
else:
print("❌ IR Listener is NOT running")
# Try to start it
print("Attempting to start IR listener...")
start_result = subprocess.run([
"ssh", "tulivision@192.168.1.137",
"cd /home/tulivision/rpi-tulivision && nohup python3 simple_ir_listener_polling.py > ir_listener.log 2>&1 &"
], capture_output=True, text=True)
if start_result.returncode == 0:
print("✅ IR Listener started successfully")
else:
print("❌ Failed to start IR listener")
print(start_result.stderr)
except Exception as e:
print(f"Error checking status: {e}")
if __name__ == "__main__":
check_ir_status()

42
monitor_ir_output.py Normal file
View File

@@ -0,0 +1,42 @@
#!/usr/bin/env python3
"""
Monitor IR listener output
"""
import subprocess
import sys
import time
def monitor_ir_listener():
"""Monitor the IR listener process"""
try:
# Connect to the remote system and monitor the IR listener
cmd = [
"ssh", "tulivision@192.168.1.137",
"cd /home/tulivision/rpi-tulivision && python3 simple_ir_listener_polling.py"
]
print("Starting IR listener monitoring...")
print("The IR listener is now running on the Raspberry Pi.")
print("Point your IR remote at the IR receiver and press buttons.")
print("Press Ctrl+C to stop monitoring.")
print("=" * 60)
# Start the process
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=True, bufsize=1)
# Monitor output
for line in iter(process.stdout.readline, ''):
print(line.rstrip())
sys.stdout.flush()
except KeyboardInterrupt:
print("\nStopping IR listener...")
process.terminate()
process.wait()
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
monitor_ir_listener()

View File

@@ -0,0 +1,297 @@
#!/usr/bin/env python3
"""
Simple IR Remote Listener with Polling (No Edge Detection)
Listens to IR commands using polling method instead of edge detection
"""
import os
import sys
import json
import time
import logging
import threading
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import RPi.GPIO as GPIO
class SimpleIRListenerPolling:
"""Simple IR Remote Listener using Polling Method"""
def __init__(self, gpio_pin: int = 18):
self.gpio_pin = gpio_pin
self.logger = self._setup_logging()
self.running = False
self.last_state = GPIO.HIGH
self.pulse_start = 0
self.pulses = []
def _setup_logging(self) -> logging.Logger:
"""Setup logging for the listener"""
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Create console handler
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def display_startup_info(self):
"""Display startup information"""
print("=" * 60)
print("SIMPLE IR REMOTE LISTENER (POLLING MODE)")
print("=" * 60)
print(f"GPIO Pin: {self.gpio_pin}")
print("Method: Polling (no edge detection)")
print("=" * 60)
print("LISTENING FOR IR COMMANDS...")
print("Press Ctrl+C to exit")
print("=" * 60)
print()
def setup_gpio(self):
"""Setup GPIO for IR receiver"""
try:
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
self.logger.info(f"GPIO setup complete on pin {self.gpio_pin}")
return True
except Exception as e:
self.logger.error(f"GPIO setup failed: {e}")
return False
def decode_nec(self, pulses: List[float]) -> Optional[str]:
"""Simple NEC protocol decoder"""
if len(pulses) < 2:
return None
# Check for repeat code (2 pulses)
if len(pulses) == 2:
if 8000 <= pulses[0] <= 10000 and 2000 <= pulses[1] <= 2500:
return "REPEAT"
# Check for normal NEC frame (34 pulses)
if len(pulses) != 34:
return None
# Check header
if not (8000 <= pulses[0] <= 10000 and 4000 <= pulses[1] <= 5000):
return None
# Decode address and command
address = 0
command = 0
for i in range(2, 34, 2):
pulse_time = pulses[i]
space_time = pulses[i + 1]
# Check if it's a valid bit pulse
if not (400 <= pulse_time <= 700):
return None
bit_index = (i - 2) // 2
if 1400 <= space_time <= 2000: # Bit 1
if bit_index < 16:
address |= (1 << bit_index)
else:
command |= (1 << (bit_index - 16))
elif 400 <= space_time <= 700: # Bit 0
pass
else:
return None
return f"NEC_{address:04X}_{command:04X}"
def decode_rc5(self, pulses: List[float]) -> Optional[str]:
"""Simple RC5 protocol decoder"""
if len(pulses) < 14:
return None
# RC5 uses Manchester encoding
bits = []
for i in range(0, min(len(pulses), 28), 2):
if i + 1 < len(pulses):
if pulses[i] < 1000 and pulses[i + 1] < 1000: # Short-short = 0
bits.append(0)
elif pulses[i] > 1000 and pulses[i + 1] > 1000: # Long-long = 1
bits.append(1)
else:
return None
if len(bits) < 14:
return None
# Extract fields
start_bits = bits[0:2]
toggle = bits[2]
address = bits[3:8]
command = bits[8:14]
if start_bits != [1, 1]:
return None
addr_val = sum(bit << (4 - i) for i, bit in enumerate(address))
cmd_val = sum(bit << (5 - i) for i, bit in enumerate(command))
return f"RC5_{addr_val:02X}_{cmd_val:02X}_{toggle}"
def handle_ir_command(self, ir_code: str):
"""Handle received IR command"""
timestamp = time.strftime("%H:%M:%S")
# Print command information
print(f"[{timestamp}] IR Command Received: {ir_code}")
# Try to load mapping if available
mapping = self.load_ir_mapping()
if ir_code in mapping:
mapped_command = mapping[ir_code]
if isinstance(mapped_command, dict):
print(f" Mapped Command: {mapped_command.get('command', 'unknown')}")
if mapped_command.get('description'):
print(f" Description: {mapped_command['description']}")
else:
print(f" Mapped Command: {mapped_command}")
else:
print(f" Mapped Command: UNKNOWN (not in mapping)")
print()
def load_ir_mapping(self) -> Dict:
"""Load IR code mapping from file if available"""
mapping_files = [
"/home/tulivision/rpi-tulivision/ir_mapping.json",
"/etc/video_player/ir_mapping.json",
"ir_mapping.json"
]
for mapping_file in mapping_files:
if os.path.exists(mapping_file):
try:
with open(mapping_file, 'r') as f:
return json.load(f)
except Exception as e:
self.logger.debug(f"Could not load mapping from {mapping_file}: {e}")
return {}
def poll_ir_signal(self):
"""Poll for IR signal changes"""
while self.running:
try:
current_state = GPIO.input(self.gpio_pin)
current_time = time.time()
# Detect state change
if current_state != self.last_state:
if self.pulse_start > 0:
# Calculate pulse/space duration
duration = (current_time - self.pulse_start) * 1000000 # Convert to microseconds
self.pulses.append(duration)
self.pulse_start = current_time
self.last_state = current_state
# Check for end of signal (no change for 100ms)
if self.pulse_start > 0 and (current_time - self.pulse_start) > 0.1:
if len(self.pulses) > 0:
self.process_pulses(self.pulses.copy())
self.pulses = []
self.pulse_start = 0
time.sleep(0.0001) # 0.1ms polling interval
except Exception as e:
self.logger.error(f"Error in polling loop: {e}")
time.sleep(0.01)
def process_pulses(self, pulses: List[float]):
"""Process captured pulses"""
if len(pulses) < 2:
return
# Try NEC protocol first
nec_command = self.decode_nec(pulses)
if nec_command:
self.handle_ir_command(nec_command)
return
# Try RC5 protocol
rc5_command = self.decode_rc5(pulses)
if rc5_command:
self.handle_ir_command(rc5_command)
return
# If no protocol matched, log for debugging
self.logger.debug(f"No protocol matched for {len(pulses)} pulses")
def run(self):
"""Main run loop"""
try:
# Display startup information
self.display_startup_info()
# Setup GPIO
if not self.setup_gpio():
print("Failed to setup GPIO. Exiting.")
return False
# Start polling
self.running = True
self.poll_ir_signal()
except KeyboardInterrupt:
print("\nShutting down IR listener...")
return True
except Exception as e:
self.logger.error(f"Error in main loop: {e}")
return False
finally:
self.cleanup()
def cleanup(self):
"""Cleanup resources"""
self.running = False
try:
GPIO.cleanup()
except:
pass
self.logger.info("IR Listener cleanup complete")
def main():
"""Main function"""
import argparse
parser = argparse.ArgumentParser(description="Simple IR Remote Listener (Polling Mode)")
parser.add_argument(
"--gpio-pin",
type=int,
default=18,
help="GPIO pin for IR receiver (default: 18)"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable verbose logging"
)
args = parser.parse_args()
# Set logging level
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Create and run listener
listener = SimpleIRListenerPolling(args.gpio_pin)
success = listener.run()
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()

31
test_ir_listener.py Normal file
View File

@@ -0,0 +1,31 @@
#!/usr/bin/env python3
"""
Test script for IR listener
"""
import time
import sys
import os
# Add current directory to path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from simple_ir_listener_polling import SimpleIRListenerPolling
def test_ir_listener():
"""Test the IR listener with simulated commands"""
print("Testing IR Listener...")
# Create listener
listener = SimpleIRListenerPolling(gpio_pin=18)
# Test the command handler directly
print("Testing command handler...")
listener.handle_ir_command("NEC_00FF_00FF")
listener.handle_ir_command("NEC_00FF_807F")
listener.handle_ir_command("RC5_00_0C_0")
print("Test completed successfully!")
if __name__ == "__main__":
test_ir_listener()