| 
积分20威望20 金圆券362 PB小红花0 臭鸡蛋0 法币0 FB 
 
 列兵 
 
 
	积分20威望20 金圆券362 PB小红花0 臭鸡蛋0 法币0 FB 
 | 
 
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler
from torch.utils.data import DataLoader, TensorDataset
 # 1. 增强版数据预处理
 def load_and_preprocess_data():
 try:
 # Load the data
 data = pd.read_csv('CS2_35.csv', na_values=[' ', 'NA', 'N/A', 'NaN'])
 
 # Ensure required columns exist
 required_cols = ['cycle', 'resistance', 'CCCT', 'CVCT', 'SoH']
 if not all(col in data.columns for col in required_cols):
 raise ValueError("CSV file is missing required columns.")
 
 # Filter and clean data
 data = data[required_cols].dropna()
 
 # Scale features and target
 x_scaler = RobustScaler()
 y_scaler = RobustScaler()
 
 X = x_scaler.fit_transform(data[['resistance', 'CCCT', 'CVCT']])
 y = y_scaler.fit_transform(data[['SoH']])
 
 return X, y, x_scaler, y_scaler
 
 except Exception as e:
 print(f"Error loading or preprocessing data: {e}")
 raise  # Re-raise the exception to stop execution
 
 
 # 2. 改进的模型架构
 class EnhancedModel(nn.Module):
 def __init__(self, input_size):
 super().__init__()
 self.net = nn.Sequential(
 nn.Linear(input_size, 128),
 nn.BatchNorm1d(128),
 nn.ReLU(),
 nn.Dropout(0.3),
 
 nn.Linear(128, 64),
 nn.BatchNorm1d(64),
 nn.ReLU(),
 nn.Dropout(0.3),
 
 nn.Linear(64, 32),
 nn.BatchNorm1d(32),
 nn.ReLU(),
 
 nn.Linear(32, 1)
 )
 
 def forward(self, x):
 return self.net(x)
 
 
# 3. Optimized training pipeline
def train_and_evaluate():
    """Train EnhancedModel on the battery data, early-stop on test R²,
    report metrics, plot diagnostics, and dump predictions to CSV.

    NOTE(review): the same test split drives both early stopping and the
    final metrics, so the reported R² is optimistically biased — a
    separate validation set would be needed for an unbiased estimate.
    """
    # Load scaled features/target plus the fitted scalers.
    X, y, x_scaler, y_scaler = load_and_preprocess_data()

    # Train/test split. (The original comment claimed "stratified
    # sampling", but this is a plain shuffled split — no stratify arg.)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, shuffle=True)

    # Convert numpy arrays to float32 tensors for PyTorch.
    X_train = torch.FloatTensor(X_train)
    X_test = torch.FloatTensor(X_test)
    y_train = torch.FloatTensor(y_train)
    y_test = torch.FloatTensor(y_test)

    # Mini-batch loader over the training set only.
    train_dataset = TensorDataset(X_train, y_train)
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

    # Model / loss / optimizer / LR schedule.
    # NOTE(review): T_max=100 while the loop runs up to 500 epochs, so the
    # cosine schedule finishes its first descent at epoch 100 and then
    # rises again — confirm this cycling is intended.
    model = EnhancedModel(input_size=X.shape[1])
    criterion = nn.MSELoss()
    optimizer = optim.AdamW(model.parameters(), lr=0.0005, weight_decay=0.001)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=100)

    # Training loop with R²-based early stopping.
    print("=== 开始训练 ===")
    best_r2 = -np.inf
    patience = 30          # epochs without R² improvement before stopping
    patience_counter = 0

    for epoch in range(500):
        model.train()
        epoch_loss = 0
        for batch_x, batch_y in train_loader:
            optimizer.zero_grad()
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()

        scheduler.step()

        # Evaluate R² on the test set in original (unscaled) units.
        model.eval()
        with torch.no_grad():
            y_pred = model(X_test)
            y_test_orig = y_scaler.inverse_transform(y_test.numpy())
            y_pred_orig = y_scaler.inverse_transform(y_pred.numpy())
            current_r2 = r2_score(y_test_orig, y_pred_orig)

        # Early stopping keyed on R²; best weights are checkpointed so the
        # final evaluation can restore them.
        if current_r2 > best_r2:
            best_r2 = current_r2
            patience_counter = 0
            torch.save(model.state_dict(), 'best_model.pth')
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping at epoch {epoch}")
                break

        # Periodic progress report every 20 epochs.
        if (epoch + 1) % 20 == 0:
            print(f"Epoch {epoch + 1:3d} | Loss: {epoch_loss / len(train_loader):.4f} | R²: {current_r2:.4f}")

    # Restore the best checkpoint before the final evaluation.
    model.load_state_dict(torch.load('best_model.pth'))

    # Final evaluation, again in original units.
    model.eval()
    with torch.no_grad():
        y_pred = model(X_test)
        y_test_orig = y_scaler.inverse_transform(y_test.numpy())
        y_pred_orig = y_scaler.inverse_transform(y_pred.numpy())

    r2 = r2_score(y_test_orig, y_pred_orig)
    mae = mean_absolute_error(y_test_orig, y_pred_orig)
    rmse = np.sqrt(mean_squared_error(y_test_orig, y_pred_orig))
    # NOTE(review): MAPE divides by the true SoH — any zero SoH value in
    # the test split would produce inf/NaN here.
    mape = np.mean(np.abs((y_test_orig - y_pred_orig) / y_test_orig)) * 100

    print("\n=== 最终测试结果 ===")
    print(f"MAE: {mae:.4f}")
    print(f"MAPE: {mape:.4f}%")
    print(f"RMSE: {rmse:.4f}")
    print(f"R² Score: {r2:.4f}")

    # Diagnostics: scatter, trend overlay, and error histogram.
    plt.figure(figsize=(15, 5))

    # Predicted vs. true scatter with the ideal y = x reference line.
    plt.subplot(1, 3, 1)
    plt.scatter(y_test_orig, y_pred_orig, alpha=0.5)
    plt.plot([y_test_orig.min(), y_test_orig.max()],
             [y_test_orig.min(), y_test_orig.max()], 'r--')
    plt.xlabel('True SoH')
    plt.ylabel('Predicted SoH')
    plt.title(f'Prediction (R²={r2:.2f})')

    # True and predicted series overlaid by sample index.
    plt.subplot(1, 3, 2)
    plt.plot(y_test_orig, label='True')
    plt.plot(y_pred_orig, label='Predicted')
    plt.xlabel('Sample Index')
    plt.ylabel('SoH')
    plt.legend()
    plt.title('Trend Comparison')

    # Histogram of signed prediction errors.
    plt.subplot(1, 3, 3)
    errors = y_test_orig - y_pred_orig
    plt.hist(errors, bins=30)
    plt.xlabel('Prediction Error')
    plt.ylabel('Frequency')
    plt.title('Error Distribution')

    plt.tight_layout()
    plt.savefig('results.png', dpi=300)
    plt.show()

    # Persist true/predicted pairs for downstream comparison.
    # NOTE(review): the file is named "LSTM_predictions.csv" but this model
    # is a feed-forward MLP, not an LSTM — confirm the intended name.
    results_df = pd.DataFrame({
        'True_SoH': y_test_orig.flatten(),
        'Predicted_SoH': y_pred_orig.flatten()
    })

    results_df.to_csv('LSTM_predictions.csv', index=False)
    print("Predictions saved to LSTM_predictions.csv")
 
 
# Script entry point: run the full train/evaluate pipeline.
if __name__ == "__main__":
    train_and_evaluate()
 
 
 | 
 评分
查看全部评分
 |