ir listener
This commit is contained in:
336
ir_listener.py
Executable file
336
ir_listener.py
Executable file
@@ -0,0 +1,336 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
IR Remote Listener Script
|
||||
Listens to IR commands and prints them to console with configuration display
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import time
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Dict, Optional
|
||||
|
||||
# Add current directory to path to import local modules
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
try:
|
||||
from ir_remote import IRRemote, IRCodeMapper
|
||||
from config_manager import ConfigManager
|
||||
HARDWARE_AVAILABLE = True
|
||||
except ImportError as e:
|
||||
print(f"Warning: Hardware modules not available: {e}")
|
||||
print("Running in simulation mode (no actual IR hardware)")
|
||||
HARDWARE_AVAILABLE = False
|
||||
|
||||
# Create mock classes for testing
|
||||
class MockIRRemote:
|
||||
def __init__(self, gpio_pin=18):
|
||||
self.gpio_pin = gpio_pin
|
||||
self.command_callback = None
|
||||
|
||||
def set_command_callback(self, callback):
|
||||
self.command_callback = callback
|
||||
|
||||
def cleanup(self):
|
||||
pass
|
||||
|
||||
class MockIRCodeMapper:
|
||||
def __init__(self, mapping_file="ir_mapping.json"):
|
||||
self.mapping_file = mapping_file
|
||||
self.mapping = {}
|
||||
|
||||
def get_command(self, ir_code):
|
||||
return self.mapping.get(ir_code)
|
||||
|
||||
class MockConfigManager:
|
||||
def __init__(self, config_dir="/etc/video_player"):
|
||||
self.config_dir = config_dir
|
||||
self.main_config = type('Config', (), {
|
||||
'ir_pin': 18,
|
||||
'ir_protocols': ['NEC', 'RC5'],
|
||||
'ir_repeat_delay': 0.1
|
||||
})()
|
||||
self.channels = {}
|
||||
self.ir_mapping = {}
|
||||
self.ir_mapping_file = "ir_mapping.json"
|
||||
|
||||
# Load template mapping for testing
|
||||
template_file = os.path.join(os.path.dirname(__file__), "templates", "ir_mapping.json.template")
|
||||
if os.path.exists(template_file):
|
||||
try:
|
||||
with open(template_file, 'r') as f:
|
||||
template_data = json.load(f)
|
||||
# Convert template to IRMappingConfig objects
|
||||
for ir_code, mapping_info in template_data.items():
|
||||
if isinstance(mapping_info, dict):
|
||||
self.ir_mapping[ir_code] = type('IRMappingConfig', (), {
|
||||
'ir_code': ir_code,
|
||||
'command': mapping_info.get('command', ''),
|
||||
'description': mapping_info.get('description', ''),
|
||||
'repeatable': mapping_info.get('repeatable', True)
|
||||
})()
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not load template mapping: {e}")
|
||||
|
||||
class IRListener:
|
||||
"""IR Remote Listener with Configuration Display"""
|
||||
|
||||
def __init__(self, config_dir: str = "/etc/video_player"):
|
||||
if HARDWARE_AVAILABLE:
|
||||
self.config_manager = ConfigManager(config_dir)
|
||||
else:
|
||||
self.config_manager = MockConfigManager(config_dir)
|
||||
self.ir_remote = None
|
||||
self.mapper = None
|
||||
self.logger = self._setup_logging()
|
||||
|
||||
def _setup_logging(self) -> logging.Logger:
|
||||
"""Setup logging for the listener"""
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
# Create console handler
|
||||
handler = logging.StreamHandler()
|
||||
formatter = logging.Formatter(
|
||||
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
handler.setFormatter(formatter)
|
||||
logger.addHandler(handler)
|
||||
|
||||
return logger
|
||||
|
||||
def display_configuration(self):
|
||||
"""Display relevant IR configuration information"""
|
||||
print("=" * 60)
|
||||
print("IR REMOTE LISTENER - CONFIGURATION")
|
||||
print("=" * 60)
|
||||
|
||||
# Display main IR configuration
|
||||
config = self.config_manager.main_config
|
||||
print(f"IR GPIO Pin: {config.ir_pin}")
|
||||
print(f"Supported Protocols: {', '.join(config.ir_protocols)}")
|
||||
print(f"Repeat Delay: {config.ir_repeat_delay}s")
|
||||
print()
|
||||
|
||||
# Display IR mapping configuration
|
||||
ir_mapping = self.config_manager.ir_mapping
|
||||
print(f"Configured IR Mappings: {len(ir_mapping)}")
|
||||
print("-" * 40)
|
||||
|
||||
if ir_mapping:
|
||||
# Group mappings by protocol
|
||||
nec_mappings = {}
|
||||
rc5_mappings = {}
|
||||
other_mappings = {}
|
||||
|
||||
for ir_code, mapping in ir_mapping.items():
|
||||
if ir_code.startswith("NEC_"):
|
||||
nec_mappings[ir_code] = mapping
|
||||
elif ir_code.startswith("RC5_"):
|
||||
rc5_mappings[ir_code] = mapping
|
||||
else:
|
||||
other_mappings[ir_code] = mapping
|
||||
|
||||
# Display NEC mappings
|
||||
if nec_mappings:
|
||||
print("NEC Protocol Mappings:")
|
||||
for ir_code, mapping in sorted(nec_mappings.items()):
|
||||
print(f" {ir_code:20} -> {mapping.command:15} ({mapping.description})")
|
||||
print()
|
||||
|
||||
# Display RC5 mappings
|
||||
if rc5_mappings:
|
||||
print("RC5 Protocol Mappings:")
|
||||
for ir_code, mapping in sorted(rc5_mappings.items()):
|
||||
print(f" {ir_code:20} -> {mapping.command:15} ({mapping.description})")
|
||||
print()
|
||||
|
||||
# Display other mappings
|
||||
if other_mappings:
|
||||
print("Other Protocol Mappings:")
|
||||
for ir_code, mapping in sorted(other_mappings.items()):
|
||||
print(f" {ir_code:20} -> {mapping.command:15} ({mapping.description})")
|
||||
print()
|
||||
else:
|
||||
print("No IR mappings configured")
|
||||
print()
|
||||
|
||||
# Display channel information
|
||||
channels = self.config_manager.channels
|
||||
print(f"Available Channels: {len(channels)}")
|
||||
if channels:
|
||||
print("-" * 40)
|
||||
for channel_num, channel in sorted(channels.items()):
|
||||
status = "ENABLED" if channel.enabled else "DISABLED"
|
||||
print(f" Channel {channel_num:2}: {channel.name:20} ({status})")
|
||||
print()
|
||||
|
||||
print("=" * 60)
|
||||
if HARDWARE_AVAILABLE:
|
||||
print("LISTENING FOR IR COMMANDS...")
|
||||
else:
|
||||
print("SIMULATION MODE - NO HARDWARE AVAILABLE")
|
||||
print("Use --simulate flag to test with simulated commands")
|
||||
print("Press Ctrl+C to exit")
|
||||
print("=" * 60)
|
||||
print()
|
||||
|
||||
def setup_ir_remote(self):
|
||||
"""Setup IR remote with configuration"""
|
||||
try:
|
||||
config = self.config_manager.main_config
|
||||
|
||||
# Create IR remote instance
|
||||
if HARDWARE_AVAILABLE:
|
||||
self.ir_remote = IRRemote(gpio_pin=config.ir_pin)
|
||||
self.mapper = IRCodeMapper(str(self.config_manager.ir_mapping_file))
|
||||
else:
|
||||
self.ir_remote = MockIRRemote(gpio_pin=config.ir_pin)
|
||||
self.mapper = MockIRCodeMapper(str(self.config_manager.ir_mapping_file))
|
||||
|
||||
# Set up command callback
|
||||
self.ir_remote.set_command_callback(self.handle_ir_command)
|
||||
|
||||
self.logger.info(f"IR Remote setup complete on GPIO pin {config.ir_pin}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Failed to setup IR remote: {e}")
|
||||
return False
|
||||
|
||||
def handle_ir_command(self, ir_code: str):
|
||||
"""Handle received IR command"""
|
||||
timestamp = time.strftime("%H:%M:%S")
|
||||
|
||||
# Get mapped command
|
||||
mapped_command = self.mapper.get_command(ir_code) if self.mapper else None
|
||||
|
||||
# Print command information
|
||||
print(f"[{timestamp}] IR Command Received:")
|
||||
print(f" Raw Code: {ir_code}")
|
||||
|
||||
if mapped_command:
|
||||
print(f" Mapped Command: {mapped_command}")
|
||||
|
||||
# Get additional info from config manager mapping
|
||||
if ir_code in self.config_manager.ir_mapping:
|
||||
mapping_info = self.config_manager.ir_mapping[ir_code]
|
||||
if mapping_info.description:
|
||||
print(f" Description: {mapping_info.description}")
|
||||
print(f" Repeatable: {mapping_info.repeatable}")
|
||||
else:
|
||||
print(f" Mapped Command: UNKNOWN (not in mapping)")
|
||||
|
||||
print()
|
||||
|
||||
def run(self, simulate=False):
|
||||
"""Main run loop"""
|
||||
try:
|
||||
print("DEBUG: Starting run method")
|
||||
# Display configuration
|
||||
print("DEBUG: Displaying configuration")
|
||||
self.display_configuration()
|
||||
|
||||
# Setup IR remote
|
||||
print("DEBUG: Setting up IR remote")
|
||||
if not self.setup_ir_remote():
|
||||
print("Failed to setup IR remote. Exiting.")
|
||||
return False
|
||||
|
||||
print("DEBUG: Starting main loop")
|
||||
if simulate or not HARDWARE_AVAILABLE:
|
||||
print("DEBUG: Running simulation")
|
||||
self._run_simulation()
|
||||
else:
|
||||
print("DEBUG: Running hardware mode")
|
||||
# Main listening loop
|
||||
while True:
|
||||
time.sleep(0.1)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nShutting down IR listener...")
|
||||
return True
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error in main loop: {e}")
|
||||
return False
|
||||
finally:
|
||||
self.cleanup()
|
||||
|
||||
def _run_simulation(self):
|
||||
"""Run simulation mode for testing"""
|
||||
print("Simulation mode active. Simulating IR commands...")
|
||||
print("Available test commands:")
|
||||
print(" - NEC_00FF_00FF (Power toggle)")
|
||||
print(" - NEC_00FF_807F (Channel 0)")
|
||||
print(" - NEC_00FF_40BF (Channel 1)")
|
||||
print(" - RC5_00_0C_0 (Power toggle RC5)")
|
||||
print(" - REPEAT (Repeat last command)")
|
||||
print()
|
||||
|
||||
# Simulate some commands
|
||||
test_commands = [
|
||||
"NEC_00FF_00FF",
|
||||
"NEC_00FF_807F",
|
||||
"NEC_00FF_40BF",
|
||||
"RC5_00_0C_0",
|
||||
"REPEAT",
|
||||
"UNKNOWN_CODE_123"
|
||||
]
|
||||
|
||||
for i, command in enumerate(test_commands):
|
||||
print(f"Simulating command {i+1}/{len(test_commands)} in 2 seconds...")
|
||||
time.sleep(2)
|
||||
self.handle_ir_command(command)
|
||||
|
||||
print("Simulation complete. Press Ctrl+C to exit...")
|
||||
|
||||
# Keep running for manual testing
|
||||
try:
|
||||
while True:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
print("\nSimulation interrupted by user")
|
||||
|
||||
def cleanup(self):
|
||||
"""Cleanup resources"""
|
||||
if self.ir_remote:
|
||||
self.ir_remote.cleanup()
|
||||
self.logger.info("IR Listener cleanup complete")
|
||||
|
||||
def main():
|
||||
"""Main function"""
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="IR Remote Listener")
|
||||
parser.add_argument(
|
||||
"--config-dir",
|
||||
default="/etc/video_player",
|
||||
help="Configuration directory path (default: /etc/video_player)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--verbose", "-v",
|
||||
action="store_true",
|
||||
help="Enable verbose logging"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--simulate", "-s",
|
||||
action="store_true",
|
||||
help="Run in simulation mode (for testing without hardware)"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Set logging level
|
||||
if args.verbose:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
# Create and run listener
|
||||
listener = IRListener(args.config_dir)
|
||||
success = listener.run(simulate=args.simulate)
|
||||
|
||||
sys.exit(0 if success else 1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user