深度强化学习DQN(DeepQNetwork)原理及例⼦:如何解决
迷宫问题,附源码
⽬前,强化学习中很⽕的当属Q-Learning了,关于Q-Learning的具体介绍请参加。从上⼀篇⽂章中,我们可以看到,Q table可以看做Q-Learning的⼤脑,Q table对应了⼀张state-action的表,但在实际应⽤中,state和action往往很多,内存很难装下Q table,因此需要⽤神经⽹络替代Q table。
训练样本
⾸先要解决的问题是如何获取训练样本。在 DQN 中有 Experience Replay 的概念,就是经验回放。即先让agent去探索环境,将经验(记忆)累积到⼀定程度,再随机抽取出⼀批样本进⾏训练。为什么要随机抽取?因为agent去探索环境时采集到的样本是⼀个时间序列,样本之间具有连续性,如果每次得到样本就更新Q值,受样本分布影响,会对收敛造成影响。
这⾥我们联想到数据库领域,我们需要使⽤benchmark去回放得到不同的action对应的Q值。增强学习是试错学习(Trail-and-error),由于没有直接的指导信息,agent要以不断与环境进⾏交互,通过试错的⽅式来获得最佳策略。因此⼀开始可以看做是盲⽬的、随机的试验,但是根据反馈的reward来优化损失函数可以使得我们想要的Q table慢慢收敛。
损失函数
上⾯提到了损失函数,那么如何选取呢。在DQN中,Q值表中表⽰的是当前已学习到的经验。⽽根据公式计算出的 Q 值是agent通过与环境交互及⾃⾝的经验总结得到的⼀个分数(即:⽬标 Q 值)。最后使⽤⽬标 Q 值(target_q)去更新原来旧的 Q 值(q)。⽽⽬标 Q 值与旧的Q 值的对应关系,正好是监督学习神经⽹络中结果值与输出值的对应关系。
所以,loss = (target_q - q)^2
即:整个训练过程其实就是 Q 值(q)向⽬标 Q 值(target_q)逼近的过程。
代码实现
看代码是最直观的,我先给出整个代码流程,然后再详细解释。下⾯是全部代码:
import tensorflow as tf
import numpy as np
from collections import deque
import random
class DeepQNetwork:
r = np.array([[-1, -1, -1, -1, 0, -1],
[-1, -1, -1, 0, -1, 100.0],
[-1, -1, -1, 0, -1, -1],
[-1, 0, 0, -1, 0, -1],
[0, -1, -1, 1, -1, 100],
[-1, 0, -1, -1, 0, 100],
])
# 执⾏步数。
step_index = 0
# 状态数。
state_num = 6
# 动作数。
action_num = 6
# 训练之前观察多少步。
OBSERVE = 1000.
# 选取的⼩批量训练样本数。
BATCH = 20
# epsilon 的最⼩值,当 epsilon ⼩于该值时,将不在随机选择⾏为。
# epsilon 的最⼩值,当 epsilon ⼩于该值时,将不在随机选择⾏为。
FINAL_EPSILON = 0.0001
# epsilon 的初始值,epsilon 逐渐减⼩。
INITIAL_EPSILON = 0.1
# epsilon 衰减的总步数。
EXPLORE = 3000000.
# 探索模式计数。
epsilon = 0
# 训练步数统计。
learn_step_counter = 0
# 学习率。
learning_rate = 0.001
# γ经验折损率。
gamma = 0.9
# 记忆上限。
memory_size = 5000
# 当前记忆数。
memory_counter = 0
# 保存观察到的执⾏过的⾏动的存储器,即:曾经经历过的记忆。
replay_memory_store = deque()
# ⽣成⼀个状态矩阵(6 X 6),每⼀⾏代表⼀个状态。
state_list = None
# ⽣成⼀个动作矩阵。
action_list = None
# q_eval ⽹络。
q_eval_input = None
action_input = None
q_target = None
q_eval = None
predict = None
loss = None
train_op = None
cost_his = None
reward_action = None
# tensorflow 会话。
session = None
def __init__(self, learning_rate=0.001, gamma=0.9, memory_size=5000):        self.learning_rate = learning_rate
self.gamma = gamma
<_size = memory_size
# 初始化成⼀个 6 X 6 的状态矩阵。
self.state_list = np.identity(self.state_num)
# 初始化成⼀个 6 X 6 的动作矩阵。
self.action_list = np.identity(self.action_num)
# 创建神经⽹络。
# 初始化 tensorflow 会话。
self.session = tf.InteractiveSession()
self.session = tf.InteractiveSession()
# 初始化 tensorflow 参数。
self.session.run(tf.initialize_all_variables())
# 记录所有 loss 变化。
def create_network(self):
"""
创建神经⽹络。
:return:
"""
self.q_eval_input = tf.placeholder(shape=[None, self.state_num], dtype=tf.float32)
self.action_input = tf.placeholder(shape=[None, self.action_num], dtype=tf.float32)
self.q_target = tf.placeholder(shape=[None], dtype=tf.float32)
neuro_layer_1 = 3
w1 = tf.Variable(tf.random_normal([self.state_num, neuro_layer_1]))
b1 = tf.s([1, neuro_layer_1]) + 0.1)
l1 = lu(tf.matmul(self.q_eval_input, w1) + b1)
w2 = tf.Variable(tf.random_normal([neuro_layer_1, self.action_num]))
b2 = tf.s([1, self.action_num]) + 0.1)
self.q_eval = tf.matmul(l1, w2) + b2
# 取出当前动作的得分。
self.loss = tf.reduce_mean(tf.square((self.q_target - ward_action)))
self.predict = tf.argmax(self.q_eval, 1)
def select_action(self, state_index):
"""
根据策略选择动作。
:param state_index: 当前状态。
:return:
"""
current_state = self.state_list[state_index:state_index + 1]
if np.random.uniform() < self.epsilon:
current_action_index = np.random.randint(0, self.action_num)
else:
actions_value = self.session.run(self.q_eval, feed_dict={self.q_eval_input: current_state})
action = np.argmax(actions_value)
current_action_index = action
# 开始训练后,在 epsilon ⼩于⼀定的值之前,将逐步减⼩ epsilon。
if self.step_index > self.OBSERVE and self.epsilon > self.FINAL_EPSILON:
self.epsilon -= (self.INITIAL_EPSILON - self.FINAL_EPSILON) / self.EXPLORE
return current_action_index
def save_store(self, current_state_index, current_action_index, current_reward, next_state_index, done):        """
保存记忆。
:param current_state_index: 当前状态 index。
:param current_action_index: 动作 index。
:param current_reward: 奖励。
:param next_state_index: 下⼀个状态 index。
:param done: 是否结束。
:return:
"""
current_state = self.state_list[current_state_index:current_state_index + 1]
current_action = self.action_list[current_action_index:current_action_index + 1]
current_action = self.action_list[current_action_index:current_action_index + 1]
next_state = self.state_list[next_state_index:next_state_index + 1]
# 记忆动作(当前状态,当前执⾏的动作,当前动作的得分,下⼀个状态)。
current_state,
current_action,
current_reward,
next_state,
done))
# 如果超过记忆的容量,则将最久远的记忆移除。
if play_memory_store) > _size:
<_counter += 1
def step(self, state, action):
"""
执⾏动作。
:param state: 当前状态。
:param action: 执⾏的动作。
:return:
"""
reward = self.r[state][action]
next_state = action
done = False
if action == 5:
done = True
return next_state, reward, done
def experience_replay(self):
"""
session怎么记忆记忆回放。
:return:
"""
# 随机选择⼀⼩批记忆样本。
batch = self.BATCH _counter > self.BATCH _counter        minibatch = random.play_memory_store, batch)
batch_state = None
batch_action = None
batch_reward = None
batch_next_state = None
batch_done = None
for index in range(len(minibatch)):
if batch_state is None:
batch_state = minibatch[index][0]
elif batch_state is not None:
batch_state = np.vstack((batch_state, minibatch[index][0]))
if batch_action is None:
batch_action = minibatch[index][1]
elif batch_action is not None:
batch_action = np.vstack((batch_action, minibatch[index][1]))
if batch_reward is None:
batch_reward = minibatch[index][2]
elif batch_reward is not None:
batch_reward = np.vstack((batch_reward, minibatch[index][2]))
if batch_next_state is None:
if batch_next_state is None:
batch_next_state = minibatch[index][3]
elif batch_next_state is not None:
batch_next_state = np.vstack((batch_next_state, minibatch[index][3]))
if batch_done is None:
batch_done = minibatch[index][4]
elif batch_done is not None:
batch_done = np.vstack((batch_done, minibatch[index][4]))
# q_next:下⼀个状态的 Q 值。
q_next = self.session.run([self.q_eval], feed_dict={self.q_eval_input: batch_next_state})
q_target = []
for i in range(len(minibatch)):
# 当前即时得分。
current_reward = batch_reward[i][0]
# # 游戏是否结束。
# current_done = batch_done[i][0]
# 更新 Q 值。
q_value = current_reward + self.gamma * np.max(q_next[0][i])
# 当得分⼩于 0 时,表⽰⾛了不可⾛的位置。
if current_reward < 0:
q_target.append(current_reward)
else:
q_target.append(q_value)
_, cost, reward = self.session.run([ain_op, self.loss, ward_action],
feed_dict={self.q_eval_input: batch_state,
self.action_input: batch_action,
self.q_target: q_target})
# if self.step_index % 1000 == 0:
#    print("loss:", cost)
self.learn_step_counter += 1
def train(self):
"""
训练。
:return:
"""
# 初始化当前状态。
current_state = np.random.randint(0, self.action_num - 1)
self.epsilon = self.INITIAL_EPSILON
while True:
# 选择动作。
action = self.select_action(current_state)
# 执⾏动作,得到:下⼀个状态,执⾏动作的得分,是否结束。
next_state, reward, done = self.step(current_state, action)
# 保存记忆。
self.save_store(current_state, action, reward, next_state, done)
# 先观察⼀段时间累积⾜够的记忆在进⾏训练。
if self.step_index > self.OBSERVE:
if self.step_index > 10000: