Files
rpi-tulivision/custom_ir_protocol.py
2025-09-27 19:04:09 +02:00

276 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Custom IR Protocol Decoder Template
This is a template for creating custom IR protocol decoders based on signal analysis
"""
import logging
from typing import Dict, List, Optional, Tuple
from ir_remote import IRProtocol
class CustomIRProtocol(IRProtocol):
"""
Custom IR Protocol Decoder
This template provides a framework for implementing custom IR protocol decoders.
You need to customize the timing constants and decode logic based on your
signal analysis results.
"""
def __init__(self, name: str = "CUSTOM"):
super().__init__(name)
# TODO: Update these timing constants based on your signal analysis
# These are example values - replace with your actual protocol timings
# Header timing (if your protocol has a header)
self.HEADER_PULSE = 9000 # microseconds
self.HEADER_SPACE = 4500 # microseconds
# Bit timing (adjust based on your protocol's bit encoding)
self.BIT_1_PULSE = 560 # microseconds
self.BIT_1_SPACE = 1690 # microseconds
self.BIT_0_PULSE = 560 # microseconds
self.BIT_0_SPACE = 560 # microseconds
# Footer timing (if your protocol has a footer)
self.FOOTER_PULSE = 560 # microseconds
self.FOOTER_SPACE = 100000 # microseconds (long gap)
# Repeat code timing (if your protocol supports repeats)
self.REPEAT_PULSE = 9000 # microseconds
self.REPEAT_SPACE = 2250 # microseconds
# Tolerance for timing matching (20% is usually good)
self.TOLERANCE = 0.2
# Expected frame structure
self.EXPECTED_PULSE_COUNT = 34 # Adjust based on your analysis
self.DATA_BITS = 32 # Number of data bits
self.ADDRESS_BITS = 16 # Number of address bits
self.COMMAND_BITS = 16 # Number of command bits
def decode(self, pulses: List[Tuple[bool, float]]) -> Optional[str]:
"""
Decode IR pulses to command string
Args:
pulses: List of (is_pulse, duration) tuples
is_pulse: True for pulse, False for space
duration: Duration in seconds (will be converted to microseconds)
Returns:
Command string in format "CUSTOM_ADDRESS_COMMAND" or None if decode fails
"""
if len(pulses) < 2:
return None
# Convert durations to microseconds
pulse_times = [duration * 1000000 for _, duration in pulses]
# Check for repeat code first
repeat_code = self._check_repeat_code(pulse_times)
if repeat_code:
return repeat_code
# Check for normal frame
if len(pulse_times) != self.EXPECTED_PULSE_COUNT:
self.logger.debug(f"Expected {self.EXPECTED_PULSE_COUNT} pulses, got {len(pulse_times)}")
return None
# Decode the frame
return self._decode_frame(pulse_times)
def _check_repeat_code(self, pulse_times: List[float]) -> Optional[str]:
"""Check if this is a repeat code"""
if len(pulse_times) == 2:
pulse_time = pulse_times[0]
space_time = pulse_times[1]
if (self._is_timing_match(pulse_time, self.REPEAT_PULSE) and
self._is_timing_match(space_time, self.REPEAT_SPACE)):
return "REPEAT"
return None
def _decode_frame(self, pulse_times: List[float]) -> Optional[str]:
"""Decode a complete frame"""
# Check header (first two timings)
if not self._check_header(pulse_times[:2]):
return None
# Decode data bits
address, command = self._decode_data_bits(pulse_times[2:])
if address is None or command is None:
return None
return f"CUSTOM_{address:04X}_{command:04X}"
def _check_header(self, header_times: List[float]) -> bool:
"""Check if the header matches expected timing"""
if len(header_times) < 2:
return False
pulse_time = header_times[0]
space_time = header_times[1]
return (self._is_timing_match(pulse_time, self.HEADER_PULSE) and
self._is_timing_match(space_time, self.HEADER_SPACE))
def _decode_data_bits(self, data_times: List[float]) -> Tuple[Optional[int], Optional[int]]:
"""Decode data bits from timing data"""
if len(data_times) < self.DATA_BITS * 2:
return None, None
address = 0
command = 0
# Process data bits in pairs (pulse, space)
for i in range(0, min(len(data_times), self.DATA_BITS * 2), 2):
if i + 1 >= len(data_times):
break
pulse_time = data_times[i]
space_time = data_times[i + 1]
# Check if pulse timing is valid
if not self._is_timing_match(pulse_time, self.BIT_0_PULSE):
self.logger.debug(f"Invalid pulse timing at bit {i//2}: {pulse_time}")
return None, None
bit_index = i // 2
bit_value = self._decode_bit(space_time)
if bit_value is None:
self.logger.debug(f"Invalid space timing at bit {bit_index}: {space_time}")
return None, None
# Set the bit in the appropriate field
if bit_index < self.ADDRESS_BITS:
if bit_value:
address |= (1 << bit_index)
else:
command_bit_index = bit_index - self.ADDRESS_BITS
if bit_value:
command |= (1 << command_bit_index)
return address, command
def _decode_bit(self, space_time: float) -> Optional[bool]:
"""Decode a single bit from space timing"""
if self._is_timing_match(space_time, self.BIT_1_SPACE):
return True
elif self._is_timing_match(space_time, self.BIT_0_SPACE):
return False
else:
return None
def _is_timing_match(self, actual: float, expected: float) -> bool:
"""Check if actual timing matches expected timing within tolerance"""
min_time = expected * (1 - self.TOLERANCE)
max_time = expected * (1 + self.TOLERANCE)
return min_time <= actual <= max_time
def analyze_signal(self, pulse_times: List[float]) -> Dict:
"""
Analyze a signal to help understand the protocol structure
This is useful for debugging and protocol discovery
"""
analysis = {
'pulse_count': len(pulse_times),
'total_duration': sum(pulse_times),
'min_timing': min(pulse_times) if pulse_times else 0,
'max_timing': max(pulse_times) if pulse_times else 0,
'unique_timings': len(set(pulse_times)),
'timing_analysis': self._analyze_timings(pulse_times),
'possible_structure': self._guess_structure(pulse_times)
}
return analysis
def _analyze_timings(self, pulse_times: List[float]) -> Dict:
"""Analyze timing patterns in the signal"""
timing_groups = {}
tolerance = 0.2
for timing in pulse_times:
grouped = False
for group_key in timing_groups:
if abs(timing - group_key) / group_key <= tolerance:
timing_groups[group_key].append(timing)
grouped = True
break
if not grouped:
timing_groups[timing] = [timing]
# Find common timings
common_timings = {}
for group_key, group_timings in timing_groups.items():
if len(group_timings) > 1:
common_timings[group_key] = {
'count': len(group_timings),
'avg': sum(group_timings) / len(group_timings),
'min': min(group_timings),
'max': max(group_timings)
}
return {
'unique_timings': len(timing_groups),
'common_timings': common_timings,
'all_groups': timing_groups
}
def _guess_structure(self, pulse_times: List[float]) -> str:
"""Guess the protocol structure based on pulse count and timing"""
count = len(pulse_times)
if count == 2:
return "Possible repeat code"
elif count == 34:
return "Possible NEC-like protocol (34 pulses)"
elif count == 14:
return "Possible RC5-like protocol (14 bits)"
elif count % 2 == 0:
return f"Even pulse count ({count}) - likely pulse/space encoding"
else:
return f"Odd pulse count ({count}) - unusual pattern"
# Example usage and testing
if __name__ == "__main__":
import json
# Setup logging
logging.basicConfig(level=logging.DEBUG)
# Create custom protocol decoder
protocol = CustomIRProtocol("MY_CUSTOM")
# Example: Load captured signals from analyzer
try:
with open("ir_analysis_latest.json", 'r') as f:
signals = json.load(f)
print("Analyzing captured signals with custom protocol decoder...")
for i, signal_data in enumerate(signals):
print(f"\nSignal {i+1}:")
pulses = [(i % 2 == 0, duration / 1000000) for i, duration in enumerate(signal_data['pulses'])]
# Try to decode
command = protocol.decode(pulses)
if command:
print(f" Decoded: {command}")
else:
print(f" Failed to decode")
# Analyze signal
analysis = protocol.analyze_signal(signal_data['pulses'])
print(f" Analysis: {analysis['possible_structure']}")
except FileNotFoundError:
print("No analysis file found. Run ir_signal_analyzer.py first to capture signals.")
except Exception as e:
print(f"Error: {e}")