PyTorch序列数据加载:随机与顺序批处理策略
本文详细介绍了在深度学习中处理长序列数据的两种高效批处理方法:随机抽样与顺序分区,并通过PyTorch实现展示了如何生成小批量输入和目标序列。
- 深度学习
- 自然语言处理
读取长序列数据
import random
import torch
def seq_data_iter_random(corpus,batch_size,num_steps):
"""
使用随机变量抽样生成一个小批量子序列
:param corpus: 语料库,通常是一个包含 token ID 的长列表或序列。
:param batch_size: 每个小批量中包含的子序列数量。
:param num_steps: 每个子序列的长度(时间步数)。
:return: 一个生成器,每次产出 (X, Y) 张量对。
"""
# 首先生成一个 0 到 num_steps - 1 之间的随机整数,然后将 corpus 从这个随机索引位置开始进行切片。这个随机索引之前的所有数据都被丢弃了。
corpus = corpus[random.randint(0,num_steps-1):]
# 计算可以获得多少个长度为 num_steps 的子序列
num_subseqs = (len(corpus) - 1) // num_steps
"""
创建一个包含所有子序列起始索引的列表。这些索引从 0 开始,每次增加 num_steps。这定义了语料库中 不重叠 的块的起始点。例如,如果 num_subseqs = 5 且 num_steps = 10,则 initial_indices 会是 [0, 10, 20, 30, 40]。
"""
initial_indices = list(range(0,num_subseqs * num_steps,num_steps))
# 将这些起始索引进行随机排序。这是该函数实现“随机抽样”的核心:不是按顺序处理子序列,而是随机选择它们的起始位置。
random.shuffle(initial_indices)
def data(pos):
# 返回从 pos 位置开始,长度为 num_steps 的子序列
return corpus[pos: pos + num_steps]
# 返回的批次数量
num_batches = num_subseqs // batch_size
for i in range(0, batch_size * num_batches, batch_size):
# 在这里,initial_indices 的长度可能不足 batch_size (如果 num_subseqs 不能被 batch_size 整除)但循环范围保证了我们只取完整的批次
initial_indices_per_batch = initial_indices[i: i + batch_size]
X = [data(j) for j in initial_indices_per_batch]
# 注意:Y 应该是 X 向后移动一位的结果
Y = [data(j + 1) for j in initial_indices_per_batch]
yield torch.tensor(X), torch.tensor(Y)
my_seq = list(range(35))
for X, Y in seq_data_iter_random(my_seq, batch_size=2, num_steps=5):
print('X:', X, 'Y:', Y)
X: tensor([[ 1, 2, 3, 4, 5],
[ 6, 7, 8, 9, 10]]) Y: tensor([[ 2, 3, 4, 5, 6],
[ 7, 8, 9, 10, 11]])
X: tensor([[26, 27, 28, 29, 30],
[11, 12, 13, 14, 15]]) Y: tensor([[27, 28, 29, 30, 31],
[12, 13, 14, 15, 16]])
X: tensor([[21, 22, 23, 24, 25],
[16, 17, 18, 19, 20]]) Y: tensor([[22, 23, 24, 25, 26],
[17, 18, 19, 20, 21]])
顺序分区
顺序分区保证两个相邻的小批量中的子序列在原始序列中也是相邻的。这种策略保留了拆分子序列。
def seq_data_iter_seqiuential(corpus,batch_size,num_steps):
"""
使用顺序分区生成小批量子序列
从一个长序列 corpus 中,生成一系列小批量 (X, Y) 数据对,其中 X 是输入子序列,Y 是对应的目标子序列(通常是 X 向后移动一个时间步)。这种方法试图保持数据的部分顺序性,同时将数据分割成适合并行处理的批次。
:param corpus: 语料库 (列表或类似序列的对象,包含 token ID)
:param batch_size: 批量大小 (每个批次包含多少条序列)
:param num_steps: 每个子序列的时间步长
:return:
"""
offset = random.randint(0,num_steps)
"""
计算从偏移量 offset 之后可用的 token 数量。减 1 是因为我们需要为 Y 序列留出最后一个 token 的对应目标(即 Y 比 X 晚一个位置)。
将可用 token 数整除以 batch_size。这计算出,如果我们将这些 token 平均分配到 batch_size 个并行的序列中,每条序列 可以包含多少个 token。
将上面得到的“每条序列的 token 数”再乘以 batch_size。这得到了最终要使用的 总 token 数量 (num_tokens)。这样做可以确保这个总数 num_tokens 能够被 batch_size 整除,方便后续的 reshape 操作。
"""
num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
"""
从语料库中提取从 offset 开始,总共 num_tokens 个 token,并将它们转换成一个 PyTorch 张量。此时 Xs 是一个一维张量(形状为 [num_tokens])。
"""
Xs = torch.tensor([corpus[offset: offset + num_tokens]])
Ys = torch.tensor([corpus[offset + 1: offset + 1 + num_tokens]])
"""
核心:
Xs.reshape(batch_size, -1): 将一维的长张量 Xs (包含 num_tokens 个元素) 重塑成一个二维张量,形状为 (batch_size, num_tokens // batch_size)。
这个 reshape 操作会按顺序将 Xs 中的数据填充到新的二维张量中。结果是,第一行 (Xs[0, :]) 包含了原始序列的第一个块,第二行 (Xs[1, :]) 包含了原始序列的第二个块,以此类推,直到第 batch_size 行。这意味着来自同一个时间步但在不同批次样本(不同行)的数据,在原始语料库中并不相邻。相反,同一行内的数据在原始语料库中是连续的。这与许多期望数据按 (时间步, 批次) 排列的模型输入方式不同。
"""
Xs , Ys = Xs.reshape(batch_size,-1), Ys.reshape(batch_size,-1)
"""
Xs.shape[1] 获取 reshape 后 Xs 张量的列数,也就是每条并行序列的长度 (num_tokens // batch_size)。
用这个长度整除以 num_steps(每个子序列的长度),得到可以从每条并行序列中切分出多少个长度为 num_steps 的完整子序列。这就是我们能生成的批次总数 num_batches。
"""
num_batches = Xs.shape[1] // num_steps
"""
这个循环沿着 reshape 后的张量的列(时间步维度)进行迭代,步长为 num_steps。i 代表当前批次子序列的起始列索引。
X = Xs[:, i : i + num_steps]: 从 Xs 中提取当前批次的输入数据。它选择所有行 (:),即 batch_size 个并行序列,并从列 i 到 i + num_steps(不包含)提取数据。结果 X 是一个形状为 (batch_size, num_steps) 的张量。
Y = Ys[:, i : i + num_steps]: 同样地从 Ys 中提取对应的目标批次。由于 Ys 在创建时就已经相对于 Xs 移动了一位,所以这里得到的 Y 正好是 X 的下一个时间步的目标。Y 的形状也是 (batch_size, num_steps)。
yield X, Y: 生成器返回当前的 (X, Y) 张量对。
"""
for i in range(0,num_steps * num_batches,num_steps):
X = Xs[:,i: i + num_steps]
Y = Ys[:,i: i + num_steps]
yield X, Y
for X,Y in seq_data_iter_seqiuential(my_seq, batch_size=2, num_steps=5):
print('X:', X, '\nY:', Y)
X: tensor([[ 3, 4, 5, 6, 7],
[18, 19, 20, 21, 22]])
Y: tensor([[ 4, 5, 6, 7, 8],
[19, 20, 21, 22, 23]])
X: tensor([[ 8, 9, 10, 11, 12],
[23, 24, 25, 26, 27]])
Y: tensor([[ 9, 10, 11, 12, 13],
[24, 25, 26, 27, 28]])
X: tensor([[13, 14, 15, 16, 17],
[28, 29, 30, 31, 32]])
Y: tensor([[14, 15, 16, 17, 18],
[29, 30, 31, 32, 33]])
import rnn.text_preprocessing as pp
class SeqDataLoader:
def __init__(self,batch_size,num_steps,use_random_iter, max_tokens):
if use_random_iter:
self.data_iter_fn = seq_data_iter_random
else:
self.data_iter_fn = seq_data_iter_seqiuential
"""调用文本预处理的函数"""
self.corpus , self.vocab = pp.load_corpus_time_machine(max_tokens)
self.batch_size, self.num_steps = batch_size, num_steps
def __iter__(self):
return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)
文本总行数:3221
the time machine by h g wells
twinkled and his usually pale face was flushed and animated the
['the', 'time', 'machine', 'by', 'h', 'g', 'wells']
[]
[]
[]
[]
['i']
[]
[]
['the', 'time', 'traveller', 'for', 'so', 'it', 'will', 'be', 'convenient', 'to', 'speak', 'of', 'him']
['was', 'expounding', 'a', 'recondite', 'matter', 'to', 'us', 'his', 'grey', 'eyes', 'shone', 'and']
['twinkled', 'and', 'his', 'usually', 'pale', 'face', 'was', 'flushed', 'and', 'animated', 'the']
[('<unk>', 0), ('the', 2), ('i', 3), ('and', 4), ('of', 5), ('a', 6), ('to', 7), ('was', 8), ('in', 9), ('that', 10)]
文本: ['the', 'time', 'machine', 'by', 'h', 'g', 'wells']
索引: [2, 20, 51, 41, 2184, 2185, 401]
文本: ['twinkled', 'and', 'his', 'usually', 'pale', 'face', 'was', 'flushed', 'and', 'animated', 'the']
索引: [2187, 4, 26, 1045, 363, 114, 8, 1422, 4, 1046, 2]
def load_data_time_machine(batch_size, num_steps, use_random_iter=False,max_tokens=10000):
data_iter = SeqDataLoader(
batch_size,
num_steps,
use_random_iter=use_random_iter,
max_tokens=max_tokens
)
return data_iter, data_iter.vocab
留言讨论
0 条留言
正在加载留言...