diff --git a/SIMPLE_IR_LISTENER_README.md b/SIMPLE_IR_LISTENER_README.md new file mode 100644 index 0000000..970131c --- /dev/null +++ b/SIMPLE_IR_LISTENER_README.md @@ -0,0 +1,142 @@ +# Simple IR Listener for Console + +A simplified IR remote listener designed for console use on Raspberry Pi systems. + +## Features + +- **No Simulation Mode**: Designed for real hardware use only +- **Console Output**: Clear, timestamped IR command display +- **Multiple Protocols**: Supports NEC and RC5 IR protocols +- **Simple Configuration**: Minimal setup required +- **IR Mapping Support**: Automatically loads IR mappings if available + +## Quick Start + +### 1. Deploy to Remote System + +```bash +# Run the deployment script +./deploy_simple_ir_listener.sh +``` + +### 2. Connect to Remote System + +```bash +ssh tulivision@192.168.1.137 +cd /home/tulivision/rpi-tulivision +``` + +### 3. Run the IR Listener + +```bash +# Basic usage +python3 simple_ir_listener.py + +# With verbose logging +python3 simple_ir_listener.py --verbose + +# With custom GPIO pin +python3 simple_ir_listener.py --gpio-pin 17 +``` + +## Hardware Requirements + +- Raspberry Pi with GPIO access +- IR receiver module connected to GPIO pin (default: pin 18) +- IR remote control + +## GPIO Pin Configuration + +The default GPIO pin is 18, but you can specify a different pin: + +```bash +python3 simple_ir_listener.py --gpio-pin 17 +``` + +## IR Mapping + +The listener will automatically look for IR mapping files in this order: +1. `/home/tulivision/rpi-tulivision/ir_mapping.json` +2. `/etc/video_player/ir_mapping.json` +3. `ir_mapping.json` (current directory) + +If a mapping file is found, the listener will display both the raw IR code and the mapped command. + +### Example IR Mapping File + +```json +{ + "NEC_00FF_00FF": { + "command": "power_toggle", + "description": "Power button" + }, + "NEC_00FF_807F": { + "command": "channel_0", + "description": "Channel 0" + }, + "RC5_00_0C_0": "power_toggle" +} +``` + +## Output Format + +The listener displays IR commands in the following format: + +``` +[14:30:25] IR Command Received: NEC_00FF_00FF + Mapped Command: power_toggle + Description: Power button + +[14:30:28] IR Command Received: NEC_00FF_807F + Mapped Command: channel_0 + Description: Channel 0 +``` + +## Command Line Options + +- `--gpio-pin PIN`: Specify GPIO pin for IR receiver (default: 18) +- `--verbose, -v`: Enable verbose logging for debugging +- `--help, -h`: Show help message + +## Troubleshooting + +### No IR Commands Detected + +1. Check GPIO pin connection +2. Verify IR receiver is working +3. Try different GPIO pin: `--gpio-pin 17` +4. Enable verbose logging: `--verbose` + +### Permission Errors + +Make sure the script is executable: +```bash +chmod +x simple_ir_listener.py +``` + +### Missing Dependencies + +Install required packages: +```bash +pip3 install --user RPi.GPIO +``` + +## Stopping the Listener + +Press `Ctrl+C` to stop the IR listener gracefully. + +## Integration + +This simple listener can be integrated into larger systems by: + +1. Modifying the `handle_ir_command` method +2. Adding custom callback functions +3. Integrating with existing configuration systems + +## Differences from Full IR Listener + +- **No simulation mode**: Real hardware only +- **Simplified configuration**: No complex config management +- **Console focused**: Designed for direct console use +- **Minimal dependencies**: Only essential IR functionality +- **No service integration**: Standalone script only diff --git a/deploy_simple_ir_listener.sh b/deploy_simple_ir_listener.sh new file mode 100755 index 0000000..32dc80b --- /dev/null +++ b/deploy_simple_ir_listener.sh @@ -0,0 +1,47 @@ +#!/bin/bash +""" +Deploy Simple IR Listener to Remote Raspberry Pi +""" + +# Configuration +REMOTE_USER="tulivision" +REMOTE_HOST="192.168.1.137" +REMOTE_DIR="/home/tulivision/rpi-tulivision" +LOCAL_SCRIPT="simple_ir_listener.py" + +echo "Deploying Simple IR Listener to $REMOTE_USER@$REMOTE_HOST:$REMOTE_DIR" + +# Check if local script exists +if [ ! -f "$LOCAL_SCRIPT" ]; then + echo "Error: $LOCAL_SCRIPT not found in current directory" + exit 1 +fi + +# Create remote directory if it doesn't exist +echo "Creating remote directory..." +ssh $REMOTE_USER@$REMOTE_HOST "mkdir -p $REMOTE_DIR" + +# Copy script to remote system +echo "Copying script to remote system..." +scp $LOCAL_SCRIPT $REMOTE_USER@$REMOTE_HOST:$REMOTE_DIR/ + +# Make script executable on remote system +echo "Making script executable..." +ssh $REMOTE_USER@$REMOTE_HOST "chmod +x $REMOTE_DIR/$LOCAL_SCRIPT" + +# Install required Python packages if needed +echo "Installing required packages..." +ssh $REMOTE_USER@$REMOTE_HOST "cd $REMOTE_DIR && pip3 install --user RPi.GPIO" + +echo "Deployment complete!" +echo "" +echo "To run the IR listener on the remote system:" +echo "ssh $REMOTE_USER@$REMOTE_HOST" +echo "cd $REMOTE_DIR" +echo "python3 $LOCAL_SCRIPT" +echo "" +echo "Or run with verbose logging:" +echo "python3 $LOCAL_SCRIPT --verbose" +echo "" +echo "Or specify a different GPIO pin:" +echo "python3 $LOCAL_SCRIPT --gpio-pin 17" diff --git a/simple_ir_listener.py b/simple_ir_listener.py new file mode 100644 index 0000000..450072b --- /dev/null +++ b/simple_ir_listener.py @@ -0,0 +1,420 @@ +#!/usr/bin/env python3 +""" +Simple IR Remote Listener for Console +Listens to IR commands and prints them to console +Designed for use on remote Raspberry Pi system +""" + +import os +import sys +import json +import time +import logging +import threading +import queue +from pathlib import Path +from typing import Dict, List, Optional, Tuple +import RPi.GPIO as GPIO + +class IRProtocol: + """Base class for IR protocol decoding""" + + def __init__(self, name: str): + self.name = name + self.logger = logging.getLogger(f"{__name__}.{name}") + + def decode(self, pulses: List[Tuple[bool, float]]) -> Optional[str]: + """Decode IR pulses to command string""" + raise NotImplementedError + +class NECProtocol(IRProtocol): + """NEC IR protocol decoder""" + + def __init__(self): + super().__init__("NEC") + # NEC protocol timing (in microseconds) + self.HEADER_PULSE = 9000 + self.HEADER_SPACE = 4500 + self.BIT_1_PULSE = 560 + self.BIT_1_SPACE = 1690 + self.BIT_0_PULSE = 560 + self.BIT_0_SPACE = 560 + self.REPEAT_SPACE = 2250 + self.TOLERANCE = 0.2 # 20% tolerance + + def decode(self, pulses: List[Tuple[bool, float]]) -> Optional[str]: + """Decode NEC protocol pulses""" + if len(pulses) < 2: + return None + + # Check for repeat code + if len(pulses) == 2: + pulse_time = pulses[0][1] * 1000000 # Convert to microseconds + space_time = pulses[1][1] * 1000000 + + if (self.HEADER_PULSE * (1 - self.TOLERANCE) <= pulse_time <= self.HEADER_PULSE * (1 + self.TOLERANCE) and + self.REPEAT_SPACE * (1 - self.TOLERANCE) <= space_time <= self.REPEAT_SPACE * (1 + self.TOLERANCE)): + return "REPEAT" + + # Check for normal NEC frame (should have 34 pulses: header + 32 data bits) + if len(pulses) != 34: + return None + + # Check header + header_pulse = pulses[0][1] * 1000000 + header_space = pulses[1][1] * 1000000 + + if not (self.HEADER_PULSE * (1 - self.TOLERANCE) <= header_pulse <= self.HEADER_PULSE * (1 + self.TOLERANCE) and + self.HEADER_SPACE * (1 - self.TOLERANCE) <= header_space <= self.HEADER_SPACE * (1 + self.TOLERANCE)): + return None + + # Decode 32 data bits + address = 0 + command = 0 + + for i in range(2, 34, 2): # Skip header, process data bits + pulse_time = pulses[i][1] * 1000000 + space_time = pulses[i + 1][1] * 1000000 + + # Check if it's a valid bit + if not (self.BIT_0_PULSE * (1 - self.TOLERANCE) <= pulse_time <= self.BIT_0_PULSE * (1 + self.TOLERANCE)): + return None + + bit_index = (i - 2) // 2 + + if self.BIT_1_SPACE * (1 - self.TOLERANCE) <= space_time <= self.BIT_1_SPACE * (1 + self.TOLERANCE): + # Bit 1 + if bit_index < 16: + address |= (1 << bit_index) + else: + command |= (1 << (bit_index - 16)) + elif self.BIT_0_SPACE * (1 - self.TOLERANCE) <= space_time <= self.BIT_0_SPACE * (1 + self.TOLERANCE): + # Bit 0 (already 0 in the variables) + pass + else: + return None + + # Return command as hex string + return f"NEC_{address:04X}_{command:04X}" + +class RC5Protocol(IRProtocol): + """RC5 IR protocol decoder""" + + def __init__(self): + super().__init__("RC5") + self.BIT_TIME = 889 # microseconds + self.TOLERANCE = 0.2 + + def decode(self, pulses: List[Tuple[bool, float]]) -> Optional[str]: + """Decode RC5 protocol pulses""" + if len(pulses) < 3: + return None + + # RC5 starts with a space, then alternating pulses and spaces + bits = [] + + for i, (is_pulse, duration) in enumerate(pulses): + time_us = duration * 1000000 + + if i == 0 and not is_pulse: + # First space - skip + continue + + # Determine if this is a short or long pulse/space + if time_us < self.BIT_TIME * (1 + self.TOLERANCE): + bits.append(0) + else: + bits.append(1) + + if len(bits) < 14: # RC5 has 14 bits + return None + + # Extract fields + start_bits = bits[0:2] + toggle = bits[2] + address = bits[3:8] + command = bits[8:14] + + # Check start bits + if start_bits != [1, 1]: + return None + + # Convert to integers + addr_val = sum(bit << (4 - i) for i, bit in enumerate(address)) + cmd_val = sum(bit << (5 - i) for i, bit in enumerate(command)) + + return f"RC5_{addr_val:02X}_{cmd_val:02X}_{toggle}" + +class SimpleIRRemote: + """Simple IR Remote Control System for Console Use""" + + def __init__(self, gpio_pin: int = 18, protocols: List[IRProtocol] = None): + self.gpio_pin = gpio_pin + self.protocols = protocols or [NECProtocol(), RC5Protocol()] + self.logger = logging.getLogger(__name__) + + # IR signal capture + self.pulses = [] + self.capturing = False + self.last_pulse_time = 0 + self.capture_timeout = 0.1 # 100ms timeout + + # Command queue + self.command_queue = queue.Queue() + + # GPIO setup + self.setup_gpio() + + # Start capture thread + self.capture_thread = threading.Thread(target=self._capture_loop, daemon=True) + self.capture_thread.start() + + # Start decode thread + self.decode_thread = threading.Thread(target=self._decode_loop, daemon=True) + self.decode_thread.start() + + def setup_gpio(self): + """Setup GPIO for IR receiver""" + try: + GPIO.setmode(GPIO.BCM) + GPIO.setup(self.gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP) + GPIO.add_event_detect(self.gpio_pin, GPIO.BOTH, + callback=self._gpio_callback, bouncetime=1) + self.logger.info(f"IR Remote GPIO setup complete on pin {self.gpio_pin}") + except Exception as e: + self.logger.error(f"GPIO setup failed: {e}") + raise + + def _gpio_callback(self, channel): + """GPIO callback for IR signal detection""" + current_time = time.time() + + if not self.capturing: + # Start capturing on first pulse + self.capturing = True + self.pulses = [] + self.last_pulse_time = current_time + else: + # Add pulse/space to capture + duration = current_time - self.last_pulse_time + is_pulse = GPIO.input(self.gpio_pin) == GPIO.LOW + + self.pulses.append((is_pulse, duration)) + self.last_pulse_time = current_time + + def _capture_loop(self): + """Main capture loop""" + while True: + if self.capturing: + # Check for capture timeout + if time.time() - self.last_pulse_time > self.capture_timeout: + if len(self.pulses) > 0: + # Process captured pulses + self._process_pulses(self.pulses.copy()) + self.capturing = False + self.pulses = [] + + time.sleep(0.001) # 1ms sleep + + def _process_pulses(self, pulses: List[Tuple[bool, float]]): + """Process captured pulses through all protocols""" + for protocol in self.protocols: + try: + command = protocol.decode(pulses) + if command: + self.logger.debug(f"Decoded {protocol.name} command: {command}") + self.command_queue.put(command) + return + except Exception as e: + self.logger.debug(f"Protocol {protocol.name} decode error: {e}") + + # If no protocol matched, log the raw pulses for debugging + self.logger.debug(f"No protocol matched for {len(pulses)} pulses") + + def _decode_loop(self): + """Main decode loop""" + while True: + try: + if not self.command_queue.empty(): + command = self.command_queue.get(timeout=0.1) + self._handle_command(command) + else: + time.sleep(0.01) + except queue.Empty: + continue + except Exception as e: + self.logger.error(f"Error in decode loop: {e}") + + def _handle_command(self, command: str): + """Handle decoded IR command""" + self.logger.info(f"IR Command received: {command}") + + # This method can be overridden by subclasses + # or connected to a callback system + if hasattr(self, 'command_callback'): + self.command_callback(command) + + def set_command_callback(self, callback): + """Set callback function for IR commands""" + self.command_callback = callback + + def get_command(self, timeout: float = 1.0) -> Optional[str]: + """Get next IR command with timeout""" + try: + return self.command_queue.get(timeout=timeout) + except queue.Empty: + return None + + def cleanup(self): + """Cleanup GPIO resources""" + GPIO.cleanup() + self.logger.info("IR Remote cleanup complete") + +class SimpleIRListener: + """Simple IR Remote Listener for Console""" + + def __init__(self, gpio_pin: int = 18): + self.gpio_pin = gpio_pin + self.ir_remote = None + self.logger = self._setup_logging() + + def _setup_logging(self) -> logging.Logger: + """Setup logging for the listener""" + logger = logging.getLogger(__name__) + logger.setLevel(logging.INFO) + + # Create console handler + handler = logging.StreamHandler() + formatter = logging.Formatter( + '%(asctime)s - %(levelname)s - %(message)s' + ) + handler.setFormatter(formatter) + logger.addHandler(handler) + + return logger + + def display_startup_info(self): + """Display startup information""" + print("=" * 60) + print("SIMPLE IR REMOTE LISTENER") + print("=" * 60) + print(f"GPIO Pin: {self.gpio_pin}") + print("Supported Protocols: NEC, RC5") + print("=" * 60) + print("LISTENING FOR IR COMMANDS...") + print("Press Ctrl+C to exit") + print("=" * 60) + print() + + def setup_ir_remote(self): + """Setup IR remote""" + try: + self.ir_remote = SimpleIRRemote(gpio_pin=self.gpio_pin) + self.ir_remote.set_command_callback(self.handle_ir_command) + self.logger.info(f"IR Remote setup complete on GPIO pin {self.gpio_pin}") + return True + except Exception as e: + self.logger.error(f"Failed to setup IR remote: {e}") + return False + + def handle_ir_command(self, ir_code: str): + """Handle received IR command""" + timestamp = time.strftime("%H:%M:%S") + + # Print command information + print(f"[{timestamp}] IR Command Received: {ir_code}") + + # Try to load mapping if available + mapping = self.load_ir_mapping() + if ir_code in mapping: + mapped_command = mapping[ir_code] + if isinstance(mapped_command, dict): + print(f" Mapped Command: {mapped_command.get('command', 'unknown')}") + if mapped_command.get('description'): + print(f" Description: {mapped_command['description']}") + else: + print(f" Mapped Command: {mapped_command}") + else: + print(f" Mapped Command: UNKNOWN (not in mapping)") + + print() + + def load_ir_mapping(self) -> Dict: + """Load IR code mapping from file if available""" + mapping_files = [ + "/home/tulivision/rpi-tulivision/ir_mapping.json", + "/etc/video_player/ir_mapping.json", + "ir_mapping.json" + ] + + for mapping_file in mapping_files: + if os.path.exists(mapping_file): + try: + with open(mapping_file, 'r') as f: + return json.load(f) + except Exception as e: + self.logger.debug(f"Could not load mapping from {mapping_file}: {e}") + + return {} + + def run(self): + """Main run loop""" + try: + # Display startup information + self.display_startup_info() + + # Setup IR remote + if not self.setup_ir_remote(): + print("Failed to setup IR remote. Exiting.") + return False + + # Main listening loop + while True: + time.sleep(0.1) + + except KeyboardInterrupt: + print("\nShutting down IR listener...") + return True + except Exception as e: + self.logger.error(f"Error in main loop: {e}") + return False + finally: + self.cleanup() + + def cleanup(self): + """Cleanup resources""" + if self.ir_remote: + self.ir_remote.cleanup() + self.logger.info("IR Listener cleanup complete") + +def main(): + """Main function""" + import argparse + + parser = argparse.ArgumentParser(description="Simple IR Remote Listener for Console") + parser.add_argument( + "--gpio-pin", + type=int, + default=18, + help="GPIO pin for IR receiver (default: 18)" + ) + parser.add_argument( + "--verbose", "-v", + action="store_true", + help="Enable verbose logging" + ) + + args = parser.parse_args() + + # Set logging level + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + # Create and run listener + listener = SimpleIRListener(args.gpio_pin) + success = listener.run() + + sys.exit(0 if success else 1) + +if __name__ == "__main__": + main()