Batch Feature Computation¶
This guide shows how to use the FeatureByte SDK to compute batch feature values for a set of entities, populate a feature table, and retrieve those features for making batch predictions with a trained model.
Prerequisites¶
- FeatureByte SDK installed and configured (see SDK Setup)
- A saved Feature List in the catalog
- A configured Use Case
- A source warehouse table containing the entity keys for which features should be computed
Step 1: Create and Enable a Deployment¶
Retrieve the Feature List and Use Case from the catalog, create a deployment, then enable it.
```python
import featurebyte as fb

fb.use_profile("staging")
catalog = fb.activate_and_get_catalog("My Catalog")

feature_list = catalog.get_feature_list("My Feature List")
use_case = catalog.get_use_case("My Use Case")

deployment = feature_list.deploy(
    deployment_name="my_batch_deployment",
    use_case_name=use_case.name,
)
deployment.enable()
```
Reusing an existing deployment
If a deployment already exists for your feature list, skip this step and retrieve it directly with `catalog.get_deployment("my_batch_deployment")`.
Step 2: Compute Batch Features and Populate Feature Table¶
Use `compute_batch_features` on the deployment, passing a source warehouse table containing entity keys. Computed feature values are written to the specified output table, with a snapshot date column added for partitioning.
```python
import datetime

import featurebyte as fb

fb.use_profile("staging")
catalog = fb.activate_and_get_catalog("My Catalog")

deployment = catalog.get_deployment("my_batch_deployment")
deployment.compute_batch_features(
    batch_request_table="`my_db`.`my_schema`.`entity_keys_table`",
    output_table_name="`my_db`.`my_schema`.`batch_features`",
    output_table_snapshot_date=datetime.date.today(),
    output_table_snapshot_date_name="snapshot_dt",
    columns=["entity_id"],  # must include the deployment's primary entity
    columns_rename_mapping={"entity_id": "ENTITY_ID"},  # renamed column must match the primary entity's serving name
    point_in_time=None,  # cut-off time for feature computation
)
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| `batch_request_table` | BatchRequestTable, SourceTable, View, or str | — | Batch request table object or fully-qualified warehouse table name containing the required serving name columns |
| `output_table_name` | str | — | Fully-qualified name of the output table to be created or appended to in the data warehouse |
| `output_table_snapshot_date` | date | `date.today()` | Snapshot date written to the output table |
| `output_table_snapshot_date_name` | str | `"snapshot_date"` | Column name for the snapshot date in the output table |
| `output_table_snapshot_date_as_timeseries_key` | bool | `False` | When True, adds the snapshot date as a timeseries key rather than a regular primary key (Databricks only) |
| `columns` | list[str] or None | `None` | Include only these columns from the source table. If None, all columns are included. Not applicable when `batch_request_table` is a BatchRequestTable |
| `columns_rename_mapping` | dict[str, str] or None | `None` | Rename columns in the source table using a mapping of old name to new name. Not applicable when `batch_request_table` is a BatchRequestTable |
| `point_in_time` | str, datetime, or None | `None` | Point in time to use for feature computation (UTC, no timezone info). If None, the current time is used |
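Because `point_in_time` must be a UTC value with no timezone info, timezone-aware timestamps from other systems need converting before they are passed in. A minimal sketch of that conversion; the helper name is our own, not part of the SDK:

```python
import datetime


def to_naive_utc(ts: datetime.datetime) -> datetime.datetime:
    """Convert a timestamp to the naive UTC form expected by point_in_time.

    Aware timestamps are converted to UTC and stripped of tzinfo;
    naive timestamps are assumed to already be UTC and returned as-is.
    """
    if ts.tzinfo is None:
        return ts
    return ts.astimezone(datetime.timezone.utc).replace(tzinfo=None)


# A timestamp at UTC+2 becomes the equivalent naive UTC value
aware = datetime.datetime(
    2024, 5, 1, 9, 30,
    tzinfo=datetime.timezone(datetime.timedelta(hours=2)),
)
print(to_naive_utc(aware))  # 2024-05-01 07:30:00
```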
Output table upsert behaviour
If no rows with the specified `output_table_snapshot_date` exist in the output table, new rows are appended. If rows with the same snapshot date already exist, they are replaced.
Run this script on a schedule (daily, weekly, etc.) to keep the feature table up to date.
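The upsert behaviour makes scheduled runs idempotent: re-running the script for the same snapshot date replaces that day's rows rather than duplicating them. One way to structure a scheduled job is to build the per-run arguments from the scheduler's run date; the helper below is a sketch of that pattern (the function name and table names are illustrative, not part of the SDK):

```python
import datetime


def batch_run_kwargs(run_date: datetime.date) -> dict:
    """Build keyword arguments for deployment.compute_batch_features.

    Passing the scheduler's run date as the snapshot date means a retried
    run for the same date simply replaces that date's rows (idempotent).
    """
    return {
        "batch_request_table": "`my_db`.`my_schema`.`entity_keys_table`",
        "output_table_name": "`my_db`.`my_schema`.`batch_features`",
        "output_table_snapshot_date": run_date,
        "output_table_snapshot_date_name": "snapshot_dt",
    }


kwargs = batch_run_kwargs(datetime.date(2024, 5, 1))
print(kwargs["output_table_snapshot_date"])  # 2024-05-01
# In the scheduled script: deployment.compute_batch_features(**kwargs)
```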
Step 3: Fetch Features for Making Batch Predictions¶
After features are computed, query the output table directly in your warehouse, filtering to the relevant snapshot date. Pass the resulting feature values to your trained model to generate predictions.
In a Databricks notebook, use Spark to read the feature table and filter by snapshot date.
```python
import datetime

import joblib

snapshot_date = datetime.date.today().isoformat()
features_df = (
    spark.table("`my_db`.`my_schema`.`batch_features`")
    .filter(f"snapshot_dt = '{snapshot_date}'")
    .toPandas()
)

# Score with your trained model
model = joblib.load("model.pkl")
entity_col = "entity_id"
feature_cols = [c for c in features_df.columns if c not in [entity_col, "snapshot_dt"]]
features_df["prediction"] = model.predict(features_df[feature_cols])
print(features_df[[entity_col, "prediction"]].head())
```
Use the snowflake-connector-python package to connect and read the feature table.
```python
import datetime

import joblib
import pandas as pd
import snowflake.connector

snapshot_date = datetime.date.today().isoformat()

conn = snowflake.connector.connect(
    user="<user>",
    password="<password>",
    account="<account>",
    warehouse="<warehouse>",
    database="MY_DB",
    schema="MY_SCHEMA",
)
features_df = pd.read_sql(
    f"SELECT * FROM batch_features WHERE snapshot_dt = '{snapshot_date}'",
    con=conn,
)
conn.close()

# Score with your trained model
model = joblib.load("model.pkl")
entity_col = "entity_id"
feature_cols = [c for c in features_df.columns if c not in [entity_col, "snapshot_dt"]]
features_df["prediction"] = model.predict(features_df[feature_cols])
print(features_df[[entity_col, "prediction"]].head())
```
Writing predictions back to the warehouse
To persist predictions, write `features_df` back to your data warehouse using your preferred connector (e.g. `spark.createDataFrame(features_df).write.saveAsTable(...)` for Databricks, or `write_pandas` from snowflake-connector-python for Snowflake).
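A common refinement is to keep the written-back table narrow: retain only the entity key, snapshot date, and prediction columns rather than every feature. A sketch of that preparation step, with an illustrative DataFrame standing in for the scored `features_df` (the predictions table name in the comments is hypothetical):

```python
import pandas as pd

# Stand-in for the scored features_df from Step 3
features_df = pd.DataFrame({
    "entity_id": [1, 2],
    "snapshot_dt": ["2024-05-01", "2024-05-01"],
    "feat_a": [0.1, 0.2],
    "prediction": [0, 1],
})

# Keep only the key columns plus the prediction before writing back
predictions_df = features_df[["entity_id", "snapshot_dt", "prediction"]]
print(predictions_df)

# Databricks (illustrative table name):
#   spark.createDataFrame(predictions_df).write.mode("append") \
#       .saveAsTable("`my_db`.`my_schema`.`batch_predictions`")
# Snowflake:
#   from snowflake.connector.pandas_tools import write_pandas
#   write_pandas(conn, predictions_df, "BATCH_PREDICTIONS")
```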
Next Steps¶
- Schedule this script using your preferred orchestration tool (Airflow, Databricks Jobs, etc.)
- SQL Export Tutorials — export features as SQL instead of computing them via the SDK