
MarkTechPost has published a piece worth paying attention to: a tutorial that walks through an end-to-end implementation of an advanced machine learning pipeline using ZenML. It begins by setting up the environment and initializing a ZenML project, then defines a custom materializer that enables seamless serialization and metadata extraction for a domain-specific dataset object. For AI, a story like this is usually not just about a new model or demo, but about the direction of product strategy. If you follow AI updates, stories like this are often a sign that the line between "experiment" and "everyday working tool" keeps getting thinner.

Looking at it more closely: in this tutorial, we walk through an end-to-end implementation of an advanced machine learning pipeline using ZenML. We begin by setting up the environment and initializing a ZenML project, then define a custom materializer that enables seamless serialization and metadata extraction for a domain-specific dataset object. As we progress, we build a modular pipeline that performs data loading, preprocessing, and a fan-out hyperparameter search across multiple models. We evaluate each candidate, log rich metadata at every step, and use a fan-in strategy to select and promote the best-performing model. Throughout the process, we leverage ZenML's model control plane, artifact tracking, and caching mechanisms to ensure full reproducibility, transparency, and efficiency.

import os, sys, subprocess, json, shutil
from pathlib import Path

def _sh(cmd, check=True):
    print(f"$ {' '.join(cmd)}")
    return subprocess.run(cmd, check=check)

# Install the required libraries quietly.
_sh([sys.executable, "-m", "pip", "install", "-q",
     "zenml[server]", "scikit-learn", "pandas", "pyarrow"])

# Use /content on Colab, otherwise a local folder next to the current working directory.
PROJECT = Path("/content/zenml_advanced_tutorial") if Path("/content").exists() \
    else Path.cwd() / "zenml_advanced_tutorial"
if PROJECT.exists():
    shutil.rmtree(PROJECT)
PROJECT.mkdir(parents=True)
os.chdir(PROJECT)

os.environ["ZENML_ANALYTICS_OPT_IN"] = "false"
os.environ["ZENML_LOGGING_VERBOSITY"] = "WARN"

_sh(["zenml", "init"], check=False)

We set up the entire environment by installing required libraries and initializing a ZenML project workspace. We create a clean working directory and configure environment variables to control logging and analytics behavior. Finally, we bootstrap the ZenML repository so that all subsequent pipeline operations are properly tracked and managed.
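As an optional sanity check that is not part of the original tutorial, we can reuse the _sh helper to confirm that the repository was initialized and see which stack is active. Both zenml status and zenml stack describe are standard ZenML CLI commands, though their exact output varies by version.

# Optional sanity check (assumption: run right after `zenml init` in the same directory).
# `zenml status` prints the repository root and the currently active stack.
_sh(["zenml", "status"], check=False)

# Show the components of the active stack (orchestrator, artifact store, ...).
_sh(["zenml", "stack", "describe"], check=False)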
from typing import Annotated, Tuple, Dict, List, Any
import numpy as np
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
from sklearn.preprocessing import StandardScaler

from zenml import pipeline, step, log_metadata, Model, get_step_context
from zenml.client import Client
from zenml.materializers.base_materializer import BaseMaterializer
from zenml.enums import ArtifactType
from zenml.io import fileio


class DatasetBundle:
    def __init__(self, X, y, feature_names, stats=None):
        self.X = np.asarray(X)
        self.y = np.asarray(y)
        self.feature_names = list(feature_names)
        self.stats = stats or {}


class DatasetBundleMaterializer(BaseMaterializer):
    ASSOCIATED_TYPES = (DatasetBundle,)
    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA

    def load(self, data_type):
        with fileio.open(os.path.join(self.uri, "X.npy"), "rb") as f:
            X = np.load(f)
        with fileio.open(os.path.join(self.uri, "y.npy"), "rb") as f:
            y = np.load(f)
        with fileio.open(os.path.join(self.uri, "meta.json"), "r") as f:
            meta = json.loads(f.read())
        return DatasetBundle(X, y, meta["feature_names"], meta["stats"])

    def save(self, bundle):
        with fileio.open(os.path.join(self.uri, "X.npy"), "wb") as f:
            np.save(f, bundle.X)
        with fileio.open(os.path.join(self.uri, "y.npy"), "wb") as f:
            np.save(f, bundle.y)
        with fileio.open(os.path.join(self.uri, "meta.json"), "w") as f:
            f.write(json.dumps({
                "feature_names": bundle.feature_names,
                "stats": bundle.stats,
            }))

    def extract_metadata(self, bundle):
        classes, counts = np.unique(bundle.y, return_counts=True)
        return {
            "n_samples": int(bundle.X.shape[0]),
            "n_features": int(bundle.X.shape[1]),
            "class_distribution": {str(c): int(n) for c, n in zip(classes, counts)},
        }

We import all necessary libraries and define a custom data container along with its materializer. We implement logic to save, load, and extract metadata from our dataset, enabling seamless artifact handling in ZenML. This ensures that our data is not only stored efficiently but also enriched with meaningful, queryable metadata.
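ZenML will pick up DatasetBundleMaterializer automatically because DatasetBundle appears in ASSOCIATED_TYPES, but the binding can also be made explicit. The following is a minimal sketch, assuming ZenML's output_materializers parameter on the @step decorator; the quick_dataset step is hypothetical and not part of the original tutorial.

# Hypothetical step that pins the custom materializer explicitly to its output.
# Assumes the `output_materializers` parameter of ZenML's @step decorator.
@step(output_materializers=DatasetBundleMaterializer)
def quick_dataset() -> Annotated[DatasetBundle, "quick_dataset"]:
    data = load_breast_cancer()
    # Saved and reloaded through DatasetBundleMaterializer, including its extracted metadata.
    return DatasetBundle(data.data, data.target, data.feature_names)

Explicit binding like this can be useful when several materializers are registered for related types and you want to remove any ambiguity about which one handles a given output.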
@step(enable_cache=True)
def load_data() -> Annotated[DatasetBundle, "raw_dataset"]:
    data = load_breast_cancer()
    return DatasetBundle(
        data.data,
        data.target,
        data.feature_names,
        stats={"source": "sklearn.datasets.load_breast_cancer"},
    )


@step
def split_and_scale(
    bundle: DatasetBundle,
    test_size: float = 0.2,
    random_state: int = 42,
) -> Tuple[
    Annotated[np.ndarray, "X_train"],
    Annotated[np.ndarray, "X_test"],
    Annotated[np.ndarray, "y_train"],
    Annotated[np.ndarray, "y_test"],
]:
    X_tr, X_te, y_tr, y_te = train_test_split(
        bundle.X, bundle.y,
        test_size=test_size,
        random_state=random_state,
        stratify=bundle.y,
    )
    scaler = StandardScaler().fit(X_tr)
    X_tr, X_te = scaler.transform(X_tr), scaler.transform(X_te)
    log_metadata(metadata={"train_size": len(X_tr), "test_size": len(X_te)})
    return X_tr, X_te, y_tr, y_te


@step
def train_candidate(
    X_train: np.ndarray,
    y_train: np.ndarray,
    model_type: str = "random_forest",
    n_estimators: int = 100,
    max_depth: int = 5,
) -> Annotated[Any, "candidate_model"]:
    if model_type == "random_forest":
        m = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth, random_state=42)
    elif model_type == "gradient_boosting":
        m = GradientBoostingClassifier(n_estimators=n_estimators, max_depth=max_depth, random_state=42)
    else:
        m = LogisticRegression(max_iter=2000, random_state=42)
    m.fit(X_train, y_train)
    log_metadata(metadata={
        "model_type": model_type,
        "hyperparameters": {"n_estimators": n_estimators, "max_depth": max_depth},
    })
    return m

We define core pipeline steps for loading data, splitting it, scaling features, and training model candidates. We ensure that data loading is cached for efficiency while logging key metadata during preprocessing and training. This forms the backbone of our pipeline, where each model is trained independently with its respective configuration.

@step
def evaluate_candidate(
    model: Any,
    X_test: np.ndarray,
    y_test: np.ndarray,
    label: str,
) -> Annotated[Dict[str, Any], "metrics"]:
    preds = model.predict(X_test)
    probs = (model.predict_proba(X_test)[:, 1]
             if hasattr(model, "predict_proba") else preds)
    metrics: Dict[str, Any] = {
        "accuracy": float(accuracy_score(y_test, preds)),
        "f1": float(f1_score(y_test, preds)),
        "roc_auc": float(roc_auc_score(y_test, probs)),
        "label": label,
    }
    log_metadata(metadata=metrics)
    return metrics


@step
def select_best(
    metrics_list: List[Dict[str, Any]],
    models: List[Any],
) -> Annotated[Any, "production_model"]:
    best_idx = max(range(len(metrics_list)), key=lambda i: metrics_list[i]["roc_auc"])
    best = metrics_list[best_idx]
    ctx = get_step_context()
    try:
        ctx.model.log_metadata({"chosen_candidate": best, "candidate_index": best_idx})
    except Exception as e:
        print(f" (model metadata log skipped: {e})")
    log_metadata(metadata={
        "winning_metrics": {k: v for k, v in best.items() if k != "label"},
    })
    print(f"\n Best candidate: {best['label']} → "
          f"ROC AUC = {best['roc_auc']:.4f}\n")
    return models[best_idx]

We evaluate each trained model using multiple performance metrics and log the results. We then implement a selection mechanism that identifies the best-performing model based on ROC AUC. Additionally, we attach relevant metadata to the model version, enabling traceability and informed decision-making.
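To make the fan-in criterion concrete outside of ZenML, here is a small, self-contained scikit-learn sketch of the same idea: fit one candidate from each model family on a single split and keep whichever scores highest on ROC AUC. It is purely illustrative and independent of the pipeline steps above.

# Standalone illustration of the selection logic (no ZenML involved).
from sklearn.datasets import load_breast_cancer
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

X, y = load_breast_cancer(return_X_y=True)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
scaler = StandardScaler().fit(X_tr)
X_tr, X_te = scaler.transform(X_tr), scaler.transform(X_te)

candidates = {
    "random_forest": RandomForestClassifier(n_estimators=200, max_depth=7, random_state=42),
    "gradient_boosting": GradientBoostingClassifier(n_estimators=100, max_depth=3, random_state=42),
    "logistic": LogisticRegression(max_iter=2000, random_state=42),
}

scores = {}
for label, model in candidates.items():
    model.fit(X_tr, y_tr)
    # Rank candidates by ROC AUC on the held-out split, mirroring select_best.
    scores[label] = roc_auc_score(y_te, model.predict_proba(X_te)[:, 1])

best_label = max(scores, key=scores.get)
print(f"Best candidate: {best_label} (ROC AUC = {scores[best_label]:.4f})")

Inside the pipeline, select_best performs the same comparison, but over the metrics dictionaries produced by the fan-out of evaluate_candidate step runs.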
SEARCH_SPACE = [
    {"model_type": "random_forest", "n_estimators": 50, "max_depth": 3},
    {"model_type": "random_forest", "n_estimators": 200, "max_depth": 7},
    {"model_type": "gradient_boosting", "n_estimators": 100, "max_depth": 3},
    {"model_type": "logistic", "n_estimators": 1, "max_depth": 1},
]

PRODUCTION_MODEL = Model(
    name="breast_cancer_classifier",
    description="Best model from in-pipeline hyperparameter search",
    tags=["tutorial", "advanced"],
)


@pipeline(model=PRODUCTION_MODEL, enable_cache=True)
def training_pipeline(test_size: float = 0.2):
    bundle = load_data()
    # Prepare train/test splits once, then fan out over the search space.
    X_train, X_test, y_train, y_test = split_and_scale(bundle, test_size=test_size)
    models, metrics = [], []
    for i, cfg in enumerate(SEARCH_SPACE):
        m = train_candidate(
            X_train, y_train, **cfg,
            id=f"train_{i}_{cfg['model_type']}",
        )
        s = evaluate_candidate(
            m, X_test, y_test,
            label=f"{cfg['model_type']}(n={cfg['n_estimators']},d={cfg['max_depth']})",
            id=f"eval_{i}",
        )
        models.append(m)
        metrics.append(s)
    # Fan in: compare all candidates and promote the winner.
    select_best(metrics, models)


print("\n" + "=" * 70 + "\n RUNNING TRAINING PIPELINE\n" + "=" * 70)
run_obj = training_pipeline()

print("\n" + "=" * 70 + "\n INSPECTING THE RUN\n" + "=" * 70)
client = Client()
run = client.get_pipeline_run(run_obj.id)
print(f"\nPipeline: {run.pipeline.name}")
print(f"Run name: {run.name}")
print(f"Status: {run.status}")
print(f"Step runs: {len(run.steps)}")
for name, step_run in run.steps.items():
    print(f" • {name:35s} status={step_run.status}")

print("\nRun-level metadata (aggregated from steps):")
for k, v in (run.run_metadata or {}).items():
    short = str(v)
    print(f" {k}: {short[:80]}{'…' if len(short) > 80 else ''}")

print("\n" + "-" * 70 + "\n MODEL CONTROL PLANE\n" + "-" * 70)
try:
    mv = client.get_model_version(PRODUCTION_MODEL.name, "latest")
except Exception:
    mv = client.list_model_versions(model_name_or_id=PRODUCTION_MODEL.name)[0]
print(f"Model: {mv.model.name}")
print(f"Version: {mv.name} (number={mv.number})")
linked = list(mv.data_artifact_ids.keys()) if hasattr(mv, "data_artifact_ids") else []
print(f"Linked outputs: {linked or '(see dashboard)'}")
if mv.run_metadata:
    print("Version metadata:")
    for k, v in dict(mv.run_metadata).items():
        print(f" {k}: {str(v)[:80]}")

print("\n" + "-" * 70 + "\n RELOADING ARTIFACTS DIRECTLY\n" + "-" * 70)
prod_artifact = client.get_artifact_version("production_model")
prod_model = prod_artifact.load()
print(f"Loaded model class: {type(prod_model).__name__}")
print(f"Artifact metadata: {dict(prod_artifact.run_metadata) if prod_artifact.run_metadata else '{}'}"[:120])

X_test_arr = client.get_artifact_version("X_test").load()
y_test_arr = client.get_artifact_version("y_test").load()
acc = accuracy_score(y_test_arr, prod_model.predict(X_test_arr))
print(f"Sanity-check accuracy on stored X_test: {acc:.4f}")

ds_artifact = client.get_artifact_version("raw_dataset")
print(f"\nraw_dataset auto-extracted metadata:")
for k, v in (ds_artifact.run_metadata or {}).items():
    print(f" {k}: {v}")

print("\n" + "=" * 70 + "\n RE-RUNNING — STEPS SHOULD BE CACHED\n" + "=" * 70)
training_pipeline()

print("""
 Tutorial complete. What just happened:
 • Custom materializer serialized a domain object + auto-extracted metadata.
 • Fan-out: 4 candidates trained + evaluated as 8 distinct step runs.
 • Fan-in: select_best joined them and promoted the winner.
 • Model Control Plane created a versioned 'breast_cancer_classifier'.
 • Every artifact, metric, and hyperparameter was logged and queryable.
 • Second run hit the cache — zero recomputation.

 Explore further from this same Python session:
   Client().list_pipeline_runs()
   Client().list_model_versions(model_name_or_id="breast_cancer_classifier")
   Client().list_artifact_versions(name="metrics")
""")

We define the full pipeline, execute it, and inspect the results using the ZenML Client API. We perform a fan-out over multiple configurations, followed by a fan-in step to select the best model. Finally, we demonstrate artifact reuse, metadata inspection, and caching behavior by re-running the pipeline without redundant computation.
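If you want to force a full recomputation or try a different split, the same pipeline can be reconfigured at call time. The snippet below is a sketch assuming ZenML's with_options API; it is not part of the original tutorial, and the test_size value is arbitrary.

# Sketch: rerun the pipeline without caching and with a different test split.
# Assumes `Pipeline.with_options(...)`; keyword arguments mirror training_pipeline's signature.
fresh_run = training_pipeline.with_options(enable_cache=False)(test_size=0.3)
print(f"Fresh run status: {Client().get_pipeline_run(fresh_run.id).status}")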
In conclusion, we constructed a robust, production-style ML pipeline that demonstrates the full power of ZenML's orchestration capabilities. We observed how custom materializers enrich artifacts with meaningful metadata, how multiple model candidates can be trained and evaluated in parallel, and how the best model is automatically selected and versioned. We also explored how to inspect pipeline runs, retrieve artifacts directly without recomputation, and verify model performance using stored data. Finally, we saw caching in action during a re-run, confirming that redundant computations are avoided. This workflow provides a strong foundation for building scalable, maintainable, and reproducible machine learning systems in real-world scenarios. The full code and notebook are linked from the original MarkTechPost article.

A piece like this also hints at what the market is looking for right now: speed, reliability, and measurable output. In AI, the winners are not the ones who talk loudest about capability, but the ones whose tools are easiest for a team to pick up and use for real work.

A supplementary research lookup returned no additional usable results, so this reading leans on the original report itself rather than extra sourcing. If complementary details do turn up later, I use them to make the story more complete and more useful for you.


At the product and operations level, a story like this usually points to one thing: the companies that learn faster will have the advantage. If workflows keep getting more automated, teams that still do most things manually will lose agility. If distribution keeps tightening, brands with strong channels will pull further ahead. So even when the headline looks narrow, the implications often reach into territory much closer to everyday business decisions than people assume.

There is also a competitive layer that often gets missed. Once a big player moves, smaller players usually have two options: level up or slowly become less relevant. That is why I prefer to read news not as a single event but as part of a pattern. Who moves first? Who waits? Who executes more cleanly? From there you can usually tell whether a trend is still hype or has started turning into infrastructure.

For readers who care about practical outcomes, the most useful question is not "is this cool?" but "what should I change after reading this?". If you are a founder, the answer might sit in positioning, pricing, or distribution channels. If you are a trader, what probably needs watching is sentiment, momentum, and whether the market has already overreacted. If you just want a quick update, at minimum you now know why this topic surfaced and why other people are starting to talk about it.

I also deliberately leave room for slightly calmer context, because noisy news tends to push people toward conclusions too quickly. Not every headline means a revolution. Some are just noise; some really are the start of a change. The difference shows up in the consistency of the follow-through. If this topic keeps coming back over the next few cycles, odds are we are looking at a serious shift, not just daily buzz.

So if you want the short version: "How to Build an End-to-End Production Grade Machine Learning Pipeline with ZenML, Including Custom Materializers, Metadata Tracking, and Hyperparameter Optimization" matters not just because of its title, but because it points to a direction of travel that can affect how people build products, read the market, and set strategy. For me, that is the takeaway most worth carrying home. The rest you can file away as detail, but the broad direction is clear enough: this shift is worth watching, not skipping.

AI updates are moving fast, so don't just look at the headlines.

MarkTechPost

Editor's note

If you take only one thing from this article

An AI Updates item from MarkTechPost.

Original source

This article is an editorial rewrite of MarkTechPost's report.

Read the original article at MarkTechPost
#AIUpdates #MarkTechPost #rss