220 lines
8.3 KiB
Python
220 lines
8.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Flexible Custom IR Protocol Decoder for Unknown Remote
|
|
|
|
This decoder tries to handle irregular timing patterns by being more flexible
|
|
about what constitutes a valid pulse/space sequence.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
class CustomIRProtocol:
|
|
"""
|
|
Flexible Custom IR Protocol Decoder
|
|
|
|
This decoder is more tolerant of timing variations and tries to decode
|
|
the signal even if some timings don't match exactly.
|
|
"""
|
|
|
|
def __init__(self, name: str = "CUSTOM"):
|
|
self.name = name
|
|
self.logger = logging.getLogger(f"{__name__}.{name}")
|
|
|
|
# Timing constants based on signal analysis
|
|
self.HEADER_PULSE = 8843 # microseconds
|
|
self.HEADER_SPACE = 4507 # microseconds
|
|
|
|
# Data timing (with wider tolerance)
|
|
self.DATA_PULSE_MIN = 400 # Minimum pulse time
|
|
self.DATA_PULSE_MAX = 700 # Maximum pulse time
|
|
self.BIT_0_SPACE_MIN = 600 # Minimum bit 0 space
|
|
self.BIT_0_SPACE_MAX = 800 # Maximum bit 0 space
|
|
self.BIT_1_SPACE_MIN = 1500 # Minimum bit 1 space
|
|
self.BIT_1_SPACE_MAX = 2000 # Maximum bit 1 space
|
|
|
|
# Footer timing
|
|
self.FOOTER_PULSE = 8843 # microseconds
|
|
|
|
# Protocol structure
|
|
self.TOTAL_POSITIONS = 71 # Total positions in signal
|
|
self.DATA_START = 2 # Data starts at position 2
|
|
self.DATA_END = 68 # Data ends at position 68
|
|
self.DATA_BITS = 33 # 33 bits of data
|
|
|
|
def decode(self, raw_timings: List[float]) -> Optional[str]:
|
|
"""
|
|
Decode from raw timing data with flexible timing matching
|
|
|
|
Args:
|
|
raw_timings: List of timing values in microseconds
|
|
|
|
Returns:
|
|
Command string in format "CUSTOM_ADDRESS_COMMAND" or None if decode fails
|
|
"""
|
|
if len(raw_timings) != self.TOTAL_POSITIONS:
|
|
return None
|
|
|
|
# Check header
|
|
if not self._check_header(raw_timings[0], raw_timings[1]):
|
|
return None
|
|
|
|
# Check footer
|
|
if not self._check_footer(raw_timings[68]):
|
|
return None
|
|
|
|
# Decode data bits with flexible matching
|
|
address, command = self._decode_data_bits_flexible(raw_timings)
|
|
|
|
if address is None or command is None:
|
|
return None
|
|
|
|
return f"CUSTOM_{address:04X}_{command:04X}"
|
|
|
|
def _check_header(self, pulse_time: float, space_time: float) -> bool:
|
|
"""Check if header matches expected timing"""
|
|
return (self._is_timing_match(pulse_time, self.HEADER_PULSE, 0.1) and
|
|
self._is_timing_match(space_time, self.HEADER_SPACE, 0.1))
|
|
|
|
def _check_footer(self, pulse_time: float) -> bool:
|
|
"""Check if footer matches expected timing"""
|
|
return self._is_timing_match(pulse_time, self.FOOTER_PULSE, 0.1)
|
|
|
|
def _decode_data_bits_flexible(self, raw_timings: List[float]) -> Tuple[Optional[int], Optional[int]]:
|
|
"""Decode data bits with flexible timing matching"""
|
|
address = 0
|
|
command = 0
|
|
successful_bits = 0
|
|
|
|
# Process data bits (positions 2-67, 33 bits total)
|
|
for bit_index in range(self.DATA_BITS):
|
|
pulse_pos = self.DATA_START + (bit_index * 2)
|
|
space_pos = pulse_pos + 1
|
|
|
|
if pulse_pos >= len(raw_timings) or space_pos >= len(raw_timings):
|
|
break
|
|
|
|
pulse_time = raw_timings[pulse_pos]
|
|
space_time = raw_timings[space_pos]
|
|
|
|
# Check if pulse timing is reasonable (flexible)
|
|
if not (self.DATA_PULSE_MIN <= pulse_time <= self.DATA_PULSE_MAX):
|
|
# If pulse timing is wrong, maybe it's actually a space
|
|
# Try to decode based on the timing value itself
|
|
bit_value = self._decode_bit_flexible(pulse_time)
|
|
if bit_value is not None:
|
|
# Use this timing as the bit value
|
|
if bit_index < 16:
|
|
if bit_value:
|
|
address |= (1 << bit_index)
|
|
else:
|
|
command_bit_index = bit_index - 16
|
|
if bit_value:
|
|
command |= (1 << command_bit_index)
|
|
successful_bits += 1
|
|
continue
|
|
|
|
# Normal decoding: pulse should be reasonable, decode from space
|
|
bit_value = self._decode_bit_flexible(space_time)
|
|
if bit_value is not None:
|
|
if bit_index < 16:
|
|
if bit_value:
|
|
address |= (1 << bit_index)
|
|
else:
|
|
command_bit_index = bit_index - 16
|
|
if bit_value:
|
|
command |= (1 << command_bit_index)
|
|
successful_bits += 1
|
|
|
|
# Require at least 80% of bits to be successfully decoded
|
|
if successful_bits < (self.DATA_BITS * 0.8):
|
|
return None, None
|
|
|
|
return address, command
|
|
|
|
def _decode_bit_flexible(self, timing: float) -> Optional[bool]:
|
|
"""Decode a single bit from timing with flexible matching"""
|
|
if self.BIT_1_SPACE_MIN <= timing <= self.BIT_1_SPACE_MAX:
|
|
return True
|
|
elif self.BIT_0_SPACE_MIN <= timing <= self.BIT_0_SPACE_MAX:
|
|
return False
|
|
else:
|
|
return None
|
|
|
|
def _is_timing_match(self, actual: float, expected: float, tolerance: float = 0.25) -> bool:
|
|
"""Check if actual timing matches expected timing within tolerance"""
|
|
min_time = expected * (1 - tolerance)
|
|
max_time = expected * (1 + tolerance)
|
|
return min_time <= actual <= max_time
|
|
|
|
def decode_from_raw_timings(raw_timings: List[float]) -> Optional[str]:
|
|
"""
|
|
Decode from raw timing data with flexible matching
|
|
|
|
Args:
|
|
raw_timings: List of timing values in microseconds
|
|
|
|
Returns:
|
|
Command string or None if decode fails
|
|
"""
|
|
protocol = CustomIRProtocol("FLEXIBLE_CUSTOM")
|
|
return protocol.decode(raw_timings)
|
|
|
|
# Test with captured signals
|
|
if __name__ == "__main__":
|
|
import json
|
|
|
|
# Setup logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
try:
|
|
with open("ir_analysis_20250927_190536.json", 'r') as f:
|
|
signals = json.load(f)
|
|
|
|
print("Testing flexible custom protocol decoder...")
|
|
print("=" * 60)
|
|
|
|
successful_decodes = 0
|
|
failed_decodes = 0
|
|
decoded_commands = {}
|
|
|
|
for i, signal_data in enumerate(signals):
|
|
pulse_count = signal_data['pulse_count']
|
|
raw_timings = signal_data['pulses'] # Already in microseconds
|
|
|
|
# Try to decode using raw timings
|
|
command = decode_from_raw_timings(raw_timings)
|
|
|
|
if command:
|
|
successful_decodes += 1
|
|
if command not in decoded_commands:
|
|
decoded_commands[command] = 0
|
|
decoded_commands[command] += 1
|
|
|
|
print(f"Signal {i+1:2d} ({pulse_count:2d} pulses): {command}")
|
|
else:
|
|
failed_decodes += 1
|
|
if pulse_count == 71: # Only show failed 71-pulse signals
|
|
print(f"Signal {i+1:2d} ({pulse_count:2d} pulses): FAILED TO DECODE")
|
|
|
|
print("=" * 60)
|
|
print(f"Successful decodes: {successful_decodes}")
|
|
print(f"Failed decodes: {failed_decodes}")
|
|
print(f"Success rate: {successful_decodes/(successful_decodes+failed_decodes)*100:.1f}%")
|
|
|
|
if decoded_commands:
|
|
print("\nDecoded commands:")
|
|
for command, count in sorted(decoded_commands.items()):
|
|
print(f" {command}: {count} occurrences")
|
|
|
|
if successful_decodes > 0:
|
|
print("\n✅ SUCCESS! Flexible custom protocol decoder is working!")
|
|
print("You can now integrate this into your IR system.")
|
|
else:
|
|
print("\n❌ Decoder still needs adjustment.")
|
|
|
|
except FileNotFoundError:
|
|
print("No analysis file found. Run ir_signal_analyzer.py first to capture signals.")
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|