Обучението на змийски алгоритъм е един от най-основните проекти, свързани с обучението с подсилване. Това е нещо като „Здравей свят“ на обучението за укрепване. Наскоро открих поредицата в youtube от инженер на python, където той изгради алгоритъм за обучение на подсилване от нулата с помощта на pytorch, но имах чувството, че той не обясни концепциите или модела много добре. Пиша тази статия, за да обясня основите на алгоритъма и може би част от математиката зад него. Видео в Youtube от инженера на Python: https://youtu.be/PJl4iabBEz0. Започваме с просто кодиране на игра със змия, която може да бъде направена с помощта на pygame
import pygame import random from enum import Enum from collections import namedtuple import time pygame.init() font = pygame.font.Font('arial.ttf', 25) #font = pygame.font.SysFont('arial', 25) class Direction(Enum): RIGHT = 1 LEFT = 2 UP = 3 DOWN = 4 Point = namedtuple('Point', 'x, y') # rgb colors WHITE = (255, 255, 255) RED = (200,0,0) BLUE1 = (0, 0, 255) BLUE2 = (0, 100, 255) BLACK = (0,0,0) BLOCK_SIZE = 20 SPEED = 20 class Snake: def __init__(self, w=640, h=480): self.w = w self.h = h # init display self.display = pygame.display.set_mode((self.w, self.h)) pygame.display.set_caption('Snake') self.clock = pygame.time.Clock() # init game state self.direction = Direction.RIGHT self.head = Point(self.w/2, self.h/2) self.snake = [self.head, Point(self.head.x-BLOCK_SIZE, self.head.y), Point(self.head.x-(2*BLOCK_SIZE), self.head.y)] self.score = 0 self.food = None self._place_food() def _place_food(self): x = random.randint(0, (self.w-BLOCK_SIZE )//BLOCK_SIZE )*BLOCK_SIZE y = random.randint(0, (self.h-BLOCK_SIZE )//BLOCK_SIZE )*BLOCK_SIZE self.food = Point(x, y) if self.food in self.snake: self._place_food() def play_step(self): # 1. collect user input for event in pygame.event.get(): if event.type == pygame.QUIT: pygame.quit() quit() if event.type == pygame.KEYDOWN: if event.key == pygame.K_LEFT: if self.direction == Direction.RIGHT: self.direction = Direction.RIGHT else: self.direction = Direction.LEFT time.sleep(0.03) elif event.key == pygame.K_RIGHT: if self.direction == Direction.LEFT: self.direction = Direction.LEFT else: self.direction = Direction.RIGHT time.sleep(0.03) elif event.key == pygame.K_UP: if self.direction == Direction.DOWN: self.direction = Direction.DOWN else: self.direction = Direction.UP time.sleep(0.03) elif event.key == pygame.K_DOWN: if self.direction == Direction.UP: self.direction = Direction.UP else: self.direction = Direction.DOWN time.sleep(0.03) # 2. move self._move(self.direction) # update the head self.snake.insert(0, self.head) # 3. check if game over game_over = False if self._is_collision(): game_over = True return game_over, self.score # 4. place new food or just move if self.head == self.food: self.score += 1 self._place_food() else: self.snake.pop() # 5. update ui and clock self._update_ui() self.clock.tick(SPEED) # 6. return game over and score return game_over, self.score def _is_collision(self): # hits boundary if self.head.x > self.w - BLOCK_SIZE or self.head.x < 0 or self.head.y > self.h - BLOCK_SIZE or self.head.y < 0: return True # hits itself if self.head in self.snake[1:]: return True return False def _update_ui(self): self.display.fill(BLACK) for pt in self.snake: pygame.draw.rect(self.display, BLUE1, pygame.Rect(pt.x, pt.y, BLOCK_SIZE, BLOCK_SIZE)) pygame.draw.rect(self.display, BLUE2, pygame.Rect(pt.x+4, pt.y+4, 12, 12)) pygame.draw.rect(self.display, RED, pygame.Rect(self.food.x, self.food.y, BLOCK_SIZE, BLOCK_SIZE)) text = font.render("Score: " + str(self.score), True, WHITE) self.display.blit(text, [0, 0]) pygame.display.flip() def _move(self, direction): x = self.head.x y = self.head.y if direction == Direction.RIGHT: x += BLOCK_SIZE elif direction == Direction.LEFT: x -= BLOCK_SIZE elif direction == Direction.DOWN: y += BLOCK_SIZE elif direction == Direction.UP: y -= BLOCK_SIZE self.head = Point(x, y) if __name__ == '__main__': game = Snake() # game loop while True: game_over, score = game.play_step() if game_over == True: break print('Final Score', score) pygame.quit()
Това е кодът на играта. Нека преминем през него
class Direction(Enum): RIGHT = 1 LEFT = 2 UP = 3 DOWN = 4
Първо създаваме клас Direction, за да опростим процеса на промяна на посоката на змията с четирите основни посоки. Останалата част от играта е доста разбираема със система за откриване на сблъсък.
def _is_collision(self): # hits boundary if self.head.x > self.w - BLOCK_SIZE or self.head.x < 0 or self.head.y > self.h - BLOCK_SIZE or self.head.y < 0: return True # hits itself if self.head in self.snake[1:]: return True return False
За да проверите дали змията излиза от картата или да видите дали главата на змията се удря в точка, която не е главата. Той също така прилага много проста система за придвижване
def _move(self, direction): x = self.head.x y = self.head.y if direction == Direction.RIGHT: x += BLOCK_SIZE elif direction == Direction.LEFT: x -= BLOCK_SIZE elif direction == Direction.DOWN: y += BLOCK_SIZE elif direction == Direction.UP: y -= BLOCK_SIZE self.head = Point(x, y)
който просто актуализира главата и тялото на змията и измества тялото наляво, надясно, нагоре или надолу въз основа на текущата посока на змията. Промяната на посоката се улеснява от проста система с ключове, реализирана чрез вградената система за въвеждане на pygames
for event in pygame.event.get(): if event.type == pygame.QUIT: pygame.quit() quit() if event.type == pygame.KEYDOWN: if event.key == pygame.K_LEFT: if self.direction == Direction.RIGHT: self.direction = Direction.RIGHT else: self.direction = Direction.LEFT time.sleep(0.03) elif event.key == pygame.K_RIGHT: if self.direction == Direction.LEFT: self.direction = Direction.LEFT else: self.direction = Direction.RIGHT time.sleep(0.03) elif event.key == pygame.K_UP: if self.direction == Direction.DOWN: self.direction = Direction.DOWN else: self.direction = Direction.UP time.sleep(0.03) elif event.key == pygame.K_DOWN: if self.direction == Direction.UP: self.direction = Direction.UP else: self.direction = Direction.DOWN time.sleep(0.03)
Добавяме и caviet, който ни предпазва от движение в обратната посока, в която пътуваме, в която PE не е внедрил. Също така добавяме time.sleep() за 0,03 секунди, за да предотвратим случайно убиване на играча при бързи натискания, което PE също не е внедрило. Това покрива основата на играта. Сега преминаваме към околната среда.
import pygame import random from enum import Enum from collections import namedtuple import numpy as np pygame.init() font = pygame.font.Font('arial.ttf', 25) #font = pygame.font.SysFont('arial', 25) class Direction(Enum): RIGHT = 1 LEFT = 2 UP = 3 DOWN = 4 Point = namedtuple('Point', 'x, y') # rgb colors WHITE = (255, 255, 255) RED = (200,0,0) BLUE1 = (0, 0, 255) BLUE2 = (0, 100, 255) BLACK = (0,0,0) BLOCK_SIZE = 20 SPEED = 100 class SnakeGameAI: def __init__(self, w=640, h=480): self.w = w self.h = h # init display self.display = pygame.display.set_mode((self.w, self.h)) pygame.display.set_caption('Snake') self.clock = pygame.time.Clock() self.reset() def reset(self): # init game state self.direction = Direction.RIGHT self.head = Point(self.w/2, self.h/2) self.snake = [self.head, Point(self.head.x-BLOCK_SIZE, self.head.y), Point(self.head.x-(2*BLOCK_SIZE), self.head.y)] self.score = 0 self.food = None self._place_food() self.frame_iteration = 0 def _place_food(self): x = random.randint(0, (self.w-BLOCK_SIZE )//BLOCK_SIZE )*BLOCK_SIZE y = random.randint(0, (self.h-BLOCK_SIZE )//BLOCK_SIZE )*BLOCK_SIZE self.food = Point(x, y) if self.food in self.snake: self._place_food() def play_step(self, action): self.frame_iteration += 1 # 1. collect user input for event in pygame.event.get(): if event.type == pygame.QUIT: pygame.quit() quit() # 2. move self._move(action) # update the head self.snake.insert(0, self.head) # 3. check if game over reward = 0 game_over = False if self.is_collision() or self.frame_iteration > 100*len(self.snake): game_over = True reward = -10 return reward, game_over, self.score # 4. place new food or just move if self.head == self.food: self.score += 1 reward = 10 self._place_food() else: self.snake.pop() # 5. update ui and clock self._update_ui() self.clock.tick(SPEED) # 6. return game over and score return reward, game_over, self.score def is_collision(self, pt=None): if pt is None: pt = self.head # hits boundary if pt.x > self.w - BLOCK_SIZE or pt.x < 0 or pt.y > self.h - BLOCK_SIZE or pt.y < 0: return True # hits itself if pt in self.snake[1:]: return True return False def _update_ui(self): self.display.fill(BLACK) for pt in self.snake: pygame.draw.rect(self.display, BLUE1, pygame.Rect(pt.x, pt.y, BLOCK_SIZE, BLOCK_SIZE)) pygame.draw.rect(self.display, BLUE2, pygame.Rect(pt.x+4, pt.y+4, 12, 12)) pygame.draw.rect(self.display, RED, pygame.Rect(self.food.x, self.food.y, BLOCK_SIZE, BLOCK_SIZE)) text = font.render("Score: " + str(self.score), True, WHITE) self.display.blit(text, [0, 0]) pygame.display.flip() def _move(self, action): # [straight, right, left] clock_wise = [Direction.RIGHT, Direction.DOWN, Direction.LEFT, Direction.UP] idx = clock_wise.index(self.direction) if np.array_equal(action, [1, 0, 0]): new_dir = clock_wise[idx] # no change elif np.array_equal(action, [0, 1, 0]): next_idx = (idx + 1) % 4 new_dir = clock_wise[next_idx] # right turn r -> d -> l -> u else: # [0, 0, 1] next_idx = (idx - 1) % 4 new_dir = clock_wise[next_idx] # left turn r -> u -> l -> d self.direction = new_dir x = self.head.x y = self.head.y if self.direction == Direction.RIGHT: x += BLOCK_SIZE elif self.direction == Direction.LEFT: x -= BLOCK_SIZE elif self.direction == Direction.DOWN: y += BLOCK_SIZE elif self.direction == Direction.UP: y -= BLOCK_SIZE self.head = Point(x, y)
Env е изключително подобен на играта с няколко допълнителни функции, които позволяват на алгоритъма да се учи, включително функцията play_step, функцията за нулиране и функцията _move. Функцията play_step обработва всички актуализации, които се случват след функцията за преместване, която приема 1d масив, който е или [1, 0, 0], [0, 1, 0], или [0, 0, 1], който контролира дали агентът се движи по посока на часовниковата стрелка, обратно на часовниковата стрелка или не извършва никакво действие. Тези действия се извършват във функцията _move. След това преминаваме към агента
import torch import random import numpy as np from collections import deque from env import SnakeGameAI, Direction, Point from model import Linear_QNet, QTrainer MAX_MEMORY = 100_000 BATCH_SIZE = 1000 LR = 0.001 class Agent: def __init__(self): self.n_games = 0 self.epsilon = 0 # randomness self.gamma = 0.9 # discount rate self.memory = deque(maxlen=MAX_MEMORY) # popleft() self.model = Linear_QNet(11, 256, 3)#.to(device=torch.device('cuda:0')) self.trainer = QTrainer(self.model, lr=LR, gamma=self.gamma)#.to(device=torch.device('cuda:0')) def get_state(self, game): head = game.snake[0] point_l = Point(head.x - 20, head.y) point_r = Point(head.x + 20, head.y) point_u = Point(head.x, head.y - 20) point_d = Point(head.x, head.y + 20) dir_l = game.direction == Direction.LEFT dir_r = game.direction == Direction.RIGHT dir_u = game.direction == Direction.UP dir_d = game.direction == Direction.DOWN state = [ # Danger straight (dir_r and game.is_collision(point_r)) or (dir_l and game.is_collision(point_l)) or (dir_u and game.is_collision(point_u)) or (dir_d and game.is_collision(point_d)), # Danger right (dir_u and game.is_collision(point_r)) or (dir_d and game.is_collision(point_l)) or (dir_l and game.is_collision(point_u)) or (dir_r and game.is_collision(point_d)), # Danger left (dir_d and game.is_collision(point_r)) or (dir_u and game.is_collision(point_l)) or (dir_r and game.is_collision(point_u)) or (dir_l and game.is_collision(point_d)), # Move direction dir_l, dir_r, dir_u, dir_d, # Food location game.food.x < game.head.x, # food left game.food.x > game.head.x, # food right game.food.y < game.head.y, # food up game.food.y > game.head.y # food down ] return np.array(state, dtype=int) def remember(self, state, action, reward, next_state, done): self.memory.append((state, action, reward, next_state, done)) # popleft if MAX_MEMORY is reached def train_long_memory(self): if len(self.memory) > BATCH_SIZE: mini_sample = random.sample(self.memory, BATCH_SIZE) # list of tuples else: mini_sample = self.memory states, actions, rewards, next_states, dones = zip(*mini_sample) self.trainer.train_step(states, actions, rewards, next_states, dones) #for state, action, reward, nexrt_state, done in mini_sample: # self.trainer.train_step(state, action, reward, next_state, done) def train_short_memory(self, state, action, reward, next_state, done): self.trainer.train_step(state, action, reward, next_state, done) def get_action(self, state): # random moves: tradeoff exploration / exploitation self.epsilon = 80 - self.n_games final_move = [0,0,0] if random.randint(0, 200) < self.epsilon: move = random.randint(0, 2) final_move[move] = 1 else: state0 = torch.tensor(state, dtype=torch.float) prediction = self.model(state0) move = torch.argmax(prediction).item() final_move[move] = 1 return final_move def train(): record = 0 agent = Agent() game = SnakeGameAI() while True: # get old state state_old = agent.get_state(game) # get move final_move = agent.get_action(state_old) # perform move and get new state reward, done, score = game.play_step(final_move) state_new = agent.get_state(game) # train short memory agent.train_short_memory(state_old, final_move, reward, state_new, done) # remember agent.remember(state_old, final_move, reward, state_new, done) if done: # train long memory, plot result game.reset() agent.n_games += 1 agent.train_long_memory() if score > record: record = score agent.model.save() print('Game', agent.n_games, 'Score', score, 'Record:', record) if __name__ == '__main__': train()
За околната среда, моля, вижте видеото на PE, защото той обясни по красив начин, който не мога да репликирам в текст. След това можем да преминем към модела, който е частта, в която открих, че липсва обяснението на PE. Кодът зад това е
import torch import torch.nn as nn import torch.optim as optim import torch.nn.functional as F import os class Linear_QNet(nn.Module): def __init__(self, input_size, hidden_size, output_size): super().__init__() self.linear1 = nn.Linear(input_size, hidden_size) self.linear2 = nn.Linear(hidden_size, output_size) def forward(self, x): x = F.relu(self.linear1(x)) x = self.linear2(x) return x def save(self, file_name='model.pth'): model_folder_path = './model' if not os.path.exists(model_folder_path): os.makedirs(model_folder_path) file_name = os.path.join(model_folder_path, file_name) torch.save(self.state_dict(), file_name) class QTrainer: def __init__(self, model, lr, gamma): self.lr = lr self.gamma = gamma self.model = model self.optimizer = optim.Adam(model.parameters(), lr=self.lr) self.criterion = nn.MSELoss() def train_step(self, state, action, reward, next_state, done): state = torch.tensor(state, dtype=torch.float) next_state = torch.tensor(next_state, dtype=torch.float) action = torch.tensor(action, dtype=torch.long) reward = torch.tensor(reward, dtype=torch.float) # (n, x) if len(state.shape) == 1: # (1, x) state = torch.unsqueeze(state, 0) next_state = torch.unsqueeze(next_state, 0) action = torch.unsqueeze(action, 0) reward = torch.unsqueeze(reward, 0) done = (done, ) # 1: predicted Q values with current state pred = self.model(state) target = pred.clone() for idx in range(len(done)): Q_new = reward[idx] if not done[idx]: Q_new = reward[idx] + self.gamma * torch.max(self.model(next_state[idx])) target[idx][torch.argmax(action[idx]).item()] = Q_new # 2: Q_new = r + y * max(next_predicted Q value) -> only do this if not done # pred.clone() # preds[argmax(action)] = Q_new self.optimizer.zero_grad() loss = self.criterion(target, pred) loss.backward() self.optimizer.step()
Моделът е много лек линеен модел, внедрен в pytorch, изключително гъвкава библиотека за машинно обучение. Избрах pytorch пред нормалния си избор на tensorflow, защото беше просто да внедря модел на pytorch в сравнение с модел на pytorch. Моделът приема 11 стойности като вход, има 256 възела като скрит слой и извежда стойност от 0 до 2 като изход. Преобразуваме всеки параметър в тензор, използвайки тези редове код
state = torch.tensor(state, dtype=torch.float) next_state = torch.tensor(next_state, dtype=torch.float) action = torch.tensor(action, dtype=torch.long) reward = torch.tensor(reward, dtype=torch.float)
и всеки от тези параметри след това се освобождава, за да стане съвместим с входните слоеве на невронната мрежа и готовият параметър се преобразува в кортеж с един елемент
state = torch.unsqueeze(state, 0) next_state = torch.unsqueeze(next_state, 0) action = torch.unsqueeze(action, 0) reward = torch.unsqueeze(reward, 0) done = (done, )
След всичко, което е направено, ние изчисляваме прогнозирания резултат и резултата, предоставен от модела, и използвайки ги, изчисляваме загубата.
Q_new = reward[idx] if not done[idx]: Q_new = reward[idx] + self.gamma * torch.max(self.model(next_state[idx])) target[idx][torch.argmax(action[idx]).item()] = Q_new # 2: Q_new = r + y * max(next_predicted Q value)
След това просто оптимизираме модела и неговите параметри
self.optimizer.zero_grad() loss = self.criterion(target, pred) loss.backward() self.optimizer.step()
Този модел и метод на обучение дават резултати като: https://youtu.be/kxFVMeoqzkM, което е постигнато след 30 минути обучение.