278 lines
12 KiB
Python
278 lines
12 KiB
Python
```python
|
||
import re
|
||
import random
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
import textarena as ta
|
||
|
||
|
||
class LabyrinthCommandEnv(ta.Env):
|
||
"""
|
||
Deterministic, turn-based two-player tactical maze environment: "Labyrinth Command"
|
||
Two players (Explorer A and B) move through a deterministic maze to reach the Central Beacon.
|
||
"""
|
||
|
||
def __init__(self, max_turns: int = 40, maze_width: int = 7, maze_height: int = 7):
|
||
self.max_turns = max_turns
|
||
self.maze_width = maze_width
|
||
self.maze_height = maze_height
|
||
self.move_pattern = re.compile(r"^\[Move:(North|South|East|West)\]$")
|
||
self.scan_pattern = re.compile(r"^\[Scan\]$")
|
||
self.wait_pattern = re.compile(r"^\[Wait\]$")
|
||
|
||
# -------------------------------------------------------------------------
|
||
# ========== Helper: Extract boxed command ==========
|
||
def _extract_answer_content(self, action: str) -> str:
|
||
"""Extract content within \\boxed{{...}}."""
|
||
match = re.search(r"\\boxed\{\{(.*?)\}\}", action, re.DOTALL)
|
||
if match:
|
||
return match.group(1).strip()
|
||
match = re.search(r"\\boxed\{(.*?)\}", action, re.DOTALL)
|
||
if match:
|
||
return match.group(1).strip()
|
||
return action.strip()
|
||
|
||
# -------------------------------------------------------------------------
|
||
# ========== Maze and visibility helpers ==========
|
||
def _generate_deterministic_maze(self, seed: int) -> List[List[str]]:
|
||
"""Generate deterministic maze using random seeded layout of blocked cells."""
|
||
random.seed(seed)
|
||
maze = [["." for _ in range(self.maze_width)] for _ in range(self.maze_height)]
|
||
num_blocks = (self.maze_width * self.maze_height) // 10 # about 10% blocked
|
||
for _ in range(num_blocks):
|
||
x = random.randint(0, self.maze_width - 1)
|
||
y = random.randint(0, self.maze_height - 1)
|
||
if (x, y) != (0, 0) and (x, y) != (self.maze_width - 1, self.maze_height - 1):
|
||
maze[y][x] = "X"
|
||
return maze
|
||
|
||
def _compute_visible_map(self, maze: List[List[str]], pos: Tuple[int, int]) -> List[List[str]]:
|
||
"""Compute a 3x3 visible map centered on pos."""
|
||
visible = []
|
||
for dy in range(-1, 2):
|
||
row = []
|
||
for dx in range(-1, 2):
|
||
nx, ny = pos[0] + dx, pos[1] + dy
|
||
if 0 <= nx < self.maze_width and 0 <= ny < self.maze_height:
|
||
row.append(maze[ny][nx])
|
||
else:
|
||
row.append("?")
|
||
visible.append(row)
|
||
return visible
|
||
|
||
def _distance(self, a: Tuple[int, int], b: Tuple[int, int]) -> int:
|
||
return abs(a[0] - b[0]) + abs(a[1] - b[1])
|
||
|
||
# -------------------------------------------------------------------------
|
||
# ========== Reset ==========
|
||
def reset(self, num_players: int, seed: Optional[int] = None):
|
||
"""
|
||
Resets the environment to an initial state.
|
||
|
||
Args:
|
||
num_players: must be 2.
|
||
seed: optional deterministic seed.
|
||
"""
|
||
if num_players != 2:
|
||
raise ValueError("Labyrinth Command requires exactly 2 players.")
|
||
|
||
seed = seed if seed is not None else random.randint(1, 999999)
|
||
self.state = ta.TwoPlayerState(num_players=num_players, seed=seed, max_turns=self.max_turns)
|
||
maze = self._generate_deterministic_maze(seed)
|
||
beacon_pos = (self.maze_width // 2, self.maze_height // 2)
|
||
maze[beacon_pos[1]][beacon_pos[0]] = "B"
|
||
|
||
start_A = (0, 0)
|
||
start_B = (self.maze_width - 1, self.maze_height - 1)
|
||
|
||
player_states = {
|
||
"A": {
|
||
"position": start_A,
|
||
"visible_map": self._compute_visible_map(maze, start_A),
|
||
"visited_cells": [list(start_A)],
|
||
"last_action": None,
|
||
},
|
||
"B": {
|
||
"position": start_B,
|
||
"visible_map": self._compute_visible_map(maze, start_B),
|
||
"visited_cells": [list(start_B)],
|
||
"last_action": None,
|
||
},
|
||
}
|
||
|
||
cells_blocked = [[x, y] for y in range(self.maze_height) for x in range(self.maze_width) if maze[y][x] == "X"]
|
||
|
||
game_state = {
|
||
"seed": seed,
|
||
"turn_index": 0,
|
||
"max_turns": self.max_turns,
|
||
"maze_width": self.maze_width,
|
||
"maze_height": self.maze_height,
|
||
"beacon_position": list(beacon_pos),
|
||
"cells_blocked": cells_blocked,
|
||
"player_states": player_states,
|
||
"transcript": [],
|
||
"winner": None,
|
||
"terminated": False,
|
||
}
|
||
|
||
self.state.reset(game_state=game_state, player_prompt_function=self._generate_player_prompt)
|
||
self.state.add_observation(message="Welcome to Labyrinth Command!", observation_type=ta.ObservationType.GAME_MESSAGE)
|
||
self.state.add_observation(message=f"Seed: {seed} ensures deterministic maze generation.", observation_type=ta.ObservationType.GAME_MESSAGE)
|
||
return self.state
|
||
|
||
# -------------------------------------------------------------------------
|
||
# ========== Step ==========
|
||
def step(self, action: str) -> Tuple[bool, ta.Info]:
|
||
"""
|
||
Perform a single environment step for the current player.
|
||
"""
|
||
# log the player action
|
||
self.state.add_observation(action, ta.ObservationType.PLAYER_ACTION, from_id=self.state.current_player_id, to_id=-1)
|
||
player_id = self.state.current_player_id
|
||
player_label = "A" if player_id == 0 else "B"
|
||
opponent_label = "B" if player_label == "A" else "A"
|
||
|
||
if self.state.done:
|
||
self.state.set_invalid_move("Game already finished.")
|
||
return self.state.step()
|
||
|
||
answer = self._extract_answer_content(action)
|
||
gs = self.state.game_state
|
||
player_state = gs["player_states"][player_label]
|
||
opponent_state = gs["player_states"][opponent_label]
|
||
current_pos = tuple(player_state["position"])
|
||
beacon = tuple(gs["beacon_position"])
|
||
|
||
# Validate action syntax
|
||
if not (self.move_pattern.match(answer) or self.scan_pattern.match(answer) or self.wait_pattern.match(answer)):
|
||
self.state.set_invalid_move(reason="Invalid token format.")
|
||
return self.state.step()
|
||
|
||
new_pos = current_pos
|
||
maze_width, maze_height = gs["maze_width"], gs["maze_height"]
|
||
blocked = set(tuple(cell) for cell in gs["cells_blocked"])
|
||
|
||
# execute move if movement
|
||
if answer.startswith("[Move:"):
|
||
direction = answer[len("[Move:"):-1]
|
||
dx, dy = 0, 0
|
||
if direction == "North":
|
||
dy = -1
|
||
elif direction == "South":
|
||
dy = 1
|
||
elif direction == "West":
|
||
dx = -1
|
||
elif direction == "East":
|
||
dx = 1
|
||
nx, ny = current_pos[0] + dx, current_pos[1] + dy
|
||
if not (0 <= nx < maze_width and 0 <= ny < maze_height):
|
||
self.state.set_invalid_move("Move out of bounds")
|
||
return self.state.step()
|
||
if (nx, ny) in blocked:
|
||
self.state.set_invalid_move("Cell blocked")
|
||
return self.state.step()
|
||
new_pos = (nx, ny)
|
||
player_state["position"] = list(new_pos)
|
||
player_state["visited_cells"].append(list(new_pos))
|
||
player_state["visible_map"] = self._compute_visible_map(
|
||
[["X" if [x, y] in gs["cells_blocked"] else "." for x in range(maze_width)] for y in range(maze_height)],
|
||
new_pos,
|
||
)
|
||
elif answer == "[Scan]":
|
||
player_state["visible_map"] = self._compute_visible_map(
|
||
[["X" if [x, y] in gs["cells_blocked"] else "." for x in range(maze_width)] for y in range(maze_height)],
|
||
current_pos,
|
||
)
|
||
elif answer == "[Wait]":
|
||
pass # do nothing
|
||
|
||
player_state["last_action"] = answer
|
||
gs["transcript"].append({"player": player_label, "action": answer})
|
||
gs["turn_index"] += 1
|
||
|
||
# ===== Check terminal conditions =====
|
||
reached_A = tuple(gs["player_states"]["A"]["position"]) == beacon
|
||
reached_B = tuple(gs["player_states"]["B"]["position"]) == beacon
|
||
|
||
if reached_A and reached_B:
|
||
self.state.set_draw(reason="Both players reached the Beacon simultaneously.")
|
||
gs["winner"] = "Draw"
|
||
gs["terminated"] = True
|
||
return self.state.step()
|
||
elif reached_A:
|
||
self.state.set_winner(player_id=0, reason="Explorer A reached the Beacon first.")
|
||
gs["winner"] = "A"
|
||
gs["terminated"] = True
|
||
return self.state.step()
|
||
elif reached_B:
|
||
self.state.set_winner(player_id=1, reason="Explorer B reached the Beacon first.")
|
||
gs["winner"] = "B"
|
||
gs["terminated"] = True
|
||
return self.state.step()
|
||
|
||
# Check turn limit
|
||
if self.state.check_turn_limit():
|
||
posA = tuple(gs["player_states"]["A"]["position"])
|
||
posB = tuple(gs["player_states"]["B"]["position"])
|
||
distA = self._distance(posA, beacon)
|
||
distB = self._distance(posB, beacon)
|
||
if distA < distB:
|
||
self.state.set_winner(player_id=0, reason="Explorer A is closer to Beacon at turn limit.")
|
||
gs["winner"] = "A"
|
||
elif distB < distA:
|
||
self.state.set_winner(player_id=1, reason="Explorer B is closer to Beacon at turn limit.")
|
||
gs["winner"] = "B"
|
||
else:
|
||
self.state.set_draw(reason="Both explorers equally distant at turn limit.")
|
||
gs["winner"] = "Draw"
|
||
gs["terminated"] = True
|
||
|
||
return self.state.step()
|
||
|
||
# -------------------------------------------------------------------------
|
||
# ========== Prompt ==========
|
||
def _generate_player_prompt(self, player_id: int, game_state: Dict[str, Any]) -> str:
|
||
"""Generate player prompt based on Stage 1 design."""
|
||
player_label = "A" if player_id == 0 else "B"
|
||
state = game_state["player_states"][player_label]
|
||
pos = state["position"]
|
||
visible_map = "\n".join([" ".join(row) for row in state["visible_map"]])
|
||
turn_index = game_state["turn_index"]
|
||
max_turns = game_state["max_turns"]
|
||
opponent_label = "B" if player_label == "A" else "A"
|
||
last_opp_action = (
|
||
game_state["player_states"][opponent_label]["last_action"] or "None yet"
|
||
)
|
||
|
||
prompt = f"""
|
||
You are Explorer {player_label} navigating the labyrinth. Your goal is to reach the Central Beacon before your rival.
|
||
Each turn you may issue one command from this action grammar:
|
||
|
||
[Move:North] | [Move:South] | [Move:East] | [Move:West] | [Scan] | [Wait]
|
||
|
||
Remember:
|
||
- Maze bounds are 0 ≤ x < {game_state['maze_width']}, 0 ≤ y < {game_state['maze_height']}.
|
||
- Moving into blocked walls ('X') or out of bounds is invalid.
|
||
- The beacon lies at the labyrinth’s center at {game_state['beacon_position']}.
|
||
- You must wrap your command inside \\boxed{{}}.
|
||
|
||
Current turn: {turn_index}/{max_turns}
|
||
Your current position: {pos}
|
||
Your visible 3×3 map:
|
||
{visible_map}
|
||
|
||
Your opponent’s last known action: {last_opp_action}
|
||
|
||
Example valid response:
|
||
I want to go north toward the Beacon.
|
||
\\boxed{{[Move:North]}}
|
||
|
||
Example invalid response:
|
||
Let's go northeast! ← invalid direction keyword
|
||
|
||
Now choose your next command carefully.
|
||
Put your final answer within \\boxed{{}} at the end of your response.
|
||
""".strip()
|
||
return prompt
|
||
``` |