A Simple PPO Algorithm Implementation

张开发 · 2026/4/16 13:20:13 · 15 min read


Import the required libraries

```python
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import gym
import numpy as np
```

Hyperparameters

```python
# --- Hyperparameters ---
learning_rate = 0.002
gamma = 0.99
lmbda = 0.95       # GAE parameter
eps_clip = 0.1     # PPO clipping range
K_epochs = 3       # number of training passes over the same batch
T_horizon = 20     # rollout horizon (steps collected per update)
```

Defining the PPO model

```python
class PPO(nn.Module):
    def __init__(self):
        super(PPO, self).__init__()
        self.data = []
        self.fc1 = nn.Linear(4, 256)
        self.fc_pi = nn.Linear(256, 2)   # policy head (Actor)
        self.fc_v = nn.Linear(256, 1)    # value head (Critic)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def pi(self, x, softmax_dim=0):
        x = F.relu(self.fc1(x))
        x = self.fc_pi(x)
        prob = F.softmax(x, dim=softmax_dim)
        return prob

    def v(self, x):
        x = F.relu(self.fc1(x))
        v = self.fc_v(x)
        return v

    def put_data(self, transition):
        self.data.append(transition)

    def make_batch(self):
        s_lst, a_lst, r_lst, s_prime_lst, prob_a_lst, done_lst = [], [], [], [], [], []
        for transition in self.data:
            s, a, r, s_prime, prob_a, done = transition
            s_lst.append(s); a_lst.append([a]); r_lst.append([r])
            s_prime_lst.append(s_prime); prob_a_lst.append([prob_a]); done_lst.append([done])
        s, a, r, s_prime, done, prob_a = torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
            torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), \
            torch.tensor(done_lst, dtype=torch.float), torch.tensor(prob_a_lst)
        self.data = []
        return s, a, r, s_prime, done, prob_a

    def train_net(self):
        s, a, r, s_prime, done, prob_a = self.make_batch()
        for i in range(K_epochs):
            # Compute the TD target and the advantage (simplified GAE)
            td_target = r + gamma * self.v(s_prime) * (1 - done)
            delta = td_target - self.v(s)
            delta = delta.detach().numpy()

            advantage_lst = []
            adv = 0.0
            for delta_t in delta[::-1]:   # accumulate backwards in time
                adv = gamma * lmbda * adv + delta_t[0]
                advantage_lst.append([adv])
            advantage_lst.reverse()
            advantage = torch.tensor(advantage_lst, dtype=torch.float)

            # Probability ratio between the new and old policies
            pi = self.pi(s, softmax_dim=1)
            pi_a = pi.gather(1, a)
            ratio = torch.exp(torch.log(pi_a) - torch.log(prob_a))

            # PPO clipped surrogate loss plus a value-function loss
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - eps_clip, 1 + eps_clip) * advantage
            loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(self.v(s), td_target.detach())

            self.optimizer.zero_grad()
            loss.mean().backward()
            self.optimizer.step()
```

Main loop

```python
# --- Main loop ---
def main():
    env = gym.make('CartPole-v1')
    model = PPO()
    score = 0.0

    for n_epi in range(1000):
        # Newer Gym/Gymnasium versions return (obs, info) from reset()
        reset_result = env.reset()
        s = reset_result[0] if isinstance(reset_result, tuple) else reset_result
        done = False
        while not done:
            for t in range(T_horizon):
                prob = model.pi(torch.from_numpy(s).float())
                m = torch.distributions.Categorical(prob)
                a = m.sample().item()
                step_result = env.step(a)
                if len(step_result) == 5:   # new API: (obs, reward, terminated, truncated, info)
                    s_prime, r, terminated, truncated, info = step_result
                    done = terminated or truncated
                else:                       # old API: (obs, reward, done, info)
                    s_prime, r, done, info = step_result
                model.put_data((s, a, r / 100.0, s_prime, prob[a].item(), done))
                s = s_prime
                score += r
                if done:
                    break
            model.train_net()

        if n_epi % 20 == 0 and n_epi != 0:
            print(f"# Episode: {n_epi}, Avg Score: {score/20}")
            score = 0.0

    env.close()

if __name__ == '__main__':
    main()
```
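For reference, the loss assembled in `train_net` is the clipped surrogate objective from the PPO paper plus a value-regression term. Writing $r_t(\theta)$ for the `ratio` tensor and $\hat{A}_t$ for the `advantage` tensor, the policy part of the objective is

$$
L^{\mathrm{CLIP}}(\theta) = \hat{\mathbb{E}}_t\!\left[\min\!\left(r_t(\theta)\,\hat{A}_t,\ \operatorname{clip}\!\left(r_t(\theta),\,1-\epsilon,\,1+\epsilon\right)\hat{A}_t\right)\right],
\qquad
r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{\mathrm{old}}}(a_t \mid s_t)},
$$

with $\epsilon$ = `eps_clip`. The backward loop over `delta` implements the GAE recursion

$$
\delta_t = r_t + \gamma\,V(s_{t+1})\,(1 - d_t) - V(s_t),
\qquad
\hat{A}_t = \delta_t + \gamma\lambda\,\hat{A}_{t+1},
$$

where $\lambda$ = `lmbda` and $d_t$ is the done flag. The code minimizes $-L^{\mathrm{CLIP}}$ plus `F.smooth_l1_loss(self.v(s), td_target.detach())`, so the critic is regressed toward the TD target while the actor maximizes the clipped surrogate.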
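As a standalone sanity check (not part of the training script; the numbers below are made up for illustration), you can see on toy tensors how the clipped term caps the incentive to move the policy away from the one that collected the data:

```python
import torch

eps_clip = 0.1
# Hypothetical values: the new policy assigns a much higher probability
# to the sampled action than the old policy did, with positive advantage.
pi_a      = torch.tensor([[0.9]])  # new policy's prob of the taken action
prob_a    = torch.tensor([[0.3]])  # old policy's prob recorded at rollout time
advantage = torch.tensor([[1.0]])

ratio = torch.exp(torch.log(pi_a) - torch.log(prob_a))              # ~3.0
surr1 = ratio * advantage                                           # unclipped: ~3.0
surr2 = torch.clamp(ratio, 1 - eps_clip, 1 + eps_clip) * advantage  # clipped: 1.1

# PPO takes the elementwise minimum, so the objective (and hence the
# gradient signal) is capped at the clipped value.
print(surr1.item(), surr2.item(), torch.min(surr1, surr2).item())
# ~3.0  1.1  1.1
```

When the advantage is negative, the same clamp limits how much credit the objective grants for pushing the action's probability below $1-\epsilon$ times its old value, which is what keeps the `K_epochs` repeated passes over one batch from moving the policy too far.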
