pytoolkit.pipelines module

テーブルデータのパイプライン。

ここでいうパイプラインとは、pl.DataFrameを受け取り、pl.DataFrameを返すステップの連なり。 各ステップは0個以上の依存するステップを持ち、依存するステップの出力を結合したものを受け取って 当該ステップの処理を行う。

ステップ:

各ステップはpytoolkit.pipelines.Stepクラスやその派生クラスを継承して実装する。 基本は1ファイル1ステップで実装する。 ステップ名の既定値は実装したクラスのモジュールのファイル名の拡張子を除く部分となる。

ステップの実行の種類:

ステップの実行には、主にコンペでの利用を想定し、"train"と"test"の二種類がある。

ステップの種類:

以下のパターンで実装を補助する派生クラスがある。

  • "train"と"test"で同じ処理をするステップ: TransformStep

  • "train"で事前に学習や統計情報の取得などの処理を行い、 その後は"train"と"test"で同じ処理をするステップ: FitTransformStep

  • コンペなどで"train"と"test"のデータを全部まとめて処理するステップ: AllDataStep

  • "train"で学習してモデルを保存してout-of-fold predictionsを出力し、 "test"でモデルを読み込んで推論をするステップ: ModelStep

  • "train"のみ実装されているステップ: TrainOnlyStep

ステップを実装する際は、Stepクラスか上記の派生クラスの中から1つを選んで継承する。

メモリキャッシュ:

明示的に無効化していない場合、各ステップの結果はメモリにキャッシュされる。 依存関係などで同じステップが何度も呼び出される場合、2回目以降は初回の結果を返す。

ファイルキャッシュ:

明示的に無効化していない場合、各ステップの結果はファイルにもキャッシュされる。 ステップや依存先のステップが定義されたファイルの更新日時が新しくなっていれば自動的に再計算する。 (ソースコード上の依存関係などまでは追えないため注意が必要。)

リファクタリングなどで意図せず再計算が発生する場合があるため、 非常に計算に時間がかかるステップを実装する場合などには、 更新日時のチェックを個別または全体で無効化できる。 無効化した場合、必要に応じて手動で削除する。

fine:

HPOなどの時間がかかる処理はStepの中でself.fineがTrueの場合のみ行うよう実装し、 コンペの提出時などだけfine=Trueで実行する。 必要に応じてモデルやキャッシュが別管理となる。

class pytoolkit.pipelines.Step[ソース]

ベースクラス: object

パイプラインの各ステップ。

module_path[ソース]

ステップが定義されたファイルのパス

name[ソース]

ステップ名 (既定値: self.module_path.stem)

depends_on[ソース]

依存先ステップ名の配列 (既定値: [])

use_file_cache[ソース]

結果をファイルにキャッシュするのか否か (既定値: True)

use_memory_cache[ソース]

結果をメモリにキャッシュするのか否か (既定値: True)

check_file_cache_time[ソース]

ファイルキャッシュの簡易更新日時チェックをするのか否か (既定値: True)

has_fine_train[ソース]

fine=Trueな場合にtrainで特別な処理を実装しているのか否か (既定値: False)

has_fine_test[ソース]

fine=Trueな場合にtestで特別な処理を実装しているのか否か (既定値: False) has_fine_trainがFalseでhas_fine_testがTrueな場合はTTAなどを想定したもの。 モデルは共通だが推論結果のキャッシュは別となる。

サンプル

派生クラスでuse_file_cacheなどを変更したい場合は__init__をオーバーライドする。

def __init__(self) -> None:
    super().__init__()
    self.depends_on = []
    self.use_file_cache = True
    self.use_memory_cache = True
    self.check_file_cache_time = True
    self.has_fine_train = False
    self.has_fine_test = False
property logger[ソース]

ロガー (派生クラスで実装時に使う用)

has_fine(run_type)[ソース]

fine=Trueな場合に特別な処理を実装しているのか否か

パラメータ:

run_type (Literal['train', 'test']) --

戻り値の型:

bool

property fine: bool[ソース]

高精度な学習・推論を行うのか否か

property model_dir: Path[ソース]

モデルを保存したりしたいときのディレクトリ

abstract run(df, run_type)[ソース]

当該ステップの処理。Pipeline経由で呼び出される。

パラメータ:
  • df (DataFrame) --

  • run_type (Literal['train', 'test']) --

戻り値の型:

DataFrame

invoke_single(step_types, invoke_type=None, cache='use')[ソース]

指定ステップの実行。(結果が1列なことがわかっているとき用の糖衣構文)

パラメータ:
  • step_types (str | list[str]) -- 実行するステップのクラス

  • run_type -- 実行するステップの種類

  • cache (Literal['use', 'ignore', 'disable']) -- ignoreにするとキャッシュがあっても読み込まない、disableにすると保存もしない。(伝播はしない)

  • invoke_type (Literal['train', 'test'] | ~typing.Literal['all'] | None) --

戻り値:

実行結果

戻り値の型:

Series

invoke(step_types, invoke_type=None, cache='use')[ソース]

指定ステップの実行。

パラメータ:
  • step_types (str | list[str]) -- 実行するステップのクラス

  • run_type -- 実行するステップの種類

  • cache (Literal['use', 'ignore', 'disable']) -- ignoreにするとキャッシュがあっても読み込まない、disableにすると保存もしない。(伝播はしない)

  • invoke_type (Literal['train', 'test'] | ~typing.Literal['all'] | None) --

戻り値:

実行結果

戻り値の型:

DataFrame

class pytoolkit.pipelines.Pipeline(models_dir, cache_dir, fine=False, check_file_cache_time=True)[ソース]

ベースクラス: object

パイプラインを管理するクラス。

パラメータ:
  • models_dir (str | PathLike[str]) -- モデルやログの保存先ディレクトリ

  • cache_dir (str | PathLike[str]) -- キャッシュの保存先ディレクトリ

  • fine (bool) -- 高精度な学習・推論を行うのか否か

  • check_file_cache_time (bool) -- ファイルキャッシュの簡易更新日時チェックをするのか否か (既定値: True)

add_all()[ソース]

__main__と同じディレクトリの全*.pyファイルからStepを作成して追加する。

戻り値の型:

None

add_from_file(file)[ソース]

Pythonファイルからステップを作成して追加。

パラメータ:

file (str | PathLike[str]) --

戻り値の型:

None

add(step)[ソース]

ステップの追加。

パラメータ:

step (Step) --

戻り値の型:

None

check()[ソース]

依存関係のチェック

戻り値の型:

None

invoke(steps, invoke_type, cache='use')[ソース]

ステップの実行。

パラメータ:
  • steps (str | list[str] | Step | list[pytoolkit.pipelines.Step]) -- 実行するステップの名前 or インスタンス。複数指定可。

  • invoke_type (Literal['train', 'test'] | ~typing.Literal['all'] | None) -- 実行するステップの種類。省略時は現在実行中のステップ。

  • cache (Literal['use', 'ignore', 'disable']) -- ignoreにするとキャッシュがあっても読み込まない、disableにすると保存もしない。(伝播はしない)

戻り値:

処理結果

戻り値の型:

DataFrame

get_step(step_name)[ソース]

Stepのインスタンスを返す。

パラメータ:

step_name (str) --

戻り値の型:

Step

class pytoolkit.pipelines.TransformStep[ソース]

ベースクラス: Step

データ変換ステップ。

サンプル

class Step(pytoolkit.pipelines.TransformStep):
    def __init__(self):
        super().__init__()
        self.depends_on = ["xxx"]

    def transform(self, df: pl.DataFrame) -> pl.DataFrame:
        return df
run(df, run_type)[ソース]

当該ステップの処理

パラメータ:
  • df (DataFrame) --

  • run_type (Literal['train', 'test']) --

戻り値の型:

DataFrame

abstract transform(df)[ソース]

変換処理

パラメータ:

df (DataFrame) --

戻り値の型:

DataFrame

class pytoolkit.pipelines.FitTransformStep[ソース]

ベースクラス: Step

特徴量作成ステップ。

transformer_save_name[ソース]

保存するファイル名 (既定値: "transformer.pkl")

サンプル

class Step(pytoolkit.pipelines.FitTransformStep):
    def __init__(self):
        super().__init__()
        self.depends_on = ["xxx"]

    def fit(self, df_train: pl.DataFrame) -> typing.Any:
        transformer = {}
        return transformer

    def transform(self, transformer, df: pl.DataFrame) -> pl.DataFrame:
        return df
run(df, run_type)[ソース]

当該ステップの処理

パラメータ:
  • df (DataFrame) --

  • run_type (Literal['train', 'test']) --

戻り値の型:

DataFrame

abstract fit(df_train)[ソース]

訓練&保存

パラメータ:

df_train (DataFrame) --

戻り値の型:

Any

abstract transform(transformer, df)[ソース]

変換処理

パラメータ:

df (DataFrame) --

戻り値の型:

DataFrame

save_transformer(transformer, save_path)[ソース]

変換用情報の保存

パラメータ:

save_path (Path) --

戻り値の型:

Any

load_transformer(save_path)[ソース]

変換用情報の読み込み

パラメータ:

save_path (Path) --

戻り値の型:

Any

class pytoolkit.pipelines.AllDataStep[ソース]

ベースクラス: Step

trainで全データを使って処理するステップ。

self.is_test_column_name の bool列を追加して結合したものがdf_allとして渡される。

is_test_column_name[ソース]

追加する列名 (既定値: "_is_test_")

サンプル

class Step(pytoolkit.pipelines.AllDataStep):
    def __init__(self):
        super().__init__()
        self.depends_on = ["xxx"]

    def fit_transform(self, df_all: pl.DataFrame) -> pl.DataFrame:
        return df_all
run(df, run_type)[ソース]

当該ステップの処理

パラメータ:
  • df (DataFrame) --

  • run_type (Literal['train', 'test']) --

戻り値の型:

DataFrame

abstract fit_transform(df_all)[ソース]

処理

パラメータ:

df_all (DataFrame) --

戻り値の型:

DataFrame

class pytoolkit.pipelines.ModelStep[ソース]

ベースクラス: Step

機械学習モデルなど、train/testで別々の処理をするステップ。

サンプル

class Step(pytoolkit.pipelines.ModelStep):
    def __init__(self):
        super().__init__()
        self.depends_on = ["xxx"]

    def train(self, df_train: pl.DataFrame) -> pl.DataFrame:
        return df_train

    def test(self, df_test: pl.DataFrame) -> pl.DataFrame:
        return df_test
run(df, run_type)[ソース]

当該ステップの処理

パラメータ:
  • df (DataFrame) --

  • run_type (Literal['train', 'test']) --

戻り値の型:

DataFrame

abstract train(df_train)[ソース]

学習してモデルを保存してoofpを返す

パラメータ:

df_train (DataFrame) --

戻り値の型:

DataFrame

abstract test(df_test)[ソース]

モデルを読み込んで推論して結果を返す

パラメータ:

df_test (DataFrame) --

戻り値の型:

DataFrame

class pytoolkit.pipelines.TrainOnlyStep[ソース]

ベースクラス: Step

グループIDやラベル周りの処理など、"train"のみ実装されているステップ。

サンプル

class Step(pytoolkit.pipelines.TrainOnlyStep):
    def __init__(self):
        super().__init__()
        self.depends_on = ["xxx"]

    def train(self, df_train: pl.DataFrame) -> pl.DataFrame:
        return df_train
run(df, run_type)[ソース]

当該ステップの処理

パラメータ:
  • df (DataFrame) --

  • run_type (Literal['train', 'test']) --

戻り値の型:

DataFrame

abstract train(df_train)[ソース]

学習してモデルを保存してoofpを返す

パラメータ:

df_train (DataFrame) --

戻り値の型:

DataFrame