Source code for bot_api.internal.base_bot_internals

import asyncio
import urllib.parse
import traceback
import time
import math
import os
import sys
import threading
from typing import Any, Optional, Set, Sequence

from ..base_bot_abc import BaseBotABC
from ..bot_abc import BotABC
from ..bot_exception import BotException
from ..bot_info import BotInfo
from ..bullet_state import BulletState
from ..events import TickEvent, BulletFiredEvent, RoundStartedEvent, BotEvent
from ..events.condition import Condition
from ..game_setup import GameSetup
from ..initial_position import InitialPosition
from ..util.math_util import MathUtil
from ..graphics import Color, GraphicsABC, SvgGraphics

from .bot_event_handlers import BotEventHandlers
from .event_queue import EventQueue
from .env_vars import EnvVars
from .intent_validator import IntentValidator
from .internal_event_handlers import InternalEventHandlers
from .json_util import to_json
from .recording_text_writer import RecordingTextWriter
from .stop_resume_listener_abs import StopResumeListenerABC
from .thread_interrupted_exception import ThreadInterruptedException
from .websocket_handler import WebSocketHandler

from ..constants import (
    MAX_SPEED,
    MAX_TURN_RATE,
    MAX_GUN_TURN_RATE,
    MAX_RADAR_TURN_RATE,
    DECELERATION,
    ACCELERATION,
    MAX_NUMBER_OF_TEAM_MESSAGES_PER_TURN,
    TEAM_MESSAGE_MAX_SIZE,
)

from robocode_tank_royale.schema import Message, BotIntent, ServerHandshake, TeamMessage

DEFAULT_SERVER_URL = "ws://localhost:7654"

GAME_NOT_RUNNING_MSG = (
    "Game is not running. Make sure onGameStarted() event handler has been called first"
)
TICK_NOT_AVAILABLE_MSG = "Game is not running or tick has not occurred yet. Make sure onTick() event handler has been called first"
NOT_CONNECTED_TO_SERVER_MSG = "Not connected to a game server. Make sure onConnected() event handler has been called first"


[docs] class BaseBotInternals: def __init__( self, base_bot: BaseBotABC, bot_info: Optional[BotInfo], server_url: Optional[str], server_secret: Optional[str], ): self.base_bot = base_bot if bot_info is None: self.bot_info = EnvVars.get_bot_info() else: self.bot_info = bot_info self.server_url = ( server_url if server_url is not None else self._get_server_url_from_setting() ) self.server_secret = ( server_secret if server_secret is not None else self._get_server_secret_from_setting() ) self.bot_intent: BotIntent = BotIntent( type=Message.Type.BOT_INTENT, team_messages=[] ) self._my_id: Optional[int] = None self._teammate_ids: Set[int] = set() self._game_setup: Optional[GameSetup] = None self._initial_position: Optional[InitialPosition] = None self._tick_event: Optional[TickEvent] = None self._tick_start_nano_time: int = 0 self._server_handshake: Optional[ServerHandshake] = None self._conditions: Set[Condition] = set() self._is_running_atomic: bool = False self._event_handling_disabled_turn: int = 0 self.graphics_state: GraphicsABC = SvgGraphics() # Fields for set_stop / set_resume self.is_stopped: bool = False self.saved_target_speed: Optional[float] = None self.saved_turn_rate: Optional[float] = None self.saved_gun_turn_rate: Optional[float] = None self.saved_radar_turn_rate: Optional[float] = None # Flag set when the current event handler was interrupted by a new event self.was_current_event_interrupted: bool = False # Recording writers for capturing stdout/stderr self.recording_stdout: Optional[object] = None self.recording_stderr: Optional[object] = None self.bot_event_handlers: BotEventHandlers = BotEventHandlers(base_bot) self.internal_event_handlers: InternalEventHandlers = InternalEventHandlers() self.event_queue: EventQueue = EventQueue(self, self.bot_event_handlers) self.closed_event: threading.Event = threading.Event() self.socket: Optional[Any] = None # To store the WebSocket connection self.web_socket_handler: Optional[WebSocketHandler] = None self._next_turn_condition: threading.Condition = threading.Condition() # WebSocket background thread with async event loop self._ws_thread: Optional[threading.Thread] = None self._ws_loop: Optional[asyncio.AbstractEventLoop] = None self._ws_loop_ready_event: threading.Event = threading.Event() # Bot thread (runs bot.run() and bot.go()) self.thread: Optional[threading.Thread] = None self.stop_resume_listener: Optional[StopResumeListenerABC] = None self.max_speed: float = MAX_SPEED self.max_turn_rate: float = MAX_TURN_RATE self.max_gun_turn_rate: float = MAX_GUN_TURN_RATE self.max_radar_turn_rate: float = MAX_RADAR_TURN_RATE self.last_execute_turn_number: int = -1 # Movement reset deferral flag (mirrors Java/.NET movementResetPending) self._movement_reset_pending: bool = False # Recording text writers for capturing stdout/stderr self._recording_stdout: Optional[RecordingTextWriter] = None self._recording_stderr: Optional[RecordingTextWriter] = None self._init() def _get_server_url_from_setting(self) -> str: url = os.getenv("SERVER_URL", os.getenv("SERVER_URL")) if url is None: url = DEFAULT_SERVER_URL return url def _get_server_secret_from_setting(self) -> Optional[str]: return os.getenv("SERVER_SECRET", os.getenv("SERVER_SECRET")) def _init(self) -> None: self._redirect_stdout_and_stderr() self._subscribe_to_events() def _redirect_stdout_and_stderr(self) -> None: """Redirect stdout and stderr to recording writers for sending to server.""" self._recording_stdout = RecordingTextWriter(sys.stdout) self._recording_stderr = RecordingTextWriter(sys.stderr) # Store in data so WebSocketHandler can access them self.recording_stdout = self._recording_stdout self.recording_stderr = self._recording_stderr sys.stdout = self._recording_stdout sys.stderr = self._recording_stderr def _subscribe_to_events(self) -> None: self.internal_event_handlers.on_round_started.subscribe( self._on_round_started, 100 ) self.internal_event_handlers.on_next_turn.subscribe(self._on_next_turn, 100) self.internal_event_handlers.on_bullet_fired.subscribe( self._on_bullet_fired, 100 ) def _on_round_started(self, event: RoundStartedEvent) -> None: """Handle round started event (matches Java's onRoundStarted)""" self.tick_event = None # Defer movement reset until after first intent has been sent, mirroring Java/.NET behavior if not hasattr(self, "_movement_reset_pending"): self._movement_reset_pending = False self._movement_reset_pending = True self.event_queue.clear() # Clears conditions in self.conditions self.is_stopped = False self.event_handling_disabled_turn = 0 self.last_execute_turn_number = -1 def _on_next_turn(self, event: TickEvent) -> None: """Handle next turn event - unblock waiting threads (matches Java's onNextTurn)""" with self._next_turn_condition: # Unblock methods waiting for the next turn self._next_turn_condition.notify_all() def _on_bullet_fired(self, event: BulletFiredEvent) -> None: """Handle bullet fired event (matches Java's onBulletFired)""" if self.bot_intent: # Reset firepower so the bot stops firing continuously self.bot_intent.firepower = 0.0 def _reset_movement(self) -> None: if self.bot_intent: self.bot_intent.turn_rate = None self.bot_intent.gun_turn_rate = None self.bot_intent.radar_turn_rate = None self.bot_intent.target_speed = None self.bot_intent.firepower = None @property def my_id(self) -> int: if self._my_id is None: raise BotException(GAME_NOT_RUNNING_MSG) return self._my_id @my_id.setter def my_id(self, value: int): self._my_id = value @property def teammate_ids(self) -> Set[int]: if self._my_id is None: raise BotException(GAME_NOT_RUNNING_MSG) return self._teammate_ids @teammate_ids.setter def teammate_ids(self, value: Set[int]): self._teammate_ids = value @property def game_setup(self) -> GameSetup: if self._game_setup is None: raise BotException(GAME_NOT_RUNNING_MSG) return self._game_setup @game_setup.setter def game_setup(self, value: GameSetup): self._game_setup = value @property def initial_position(self) -> Optional[InitialPosition]: return self._initial_position @initial_position.setter def initial_position(self, value: Optional[InitialPosition]): self._initial_position = value
[docs] def get_bot_intent(self) -> BotIntent: return self.bot_intent
@property def tick_event(self) -> Optional[TickEvent]: return self._tick_event @tick_event.setter def tick_event(self, value: Optional[TickEvent]): self._tick_event = value @property def current_tick_or_throw(self) -> TickEvent: if self._tick_event is None: raise BotException(TICK_NOT_AVAILABLE_MSG) return self._tick_event
[docs] def get_current_tick_or_throw(self) -> TickEvent: return self.current_tick_or_throw
[docs] def set_tick_event(self, tick_event: TickEvent) -> None: self.tick_event = tick_event
@property def current_tick_or_null(self) -> Optional[TickEvent]: return self._tick_event
[docs] def get_current_tick_or_null(self) -> Optional[TickEvent]: return self.current_tick_or_null
@property def tick_start_nano_time(self) -> int: return self._tick_start_nano_time @tick_start_nano_time.setter def tick_start_nano_time(self, value: int): self._tick_start_nano_time = value
[docs] def get_tick_start_nano_time(self) -> int: return self.tick_start_nano_time
[docs] def set_tick_start_nano_time(self, tick_start_nano_time: int) -> None: self.tick_start_nano_time = tick_start_nano_time
[docs] def get_time_left(self) -> int: if self.current_tick_or_null is None: return self.game_setup.turn_timeout passed_microseconds = ( time.monotonic_ns() - self.tick_start_nano_time ) // 1000 return max(0, self.game_setup.turn_timeout - passed_microseconds)
@property def event_handling_disabled_turn(self) -> int: return self._event_handling_disabled_turn @event_handling_disabled_turn.setter def event_handling_disabled_turn(self, value: int): self._event_handling_disabled_turn = value
[docs] def enable_event_handling(self, enable: bool) -> None: if enable: self.event_handling_disabled_turn = 0 else: # Ensure tick event is available before accessing turn_number current_tick = self.current_tick_or_null if current_tick: self.event_handling_disabled_turn = current_tick.turn_number else: # If called before any tick, disabling implies from turn 0. self.event_handling_disabled_turn = 0
[docs] def is_event_handling_disabled(self) -> bool: # Important! Allow an additional turn so events like RoundStarted can be handled current_tick = self.current_tick_or_null if not current_tick: return ( self.event_handling_disabled_turn != 0 ) # If no tick, rely purely on the flag return ( self.event_handling_disabled_turn != 0 and self.event_handling_disabled_turn < (current_tick.turn_number - 1) )
[docs] def get_events(self) -> Sequence[BotEvent]: turn_number = self.current_tick_or_throw.turn_number return self.event_queue.get_events(turn_number)
[docs] def clear_events(self) -> None: self.event_queue.clear_events()
# Public wrapper to enqueue events from a tick, aligning with Java BaseBotInternals
[docs] def add_events_from_tick(self, event: TickEvent) -> None: self.event_queue.add_events_from_tick(event)
[docs] def add_event(self, event: BotEvent) -> None: self.event_queue.add_event(event)
[docs] def set_interruptible(self, interruptible: bool) -> None: self.event_queue.set_current_event_interruptible(interruptible)
[docs] def dispatch_events(self, turn_number: int) -> None: try: self.event_queue.dispatch_events(turn_number) except BotException: # Suppress tick-unavailable errors during round transitions (bot is shutting down) if self.is_running(): traceback.print_exc() except Exception: # Align with Java: do not propagate interruptions from event handling traceback.print_exc()
[docs] def set_running(self, is_running: bool) -> None: self._is_running_atomic = is_running
[docs] def is_running(self) -> bool: return self._is_running_atomic
def _wait_until_first_tick_arrived(self) -> None: """Block the pre-warmed bot thread until the first tick of the round arrives. The thread is started at round-started (before any tick), so it must wait here before run() can safely read bot state. Notified by _on_next_turn() (priority 100) after BotInternals._on_first_turn() (priority 110) has already captured initial directions via _clear_remaining(). """ with self._next_turn_condition: while self.is_running() and self.current_tick_or_null is None: self._next_turn_condition.wait() # NOTE: Do NOT dispatch events here. Events are dispatched in go() → dispatch_events() # which is called from the first blocking bot method (forward, turn_left, etc.) in run(). # Pre-dispatching causes event handlers that call go() to send intents before run() has # set up state (colors, movement), and corrupts last_execute_turn_number for turn 1. def _create_runnable(self, bot: BotABC): """Create runnable function for bot thread (matches Java's createRunnable)""" def runnable(): self.set_running(True) try: self._wait_until_first_tick_arrived() bot.run() except ThreadInterruptedException: pass self._dispatch_final_turn_events() # Skip every turn after the run method has exited while self.is_running(): try: bot.go() except ThreadInterruptedException: break self._dispatch_final_turn_events() return runnable def _dispatch_final_turn_events(self) -> None: """Dispatch any remaining events from the current tick before the thread exits.""" tick = self.current_tick_or_null if tick is not None: self.dispatch_events(tick.turn_number)
[docs] def start_thread(self, bot: BotABC) -> None: """Start bot thread (matches Java's startThread)""" self.enable_event_handling(True) # reset on WebSocket thread — before new bot thread starts self.thread = threading.Thread(target=self._create_runnable(bot)) self.thread.start()
[docs] def stop_thread(self) -> None: """Stop bot thread (matches Java's stopThread)""" if not self.is_running(): return self.set_running(False) self.enable_event_handling(False) # disable on WebSocket thread — prevents new ticks from queuing after bot stops # Wake up any threads waiting on the next turn condition so they can see is_running=False with self._next_turn_condition: self._next_turn_condition.notify_all() thread = self.thread self.thread = None if thread is not None and thread is not threading.current_thread(): # Wait for the bot thread to finish so that handle_round_ended's # dispatch_events call does not race with _dispatch_final_turn_events. thread.join(timeout=5.0)
def _sanitize_url(self, uri: str) -> None: parsed_url = urllib.parse.urlparse(uri) if parsed_url.scheme not in ("ws", "wss"): raise BotException(f"Wrong scheme used with server URL: {uri}") async def _connect(self) -> None: self._sanitize_url(self.server_url) try: self.web_socket_handler = WebSocketHandler( # Store the handler instance self, # Pass BaseBotInternals instance self.server_url, self.server_secret, self.base_bot, # The bot instance itself (BaseBotABC) self.bot_info, # BotInfo needed by WebSocketHandler self.bot_event_handlers, # Event handlers needed by WebSocketHandler self.internal_event_handlers, # Event handlers needed by WebSocketHandler self.closed_event, # Threading event for signaling connection close self.event_queue, # Provide access to the shared EventQueue for staging events ) self.socket = await self.web_socket_handler.connect() except Exception as ex: raise BotException( f"Could not create web socket for URL: {self.server_url}", ex ) from ex
[docs] def start(self) -> None: """Start bot and block until game ends (matches Java's start)""" self._start_websocket_thread() self._connect_sync() self.closed_event.wait() # CRITICAL: Stop the WebSocket event loop when bot finishes self._stop_websocket_thread()
def _start_websocket_thread(self) -> None: """Start WebSocket background thread with async event loop""" def ws_thread_target(): self._ws_loop = asyncio.new_event_loop() asyncio.set_event_loop(self._ws_loop) self._ws_loop_ready_event.set() try: self._ws_loop.run_forever() finally: # Standard asyncio shutdown: cancel all tasks, shutdown executor, then close loop try: # Cancel all tasks pending = asyncio.all_tasks(self._ws_loop) for task in pending: task.cancel() # Wait for cancellation to complete if pending: self._ws_loop.run_until_complete( asyncio.gather(*pending, return_exceptions=True) ) # CRITICAL: Shutdown the default executor to prevent hanging self._ws_loop.run_until_complete(self._ws_loop.shutdown_default_executor()) except Exception: pass # Ignore errors during cleanup finally: self._ws_loop.close() self._ws_loop_ready_event.clear() self._ws_thread = threading.Thread(target=ws_thread_target, daemon=False) self._ws_thread.start() if not self._ws_loop_ready_event.wait(timeout=2.0): raise BotException("WebSocket event loop not started") def _stop_websocket_thread(self) -> None: """Stop WebSocket event loop and thread using proper asyncio shutdown""" if not self._ws_loop or self._ws_loop.is_closed(): return # Simply stop the loop - cleanup happens in the finally block try: self._ws_loop.call_soon_threadsafe(self._ws_loop.stop) except RuntimeError: pass # Wait for thread to finish (it will clean up all tasks) if self._ws_thread and self._ws_thread.is_alive(): self._ws_thread.join(timeout=3.0) def _connect_sync(self) -> None: """Connect to WebSocket server (synchronous wrapper)""" if self._ws_loop is None: raise BotException("WebSocket event loop not started") future = asyncio.run_coroutine_threadsafe(self._connect(), self._ws_loop) future.result() # Block until connection is established # Start receiving messages in the WebSocket thread if self.web_socket_handler: asyncio.run_coroutine_threadsafe( self.web_socket_handler.receive_messages(), self._ws_loop )
[docs] def execute(self, captured_turn_number: int) -> None: """Execute bot intent and wait for next turn. Args: captured_turn_number: The turn number captured by go() at the time events were dispatched, or -1 if no tick was available. """ # If no tick has been received yet, send current intent once to allow the server to progress if captured_turn_number < 0: self._send_intent() return if captured_turn_number != self.last_execute_turn_number: self.last_execute_turn_number = captured_turn_number # Events are dispatched from BaseBot.go(); staging happens on tick reception self._send_intent() if self._movement_reset_pending: self._reset_movement() self._movement_reset_pending = False self._wait_for_next_turn(captured_turn_number) # Dispatch events for the new turn *after* waiting, so that run() always reads state # that matches the events that just fired — matching Classic Robocode semantics. new_tick = self.current_tick_or_null if new_tick is not None: self.dispatch_events(new_tick.turn_number)
def _send_intent(self) -> None: """Send bot intent to server (synchronous)""" self._render_graphics_to_bot_intent() self._transfer_std_out_to_bot_intent() if self.socket and self._ws_loop: try: json_intent = to_json(self.bot_intent) # Send via WebSocket in the WebSocket thread asyncio.run_coroutine_threadsafe( self.socket.send(json_intent), self._ws_loop ) # Clear rescan flag after serializing — consumed by this intent if self.bot_intent.rescan: self.bot_intent.rescan = False # Clear team messages after sending intent (matches Java implementation) if self.bot_intent.team_messages: self.bot_intent.team_messages.clear() except Exception as e: print(f"Error sending bot intent: {e}") def _transfer_std_out_to_bot_intent(self) -> None: """Transfer captured stdout/stderr to bot intent for sending to server.""" if self._recording_stdout: output = self._recording_stdout.read_next() if output: self.bot_intent.std_out = output else: self.bot_intent.std_out = None if self._recording_stderr: error = self._recording_stderr.read_next() if error: self.bot_intent.std_err = error else: self.bot_intent.std_err = None def _render_graphics_to_bot_intent(self) -> None: current_tick = self.current_tick_or_null # Check if debugging is enabled for the bot in the current tick if ( current_tick and hasattr(current_tick, "bot_state") and current_tick.bot_state and hasattr(current_tick.bot_state, "debugging_enabled") and current_tick.bot_state.debugging_enabled ): svg_output = self.graphics_state.to_svg() self.bot_intent.debug_graphics = svg_output else: # Ensure it's not set if debugging is off or tick not available self.bot_intent.debug_graphics = None # Always clear the graphics state so previous frames don't accumulate self.graphics_state.clear() def _wait_for_next_turn(self, turn_number: int) -> None: """Wait for next turn (matches Java's waitForNextTurn)""" # Check if we're being called from the designated bot thread # If self.thread is None (test mode with no run() loop) or current thread doesn't match, # exit immediately - the intent was already sent in execute() before this call if self.thread is None or threading.current_thread() != self.thread: # In test mode or when called from wrong thread, just return without waiting # This allows tests to call go() from test threads without hanging return # Only wait if we're in the correct bot thread and bot is running with self._next_turn_condition: while self.is_running(): current_tick = self.current_tick_or_null if current_tick is None or current_tick.turn_number != turn_number: break try: # Use timeout to allow periodic check of is_running() flag # This makes the bot thread interruptible for clean shutdown self._next_turn_condition.wait(timeout=0.1) except InterruptedError: raise ThreadInterruptedException() def _stop_rogue_thread(self) -> None: """Stop rogue thread (matches Java's stopRogueThread)""" # This method is no longer called from _wait_for_next_turn # Kept for compatibility but effectively disabled pass
[docs] def set_fire(self, firepower: float) -> bool: """Set fire with given firepower. Matches Java's setFire() semantics exactly.""" IntentValidator.validate_firepower(firepower) # Match Java: use base_bot.energy and base_bot.gun_heat (which return 0 when no tick received) if self.base_bot.energy < firepower or self.base_bot.gun_heat > 0: return False # cannot fire yet self.bot_intent.firepower = firepower return True
[docs] def get_gun_heat(self) -> float: tick = self.current_tick_or_null return tick.bot_state.gun_heat if tick and tick.bot_state else 0.0
[docs] def get_speed(self) -> float: tick = self.current_tick_or_null return tick.bot_state.speed if tick and tick.bot_state else 0.0
@property def turn_rate(self) -> float: if self.bot_intent.turn_rate is not None: return self.bot_intent.turn_rate tick = self.current_tick_or_null return tick.bot_state.turn_rate if tick and tick.bot_state else 0.0 @turn_rate.setter def turn_rate(self, turn_rate: float) -> None: self.bot_intent.turn_rate = IntentValidator.validate_turn_rate( turn_rate, self.max_turn_rate ) @property def gun_turn_rate(self) -> float: if self.bot_intent.gun_turn_rate is not None: return self.bot_intent.gun_turn_rate tick = self.current_tick_or_null return tick.bot_state.gun_turn_rate if tick and tick.bot_state else 0.0 @gun_turn_rate.setter def gun_turn_rate(self, gun_turn_rate: float) -> None: self.bot_intent.gun_turn_rate = IntentValidator.validate_gun_turn_rate( gun_turn_rate, self.max_gun_turn_rate ) @property def radar_turn_rate(self) -> float: if self.bot_intent.radar_turn_rate is not None: return self.bot_intent.radar_turn_rate tick = self.current_tick_or_null return tick.bot_state.radar_turn_rate if tick and tick.bot_state else 0.0 @radar_turn_rate.setter def radar_turn_rate(self, radar_turn_rate: float) -> None: self.bot_intent.radar_turn_rate = IntentValidator.validate_radar_turn_rate( radar_turn_rate, self.max_radar_turn_rate ) @property def target_speed(self) -> float | None: return self.bot_intent.target_speed @target_speed.setter def target_speed(self, target_speed: float) -> None: self.bot_intent.target_speed = IntentValidator.validate_target_speed( target_speed, self.max_speed )
[docs] def get_max_speed(self) -> float: # Max speed is part of bot's own limits, not server state return self.max_speed
[docs] def set_max_speed(self, max_speed: float) -> None: # Max speed is part of bot's own limits self.max_speed = IntentValidator.validate_max_speed(max_speed)
[docs] def get_max_turn_rate(self) -> float: # Max turn rate is part of bot's own limits return self.max_turn_rate
[docs] def set_max_turn_rate(self, max_turn_rate: float) -> None: # Max turn rate is part of bot's own limits self.max_turn_rate = IntentValidator.validate_max_turn_rate(max_turn_rate)
[docs] def get_max_gun_turn_rate(self) -> float: # Max gun turn rate is part of bot's own limits return self.max_gun_turn_rate
[docs] def set_max_gun_turn_rate(self, max_gun_turn_rate: float) -> None: # Max gun turn rate is part of bot's own limits self.max_gun_turn_rate = IntentValidator.validate_max_gun_turn_rate(max_gun_turn_rate)
[docs] def get_max_radar_turn_rate(self) -> float: # Max radar turn rate is part of bot's own limits return self.max_radar_turn_rate
[docs] def set_max_radar_turn_rate(self, max_radar_turn_rate: float) -> None: # Max radar turn rate is part of bot's own limits self.max_radar_turn_rate = IntentValidator.validate_max_radar_turn_rate(max_radar_turn_rate)
[docs] def get_new_target_speed(self, speed: float, distance: float) -> float: return IntentValidator.get_new_target_speed(speed, distance, self.max_speed)
[docs] def get_distance_traveled_until_stop(self, speed: float) -> float: return IntentValidator.get_distance_traveled_until_stop(speed, self.max_speed)
@property def conditions(self) -> Set[Condition]: return self._conditions # Conditions
[docs] def add_condition(self, condition: Condition) -> bool: # Set add method does not return a boolean indicating if the add was successful # We need to check length before and after. prev_len = len(self.conditions) self.conditions.add(condition) return len(self.conditions) > prev_len
[docs] def remove_condition(self, condition: Condition) -> bool: try: self.conditions.remove(condition) return True # remove() raises KeyError if not found except KeyError: return False # Condition not found
[docs] def set_stop(self, overwrite: bool) -> None: if not self.is_stopped or overwrite: self.is_stopped = True # Save current intent values self.saved_target_speed = self.bot_intent.target_speed self.saved_turn_rate = self.bot_intent.turn_rate self.saved_gun_turn_rate = self.bot_intent.gun_turn_rate self.saved_radar_turn_rate = self.bot_intent.radar_turn_rate # Stop all movement/turning immediately self.bot_intent.target_speed = 0.0 self.bot_intent.turn_rate = 0.0 self.bot_intent.gun_turn_rate = 0.0 self.bot_intent.radar_turn_rate = 0.0 if self.stop_resume_listener is not None: self.stop_resume_listener.on_stop()
[docs] def set_resume(self) -> None: if self.is_stopped: # Restore saved intent values self.bot_intent.target_speed = self.saved_target_speed self.bot_intent.turn_rate = self.saved_turn_rate self.bot_intent.gun_turn_rate = self.saved_gun_turn_rate self.bot_intent.radar_turn_rate = self.saved_radar_turn_rate if self.stop_resume_listener is not None: self.stop_resume_listener.on_resume() self.is_stopped = False # Must be the last step
[docs] def is_teammate(self, bot_id: int) -> bool: return bot_id in self.teammate_ids # Uses property getter
[docs] def broadcast_team_message(self, message: "Any") -> None: self.send_team_message(None, message)
[docs] def send_team_message(self, teammate_id: Optional[int], message: Any) -> None: from ..team_message import serialize_team_message IntentValidator.validate_teammate_id(teammate_id, self.teammate_ids) team_messages_list = self.bot_intent.team_messages if team_messages_list is None: team_messages_list = [] self.bot_intent.team_messages = team_messages_list IntentValidator.validate_team_message(message, len(team_messages_list)) # Serialize the message using team_message module which handles Color objects json_message_str = serialize_team_message(message) IntentValidator.validate_team_message_size(json_message_str) team_message = TeamMessage( message_type=type(message).__name__, receiver_id=teammate_id, message=json_message_str, ) team_messages_list.append(team_message)
# Color and Graphics - Delegated @property def body_color(self) -> Optional[Color]: tick = self.current_tick_or_null return tick.bot_state.body_color if tick and tick.bot_state else None @body_color.setter def body_color(self, color: Optional[Color]) -> None: self.bot_intent.body_color = IntentValidator.color_to_schema(color) @property def turret_color(self) -> Optional[Color]: tick = self.current_tick_or_null return tick.bot_state.turret_color if tick and tick.bot_state else None @turret_color.setter def turret_color(self, color: Optional[Color]) -> None: self.bot_intent.turret_color = IntentValidator.color_to_schema(color) @property def radar_color(self) -> Optional[Color]: tick = self.current_tick_or_null return tick.bot_state.radar_color if tick and tick.bot_state else None @radar_color.setter def radar_color(self, color: Optional[Color]) -> None: self.bot_intent.radar_color = IntentValidator.color_to_schema(color) @property def bullet_color(self) -> Optional[Color]: tick = self.current_tick_or_null return tick.bot_state.bullet_color if tick and tick.bot_state else None @bullet_color.setter def bullet_color(self, color: Optional[Color]) -> None: self.bot_intent.bullet_color = IntentValidator.color_to_schema(color) @property def scan_color(self) -> Optional[Color]: tick = self.current_tick_or_null return tick.bot_state.scan_color if tick and tick.bot_state else None @scan_color.setter def scan_color(self, color: Optional[Color]) -> None: self.bot_intent.scan_color = IntentValidator.color_to_schema(color) @property def tracks_color(self) -> Optional[Color]: tick = self.current_tick_or_null return tick.bot_state.tracks_color if tick and tick.bot_state else None @tracks_color.setter def tracks_color(self, color: Optional[Color]) -> None: self.bot_intent.tracks_color = IntentValidator.color_to_schema(color) @property def gun_color(self) -> Optional[Color]: tick = self.current_tick_or_null return tick.bot_state.gun_color if tick and tick.bot_state else None @gun_color.setter def gun_color(self, color: Optional[Color]) -> None: self.bot_intent.gun_color = IntentValidator.color_to_schema(color)
[docs] def get_graphics(self) -> GraphicsABC: return self.graphics_state
# Bullet States - Delegated
[docs] def get_bullet_states(self) -> Sequence[BulletState | None]: tick = self.current_tick_or_null if tick and tick.bullet_states: return list(tick.bullet_states) return []
# Server Handshake @property def server_handshake(self) -> ServerHandshake: if self._server_handshake is None: raise BotException(NOT_CONNECTED_TO_SERVER_MSG) return self._server_handshake @server_handshake.setter def server_handshake(self, value: ServerHandshake): self._server_handshake = value @property def variant(self) -> str: return self.server_handshake.variant @variant.setter def variant(self, variant: str) -> None: self.server_handshake.variant = variant @property def version(self) -> str: return self.server_handshake.version @version.setter def version(self, version: str) -> None: self.server_handshake.version = version