Source code for bot_api.internal.websocket_handler

import asyncio
import json
import threading
import time
from typing import Any, Optional, Dict, TYPE_CHECKING
import websockets

if TYPE_CHECKING:
    from .base_bot_internals import BaseBotInternals

from ..base_bot_abc import BaseBotABC
from ..bot_info import BotInfo
from ..events import (
    ConnectedEvent,
    DisconnectedEvent,
    ConnectionErrorEvent,
    GameStartedEvent,
    GameEndedEvent,
    RoundStartedEvent,
    RoundEndedEvent,
)
from .bot_event_handlers import BotEventHandlers
from .internal_event_handlers import InternalEventHandlers
from .json_util import to_json, from_json
from ..initial_position import InitialPosition
from ..bot_exception import BotException
from ..mapper.event_mapper import EventMapper
from ..mapper.game_setup_mapper import GameSetupMapper
from ..mapper.results_mapper import ResultsMapper
from robocode_tank_royale.schema import (
    Message,
    TickEventForBot,
    BotReady,
    GameStartedEventForBot,
    GameEndedEventForBot,
    RoundEndedEventForBot,
    SkippedTurnEvent,
    ServerHandshake,
    RoundStartedEvent as RoundStartedEventForBot,
)


class WebSocketHandler:
    """WebSocket handler for the Robocode Tank Royale Bot API.

    Owns the client connection to the game server, receives messages from it,
    and routes each message (by its ``type`` field) to the matching
    ``handle_*`` coroutine, which publishes to the bot/internal event handlers
    and updates ``base_bot_internals``.
    """

    # Maps the server message ``type`` to the name of the coroutine that takes
    # the decoded JSON object. Resolved with ``getattr`` at dispatch time so
    # that overridden handlers are honoured. "GameAbortedEvent" is dispatched
    # separately because its handler takes no payload.
    _MESSAGE_HANDLERS: Dict[str, str] = {
        "TickEventForBot": "handle_tick",
        "RoundStartedEvent": "handle_round_started",
        "RoundEndedEventForBot": "handle_round_ended",
        "GameStartedEventForBot": "handle_game_started",
        "GameEndedEventForBot": "handle_game_ended",
        "SkippedTurnEvent": "handle_skipped_turn",
        "ServerHandshake": "handle_server_handshake",
    }

    # Close code reported when the connection ended without a close frame
    # from the peer (RFC 6455 "abnormal closure").
    _ABNORMAL_CLOSURE_CODE = 1006

    def __init__(
        self,
        base_bot_internals: "BaseBotInternals",
        server_url: str,
        server_secret: Optional[str],
        base_bot: BaseBotABC,
        bot_info: BotInfo,
        bot_event_handlers: BotEventHandlers,
        internal_event_handlers: InternalEventHandlers,
        closed_event: threading.Event,
        event_queue: 'EventQueue',
    ):
        """Initialize the websocket handler.

        Args:
            base_bot_internals: Shared bot state updated by the handlers
                (tick event, ids, game setup, server handshake, ...).
            server_url: URL passed to ``websockets.connect``.
            server_secret: Optional secret forwarded in the bot handshake.
            base_bot: The bot instance; used for event mapping and droid
                detection.
            bot_info: Bot properties validated and sent in the handshake.
            bot_event_handlers: Handlers the public bot events are published to.
            internal_event_handlers: Handlers the internal events are
                published to.
            closed_event: Set when the connection fails, errors, or closes.
            event_queue: Queue that receives tick and skipped-turn events.
        """
        self.base_bot_internals = base_bot_internals
        self.server_url = server_url
        self.server_secret: Optional[str] = server_secret
        self.base_bot = base_bot
        self.bot_info = bot_info
        self.bot_event_handlers = bot_event_handlers
        self.internal_event_handlers = internal_event_handlers
        self.closed_event = closed_event
        self.event_queue = event_queue
        # Populated by connect(); None until then.
        self.websocket: Optional[websockets.ClientConnection] = None

    async def connect(self):
        """Connect to the WebSocket server.

        Publishes a ``ConnectedEvent`` on success. On failure publishes a
        ``ConnectionErrorEvent``, sets ``closed_event`` and re-raises.

        Returns:
            The established connection (also stored on ``self.websocket``).
        """
        try:
            self.websocket = await websockets.connect(self.server_url)
            # Publish connected event
            self.bot_event_handlers.on_connected.publish(
                ConnectedEvent(self.server_url)
            )
            return self.websocket
        except Exception as e:
            self.bot_event_handlers.on_connection_error.publish(
                ConnectionErrorEvent(self.server_url, e)
            )
            self.closed_event.set()
            raise

    async def disconnect(self, code: int = 1000, reason: str = ""):
        """Close the connection, if one was established.

        Args:
            code: WebSocket close code to send.
            reason: Close reason to send.
        """
        if self.websocket:
            await self.websocket.close(code, reason)

    async def on_close(
        self, websocket: websockets.ClientConnection, code: int, reason: str
    ) -> None:
        """Handle WebSocket close event.

        Publishes a ``DisconnectedEvent`` to both the bot and the internal
        event handlers, then sets ``closed_event``.
        """
        del websocket  # Unused parameter, but kept for compatibility
        # NOTE(review): the second argument is presumably the "remote" flag of
        # DisconnectedEvent -- confirm against its definition.
        disconnected_event = DisconnectedEvent(self.server_url, True, code, reason)
        # Publish to both event handlers
        self.bot_event_handlers.on_disconnected.publish(disconnected_event)
        self.internal_event_handlers.on_disconnected.publish(disconnected_event)
        self.closed_event.set()

    async def on_error(self, websocket: websockets.ClientConnection, error: Exception):
        """Handle WebSocket error.

        Publishes a ``ConnectionErrorEvent`` and sets ``closed_event``.
        """
        del websocket  # Unused parameter, but kept for compatibility
        self.bot_event_handlers.on_connection_error.publish(
            ConnectionErrorEvent(self.server_url, error)
        )
        self.closed_event.set()

    async def receive_messages(self):
        """Main loop for receiving messages from the WebSocket server.

        Each message is decoded to ``str`` if needed and passed to
        ``process_message``. A ``ConnectionClosed`` exception is reported
        through ``on_close``; any other exception is printed and reported
        through ``on_error``.
        """
        assert self.websocket is not None, "WebSocket connection is not established."
        # NOTE(review): if the iteration below ends without raising, neither
        # on_close nor on_error is invoked here -- confirm that is intended.
        try:
            async for message in self.websocket:
                if isinstance(message, bytes):
                    message = message.decode("utf-8")
                assert isinstance(message, str), "Received message is not a string."
                await self.process_message(message)
        except websockets.exceptions.ConnectionClosed as e:
            # ``rcvd`` is None when no close frame was received. Report that
            # as an abnormal closure instead of failing inside this handler,
            # which would skip on_close and leave closed_event unset.
            close_frame = e.rcvd
            if close_frame is None:
                code, reason = self._ABNORMAL_CLOSURE_CODE, ""
            else:
                code, reason = close_frame.code, close_frame.reason
            await self.on_close(self.websocket, code, reason)
        except Exception as e:
            print(f'Unexpected error: {e}')
            await self.on_error(self.websocket, e)

    async def process_message(self, message: str):
        """Process the received WebSocket message.

        Args:
            message: JSON text received from the server.

        Raises:
            BotException: If the message carries an unsupported ``type``.
            json.JSONDecodeError: If ``message`` is not valid JSON.
        """
        json_msg = json.loads(message)
        # Only JSON objects carrying a "type" are dispatched; anything else
        # (including non-object payloads such as a bare number) is ignored.
        if not isinstance(json_msg, dict) or "type" not in json_msg:
            return

        msg_type = json_msg["type"]
        if msg_type == "GameAbortedEvent":
            await self.handle_game_aborted()
            return

        # Guard against unhashable "type" values before the table lookup.
        handler_name = (
            self._MESSAGE_HANDLERS.get(msg_type) if isinstance(msg_type, str) else None
        )
        if handler_name is None:
            raise BotException(f"Unsupported WebSocket message type: {msg_type}")
        await getattr(self, handler_name)(json_msg)

    def _is_event_handling_disabled(self, current_turn: int) -> bool:
        """Return True when event handling was disabled before the previous turn.

        A ``event_handling_disabled_turn`` of 0 means handling is not disabled.
        """
        disabled_turn = self.base_bot_internals.event_handling_disabled_turn
        return disabled_turn != 0 and disabled_turn < (int(current_turn) - 1)

    async def handle_tick(self, json_msg: Dict[Any, Any]) -> None:
        """Handle a tick event from the server.

        Skips the tick entirely when event handling is disabled for its turn.
        Otherwise records the tick start time, stores the mapped tick event,
        stages its events in the event queue, fires them on the internal
        handlers and finally publishes ``on_next_turn``.
        """
        # Determine turn number early to apply correct disabled-handling
        # semantics. Use an explicit None check so that turn 0 is not mistaken
        # for a missing value.
        turn_number = json_msg.get("turn_number")
        if turn_number is None:
            turn_number = json_msg.get("turnNumber")
        if turn_number is not None and self._is_event_handling_disabled(int(turn_number)):
            return

        self.base_bot_internals.tick_start_nano_time = time.monotonic_ns()

        tick_event_for_bot: TickEventForBot = from_json(json_msg)  # type: ignore
        mapped_tick_event = EventMapper.map_tick_event(
            tick_event_for_bot, self.base_bot
        )
        self.base_bot_internals.tick_event = mapped_tick_event

        # Stage events from this tick into the event queue (Java parity)
        self.event_queue.add_events_from_tick(mapped_tick_event)

        # mapped_tick_event.events should still be iterable
        for event in mapped_tick_event.events:
            self.internal_event_handlers.fire_event(event)

        # Trigger next turn (not tick-event!)
        self.internal_event_handlers.on_next_turn.publish(mapped_tick_event)

    async def handle_round_started(self, json_msg: Dict[Any, Any]) -> None:
        """Handle a round started event from the server.

        Publishes to the internal handlers first, then to the bot handlers.
        """
        schema_evt: RoundStartedEventForBot = from_json(json_msg)  # type: ignore
        round_started_event = RoundStartedEvent(schema_evt.round_number)

        self.internal_event_handlers.on_round_started.publish(round_started_event)
        self.bot_event_handlers.on_round_started.publish(round_started_event)

    async def handle_round_ended(self, json_msg: Dict[Any, Any]):
        """Handle a round ended event from the server.

        Publishes the round-ended event, dispatches the events still queued
        for the final turn, and transfers captured output to the bot intent.
        The statement order is significant; see the inline comments.
        """
        schema_evt: RoundEndedEventForBot = from_json(json_msg)  # type: ignore
        results = ResultsMapper.map(schema_evt.results)
        round_ended_event = RoundEndedEvent(
            schema_evt.round_number, schema_evt.turn_number, results
        )

        self.bot_event_handlers.on_round_ended.publish(round_ended_event)
        self.internal_event_handlers.on_round_ended.publish(round_ended_event)  # triggers stop_thread()

        # Dispatch any queued events (e.g. WonRoundEvent from the last tick). Bot thread is now
        # stopped so there is no concurrent dispatch race. Must run before ROUND_STARTED clears
        # the event queue.
        self.event_queue.dispatch_events(schema_evt.turn_number)

        # Transfer any remaining stdout/stderr from event handlers (e.g. on_won_round) before the round ends
        self._transfer_std_out_to_bot_intent()

    async def handle_game_started(self, json_msg: Dict[Any, Any]) -> None:
        """Handle a game started event from the server.

        Stores the bot id, teammate ids, game setup and initial position on
        ``base_bot_internals``, publishes a ``GameStartedEvent`` and replies
        to the server with a ``BotReady`` message.
        """
        assert self.websocket is not None, "WebSocket connection is not established."
        game_started_event: GameStartedEventForBot = from_json(json_msg)  # type: ignore

        internals = self.base_bot_internals
        internals.my_id = game_started_event.my_id
        if game_started_event.teammate_ids is not None:
            # Drop None entries; teammate ids are kept as a set.
            internals.teammate_ids = {
                teammate_id
                for teammate_id in game_started_event.teammate_ids
                if teammate_id is not None
            }
        internals.game_setup = GameSetupMapper.map(game_started_event.game_setup)

        initial_position = InitialPosition(
            game_started_event.start_x,
            game_started_event.start_y,
            game_started_event.start_direction,
        )
        internals.initial_position = initial_position

        self.bot_event_handlers.on_game_started.publish(
            GameStartedEvent(
                game_started_event.my_id,
                initial_position,
                internals.game_setup,
            )
        )

        # Send ready signal
        await self.websocket.send(to_json(BotReady(type=Message.Type.BOT_READY)))

    async def handle_game_ended(self, json_msg: Dict[Any, Any]) -> None:
        """Handle a game ended event from the server.

        Publishes to the bot handlers first, then to the internal handlers.
        """
        schema_evt: GameEndedEventForBot = from_json(json_msg)  # type: ignore

        game_ended_event = GameEndedEvent()
        game_ended_event.number_of_rounds = schema_evt.number_of_rounds
        game_ended_event.results = ResultsMapper.map(schema_evt.results)

        self.bot_event_handlers.on_game_ended.publish(game_ended_event)
        self.internal_event_handlers.on_game_ended.publish(game_ended_event)

    async def handle_game_aborted(self) -> None:
        """Handle a game aborted event from the server.

        Publishes ``None`` to both ``on_game_aborted`` handlers.
        """
        self.bot_event_handlers.on_game_aborted.publish(None)
        self.internal_event_handlers.on_game_aborted.publish(None)

    async def handle_skipped_turn(self, json_msg: Dict[Any, Any]) -> None:
        """Handle a skipped turn event from the server by queueing it."""
        schema_evt: SkippedTurnEvent = from_json(json_msg)  # type: ignore
        skipped_turn_event = EventMapper.map_skipped_turn_event(schema_evt)
        self.event_queue.add_event(skipped_turn_event)

    async def handle_server_handshake(self, json_msg: Dict[Any, Any]) -> None:
        """Handle a server handshake from the server.

        Stores the handshake, validates the bot info and replies with a bot
        handshake message.

        Raises:
            BotException: If a required bot property is missing.
        """
        assert self.websocket is not None, "WebSocket connection is not established."
        server_handshake: ServerHandshake = from_json(json_msg)  # type: ignore
        self.base_bot_internals.server_handshake = server_handshake

        # Validate bot info before sending bot handshake
        self._validate_bot_info()

        # Reply by sending bot handshake
        is_droid = self._infer_is_droid()

        # Create bot handshake message
        from ..internal.bot_handshake_factory import BotHandshakeFactory

        bot_handshake = BotHandshakeFactory.create(
            server_handshake.session_id, self.bot_info, is_droid, self.server_secret
        )

        # Send handshake message exactly once. Only the JSON rewrite is
        # best-effort; a failing send propagates instead of being retried.
        await self.websocket.send(self._add_legacy_session_id(to_json(bot_handshake)))

    def _infer_is_droid(self) -> bool:
        """Return whether the bot is a droid.

        Infers droid status by marker interface inheritance (Java parity),
        with fallback to an explicit ``is_droid`` attribute for backward
        compatibility.
        """
        try:
            from ..droid_abc import DroidABC  # type: ignore
        except Exception:
            # Deliberately best-effort: fall back to the explicit flag.
            DroidABC = None  # type: ignore

        if DroidABC is not None and isinstance(self.base_bot, DroidABC):  # type: ignore
            return True
        if hasattr(self.base_bot, "is_droid"):
            # Allow legacy bots explicitly setting the flag
            return bool(getattr(self.base_bot, "is_droid"))
        return False

    @staticmethod
    def _add_legacy_session_id(payload_str: str) -> str:
        """Return the handshake JSON with a snake_case ``session_id`` added.

        Ensures backward compatibility for tests expecting ``session_id``: if
        the payload has ``sessionId`` but no ``session_id``, the value is
        copied. If the payload cannot be parsed or rewritten, the original
        text is returned unchanged.
        """
        try:
            payload = json.loads(payload_str)
            if "sessionId" in payload and "session_id" not in payload:
                payload["session_id"] = payload["sessionId"]
            return json.dumps(payload)
        except (TypeError, ValueError):
            # Fallback to original payload if it is not a rewritable JSON object
            return payload_str

    def _transfer_std_out_to_bot_intent(self) -> None:
        """Transfer captured stdout/stderr to bot intent for sending to server.

        For each active recorder, the next chunk is read and stored on the
        bot intent; an empty read stores ``None``.
        """
        internals = self.base_bot_internals

        if internals.recording_stdout:
            output = internals.recording_stdout.read_next()
            internals.bot_intent.std_out = output or None

        if internals.recording_stderr:
            error = internals.recording_stderr.read_next()
            internals.bot_intent.std_err = error or None

    def _validate_bot_info(self) -> None:
        """Validate bot info before sending handshake to server.

        Raises:
            BotException: If ``name``, ``version`` or ``authors`` is missing
                or blank.
        """
        if self._is_blank(self.bot_info.name):
            self._throw_missing_property_exception("name")
        if self._is_blank(self.bot_info.version):
            self._throw_missing_property_exception("version")
        if not self.bot_info.authors or self._is_all_blank(self.bot_info.authors):
            self._throw_missing_property_exception("authors")

    def _throw_missing_property_exception(self, property_name: str) -> None:
        """Throw a BotException for a missing required property."""
        raise BotException(
            f"Required bot property '{property_name}' is missing. "
            f"This property is required in order for the bot to be recognized when booting it up and "
            f"when it needs to join the game. You must set this property in your bot code "
            f"or provide a .json configuration file."
        )

    def _is_blank(self, s: Optional[str]) -> bool:
        """Check if a string is None or whitespace-only."""
        return s is None or not s.strip()

    def _is_all_blank(self, strings: list[str]) -> bool:
        """Check if all strings in a list are blank."""
        return all(self._is_blank(s) for s in strings)