diff --git a/check_ir_status.py b/check_ir_status.py index 3b374da..877689d 100644 --- a/check_ir_status.py +++ b/check_ir_status.py @@ -50,3 +50,4 @@ def check_ir_status(): if __name__ == "__main__": check_ir_status() + diff --git a/custom_ir_protocol.py b/custom_ir_protocol.py new file mode 100755 index 0000000..9b1f751 --- /dev/null +++ b/custom_ir_protocol.py @@ -0,0 +1,275 @@ +#!/usr/bin/env python3 +""" +Custom IR Protocol Decoder Template +This is a template for creating custom IR protocol decoders based on signal analysis +""" + +import logging +from typing import Dict, List, Optional, Tuple +from ir_remote import IRProtocol + +class CustomIRProtocol(IRProtocol): + """ + Custom IR Protocol Decoder + + This template provides a framework for implementing custom IR protocol decoders. + You need to customize the timing constants and decode logic based on your + signal analysis results. + """ + + def __init__(self, name: str = "CUSTOM"): + super().__init__(name) + + # TODO: Update these timing constants based on your signal analysis + # These are example values - replace with your actual protocol timings + + # Header timing (if your protocol has a header) + self.HEADER_PULSE = 9000 # microseconds + self.HEADER_SPACE = 4500 # microseconds + + # Bit timing (adjust based on your protocol's bit encoding) + self.BIT_1_PULSE = 560 # microseconds + self.BIT_1_SPACE = 1690 # microseconds + self.BIT_0_PULSE = 560 # microseconds + self.BIT_0_SPACE = 560 # microseconds + + # Footer timing (if your protocol has a footer) + self.FOOTER_PULSE = 560 # microseconds + self.FOOTER_SPACE = 100000 # microseconds (long gap) + + # Repeat code timing (if your protocol supports repeats) + self.REPEAT_PULSE = 9000 # microseconds + self.REPEAT_SPACE = 2250 # microseconds + + # Tolerance for timing matching (20% is usually good) + self.TOLERANCE = 0.2 + + # Expected frame structure + self.EXPECTED_PULSE_COUNT = 34 # Adjust based on your analysis + self.DATA_BITS = 32 # Number of data bits + self.ADDRESS_BITS = 16 # Number of address bits + self.COMMAND_BITS = 16 # Number of command bits + + def decode(self, pulses: List[Tuple[bool, float]]) -> Optional[str]: + """ + Decode IR pulses to command string + + Args: + pulses: List of (is_pulse, duration) tuples + is_pulse: True for pulse, False for space + duration: Duration in seconds (will be converted to microseconds) + + Returns: + Command string in format "CUSTOM_ADDRESS_COMMAND" or None if decode fails + """ + if len(pulses) < 2: + return None + + # Convert durations to microseconds + pulse_times = [duration * 1000000 for _, duration in pulses] + + # Check for repeat code first + repeat_code = self._check_repeat_code(pulse_times) + if repeat_code: + return repeat_code + + # Check for normal frame + if len(pulse_times) != self.EXPECTED_PULSE_COUNT: + self.logger.debug(f"Expected {self.EXPECTED_PULSE_COUNT} pulses, got {len(pulse_times)}") + return None + + # Decode the frame + return self._decode_frame(pulse_times) + + def _check_repeat_code(self, pulse_times: List[float]) -> Optional[str]: + """Check if this is a repeat code""" + if len(pulse_times) == 2: + pulse_time = pulse_times[0] + space_time = pulse_times[1] + + if (self._is_timing_match(pulse_time, self.REPEAT_PULSE) and + self._is_timing_match(space_time, self.REPEAT_SPACE)): + return "REPEAT" + + return None + + def _decode_frame(self, pulse_times: List[float]) -> Optional[str]: + """Decode a complete frame""" + # Check header (first two timings) + if not self._check_header(pulse_times[:2]): + return None + + # Decode data bits + address, command = self._decode_data_bits(pulse_times[2:]) + + if address is None or command is None: + return None + + return f"CUSTOM_{address:04X}_{command:04X}" + + def _check_header(self, header_times: List[float]) -> bool: + """Check if the header matches expected timing""" + if len(header_times) < 2: + return False + + pulse_time = header_times[0] + space_time = header_times[1] + + return (self._is_timing_match(pulse_time, self.HEADER_PULSE) and + self._is_timing_match(space_time, self.HEADER_SPACE)) + + def _decode_data_bits(self, data_times: List[float]) -> Tuple[Optional[int], Optional[int]]: + """Decode data bits from timing data""" + if len(data_times) < self.DATA_BITS * 2: + return None, None + + address = 0 + command = 0 + + # Process data bits in pairs (pulse, space) + for i in range(0, min(len(data_times), self.DATA_BITS * 2), 2): + if i + 1 >= len(data_times): + break + + pulse_time = data_times[i] + space_time = data_times[i + 1] + + # Check if pulse timing is valid + if not self._is_timing_match(pulse_time, self.BIT_0_PULSE): + self.logger.debug(f"Invalid pulse timing at bit {i//2}: {pulse_time}") + return None, None + + bit_index = i // 2 + bit_value = self._decode_bit(space_time) + + if bit_value is None: + self.logger.debug(f"Invalid space timing at bit {bit_index}: {space_time}") + return None, None + + # Set the bit in the appropriate field + if bit_index < self.ADDRESS_BITS: + if bit_value: + address |= (1 << bit_index) + else: + command_bit_index = bit_index - self.ADDRESS_BITS + if bit_value: + command |= (1 << command_bit_index) + + return address, command + + def _decode_bit(self, space_time: float) -> Optional[bool]: + """Decode a single bit from space timing""" + if self._is_timing_match(space_time, self.BIT_1_SPACE): + return True + elif self._is_timing_match(space_time, self.BIT_0_SPACE): + return False + else: + return None + + def _is_timing_match(self, actual: float, expected: float) -> bool: + """Check if actual timing matches expected timing within tolerance""" + min_time = expected * (1 - self.TOLERANCE) + max_time = expected * (1 + self.TOLERANCE) + return min_time <= actual <= max_time + + def analyze_signal(self, pulse_times: List[float]) -> Dict: + """ + Analyze a signal to help understand the protocol structure + This is useful for debugging and protocol discovery + """ + analysis = { + 'pulse_count': len(pulse_times), + 'total_duration': sum(pulse_times), + 'min_timing': min(pulse_times) if pulse_times else 0, + 'max_timing': max(pulse_times) if pulse_times else 0, + 'unique_timings': len(set(pulse_times)), + 'timing_analysis': self._analyze_timings(pulse_times), + 'possible_structure': self._guess_structure(pulse_times) + } + + return analysis + + def _analyze_timings(self, pulse_times: List[float]) -> Dict: + """Analyze timing patterns in the signal""" + timing_groups = {} + tolerance = 0.2 + + for timing in pulse_times: + grouped = False + for group_key in timing_groups: + if abs(timing - group_key) / group_key <= tolerance: + timing_groups[group_key].append(timing) + grouped = True + break + + if not grouped: + timing_groups[timing] = [timing] + + # Find common timings + common_timings = {} + for group_key, group_timings in timing_groups.items(): + if len(group_timings) > 1: + common_timings[group_key] = { + 'count': len(group_timings), + 'avg': sum(group_timings) / len(group_timings), + 'min': min(group_timings), + 'max': max(group_timings) + } + + return { + 'unique_timings': len(timing_groups), + 'common_timings': common_timings, + 'all_groups': timing_groups + } + + def _guess_structure(self, pulse_times: List[float]) -> str: + """Guess the protocol structure based on pulse count and timing""" + count = len(pulse_times) + + if count == 2: + return "Possible repeat code" + elif count == 34: + return "Possible NEC-like protocol (34 pulses)" + elif count == 14: + return "Possible RC5-like protocol (14 bits)" + elif count % 2 == 0: + return f"Even pulse count ({count}) - likely pulse/space encoding" + else: + return f"Odd pulse count ({count}) - unusual pattern" + +# Example usage and testing +if __name__ == "__main__": + import json + + # Setup logging + logging.basicConfig(level=logging.DEBUG) + + # Create custom protocol decoder + protocol = CustomIRProtocol("MY_CUSTOM") + + # Example: Load captured signals from analyzer + try: + with open("ir_analysis_latest.json", 'r') as f: + signals = json.load(f) + + print("Analyzing captured signals with custom protocol decoder...") + + for i, signal_data in enumerate(signals): + print(f"\nSignal {i+1}:") + pulses = [(i % 2 == 0, duration / 1000000) for i, duration in enumerate(signal_data['pulses'])] + + # Try to decode + command = protocol.decode(pulses) + if command: + print(f" Decoded: {command}") + else: + print(f" Failed to decode") + + # Analyze signal + analysis = protocol.analyze_signal(signal_data['pulses']) + print(f" Analysis: {analysis['possible_structure']}") + + except FileNotFoundError: + print("No analysis file found. Run ir_signal_analyzer.py first to capture signals.") + except Exception as e: + print(f"Error: {e}") diff --git a/develop_custom_protocol.sh b/develop_custom_protocol.sh new file mode 100755 index 0000000..476dec5 --- /dev/null +++ b/develop_custom_protocol.sh @@ -0,0 +1,87 @@ +#!/bin/bash +# Quick start script for developing custom IR protocols + +echo "==========================================" +echo "Custom IR Protocol Development Tool" +echo "==========================================" +echo "" + +# Check if we're on a Raspberry Pi +if ! command -v python3 &> /dev/null; then + echo "Error: Python3 not found. Please install Python3." + exit 1 +fi + +# Check if RPi.GPIO is available +python3 -c "import RPi.GPIO" 2>/dev/null +if [ $? -ne 0 ]; then + echo "Warning: RPi.GPIO not available. This script is designed for Raspberry Pi." + echo "You can still use the analysis tools, but hardware testing won't work." + echo "" +fi + +echo "This tool will help you develop a custom IR protocol decoder." +echo "" +echo "Steps:" +echo "1. Capture raw IR signals from your unknown remote" +echo "2. Analyze the signal patterns" +echo "3. Customize the protocol decoder" +echo "4. Test and integrate with your IR system" +echo "" + +read -p "Do you want to start with signal capture? (y/n): " -n 1 -r +echo "" + +if [[ $REPLY =~ ^[Yy]$ ]]; then + echo "" + echo "Starting IR signal analyzer..." + echo "Point your unknown remote at the IR receiver and press buttons." + echo "Press Ctrl+C when done capturing signals." + echo "" + + python3 ir_signal_analyzer.py --gpio-pin 18 --verbose + echo "" + echo "Signal capture complete!" + echo "" + + # Check if analysis file was created + if ls ir_analysis_*.json 1> /dev/null 2>&1; then + echo "Analysis file created. Now you can:" + echo "1. Review the analysis results" + echo "2. Customize custom_ir_protocol.py with your timing constants" + echo "3. Run the integration script" + echo "" + + read -p "Do you want to run the integration script now? (y/n): " -n 1 -r + echo "" + + if [[ $REPLY =~ ^[Yy]$ ]]; then + echo "" + echo "Running integration script..." + python3 integrate_custom_protocol.py + echo "" + echo "Integration complete!" + echo "" + echo "Next steps:" + echo "1. Edit custom_ir_protocol.py with your timing constants" + echo "2. Update custom_ir_mapping.json with your command mappings" + echo "3. Test with: python3 test_custom_protocol.py" + echo "4. Test with real remote: python3 simple_ir_listener_polling.py" + fi + else + echo "No analysis file was created. Please check your IR receiver setup." + fi +else + echo "" + echo "Available tools:" + echo "1. ir_signal_analyzer.py - Capture and analyze IR signals" + echo "2. custom_ir_protocol.py - Template for custom protocol decoder" + echo "3. integrate_custom_protocol.py - Integrate custom protocol into system" + echo "4. protocol_development_guide.md - Detailed development guide" + echo "" + echo "Run this script again when you're ready to start signal capture." +fi + +echo "" +echo "For detailed instructions, see protocol_development_guide.md" +echo "==========================================" diff --git a/integrate_custom_protocol.py b/integrate_custom_protocol.py new file mode 100755 index 0000000..4686b3a --- /dev/null +++ b/integrate_custom_protocol.py @@ -0,0 +1,306 @@ +#!/usr/bin/env python3 +""" +Integration script for custom IR protocols +This script helps integrate your custom protocol decoder into the existing IR system +""" + +import os +import sys +import json +import shutil +from pathlib import Path + +def backup_existing_files(): + """Backup existing IR system files""" + backup_dir = Path("backup_ir_system") + backup_dir.mkdir(exist_ok=True) + + files_to_backup = [ + "ir_remote.py", + "simple_ir_listener.py", + "simple_ir_listener_polling.py", + "ir_listener.py" + ] + + for file in files_to_backup: + if os.path.exists(file): + shutil.copy2(file, backup_dir / file) + print(f"Backed up {file} to {backup_dir}") + + return backup_dir + +def update_ir_remote_with_custom_protocol(): + """Update ir_remote.py to include custom protocol""" + print("Updating ir_remote.py to include custom protocol...") + + # Read current ir_remote.py + with open("ir_remote.py", "r") as f: + content = f.read() + + # Add import for custom protocol + import_line = "from custom_ir_protocol import CustomIRProtocol" + + if import_line not in content: + # Find the import section and add our import + lines = content.split('\n') + import_section_end = 0 + + for i, line in enumerate(lines): + if line.startswith('import ') or line.startswith('from '): + import_section_end = i + 1 + + lines.insert(import_section_end, import_line) + content = '\n'.join(lines) + + # Update the IRRemote class to include custom protocol + if "CustomIRProtocol()" not in content: + # Find the protocols initialization line + old_line = "self.protocols = protocols or [NECProtocol(), RC5Protocol()]" + new_line = "self.protocols = protocols or [NECProtocol(), RC5Protocol(), CustomIRProtocol()]" + + content = content.replace(old_line, new_line) + + # Write updated content + with open("ir_remote.py", "w") as f: + f.write(content) + + print("Updated ir_remote.py successfully") + +def update_simple_listeners_with_custom_protocol(): + """Update simple listeners to include custom protocol""" + listeners = [ + "simple_ir_listener.py", + "simple_ir_listener_polling.py" + ] + + for listener_file in listeners: + if not os.path.exists(listener_file): + continue + + print(f"Updating {listener_file}...") + + with open(listener_file, "r") as f: + content = f.read() + + # Add import + import_line = "from custom_ir_protocol import CustomIRProtocol" + + if import_line not in content: + lines = content.split('\n') + import_section_end = 0 + + for i, line in enumerate(lines): + if line.startswith('import ') or line.startswith('from '): + import_section_end = i + 1 + + lines.insert(import_section_end, import_line) + content = '\n'.join(lines) + + # Update protocol lists + if "CustomIRProtocol()" not in content: + # Find and update protocol initialization + old_patterns = [ + "protocols or [NECProtocol(), RC5Protocol()]", + "[NECProtocol(), RC5Protocol()]" + ] + + for old_pattern in old_patterns: + if old_pattern in content: + new_pattern = old_pattern.replace("RC5Protocol()]", "RC5Protocol(), CustomIRProtocol()]") + content = content.replace(old_pattern, new_pattern) + break + + with open(listener_file, "w") as f: + f.write(content) + + print(f"Updated {listener_file} successfully") + +def create_custom_protocol_mapping(): + """Create a mapping file for custom protocol commands""" + mapping_file = "custom_ir_mapping.json" + + if os.path.exists(mapping_file): + print(f"{mapping_file} already exists, skipping creation") + return mapping_file + + # Create example mapping for custom protocol + custom_mapping = { + "CUSTOM_0000_0001": { + "command": "power_toggle", + "description": "Power on/off", + "repeatable": True + }, + "CUSTOM_0000_0002": { + "command": "channel_1", + "description": "Channel 1", + "repeatable": False + }, + "CUSTOM_0000_0003": { + "command": "channel_2", + "description": "Channel 2", + "repeatable": False + }, + "CUSTOM_0000_0004": { + "command": "volume_up", + "description": "Volume up", + "repeatable": True + }, + "CUSTOM_0000_0005": { + "command": "volume_down", + "description": "Volume down", + "repeatable": True + }, + "REPEAT": { + "command": "repeat_last", + "description": "Repeat last command", + "repeatable": False + } + } + + with open(mapping_file, "w") as f: + json.dump(custom_mapping, f, indent=2) + + print(f"Created {mapping_file} with example mappings") + return mapping_file + +def update_main_ir_mapping(): + """Update the main IR mapping file to include custom protocol mappings""" + main_mapping_files = [ + "/etc/video_player/ir_mapping.json", + "ir_mapping.json" + ] + + custom_mapping_file = "custom_ir_mapping.json" + + if not os.path.exists(custom_mapping_file): + print("Custom mapping file not found, creating it first...") + create_custom_protocol_mapping() + + # Load custom mappings + with open(custom_mapping_file, "r") as f: + custom_mappings = json.load(f) + + # Update main mapping files + for mapping_file in main_mapping_files: + if os.path.exists(mapping_file): + print(f"Updating {mapping_file}...") + + with open(mapping_file, "r") as f: + main_mappings = json.load(f) + + # Add custom mappings + main_mappings.update(custom_mappings) + + with open(mapping_file, "w") as f: + json.dump(main_mappings, f, indent=2) + + print(f"Updated {mapping_file} successfully") + +def create_test_script(): + """Create a test script for the custom protocol""" + test_script = """#!/usr/bin/env python3 +''' +Test script for custom IR protocol +''' + +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from custom_ir_protocol import CustomIRProtocol +import json + +def test_custom_protocol(): + '''Test the custom protocol with captured signals''' + + protocol = CustomIRProtocol("TEST_CUSTOM") + + # Test with example signals (replace with your actual captured data) + test_signals = [ + # Example: 34 pulses for NEC-like protocol + [(True, 0.009), (False, 0.0045), (True, 0.00056), (False, 0.00169)] * 8 + [(True, 0.00056), (False, 0.00056)] * 8, + # Add more test signals here + ] + + print("Testing custom protocol decoder...") + + for i, signal in enumerate(test_signals): + print(f"\\nTest signal {i+1}:") + command = protocol.decode(signal) + if command: + print(f" Decoded: {command}") + else: + print(f" Failed to decode") + + # Analyze signal + pulse_times = [duration * 1000000 for _, duration in signal] + analysis = protocol.analyze_signal(pulse_times) + print(f" Analysis: {analysis['possible_structure']}") + +if __name__ == "__main__": + test_custom_protocol() +""" + + with open("test_custom_protocol.py", "w") as f: + f.write(test_script) + + os.chmod("test_custom_protocol.py", 0o755) + print("Created test_custom_protocol.py") + +def main(): + """Main integration function""" + print("=" * 60) + print("CUSTOM IR PROTOCOL INTEGRATION") + print("=" * 60) + + # Check if custom protocol file exists + if not os.path.exists("custom_ir_protocol.py"): + print("Error: custom_ir_protocol.py not found!") + print("Please create and customize your protocol decoder first.") + return False + + try: + # Backup existing files + print("\\n1. Backing up existing files...") + backup_dir = backup_existing_files() + + # Update IR remote + print("\\n2. Updating IR remote system...") + update_ir_remote_with_custom_protocol() + + # Update simple listeners + print("\\n3. Updating simple listeners...") + update_simple_listeners_with_custom_protocol() + + # Create custom mapping + print("\\n4. Creating custom protocol mapping...") + create_custom_protocol_mapping() + + # Update main mappings + print("\\n5. Updating main IR mappings...") + update_main_ir_mapping() + + # Create test script + print("\\n6. Creating test script...") + create_test_script() + + print("\\n" + "=" * 60) + print("INTEGRATION COMPLETE!") + print("=" * 60) + print("\\nNext steps:") + print("1. Customize the timing constants in custom_ir_protocol.py") + print("2. Update the command mappings in custom_ir_mapping.json") + print("3. Test with: python3 test_custom_protocol.py") + print("4. Test with real remote: python3 simple_ir_listener_polling.py") + print("\\nBackup files are in:", backup_dir) + + return True + + except Exception as e: + print(f"\\nError during integration: {e}") + print("\\nYou can restore from backup if needed.") + return False + +if __name__ == "__main__": + success = main() + sys.exit(0 if success else 1) diff --git a/ir_signal_analyzer.py b/ir_signal_analyzer.py new file mode 100755 index 0000000..a2e3dc2 --- /dev/null +++ b/ir_signal_analyzer.py @@ -0,0 +1,343 @@ +#!/usr/bin/env python3 +""" +IR Signal Analyzer for Unknown Protocols +Captures and analyzes raw IR signal timing to help develop custom decoders +""" + +import os +import sys +import json +import time +import logging +import threading +from pathlib import Path +from typing import Dict, List, Optional, Tuple +import RPi.GPIO as GPIO + +class IRSignalAnalyzer: + """Analyze IR signals to understand unknown protocols""" + + def __init__(self, gpio_pin: int = 18): + self.gpio_pin = gpio_pin + self.logger = self._setup_logging() + self.running = False + self.last_state = GPIO.HIGH + self.pulse_start = 0 + self.pulses = [] + self.captured_signals = [] + + def _setup_logging(self) -> logging.Logger: + """Setup logging for the analyzer""" + logger = logging.getLogger(__name__) + logger.setLevel(logging.INFO) + + # Create console handler + handler = logging.StreamHandler() + formatter = logging.Formatter( + '%(asctime)s - %(levelname)s - %(message)s' + ) + handler.setFormatter(formatter) + logger.addHandler(handler) + + return logger + + def display_startup_info(self): + """Display startup information""" + print("=" * 80) + print("IR SIGNAL ANALYZER FOR UNKNOWN PROTOCOLS") + print("=" * 80) + print(f"GPIO Pin: {self.gpio_pin}") + print("This tool will capture and analyze raw IR signal timing") + print("to help develop custom protocol decoders.") + print() + print("INSTRUCTIONS:") + print("1. Point your unknown remote at the IR receiver") + print("2. Press buttons to capture signals") + print("3. Press the same button multiple times to check consistency") + print("4. Press different buttons to understand the protocol structure") + print("5. Press Ctrl+C to stop and save analysis") + print("=" * 80) + print("LISTENING FOR IR SIGNALS...") + print("=" * 80) + print() + + def setup_gpio(self): + """Setup GPIO for IR receiver""" + try: + GPIO.setmode(GPIO.BCM) + GPIO.setup(self.gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP) + self.logger.info(f"GPIO setup complete on pin {self.gpio_pin}") + return True + except Exception as e: + self.logger.error(f"GPIO setup failed: {e}") + return False + + def poll_ir_signal(self): + """Poll for IR signal changes""" + while self.running: + try: + current_state = GPIO.input(self.gpio_pin) + current_time = time.time() + + # Detect state change + if current_state != self.last_state: + if self.pulse_start > 0: + # Calculate pulse/space duration + duration = (current_time - self.pulse_start) * 1000000 # Convert to microseconds + self.pulses.append(duration) + + self.pulse_start = current_time + self.last_state = current_state + + # Check for end of signal (no change for 100ms) + if self.pulse_start > 0 and (current_time - self.pulse_start) > 0.1: + if len(self.pulses) > 0: + self.process_signal(self.pulses.copy()) + self.pulses = [] + self.pulse_start = 0 + + time.sleep(0.0001) # 0.1ms polling interval + + except Exception as e: + self.logger.error(f"Error in polling loop: {e}") + time.sleep(0.01) + + def process_signal(self, pulses: List[float]): + """Process captured signal""" + if len(pulses) < 2: + return + + # Store the captured signal + signal_data = { + 'timestamp': time.time(), + 'pulse_count': len(pulses), + 'pulses': pulses.copy(), + 'total_duration': sum(pulses), + 'analysis': self.analyze_signal(pulses) + } + + self.captured_signals.append(signal_data) + + # Display signal information + self.display_signal_info(signal_data) + + def analyze_signal(self, pulses: List[float]) -> Dict: + """Analyze signal characteristics""" + analysis = { + 'pulse_count': len(pulses), + 'total_duration_us': sum(pulses), + 'min_pulse': min(pulses), + 'max_pulse': max(pulses), + 'avg_pulse': sum(pulses) / len(pulses), + 'unique_timings': list(set(pulses)), + 'timing_pattern': self.identify_timing_pattern(pulses), + 'possible_protocol': self.guess_protocol(pulses) + } + + return analysis + + def identify_timing_pattern(self, pulses: List[float]) -> Dict: + """Identify common timing patterns""" + # Group similar timings (within 20% tolerance) + timing_groups = {} + tolerance = 0.2 + + for pulse in pulses: + grouped = False + for group_key in timing_groups: + if abs(pulse - group_key) / group_key <= tolerance: + timing_groups[group_key].append(pulse) + grouped = True + break + + if not grouped: + timing_groups[pulse] = [pulse] + + # Find the most common timings + common_timings = {} + for group_key, group_pulses in timing_groups.items(): + if len(group_pulses) > 1: # Only groups with multiple occurrences + common_timings[group_key] = { + 'count': len(group_pulses), + 'avg': sum(group_pulses) / len(group_pulses), + 'min': min(group_pulses), + 'max': max(group_pulses) + } + + return { + 'unique_timings': len(timing_groups), + 'common_timings': common_timings, + 'all_groups': timing_groups + } + + def guess_protocol(self, pulses: List[float]) -> str: + """Make educated guesses about the protocol""" + pulse_count = len(pulses) + + # Check for known protocol patterns + if pulse_count == 2: + return "Possible repeat code or simple protocol" + elif pulse_count == 34: + return "Possible NEC protocol (34 pulses)" + elif pulse_count == 14: + return "Possible RC5 protocol (14 bits)" + elif pulse_count == 16: + return "Possible RC6 protocol (16 bits)" + elif pulse_count % 2 == 0: + return f"Even number of pulses ({pulse_count}) - likely pulse/space encoding" + else: + return f"Odd number of pulses ({pulse_count}) - unusual pattern" + + def display_signal_info(self, signal_data: Dict): + """Display information about captured signal""" + timestamp = time.strftime("%H:%M:%S") + analysis = signal_data['analysis'] + + print(f"[{timestamp}] Signal Captured:") + print(f" Pulse Count: {analysis['pulse_count']}") + print(f" Total Duration: {analysis['total_duration_us']:.0f} μs") + print(f" Min/Max/Avg Pulse: {analysis['min_pulse']:.0f}/{analysis['max_pulse']:.0f}/{analysis['avg_pulse']:.0f} μs") + print(f" Unique Timings: {len(analysis['unique_timings'])}") + print(f" Possible Protocol: {analysis['possible_protocol']}") + + # Show common timings + if analysis['timing_pattern']['common_timings']: + print(" Common Timings:") + for timing, info in analysis['timing_pattern']['common_timings'].items(): + print(f" {timing:.0f}μs (x{info['count']}, avg: {info['avg']:.0f}μs)") + + print() + + def save_analysis(self, filename: str = None): + """Save captured signals to file for analysis""" + if not filename: + timestamp = time.strftime("%Y%m%d_%H%M%S") + filename = f"ir_analysis_{timestamp}.json" + + try: + with open(filename, 'w') as f: + json.dump(self.captured_signals, f, indent=2) + + print(f"Analysis saved to: {filename}") + print(f"Captured {len(self.captured_signals)} signals") + + # Generate summary + self.generate_summary() + + except Exception as e: + self.logger.error(f"Error saving analysis: {e}") + + def generate_summary(self): + """Generate analysis summary""" + if not self.captured_signals: + return + + print("\n" + "=" * 80) + print("ANALYSIS SUMMARY") + print("=" * 80) + + # Group signals by pulse count + pulse_count_groups = {} + for signal in self.captured_signals: + count = signal['pulse_count'] + if count not in pulse_count_groups: + pulse_count_groups[count] = [] + pulse_count_groups[count].append(signal) + + print("Signals by pulse count:") + for count, signals in sorted(pulse_count_groups.items()): + print(f" {count} pulses: {len(signals)} signals") + + # Show timing analysis for this group + all_timings = [] + for signal in signals: + all_timings.extend(signal['pulses']) + + if all_timings: + unique_timings = list(set(all_timings)) + print(f" Unique timings: {len(unique_timings)}") + print(f" Timing range: {min(all_timings):.0f} - {max(all_timings):.0f} μs") + + print("\nRecommendations for protocol decoder:") + print("1. Look for consistent timing patterns across signals") + print("2. Identify header, data, and footer sections") + print("3. Determine bit encoding method (pulse width, space width, or both)") + print("4. Check for repeat codes (usually 2 pulses)") + print("5. Analyze data length and structure") + + print("=" * 80) + + def run(self): + """Main run loop""" + try: + # Display startup information + self.display_startup_info() + + # Setup GPIO + if not self.setup_gpio(): + print("Failed to setup GPIO. Exiting.") + return False + + # Start polling + self.running = True + self.poll_ir_signal() + + except KeyboardInterrupt: + print("\nStopping IR analyzer...") + return True + except Exception as e: + self.logger.error(f"Error in main loop: {e}") + return False + finally: + self.cleanup() + + def cleanup(self): + """Cleanup resources""" + self.running = False + try: + GPIO.cleanup() + except: + pass + + # Save analysis + if self.captured_signals: + self.save_analysis() + + self.logger.info("IR Analyzer cleanup complete") + +def main(): + """Main function""" + import argparse + + parser = argparse.ArgumentParser(description="IR Signal Analyzer for Unknown Protocols") + parser.add_argument( + "--gpio-pin", + type=int, + default=18, + help="GPIO pin for IR receiver (default: 18)" + ) + parser.add_argument( + "--output", "-o", + type=str, + help="Output filename for analysis data" + ) + parser.add_argument( + "--verbose", "-v", + action="store_true", + help="Enable verbose logging" + ) + + args = parser.parse_args() + + # Set logging level + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + # Create and run analyzer + analyzer = IRSignalAnalyzer(args.gpio_pin) + success = analyzer.run() + + sys.exit(0 if success else 1) + +if __name__ == "__main__": + main() diff --git a/monitor_ir_output.py b/monitor_ir_output.py index f269636..b8bd44a 100644 --- a/monitor_ir_output.py +++ b/monitor_ir_output.py @@ -40,3 +40,4 @@ def monitor_ir_listener(): if __name__ == "__main__": monitor_ir_listener() + diff --git a/protocol_development_guide.md b/protocol_development_guide.md new file mode 100644 index 0000000..0c5320c --- /dev/null +++ b/protocol_development_guide.md @@ -0,0 +1,198 @@ +# Custom IR Protocol Development Guide + +This guide will help you develop a custom decoder for an unknown IR protocol using the tools provided. + +## Step 1: Capture Raw Signal Data + +First, use the IR signal analyzer to capture raw timing data from your unknown remote: + +```bash +python3 ir_signal_analyzer.py --gpio-pin 18 --verbose +``` + +### Instructions: +1. Point your unknown remote at the IR receiver +2. Press buttons to capture signals +3. Press the same button multiple times to check consistency +4. Press different buttons to understand the protocol structure +5. Press Ctrl+C to stop and save analysis + +The analyzer will save a JSON file with all captured signals and generate an analysis summary. + +## Step 2: Analyze the Captured Data + +Examine the generated JSON file and analysis summary to understand: + +### Key Questions to Answer: +1. **How many pulses does each signal have?** + - Consistent pulse count indicates a structured protocol + - Variable pulse count might indicate variable-length data + +2. **What are the common timing values?** + - Look for repeated timing values across different buttons + - These likely represent bit 0, bit 1, header, footer, etc. + +3. **Is there a header pattern?** + - First few pulses often form a header + - Headers are usually longer than data bits + +4. **How are bits encoded?** + - **Pulse Width Modulation**: Different pulse lengths for 0/1 + - **Space Width Modulation**: Different space lengths for 0/1 + - **Both**: Different combinations of pulse and space lengths + +5. **Is there a repeat code?** + - Usually 2 pulses with specific timing + - Much shorter than normal frames + +## Step 3: Customize the Protocol Decoder + +Edit `custom_ir_protocol.py` and update the timing constants based on your analysis: + +### Example Analysis Results: +```python +# If your analysis shows these common timings: +# 9000μs, 4500μs, 560μs, 1690μs, 560μs + +# Update the constants: +self.HEADER_PULSE = 9000 # Long pulse at start +self.HEADER_SPACE = 4500 # Long space after header +self.BIT_1_PULSE = 560 # Short pulse for all bits +self.BIT_1_SPACE = 1690 # Long space for bit 1 +self.BIT_0_PULSE = 560 # Short pulse for all bits +self.BIT_0_SPACE = 560 # Short space for bit 0 +``` + +### Common Protocol Patterns: + +#### NEC-like Protocol: +- 34 pulses total (header + 32 data bits) +- Header: 9000μs pulse + 4500μs space +- Data: 560μs pulse + (560μs or 1690μs) space +- Repeat: 9000μs pulse + 2250μs space + +#### RC5-like Protocol: +- 14 bits total +- Manchester encoding +- 889μs bit time +- Start bits: 11 + +#### Custom Protocol Example: +- 20 pulses total +- Header: 8000μs pulse + 4000μs space +- Data: 500μs pulse + (500μs or 1500μs) space +- Footer: 500μs pulse + 100000μs space + +## Step 4: Test Your Decoder + +Test your custom decoder with the captured signals: + +```bash +python3 custom_ir_protocol.py +``` + +This will attempt to decode all captured signals using your custom protocol. + +## Step 5: Integrate with IR System + +Once your decoder works, integrate it into your IR system: + +1. **Add to protocol list** in your IR listeners +2. **Update IR mapping** to include your custom protocol codes +3. **Test with real remote** to ensure it works correctly + +## Troubleshooting + +### Common Issues: + +1. **No signals decoded:** + - Check timing constants match your analysis + - Verify pulse count expectations + - Check tolerance settings (try increasing to 0.3) + +2. **Inconsistent decoding:** + - Remote might have timing variations + - Increase tolerance or add timing ranges + - Check for different button types (some might be repeats) + +3. **Wrong data extracted:** + - Verify bit order (LSB vs MSB) + - Check address vs command bit allocation + - Ensure proper bit indexing + +### Debug Tips: + +1. **Enable debug logging:** + ```python + logging.basicConfig(level=logging.DEBUG) + ``` + +2. **Add print statements** in decode methods to see what's happening + +3. **Compare with known protocols** to understand similarities + +4. **Use the analyzer's timing analysis** to identify patterns + +## Advanced Features + +### Variable-Length Protocols: +Some protocols have variable data lengths. Modify the decoder to handle this: + +```python +def _determine_data_length(self, pulse_times): + # Analyze pulse count to determine data length + # Return appropriate bit counts + pass +``` + +### Multiple Protocol Variants: +If your remote uses multiple similar protocols: + +```python +class CustomIRProtocolVariant1(CustomIRProtocol): + def __init__(self): + super().__init__("CUSTOM_V1") + # Different timing constants + +class CustomIRProtocolVariant2(CustomIRProtocol): + def __init__(self): + super().__init__("CUSTOM_V2") + # Different timing constants +``` + +### Checksum Validation: +Some protocols include checksums: + +```python +def _validate_checksum(self, address, command): + # Calculate and validate checksum + # Return True if valid, False otherwise + pass +``` + +## Example: Complete Custom Protocol + +Here's an example of a complete custom protocol based on analysis: + +```python +class MyCustomProtocol(CustomIRProtocol): + def __init__(self): + super().__init__("MY_CUSTOM") + + # Based on analysis of captured signals + self.HEADER_PULSE = 8500 + self.HEADER_SPACE = 4200 + self.BIT_1_PULSE = 580 + self.BIT_1_SPACE = 1650 + self.BIT_0_PULSE = 580 + self.BIT_0_SPACE = 580 + self.REPEAT_PULSE = 8500 + self.REPEAT_SPACE = 2100 + + self.EXPECTED_PULSE_COUNT = 36 # 2 header + 34 data + self.DATA_BITS = 32 + self.ADDRESS_BITS = 16 + self.COMMAND_BITS = 16 +``` + +This protocol would decode signals as `MY_CUSTOM_1234_5678` format. diff --git a/simple_ir_listener_polling.py b/simple_ir_listener_polling.py index a972676..ab339bd 100644 --- a/simple_ir_listener_polling.py +++ b/simple_ir_listener_polling.py @@ -295,3 +295,4 @@ def main(): if __name__ == "__main__": main() + diff --git a/test_ir_listener.py b/test_ir_listener.py index 94d6460..96ebb8d 100644 --- a/test_ir_listener.py +++ b/test_ir_listener.py @@ -29,3 +29,4 @@ def test_ir_listener(): if __name__ == "__main__": test_ir_listener() +