Files
rpi-tulivision/ir_controller_setup.py
2025-09-27 22:46:20 +02:00

356 lines
12 KiB
Python

#!/usr/bin/env python3
"""
IR Controller Setup App
Interactive app to record and map IR commands for controller setup
"""
import os
import sys
import json
import time
import logging
import threading
import queue
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import RPi.GPIO as GPIO
# Import the custom protocol decoder
from custom_ir_protocol_final import CustomIRProtocol
class IRControllerSetup:
"""Interactive IR controller setup application"""
def __init__(self, gpio_pin: int = 18):
self.gpio_pin = gpio_pin
self.logger = self._setup_logging()
self.running = False
self.last_state = GPIO.HIGH
self.pulse_start = 0
self.pulses = []
self.command_queue = queue.Queue()
# Setup the custom protocol decoder
self.protocol = CustomIRProtocol("SETUP_CUSTOM")
# Controller mapping configuration
self.controller_commands = [
"power_toggle",
"channel_1", "channel_2", "channel_3", "channel_4", "channel_5",
"channel_6", "channel_7", "channel_8", "channel_9", "channel_0",
"volume_up", "volume_down", "mute",
"play_pause", "stop", "next_channel", "prev_channel",
"menu", "back", "ok", "up", "down", "left", "right"
]
self.recorded_mappings = {}
self.current_command_index = 0
def _setup_logging(self) -> logging.Logger:
"""Setup logging for the setup app"""
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Create console handler
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def display_welcome(self):
"""Display welcome message and instructions"""
print("=" * 80)
print("IR CONTROLLER SETUP")
print("=" * 80)
print("This app will help you set up your IR remote controller.")
print("You will be prompted to press buttons in a specific order.")
print("Each button press will be recorded and mapped to a function.")
print()
print("INSTRUCTIONS:")
print("1. Point your IR remote at the receiver")
print("2. When prompted, press the corresponding button on your remote")
print("3. The app will record the IR signal and map it to the function")
print("4. Repeat for all buttons")
print("5. The mappings will be saved for use by other services")
print()
print("Press Ctrl+C at any time to exit")
print("=" * 80)
print()
def setup_gpio(self):
"""Setup GPIO for IR receiver"""
try:
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
self.logger.info(f"GPIO setup complete on pin {self.gpio_pin}")
return True
except Exception as e:
self.logger.error(f"GPIO setup failed: {e}")
return False
def poll_ir_signal(self):
"""Poll for IR signal changes"""
while self.running:
try:
current_state = GPIO.input(self.gpio_pin)
current_time = time.time()
# Detect state change
if current_state != self.last_state:
if self.pulse_start > 0:
# Calculate pulse/space duration
duration = (current_time - self.pulse_start) * 1000000 # Convert to microseconds
self.pulses.append(duration)
self.pulse_start = current_time
self.last_state = current_state
# Check for end of signal (no change for 100ms)
if self.pulse_start > 0 and (current_time - self.pulse_start) > 0.1:
if len(self.pulses) > 0:
self.process_signal(self.pulses.copy())
self.pulses = []
self.pulse_start = 0
time.sleep(0.0001) # 0.1ms polling interval
except Exception as e:
self.logger.error(f"Error in polling loop: {e}")
time.sleep(0.01)
def process_signal(self, pulses: List[float]):
"""Process captured signal and try to decode"""
if len(pulses) < 2:
return
# Convert to the format expected by the decoder
formatted_pulses = []
for j, duration_us in enumerate(pulses):
is_pulse = (j % 2 == 0) # Alternating pulse/space
duration_seconds = duration_us / 1000000.0
formatted_pulses.append((is_pulse, duration_seconds))
# Try to decode
command = self.protocol.decode(formatted_pulses)
if command:
self.command_queue.put(command)
def wait_for_ir_command(self, timeout: float = 30.0) -> Optional[str]:
"""Wait for an IR command with timeout"""
try:
return self.command_queue.get(timeout=timeout)
except queue.Empty:
return None
def record_command_mapping(self, command_name: str, description: str) -> bool:
"""Record a single command mapping"""
print(f"\n{'='*60}")
print(f"RECORDING: {command_name.upper()}")
print(f"Description: {description}")
print(f"{'='*60}")
print("Press the corresponding button on your remote now...")
print("(You have 30 seconds)")
print()
# Wait for IR command
ir_command = self.wait_for_ir_command(30.0)
if ir_command:
print(f"✅ RECORDED: {ir_command}")
self.recorded_mappings[ir_command] = {
"command": command_name,
"description": description,
"repeatable": self._is_repeatable_command(command_name)
}
return True
else:
print("❌ TIMEOUT: No IR command received")
print("You can skip this command or try again.")
while True:
choice = input("(r)etry, (s)kip, or (q)uit? ").lower().strip()
if choice == 'r':
return self.record_command_mapping(command_name, description)
elif choice == 's':
print(f"Skipped: {command_name}")
return False
elif choice == 'q':
return None
else:
print("Please enter 'r', 's', or 'q'")
def _is_repeatable_command(self, command_name: str) -> bool:
"""Determine if a command should be repeatable"""
repeatable_commands = [
"volume_up", "volume_down", "channel_up", "channel_down",
"up", "down", "left", "right"
]
return command_name in repeatable_commands
def run_setup(self):
"""Run the complete controller setup process"""
try:
# Display welcome
self.display_welcome()
# Setup GPIO
if not self.setup_gpio():
print("Failed to setup GPIO. Exiting.")
return False
# Start IR polling
self.running = True
polling_thread = threading.Thread(target=self.poll_ir_signal, daemon=True)
polling_thread.start()
print("IR receiver is ready!")
print("Starting controller setup...")
print()
# Record each command
for i, command_name in enumerate(self.controller_commands):
description = self._get_command_description(command_name)
result = self.record_command_mapping(command_name, description)
if result is None: # User chose to quit
break
self.current_command_index = i + 1
progress = (i + 1) / len(self.controller_commands) * 100
print(f"Progress: {progress:.1f}% ({i + 1}/{len(self.controller_commands)})")
# Save mappings
if self.recorded_mappings:
self.save_mappings()
self.display_summary()
else:
print("No mappings recorded.")
return True
except KeyboardInterrupt:
print("\nSetup interrupted by user.")
return False
except Exception as e:
self.logger.error(f"Error in setup: {e}")
return False
finally:
self.cleanup()
def _get_command_description(self, command_name: str) -> str:
"""Get description for a command"""
descriptions = {
"power_toggle": "Power on/off button",
"channel_1": "Channel 1 button",
"channel_2": "Channel 2 button",
"channel_3": "Channel 3 button",
"channel_4": "Channel 4 button",
"channel_5": "Channel 5 button",
"channel_6": "Channel 6 button",
"channel_7": "Channel 7 button",
"channel_8": "Channel 8 button",
"channel_9": "Channel 9 button",
"channel_0": "Channel 0 button",
"volume_up": "Volume up button",
"volume_down": "Volume down button",
"mute": "Mute button",
"play_pause": "Play/pause button",
"stop": "Stop button",
"next_channel": "Next channel button",
"prev_channel": "Previous channel button",
"menu": "Menu button",
"back": "Back button",
"ok": "OK/Enter button",
"up": "Up arrow button",
"down": "Down arrow button",
"left": "Left arrow button",
"right": "Right arrow button"
}
return descriptions.get(command_name, f"{command_name} button")
def save_mappings(self):
"""Save recorded mappings to file"""
# Save in the format expected by other services
mapping_file = "ir_mapping.json"
# Load existing mappings if they exist
existing_mappings = {}
if os.path.exists(mapping_file):
try:
with open(mapping_file, 'r') as f:
existing_mappings = json.load(f)
except Exception as e:
self.logger.warning(f"Could not load existing mappings: {e}")
# Merge with recorded mappings
existing_mappings.update(self.recorded_mappings)
# Save updated mappings
try:
with open(mapping_file, 'w') as f:
json.dump(existing_mappings, f, indent=2)
print(f"\n✅ Mappings saved to: {mapping_file}")
except Exception as e:
self.logger.error(f"Error saving mappings: {e}")
print(f"❌ Error saving mappings: {e}")
def display_summary(self):
"""Display setup summary"""
print("\n" + "=" * 80)
print("CONTROLLER SETUP COMPLETE")
print("=" * 80)
print(f"Recorded {len(self.recorded_mappings)} command mappings:")
print()
for ir_command, mapping in self.recorded_mappings.items():
print(f" {ir_command:20} -> {mapping['command']:15} ({mapping['description']})")
print()
print("The mappings have been saved and are ready for use by other services.")
print("=" * 80)
def cleanup(self):
"""Cleanup resources"""
self.running = False
try:
GPIO.cleanup()
except:
pass
self.logger.info("Controller setup cleanup complete")
def main():
"""Main function"""
import argparse
parser = argparse.ArgumentParser(description="IR Controller Setup App")
parser.add_argument(
"--gpio-pin",
type=int,
default=18,
help="GPIO pin for IR receiver (default: 18)"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable verbose logging"
)
args = parser.parse_args()
# Set logging level
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Create and run setup
setup = IRControllerSetup(args.gpio_pin)
success = setup.run_setup()
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()