#!/usr/bin/env python3
"""
Channel Change Command Line Tool

Allows changing channels in the running video player service.
"""

import os
import sys
import json
import argparse
import socket
import time
import signal
from pathlib import Path


class ChannelController:
    """Controller for changing channels in the video player service.

    Every command is a JSON object sent over a fresh Unix-socket connection.
    The reply is parsed as JSON and is expected to be an object carrying a
    ``success`` flag (plus ``error`` or command-specific fields).
    """

    # Seconds to wait on connect/send/recv before giving up.
    TIMEOUT_SECONDS = 5.0
    # Bytes requested per recv() call while assembling the reply.
    RECV_CHUNK_SIZE = 4096

    def __init__(self, socket_path="/tmp/video_player_control.sock"):
        """Remember the control socket path and load the local config.

        Args:
            socket_path: Filesystem path of the service's control socket.
        """
        self.socket_path = socket_path
        self.config_path = "config.json"
        self.config = self.load_config()

    def load_config(self):
        """Load configuration from the JSON file at ``self.config_path``.

        Returns:
            The parsed JSON content, or an empty dict when the file does not
            exist or cannot be read/parsed (the latter also prints an error).
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            # A missing config file is not an error; fall back to defaults.
            return {}
        except (OSError, ValueError) as e:
            # ValueError covers both invalid JSON and undecodable bytes.
            print(f"Error loading config: {e}")
            return {}

    def _receive_response(self, sock):
        """Read from *sock* until a complete JSON document has arrived.

        The reply may be larger than one recv() chunk, so data is
        accumulated and re-parsed after every chunk. Reading stops as soon
        as the buffer parses, or when the peer closes the connection.

        Raises:
            ValueError: If the peer closes before sending valid JSON.
            OSError: On socket errors, including timeouts.
        """
        buffer = bytearray()
        while True:
            chunk = sock.recv(self.RECV_CHUNK_SIZE)
            if not chunk:
                break  # Peer closed the connection.
            buffer += chunk
            try:
                return json.loads(buffer.decode('utf-8'))
            except ValueError:
                # Incomplete JSON or a split multi-byte character: keep
                # reading until the rest of the reply arrives.
                continue
        if not buffer:
            raise ValueError("empty response from service")
        # Re-parse so the caller gets a descriptive decode error.
        return json.loads(buffer.decode('utf-8'))

    def send_command(self, command):
        """Send command to video player service via Unix socket.

        Args:
            command: JSON-serialisable object describing the action.

        Returns:
            The decoded reply dict on success. On any failure an error is
            printed and ``{"success": False, "error": <message>}`` is
            returned instead of raising.
        """
        try:
            # The context manager closes the socket on every exit path.
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
                sock.settimeout(self.TIMEOUT_SECONDS)
                sock.connect(self.socket_path)
                # sendall() retries until the whole payload is written.
                sock.sendall(json.dumps(command).encode('utf-8'))
                response = self._receive_response(sock)
        except FileNotFoundError:
            print("Error: Video player service is not running or control socket not found")
            print("Make sure the video player service is started")
            return {"success": False, "error": "Service not running"}
        except ConnectionRefusedError:
            print("Error: Cannot connect to video player service")
            print("The service may not be running or may not support control commands")
            return {"success": False, "error": "Connection refused"}
        except (OSError, ValueError, TypeError) as e:
            # OSError: socket failures and timeouts; ValueError: bad JSON or
            # bad UTF-8 in the reply; TypeError: unserialisable command.
            print(f"Error communicating with video player service: {e}")
            return {"success": False, "error": str(e)}

        if not isinstance(response, dict):
            # Callers rely on dict.get(); reject any other JSON value.
            message = "Malformed response from service"
            print(f"Error communicating with video player service: {message}")
            return {"success": False, "error": message}
        return response

    def change_channel(self, channel_number=None):
        """Change to specified channel or random if not specified.

        Args:
            channel_number: Channel to switch to, or ``None`` to request a
                random channel.

        Returns:
            True when the service reported success, False otherwise.
        """
        if channel_number is None:
            command = {"action": "random_channel"}
            print("Changing to random channel...")
        else:
            command = {"action": "change_channel", "channel": channel_number}
            print(f"Changing to channel {channel_number}...")

        response = self.send_command(command)
        if not response.get("success"):
            error = response.get("error", "Unknown error")
            print(f"Failed to change channel: {error}")
            return False

        channel_name = response.get("channel_name", "Unknown")
        if channel_number is None:
            actual_channel = response.get("channel")
            print(f"Successfully changed to random channel {actual_channel}: {channel_name}")
        else:
            print(f"Successfully changed to channel {channel_number}: {channel_name}")
        return True

    @staticmethod
    def _channel_sort_key(channel_id):
        """Order channel ids numerically, with non-numeric ids last."""
        try:
            return (0, int(channel_id), "")
        except (TypeError, ValueError):
            return (1, 0, str(channel_id))

    def list_channels(self):
        """List available channels.

        Returns:
            True when the service answered successfully, False otherwise.
        """
        response = self.send_command({"action": "list_channels"})
        if not response.get("success"):
            error = response.get("error", "Unknown error")
            print(f"Failed to list channels: {error}")
            return False

        channels = response.get("channels", {})
        if not channels:
            print("No channels available")
            return True

        print("Available channels:")
        print("-" * 50)
        # JSON object keys are strings; sort numerically so 2 precedes 10.
        for channel_num in sorted(channels, key=self._channel_sort_key):
            channel = channels[channel_num]
            name = channel.get("name", "Unknown") if isinstance(channel, dict) else "Unknown"
            print(f"Channel {channel_num}: {name}")
        return True

    def get_current_channel(self):
        """Get current playing channel.

        Returns:
            True when the service answered successfully, False otherwise.
        """
        response = self.send_command({"action": "get_current_channel"})
        if not response.get("success"):
            error = response.get("error", "Unknown error")
            print(f"Failed to get current channel: {error}")
            return False

        channel = response.get("channel")
        channel_name = response.get("channel_name", "Unknown")
        # Compare against None so that channel number 0 is still reported.
        if channel is not None:
            print(f"Currently playing: Channel {channel} - {channel_name}")
        else:
            print("No channel currently playing")
        return True

    def stop_service(self):
        """Stop the video player service.

        Returns:
            True when the service acknowledged the stop, False otherwise.
        """
        response = self.send_command({"action": "stop"})
        if response.get("success"):
            print("Video player service stopped")
            return True
        error = response.get("error", "Unknown error")
        print(f"Failed to stop service: {error}")
        return False


def parse_arguments(argv=None):
    """Parse command line arguments.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(
        description="Change channels in the video player service",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python3 change_channel.py           # Change to random channel
  python3 change_channel.py 5         # Change to channel 5
  python3 change_channel.py --list    # List available channels
  python3 change_channel.py --current # Show current channel
  python3 change_channel.py --stop    # Stop video player service
        """
    )

    parser.add_argument(
        'channel',
        nargs='?',
        type=int,
        help='Channel number to change to (if not specified, changes to random channel)'
    )
    parser.add_argument(
        '--list',
        action='store_true',
        help='List available channels'
    )
    parser.add_argument(
        '--current',
        action='store_true',
        help='Show current playing channel'
    )
    parser.add_argument(
        '--stop',
        action='store_true',
        help='Stop the video player service'
    )
    parser.add_argument(
        '--socket',
        type=str,
        default='/tmp/video_player_control.sock',
        help='Path to control socket (default: /tmp/video_player_control.sock)'
    )

    return parser.parse_args(argv)


def main(argv=None):
    """Main entry point.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 when the requested action succeeded, 1 when
        it failed.
    """
    args = parse_arguments(argv)
    controller = ChannelController(args.socket)

    if args.list:
        succeeded = controller.list_channels()
    elif args.current:
        succeeded = controller.get_current_channel()
    elif args.stop:
        succeeded = controller.stop_service()
    else:
        # Change channel (random if not specified)
        succeeded = controller.change_channel(args.channel)

    return 0 if succeeded else 1


if __name__ == "__main__":
    sys.exit(main())