Module skplumber.pipeline
Expand source code
import typing as t
import pandas as pd
from skplumber.primitives.primitive import Primitive
from skplumber.primitives.custom_primitives.preprocessing import (
OneHotEncoder,
RandomImputer,
)
from skplumber.primitives.parammeta import ParamMeta
from skplumber.utils import PipelineRunError
class PrimitiveStep:
def __init__(self, primitive_cls: t.Type[Primitive], inputs: t.List[int], **params):
"""
Parameters
----------
primitive_cls : class inheriting Primitive
The primitive class this step is associated with.
inputs : list of int
The indices of the pipeline steps for whose output
this step will use as its input.
params : kwargs
Any hyperparameters to set in the primitive.
"""
self.primitive = primitive_cls(**params)
self.inputs = inputs
class Pipeline:
def __init__(self, add_preprocessing: bool = True) -> None:
"""
Initializes the pipeline, including some preliminary
common data preprocessing if `add_preprocessing == True`.
"""
self.steps: t.List[PrimitiveStep] = []
if add_preprocessing:
self.add_step(RandomImputer)
self.add_step(OneHotEncoder)
@property
def curr_step_i(self) -> int:
return len(self.steps) - 1
def add_step(
self, primitive_cls: t.Type[Primitive], inputs: t.List[int] = None
) -> None:
"""
Adds `primitive` as the next step to this pipeline. If `inputs` is `None`,
the outputs of the most recent step will be used as `inputs`.
"""
if inputs is None:
inputs = [len(self.steps) - 1]
step = PrimitiveStep(primitive_cls, inputs)
self.steps.append(step)
def _run(
self, X: pd.DataFrame, y: t.Optional[pd.Series], *, fit: bool,
) -> pd.DataFrame:
try:
if fit and y is None:
raise ValueError("`y` cannot be `None` when fitting a pipeline")
all_step_outputs: t.List[pd.DataFrame] = []
for step_i, step in enumerate(self.steps):
if step_i == 0:
step_inputs = X
else:
step_inputs = pd.concat(
[all_step_outputs[i] for i in step.inputs], axis=1
)
if fit:
step.primitive.fit(step_inputs, y)
step_outputs = step.primitive.produce(step_inputs)
if isinstance(step_outputs, pd.Series) and step_i < len(self.steps) - 1:
# Every step's output but the last step must be a dataframe, since
# it might be used as the `X` input for a future step.
step_outputs = pd.DataFrame({"output": step_outputs})
all_step_outputs.append(step_outputs)
final_predictions = all_step_outputs[-1]
if not isinstance(final_predictions, pd.Series):
raise ValueError(
f"final pipeline step {self.steps[-1].primitive} "
"did not output a pandas Series"
)
return final_predictions
except Exception as e:
raise PipelineRunError from e
@property
def param_metas(self) -> t.Dict[int, t.Dict[str, ParamMeta]]:
return {i: step.primitive.param_metas for i, step in enumerate(self.steps)}
@property
def num_params(self) -> int:
return sum(len(step.primitive.param_metas) for step in self.steps)
def param_metas_with_data(
self, X: pd.DataFrame
) -> t.Dict[int, t.Dict[str, ParamMeta]]:
return {
i: step.primitive.param_metas_with_data(X)
for i, step in enumerate(self.steps)
}
def get_params(self) -> t.Dict[int, t.Dict[str, t.Any]]:
"""
Get all the pipeline's tunable hyperparameters. A given
param for a given step can be accessed via e.g.:
```
params = pipeline.get_params()
params[0]["criterion"]
```
That yields the value of the `"criterion"` param of
the 0th step in the pipeline.
"""
return {i: step.primitive.get_params() for i, step in enumerate(self.steps)}
def set_params(self, params: t.Dict[int, t.Dict[str, t.Any]]) -> None:
"""
Sets any tunable hyperparameters on one or more steps in the
pipeline. E.g. to set the `"criterion"` param of the 0th step:
```
pipeline.set_params({0: {"criterion": "gini"}})
```
"""
for i, step_params in params.items():
if i < 0 or i >= len(self.steps):
raise ValueError(f"pipeline does not have a step at index {i}")
self.steps[i].primitive.set_params(**step_params)
def fit(self, X: pd.DataFrame, y: pd.Series) -> None:
"""
Fits the pipeline on `X` and `y`, meaning, learns how to use `X`
to predict `y`.
Parameters
----------
X : pandas.DataFrame
The dataframe of features.
y : pandas.Series
The series of targets to learn to predict.
"""
self._run(X, y, fit=True)
def predict(self, X: pd.DataFrame) -> pd.Series:
"""
Makes a prediction for each instance in `X`, returning the predictions.
"""
return self._run(X, None, fit=False)
def __str__(self) -> str:
string = f"Pipeline object with {len(self.steps)} steps:"
for step in self.steps:
string += "\n\t" + str(step.primitive)
return string
Classes
class Pipeline (add_preprocessing: bool = True)
-
Initializes the pipeline, including some preliminary common data preprocessing if
add_preprocessing == True
.Expand source code
class Pipeline: def __init__(self, add_preprocessing: bool = True) -> None: """ Initializes the pipeline, including some preliminary common data preprocessing if `add_preprocessing == True`. """ self.steps: t.List[PrimitiveStep] = [] if add_preprocessing: self.add_step(RandomImputer) self.add_step(OneHotEncoder) @property def curr_step_i(self) -> int: return len(self.steps) - 1 def add_step( self, primitive_cls: t.Type[Primitive], inputs: t.List[int] = None ) -> None: """ Adds `primitive` as the next step to this pipeline. If `inputs` is `None`, the outputs of the most recent step will be used as `inputs`. """ if inputs is None: inputs = [len(self.steps) - 1] step = PrimitiveStep(primitive_cls, inputs) self.steps.append(step) def _run( self, X: pd.DataFrame, y: t.Optional[pd.Series], *, fit: bool, ) -> pd.DataFrame: try: if fit and y is None: raise ValueError("`y` cannot be `None` when fitting a pipeline") all_step_outputs: t.List[pd.DataFrame] = [] for step_i, step in enumerate(self.steps): if step_i == 0: step_inputs = X else: step_inputs = pd.concat( [all_step_outputs[i] for i in step.inputs], axis=1 ) if fit: step.primitive.fit(step_inputs, y) step_outputs = step.primitive.produce(step_inputs) if isinstance(step_outputs, pd.Series) and step_i < len(self.steps) - 1: # Every step's output but the last step must be a dataframe, since # it might be used as the `X` input for a future step. step_outputs = pd.DataFrame({"output": step_outputs}) all_step_outputs.append(step_outputs) final_predictions = all_step_outputs[-1] if not isinstance(final_predictions, pd.Series): raise ValueError( f"final pipeline step {self.steps[-1].primitive} " "did not output a pandas Series" ) return final_predictions except Exception as e: raise PipelineRunError from e @property def param_metas(self) -> t.Dict[int, t.Dict[str, ParamMeta]]: return {i: step.primitive.param_metas for i, step in enumerate(self.steps)} @property def num_params(self) -> int: return sum(len(step.primitive.param_metas) for step in self.steps) def param_metas_with_data( self, X: pd.DataFrame ) -> t.Dict[int, t.Dict[str, ParamMeta]]: return { i: step.primitive.param_metas_with_data(X) for i, step in enumerate(self.steps) } def get_params(self) -> t.Dict[int, t.Dict[str, t.Any]]: """ Get all the pipeline's tunable hyperparameters. A given param for a given step can be accessed via e.g.: ``` params = pipeline.get_params() params[0]["criterion"] ``` That yields the value of the `"criterion"` param of the 0th step in the pipeline. """ return {i: step.primitive.get_params() for i, step in enumerate(self.steps)} def set_params(self, params: t.Dict[int, t.Dict[str, t.Any]]) -> None: """ Sets any tunable hyperparameters on one or more steps in the pipeline. E.g. to set the `"criterion"` param of the 0th step: ``` pipeline.set_params({0: {"criterion": "gini"}}) ``` """ for i, step_params in params.items(): if i < 0 or i >= len(self.steps): raise ValueError(f"pipeline does not have a step at index {i}") self.steps[i].primitive.set_params(**step_params) def fit(self, X: pd.DataFrame, y: pd.Series) -> None: """ Fits the pipeline on `X` and `y`, meaning, learns how to use `X` to predict `y`. Parameters ---------- X : pandas.DataFrame The dataframe of features. y : pandas.Series The series of targets to learn to predict. """ self._run(X, y, fit=True) def predict(self, X: pd.DataFrame) -> pd.Series: """ Makes a prediction for each instance in `X`, returning the predictions. """ return self._run(X, None, fit=False) def __str__(self) -> str: string = f"Pipeline object with {len(self.steps)} steps:" for step in self.steps: string += "\n\t" + str(step.primitive) return string
Instance variables
var curr_step_i : int
-
Expand source code
@property def curr_step_i(self) -> int: return len(self.steps) - 1
var num_params : int
-
Expand source code
@property def num_params(self) -> int: return sum(len(step.primitive.param_metas) for step in self.steps)
var param_metas : Dict[int, Dict[str, ParamMeta]]
-
Expand source code
@property def param_metas(self) -> t.Dict[int, t.Dict[str, ParamMeta]]: return {i: step.primitive.param_metas for i, step in enumerate(self.steps)}
Methods
def add_step(self, primitive_cls: Type[Primitive], inputs: List[int] = None) ‑> NoneType
-
Adds
primitive
as the next step to this pipeline. Ifinputs
isNone
, the outputs of the most recent step will be used asinputs
.Expand source code
def add_step( self, primitive_cls: t.Type[Primitive], inputs: t.List[int] = None ) -> None: """ Adds `primitive` as the next step to this pipeline. If `inputs` is `None`, the outputs of the most recent step will be used as `inputs`. """ if inputs is None: inputs = [len(self.steps) - 1] step = PrimitiveStep(primitive_cls, inputs) self.steps.append(step)
def fit(self, X: pandas.core.frame.DataFrame, y: pandas.core.series.Series) ‑> NoneType
-
Fits the pipeline on
X
andy
, meaning, learns how to useX
to predicty
.Parameters
X
:pandas.DataFrame
- The dataframe of features.
y
:pandas.Series
- The series of targets to learn to predict.
Expand source code
def fit(self, X: pd.DataFrame, y: pd.Series) -> None: """ Fits the pipeline on `X` and `y`, meaning, learns how to use `X` to predict `y`. Parameters ---------- X : pandas.DataFrame The dataframe of features. y : pandas.Series The series of targets to learn to predict. """ self._run(X, y, fit=True)
def get_params(self) ‑> Dict[int, Dict[str, Any]]
-
Get all the pipeline's tunable hyperparameters. A given param for a given step can be accessed via e.g.:
params = pipeline.get_params() params[0]["criterion"]
That yields the value of the
"criterion"
param of the 0th step in the pipeline.Expand source code
def get_params(self) -> t.Dict[int, t.Dict[str, t.Any]]: """ Get all the pipeline's tunable hyperparameters. A given param for a given step can be accessed via e.g.: ``` params = pipeline.get_params() params[0]["criterion"] ``` That yields the value of the `"criterion"` param of the 0th step in the pipeline. """ return {i: step.primitive.get_params() for i, step in enumerate(self.steps)}
def param_metas_with_data(self, X: pandas.core.frame.DataFrame) ‑> Dict[int, Dict[str, ParamMeta]]
-
Expand source code
def param_metas_with_data( self, X: pd.DataFrame ) -> t.Dict[int, t.Dict[str, ParamMeta]]: return { i: step.primitive.param_metas_with_data(X) for i, step in enumerate(self.steps) }
def predict(self, X: pandas.core.frame.DataFrame) ‑> pandas.core.series.Series
-
Makes a prediction for each instance in
X
, returning the predictions.Expand source code
def predict(self, X: pd.DataFrame) -> pd.Series: """ Makes a prediction for each instance in `X`, returning the predictions. """ return self._run(X, None, fit=False)
def set_params(self, params: Dict[int, Dict[str, Any]]) ‑> NoneType
-
Sets any tunable hyperparameters on one or more steps in the pipeline. E.g. to set the
"criterion"
param of the 0th step:pipeline.set_params({0: {"criterion": "gini"}})
Expand source code
def set_params(self, params: t.Dict[int, t.Dict[str, t.Any]]) -> None: """ Sets any tunable hyperparameters on one or more steps in the pipeline. E.g. to set the `"criterion"` param of the 0th step: ``` pipeline.set_params({0: {"criterion": "gini"}}) ``` """ for i, step_params in params.items(): if i < 0 or i >= len(self.steps): raise ValueError(f"pipeline does not have a step at index {i}") self.steps[i].primitive.set_params(**step_params)
class PrimitiveStep (primitive_cls: Type[Primitive], inputs: List[int], **params)
-
Parameters
primitive_cls
:class inheriting Primitive
- The primitive class this step is associated with.
inputs
:list
ofint
- The indices of the pipeline steps for whose output this step will use as its input.
params
:kwargs
- Any hyperparameters to set in the primitive.
Expand source code
class PrimitiveStep: def __init__(self, primitive_cls: t.Type[Primitive], inputs: t.List[int], **params): """ Parameters ---------- primitive_cls : class inheriting Primitive The primitive class this step is associated with. inputs : list of int The indices of the pipeline steps for whose output this step will use as its input. params : kwargs Any hyperparameters to set in the primitive. """ self.primitive = primitive_cls(**params) self.inputs = inputs