356 lines
12 KiB
Python
356 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
IR Controller Setup App
|
|
Interactive app to record and map IR commands for controller setup
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import logging
|
|
import threading
|
|
import queue
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
import RPi.GPIO as GPIO
|
|
|
|
# Import the custom protocol decoder
|
|
from custom_ir_protocol_final import CustomIRProtocol
|
|
|
|
class IRControllerSetup:
    """Interactive IR controller setup application.

    Walks the user through a fixed list of remote-control functions, captures
    the IR burst produced by each button press on a GPIO pin, decodes it with
    ``CustomIRProtocol`` and stores ``decoded command -> function`` mappings
    in a JSON file.

    Args:
        gpio_pin: BCM number of the GPIO pin the IR receiver is wired to.
        mapping_file: Path of the JSON file the mappings are merged into.
    """

    # Seconds without an edge after which a captured burst is treated as complete.
    SIGNAL_END_GAP = 0.1
    # Sleep between two GPIO samples in the polling loop, in seconds.
    POLL_INTERVAL = 0.0001
    # File name used when the caller does not supply one.
    DEFAULT_MAPPING_FILE = "ir_mapping.json"

    # Human-readable prompt text for each recordable function.
    _COMMAND_DESCRIPTIONS = {
        "power_toggle": "Power on/off button",
        "channel_1": "Channel 1 button",
        "channel_2": "Channel 2 button",
        "channel_3": "Channel 3 button",
        "channel_4": "Channel 4 button",
        "channel_5": "Channel 5 button",
        "channel_6": "Channel 6 button",
        "channel_7": "Channel 7 button",
        "channel_8": "Channel 8 button",
        "channel_9": "Channel 9 button",
        "channel_0": "Channel 0 button",
        "volume_up": "Volume up button",
        "volume_down": "Volume down button",
        "mute": "Mute button",
        "play_pause": "Play/pause button",
        "stop": "Stop button",
        "next_channel": "Next channel button",
        "prev_channel": "Previous channel button",
        "menu": "Menu button",
        "back": "Back button",
        "ok": "OK/Enter button",
        "up": "Up arrow button",
        "down": "Down arrow button",
        "left": "Left arrow button",
        "right": "Right arrow button",
    }

    def __init__(self, gpio_pin: int = 18, mapping_file: str = DEFAULT_MAPPING_FILE):
        self.gpio_pin = gpio_pin
        self.mapping_file = mapping_file
        self.logger = self._setup_logging()
        self.running = False
        self.last_state = GPIO.HIGH
        # Timestamp of the most recent edge; 0 means "no capture in progress".
        self.pulse_start = 0
        # Durations (microseconds) between consecutive edges of the current burst.
        self.pulses: List[float] = []
        # Decoded commands handed from the polling thread to the prompt loop.
        self.command_queue = queue.Queue()
        self._polling_thread: Optional[threading.Thread] = None

        # Setup the custom protocol decoder
        self.protocol = CustomIRProtocol("SETUP_CUSTOM")

        # Functions the user is prompted for, in prompt order.
        self.controller_commands = [
            "power_toggle",
            "channel_1", "channel_2", "channel_3", "channel_4", "channel_5",
            "channel_6", "channel_7", "channel_8", "channel_9", "channel_0",
            "volume_up", "volume_down", "mute",
            "play_pause", "stop", "next_channel", "prev_channel",
            "menu", "back", "ok", "up", "down", "left", "right",
        ]

        # decoded IR command -> {"command", "description", "repeatable"}
        self.recorded_mappings: Dict[str, dict] = {}
        self.current_command_index = 0

    def _setup_logging(self) -> logging.Logger:
        """Return the module logger, attaching a console handler only once.

        The logger is shared by every instance, so the handler is added only
        when none is present; otherwise each new instance would duplicate
        every log line.
        """
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)

        if not logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(
                logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
            )
            logger.addHandler(handler)

        return logger

    def display_welcome(self):
        """Print the welcome banner and usage instructions."""
        banner = "=" * 80
        lines = (
            banner,
            "IR CONTROLLER SETUP",
            banner,
            "This app will help you set up your IR remote controller.",
            "You will be prompted to press buttons in a specific order.",
            "Each button press will be recorded and mapped to a function.",
            "",
            "INSTRUCTIONS:",
            "1. Point your IR remote at the receiver",
            "2. When prompted, press the corresponding button on your remote",
            "3. The app will record the IR signal and map it to the function",
            "4. Repeat for all buttons",
            "5. The mappings will be saved for use by other services",
            "",
            "Press Ctrl+C at any time to exit",
            banner,
            "",
        )
        print("\n".join(lines))

    def setup_gpio(self):
        """Configure the receiver pin as a pulled-up input (BCM numbering).

        Returns:
            True on success, False if the GPIO library raised.
        """
        try:
            GPIO.setmode(GPIO.BCM)
            GPIO.setup(self.gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
        except Exception as e:
            self.logger.error(f"GPIO setup failed: {e}")
            return False
        self.logger.info(f"GPIO setup complete on pin {self.gpio_pin}")
        return True

    def poll_ir_signal(self):
        """Sample the receiver pin until ``self.running`` is cleared.

        Every level change appends the time since the previous change (in
        microseconds) to ``self.pulses``. Once the line has been quiet for
        ``SIGNAL_END_GAP`` seconds the collected burst is passed to
        ``process_signal``. Intended to run in a background thread.
        """
        while self.running:
            try:
                current_state = GPIO.input(self.gpio_pin)
                # Monotonic clock: wall-clock steps (e.g. NTP sync) must not
                # distort pulse widths.
                now = time.monotonic()

                if current_state != self.last_state:
                    if self.pulse_start > 0:
                        self.pulses.append((now - self.pulse_start) * 1_000_000)
                    self.pulse_start = now
                    self.last_state = current_state

                if self.pulse_start > 0 and (now - self.pulse_start) > self.SIGNAL_END_GAP:
                    # Reset capture state BEFORE decoding so that a decoder
                    # exception cannot make the same burst be re-processed
                    # (and fail again) on every subsequent iteration.
                    burst, self.pulses = self.pulses, []
                    self.pulse_start = 0
                    if burst:
                        self.process_signal(burst)

                time.sleep(self.POLL_INTERVAL)

            except Exception as e:
                self.logger.error(f"Error in polling loop: {e}")
                time.sleep(0.01)

    def process_signal(self, pulses: List[float]):
        """Decode one captured burst and queue the resulting command.

        Args:
            pulses: Edge-to-edge durations in microseconds. Even indexes are
                treated as pulses, odd indexes as spaces.

        Bursts with fewer than two durations are ignored. The decoder is
        given ``(is_pulse, duration_seconds)`` tuples; a truthy decode result
        is put on ``self.command_queue``.
        """
        if len(pulses) < 2:
            return

        formatted_pulses = [
            (index % 2 == 0, duration_us / 1000000.0)
            for index, duration_us in enumerate(pulses)
        ]

        command = self.protocol.decode(formatted_pulses)
        if command:
            self.command_queue.put(command)
        else:
            self.logger.debug("Undecodable burst of %d durations", len(pulses))

    def wait_for_ir_command(self, timeout: float = 30.0) -> Optional[str]:
        """Block until a decoded command is available.

        Returns:
            The next queued command, or None if ``timeout`` seconds elapse.
        """
        try:
            return self.command_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def _drain_command_queue(self) -> None:
        """Discard every command currently waiting in the queue."""
        while True:
            try:
                self.command_queue.get_nowait()
            except queue.Empty:
                return

    def _prompt_timeout_choice(self) -> str:
        """Ask what to do after a timeout; returns 'r', 's' or 'q'.

        A closed stdin (EOF) is treated as 'q' so the caller can still save
        what was recorded instead of aborting with an exception.
        """
        while True:
            try:
                choice = input("(r)etry, (s)kip, or (q)uit? ").lower().strip()
            except EOFError:
                return 'q'
            if choice in ('r', 's', 'q'):
                return choice
            print("Please enter 'r', 's', or 'q'")

    def record_command_mapping(self, command_name: str, description: str) -> Optional[bool]:
        """Prompt for one button and record the decoded command for it.

        Args:
            command_name: Function name the button is mapped to.
            description: Text shown to the user for this function.

        Returns:
            True if a command was recorded, False if the user skipped it,
            None if the user chose to quit.
        """
        while True:
            # Drop commands decoded before this prompt (held button, repeat
            # frames, presses during the retry menu); otherwise they would be
            # recorded for the wrong function.
            self._drain_command_queue()

            print(f"\n{'='*60}")
            print(f"RECORDING: {command_name.upper()}")
            print(f"Description: {description}")
            print(f"{'='*60}")
            print("Press the corresponding button on your remote now...")
            print("(You have 30 seconds)")
            print()

            ir_command = self.wait_for_ir_command(30.0)

            if ir_command:
                print(f"✅ RECORDED: {ir_command}")
                self.recorded_mappings[ir_command] = {
                    "command": command_name,
                    "description": description,
                    "repeatable": self._is_repeatable_command(command_name),
                }
                return True

            print("❌ TIMEOUT: No IR command received")
            print("You can skip this command or try again.")

            choice = self._prompt_timeout_choice()
            if choice == 's':
                print(f"Skipped: {command_name}")
                return False
            if choice == 'q':
                return None
            # choice == 'r': prompt again (loop instead of recursing)

    def _is_repeatable_command(self, command_name: str) -> bool:
        """Return True if ``command_name`` is in the repeatable set."""
        # NOTE(review): "channel_up"/"channel_down" never appear in
        # controller_commands (which uses "next_channel"/"prev_channel");
        # confirm which names consumers of the mapping file expect.
        repeatable_commands = [
            "volume_up", "volume_down", "channel_up", "channel_down",
            "up", "down", "left", "right",
        ]
        return command_name in repeatable_commands

    def run_setup(self):
        """Run the complete interactive setup.

        Returns:
            True if the run finished (including "nothing recorded"), False
            if GPIO setup failed, saving failed, the user pressed Ctrl+C, or
            an unexpected error occurred. ``cleanup`` always runs.
        """
        try:
            self.display_welcome()

            if not self.setup_gpio():
                print("Failed to setup GPIO. Exiting.")
                return False

            # Start IR polling in the background
            self.running = True
            self._polling_thread = threading.Thread(
                target=self.poll_ir_signal, daemon=True
            )
            self._polling_thread.start()

            print("IR receiver is ready!")
            print("Starting controller setup...")
            print()

            total = len(self.controller_commands)
            for done, command_name in enumerate(self.controller_commands, start=1):
                description = self._get_command_description(command_name)

                result = self.record_command_mapping(command_name, description)
                if result is None:  # User chose to quit
                    break

                self.current_command_index = done
                progress = done / total * 100
                print(f"Progress: {progress:.1f}% ({done}/{total})")

            if not self.recorded_mappings:
                print("No mappings recorded.")
                return True

            # Only announce success when the file was actually written.
            if not self.save_mappings():
                return False
            self.display_summary()
            return True

        except KeyboardInterrupt:
            print("\nSetup interrupted by user.")
            return False
        except Exception as e:
            self.logger.error(f"Error in setup: {e}")
            return False
        finally:
            self.cleanup()

    def _get_command_description(self, command_name: str) -> str:
        """Return the prompt text for ``command_name``.

        Unknown names fall back to ``"<command_name> button"``.
        """
        return self._COMMAND_DESCRIPTIONS.get(command_name, f"{command_name} button")

    def save_mappings(self):
        """Merge the recorded mappings into ``self.mapping_file``.

        Existing entries in the file are kept unless a recorded command has
        the same key. An unreadable file, or one that does not contain a JSON
        object, is ignored with a warning.

        Returns:
            True if the file was written, False otherwise.
        """
        mapping_file = self.mapping_file

        existing_mappings = {}
        if os.path.exists(mapping_file):
            try:
                with open(mapping_file, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as e:
                self.logger.warning(f"Could not load existing mappings: {e}")
            else:
                if isinstance(loaded, dict):
                    existing_mappings = loaded
                else:
                    self.logger.warning(
                        f"Could not load existing mappings: {mapping_file} "
                        "does not contain a JSON object"
                    )

        existing_mappings.update(self.recorded_mappings)

        try:
            # Serialise first: opening with 'w' truncates the file, so a
            # serialisation error must surface before the old content is lost.
            payload = json.dumps(existing_mappings, indent=2)
            with open(mapping_file, 'w', encoding='utf-8') as f:
                f.write(payload)
        except (OSError, TypeError, ValueError) as e:
            self.logger.error(f"Error saving mappings: {e}")
            print(f"❌ Error saving mappings: {e}")
            return False

        print(f"\n✅ Mappings saved to: {mapping_file}")
        return True

    def display_summary(self):
        """Print every mapping recorded in this run."""
        banner = "=" * 80
        print("\n" + banner)
        print("CONTROLLER SETUP COMPLETE")
        print(banner)
        print(f"Recorded {len(self.recorded_mappings)} command mappings:")
        print()

        for ir_command, mapping in self.recorded_mappings.items():
            print(f"  {ir_command:20} -> {mapping['command']:15} ({mapping['description']})")

        print()
        print("The mappings have been saved and are ready for use by other services.")
        print(banner)

    def cleanup(self):
        """Stop the polling thread and release the GPIO pins."""
        self.running = False

        # Let the poller leave its loop before the pins are released, so it
        # does not read from a cleaned-up GPIO.
        thread = getattr(self, "_polling_thread", None)
        if (
            thread is not None
            and thread.is_alive()
            and thread is not threading.current_thread()
        ):
            thread.join(timeout=1.0)
        self._polling_thread = None

        try:
            GPIO.cleanup()
        except Exception as e:
            # Best effort: cleanup must never raise, but leave a trace.
            self.logger.warning("GPIO cleanup failed: %s", e)
        self.logger.info("Controller setup cleanup complete")
|
|
|
|
def main():
    """Parse command-line options, run the setup and exit.

    The process exits with status 0 when ``run_setup`` reports success and
    1 otherwise.
    """
    import argparse

    parser = argparse.ArgumentParser(description="IR Controller Setup App")
    parser.add_argument(
        "--gpio-pin",
        type=int,
        default=18,
        help="GPIO pin for IR receiver (default: 18)"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )

    args = parser.parse_args()

    # Create the setup app
    setup = IRControllerSetup(args.gpio_pin)

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
        # The app's own logger has an explicit INFO level, so changing only
        # the root logger would leave --verbose without any effect.
        setup.logger.setLevel(logging.DEBUG)

    success = setup.run_setup()

    sys.exit(0 if success else 1)
|
|
|
|
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()
|