# -*- coding: utf-8 -*-

"""## Setup a Virtual Display

To generate a replay video of agent and environment.
"""

# Commented out IPython magic to ensure Python compatibility.
# %%capture
# !pip install pyglet==1.5.1
# !apt install python-opengl
# !apt install ffmpeg
# !apt install xvfb
# !pip3 install pyvirtualdisplay
#
# # Virtual display
# from pyvirtualdisplay import Display
#
# virtual_display = Display(visible=0, size=(1400, 900))
# virtual_display.start()

"""## Install dependencies"""

# Commented out IPython magic to ensure Python compatibility.
# %%capture
# !pip install gym==0.24
# !pip install pygame
# !pip install numpy
#
# !pip install imageio imageio_ffmpeg

"""## Import the packages"""

import numpy as np
import gym
import random
import imageio
from tqdm.notebook import trange

"""## Frozen Lake"""

# Create the FrozenLake-v1 environment using 4x4 map and non-slippery version.
# NOTE: pinned to gym==0.24, whose API is ``state = env.reset()`` and the
# 4-tuple ``(obs, reward, done, info) = env.step(a)`` — newer gym/gymnasium
# releases changed both signatures.
env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=False)

"""### Understanding the FrozenLake environment"""

print("_____OBSERVATION SPACE_____ \n")
print("Observation Space", env.observation_space)
print("Sample observation", env.observation_space.sample())  # Get a random observation

print("\n _____ACTION SPACE_____ \n")
print("Action Space Shape", env.action_space.n)
print("Action Space Sample", env.action_space.sample())  # Take a random action

"""## Create and Initialize the Q-table"""

state_space = env.observation_space.n
print("There are ", state_space, " possible states")

action_space = env.action_space.n
print("There are ", action_space, " possible actions")


def initialize_q_table(state_space, action_space):
    """Return a ``(state_space, action_space)`` Q-table initialized to zeros.

    :param state_space: Number of discrete states in the environment
    :param action_space: Number of discrete actions in the environment
    :return: np.ndarray of shape (state_space, action_space), all zeros
    """
    return np.zeros((state_space, action_space))


Qtable_frozenlake = initialize_q_table(state_space, action_space)

"""## Define the epsilon-greedy policy"""


def epsilon_greedy_policy(Qtable, state, epsilon):
    """Choose an action with the epsilon-greedy exploration strategy.

    With probability ``1 - epsilon`` exploit (pick the highest-valued
    action for ``state``); otherwise explore (pick a uniformly random
    action).

    NOTE: exploration samples from the module-level ``env`` rather than
    a passed-in environment, so this function is tied to this script's
    global environment.

    :param Qtable: The Q-table, shape (n_states, n_actions)
    :param state: Current (discrete) state index
    :param epsilon: Exploration probability in [0, 1]
    :return: The chosen action index
    """
    # Randomly generate a number between 0 and 1
    random_int = random.uniform(0, 1)
    if random_int > epsilon:
        # Exploitation: take the action with the highest value for this state
        action = np.argmax(Qtable[state])
    else:
        # Exploration: sample a random action
        action = env.action_space.sample()
    return action


"""## Define the greedy policy"""


def greedy_policy(Qtable, state):
    """Pure exploitation: return the highest-valued action for ``state``.

    :param Qtable: The Q-table, shape (n_states, n_actions)
    :param state: Current (discrete) state index
    :return: The greedy action index
    """
    return np.argmax(Qtable[state])


"""## Define the hyperparameters"""

# Training parameters
n_training_episodes = 10000  # Total training episodes
learning_rate = 0.7          # Learning rate

# Evaluation parameters
n_eval_episodes = 100        # Total number of test episodes

# Environment parameters
env_id = "FrozenLake-v1"     # Name of the environment
max_steps = 99               # Max steps per episode
gamma = 0.95                 # Discounting rate
eval_seed = []               # The evaluation seed of the environment

# Exploration parameters
max_epsilon = 1.0            # Exploration probability at start
min_epsilon = 0.05           # Minimum exploration probability
decay_rate = 0.0005          # Exponential decay rate for exploration prob

"""## Training the model"""


def train(n_training_episodes, min_epsilon, max_epsilon, decay_rate, env,
          max_steps, Qtable, learning_rate=learning_rate, gamma=gamma):
    """Train a Q-table with tabular Q-learning.

    Fix over the original: ``learning_rate`` and ``gamma`` were read from
    module-level globals inside the update rule; they are now explicit
    keyword parameters whose defaults preserve the original call sites.

    :param n_training_episodes: Number of training episodes
    :param min_epsilon: Floor of the exploration probability
    :param max_epsilon: Initial exploration probability
    :param decay_rate: Exponential decay rate for epsilon
    :param env: The training environment (gym 0.24 API)
    :param max_steps: Maximum steps per episode
    :param Qtable: Q-table to update in place, shape (n_states, n_actions)
    :param learning_rate: Step size for the Q-update (default: global value)
    :param gamma: Discount factor (default: global value)
    :return: The trained Q-table (same array as ``Qtable``)
    """
    for episode in trange(n_training_episodes):
        # Reduce epsilon (because we need less and less exploration)
        epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-decay_rate * episode)

        state = env.reset()
        done = False

        for step in range(max_steps):
            # Choose the action At using the epsilon-greedy policy
            action = epsilon_greedy_policy(Qtable, state, epsilon)

            # Take action At and observe Rt+1 and St+1
            new_state, reward, done, info = env.step(action)

            # Q(s,a) := Q(s,a) + lr * [R(s,a) + gamma * max_a' Q(s',a') - Q(s,a)]
            Qtable[state][action] = Qtable[state][action] + learning_rate * (
                reward + gamma * np.max(Qtable[new_state]) - Qtable[state][action]
            )

            if done:
                break

            # Our state is the new state
            state = new_state
    return Qtable


Qtable_frozenlake = train(n_training_episodes, min_epsilon, max_epsilon,
                          decay_rate, env, max_steps, Qtable_frozenlake)

"""## Trained Q-Learning table"""

Qtable_frozenlake

"""## Model evaluation"""


def evaluate_agent(env, max_steps, n_eval_episodes, Q, seed):
    """Evaluate the agent for ``n_eval_episodes`` episodes.

    Runs the greedy (argmax) policy derived from ``Q`` and accumulates the
    per-episode reward.

    :param env: The evaluation environment (gym 0.24 API)
    :param max_steps: Maximum steps per episode
    :param n_eval_episodes: Number of episodes to evaluate the agent
    :param Q: The Q-table, shape (n_states, n_actions)
    :param seed: The evaluation seed array (for taxi-v3); empty list/None
        means no per-episode seeding
    :return: (mean_reward, std_reward) over all evaluation episodes
    """
    episode_rewards = []
    for episode in range(n_eval_episodes):
        if seed:
            state = env.reset(seed=seed[episode])
        else:
            state = env.reset()
        done = False
        total_rewards_ep = 0

        for step in range(max_steps):
            # Greedy action: maximum expected future reward for this state
            action = np.argmax(Q[state])
            new_state, reward, done, info = env.step(action)
            total_rewards_ep += reward

            if done:
                break
            state = new_state
        episode_rewards.append(total_rewards_ep)
    mean_reward = np.mean(episode_rewards)
    std_reward = np.std(episode_rewards)

    return mean_reward, std_reward


# Evaluate our Agent
mean_reward, std_reward = evaluate_agent(env, max_steps, n_eval_episodes,
                                         Qtable_frozenlake, eval_seed)
print(f"Mean_reward={mean_reward:.2f} +/- {std_reward:.2f}")

"""## Visualizing the results"""


def record_video(env, Qtable, out_directory, fps=1):
    """Roll out one greedy episode and save the rendered frames to a file.

    :param env: The environment (gym 0.24 API, must support rgb_array render)
    :param Qtable: The trained Q-table
    :param out_directory: Output path for the animation (e.g. a .gif)
    :param fps: Frames per second of the saved animation
    """
    images = []
    done = False
    state = env.reset(seed=random.randint(0, 500))
    images.append(env.render(mode='rgb_array'))
    while not done:
        # Greedy action: maximum expected future reward for this state
        action = np.argmax(Qtable[state][:])
        # We directly put next_state = state for recording logic
        state, reward, done, info = env.step(action)
        images.append(env.render(mode='rgb_array'))
    imageio.mimsave(out_directory, [np.array(img) for img in images], fps=fps)


"""Saving animated file as gif with 1 frame per second"""

video_path = "/content/replay.gif"
video_fps = 1
record_video(env, Qtable_frozenlake, video_path, video_fps)

from IPython.display import Image
Image('./replay.gif')