278 lines
12 KiB
Python
278 lines
12 KiB
Python
|
|
```python
|
|||
|
|
import re
|
|||
|
|
import random
|
|||
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|||
|
|
|
|||
|
|
import textarena as ta
|
|||
|
|
|
|||
|
|
|
|||
|
|
class LabyrinthCommandEnv(ta.Env):
|
|||
|
|
"""
|
|||
|
|
Deterministic, turn-based two-player tactical maze environment: "Labyrinth Command"
|
|||
|
|
Two players (Explorer A and B) move through a deterministic maze to reach the Central Beacon.
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def __init__(self, max_turns: int = 40, maze_width: int = 7, maze_height: int = 7):
|
|||
|
|
self.max_turns = max_turns
|
|||
|
|
self.maze_width = maze_width
|
|||
|
|
self.maze_height = maze_height
|
|||
|
|
self.move_pattern = re.compile(r"^\[Move:(North|South|East|West)\]$")
|
|||
|
|
self.scan_pattern = re.compile(r"^\[Scan\]$")
|
|||
|
|
self.wait_pattern = re.compile(r"^\[Wait\]$")
|
|||
|
|
|
|||
|
|
# -------------------------------------------------------------------------
|
|||
|
|
# ========== Helper: Extract boxed command ==========
|
|||
|
|
def _extract_answer_content(self, action: str) -> str:
|
|||
|
|
"""Extract content within \\boxed{{...}}."""
|
|||
|
|
match = re.search(r"\\boxed\{\{(.*?)\}\}", action, re.DOTALL)
|
|||
|
|
if match:
|
|||
|
|
return match.group(1).strip()
|
|||
|
|
match = re.search(r"\\boxed\{(.*?)\}", action, re.DOTALL)
|
|||
|
|
if match:
|
|||
|
|
return match.group(1).strip()
|
|||
|
|
return action.strip()
|
|||
|
|
|
|||
|
|
# -------------------------------------------------------------------------
|
|||
|
|
# ========== Maze and visibility helpers ==========
|
|||
|
|
def _generate_deterministic_maze(self, seed: int) -> List[List[str]]:
|
|||
|
|
"""Generate deterministic maze using random seeded layout of blocked cells."""
|
|||
|
|
random.seed(seed)
|
|||
|
|
maze = [["." for _ in range(self.maze_width)] for _ in range(self.maze_height)]
|
|||
|
|
num_blocks = (self.maze_width * self.maze_height) // 10 # about 10% blocked
|
|||
|
|
for _ in range(num_blocks):
|
|||
|
|
x = random.randint(0, self.maze_width - 1)
|
|||
|
|
y = random.randint(0, self.maze_height - 1)
|
|||
|
|
if (x, y) != (0, 0) and (x, y) != (self.maze_width - 1, self.maze_height - 1):
|
|||
|
|
maze[y][x] = "X"
|
|||
|
|
return maze
|
|||
|
|
|
|||
|
|
def _compute_visible_map(self, maze: List[List[str]], pos: Tuple[int, int]) -> List[List[str]]:
|
|||
|
|
"""Compute a 3x3 visible map centered on pos."""
|
|||
|
|
visible = []
|
|||
|
|
for dy in range(-1, 2):
|
|||
|
|
row = []
|
|||
|
|
for dx in range(-1, 2):
|
|||
|
|
nx, ny = pos[0] + dx, pos[1] + dy
|
|||
|
|
if 0 <= nx < self.maze_width and 0 <= ny < self.maze_height:
|
|||
|
|
row.append(maze[ny][nx])
|
|||
|
|
else:
|
|||
|
|
row.append("?")
|
|||
|
|
visible.append(row)
|
|||
|
|
return visible
|
|||
|
|
|
|||
|
|
def _distance(self, a: Tuple[int, int], b: Tuple[int, int]) -> int:
|
|||
|
|
return abs(a[0] - b[0]) + abs(a[1] - b[1])
|
|||
|
|
|
|||
|
|
# -------------------------------------------------------------------------
|
|||
|
|
# ========== Reset ==========
|
|||
|
|
def reset(self, num_players: int, seed: Optional[int] = None):
|
|||
|
|
"""
|
|||
|
|
Resets the environment to an initial state.
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
num_players: must be 2.
|
|||
|
|
seed: optional deterministic seed.
|
|||
|
|
"""
|
|||
|
|
if num_players != 2:
|
|||
|
|
raise ValueError("Labyrinth Command requires exactly 2 players.")
|
|||
|
|
|
|||
|
|
seed = seed if seed is not None else random.randint(1, 999999)
|
|||
|
|
self.state = ta.TwoPlayerState(num_players=num_players, seed=seed, max_turns=self.max_turns)
|
|||
|
|
maze = self._generate_deterministic_maze(seed)
|
|||
|
|
beacon_pos = (self.maze_width // 2, self.maze_height // 2)
|
|||
|
|
maze[beacon_pos[1]][beacon_pos[0]] = "B"
|
|||
|
|
|
|||
|
|
start_A = (0, 0)
|
|||
|
|
start_B = (self.maze_width - 1, self.maze_height - 1)
|
|||
|
|
|
|||
|
|
player_states = {
|
|||
|
|
"A": {
|
|||
|
|
"position": start_A,
|
|||
|
|
"visible_map": self._compute_visible_map(maze, start_A),
|
|||
|
|
"visited_cells": [list(start_A)],
|
|||
|
|
"last_action": None,
|
|||
|
|
},
|
|||
|
|
"B": {
|
|||
|
|
"position": start_B,
|
|||
|
|
"visible_map": self._compute_visible_map(maze, start_B),
|
|||
|
|
"visited_cells": [list(start_B)],
|
|||
|
|
"last_action": None,
|
|||
|
|
},
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
cells_blocked = [[x, y] for y in range(self.maze_height) for x in range(self.maze_width) if maze[y][x] == "X"]
|
|||
|
|
|
|||
|
|
game_state = {
|
|||
|
|
"seed": seed,
|
|||
|
|
"turn_index": 0,
|
|||
|
|
"max_turns": self.max_turns,
|
|||
|
|
"maze_width": self.maze_width,
|
|||
|
|
"maze_height": self.maze_height,
|
|||
|
|
"beacon_position": list(beacon_pos),
|
|||
|
|
"cells_blocked": cells_blocked,
|
|||
|
|
"player_states": player_states,
|
|||
|
|
"transcript": [],
|
|||
|
|
"winner": None,
|
|||
|
|
"terminated": False,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
self.state.reset(game_state=game_state, player_prompt_function=self._generate_player_prompt)
|
|||
|
|
self.state.add_observation(message="Welcome to Labyrinth Command!", observation_type=ta.ObservationType.GAME_MESSAGE)
|
|||
|
|
self.state.add_observation(message=f"Seed: {seed} ensures deterministic maze generation.", observation_type=ta.ObservationType.GAME_MESSAGE)
|
|||
|
|
return self.state
|
|||
|
|
|
|||
|
|
# -------------------------------------------------------------------------
|
|||
|
|
# ========== Step ==========
|
|||
|
|
def step(self, action: str) -> Tuple[bool, ta.Info]:
|
|||
|
|
"""
|
|||
|
|
Perform a single environment step for the current player.
|
|||
|
|
"""
|
|||
|
|
# log the player action
|
|||
|
|
self.state.add_observation(action, ta.ObservationType.PLAYER_ACTION, from_id=self.state.current_player_id, to_id=-1)
|
|||
|
|
player_id = self.state.current_player_id
|
|||
|
|
player_label = "A" if player_id == 0 else "B"
|
|||
|
|
opponent_label = "B" if player_label == "A" else "A"
|
|||
|
|
|
|||
|
|
if self.state.done:
|
|||
|
|
self.state.set_invalid_move("Game already finished.")
|
|||
|
|
return self.state.step()
|
|||
|
|
|
|||
|
|
answer = self._extract_answer_content(action)
|
|||
|
|
gs = self.state.game_state
|
|||
|
|
player_state = gs["player_states"][player_label]
|
|||
|
|
opponent_state = gs["player_states"][opponent_label]
|
|||
|
|
current_pos = tuple(player_state["position"])
|
|||
|
|
beacon = tuple(gs["beacon_position"])
|
|||
|
|
|
|||
|
|
# Validate action syntax
|
|||
|
|
if not (self.move_pattern.match(answer) or self.scan_pattern.match(answer) or self.wait_pattern.match(answer)):
|
|||
|
|
self.state.set_invalid_move(reason="Invalid token format.")
|
|||
|
|
return self.state.step()
|
|||
|
|
|
|||
|
|
new_pos = current_pos
|
|||
|
|
maze_width, maze_height = gs["maze_width"], gs["maze_height"]
|
|||
|
|
blocked = set(tuple(cell) for cell in gs["cells_blocked"])
|
|||
|
|
|
|||
|
|
# execute move if movement
|
|||
|
|
if answer.startswith("[Move:"):
|
|||
|
|
direction = answer[len("[Move:"):-1]
|
|||
|
|
dx, dy = 0, 0
|
|||
|
|
if direction == "North":
|
|||
|
|
dy = -1
|
|||
|
|
elif direction == "South":
|
|||
|
|
dy = 1
|
|||
|
|
elif direction == "West":
|
|||
|
|
dx = -1
|
|||
|
|
elif direction == "East":
|
|||
|
|
dx = 1
|
|||
|
|
nx, ny = current_pos[0] + dx, current_pos[1] + dy
|
|||
|
|
if not (0 <= nx < maze_width and 0 <= ny < maze_height):
|
|||
|
|
self.state.set_invalid_move("Move out of bounds")
|
|||
|
|
return self.state.step()
|
|||
|
|
if (nx, ny) in blocked:
|
|||
|
|
self.state.set_invalid_move("Cell blocked")
|
|||
|
|
return self.state.step()
|
|||
|
|
new_pos = (nx, ny)
|
|||
|
|
player_state["position"] = list(new_pos)
|
|||
|
|
player_state["visited_cells"].append(list(new_pos))
|
|||
|
|
player_state["visible_map"] = self._compute_visible_map(
|
|||
|
|
[["X" if [x, y] in gs["cells_blocked"] else "." for x in range(maze_width)] for y in range(maze_height)],
|
|||
|
|
new_pos,
|
|||
|
|
)
|
|||
|
|
elif answer == "[Scan]":
|
|||
|
|
player_state["visible_map"] = self._compute_visible_map(
|
|||
|
|
[["X" if [x, y] in gs["cells_blocked"] else "." for x in range(maze_width)] for y in range(maze_height)],
|
|||
|
|
current_pos,
|
|||
|
|
)
|
|||
|
|
elif answer == "[Wait]":
|
|||
|
|
pass # do nothing
|
|||
|
|
|
|||
|
|
player_state["last_action"] = answer
|
|||
|
|
gs["transcript"].append({"player": player_label, "action": answer})
|
|||
|
|
gs["turn_index"] += 1
|
|||
|
|
|
|||
|
|
# ===== Check terminal conditions =====
|
|||
|
|
reached_A = tuple(gs["player_states"]["A"]["position"]) == beacon
|
|||
|
|
reached_B = tuple(gs["player_states"]["B"]["position"]) == beacon
|
|||
|
|
|
|||
|
|
if reached_A and reached_B:
|
|||
|
|
self.state.set_draw(reason="Both players reached the Beacon simultaneously.")
|
|||
|
|
gs["winner"] = "Draw"
|
|||
|
|
gs["terminated"] = True
|
|||
|
|
return self.state.step()
|
|||
|
|
elif reached_A:
|
|||
|
|
self.state.set_winner(player_id=0, reason="Explorer A reached the Beacon first.")
|
|||
|
|
gs["winner"] = "A"
|
|||
|
|
gs["terminated"] = True
|
|||
|
|
return self.state.step()
|
|||
|
|
elif reached_B:
|
|||
|
|
self.state.set_winner(player_id=1, reason="Explorer B reached the Beacon first.")
|
|||
|
|
gs["winner"] = "B"
|
|||
|
|
gs["terminated"] = True
|
|||
|
|
return self.state.step()
|
|||
|
|
|
|||
|
|
# Check turn limit
|
|||
|
|
if self.state.check_turn_limit():
|
|||
|
|
posA = tuple(gs["player_states"]["A"]["position"])
|
|||
|
|
posB = tuple(gs["player_states"]["B"]["position"])
|
|||
|
|
distA = self._distance(posA, beacon)
|
|||
|
|
distB = self._distance(posB, beacon)
|
|||
|
|
if distA < distB:
|
|||
|
|
self.state.set_winner(player_id=0, reason="Explorer A is closer to Beacon at turn limit.")
|
|||
|
|
gs["winner"] = "A"
|
|||
|
|
elif distB < distA:
|
|||
|
|
self.state.set_winner(player_id=1, reason="Explorer B is closer to Beacon at turn limit.")
|
|||
|
|
gs["winner"] = "B"
|
|||
|
|
else:
|
|||
|
|
self.state.set_draw(reason="Both explorers equally distant at turn limit.")
|
|||
|
|
gs["winner"] = "Draw"
|
|||
|
|
gs["terminated"] = True
|
|||
|
|
|
|||
|
|
return self.state.step()
|
|||
|
|
|
|||
|
|
# -------------------------------------------------------------------------
|
|||
|
|
# ========== Prompt ==========
|
|||
|
|
def _generate_player_prompt(self, player_id: int, game_state: Dict[str, Any]) -> str:
|
|||
|
|
"""Generate player prompt based on Stage 1 design."""
|
|||
|
|
player_label = "A" if player_id == 0 else "B"
|
|||
|
|
state = game_state["player_states"][player_label]
|
|||
|
|
pos = state["position"]
|
|||
|
|
visible_map = "\n".join([" ".join(row) for row in state["visible_map"]])
|
|||
|
|
turn_index = game_state["turn_index"]
|
|||
|
|
max_turns = game_state["max_turns"]
|
|||
|
|
opponent_label = "B" if player_label == "A" else "A"
|
|||
|
|
last_opp_action = (
|
|||
|
|
game_state["player_states"][opponent_label]["last_action"] or "None yet"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
prompt = f"""
|
|||
|
|
You are Explorer {player_label} navigating the labyrinth. Your goal is to reach the Central Beacon before your rival.
|
|||
|
|
Each turn you may issue one command from this action grammar:
|
|||
|
|
|
|||
|
|
[Move:North] | [Move:South] | [Move:East] | [Move:West] | [Scan] | [Wait]
|
|||
|
|
|
|||
|
|
Remember:
|
|||
|
|
- Maze bounds are 0 ≤ x < {game_state['maze_width']}, 0 ≤ y < {game_state['maze_height']}.
|
|||
|
|
- Moving into blocked walls ('X') or out of bounds is invalid.
|
|||
|
|
- The beacon lies at the labyrinth’s center at {game_state['beacon_position']}.
|
|||
|
|
- You must wrap your command inside \\boxed{{}}.
|
|||
|
|
|
|||
|
|
Current turn: {turn_index}/{max_turns}
|
|||
|
|
Your current position: {pos}
|
|||
|
|
Your visible 3×3 map:
|
|||
|
|
{visible_map}
|
|||
|
|
|
|||
|
|
Your opponent’s last known action: {last_opp_action}
|
|||
|
|
|
|||
|
|
Example valid response:
|
|||
|
|
I want to go north toward the Beacon.
|
|||
|
|
\\boxed{{[Move:North]}}
|
|||
|
|
|
|||
|
|
Example invalid response:
|
|||
|
|
Let's go northeast! ← invalid direction keyword
|
|||
|
|
|
|||
|
|
Now choose your next command carefully.
|
|||
|
|
Put your final answer within \\boxed{{}} at the end of your response.
|
|||
|
|
""".strip()
|
|||
|
|
return prompt
|
|||
|
|
```
|