Source code for bot_api.internal.bot_internals

from typing import Callable, TYPE_CHECKING
import math

if TYPE_CHECKING:
    from ..bot import Bot

from .base_bot_internals import BaseBotInternals
from .stop_resume_listener_abs import StopResumeListenerABC
from .event_interruption import EventInterruption
from .thread_interrupted_exception import ThreadInterruptedException
from ..events import TickEvent, HitBotEvent, ScannedBotEvent
from ..constants import MAX_SPEED


[docs] class BotInternals(StopResumeListenerABC): def __init__(self, bot: "Bot", base_bot_internals: BaseBotInternals): self._base_bot_internals = base_bot_internals self._bot = bot self._base_bot_internals.stop_resume_listener = self self._clear_remaining() # Internal handlers are used for maintaining internal state only handlers = self._base_bot_internals.internal_event_handlers def _stop_thread(_: ...) -> None: self._base_bot_internals.stop_thread() # Priority 110 ensures _process_turn() runs BEFORE BaseBotInternals._on_next_turn (priority 100) # which calls notify_all() to wake up the bot thread. This prevents a race condition # where the bot thread wakes up before turn_remaining/distance_remaining are updated. handlers.on_next_turn.subscribe(self.on_next_turn, 110) # Priority 90 ensures BaseBotInternals.on_round_started (priority 100) resets state first, # then we pre-warm the bot thread so it is alive and waiting before turn 1 arrives. handlers.on_round_started.subscribe(lambda _: self._on_round_started_prewarm(), 90) handlers.on_game_aborted.subscribe(_stop_thread, 100) handlers.on_round_ended.subscribe(_stop_thread, 90) handlers.on_game_ended.subscribe(_stop_thread, 90) handlers.on_disconnected.subscribe(_stop_thread, 90) handlers.on_death.subscribe(_stop_thread, 90) handlers.on_hit_wall.subscribe(lambda _: self.on_hit_wall(), 90) handlers.on_hit_bot.subscribe(self.on_hit_bot, 90) # Did we go over desired distance to travel self._is_over_driving = False self._override_target_speed = False self._override_turn_rate = False self._override_gun_turn_rate = False self._override_radar_turn_rate = False
[docs] def on_next_turn(self, e: TickEvent) -> None: """Handle the next turn event.""" if e.turn_number == 1: self._on_first_turn() self._process_turn()
def _on_round_started_prewarm(self) -> None: """Pre-warm bot thread at RoundStarted (P90) so thread is alive before turn 1 arrives.""" self._base_bot_internals.stop_thread() self._clear_remaining() self._base_bot_internals.start_thread(self._bot) def _on_first_turn(self) -> None: """Handle the first turn — capture initial directions for delta tracking.""" self._clear_remaining() def _clear_remaining(self) -> None: """Clear the remaining movement and turn values.""" self.distance_remaining: float = 0.0 self.turn_remaining: float = 0.0 self.gun_turn_remaining: float = 0.0 self.radar_turn_remaining: float = 0.0 self._continuous_turn_rate: float = 0.0 self._continuous_gun_turn_rate: float = 0.0 self._continuous_radar_turn_rate: float = 0.0 self._continuous_target_speed: float = 0.0 try: self._previous_direction: float = self._bot.direction self._previous_gun_direction: float = self._bot.gun_direction self._previous_radar_direction: float = self._bot.radar_direction except Exception: self._previous_direction = 0.0 self._previous_gun_direction = 0.0 self._previous_radar_direction = 0.0 def _process_turn(self) -> None: """Process the bot's turn, updating movement and turn values.""" if self._bot.disabled: self._clear_remaining() return self._update_turn_remaining() self._update_gun_turn_remaining() self._update_radar_turn_remaining() self._update_movement()
[docs] def on_hit_wall(self): self.distance_remaining = 0.0
[docs] def on_hit_bot(self, e: HitBotEvent) -> None: if e.rammed: self.distance_remaining = 0.0
@property def turn_rate(self) -> float: return self._base_bot_internals.turn_rate @turn_rate.setter def turn_rate(self, x: float) -> None: self._override_turn_rate = False self._continuous_turn_rate = x # Match Java semantics: remaining is based on the new requested turn rate self.turn_remaining = self._to_infinite_value(x) self._base_bot_internals.turn_rate = x @property def gun_turn_rate(self) -> float: return self._base_bot_internals.gun_turn_rate @gun_turn_rate.setter def gun_turn_rate(self, x: float) -> None: self._override_gun_turn_rate = False self._continuous_gun_turn_rate = x # Match Java semantics: remaining is based on the new requested gun turn rate self.gun_turn_remaining = self._to_infinite_value(x) self._base_bot_internals.gun_turn_rate = x @property def radar_turn_rate(self) -> float: return self._base_bot_internals.radar_turn_rate @radar_turn_rate.setter def radar_turn_rate(self, x: float) -> None: self._override_radar_turn_rate = False self._continuous_radar_turn_rate = x # Match Java semantics: remaining is based on the new requested radar turn rate self.radar_turn_remaining = self._to_infinite_value(x) self._base_bot_internals.radar_turn_rate = x def _to_infinite_value(self, x: float) -> float: """Convert a turn rate to an infinite value for remaining turns.""" if x > 0: return float("inf") elif x < 0: return float("-inf") return 0.0 @property def target_speed(self) -> float | None: return self._base_bot_internals.target_speed @target_speed.setter def target_speed(self, speed: float) -> None: self._override_target_speed = False self._continuous_target_speed = speed self.distance_remaining = self._to_infinite_value(speed) self._base_bot_internals.target_speed = speed
[docs] def set_forward(self, distance: float) -> None: self._override_target_speed = True if math.isnan(distance): raise ValueError("'distance' cannot be NaN") self._get_and_set_new_target_speed(distance) self.distance_remaining = distance
[docs] def forward(self, distance: float) -> None: if self._bot.stopped: self._bot.go() else: self.set_forward(distance) self.wait_for( lambda: self.distance_remaining == 0.0 and self._bot.speed == 0.0 )
[docs] def set_turn_left(self, degrees: float) -> None: self._override_turn_rate = True self.turn_remaining = degrees self._base_bot_internals.turn_rate = degrees
[docs] def turn_left(self, degrees: float) -> None: if self._bot.stopped: self._bot.go() else: self.set_turn_left(degrees) self.wait_for(lambda: self.turn_remaining == 0)
[docs] def set_turn_gun_left(self, degrees: float) -> None: self._override_gun_turn_rate = True self.gun_turn_remaining = degrees self._base_bot_internals.gun_turn_rate = degrees
[docs] def turn_gun_left(self, degrees: float) -> None: if self._bot.stopped: self._bot.go() else: self.set_turn_gun_left(degrees) self.wait_for(lambda: self.gun_turn_remaining == 0)
[docs] def set_turn_radar_left(self, degrees: float) -> None: self._override_radar_turn_rate = True self.radar_turn_remaining = degrees self._base_bot_internals.radar_turn_rate = degrees
[docs] def turn_radar_left(self, degrees: float) -> None: if self._bot.stopped: self._bot.go() else: self.set_turn_radar_left(degrees) self.wait_for(lambda: self.radar_turn_remaining == 0)
[docs] def fire(self, firepower: float) -> None: self._bot.set_fire(firepower) self._bot.go()
[docs] def rescan(self) -> None: # Mark ScannedBotEvent as interruptible so a new scan can interrupt the current handler EventInterruption.set_interruptible(ScannedBotEvent, True) self._bot.set_rescan() # Align with Java: do not raise interruption here; event dispatch handles and swallows it self._bot.go()
[docs] def wait_for(self, condition: Callable[[], bool]) -> None: self._bot.go() while self._base_bot_internals.is_running() and not condition(): self._bot.go()
[docs] def stop(self, overwrite: bool) -> None: self._bot.set_stop(overwrite) self._bot.go()
[docs] def resume(self) -> None: self._base_bot_internals.set_resume() self._bot.go()
[docs] def on_stop(self) -> None: self._saved_previous_direction = self._previous_direction self._saved_previous_gun_direction = self._previous_gun_direction self._saved_previous_radar_direction = self._previous_radar_direction self._saved_distance_remaining = self.distance_remaining self._saved_turn_remaining = self.turn_remaining self._saved_gun_turn_remaining = self.gun_turn_remaining self._saved_radar_turn_remaining = self.radar_turn_remaining
[docs] def on_resume(self): self._previous_direction = self._saved_previous_direction self._previous_gun_direction = self._saved_previous_gun_direction self._previous_radar_direction = self._saved_previous_radar_direction self.distance_remaining = self._saved_distance_remaining self.turn_remaining = self._saved_turn_remaining self.gun_turn_remaining = self._saved_gun_turn_remaining self.radar_turn_remaining = self._saved_radar_turn_remaining
def _update_turn_remaining(self) -> None: """Update the turn remaining value based on the bot's current direction.""" delta = self._bot.calc_delta_angle( self._bot.direction, self._previous_direction ) self._previous_direction = self._bot.direction if not self._override_turn_rate: self._base_bot_internals.turn_rate = self._continuous_turn_rate return if abs(self.turn_remaining) <= abs(delta): self.turn_remaining = 0 else: self.turn_remaining -= delta if self._is_near_zero(self.turn_remaining): self.turn_remaining = 0 self._base_bot_internals.turn_rate = self.turn_remaining def _update_gun_turn_remaining(self) -> None: """Update the gun turn remaining value based on the bot's current gun direction.""" delta = self._bot.calc_delta_angle( self._bot.gun_direction, self._previous_gun_direction ) self._previous_gun_direction = self._bot.gun_direction if not self._override_gun_turn_rate: self._base_bot_internals.gun_turn_rate = self._continuous_gun_turn_rate return if abs(self.gun_turn_remaining) <= abs(delta): self.gun_turn_remaining = 0 else: self.gun_turn_remaining -= delta if self._is_near_zero(self.gun_turn_remaining): self.gun_turn_remaining = 0 self._base_bot_internals.gun_turn_rate = self.gun_turn_remaining def _update_radar_turn_remaining(self) -> None: """Update the radar turn remaining value based on the bot's current radar direction.""" delta = self._bot.calc_delta_angle( self._bot.radar_direction, self._previous_radar_direction ) self._previous_radar_direction = self._bot.radar_direction if not self._override_radar_turn_rate: self._base_bot_internals.radar_turn_rate = self._continuous_radar_turn_rate return if abs(self.radar_turn_remaining) <= abs(delta): self.radar_turn_remaining = 0 else: self.radar_turn_remaining -= delta if self._is_near_zero(self.radar_turn_remaining): self.radar_turn_remaining = 0 self._base_bot_internals.radar_turn_rate = self.radar_turn_remaining def _update_movement(self): if not self._override_target_speed: self._base_bot_internals.target_speed = self._continuous_target_speed if abs(self.distance_remaining) < abs(self._bot.speed): self.distance_remaining = 0 else: self.distance_remaining -= self._bot.speed elif math.isinf(self.distance_remaining): self._base_bot_internals.target_speed = ( MAX_SPEED if self.distance_remaining > 0 else -MAX_SPEED ) else: distance = self.distance_remaining # This is Nat Pavasant's method described here: # https://robowiki.net/wiki/User:Positive/Optimal_Velocity#Nat.27s_updateMovement new_speed = self._get_and_set_new_target_speed(distance) # If we are over-driving our distance, and we are now at speed=0, then we stopped if self._is_near_zero(new_speed) and self._is_over_driving: distance = 0 self._is_over_driving = False # the overdrive flag if distance * new_speed >= 0: self._is_over_driving = ( self._base_bot_internals.get_distance_traveled_until_stop(new_speed) > abs(distance) ) self.distance_remaining = distance - new_speed def _get_and_set_new_target_speed(self, distance: float) -> float: """Get and set the new target speed based on the remaining distance.""" speed = self._base_bot_internals.get_new_target_speed( self._bot.speed, distance ) self._base_bot_internals.target_speed = speed return speed def _is_near_zero(self, x: float) -> bool: return abs(x) < 1e-5