7.1. 流水线和复合估计器#

为了构建一个复合估计器,通常会将转换器(transformers)与其他转换器或预测器(例如分类器或回归器)结合起来。最常用的复合估计器工具是流水线(Pipeline)。流水线要求除了最后一个步骤之外的所有步骤都是转换器。最后一个步骤可以是任何东西,一个转换器、一个预测器,或者一个可能具有或不具有.predict(...)方法的聚类估计器。流水线会公开最后一个估计器提供的所有方法:如果最后一个步骤提供了transform方法,那么流水线将具有transform方法并表现得像一个转换器。如果最后一个步骤提供了predict方法,那么流水线将公开该方法,并给定数据X,使用除最后一个步骤之外的所有步骤转换数据,然后将转换后的数据传递给流水线的最后一个步骤的predict方法。Pipeline类通常与ColumnTransformerFeatureUnion结合使用,它们将转换器的输出连接成一个复合特征空间。TransformedTargetRegressor处理目标的转换(即对y进行对数转换)。

7.1.1. 流水线:链式估计器#

Pipeline可用于将多个估计器链接成一个。这很有用,因为数据处理中通常存在固定的步骤序列,例如特征选择、归一化和分类。Pipeline在此处有多种用途:

便利性和封装性

你只需对数据调用一次fitpredict,即可拟合一整个估计器序列。

联合参数选择

你可以一次性对流水线中所有估计器的参数进行网格搜索

安全性

通过确保使用相同的样本来训练转换器和预测器,流水线有助于避免在交叉验证中将测试数据中的统计信息泄露到训练模型中。

流水线中除了最后一个估计器之外的所有估计器都必须是转换器(即必须有一个transform方法)。最后一个估计器可以是任何类型(转换器、分类器等)。

注意

在流水线上调用fit与依次对每个估计器调用fit,然后转换输入并将其传递给下一步骤是相同的。流水线拥有其最后一个估计器所拥有的所有方法,即如果最后一个估计器是分类器,则Pipeline可以作为一个分类器使用。如果最后一个估计器是转换器,同样,流水线也是转换器。

7.1.1.1. 用法#

7.1.1.1.1. 构建流水线#

Pipeline是使用(key, value)对列表构建的,其中key是您希望为此步骤指定的名称字符串,value是估计器对象

>>> from sklearn.pipeline import Pipeline
>>> from sklearn.svm import SVC
>>> from sklearn.decomposition import PCA
>>> estimators = [('reduce_dim', PCA()), ('clf', SVC())]
>>> pipe = Pipeline(estimators)
>>> pipe
Pipeline(steps=[('reduce_dim', PCA()), ('clf', SVC())])
使用make_pipeline的简写版本#

实用函数make_pipeline是构造流水线的简写;它接受可变数量的估计器并返回一个流水线,自动填充名称

>>> from sklearn.pipeline import make_pipeline
>>> make_pipeline(PCA(), SVC())
Pipeline(steps=[('pca', PCA()), ('svc', SVC())])

7.1.1.1.2. 访问流水线步骤#

流水线的估计器作为列表存储在steps属性中。可以使用Python序列(如列表或字符串)常用的切片表示法提取子流水线(尽管只允许步长为1)。这对于仅执行部分转换(或其逆转换)很方便

>>> pipe[:1]
Pipeline(steps=[('reduce_dim', PCA())])
>>> pipe[-1:]
Pipeline(steps=[('clf', SVC())])
按名称或位置访问步骤#

通过索引(使用[idx])流水线,也可以按索引或名称访问特定步骤

>>> pipe.steps[0]
('reduce_dim', PCA())
>>> pipe[0]
PCA()
>>> pipe['reduce_dim']
PCA()

Pipelinenamed_steps属性允许在交互式环境中通过Tab键补全功能按名称访问步骤

>>> pipe.named_steps.reduce_dim is pipe['reduce_dim']
True

7.1.1.1.3. 在流水线中跟踪特征名称#

为了启用模型检查,Pipeline有一个get_feature_names_out()方法,就像所有转换器一样。您可以使用流水线切片来获取进入每个步骤的特征名称

>>> from sklearn.datasets import load_iris
>>> from sklearn.linear_model import LogisticRegression
>>> from sklearn.feature_selection import SelectKBest
>>> iris = load_iris()
>>> pipe = Pipeline(steps=[
...    ('select', SelectKBest(k=2)),
...    ('clf', LogisticRegression())])
>>> pipe.fit(iris.data, iris.target)
Pipeline(steps=[('select', SelectKBest(...)), ('clf', LogisticRegression(...))])
>>> pipe[:-1].get_feature_names_out()
array(['x2', 'x3'], ...)
自定义特征名称#

您还可以使用get_feature_names_out为输入数据提供自定义特征名称

>>> pipe[:-1].get_feature_names_out(iris.feature_names)
array(['petal length (cm)', 'petal width (cm)'], ...)

7.1.1.1.4. 访问嵌套参数#

通常需要调整流水线中估计器的参数。因此,此参数是嵌套的,因为它属于特定的子步骤。流水线中估计器的参数可以使用<estimator>__<parameter>语法访问

>>> pipe = Pipeline(steps=[("reduce_dim", PCA()), ("clf", SVC())])
>>> pipe.set_params(clf__C=10)
Pipeline(steps=[('reduce_dim', PCA()), ('clf', SVC(C=10))])
何时重要?#

这对于进行网格搜索尤为重要

>>> from sklearn.model_selection import GridSearchCV
>>> param_grid = dict(reduce_dim__n_components=[2, 5, 10],
...                   clf__C=[0.1, 10, 100])
>>> grid_search = GridSearchCV(pipe, param_grid=param_grid)

各个步骤也可以作为参数替换,非最终步骤可以通过将其设置为'passthrough'来忽略。

>>> param_grid = dict(reduce_dim=['passthrough', PCA(5), PCA(10)],
...                   clf=[SVC(), LogisticRegression()],
...                   clf__C=[0.1, 10, 100])
>>> grid_search = GridSearchCV(pipe, param_grid=param_grid)

示例

7.1.1.2. 缓存转换器:避免重复计算#

拟合转换器可能会耗费大量计算资源。当Pipelinememory参数被设置后,它会在调用fit后缓存每个转换器。此功能用于避免在流水线中重复计算拟合过的转换器,如果参数和输入数据相同的话。一个典型的例子是网格搜索,其中转换器可以只拟合一次并用于每个配置。最后一个步骤永远不会被缓存,即使它是一个转换器。

需要memory参数才能缓存转换器。memory可以是一个字符串,其中包含缓存转换器的目录,也可以是一个joblib.Memory对象

>>> from tempfile import mkdtemp
>>> from shutil import rmtree
>>> from sklearn.decomposition import PCA
>>> from sklearn.svm import SVC
>>> from sklearn.pipeline import Pipeline
>>> estimators = [('reduce_dim', PCA()), ('clf', SVC())]
>>> cachedir = mkdtemp()
>>> pipe = Pipeline(estimators, memory=cachedir)
>>> pipe
Pipeline(memory=...,
         steps=[('reduce_dim', PCA()), ('clf', SVC())])
>>> # Clear the cache directory when you don't need it anymore
>>> rmtree(cachedir)
缓存转换器的副作用#

在不启用缓存的情况下使用Pipeline,可以检查原始实例,例如:

>>> from sklearn.datasets import load_digits
>>> X_digits, y_digits = load_digits(return_X_y=True)
>>> pca1 = PCA(n_components=10)
>>> svm1 = SVC()
>>> pipe = Pipeline([('reduce_dim', pca1), ('clf', svm1)])
>>> pipe.fit(X_digits, y_digits)
Pipeline(steps=[('reduce_dim', PCA(n_components=10)), ('clf', SVC())])
>>> # The pca instance can be inspected directly
>>> pca1.components_.shape
(10, 64)

启用缓存会在拟合之前触发转换器的克隆。因此,无法直接检查给定给流水线的转换器实例。在以下示例中,访问PCA实例pca2将引发AttributeError,因为pca2将是一个未拟合的转换器。相反,请使用named_steps属性来检查流水线中的估计器:

>>> cachedir = mkdtemp()
>>> pca2 = PCA(n_components=10)
>>> svm2 = SVC()
>>> cached_pipe = Pipeline([('reduce_dim', pca2), ('clf', svm2)],
...                        memory=cachedir)
>>> cached_pipe.fit(X_digits, y_digits)
Pipeline(memory=...,
         steps=[('reduce_dim', PCA(n_components=10)), ('clf', SVC())])
>>> cached_pipe.named_steps['reduce_dim'].components_.shape
(10, 64)
>>> # Remove the cache directory
>>> rmtree(cachedir)

示例

7.1.2. 回归中的目标转换#

TransformedTargetRegressor在拟合回归模型之前转换目标y。预测结果通过逆转换映射回原始空间。它接受用于预测的回归器以及将应用于目标变量的转换器作为参数。

>>> import numpy as np
>>> from sklearn.datasets import fetch_california_housing
>>> from sklearn.compose import TransformedTargetRegressor
>>> from sklearn.preprocessing import QuantileTransformer
>>> from sklearn.linear_model import LinearRegression
>>> from sklearn.model_selection import train_test_split
>>> X, y = fetch_california_housing(return_X_y=True)
>>> X, y = X[:2000, :], y[:2000]  # select a subset of data
>>> transformer = QuantileTransformer(output_distribution='normal')
>>> regressor = LinearRegression()
>>> regr = TransformedTargetRegressor(regressor=regressor,
...                                   transformer=transformer)
>>> X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
>>> regr.fit(X_train, y_train)
TransformedTargetRegressor(...)
>>> print('R2 score: {0:.2f}'.format(regr.score(X_test, y_test)))
R2 score: 0.61
>>> raw_target_regr = LinearRegression().fit(X_train, y_train)
>>> print('R2 score: {0:.2f}'.format(raw_target_regr.score(X_test, y_test)))
R2 score: 0.59

对于简单的转换,可以传递一对函数(定义转换及其逆映射)而不是转换器对象

>>> def func(x):
...     return np.log(x)
>>> def inverse_func(x):
...     return np.exp(x)

随后,对象创建为

>>> regr = TransformedTargetRegressor(regressor=regressor,
...                                   func=func,
...                                   inverse_func=inverse_func)
>>> regr.fit(X_train, y_train)
TransformedTargetRegressor(...)
>>> print('R2 score: {0:.2f}'.format(regr.score(X_test, y_test)))
R2 score: 0.51

默认情况下,每次拟合时都会检查所提供的函数是否互为逆函数。但是,可以通过将check_inverse设置为False来跳过此检查。

>>> def inverse_func(x):
...     return x
>>> regr = TransformedTargetRegressor(regressor=regressor,
...                                   func=func,
...                                   inverse_func=inverse_func,
...                                   check_inverse=False)
>>> regr.fit(X_train, y_train)
TransformedTargetRegressor(...)
>>> print('R2 score: {0:.2f}'.format(regr.score(X_test, y_test)))
R2 score: -1.57

注意

可以通过设置transformer或函数对funcinverse_func来触发转换。但是,同时设置这两个选项将引发错误。

示例

7.1.3. 特征联合(FeatureUnion):复合特征空间#

FeatureUnion将多个转换器对象组合成一个新的转换器,该转换器结合了它们的输出。FeatureUnion接受一个转换器对象列表。在拟合期间,每个转换器都独立地拟合数据。这些转换器并行应用,它们输出的特征矩阵并排连接成一个更大的矩阵。

当您想要对数据的每个字段应用不同的转换时,请参阅相关类ColumnTransformer(请参阅用户指南)。

FeatureUnionPipeline具有相同的目的——便利性以及联合参数估计和验证。

FeatureUnionPipeline可以结合起来创建复杂的模型。

FeatureUnion无法检查两个转换器是否会生成相同的特征。它只在特征集不相交时生成一个联合,并且确保它们不相交是调用者的责任。)

7.1.3.1. 用法#

FeatureUnion使用(key, value)对列表构建,其中key是您希望赋予给定转换的名称(任意字符串;仅用作标识符),value是估计器对象

>>> from sklearn.pipeline import FeatureUnion
>>> from sklearn.decomposition import PCA
>>> from sklearn.decomposition import KernelPCA
>>> estimators = [('linear_pca', PCA()), ('kernel_pca', KernelPCA())]
>>> combined = FeatureUnion(estimators)
>>> combined
FeatureUnion(transformer_list=[('linear_pca', PCA()),
                               ('kernel_pca', KernelPCA())])

与流水线类似,特征联合也有一个简写构造函数make_union,它不需要显式命名组件。

Pipeline一样,各个步骤可以通过set_params替换,并通过设置为'drop'来忽略

>>> combined.set_params(kernel_pca='drop')
FeatureUnion(transformer_list=[('linear_pca', PCA()),
                               ('kernel_pca', 'drop')])

示例

7.1.4. 用于异构数据的ColumnTransformer#

许多数据集包含不同类型的特征,例如文本、浮点数和日期,其中每种类型的特征都需要单独的预处理或特征提取步骤。通常,在应用scikit-learn方法之前(例如使用pandas)预处理数据是最简单的。在将数据传递给scikit-learn之前对其进行处理可能会出现以下问题:

  1. 将测试数据中的统计信息整合到预处理器中,会使交叉验证分数不可靠(称为数据泄露),例如在缩放器或缺失值填充的情况下。

  2. 您可能希望将预处理器的参数包含在参数搜索中。

ColumnTransformer有助于在Pipeline中对数据的不同列执行不同的转换,该流水线可以防止数据泄露并可以参数化。ColumnTransformer适用于数组、稀疏矩阵和pandas DataFrames

对每列可以应用不同的转换,例如预处理或特定的特征提取方法

>>> import pandas as pd
>>> X = pd.DataFrame(
...     {'city': ['London', 'London', 'Paris', 'Sallisaw'],
...      'title': ["His Last Bow", "How Watson Learned the Trick",
...                "A Moveable Feast", "The Grapes of Wrath"],
...      'expert_rating': [5, 3, 4, 5],
...      'user_rating': [4, 5, 4, 3]})

对于此数据,我们可能希望使用OneHotEncoder'city'列编码为分类变量,但对'title'列应用CountVectorizer。由于我们可能对同一列使用多种特征提取方法,因此我们为每个转换器赋予一个唯一的名称,例如'city_category''title_bow'。默认情况下,其余的评分列被忽略(remainder='drop'

>>> from sklearn.compose import ColumnTransformer
>>> from sklearn.feature_extraction.text import CountVectorizer
>>> from sklearn.preprocessing import OneHotEncoder
>>> column_trans = ColumnTransformer(
...     [('categories', OneHotEncoder(dtype='int'), ['city']),
...      ('title_bow', CountVectorizer(), 'title')],
...     remainder='drop', verbose_feature_names_out=False)

>>> column_trans.fit(X)
ColumnTransformer(transformers=[('categories', OneHotEncoder(dtype='int'),
                                 ['city']),
                                ('title_bow', CountVectorizer(), 'title')],
                  verbose_feature_names_out=False)

>>> column_trans.get_feature_names_out()
array(['city_London', 'city_Paris', 'city_Sallisaw', 'bow', 'feast',
'grapes', 'his', 'how', 'last', 'learned', 'moveable', 'of', 'the',
 'trick', 'watson', 'wrath'], ...)

>>> column_trans.transform(X).toarray()
array([[1, 0, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0],
       [1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 1, 1, 0],
       [0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
       [0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1]]...)

在上述示例中,CountVectorizer期望1D数组作为输入,因此列被指定为字符串('title')。然而,OneHotEncoder与大多数其他转换器一样期望2D数据,因此在这种情况下,您需要将列指定为字符串列表(['city'])。

除了标量或单项列表外,列选择还可以指定为多项列表、整数数组、切片、布尔掩码或使用make_column_selectormake_column_selector用于根据数据类型或列名选择列

>>> from sklearn.preprocessing import StandardScaler
>>> from sklearn.compose import make_column_selector
>>> ct = ColumnTransformer([
...       ('scale', StandardScaler(),
...       make_column_selector(dtype_include=np.number)),
...       ('onehot',
...       OneHotEncoder(),
...       make_column_selector(pattern='city', dtype_include=object))])
>>> ct.fit_transform(X)
array([[ 0.904,  0.      ,  1. ,  0. ,  0. ],
       [-1.507,  1.414,  1. ,  0. ,  0. ],
       [-0.301,  0.      ,  0. ,  1. ,  0. ],
       [ 0.904, -1.414,  0. ,  0. ,  1. ]])

如果输入是DataFrame,字符串可以引用列;整数始终被解释为位置列。

我们可以通过设置remainder='passthrough'来保留剩余的评分列。这些值会附加到转换的末尾

>>> column_trans = ColumnTransformer(
...     [('city_category', OneHotEncoder(dtype='int'),['city']),
...      ('title_bow', CountVectorizer(), 'title')],
...     remainder='passthrough')

>>> column_trans.fit_transform(X)
array([[1, 0, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 5, 4],
       [1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 1, 1, 0, 3, 5],
       [0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 4, 4],
       [0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 5, 3]]...)

remainder参数可以设置为一个估计器,用于转换剩余的评分列。转换后的值将附加到转换的末尾。

>>> from sklearn.preprocessing import MinMaxScaler
>>> column_trans = ColumnTransformer(
...     [('city_category', OneHotEncoder(), ['city']),
...      ('title_bow', CountVectorizer(), 'title')],
...     remainder=MinMaxScaler())

>>> column_trans.fit_transform(X)[:, -2:]
array([[1. , 0.5],
       [0. , 1. ],
       [0.5, 0.5],
       [1. , 0. ]])

可以使用make_column_transformer函数更方便地创建ColumnTransformer对象。具体来说,名称将自动给定。上述示例的等效代码是:

>>> from sklearn.compose import make_column_transformer
>>> column_trans = make_column_transformer(
...     (OneHotEncoder(), ['city']),
...     (CountVectorizer(), 'title'),
...     remainder=MinMaxScaler())
>>> column_trans
ColumnTransformer(remainder=MinMaxScaler(),
                  transformers=[('onehotencoder', OneHotEncoder(), ['city']),
                                ('countvectorizer', CountVectorizer(),
                                 'title')])

如果ColumnTransformer使用DataFrame拟合,并且DataFrame仅包含字符串列名,那么转换DataFrame时将使用列名来选择列

>>> ct = ColumnTransformer(
...          [("scale", StandardScaler(), ["expert_rating"])]).fit(X)
>>> X_new = pd.DataFrame({"expert_rating": [5, 6, 1],
...                       "ignored_new_col": [1.2, 0.3, -0.1]})
>>> ct.transform(X_new)
array([[ 0.9],
       [ 2.1],
       [-3.9]])

7.1.5. 可视化复合估计器#

当在Jupyter Notebook中显示时,估计器会以HTML形式呈现。这对于诊断或可视化包含许多估计器的流水线非常有用。此可视化默认是激活的。

>>> column_trans  

可以通过将set_config中的display选项设置为'text'来停用。

>>> from sklearn import set_config
>>> set_config(display='text')  
>>> # displays text representation in a jupyter context
>>> column_trans  

HTML输出的示例可以在混合类型列转换器流水线的HTML表示部分中看到。作为替代方案,可以使用estimator_html_repr将HTML写入文件

>>> from sklearn.utils import estimator_html_repr
>>> with open('my_estimator.html', 'w') as f:  
...     f.write(estimator_html_repr(clf))

示例