439 lines
15 KiB
Python
439 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Advanced IR Remote Control System for Raspberry Pi
|
|
Supports multiple IR protocols (NEC, RC5, RC6, etc.) with proper decoding
|
|
"""
|
|
|
|
import time
|
|
import threading
|
|
import queue
|
|
import logging
|
|
from typing import Dict, List, Optional, Tuple
|
|
import RPi.GPIO as GPIO
|
|
|
|
class IRProtocol:
|
|
"""Base class for IR protocol decoding"""
|
|
|
|
def __init__(self, name: str):
|
|
self.name = name
|
|
self.logger = logging.getLogger(f"{__name__}.{name}")
|
|
|
|
def decode(self, pulses: List[Tuple[bool, float]]) -> Optional[str]:
|
|
"""Decode IR pulses to command string"""
|
|
raise NotImplementedError
|
|
|
|
class NECProtocol(IRProtocol):
|
|
"""NEC IR protocol decoder"""
|
|
|
|
def __init__(self):
|
|
super().__init__("NEC")
|
|
# NEC protocol timing (in microseconds)
|
|
self.HEADER_PULSE = 9000
|
|
self.HEADER_SPACE = 4500
|
|
self.BIT_1_PULSE = 560
|
|
self.BIT_1_SPACE = 1690
|
|
self.BIT_0_PULSE = 560
|
|
self.BIT_0_SPACE = 560
|
|
self.REPEAT_SPACE = 2250
|
|
self.TOLERANCE = 0.2 # 20% tolerance
|
|
|
|
def decode(self, pulses: List[Tuple[bool, float]]) -> Optional[str]:
|
|
"""Decode NEC protocol pulses"""
|
|
if len(pulses) < 2:
|
|
return None
|
|
|
|
# Check for repeat code
|
|
if len(pulses) == 2:
|
|
pulse_time = pulses[0][1] * 1000000 # Convert to microseconds
|
|
space_time = pulses[1][1] * 1000000
|
|
|
|
if (self.HEADER_PULSE * (1 - self.TOLERANCE) <= pulse_time <= self.HEADER_PULSE * (1 + self.TOLERANCE) and
|
|
self.REPEAT_SPACE * (1 - self.TOLERANCE) <= space_time <= self.REPEAT_SPACE * (1 + self.TOLERANCE)):
|
|
return "REPEAT"
|
|
|
|
# Check for normal NEC frame (should have 34 pulses: header + 32 data bits)
|
|
if len(pulses) != 34:
|
|
return None
|
|
|
|
# Check header
|
|
header_pulse = pulses[0][1] * 1000000
|
|
header_space = pulses[1][1] * 1000000
|
|
|
|
if not (self.HEADER_PULSE * (1 - self.TOLERANCE) <= header_pulse <= self.HEADER_PULSE * (1 + self.TOLERANCE) and
|
|
self.HEADER_SPACE * (1 - self.TOLERANCE) <= header_space <= self.HEADER_SPACE * (1 + self.TOLERANCE)):
|
|
return None
|
|
|
|
# Decode 32 data bits
|
|
address = 0
|
|
command = 0
|
|
|
|
for i in range(2, 34, 2): # Skip header, process data bits
|
|
pulse_time = pulses[i][1] * 1000000
|
|
space_time = pulses[i + 1][1] * 1000000
|
|
|
|
# Check if it's a valid bit
|
|
if not (self.BIT_0_PULSE * (1 - self.TOLERANCE) <= pulse_time <= self.BIT_0_PULSE * (1 + self.TOLERANCE)):
|
|
return None
|
|
|
|
bit_index = (i - 2) // 2
|
|
|
|
if self.BIT_1_SPACE * (1 - self.TOLERANCE) <= space_time <= self.BIT_1_SPACE * (1 + self.TOLERANCE):
|
|
# Bit 1
|
|
if bit_index < 16:
|
|
address |= (1 << bit_index)
|
|
else:
|
|
command |= (1 << (bit_index - 16))
|
|
elif self.BIT_0_SPACE * (1 - self.TOLERANCE) <= space_time <= self.BIT_0_SPACE * (1 + self.TOLERANCE):
|
|
# Bit 0 (already 0 in the variables)
|
|
pass
|
|
else:
|
|
return None
|
|
|
|
# Return command as hex string
|
|
return f"NEC_{address:04X}_{command:04X}"
|
|
|
|
class RC5Protocol(IRProtocol):
|
|
"""RC5 IR protocol decoder"""
|
|
|
|
def __init__(self):
|
|
super().__init__("RC5")
|
|
self.BIT_TIME = 889 # microseconds
|
|
self.TOLERANCE = 0.2
|
|
|
|
def decode(self, pulses: List[Tuple[bool, float]]) -> Optional[str]:
|
|
"""Decode RC5 protocol pulses"""
|
|
if len(pulses) < 3:
|
|
return None
|
|
|
|
# RC5 starts with a space, then alternating pulses and spaces
|
|
bits = []
|
|
|
|
for i, (is_pulse, duration) in enumerate(pulses):
|
|
time_us = duration * 1000000
|
|
|
|
if i == 0 and not is_pulse:
|
|
# First space - skip
|
|
continue
|
|
|
|
# Determine if this is a short or long pulse/space
|
|
if time_us < self.BIT_TIME * (1 + self.TOLERANCE):
|
|
bits.append(0)
|
|
else:
|
|
bits.append(1)
|
|
|
|
if len(bits) < 14: # RC5 has 14 bits
|
|
return None
|
|
|
|
# Extract fields
|
|
start_bits = bits[0:2]
|
|
toggle = bits[2]
|
|
address = bits[3:8]
|
|
command = bits[8:14]
|
|
|
|
# Check start bits
|
|
if start_bits != [1, 1]:
|
|
return None
|
|
|
|
# Convert to integers
|
|
addr_val = sum(bit << (4 - i) for i, bit in enumerate(address))
|
|
cmd_val = sum(bit << (5 - i) for i, bit in enumerate(command))
|
|
|
|
return f"RC5_{addr_val:02X}_{cmd_val:02X}_{toggle}"
|
|
|
|
class IRRemote:
|
|
"""Advanced IR Remote Control System"""
|
|
|
|
def __init__(self, gpio_pin: int = 18, protocols: List[IRProtocol] = None):
|
|
self.gpio_pin = gpio_pin
|
|
self.protocols = protocols or [NECProtocol(), RC5Protocol()]
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
# IR signal capture
|
|
self.pulses = []
|
|
self.capturing = False
|
|
self.last_pulse_time = 0
|
|
self.capture_timeout = 0.1 # 100ms timeout
|
|
|
|
# Command queue
|
|
self.command_queue = queue.Queue()
|
|
|
|
# GPIO setup
|
|
self.setup_gpio()
|
|
|
|
# Start capture thread
|
|
self.capture_thread = threading.Thread(target=self._capture_loop, daemon=True)
|
|
self.capture_thread.start()
|
|
|
|
# Start decode thread
|
|
self.decode_thread = threading.Thread(target=self._decode_loop, daemon=True)
|
|
self.decode_thread.start()
|
|
|
|
def setup_gpio(self):
|
|
"""Setup GPIO for IR receiver"""
|
|
try:
|
|
GPIO.setmode(GPIO.BCM)
|
|
GPIO.setup(self.gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
|
|
GPIO.add_event_detect(self.gpio_pin, GPIO.BOTH,
|
|
callback=self._gpio_callback, bouncetime=1)
|
|
self.logger.info(f"IR Remote GPIO setup complete on pin {self.gpio_pin}")
|
|
except Exception as e:
|
|
self.logger.error(f"GPIO setup failed: {e}")
|
|
raise
|
|
|
|
def _gpio_callback(self, channel):
|
|
"""GPIO callback for IR signal detection"""
|
|
current_time = time.time()
|
|
|
|
if not self.capturing:
|
|
# Start capturing on first pulse
|
|
self.capturing = True
|
|
self.pulses = []
|
|
self.last_pulse_time = current_time
|
|
else:
|
|
# Add pulse/space to capture
|
|
duration = current_time - self.last_pulse_time
|
|
is_pulse = GPIO.input(self.gpio_pin) == GPIO.LOW
|
|
|
|
self.pulses.append((is_pulse, duration))
|
|
self.last_pulse_time = current_time
|
|
|
|
def _capture_loop(self):
|
|
"""Main capture loop"""
|
|
while True:
|
|
if self.capturing:
|
|
# Check for capture timeout
|
|
if time.time() - self.last_pulse_time > self.capture_timeout:
|
|
if len(self.pulses) > 0:
|
|
# Process captured pulses
|
|
self._process_pulses(self.pulses.copy())
|
|
self.capturing = False
|
|
self.pulses = []
|
|
|
|
time.sleep(0.001) # 1ms sleep
|
|
|
|
def _process_pulses(self, pulses: List[Tuple[bool, float]]):
|
|
"""Process captured pulses through all protocols"""
|
|
for protocol in self.protocols:
|
|
try:
|
|
command = protocol.decode(pulses)
|
|
if command:
|
|
self.logger.debug(f"Decoded {protocol.name} command: {command}")
|
|
self.command_queue.put(command)
|
|
return
|
|
except Exception as e:
|
|
self.logger.debug(f"Protocol {protocol.name} decode error: {e}")
|
|
|
|
# If no protocol matched, log the raw pulses for debugging
|
|
self.logger.debug(f"No protocol matched for {len(pulses)} pulses")
|
|
|
|
def _decode_loop(self):
|
|
"""Main decode loop"""
|
|
while True:
|
|
try:
|
|
if not self.command_queue.empty():
|
|
command = self.command_queue.get(timeout=0.1)
|
|
self._handle_command(command)
|
|
else:
|
|
time.sleep(0.01)
|
|
except queue.Empty:
|
|
continue
|
|
except Exception as e:
|
|
self.logger.error(f"Error in decode loop: {e}")
|
|
|
|
def _handle_command(self, command: str):
|
|
"""Handle decoded IR command"""
|
|
self.logger.info(f"IR Command received: {command}")
|
|
|
|
# This method can be overridden by subclasses
|
|
# or connected to a callback system
|
|
if hasattr(self, 'command_callback'):
|
|
self.command_callback(command)
|
|
|
|
def set_command_callback(self, callback):
|
|
"""Set callback function for IR commands"""
|
|
self.command_callback = callback
|
|
|
|
def get_command(self, timeout: float = 1.0) -> Optional[str]:
|
|
"""Get next IR command with timeout"""
|
|
try:
|
|
return self.command_queue.get(timeout=timeout)
|
|
except queue.Empty:
|
|
return None
|
|
|
|
def cleanup(self):
|
|
"""Cleanup GPIO resources"""
|
|
GPIO.cleanup()
|
|
self.logger.info("IR Remote cleanup complete")
|
|
|
|
class IRCodeMapper:
|
|
"""Map IR codes to application commands"""
|
|
|
|
def __init__(self, mapping_file: str = "ir_mapping.json"):
|
|
self.mapping_file = mapping_file
|
|
self.mapping = self.load_mapping()
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
def load_mapping(self) -> Dict[str, str]:
|
|
"""Load IR code mapping from file"""
|
|
try:
|
|
if os.path.exists(self.mapping_file):
|
|
with open(self.mapping_file, 'r') as f:
|
|
return json.load(f)
|
|
else:
|
|
return self.create_default_mapping()
|
|
except Exception as e:
|
|
self.logger.error(f"Error loading IR mapping: {e}")
|
|
return {}
|
|
|
|
def create_default_mapping(self) -> Dict[str, str]:
|
|
"""Create default IR code mapping"""
|
|
default_mapping = {
|
|
# NEC protocol examples (these would be actual codes from your remote)
|
|
"NEC_00FF_00FF": "power_toggle",
|
|
"NEC_00FF_807F": "channel_0",
|
|
"NEC_00FF_40BF": "channel_1",
|
|
"NEC_00FF_C03F": "channel_2",
|
|
"NEC_00FF_20DF": "channel_3",
|
|
"NEC_00FF_A05F": "channel_4",
|
|
"NEC_00FF_609F": "channel_5",
|
|
"NEC_00FF_E01F": "channel_6",
|
|
"NEC_00FF_10EF": "channel_7",
|
|
"NEC_00FF_906F": "channel_8",
|
|
"NEC_00FF_50AF": "channel_9",
|
|
"NEC_00FF_00FF": "play_pause",
|
|
"NEC_00FF_807F": "stop",
|
|
"NEC_00FF_40BF": "next_channel",
|
|
"NEC_00FF_C03F": "prev_channel",
|
|
"NEC_00FF_20DF": "volume_up",
|
|
"NEC_00FF_A05F": "volume_down",
|
|
|
|
# RC5 protocol examples
|
|
"RC5_00_0C_0": "power_toggle",
|
|
"RC5_00_00_0": "channel_0",
|
|
"RC5_00_01_0": "channel_1",
|
|
"RC5_00_02_0": "channel_2",
|
|
"RC5_00_03_0": "channel_3",
|
|
"RC5_00_04_0": "channel_4",
|
|
"RC5_00_05_0": "channel_5",
|
|
"RC5_00_06_0": "channel_6",
|
|
"RC5_00_07_0": "channel_7",
|
|
"RC5_00_08_0": "channel_8",
|
|
"RC5_00_09_0": "channel_9",
|
|
"RC5_00_35_0": "play_pause",
|
|
"RC5_00_36_0": "stop",
|
|
"RC5_00_32_0": "next_channel",
|
|
"RC5_00_33_0": "prev_channel",
|
|
"RC5_00_10_0": "volume_up",
|
|
"RC5_00_11_0": "volume_down",
|
|
|
|
# Repeat command
|
|
"REPEAT": "repeat_last"
|
|
}
|
|
|
|
# Save default mapping
|
|
with open(self.mapping_file, 'w') as f:
|
|
json.dump(default_mapping, f, indent=2)
|
|
|
|
self.logger.info(f"Created default IR mapping file: {self.mapping_file}")
|
|
return default_mapping
|
|
|
|
def get_command(self, ir_code: str) -> Optional[str]:
|
|
"""Get application command for IR code"""
|
|
return self.mapping.get(ir_code)
|
|
|
|
def add_mapping(self, ir_code: str, command: str):
|
|
"""Add new IR code mapping"""
|
|
self.mapping[ir_code] = command
|
|
self.save_mapping()
|
|
|
|
def save_mapping(self):
|
|
"""Save IR code mapping to file"""
|
|
try:
|
|
with open(self.mapping_file, 'w') as f:
|
|
json.dump(self.mapping, f, indent=2)
|
|
self.logger.info("IR mapping saved")
|
|
except Exception as e:
|
|
self.logger.error(f"Error saving IR mapping: {e}")
|
|
|
|
class IRCodeLearner:
|
|
"""Learn IR codes from unknown remotes"""
|
|
|
|
def __init__(self, ir_remote: IRRemote, mapper: IRCodeMapper):
|
|
self.ir_remote = ir_remote
|
|
self.mapper = mapper
|
|
self.learning = False
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
def start_learning(self):
|
|
"""Start IR code learning mode"""
|
|
self.learning = True
|
|
self.logger.info("IR code learning mode started")
|
|
print("IR Code Learning Mode Started")
|
|
print("Press buttons on your remote to learn codes")
|
|
print("Press Ctrl+C to exit learning mode")
|
|
|
|
try:
|
|
while self.learning:
|
|
command = self.ir_remote.get_command(timeout=1.0)
|
|
if command:
|
|
self._learn_command(command)
|
|
except KeyboardInterrupt:
|
|
self.stop_learning()
|
|
|
|
def stop_learning(self):
|
|
"""Stop IR code learning mode"""
|
|
self.learning = False
|
|
self.logger.info("IR code learning mode stopped")
|
|
print("IR Code Learning Mode Stopped")
|
|
|
|
def _learn_command(self, ir_code: str):
|
|
"""Learn a new IR command"""
|
|
print(f"Received IR code: {ir_code}")
|
|
|
|
# Ask user what this command should do
|
|
print("What should this button do?")
|
|
print("Options: power_toggle, channel_0-9, play_pause, stop, next_channel, prev_channel, volume_up, volume_down, or custom command")
|
|
|
|
try:
|
|
user_input = input("Enter command (or 'skip' to ignore): ").strip()
|
|
|
|
if user_input.lower() == 'skip':
|
|
print("Skipped")
|
|
return
|
|
|
|
if user_input:
|
|
self.mapper.add_mapping(ir_code, user_input)
|
|
print(f"Added mapping: {ir_code} -> {user_input}")
|
|
else:
|
|
print("No command entered, skipped")
|
|
|
|
except EOFError:
|
|
# Handle case where input is not available (e.g., running as service)
|
|
print("Cannot get user input, skipping command")
|
|
except Exception as e:
|
|
self.logger.error(f"Error learning command: {e}")
|
|
|
|
# Example usage and testing
|
|
if __name__ == "__main__":
|
|
import json
|
|
|
|
# Setup logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
# Create IR remote
|
|
ir_remote = IRRemote(gpio_pin=18)
|
|
|
|
# Create code mapper
|
|
mapper = IRCodeMapper()
|
|
|
|
# Create code learner
|
|
learner = IRCodeLearner(ir_remote, mapper)
|
|
|
|
try:
|
|
# Start learning mode
|
|
learner.start_learning()
|
|
except KeyboardInterrupt:
|
|
print("Exiting...")
|
|
finally:
|
|
ir_remote.cleanup()
|