#!/usr/bin/env python3 """ Fixed Custom IR Protocol Decoder for Unknown Remote Based on signal analysis showing: - Most common: 71 pulses - Header: ~8843μs pulse + ~4507μs space - Bit pulse: ~484μs - Bit 0 space: ~645μs - Bit 1 space: ~1770μs """ import logging from typing import Dict, List, Optional, Tuple class IRProtocol: """Base class for IR protocol decoding""" def __init__(self, name: str): self.name = name self.logger = logging.getLogger(f"{__name__}.{name}") def decode(self, pulses: List[Tuple[bool, float]]) -> Optional[str]: """Decode IR pulses to command string""" raise NotImplementedError class CustomIRProtocol(IRProtocol): """ Custom IR Protocol Decoder for Unknown Remote Based on signal analysis showing: - Most common: 71 pulses - Header: ~8843μs pulse + ~4507μs space - Bit pulse: ~484μs - Bit 0 space: ~645μs - Bit 1 space: ~1770μs """ def __init__(self, name: str = "CUSTOM"): super().__init__(name) # Timing constants based on signal analysis # Header timing self.HEADER_PULSE = 8843 # microseconds (from analysis) self.HEADER_SPACE = 4507 # microseconds (from analysis) # Bit timing - this protocol uses space width modulation self.BIT_PULSE = 484 # microseconds (consistent pulse width) self.BIT_0_SPACE = 645 # microseconds (short space = bit 0) self.BIT_1_SPACE = 1770 # microseconds (long space = bit 1) # Repeat code timing (if supported) self.REPEAT_PULSE = 8843 # microseconds self.REPEAT_SPACE = 2093 # microseconds (from analysis) # Tolerance for timing matching self.TOLERANCE = 0.25 # 25% tolerance for this protocol # Expected frame structure self.EXPECTED_PULSE_COUNT = 71 # Most common pulse count self.DATA_BITS = 32 # Standard 32-bit data self.ADDRESS_BITS = 16 # 16-bit address self.COMMAND_BITS = 16 # 16-bit command # Footer timing (long gap before repeat) self.FOOTER_PULSE = 41949 # Very long pulse at end self.FOOTER_SPACE = 8997 # Space after footer def decode(self, pulses: List[Tuple[bool, float]]) -> Optional[str]: """ Decode IR pulses to command string Args: pulses: List of (is_pulse, duration) tuples is_pulse: True for pulse, False for space duration: Duration in seconds (will be converted to microseconds) Returns: Command string in format "CUSTOM_ADDRESS_COMMAND" or None if decode fails """ if len(pulses) < 2: return None # Convert durations to microseconds pulse_times = [duration * 1000000 for _, duration in pulses] # Check for repeat code first repeat_code = self._check_repeat_code(pulse_times) if repeat_code: return repeat_code # Check for normal frame if len(pulse_times) != self.EXPECTED_PULSE_COUNT: self.logger.debug(f"Expected {self.EXPECTED_PULSE_COUNT} pulses, got {len(pulse_times)}") return None # Decode the frame return self._decode_frame(pulse_times) def _check_repeat_code(self, pulse_times: List[float]) -> Optional[str]: """Check if this is a repeat code""" if len(pulse_times) == 2: pulse_time = pulse_times[0] space_time = pulse_times[1] if (self._is_timing_match(pulse_time, self.REPEAT_PULSE) and self._is_timing_match(space_time, self.REPEAT_SPACE)): return "REPEAT" return None def _decode_frame(self, pulse_times: List[float]) -> Optional[str]: """Decode a complete frame""" # Check header (first two timings) if not self._check_header(pulse_times[:2]): return None # Find where the data ends (look for the footer) data_end = self._find_data_end(pulse_times[2:]) if data_end is None: return None # Decode data bits (skip the footer) data_pulses = pulse_times[2:2+data_end] address, command = self._decode_data_bits(data_pulses) if address is None or command is None: return None return f"CUSTOM_{address:04X}_{command:04X}" def _check_header(self, header_times: List[float]) -> bool: """Check if the header matches expected timing""" if len(header_times) < 2: return False pulse_time = header_times[0] space_time = header_times[1] return (self._is_timing_match(pulse_time, self.HEADER_PULSE) and self._is_timing_match(space_time, self.HEADER_SPACE)) def _find_data_end(self, data_times: List[float]) -> Optional[int]: """Find where the data section ends by looking for the footer""" # Look for the very long pulse that indicates end of data for i in range(0, len(data_times), 2): if i < len(data_times): pulse_time = data_times[i] # Check if this is the footer pulse (very long) if self._is_timing_match(pulse_time, self.FOOTER_PULSE): return i # Return the index where data ends # If no footer found, assume it's a standard 32-bit protocol return 64 # 32 bits * 2 (pulse + space) def _decode_data_bits(self, data_times: List[float]) -> Tuple[Optional[int], Optional[int]]: """Decode data bits from timing data""" if len(data_times) < self.DATA_BITS * 2: print(f"Not enough data: {len(data_times)} < {self.DATA_BITS * 2}") return None, None address = 0 command = 0 # Process data bits in pairs (pulse, space) for i in range(0, min(len(data_times), self.DATA_BITS * 2), 2): if i + 1 >= len(data_times): print(f"Not enough data at bit {i//2}") break pulse_time = data_times[i] space_time = data_times[i + 1] # Check if pulse timing is valid (should be ~484μs) if not self._is_timing_match(pulse_time, self.BIT_PULSE): print(f"Invalid pulse timing at bit {i//2}: {pulse_time}μs (expected ~{self.BIT_PULSE}μs)") return None, None bit_index = i // 2 bit_value = self._decode_bit(space_time) if bit_value is None: print(f"Invalid space timing at bit {bit_index}: {space_time}μs (expected ~{self.BIT_0_SPACE}μs or ~{self.BIT_1_SPACE}μs)") return None, None # Set the bit in the appropriate field if bit_index < self.ADDRESS_BITS: if bit_value: address |= (1 << bit_index) else: command_bit_index = bit_index - self.ADDRESS_BITS if bit_value: command |= (1 << command_bit_index) return address, command def _decode_bit(self, space_time: float) -> Optional[bool]: """Decode a single bit from space timing""" if self._is_timing_match(space_time, self.BIT_1_SPACE): return True elif self._is_timing_match(space_time, self.BIT_0_SPACE): return False else: return None def _is_timing_match(self, actual: float, expected: float) -> bool: """Check if actual timing matches expected timing within tolerance""" min_time = expected * (1 - self.TOLERANCE) max_time = expected * (1 + self.TOLERANCE) return min_time <= actual <= max_time # New method to decode from raw timing data (not pulse/space pairs) def decode_from_raw_timings(raw_timings: List[float]) -> Optional[str]: """ Decode from raw timing data by determining pulse/space sequence Args: raw_timings: List of timing values in microseconds Returns: Command string or None if decode fails """ if len(raw_timings) < 2: return None # Create protocol instance protocol = CustomIRProtocol("RAW_CUSTOM") # Check for repeat code first if len(raw_timings) == 2: pulse_time = raw_timings[0] space_time = raw_timings[1] if (protocol._is_timing_match(pulse_time, protocol.REPEAT_PULSE) and protocol._is_timing_match(space_time, protocol.REPEAT_SPACE)): return "REPEAT" # Check for normal frame if len(raw_timings) != protocol.EXPECTED_PULSE_COUNT: return None # Check header if not protocol._check_header(raw_timings[:2]): return None # Find where the data ends data_end = protocol._find_data_end(raw_timings[2:]) if data_end is None: return None # Decode data bits data_pulses = raw_timings[2:2+data_end] address, command = protocol._decode_data_bits(data_pulses) if address is None or command is None: return None return f"CUSTOM_{address:04X}_{command:04X}" # Example usage and testing if __name__ == "__main__": import json # Setup logging logging.basicConfig(level=logging.DEBUG) # Test with captured signals try: with open("ir_analysis_20250927_190536.json", 'r') as f: signals = json.load(f) print("Testing custom protocol decoder with raw timing data...") successful_decodes = 0 failed_decodes = 0 decoded_commands = {} for i, signal_data in enumerate(signals): pulse_count = signal_data['pulse_count'] raw_timings = signal_data['pulses'] # Already in microseconds # Try to decode using raw timings command = decode_from_raw_timings(raw_timings) if command: successful_decodes += 1 if command not in decoded_commands: decoded_commands[command] = 0 decoded_commands[command] += 1 print(f"Signal {i+1:2d} ({pulse_count:2d} pulses): {command}") else: failed_decodes += 1 if pulse_count == 71: # Only show failed 71-pulse signals print(f"Signal {i+1:2d} ({pulse_count:2d} pulses): FAILED TO DECODE") print(f"\nSuccessful decodes: {successful_decodes}") print(f"Failed decodes: {failed_decodes}") print(f"Success rate: {successful_decodes/(successful_decodes+failed_decodes)*100:.1f}%") if decoded_commands: print("\nDecoded commands:") for command, count in sorted(decoded_commands.items()): print(f" {command}: {count} occurrences") except FileNotFoundError: print("No analysis file found. Run ir_signal_analyzer.py first to capture signals.") except Exception as e: print(f"Error: {e}")