#!/usr/bin/env python3
"""
Raspberry Pi Video Player Auto-Start Script
Main video player with VLC integration, IR remote control, and TV channel system
"""

import os
import sys
import time
import json
import logging
import threading
import queue
import subprocess
import argparse
import random
import socket
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import vlc
from dotenv import load_dotenv
import psutil

try:
    import RPi.GPIO as GPIO
except (ImportError, RuntimeError):
    # RPi.GPIO is missing (ImportError) or refuses to load on non-Pi hardware
    # (RuntimeError). The player can still run with --no-ir, so do not make
    # this fatal at import time; setup_gpio() reports the problem if IR is on.
    GPIO = None

# Load environment variables
load_dotenv()


class VideoPlayer:
    """Main video player class with VLC integration and IR remote control.

    Video files found under the configured folder are mapped to channel
    numbers (1, 2, 3, ...) in alphabetical order. Playback is driven by VLC;
    commands arrive either from the IR receiver (GPIO) or from JSON messages
    sent to a Unix control socket.
    """

    # File suffixes (lower-case) that are treated as playable videos.
    VIDEO_EXTENSIONS = frozenset(
        {'.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.m4v'}
    )

    # (exclusive upper bound in seconds, IR code) pairs used by the simplified
    # pulse-length decoder in detect_ir_code(); checked in order.
    IR_PULSE_CODES = (
        (0.01, "0"),  # Very short pulse
        (0.02, "1"),  # Short pulse
        (0.03, "2"),  # Medium pulse
        (0.04, "3"),  # Long pulse
        (0.05, "4"),  # Very long pulse
    )

    # How long accept() blocks before re-checking self.running (seconds).
    CONTROL_ACCEPT_TIMEOUT = 1.0
    # How long a connected control client may stay silent (seconds).
    CONTROL_CLIENT_TIMEOUT = 5.0

    def __init__(self, config_path: str = "config.json", enable_ir: bool = True):
        """Initialize the video player with configuration.

        Args:
            config_path: Path of the JSON configuration file. It is created
                with default values when it does not exist.
            enable_ir: When False, GPIO setup and the IR thread are skipped.
        """
        self.config_path = config_path
        self.config = self.load_config()
        self.setup_logging()

        # Initialize components
        self.vlc_instance = None
        self.vlc_player = None
        self.channels = {}
        self.current_channel = None
        self.ir_queue = queue.Queue()
        self.running = False
        self.enable_ir = enable_ir

        # True after the user paused or stopped playback, so the watchdog in
        # run() does not immediately start another video.
        self.playback_suspended = False

        # IR Remote settings
        self.ir_pin = self.config.get('ir_pin', 18)
        self.ir_codes = self.config.get('ir_codes', {})

        # Channel settings
        self.video_folder = Path(
            self.config.get('video_folder', os.getenv('VIDEO_FOLDER', '/home/pi/Videos'))
        )
        self.default_channel = self.config.get('default_channel', 1)
        self.channel_timeout = self.config.get('channel_timeout', 3.0)
        self.multi_digit_timeout = self.config.get('multi_digit_timeout', 1.0)
        self.channel_refresh_interval = self.config.get('channel_refresh_interval', 30.0)  # seconds

        # Multi-digit channel input
        self.channel_input = ""
        self.last_digit_time = 0.0
        self._channel_timer = None  # pending threading.Timer for digit entry

        # Channel refresh tracking (monotonic clock: unaffected by NTP jumps)
        self.last_channel_refresh = time.monotonic()

        # Control socket for external commands
        self.control_socket_path = "/tmp/video_player_control.sock"
        self.control_socket = None
        self.control_thread = None

        self.logger.info("Video Player initialized")

    def load_config(self) -> Dict:
        """Load configuration from the JSON file.

        A default configuration file is written first when none exists.

        Returns:
            The parsed configuration, or the built-in defaults when the file
            cannot be read, is not valid JSON, or is not a JSON object.
        """
        try:
            if not os.path.exists(self.config_path):
                self.create_default_config()
            with open(self.config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
        except (OSError, ValueError) as e:
            # Logging is not configured yet (it depends on the config).
            print(f"Error loading config: {e}")
            return self.get_default_config()

        if not isinstance(config, dict):
            print("Error loading config: top-level JSON value must be an object")
            return self.get_default_config()
        return config

    def get_default_config(self) -> Dict:
        """Get default configuration."""
        ir_codes = {str(digit): f"channel_{digit}" for digit in range(10)}
        ir_codes.update({
            "power": "power_toggle",
            "play": "play_pause",
            "stop": "stop",
            "next": "next_channel",
            "prev": "prev_channel",
            "vol_up": "volume_up",
            "vol_down": "volume_down",
        })
        return {
            "video_folder": os.getenv('VIDEO_FOLDER', '/home/pi/Videos'),
            "ir_pin": 18,
            "default_channel": 1,
            "channel_timeout": 3.0,
            "multi_digit_timeout": 1.0,
            "channel_refresh_interval": 30.0,
            "vlc_options": [
                "--fullscreen",
                "--no-video-title-show",
                "--quiet",
            ],
            "ir_codes": ir_codes,
        }

    def create_default_config(self):
        """Create default configuration file at self.config_path."""
        config = self.get_default_config()
        with open(self.config_path, 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2)
        print(f"Created default configuration file: {self.config_path}")

    def setup_logging(self):
        """Setup logging to stdout and, when writable, to the log file.

        An unknown ``log_level`` falls back to INFO and an unwritable
        ``log_file`` falls back to stdout-only logging, so a configuration or
        permission problem does not prevent the player from starting.
        """
        level_name = str(self.config.get('log_level', 'INFO')).upper()
        log_file = self.config.get('log_file', '/var/log/video_player.log')

        level = getattr(logging, level_name, None)
        level_is_valid = isinstance(level, int)
        if not level_is_valid:
            level = logging.INFO

        handlers = []
        file_error = None
        try:
            handlers.append(logging.FileHandler(log_file))
        except OSError as e:
            file_error = e
        handlers.append(logging.StreamHandler(sys.stdout))

        logging.basicConfig(
            level=level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=handlers,
        )
        self.logger = logging.getLogger(__name__)

        if not level_is_valid:
            self.logger.warning(f"Unknown log level {level_name!r}, using INFO")
        if file_error is not None:
            self.logger.warning(
                f"Cannot write log file {log_file}: {file_error}; logging to stdout only"
            )

    def initialize_vlc(self):
        """Initialize VLC media player.

        Returns:
            True when the VLC instance and player were created, else False.
        """
        try:
            vlc_options = self.config.get('vlc_options', [])

            self.vlc_instance = vlc.Instance(vlc_options)
            self.vlc_player = self.vlc_instance.media_player_new()

            # Set up event manager for video end detection
            self.vlc_event_manager = self.vlc_player.event_manager()
            self.vlc_event_manager.event_attach(
                vlc.EventType.MediaPlayerEndReached, self.on_video_end
            )

            # Set fullscreen and always on top
            if '--fullscreen' in vlc_options:
                self.vlc_player.set_fullscreen(True)

            # Force video to stay on top
            self.vlc_player.set_xwindow(0)  # Use root window

            # Best effort: wmctrl might not be available
            self.keep_vlc_on_top(timeout=5)

            self.logger.info("VLC player initialized successfully")
            return True
        except Exception as e:
            self.logger.error(f"Failed to initialize VLC: {e}")
            return False

    def scan_video_folder(self) -> List[Path]:
        """Scan video folder (recursively) for supported video files.

        Returns:
            Alphabetically sorted paths. Empty when the folder is missing;
            possibly partial and unsorted when scanning fails part-way.
        """
        video_files = []

        if not self.video_folder.exists():
            self.logger.error(f"Video folder does not exist: {self.video_folder}")
            return video_files

        try:
            for file_path in self.video_folder.rglob('*'):
                if file_path.is_file() and file_path.suffix.lower() in self.VIDEO_EXTENSIONS:
                    video_files.append(file_path)

            video_files.sort()  # Sort alphabetically
            self.logger.info(f"Found {len(video_files)} video files")
        except OSError as e:
            self.logger.error(f"Error scanning video folder: {e}")
        return video_files

    def create_channels(self):
        """Create channel mapping from video files automatically.

        The new mapping is built completely and then assigned in one step, so
        other threads never observe a half-filled channel table.
        """
        self.channels = {
            number: {
                'number': number,
                'name': video_file.stem,
                'path': str(video_file),
                'file': video_file,
            }
            for number, video_file in enumerate(self.scan_video_folder(), 1)
        }
        self.logger.info(f"Created {len(self.channels)} channels automatically from directory")

    def load_channels_from_directory(self):
        """Load channels automatically from video directory.

        Returns:
            True when at least one channel was found, else False.
        """
        try:
            self.create_channels()
        except Exception as e:
            self.logger.error(f"Error loading channels from directory: {e}")
            return False

        if not self.channels:
            self.logger.warning("No video files found in directory")
            return False

        self.logger.info(f"Loaded {len(self.channels)} channels from directory")
        return True

    def refresh_channels(self):
        """Refresh channel list from directory (useful when videos are added/removed)."""
        old_channel_count = len(self.channels)
        self.load_channels_from_directory()
        new_channel_count = len(self.channels)

        if new_channel_count == old_channel_count:
            return

        self.logger.info(f"Channel count changed: {old_channel_count} -> {new_channel_count}")

        # If current channel no longer exists, switch to first available
        if self.current_channel and self.current_channel not in self.channels:
            if self.channels:
                first_channel = min(self.channels)
                self.logger.info(
                    f"Current channel {self.current_channel} no longer exists, "
                    f"switching to channel {first_channel}"
                )
                self.play_channel(first_channel)
            else:
                self.logger.warning("No channels available after refresh")
                self.current_channel = None

    def _start_media(self, path: str) -> None:
        """Load *path* into the VLC player and start playing it."""
        media = self.vlc_instance.media_new(path)
        self.vlc_player.set_media(media)
        self.vlc_player.play()
        # Any explicit playback request ends a user pause/stop.
        self.playback_suspended = False

    def play_channel(self, channel_number: int) -> bool:
        """Play video for specified channel number.

        Returns:
            True when playback was started, False when the channel is unknown
            or VLC raised an error.
        """
        channel = self.channels.get(channel_number)
        if channel is None:
            self.logger.warning(f"Channel {channel_number} not found")
            return False

        try:
            self._start_media(channel['path'])
            self.current_channel = channel_number
            self.logger.info(f"Playing channel {channel_number}: {channel['name']}")

            # Wait for media to start playing
            time.sleep(0.5)
            return True
        except Exception as e:
            self.logger.error(f"Error playing channel {channel_number}: {e}")
            return False

    def play_random_video(self) -> bool:
        """Play a random video from the available channels."""
        if not self.channels:
            self.logger.error("No channels available for random playback")
            return False

        try:
            random_channel = random.choice(list(self.channels))
            self.logger.info(f"Selected random channel: {random_channel}")
            return self.play_channel(random_channel)
        except Exception as e:
            self.logger.error(f"Error playing random video: {e}")
            return False

    def play_video_by_index(self, index: int) -> bool:
        """Play video by folder index (0-based, alphabetical order)."""
        video_files = self.scan_video_folder()

        if not video_files:
            self.logger.error("No video files found")
            return False

        if index < 0 or index >= len(video_files):
            self.logger.warning(f"Index {index} out of range (0-{len(video_files)-1})")
            return False

        try:
            video_file = video_files[index]
            self._start_media(str(video_file))
            self.current_channel = None  # Not a channel-based playback
            self.logger.info(f"Playing video by index {index}: {video_file.name}")

            # Wait for media to start playing
            time.sleep(0.5)
            return True
        except Exception as e:
            self.logger.error(f"Error playing video by index {index}: {e}")
            return False

    def setup_gpio(self):
        """Setup GPIO for IR receiver.

        Returns:
            True when IR is disabled or GPIO was configured, else False.
        """
        if not self.enable_ir:
            self.logger.info("IR remote control disabled, skipping GPIO setup")
            return True

        if GPIO is None:
            self.logger.error("GPIO setup failed: RPi.GPIO is not available")
            return False

        try:
            GPIO.setmode(GPIO.BCM)
            GPIO.setup(self.ir_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
            GPIO.add_event_detect(
                self.ir_pin, GPIO.FALLING, callback=self.ir_callback, bouncetime=50
            )
            self.logger.info(f"GPIO setup complete on pin {self.ir_pin}")
            return True
        except Exception as e:
            self.logger.error(f"GPIO setup failed: {e}")
            return False

    def ir_callback(self, channel):
        """GPIO callback for IR signal detection.

        Measures how long the pin stays LOW (capped at 100 ms) and queues the
        code that detect_ir_code() derives from that duration.
        """
        try:
            # Simple IR signal detection (this is a basic implementation)
            # In a real implementation, you would decode the actual IR protocol
            start_time = time.monotonic()

            # Wait for signal to stabilize
            while GPIO.input(self.ir_pin) == GPIO.LOW:
                if time.monotonic() - start_time > 0.1:  # Timeout after 100ms
                    break
                time.sleep(0.001)

            signal_duration = time.monotonic() - start_time

            # Basic IR code detection (this is simplified)
            # Real implementation would decode specific IR protocols
            ir_code = self.detect_ir_code(signal_duration)
            if ir_code:
                self.ir_queue.put(ir_code)
        except Exception as e:
            self.logger.error(f"IR callback error: {e}")

    def detect_ir_code(self, duration: float) -> Optional[str]:
        """Detect IR code from signal duration (simplified implementation).

        Real IR decoding would analyze pulse patterns; here each 10 ms band
        below 50 ms maps to one code.

        Returns:
            "0".."4" for durations below 0.05 s, otherwise None.
        """
        for upper_bound, code in self.IR_PULSE_CODES:
            if duration < upper_bound:
                return code
        return None

    def process_ir_commands(self):
        """Process IR commands from queue until self.running becomes False."""
        while self.running:
            try:
                # Blocking get with a timeout: no busy polling, and the loop
                # still notices shutdown within 0.1 s.
                ir_code = self.ir_queue.get(timeout=0.1)
            except queue.Empty:
                continue

            try:
                self.handle_ir_command(ir_code)
            except Exception as e:
                self.logger.error(f"Error processing IR command: {e}")

    def handle_ir_command(self, ir_code: str):
        """Handle IR remote commands.

        Looks up *ir_code* in the configured ``ir_codes`` mapping and runs the
        matching action; unknown codes and commands are ignored.
        """
        command = self.ir_codes.get(ir_code)
        if not command:
            self.logger.debug(f"Unknown IR code: {ir_code}")
            return

        self.logger.info(f"IR Command: {command}")

        if command.startswith('channel_'):
            # Handle channel selection
            try:
                channel_num = int(command.split('_')[1])
            except ValueError:
                self.logger.warning(f"Invalid channel command: {command}")
                return
            self.handle_channel_input(channel_num)
            return

        actions = {
            'play_pause': self.toggle_play_pause,
            'stop': self.stop_playback,
            'next_channel': self.next_channel,
            'prev_channel': self.prev_channel,
            'volume_up': self.volume_up,
            'volume_down': self.volume_down,
            'power_toggle': self.power_toggle,
        }
        action = actions.get(command)
        if action is None:
            self.logger.debug(f"No handler for IR command: {command}")
            return
        action()

    def handle_channel_input(self, digit: int):
        """Handle multi-digit channel input.

        Each digit restarts a single timer; the accumulated number is applied
        only once no further digit arrived for ``multi_digit_timeout`` seconds.
        """
        current_time = time.monotonic()

        # Reset input if too much time has passed
        if current_time - self.last_digit_time > self.multi_digit_timeout:
            self.channel_input = ""

        self.channel_input += str(digit)
        self.last_digit_time = current_time

        # Restart the wait for more digits. Without cancelling, the timer of
        # an earlier digit would fire mid-entry and commit a partial number.
        if self._channel_timer is not None:
            self._channel_timer.cancel()
        timer = threading.Timer(self.multi_digit_timeout, self.process_channel_input)
        timer.daemon = True  # never keep the process alive for a key press
        self._channel_timer = timer
        timer.start()

    def process_channel_input(self):
        """Process complete channel input and clear the input buffer."""
        # Take and clear the buffer first so digits typed while the channel
        # is starting begin a fresh number.
        pending_input, self.channel_input = self.channel_input, ""
        if not pending_input:
            return

        try:
            channel_number = int(pending_input)
        except ValueError:
            self.logger.warning(f"Invalid channel input: {pending_input}")
            return

        if channel_number in self.channels:
            self.play_channel(channel_number)
        else:
            self.logger.warning(f"Invalid channel number: {channel_number}")

    def toggle_play_pause(self):
        """Toggle play/pause."""
        if not self.vlc_player:
            return
        if self.vlc_player.is_playing():
            self.vlc_player.pause()
            # Tell the watchdog in run() that this silence is intentional.
            self.playback_suspended = True
        else:
            self.vlc_player.play()
            self.playback_suspended = False

    def stop_playback(self):
        """Stop playback (stays stopped until playback is requested again)."""
        if self.vlc_player:
            self.vlc_player.stop()
            self.playback_suspended = True

    def next_channel(self):
        """Go to next channel (no wrap-around at the last channel)."""
        if (self.current_channel and self.channels
                and self.current_channel < max(self.channels)):
            self.play_channel(self.current_channel + 1)

    def prev_channel(self):
        """Go to previous channel (no wrap-around at the first channel)."""
        if (self.current_channel and self.channels
                and self.current_channel > min(self.channels)):
            self.play_channel(self.current_channel - 1)

    def volume_up(self):
        """Increase volume by 10, capped at 100."""
        if self.vlc_player:
            current_volume = self.vlc_player.audio_get_volume()
            self.vlc_player.audio_set_volume(min(100, current_volume + 10))

    def volume_down(self):
        """Decrease volume by 10, floored at 0."""
        if self.vlc_player:
            current_volume = self.vlc_player.audio_get_volume()
            self.vlc_player.audio_set_volume(max(0, current_volume - 10))

    def power_toggle(self):
        """Toggle power (exit application)."""
        self.logger.info("Power toggle - shutting down")
        self.running = False

    def setup_control_socket(self):
        """Setup Unix socket for external control commands.

        Returns:
            True when the socket is listening and the handler thread runs.
        """
        try:
            # Remove existing socket file if it exists
            if os.path.exists(self.control_socket_path):
                os.unlink(self.control_socket_path)

            # Create Unix socket
            self.control_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.control_socket.bind(self.control_socket_path)
            self.control_socket.listen(1)
            # Bounded accept() so the handler thread can notice shutdown.
            self.control_socket.settimeout(self.CONTROL_ACCEPT_TIMEOUT)

            # Start control thread
            self.control_thread = threading.Thread(
                target=self.handle_control_commands, daemon=True
            )
            self.control_thread.start()

            self.logger.info(f"Control socket setup at {self.control_socket_path}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to setup control socket: {e}")
            return False

    def _serve_control_client(self, client_socket):
        """Read one JSON command from *client_socket*, reply, and close it."""
        with client_socket:
            # A client that connects but never sends must not block the loop.
            client_socket.settimeout(self.CONTROL_CLIENT_TIMEOUT)
            try:
                data = client_socket.recv(1024).decode('utf-8')
                response = self.process_control_command(json.loads(data))
            except ValueError as e:
                # Covers invalid UTF-8 and invalid JSON.
                response = {"success": False, "error": f"Invalid command: {e}"}
            client_socket.sendall(json.dumps(response).encode('utf-8'))

    def handle_control_commands(self):
        """Handle control commands from external clients.

        Serves one client at a time until self.running becomes False.
        """
        while self.running:
            try:
                client_socket, _ = self.control_socket.accept()
            except socket.timeout:
                continue  # periodic wake-up to re-check self.running
            except OSError as e:
                if self.running:  # Only log if we're still supposed to be running
                    self.logger.error(f"Error handling control command: {e}")
                time.sleep(0.1)
                continue

            try:
                self._serve_control_client(client_socket)
            except Exception as e:
                if self.running:
                    self.logger.error(f"Error handling control command: {e}")
                time.sleep(0.1)

    def process_control_command(self, command):
        """Process a control command and return a JSON-serializable response.

        Args:
            command: Decoded JSON object with an ``"action"`` key, one of
                ``change_channel`` (with ``"channel"``), ``random_channel``,
                ``list_channels``, ``get_current_channel`` or ``stop``.

        Returns:
            A dict with ``"success"`` and either result fields or ``"error"``.
        """
        if not isinstance(command, dict):
            return {"success": False, "error": "Command must be a JSON object"}

        action = command.get("action")

        try:
            if action == "change_channel":
                channel = command.get("channel")
                if not (channel and channel in self.channels):
                    return {"success": False, "error": f"Invalid channel: {channel}"}
                if not self.play_channel(channel):
                    return {"success": False, "error": "Failed to play channel"}
                return {
                    "success": True,
                    "channel": channel,
                    "channel_name": self.channels[channel]["name"],
                }

            if action == "random_channel":
                if not self.play_random_video():
                    return {"success": False, "error": "Failed to play random channel"}
                current = self.current_channel
                return {
                    "success": True,
                    "channel": current,
                    "channel_name": self.channels[current]["name"] if current else "Unknown",
                }

            if action == "list_channels":
                # The internal table holds Path objects ('file'), which json
                # cannot encode; expose only the plain fields.
                return {
                    "success": True,
                    "channels": {
                        number: {
                            "number": info["number"],
                            "name": info["name"],
                            "path": info["path"],
                        }
                        for number, info in self.channels.items()
                    },
                }

            if action == "get_current_channel":
                current = self.current_channel
                if current and current in self.channels:
                    return {
                        "success": True,
                        "channel": current,
                        "channel_name": self.channels[current]["name"],
                    }
                return {"success": True, "channel": None, "channel_name": None}

            if action == "stop":
                self.logger.info("Stop command received via control socket")
                self.running = False
                return {"success": True}

            return {"success": False, "error": f"Unknown action: {action}"}
        except Exception as e:
            self.logger.error(f"Error processing control command: {e}")
            return {"success": False, "error": str(e)}

    def start(self, startup_mode: str = "default",
              channel_number: Optional[int] = None,
              video_index: Optional[int] = None):
        """Start the video player with specified mode.

        Args:
            startup_mode: "default" (random), "random", "channel", or "index"
            channel_number: Channel number to play (for channel mode)
            video_index: Video index to play (for index mode)

        Returns:
            True when initialization succeeded and playback was started.
        """
        self.logger.info(f"Starting video player in {startup_mode} mode...")
        self.running = True

        # Initialize VLC
        if not self.initialize_vlc():
            self.logger.error("Failed to initialize VLC")
            return False

        # Setup GPIO
        if not self.setup_gpio():
            self.logger.error("Failed to setup GPIO")
            return False

        # Load channels automatically from directory
        # (returns False when no video files were found)
        if not self.load_channels_from_directory():
            self.logger.error("Failed to load channels from directory")
            return False

        # Start IR processing thread only if IR is enabled
        if self.enable_ir:
            ir_thread = threading.Thread(target=self.process_ir_commands, daemon=True)
            ir_thread.start()
        else:
            self.logger.info("IR remote control disabled, skipping IR processing thread")

        # Setup control socket for external commands
        if not self.setup_control_socket():
            self.logger.warning("Failed to setup control socket, external commands will not work")

        # Play based on startup mode
        if startup_mode == "channel" and channel_number is not None:
            success = self.play_channel(channel_number)
        elif startup_mode == "index" and video_index is not None:
            success = self.play_video_by_index(video_index)
        else:
            # "random" and default mode - use random channel for variety
            success = self.play_random_video()

        if success:
            self.logger.info("Video player started successfully")
        else:
            self.logger.error("Failed to start video playback")

        return success

    def run(self, startup_mode: str = "default",
            channel_number: Optional[int] = None,
            video_index: Optional[int] = None):
        """Main run loop.

        Starts the player, then once per second restarts playback if it
        stopped unexpectedly, periodically rescans the video folder, and keeps
        the VLC window on top. Always releases resources on exit.
        """
        if not self.start(startup_mode, channel_number, video_index):
            # start() may have configured GPIO or the socket before failing.
            self.cleanup()
            return

        # Ensure we always have a video playing
        self.ensure_video_playing()

        try:
            while self.running:
                # Restart playback if it stopped, unless the user paused or
                # stopped it on purpose.
                if (self.vlc_player and not self.playback_suspended
                        and not self.vlc_player.is_playing()):
                    self.logger.info("Video stopped, ensuring video is playing")
                    self.ensure_video_playing()

                # Periodically refresh channels to detect new/removed videos
                current_time = time.monotonic()
                if current_time - self.last_channel_refresh > self.channel_refresh_interval:
                    self.refresh_channels()
                    self.last_channel_refresh = current_time

                # Keep VLC on top
                self.keep_vlc_on_top()

                time.sleep(1)
        except KeyboardInterrupt:
            self.logger.info("Received keyboard interrupt")
        except Exception as e:
            self.logger.error(f"Unexpected error in main loop: {e}")
        finally:
            self.cleanup()

    def on_video_end(self, event):
        """Callback for when a video ends - start a random channel.

        This runs on a libvlc event thread. libvlc must not be called from
        inside its own event callbacks, so the actual playback request is
        handed to a short-lived worker thread.
        """
        self.logger.info("Video ended, starting random channel")
        if not self.channels:
            self.logger.error("No channels available to play after video end")
            return
        threading.Thread(target=self.play_random_video, daemon=True).start()

    def ensure_video_playing(self):
        """Ensure a video is always playing (starts a random channel if not)."""
        if self.vlc_player and self.vlc_player.is_playing():
            return

        if self.channels:
            # Play a random channel instead of restarting current or first channel
            self.logger.info("No video playing, starting random channel")
            self.play_random_video()
        else:
            self.logger.error("No channels available to play")

    def keep_vlc_on_top(self, timeout: float = 2):
        """Keep VLC window on top (best effort, via wmctrl).

        Args:
            timeout: Seconds to wait for wmctrl before giving up.
        """
        try:
            subprocess.run(
                ['wmctrl', '-r', 'VLC media player', '-b', 'add,above'],
                capture_output=True,
                timeout=timeout,
            )
        except (OSError, subprocess.SubprocessError):
            # wmctrl might not be available or VLC window might not exist yet.
            # Deliberately narrow: KeyboardInterrupt must reach the main loop.
            pass

    def cleanup(self):
        """Cleanup resources: timer, VLC playback, GPIO and control socket."""
        self.logger.info("Cleaning up...")
        self.running = False

        if self._channel_timer is not None:
            self._channel_timer.cancel()

        if self.vlc_player:
            self.vlc_player.stop()

        if self.enable_ir and GPIO is not None:
            GPIO.cleanup()

        # Cleanup control socket
        if self.control_socket:
            try:
                self.control_socket.close()
            except OSError:
                pass

        # Remove socket file
        if os.path.exists(self.control_socket_path):
            try:
                os.unlink(self.control_socket_path)
            except OSError:
                pass

        self.logger.info("Cleanup complete")


def parse_arguments():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="Raspberry Pi Video Player with IR Remote Control",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python3 video_player.py                    # Start with random channel
  python3 video_player.py --random           # Start with random video
  python3 video_player.py --channel 5        # Start with channel 5 (auto-assigned)
  python3 video_player.py --index 2          # Start with video at index 2 (0-based)
  python3 video_player.py --no-ir            # Start without IR remote control
  python3 video_player.py --list-channels    # List available channels (auto-assigned)
  python3 video_player.py --list-videos      # List available videos with indices
  python3 video_player.py --refresh-channels # Refresh channel list from directory

Note: Channels are automatically assigned numbers (1, 2, 3...) based on
alphabetical order of video files found in the video directory.
No channel mapping file is needed.

Use --no-ir to start without IR remote control if GPIO access is not available.
        """
    )

    # Startup mode arguments (mutually exclusive)
    mode_group = parser.add_mutually_exclusive_group()
    mode_group.add_argument(
        '--random',
        action='store_true',
        help='Start with a random video'
    )
    mode_group.add_argument(
        '--channel',
        type=int,
        metavar='N',
        help='Start with specific channel number'
    )
    mode_group.add_argument(
        '--index',
        type=int,
        metavar='N',
        help='Start with video at specific index (0-based)'
    )

    # Information arguments
    parser.add_argument(
        '--list-channels',
        action='store_true',
        help='List available channels and exit'
    )
    parser.add_argument(
        '--list-videos',
        action='store_true',
        help='List available videos with indices and exit'
    )
    parser.add_argument(
        '--refresh-channels',
        action='store_true',
        help='Refresh channel list from directory and exit'
    )

    # Configuration arguments
    parser.add_argument(
        '--config',
        type=str,
        default='config.json',
        help='Path to configuration file (default: config.json)'
    )
    parser.add_argument(
        '--no-ir',
        action='store_true',
        help='Start without IR remote control (skip GPIO setup)'
    )

    return parser.parse_args()


def list_channels(player):
    """Print the channels currently loaded in *player*."""
    if not player.channels:
        print("No channels available")
        return

    print("Available channels:")
    print("-" * 50)
    for channel_num in sorted(player.channels):
        channel = player.channels[channel_num]
        print(f"Channel {channel_num}: {channel['name']}")
        print(f" Path: {channel['path']}")
        print()


def list_videos(player):
    """Print the videos in *player*'s folder with their 0-based indices."""
    video_files = player.scan_video_folder()
    if not video_files:
        print("No video files found")
        return

    print("Available videos (with indices):")
    print("-" * 50)
    for i, video_file in enumerate(video_files):
        print(f"Index {i}: {video_file.name}")
        print(f" Path: {video_file}")
        print()


def main():
    """Main entry point."""
    args = parse_arguments()

    # Check if user is in gpio group (needed for GPIO access) only if IR is enabled
    if not args.no_ir:
        import grp
        try:
            gpio_group = grp.getgrnam('gpio')
        except KeyError:
            print("GPIO group not found. Please ensure your system has GPIO support.")
            print("Alternatively, use --no-ir to start without IR remote control")
            sys.exit(1)

        if gpio_group.gr_gid not in os.getgroups():
            print("This script requires GPIO access. Please add your user to the gpio group:")
            print("sudo usermod -a -G gpio $USER")
            print("Then log out and log back in, or run: newgrp gpio")
            print("Alternatively, use --no-ir to start without IR remote control")
            sys.exit(1)

    # Check if another instance is running
    current_pid = os.getpid()
    for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
        try:
            cmdline = proc.info['cmdline']
            if not cmdline:
                continue
            # Check if this is actually running video_player.py
            # (not just containing it in args)
            if (len(cmdline) > 1
                    and cmdline[1].endswith('video_player.py')
                    and proc.info['pid'] != current_pid):
                print("Another instance of video_player.py is already running")
                sys.exit(1)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue

    # Create video player
    player = VideoPlayer(args.config, enable_ir=not args.no_ir)

    # Handle list commands
    if args.list_channels:
        # Load channels from directory
        if not player.load_channels_from_directory():
            print("No video files found in directory")
            return
        list_channels(player)
        return

    if args.list_videos:
        list_videos(player)
        return

    if args.refresh_channels:
        if player.load_channels_from_directory():
            print(f"Refreshed channels: {len(player.channels)} channels found")
            list_channels(player)
        else:
            print("No video files found in directory")
        return

    # Determine startup mode and parameters
    startup_mode = "default"
    channel_number = None
    video_index = None

    if args.random:
        startup_mode = "random"
    elif args.channel is not None:
        startup_mode = "channel"
        channel_number = args.channel
    elif args.index is not None:
        startup_mode = "index"
        video_index = args.index

    # Run video player
    player.run(startup_mode, channel_number, video_index)


if __name__ == "__main__":
    main()