Обучението на змийски алгоритъм е един от най-основните проекти, свързани с обучението с подсилване. Това е нещо като „Здравей свят“ на обучението за укрепване. Наскоро открих поредицата в youtube от инженер на python, където той изгради алгоритъм за обучение на подсилване от нулата с помощта на pytorch, но имах чувството, че той не обясни концепциите или модела много добре. Пиша тази статия, за да обясня основите на алгоритъма и може би част от математиката зад него. Видео в Youtube от инженера на Python: https://youtu.be/PJl4iabBEz0. Започваме с просто кодиране на игра със змия, която може да бъде направена с помощта на pygame

import pygame
import random
from enum import Enum
from collections import namedtuple
import time
pygame.init()
font = pygame.font.Font('arial.ttf', 25)
#font = pygame.font.SysFont('arial', 25)
class Direction(Enum):
RIGHT = 1
LEFT = 2
UP = 3
DOWN = 4
Point = namedtuple('Point', 'x, y')
# rgb colors
WHITE = (255, 255, 255)
RED = (200,0,0)
BLUE1 = (0, 0, 255)
BLUE2 = (0, 100, 255)
BLACK = (0,0,0)
BLOCK_SIZE = 20
SPEED = 20
class Snake:
def __init__(self, w=640, h=480):
self.w = w
self.h = h
# init display
self.display = pygame.display.set_mode((self.w, self.h))
pygame.display.set_caption('Snake')
self.clock = pygame.time.Clock()
# init game state
self.direction = Direction.RIGHT
self.head = Point(self.w/2, self.h/2)
self.snake = [self.head,
Point(self.head.x-BLOCK_SIZE, self.head.y),
Point(self.head.x-(2*BLOCK_SIZE), self.head.y)]
self.score = 0
self.food = None
self._place_food()
def _place_food(self):
x = random.randint(0, (self.w-BLOCK_SIZE )//BLOCK_SIZE )*BLOCK_SIZE
y = random.randint(0, (self.h-BLOCK_SIZE )//BLOCK_SIZE )*BLOCK_SIZE
self.food = Point(x, y)
if self.food in self.snake:
self._place_food()
def play_step(self):
# 1. collect user input
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
quit()
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT:
if self.direction == Direction.RIGHT:
self.direction = Direction.RIGHT
else:
self.direction = Direction.LEFT
time.sleep(0.03)
elif event.key == pygame.K_RIGHT:
if self.direction == Direction.LEFT:
self.direction = Direction.LEFT
else:
self.direction = Direction.RIGHT
time.sleep(0.03)
elif event.key == pygame.K_UP:
if self.direction == Direction.DOWN:
self.direction = Direction.DOWN
else:
self.direction = Direction.UP
time.sleep(0.03)
elif event.key == pygame.K_DOWN:
if self.direction == Direction.UP:
self.direction = Direction.UP
else:
self.direction = Direction.DOWN
time.sleep(0.03)
# 2. move
self._move(self.direction) # update the head
self.snake.insert(0, self.head)
# 3. check if game over
game_over = False
if self._is_collision():
game_over = True
return game_over, self.score
# 4. place new food or just move
if self.head == self.food:
self.score += 1
self._place_food()
else:
self.snake.pop()
# 5. update ui and clock
self._update_ui()
self.clock.tick(SPEED)
# 6. return game over and score
return game_over, self.score
def _is_collision(self):
# hits boundary
if self.head.x > self.w - BLOCK_SIZE or self.head.x < 0 or self.head.y > self.h - BLOCK_SIZE or self.head.y < 0:
return True
# hits itself
if self.head in self.snake[1:]:
return True
return False
def _update_ui(self):
self.display.fill(BLACK)
for pt in self.snake:
pygame.draw.rect(self.display, BLUE1, pygame.Rect(pt.x, pt.y, BLOCK_SIZE, BLOCK_SIZE))
pygame.draw.rect(self.display, BLUE2, pygame.Rect(pt.x+4, pt.y+4, 12, 12))
pygame.draw.rect(self.display, RED, pygame.Rect(self.food.x, self.food.y, BLOCK_SIZE, BLOCK_SIZE))
text = font.render("Score: " + str(self.score), True, WHITE)
self.display.blit(text, [0, 0])
pygame.display.flip()
def _move(self, direction):
x = self.head.x
y = self.head.y
if direction == Direction.RIGHT:
x += BLOCK_SIZE
elif direction == Direction.LEFT:
x -= BLOCK_SIZE
elif direction == Direction.DOWN:
y += BLOCK_SIZE
elif direction == Direction.UP:
y -= BLOCK_SIZE
self.head = Point(x, y)
if __name__ == '__main__':
game = Snake()
# game loop
while True:
game_over, score = game.play_step()
if game_over == True:
break
print('Final Score', score)
pygame.quit()

Това е кодът на играта. Нека преминем през него

class Direction(Enum):
RIGHT = 1
LEFT = 2
UP = 3
DOWN = 4

Първо създаваме клас Direction, за да опростим процеса на промяна на посоката на змията с четирите основни посоки. Останалата част от играта е доста разбираема със система за откриване на сблъсък.

def _is_collision(self):
# hits boundary
if self.head.x > self.w - BLOCK_SIZE or self.head.x < 0 or self.head.y > self.h - BLOCK_SIZE or self.head.y < 0:
return True
# hits itself
if self.head in self.snake[1:]:
return True
return False

За да проверите дали змията излиза от картата или да видите дали главата на змията се удря в точка, която не е главата. Той също така прилага много проста система за придвижване

def _move(self, direction):
x = self.head.x
y = self.head.y
if direction == Direction.RIGHT:
x += BLOCK_SIZE
elif direction == Direction.LEFT:
x -= BLOCK_SIZE
elif direction == Direction.DOWN:
y += BLOCK_SIZE
elif direction == Direction.UP:
y -= BLOCK_SIZE
self.head = Point(x, y)

който просто актуализира главата и тялото на змията и измества тялото наляво, надясно, нагоре или надолу въз основа на текущата посока на змията. Промяната на посоката се улеснява от проста система с ключове, реализирана чрез вградената система за въвеждане на pygames

for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
quit()
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT:
if self.direction == Direction.RIGHT:
self.direction = Direction.RIGHT
else:
self.direction = Direction.LEFT
time.sleep(0.03)
elif event.key == pygame.K_RIGHT:
if self.direction == Direction.LEFT:
self.direction = Direction.LEFT
else:
self.direction = Direction.RIGHT
time.sleep(0.03)
elif event.key == pygame.K_UP:
if self.direction == Direction.DOWN:
self.direction = Direction.DOWN
else:
self.direction = Direction.UP
time.sleep(0.03)
elif event.key == pygame.K_DOWN:
if self.direction == Direction.UP:
self.direction = Direction.UP
else:
self.direction = Direction.DOWN
time.sleep(0.03)

Добавяме и caviet, който ни предпазва от движение в обратната посока, в която пътуваме, в която PE не е внедрил. Също така добавяме time.sleep() за 0,03 секунди, за да предотвратим случайно убиване на играча при бързи натискания, което PE също не е внедрило. Това покрива основата на играта. Сега преминаваме към околната среда.

import pygame
import random
from enum import Enum
from collections import namedtuple
import numpy as np
pygame.init()
font = pygame.font.Font('arial.ttf', 25)
#font = pygame.font.SysFont('arial', 25)
class Direction(Enum):
RIGHT = 1
LEFT = 2
UP = 3
DOWN = 4
Point = namedtuple('Point', 'x, y')
# rgb colors
WHITE = (255, 255, 255)
RED = (200,0,0)
BLUE1 = (0, 0, 255)
BLUE2 = (0, 100, 255)
BLACK = (0,0,0)
BLOCK_SIZE = 20
SPEED = 100
class SnakeGameAI:
def __init__(self, w=640, h=480):
self.w = w
self.h = h
# init display
self.display = pygame.display.set_mode((self.w, self.h))
pygame.display.set_caption('Snake')
self.clock = pygame.time.Clock()
self.reset()
def reset(self):
# init game state
self.direction = Direction.RIGHT
self.head = Point(self.w/2, self.h/2)
self.snake = [self.head,
Point(self.head.x-BLOCK_SIZE, self.head.y),
Point(self.head.x-(2*BLOCK_SIZE), self.head.y)]
self.score = 0
self.food = None
self._place_food()
self.frame_iteration = 0
def _place_food(self):
x = random.randint(0, (self.w-BLOCK_SIZE )//BLOCK_SIZE )*BLOCK_SIZE
y = random.randint(0, (self.h-BLOCK_SIZE )//BLOCK_SIZE )*BLOCK_SIZE
self.food = Point(x, y)
if self.food in self.snake:
self._place_food()
def play_step(self, action):
self.frame_iteration += 1
# 1. collect user input
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
quit()
# 2. move
self._move(action) # update the head
self.snake.insert(0, self.head)
# 3. check if game over
reward = 0
game_over = False
if self.is_collision() or self.frame_iteration > 100*len(self.snake):
game_over = True
reward = -10
return reward, game_over, self.score
# 4. place new food or just move
if self.head == self.food:
self.score += 1
reward = 10
self._place_food()
else:
self.snake.pop()
# 5. update ui and clock
self._update_ui()
self.clock.tick(SPEED)
# 6. return game over and score
return reward, game_over, self.score
def is_collision(self, pt=None):
if pt is None:
pt = self.head
# hits boundary
if pt.x > self.w - BLOCK_SIZE or pt.x < 0 or pt.y > self.h - BLOCK_SIZE or pt.y < 0:
return True
# hits itself
if pt in self.snake[1:]:
return True
return False
def _update_ui(self):
self.display.fill(BLACK)
for pt in self.snake:
pygame.draw.rect(self.display, BLUE1, pygame.Rect(pt.x, pt.y, BLOCK_SIZE, BLOCK_SIZE))
pygame.draw.rect(self.display, BLUE2, pygame.Rect(pt.x+4, pt.y+4, 12, 12))
pygame.draw.rect(self.display, RED, pygame.Rect(self.food.x, self.food.y, BLOCK_SIZE, BLOCK_SIZE))
text = font.render("Score: " + str(self.score), True, WHITE)
self.display.blit(text, [0, 0])
pygame.display.flip()
def _move(self, action):
# [straight, right, left]
clock_wise = [Direction.RIGHT, Direction.DOWN, Direction.LEFT, Direction.UP]
idx = clock_wise.index(self.direction)
if np.array_equal(action, [1, 0, 0]):
new_dir = clock_wise[idx] # no change
elif np.array_equal(action, [0, 1, 0]):
next_idx = (idx + 1) % 4
new_dir = clock_wise[next_idx] # right turn r -> d -> l -> u
else: # [0, 0, 1]
next_idx = (idx - 1) % 4
new_dir = clock_wise[next_idx] # left turn r -> u -> l -> d
self.direction = new_dir
x = self.head.x
y = self.head.y
if self.direction == Direction.RIGHT:
x += BLOCK_SIZE
elif self.direction == Direction.LEFT:
x -= BLOCK_SIZE
elif self.direction == Direction.DOWN:
y += BLOCK_SIZE
elif self.direction == Direction.UP:
y -= BLOCK_SIZE
self.head = Point(x, y)

Env е изключително подобен на играта с няколко допълнителни функции, които позволяват на алгоритъма да се учи, включително функцията play_step, функцията за нулиране и функцията _move. Функцията play_step обработва всички актуализации, които се случват след функцията за преместване, която приема 1d масив, който е или [1, 0, 0], [0, 1, 0], или [0, 0, 1], който контролира дали агентът се движи по посока на часовниковата стрелка, обратно на часовниковата стрелка или не извършва никакво действие. Тези действия се извършват във функцията _move. След това преминаваме към агента

import torch
import random
import numpy as np
from collections import deque
from env import SnakeGameAI, Direction, Point
from model import Linear_QNet, QTrainer
MAX_MEMORY = 100_000
BATCH_SIZE = 1000
LR = 0.001
class Agent:
def __init__(self):
self.n_games = 0
self.epsilon = 0 # randomness
self.gamma = 0.9 # discount rate
self.memory = deque(maxlen=MAX_MEMORY) # popleft()
self.model = Linear_QNet(11, 256, 3)#.to(device=torch.device('cuda:0'))
self.trainer = QTrainer(self.model, lr=LR, gamma=self.gamma)#.to(device=torch.device('cuda:0'))
def get_state(self, game):
head = game.snake[0]
point_l = Point(head.x - 20, head.y)
point_r = Point(head.x + 20, head.y)
point_u = Point(head.x, head.y - 20)
point_d = Point(head.x, head.y + 20)
dir_l = game.direction == Direction.LEFT
dir_r = game.direction == Direction.RIGHT
dir_u = game.direction == Direction.UP
dir_d = game.direction == Direction.DOWN
state = [
# Danger straight
(dir_r and game.is_collision(point_r)) or
(dir_l and game.is_collision(point_l)) or
(dir_u and game.is_collision(point_u)) or
(dir_d and game.is_collision(point_d)),
# Danger right
(dir_u and game.is_collision(point_r)) or
(dir_d and game.is_collision(point_l)) or
(dir_l and game.is_collision(point_u)) or
(dir_r and game.is_collision(point_d)),
# Danger left
(dir_d and game.is_collision(point_r)) or
(dir_u and game.is_collision(point_l)) or
(dir_r and game.is_collision(point_u)) or
(dir_l and game.is_collision(point_d)),
# Move direction
dir_l,
dir_r,
dir_u,
dir_d,
# Food location
game.food.x < game.head.x, # food left
game.food.x > game.head.x, # food right
game.food.y < game.head.y, # food up
game.food.y > game.head.y # food down
]
return np.array(state, dtype=int)
def remember(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done)) # popleft if MAX_MEMORY is reached
def train_long_memory(self):
if len(self.memory) > BATCH_SIZE:
mini_sample = random.sample(self.memory, BATCH_SIZE) # list of tuples
else:
mini_sample = self.memory
states, actions, rewards, next_states, dones = zip(*mini_sample)
self.trainer.train_step(states, actions, rewards, next_states, dones)
#for state, action, reward, nexrt_state, done in mini_sample:
# self.trainer.train_step(state, action, reward, next_state, done)
def train_short_memory(self, state, action, reward, next_state, done):
self.trainer.train_step(state, action, reward, next_state, done)
def get_action(self, state):
# random moves: tradeoff exploration / exploitation
self.epsilon = 80 - self.n_games
final_move = [0,0,0]
if random.randint(0, 200) < self.epsilon:
move = random.randint(0, 2)
final_move[move] = 1
else:
state0 = torch.tensor(state, dtype=torch.float)
prediction = self.model(state0)
move = torch.argmax(prediction).item()
final_move[move] = 1
return final_move
def train():
record = 0
agent = Agent()
game = SnakeGameAI()
while True:
# get old state
state_old = agent.get_state(game)
# get move
final_move = agent.get_action(state_old)
# perform move and get new state
reward, done, score = game.play_step(final_move)
state_new = agent.get_state(game)
# train short memory
agent.train_short_memory(state_old, final_move, reward, state_new, done)
# remember
agent.remember(state_old, final_move, reward, state_new, done)
if done:
# train long memory, plot result
game.reset()
agent.n_games += 1
agent.train_long_memory()
if score > record:
record = score
agent.model.save()
print('Game', agent.n_games, 'Score', score, 'Record:', record)
if __name__ == '__main__':
train()

За околната среда, моля, вижте видеото на PE, защото той обясни по красив начин, който не мога да репликирам в текст. След това можем да преминем към модела, който е частта, в която открих, че липсва обяснението на PE. Кодът зад това е

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import os
class Linear_QNet(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super().__init__()
self.linear1 = nn.Linear(input_size, hidden_size)
self.linear2 = nn.Linear(hidden_size, output_size)
def forward(self, x):
x = F.relu(self.linear1(x))
x = self.linear2(x)
return x
def save(self, file_name='model.pth'):
model_folder_path = './model'
if not os.path.exists(model_folder_path):
os.makedirs(model_folder_path)
file_name = os.path.join(model_folder_path, file_name)
torch.save(self.state_dict(), file_name)
class QTrainer:
def __init__(self, model, lr, gamma):
self.lr = lr
self.gamma = gamma
self.model = model
self.optimizer = optim.Adam(model.parameters(), lr=self.lr)
self.criterion = nn.MSELoss()
def train_step(self, state, action, reward, next_state, done):
state = torch.tensor(state, dtype=torch.float)
next_state = torch.tensor(next_state, dtype=torch.float)
action = torch.tensor(action, dtype=torch.long)
reward = torch.tensor(reward, dtype=torch.float)
# (n, x)
if len(state.shape) == 1:
# (1, x)
state = torch.unsqueeze(state, 0)
next_state = torch.unsqueeze(next_state, 0)
action = torch.unsqueeze(action, 0)
reward = torch.unsqueeze(reward, 0)
done = (done, )
# 1: predicted Q values with current state
pred = self.model(state)
target = pred.clone()
for idx in range(len(done)):
Q_new = reward[idx]
if not done[idx]:
Q_new = reward[idx] + self.gamma * torch.max(self.model(next_state[idx]))
target[idx][torch.argmax(action[idx]).item()] = Q_new
# 2: Q_new = r + y * max(next_predicted Q value) -> only do this if not done
# pred.clone()
# preds[argmax(action)] = Q_new
self.optimizer.zero_grad()
loss = self.criterion(target, pred)
loss.backward()
self.optimizer.step()

Моделът е много лек линеен модел, внедрен в pytorch, изключително гъвкава библиотека за машинно обучение. Избрах pytorch пред нормалния си избор на tensorflow, защото беше просто да внедря модел на pytorch в сравнение с модел на pytorch. Моделът приема 11 стойности като вход, има 256 възела като скрит слой и извежда стойност от 0 до 2 като изход. Преобразуваме всеки параметър в тензор, използвайки тези редове код

state = torch.tensor(state, dtype=torch.float)
next_state = torch.tensor(next_state, dtype=torch.float)
action = torch.tensor(action, dtype=torch.long)
reward = torch.tensor(reward, dtype=torch.float)

и всеки от тези параметри след това се освобождава, за да стане съвместим с входните слоеве на невронната мрежа и готовият параметър се преобразува в кортеж с един елемент

state = torch.unsqueeze(state, 0)
next_state = torch.unsqueeze(next_state, 0)
action = torch.unsqueeze(action, 0)
reward = torch.unsqueeze(reward, 0)
done = (done, )

След всичко, което е направено, ние изчисляваме прогнозирания резултат и резултата, предоставен от модела, и използвайки ги, изчисляваме загубата.

Q_new = reward[idx]
if not done[idx]:
Q_new = reward[idx] + self.gamma * torch.max(self.model(next_state[idx]))
target[idx][torch.argmax(action[idx]).item()] = Q_new
# 2: Q_new = r + y * max(next_predicted Q value)

След това просто оптимизираме модела и неговите параметри

self.optimizer.zero_grad()
loss = self.criterion(target, pred)
loss.backward()
self.optimizer.step()

Този модел и метод на обучение дават резултати като: https://youtu.be/kxFVMeoqzkM, което е постигнато след 30 минути обучение.



Станете писател