"""
Poker Game State Machine.
This module implements a complete poker game using a state machine
architecture. The game manages player actions, betting rounds, card dealing, and
pot distribution.
"""
from __future__ import annotations
from typing import Deque, Optional
from collections import deque
import logging
import time
import uuid
from warnings import warn
import warnings
from .deck import Deck
from .enums import (
ActionType,
GameEventType,
GameStage,
PlayerStateType,
Street,
Suit,
)
from .events import GameEvent
from .holding import Holding
from .protocol import PlayerLike, EventHandler
from .state import GameState
from .playeraction import PlayerAction
from .playerstate import PlayerState
from .utils import find_highest_scoring_hand
from .eventbus import EventBus
from .rules import PokerRules, DealingRules, StakesRules, ShowdownRules
from .table import Table
__all__ = ["Game"]
[docs]
class Game:
"""
Texas Hold'em Poker Game.
Implements a Texas Hold'em poker game using an event-driven state machine.
The game manages player actions, betting rounds, dealing, and pot distribution.
Parameters
----------
small_blind : int
Amount for the small blind.
big_blind : int
Amount for the big blind.
ante : int
Amount for the ante.
min_players : int
Minimum number of players to start the game.
max_players : int
Maximum number of players allowed at the table.
max_hands : int
Maximum number of hands to play before ending the game. Default is 1000.
exc_handling_mode : {"log", "raise"}
If "raise", exceptions in event handlers will propagate. If "log", they will be logged. This setting
only effects event handling, not game logic. If an exception occurs in game logic, it will always raise.
log_events : bool
If True, game events will be logged to the console. This only affects logging, not event handling.
The purpose of this is allow users to have their own event handlers without excessive logging noise.
rules : PokerRules | None
Custom poker rules to use. If None, default Texas Hold'em rules are applied. When provided, other
parameters (small_blind, big_blind, ante, min_players, max_players) will override the corresponding
fields in the rules.
first_button_position : int | None
The seat index of the player who will be the button in the first hand. If None (the default), the
button is assigned randomly using a card draw.
"""
[docs]
def __init__(
self,
*,
small_blind: Optional[int] = None,
big_blind: Optional[int] = None,
ante: Optional[int] = None,
min_players: Optional[int] = 2,
max_players: Optional[int] = None,
max_hands: int = 1000,
exc_handling_mode: str = "log",
log_events: bool = True,
rules: Optional[PokerRules] = None,
first_button_position: Optional[int] = None,
):
if not exc_handling_mode in ["log", "raise"]:
raise ValueError("exc_handling_mode must be 'log' or 'raise'")
if rules is None:
rules = PokerRules(
dealing=DealingRules(),
stakes=StakesRules(
small_blind=small_blind,
big_blind=big_blind,
),
showdown=ShowdownRules(),
)
if small_blind:
rules.stakes.small_blind = small_blind
if big_blind:
rules.stakes.big_blind = big_blind
if ante:
rules.stakes.ante = ante
if min_players:
rules.dealing.min_players = min_players
if max_players:
rules.dealing.max_players = max_players
if first_button_position is not None:
if not isinstance(first_button_position, int):
raise ValueError("first_button_position must be an integer")
if not first_button_position >= 0:
raise ValueError("first_button_position must be non-negative")
self._rules = rules
self._max_hands = max_hands
self._state = GameState(
small_blind=rules.stakes.small_blind,
big_blind=rules.stakes.big_blind,
ante=rules.stakes.ante,
)
self._event_queue: Deque[GameEventType] = deque()
self._logger = logging.getLogger("maverick")
self._log_events = log_events
self._first_button_position = first_button_position
self._all_stacks_at_game_start = 0
# Event handling
self._events = EventBus(strict=exc_handling_mode == "raise")
self._event_history: list[GameEvent] = []
# Table
self._table = Table(n_seats=rules.dealing.max_players)
# Game UID - assigned when GAME_STARTED is processed
self._game_uid: Optional[str] = None
@property
def game_uid(self) -> Optional[str]:
"""Returns the unique identifier for the current game session.
A new ``game_uid`` is generated each time :meth:`start` is called (i.e.
each time the ``GAME_STARTED`` event is processed). The value is
``None`` before the game has been started.
Returns
-------
str | None
A 32-character hexadecimal UUID string, or ``None`` if the game
has not been started yet.
.. versionadded:: 0.5.0
"""
return self._game_uid
@property
def uid(self) -> Optional[str]:
"""Alias for game_uid."""
return self._game_uid
@property
def game_id(self) -> Optional[str]:
"""Deprecated: use ``uid`` instead."""
warnings.warn(
"Game.game_id is deprecated, use Game.uid instead.",
DeprecationWarning,
stacklevel=2,
)
return self._game_uid
@property
def rules(self) -> PokerRules:
"""Returns the poker rules used in this game."""
return self._rules
@property
def state(self) -> GameState:
"""Returns the current game state."""
return self._state
@property
def history(self) -> list[GameEvent]:
"""Returns the event history.
Returns
-------
list[GameEvent]
A list of all game events in chronological order.
"""
return self._event_history
@property
def table(self) -> Table:
"""Returns the game table.
.. versionadded:: 0.2.0
"""
return self._table
@property
def button(self) -> Optional[PlayerLike]:
"""Returns the player currently on the button, or None if no button assigned.
.. versionadded:: 0.3.0
"""
if self.state.button_position is not None:
return self.table[self.state.button_position]
return None
@property
def small_blind(self) -> Optional[PlayerLike]:
"""Returns the player currently in the small blind position, or None if no small blind assigned.
.. versionadded:: 0.3.0
"""
if self.state.small_blind_position is not None:
return self.table[self.state.small_blind_position]
return None
@property
def big_blind(self) -> Optional[PlayerLike]:
"""Returns the player currently in the big blind position, or None if no big blind assigned.
.. versionadded:: 0.3.0
"""
if self.state.big_blind_position is not None:
return self.table[self.state.big_blind_position]
return None
def _log(
self,
message: str,
loglevel: int = logging.INFO,
stage_prefix: bool = True,
**kwargs,
) -> None:
if not self._log_events: # pragma: no cover
return
# ANSI colors (set NO_COLOR=1 to disable)
color_map = {
GameStage.PRE_FLOP: "\033[38;5;39m", # blue
GameStage.FLOP: "\033[38;5;34m", # green
GameStage.TURN: "\033[38;5;214m", # orange
GameStage.RIVER: "\033[38;5;196m", # red
GameStage.SHOWDOWN: "\033[38;5;201m", # magenta
}
reset = "\033[0m"
stage = self.state.stage
stage_name = stage.name
stage_prefix_msg = f"{color_map.get(stage, '')}{stage_name}{reset}"
msg = f"{stage_prefix_msg} | {message}" if stage_prefix else message
self._logger.log(loglevel, msg, **kwargs)
[docs]
def subscribe(
self, event_type: GameEventType, handler: EventHandler, **kwargs
) -> str:
"""
Register a handler for a specific game event type.
Handlers are called synchronously in registration order when the event occurs.
Exceptions in handlers are caught, logged, and do not interrupt engine execution.
Parameters
----------
event_type : GameEventType
The type of event to listen for.
handler : EventHandler
A callable that accepts a GameEvent and returns None.
"""
return self._events.subscribe(event_type, handler, **kwargs)
[docs]
def unsubscribe(self, token: str) -> None:
"""Unsubscribe a handler using its token.
Parameters
----------
token : str
The subscription token returned by the subscribe method.
"""
return self._events.unsubscribe(token)
def _emit(self, event: GameEvent) -> None:
"""
Emit a game event to all registered handlers.
Dispatches the event to handlers in registration order. Exceptions in handlers
are caught and logged to prevent disruption of engine execution.
Parameters
----------
event : GameEvent
The event to emit to handlers.
"""
self._event_history.append(event)
# external listeners
self._events.emit(event, self)
# player hooks
for p in self.state.players:
fn = getattr(p, "on_event", None)
if callable(fn):
try:
fn(event, self)
except Exception:
self._logger.warning(
f"Exception in player {p.name} on_event hook for {event.type.name}",
exc_info=True,
)
specific = getattr(p, f"on_{event.type.name.lower()}", None)
if callable(specific):
try:
specific(event, self)
except Exception:
self._logger.warning(
f"Exception in player {p.name} {specific.__name__} hook for {event.type.name}",
exc_info=True,
)
def _create_event(
self,
event_type: GameEventType,
player_uid: Optional[str] = None,
action: Optional[ActionType] = None,
payload: Optional[dict] = None,
) -> GameEvent:
"""
Create a GameEvent with current game state.
Parameters
----------
event_type : GameEventType
The type of event.
player_uid : Optional[str]
UID of the player involved in the event.
action : Optional[ActionType]
Type of action taken (for PLAYER_ACTION events).
Returns
-------
GameEvent
An immutable event payload.
"""
return GameEvent(
type=event_type,
hand_number=self.state.hand_number,
street=self.state.street,
stage=self.state.stage,
player_uid=player_uid,
action=action,
payload=payload or {},
)
[docs]
def add_player(self, player: PlayerLike) -> None:
"""Add a player to the game.
Parameters
----------
player : PlayerLike
The player to add to the game.
"""
if not self.table.has_free_seat:
raise ValueError("Table is full")
if self.state.stage not in [
GameStage.WAITING_FOR_PLAYERS,
GameStage.READY,
]:
raise ValueError("Cannot add players while game is in progress")
existing_names = set([p.name for p in self.state.players])
if player.name in existing_names:
raise ValueError(f"Player name '{player.name}' is already taken")
existing_ids = set([p.uid for p in self.state.players])
if player.uid in existing_ids:
raise ValueError(f"Player uid '{player.uid}' is already taken")
if player.state is None:
player.state = PlayerState(state_type=PlayerStateType.ACTIVE)
self.table.seat_player(player)
else:
self.table.seat_player(player, seat_index=player.state.seat)
player.state.state_type = PlayerStateType.ACTIVE
self.state.players.append(player)
self._handle_event(GameEventType.PLAYER_JOINED)
self._emit(
self._create_event(GameEventType.PLAYER_JOINED, player_uid=player.uid)
)
self._log(
f"Player {player.name} joined the game.", logging.INFO, stage_prefix=False
)
[docs]
def remove_player(self, player: PlayerLike, flush: bool = True) -> None:
"""Remove a player from the game.
Parameters
----------
player : PlayerLike
The player to remove from the game.
flush : bool
If True, immediately process the resulting events. If False, the events will be
added to the queue but not processed until step() is called. Default is True.
"""
player_uid = player.uid
if self.state.stage not in [
GameStage.WAITING_FOR_PLAYERS,
GameStage.READY,
GameStage.HAND_COMPLETE,
GameStage.GAME_OVER,
]:
raise ValueError("Cannot remove players while hand is in progress")
player = next((p for p in self.state.players if p.uid == player_uid), None)
if not player:
raise ValueError(f"Player with uid {player_uid} not found")
self.table.remove_player(player)
self.state.players = [p for p in self.state.players if p.uid != player_uid]
if flush:
self._handle_event(GameEventType.PLAYER_LEFT)
else:
self._event_queue.append(GameEventType.PLAYER_LEFT)
self._emit(self._create_event(GameEventType.PLAYER_LEFT, player_uid=player_uid))
self._log(
f"Player {player.name} has left the game.", logging.INFO, stage_prefix=False
)
[docs]
def start(self) -> None:
"""Start the poker game."""
self._log("Game started.\n", logging.INFO, stage_prefix=False)
self._initialize_game()
self._event_queue.append(GameEventType.GAME_STARTED)
self._drain_event_queue()
def _find_first_button_position(self) -> int:
"""Determine the button position (seat index) for the first hand."""
if isinstance(self._first_button_position, int):
idx = self._first_button_position % len(self.table)
self.table.button_seat = idx
return idx
if len(self.state.players) == 0:
raise ValueError("No players to assign button position")
deck = Deck.standard_deck(shuffle=True)
suit_priority = {
Suit.SPADES: 3,
Suit.HEARTS: 2,
Suit.DIAMONDS: 1,
Suit.CLUBS: 0,
}
best_index = 0 # index in players list
best_score: tuple[int, int] | None = None
for idx in range(len(self.state.players)):
card = deck.deal(1)[0]
score = (card.rank.value, suit_priority[card.suit])
if best_score is None or score > best_score:
best_score = score
best_index = idx
idx = self.state.players[best_index].state.seat
self.table.button_seat = idx
return idx
def _handle_event(self, event: GameEventType) -> None:
match event:
case GameEventType.GAME_STARTED:
assert self.state.stage == GameStage.READY
self.state.stage = GameStage.STARTED
self._game_uid = uuid.uuid4().hex
self._emit(self._create_event(GameEventType.GAME_STARTED))
self._start_new_hand()
self._event_queue.append(GameEventType.HAND_STARTED)
case GameEventType.HAND_STARTED:
assert self.state.stage in [
GameStage.STARTED,
GameStage.HAND_COMPLETE,
]
self.state.stage = GameStage.DEALING
self._emit(self._create_event(GameEventType.HAND_STARTED))
self._deal_hole_cards()
self._emit(self._create_event(GameEventType.HOLE_CARDS_DEALT))
self._event_queue.append(GameEventType.HOLE_CARDS_DEALT)
case GameEventType.HOLE_CARDS_DEALT:
assert self.state.stage == GameStage.DEALING
self._post_blinds()
self._emit(self._create_event(GameEventType.BLINDS_POSTED))
self._event_queue.append(GameEventType.BLINDS_POSTED)
case GameEventType.BLINDS_POSTED:
assert self.state.stage == GameStage.DEALING
self._post_antes()
self._emit(self._create_event(GameEventType.ANTES_POSTED))
self._event_queue.append(GameEventType.ANTES_POSTED)
case GameEventType.ANTES_POSTED:
assert self.state.stage == GameStage.DEALING
self.state.stage = GameStage.PRE_FLOP
self._emit(self._create_event(GameEventType.BETTING_ROUND_STARTED))
self._take_action_from_current_player()
self._event_queue.append(GameEventType.PLAYER_ACTION_TAKEN)
case GameEventType.PLAYER_ACTION_TAKEN:
if self.state.is_betting_round_complete():
self._complete_betting_round()
self._emit(
self._create_event(GameEventType.BETTING_ROUND_COMPLETED)
)
self._event_queue.append(GameEventType.BETTING_ROUND_COMPLETED)
else:
self._advance_to_next_player()
self._take_action_from_current_player()
self._event_queue.append(GameEventType.PLAYER_ACTION_TAKEN)
case GameEventType.BETTING_ROUND_COMPLETED:
if len(self.state.get_players_in_hand()) == 1:
# There is no or only one active player left, so we skip to showdown
self.state.stage = GameStage.SHOWDOWN
self.state.street = None
self._emit(self._create_event(GameEventType.SHOWDOWN_STARTED))
self._handle_showdown()
self._emit(self._create_event(GameEventType.SHOWDOWN_COMPLETED))
self._event_queue.append(GameEventType.SHOWDOWN_COMPLETED)
else:
# There is at least 2 active players, so we continue to next street
if self.state.stage == GameStage.PRE_FLOP:
self.state.stage = GameStage.FLOP
self.state.street = Street.FLOP
self._deal_flop()
self._emit(self._create_event(GameEventType.FLOP_DEALT))
self._event_queue.append(GameEventType.FLOP_DEALT)
self._advance_to_first_active_player()
elif self.state.stage == GameStage.FLOP:
self.state.stage = GameStage.TURN
self.state.street = Street.TURN
self._deal_turn()
self._emit(self._create_event(GameEventType.TURN_DEALT))
self._event_queue.append(GameEventType.TURN_DEALT)
self._advance_to_first_active_player()
elif self.state.stage == GameStage.TURN:
self.state.stage = GameStage.RIVER
self.state.street = Street.RIVER
self._deal_river()
self._emit(self._create_event(GameEventType.RIVER_DEALT))
self._event_queue.append(GameEventType.RIVER_DEALT)
self._advance_to_first_active_player()
elif self.state.stage == GameStage.RIVER:
self.state.stage = GameStage.SHOWDOWN
self.state.street = None
self._emit(self._create_event(GameEventType.SHOWDOWN_STARTED))
self._handle_showdown()
self._emit(self._create_event(GameEventType.SHOWDOWN_COMPLETED))
self._event_queue.append(GameEventType.SHOWDOWN_COMPLETED)
case (
GameEventType.FLOP_DEALT
| GameEventType.TURN_DEALT
| GameEventType.RIVER_DEALT
):
self._emit(self._create_event(GameEventType.BETTING_ROUND_STARTED))
if self.state.is_betting_round_complete():
self._complete_betting_round()
self._emit(
self._create_event(GameEventType.BETTING_ROUND_COMPLETED)
)
self._event_queue.append(GameEventType.BETTING_ROUND_COMPLETED)
self._log(
"There are not active players at the table.",
logging.INFO,
)
else:
self._take_action_from_current_player()
self._event_queue.append(GameEventType.PLAYER_ACTION_TAKEN)
case GameEventType.SHOWDOWN_COMPLETED:
self.state.stage = GameStage.HAND_COMPLETE
self._emit(self._create_event(GameEventType.HAND_ENDED))
self._event_queue.append(GameEventType.HAND_ENDED)
case GameEventType.HAND_ENDED:
self._log("Hand ended\n", logging.INFO, stage_prefix=False)
# Eliminate players with zero stack
eliminated_players = [
p for p in self.state.players if p.state.stack == 0
]
for player in eliminated_players:
player.state.state_type = PlayerStateType.ELIMINATED
self._emit(
self._create_event(
GameEventType.PLAYER_ELIMINATED, player_uid=player.uid
)
)
self._log(
f"Player {player.name} has been eliminated from the game.",
logging.INFO,
stage_prefix=False,
)
self._event_queue.append(GameEventType.PLAYER_ELIMINATED)
self.remove_player(player, flush=False)
# Check if we have enough players to continue
if len(self.state.players) < self.rules.dealing.min_players:
self._log(
"Not enough players to continue, ending game.", logging.INFO
)
self.state.stage = GameStage.GAME_OVER
self._event_queue.append(GameEventType.GAME_ENDED)
else:
# Move button and start next hand
if self.state.hand_number >= self._max_hands:
self._log(
"Reached maximum number of hands, ending game.",
logging.INFO,
stage_prefix=False,
)
self.state.stage = GameStage.GAME_OVER
self._event_queue.append(GameEventType.GAME_ENDED)
else:
self._move_button()
self._start_new_hand()
self._event_queue.append(GameEventType.HAND_STARTED)
case GameEventType.GAME_ENDED:
self._log("Game ended", logging.INFO, stage_prefix=False)
self._emit(self._create_event(GameEventType.GAME_ENDED))
case GameEventType.PLAYER_JOINED:
if self.state.stage == GameStage.WAITING_FOR_PLAYERS:
if len(self.state.players) >= self.rules.dealing.min_players:
self.state.stage = GameStage.READY
case GameEventType.PLAYER_LEFT:
if len(self.state.players) < self.rules.dealing.min_players:
self.state.stage = GameStage.WAITING_FOR_PLAYERS
case GameEventType.PLAYER_ELIMINATED:
pass
case _: # pragma: no cover
raise ValueError(f"Unknown event: {event}")
def _drain_event_queue(self) -> None:
while self.step():
pass
[docs]
def step(self) -> bool:
"""Process the next event in the queue."""
if self.has_events():
event = self._event_queue.popleft()
self._handle_event(event)
return True
return False
[docs]
def has_events(self) -> bool:
"""Check if there are pending events in the queue."""
return len(self._event_queue) > 0
def _initialize_game(self) -> None:
self.state.hand_number = 0
self.state.button_position = self._find_first_button_position()
for p in self.state.players:
self._all_stacks_at_game_start += p.state.stack
def _start_new_hand(self) -> None:
self.state.hand_number += 1
self._log(
"=" * 30 + f" Hand {self.state.hand_number} " + "=" * 30 + "\n",
logging.INFO,
stage_prefix=False,
)
if len(self.state.players) < self.rules.dealing.min_players:
raise ValueError("Not enough players to start hand")
self.state.deck = Deck.standard_deck(shuffle=True)
self.state.community_cards = []
self.state.pot = 0
self.state.current_bet = 0
self.state.last_raise_size = 0
for player in self.state.players:
player.state.current_bet = 0
player.state.total_contributed = 0
player.state.acted_this_street = False
player.state.holding = None
if player.state.stack > 0:
player.state.state_type = PlayerStateType.ACTIVE
self.state.street = Street.PRE_FLOP
self._assign_blind_positions()
[docs]
def get_current_player(self) -> Optional[PlayerLike]:
"""Return the player whose turn it is.
.. versionadded:: 0.2.0
"""
return self.table[self.state.current_player_index]
def _calculate_min_raise_amount(self) -> int:
"""Calculates the minimum extra chips the current player must add
right now to complete a minimum raise.
Important: What this function returns is NOT the amount the pot needs to raise by
or raise to, but the amount the player must add on top of their current bet.
"""
player = self.get_current_player()
player_bet_before = player.state.current_bet
old_table_bet = self.state.current_bet
last_raise_size = self.state.last_raise_size
min_raise_to = old_table_bet + last_raise_size
min_raise_by = min_raise_to - player_bet_before
return min_raise_by
def _take_action_from_current_player(self) -> None:
current_player = self.get_current_player()
if (
not current_player
or current_player.state.state_type != PlayerStateType.ACTIVE
):
return
valid_actions = self._get_valid_actions(current_player)
min_raise_amount = self._calculate_min_raise_amount()
_t0 = time.perf_counter()
action: PlayerAction = current_player.decide_action(
game=self,
valid_actions=valid_actions,
min_raise_amount=min_raise_amount,
call_amount=self.state.current_bet - current_player.state.current_bet,
min_bet_amount=self.state.min_bet,
)
action.decision_time_seconds = time.perf_counter() - _t0
try:
self._register_player_action(current_player, action)
except Exception:
self._log(
f"Player {current_player.name} intended to take action: {action}.",
logging.DEBUG,
)
self._log(
f"Player {current_player.name} action invalid, folding.",
logging.WARNING,
exc_info=True,
)
warn(f"Player {current_player.name} action invalid, folding.")
action = PlayerAction(
player_uid=current_player.uid, action_type=ActionType.FOLD
)
self._register_player_action(current_player, action)
def _deal_hole_cards(self) -> None:
button = self.table[self.state.button_position]
self._log(f"Dealing hole cards. Button: {button.name}", logging.INFO)
for player in self.state.players:
if player.state.state_type == PlayerStateType.ACTIVE:
cards = self.state.deck.deal(self.rules.dealing.hole_cards)
player.state.holding = Holding(cards=cards)
def _post_blinds(self) -> None:
"""Post blinds with correct heads-up semantics (button posts SB in HU)."""
num_players = len(self.state.players)
min_num_players = self.rules.dealing.min_players
# Sanity check: ensure we have enough players to post blinds
if num_players < min_num_players: # pragma: no cover
raise ValueError(f"Need at least {min_num_players} players to post blinds")
# Sanity check: ensure small_blind and big_blind positions are assigned correctly
if not self.small_blind or not self.big_blind: # pragma: no cover
raise ValueError("Small blind or big blind player not assigned")
if self.small_blind.state.seat == self.big_blind.state.seat: # pragma: no cover
raise ValueError("Small blind and big blind cannot be the same player")
# --- Small blind ---
sb_player = self.small_blind
sb_amount = min(self.state.small_blind, sb_player.state.stack)
sb_player.state.current_bet = sb_amount
sb_player.state.total_contributed = sb_amount
sb_player.state.stack -= sb_amount
self.state.pot += sb_amount
if sb_player.state.stack == 0:
sb_player.state.state_type = PlayerStateType.ALL_IN
self._log(
f"Posting small blind of {sb_amount} by player {sb_player.name}. "
f"Remaining stack: {sb_player.state.stack}",
logging.INFO,
)
# --- Big blind ---
bb_player = self.big_blind
bb_amount = min(self.state.big_blind, bb_player.state.stack)
bb_player.state.current_bet = bb_amount
bb_player.state.total_contributed = bb_amount
bb_player.state.stack -= bb_amount
self.state.pot += bb_amount
if bb_player.state.stack == 0:
bb_player.state.state_type = PlayerStateType.ALL_IN
self._log(
f"Posting big blind of {bb_amount} by player {bb_player.name}. "
f"Remaining stack: {bb_player.state.stack}",
logging.INFO,
)
# Table betting state
# IMPORTANT: current_bet should reflect the actual posted BB amount if BB is short-stacked.
self.state.current_bet = bb_amount
self.state.min_bet = self.state.big_blind
self.state.last_raise_size = (
self.state.big_blind
) # preflop min raise increment is BB size
# Next to act preflop:
# - Heads-up: button (SB) acts first
# - Multi-way: player left of BB acts first
if num_players == 2:
self.state.current_player_index = sb_player.state.seat
else:
self.state.current_player_index = self.table.next_occupied_seat(
bb_player.state.seat, active=True
)
assert isinstance(
self.state.current_player_index, int
), "Current player index must be an integer"
def _post_antes(self) -> None:
"""Post antes for all active players."""
if not self.state.ante or self.state.ante <= 0:
return
self._log("Posting antes.", logging.INFO)
for player in self.state.players:
if player.state.state_type == PlayerStateType.ACTIVE:
ante_amount = min(self.state.ante, player.state.stack)
player.state.current_bet += ante_amount
player.state.total_contributed += ante_amount
player.state.stack -= ante_amount
self.state.pot += ante_amount
if player.state.stack == 0:
player.state.state_type = PlayerStateType.ALL_IN
self._log(
f"Player {player.name} posts ante of {ante_amount}. "
f"Remaining stack: {player.state.stack}",
logging.INFO,
)
def _calculate_raise_components(
self, player: PlayerLike, chips_to_add: int
) -> tuple[int, int, int, int, int, bool]:
"""
Returns:
(player_add, player_bet_after, new_table_bet, call_part, raise_size, is_all_in)
"""
stack_before = player.state.stack
player_bet_before = player.state.current_bet
old_table_bet = self.state.current_bet
player_add = min(chips_to_add, stack_before)
player_bet_after = player_bet_before + player_add
# Important: table bet cannot decrease
new_table_bet = max(old_table_bet, player_bet_after)
call_needed = max(0, old_table_bet - player_bet_before)
call_part = min(call_needed, player_add)
# Amount the table bet increases by (0 if player didn't exceed old_table_bet)
raise_size = new_table_bet - old_table_bet
is_all_in = player_add >= stack_before
return (
player_add,
player_bet_after,
new_table_bet,
call_part,
raise_size,
is_all_in,
)
def _reset_acted_flags_for_reopen(self, raiser_id: str) -> None:
"""
Betting was reopened by a *full* raise (>= last_raise_size).
Players get re-raise rights back, so we reset acted flags for ACTIVE players
other than the raiser.
"""
for p in self.state.players:
if p.uid != raiser_id and p.state.state_type == PlayerStateType.ACTIVE:
p.state.acted_this_street = False
def _register_player_action(self, player: PlayerLike, action: PlayerAction) -> None:
action_type = action.action_type
amount = action.amount or 0
current_player = self.get_current_player()
if not current_player or current_player.uid != player.uid:
raise ValueError("Not this player's turn")
if current_player.state.state_type != PlayerStateType.ACTIVE:
raise ValueError("Player cannot act (folded or all-in)")
valid_actions = self._get_valid_actions(current_player)
if action_type not in valid_actions:
raise ValueError(f"Invalid action: {action_type}")
if action_type == ActionType.FOLD:
current_player.state.state_type = PlayerStateType.FOLDED
self._log(f"Player {current_player.name} folds.", logging.INFO)
elif action_type == ActionType.CHECK:
if current_player.state.current_bet != self.state.current_bet:
raise ValueError("Cannot check when there is a bet to call")
self._log(f"Player {current_player.name} checks.", logging.INFO)
elif action_type == ActionType.CALL:
call_amount = self.state.current_bet - current_player.state.current_bet
actual_amount = min(call_amount, current_player.state.stack)
current_player.state.current_bet += actual_amount
current_player.state.total_contributed += actual_amount
current_player.state.stack -= actual_amount
self.state.pot += actual_amount
if current_player.state.stack == 0:
current_player.state.state_type = PlayerStateType.ALL_IN
self._log(
f"Player {current_player.name} calls with amount {actual_amount}. Remaining stack: {current_player.state.stack}.",
logging.INFO,
)
elif action_type == ActionType.BET:
if self.state.current_bet > 0:
raise ValueError("Cannot bet when there is already a bet")
if amount < self.state.min_bet:
raise ValueError(f"Bet must be at least {self.state.min_bet}")
actual_amount = min(amount, current_player.state.stack)
current_player.state.current_bet = actual_amount
current_player.state.total_contributed += actual_amount
current_player.state.stack -= actual_amount
self.state.pot += actual_amount
self.state.current_bet = actual_amount
self.state.last_raise_size = actual_amount
if current_player.state.stack == 0:
current_player.state.state_type = PlayerStateType.ALL_IN
# A bet opens action for others (everyone else must respond)
self._reset_acted_flags_for_reopen(raiser_id=current_player.uid)
self._log(
f"Player {current_player.name} bets amount {actual_amount}. Remaining stack: {current_player.state.stack}.",
logging.INFO,
)
elif action_type == ActionType.RAISE:
old_table_bet = self.state.current_bet
old_last_raise_size = self.state.last_raise_size
(
player_add,
player_bet_after,
new_table_bet,
_,
raise_size,
is_all_in,
) = self._calculate_raise_components(current_player, amount)
if raise_size == 0:
raise ValueError("RAISE must increase the table bet")
# Non-all-in raise must meet minimum raise size
if not is_all_in and raise_size < old_last_raise_size:
raise ValueError(
f"Raise size must be at least {old_last_raise_size} (attempted {raise_size})"
)
current_player.state.current_bet = player_bet_after
current_player.state.total_contributed += player_add
current_player.state.stack -= player_add
self.state.pot += player_add
self.state.current_bet = new_table_bet
if is_all_in:
current_player.state.state_type = PlayerStateType.ALL_IN
# Reopen betting ONLY on a full raise (>= old_last_raise_size)
reopens_betting = raise_size >= old_last_raise_size
if reopens_betting:
self.state.last_raise_size = raise_size
self._reset_acted_flags_for_reopen(raiser_id=current_player.uid)
# else: short all-in raise does NOT reopen betting and must NOT reset flags
self._log(
f"Player {current_player.name} raises by {player_add} chips "
f"to total bet {player_bet_after}. Remaining stack: {current_player.state.stack}.",
logging.INFO,
)
elif action_type == ActionType.ALL_IN:
old_table_bet = self.state.current_bet
old_last_raise_size = self.state.last_raise_size
chips_to_add = current_player.state.stack
(
player_add,
player_bet_after,
new_table_bet,
_,
raise_size,
_,
) = self._calculate_raise_components(current_player, chips_to_add)
current_player.state.current_bet = player_bet_after
current_player.state.total_contributed += player_add
current_player.state.stack = 0
self.state.pot += player_add
current_player.state.state_type = PlayerStateType.ALL_IN
# If the all-in increases the table bet, update it
if new_table_bet > old_table_bet:
self.state.current_bet = new_table_bet
# Reopen betting ONLY if raise_size meets minimum
reopens_betting = raise_size >= old_last_raise_size
if reopens_betting:
self.state.last_raise_size = raise_size
self._reset_acted_flags_for_reopen(raiser_id=current_player.uid)
# else: SHORT all-in -> DOES NOT reopen betting -> DO NOT reset acted flags
self._log(
f"Player {current_player.name} goes all-in with {player_add} chips.",
logging.INFO,
)
# Mark actor as having acted
current_player.state.acted_this_street = True
self._log(
f"Current pot: {self.state.pot} | Current bet: {self.state.current_bet}",
logging.INFO,
)
# Emit player action event after all state mutations
self._emit(
self._create_event(
GameEventType.PLAYER_ACTION_TAKEN,
player_uid=current_player.uid,
action=action,
)
)
def _get_valid_actions(self, player: PlayerLike) -> list[ActionType]:
actions = [ActionType.FOLD]
call_amount = self.state.current_bet - player.state.current_bet
if call_amount == 0:
actions.append(ActionType.CHECK)
else:
if call_amount > 0 and player.state.stack > 0:
actions.append(ActionType.CALL)
if self.state.current_bet == 0 and player.state.stack >= self.state.min_bet:
actions.append(ActionType.BET)
if self.state.current_bet > 0:
min_raise_increment = self.state.last_raise_size
call_amount = self.state.current_bet - player.state.current_bet
total_needed_for_min_raise = call_amount + min_raise_increment
if player.state.stack >= total_needed_for_min_raise:
actions.append(ActionType.RAISE)
if player.state.stack > 0:
actions.append(ActionType.ALL_IN)
return actions
def _advance_to_next_player(self) -> None:
"""
Move to the next player who needs to act.
IMPORTANT:
A player may have already 'acted_this_street' but still needs to act again
if they are facing a bet (player.current_bet < table.current_bet). This is
what makes short all-ins work correctly without resetting acted flags.
"""
idx = self.state.current_player_index
starting_idx = idx
num_players = len(self.table.seats)
for _ in range(num_players):
# get player at next occupied seat
idx = self.table.next_occupied_seat(idx)
p = self.table[idx]
# skip if not active
if p.state.state_type != PlayerStateType.ACTIVE:
continue
# check if player needs to act
facing_call = p.state.current_bet < self.state.current_bet
needs_action = (not p.state.acted_this_street) or facing_call
if needs_action:
# Safety check: ensure we've actually moved to a different player
if idx == starting_idx: # pragma: no cover
# This should not happen in a well-formed game state
raise RuntimeError(
"Cannot advance to next player: same player would act again"
)
# we've found the next player who needs to act
self.state.current_player_index = idx
return
def _complete_betting_round(self) -> None:
for player in self.state.players:
player.state.current_bet = 0
player.state.acted_this_street = False
self.state.current_bet = 0
self.state.last_raise_size = 0
self._log("Betting round complete\n", logging.INFO)
def _advance_to_first_active_player(self) -> None:
self.state.current_player_index = self.table.next_occupied_seat(
self.state.button_position,
active=True,
)
def _deal_flop(self) -> None:
self.state.deck.deal(1)
flop_cards = self.state.deck.deal(3)
self.state.community_cards.extend(flop_cards)
self._log(
f"Dealt flop. Community cards: {[card.utf8() for card in self.state.community_cards]}",
logging.INFO,
)
def _deal_turn(self) -> None:
self.state.deck.deal(1)
turn_card = self.state.deck.deal(1)[0]
self.state.community_cards.append(turn_card)
self._log(
f"Dealt turn. Community cards: {[card.utf8() for card in self.state.community_cards]}",
logging.INFO,
)
def _deal_river(self) -> None:
self.state.deck.deal(1)
river_card = self.state.deck.deal(1)[0]
self.state.community_cards.append(river_card)
self._log(
f"Dealt river. Community cards: {[card.utf8() for card in self.state.community_cards]}",
logging.INFO,
)
def _move_button(self) -> None:
self.table.move_button()
self.state.button_position = self.table.button_seat
def _assign_blind_positions(self) -> None:
num_players = len(self.state.players)
min_num_players = self.rules.dealing.min_players
if num_players < min_num_players: # pragma: no cover
raise ValueError(f"Need at least {min_num_players} players to post blinds")
# Heads-up special case:
# - Button is SMALL blind
# - Other player is BIG blind
if num_players == 2:
sb_index = self.state.button_position
bb_index = self.table.next_occupied_seat(sb_index, active=True)
else:
# Multi-way:
# - SB = left of button
# - BB = left of SB
sb_index = self.table.next_occupied_seat(
self.state.button_position, active=True
)
bb_index = self.table.next_occupied_seat(sb_index, active=True)
assert isinstance(sb_index, int)
assert isinstance(bb_index, int)
assert sb_index != bb_index, "SB and BB cannot be the same player"
# Store blind positions in state for easy access by event handlers and player logic
self.state.small_blind_position = sb_index
self.state.big_blind_position = bb_index
def _winners_in_button_order(self, winners: list[PlayerLike]) -> list[PlayerLike]:
n_players = len(self.state.players)
idx = self.state.button_position # seat index of the button player
rel_btn_idx = {}
for i in range(n_players):
idx = self.table.next_occupied_seat(idx)
p = self.table[idx]
rel_btn_idx[p.uid] = i
return sorted(winners, key=lambda p: rel_btn_idx[p.uid])
def _handle_showdown(self) -> None:
players_in_hand = self.state.get_players_in_hand()
if len(players_in_hand) == 1:
winner = players_in_hand[0]
winner.state.stack += self.state.pot
self._log(
f"Player {winner.name} wins {self.state.pot} from the pot.",
logging.INFO,
)
self._emit(
self._create_event(
GameEventType.POT_WON,
player_uid=winner.uid,
payload={"amount": self.state.pot},
)
)
self.state.pot = 0
else:
assert (
len(self.state.community_cards) == self.rules.dealing.board_cards_total
), "Community cards incomplete at multi-player showdown."
all_contributions = sum(
[p.state.total_contributed for p in self.state.players]
)
assert (
self.state.pot == all_contributions
), f"{self.state.pot} vs {all_contributions}"
# Calculate best hands and scores for all players in hand
player_scores: list[tuple[PlayerLike, float]] = []
for player in players_in_hand:
if player.state.holding:
player_holding = " ".join(
card.utf8() for card in player.state.holding.cards
)
self._log(
f"Player {player.name} has holding {player_holding} at showdown,",
logging.INFO,
)
best_hand, best_hand_type, best_score = find_highest_scoring_hand(
private_cards=player.state.holding.cards,
community_cards=self.state.community_cards,
n_private=self.rules.showdown.hole_cards_required,
)
player_scores.append((player, best_score))
self._emit(
self._create_event(
GameEventType.PLAYER_CARDS_REVEALED,
player_uid=player.uid,
payload={
"holding": [
card.code() for card in player.state.holding.cards
],
"best_hand": [card.code() for card in best_hand],
"best_hand_type": best_hand_type.name,
"best_score": best_score,
},
)
)
self._log(
(
f"Player {player.name} has hand {best_hand_type.name} with "
f"cards {[card.utf8() for card in best_hand]}"
f" (score: {best_score:.8g})"
),
logging.INFO,
)
score_by_id = {p.uid: s for p, s in player_scores}
players_in_hand_ids = {p.uid for p in players_in_hand}
# Record total contributions for pot distribution
contributions = {
p.uid: p.state.total_contributed for p in self.state.players
}
contribution_levels = sorted(
{amt for amt in contributions.values() if amt > 0}
)
# Distribute pot based on contributions (handles side pots)
pot_to_distribute = self.state.pot
awards = {p.uid: 0 for p in self.state.players}
previous_level = 0
for level in contribution_levels:
segment_contributors = [
p for p in self.state.players if contributions[p.uid] >= level
]
delta = level - previous_level
segment_amount = delta * len(segment_contributors)
previous_level = level
# eligibility: must have contributed enough AND not folded
eligible = [
p for p in segment_contributors if p.uid in players_in_hand_ids
]
if not eligible: # pragma: no cover
# should never happen in a sane game, but don't crash silently
raise RuntimeError("No eligible players for a pot segment.")
# Deduct distributed segment from pot
self.state.pot -= segment_amount
if len(segment_contributors) == 1:
# uncalled top layer -> refund to that one contributor
lone = segment_contributors[0]
awards[lone.uid] += segment_amount
continue
# determine winners among eligible for THIS segment
best = max(score_by_id[p.uid] for p in eligible)
segment_winners = [p for p in eligible if score_by_id[p.uid] == best]
share, rem = divmod(segment_amount, len(segment_winners))
# distribute shares
for w in segment_winners:
awards[w.uid] += share
# distribute remainder in relative button order
segment_winners_sorted = self._winners_in_button_order(segment_winners)
for i in range(rem):
idx = i % len(segment_winners_sorted)
awards[segment_winners_sorted[idx].uid] += 1
# Sanity check if the pot is fully distributed
assert (
self.state.pot == 0
), f"Pot should be fully distributed, {self.state.pot} remaining."
# Pay out awards to winners
pot_distributed = 0
id_to_player = {p.uid: p for p in self.state.players}
for p_id in awards:
amount = awards[p_id]
if amount == 0:
continue
player = id_to_player[p_id]
player.state.stack += amount
pot_distributed += amount
self._log(
f"Player {player.name} wins {amount} from the pot.", logging.INFO
)
self._emit(
self._create_event(
GameEventType.POT_WON,
player_uid=player.uid,
payload={"amount": amount},
)
)
# Final sanity check
assert pot_distributed == pot_to_distribute
# Sanity check
total_stacks = sum(p.state.stack for p in self.state.players)
if not total_stacks == self._all_stacks_at_game_start: # pragma: no cover
raise RuntimeError(
"Total chips in game do not match initial amount after showdown."
)
self._log("Showdown complete\n", logging.INFO)