Files
rpi-tulivision/ir_signal_analyzer.py
2025-09-27 19:04:09 +02:00

344 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
IR Signal Analyzer for Unknown Protocols
Captures and analyzes raw IR signal timing to help develop custom decoders
"""
import os
import sys
import json
import time
import logging
import threading
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import RPi.GPIO as GPIO
class IRSignalAnalyzer:
    """Analyze IR signals to understand unknown protocols.

    The analyzer polls one GPIO input, records the time between consecutive
    level changes (in microseconds), and treats a period without any change
    as the end of a signal.  Each completed signal is analyzed, printed, and
    kept in ``captured_signals`` so it can be written to a JSON file when
    the analyzer shuts down.
    """

    # Seconds without a level change after which the current signal is
    # considered complete.
    SIGNAL_GAP_SECONDS = 0.1
    # Seconds to sleep between two consecutive GPIO reads.
    POLL_INTERVAL_SECONDS = 0.0001
    # Seconds to back off after an error inside the polling loop.
    ERROR_BACKOFF_SECONDS = 0.01
    # Relative tolerance used when grouping similar durations.
    TIMING_TOLERANCE = 0.2

    def __init__(self, gpio_pin: int = 18):
        """Create an analyzer for the receiver connected to ``gpio_pin`` (BCM)."""
        self.gpio_pin = gpio_pin
        self.logger = self._setup_logging()
        self.running = False
        # The input is configured with a pull-up in setup_gpio(), so HIGH is
        # used as the initial (idle) level.
        self.last_state = GPIO.HIGH
        # Monotonic timestamp of the most recent edge; None while idle.
        self.pulse_start: Optional[float] = None
        # Durations (microseconds) of the signal currently being captured.
        self.pulses: List[float] = []
        # One dict per completed signal, as built by process_signal().
        self.captured_signals: List[Dict] = []

    def _setup_logging(self) -> logging.Logger:
        """Return the module logger, attaching a console handler only once.

        ``logging.getLogger(__name__)`` returns the same object for every
        instance, so adding a handler unconditionally would print each
        message once per analyzer ever created.
        """
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)

        if not logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(
                logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
            )
            logger.addHandler(handler)

        return logger

    def display_startup_info(self):
        """Print the banner and usage instructions to stdout."""
        print("=" * 80)
        print("IR SIGNAL ANALYZER FOR UNKNOWN PROTOCOLS")
        print("=" * 80)
        print(f"GPIO Pin: {self.gpio_pin}")
        print("This tool will capture and analyze raw IR signal timing")
        print("to help develop custom protocol decoders.")
        print()
        print("INSTRUCTIONS:")
        print("1. Point your unknown remote at the IR receiver")
        print("2. Press buttons to capture signals")
        print("3. Press the same button multiple times to check consistency")
        print("4. Press different buttons to understand the protocol structure")
        print("5. Press Ctrl+C to stop and save analysis")
        print("=" * 80)
        print("LISTENING FOR IR SIGNALS...")
        print("=" * 80)
        print()

    def setup_gpio(self):
        """Configure the GPIO pin as a pulled-up input.

        Returns:
            True when the pin was configured, False when the GPIO library
            raised an exception (the error is logged).
        """
        try:
            GPIO.setmode(GPIO.BCM)
            GPIO.setup(self.gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
            self.logger.info(f"GPIO setup complete on pin {self.gpio_pin}")
            return True
        except Exception as e:
            self.logger.error(f"GPIO setup failed: {e}")
            return False

    def poll_ir_signal(self):
        """Poll the GPIO pin and collect edge-to-edge durations.

        Runs until ``self.running`` becomes False.  Every level change
        appends the time since the previous change (in microseconds) to
        ``self.pulses``; when no change is seen for ``SIGNAL_GAP_SECONDS``
        the collected durations are handed to ``process_signal``.
        """
        while self.running:
            try:
                current_state = GPIO.input(self.gpio_pin)
                # A monotonic clock cannot jump when the system time is
                # adjusted, so measured durations are never negative.
                now = time.monotonic()

                if current_state != self.last_state:
                    if self.pulse_start is not None:
                        # Seconds -> microseconds.
                        self.pulses.append((now - self.pulse_start) * 1_000_000)
                    self.pulse_start = now
                    self.last_state = current_state

                if (self.pulse_start is not None
                        and now - self.pulse_start > self.SIGNAL_GAP_SECONDS):
                    # Reset the capture state *before* processing so that a
                    # failure in process_signal() cannot make the same
                    # buffer be re-processed on every following iteration.
                    finished, self.pulses = self.pulses, []
                    self.pulse_start = None
                    if finished:
                        self.process_signal(finished)

                time.sleep(self.POLL_INTERVAL_SECONDS)
            except Exception as e:
                # Best effort: keep listening after a transient error.
                self.logger.error(f"Error in polling loop: {e}")
                time.sleep(self.ERROR_BACKOFF_SECONDS)

    def process_signal(self, pulses: List[float]):
        """Analyze, store, and display one captured signal.

        Signals with fewer than two durations are ignored.

        Args:
            pulses: Edge-to-edge durations in microseconds.
        """
        if len(pulses) < 2:
            return

        signal_data = {
            'timestamp': time.time(),
            'pulse_count': len(pulses),
            'pulses': pulses.copy(),
            'total_duration': sum(pulses),
            'analysis': self.analyze_signal(pulses)
        }
        self.captured_signals.append(signal_data)
        self.display_signal_info(signal_data)

    def analyze_signal(self, pulses: List[float]) -> Dict:
        """Compute summary statistics for one signal.

        Args:
            pulses: Non-empty list of durations in microseconds.

        Returns:
            A dict with count, total/min/max/average duration, the distinct
            durations (sorted), the grouping from
            ``identify_timing_pattern`` and the hint from ``guess_protocol``.

        Raises:
            ValueError: If ``pulses`` is empty.
        """
        if not pulses:
            raise ValueError("cannot analyze an empty signal")

        total = sum(pulses)
        return {
            'pulse_count': len(pulses),
            'total_duration_us': total,
            'min_pulse': min(pulses),
            'max_pulse': max(pulses),
            'avg_pulse': total / len(pulses),
            # Sorted so the saved JSON is deterministic.
            'unique_timings': sorted(set(pulses)),
            'timing_pattern': self.identify_timing_pattern(pulses),
            'possible_protocol': self.guess_protocol(pulses)
        }

    def identify_timing_pattern(self, pulses: List[float]) -> Dict:
        """Group similar durations and report the recurring ones.

        A duration joins the first existing group whose key (the first
        duration that opened the group) it matches within
        ``TIMING_TOLERANCE`` (relative); otherwise it opens a new group.

        Returns:
            A dict with the number of groups (``unique_timings``), stats for
            groups seen more than once (``common_timings``), and every group
            (``all_groups``).
        """
        tolerance = self.TIMING_TOLERANCE
        timing_groups: Dict[float, List[float]] = {}

        for pulse in pulses:
            for group_key, members in timing_groups.items():
                # Multiplicative form of |pulse - key| / key <= tolerance:
                # avoids dividing by a zero key and behaves sensibly should
                # a key ever be negative.
                if abs(pulse - group_key) <= tolerance * abs(group_key):
                    members.append(pulse)
                    break
            else:
                timing_groups[pulse] = [pulse]

        common_timings = {
            group_key: {
                'count': len(members),
                'avg': sum(members) / len(members),
                'min': min(members),
                'max': max(members)
            }
            for group_key, members in timing_groups.items()
            if len(members) > 1  # only groups with multiple occurrences
        }

        return {
            'unique_timings': len(timing_groups),
            'common_timings': common_timings,
            'all_groups': timing_groups
        }

    def guess_protocol(self, pulses: List[float]) -> str:
        """Return a human-readable hint about the protocol.

        The hint is based only on the number of captured durations.
        """
        pulse_count = len(pulses)

        known_counts = {
            2: "Possible repeat code or simple protocol",
            # The capture stores marks *and* spaces, so an NEC repeat
            # (mark, space, mark) shows up as 3 durations and a full NEC
            # frame (header mark/space, 32 bits, stop mark) as 67.
            3: "Possible NEC repeat code (3 timings)",
            67: "Possible NEC protocol (67 timings: header + 32 bits + stop)",
            34: "Possible NEC protocol (34 pulses)",
            14: "Possible RC5 protocol (14 bits)",
            16: "Possible RC6 protocol (16 bits)",
        }
        if pulse_count in known_counts:
            return known_counts[pulse_count]

        if pulse_count % 2 == 0:
            return f"Even number of pulses ({pulse_count}) - likely pulse/space encoding"
        return f"Odd number of pulses ({pulse_count}) - unusual pattern"

    def display_signal_info(self, signal_data: Dict):
        """Print a short report for one entry built by ``process_signal``."""
        timestamp = time.strftime("%H:%M:%S")
        analysis = signal_data['analysis']

        print(f"[{timestamp}] Signal Captured:")
        print(f" Pulse Count: {analysis['pulse_count']}")
        print(f" Total Duration: {analysis['total_duration_us']:.0f} μs")
        print(f" Min/Max/Avg Pulse: {analysis['min_pulse']:.0f}/{analysis['max_pulse']:.0f}/{analysis['avg_pulse']:.0f} μs")
        print(f" Unique Timings: {len(analysis['unique_timings'])}")
        print(f" Possible Protocol: {analysis['possible_protocol']}")

        common_timings = analysis['timing_pattern']['common_timings']
        if common_timings:
            print(" Common Timings:")
            for timing, info in common_timings.items():
                print(f" {timing:.0f}μs (x{info['count']}, avg: {info['avg']:.0f}μs)")
        print()

    def save_analysis(self, filename: Optional[str] = None):
        """Write all captured signals to a JSON file and print a summary.

        Args:
            filename: Destination path.  When omitted, a name of the form
                ``ir_analysis_YYYYmmdd_HHMMSS.json`` is used.

        Failures while writing are logged, not raised.
        """
        if not filename:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"ir_analysis_{timestamp}.json"

        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(self.captured_signals, f, indent=2)
        except (OSError, TypeError, ValueError) as e:
            self.logger.error(f"Error saving analysis: {e}")
            return

        print(f"Analysis saved to: {filename}")
        print(f"Captured {len(self.captured_signals)} signals")
        self.generate_summary()

    def generate_summary(self):
        """Print captured signals grouped by their number of durations."""
        if not self.captured_signals:
            return

        print("\n" + "=" * 80)
        print("ANALYSIS SUMMARY")
        print("=" * 80)

        pulse_count_groups: Dict[int, List[Dict]] = {}
        for signal in self.captured_signals:
            pulse_count_groups.setdefault(signal['pulse_count'], []).append(signal)

        print("Signals by pulse count:")
        for count, signals in sorted(pulse_count_groups.items()):
            print(f" {count} pulses: {len(signals)} signals")

            all_timings = [t for signal in signals for t in signal['pulses']]
            if all_timings:
                print(f" Unique timings: {len(set(all_timings))}")
                print(f" Timing range: {min(all_timings):.0f} - {max(all_timings):.0f} μs")

        print("\nRecommendations for protocol decoder:")
        print("1. Look for consistent timing patterns across signals")
        print("2. Identify header, data, and footer sections")
        print("3. Determine bit encoding method (pulse width, space width, or both)")
        print("4. Check for repeat codes (usually 2 pulses)")
        print("5. Analyze data length and structure")
        print("=" * 80)

    def run(self):
        """Show the banner, configure GPIO, and poll until interrupted.

        Returns:
            True when polling ended normally or through Ctrl+C, False when
            GPIO setup failed or an unexpected error occurred.
            ``cleanup`` always runs before returning.
        """
        try:
            self.display_startup_info()

            if not self.setup_gpio():
                print("Failed to setup GPIO. Exiting.")
                return False

            self.running = True
            self.poll_ir_signal()
            # Reached only when something cleared self.running; that is a
            # normal stop, not a failure.
            return True
        except KeyboardInterrupt:
            print("\nStopping IR analyzer...")
            return True
        except Exception as e:
            self.logger.error(f"Error in main loop: {e}")
            return False
        finally:
            self.cleanup()

    def cleanup(self):
        """Stop polling, release GPIO, and save any captured signals."""
        self.running = False

        try:
            GPIO.cleanup()
        except Exception as e:
            # Releasing the pins is best effort; never block saving results.
            self.logger.warning(f"GPIO cleanup failed: {e}")

        if self.captured_signals:
            self.save_analysis()

        self.logger.info("IR Analyzer cleanup complete")
def main():
    """Parse command-line options, run the analyzer, and exit.

    The process exits with status 0 when ``IRSignalAnalyzer.run`` returns a
    truthy value and with status 1 otherwise.
    """
    import argparse

    parser = argparse.ArgumentParser(description="IR Signal Analyzer for Unknown Protocols")
    parser.add_argument(
        "--gpio-pin",
        type=int,
        default=18,
        help="GPIO pin for IR receiver (default: 18)"
    )
    parser.add_argument(
        "--output", "-o",
        type=str,
        help="Output filename for analysis data"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )
    args = parser.parse_args()

    # Set logging level
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Create the analyzer
    analyzer = IRSignalAnalyzer(args.gpio_pin)

    if args.verbose:
        # The analyzer's logger is given an explicit level when it is set
        # up, so changing only the root logger would leave it unaffected.
        analyzer.logger.setLevel(logging.DEBUG)

    if args.output:
        # The analyzer saves its results from its own cleanup path by
        # calling save_analysis() without a filename.  Wrap the method on
        # this instance so that call writes to the file requested with
        # --output instead of the timestamped default.
        default_save = analyzer.save_analysis

        def save_to_requested_file(filename=None):
            return default_save(filename or args.output)

        analyzer.save_analysis = save_to_requested_file

    success = analyzer.run()
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()