A Coding Guide to Building a Fast End-to-End Computation and Machine Learning Pipeline on Millions of Rows Using Vaex

In this tutorial, we design an end-to-end, production-style analysis and modeling pipeline using Vaex, which handles millions of rows efficiently without loading the data into memory. We generate a large synthetic dataset, engineer behavioral and city-level features with lazy expressions, and compute approximate statistics and aggregations at scale. We then combine Vaex with scikit-learn to train and evaluate a predictive model, showing how Vaex can act as the backbone of high-performance analytics and machine learning workflows.
!pip -q install "vaex==4.19.0" "vaex-core==4.19.0" "vaex-ml==0.19.0" "vaex-viz==0.6.0" "vaex-hdf5==0.15.0" "pyarrow>=14" "scikit-learn>=1.3"
import os, time, json, numpy as np, pandas as pd
import vaex
import vaex.ml
from vaex.ml.sklearn import Predictor
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, average_precision_score
print("Python:", __import__("sys").version.split()[0])
print("vaex:", vaex.__version__)
print("numpy:", np.__version__)
print("pandas:", pd.__version__)
rng = np.random.default_rng(7)
n = 2_000_000
cities = np.array(["Montreal","Toronto","Vancouver","Calgary","Ottawa","Edmonton","Quebec City","Winnipeg"], dtype=object)
city = rng.choice(cities, size=n, replace=True, p=np.array([0.16,0.18,0.12,0.10,0.10,0.10,0.10,0.14]))
age = rng.integers(18, 75, size=n, endpoint=False).astype("int32")
tenure_m = rng.integers(0, 180, size=n, endpoint=False).astype("int32")
tx = rng.poisson(lam=22, size=n).astype("int32")
base_income = rng.lognormal(mean=10.6, sigma=0.45, size=n).astype("float64")
city_mult = pd.Series({"Montreal":0.92,"Toronto":1.05,"Vancouver":1.10,"Calgary":1.02,"Ottawa":1.00,"Edmonton":0.98,"Quebec City":0.88,"Winnipeg":0.90}).reindex(city).to_numpy()
income = (base_income * city_mult * (1.0 + 0.004*(age-35)) * (1.0 + 0.0025*np.minimum(tenure_m,120))).astype("float64")
income = np.clip(income, 18_000, 420_000)
noise = rng.normal(0, 1, size=n).astype("float64")
score_latent = (
    0.55*np.log1p(income/1000.0)
    + 0.28*np.log1p(tx)
    + 0.18*np.sqrt(np.maximum(tenure_m, 0)/12.0 + 1e-9)
    - 0.012*(age - 40)
    + 0.22*(city == "Vancouver").astype("float64")
    + 0.15*(city == "Toronto").astype("float64")
    + 0.10*(city == "Ottawa").astype("float64")
    + 0.65*noise
)
p = 1.0/(1.0 + np.exp(-(score_latent - np.quantile(score_latent, 0.70))))
target = (rng.random(n) < p).astype("int8")
df = vaex.from_arrays(city=city, age=age, tenure_m=tenure_m, tx=tx, income=income, target=target)
df["income_k"] = df.income / 1000.0
df["tenure_y"] = df.tenure_m / 12.0
df["log_income"] = df.income.log1p()
df["tx_per_year"] = df.tx / (df.tenure_y + 0.25)
df["value_score"] = (0.35*df.log_income + 0.20*df.tx_per_year + 0.10*df.tenure_y - 0.015*df.age)
df["is_new"] = df.tenure_m < 6
df["is_senior"] = df.age >= 60
print("nRows:", len(df), "Columns:", len(df.get_column_names()))
print(df[["city","age","tenure_m","tx","income","income_k","value_score","target"]].head(5))
We create a large synthetic dataset and wrap it in a Vaex DataFrame so that all computation runs lazily over millions of rows. We engineer numeric features directly as virtual expressions, so no intermediate columns are materialized in memory. We verify the setup by checking the row count, the column names, and a small sample of computed values.
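To see the laziness directly, here is a small optional check (our addition, not part of the original script; virtual_columns is a Vaex convenience attribute whose exact contents may vary across versions):
print("Virtual columns:", list(df.virtual_columns.keys()))  # income_k, log_income, value_score, ...
print("value_score definition:", df.virtual_columns["value_score"])  # stored as an expression, not an array
print("Bytes held by materialized columns:", df.byte_size())  # virtual columns add no memory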
encoder = vaex.ml.LabelEncoder(features=["city"])
df = encoder.fit_transform(df)
city_map = encoder.labels_["city"]
inv_city_map = {v:k for k,v in city_map.items()}
n_cities = len(city_map)
p95_income_k_by_city = df.percentile_approx("income_k", 95, binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
p50_value_by_city = df.percentile_approx("value_score", 50, binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
avg_income_k_by_city = df.mean("income_k", binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
target_rate_by_city = df.mean("target", binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
n_by_city = df.count(binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
p95_income_k_by_city = np.asarray(p95_income_k_by_city).reshape(-1)
p50_value_by_city = np.asarray(p50_value_by_city).reshape(-1)
avg_income_k_by_city = np.asarray(avg_income_k_by_city).reshape(-1)
target_rate_by_city = np.asarray(target_rate_by_city).reshape(-1)
n_by_city = np.asarray(n_by_city).reshape(-1)
city_table = pd.DataFrame({
    "city_id": np.arange(n_cities),
    "city": [inv_city_map[i] for i in range(n_cities)],
    "n": n_by_city.astype("int64"),
    "avg_income_k": avg_income_k_by_city,
    "p95_income_k": p95_income_k_by_city,
    "median_value_score": p50_value_by_city,
    "target_rate": target_rate_by_city,
}).sort_values(["target_rate","p95_income_k"], ascending=False)
print("nCity summary:")
print(city_table.to_string(index=False))
df_city_features = vaex.from_pandas(city_table[["city","p95_income_k","avg_income_k","median_value_score","target_rate"]], copy_index=False)
df = df.join(df_city_features, on="city", rsuffix="_city")
df["income_vs_city_p95"] = df.income_k / (df.p95_income_k + 1e-9)
df["value_vs_city_median"] = df.value_score - df.median_value_score
We label-encode the city column and compute approximate per-city statistics using Vaex's binning-based aggregation functions. We collect these statistics into a city-level summary table and join them back onto the main DataFrame. We then derive relative features that compare each record against its city context.
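The binby pattern works because limits=[-0.5, n_cities-0.5] with shape=n_cities places each integer city code at the center of its own unit-width bin. A quick illustrative check (our addition) makes the bin edges explicit:
# Bin i covers [i-0.5, i+0.5), so integer city code i falls exactly in bin i.
edges = np.linspace(-0.5, n_cities - 0.5, n_cities + 1)
print(edges)  # [-0.5, 0.5, 1.5, ..., n_cities - 0.5]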
features_num = [
"age","tenure_y","tx","income_k","log_income","tx_per_year","value_score",
"p95_income_k","avg_income_k","median_value_score","target_rate",
"income_vs_city_p95","value_vs_city_median"
]
scaler = vaex.ml.StandardScaler(features=features_num, with_mean=True, with_std=True, prefix="z_")
df = scaler.fit_transform(df)
features = ["z_"+f for f in features_num] + ["label_encoded_city"]
df_train, df_test = df.split_random([0.80, 0.20], random_state=42)
model = LogisticRegression(max_iter=250, solver="lbfgs", n_jobs=None)
vaex_model = Predictor(model=model, features=features, target="target", prediction_name="pred")
t0 = time.time()
vaex_model.fit(df=df_train)
fit_s = time.time() - t0
df_test = vaex_model.transform(df_test)
y_true = df_test["target"].to_numpy()
y_pred = df_test["pred"].to_numpy()
auc = roc_auc_score(y_true, y_pred)
ap = average_precision_score(y_true, y_pred)
print("nModel:")
print("fit_seconds:", round(fit_s, 3))
print("test_auc:", round(float(auc), 4))
print("test_avg_precision:", round(float(ap), 4))
We standardize all numeric features with Vaex's ML transformers and assemble a consistent feature list for modeling. We split the dataset randomly without loading it entirely into memory. We then train a logistic regression model through Vaex's scikit-learn wrapper and evaluate its predictive performance.
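One caveat worth checking: depending on the vaex-ml version, the Predictor's prediction column may come from the estimator's predict method, which for a classifier yields hard 0/1 labels rather than scores. A minimal sketch (assuming the fitted estimator stays reachable as vaex_model.model) recovers probabilities, which rank better for AUC and lift:
# Materialize only the feature columns for the test split, then score with probabilities.
X_test = np.column_stack([df_test[f].to_numpy() for f in features])
y_prob = vaex_model.model.predict_proba(X_test)[:, 1]  # probability of the positive class
print("AUC on probabilities:", round(float(roc_auc_score(y_true, y_prob)), 4))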
deciles = np.linspace(0, 1, 11)
cuts = np.quantile(y_pred, deciles)
cuts[0] = -np.inf
cuts[-1] = np.inf
bucket = np.digitize(y_pred, cuts[1:-1], right=True).astype("int32")
df_test_local = vaex.from_arrays(y_true=y_true.astype("int8"), y_pred=y_pred.astype("float64"), bucket=bucket)
lift = df_test_local.groupby(by="bucket", agg={"n": vaex.agg.count(), "rate": vaex.agg.mean("y_true"), "avg_pred": vaex.agg.mean("y_pred")}).sort("bucket")
lift_pd = lift.to_pandas_df()
baseline = float(df_test_local["y_true"].mean())
lift_pd["lift"] = lift_pd["rate"] / (baseline + 1e-12)
print("nDecile lift table:")
print(lift_pd.to_string(index=False))
We analyze model behavior by dividing the predictions into deciles and computing lift metrics. We compute the baseline positive rate and compare each score bucket against it to assess ranking quality. We summarize the results in a compact lift table of the kind used in real-world model diagnostics.
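A common companion view is cumulative capture, the share of all positives found in the top-k score buckets. Here is a short sketch built on the lift_pd table above (our addition, not part of the original output):
# Walk buckets from highest to lowest score and accumulate the positives captured.
order = lift_pd.sort_values("bucket", ascending=False)
positives = order["n"] * order["rate"]  # positives per bucket
cum_capture = positives.cumsum() / positives.sum()
print("Cumulative capture by top deciles:", cum_capture.round(3).tolist())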
out_dir = "/content/vaex_artifacts"
os.makedirs(out_dir, exist_ok=True)
parquet_path = os.path.join(out_dir, "customers_vaex.parquet")
state_path = os.path.join(out_dir, "vaex_pipeline.json")
base_cols = ["city","label_encoded_city","age","tenure_m","tenure_y","tx","income","income_k","value_score","target"]
export_cols = base_cols + ["z_"+f for f in features_num]
df_export = df[export_cols].sample(n=500_000, random_state=1)
if os.path.exists(parquet_path):
    os.remove(parquet_path)
df_export.export_parquet(parquet_path)
pipeline_state = {
    "vaex_version": vaex.__version__,
    "encoder_labels": {k: {str(kk): int(vv) for kk, vv in v.items()} for k, v in encoder.labels_.items()},
    "scaler_mean": [float(x) for x in scaler.mean_],
    "scaler_std": [float(x) for x in scaler.std_],
    "features_num": features_num,
    "export_cols": export_cols,
}
with open(state_path, "w") as f:
    json.dump(pipeline_state, f)
df_reopen = vaex.open(parquet_path)
df_reopen["income_k"] = df_reopen.income / 1000.0
df_reopen["tenure_y"] = df_reopen.tenure_m / 12.0
df_reopen["log_income"] = df_reopen.income.log1p()
df_reopen["tx_per_year"] = df_reopen.tx / (df_reopen.tenure_y + 0.25)
df_reopen["value_score"] = (0.35*df_reopen.log_income + 0.20*df_reopen.tx_per_year + 0.10*df_reopen.tenure_y - 0.015*df_reopen.age)
df_city_features = vaex.from_pandas(city_table[["city","p95_income_k","avg_income_k","median_value_score","target_rate"]], copy_index=False)
df_reopen = df_reopen.join(df_city_features, on="city", rsuffix="_city")
df_reopen["income_vs_city_p95"] = df_reopen.income_k / (df_reopen.p95_income_k + 1e-9)
df_reopen["value_vs_city_median"] = df_reopen.value_score - df_reopen.median_value_score
with open(state_path, "r") as f:
    st = json.load(f)
labels_city = {k: int(v) for k,v in st["encoder_labels"]["city"].items()}
df_reopen["label_encoded_city"] = df_reopen.city.map(labels_city, default_value=-1)
for i, feat in enumerate(st["features_num"]):
    mean_i = st["scaler_mean"][i]
    std_i = st["scaler_std"][i] if st["scaler_std"][i] != 0 else 1.0
    df_reopen["z_"+feat] = (df_reopen[feat] - mean_i) / std_i
df_reopen = vaex_model.transform(df_reopen)
print("nArtifacts written:")
print(parquet_path)
print(state_path)
print("nReopened parquet predictions (head):")
print(df_reopen[["city","income_k","value_score","pred","target"]].head(8))
print("nDone.")
We export a large, fully featured sample to Parquet and persist the complete preprocessing state for reproducibility. We then reopen the file and rebuild all engineered features from the saved metadata. Finally, we run predictions on the reloaded dataset to confirm that the pipeline remains stable and usable end to end.
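As a final sanity check (a minimal sketch, not in the original script), we can score the reopened sample against its saved labels to confirm the rebuilt features behave like the originals:
# If the rebuilt pipeline matches the saved state, this AUC should be close
# to the test AUC reported earlier.
y_true_re = df_reopen["target"].to_numpy()
y_pred_re = df_reopen["pred"].to_numpy()
print("Reopened-sample AUC:", round(float(roc_auc_score(y_true_re, y_pred_re)), 4))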
In conclusion, we have shown how Vaex enables fast, memory-efficient data processing while still supporting advanced feature engineering, aggregation, and model integration. We've seen that approximate statistics, lazy computation, and out-of-core execution let us scale cleanly from exploratory analytics to ready-to-use artifacts. By rebuilding features reproducibly and persisting the full pipeline state, we've closed the loop from raw data to inference, showing how Vaex fits naturally into modern Python workflows for big data.