252 lines
11 KiB
Python
252 lines
11 KiB
Python
```python
|
||
import re
|
||
import random
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
import textarena as ta
|
||
|
||
|
||
class MazeConquerorsEnv(ta.Env):
    """Turn-based two-player deterministic environment for Maze Conquerors.

    Two explorers ("ExplorerA" starting at the top-left corner and
    "ExplorerB" starting at the bottom-right corner) take alternating actions
    on a square grid. Grid cells are ``"."`` (open), ``"#"`` (wall), ``"R"``
    (rune), ``"S"`` (top-left marker) or ``"G"`` (bottom-right marker).

    The game ends when the global turn counter reaches ``turn_limit`` or when
    no rune is left on the grid. The explorer with more runes wins; ties are
    broken by Manhattan distance to the centre cell, and an exact tie is a
    draw.
    """

    # (row delta, column delta) for each keyword accepted by ``[Move:...]``.
    _DIRECTIONS = {"up": (-1, 0), "down": (1, 0), "left": (0, -1), "right": (0, 1)}

    def __init__(self, maze_size: int = 7, turn_limit: int = 30):
        """Store configuration and precompile the action grammar.

        Args:
            maze_size: Side length of the square maze.
            turn_limit: Number of global turns (one per accepted step of
                either player) after which the game is scored.
        """
        self.maze_size = maze_size
        self.turn_limit = turn_limit
        self.state: Optional[ta.TwoPlayerState] = None
        # Precompile regexes for action grammar
        self.move_pattern = re.compile(r'^\[Move:(up|down|left|right)\]$')
        self.scan_pattern = re.compile(r'^\[Scan:[1-3]\]$')
        self.claim_pattern = re.compile(r'^\[Claim\]$')
        self.wait_pattern = re.compile(r'^\[Wait\]$')

    # ------------------------------------------------------------------ #
    # Helper: extract boxed content
    # ------------------------------------------------------------------ #
    def _extract_answer_content(self, action: str) -> str:
        r"""Return the text inside the first ``\boxed{...}`` of ``action``.

        The double-brace form ``\boxed{{...}}`` is tried first because the
        prompt shows the example with doubled braces; the single-brace form is
        the fallback. When neither form is present the whole action, stripped
        of surrounding whitespace, is returned.
        """
        match = re.search(r'\\boxed\{\{([^}]*)\}\}', action)
        if not match:
            match = re.search(r'\\boxed\{([^}]*)\}', action)
        return match.group(1).strip() if match else action.strip()

    # ------------------------------------------------------------------ #
    # Maze generation
    # ------------------------------------------------------------------ #
    def _generate_maze(self, seed: int) -> List[List[str]]:
        """Build a ``maze_size`` x ``maze_size`` grid determined by ``seed``.

        Each cell independently becomes a wall with probability 0.15, a rune
        with probability 0.10, and stays open otherwise. The two corner cells
        are then overwritten with the ``"S"`` and ``"G"`` markers.

        A private ``random.Random`` instance is used so that generating a maze
        does not reseed the process-wide RNG; the layout for a given seed is
        the same as with ``random.seed(seed)``.
        """
        rng = random.Random(seed)
        size = self.maze_size
        grid = [["." for _ in range(size)] for _ in range(size)]
        # place walls and runes
        for i in range(size):
            for j in range(size):
                roll = rng.random()
                if roll < 0.15:
                    grid[i][j] = "#"
                elif roll < 0.25:
                    grid[i][j] = "R"
        # mark start and goal positions
        grid[0][0] = "S"
        grid[size - 1][size - 1] = "G"
        return grid

    def _tiles_within(self, center: Tuple[int, int], radius: int) -> List[List[int]]:
        """List in-bounds ``[row, col]`` cells in the square of ``radius`` around ``center``.

        Cells are produced in row-major order.
        """
        cx, cy = center
        return [
            [x, y]
            for x in range(cx - radius, cx + radius + 1)
            for y in range(cy - radius, cy + radius + 1)
            if 0 <= x < self.maze_size and 0 <= y < self.maze_size
        ]

    def _initial_visible_tiles(self, pos: Tuple[int, int]) -> List[List[int]]:
        """Return the in-bounds cells of the 3x3 neighbourhood centred on ``pos``."""
        return self._tiles_within(pos, 1)

    def _new_player(self, position: Tuple[int, int]) -> Dict[str, Any]:
        """Create the per-player record stored under ``game_state["players"]``."""
        return {
            "position": [position[0], position[1]],
            "runes_collected": 0,
            "moves_remaining": 5,
            "visible_tiles": self._initial_visible_tiles(position),
            "last_action": None,
            "is_trapped": False,
        }

    # ------------------------------------------------------------------ #
    # Reset
    # ------------------------------------------------------------------ #
    def reset(self, num_players: int, seed: Optional[int] = None):
        """Reset the environment to an initial state.

        Args:
            num_players: Must be exactly 2.
            seed: Seed for maze generation. When ``None`` a random seed is
                drawn and recorded in ``game_state["seed"]``.

        Returns:
            The freshly initialised state object.

        Raises:
            ValueError: If ``num_players`` is not 2.
        """
        if num_players != 2:
            raise ValueError("Maze Conquerors requires exactly two players.")

        # NOTE(review): the state receives the caller's seed (possibly None)
        # before a default is drawn below, so it may differ from the seed
        # recorded in game_state — confirm whether that is intended.
        self.state = ta.TwoPlayerState(num_players=num_players, seed=seed, max_turns=self.turn_limit)
        if seed is None:
            seed = random.randint(0, 9999999)
        maze_layout = self._generate_maze(seed)

        far_corner = (self.maze_size - 1, self.maze_size - 1)
        game_state = {
            "global_turn": 0,
            "turn_limit": self.turn_limit,
            "maze_dimensions": [self.maze_size, self.maze_size],
            "seed": seed,
            "maze_layout": maze_layout,
            "players": {
                "ExplorerA": self._new_player((0, 0)),
                "ExplorerB": self._new_player(far_corner),
            },
            "observation_log": [],
            "game_status": "active",
            "winner": None,
        }

        roles = {0: "ExplorerA", 1: "ExplorerB"}
        self.state.reset(game_state=game_state, player_prompt_function=self._generate_player_prompt, role_mapping=roles)
        self.state.add_observation("Maze Conquerors initialized!", ta.ObservationType.GAME_MESSAGE)
        return self.state

    # ------------------------------------------------------------------ #
    # Step
    # ------------------------------------------------------------------ #
    def _apply_move(self, player: Dict[str, Any], maze: List[List[str]], direction: str) -> Optional[str]:
        """Move ``player`` one cell; return the result text, or ``None`` if illegal."""
        dx, dy = self._DIRECTIONS[direction]
        newx, newy = player["position"][0] + dx, player["position"][1] + dy
        if not (0 <= newx < self.maze_size and 0 <= newy < self.maze_size):
            self.state.set_invalid_move("Invalid move: outside maze bounds.")
            return None
        if maze[newx][newy] == "#":
            self.state.set_invalid_move("Invalid move: path blocked.")
            return None
        player["position"] = [newx, newy]
        # Moving replaces the visible set with the 3x3 area around the new cell.
        player["visible_tiles"] = self._initial_visible_tiles((newx, newy))
        return "moved successfully"

    def _apply_scan(self, player: Dict[str, Any], radius: int) -> str:
        """Add every in-bounds cell within ``radius`` to the player's visible set."""
        px, py = player["position"]
        merged = {tuple(tile) for tile in player["visible_tiles"]}
        merged.update(tuple(tile) for tile in self._tiles_within((px, py), radius))
        # Keep the same representation as reset/move (a list of [row, col]
        # lists) and a stable order, so the prompt text stays consistent.
        player["visible_tiles"] = [list(tile) for tile in sorted(merged)]
        return "revealed tiles"

    def _apply_claim(self, player: Dict[str, Any], maze: List[List[str]]) -> Optional[str]:
        """Collect the rune under ``player``; return ``None`` if there is none."""
        px, py = player["position"]
        if maze[px][py] != "R":
            self.state.set_invalid_move("Invalid claim: no rune present.")
            return None
        player["runes_collected"] += 1
        maze[px][py] = "."
        return "claimed rune"

    def step(self, action: str) -> Tuple[bool, ta.Info]:
        """Perform a single environment step for the current player.

        The raw action is recorded as an observation, the boxed content is
        extracted and validated against the action grammar, and the matching
        action is applied. Every syntactically valid action is appended to
        ``observation_log`` and advances ``global_turn``.

        Args:
            action: The player's full response text.

        Returns:
            ``(True, {})`` when a terminal condition was reached on this step,
            otherwise whatever ``self.state.step()`` returns.
        """
        player_id = self.state.current_player_id
        role = "ExplorerA" if player_id == 0 else "ExplorerB"
        gs = self.state.game_state
        player = gs["players"][role]
        maze = gs["maze_layout"]

        # Record raw action
        self.state.add_observation(action, ta.ObservationType.PLAYER_ACTION, from_id=player_id)
        content = self._extract_answer_content(action)
        player["last_action"] = content

        # Validate action syntax
        grammar = (self.move_pattern, self.scan_pattern, self.claim_pattern, self.wait_pattern)
        if not any(pattern.match(content) for pattern in grammar):
            self.state.set_invalid_move(reason="Invalid format: action not recognized.")
            return self.state.step()

        action_result = None
        if content.startswith("[Move:"):
            # Strip the "[Move:" prefix and the closing "]".
            action_result = self._apply_move(player, maze, content[6:-1])
        elif content.startswith("[Scan:"):
            action_result = self._apply_scan(player, int(content[6:-1]))
        elif content == "[Claim]":
            action_result = self._apply_claim(player, maze)
        elif content == "[Wait]":
            action_result = "waited"

        # NOTE(review): well-formed but illegal actions (blocked move, empty
        # claim) are logged as "invalid" and still advance the global turn,
        # unlike malformed actions above — confirm this is intended.
        gs["observation_log"].append(
            {"turn": gs["global_turn"], "player": role, "action": content, "result": action_result or "invalid"}
        )

        # Advance global turn
        gs["global_turn"] += 1

        # Terminal condition check
        done = self._check_terminal_conditions()
        if done:
            return True, {}

        return self.state.step()

    # ------------------------------------------------------------------ #
    # Terminal conditions
    # ------------------------------------------------------------------ #
    def _check_terminal_conditions(self) -> bool:
        """Score the game and return ``True`` if it has ended.

        The game ends when ``global_turn`` has reached ``turn_limit`` or when
        no ``"R"`` cell remains in the maze.
        """
        gs = self.state.game_state
        turn = gs["global_turn"]
        if turn >= gs["turn_limit"]:
            self._determine_winner(reason="Turn limit reached.")
            return True
        # Check if all runes collected
        if not any("R" in row for row in gs["maze_layout"]):
            self._determine_winner(reason="All runes collected.")
            return True
        return False

    # ------------------------------------------------------------------ #
    # Winner determination
    # ------------------------------------------------------------------ #
    def _determine_winner(self, reason: str):
        """Record the outcome on the state.

        More runes wins. On equal runes the explorer with the smaller
        Manhattan distance to the centre cell wins; equal distance is a draw.
        """
        gs = self.state.game_state
        a, b = gs["players"]["ExplorerA"], gs["players"]["ExplorerB"]
        if a["runes_collected"] > b["runes_collected"]:
            self.state.set_winner(player_id=0, reason=reason)
        elif a["runes_collected"] < b["runes_collected"]:
            self.state.set_winner(player_id=1, reason=reason)
        else:
            core = (self.maze_size // 2, self.maze_size // 2)
            dist_a = abs(a["position"][0] - core[0]) + abs(a["position"][1] - core[1])
            dist_b = abs(b["position"][0] - core[0]) + abs(b["position"][1] - core[1])
            if dist_a < dist_b:
                self.state.set_winner(player_id=0, reason=reason)
            elif dist_b < dist_a:
                self.state.set_winner(player_id=1, reason=reason)
            else:
                self.state.set_draw(reason=reason)

    # ------------------------------------------------------------------ #
    # Prompt
    # ------------------------------------------------------------------ #
    def _generate_player_prompt(self, player_id: int, game_state: Dict[str, Any]) -> str:
        """Compose the prompt for ``player_id``: status, visible tiles, grammar."""
        role = "ExplorerA" if player_id == 0 else "ExplorerB"
        player = game_state["players"][role]
        status = (
            f"You are {role}, traversing an ancient shifting labyrinth to gather mystical runes.\n"
            f"Turn {game_state['global_turn']} of {game_state['turn_limit']}.\n"
            f"You have collected {player['runes_collected']} runes.\n"
        )
        surroundings = "Your visible tiles: " + str(player["visible_tiles"]) + "\n"
        # These are plain (non-f) strings, so the doubled braces are shown to
        # the player literally; _extract_answer_content accepts both forms.
        grammar = (
            "Allowed actions:\n"
            "[Move:up], [Move:down], [Move:left], [Move:right]\n"
            "[Scan:1–3], [Claim], [Wait]\n"
            "Put your final answer within \\boxed{{}} at the end of your response.\n"
            "Example valid response:\n"
            "I will explore the passage ahead.\n"
            "\\boxed{{[Move:right]}}\n"
        )
        return status + surroundings + grammar

    # ------------------------------------------------------------------ #
    # Boilerplate
    # ------------------------------------------------------------------ #
    def get_observation(self) -> Tuple[int, List]:
        """Return the current player id together with an empty observation list."""
        # NOTE(review): this always returns [] rather than anything recorded
        # via add_observation — confirm the override is intended.
        return self.state.current_player_id, []

    def close(self) -> Tuple[Dict, Dict]:
        """Return the state's ``rewards`` and ``game_info`` attributes."""
        return self.state.rewards, self.state.game_info
|
||
``` |