Files
rpi-tulivision/quick_controller_setup.py
2025-09-27 22:46:20 +02:00

266 lines
9.2 KiB
Python

#!/usr/bin/env python3
"""
Quick IR Controller Setup
Simplified setup using existing IR listener infrastructure
"""
import os
import sys
import json
import time
import subprocess
import threading
import queue
from pathlib import Path
from typing import Optional
class QuickControllerSetup:
"""Quick controller setup using existing IR infrastructure"""
def __init__(self):
self.recorded_mappings = {}
self.command_queue = queue.Queue()
self.ir_process = None
# Controller commands in order
self.controller_commands = [
("power_toggle", "Power on/off button"),
("channel_1", "Channel 1 button"),
("channel_2", "Channel 2 button"),
("channel_3", "Channel 3 button"),
("channel_4", "Channel 4 button"),
("channel_5", "Channel 5 button"),
("channel_6", "Channel 6 button"),
("channel_7", "Channel 7 button"),
("channel_8", "Channel 8 button"),
("channel_9", "Channel 9 button"),
("channel_0", "Channel 0 button"),
("volume_up", "Volume up button"),
("volume_down", "Volume down button"),
("mute", "Mute button"),
("play_pause", "Play/pause button"),
("stop", "Stop button"),
("next_channel", "Next channel button"),
("prev_channel", "Previous channel button"),
("menu", "Menu button"),
("back", "Back button"),
("ok", "OK/Enter button"),
("up", "Up arrow button"),
("down", "Down arrow button"),
("left", "Left arrow button"),
("right", "Right arrow button")
]
def display_welcome(self):
"""Display welcome message"""
print("=" * 80)
print("QUICK IR CONTROLLER SETUP")
print("=" * 80)
print("This will help you map your IR remote buttons to functions.")
print("We'll use the existing IR listener to capture commands.")
print()
print("INSTRUCTIONS:")
print("1. When prompted, press the corresponding button on your remote")
print("2. The IR command will be captured and mapped")
print("3. Repeat for all buttons")
print("4. Mappings will be saved for other services")
print()
print("Press Ctrl+C to exit at any time")
print("=" * 80)
print()
def start_ir_listener(self):
"""Start the IR listener in background"""
try:
# Start the simple IR listener with custom protocol
cmd = [
"python3", "simple_ir_listener_polling.py",
"--gpio-pin", "18", "--verbose"
]
self.ir_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
bufsize=1
)
# Start thread to monitor output
monitor_thread = threading.Thread(target=self._monitor_ir_output, daemon=True)
monitor_thread.start()
print("✅ IR listener started")
time.sleep(2) # Give it time to start
return True
except Exception as e:
print(f"❌ Failed to start IR listener: {e}")
return False
def _monitor_ir_output(self):
"""Monitor IR listener output for commands"""
try:
for line in iter(self.ir_process.stdout.readline, ''):
if "IR Command Received:" in line:
# Extract command from line
parts = line.split("IR Command Received:")
if len(parts) > 1:
command = parts[1].strip()
self.command_queue.put(command)
except Exception as e:
print(f"Error monitoring IR output: {e}")
def wait_for_ir_command(self, timeout: float = 30.0) -> Optional[str]:
"""Wait for an IR command with timeout"""
try:
return self.command_queue.get(timeout=timeout)
except queue.Empty:
return None
def record_command_mapping(self, command_name: str, description: str) -> bool:
"""Record a single command mapping"""
print(f"\n{'='*60}")
print(f"RECORDING: {command_name.upper()}")
print(f"Description: {description}")
print(f"{'='*60}")
print("Press the corresponding button on your remote now...")
print("(You have 30 seconds)")
print()
# Clear any existing commands in queue
while not self.command_queue.empty():
try:
self.command_queue.get_nowait()
except queue.Empty:
break
# Wait for IR command
ir_command = self.wait_for_ir_command(30.0)
if ir_command:
print(f"✅ RECORDED: {ir_command}")
self.recorded_mappings[ir_command] = {
"command": command_name,
"description": description,
"repeatable": self._is_repeatable_command(command_name)
}
return True
else:
print("❌ TIMEOUT: No IR command received")
print("You can skip this command or try again.")
while True:
choice = input("(r)etry, (s)kip, or (q)uit? ").lower().strip()
if choice == 'r':
return self.record_command_mapping(command_name, description)
elif choice == 's':
print(f"Skipped: {command_name}")
return False
elif choice == 'q':
return None
else:
print("Please enter 'r', 's', or 'q'")
def _is_repeatable_command(self, command_name: str) -> bool:
"""Determine if a command should be repeatable"""
repeatable_commands = [
"volume_up", "volume_down", "channel_up", "channel_down",
"up", "down", "left", "right"
]
return command_name in repeatable_commands
def save_mappings(self):
"""Save recorded mappings to file"""
mapping_file = "ir_mapping.json"
# Load existing mappings if they exist
existing_mappings = {}
if os.path.exists(mapping_file):
try:
with open(mapping_file, 'r') as f:
existing_mappings = json.load(f)
except Exception as e:
print(f"Warning: Could not load existing mappings: {e}")
# Merge with recorded mappings
existing_mappings.update(self.recorded_mappings)
# Save updated mappings
try:
with open(mapping_file, 'w') as f:
json.dump(existing_mappings, f, indent=2)
print(f"\n✅ Mappings saved to: {mapping_file}")
except Exception as e:
print(f"❌ Error saving mappings: {e}")
def display_summary(self):
"""Display setup summary"""
print("\n" + "=" * 80)
print("CONTROLLER SETUP COMPLETE")
print("=" * 80)
print(f"Recorded {len(self.recorded_mappings)} command mappings:")
print()
for ir_command, mapping in self.recorded_mappings.items():
print(f" {ir_command:25} -> {mapping['command']:15} ({mapping['description']})")
print()
print("The mappings have been saved and are ready for use by other services.")
print("=" * 80)
def run_setup(self):
"""Run the complete setup process"""
try:
self.display_welcome()
# Start IR listener
if not self.start_ir_listener():
return False
print("Starting controller setup...")
print()
# Record each command
for i, (command_name, description) in enumerate(self.controller_commands):
result = self.record_command_mapping(command_name, description)
if result is None: # User chose to quit
break
progress = (i + 1) / len(self.controller_commands) * 100
print(f"Progress: {progress:.1f}% ({i + 1}/{len(self.controller_commands)})")
# Save mappings
if self.recorded_mappings:
self.save_mappings()
self.display_summary()
else:
print("No mappings recorded.")
return True
except KeyboardInterrupt:
print("\nSetup interrupted by user.")
return False
except Exception as e:
print(f"Error in setup: {e}")
return False
finally:
self.cleanup()
def cleanup(self):
"""Cleanup resources"""
if self.ir_process:
self.ir_process.terminate()
self.ir_process.wait()
print("Cleanup complete.")
def main():
"""Main function"""
setup = QuickControllerSetup()
success = setup.run_setup()
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()