317 lines
12 KiB
Python
317 lines
12 KiB
Python
```python
|
|
import re
|
|
import random
|
|
from typing import Any, Dict, Optional, Tuple, List
|
|
|
|
import textarena as ta
|
|
|
|
|
|
class MazeBoundEnv(ta.Env):
    r"""
    MazeBound: Deterministic, turn-based maze navigation game.

    Two explorers compete to reach the Beacon Core first. Explorer Alpha
    (player 0, key ``"A"``) starts in the top-left corner and Explorer Beta
    (player 1, key ``"B"``) in the bottom-right corner of a square grid.

    Coordinates are ``(row, column)`` pairs: ``MOVE:N`` decreases the row,
    ``MOVE:S`` increases it, ``MOVE:E`` increases the column and ``MOVE:W``
    decreases it. Grid cells hold ``"#"`` (wall), ``" "`` (open) or ``"B"``
    (the Beacon Core).

    The game ends when a player steps onto the beacon, or after
    ``turn_limit // 2`` completed rounds (one valid action per player), in
    which case the explorer with the smaller Manhattan distance to the beacon
    wins and equal distances produce a draw.
    """

    # Cell markers stored in ``game_state["maze_layout"]``.
    WALL = "#"
    OPEN = " "
    BEACON = "B"

    # (row delta, column delta) for each compass direction.
    _DIRECTION_DELTAS = {"N": (-1, 0), "S": (1, 0), "E": (0, 1), "W": (0, -1)}

    def __init__(self, maze_size: int = 7, turn_limit: int = 40):
        """
        Configure the environment.

        Args:
            maze_size: Side length of the square maze. Must be at least 3
                because the beacon is always placed off the border.
            turn_limit: Total number of valid actions (summed over both
                players) after which the game is decided by distance.

        Raises:
            ValueError: If ``maze_size`` is smaller than 3.
        """
        # Beacon placement draws from [1, maze_size - 2]; anything below 3
        # makes that range empty, so fail early with a readable message.
        if maze_size < 3:
            raise ValueError("maze_size must be at least 3.")

        self.maze_size = maze_size
        self.turn_limit = turn_limit
        self.visibility_radius = 1

        # Precompile regex patterns for action grammar
        self.move_pattern = re.compile(r"^MOVE:(N|S|E|W)$")
        self.scan_pattern = re.compile(r"^SCAN$")
        self.pass_pattern = re.compile(r"^PASS$")

    # -------------------------------
    # Helper: Extract \boxed{} content
    # -------------------------------
    def _extract_answer_content(self, action: str) -> str:
        r"""
        Extract the action token from a player's raw response.

        Args:
            action: Full text submitted by the player.

        Returns:
            The stripped content of the first ``\boxed{...}`` group, or the
            stripped raw text when no boxed group is present.
        """
        match = re.search(r"\\boxed\{([^}]*)\}", action, re.DOTALL)
        if match:
            return match.group(1).strip()
        return action.strip()

    # -------------------------------
    # Maze Generation
    # -------------------------------
    def _generate_maze(self, seed: Optional[int]) -> Tuple[List[List[str]], Tuple[int, int]]:
        """
        Generate a deterministic maze and Beacon location for a seed.

        Every cell independently becomes a wall with probability 0.2, the two
        start corners are forced open, and the beacon is placed on a random
        interior cell. If a start corner cannot reach the beacon, a corridor
        is carved for it so the game is always winnable.

        Args:
            seed: Seed for the private random generator (``None`` gives a
                non-reproducible maze).

        Returns:
            ``(maze, (beacon_row, beacon_col))`` where ``maze`` is a list of
            rows of single-character cell markers.
        """
        rnd = random.Random(seed)
        size = self.maze_size

        # Row-major generation; one random draw per cell.
        maze = [
            [self.WALL if rnd.random() < 0.2 else self.OPEN for _ in range(size)]
            for _ in range(size)
        ]

        # Ensure both start corners are open
        starts = [(0, 0), (size - 1, size - 1)]
        for sx, sy in starts:
            maze[sx][sy] = self.OPEN

        # Beacon location - always an interior cell (never on the border)
        bx = rnd.randint(1, size - 2)
        by = rnd.randint(1, size - 2)
        maze[bx][by] = self.BEACON

        # Random walls may cut a start off from the beacon; only then do we
        # alter the layout, so already-solvable mazes are left untouched.
        for start in starts:
            if not self._is_reachable(maze, start, (bx, by)):
                self._carve_corridor(maze, start, (bx, by))

        return maze, (bx, by)

    def _is_reachable(self, maze: List[List[str]], start: Tuple[int, int], goal: Tuple[int, int]) -> bool:
        """Return True if ``goal`` can be reached from ``start`` through non-wall cells."""
        size = len(maze)
        seen = {start}
        stack = [start]
        while stack:
            x, y = stack.pop()
            if (x, y) == goal:
                return True
            for dx, dy in self._DIRECTION_DELTAS.values():
                nx, ny = x + dx, y + dy
                if not (0 <= nx < size and 0 <= ny < size):
                    continue
                if (nx, ny) in seen or maze[nx][ny] == self.WALL:
                    continue
                seen.add((nx, ny))
                stack.append((nx, ny))
        return False

    def _carve_corridor(self, maze: List[List[str]], start: Tuple[int, int], goal: Tuple[int, int]) -> None:
        """Open every wall on the L-shaped path (rows first, then columns) from ``start`` to ``goal``."""
        x, y = start
        gx, gy = goal
        while x != gx:
            x += 1 if gx > x else -1
            if maze[x][y] == self.WALL:
                maze[x][y] = self.OPEN
        while y != gy:
            y += 1 if gy > y else -1
            if maze[x][y] == self.WALL:
                maze[x][y] = self.OPEN

    # -------------------------------
    # Helper: Compute Manhattan distance
    # -------------------------------
    def _manhattan(self, a: Tuple[int, int], b: Tuple[int, int]) -> int:
        """Return the Manhattan (taxicab) distance between two grid cells."""
        return abs(a[0] - b[0]) + abs(a[1] - b[1])

    # -------------------------------
    # Reset method
    # -------------------------------
    def reset(self, num_players: int, seed: Optional[int] = None):
        """
        Resets the environment to an initial state.

        Args:
            num_players: Number of players in the game. Must be 2.
            seed: Optional seed for deterministic behavior.

        Returns:
            None

        Raises:
            ValueError: If ``num_players`` is not 2.
        """
        if num_players != 2:
            raise ValueError("MazeBound is strictly a two-player game.")

        self.state = ta.TwoPlayerState(num_players=num_players, seed=seed, max_turns=self.turn_limit)
        maze, beacon_coord = self._generate_maze(seed)

        # Initialize players in opposite corners
        last = self.maze_size - 1
        spawn_points = {
            "A": ("Explorer Alpha", (0, 0)),
            "B": ("Explorer Beta", (last, last)),
        }
        players = {}
        for key, (name, spawn) in spawn_points.items():
            players[key] = {
                "name": name,
                "position": list(spawn),
                "visible_cells": self._visible_cells(spawn),
                "discovered_map": {},
                "distance_to_beacon": self._manhattan(spawn, beacon_coord),
                "last_action": None,
            }

        game_state = {
            "maze_size": self.maze_size,
            "turn_number": 0,
            "turn_limit": self.turn_limit,
            "seed": seed,
            "beacon_coord": list(beacon_coord),
            "maze_layout": maze,
            "players": players,
            "history": [],
            "winner": None,
            "terminated": False,
            "termination_reason": "",
        }

        self.state.reset(game_state=game_state, player_prompt_function=self._generate_player_prompt)
        self.state.add_observation("Welcome to MazeBound!", ta.ObservationType.GAME_MESSAGE)

    # -------------------------------
    # Visibility Calculation
    # -------------------------------
    def _visible_cells(self, pos: Tuple[int, int]) -> List[List[int]]:
        """
        Return the in-bounds cells within ``self.visibility_radius`` of ``pos``.

        The square neighbourhood includes ``pos`` itself and is listed in
        row-major order as ``[row, column]`` pairs.
        """
        x, y = pos
        radius = self.visibility_radius
        offsets = range(-radius, radius + 1)
        return [
            [x + dx, y + dy]
            for dx in offsets
            for dy in offsets
            if 0 <= x + dx < self.maze_size and 0 <= y + dy < self.maze_size
        ]

    # -------------------------------
    # Step Method
    # -------------------------------
    def step(self, action: str) -> Tuple[bool, ta.Info]:
        """
        Perform a single environment step for the current player.

        Args:
            action: The action text submitted by the current player.

        Returns:
            A tuple (done, info)
        """
        player_idx = self.state.current_player_id
        player_key = "A" if player_idx == 0 else "B"

        self.state.add_observation(
            message=action,
            observation_type=ta.ObservationType.PLAYER_ACTION,
            from_id=player_idx,
            to_id=-1,
        )

        extracted = self._extract_answer_content(action)
        game_state = self.state.game_state

        # Validate grammar and execute
        move_match = self.move_pattern.match(extracted)
        if move_match:
            # A move into a wall or off the map has already been flagged as
            # invalid by _execute_move; it must not be logged as a played
            # turn nor advance the round counter.
            if not self._execute_move(player_key, move_match.group(1)):
                return self.state.step()
        elif self.scan_pattern.match(extracted):
            self._execute_scan(player_key)
        elif self.pass_pattern.match(extracted):
            pass  # deliberately does nothing
        else:
            self.state.set_invalid_move(reason="UnrecognizedActionFormat")
            return self.state.step()

        # Record history
        game_state["players"][player_key]["last_action"] = extracted
        turn_pair_number = (len(game_state["history"]) // 2) + 1
        game_state["history"].append({"turn": turn_pair_number, "player": player_key, "action": extracted})

        # Check beacon capture termination
        player_pos = tuple(game_state["players"][player_key]["position"])
        if player_pos == tuple(game_state["beacon_coord"]):
            game_state["terminated"] = True
            game_state["winner"] = player_key
            game_state["termination_reason"] = "BeaconCaptured"
            self.state.set_winner(player_id=player_idx, reason="BeaconCaptured")
            return self.state.step()

        # Update turn number every two valid actions (one full round)
        if len(game_state["history"]) % 2 == 0:
            game_state["turn_number"] += 1

        # Check turn limit termination (turn_limit counts individual actions)
        if game_state["turn_number"] >= self.turn_limit // 2:
            self._determine_end_by_distance()

        return self.state.step()

    # -------------------------------
    # Action execution helpers
    # -------------------------------
    def _execute_move(self, player_key: str, direction: str) -> bool:
        """
        Execute movement if possible, handling walls and bounds.

        Args:
            player_key: ``"A"`` or ``"B"``.
            direction: One of ``"N"``, ``"S"``, ``"E"``, ``"W"``.

        Returns:
            True if the player moved. False if the move was rejected, in
            which case an invalid move has been registered on the state.
        """
        game_state = self.state.game_state
        player = game_state["players"][player_key]

        delta = self._DIRECTION_DELTAS.get(direction)
        if delta is None:
            self.state.set_invalid_move(reason="UnrecognizedActionFormat")
            return False

        x, y = player["position"]
        nx, ny = x + delta[0], y + delta[1]

        if not (0 <= nx < self.maze_size and 0 <= ny < self.maze_size):
            self.state.set_invalid_move(reason="OutOfBounds")
            return False
        if game_state["maze_layout"][nx][ny] == self.WALL:
            self.state.set_invalid_move(reason="BlockedByWall")
            return False

        # Apply move and refresh everything derived from the position
        player["position"] = [nx, ny]
        player["visible_cells"] = self._visible_cells((nx, ny))
        player["distance_to_beacon"] = self._manhattan((nx, ny), tuple(game_state["beacon_coord"]))
        return True

    def _execute_scan(self, player_key: str):
        """
        Reveal the contents of the cells within visibility radius.

        Refreshes the player's ``visible_cells``, stores each visible cell's
        marker in ``discovered_map`` under the key ``"row,col"``, and sends
        the scanning player a private observation describing those cells.
        """
        game_state = self.state.game_state
        player = game_state["players"][player_key]
        visible = self._visible_cells(tuple(player["position"]))
        player["visible_cells"] = visible

        layout = game_state["maze_layout"]
        labels = {self.WALL: "wall", self.BEACON: "beacon"}
        findings = []
        for x, y in visible:
            cell = layout[x][y]
            player["discovered_map"][f"{x},{y}"] = cell
            findings.append(f"({x},{y})={labels.get(cell, 'open')}")

        # Only the scanning player learns the result.
        self.state.add_observation(
            message="Scan result: " + ", ".join(findings),
            observation_type=ta.ObservationType.GAME_MESSAGE,
            to_id=0 if player_key == "A" else 1,
        )

    # -------------------------------
    # Terminal Check helper (time expired)
    # -------------------------------
    def _determine_end_by_distance(self):
        """Determine winner by shortest distance to beacon upon timeout."""
        game_state = self.state.game_state
        a_dist = game_state["players"]["A"]["distance_to_beacon"]
        b_dist = game_state["players"]["B"]["distance_to_beacon"]

        game_state["terminated"] = True

        if a_dist == b_dist:
            game_state["winner"] = None
            game_state["termination_reason"] = "Draw"
            self.state.set_draw(reason="EqualDistance")
            return

        winner_key, winner_id = ("A", 0) if a_dist < b_dist else ("B", 1)
        game_state["winner"] = winner_key
        game_state["termination_reason"] = "TimeExpired"
        self.state.set_winner(player_id=winner_id, reason="TimeExpired")

    # -------------------------------
    # Prompt generation for player
    # -------------------------------
    def _generate_player_prompt(self, player_id: int, game_state: Dict[str, Any]) -> str:
        """
        Build the instruction prompt shown to a player.

        Args:
            player_id: 0 for Explorer Alpha, anything else for Explorer Beta.
            game_state: The shared game-state dictionary created in ``reset``.

        Returns:
            The prompt text, including the player's position, visible cell
            coordinates, remaining turns, the action grammar and the rules.
        """
        player_key = "A" if player_id == 0 else "B"
        player_data = game_state["players"][player_key]
        coords_str = ", ".join(f"({x},{y})" for x, y in player_data["visible_cells"])

        # turn_limit counts individual valid actions, and so does the history
        # log, so their difference is the number of turns left for both
        # players combined.
        turn_limit = game_state["turn_limit"]
        remaining = max(0, turn_limit - len(game_state["history"]))

        return (
            f"You are {player_data['name']} in MazeBound, a turn-based labyrinth navigation game.\n"
            "Your goal is to reach the Beacon Core (marked 'B') before your opponent.\n\n"
            f"Current coordinates: {tuple(player_data['position'])}\n"
            f"Visible cells (radius {self.visibility_radius}): {coords_str}\n"
            f"Turns remaining (approximate): {remaining}\n"
            "Available actions:\n"
            " - MOVE:N, MOVE:S, MOVE:E, MOVE:W\n"
            " - SCAN\n"
            " - PASS\n\n"
            "Rules:\n"
            " - Moves blocked by walls (#) or map edges cause Invalid Moves.\n"
            " - SCAN reveals adjacent cells within your visibility range.\n"
            f" - Game ends when a player reaches the Beacon Core or after {turn_limit} turns.\n"
            "\nUse \\boxed{} around your action token.\n"
            "Example valid response:\n"
            " It looks clear eastward, I'll proceed.\n"
            " \\boxed{MOVE:E}\n"
            "Example invalid response:\n"
            " Let's go east! (missing box)\n"
        )

    # -------------------------------
    # Close method
    # -------------------------------
    def close(self) -> Tuple[Dict, Dict]:
        """Return rewards and game_info at end of game."""
        return self.state.rewards, self.state.game_info
|
|
```