内容简介:代码可以参见目前,强化学习中很火的当属Q-Learning了,关于Q-Learning的具体介绍请参加我上一篇文章。从上一篇文章中,我们可以看到,Q table可以看做Q-Learning的大脑,Q table对应了一张state-action的表,但在实际应用中,state和action往往很多,内存很难装下Q table,因此需要用神经网络替代Q table。首先要解决的问题是如何获取训练样本。在 DQN 中有
代码可以参见 https://blog.csdn.net/bbbeoy/... ,本文我做了一些改动
目前,强化学习中很火的当属Q-Learning了,关于Q-Learning的具体介绍请参加我上一篇文章。从上一篇文章中,我们可以看到,Q table可以看做Q-Learning的大脑,Q table对应了一张state-action的表,但在实际应用中,state和action往往很多,内存很难装下Q table,因此需要用神经网络替代Q table。
训练样本
首先要解决的问题是如何获取训练样本。在 DQN 中有 Experience Replay 的概念,就是经验回放。即先让agent去探索环境,将经验(记忆)累积到一定程度,再随机抽取出一批样本进行训练。为什么要随机抽取?因为agent去探索环境时采集到的样本是一个时间序列,样本之间具有连续性,如果每次得到样本就更新Q值,受样本分布影响,会对收敛造成影响。
这里我们联想到数据库领域,我们需要使用benchmark去回放得到不同的action对应的Q值。增强学习是试错学习(Trail-and-error),由于没有直接的指导信息,agent要以不断与环境进行交互,通过试错的方式来获得最佳策略。因此一开始可以看做是盲目的、随机的试验,但是根据反馈的reward来优化损失函数可以使得我们想要的Q table慢慢收敛。
损失函数
上面提到了损失函数,那么如何选取呢。在DQN中,Q值表中表示的是当前已学习到的经验。而根据公式计算出的 Q 值是agent通过与环境交互及自身的经验总结得到的一个分数(即:目标 Q 值)。最后使用目标 Q 值(target_q)去更新原来旧的 Q 值(q)。而目标 Q 值与旧的 Q 值的对应关系,正好是监督学习神经网络中结果值与输出值的对应关系。
所以,loss = (target_q - q)^2
即:整个训练过程其实就是 Q 值(q)向目标 Q 值(target_q)逼近的过程。
代码实现
看代码是最直观的,我先给出整个代码流程,然后再详细解释。
import tensorflow as tf
import numpy as np
from collections import deque
import random
class DeepQNetwork:
r = np.array([[-1, -1, -1, -1, 0, -1],
[-1, -1, -1, 0, -1, 100.0],
[-1, -1, -1, 0, -1, -1],
[-1, 0, 0, -1, 0, -1],
[0, -1, -1, 1, -1, 100],
[-1, 0, -1, -1, 0, 100],
])
# 执行步数。
step_index = 0
# 状态数。
state_num = 6
# 动作数。
action_num = 6
# 训练之前观察多少步。
OBSERVE = 1000.
# 选取的小批量训练样本数。
BATCH = 20
# epsilon 的最小值,当 epsilon 小于该值时,将不在随机选择行为。
FINAL_EPSILON = 0.0001
# epsilon 的初始值,epsilon 逐渐减小。
INITIAL_EPSILON = 0.1
# epsilon 衰减的总步数。
EXPLORE = 3000000.
# 探索模式计数。
epsilon = 0
# 训练步数统计。
learn_step_counter = 0
# 学习率。
learning_rate = 0.001
# γ经验折损率。
gamma = 0.9
# 记忆上限。
memory_size = 5000
# 当前记忆数。
memory_counter = 0
# 保存观察到的执行过的行动的存储器,即:曾经经历过的记忆。
replay_memory_store = deque()
# 生成一个状态矩阵(6 X 6),每一行代表一个状态。
state_list = None
# 生成一个动作矩阵。
action_list = None
# q_eval 网络。
q_eval_input = None
action_input = None
q_target = None
q_eval = None
predict = None
loss = None
train_op = None
cost_his = None
reward_action = None
# tensorflow 会话。
session = None
def __init__(self, learning_rate=0.001, gamma=0.9, memory_size=5000):
self.learning_rate = learning_rate
self.gamma = gamma
self.memory_size = memory_size
# 初始化成一个 6 X 6 的状态矩阵。
self.state_list = np.identity(self.state_num)
# 初始化成一个 6 X 6 的动作矩阵。
self.action_list = np.identity(self.action_num)
# 创建神经网络。
self.create_network()
# 初始化 tensorflow 会话。
self.session = tf.InteractiveSession()
# 初始化 tensorflow 参数。
self.session.run(tf.initialize_all_variables())
# 记录所有 loss 变化。
self.cost_his = []
def create_network(self):
"""
创建神经网络。
:return:
"""
self.q_eval_input = tf.placeholder(shape=[None, self.state_num], dtype=tf.float32)
self.action_input = tf.placeholder(shape=[None, self.action_num], dtype=tf.float32)
self.q_target = tf.placeholder(shape=[None], dtype=tf.float32)
neuro_layer_1 = 3
w1 = tf.Variable(tf.random_normal([self.state_num, neuro_layer_1]))
b1 = tf.Variable(tf.zeros([1, neuro_layer_1]) + 0.1)
l1 = tf.nn.relu(tf.matmul(self.q_eval_input, w1) + b1)
w2 = tf.Variable(tf.random_normal([neuro_layer_1, self.action_num]))
b2 = tf.Variable(tf.zeros([1, self.action_num]) + 0.1)
self.q_eval = tf.matmul(l1, w2) + b2
# 取出当前动作的得分。
self.reward_action = tf.reduce_sum(tf.multiply(self.q_eval, self.action_input), reduction_indices=1)
self.loss = tf.reduce_mean(tf.square((self.q_target - self.reward_action)))
self.train_op = tf.train.GradientDescentOptimizer(self.learning_rate).minimize(self.loss)
self.predict = tf.argmax(self.q_eval, 1)
def select_action(self, state_index):
"""
根据策略选择动作。
:param state_index: 当前状态。
:return:
"""
current_state = self.state_list[state_index:state_index + 1]
if np.random.uniform() < self.epsilon:
current_action_index = np.random.randint(0, self.action_num)
else:
actions_value = self.session.run(self.q_eval, feed_dict={self.q_eval_input: current_state})
action = np.argmax(actions_value)
current_action_index = action
# 开始训练后,在 epsilon 小于一定的值之前,将逐步减小 epsilon。
if self.step_index > self.OBSERVE and self.epsilon > self.FINAL_EPSILON:
self.epsilon -= (self.INITIAL_EPSILON - self.FINAL_EPSILON) / self.EXPLORE
return current_action_index
def save_store(self, current_state_index, current_action_index, current_reward, next_state_index, done):
"""
保存记忆。
:param current_state_index: 当前状态 index。
:param current_action_index: 动作 index。
:param current_reward: 奖励。
:param next_state_index: 下一个状态 index。
:param done: 是否结束。
:return:
"""
current_state = self.state_list[current_state_index:current_state_index + 1]
current_action = self.action_list[current_action_index:current_action_index + 1]
next_state = self.state_list[next_state_index:next_state_index + 1]
# 记忆动作(当前状态, 当前执行的动作, 当前动作的得分,下一个状态)。
self.replay_memory_store.append((
current_state,
current_action,
current_reward,
next_state,
done))
# 如果超过记忆的容量,则将最久远的记忆移除。
if len(self.replay_memory_store) > self.memory_size:
self.replay_memory_store.popleft()
self.memory_counter += 1
def step(self, state, action):
"""
执行动作。
:param state: 当前状态。
:param action: 执行的动作。
:return:
"""
reward = self.r[state][action]
next_state = action
done = False
if action == 5:
done = True
return next_state, reward, done
def experience_replay(self):
"""
记忆回放。
:return:
"""
# 随机选择一小批记忆样本。
batch = self.BATCH if self.memory_counter > self.BATCH else self.memory_counter
minibatch = random.sample(self.replay_memory_store, batch)
batch_state = None
batch_action = None
batch_reward = None
batch_next_state = None
batch_done = None
for index in range(len(minibatch)):
if batch_state is None:
batch_state = minibatch[index][0]
elif batch_state is not None:
batch_state = np.vstack((batch_state, minibatch[index][0]))
if batch_action is None:
batch_action = minibatch[index][1]
elif batch_action is not None:
batch_action = np.vstack((batch_action, minibatch[index][1]))
if batch_reward is None:
batch_reward = minibatch[index][2]
elif batch_reward is not None:
batch_reward = np.vstack((batch_reward, minibatch[index][2]))
if batch_next_state is None:
batch_next_state = minibatch[index][3]
elif batch_next_state is not None:
batch_next_state = np.vstack((batch_next_state, minibatch[index][3]))
if batch_done is None:
batch_done = minibatch[index][4]
elif batch_done is not None:
batch_done = np.vstack((batch_done, minibatch[index][4]))
# q_next:下一个状态的 Q 值。
q_next = self.session.run([self.q_eval], feed_dict={self.q_eval_input: batch_next_state})
q_target = []
for i in range(len(minibatch)):
# 当前即时得分。
current_reward = batch_reward[i][0]
# # 游戏是否结束。
# current_done = batch_done[i][0]
# 更新 Q 值。
q_value = current_reward + self.gamma * np.max(q_next[0][i])
# 当得分小于 0 时,表示走了不可走的位置。
if current_reward < 0:
q_target.append(current_reward)
else:
q_target.append(q_value)
_, cost, reward = self.session.run([self.train_op, self.loss, self.reward_action],
feed_dict={self.q_eval_input: batch_state,
self.action_input: batch_action,
self.q_target: q_target})
self.cost_his.append(cost)
# if self.step_index % 1000 == 0:
# print("loss:", cost)
self.learn_step_counter += 1
def train(self):
"""
训练。
:return:
"""
# 初始化当前状态。
current_state = np.random.randint(0, self.action_num - 1)
self.epsilon = self.INITIAL_EPSILON
while True:
# 选择动作。
action = self.select_action(current_state)
# 执行动作,得到:下一个状态,执行动作的得分,是否结束。
next_state, reward, done = self.step(current_state, action)
# 保存记忆。
self.save_store(current_state, action, reward, next_state, done)
# 先观察一段时间累积足够的记忆在进行训练。
if self.step_index > self.OBSERVE:
self.experience_replay()
if self.step_index > 10000:
break
if done:
current_state = np.random.randint(0, self.action_num - 1)
else:
current_state = next_state
self.step_index += 1
def pay(self):
"""
运行并测试。
:return:
"""
self.train()
# 显示 R 矩阵。
print(self.r)
for index in range(5):
start_room = index
print("#############################", "Agent 在", start_room, "开始行动", "#############################")
current_state = start_room
step = 0
target_state = 5
while current_state != target_state:
out_result = self.session.run(self.q_eval, feed_dict={
self.q_eval_input: self.state_list[current_state:current_state + 1]})
next_state = np.argmax(out_result[0])
print("Agent 由", current_state, "号房间移动到了", next_state, "号房间")
current_state = next_state
step += 1
print("Agent 在", start_room, "号房间开始移动了", step, "步到达了目标房间 5")
print("#############################", "Agent 在", 5, "结束行动", "#############################")
if __name__ == "__main__":
q_network = DeepQNetwork()
q_network.pay()
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- canvas实例 --- 制作简易迷宫
- 如何走出混网管理迷宫
- 使用 PICO-8 开发迷宫小游戏
- 数据结构与算法(七):迷宫回溯和八皇后问题
- 利用UPnP协议在物联网设备的迷宫中隐藏自己
- 用Q-learning算法实现自动走迷宫机器人
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Web 2.0 Architectures
Duane Nickull、Dion Hinchcliffe、James Governor / O'Reilly / 2009 / USD 34.99
The "Web 2.0" phenomena has become more pervasive than ever before. It is impacting the very fabric of our society and presents opportunities for those with knowledge. The individuals who understand t......一起来看看 《Web 2.0 Architectures》 这本书的介绍吧!