#!/usr/bin/env python3
"""
IR Controller Setup App

Interactive app to record and map IR commands for controller setup.
"""

import json
import logging
import os
import queue
import sys
import threading
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import RPi.GPIO as GPIO

# Import the custom protocol decoder
from custom_ir_protocol_final import CustomIRProtocol


class IRControllerSetup:
    """Interactive IR controller setup application.

    Walks the user through a fixed list of remote-control functions, waits
    for a decoded IR command for each one and stores the resulting
    ``{ir_command: {"command", "description", "repeatable"}}`` mappings in a
    JSON file.

    A daemon thread polls the GPIO pin and pushes decoded commands onto
    ``command_queue``; the main thread consumes them while prompting.
    """

    # Human-readable descriptions shown when prompting for each function.
    _COMMAND_DESCRIPTIONS = {
        "power_toggle": "Power on/off button",
        "channel_1": "Channel 1 button",
        "channel_2": "Channel 2 button",
        "channel_3": "Channel 3 button",
        "channel_4": "Channel 4 button",
        "channel_5": "Channel 5 button",
        "channel_6": "Channel 6 button",
        "channel_7": "Channel 7 button",
        "channel_8": "Channel 8 button",
        "channel_9": "Channel 9 button",
        "channel_0": "Channel 0 button",
        "volume_up": "Volume up button",
        "volume_down": "Volume down button",
        "mute": "Mute button",
        "play_pause": "Play/pause button",
        "stop": "Stop button",
        "next_channel": "Next channel button",
        "prev_channel": "Previous channel button",
        "menu": "Menu button",
        "back": "Back button",
        "ok": "OK/Enter button",
        "up": "Up arrow button",
        "down": "Down arrow button",
        "left": "Left arrow button",
        "right": "Right arrow button",
    }

    # Functions flagged "repeatable" in the saved mapping.
    # "channel_up"/"channel_down" are kept for backward compatibility;
    # "next_channel"/"prev_channel" are the channel-step names this tool
    # actually records, so they are listed as well.
    _REPEATABLE_COMMANDS = frozenset({
        "volume_up", "volume_down",
        "channel_up", "channel_down",
        "next_channel", "prev_channel",
        "up", "down", "left", "right",
    })

    def __init__(self, gpio_pin: int = 18,
                 mapping_file: str = "ir_mapping.json"):
        """Initialise state; no GPIO access happens until ``run_setup``.

        Args:
            gpio_pin: BCM pin number the IR receiver is connected to.
            mapping_file: Path of the JSON file the mappings are merged into.
        """
        self.gpio_pin = gpio_pin
        self.mapping_file = mapping_file
        self.logger = self._setup_logging()
        self.running = False
        self.last_state = GPIO.HIGH
        # Timestamp of the last edge; 0 means "no capture in progress".
        self.pulse_start = 0
        self.pulses = []
        self.command_queue = queue.Queue()
        self._polling_thread = None

        # Setup the custom protocol decoder
        self.protocol = CustomIRProtocol("SETUP_CUSTOM")

        # Controller mapping configuration
        self.controller_commands = [
            "power_toggle",
            "channel_1", "channel_2", "channel_3", "channel_4", "channel_5",
            "channel_6", "channel_7", "channel_8", "channel_9", "channel_0",
            "volume_up", "volume_down", "mute",
            "play_pause", "stop",
            "next_channel", "prev_channel",
            "menu", "back", "ok",
            "up", "down", "left", "right"
        ]

        self.recorded_mappings = {}
        self.current_command_index = 0

    def _setup_logging(self) -> logging.Logger:
        """Return the module logger, configured with one console handler."""
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)

        # The logger is shared per module name: only attach a handler once,
        # otherwise every additional instance duplicates each log line.
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)

        return logger

    def display_welcome(self):
        """Display welcome message and instructions"""
        print("=" * 80)
        print("IR CONTROLLER SETUP")
        print("=" * 80)
        print("This app will help you set up your IR remote controller.")
        print("You will be prompted to press buttons in a specific order.")
        print("Each button press will be recorded and mapped to a function.")
        print()
        print("INSTRUCTIONS:")
        print("1. Point your IR remote at the receiver")
        print("2. When prompted, press the corresponding button on your remote")
        print("3. The app will record the IR signal and map it to the function")
        print("4. Repeat for all buttons")
        print("5. The mappings will be saved for use by other services")
        print()
        print("Press Ctrl+C at any time to exit")
        print("=" * 80)
        print()

    def setup_gpio(self):
        """Configure the receiver pin as a pulled-up input (BCM numbering).

        Returns:
            True on success, False if the GPIO library raised.
        """
        try:
            GPIO.setmode(GPIO.BCM)
            GPIO.setup(self.gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
            self.logger.info(f"GPIO setup complete on pin {self.gpio_pin}")
            return True
        except Exception as e:
            self.logger.error(f"GPIO setup failed: {e}")
            return False

    def poll_ir_signal(self):
        """Poll the pin for edges until ``self.running`` becomes False.

        Every level change closes the previous interval and appends its
        length (in microseconds) to ``self.pulses``.  When the level has not
        changed for 100 ms the capture is considered complete and handed to
        ``process_signal``.
        """
        while self.running:
            try:
                current_state = GPIO.input(self.gpio_pin)
                # Monotonic clock: pulse widths must not be affected by
                # wall-clock adjustments (e.g. an NTP step).
                current_time = time.monotonic()

                # Detect state change
                if current_state != self.last_state:
                    if self.pulse_start > 0:
                        # Pulse/space duration in microseconds
                        duration = (current_time - self.pulse_start) * 1000000
                        self.pulses.append(duration)

                    self.pulse_start = current_time
                    self.last_state = current_state

                # Check for end of signal (no change for 100ms)
                if self.pulse_start > 0 and (current_time - self.pulse_start) > 0.1:
                    # Reset the capture state *before* decoding so that a
                    # failing decoder cannot make the same pulses be
                    # re-processed on every following iteration.
                    captured = self.pulses
                    self.pulses = []
                    self.pulse_start = 0
                    if captured:
                        self.process_signal(captured)

                time.sleep(0.0001)  # 0.1ms polling interval

            except Exception as e:
                self.logger.error(f"Error in polling loop: {e}")
                time.sleep(0.01)

    def process_signal(self, pulses: List[float]):
        """Decode a captured signal and queue the resulting command.

        Args:
            pulses: Interval lengths in microseconds.  Even indices are
                treated as pulses, odd indices as spaces.

        Signals with fewer than two intervals are ignored.  Nothing is
        queued when the decoder returns a falsy value.
        """
        if len(pulses) < 2:
            return

        # Decoder input: (is_pulse, duration_in_seconds) tuples.
        formatted_pulses = [
            (index % 2 == 0, duration_us / 1000000.0)
            for index, duration_us in enumerate(pulses)
        ]
        self.logger.debug(f"Captured signal with {len(pulses)} intervals")

        # Try to decode
        command = self.protocol.decode(formatted_pulses)
        if command:
            self.command_queue.put(command)
        else:
            self.logger.debug("Captured signal could not be decoded")

    def wait_for_ir_command(self, timeout: float = 30.0) -> Optional[str]:
        """Wait for an IR command with timeout

        Returns:
            The next queued command, or None if none arrived in ``timeout``
            seconds.
        """
        try:
            return self.command_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def _drain_command_queue(self) -> int:
        """Discard every command currently queued; return how many."""
        dropped = 0
        while True:
            try:
                self.command_queue.get_nowait()
            except queue.Empty:
                return dropped
            dropped += 1

    def _prompt_timeout_choice(self) -> str:
        """Ask what to do after a timeout; return 'r', 's' or 'q'."""
        while True:
            try:
                choice = input("(r)etry, (s)kip, or (q)uit? ").lower().strip()
            except EOFError:
                # stdin is closed: nobody can answer, so stop the setup.
                return 'q'
            if choice in ('r', 's', 'q'):
                return choice
            print("Please enter 'r', 's', or 'q'")

    def record_command_mapping(self, command_name: str,
                               description: str) -> Optional[bool]:
        """Record a single command mapping

        Args:
            command_name: Function name stored in the mapping.
            description: Text shown to the user and stored in the mapping.

        Returns:
            True if a command was recorded, False if the user skipped it,
            None if the user chose to quit.
        """
        # Loop instead of recursing so repeated retries cannot grow the stack.
        while True:
            print(f"\n{'='*60}")
            print(f"RECORDING: {command_name.upper()}")
            print(f"Description: {description}")
            print(f"{'='*60}")
            print("Press the corresponding button on your remote now...")
            print("(You have 30 seconds)")
            print()

            # Anything queued before this prompt (a held button, a press
            # while the retry menu was shown) belongs to an earlier step.
            self._drain_command_queue()

            # Wait for IR command
            ir_command = self.wait_for_ir_command(30.0)

            if ir_command:
                print(f"✅ RECORDED: {ir_command}")
                previous = self.recorded_mappings.get(ir_command)
                if previous is not None and previous["command"] != command_name:
                    self.logger.warning(
                        f"IR code {ir_command} was already mapped to "
                        f"'{previous['command']}'; it now maps to "
                        f"'{command_name}'"
                    )
                self.recorded_mappings[ir_command] = {
                    "command": command_name,
                    "description": description,
                    "repeatable": self._is_repeatable_command(command_name)
                }
                return True

            print("❌ TIMEOUT: No IR command received")
            print("You can skip this command or try again.")

            choice = self._prompt_timeout_choice()
            if choice == 's':
                print(f"Skipped: {command_name}")
                return False
            if choice == 'q':
                return None
            # choice == 'r': prompt again

    def _is_repeatable_command(self, command_name: str) -> bool:
        """Determine if a command should be repeatable"""
        return command_name in self._REPEATABLE_COMMANDS

    def run_setup(self):
        """Run the complete controller setup process

        Returns:
            True if the run finished (including "nothing recorded" and an
            early quit) and any recorded mappings were saved; False if GPIO
            setup failed, saving failed, the user pressed Ctrl+C or an
            unexpected error occurred.
        """
        try:
            # Display welcome
            self.display_welcome()

            # Setup GPIO
            if not self.setup_gpio():
                print("Failed to setup GPIO. Exiting.")
                return False

            # Start IR polling
            self.running = True
            self._polling_thread = threading.Thread(
                target=self.poll_ir_signal, daemon=True
            )
            self._polling_thread.start()

            print("IR receiver is ready!")
            print("Starting controller setup...")
            print()

            # Record each command
            total = len(self.controller_commands)
            for i, command_name in enumerate(self.controller_commands):
                description = self._get_command_description(command_name)

                result = self.record_command_mapping(command_name, description)

                if result is None:  # User chose to quit
                    break

                self.current_command_index = i + 1
                progress = (i + 1) / total * 100
                print(f"Progress: {progress:.1f}% ({i + 1}/{total})")

            # Save mappings
            if self.recorded_mappings:
                # Do not announce success when nothing was written.
                if not self.save_mappings():
                    return False
                self.display_summary()
            else:
                print("No mappings recorded.")

            return True

        except KeyboardInterrupt:
            print("\nSetup interrupted by user.")
            return False
        except Exception as e:
            self.logger.error(f"Error in setup: {e}")
            return False
        finally:
            self.cleanup()

    def _get_command_description(self, command_name: str) -> str:
        """Get description for a command"""
        return self._COMMAND_DESCRIPTIONS.get(
            command_name, f"{command_name} button"
        )

    def save_mappings(self) -> bool:
        """Save recorded mappings to file

        Existing mappings in ``self.mapping_file`` are loaded and the
        recorded mappings are merged over them (recorded entries win).

        Returns:
            True if the file was written, False otherwise.
        """
        # Save in the format expected by other services
        mapping_file = self.mapping_file

        # Load existing mappings if they exist
        existing_mappings = {}
        if os.path.exists(mapping_file):
            try:
                with open(mapping_file, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers JSONDecodeError and UnicodeDecodeError.
                self.logger.warning(f"Could not load existing mappings: {e}")
            else:
                if isinstance(loaded, dict):
                    existing_mappings = loaded
                else:
                    self.logger.warning(
                        "Could not load existing mappings: "
                        f"{mapping_file} does not contain a JSON object"
                    )

        # Merge with recorded mappings
        existing_mappings.update(self.recorded_mappings)

        # Write to a temporary file and rename it into place, so a failure
        # part-way through cannot truncate the existing mapping file.
        tmp_file = f"{mapping_file}.tmp"
        try:
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(existing_mappings, f, indent=2)
            os.replace(tmp_file, mapping_file)
        except (OSError, TypeError, ValueError) as e:
            self.logger.error(f"Error saving mappings: {e}")
            print(f"❌ Error saving mappings: {e}")
            try:
                os.remove(tmp_file)
            except OSError:
                # Best effort: the temp file may never have been created.
                pass
            return False

        print(f"\n✅ Mappings saved to: {mapping_file}")
        return True

    def display_summary(self):
        """Display setup summary"""
        print("\n" + "=" * 80)
        print("CONTROLLER SETUP COMPLETE")
        print("=" * 80)
        print(f"Recorded {len(self.recorded_mappings)} command mappings:")
        print()

        for ir_command, mapping in self.recorded_mappings.items():
            print(f" {ir_command:20} -> {mapping['command']:15} ({mapping['description']})")

        print()
        print("The mappings have been saved and are ready for use by other services.")
        print("=" * 80)

    def cleanup(self):
        """Stop the polling thread and release the GPIO pins."""
        self.running = False

        # Let the polling thread leave its loop before the pins are
        # released, so it does not read a pin that was already cleaned up.
        thread = self._polling_thread
        if (thread is not None and thread.is_alive()
                and thread is not threading.current_thread()):
            thread.join(timeout=1.0)

        try:
            GPIO.cleanup()
        except Exception as e:
            # Cleanup stays best-effort, but is no longer silent.
            self.logger.warning(f"GPIO cleanup failed: {e}")
        self.logger.info("Controller setup cleanup complete")


def main():
    """Parse command-line arguments, run the setup and exit.

    The exit status is 0 when ``run_setup`` returns True and 1 otherwise.
    """
    import argparse

    parser = argparse.ArgumentParser(description="IR Controller Setup App")
    parser.add_argument(
        "--gpio-pin",
        type=int,
        default=18,
        help="GPIO pin for IR receiver (default: 18)"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )

    args = parser.parse_args()

    # Set logging level
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Create and run setup
    setup = IRControllerSetup(args.gpio_pin)
    if args.verbose:
        # The instance logger has its own explicit level, so the root
        # level alone would not enable debug output.
        setup.logger.setLevel(logging.DEBUG)
    success = setup.run_setup()

    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()