#!/usr/bin/env python3
"""
Quick IR Controller Setup
Simplified setup using existing IR listener infrastructure
"""

import os
import sys
import json
import time
import subprocess
import threading
import queue
from pathlib import Path
from typing import Optional


class QuickControllerSetup:
    """Interactively map IR remote buttons to named controller commands.

    The class launches an external IR listener script as a subprocess, scans
    its output for lines containing ``IR Command Received:``, and pairs each
    captured code with the next entry of ``controller_commands``. The
    resulting mapping is merged into a JSON file.
    """

    # Text the listener prints in front of every decoded IR code.
    _COMMAND_MARKER = "IR Command Received:"

    # Commands flagged ``"repeatable": True`` in the saved mapping.
    # NOTE(review): "channel_up"/"channel_down" are never recorded by this
    # setup (the buttons here are "next_channel"/"prev_channel") - confirm
    # with the consumers of the mapping file whether those should be listed.
    _REPEATABLE_COMMANDS = frozenset({
        "volume_up", "volume_down", "channel_up", "channel_down",
        "up", "down", "left", "right",
    })

    # Seconds to wait for the listener to exit after terminate() before
    # escalating to kill().
    _SHUTDOWN_TIMEOUT = 5.0

    def __init__(
        self,
        listener_script: str = "simple_ir_listener_polling.py",
        gpio_pin: int = 18,
        mapping_file: str = "ir_mapping.json",
    ):
        """Initialise the setup state.

        Args:
            listener_script: Path of the IR listener script to launch.
            gpio_pin: Value passed to the listener's ``--gpio-pin`` option.
            mapping_file: JSON file the recorded mappings are merged into.
        """
        self.listener_script = listener_script
        self.gpio_pin = gpio_pin
        self.mapping_file = mapping_file

        self.recorded_mappings = {}
        self.command_queue = queue.Queue()
        self.ir_process: Optional[subprocess.Popen] = None
        self._monitor_thread: Optional[threading.Thread] = None

        # Controller commands in order
        self.controller_commands = [
            ("power_toggle", "Power on/off button"),
            ("channel_1", "Channel 1 button"),
            ("channel_2", "Channel 2 button"),
            ("channel_3", "Channel 3 button"),
            ("channel_4", "Channel 4 button"),
            ("channel_5", "Channel 5 button"),
            ("channel_6", "Channel 6 button"),
            ("channel_7", "Channel 7 button"),
            ("channel_8", "Channel 8 button"),
            ("channel_9", "Channel 9 button"),
            ("channel_0", "Channel 0 button"),
            ("volume_up", "Volume up button"),
            ("volume_down", "Volume down button"),
            ("mute", "Mute button"),
            ("play_pause", "Play/pause button"),
            ("stop", "Stop button"),
            ("next_channel", "Next channel button"),
            ("prev_channel", "Previous channel button"),
            ("menu", "Menu button"),
            ("back", "Back button"),
            ("ok", "OK/Enter button"),
            ("up", "Up arrow button"),
            ("down", "Down arrow button"),
            ("left", "Left arrow button"),
            ("right", "Right arrow button"),
        ]

    def display_welcome(self):
        """Display welcome message"""
        print("=" * 80)
        print("QUICK IR CONTROLLER SETUP")
        print("=" * 80)
        print("This will help you map your IR remote buttons to functions.")
        print("We'll use the existing IR listener to capture commands.")
        print()
        print("INSTRUCTIONS:")
        print("1. When prompted, press the corresponding button on your remote")
        print("2. The IR command will be captured and mapped")
        print("3. Repeat for all buttons")
        print("4. Mappings will be saved for other services")
        print()
        print("Press Ctrl+C to exit at any time")
        print("=" * 80)
        print()

    def start_ir_listener(self) -> bool:
        """Start the IR listener subprocess and its output monitor thread.

        Returns:
            True when the listener is still running after the start-up
            delay; False when it could not be launched or exited right away.
        """
        cmd = [
            "python3", self.listener_script,
            "--gpio-pin", str(self.gpio_pin),
            "--verbose",
        ]
        try:
            self.ir_process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                bufsize=1,
            )
            # Start thread to monitor output
            self._monitor_thread = threading.Thread(
                target=self._monitor_ir_output, daemon=True
            )
            self._monitor_thread.start()
        except (OSError, ValueError, RuntimeError,
                subprocess.SubprocessError) as e:
            print(f"❌ Failed to start IR listener: {e}")
            return False

        time.sleep(2)  # Give it time to start

        # A listener that already exited (missing script, bad arguments, ...)
        # would otherwise make every recording step run into its timeout.
        exit_code = self.ir_process.poll()
        if exit_code is not None:
            print(
                "❌ Failed to start IR listener: "
                f"process exited with code {exit_code}"
            )
            return False

        print("✅ IR listener started")
        return True

    def _monitor_ir_output(self):
        """Read listener output and queue every IR code it reports.

        Runs in a daemon thread until the listener's stdout reaches EOF.
        """
        # Capture the process once so cleanup() resetting ``self.ir_process``
        # cannot pull the stream out from under this thread.
        process = self.ir_process
        if process is None or process.stdout is None:
            return
        try:
            for line in process.stdout:
                if self._COMMAND_MARKER not in line:
                    continue
                command = line.split(self._COMMAND_MARKER)[1].strip()
                # An empty code cannot be mapped to anything; drop it
                # instead of handing the caller a falsy "command".
                if command:
                    self.command_queue.put(command)
        except (OSError, ValueError) as e:
            print(f"Error monitoring IR output: {e}")

    def wait_for_ir_command(self, timeout: float = 30.0) -> Optional[str]:
        """Wait for an IR command with timeout.

        Args:
            timeout: Maximum number of seconds to block.

        Returns:
            The next queued IR code, or None when none arrived in time.
        """
        try:
            return self.command_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def _drain_command_queue(self):
        """Discard IR codes captured before the current prompt."""
        while True:
            try:
                self.command_queue.get_nowait()
            except queue.Empty:
                return

    def record_command_mapping(
        self, command_name: str, description: str, timeout: float = 30.0
    ) -> Optional[bool]:
        """Record a single command mapping.

        Prompts the user to press a button and stores the first IR code
        received in ``recorded_mappings``. On timeout the user may retry,
        skip the command, or quit the whole setup.

        Args:
            command_name: Logical command the button is mapped to.
            description: Human-readable button description shown in prompts.
            timeout: Seconds to wait for a button press per attempt.

        Returns:
            True when a mapping was recorded, False when the command was
            skipped, and None when the user chose to quit (or stdin hit EOF).
        """
        # Loop rather than recurse so repeated retries cannot grow the stack.
        while True:
            print(f"\n{'='*60}")
            print(f"RECORDING: {command_name.upper()}")
            print(f"Description: {description}")
            print(f"{'='*60}")
            print("Press the corresponding button on your remote now...")
            print(f"(You have {timeout:g} seconds)")
            print()

            # Clear any existing commands in queue
            self._drain_command_queue()

            # Wait for IR command
            ir_command = self.wait_for_ir_command(timeout)
            if ir_command:
                previous = self.recorded_mappings.get(ir_command)
                if previous and previous.get("command") != command_name:
                    # Mappings are keyed by IR code, so a repeated code
                    # replaces the earlier entry; make that visible.
                    print(
                        f"⚠️  {ir_command} was already mapped to "
                        f"{previous.get('command')}; overwriting"
                    )
                print(f"✅ RECORDED: {ir_command}")
                self.recorded_mappings[ir_command] = {
                    "command": command_name,
                    "description": description,
                    "repeatable": self._is_repeatable_command(command_name),
                }
                return True

            print("❌ TIMEOUT: No IR command received")
            print("You can skip this command or try again.")
            choice = self._prompt_timeout_choice()
            if choice == 's':
                print(f"Skipped: {command_name}")
                return False
            if choice == 'q':
                return None
            # choice == 'r': fall through and prompt again

    def _prompt_timeout_choice(self) -> str:
        """Ask until the user answers 'r', 's' or 'q'; EOF counts as 'q'."""
        while True:
            try:
                choice = input("(r)etry, (s)kip, or (q)uit? ").lower().strip()
            except EOFError:
                # No more input can arrive, so asking again would loop forever.
                return 'q'
            if choice in ('r', 's', 'q'):
                return choice
            print("Please enter 'r', 's', or 'q'")

    def _is_repeatable_command(self, command_name: str) -> bool:
        """Determine if a command should be repeatable"""
        return command_name in self._REPEATABLE_COMMANDS

    def save_mappings(self) -> bool:
        """Merge the recorded mappings into the mapping file.

        Entries already in the file are kept unless the same IR code was
        recorded again. An unreadable file, or one whose top-level JSON
        value is not an object, is reported and then replaced.

        Returns:
            True when the file was written, False otherwise.
        """
        mapping_file = self.mapping_file

        # Load existing mappings if they exist
        existing_mappings = {}
        if os.path.exists(mapping_file):
            try:
                with open(mapping_file, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers json.JSONDecodeError and decoding errors.
                print(f"Warning: Could not load existing mappings: {e}")
            else:
                if isinstance(loaded, dict):
                    existing_mappings = loaded
                else:
                    print(
                        "Warning: Could not load existing mappings: "
                        "top-level JSON value is not an object"
                    )

        # Merge with recorded mappings
        existing_mappings.update(self.recorded_mappings)

        # Save updated mappings
        try:
            with open(mapping_file, 'w', encoding='utf-8') as f:
                json.dump(existing_mappings, f, indent=2)
        except (OSError, TypeError, ValueError) as e:
            print(f"❌ Error saving mappings: {e}")
            return False

        print(f"\n✅ Mappings saved to: {mapping_file}")
        return True

    def display_summary(self):
        """Display setup summary"""
        print("\n" + "=" * 80)
        print("CONTROLLER SETUP COMPLETE")
        print("=" * 80)
        print(f"Recorded {len(self.recorded_mappings)} command mappings:")
        print()

        for ir_command, mapping in self.recorded_mappings.items():
            print(f" {ir_command:25} -> {mapping['command']:15} ({mapping['description']})")

        print()
        print("The mappings have been saved and are ready for use by other services.")
        print("=" * 80)

    def run_setup(self) -> bool:
        """Run the complete setup process.

        Returns:
            True when the walkthrough finished (or the user quit) and any
            recorded mappings were saved; False when the listener could not
            be started, saving failed, the user pressed Ctrl+C, or an
            unexpected error occurred.
        """
        try:
            self.display_welcome()

            # Start IR listener
            if not self.start_ir_listener():
                return False

            print("Starting controller setup...")
            print()

            # Record each command
            total = len(self.controller_commands)
            for done, (command_name, description) in enumerate(
                self.controller_commands, start=1
            ):
                result = self.record_command_mapping(command_name, description)
                if result is None:  # User chose to quit
                    break

                progress = done / total * 100
                print(f"Progress: {progress:.1f}% ({done}/{total})")

            # Save mappings
            if not self.recorded_mappings:
                print("No mappings recorded.")
                return True

            # The summary states that the mappings were saved, so show it
            # only when that is actually true.
            if not self.save_mappings():
                return False
            self.display_summary()
            return True

        except KeyboardInterrupt:
            print("\nSetup interrupted by user.")
            return False
        except Exception as e:
            # Top-level boundary: report the error and exit non-zero.
            print(f"Error in setup: {e}")
            return False
        finally:
            self.cleanup()

    def cleanup(self):
        """Stop the IR listener and release its resources.

        Safe to call when no listener was started and safe to call twice.
        """
        process = self.ir_process
        if process is not None:
            if process.poll() is None:
                process.terminate()
                try:
                    process.wait(timeout=self._SHUTDOWN_TIMEOUT)
                except subprocess.TimeoutExpired:
                    # The listener ignored SIGTERM; do not hang forever.
                    process.kill()
                    process.wait()

            monitor = self._monitor_thread
            if monitor is not None:
                monitor.join(timeout=1.0)
            # Close the pipe only once the reader thread is done with it.
            if process.stdout is not None and (
                monitor is None or not monitor.is_alive()
            ):
                process.stdout.close()

            self.ir_process = None
            self._monitor_thread = None

        print("Cleanup complete.")


def main():
    """Main function"""
    setup = QuickControllerSetup()
    success = setup.run_setup()
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()