Module skplumber.samplers.sampler
Source code
from abc import ABC, abstractmethod
import typing as t
from typing import NamedTuple
from time import time

import pandas as pd

from skplumber.primitives.primitive import Primitive
from skplumber.pipeline import Pipeline
from skplumber.consts import ProblemType
from skplumber.metrics import Metric
from skplumber.utils import (
    logger,
    conditional_timeout,
    EvaluationTimeoutError,
    PipelineRunError,
)


class SamplerState(NamedTuple):
    score: float
    pipeline: Pipeline
    train_time: float
    n_iters: int


class PipelineSampler(ABC):
    def run(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        *,
        models: t.List[t.Type[Primitive]],
        transformers: t.List[t.Type[Primitive]],
        problem_type: ProblemType,
        metric: Metric,
        evaluator: t.Callable,
        pipeline_timeout: t.Optional[int],
        num_samples: t.Optional[int] = None,
        callback: t.Union[None, t.Callable, t.List[t.Callable]] = None,
        exit_on_pipeline_error: bool = True,
    ) -> t.Tuple[Pipeline, float, int]:
        """Samples `num_samples` pipelines, returning the best one found
        along the way.

        Returns
        -------
        best_pipeline : Pipeline
            The fitted best pipeline trained on the problem.
        best_score : float
            The score of the best pipeline that was trained.
        n_iters : int
            The total number of iterations the sampler completed.
        """
        # Validate inputs
        if num_samples is None and callback is None:
            raise ValueError(
                "either num_samples or callback must be"
                " passed so the sampler knows when to stop"
            )
        if num_samples is not None and num_samples < 1:
            raise ValueError(f"num_samples must be >= 1, got {num_samples}")
        if callback is None:
            callbacks: t.List[t.Callable] = []
        elif callable(callback):
            callbacks = [callback]
        elif isinstance(callback, list):
            callbacks = callback
        else:
            raise ValueError(f"unsupported type '{type(callback)}' for callback arg")

        # Initialize
        should_timeout = pipeline_timeout is not None
        best_score = metric.worst_value
        best_pipeline = None

        # Conduct the sampling
        i = 0
        while True:
            i += 1
            logger.info(
                f"sampling pipeline {i}"
                f"{'/' + str(num_samples) if num_samples else ''}"
            )
            pipeline = self.sample_pipeline(problem_type, models, transformers)
            try:
                with conditional_timeout(pipeline_timeout, should_timeout):
                    # Train the pipeline and check its performance.
                    start_time = time()
                    test_score = evaluator(pipeline, X, y, metric)
                    logger.info(f"achieved test score: {test_score}")
                    if (
                        metric.is_better_than(test_score, best_score)
                        or best_pipeline is None
                    ):
                        best_score = test_score
                        best_pipeline = pipeline
                    # Check to see if it's time to stop sampling.
                    if callback is not None:
                        # We stop if any callback returns True.
                        train_time = time() - start_time
                        exit_early = any(
                            cb(SamplerState(test_score, pipeline, train_time, i))
                            for cb in callbacks
                        )
                        if exit_early:
                            break
                    if best_score == metric.best_value:
                        logger.info(
                            f"found best possible score {metric.best_value} early, "
                            "stopping the search"
                        )
                        break
                    if num_samples and i >= num_samples:
                        break
            except EvaluationTimeoutError:
                logger.info("pipeline took too long to evaluate, skipping")
                logger.debug(pipeline)
            except PipelineRunError as e:
                logger.exception(e)
                if exit_on_pipeline_error:
                    raise e
        return best_pipeline, best_score, i

    @abstractmethod
    def sample_pipeline(
        self,
        problem_type: ProblemType,
        models: t.List[t.Type[Primitive]],
        transformers: t.List[t.Type[Primitive]],
    ) -> Pipeline:
        pass
Classes
class PipelineSampler
-
Abstract base class for pipeline samplers. Subclasses implement sample_pipeline to propose candidate pipelines; run then repeatedly samples, evaluates, and keeps the best pipeline found.
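For illustration, a minimal concrete sampler might chain one randomly chosen transformer and one randomly chosen model. This sketch assumes a hypothetical Pipeline.add_step method for attaching primitives; OneModelSampler and add_step are not part of this module, and the real Pipeline API may differ.

import random
import typing as t

from skplumber.consts import ProblemType
from skplumber.pipeline import Pipeline
from skplumber.primitives.primitive import Primitive
from skplumber.samplers.sampler import PipelineSampler

class OneModelSampler(PipelineSampler):
    def sample_pipeline(
        self,
        problem_type: ProblemType,
        models: t.List[t.Type[Primitive]],
        transformers: t.List[t.Type[Primitive]],
    ) -> Pipeline:
        pipeline = Pipeline()
        # `add_step` is assumed for illustration only; consult the actual
        # skplumber.pipeline.Pipeline API for how steps are attached.
        pipeline.add_step(random.choice(transformers))
        pipeline.add_step(random.choice(models))
        return pipeline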
Ancestors
- abc.ABC
Methods
def run(self, X: pandas.core.frame.DataFrame, y: pandas.core.series.Series, *, models: List[Type[Primitive]], transformers: List[Type[Primitive]], problem_type: ProblemType, metric: Metric, evaluator: Callable, pipeline_timeout: Optional[int], num_samples: Optional[int] = None, callback: Union[None, Callable, List[Callable]] = None, exit_on_pipeline_error: bool = True) -> Tuple[Pipeline, float, int]
-
Samples `num_samples` pipelines, returning the best one found along the way.

Returns
-------
best_pipeline : Pipeline
    The fitted best pipeline trained on the problem.
best_score : float
    The score of the best pipeline that was trained.
n_iters : int
    The total number of iterations the sampler completed.
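A sketch of driving run, using the hypothetical OneModelSampler from the sketch above. The names X, y, models, transformers, metric, and evaluate are placeholders for a loaded dataset, primitive lists, a Metric instance, and an evaluation callable accepting (pipeline, X, y, metric); ProblemType.CLASSIFICATION is presumed to be a member of the enum.

from skplumber.consts import ProblemType

sampler = OneModelSampler()  # hypothetical subclass from the earlier sketch
best_pipeline, best_score, n_iters = sampler.run(
    X,
    y,
    models=models,                            # candidate model primitives
    transformers=transformers,                # candidate transformer primitives
    problem_type=ProblemType.CLASSIFICATION,  # assumed enum member
    metric=metric,                            # a skplumber.metrics.Metric
    evaluator=evaluate,    # called as evaluate(pipeline, X, y, metric)
    pipeline_timeout=60,   # skip any single pipeline after 60 seconds
    num_samples=25,        # stop after 25 sampled pipelines
)

At least one of num_samples or callback must be supplied so the sampler knows when to stop; a callback (or list of callbacks) can be passed instead of, or in addition to, num_samples to end the search early.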
def sample_pipeline(self, problem_type: ProblemType, models: List[Type[Primitive]], transformers: List[Type[Primitive]]) -> Pipeline
-
Abstract hook: subclasses construct and return a new candidate Pipeline for the given problem type, drawing from the supplied model and transformer primitive classes.
class SamplerState (score: float, pipeline: Pipeline, train_time: float, n_iters: int)
-
SamplerState(score, pipeline, train_time, n_iters)
An immutable record of the sampler's progress, passed to each callback after a pipeline is evaluated.
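Because run hands each callback a SamplerState after every evaluation, stopping rules can read its fields directly. A minimal sketch (the target score and time budget are arbitrary, and the score comparison assumes a higher-is-better metric):

from time import time

from skplumber.samplers.sampler import SamplerState

def make_stopper(score_target: float, budget_seconds: float):
    deadline = time() + budget_seconds
    def should_stop(state: SamplerState) -> bool:
        # Returning True tells the sampler to stop: either the target
        # score has been reached or the wall-clock budget is spent.
        return state.score >= score_target or time() >= deadline
    return should_stop

A callable built this way can be passed to run as callback=make_stopper(0.95, 300), alone or in a list alongside other callbacks; the sampler stops as soon as any callback returns True.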
Ancestors
- builtins.tuple
Instance variables
var n_iters : int
-
Alias for field number 3. The number of pipelines sampled so far, including the one just evaluated.
var pipeline : Pipeline
-
Alias for field number 1. The pipeline that was just evaluated.
var score : float
-
Alias for field number 0. The test score the evaluated pipeline achieved.
var train_time : float
-
Alias for field number 2. Wall-clock seconds spent training and evaluating the pipeline.