custom IR
This commit is contained in:
216
custom_ir_protocol_corrected.py
Normal file
216
custom_ir_protocol_corrected.py
Normal file
@@ -0,0 +1,216 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Corrected Custom IR Protocol Decoder for Unknown Remote
|
||||
|
||||
Based on detailed signal analysis:
|
||||
- Header: position 0 (8843μs pulse) + position 1 (4508μs space)
|
||||
- Data pulses: positions 2,4,6,8... (486μs)
|
||||
- Data spaces: positions 3,5,7,9... (645μs for bit 0, 1770μs for bit 1)
|
||||
- Footer: position 68 (8843μs pulse)
|
||||
- Total: 71 positions (0-70)
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
class CustomIRProtocol:
|
||||
"""
|
||||
Custom IR Protocol Decoder for Unknown Remote
|
||||
|
||||
Protocol structure:
|
||||
- Position 0: Header pulse (8843μs)
|
||||
- Position 1: Header space (4508μs)
|
||||
- Positions 2,4,6,8...: Data pulses (486μs)
|
||||
- Positions 3,5,7,9...: Data spaces (645μs=bit0, 1770μs=bit1)
|
||||
- Position 68: Footer pulse (8843μs)
|
||||
- Positions 69-70: Footer spaces
|
||||
"""
|
||||
|
||||
def __init__(self, name: str = "CUSTOM"):
|
||||
self.name = name
|
||||
self.logger = logging.getLogger(f"{__name__}.{name}")
|
||||
|
||||
# Timing constants based on signal analysis
|
||||
self.HEADER_PULSE = 8843 # microseconds
|
||||
self.HEADER_SPACE = 4507 # microseconds (from analysis)
|
||||
|
||||
# Data timing
|
||||
self.DATA_PULSE = 486 # microseconds (data pulses)
|
||||
self.BIT_0_SPACE = 645 # microseconds (short space = bit 0)
|
||||
self.BIT_1_SPACE = 1770 # microseconds (long space = bit 1)
|
||||
|
||||
# Footer timing
|
||||
self.FOOTER_PULSE = 8843 # microseconds (same as header)
|
||||
self.FOOTER_SPACE = 8997 # microseconds (from analysis)
|
||||
|
||||
# Tolerance for timing matching
|
||||
self.TOLERANCE = 0.25 # 25% tolerance
|
||||
|
||||
# Protocol structure
|
||||
self.TOTAL_POSITIONS = 71 # Total positions in signal
|
||||
self.DATA_START = 2 # Data starts at position 2
|
||||
self.DATA_END = 68 # Data ends at position 68
|
||||
self.DATA_BITS = 33 # 33 bits of data (positions 2-67)
|
||||
|
||||
def decode(self, raw_timings: List[float]) -> Optional[str]:
|
||||
"""
|
||||
Decode from raw timing data
|
||||
|
||||
Args:
|
||||
raw_timings: List of timing values in microseconds
|
||||
|
||||
Returns:
|
||||
Command string in format "CUSTOM_ADDRESS_COMMAND" or None if decode fails
|
||||
"""
|
||||
if len(raw_timings) != self.TOTAL_POSITIONS:
|
||||
return None
|
||||
|
||||
# Check header
|
||||
if not self._check_header(raw_timings[0], raw_timings[1]):
|
||||
return None
|
||||
|
||||
# Check footer
|
||||
if not self._check_footer(raw_timings[68]):
|
||||
return None
|
||||
|
||||
# Decode data bits
|
||||
address, command = self._decode_data_bits(raw_timings)
|
||||
|
||||
if address is None or command is None:
|
||||
return None
|
||||
|
||||
return f"CUSTOM_{address:04X}_{command:04X}"
|
||||
|
||||
def _check_header(self, pulse_time: float, space_time: float) -> bool:
|
||||
"""Check if header matches expected timing"""
|
||||
return (self._is_timing_match(pulse_time, self.HEADER_PULSE) and
|
||||
self._is_timing_match(space_time, self.HEADER_SPACE))
|
||||
|
||||
def _check_footer(self, pulse_time: float) -> bool:
|
||||
"""Check if footer matches expected timing"""
|
||||
return self._is_timing_match(pulse_time, self.FOOTER_PULSE)
|
||||
|
||||
def _decode_data_bits(self, raw_timings: List[float]) -> Tuple[Optional[int], Optional[int]]:
|
||||
"""Decode data bits from raw timing data"""
|
||||
address = 0
|
||||
command = 0
|
||||
|
||||
# Process data bits (positions 2-67, 33 bits total)
|
||||
for bit_index in range(self.DATA_BITS):
|
||||
pulse_pos = self.DATA_START + (bit_index * 2)
|
||||
space_pos = pulse_pos + 1
|
||||
|
||||
if pulse_pos >= len(raw_timings) or space_pos >= len(raw_timings):
|
||||
print(f"Not enough data for bit {bit_index}")
|
||||
return None, None
|
||||
|
||||
pulse_time = raw_timings[pulse_pos]
|
||||
space_time = raw_timings[space_pos]
|
||||
|
||||
# Check pulse timing
|
||||
if not self._is_timing_match(pulse_time, self.DATA_PULSE):
|
||||
print(f"Invalid pulse timing at bit {bit_index}: {pulse_time}μs (expected ~{self.DATA_PULSE}μs)")
|
||||
return None, None
|
||||
|
||||
# Decode bit from space timing
|
||||
bit_value = self._decode_bit(space_time)
|
||||
if bit_value is None:
|
||||
print(f"Invalid space timing at bit {bit_index}: {space_time}μs (expected ~{self.BIT_0_SPACE}μs or ~{self.BIT_1_SPACE}μs)")
|
||||
return None, None
|
||||
|
||||
# Set the bit in the appropriate field
|
||||
if bit_index < 16: # First 16 bits are address
|
||||
if bit_value:
|
||||
address |= (1 << bit_index)
|
||||
else: # Last 17 bits are command
|
||||
command_bit_index = bit_index - 16
|
||||
if bit_value:
|
||||
command |= (1 << command_bit_index)
|
||||
|
||||
return address, command
|
||||
|
||||
def _decode_bit(self, space_time: float) -> Optional[bool]:
|
||||
"""Decode a single bit from space timing"""
|
||||
if self._is_timing_match(space_time, self.BIT_1_SPACE):
|
||||
return True
|
||||
elif self._is_timing_match(space_time, self.BIT_0_SPACE):
|
||||
return False
|
||||
else:
|
||||
return None
|
||||
|
||||
def _is_timing_match(self, actual: float, expected: float) -> bool:
|
||||
"""Check if actual timing matches expected timing within tolerance"""
|
||||
min_time = expected * (1 - self.TOLERANCE)
|
||||
max_time = expected * (1 + self.TOLERANCE)
|
||||
return min_time <= actual <= max_time
|
||||
|
||||
def decode_from_raw_timings(raw_timings: List[float]) -> Optional[str]:
|
||||
"""
|
||||
Decode from raw timing data
|
||||
|
||||
Args:
|
||||
raw_timings: List of timing values in microseconds
|
||||
|
||||
Returns:
|
||||
Command string or None if decode fails
|
||||
"""
|
||||
protocol = CustomIRProtocol("RAW_CUSTOM")
|
||||
return protocol.decode(raw_timings)
|
||||
|
||||
# Test with captured signals
|
||||
if __name__ == "__main__":
|
||||
import json
|
||||
|
||||
# Setup logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
try:
|
||||
with open("ir_analysis_20250927_190536.json", 'r') as f:
|
||||
signals = json.load(f)
|
||||
|
||||
print("Testing corrected custom protocol decoder...")
|
||||
print("=" * 60)
|
||||
|
||||
successful_decodes = 0
|
||||
failed_decodes = 0
|
||||
decoded_commands = {}
|
||||
|
||||
for i, signal_data in enumerate(signals):
|
||||
pulse_count = signal_data['pulse_count']
|
||||
raw_timings = signal_data['pulses'] # Already in microseconds
|
||||
|
||||
# Try to decode using raw timings
|
||||
command = decode_from_raw_timings(raw_timings)
|
||||
|
||||
if command:
|
||||
successful_decodes += 1
|
||||
if command not in decoded_commands:
|
||||
decoded_commands[command] = 0
|
||||
decoded_commands[command] += 1
|
||||
|
||||
print(f"Signal {i+1:2d} ({pulse_count:2d} pulses): {command}")
|
||||
else:
|
||||
failed_decodes += 1
|
||||
if pulse_count == 71: # Only show failed 71-pulse signals
|
||||
print(f"Signal {i+1:2d} ({pulse_count:2d} pulses): FAILED TO DECODE")
|
||||
|
||||
print("=" * 60)
|
||||
print(f"Successful decodes: {successful_decodes}")
|
||||
print(f"Failed decodes: {failed_decodes}")
|
||||
print(f"Success rate: {successful_decodes/(successful_decodes+failed_decodes)*100:.1f}%")
|
||||
|
||||
if decoded_commands:
|
||||
print("\nDecoded commands:")
|
||||
for command, count in sorted(decoded_commands.items()):
|
||||
print(f" {command}: {count} occurrences")
|
||||
|
||||
if successful_decodes > 0:
|
||||
print("\n✅ SUCCESS! Custom protocol decoder is working!")
|
||||
print("You can now integrate this into your IR system.")
|
||||
else:
|
||||
print("\n❌ Decoder still needs adjustment.")
|
||||
|
||||
except FileNotFoundError:
|
||||
print("No analysis file found. Run ir_signal_analyzer.py first to capture signals.")
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
Reference in New Issue
Block a user