#!/usr/bin/env python3
"""
Custom IR protocol decoder for an unknown remote.

Signal layout, as established by signal analysis (all times in microseconds):

- Header: position 0 (8843μs pulse) + position 1 (~4507μs space)
- Data pulses: positions 2, 4, 6, 8... (486μs)
- Data spaces: positions 3, 5, 7, 9... (645μs for bit 0, 1770μs for bit 1)
- Footer: position 68 (8843μs pulse)
- Total: 71 positions (0-70)
"""

import logging
from typing import Dict, List, Optional, Tuple


class CustomIRProtocol:
    """
    Decoder for the custom IR protocol of an unknown remote.

    Protocol structure:
    - Position 0: Header pulse (8843μs)
    - Position 1: Header space (~4507μs)
    - Positions 2, 4, 6, 8...: Data pulses (486μs)
    - Positions 3, 5, 7, 9...: Data spaces (645μs = bit 0, 1770μs = bit 1)
    - Position 68: Footer pulse (8843μs)
    - Positions 69-70: Footer spaces (not validated by this decoder)

    The 33 data bits are read least-significant-bit first: the first 16 bits
    form the address and the remaining 17 bits form the command.
    """

    def __init__(self, name: str = "CUSTOM"):
        """Create a decoder; *name* is used only to name the logger."""
        self.name = name
        self.logger = logging.getLogger(f"{__name__}.{name}")

        # Timing constants based on signal analysis (microseconds)
        self.HEADER_PULSE = 8843
        self.HEADER_SPACE = 4507

        # Data timing
        self.DATA_PULSE = 486    # every data bit starts with this pulse
        self.BIT_0_SPACE = 645   # short space = bit 0
        self.BIT_1_SPACE = 1770  # long space = bit 1

        # Footer timing
        self.FOOTER_PULSE = 8843  # same as header pulse
        self.FOOTER_SPACE = 8997  # recorded from analysis; not checked in decode()

        # Relative tolerance for timing matching (0.25 = +/-25%)
        self.TOLERANCE = 0.25

        # Protocol structure
        self.TOTAL_POSITIONS = 71  # total positions in a signal
        self.DATA_START = 2        # first data pulse position
        self.DATA_END = 68         # first position after the data = footer pulse
        self.DATA_BITS = 33        # 33 bits of data (positions 2-67)
        self.ADDRESS_BITS = 16     # leading bits forming the address

    def decode(self, raw_timings: List[float]) -> Optional[str]:
        """
        Decode a command from raw timing data.

        Args:
            raw_timings: List of timing values in microseconds. Must contain
                exactly ``TOTAL_POSITIONS`` entries.

        Returns:
            Command string in the format ``"CUSTOM_<ADDRESS>_<COMMAND>"``
            (upper-case hex, zero-padded to at least 4 digits each), or None
            if the length, header, footer or any data bit does not match.
        """
        if len(raw_timings) != self.TOTAL_POSITIONS:
            return None

        if not self._check_header(raw_timings[0], raw_timings[1]):
            return None

        # The footer pulse sits directly after the last data space.
        if not self._check_footer(raw_timings[self.DATA_END]):
            return None

        address, command = self._decode_data_bits(raw_timings)
        if address is None or command is None:
            return None

        return f"CUSTOM_{address:04X}_{command:04X}"

    def _check_header(self, pulse_time: float, space_time: float) -> bool:
        """Return True if both header pulse and header space are in tolerance."""
        return (self._is_timing_match(pulse_time, self.HEADER_PULSE)
                and self._is_timing_match(space_time, self.HEADER_SPACE))

    def _check_footer(self, pulse_time: float) -> bool:
        """Return True if the footer pulse is in tolerance."""
        return self._is_timing_match(pulse_time, self.FOOTER_PULSE)

    def _decode_data_bits(
        self, raw_timings: List[float]
    ) -> Tuple[Optional[int], Optional[int]]:
        """
        Decode the address and command fields from raw timing data.

        Each bit occupies a (pulse, space) pair starting at ``DATA_START``.
        Bits are accumulated least-significant-bit first.

        Returns:
            ``(address, command)`` on success, or ``(None, None)`` if the
            data is too short or any pulse/space is out of tolerance. The
            reason is reported through ``self.logger`` at WARNING level.
        """
        address = 0
        command = 0

        for bit_index in range(self.DATA_BITS):
            pulse_pos = self.DATA_START + bit_index * 2
            space_pos = pulse_pos + 1

            # space_pos is the larger index, so checking it covers both.
            if space_pos >= len(raw_timings):
                self.logger.warning("Not enough data for bit %d", bit_index)
                return None, None

            pulse_time = raw_timings[pulse_pos]
            space_time = raw_timings[space_pos]

            if not self._is_timing_match(pulse_time, self.DATA_PULSE):
                self.logger.warning(
                    "Invalid pulse timing at bit %d: %sμs (expected ~%sμs)",
                    bit_index, pulse_time, self.DATA_PULSE,
                )
                return None, None

            bit_value = self._decode_bit(space_time)
            if bit_value is None:
                self.logger.warning(
                    "Invalid space timing at bit %d: %sμs "
                    "(expected ~%sμs or ~%sμs)",
                    bit_index, space_time, self.BIT_0_SPACE, self.BIT_1_SPACE,
                )
                return None, None

            if not bit_value:
                continue

            if bit_index < self.ADDRESS_BITS:
                address |= 1 << bit_index
            else:
                command |= 1 << (bit_index - self.ADDRESS_BITS)

        return address, command

    def _decode_bit(self, space_time: float) -> Optional[bool]:
        """Map a space duration to True (bit 1), False (bit 0) or None (invalid)."""
        if self._is_timing_match(space_time, self.BIT_1_SPACE):
            return True
        if self._is_timing_match(space_time, self.BIT_0_SPACE):
            return False
        return None

    def _is_timing_match(self, actual: float, expected: float) -> bool:
        """Return True if *actual* lies within ``TOLERANCE`` of *expected* (inclusive)."""
        min_time = expected * (1 - self.TOLERANCE)
        max_time = expected * (1 + self.TOLERANCE)
        return min_time <= actual <= max_time


def decode_from_raw_timings(raw_timings: List[float]) -> Optional[str]:
    """
    Decode a command from raw timing data using a fresh decoder instance.

    Args:
        raw_timings: List of timing values in microseconds.

    Returns:
        Command string, or None if decoding fails.
    """
    protocol = CustomIRProtocol("RAW_CUSTOM")
    return protocol.decode(raw_timings)


def _main() -> None:
    """Run the decoder over the captured signals file and print a summary."""
    import json
    from collections import Counter

    logging.basicConfig(level=logging.INFO)

    try:
        with open("ir_analysis_20250927_190536.json", "r", encoding="utf-8") as f:
            signals = json.load(f)

        print("Testing corrected custom protocol decoder...")
        print("=" * 60)

        successful_decodes = 0
        failed_decodes = 0
        decoded_commands = Counter()

        for i, signal_data in enumerate(signals):
            pulse_count = signal_data['pulse_count']
            raw_timings = signal_data['pulses']  # Already in microseconds

            command = decode_from_raw_timings(raw_timings)

            if command:
                successful_decodes += 1
                decoded_commands[command] += 1
                print(f"Signal {i+1:2d} ({pulse_count:2d} pulses): {command}")
            else:
                failed_decodes += 1
                if pulse_count == 71:  # Only show failed 71-pulse signals
                    print(f"Signal {i+1:2d} ({pulse_count:2d} pulses): FAILED TO DECODE")

        print("=" * 60)
        print(f"Successful decodes: {successful_decodes}")
        print(f"Failed decodes: {failed_decodes}")

        # An empty capture file would otherwise divide by zero.
        total = successful_decodes + failed_decodes
        if total:
            print(f"Success rate: {successful_decodes/total*100:.1f}%")
        else:
            print("Success rate: n/a (no signals in file)")

        if decoded_commands:
            print("\nDecoded commands:")
            for command, count in sorted(decoded_commands.items()):
                print(f"  {command}: {count} occurrences")

        if successful_decodes > 0:
            print("\n✅ SUCCESS! Custom protocol decoder is working!")
            print("You can now integrate this into your IR system.")
        else:
            print("\n❌ Decoder still needs adjustment.")

    except FileNotFoundError:
        print("No analysis file found. Run ir_signal_analyzer.py first to capture signals.")
    except Exception as e:  # top-level boundary of the test script
        print(f"Error: {e}")


# Test with captured signals
if __name__ == "__main__":
    _main()