#!/usr/bin/env python3
"""
IR Signal Analyzer for Unknown Protocols
Captures and analyzes raw IR signal timing to help develop custom decoders
"""

import os
import sys
import json
import time
import logging
import threading
from pathlib import Path
from typing import Dict, List, Optional, Tuple

try:
    import RPi.GPIO as GPIO
except (ImportError, RuntimeError):
    # RPi.GPIO is not installed, or it refused to load on this machine.
    # The analysis helpers do not need hardware access, so keep the module
    # importable; setup_gpio() reports the problem when capture is attempted.
    GPIO = None


class IRSignalAnalyzer:
    """Analyze IR signals to understand unknown protocols.

    The analyzer polls a GPIO input, records the time between level changes
    in microseconds, and groups the recorded timings into signals that are
    printed as they arrive and saved to JSON during cleanup.
    """

    # A signal is considered finished after this long without a level change.
    SIGNAL_TIMEOUT_S = 0.1
    # Delay between two consecutive reads of the input pin.
    POLL_INTERVAL_S = 0.0001
    # Relative tolerance used when grouping similar timings.
    TIMING_TOLERANCE = 0.2

    def __init__(self, gpio_pin: int = 18, output_file: Optional[str] = None):
        """Create an analyzer.

        Args:
            gpio_pin: Pin number of the IR receiver (BCM numbering is
                selected in setup_gpio()).
            output_file: Default filename used by save_analysis(); when
                omitted a timestamped name is generated.
        """
        self.gpio_pin = gpio_pin
        self.output_file = output_file
        self.logger = self._setup_logging()
        self.running = False
        # RPi.GPIO defines HIGH as 1; fall back to that value off-device.
        self.last_state = GPIO.HIGH if GPIO is not None else 1
        # Monotonic time of the last level change, or None while idle.
        self.pulse_start = None
        self.pulses = []
        self.captured_signals = []

    def _setup_logging(self) -> logging.Logger:
        """Return the module logger, attaching a console handler once."""
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)

        # The logger is shared by every instance; adding a handler per
        # instance would emit each message several times.
        if not logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(
                logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
            )
            logger.addHandler(handler)

        return logger

    def display_startup_info(self):
        """Print the banner and usage instructions."""
        print("=" * 80)
        print("IR SIGNAL ANALYZER FOR UNKNOWN PROTOCOLS")
        print("=" * 80)
        print(f"GPIO Pin: {self.gpio_pin}")
        print("This tool will capture and analyze raw IR signal timing")
        print("to help develop custom protocol decoders.")
        print()
        print("INSTRUCTIONS:")
        print("1. Point your unknown remote at the IR receiver")
        print("2. Press buttons to capture signals")
        print("3. Press the same button multiple times to check consistency")
        print("4. Press different buttons to understand the protocol structure")
        print("5. Press Ctrl+C to stop and save analysis")
        print("=" * 80)
        print("LISTENING FOR IR SIGNALS...")
        print("=" * 80)
        print()

    def setup_gpio(self) -> bool:
        """Configure the receiver pin as an input with pull-up.

        Returns:
            True on success, False when RPi.GPIO is unavailable or the
            configuration call fails (the reason is logged).
        """
        if GPIO is None:
            self.logger.error("GPIO setup failed: RPi.GPIO is not available")
            return False

        try:
            GPIO.setmode(GPIO.BCM)
            GPIO.setup(self.gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
            self.logger.info("GPIO setup complete on pin %s", self.gpio_pin)
            return True
        except Exception as e:
            self.logger.error("GPIO setup failed: %s", e)
            return False

    def poll_ir_signal(self):
        """Poll the input pin until self.running becomes False.

        Errors inside the loop are logged and polling continues after a
        short pause.
        """
        while self.running:
            try:
                # A monotonic clock keeps durations valid even if the wall
                # clock is adjusted while capturing.
                self._handle_sample(GPIO.input(self.gpio_pin), time.monotonic())
                time.sleep(self.POLL_INTERVAL_S)
            except Exception as e:
                self.logger.error("Error in polling loop: %s", e)
                time.sleep(0.01)

    def _handle_sample(self, current_state: int, current_time: float) -> None:
        """Feed one pin reading into the capture state machine.

        Args:
            current_state: Level read from the pin.
            current_time: Time of the reading in seconds; only differences
                between successive values are used.
        """
        if current_state != self.last_state:
            if self.pulse_start is not None:
                # Time since the previous level change, in microseconds.
                self.pulses.append((current_time - self.pulse_start) * 1_000_000)
            self.pulse_start = current_time
            self.last_state = current_state

        # No level change for SIGNAL_TIMEOUT_S: the signal is over.
        if (
            self.pulse_start is not None
            and (current_time - self.pulse_start) > self.SIGNAL_TIMEOUT_S
        ):
            if self.pulses:
                self.process_signal(self.pulses.copy())
            self.pulses = []
            self.pulse_start = None

    def process_signal(self, pulses: List[float]):
        """Analyze, store and display one captured signal.

        Signals with fewer than two timings are ignored.
        """
        if len(pulses) < 2:
            self.logger.debug("Ignoring signal with %d timing(s)", len(pulses))
            return

        signal_data = {
            'timestamp': time.time(),
            'pulse_count': len(pulses),
            'pulses': pulses.copy(),
            'total_duration': sum(pulses),
            'analysis': self.analyze_signal(pulses),
        }

        self.captured_signals.append(signal_data)
        self.display_signal_info(signal_data)

    def analyze_signal(self, pulses: List[float]) -> Dict:
        """Compute summary statistics for a list of timings.

        Args:
            pulses: Timings in microseconds; must not be empty.

        Returns:
            A dict with count, total, min, max, average, the distinct
            timing values, the grouped timing pattern and a protocol guess.

        Raises:
            ValueError: If pulses is empty.
        """
        if not pulses:
            raise ValueError("cannot analyze an empty signal")

        total = sum(pulses)
        return {
            'pulse_count': len(pulses),
            'total_duration_us': total,
            'min_pulse': min(pulses),
            'max_pulse': max(pulses),
            'avg_pulse': total / len(pulses),
            'unique_timings': list(set(pulses)),
            'timing_pattern': self.identify_timing_pattern(pulses),
            'possible_protocol': self.guess_protocol(pulses),
        }

    def identify_timing_pattern(self, pulses: List[float]) -> Dict:
        """Group similar timings and report the recurring ones.

        A timing joins the first existing group whose key (the first timing
        placed in that group) it matches within TIMING_TOLERANCE; otherwise
        it starts a new group.

        Returns:
            A dict with the number of groups ('unique_timings'), statistics
            for groups holding more than one timing ('common_timings') and
            every group ('all_groups').
        """
        tolerance = self.TIMING_TOLERANCE
        timing_groups = {}

        for pulse in pulses:
            for group_key, members in timing_groups.items():
                # Multiplicative form: no division, so a zero-length timing
                # cannot raise ZeroDivisionError.
                if abs(pulse - group_key) <= tolerance * abs(group_key):
                    members.append(pulse)
                    break
            else:
                timing_groups[pulse] = [pulse]

        # Only groups with multiple occurrences count as common timings.
        common_timings = {
            group_key: {
                'count': len(members),
                'avg': sum(members) / len(members),
                'min': min(members),
                'max': max(members),
            }
            for group_key, members in timing_groups.items()
            if len(members) > 1
        }

        return {
            'unique_timings': len(timing_groups),
            'common_timings': common_timings,
            'all_groups': timing_groups,
        }

    def guess_protocol(self, pulses: List[float]) -> str:
        """Return a heuristic protocol guess based only on the timing count."""
        pulse_count = len(pulses)

        known_counts = {
            2: "Possible repeat code or simple protocol",
            34: "Possible NEC protocol (34 pulses)",
            14: "Possible RC5 protocol (14 bits)",
            16: "Possible RC6 protocol (16 bits)",
        }
        if pulse_count in known_counts:
            return known_counts[pulse_count]

        if pulse_count % 2 == 0:
            return f"Even number of pulses ({pulse_count}) - likely pulse/space encoding"
        return f"Odd number of pulses ({pulse_count}) - unusual pattern"

    def display_signal_info(self, signal_data: Dict):
        """Print a human-readable report for one captured signal."""
        timestamp = time.strftime("%H:%M:%S")
        analysis = signal_data['analysis']

        print(f"[{timestamp}] Signal Captured:")
        print(f" Pulse Count: {analysis['pulse_count']}")
        print(f" Total Duration: {analysis['total_duration_us']:.0f} μs")
        print(f" Min/Max/Avg Pulse: {analysis['min_pulse']:.0f}/{analysis['max_pulse']:.0f}/{analysis['avg_pulse']:.0f} μs")
        print(f" Unique Timings: {len(analysis['unique_timings'])}")
        print(f" Possible Protocol: {analysis['possible_protocol']}")

        common_timings = analysis['timing_pattern']['common_timings']
        if common_timings:
            print(" Common Timings:")
            for timing, info in common_timings.items():
                print(f" {timing:.0f}μs (x{info['count']}, avg: {info['avg']:.0f}μs)")

        print()

    def save_analysis(self, filename: Optional[str] = None):
        """Write all captured signals to a JSON file and print a summary.

        Args:
            filename: Target path. Falls back to the output_file given to
                the constructor, then to a timestamped default name.

        Errors are logged rather than raised, because this runs during
        cleanup.
        """
        if not filename:
            filename = self.output_file
        if not filename:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"ir_analysis_{timestamp}.json"

        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(self.captured_signals, f, indent=2)

            print(f"Analysis saved to: {filename}")
            print(f"Captured {len(self.captured_signals)} signals")

            self.generate_summary()

        except Exception as e:
            self.logger.error("Error saving analysis: %s", e)

    def generate_summary(self):
        """Print captured signals grouped by timing count, plus decoder hints."""
        if not self.captured_signals:
            return

        print("\n" + "=" * 80)
        print("ANALYSIS SUMMARY")
        print("=" * 80)

        pulse_count_groups = {}
        for signal in self.captured_signals:
            pulse_count_groups.setdefault(signal['pulse_count'], []).append(signal)

        print("Signals by pulse count:")
        for count, signals in sorted(pulse_count_groups.items()):
            print(f" {count} pulses: {len(signals)} signals")

            # Pool the timings of every signal in this group.
            all_timings = [t for signal in signals for t in signal['pulses']]
            if all_timings:
                unique_timings = set(all_timings)
                print(f" Unique timings: {len(unique_timings)}")
                print(f" Timing range: {min(all_timings):.0f} - {max(all_timings):.0f} μs")

        print("\nRecommendations for protocol decoder:")
        print("1. Look for consistent timing patterns across signals")
        print("2. Identify header, data, and footer sections")
        print("3. Determine bit encoding method (pulse width, space width, or both)")
        print("4. Check for repeat codes (usually 2 pulses)")
        print("5. Analyze data length and structure")
        print("=" * 80)

    def run(self) -> bool:
        """Show the banner, set up GPIO and poll until interrupted.

        Returns:
            True when capture ended normally or through Ctrl+C, False when
            GPIO setup failed or an unexpected error occurred. cleanup() is
            always called before returning.
        """
        try:
            self.display_startup_info()

            if not self.setup_gpio():
                print("Failed to setup GPIO. Exiting.")
                return False

            self.running = True
            self.poll_ir_signal()
            # The loop only ends normally when self.running was cleared.
            return True

        except KeyboardInterrupt:
            print("\nStopping IR analyzer...")
            return True
        except Exception as e:
            self.logger.error("Error in main loop: %s", e)
            return False
        finally:
            self.cleanup()

    def cleanup(self):
        """Stop polling, release GPIO and save any captured signals."""
        self.running = False

        if GPIO is not None:
            try:
                GPIO.cleanup()
            except Exception as e:
                # Best effort: a failed release must not prevent saving.
                self.logger.warning("GPIO cleanup failed: %s", e)

        if self.captured_signals:
            self.save_analysis()

        self.logger.info("IR Analyzer cleanup complete")


def main(argv: Optional[List[str]] = None):
    """Parse command-line options, run the analyzer and exit.

    Args:
        argv: Argument list to parse; defaults to sys.argv[1:].
    """
    import argparse

    parser = argparse.ArgumentParser(description="IR Signal Analyzer for Unknown Protocols")
    parser.add_argument(
        "--gpio-pin",
        type=int,
        default=18,
        help="GPIO pin for IR receiver (default: 18)"
    )
    parser.add_argument(
        "--output", "-o",
        type=str,
        help="Output filename for analysis data"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )

    args = parser.parse_args(argv)

    analyzer = IRSignalAnalyzer(args.gpio_pin, output_file=args.output)

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
        # The analyzer's logger has its own level, so the root level alone
        # would not enable its debug output.
        analyzer.logger.setLevel(logging.DEBUG)

    success = analyzer.run()
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()