需要源码请点赞关注收藏后评论区留下QQ~~~
一、Sarsa算法简介
Sarsa算法每次更新都需要获取五元组(S,A,R,S',A')这也是该算法称为Sarsa的原因,每当从非终止状态进行一次转移后,就进行一次更新,但需要注意的是,动作A是情节中实际发生的动作。在更新(S,A)的动作值函数Q(S,A)时,Agent并不实际执行状态S'下的动作A‘。由于采用了贪心策略,Sarsa算法在各时间步都隐式地进行了策略改进,像这种在每个样本更新后都进行策略改进的策略迭代算法,也成为完全乐观策略算法
估算最优策略的Sarsa算法步骤如下
二、风险投资问题实战
在进行投资时,预期收益是一个非常重要的参考指标,现在越来越多的人接收概率的观点,但是收益为正的投资也未必一定理性。
假设一种风险投资,当前本金为S,下一个单位时间有0.5的概率变为原有资产的0.9倍,0.5的概率变为原有资产的1.11倍。经过一个时间单位后预期收益率变为百分之0.5,但是在实际情况中,进行2n各时间步后连续投资预期收益非常低,当N趋近于无穷时,该投资会血本无归,为了使投资更加合理,利用Sarsa算法给出在给定本金情况下的投资方案
该问题的MDP模型如下
1:状态空间
状态为当前资产的数目,因此状态空间为连续的实数空间
2:动作空间
该问题中一共有两个动作,分别为投资和不投资。用0表示不投资,用1表示投资
3:立即奖赏
在这个问题中 如果不投资则立即奖赏为0,如果进行投资则分为四种情况
资产增长且结果大于等于本金 +1奖赏
资产减少 且投资结果小于等于本金 -1奖赏
资产增加 且投资结果小于本金 +0.5奖赏
资产减少 且投资结果大于本金 -0.5奖赏
使用Sarsa算法解决风险投资问题 首先设置本金为10 将动作值函数设置为50×2的数组,且初值为0,初始学习率为0.05,使用贪心策略
当迭代到20000个情节时 迭代已经收敛 导出的Q值迭代过程表如下
结论:当资金小于本金时(10)时,不进行投资,当现有资金大于等于本金(10)时,可以进行投资
部分代码如下
import numpy as np from invest import InvestEnv np.random.seed(1) env = InvestEnv() def trans_q(Q): # 输出保留3位小数 new_q = [] new_q = [round(x, 3) for x in Q] return new_q def Attenuation(epsilon, alpha, episode_sum, episode): # epsilon和alpha衰减函数 epsilon = (float(episode_sum) - float(episode)) / float(episode_sum) * epsilon alpha = (float(episode_sum) - float(episode)) / float(episode_sum) * alpha return epsilon, alpha # 输出函数 def print_ff(list_q, Q, episode_i, epsilon_k, alpha_k): list_s = range(0,50) for em in list_q: if em == episode_i: print("*******************************情节数:%s*******************************" % (str(em))) for state in list_s: print("Q(%d,*)" % (state) + str(trans_q(Q[state]))) prob = [epsilon_k / 2.0, epsilon_k / 2.0] max_a = np.argmax(Q[state]) prob[max_a] = 1 - (epsilon_k / 2.0) print('概率值' + str(trans_q(prob))) print("epsilon_k: {}".format(epsilon_k)) print("alpha_k:{}".format(alpha_k)) # 输出单步计算过程 def print_dd(s, a, R, next_s, next_a, print_len, episode_i, Q, e_k, a_k, P, P_next): if s == 6 and a == 1: print("*********************************单步的计算过程************************************") print(6, 1) print("alpha:" + str(a_k)) print("epsilon:" + str(e_k)) print("Q_state: {} Q_next: {}".format(Q[s], Q[next_s])) print("Q[{},{}]: {}".format(s, a, Q[s, a])) print("Q[{},{}]: {}".format(next_s, next_a, Q[next_s, next_a])) print("update:" + str(Q[s, a] + a_k * (R + 0.8 * Q[next_s, next_a] - Q[s, a]))) # print(p) print("************************************************************************************") def policy_epsilon_greedy(env, s, Q, epsilon): Q_s = Q[s] if np.random.rand() < epsilon: a = np.random.choice(env.action_space) else: a = np.argmax(Q_s) return a def Sarsa(env, episode_num, alpha, gamma, epsilon): Q = np.zeros((env.state_space, env.action_space)) epsilon = epsilon count = 0 list_q = [0,1,2,3,4,9998,9999,10000,10001,10002,19996,19997,19998,19999,20000] for episode_i in range(episode_num): env.reset() S = env.state epsilon_k, alpha_k = Attenuation(epsilon, alpha, episode_num, episode_i) A = policy_epsilon_greedy(env, S, Q, epsilon_k) print_ff(list_q, Q, episode_i, epsilon_k, alpha_k) if episode_i == 10000: print("e_k:" + str(epsilon_k) + "a_k" + str(alpha_k)) done = False P_S = env.getState() for i in range(1000): next_S, R = env.step(A) P_S_next = env.getState() if next_S > 49: Q[S, A] = Q[S, A] + alpha_k * (R + gamma * 0.0 - Q[S, A]) break next_A = policy_epsilon_greedy(env, next_S, Q, epsilon_k) # 输出某一个 if episode_i in [9999, 10000]: count += 1 print(count) return Q Q = Sarsa(env, 20001, 0.05, 0.8, 0.5)
创作不易 觉得有帮助请点赞关注收藏~~~