Hands-On: Traffic Flow Prediction with T-GCN (with a Detailed Code Walkthrough)

张开发
2026/4/20 8:01:25 · 15 min read

# Building a T-GCN Traffic Flow Predictor from Scratch: A Practical Guide

## Spatio-Temporal Graph Neural Network Basics

Traffic flow prediction has long been a core challenge in smart-city construction. Traditional methods tend to treat temporal and spatial features separately, whereas T-GCN (Temporal Graph Convolutional Network) captures the spatial dependencies of the road network with a graph convolutional network (GCN) and models temporal dynamics with a gated recurrent unit (GRU), achieving joint spatio-temporal modeling.

Key advantages:

- **Spatial modeling**: the urban road network is represented as a graph, with an adjacency matrix quantifying the strength of association between road segments
- **Temporal modeling**: captures both periodic patterns (morning/evening rush hours) and trend changes (holidays) in traffic flow
- **End-to-end training**: the whole model is differentiable and can be optimized via backpropagation

Typical application scenarios include:

- Real-time traffic scheduling and optimization
- Navigation route planning
- Urban infrastructure capacity planning
- Anomalous traffic alerting

## Data Preparation and Preprocessing

### Building the Dataset

We use the Los-loop highway dataset as an example. The raw data contains readings from 207 sensor nodes recorded between March 1 and June 30, 2012, at a 5-minute sampling interval.

```python
import pandas as pd

# Load the raw data
traffic_data = pd.read_csv("los_loop.csv", index_col=0)
print(f"Dataset shape: {traffic_data.shape}")

# Example node feature matrix: take the first 100 time steps
node_features = traffic_data.iloc[:100, :]
```

### Building the Graph Structure

The adjacency matrix is generated from the road topology:

```python
import numpy as np
from haversine import haversine  # pip install haversine; great-circle distance in km

def build_adjacency_matrix(sensor_locations, threshold=1.0):
    """Build an adjacency matrix from the geographic distances between sensors.

    :param sensor_locations: (latitude, longitude) of each sensor
    :param threshold: connection threshold in kilometers
    """
    num_nodes = len(sensor_locations)
    adj = np.zeros((num_nodes, num_nodes))
    for i in range(num_nodes):
        for j in range(i + 1, num_nodes):
            dist = haversine(sensor_locations[i], sensor_locations[j])
            if dist < threshold:
                # Closer sensors get stronger edge weights
                adj[i][j] = adj[j][i] = 1.0 / (dist + 1e-5)
    return adj
```

### Data Normalization

```python
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
scaled_features = scaler.fit_transform(node_features)

# Split into training / test sets
train_size = int(0.8 * len(scaled_features))
train_data = scaled_features[:train_size]
test_data = scaled_features[train_size:]
```
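### Constructing Sequence Samples

The training code later in this guide consumes `train_loader` and `val_loader` and calls an `evaluate` helper, none of which appear in the original post. The sketch below is one minimal way to fill that gap, assuming each sample is a window of `seq_len` consecutive time steps predicting the following step; `make_windows`, the batch size of 32, and the MSE-based `evaluate` are illustrative assumptions, and the test split here doubles as the validation set.

```python
import numpy as np
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

def make_windows(data, seq_len=12):
    """Slice a [num_steps, num_nodes] array into (window, next-step) pairs."""
    xs, ys = [], []
    for t in range(len(data) - seq_len):
        xs.append(data[t:t + seq_len])  # input window: [seq_len, num_nodes]
        ys.append(data[t + seq_len])    # target: the next step, [num_nodes]
    x = torch.tensor(np.array(xs), dtype=torch.float32).unsqueeze(-1)  # [N, seq_len, num_nodes, 1]
    y = torch.tensor(np.array(ys), dtype=torch.float32).unsqueeze(-1)  # [N, num_nodes, 1]
    return TensorDataset(x, y)

train_loader = DataLoader(make_windows(train_data), batch_size=32, shuffle=True)
val_loader = DataLoader(make_windows(test_data), batch_size=32)

def evaluate(model, dataloader):
    """Average MSE over a dataloader; used as the validation loss below."""
    model.eval()
    device = next(model.parameters()).device
    total = 0.0
    with torch.no_grad():
        for inputs, targets in dataloader:
            total += F.mse_loss(model(inputs.to(device)), targets.to(device)).item()
    return total / len(dataloader)
```

The target shape `[N, num_nodes, 1]` matches the model output below, where `output_dim=1` yields one predicted value per node.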
## Implementing the T-GCN Architecture

### Core Components

The graph convolution layer:

```python
import torch
import torch.nn as nn

class GraphConvolution(nn.Module):
    def __init__(self, adj, input_dim, output_dim):
        super().__init__()
        self.register_buffer("laplacian", self._normalize_adj(adj))
        self.weight = nn.Parameter(torch.randn(input_dim, output_dim))
        self.bias = nn.Parameter(torch.zeros(output_dim))

    def _normalize_adj(self, adj):
        # Symmetrically normalized Laplacian: I - D^{-1/2} A D^{-1/2}
        D = torch.diag(torch.sum(adj, dim=1))
        D_sqrt_inv = torch.inverse(torch.sqrt(D))
        return torch.eye(adj.shape[0]) - D_sqrt_inv @ adj @ D_sqrt_inv

    def forward(self, x):
        # x shape: [batch_size, num_nodes, input_dim]
        support = torch.einsum("bnk,ko->bno", x, self.weight)          # per-node feature transform
        output = torch.einsum("ij,bjo->bio", self.laplacian, support)  # propagate over the node dimension
        return output + self.bias
```

The temporal gating unit (a GRU cell operating on node features):

```python
class TemporalGRU(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super().__init__()
        self.hidden_dim = hidden_dim  # read by TGCN.forward to size the initial state
        self.update_gate = nn.Linear(input_dim + hidden_dim, hidden_dim)
        self.reset_gate = nn.Linear(input_dim + hidden_dim, hidden_dim)
        self.candidate = nn.Linear(input_dim + hidden_dim, hidden_dim)

    def forward(self, x, h_prev):
        # x shape: [batch_size, num_nodes, input_dim]
        combined = torch.cat([x, h_prev], dim=-1)
        z = torch.sigmoid(self.update_gate(combined))
        r = torch.sigmoid(self.reset_gate(combined))
        h_tilde = torch.tanh(self.candidate(torch.cat([x, r * h_prev], dim=-1)))
        h_new = (1 - z) * h_prev + z * h_tilde
        return h_new
```

The complete T-GCN model:

```python
class TGCN(nn.Module):
    def __init__(self, adj, input_dim, hidden_dim, output_dim, seq_len):
        super().__init__()
        self.gcn = GraphConvolution(adj, input_dim, hidden_dim)
        self.gru = TemporalGRU(hidden_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim * seq_len, output_dim)

    def forward(self, x):
        # x shape: [batch_size, seq_len, num_nodes, input_dim]
        batch_size = x.size(0)
        h = torch.zeros(batch_size, x.size(2), self.gru.hidden_dim).to(x.device)
        hidden_states = []
        for t in range(x.size(1)):
            x_gcn = self.gcn(x[:, t])  # spatial convolution at time step t
            h = self.gru(x_gcn, h)     # temporal state update
            hidden_states.append(h)
        out = torch.cat(hidden_states, dim=-1)  # [batch, num_nodes, hidden_dim * seq_len]
        return self.fc(out)
```

## Model Training and Optimization

### Training Configuration

```python
import torch.optim as optim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize the model; the adjacency matrix must be a float tensor.
# sensor_locs: list of (lat, lon) pairs for the 207 sensors
adj_matrix = torch.tensor(build_adjacency_matrix(sensor_locs), dtype=torch.float32)
model = TGCN(adj_matrix, input_dim=1, hidden_dim=64, output_dim=1, seq_len=12).to(device)

# Define the loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, "min", patience=5)
```

### Training Loop

```python
def train_epoch(model, dataloader):
    model.train()
    total_loss = 0
    for batch_idx, (inputs, targets) in enumerate(dataloader):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(dataloader)

# Typical training process
for epoch in range(100):
    train_loss = train_epoch(model, train_loader)
    val_loss = evaluate(model, val_loader)
    scheduler.step(val_loss)
    if epoch % 10 == 0:
        print(f"Epoch {epoch}: Train Loss {train_loss:.4f} | Val Loss {val_loss:.4f}")
```

### Key Training Tips

**Learning-rate scheduling**: start at 0.001 and use the `ReduceLROnPlateau` strategy, keeping the learning rate no lower than 1e-6.

**Regularization**:

```python
# Configure per-group L2 regularization in the optimizer
optimizer = optim.Adam([
    {"params": model.gcn.parameters(), "weight_decay": 1e-4},
    {"params": model.gru.parameters()},
    {"params": model.fc.parameters(), "weight_decay": 1e-3},
], lr=0.001)
```

**Early stopping**:

```python
patience = 10
best_loss = float("inf")
counter = 0
while counter < patience:
    train_epoch(model, train_loader)  # train for one epoch before each validation check
    val_loss = evaluate(model, val_loader)
    if val_loss < best_loss:
        best_loss = val_loss
        counter = 0
        torch.save(model.state_dict(), "best_model.pth")
    else:
        counter += 1
```

## Visualizing and Analyzing Results

### Prediction Results

```python
import matplotlib.pyplot as plt

def plot_predictions(model, test_loader):
    model.eval()
    with torch.no_grad():
        inputs, targets = next(iter(test_loader))
        predictions = model(inputs.to(device))

    plt.figure(figsize=(12, 6))
    plt.plot(targets.cpu().numpy()[0], label="Ground truth")
    plt.plot(predictions.cpu().numpy()[0], label="Prediction")
    plt.legend()
    plt.title("Traffic flow: prediction vs. ground truth")
    plt.xlabel("Time step")
    plt.ylabel("Normalized flow")
    plt.show()

plot_predictions(model, test_loader)
```

### Computing Evaluation Metrics

| Metric | Formula | Target |
| --- | --- | --- |
| MAE | $\frac{1}{n}\sum\lvert y-\hat{y}\rvert$ | lower is better |
| RMSE | $\sqrt{\frac{1}{n}\sum(y-\hat{y})^2}$ | lower is better |
| MAPE | $\frac{100\%}{n}\sum\left\lvert\frac{y-\hat{y}}{y}\right\rvert$ | < 10% |

```python
from sklearn.metrics import mean_absolute_error, mean_squared_error

def evaluate_metrics(model, dataloader):
    model.eval()
    targets_all, preds_all = [], []
    with torch.no_grad():
        for inputs, targets in dataloader:
            preds = model(inputs.to(device))
            targets_all.append(targets.cpu())
            preds_all.append(preds.cpu())
    y_true = torch.cat(targets_all).numpy()
    y_pred = torch.cat(preds_all).numpy()
    return {
        "MAE": mean_absolute_error(y_true, y_pred),
        "RMSE": np.sqrt(mean_squared_error(y_true, y_pred)),
        "MAPE": np.mean(np.abs((y_true - y_pred) / y_true)) * 100,
    }
```

## Engineering Practice Recommendations

### Performance Optimization

**Graph structure optimization**: build a sparse adjacency matrix with a KNN-style rule, filtering out weak connections:

```python
def sparse_adjacency(adj, top_k=5):
    """Keep only the top_k strongest connections of each node."""
    adj_sparse = np.zeros_like(adj)
    for i in range(adj.shape[0]):
        idx = np.argpartition(adj[i], -top_k)[-top_k:]
        adj_sparse[i][idx] = adj[i][idx]
    return adj_sparse
```

**Memory optimization**: use mixed-precision training:

```python
from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()
with autocast():
    outputs = model(inputs)
    loss = criterion(outputs, targets)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
```

### Deployment Considerations

**Incremental training**:

```python
def incremental_train(model, new_data, epochs=5):
    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    for _ in range(epochs):
        model.train()
        for inputs, targets in new_data:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
```

**Model compression**: options include knowledge distillation and quantization; the snippet below applies PyTorch's post-training dynamic quantization to the linear layers:

```python
model_quant = torch.quantization.quantize_dynamic(
    model, {nn.Linear}, dtype=torch.qint8
)
```

**Error handling**:

```python
def safe_predict(model, input_data):
    try:
        with torch.no_grad():
            if input_data.isnan().any():
                raise ValueError("Input contains NaN values")
            return model(input_data)
    except Exception as e:
        print(f"Prediction error: {str(e)}")
        return None
```
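One practical footnote on serving: the model predicts in standardized units, so a deployment needs to invert the scaling before reporting vehicle counts. Below is a minimal sketch using the `StandardScaler` fitted earlier; the `predict_flow` helper is hypothetical, not part of the original code.

```python
def predict_flow(model, input_window, scaler):
    """Guarded prediction that maps normalized output back to raw flow.

    input_window: standardized tensor of shape [1, seq_len, num_nodes, 1].
    """
    pred = safe_predict(model, input_window)
    if pred is None:
        return None
    # StandardScaler.inverse_transform expects [n_samples, n_features];
    # one prediction per node corresponds to one row of the original feature matrix.
    flat = pred.squeeze(-1).cpu().numpy()  # [1, num_nodes]
    return scaler.inverse_transform(flat)
```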
