Files
rpi-tulivision/custom_ir_protocol_fixed.py
2025-09-27 22:35:56 +02:00

310 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Fixed Custom IR Protocol Decoder for Unknown Remote
Based on signal analysis showing:
- Most common: 71 pulses
- Header: ~8843μs pulse + ~4507μs space
- Bit pulse: ~484μs
- Bit 0 space: ~645μs
- Bit 1 space: ~1770μs
"""
import logging
from typing import Dict, List, Optional, Tuple
class IRProtocol:
"""Base class for IR protocol decoding"""
def __init__(self, name: str):
self.name = name
self.logger = logging.getLogger(f"{__name__}.{name}")
def decode(self, pulses: List[Tuple[bool, float]]) -> Optional[str]:
"""Decode IR pulses to command string"""
raise NotImplementedError
class CustomIRProtocol(IRProtocol):
"""
Custom IR Protocol Decoder for Unknown Remote
Based on signal analysis showing:
- Most common: 71 pulses
- Header: ~8843μs pulse + ~4507μs space
- Bit pulse: ~484μs
- Bit 0 space: ~645μs
- Bit 1 space: ~1770μs
"""
def __init__(self, name: str = "CUSTOM"):
super().__init__(name)
# Timing constants based on signal analysis
# Header timing
self.HEADER_PULSE = 8843 # microseconds (from analysis)
self.HEADER_SPACE = 4507 # microseconds (from analysis)
# Bit timing - this protocol uses space width modulation
self.BIT_PULSE = 484 # microseconds (consistent pulse width)
self.BIT_0_SPACE = 645 # microseconds (short space = bit 0)
self.BIT_1_SPACE = 1770 # microseconds (long space = bit 1)
# Repeat code timing (if supported)
self.REPEAT_PULSE = 8843 # microseconds
self.REPEAT_SPACE = 2093 # microseconds (from analysis)
# Tolerance for timing matching
self.TOLERANCE = 0.25 # 25% tolerance for this protocol
# Expected frame structure
self.EXPECTED_PULSE_COUNT = 71 # Most common pulse count
self.DATA_BITS = 32 # Standard 32-bit data
self.ADDRESS_BITS = 16 # 16-bit address
self.COMMAND_BITS = 16 # 16-bit command
# Footer timing (long gap before repeat)
self.FOOTER_PULSE = 41949 # Very long pulse at end
self.FOOTER_SPACE = 8997 # Space after footer
def decode(self, pulses: List[Tuple[bool, float]]) -> Optional[str]:
"""
Decode IR pulses to command string
Args:
pulses: List of (is_pulse, duration) tuples
is_pulse: True for pulse, False for space
duration: Duration in seconds (will be converted to microseconds)
Returns:
Command string in format "CUSTOM_ADDRESS_COMMAND" or None if decode fails
"""
if len(pulses) < 2:
return None
# Convert durations to microseconds
pulse_times = [duration * 1000000 for _, duration in pulses]
# Check for repeat code first
repeat_code = self._check_repeat_code(pulse_times)
if repeat_code:
return repeat_code
# Check for normal frame
if len(pulse_times) != self.EXPECTED_PULSE_COUNT:
self.logger.debug(f"Expected {self.EXPECTED_PULSE_COUNT} pulses, got {len(pulse_times)}")
return None
# Decode the frame
return self._decode_frame(pulse_times)
def _check_repeat_code(self, pulse_times: List[float]) -> Optional[str]:
"""Check if this is a repeat code"""
if len(pulse_times) == 2:
pulse_time = pulse_times[0]
space_time = pulse_times[1]
if (self._is_timing_match(pulse_time, self.REPEAT_PULSE) and
self._is_timing_match(space_time, self.REPEAT_SPACE)):
return "REPEAT"
return None
def _decode_frame(self, pulse_times: List[float]) -> Optional[str]:
"""Decode a complete frame"""
# Check header (first two timings)
if not self._check_header(pulse_times[:2]):
return None
# Find where the data ends (look for the footer)
data_end = self._find_data_end(pulse_times[2:])
if data_end is None:
return None
# Decode data bits (skip the footer)
data_pulses = pulse_times[2:2+data_end]
address, command = self._decode_data_bits(data_pulses)
if address is None or command is None:
return None
return f"CUSTOM_{address:04X}_{command:04X}"
def _check_header(self, header_times: List[float]) -> bool:
"""Check if the header matches expected timing"""
if len(header_times) < 2:
return False
pulse_time = header_times[0]
space_time = header_times[1]
return (self._is_timing_match(pulse_time, self.HEADER_PULSE) and
self._is_timing_match(space_time, self.HEADER_SPACE))
def _find_data_end(self, data_times: List[float]) -> Optional[int]:
"""Find where the data section ends by looking for the footer"""
# Look for the very long pulse that indicates end of data
for i in range(0, len(data_times), 2):
if i < len(data_times):
pulse_time = data_times[i]
# Check if this is the footer pulse (very long)
if self._is_timing_match(pulse_time, self.FOOTER_PULSE):
return i # Return the index where data ends
# If no footer found, assume it's a standard 32-bit protocol
return 64 # 32 bits * 2 (pulse + space)
def _decode_data_bits(self, data_times: List[float]) -> Tuple[Optional[int], Optional[int]]:
"""Decode data bits from timing data"""
if len(data_times) < self.DATA_BITS * 2:
print(f"Not enough data: {len(data_times)} < {self.DATA_BITS * 2}")
return None, None
address = 0
command = 0
# Process data bits in pairs (pulse, space)
for i in range(0, min(len(data_times), self.DATA_BITS * 2), 2):
if i + 1 >= len(data_times):
print(f"Not enough data at bit {i//2}")
break
pulse_time = data_times[i]
space_time = data_times[i + 1]
# Check if pulse timing is valid (should be ~484μs)
if not self._is_timing_match(pulse_time, self.BIT_PULSE):
print(f"Invalid pulse timing at bit {i//2}: {pulse_time}μs (expected ~{self.BIT_PULSE}μs)")
return None, None
bit_index = i // 2
bit_value = self._decode_bit(space_time)
if bit_value is None:
print(f"Invalid space timing at bit {bit_index}: {space_time}μs (expected ~{self.BIT_0_SPACE}μs or ~{self.BIT_1_SPACE}μs)")
return None, None
# Set the bit in the appropriate field
if bit_index < self.ADDRESS_BITS:
if bit_value:
address |= (1 << bit_index)
else:
command_bit_index = bit_index - self.ADDRESS_BITS
if bit_value:
command |= (1 << command_bit_index)
return address, command
def _decode_bit(self, space_time: float) -> Optional[bool]:
"""Decode a single bit from space timing"""
if self._is_timing_match(space_time, self.BIT_1_SPACE):
return True
elif self._is_timing_match(space_time, self.BIT_0_SPACE):
return False
else:
return None
def _is_timing_match(self, actual: float, expected: float) -> bool:
"""Check if actual timing matches expected timing within tolerance"""
min_time = expected * (1 - self.TOLERANCE)
max_time = expected * (1 + self.TOLERANCE)
return min_time <= actual <= max_time
# New method to decode from raw timing data (not pulse/space pairs)
def decode_from_raw_timings(raw_timings: List[float]) -> Optional[str]:
"""
Decode from raw timing data by determining pulse/space sequence
Args:
raw_timings: List of timing values in microseconds
Returns:
Command string or None if decode fails
"""
if len(raw_timings) < 2:
return None
# Create protocol instance
protocol = CustomIRProtocol("RAW_CUSTOM")
# Check for repeat code first
if len(raw_timings) == 2:
pulse_time = raw_timings[0]
space_time = raw_timings[1]
if (protocol._is_timing_match(pulse_time, protocol.REPEAT_PULSE) and
protocol._is_timing_match(space_time, protocol.REPEAT_SPACE)):
return "REPEAT"
# Check for normal frame
if len(raw_timings) != protocol.EXPECTED_PULSE_COUNT:
return None
# Check header
if not protocol._check_header(raw_timings[:2]):
return None
# Find where the data ends
data_end = protocol._find_data_end(raw_timings[2:])
if data_end is None:
return None
# Decode data bits
data_pulses = raw_timings[2:2+data_end]
address, command = protocol._decode_data_bits(data_pulses)
if address is None or command is None:
return None
return f"CUSTOM_{address:04X}_{command:04X}"
# Example usage and testing
if __name__ == "__main__":
import json
# Setup logging
logging.basicConfig(level=logging.DEBUG)
# Test with captured signals
try:
with open("ir_analysis_20250927_190536.json", 'r') as f:
signals = json.load(f)
print("Testing custom protocol decoder with raw timing data...")
successful_decodes = 0
failed_decodes = 0
decoded_commands = {}
for i, signal_data in enumerate(signals):
pulse_count = signal_data['pulse_count']
raw_timings = signal_data['pulses'] # Already in microseconds
# Try to decode using raw timings
command = decode_from_raw_timings(raw_timings)
if command:
successful_decodes += 1
if command not in decoded_commands:
decoded_commands[command] = 0
decoded_commands[command] += 1
print(f"Signal {i+1:2d} ({pulse_count:2d} pulses): {command}")
else:
failed_decodes += 1
if pulse_count == 71: # Only show failed 71-pulse signals
print(f"Signal {i+1:2d} ({pulse_count:2d} pulses): FAILED TO DECODE")
print(f"\nSuccessful decodes: {successful_decodes}")
print(f"Failed decodes: {failed_decodes}")
print(f"Success rate: {successful_decodes/(successful_decodes+failed_decodes)*100:.1f}%")
if decoded_commands:
print("\nDecoded commands:")
for command, count in sorted(decoded_commands.items()):
print(f" {command}: {count} occurrences")
except FileNotFoundError:
print("No analysis file found. Run ir_signal_analyzer.py first to capture signals.")
except Exception as e:
print(f"Error: {e}")