Files
rpi-tulivision/change_channel.py
2025-09-25 18:43:27 +02:00

208 lines
7.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Channel Change Command Line Tool
Allows changing channels in the running video player service
"""
import os
import sys
import json
import argparse
import socket
import time
import signal
from pathlib import Path
class ChannelController:
    """Client for the video player service's control socket.

    Each command is a JSON object sent over a Unix stream socket; the reply
    is parsed as JSON and is expected to be an object with a ``success`` key.
    """

    # Number of bytes requested per recv() call while reading a reply.
    RECV_CHUNK_SIZE = 4096

    def __init__(self, socket_path="/tmp/video_player_control.sock"):
        """Remember the control socket path and load ``config.json``.

        Args:
            socket_path: Filesystem path of the service's Unix control socket.
        """
        self.socket_path = socket_path
        # Relative path: resolved against the current working directory.
        self.config_path = "config.json"
        self.config = self.load_config()

    def load_config(self):
        """Load configuration from the JSON file at ``self.config_path``.

        Returns:
            The parsed JSON content, or ``{}`` when the file does not exist
            or cannot be read/parsed (the error is printed in that case).
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            # A missing config file is not an error: fall back to defaults.
            return {}
        except (OSError, ValueError) as e:
            # ValueError covers both invalid JSON and undecodable bytes.
            print(f"Error loading config: {e}")
            return {}

    def _read_response(self, sock):
        """Read one JSON document from ``sock``.

        Data is accumulated chunk by chunk until it parses as JSON, so
        replies larger than a single ``recv`` are handled and the method
        returns as soon as the reply is complete, whether or not the peer
        closes the connection afterwards.

        Raises:
            ValueError: If the peer closes before sending a complete JSON
                document (including sending nothing at all).
            OSError: On socket errors, including the socket timeout.
        """
        buffer = bytearray()
        while True:
            chunk = sock.recv(self.RECV_CHUNK_SIZE)
            if not chunk:
                break
            buffer += chunk
            try:
                return json.loads(buffer.decode('utf-8'))
            except ValueError:
                # Incomplete JSON or a split multi-byte character: keep reading.
                continue
        # Peer closed the connection; parse what we have (raises if invalid).
        return json.loads(buffer.decode('utf-8'))

    def send_command(self, command):
        """Send ``command`` to the video player service via the Unix socket.

        Args:
            command: JSON-serializable object describing the action.

        Returns:
            The service's reply as a dict. On any failure a dict of the form
            ``{"success": False, "error": <message>}`` is returned instead
            and a diagnostic is printed; no exception propagates.
        """
        try:
            # The context manager closes the socket on every path,
            # including when connect/send/recv raises.
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
                sock.settimeout(5.0)  # 5 second timeout
                sock.connect(self.socket_path)
                # sendall() keeps writing until the whole payload is sent.
                sock.sendall(json.dumps(command).encode('utf-8'))
                response = self._read_response(sock)
        except FileNotFoundError:
            print("Error: Video player service is not running or control socket not found")
            print("Make sure the video player service is started")
            return {"success": False, "error": "Service not running"}
        except ConnectionRefusedError:
            print("Error: Cannot connect to video player service")
            print("The service may not be running or may not support control commands")
            return {"success": False, "error": "Connection refused"}
        except Exception as e:
            # Boundary handler: timeouts, malformed replies, other socket errors.
            print(f"Error communicating with video player service: {e}")
            return {"success": False, "error": str(e)}

        if not isinstance(response, dict):
            # Callers rely on dict.get(); reject any other JSON value.
            print("Error communicating with video player service: malformed response")
            return {"success": False, "error": "Malformed response"}
        return response

    def change_channel(self, channel_number=None):
        """Change to the specified channel, or a random one if omitted.

        Args:
            channel_number: Channel to switch to; ``None`` requests a random
                channel from the service.

        Returns:
            ``True`` if the service reported success, ``False`` otherwise.
        """
        if channel_number is None:
            command = {"action": "random_channel"}
            print("Changing to random channel...")
        else:
            command = {"action": "change_channel", "channel": channel_number}
            print(f"Changing to channel {channel_number}...")

        response = self.send_command(command)
        if not response.get("success"):
            error = response.get("error", "Unknown error")
            print(f"Failed to change channel: {error}")
            return False

        channel_name = response.get("channel_name", "Unknown")
        if channel_number is None:
            # For a random change the service reports which channel it chose.
            actual_channel = response.get("channel")
            print(f"Successfully changed to random channel {actual_channel}: {channel_name}")
        else:
            print(f"Successfully changed to channel {channel_number}: {channel_name}")
        return True

    @staticmethod
    def _channel_sort_key(channel_id):
        """Sort key ordering numeric channel ids by value, others by text.

        JSON object keys are always strings, so a plain sort would place
        "10" before "2"; numeric ids are therefore compared as integers.
        """
        try:
            return (0, int(channel_id), "")
        except (TypeError, ValueError):
            return (1, 0, str(channel_id))

    def list_channels(self):
        """Print the channels reported by the service, in numeric order.

        Returns:
            ``True`` if the service reported success, ``False`` otherwise.
        """
        response = self.send_command({"action": "list_channels"})
        if not response.get("success"):
            error = response.get("error", "Unknown error")
            print(f"Failed to list channels: {error}")
            return False

        channels = response.get("channels", {})
        if not channels:
            print("No channels available")
            return True

        print("Available channels:")
        print("-" * 50)
        for channel_num in sorted(channels, key=self._channel_sort_key):
            name = channels[channel_num].get("name", "Unknown")
            print(f"Channel {channel_num}: {name}")
        return True

    def get_current_channel(self):
        """Print the channel the service reports as currently playing.

        Returns:
            ``True`` if the service reported success, ``False`` otherwise.
        """
        response = self.send_command({"action": "get_current_channel"})
        if not response.get("success"):
            error = response.get("error", "Unknown error")
            print(f"Failed to get current channel: {error}")
            return False

        channel = response.get("channel")
        channel_name = response.get("channel_name", "Unknown")
        # Compare against None so that channel 0 is not mistaken for "no channel".
        if channel is not None:
            print(f"Currently playing: Channel {channel} - {channel_name}")
        else:
            print("No channel currently playing")
        return True

    def stop_service(self):
        """Ask the video player service to stop.

        Returns:
            ``True`` if the service reported success, ``False`` otherwise.
        """
        response = self.send_command({"action": "stop"})
        if response.get("success"):
            print("Video player service stopped")
            return True
        error = response.get("error", "Unknown error")
        print(f"Failed to stop service: {error}")
        return False
def parse_arguments(argv=None):
    """Parse command line arguments.

    Args:
        argv: Optional list of argument strings (without the program name).
            When ``None``, argparse reads the arguments from ``sys.argv``.

    Returns:
        An ``argparse.Namespace`` with the attributes ``channel`` (int or
        ``None``), ``list``, ``current``, ``stop`` (bools) and ``socket``
        (str, path of the control socket).
    """
    parser = argparse.ArgumentParser(
        description="Change channels in the video player service",
        # Raw formatter keeps the line breaks of the examples in the epilog.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python3 change_channel.py # Change to random channel
python3 change_channel.py 5 # Change to channel 5
python3 change_channel.py --list # List available channels
python3 change_channel.py --current # Show current channel
python3 change_channel.py --stop # Stop video player service
"""
    )

    # Optional positional: omitting it means "pick a random channel".
    parser.add_argument(
        'channel',
        nargs='?',
        type=int,
        help='Channel number to change to (if not specified, changes to random channel)'
    )

    # Action flags, all off by default.
    parser.add_argument(
        '--list',
        action='store_true',
        help='List available channels'
    )
    parser.add_argument(
        '--current',
        action='store_true',
        help='Show current playing channel'
    )
    parser.add_argument(
        '--stop',
        action='store_true',
        help='Stop the video player service'
    )

    parser.add_argument(
        '--socket',
        type=str,
        default='/tmp/video_player_control.sock',
        help='Path to control socket (default: /tmp/video_player_control.sock)'
    )

    return parser.parse_args(argv)
def main():
    """Main entry point: run the action selected on the command line.

    The flags are checked in the order ``--list``, ``--current``, ``--stop``;
    when none is given the channel is changed (to a random channel if no
    number was supplied).

    Returns:
        Process exit status: ``1`` when the selected action explicitly
        reports failure by returning ``False``, ``0`` otherwise. A handler
        that returns ``None`` is treated as having succeeded.
    """
    args = parse_arguments()
    controller = ChannelController(args.socket)

    if args.list:
        result = controller.list_channels()
    elif args.current:
        result = controller.get_current_channel()
    elif args.stop:
        result = controller.stop_service()
    else:
        # Change channel (random if not specified)
        result = controller.change_channel(args.channel)

    return 1 if result is False else 0


if __name__ == "__main__":
    # Propagate the outcome so shell scripts and services can detect failure.
    sys.exit(main())