#!/usr/bin/env python3
"""
Advanced IR Remote Control System for Raspberry Pi

Captures edge timings from an IR receiver on a GPIO pin and decodes them
with pluggable protocol decoders.  NEC and RC5 decoders are provided here;
further protocols can be added by subclassing ``IRProtocol``.
"""

import json
import logging
import os
import queue
import threading
import time
from typing import Callable, Dict, List, Optional, Tuple

try:
    import RPi.GPIO as GPIO
except (ImportError, RuntimeError):
    # Importing must not fail off-device: the decoders and the code mapper
    # are pure Python.  IRRemote.setup_gpio() reports the missing library.
    GPIO = None

# One captured interval: (True for a mark/pulse, duration in seconds).
Pulse = Tuple[bool, float]

_US_PER_SECOND = 1_000_000


class IRProtocol:
    """Base class for IR protocol decoding"""

    # Relative timing tolerance shared by the decoders (20%).
    TOLERANCE = 0.2

    def __init__(self, name: str):
        self.name = name
        self.logger = logging.getLogger(f"{__name__}.{name}")

    def decode(self, pulses: List[Pulse]) -> Optional[str]:
        """Decode IR pulses to a command string, or None if they do not match."""
        raise NotImplementedError

    def _matches(self, measured_us: float, nominal_us: float) -> bool:
        """Return True if *measured_us* is within TOLERANCE of *nominal_us*."""
        low = nominal_us * (1 - self.TOLERANCE)
        high = nominal_us * (1 + self.TOLERANCE)
        return low <= measured_us <= high


class NECProtocol(IRProtocol):
    """NEC IR protocol decoder"""

    # NEC protocol timing (in microseconds)
    HEADER_PULSE = 9000
    HEADER_SPACE = 4500
    BIT_1_PULSE = 560
    BIT_1_SPACE = 1690
    BIT_0_PULSE = 560
    BIT_0_SPACE = 560
    REPEAT_SPACE = 2250
    TOLERANCE = 0.2  # 20% tolerance

    # A frame carries 32 data bits, each sent as one mark followed by one space.
    DATA_BITS = 32

    def __init__(self):
        super().__init__("NEC")

    def decode(self, pulses: List[Pulse]) -> Optional[str]:
        """Decode NEC protocol pulses.

        Only the durations are used; intervals are assumed to alternate
        mark/space starting with the header mark.

        Returns:
            ``"REPEAT"`` for a repeat code, ``"NEC_<addr>_<cmd>"`` (two 16-bit
            hex fields, bits taken LSB first) for a data frame, or None when
            the timings do not match.
        """
        if len(pulses) < 2:
            return None

        durations_us = [duration * _US_PER_SECOND for _, duration in pulses]
        if not self._matches(durations_us[0], self.HEADER_PULSE):
            return None

        # Repeat code: header mark + short space, optionally followed by the
        # trailing stop mark that an edge-based capture also records.
        if len(durations_us) in (2, 3):
            if self._matches(durations_us[1], self.REPEAT_SPACE):
                return "REPEAT"
            return None

        # Data frame: header (2 intervals) + 32 bits x (mark, space), optionally
        # followed by the stop mark.  The previous check for exactly 34
        # intervals only ever covered 16 bits, leaving the command at 0.
        data_intervals = 2 * self.DATA_BITS
        if len(durations_us) not in (2 + data_intervals, 3 + data_intervals):
            return None
        if not self._matches(durations_us[1], self.HEADER_SPACE):
            return None

        address = 0
        command = 0
        for bit_index in range(self.DATA_BITS):
            mark_us = durations_us[2 + 2 * bit_index]
            space_us = durations_us[3 + 2 * bit_index]

            # Both bit values start with the same-length mark.
            if not self._matches(mark_us, self.BIT_0_PULSE):
                return None

            if self._matches(space_us, self.BIT_1_SPACE):
                if bit_index < 16:
                    address |= 1 << bit_index
                else:
                    command |= 1 << (bit_index - 16)
            elif not self._matches(space_us, self.BIT_0_SPACE):
                return None

        return f"NEC_{address:04X}_{command:04X}"


class RC5Protocol(IRProtocol):
    """RC5 IR protocol decoder"""

    BIT_TIME = 889  # microseconds; one half of a Manchester-coded bit
    TOLERANCE = 0.2

    # Two start bits, one toggle bit, five address bits, six command bits.
    FRAME_BITS = 14

    def __init__(self):
        super().__init__("RC5")

    def decode(self, pulses: List[Pulse]) -> Optional[str]:
        """Decode RC5 protocol pulses.

        RC5 is Manchester coded: every bit is two half-bit periods of
        ``BIT_TIME`` with opposite levels, and a 1 is a space followed by a
        mark.  Captured intervals are therefore one or two half-bits long.
        After an optional leading space (skipped), intervals are assumed to
        alternate mark/space starting with a mark.

        Returns:
            ``"RC5_<addr>_<cmd>_<toggle>"`` or None when the timings are not a
            valid 14-bit RC5 frame with both start bits set.
        """
        if len(pulses) < 3:
            return None

        intervals = list(pulses)
        if not intervals[0][0]:
            # Leading idle space before the first mark carries no data.
            intervals = intervals[1:]

        # Rebuild the half-bit level sequence (1 = mark, 0 = space).  The idle
        # first half of the first start bit produces no edge, so prime it here.
        half_levels = [0]
        level = 1
        for _, duration in intervals:
            time_us = duration * _US_PER_SECOND
            if self._matches(time_us, self.BIT_TIME):
                count = 1
            elif self._matches(time_us, 2 * self.BIT_TIME):
                count = 2
            else:
                return None
            half_levels.extend([level] * count)
            level ^= 1

        expected_halves = 2 * self.FRAME_BITS
        if len(half_levels) == expected_halves - 1:
            # A final 0 bit ends in a space that has no closing edge.
            half_levels.append(0)
        if len(half_levels) != expected_halves:
            return None

        bits = []
        for first, second in zip(half_levels[0::2], half_levels[1::2]):
            if first == second:
                return None  # no mid-bit transition: not valid Manchester
            bits.append(second)  # (space, mark) -> 1, (mark, space) -> 0

        # Extract fields
        start_bits = bits[0:2]
        toggle = bits[2]
        address = bits[3:8]
        command = bits[8:14]

        if start_bits != [1, 1]:
            return None

        # Fields are transmitted most significant bit first.
        addr_val = sum(bit << (4 - i) for i, bit in enumerate(address))
        cmd_val = sum(bit << (5 - i) for i, bit in enumerate(command))

        return f"RC5_{addr_val:02X}_{cmd_val:02X}_{toggle}"


class IRRemote:
    """Advanced IR Remote Control System

    Every decoded command is both passed to ``_handle_command`` (logging and
    the optional callback) and made available to ``get_command``.
    """

    # Upper bound on commands kept for get_command(); the oldest is dropped
    # when nobody polls, so callback-only users do not accumulate memory.
    MAX_PENDING_COMMANDS = 64

    def __init__(self, gpio_pin: int = 18,
                 protocols: Optional[List[IRProtocol]] = None):
        """Configure the receiver pin and start the capture/decode threads.

        Args:
            gpio_pin: BCM number of the pin the IR receiver output is wired to.
            protocols: Decoders tried in order; defaults to NEC then RC5.

        Raises:
            RuntimeError: If RPi.GPIO is not available.
            Exception: Whatever RPi.GPIO raises while configuring the pin.
        """
        self.gpio_pin = gpio_pin
        self.protocols = protocols or [NECProtocol(), RC5Protocol()]
        self.logger = logging.getLogger(__name__)

        # IR signal capture
        self.pulses = []
        self.capturing = False
        self.last_pulse_time = 0.0
        # Silence that ends a frame.  It must exceed the longest interval
        # inside a frame (the 9 ms NEC header mark) but stay below the roughly
        # 40 ms pause before an NEC repeat code; a 100 ms timeout merged
        # repeat codes into the preceding frame.
        self.capture_timeout = 0.02
        self._capture_lock = threading.Lock()
        self._stop_event = threading.Event()

        # Commands waiting for get_command()
        self.command_queue = queue.Queue(maxsize=self.MAX_PENDING_COMMANDS)
        # Commands waiting for the decode thread (_handle_command)
        self._dispatch_queue = queue.Queue()

        # GPIO setup
        self.setup_gpio()

        # Start capture thread
        self.capture_thread = threading.Thread(target=self._capture_loop, daemon=True)
        self.capture_thread.start()

        # Start decode thread
        self.decode_thread = threading.Thread(target=self._decode_loop, daemon=True)
        self.decode_thread.start()

    def setup_gpio(self):
        """Setup GPIO for IR receiver"""
        if GPIO is None:
            self.logger.error("GPIO setup failed: RPi.GPIO is not available")
            raise RuntimeError("RPi.GPIO is not available on this system")
        try:
            GPIO.setmode(GPIO.BCM)
            GPIO.setup(self.gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
            # No bouncetime: it is given in milliseconds and would suppress
            # the sub-millisecond edges IR protocols consist of.
            GPIO.add_event_detect(self.gpio_pin, GPIO.BOTH,
                                  callback=self._gpio_callback)
            self.logger.info(f"IR Remote GPIO setup complete on pin {self.gpio_pin}")
        except Exception as e:
            self.logger.error(f"GPIO setup failed: {e}")
            raise

    def _gpio_callback(self, channel):
        """GPIO callback for IR signal detection"""
        # Monotonic clock: durations must not jump with wall-clock adjustments.
        current_time = time.monotonic()
        level = GPIO.input(self.gpio_pin)

        with self._capture_lock:
            if not self.capturing:
                # First edge of a frame: nothing to measure yet.
                self.capturing = True
                self.pulses = []
            else:
                duration = current_time - self.last_pulse_time
                # LOW is a mark, and the level read here is the state *after*
                # the edge, so the interval that just ended was a mark when the
                # line is now HIGH.
                is_pulse = level == GPIO.HIGH
                self.pulses.append((is_pulse, duration))
            self.last_pulse_time = current_time

    def _capture_loop(self):
        """Main capture loop: hand a frame to the decoders once the line goes quiet"""
        while not self._stop_event.is_set():
            finished = None
            with self._capture_lock:
                if (self.capturing and
                        time.monotonic() - self.last_pulse_time > self.capture_timeout):
                    finished = self.pulses
                    self.pulses = []
                    self.capturing = False

            # Decode outside the lock so the edge callback is never blocked.
            if finished:
                self._process_pulses(finished)

            self._stop_event.wait(0.001)  # 1ms poll interval

    def _process_pulses(self, pulses: List[Pulse]):
        """Process captured pulses through all protocols; first match wins"""
        for protocol in self.protocols:
            try:
                command = protocol.decode(pulses)
            except Exception as e:
                self.logger.debug(f"Protocol {protocol.name} decode error: {e}")
                continue
            if command:
                self.logger.debug(f"Decoded {protocol.name} command: {command}")
                self._dispatch_queue.put(command)
                self._enqueue_for_polling(command)
                return

        # If no protocol matched, log the pulse count for debugging
        self.logger.debug(f"No protocol matched for {len(pulses)} pulses")

    def _enqueue_for_polling(self, command: str):
        """Add *command* to command_queue, discarding the oldest entry when full."""
        while True:
            try:
                self.command_queue.put_nowait(command)
                return
            except queue.Full:
                try:
                    self.command_queue.get_nowait()
                except queue.Empty:
                    pass  # a concurrent get_command() already made room

    def _decode_loop(self):
        """Main decode loop: deliver decoded commands to _handle_command"""
        while not self._stop_event.is_set():
            try:
                command = self._dispatch_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                self._handle_command(command)
            except Exception as e:
                # A failing handler must not kill the dispatch thread.
                self.logger.error(f"Error in decode loop: {e}")

    def _handle_command(self, command: str):
        """Handle decoded IR command"""
        self.logger.info(f"IR Command received: {command}")

        # This method can be overridden by subclasses
        # or connected to a callback system
        callback = getattr(self, 'command_callback', None)
        if callback is not None:
            callback(command)

    def set_command_callback(self, callback: Callable[[str], None]):
        """Set callback function for IR commands"""
        self.command_callback = callback

    def get_command(self, timeout: float = 1.0) -> Optional[str]:
        """Get next IR command, or None if none arrives within *timeout* seconds"""
        try:
            return self.command_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def cleanup(self):
        """Stop the worker threads and release GPIO resources"""
        self._stop_event.set()
        if GPIO is not None:
            GPIO.cleanup()
        self.logger.info("IR Remote cleanup complete")


class IRCodeMapper:
    """Map IR codes to application commands"""

    def __init__(self, mapping_file: str = "ir_mapping.json"):
        self.mapping_file = mapping_file
        # The logger must exist before load_mapping(), which logs.
        self.logger = logging.getLogger(__name__)
        self.mapping = self.load_mapping()

    def load_mapping(self) -> Dict[str, str]:
        """Load IR code mapping from file.

        Returns:
            The mapping stored in ``mapping_file``; the default mapping when
            the file does not exist; an empty dict when the file cannot be
            read or does not contain a JSON object.
        """
        if not os.path.exists(self.mapping_file):
            return self.create_default_mapping()
        try:
            with open(self.mapping_file, 'r') as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:
            self.logger.error(f"Error loading IR mapping: {e}")
            return {}
        if not isinstance(loaded, dict):
            self.logger.error("Error loading IR mapping: top-level JSON value is not an object")
            return {}
        return loaded

    def create_default_mapping(self) -> Dict[str, str]:
        """Create default IR code mapping and try to save it to mapping_file"""
        default_mapping = {
            # NEC protocol examples (these would be actual codes from your remote).
            # Every key is unique: repeated keys in a dict literal silently
            # overwrite earlier entries, which used to drop power_toggle and
            # channel_0..channel_4.
            "NEC_00FF_00FF": "power_toggle",
            "NEC_00FF_807F": "channel_0",
            "NEC_00FF_40BF": "channel_1",
            "NEC_00FF_C03F": "channel_2",
            "NEC_00FF_20DF": "channel_3",
            "NEC_00FF_A05F": "channel_4",
            "NEC_00FF_609F": "channel_5",
            "NEC_00FF_E01F": "channel_6",
            "NEC_00FF_10EF": "channel_7",
            "NEC_00FF_906F": "channel_8",
            "NEC_00FF_50AF": "channel_9",
            "NEC_00FF_D02F": "play_pause",
            "NEC_00FF_30CF": "stop",
            "NEC_00FF_B04F": "next_channel",
            "NEC_00FF_708F": "prev_channel",
            "NEC_00FF_F00F": "volume_up",
            "NEC_00FF_08F7": "volume_down",

            # RC5 protocol examples.
            # NOTE: the last field is the toggle bit reported by the decoder;
            # only codes with toggle 0 are listed here.
            "RC5_00_0C_0": "power_toggle",
            "RC5_00_00_0": "channel_0",
            "RC5_00_01_0": "channel_1",
            "RC5_00_02_0": "channel_2",
            "RC5_00_03_0": "channel_3",
            "RC5_00_04_0": "channel_4",
            "RC5_00_05_0": "channel_5",
            "RC5_00_06_0": "channel_6",
            "RC5_00_07_0": "channel_7",
            "RC5_00_08_0": "channel_8",
            "RC5_00_09_0": "channel_9",
            "RC5_00_35_0": "play_pause",
            "RC5_00_36_0": "stop",
            "RC5_00_32_0": "next_channel",
            "RC5_00_33_0": "prev_channel",
            "RC5_00_10_0": "volume_up",
            "RC5_00_11_0": "volume_down",

            # Repeat command
            "REPEAT": "repeat_last",
        }

        # Save default mapping; an unwritable location still leaves the
        # defaults usable in memory.
        try:
            with open(self.mapping_file, 'w') as f:
                json.dump(default_mapping, f, indent=2)
        except OSError as e:
            self.logger.error(f"Error saving IR mapping: {e}")
        else:
            self.logger.info(f"Created default IR mapping file: {self.mapping_file}")

        return default_mapping

    def get_command(self, ir_code: str) -> Optional[str]:
        """Get application command for IR code, or None if it is not mapped"""
        return self.mapping.get(ir_code)

    def add_mapping(self, ir_code: str, command: str):
        """Add new IR code mapping and persist the whole mapping"""
        self.mapping[ir_code] = command
        self.save_mapping()

    def save_mapping(self):
        """Save IR code mapping to file; failures are logged, not raised"""
        try:
            with open(self.mapping_file, 'w') as f:
                json.dump(self.mapping, f, indent=2)
            self.logger.info("IR mapping saved")
        except Exception as e:
            self.logger.error(f"Error saving IR mapping: {e}")


class IRCodeLearner:
    """Learn IR codes from unknown remotes"""

    def __init__(self, ir_remote: IRRemote, mapper: IRCodeMapper):
        self.ir_remote = ir_remote
        self.mapper = mapper
        self.learning = False
        self.logger = logging.getLogger(__name__)

    def start_learning(self):
        """Start IR code learning mode; blocks until stopped or Ctrl+C"""
        self.learning = True
        self.logger.info("IR code learning mode started")

        print("IR Code Learning Mode Started")
        print("Press buttons on your remote to learn codes")
        print("Press Ctrl+C to exit learning mode")

        try:
            while self.learning:
                # The one-second timeout lets the loop notice stop_learning().
                command = self.ir_remote.get_command(timeout=1.0)
                if command:
                    self._learn_command(command)
        except KeyboardInterrupt:
            self.stop_learning()

    def stop_learning(self):
        """Stop IR code learning mode"""
        self.learning = False
        self.logger.info("IR code learning mode stopped")
        print("IR Code Learning Mode Stopped")

    def _learn_command(self, ir_code: str):
        """Ask the user what *ir_code* should do and store the answer"""
        print(f"Received IR code: {ir_code}")

        # Ask user what this command should do
        print("What should this button do?")
        print("Options: power_toggle, channel_0-9, play_pause, stop, next_channel, prev_channel, volume_up, volume_down, or custom command")

        try:
            user_input = input("Enter command (or 'skip' to ignore): ").strip()

            if user_input.lower() == 'skip':
                print("Skipped")
                return

            if user_input:
                self.mapper.add_mapping(ir_code, user_input)
                print(f"Added mapping: {ir_code} -> {user_input}")
            else:
                print("No command entered, skipped")

        except EOFError:
            # Handle case where input is not available (e.g., running as service)
            print("Cannot get user input, skipping command")
        except Exception as e:
            self.logger.error(f"Error learning command: {e}")


def main():
    """Example usage: run the interactive learning mode on GPIO 18"""
    logging.basicConfig(level=logging.INFO)

    ir_remote = IRRemote(gpio_pin=18)
    try:
        # Created inside the try block so GPIO is released even if the
        # mapper or learner fails to initialise.
        mapper = IRCodeMapper()
        learner = IRCodeLearner(ir_remote, mapper)
        learner.start_learning()
    except KeyboardInterrupt:
        print("Exiting...")
    finally:
        ir_remote.cleanup()


# Example usage and testing
if __name__ == "__main__":
    main()