diff --git a/check_ir_status.py b/check_ir_status.py new file mode 100644 index 0000000..3b374da --- /dev/null +++ b/check_ir_status.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 +""" +Check IR listener status on remote system +""" + +import subprocess +import sys + +def check_ir_status(): + """Check if IR listener is running""" + try: + # Check if process is running + result = subprocess.run([ + "ssh", "tulivision@192.168.1.137", + "ps aux | grep simple_ir_listener_polling | grep -v grep" + ], capture_output=True, text=True) + + if result.returncode == 0 and result.stdout.strip(): + print("✅ IR Listener is RUNNING") + print("Process details:") + print(result.stdout.strip()) + + # Check GPIO status + gpio_result = subprocess.run([ + "ssh", "tulivision@192.168.1.137", + "python3 -c 'import RPi.GPIO as GPIO; GPIO.setmode(GPIO.BCM); print(f\"GPIO 18 state: {GPIO.input(18)}\")'" + ], capture_output=True, text=True) + + if gpio_result.returncode == 0: + print(f"GPIO Status: {gpio_result.stdout.strip()}") + + else: + print("❌ IR Listener is NOT running") + + # Try to start it + print("Attempting to start IR listener...") + start_result = subprocess.run([ + "ssh", "tulivision@192.168.1.137", + "cd /home/tulivision/rpi-tulivision && nohup python3 simple_ir_listener_polling.py > ir_listener.log 2>&1 &" + ], capture_output=True, text=True) + + if start_result.returncode == 0: + print("✅ IR Listener started successfully") + else: + print("❌ Failed to start IR listener") + print(start_result.stderr) + + except Exception as e: + print(f"Error checking status: {e}") + +if __name__ == "__main__": + check_ir_status() diff --git a/monitor_ir_output.py b/monitor_ir_output.py new file mode 100644 index 0000000..f269636 --- /dev/null +++ b/monitor_ir_output.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +""" +Monitor IR listener output +""" + +import subprocess +import sys +import time + +def monitor_ir_listener(): + """Monitor the IR listener process""" + try: + # Connect to the remote system and monitor the IR listener + cmd = [ + "ssh", "tulivision@192.168.1.137", + "cd /home/tulivision/rpi-tulivision && python3 simple_ir_listener_polling.py" + ] + + print("Starting IR listener monitoring...") + print("The IR listener is now running on the Raspberry Pi.") + print("Point your IR remote at the IR receiver and press buttons.") + print("Press Ctrl+C to stop monitoring.") + print("=" * 60) + + # Start the process + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + universal_newlines=True, bufsize=1) + + # Monitor output + for line in iter(process.stdout.readline, ''): + print(line.rstrip()) + sys.stdout.flush() + + except KeyboardInterrupt: + print("\nStopping IR listener...") + process.terminate() + process.wait() + except Exception as e: + print(f"Error: {e}") + +if __name__ == "__main__": + monitor_ir_listener() diff --git a/simple_ir_listener_polling.py b/simple_ir_listener_polling.py new file mode 100644 index 0000000..a972676 --- /dev/null +++ b/simple_ir_listener_polling.py @@ -0,0 +1,297 @@ +#!/usr/bin/env python3 +""" +Simple IR Remote Listener with Polling (No Edge Detection) +Listens to IR commands using polling method instead of edge detection +""" + +import os +import sys +import json +import time +import logging +import threading +from pathlib import Path +from typing import Dict, List, Optional, Tuple +import RPi.GPIO as GPIO + +class SimpleIRListenerPolling: + """Simple IR Remote Listener using Polling Method""" + + def __init__(self, gpio_pin: int = 18): + self.gpio_pin = gpio_pin + self.logger = self._setup_logging() + self.running = False + self.last_state = GPIO.HIGH + self.pulse_start = 0 + self.pulses = [] + + def _setup_logging(self) -> logging.Logger: + """Setup logging for the listener""" + logger = logging.getLogger(__name__) + logger.setLevel(logging.INFO) + + # Create console handler + handler = logging.StreamHandler() + formatter = logging.Formatter( + '%(asctime)s - %(levelname)s - %(message)s' + ) + handler.setFormatter(formatter) + logger.addHandler(handler) + + return logger + + def display_startup_info(self): + """Display startup information""" + print("=" * 60) + print("SIMPLE IR REMOTE LISTENER (POLLING MODE)") + print("=" * 60) + print(f"GPIO Pin: {self.gpio_pin}") + print("Method: Polling (no edge detection)") + print("=" * 60) + print("LISTENING FOR IR COMMANDS...") + print("Press Ctrl+C to exit") + print("=" * 60) + print() + + def setup_gpio(self): + """Setup GPIO for IR receiver""" + try: + GPIO.setmode(GPIO.BCM) + GPIO.setup(self.gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP) + self.logger.info(f"GPIO setup complete on pin {self.gpio_pin}") + return True + except Exception as e: + self.logger.error(f"GPIO setup failed: {e}") + return False + + def decode_nec(self, pulses: List[float]) -> Optional[str]: + """Simple NEC protocol decoder""" + if len(pulses) < 2: + return None + + # Check for repeat code (2 pulses) + if len(pulses) == 2: + if 8000 <= pulses[0] <= 10000 and 2000 <= pulses[1] <= 2500: + return "REPEAT" + + # Check for normal NEC frame (34 pulses) + if len(pulses) != 34: + return None + + # Check header + if not (8000 <= pulses[0] <= 10000 and 4000 <= pulses[1] <= 5000): + return None + + # Decode address and command + address = 0 + command = 0 + + for i in range(2, 34, 2): + pulse_time = pulses[i] + space_time = pulses[i + 1] + + # Check if it's a valid bit pulse + if not (400 <= pulse_time <= 700): + return None + + bit_index = (i - 2) // 2 + + if 1400 <= space_time <= 2000: # Bit 1 + if bit_index < 16: + address |= (1 << bit_index) + else: + command |= (1 << (bit_index - 16)) + elif 400 <= space_time <= 700: # Bit 0 + pass + else: + return None + + return f"NEC_{address:04X}_{command:04X}" + + def decode_rc5(self, pulses: List[float]) -> Optional[str]: + """Simple RC5 protocol decoder""" + if len(pulses) < 14: + return None + + # RC5 uses Manchester encoding + bits = [] + for i in range(0, min(len(pulses), 28), 2): + if i + 1 < len(pulses): + if pulses[i] < 1000 and pulses[i + 1] < 1000: # Short-short = 0 + bits.append(0) + elif pulses[i] > 1000 and pulses[i + 1] > 1000: # Long-long = 1 + bits.append(1) + else: + return None + + if len(bits) < 14: + return None + + # Extract fields + start_bits = bits[0:2] + toggle = bits[2] + address = bits[3:8] + command = bits[8:14] + + if start_bits != [1, 1]: + return None + + addr_val = sum(bit << (4 - i) for i, bit in enumerate(address)) + cmd_val = sum(bit << (5 - i) for i, bit in enumerate(command)) + + return f"RC5_{addr_val:02X}_{cmd_val:02X}_{toggle}" + + def handle_ir_command(self, ir_code: str): + """Handle received IR command""" + timestamp = time.strftime("%H:%M:%S") + + # Print command information + print(f"[{timestamp}] IR Command Received: {ir_code}") + + # Try to load mapping if available + mapping = self.load_ir_mapping() + if ir_code in mapping: + mapped_command = mapping[ir_code] + if isinstance(mapped_command, dict): + print(f" Mapped Command: {mapped_command.get('command', 'unknown')}") + if mapped_command.get('description'): + print(f" Description: {mapped_command['description']}") + else: + print(f" Mapped Command: {mapped_command}") + else: + print(f" Mapped Command: UNKNOWN (not in mapping)") + + print() + + def load_ir_mapping(self) -> Dict: + """Load IR code mapping from file if available""" + mapping_files = [ + "/home/tulivision/rpi-tulivision/ir_mapping.json", + "/etc/video_player/ir_mapping.json", + "ir_mapping.json" + ] + + for mapping_file in mapping_files: + if os.path.exists(mapping_file): + try: + with open(mapping_file, 'r') as f: + return json.load(f) + except Exception as e: + self.logger.debug(f"Could not load mapping from {mapping_file}: {e}") + + return {} + + def poll_ir_signal(self): + """Poll for IR signal changes""" + while self.running: + try: + current_state = GPIO.input(self.gpio_pin) + current_time = time.time() + + # Detect state change + if current_state != self.last_state: + if self.pulse_start > 0: + # Calculate pulse/space duration + duration = (current_time - self.pulse_start) * 1000000 # Convert to microseconds + self.pulses.append(duration) + + self.pulse_start = current_time + self.last_state = current_state + + # Check for end of signal (no change for 100ms) + if self.pulse_start > 0 and (current_time - self.pulse_start) > 0.1: + if len(self.pulses) > 0: + self.process_pulses(self.pulses.copy()) + self.pulses = [] + self.pulse_start = 0 + + time.sleep(0.0001) # 0.1ms polling interval + + except Exception as e: + self.logger.error(f"Error in polling loop: {e}") + time.sleep(0.01) + + def process_pulses(self, pulses: List[float]): + """Process captured pulses""" + if len(pulses) < 2: + return + + # Try NEC protocol first + nec_command = self.decode_nec(pulses) + if nec_command: + self.handle_ir_command(nec_command) + return + + # Try RC5 protocol + rc5_command = self.decode_rc5(pulses) + if rc5_command: + self.handle_ir_command(rc5_command) + return + + # If no protocol matched, log for debugging + self.logger.debug(f"No protocol matched for {len(pulses)} pulses") + + def run(self): + """Main run loop""" + try: + # Display startup information + self.display_startup_info() + + # Setup GPIO + if not self.setup_gpio(): + print("Failed to setup GPIO. Exiting.") + return False + + # Start polling + self.running = True + self.poll_ir_signal() + + except KeyboardInterrupt: + print("\nShutting down IR listener...") + return True + except Exception as e: + self.logger.error(f"Error in main loop: {e}") + return False + finally: + self.cleanup() + + def cleanup(self): + """Cleanup resources""" + self.running = False + try: + GPIO.cleanup() + except: + pass + self.logger.info("IR Listener cleanup complete") + +def main(): + """Main function""" + import argparse + + parser = argparse.ArgumentParser(description="Simple IR Remote Listener (Polling Mode)") + parser.add_argument( + "--gpio-pin", + type=int, + default=18, + help="GPIO pin for IR receiver (default: 18)" + ) + parser.add_argument( + "--verbose", "-v", + action="store_true", + help="Enable verbose logging" + ) + + args = parser.parse_args() + + # Set logging level + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + # Create and run listener + listener = SimpleIRListenerPolling(args.gpio_pin) + success = listener.run() + + sys.exit(0 if success else 1) + +if __name__ == "__main__": + main() diff --git a/test_ir_listener.py b/test_ir_listener.py new file mode 100644 index 0000000..94d6460 --- /dev/null +++ b/test_ir_listener.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python3 +""" +Test script for IR listener +""" + +import time +import sys +import os + +# Add current directory to path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from simple_ir_listener_polling import SimpleIRListenerPolling + +def test_ir_listener(): + """Test the IR listener with simulated commands""" + print("Testing IR Listener...") + + # Create listener + listener = SimpleIRListenerPolling(gpio_pin=18) + + # Test the command handler directly + print("Testing command handler...") + listener.handle_ir_command("NEC_00FF_00FF") + listener.handle_ir_command("NEC_00FF_807F") + listener.handle_ir_command("RC5_00_0C_0") + + print("Test completed successfully!") + +if __name__ == "__main__": + test_ir_listener()