307 lines
12 KiB
Python
Executable File
307 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Custom IR Protocol Decoder Template
|
|
This is a template for creating custom IR protocol decoders based on signal analysis
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
class IRProtocol:
|
|
"""Base class for IR protocol decoding"""
|
|
|
|
def __init__(self, name: str):
|
|
self.name = name
|
|
self.logger = logging.getLogger(f"{__name__}.{name}")
|
|
|
|
def decode(self, pulses: List[Tuple[bool, float]]) -> Optional[str]:
|
|
"""Decode IR pulses to command string"""
|
|
raise NotImplementedError
|
|
|
|
class CustomIRProtocol(IRProtocol):
|
|
"""
|
|
Custom IR Protocol Decoder for Unknown Remote
|
|
|
|
Based on signal analysis showing:
|
|
- Most common: 71 pulses
|
|
- Header: ~8843μs pulse + ~4507μs space
|
|
- Bit pulse: ~484μs
|
|
- Bit 0 space: ~645μs
|
|
- Bit 1 space: ~1770μs
|
|
"""
|
|
|
|
def __init__(self, name: str = "CUSTOM"):
|
|
super().__init__(name)
|
|
|
|
# Timing constants based on signal analysis
|
|
# Header timing
|
|
self.HEADER_PULSE = 8843 # microseconds (from analysis)
|
|
self.HEADER_SPACE = 4507 # microseconds (from analysis)
|
|
|
|
# Bit timing - this protocol uses space width modulation
|
|
self.BIT_PULSE = 484 # microseconds (consistent pulse width)
|
|
self.BIT_0_SPACE = 645 # microseconds (short space = bit 0)
|
|
self.BIT_1_SPACE = 1770 # microseconds (long space = bit 1)
|
|
|
|
# Repeat code timing (if supported)
|
|
self.REPEAT_PULSE = 8843 # microseconds
|
|
self.REPEAT_SPACE = 2093 # microseconds (from analysis)
|
|
|
|
# Tolerance for timing matching
|
|
self.TOLERANCE = 0.25 # 25% tolerance for this protocol
|
|
|
|
# Expected frame structure
|
|
self.EXPECTED_PULSE_COUNT = 71 # Most common pulse count
|
|
self.DATA_BITS = 32 # Standard 32-bit data
|
|
self.ADDRESS_BITS = 16 # 16-bit address
|
|
self.COMMAND_BITS = 16 # 16-bit command
|
|
|
|
# Footer timing (long gap before repeat)
|
|
self.FOOTER_PULSE = 41949 # Very long pulse at end
|
|
self.FOOTER_SPACE = 8997 # Space after footer
|
|
|
|
def decode(self, pulses: List[Tuple[bool, float]]) -> Optional[str]:
|
|
"""
|
|
Decode IR pulses to command string
|
|
|
|
Args:
|
|
pulses: List of (is_pulse, duration) tuples
|
|
is_pulse: True for pulse, False for space
|
|
duration: Duration in seconds (will be converted to microseconds)
|
|
|
|
Returns:
|
|
Command string in format "CUSTOM_ADDRESS_COMMAND" or None if decode fails
|
|
"""
|
|
if len(pulses) < 2:
|
|
return None
|
|
|
|
# Convert durations to microseconds
|
|
pulse_times = [duration * 1000000 for _, duration in pulses]
|
|
|
|
# Check for repeat code first
|
|
repeat_code = self._check_repeat_code(pulse_times)
|
|
if repeat_code:
|
|
return repeat_code
|
|
|
|
# Check for normal frame
|
|
if len(pulse_times) != self.EXPECTED_PULSE_COUNT:
|
|
self.logger.debug(f"Expected {self.EXPECTED_PULSE_COUNT} pulses, got {len(pulse_times)}")
|
|
return None
|
|
|
|
# Decode the frame
|
|
return self._decode_frame(pulse_times)
|
|
|
|
def _check_repeat_code(self, pulse_times: List[float]) -> Optional[str]:
|
|
"""Check if this is a repeat code"""
|
|
if len(pulse_times) == 2:
|
|
pulse_time = pulse_times[0]
|
|
space_time = pulse_times[1]
|
|
|
|
if (self._is_timing_match(pulse_time, self.REPEAT_PULSE) and
|
|
self._is_timing_match(space_time, self.REPEAT_SPACE)):
|
|
return "REPEAT"
|
|
|
|
return None
|
|
|
|
def _decode_frame(self, pulse_times: List[float]) -> Optional[str]:
|
|
"""Decode a complete frame"""
|
|
# Check header (first two timings)
|
|
if not self._check_header(pulse_times[:2]):
|
|
return None
|
|
|
|
# Find where the data ends (look for the footer)
|
|
data_end = self._find_data_end(pulse_times[2:])
|
|
if data_end is None:
|
|
return None
|
|
|
|
# Decode data bits (skip the footer)
|
|
data_pulses = pulse_times[2:2+data_end]
|
|
address, command = self._decode_data_bits(data_pulses)
|
|
|
|
if address is None or command is None:
|
|
return None
|
|
|
|
return f"CUSTOM_{address:04X}_{command:04X}"
|
|
|
|
def _check_header(self, header_times: List[float]) -> bool:
|
|
"""Check if the header matches expected timing"""
|
|
if len(header_times) < 2:
|
|
return False
|
|
|
|
pulse_time = header_times[0]
|
|
space_time = header_times[1]
|
|
|
|
return (self._is_timing_match(pulse_time, self.HEADER_PULSE) and
|
|
self._is_timing_match(space_time, self.HEADER_SPACE))
|
|
|
|
def _find_data_end(self, data_times: List[float]) -> Optional[int]:
|
|
"""Find where the data section ends by looking for the footer"""
|
|
# Look for the very long pulse that indicates end of data
|
|
for i in range(0, len(data_times), 2):
|
|
if i < len(data_times):
|
|
pulse_time = data_times[i]
|
|
# Check if this is the footer pulse (very long)
|
|
if self._is_timing_match(pulse_time, self.FOOTER_PULSE):
|
|
return i # Return the index where data ends
|
|
|
|
# If no footer found, assume it's a standard 32-bit protocol
|
|
return 64 # 32 bits * 2 (pulse + space)
|
|
|
|
def _decode_data_bits(self, data_times: List[float]) -> Tuple[Optional[int], Optional[int]]:
|
|
"""Decode data bits from timing data"""
|
|
if len(data_times) < self.DATA_BITS * 2:
|
|
print(f"Not enough data: {len(data_times)} < {self.DATA_BITS * 2}")
|
|
return None, None
|
|
|
|
address = 0
|
|
command = 0
|
|
|
|
# Process data bits in pairs (pulse, space)
|
|
for i in range(0, min(len(data_times), self.DATA_BITS * 2), 2):
|
|
if i + 1 >= len(data_times):
|
|
print(f"Not enough data at bit {i//2}")
|
|
break
|
|
|
|
pulse_time = data_times[i]
|
|
space_time = data_times[i + 1]
|
|
|
|
# Check if pulse timing is valid (should be ~484μs)
|
|
if not self._is_timing_match(pulse_time, self.BIT_PULSE):
|
|
print(f"Invalid pulse timing at bit {i//2}: {pulse_time}μs (expected ~{self.BIT_PULSE}μs)")
|
|
return None, None
|
|
|
|
bit_index = i // 2
|
|
bit_value = self._decode_bit(space_time)
|
|
|
|
if bit_value is None:
|
|
print(f"Invalid space timing at bit {bit_index}: {space_time}μs (expected ~{self.BIT_0_SPACE}μs or ~{self.BIT_1_SPACE}μs)")
|
|
return None, None
|
|
|
|
# Set the bit in the appropriate field
|
|
if bit_index < self.ADDRESS_BITS:
|
|
if bit_value:
|
|
address |= (1 << bit_index)
|
|
else:
|
|
command_bit_index = bit_index - self.ADDRESS_BITS
|
|
if bit_value:
|
|
command |= (1 << command_bit_index)
|
|
|
|
return address, command
|
|
|
|
def _decode_bit(self, space_time: float) -> Optional[bool]:
|
|
"""Decode a single bit from space timing"""
|
|
if self._is_timing_match(space_time, self.BIT_1_SPACE):
|
|
return True
|
|
elif self._is_timing_match(space_time, self.BIT_0_SPACE):
|
|
return False
|
|
else:
|
|
return None
|
|
|
|
def _is_timing_match(self, actual: float, expected: float) -> bool:
|
|
"""Check if actual timing matches expected timing within tolerance"""
|
|
min_time = expected * (1 - self.TOLERANCE)
|
|
max_time = expected * (1 + self.TOLERANCE)
|
|
return min_time <= actual <= max_time
|
|
|
|
def analyze_signal(self, pulse_times: List[float]) -> Dict:
|
|
"""
|
|
Analyze a signal to help understand the protocol structure
|
|
This is useful for debugging and protocol discovery
|
|
"""
|
|
analysis = {
|
|
'pulse_count': len(pulse_times),
|
|
'total_duration': sum(pulse_times),
|
|
'min_timing': min(pulse_times) if pulse_times else 0,
|
|
'max_timing': max(pulse_times) if pulse_times else 0,
|
|
'unique_timings': len(set(pulse_times)),
|
|
'timing_analysis': self._analyze_timings(pulse_times),
|
|
'possible_structure': self._guess_structure(pulse_times)
|
|
}
|
|
|
|
return analysis
|
|
|
|
def _analyze_timings(self, pulse_times: List[float]) -> Dict:
|
|
"""Analyze timing patterns in the signal"""
|
|
timing_groups = {}
|
|
tolerance = 0.2
|
|
|
|
for timing in pulse_times:
|
|
grouped = False
|
|
for group_key in timing_groups:
|
|
if abs(timing - group_key) / group_key <= tolerance:
|
|
timing_groups[group_key].append(timing)
|
|
grouped = True
|
|
break
|
|
|
|
if not grouped:
|
|
timing_groups[timing] = [timing]
|
|
|
|
# Find common timings
|
|
common_timings = {}
|
|
for group_key, group_timings in timing_groups.items():
|
|
if len(group_timings) > 1:
|
|
common_timings[group_key] = {
|
|
'count': len(group_timings),
|
|
'avg': sum(group_timings) / len(group_timings),
|
|
'min': min(group_timings),
|
|
'max': max(group_timings)
|
|
}
|
|
|
|
return {
|
|
'unique_timings': len(timing_groups),
|
|
'common_timings': common_timings,
|
|
'all_groups': timing_groups
|
|
}
|
|
|
|
def _guess_structure(self, pulse_times: List[float]) -> str:
|
|
"""Guess the protocol structure based on pulse count and timing"""
|
|
count = len(pulse_times)
|
|
|
|
if count == 2:
|
|
return "Possible repeat code"
|
|
elif count == 34:
|
|
return "Possible NEC-like protocol (34 pulses)"
|
|
elif count == 14:
|
|
return "Possible RC5-like protocol (14 bits)"
|
|
elif count % 2 == 0:
|
|
return f"Even pulse count ({count}) - likely pulse/space encoding"
|
|
else:
|
|
return f"Odd pulse count ({count}) - unusual pattern"
|
|
|
|
# Example usage and testing
|
|
if __name__ == "__main__":
|
|
import json
|
|
|
|
# Setup logging
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
# Create custom protocol decoder
|
|
protocol = CustomIRProtocol("MY_CUSTOM")
|
|
|
|
# Example: Load captured signals from analyzer
|
|
try:
|
|
with open("ir_analysis_latest.json", 'r') as f:
|
|
signals = json.load(f)
|
|
|
|
print("Analyzing captured signals with custom protocol decoder...")
|
|
|
|
for i, signal_data in enumerate(signals):
|
|
print(f"\nSignal {i+1}:")
|
|
pulses = [(i % 2 == 0, duration / 1000000) for i, duration in enumerate(signal_data['pulses'])]
|
|
|
|
# Try to decode
|
|
command = protocol.decode(pulses)
|
|
if command:
|
|
print(f" Decoded: {command}")
|
|
else:
|
|
print(f" Failed to decode")
|
|
|
|
# Analyze signal
|
|
analysis = protocol.analyze_signal(signal_data['pulses'])
|
|
print(f" Analysis: {analysis['possible_structure']}")
|
|
|
|
except FileNotFoundError:
|
|
print("No analysis file found. Run ir_signal_analyzer.py first to capture signals.")
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|