摘要:交叉验证是评估机器学习模型泛化能力、防止过拟合的重要技术。然而,当应用于时间序列数据时,标准交叉验证方法可能带来数据泄漏和自相关风险,导致模型性能估计过于乐观。本文将系统介绍时间序列数据交叉验证的核心原则、主要方法及在PyTorch GRU和Scikit-Le
交叉验证是评估机器学习模型泛化能力、防止过拟合的重要技术。然而,当应用于时间序列数据时,标准交叉验证方法可能带来数据泄漏和自相关风险,导致模型性能估计过于乐观。本文将系统介绍时间序列数据交叉验证的核心原则、主要方法及在PyTorch GRU和Scikit-Learn SVR中的实际应用。
交叉验证(CV)是一种评估机器学习模型泛化能力的统计技术。
CV 首先将原始数据集划分为训练集和验证(或测试)集。
然后,它在不同的训练集上反复训练模型,并在验证集上验证其性能,使评估比单次训练测试分割更可靠。
主要交叉验证方法有许多 CV 方法以独特的方式对数据进行划分。
下图比较了常见的 CV 方法:
最佳 CV 方法取决于数据类型(例如独立样本与时间序列)和模型对结构的敏感度。
何时应用交叉验证CV 可防止过度拟合,从而有助于:
从不同的模型系列中 选择最佳模型 调整特定模型的 超参数和神经结构通过反复训练和验证模型。
适用情况:
顺序数据分析:单个随机保留分割可能会导致数据泄漏(我将在下一节中介绍核心原理)。模型选择或调整:使用单一分割可能无法推广最佳模型或超参数。使用小规模交叉验证可以提高选择/调整结果的可信度。分类任务中高度不平衡的情况:随机的单一拆分可能会导致类别极度不平衡,从而导致模型出现偏差。交叉验证可以减轻模型性能中的偏差。另一方面,可不使用情况:
这种外部信息泄露使得模型在训练或验证过程中可以作弊,但模型无法在未见数据上达到相同的性能。
数据泄露主要有两种类型:
一种简单的情况是,要预测的目标变量作为特征包含在训练数据中。
即,使用包含实际房屋销售价格作为特征的数据来训练 预测房屋销售价格的模型。2.间接泄漏:
训练数据中包含与目标变量高度相关但在预测时未知的特征。
即,使用包括 当前房产税率在内的数据来训练预测房屋销售价格的模型。这里的漏洞是,当模型预测销售价格时,未来的房产税率是未知的。
下图展示了泄漏的流动方式:
首先,对模型进行包括当前房产税率在内的输入特征的训练。
该模型预测房屋销售价格为 450,000 美元。 该房屋售价为 500,000 美元。当地政府根据售价更新房屋的评估价值。
新的房产税是根据这个新的、更高的评估价值计算的。然后以这个新的房产税价值作为特征来训练模型。
该模型得知,高税率意味着高房屋售价。
这种学习是不准确的,因为税率是在销售后计算的,所以税率和销售价格之间存在相关性。
因此,税率不应包含在训练数据中,模型专注于预测税前销售价格。
防止顺序数据中的数据泄漏在顺序数据环境中,当未来的信息(验证数据)直接或间接地与过去的信息(训练数据)结合时,就会发生数据泄漏。
为了避免泄漏,需要遵循几个核心原则:
使用简单的验证方法来维持时间顺序,利用时间序列特定的验证,以及防止自相关,即训练集和验证集中在时间上接近的数据点相互关联。在下一节中,我将探讨利用这些原理的技术。
序列数据的交叉验证必须保留临时顺序,同时防止数据泄漏。现在,我们来看看主要的方法。
1. 维持时间顺序的简单验证方法简单的验证方法使用一些数据分割,同时保持时间顺序(只是不打乱数据)。
这些方法是评估的良好起点,尤其是在数据集很大的情况下。
主要方法有:
单一训练-测试分割这是最简单的方法,其中数据集分为两部分:训练集(第一部分)和验证集(最后一部分)。
该模型在历史数据上进行一次训练,并在验证集上进行评估:
适用:
缺点:
如果所选的验证期不能代表未来数据,则可能导致不可靠的性能估计。蒙特卡洛交叉验证该方法为每个折叠随机选择一个验证原点,使用该原点之前的数据进行训练,使用该原点之后的数据进行验证。
蒙特卡洛交叉验证假设数据点是独立的,并且它们的统计属性不会随时间而改变。
当满足这些条件时,随机抽样不会破坏底层数据结构,使该方法成为传统时间序列交叉验证的有效替代方案。
独立数据的标准交叉验证方法。它将观测值随机打乱,并分成 K 个大小相等的折叠。
适用:
缺点:
与蒙特卡洛交叉验证类似,数据泄漏发生在非平稳时间序列上。
对于具有强自相关的平稳数据,折叠边界存在数据泄漏风险,因为折叠之间没有时间间隔。
块状 K 折交叉验证K-fold 的修改版本,其中数据未被打乱。
时间序列被分成连续的块(折叠),每个块依次用作验证集。
适用:
时间序列可以是非 平稳的,但自相关性有限。由于样本量较小,需要使用训练集中的所有数据。
缺点:
2. 时间序列特定验证这些方法专门用于维护时间序列数据的时间顺序,重点关注模型在前瞻性场景中的表现。
主要技术包括:
前向验证(“滚动窗口”或“滑动窗口”)该方法与增长窗口方法类似,但训练和验证窗口大小保持不变。随着验证窗口向前移动,最旧的训练数据将被丢弃。
该方法是在交叉验证时间序列中最常见且最稳健的方法。
适用:
缺点:
这是计算成本最高的方法,因为模型在滚动窗口的每一步都要重新训练。丢弃旧数据可能会丢失宝贵的长期趋势或季节性模式。
时间序列交叉验证(“生长窗口”)一种顺序方法,其中每次折叠的训练集都会扩展以包含所有先前的数据。
该模型根据不断增长的历史记录进行重新训练,并在固定大小的后续数据块上进行验证。
图:数据分区图像适用:
模型的性能受益于更多数据。
模拟生产,其中模型会定期使用新信息进行更新。
缺点:
3. 防止自相关这些方法旨在通过在集合之间故意添加间隙来防止时间序列数据的训练和验证集的自相关。
主要方法包括:
具有间隙的时间序列交叉验证(“Gap”)此方法是时间序列交叉验证的一种变体,它在训练集和验证集之间引入了未使用的数据间隙。
这种差距有助于在两组之间建立更多的独立性。
适用:
数据具有高度自相关性
确保训练数据和验证数据严格分离,以避免数据泄露。
缺点:
hv-Blocked K-Fold交叉验证阻塞验证的一种高级形式,在训练块和验证块之间引入了时间间隙。
原理上与“清除和禁运”技术类似。它是基本阻断方法的更强大版本。
适用:
缺点:
清除和禁运交叉验证该方法旨在防止验证集中的事件可能影响训练数据的情况下的数据泄漏。
它会“清除”(删除)那些距离验证期太近的训练数据点。
然后,它会通过删除验证期后可能受到未来信息影响的训练数据来实施“禁运”。
适用:
时间序列具有较高的自相关性, 严格防止数据泄露至关重要(例如,金融)缺点:
一些训练数据由于存在间隙而未被使用,这可能导致欠拟合(与基于 K 折的方法相比,丢弃的数据量可能很大)。
这些是交叉验证序列数据的主要方法。
良好的损失函数与经过深思熟虑的交叉验证方法相结合可以产生性能最佳的模型。
基于 PyTorch 构建的GRU (门控循环单元)网络和 一个更简单的模型,Scikit-Learn 上的 SVR(支持向量回归) 。创建数据集首先加载并设计了交通量数据:
# 将数据加载为 df
file_path = 'data/Metro_Interstate_Traffic_Volume.csv'
df = pd.read_csv(file_path, sep= ',' )
# 添加 dt 相关特征(gru 的关键步骤)
df['date_time'] = pd.to_datetime(df['date_time'])
df['year'] = df['date_time'].dt.year
df['month'] = df['date_time'].dt.month
df['hour'] = df['date_time'].dt.hour
df['day_of_week'] = df['date_time'].dt.dayofweek # cat (0 to 6)
df['is_weekend'] = df['day_of_week'].isin([5, 6])
df['is_holiday'] = df['holiday'].notna
# 删除不必要的列
df = df.drop(columns=['holiday', 'weather_description', 'date_time'], axis=1)
# 创建输入和目标变量
target_col = 'traffic_volume'
y = df[target_col]
X = df.drop(target_col, axis=1)
# 将数据分成两组:训练集和测试集
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size= 0.2 , shuffle= False )
为了保持时间顺序,在创建训练集和测试集时不应打乱原始数据。
X_test用于评估泛化能力。训练/验证阶段不得使用,以免数据泄露。最后,转换输入特征:
from sklearn.impute import SimpleImputerfrom sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from category_encoders import BinaryEncoder
# 对数值和分类特征进行分类
cat_cols, num_cols = ,
for col in df.columns.to_list:
if col == target_col: pass
else:
if df[col].dtype == 'object'or df[col].dtype == 'bool': cat_cols.append(col)
else: num_cols.append(col)
# 定义列转换器
num_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler)]
)
cat_transformer = Pipeline(steps=[('encoder', BinaryEncoder(cols=cat_cols))])
preprocessor = ColumnTransformer(
transformers=[
('num', num_transformer, num_cols),
('cat', cat_transformer, cat_cols)
],
remainder='passthrough'
)
# 转换输入特征
X_train = preprocessor.fit_transform(X_train)
X_test = preprocessor.transform(X_test)
最终的训练集有38,563 个样本,包含16 个输入特征。
正如我们所了解的,如果这不是时间序列或者我们不需要调整模型,那么像这样的大型数据集就不需要交叉验证。
定义 GRU 模型接下来,我在 PyTorch 库上定义了GRUimport torch.nn as nn# 定义一个简单的 gru 模型(多对一架构)
class GRU(nn.Module):
def __init__(self, input_size=X_train.shape[1], hidden_size=64, output_size=1):
super(GRU, self).__init__
self.gru = nn.GRU(input_size=input_size, hidden_size=hidden_size, batch_first=True)
self.fc = nn.Linear(hidden_size, output_size)
def forward(self, x):
h_gru, _ = self.gru(x)
o_final = self.fc(h_gru[:, -1, :])
return o_final
该类GRU采用简单的多对一架构,其中按时间步长的输出组合成一个最终输出。交叉验证接下来,我定义了一个函数,其中X_train
遵循交叉验证的最佳实践,该函数在每次折叠中启动优化器和模型。
为了进行比较,我在所有方法中使用了相同的参数:
num_epochs:训练周期数(设置为300)lr:优化器的学习率(设置为0.001)num_folds:交叉验证方法创建的折叠数(设置为 5),以及test_size:来自训练数据的测试(验证)数据的大小(设置为 0.2(20%))。上下滑动查看更多源码
import numpy as npimport torch
from sklearn.model_selection import KFold, TimeSeriesSplit, train_test_split
from tqdm import tqdm
def train_and_validate(
validation_method,
num_epochs,
lr,
X_train=X_train,y_train=y_train,# 根据给定的交叉验证方法将 X_train 拆分为折叠
num_folds= 5,
test_size= 0.2
)-> dict:
# 记录损失历史
fold_train_losses =
fold_val_losses =
model = None
# 根据所选验证方法定义拆分
match validation_method:
case "Holdout":
train_size = int((1 - test_size) * len(X_train))
train_indices = np.arange(train_size)
val_indices = np.arange(train_size, len(X_train))
splits = [(train_indices, val_indices)]
case "Monte Carlo":
splits =
for _ in range(num_folds):
train_indices, val_indices = train_test_split(
np.arange(len(X_train)),
test_size=test_size,
shuffle=True
)
splits.append((train_indices, val_indices))
case "K-Fold":
kf = KFold(n_splits=num_folds, shuffle=True, random_state=42)
splits = kf.split(X_train)
case "Blocked K-Fold":
kf_blocked = KFold(n_splits=num_folds, shuffle=False)
splits = kf_blocked.split(X_train)
case "Growing Window":
tss = TimeSeriesSplit(n_splits=num_folds)
splits = tss.split(X_train)
case "Sliding Window":
splits =
window_size = int(len(X_train) / (num_folds + 1))
for i in range(num_folds):
train_start = i * window_size
train_end = train_start + window_size
val_start = train_end
val_end = val_start + window_size
if val_end > len(X_train):
val_end = len(X_train)
splits.append(
(np.arange(train_start, train_end), np.arange(val_start, val_end))
)
case "Gap":
splits =
tss_gap = TimeSeriesSplit(n_splits=num_folds)
for train_idx, val_idx in tss_gap.split(X_train):
gap_size = int(0.1 * len(val_idx))
train_idx = train_idx[:-gap_size]
splits.append((train_idx, val_idx))
case "hv-Blocked K-Fold":
splits =
kf_blocked_gap = KFold(n_splits=num_folds, shuffle=False)
for train_idx, val_idx in kf_blocked_gap.split(X_train):
case _:
raise ValueError(f"Unknown validation method: {validation_method}")
# 训练和验证循环
for fold, (train_idx, val_idx) in enumerate (tqdm(splits, desc= f"Training with {validation_method} " )):
# 定义模型、优化器和损失函数(为每个 fold 初始化一个新模型和优化器)
model = GRU(hidden_size=64, output_size=1)
optimizer = torch.optim.Adam(model.parameters, lr=lr)
criterion = nn.MSELoss
# 创建训练和验证 X_train 和张量 X_train 加载器
train_idx = range(int(0.8 * len(X_train)))
val_idx = range(int(0.8 * len(X_train)), len(X_train))
# 分离特征 X 和目标 y 以用于 cv 的训练和验证
X_train_cv, y_train_cv = X_train[train_idx, :], y_train[train_idx]
X_val_cv, y_val_cv = X_train[val_idx, :], y_train[val_idx]
# 将 numpy 数组转换为 pytorch 张量
X_train_cv = torch.from_numpy(X_train_cv).float
y_train_cv = torch.from_numpy(y_train_cv.values).float
X_val_cv = torch.from_numpy(X_val_cv).float
y_val_cv = torch.from_numpy(y_val_cv.values).float
# 创建张量数据集和数据加载器
train_dataset = torch.utils.data.TensorDataset(X_train_cv, y_train_cv)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=False)
val_dataset = torch.utils.data.TensorDataset(X_val_cv, y_val_cv)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32, shuffle=False)
# 开始验证
fold_train_loss_history =
fold_val_loss_history =
for _ in range (num_epochs):
# 训练循环
model.train
train_loss = 0
for X_batch, y_batch in train_loader:
X_batch = X_batch.unsqueeze(1)
y_batch = y_batch.unsqueeze(1)
outputs = model(X_batch)
loss = criterion(outputs, y_batch)
optimizer.zero_grad
loss.backward
optimizer.step
train_loss += loss.item
avg_train_loss = train_loss / len(train_loader)
fold_train_loss_history.append(avg_train_loss)
# 验证
model.eval
val_loss = 0
with torch.inference_mode:
for X_batch, y_batch in val_loader:
val_loss += criterion(outputs, y_batch).item
avg_val_loss = val_loss / len(val_loader) if len(val_loader) > 0else0
fold_val_loss_history.append(avg_val_loss)
# 用于记录
fold_train_losses.append(fold_train_loss_history)
fold_val_losses.append(fold_val_loss_history)
# 完成 cv 循环后,使用整个 X_train / y_train 集重新训练模型
if model isnotNone :
model.eval # 将 numpy 数组转换为 pytorch 张量
X_train = torch.from_numpy(X_train).float
y_train = torch.from_numpy(y_train.values).float
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
for _ in range(num_epochs):
model.train
optimizer.zero_grad
loss.backward
optimizer.step
return {
'model': model, # trained model is returned
"fold_train_losses": fold_train_losses,
"fold_val_losses": fold_val_losses,
"average_train_loss": np.mean(fold_train_losses),
"average_val_loss": np.mean(fold_val_losses)
}
推理训练结束后,模型对新的、未见过的数据(X_test)进行推理。
记录损失以评估模型的泛化能力。
# 将 X_test (numpy) 转换为 torch 数据X_test_float = torch.from_numpy(X_test).float
y_test_float = torch.from_numpy(y_test.values).float
# 创建测试加载器
test_dataset = torch.utils.data.TensorDataset(X_test_float, y_test_float)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)
# 执行推理
model.eval
test_loss = 0
criterion = nn.MSELoss
with torch.inference_mode:
for X_batch, y_batch in test_loader:
X_batch = X_batch.unsqueeze(1)
y_batch = y_batch.unsqueeze(1)
outputs = model(X_batch)
test_loss += criterion(outputs, y_batch).item
# 计算平均损失 (MSE)
ave_test_loss = test_loss / len (test_loader)
结果1. GRU
Blocked K-fold在所有方法中实现了最佳的泛化损失 (MSE)。
下面的每个图表都通过 CV 方法绘制了CV 损失(针对所有折叠)、平均损失(蓝线)和泛化损失(X_test红色垂直线)。彩色区域表示模型推广学习的程度(越小越好)。
当平均 CV 损失(蓝线)推翻泛化损失(红线)时,就会发生过度拟合。
Blocked K-fold取得了最佳的泛化效果,其次是带 Gap 的 K-fold CV,但两者都在第 150 个 epoch 左右开始过拟合。实施 Early Stopping 可以进一步优化结果。
增长窗口取得了很好的平衡结果,很好地推广了学习以避免过度拟合,同时最大限度地减少了泛化误差。
Holdout方法和Monte Carlo方法对训练集的过拟合最为严重,导致极高的测试误差(粉色区域较大)。对于本数据集和模型而言,这两种 CV 方法均不合适。
2. SVR滑动窗口在所有方法中取得了最佳性能。
下面的每个图表都绘制了五倍的 CV 损失,显示了每种 CV 方法的平均 CV 损失(蓝线)和泛化损失(红线) 。
滑动窗口法准确率最高,平均均方误差 (MSE) 最低(0.6149),稳定性也最高。这意味着,对于支持向量回归 (SVR) 来说,使用固定大小的训练窗口是解决该数据集局部自相关的最有效方法。
生长窗口和hv-Blocked K-Fold在某些折叠中显示出最坏情况的错误,这表明在使用这些方法进行训练时,模型可能容易对过去的数据产生严重的过度拟合。
其他标准方法,如Holdout、Monte Carlo和K-Fold,表现出良好的泛化能力,但它们的损失仍然很高,这表明与滑动窗口相比,它们无法学到太多东西。
交叉验证是评估时间序列模型的一种强大技术,因为它可以通过镜像原始数据的结构帮助它们有效地概括并避免过度拟合。
在我们的案例中,我们了解到,当我们评估模型的泛化能力时,为模型和数据类型选择正确的 CV 技术非常重要。
目标是开发一个在生产中表现最佳的模型,而交叉验证是通过模拟真实世界数据实现这一目标的关键。
来源:一个数据人的自留地