Preprocessors
Preprocessors are optional. If none is configured, observations pass through unchanged. Use @preprocess when records need Python logic before schema queries run.
Use a query when the source shape is stable and selection is enough. Use a preprocessor when you need type coercion, renaming, source-specific cleanup, windowing, session splitting, or derived fields. The preprocessor prepares records; the schema remains the model-facing contract.
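As a rough illustration of cleanup that selection alone cannot express, the sketch below coerces string-typed numbers, renames keys, and derives one field in plain Python. The record shape and the names `clean_order`, `Amount`, `Qty`, `Currency`, and `total` are hypothetical and not part of json2vec. In practice, a function like this would be wrapped with the preprocess decorator described in this guide.

```python
def clean_order(record: dict) -> dict:
    """Normalize one hypothetical raw record (illustrative shape only)."""
    amount = float(record["Amount"])  # type coercion: "12.50" -> 12.5
    quantity = int(record["Qty"])     # type coercion: "3" -> 3
    return {
        # Renaming: source-specific keys become stable, schema-friendly names.
        "amount": amount,
        "quantity": quantity,
        # Source-specific cleanup: trim whitespace and normalize case.
        "currency": record["Currency"].strip().upper(),
        # Derived field computed from the coerced values.
        "total": amount * quantity,
    }


raw = {"Amount": "12.50", "Qty": "3", "Currency": " usd "}
cleaned = clean_order(raw)
print(cleaned)
```

The function returns a fresh dictionary instead of mutating its input, so the raw record stays available for debugging.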
This guide uses Iris data, but starts from a deliberately awkward raw shape: feature names live in a nested dictionary and the label lives outside that feature object.
import polars as pl
from loguru import logger
from rich.pretty import pprint
import json2vec as j2v
logger.remove()
raw_records = pl.read_ndjson("docs/data/iris-raw.jsonl").head(3).to_dicts()
pprint(raw_records[0])
{
    'features': {
        'sepal length (cm)': 5.1,
        'sepal width (cm)': 3.5,
        'petal length (cm)': 1.4,
        'petal width (cm)': 0.2
    },
    'species': 'setosa'
}
A transformation preprocessor returns one normalized record for each input record. It is the right choice for type coercion, renaming, flattening, or attaching derived fields. Pass it to PolarsDataModule(..., preprocessor=...) or configure it on a dataset so training, prediction, and serving share the same transformation.
@j2v.preprocess
def simplify_iris(record: dict) -> dict:
    features = record["features"]
    return {
        "sepal_length": float(features["sepal length (cm)"]),
        "petal_length": float(features["petal length (cm)"]),
        "species": record["species"],
    }
pprint(simplify_iris(raw_records[0]))
{'sepal_length': 5.1, 'petal_length': 1.4, 'species': 'setosa'}
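A transformation can also attach derived fields. The following is a minimal sketch in plain Python, left undecorated so it runs without json2vec. The function name `simplify_iris_with_ratio` and the field `petal_to_sepal` are hypothetical, and a schema consuming this record would presumably need a matching numeric field.

```python
def simplify_iris_with_ratio(record: dict) -> dict:
    """Flatten one raw Iris record and add a derived ratio (hypothetical field)."""
    features = record["features"]
    sepal_length = float(features["sepal length (cm)"])
    petal_length = float(features["petal length (cm)"])
    return {
        "sepal_length": sepal_length,
        "petal_length": petal_length,
        # Derived field; guard against division by zero in malformed rows.
        "petal_to_sepal": petal_length / sepal_length if sepal_length else None,
        "species": record["species"],
    }


sample = {
    "features": {
        "sepal length (cm)": 5.1,
        "sepal width (cm)": 3.5,
        "petal length (cm)": 1.4,
        "petal width (cm)": 0.2,
    },
    "species": "setosa",
}
result = simplify_iris_with_ratio(sample)
print(sorted(result))
```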
After preprocessing, default queries are inferred against the returned record, not the original raw record. The same callable can be passed directly to model.encode(...) for debugging or to data modules for training.
model = j2v.Model.from_schema(
    j2v.Number("sepal_length"),
    j2v.Number("petal_length"),
    j2v.Category("species", target=True, max_vocab_size=4),
    d_model=16,
    n_layers=1,
    n_heads=4,
    batch_size=2,
)
encoded = model.encode(raw_records[:2], preprocess=simplify_iris)
encoded.keys()
_StringKeys(dict_keys(['record/sepal_length', 'record/petal_length', 'record/species', <TensorKey.metadata: 'metadata'>]))
datamodule = j2v.PolarsDataModule(
    model=model,
    train=pl.DataFrame(raw_records),
    validate=pl.DataFrame(raw_records),
    preprocessor=simplify_iris,
    num_workers=0,
    persistent_workers=False,
    pin_memory=False,
)
A yielding preprocessor can expand one input into many outputs. That pattern is useful when one account history should become multiple windows, one session log should become multiple training observations, or one raw export contains many model records.
@j2v.preprocess(yields=True)
def iris_records(records):
    for record in records:
        yield simplify_iris(record)
pprint(list(iris_records(raw_records)))
[
    {'sepal_length': 5.1, 'petal_length': 1.4, 'species': 'setosa'},
    {'sepal_length': 7.0, 'petal_length': 4.7, 'species': 'versicolor'},
    {'sepal_length': 6.3, 'petal_length': 6.0, 'species': 'virginica'}
]
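The Iris example yields one output per input, so it does not show expansion. The sketch below illustrates the windowing case in plain Python: one account history becomes several overlapping windows. The record shape (`account_id`, `events`) and the names `sliding_windows`, `size`, and `step` are assumptions made for illustration, not json2vec API. A generator like this would presumably be registered with `@j2v.preprocess(yields=True)` as shown above.

```python
from typing import Iterator


def sliding_windows(record: dict, size: int = 3, step: int = 1) -> Iterator[dict]:
    """Yield fixed-size windows over one history (hypothetical record shape)."""
    events = record["events"]
    # Histories shorter than `size` produce an empty range and yield nothing.
    for start in range(0, len(events) - size + 1, step):
        yield {
            "account_id": record["account_id"],
            "window_start": start,
            "events": events[start:start + size],
        }


history = {"account_id": "a-1", "events": [10, 20, 30, 40, 50]}
windows = list(sliding_windows(history, size=3, step=2))
for window in windows:
    print(window)
```

Each window carries the parent identifier, so downstream records can still be traced back to the history they came from.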