Files
rpi-tulivision/video_player.py

691 lines
24 KiB
Python

#!/usr/bin/env python3
"""
Raspberry Pi Video Player Auto-Start Script
Main video player with VLC integration, IR remote control, and TV channel system
"""
import os
import sys
import time
import json
import logging
import threading
import queue
import subprocess
import argparse
import random
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import vlc
import RPi.GPIO as GPIO
from dotenv import load_dotenv
import psutil
# Load environment variables
load_dotenv()
class VideoPlayer:
"""Main video player class with VLC integration and IR remote control"""
def __init__(self, config_path: str = "config.json"):
"""Initialize the video player with configuration"""
self.config_path = config_path
self.config = self.load_config()
self.setup_logging()
# Initialize components
self.vlc_instance = None
self.vlc_player = None
self.channels = {}
self.current_channel = None
self.ir_queue = queue.Queue()
self.running = False
# IR Remote settings
self.ir_pin = self.config.get('ir_pin', 18)
self.ir_codes = self.config.get('ir_codes', {})
# Channel settings
self.video_folder = Path(self.config.get('video_folder', os.getenv('VIDEO_FOLDER', '/home/pi/Videos')))
self.default_channel = self.config.get('default_channel', 1)
self.channel_timeout = self.config.get('channel_timeout', 3.0)
self.multi_digit_timeout = self.config.get('multi_digit_timeout', 1.0)
# Multi-digit channel input
self.channel_input = ""
self.last_digit_time = 0
self.logger.info("Video Player initialized")
def load_config(self) -> Dict:
"""Load configuration from JSON file"""
try:
if os.path.exists(self.config_path):
with open(self.config_path, 'r') as f:
return json.load(f)
else:
self.create_default_config()
return self.load_config()
except Exception as e:
print(f"Error loading config: {e}")
return self.get_default_config()
def get_default_config(self) -> Dict:
"""Get default configuration"""
return {
"video_folder": os.getenv('VIDEO_FOLDER', '/home/pi/Videos'),
"ir_pin": 18,
"default_channel": 1,
"channel_timeout": 3.0,
"multi_digit_timeout": 1.0,
"vlc_options": [
"--fullscreen",
"--no-video-title-show",
"--no-audio-display"
],
"ir_codes": {
"0": "channel_0",
"1": "channel_1",
"2": "channel_2",
"3": "channel_3",
"4": "channel_4",
"5": "channel_5",
"6": "channel_6",
"7": "channel_7",
"8": "channel_8",
"9": "channel_9",
"power": "power_toggle",
"play": "play_pause",
"stop": "stop",
"next": "next_channel",
"prev": "prev_channel",
"vol_up": "volume_up",
"vol_down": "volume_down"
}
}
def create_default_config(self):
"""Create default configuration file"""
config = self.get_default_config()
with open(self.config_path, 'w') as f:
json.dump(config, f, indent=2)
print(f"Created default configuration file: {self.config_path}")
def setup_logging(self):
"""Setup logging configuration"""
log_level = self.config.get('log_level', 'INFO')
log_file = self.config.get('log_file', '/var/log/video_player.log')
logging.basicConfig(
level=getattr(logging, log_level.upper()),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler(sys.stdout)
]
)
self.logger = logging.getLogger(__name__)
def initialize_vlc(self):
"""Initialize VLC media player"""
try:
vlc_options = self.config.get('vlc_options', [])
self.vlc_instance = vlc.Instance(vlc_options)
self.vlc_player = self.vlc_instance.media_player_new()
# Set fullscreen if specified
if '--fullscreen' in vlc_options:
self.vlc_player.set_fullscreen(True)
self.logger.info("VLC player initialized successfully")
return True
except Exception as e:
self.logger.error(f"Failed to initialize VLC: {e}")
return False
def scan_video_folder(self) -> List[Path]:
"""Scan video folder for supported video files"""
video_extensions = {'.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.m4v'}
video_files = []
if not self.video_folder.exists():
self.logger.error(f"Video folder does not exist: {self.video_folder}")
return video_files
try:
for file_path in self.video_folder.rglob('*'):
if file_path.is_file() and file_path.suffix.lower() in video_extensions:
video_files.append(file_path)
video_files.sort() # Sort alphabetically
self.logger.info(f"Found {len(video_files)} video files")
return video_files
except Exception as e:
self.logger.error(f"Error scanning video folder: {e}")
return video_files
def create_channels(self):
"""Create channel mapping from video files"""
video_files = self.scan_video_folder()
self.channels = {}
for i, video_file in enumerate(video_files, 1):
self.channels[i] = {
'number': i,
'name': video_file.stem,
'path': str(video_file),
'file': video_file
}
self.logger.info(f"Created {len(self.channels)} channels")
# Save channel mapping to file
self.save_channel_mapping()
def save_channel_mapping(self):
"""Save channel mapping to JSON file"""
try:
channel_data = {}
for channel_num, channel_info in self.channels.items():
channel_data[str(channel_num)] = {
'name': channel_info['name'],
'path': channel_info['path']
}
with open('channels.json', 'w') as f:
json.dump(channel_data, f, indent=2)
self.logger.info("Channel mapping saved to channels.json")
except Exception as e:
self.logger.error(f"Error saving channel mapping: {e}")
def load_channel_mapping(self):
"""Load channel mapping from JSON file"""
try:
if os.path.exists('channels.json'):
with open('channels.json', 'r') as f:
channel_data = json.load(f)
self.channels = {}
for channel_num, channel_info in channel_data.items():
video_path = Path(channel_info['path'])
if video_path.exists():
self.channels[int(channel_num)] = {
'number': int(channel_num),
'name': channel_info['name'],
'path': channel_info['path'],
'file': video_path
}
self.logger.info(f"Loaded {len(self.channels)} channels from mapping file")
return True
except Exception as e:
self.logger.error(f"Error loading channel mapping: {e}")
return False
def play_channel(self, channel_number: int) -> bool:
"""Play video for specified channel number"""
if channel_number not in self.channels:
self.logger.warning(f"Channel {channel_number} not found")
return False
try:
channel = self.channels[channel_number]
media = self.vlc_instance.media_new(channel['path'])
self.vlc_player.set_media(media)
self.vlc_player.play()
self.current_channel = channel_number
self.logger.info(f"Playing channel {channel_number}: {channel['name']}")
# Wait for media to start playing
time.sleep(0.5)
return True
except Exception as e:
self.logger.error(f"Error playing channel {channel_number}: {e}")
return False
def play_random_video(self) -> bool:
"""Play a random video from the available channels"""
if not self.channels:
self.logger.error("No channels available for random playback")
return False
try:
# Get a random channel number
random_channel = random.choice(list(self.channels.keys()))
self.logger.info(f"Selected random channel: {random_channel}")
return self.play_channel(random_channel)
except Exception as e:
self.logger.error(f"Error playing random video: {e}")
return False
def play_video_by_index(self, index: int) -> bool:
"""Play video by folder index (0-based)"""
video_files = self.scan_video_folder()
if not video_files:
self.logger.error("No video files found")
return False
if index < 0 or index >= len(video_files):
self.logger.warning(f"Index {index} out of range (0-{len(video_files)-1})")
return False
try:
video_file = video_files[index]
media = self.vlc_instance.media_new(str(video_file))
self.vlc_player.set_media(media)
self.vlc_player.play()
self.current_channel = None # Not a channel-based playback
self.logger.info(f"Playing video by index {index}: {video_file.name}")
# Wait for media to start playing
time.sleep(0.5)
return True
except Exception as e:
self.logger.error(f"Error playing video by index {index}: {e}")
return False
def setup_gpio(self):
"""Setup GPIO for IR receiver"""
try:
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.ir_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.add_event_detect(self.ir_pin, GPIO.FALLING,
callback=self.ir_callback, bouncetime=50)
self.logger.info(f"GPIO setup complete on pin {self.ir_pin}")
return True
except Exception as e:
self.logger.error(f"GPIO setup failed: {e}")
return False
def ir_callback(self, channel):
"""GPIO callback for IR signal detection"""
try:
# Simple IR signal detection (this is a basic implementation)
# In a real implementation, you would decode the actual IR protocol
start_time = time.time()
# Wait for signal to stabilize
while GPIO.input(self.ir_pin) == GPIO.LOW:
if time.time() - start_time > 0.1: # Timeout after 100ms
break
time.sleep(0.001)
signal_duration = time.time() - start_time
# Basic IR code detection (this is simplified)
# Real implementation would decode specific IR protocols
ir_code = self.detect_ir_code(signal_duration)
if ir_code:
self.ir_queue.put(ir_code)
except Exception as e:
self.logger.error(f"IR callback error: {e}")
def detect_ir_code(self, duration: float) -> Optional[str]:
"""Detect IR code from signal duration (simplified implementation)"""
# This is a simplified implementation
# Real IR decoding would analyze pulse patterns
if duration < 0.01: # Very short pulse
return "0"
elif duration < 0.02: # Short pulse
return "1"
elif duration < 0.03: # Medium pulse
return "2"
elif duration < 0.04: # Long pulse
return "3"
elif duration < 0.05: # Very long pulse
return "4"
else:
return None
def process_ir_commands(self):
"""Process IR commands from queue"""
while self.running:
try:
if not self.ir_queue.empty():
ir_code = self.ir_queue.get(timeout=0.1)
self.handle_ir_command(ir_code)
else:
time.sleep(0.01)
except queue.Empty:
continue
except Exception as e:
self.logger.error(f"Error processing IR command: {e}")
def handle_ir_command(self, ir_code: str):
"""Handle IR remote commands"""
command = self.ir_codes.get(ir_code)
if not command:
self.logger.debug(f"Unknown IR code: {ir_code}")
return
self.logger.info(f"IR Command: {command}")
if command.startswith('channel_'):
# Handle channel selection
channel_num = int(command.split('_')[1])
self.handle_channel_input(channel_num)
elif command == 'play_pause':
self.toggle_play_pause()
elif command == 'stop':
self.stop_playback()
elif command == 'next_channel':
self.next_channel()
elif command == 'prev_channel':
self.prev_channel()
elif command == 'volume_up':
self.volume_up()
elif command == 'volume_down':
self.volume_down()
elif command == 'power_toggle':
self.power_toggle()
def handle_channel_input(self, digit: int):
"""Handle multi-digit channel input"""
current_time = time.time()
# Reset input if too much time has passed
if current_time - self.last_digit_time > self.multi_digit_timeout:
self.channel_input = ""
self.channel_input += str(digit)
self.last_digit_time = current_time
# Wait for more digits or timeout
threading.Timer(self.multi_digit_timeout, self.process_channel_input).start()
def process_channel_input(self):
"""Process complete channel input"""
if not self.channel_input:
return
try:
channel_number = int(self.channel_input)
if channel_number in self.channels:
self.play_channel(channel_number)
else:
self.logger.warning(f"Invalid channel number: {channel_number}")
except ValueError:
self.logger.warning(f"Invalid channel input: {self.channel_input}")
self.channel_input = ""
def toggle_play_pause(self):
"""Toggle play/pause"""
if self.vlc_player:
if self.vlc_player.is_playing():
self.vlc_player.pause()
else:
self.vlc_player.play()
def stop_playback(self):
"""Stop playback"""
if self.vlc_player:
self.vlc_player.stop()
def next_channel(self):
"""Go to next channel"""
if self.current_channel and self.current_channel < max(self.channels.keys()):
self.play_channel(self.current_channel + 1)
def prev_channel(self):
"""Go to previous channel"""
if self.current_channel and self.current_channel > min(self.channels.keys()):
self.play_channel(self.current_channel - 1)
def volume_up(self):
"""Increase volume"""
if self.vlc_player:
current_volume = self.vlc_player.audio_get_volume()
self.vlc_player.audio_set_volume(min(100, current_volume + 10))
def volume_down(self):
"""Decrease volume"""
if self.vlc_player:
current_volume = self.vlc_player.audio_get_volume()
self.vlc_player.audio_set_volume(max(0, current_volume - 10))
def power_toggle(self):
"""Toggle power (exit application)"""
self.logger.info("Power toggle - shutting down")
self.running = False
def start(self, startup_mode: str = "default", channel_number: int = None, video_index: int = None):
"""Start the video player with specified mode
Args:
startup_mode: "default", "random", "channel", or "index"
channel_number: Channel number to play (for channel mode)
video_index: Video index to play (for index mode)
"""
self.logger.info(f"Starting video player in {startup_mode} mode...")
self.running = True
# Initialize VLC
if not self.initialize_vlc():
self.logger.error("Failed to initialize VLC")
return False
# Setup GPIO
if not self.setup_gpio():
self.logger.error("Failed to setup GPIO")
return False
# Load or create channels
if not self.load_channel_mapping():
self.create_channels()
if not self.channels:
self.logger.error("No video files found")
return False
# Start IR processing thread
ir_thread = threading.Thread(target=self.process_ir_commands, daemon=True)
ir_thread.start()
# Play based on startup mode
success = False
if startup_mode == "random":
success = self.play_random_video()
elif startup_mode == "channel" and channel_number is not None:
success = self.play_channel(channel_number)
elif startup_mode == "index" and video_index is not None:
success = self.play_video_by_index(video_index)
else: # default mode
if self.default_channel in self.channels:
success = self.play_channel(self.default_channel)
else:
# Play first available channel
first_channel = min(self.channels.keys())
success = self.play_channel(first_channel)
if success:
self.logger.info("Video player started successfully")
else:
self.logger.error("Failed to start video playback")
return success
def run(self, startup_mode: str = "default", channel_number: int = None, video_index: int = None):
"""Main run loop"""
if not self.start(startup_mode, channel_number, video_index):
return
try:
while self.running:
# Check if VLC is still running
if self.vlc_player and not self.vlc_player.is_playing():
# If no video is playing and we have a current channel, restart it
if self.current_channel:
self.logger.info("Video stopped, restarting current channel")
self.play_channel(self.current_channel)
time.sleep(1)
except KeyboardInterrupt:
self.logger.info("Received keyboard interrupt")
except Exception as e:
self.logger.error(f"Unexpected error in main loop: {e}")
finally:
self.cleanup()
def cleanup(self):
"""Cleanup resources"""
self.logger.info("Cleaning up...")
self.running = False
if self.vlc_player:
self.vlc_player.stop()
GPIO.cleanup()
self.logger.info("Cleanup complete")
def parse_arguments():
"""Parse command line arguments"""
parser = argparse.ArgumentParser(
description="Raspberry Pi Video Player with IR Remote Control",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python3 video_player.py # Start with default channel
python3 video_player.py --random # Start with random video
python3 video_player.py --channel 5 # Start with channel 5
python3 video_player.py --index 2 # Start with video at index 2 (0-based)
python3 video_player.py --list-channels # List available channels
python3 video_player.py --list-videos # List available videos with indices
"""
)
# Startup mode arguments (mutually exclusive)
mode_group = parser.add_mutually_exclusive_group()
mode_group.add_argument(
'--random',
action='store_true',
help='Start with a random video'
)
mode_group.add_argument(
'--channel',
type=int,
metavar='N',
help='Start with specific channel number'
)
mode_group.add_argument(
'--index',
type=int,
metavar='N',
help='Start with video at specific index (0-based)'
)
# Information arguments
parser.add_argument(
'--list-channels',
action='store_true',
help='List available channels and exit'
)
parser.add_argument(
'--list-videos',
action='store_true',
help='List available videos with indices and exit'
)
# Configuration arguments
parser.add_argument(
'--config',
type=str,
default='config.json',
help='Path to configuration file (default: config.json)'
)
return parser.parse_args()
def list_channels(player):
"""List available channels"""
if not player.channels:
print("No channels available")
return
print("Available channels:")
print("-" * 50)
for channel_num in sorted(player.channels.keys()):
channel = player.channels[channel_num]
print(f"Channel {channel_num}: {channel['name']}")
print(f" Path: {channel['path']}")
print()
def list_videos(player):
"""List available videos with indices"""
video_files = player.scan_video_folder()
if not video_files:
print("No video files found")
return
print("Available videos (with indices):")
print("-" * 50)
for i, video_file in enumerate(video_files):
print(f"Index {i}: {video_file.name}")
print(f" Path: {video_file}")
print()
def main():
"""Main entry point"""
args = parse_arguments()
# Check if running as root (needed for GPIO access)
if os.geteuid() != 0:
print("This script must be run as root for GPIO access")
print("Use: sudo python3 video_player.py")
sys.exit(1)
# Check if another instance is running
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
if 'video_player.py' in ' '.join(proc.info['cmdline'] or []) and proc.info['pid'] != os.getpid():
print("Another instance of video_player.py is already running")
sys.exit(1)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
# Create video player
player = VideoPlayer(args.config)
# Handle list commands
if args.list_channels:
# Load channels first
if not player.load_channel_mapping():
player.create_channels()
list_channels(player)
return
if args.list_videos:
list_videos(player)
return
# Determine startup mode and parameters
startup_mode = "default"
channel_number = None
video_index = None
if args.random:
startup_mode = "random"
elif args.channel is not None:
startup_mode = "channel"
channel_number = args.channel
elif args.index is not None:
startup_mode = "index"
video_index = args.index
# Run video player
player.run(startup_mode, channel_number, video_index)
if __name__ == "__main__":
main()