ir listener

This commit is contained in:
2025-09-27 04:56:21 +02:00
parent e619e5f412
commit 8f063ec9dc
3 changed files with 609 additions and 0 deletions

View File

@@ -0,0 +1,142 @@
# Simple IR Listener for Console
A simplified IR remote listener designed for console use on Raspberry Pi systems.
## Features
- **No Simulation Mode**: Designed for real hardware use only
- **Console Output**: Clear, timestamped IR command display
- **Multiple Protocols**: Supports NEC and RC5 IR protocols
- **Simple Configuration**: Minimal setup required
- **IR Mapping Support**: Automatically loads IR mappings if available
## Quick Start
### 1. Deploy to Remote System
```bash
# Run the deployment script
./deploy_simple_ir_listener.sh
```
### 2. Connect to Remote System
```bash
ssh tulivision@192.168.1.137
cd /home/tulivision/rpi-tulivision
```
### 3. Run the IR Listener
```bash
# Basic usage
python3 simple_ir_listener.py
# With verbose logging
python3 simple_ir_listener.py --verbose
# With custom GPIO pin
python3 simple_ir_listener.py --gpio-pin 17
```
## Hardware Requirements
- Raspberry Pi with GPIO access
- IR receiver module connected to GPIO pin (default: pin 18)
- IR remote control
## GPIO Pin Configuration
The default GPIO pin is 18, but you can specify a different pin:
```bash
python3 simple_ir_listener.py --gpio-pin 17
```
## IR Mapping
The listener will automatically look for IR mapping files in this order:
1. `/home/tulivision/rpi-tulivision/ir_mapping.json`
2. `/etc/video_player/ir_mapping.json`
3. `ir_mapping.json` (current directory)
If a mapping file is found, the listener will display both the raw IR code and the mapped command.
### Example IR Mapping File
```json
{
"NEC_00FF_00FF": {
"command": "power_toggle",
"description": "Power button"
},
"NEC_00FF_807F": {
"command": "channel_0",
"description": "Channel 0"
},
"RC5_00_0C_0": "power_toggle"
}
```
## Output Format
The listener displays IR commands in the following format:
```
[14:30:25] IR Command Received: NEC_00FF_00FF
Mapped Command: power_toggle
Description: Power button
[14:30:28] IR Command Received: NEC_00FF_807F
Mapped Command: channel_0
Description: Channel 0
```
## Command Line Options
- `--gpio-pin PIN`: Specify GPIO pin for IR receiver (default: 18)
- `--verbose, -v`: Enable verbose logging for debugging
- `--help, -h`: Show help message
## Troubleshooting
### No IR Commands Detected
1. Check GPIO pin connection
2. Verify IR receiver is working
3. Try different GPIO pin: `--gpio-pin 17`
4. Enable verbose logging: `--verbose`
### Permission Errors
Make sure the script is executable:
```bash
chmod +x simple_ir_listener.py
```
### Missing Dependencies
Install required packages:
```bash
pip3 install --user RPi.GPIO
```
## Stopping the Listener
Press `Ctrl+C` to stop the IR listener gracefully.
## Integration
This simple listener can be integrated into larger systems by:
1. Modifying the `handle_ir_command` method
2. Adding custom callback functions
3. Integrating with existing configuration systems
## Differences from Full IR Listener
- **No simulation mode**: Real hardware only
- **Simplified configuration**: No complex config management
- **Console focused**: Designed for direct console use
- **Minimal dependencies**: Only essential IR functionality
- **No service integration**: Standalone script only

47
deploy_simple_ir_listener.sh Executable file
View File

@@ -0,0 +1,47 @@
#!/bin/bash
"""
Deploy Simple IR Listener to Remote Raspberry Pi
"""
# Configuration
REMOTE_USER="tulivision"
REMOTE_HOST="192.168.1.137"
REMOTE_DIR="/home/tulivision/rpi-tulivision"
LOCAL_SCRIPT="simple_ir_listener.py"
echo "Deploying Simple IR Listener to $REMOTE_USER@$REMOTE_HOST:$REMOTE_DIR"
# Check if local script exists
if [ ! -f "$LOCAL_SCRIPT" ]; then
echo "Error: $LOCAL_SCRIPT not found in current directory"
exit 1
fi
# Create remote directory if it doesn't exist
echo "Creating remote directory..."
ssh $REMOTE_USER@$REMOTE_HOST "mkdir -p $REMOTE_DIR"
# Copy script to remote system
echo "Copying script to remote system..."
scp $LOCAL_SCRIPT $REMOTE_USER@$REMOTE_HOST:$REMOTE_DIR/
# Make script executable on remote system
echo "Making script executable..."
ssh $REMOTE_USER@$REMOTE_HOST "chmod +x $REMOTE_DIR/$LOCAL_SCRIPT"
# Install required Python packages if needed
echo "Installing required packages..."
ssh $REMOTE_USER@$REMOTE_HOST "cd $REMOTE_DIR && pip3 install --user RPi.GPIO"
echo "Deployment complete!"
echo ""
echo "To run the IR listener on the remote system:"
echo "ssh $REMOTE_USER@$REMOTE_HOST"
echo "cd $REMOTE_DIR"
echo "python3 $LOCAL_SCRIPT"
echo ""
echo "Or run with verbose logging:"
echo "python3 $LOCAL_SCRIPT --verbose"
echo ""
echo "Or specify a different GPIO pin:"
echo "python3 $LOCAL_SCRIPT --gpio-pin 17"

420
simple_ir_listener.py Normal file
View File

@@ -0,0 +1,420 @@
#!/usr/bin/env python3
"""
Simple IR Remote Listener for Console
Listens to IR commands and prints them to console
Designed for use on remote Raspberry Pi system
"""
import os
import sys
import json
import time
import logging
import threading
import queue
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import RPi.GPIO as GPIO
class IRProtocol:
"""Base class for IR protocol decoding"""
def __init__(self, name: str):
self.name = name
self.logger = logging.getLogger(f"{__name__}.{name}")
def decode(self, pulses: List[Tuple[bool, float]]) -> Optional[str]:
"""Decode IR pulses to command string"""
raise NotImplementedError
class NECProtocol(IRProtocol):
"""NEC IR protocol decoder"""
def __init__(self):
super().__init__("NEC")
# NEC protocol timing (in microseconds)
self.HEADER_PULSE = 9000
self.HEADER_SPACE = 4500
self.BIT_1_PULSE = 560
self.BIT_1_SPACE = 1690
self.BIT_0_PULSE = 560
self.BIT_0_SPACE = 560
self.REPEAT_SPACE = 2250
self.TOLERANCE = 0.2 # 20% tolerance
def decode(self, pulses: List[Tuple[bool, float]]) -> Optional[str]:
"""Decode NEC protocol pulses"""
if len(pulses) < 2:
return None
# Check for repeat code
if len(pulses) == 2:
pulse_time = pulses[0][1] * 1000000 # Convert to microseconds
space_time = pulses[1][1] * 1000000
if (self.HEADER_PULSE * (1 - self.TOLERANCE) <= pulse_time <= self.HEADER_PULSE * (1 + self.TOLERANCE) and
self.REPEAT_SPACE * (1 - self.TOLERANCE) <= space_time <= self.REPEAT_SPACE * (1 + self.TOLERANCE)):
return "REPEAT"
# Check for normal NEC frame (should have 34 pulses: header + 32 data bits)
if len(pulses) != 34:
return None
# Check header
header_pulse = pulses[0][1] * 1000000
header_space = pulses[1][1] * 1000000
if not (self.HEADER_PULSE * (1 - self.TOLERANCE) <= header_pulse <= self.HEADER_PULSE * (1 + self.TOLERANCE) and
self.HEADER_SPACE * (1 - self.TOLERANCE) <= header_space <= self.HEADER_SPACE * (1 + self.TOLERANCE)):
return None
# Decode 32 data bits
address = 0
command = 0
for i in range(2, 34, 2): # Skip header, process data bits
pulse_time = pulses[i][1] * 1000000
space_time = pulses[i + 1][1] * 1000000
# Check if it's a valid bit
if not (self.BIT_0_PULSE * (1 - self.TOLERANCE) <= pulse_time <= self.BIT_0_PULSE * (1 + self.TOLERANCE)):
return None
bit_index = (i - 2) // 2
if self.BIT_1_SPACE * (1 - self.TOLERANCE) <= space_time <= self.BIT_1_SPACE * (1 + self.TOLERANCE):
# Bit 1
if bit_index < 16:
address |= (1 << bit_index)
else:
command |= (1 << (bit_index - 16))
elif self.BIT_0_SPACE * (1 - self.TOLERANCE) <= space_time <= self.BIT_0_SPACE * (1 + self.TOLERANCE):
# Bit 0 (already 0 in the variables)
pass
else:
return None
# Return command as hex string
return f"NEC_{address:04X}_{command:04X}"
class RC5Protocol(IRProtocol):
"""RC5 IR protocol decoder"""
def __init__(self):
super().__init__("RC5")
self.BIT_TIME = 889 # microseconds
self.TOLERANCE = 0.2
def decode(self, pulses: List[Tuple[bool, float]]) -> Optional[str]:
"""Decode RC5 protocol pulses"""
if len(pulses) < 3:
return None
# RC5 starts with a space, then alternating pulses and spaces
bits = []
for i, (is_pulse, duration) in enumerate(pulses):
time_us = duration * 1000000
if i == 0 and not is_pulse:
# First space - skip
continue
# Determine if this is a short or long pulse/space
if time_us < self.BIT_TIME * (1 + self.TOLERANCE):
bits.append(0)
else:
bits.append(1)
if len(bits) < 14: # RC5 has 14 bits
return None
# Extract fields
start_bits = bits[0:2]
toggle = bits[2]
address = bits[3:8]
command = bits[8:14]
# Check start bits
if start_bits != [1, 1]:
return None
# Convert to integers
addr_val = sum(bit << (4 - i) for i, bit in enumerate(address))
cmd_val = sum(bit << (5 - i) for i, bit in enumerate(command))
return f"RC5_{addr_val:02X}_{cmd_val:02X}_{toggle}"
class SimpleIRRemote:
"""Simple IR Remote Control System for Console Use"""
def __init__(self, gpio_pin: int = 18, protocols: List[IRProtocol] = None):
self.gpio_pin = gpio_pin
self.protocols = protocols or [NECProtocol(), RC5Protocol()]
self.logger = logging.getLogger(__name__)
# IR signal capture
self.pulses = []
self.capturing = False
self.last_pulse_time = 0
self.capture_timeout = 0.1 # 100ms timeout
# Command queue
self.command_queue = queue.Queue()
# GPIO setup
self.setup_gpio()
# Start capture thread
self.capture_thread = threading.Thread(target=self._capture_loop, daemon=True)
self.capture_thread.start()
# Start decode thread
self.decode_thread = threading.Thread(target=self._decode_loop, daemon=True)
self.decode_thread.start()
def setup_gpio(self):
"""Setup GPIO for IR receiver"""
try:
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.add_event_detect(self.gpio_pin, GPIO.BOTH,
callback=self._gpio_callback, bouncetime=1)
self.logger.info(f"IR Remote GPIO setup complete on pin {self.gpio_pin}")
except Exception as e:
self.logger.error(f"GPIO setup failed: {e}")
raise
def _gpio_callback(self, channel):
"""GPIO callback for IR signal detection"""
current_time = time.time()
if not self.capturing:
# Start capturing on first pulse
self.capturing = True
self.pulses = []
self.last_pulse_time = current_time
else:
# Add pulse/space to capture
duration = current_time - self.last_pulse_time
is_pulse = GPIO.input(self.gpio_pin) == GPIO.LOW
self.pulses.append((is_pulse, duration))
self.last_pulse_time = current_time
def _capture_loop(self):
"""Main capture loop"""
while True:
if self.capturing:
# Check for capture timeout
if time.time() - self.last_pulse_time > self.capture_timeout:
if len(self.pulses) > 0:
# Process captured pulses
self._process_pulses(self.pulses.copy())
self.capturing = False
self.pulses = []
time.sleep(0.001) # 1ms sleep
def _process_pulses(self, pulses: List[Tuple[bool, float]]):
"""Process captured pulses through all protocols"""
for protocol in self.protocols:
try:
command = protocol.decode(pulses)
if command:
self.logger.debug(f"Decoded {protocol.name} command: {command}")
self.command_queue.put(command)
return
except Exception as e:
self.logger.debug(f"Protocol {protocol.name} decode error: {e}")
# If no protocol matched, log the raw pulses for debugging
self.logger.debug(f"No protocol matched for {len(pulses)} pulses")
def _decode_loop(self):
"""Main decode loop"""
while True:
try:
if not self.command_queue.empty():
command = self.command_queue.get(timeout=0.1)
self._handle_command(command)
else:
time.sleep(0.01)
except queue.Empty:
continue
except Exception as e:
self.logger.error(f"Error in decode loop: {e}")
def _handle_command(self, command: str):
"""Handle decoded IR command"""
self.logger.info(f"IR Command received: {command}")
# This method can be overridden by subclasses
# or connected to a callback system
if hasattr(self, 'command_callback'):
self.command_callback(command)
def set_command_callback(self, callback):
"""Set callback function for IR commands"""
self.command_callback = callback
def get_command(self, timeout: float = 1.0) -> Optional[str]:
"""Get next IR command with timeout"""
try:
return self.command_queue.get(timeout=timeout)
except queue.Empty:
return None
def cleanup(self):
"""Cleanup GPIO resources"""
GPIO.cleanup()
self.logger.info("IR Remote cleanup complete")
class SimpleIRListener:
"""Simple IR Remote Listener for Console"""
def __init__(self, gpio_pin: int = 18):
self.gpio_pin = gpio_pin
self.ir_remote = None
self.logger = self._setup_logging()
def _setup_logging(self) -> logging.Logger:
"""Setup logging for the listener"""
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Create console handler
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def display_startup_info(self):
"""Display startup information"""
print("=" * 60)
print("SIMPLE IR REMOTE LISTENER")
print("=" * 60)
print(f"GPIO Pin: {self.gpio_pin}")
print("Supported Protocols: NEC, RC5")
print("=" * 60)
print("LISTENING FOR IR COMMANDS...")
print("Press Ctrl+C to exit")
print("=" * 60)
print()
def setup_ir_remote(self):
"""Setup IR remote"""
try:
self.ir_remote = SimpleIRRemote(gpio_pin=self.gpio_pin)
self.ir_remote.set_command_callback(self.handle_ir_command)
self.logger.info(f"IR Remote setup complete on GPIO pin {self.gpio_pin}")
return True
except Exception as e:
self.logger.error(f"Failed to setup IR remote: {e}")
return False
def handle_ir_command(self, ir_code: str):
"""Handle received IR command"""
timestamp = time.strftime("%H:%M:%S")
# Print command information
print(f"[{timestamp}] IR Command Received: {ir_code}")
# Try to load mapping if available
mapping = self.load_ir_mapping()
if ir_code in mapping:
mapped_command = mapping[ir_code]
if isinstance(mapped_command, dict):
print(f" Mapped Command: {mapped_command.get('command', 'unknown')}")
if mapped_command.get('description'):
print(f" Description: {mapped_command['description']}")
else:
print(f" Mapped Command: {mapped_command}")
else:
print(f" Mapped Command: UNKNOWN (not in mapping)")
print()
def load_ir_mapping(self) -> Dict:
"""Load IR code mapping from file if available"""
mapping_files = [
"/home/tulivision/rpi-tulivision/ir_mapping.json",
"/etc/video_player/ir_mapping.json",
"ir_mapping.json"
]
for mapping_file in mapping_files:
if os.path.exists(mapping_file):
try:
with open(mapping_file, 'r') as f:
return json.load(f)
except Exception as e:
self.logger.debug(f"Could not load mapping from {mapping_file}: {e}")
return {}
def run(self):
"""Main run loop"""
try:
# Display startup information
self.display_startup_info()
# Setup IR remote
if not self.setup_ir_remote():
print("Failed to setup IR remote. Exiting.")
return False
# Main listening loop
while True:
time.sleep(0.1)
except KeyboardInterrupt:
print("\nShutting down IR listener...")
return True
except Exception as e:
self.logger.error(f"Error in main loop: {e}")
return False
finally:
self.cleanup()
def cleanup(self):
"""Cleanup resources"""
if self.ir_remote:
self.ir_remote.cleanup()
self.logger.info("IR Listener cleanup complete")
def main():
"""Main function"""
import argparse
parser = argparse.ArgumentParser(description="Simple IR Remote Listener for Console")
parser.add_argument(
"--gpio-pin",
type=int,
default=18,
help="GPIO pin for IR receiver (default: 18)"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable verbose logging"
)
args = parser.parse_args()
# Set logging level
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Create and run listener
listener = SimpleIRListener(args.gpio_pin)
success = listener.run()
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()