#!/usr/bin/env python3
"""
Raspberry Pi Video Player Auto-Start Script
Main video player with VLC integration, IR remote control, and TV channel system
"""

import os
import sys
import time
import json
import logging
import threading
import queue
import subprocess
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import vlc
import RPi.GPIO as GPIO
from dotenv import load_dotenv
import psutil

# Load environment variables
load_dotenv()


class VideoPlayer:
    """Main video player class with VLC integration and IR remote control.

    Video files found under ``video_folder`` are numbered as "channels".
    Codes pushed onto ``ir_queue`` by the GPIO callback are translated to
    commands through the ``ir_codes`` table of the configuration and executed
    by a background thread, while ``run()`` keeps the current channel looping.
    """

    # File (relative to the working directory) holding the channel mapping.
    CHANNEL_MAP_FILE = 'channels.json'

    # File suffixes (lower-case) treated as playable videos.
    VIDEO_EXTENSIONS = frozenset(
        {'.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.m4v'}
    )

    # Seconds to wait after asking VLC to play before trusting is_playing().
    PLAYBACK_SETTLE_SECONDS = 0.5

    def __init__(self, config_path: str = "config.json"):
        """Initialize the video player with configuration.

        Args:
            config_path: Path of the JSON configuration file. It is created
                with default values when it does not exist.
        """
        self.config_path = config_path
        self.config = self.load_config()
        self.setup_logging()

        # Initialize components
        self.vlc_instance = None
        self.vlc_player = None
        self.channels = {}
        self.current_channel = None
        self.ir_queue = queue.Queue()
        self.running = False

        # True while the main loop is allowed to restart the current channel.
        # Cleared on user pause/stop so those commands are not undone.
        self.auto_restart = False

        # IR Remote settings
        self.ir_pin = self.config.get('ir_pin', 18)
        self.ir_codes = self.config.get('ir_codes', {})

        # Channel settings
        self.video_folder = Path(
            self.config.get(
                'video_folder', os.getenv('VIDEO_FOLDER', '/home/pi/Videos')
            )
        )
        self.default_channel = self.config.get('default_channel', 1)
        self.channel_timeout = self.config.get('channel_timeout', 3.0)
        self.multi_digit_timeout = self.config.get('multi_digit_timeout', 1.0)

        # Multi-digit channel input. The buffer is written by the IR thread
        # and consumed by a timer thread, hence the lock.
        self.channel_input = ""
        self.last_digit_time = 0
        self._input_lock = threading.Lock()
        self._input_timer = None

        self.logger.info("Video Player initialized")

    def load_config(self) -> Dict:
        """Load configuration from the JSON file at ``config_path``.

        When the file does not exist, a default configuration file is written.
        Any read, write or parse failure is printed (logging is not set up
        yet at this point) and the built-in defaults are returned.

        Returns:
            The configuration dictionary.
        """
        try:
            if os.path.exists(self.config_path):
                with open(self.config_path, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                if not isinstance(config, dict):
                    raise ValueError("configuration must be a JSON object")
                return config
            self.create_default_config()
        except (OSError, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass.
            print(f"Error loading config: {e}")
        return self.get_default_config()

    def get_default_config(self) -> Dict:
        """Return a fresh copy of the default configuration."""
        return {
            "video_folder": os.getenv('VIDEO_FOLDER', '/home/pi/Videos'),
            "ir_pin": 18,
            "default_channel": 1,
            "channel_timeout": 3.0,
            "multi_digit_timeout": 1.0,
            "vlc_options": [
                "--fullscreen",
                "--no-video-title-show",
                "--no-audio-display",
            ],
            "ir_codes": {
                "0": "channel_0",
                "1": "channel_1",
                "2": "channel_2",
                "3": "channel_3",
                "4": "channel_4",
                "5": "channel_5",
                "6": "channel_6",
                "7": "channel_7",
                "8": "channel_8",
                "9": "channel_9",
                "power": "power_toggle",
                "play": "play_pause",
                "stop": "stop",
                "next": "next_channel",
                "prev": "prev_channel",
                "vol_up": "volume_up",
                "vol_down": "volume_down",
            },
        }

    def create_default_config(self):
        """Write the default configuration to ``config_path``.

        Raises:
            OSError: If the file cannot be written.
        """
        config = self.get_default_config()
        with open(self.config_path, 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2)
        print(f"Created default configuration file: {self.config_path}")

    def setup_logging(self):
        """Configure logging to stdout and, when possible, to the log file.

        Reads ``log_level`` and ``log_file`` from the configuration. An
        unknown level falls back to INFO and an unwritable log file is
        skipped, so a bad setting cannot prevent the player from starting.
        """
        log_level = self.config.get('log_level', 'INFO')
        log_file = self.config.get('log_file', '/var/log/video_player.log')

        level = getattr(logging, str(log_level).upper(), None)
        level_is_valid = isinstance(level, int)
        if not level_is_valid:
            level = logging.INFO

        handlers = [logging.StreamHandler(sys.stdout)]
        file_error = None
        try:
            handlers.insert(0, logging.FileHandler(log_file))
        except OSError as e:
            file_error = e

        logging.basicConfig(
            level=level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=handlers,
        )
        self.logger = logging.getLogger(__name__)

        # Report the fallbacks only now that a logger exists.
        if not level_is_valid:
            self.logger.warning(
                f"Unknown log level {log_level!r}, using INFO"
            )
        if file_error is not None:
            self.logger.warning(
                f"Cannot write log file {log_file}: {file_error}"
            )

    def initialize_vlc(self):
        """Create the VLC instance and media player.

        Returns:
            True on success, False if VLC raised during initialization.
        """
        try:
            vlc_options = self.config.get('vlc_options', [])
            self.vlc_instance = vlc.Instance(vlc_options)
            self.vlc_player = self.vlc_instance.media_player_new()

            # Set fullscreen if specified
            if '--fullscreen' in vlc_options:
                self.vlc_player.set_fullscreen(True)

            self.logger.info("VLC player initialized successfully")
            return True
        except Exception as e:
            self.logger.error(f"Failed to initialize VLC: {e}")
            return False

    def scan_video_folder(self) -> List[Path]:
        """Recursively collect supported video files from ``video_folder``.

        Returns:
            The matching paths sorted alphabetically. An empty list when the
            folder does not exist; whatever was collected so far (unsorted)
            when scanning fails part-way.
        """
        video_files = []

        if not self.video_folder.exists():
            self.logger.error(
                f"Video folder does not exist: {self.video_folder}"
            )
            return video_files

        try:
            for file_path in self.video_folder.rglob('*'):
                if (file_path.is_file()
                        and file_path.suffix.lower() in self.VIDEO_EXTENSIONS):
                    video_files.append(file_path)

            video_files.sort()  # Sort alphabetically
            self.logger.info(f"Found {len(video_files)} video files")
        except Exception as e:
            self.logger.error(f"Error scanning video folder: {e}")
        return video_files

    def create_channels(self):
        """Number the scanned video files from 1 and persist the mapping."""
        self.channels = {
            number: {
                'number': number,
                'name': video_file.stem,
                'path': str(video_file),
                'file': video_file,
            }
            for number, video_file in enumerate(self.scan_video_folder(), 1)
        }

        self.logger.info(f"Created {len(self.channels)} channels")

        # Save channel mapping to file
        self.save_channel_mapping()

    def save_channel_mapping(self):
        """Save the name and path of every channel to the mapping file.

        Failures are logged and otherwise ignored.
        """
        try:
            channel_data = {
                str(number): {'name': info['name'], 'path': info['path']}
                for number, info in self.channels.items()
            }
            with open(self.CHANNEL_MAP_FILE, 'w', encoding='utf-8') as f:
                json.dump(channel_data, f, indent=2)

            self.logger.info("Channel mapping saved to channels.json")
        except Exception as e:
            self.logger.error(f"Error saving channel mapping: {e}")

    def load_channel_mapping(self):
        """Load the channel mapping from the mapping file.

        Entries whose video file no longer exists are dropped.

        Returns:
            True only if at least one usable channel was loaded, so that the
            caller rescans the video folder when the mapping is missing,
            unreadable or entirely stale.
        """
        try:
            if os.path.exists(self.CHANNEL_MAP_FILE):
                with open(self.CHANNEL_MAP_FILE, 'r', encoding='utf-8') as f:
                    channel_data = json.load(f)

                self.channels = {}
                for channel_num, channel_info in channel_data.items():
                    video_path = Path(channel_info['path'])
                    if video_path.exists():
                        number = int(channel_num)
                        self.channels[number] = {
                            'number': number,
                            'name': channel_info['name'],
                            'path': channel_info['path'],
                            'file': video_path,
                        }

                self.logger.info(
                    f"Loaded {len(self.channels)} channels from mapping file"
                )
                return bool(self.channels)
        except Exception as e:
            self.logger.error(f"Error loading channel mapping: {e}")

        return False

    def play_channel(self, channel_number: int) -> bool:
        """Play the video assigned to ``channel_number``.

        Args:
            channel_number: Key into ``self.channels``.

        Returns:
            True if playback was requested, False when the channel is unknown
            or VLC raised.
        """
        if channel_number not in self.channels:
            self.logger.warning(f"Channel {channel_number} not found")
            return False

        # Keep the main loop from restarting anything while media is swapped;
        # is_playing() is False until VLC has actually started.
        previous_auto_restart = self.auto_restart
        self.auto_restart = False
        try:
            channel = self.channels[channel_number]
            media = self.vlc_instance.media_new(channel['path'])
            self.vlc_player.set_media(media)
            self.vlc_player.play()

            self.current_channel = channel_number
            self.logger.info(
                f"Playing channel {channel_number}: {channel['name']}"
            )

            # Wait for media to start playing
            time.sleep(self.PLAYBACK_SETTLE_SECONDS)
            self.auto_restart = True
            return True
        except Exception as e:
            self.auto_restart = previous_auto_restart
            self.logger.error(f"Error playing channel {channel_number}: {e}")
            return False

    def setup_gpio(self):
        """Configure the IR receiver pin and register the edge callback.

        Returns:
            True on success, False if the GPIO library raised.
        """
        try:
            GPIO.setmode(GPIO.BCM)
            GPIO.setup(self.ir_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
            GPIO.add_event_detect(
                self.ir_pin,
                GPIO.FALLING,
                callback=self.ir_callback,
                bouncetime=50,
            )
            self.logger.info(f"GPIO setup complete on pin {self.ir_pin}")
            return True
        except Exception as e:
            self.logger.error(f"GPIO setup failed: {e}")
            return False

    def ir_callback(self, channel):
        """GPIO callback for IR signal detection.

        Measures how long the pin stays low (capped at roughly 100 ms) and
        queues the code that ``detect_ir_code`` derives from that duration.

        Args:
            channel: Pin number passed by the GPIO library (unused).
        """
        try:
            # Simple IR signal detection (this is a basic implementation)
            # In a real implementation, you would decode the actual IR protocol
            start_time = time.time()

            # Wait for signal to stabilize
            while GPIO.input(self.ir_pin) == GPIO.LOW:
                if time.time() - start_time > 0.1:  # Timeout after 100ms
                    break
                time.sleep(0.001)

            signal_duration = time.time() - start_time

            # Basic IR code detection (this is simplified)
            # Real implementation would decode specific IR protocols
            ir_code = self.detect_ir_code(signal_duration)
            if ir_code:
                self.ir_queue.put(ir_code)
        except Exception as e:
            self.logger.error(f"IR callback error: {e}")

    def detect_ir_code(self, duration: float) -> Optional[str]:
        """Map a low-pulse duration to an IR code (simplified implementation).

        Real IR decoding would analyze pulse patterns; this placeholder only
        buckets the duration into 10 ms bands.

        Args:
            duration: Pulse length in seconds.

        Returns:
            "0".."4" for durations below 10/20/30/40/50 ms respectively,
            None for anything longer.
        """
        thresholds = (
            (0.01, "0"),  # Very short pulse
            (0.02, "1"),  # Short pulse
            (0.03, "2"),  # Medium pulse
            (0.04, "3"),  # Long pulse
            (0.05, "4"),  # Very long pulse
        )
        for upper_bound, code in thresholds:
            if duration < upper_bound:
                return code
        return None

    def process_ir_commands(self):
        """Consume IR codes from the queue until ``running`` is cleared.

        Blocks on the queue with a short timeout so that a shutdown request
        is noticed promptly without busy-polling.
        """
        while self.running:
            try:
                ir_code = self.ir_queue.get(timeout=0.1)
            except queue.Empty:
                continue

            try:
                self.handle_ir_command(ir_code)
            except Exception as e:
                self.logger.error(f"Error processing IR command: {e}")

    def handle_ir_command(self, ir_code: str):
        """Translate an IR code via ``ir_codes`` and run the command.

        Args:
            ir_code: Key into the ``ir_codes`` configuration table.
        """
        command = self.ir_codes.get(ir_code)
        if not command:
            self.logger.debug(f"Unknown IR code: {ir_code}")
            return

        self.logger.info(f"IR Command: {command}")

        if command.startswith('channel_'):
            # Handle channel selection
            channel_num = int(command.split('_')[1])
            self.handle_channel_input(channel_num)
            return

        actions = {
            'play_pause': self.toggle_play_pause,
            'stop': self.stop_playback,
            'next_channel': self.next_channel,
            'prev_channel': self.prev_channel,
            'volume_up': self.volume_up,
            'volume_down': self.volume_down,
            'power_toggle': self.power_toggle,
        }
        action = actions.get(command)
        if action is not None:
            action()

    def handle_channel_input(self, digit: int):
        """Append a digit to the pending channel number.

        The number is committed ``multi_digit_timeout`` seconds after the
        LAST digit: every new digit cancels the previously scheduled commit
        and schedules a new one.

        Args:
            digit: The number received from the remote.
        """
        current_time = time.time()

        with self._input_lock:
            if self._input_timer is not None:
                self._input_timer.cancel()

            # Reset input if too much time has passed
            if current_time - self.last_digit_time > self.multi_digit_timeout:
                self.channel_input = ""

            self.channel_input += str(digit)
            self.last_digit_time = current_time

            # Wait for more digits or timeout
            timer = threading.Timer(
                self.multi_digit_timeout, self.process_channel_input
            )
            timer.daemon = True  # never keep the process alive on shutdown
            self._input_timer = timer
            timer.start()

    def process_channel_input(self):
        """Commit the pending channel number and clear the input buffer."""
        with self._input_lock:
            pending = self.channel_input
            self.channel_input = ""

        if not pending:
            return

        # Playback is started outside the lock because play_channel sleeps.
        try:
            channel_number = int(pending)
        except ValueError:
            self.logger.warning(f"Invalid channel input: {pending}")
            return

        if channel_number in self.channels:
            self.play_channel(channel_number)
        else:
            self.logger.warning(f"Invalid channel number: {channel_number}")

    def toggle_play_pause(self):
        """Pause when playing, otherwise resume.

        Pausing disables the automatic restart in ``run()``; resuming
        re-enables it once VLC has had time to start.
        """
        if not self.vlc_player:
            return

        if self.vlc_player.is_playing():
            self.auto_restart = False
            self.vlc_player.pause()
        else:
            self.vlc_player.play()
            time.sleep(self.PLAYBACK_SETTLE_SECONDS)
            self.auto_restart = True

    def stop_playback(self):
        """Stop playback and keep it stopped until the user plays again."""
        if self.vlc_player:
            self.auto_restart = False
            self.vlc_player.stop()

    def _adjacent_channel(self, step: int) -> Optional[int]:
        """Return the nearest existing channel above/below the current one.

        Args:
            step: Positive to search upwards, otherwise downwards.

        Returns:
            The channel number, or None when there is no current channel or
            no channel in that direction.
        """
        if self.current_channel is None:
            return None

        if step > 0:
            higher = [n for n in self.channels if n > self.current_channel]
            return min(higher) if higher else None

        lower = [n for n in self.channels if n < self.current_channel]
        return max(lower) if lower else None

    def next_channel(self):
        """Go to the next existing channel (no wrap-around)."""
        target = self._adjacent_channel(1)
        if target is not None:
            self.play_channel(target)

    def prev_channel(self):
        """Go to the previous existing channel (no wrap-around)."""
        target = self._adjacent_channel(-1)
        if target is not None:
            self.play_channel(target)

    def volume_up(self):
        """Increase volume by 10, capped at 100."""
        if self.vlc_player:
            current_volume = self.vlc_player.audio_get_volume()
            self.vlc_player.audio_set_volume(min(100, current_volume + 10))

    def volume_down(self):
        """Decrease volume by 10, floored at 0."""
        if self.vlc_player:
            current_volume = self.vlc_player.audio_get_volume()
            self.vlc_player.audio_set_volume(max(0, current_volume - 10))

    def power_toggle(self):
        """Toggle power (exit application)."""
        self.logger.info("Power toggle - shutting down")
        self.running = False

    def start(self):
        """Initialize VLC, GPIO and channels, then start playback.

        Returns:
            True when the player is up and a channel was started, False when
            VLC, GPIO or channel discovery failed.
        """
        self.logger.info("Starting video player...")
        self.running = True

        # Initialize VLC
        if not self.initialize_vlc():
            self.logger.error("Failed to initialize VLC")
            return False

        # Setup GPIO
        if not self.setup_gpio():
            self.logger.error("Failed to setup GPIO")
            return False

        # Load or create channels
        if not self.load_channel_mapping():
            self.create_channels()

        if not self.channels:
            self.logger.error("No video files found")
            return False

        # Start IR processing thread
        ir_thread = threading.Thread(
            target=self.process_ir_commands, daemon=True
        )
        ir_thread.start()

        # Play default channel
        if self.default_channel in self.channels:
            self.play_channel(self.default_channel)
        else:
            # Play first available channel
            first_channel = min(self.channels.keys())
            self.play_channel(first_channel)

        self.logger.info("Video player started successfully")
        return True

    def run(self):
        """Main run loop.

        Starts the player and then, once per second, restarts the current
        channel if it stopped by itself. Playback that the user paused or
        stopped is left alone. ``cleanup()`` always runs, including when
        start-up fails part-way.
        """
        try:
            if not self.start():
                return

            while self.running:
                if (self.auto_restart
                        and self.current_channel is not None
                        and self.vlc_player
                        and not self.vlc_player.is_playing()):
                    self.logger.info(
                        "Video stopped, restarting current channel"
                    )
                    self.play_channel(self.current_channel)

                time.sleep(1)

        except KeyboardInterrupt:
            self.logger.info("Received keyboard interrupt")
        except Exception as e:
            self.logger.error(f"Unexpected error in main loop: {e}")
        finally:
            self.cleanup()

    def cleanup(self):
        """Stop playback and release the GPIO pins."""
        self.logger.info("Cleaning up...")
        self.running = False
        self.auto_restart = False

        if self._input_timer is not None:
            self._input_timer.cancel()

        try:
            if self.vlc_player:
                self.vlc_player.stop()
        except Exception as e:
            self.logger.error(f"Error stopping VLC: {e}")
        finally:
            # Must run even if VLC misbehaves, or the pin stays claimed.
            GPIO.cleanup()

        self.logger.info("Cleanup complete")


def _own_process_tree_pids() -> set:
    """Return the PIDs of this process and of all its ancestors."""
    pids = set()
    try:
        proc = psutil.Process()
        while proc is not None and proc.pid not in pids:
            pids.add(proc.pid)
            proc = proc.parent()
    except psutil.Error:
        pass
    pids.add(os.getpid())
    return pids


def main():
    """Main entry point.

    Exits with status 1 when not running as root or when another process
    (outside this process's own ancestry) has ``video_player.py`` on its
    command line.
    """
    # Check if running as root (needed for GPIO access)
    if os.geteuid() != 0:
        print("This script must be run as root for GPIO access")
        print("Use: sudo python3 video_player.py")
        sys.exit(1)

    # Check if another instance is running. Ancestors are skipped because the
    # launching "sudo python3 video_player.py" process carries the script
    # name on its command line too.
    own_pids = _own_process_tree_pids()
    for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
        try:
            if proc.info['pid'] in own_pids:
                continue
            if 'video_player.py' in ' '.join(proc.info['cmdline'] or []):
                print("Another instance of video_player.py is already running")
                sys.exit(1)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue

    # Create and run video player
    player = VideoPlayer()
    player.run()


if __name__ == "__main__":
    main()