251 lines
11 KiB
Python
251 lines
11 KiB
Python
|
|
import re
|
|||
|
|
import random
|
|||
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|||
|
|
import textarena as ta
|
|||
|
|
|
|||
|
|
|
|||
|
|
class MazeConquerorsEnv(ta.Env):
    """Turn-based two-player deterministic environment for Maze Conquerors.

    Two explorers act alternately on a square grid stored as a list of rows:

    * ``"."`` open floor, ``"#"`` wall, ``"R"`` rune,
    * ``"S"`` at ``[0][0]`` (ExplorerA's start) and ``"G"`` at
      ``[size - 1][size - 1]`` (ExplorerB's start).

    Positions are ``[row, col]`` pairs; "up" decreases the row index and
    "left" decreases the column index.

    The game ends when ``global_turn`` reaches ``turn_limit`` or when no rune
    is left on the grid. The explorer with more runes wins; on a rune tie the
    explorer closer (Manhattan distance) to the centre tile wins; otherwise
    the game is a draw.
    """

    # Row/column offsets for each direction accepted by the move grammar.
    _MOVE_DELTAS = {"up": (-1, 0), "down": (1, 0), "left": (0, -1), "right": (0, 1)}

    def __init__(self, maze_size: int = 7, turn_limit: int = 30):
        """Store the configuration and precompile the action grammar.

        Args:
            maze_size: Side length of the square maze.
            turn_limit: Number of global turns after which the game ends.
        """
        self.maze_size = maze_size
        self.turn_limit = turn_limit
        self.state: Optional[ta.TwoPlayerState] = None
        # Precompile regexes for action grammar
        self.move_pattern = re.compile(r'^\[Move:(up|down|left|right)\]$')
        self.scan_pattern = re.compile(r'^\[Scan:[1-3]\]$')
        self.claim_pattern = re.compile(r'^\[Claim\]$')
        self.wait_pattern = re.compile(r'^\[Wait\]$')

    # ------------------------------------------------------------------ #
    # Helper: extract boxed content
    # ------------------------------------------------------------------ #
    def _extract_answer_content(self, action: str) -> str:
        """Return the text inside the first ``\\boxed{...}`` of ``action``.

        The double-brace form ``\\boxed{{...}}`` is tried first because the
        prompt shows that spelling literally; the single-brace form is the
        fallback. When neither is present the whole stripped action is
        returned so the grammar check can reject it.
        """
        match = re.search(r'\\boxed\{\{([^}]*)\}\}', action)
        if not match:
            match = re.search(r'\\boxed\{([^}]*)\}', action)
        return match.group(1).strip() if match else action.strip()

    # ------------------------------------------------------------------ #
    # Maze generation
    # ------------------------------------------------------------------ #
    def _generate_maze(self, seed: int) -> List[List[str]]:
        """Build a ``maze_size`` x ``maze_size`` grid deterministically from ``seed``.

        Each cell gets one roll in row-major order: below 0.15 it becomes a
        wall, below 0.25 a rune, otherwise it stays open. The two corner
        tiles are then overwritten with the start and goal markers.

        Args:
            seed: Seed for the maze's private random number generator.

        Returns:
            The grid as a list of rows of single-character tile codes.
        """
        # A private generator keeps the layout reproducible without reseeding
        # the process-wide ``random`` module as a side effect.
        rng = random.Random(seed)
        size = self.maze_size
        grid = [["." for _ in range(size)] for _ in range(size)]
        # place walls and runes
        for i in range(size):
            for j in range(size):
                roll = rng.random()
                if roll < 0.15:
                    grid[i][j] = "#"
                elif roll < 0.25:
                    grid[i][j] = "R"
        # mark start and goal positions
        grid[0][0] = "S"
        grid[size - 1][size - 1] = "G"
        return grid

    def _tiles_within(self, pos: Tuple[int, int], radius: int) -> List[List[int]]:
        """List in-bounds ``[row, col]`` tiles in the square window around ``pos``.

        The window spans ``radius`` tiles in every direction (Chebyshev
        distance) and is returned in row-major order.
        """
        row, col = pos
        size = self.maze_size
        return [
            [r, c]
            for r in range(row - radius, row + radius + 1)
            for c in range(col - radius, col + radius + 1)
            if 0 <= r < size and 0 <= c < size
        ]

    def _initial_visible_tiles(self, pos: Tuple[int, int]) -> List[List[int]]:
        """Return the in-bounds tiles of the 3x3 window centred on ``pos``."""
        return self._tiles_within(pos, 1)

    # ------------------------------------------------------------------ #
    # Reset
    # ------------------------------------------------------------------ #
    def reset(self, num_players: int, seed: Optional[int] = None):
        """Reset the environment to a fresh game.

        Args:
            num_players: Must be exactly 2.
            seed: Maze seed; when ``None`` a random seed is drawn and stored
                in the game state under ``"seed"``.

        Returns:
            The newly initialised state object.

        Raises:
            ValueError: If ``num_players`` is not 2.
        """
        if num_players != 2:
            raise ValueError("Maze Conquerors requires exactly two players.")

        self.state = ta.TwoPlayerState(num_players=num_players, seed=seed, max_turns=self.turn_limit)
        if seed is None:
            seed = random.randint(0, 9999999)
        maze_layout = self._generate_maze(seed)

        last = self.maze_size - 1
        game_state = {
            "global_turn": 0,
            "turn_limit": self.turn_limit,
            "maze_dimensions": [self.maze_size, self.maze_size],
            "seed": seed,
            "maze_layout": maze_layout,
            "players": {
                "ExplorerA": {
                    "position": [0, 0],
                    "runes_collected": 0,
                    "moves_remaining": 5,
                    "visible_tiles": self._initial_visible_tiles((0, 0)),
                    "last_action": None,
                    "is_trapped": False,
                },
                "ExplorerB": {
                    "position": [last, last],
                    "runes_collected": 0,
                    "moves_remaining": 5,
                    "visible_tiles": self._initial_visible_tiles((last, last)),
                    "last_action": None,
                    "is_trapped": False,
                },
            },
            "observation_log": [],
            "game_status": "active",
            "winner": None,
        }

        roles = {0: "ExplorerA", 1: "ExplorerB"}
        self.state.reset(game_state=game_state, player_prompt_function=self._generate_player_prompt, role_mapping=roles)
        self.state.add_observation("Maze Conquerors initialized!", ta.ObservationType.GAME_MESSAGE)
        return self.state

    # ------------------------------------------------------------------ #
    # Step
    # ------------------------------------------------------------------ #
    def step(self, action: str) -> Tuple[bool, ta.Info]:
        """Perform a single environment step for the current player.

        The raw action is recorded, the boxed content is extracted and
        validated against the grammar, and the matching effect is applied.
        Invalid actions, whether malformed or against the rules, are flagged
        on the state and returned through ``state.step()`` without advancing
        ``global_turn`` or running terminal checks. Rule-invalid attempts are
        still appended to ``observation_log`` with result ``"invalid"``.

        Args:
            action: The player's full response text.

        Returns:
            ``(True, {})`` when this action ended the game, otherwise the
            result of ``self.state.step()``.
        """
        player_id = self.state.current_player_id
        role = "ExplorerA" if player_id == 0 else "ExplorerB"
        gs = self.state.game_state
        player = gs["players"][role]
        maze = gs["maze_layout"]

        # Record raw action
        self.state.add_observation(action, ta.ObservationType.PLAYER_ACTION, from_id=player_id)
        content = self._extract_answer_content(action)
        player["last_action"] = content

        # Validate action syntax
        grammar = (self.move_pattern, self.scan_pattern, self.claim_pattern, self.wait_pattern)
        if not any(pattern.match(content) for pattern in grammar):
            self.state.set_invalid_move(reason="Invalid format: action not recognized.")
            return self.state.step()

        action_result = None
        invalid_reason = None
        if content.startswith("[Move:"):
            # "[Move:" is six characters; the slice drops it and the closing "]".
            dx, dy = self._MOVE_DELTAS[content[6:-1]]
            newx, newy = player["position"][0] + dx, player["position"][1] + dy
            if not (0 <= newx < self.maze_size and 0 <= newy < self.maze_size):
                invalid_reason = "Invalid move: outside maze bounds."
            elif maze[newx][newy] == "#":
                invalid_reason = "Invalid move: path blocked."
            else:
                player["position"] = [newx, newy]
                # Moving resets visibility to the 3x3 window around the new tile.
                player["visible_tiles"] = self._initial_visible_tiles((newx, newy))
                action_result = "moved successfully"
        elif content.startswith("[Scan:"):
            radius = int(content[6:-1])
            seen = {tuple(tile) for tile in player["visible_tiles"]}
            seen.update(tuple(tile) for tile in self._tiles_within(tuple(player["position"]), radius))
            # Keep the same shape as _initial_visible_tiles ([row, col] lists)
            # and a stable order, so the prompt renders consistently.
            player["visible_tiles"] = [list(tile) for tile in sorted(seen)]
            action_result = "revealed tiles"
        elif content == "[Claim]":
            px, py = player["position"]
            if maze[px][py] == "R":
                player["runes_collected"] += 1
                maze[px][py] = "."
                action_result = "claimed rune"
            else:
                invalid_reason = "Invalid claim: no rune present."
        elif content == "[Wait]":
            action_result = "waited"

        gs["observation_log"].append(
            {"turn": gs["global_turn"], "player": role, "action": content, "result": action_result or "invalid"}
        )

        if invalid_reason is not None:
            # Treat rule violations like format errors: flag the move and
            # return without consuming a global turn.
            self.state.set_invalid_move(invalid_reason)
            return self.state.step()

        # Advance global turn
        gs["global_turn"] += 1

        # Terminal condition check
        if self._check_terminal_conditions():
            return True, {}

        return self.state.step()

    # ------------------------------------------------------------------ #
    # Terminal conditions
    # ------------------------------------------------------------------ #
    def _check_terminal_conditions(self) -> bool:
        """Resolve the game if it is over and report whether it ended.

        The game is over once ``global_turn`` has reached ``turn_limit`` or
        no ``"R"`` tile remains in the maze; in either case the winner is
        determined before returning ``True``.
        """
        gs = self.state.game_state
        if gs["global_turn"] >= gs["turn_limit"]:
            self._determine_winner(reason="Turn limit reached.")
            return True
        # Check if all runes collected
        if not any("R" in row for row in gs["maze_layout"]):
            self._determine_winner(reason="All runes collected.")
            return True
        return False

    # ------------------------------------------------------------------ #
    # Winner determination
    # ------------------------------------------------------------------ #
    def _determine_winner(self, reason: str):
        """Record the outcome on the state.

        More runes wins. On a rune tie the explorer with the smaller
        Manhattan distance to the centre tile wins; equal distances are a
        draw.

        Args:
            reason: Text passed through to ``set_winner`` / ``set_draw``.
        """
        gs = self.state.game_state
        a, b = gs["players"]["ExplorerA"], gs["players"]["ExplorerB"]
        if a["runes_collected"] > b["runes_collected"]:
            self.state.set_winner(player_id=0, reason=reason)
        elif a["runes_collected"] < b["runes_collected"]:
            self.state.set_winner(player_id=1, reason=reason)
        else:
            # Tie-break on proximity to the centre of the maze.
            core = (self.maze_size // 2, self.maze_size // 2)
            dist_a = abs(a["position"][0] - core[0]) + abs(a["position"][1] - core[1])
            dist_b = abs(b["position"][0] - core[0]) + abs(b["position"][1] - core[1])
            if dist_a < dist_b:
                self.state.set_winner(player_id=0, reason=reason)
            elif dist_b < dist_a:
                self.state.set_winner(player_id=1, reason=reason)
            else:
                self.state.set_draw(reason=reason)

    # ------------------------------------------------------------------ #
    # Prompt
    # ------------------------------------------------------------------ #
    def _generate_player_prompt(self, player_id: int, game_state: Dict[str, Any]) -> str:
        """Compose the prompt for ``player_id``.

        The prompt contains the player's role, the turn counter, the rune
        count, the coordinates of the currently visible tiles and the action
        grammar with an example response.
        """
        role = "ExplorerA" if player_id == 0 else "ExplorerB"
        player = game_state["players"][role]
        status = (
            f"You are {role}, traversing an ancient shifting labyrinth to gather mystical runes.\n"
            f"Turn {game_state['global_turn']} of {game_state['turn_limit']}.\n"
            f"You have collected {player['runes_collected']} runes.\n"
        )
        surroundings = "Your visible tiles: " + str(player["visible_tiles"]) + "\n"
        # These are plain (non-f) strings, so the doubled braces are emitted
        # literally; _extract_answer_content accepts that spelling.
        grammar = (
            "Allowed actions:\n"
            "[Move:up], [Move:down], [Move:left], [Move:right]\n"
            "[Scan:1–3], [Claim], [Wait]\n"
            "Put your final answer within \\boxed{{}} at the end of your response.\n"
            "Example valid response:\n"
            "I will explore the passage ahead.\n"
            "\\boxed{{[Move:right]}}\n"
        )
        return status + surroundings + grammar

    # ------------------------------------------------------------------ #
    # Boilerplate
    # ------------------------------------------------------------------ #
    def get_observation(self) -> Tuple[int, List]:
        """Return the current player's id together with an empty observation list."""
        # NOTE(review): the observation list is always empty here, so nothing
        # added via add_observation is surfaced by this method — confirm this
        # is intended rather than delegating to the state object.
        return self.state.current_player_id, []

    def close(self) -> Tuple[Dict, Dict]:
        """Return the state's ``rewards`` and ``game_info``."""
        return self.state.rewards, self.state.game_info
|