Batch Feature Computation

This guide shows how to use the FeatureByte SDK to compute batch feature values for a set of entities, populate a feature table, and retrieve those features for making batch predictions with a trained model.

Prerequisites

  • FeatureByte SDK installed and configured (see SDK Setup)
  • A saved Feature List in the catalog
  • A configured Use Case
  • A source warehouse table containing the entity keys for which features should be computed

Step 1: Create and Enable a Deployment

Retrieve the Feature List and Use Case from the catalog, create a deployment, then enable it.

Create and enable deployment
import featurebyte as fb

fb.use_profile("staging")
catalog = fb.activate_and_get_catalog("My Catalog")

feature_list = catalog.get_feature_list("My Feature List")
use_case = catalog.get_use_case("My Use Case")

deployment = feature_list.deploy(
    deployment_name="my_batch_deployment",
    use_case_name=use_case.name,
)
deployment.enable()

Reusing an existing deployment

If a deployment already exists for your feature list, skip this step and retrieve it directly with catalog.get_deployment("my_batch_deployment").


Step 2: Compute Batch Features and Populate Feature Table

Use compute_batch_features on the deployment, passing a source warehouse table containing entity keys. Computed feature values are written to the specified output table, with a snapshot date column added for partitioning.

Compute batch features
import datetime
import featurebyte as fb

fb.use_profile("staging")
catalog = fb.activate_and_get_catalog("My Catalog")

deployment = catalog.get_deployment("my_batch_deployment")

deployment.compute_batch_features(
    batch_request_table="`my_db`.`my_schema`.`entity_keys_table`",
    output_table_name="`my_db`.`my_schema`.`batch_features`",
    output_table_snapshot_date=datetime.date.today(),
    output_table_snapshot_date_name="snapshot_dt",
    columns=["entity_id"],  # Must include deployment primary entity
    columns_rename_mapping={"entity_id": "ENTITY_ID"},  # Column name must match serving name of primary entity
    point_in_time=None,  # Cut-off time for feature computation; None uses the current time
)
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `batch_request_table` | `BatchRequestTable`, `SourceTable`, `View`, or `str` | required | Batch request table object or fully-qualified warehouse table name containing the required serving name columns |
| `output_table_name` | `str` | required | Fully-qualified name of the output table to be created or appended to in the data warehouse |
| `output_table_snapshot_date` | `date` | `date.today()` | Snapshot date written to the output table |
| `output_table_snapshot_date_name` | `str` | `"snapshot_date"` | Column name for the snapshot date in the output table |
| `output_table_snapshot_date_as_timeseries_key` | `bool` | `False` | When `True`, adds the snapshot date as a timeseries key rather than a regular primary key (Databricks only) |
| `columns` | `list[str]` or `None` | `None` | Include only these columns from the source table; if `None`, all columns are included. Not applicable when `batch_request_table` is a `BatchRequestTable` |
| `columns_rename_mapping` | `dict[str, str]` or `None` | `None` | Rename columns in the source table using a mapping of old name to new name. Not applicable when `batch_request_table` is a `BatchRequestTable` |
| `point_in_time` | `str`, `datetime`, or `None` | `None` | Point in time to use for feature computation (UTC, no timezone info); if `None`, the current time is used |
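Since point_in_time must be timezone-naive UTC, a common pattern is to take an aware UTC timestamp and strip its tzinfo before passing it in; a minimal sketch:

```python
import datetime

# Timezone-aware "now" in UTC, then strip tzinfo to get the
# naive-UTC value expected by point_in_time
point_in_time = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
```

This avoids accidentally passing a local-time or timezone-aware value as the cut-off.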

Output table upsert behaviour

If no rows with the specified output_table_snapshot_date exist in the output table, new rows are appended. If rows with the same snapshot date already exist, they are replaced.

Run this script on a schedule (daily, weekly, etc.) to keep the feature table up to date.
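The replace-or-append behaviour amounts to a delete-then-insert keyed on the snapshot date, which is what makes scheduled re-runs idempotent. A rough pandas sketch of the equivalent logic (illustration only, not how the warehouse write is actually implemented):

```python
import pandas as pd

def upsert_snapshot(table: pd.DataFrame, new_rows: pd.DataFrame,
                    snapshot_col: str = "snapshot_dt") -> pd.DataFrame:
    """Drop existing rows sharing the incoming snapshot date(s), then append."""
    incoming = new_rows[snapshot_col].unique()
    kept = table[~table[snapshot_col].isin(incoming)]
    return pd.concat([kept, new_rows], ignore_index=True)

existing = pd.DataFrame({"snapshot_dt": ["2024-01-01", "2024-01-02"], "f1": [1, 2]})
rerun = pd.DataFrame({"snapshot_dt": ["2024-01-02"], "f1": [20]})  # recomputed snapshot
result = upsert_snapshot(existing, rerun)
```

Re-running the same snapshot date therefore overwrites that day's rows rather than duplicating them.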


Step 3: Fetch Features for Making Batch Predictions

After features are computed, query the output table directly in your warehouse, filtering to the relevant snapshot date. Pass the resulting feature values to your trained model to generate predictions.

In a Databricks notebook, use Spark to read the feature table and filter by snapshot date.

Fetch features and score (Databricks)
import datetime
import joblib

snapshot_date = datetime.date.today().isoformat()

features_df = (
    spark.table("`my_db`.`my_schema`.`batch_features`")
    .filter(f"snapshot_dt = '{snapshot_date}'")
    .toPandas()
)

# Score with your trained model
model = joblib.load("model.pkl")
entity_col = "ENTITY_ID"  # Serving name column written by compute_batch_features
feature_cols = [c for c in features_df.columns if c not in [entity_col, "snapshot_dt"]]
features_df["prediction"] = model.predict(features_df[feature_cols])

print(features_df[[entity_col, "prediction"]].head())

Use the snowflake-connector-python package to connect and read the feature table.

Fetch features and score (Snowflake)
import datetime
import joblib
import pandas as pd
import snowflake.connector

snapshot_date = datetime.date.today().isoformat()

conn = snowflake.connector.connect(
    user="<user>",
    password="<password>",
    account="<account>",
    warehouse="<warehouse>",
    database="MY_DB",
    schema="MY_SCHEMA",
)

features_df = pd.read_sql(
    "SELECT * FROM batch_features WHERE snapshot_dt = %s",
    con=conn,
    params=[snapshot_date],  # Bind the date rather than interpolating it into the SQL
)
conn.close()

# Score with your trained model
model = joblib.load("model.pkl")
entity_col = "ENTITY_ID"  # Snowflake returns unquoted identifiers in upper case
feature_cols = [c for c in features_df.columns if c.upper() not in (entity_col, "SNAPSHOT_DT")]
features_df["prediction"] = model.predict(features_df[feature_cols])

print(features_df[[entity_col, "prediction"]].head())

Writing predictions back to the warehouse

To persist predictions, write features_df back to your data warehouse using your preferred connector (e.g. spark.createDataFrame(features_df).write.saveAsTable(...) on Databricks, or write_pandas from snowflake.connector.pandas_tools on Snowflake).
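As an illustration of the write-back step, here is a minimal sketch using DataFrame.to_sql against an in-memory SQLite database; the table name batch_predictions and the toy predictions frame are assumptions, and in practice you would target your warehouse via write_pandas or Spark instead:

```python
import sqlite3
import pandas as pd

# Hypothetical predictions frame; in practice this is features_df from above
preds = pd.DataFrame({"ENTITY_ID": [1, 2], "prediction": [0.7, 0.2]})

conn = sqlite3.connect(":memory:")  # Stand-in for a warehouse connection
preds.to_sql("batch_predictions", conn, if_exists="append", index=False)

count = pd.read_sql("SELECT COUNT(*) AS n FROM batch_predictions", conn)["n"][0]
conn.close()
```

Using if_exists="append" lets a scheduled job accumulate one batch of predictions per run.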


Next Steps