
Unter Data Engineering versteht man die Praxis des Entwerfens, Erstellens und Wartens von Systemen, die Daten in großem Maßstab sammeln, speichern, verarbeiten und analysieren. Während sich Datenwissenschaftler auf die Gewinnung von Erkenntnissen konzentrieren, bauen Dateningenieure die zuverlässige, skalierbare Infrastruktur auf, die diese Erkenntnisse ermöglicht.
Unter Data Engineering versteht man die Praxis des Entwerfens, Erstellens und Wartens von Systemen, die Daten in großem Maßstab sammeln, speichern, verarbeiten und analysieren. Während sich Datenwissenschaftler auf die Gewinnung von Erkenntnissen konzentrieren, bauen Dateningenieure die zuverlässige, skalierbare Infrastruktur auf, die diese Erkenntnisse ermöglicht.
Im Jahr 2026 sind Dateningenieure das Rückgrat datengesteuerter Organisationen. Sie bauen die Pipelines auf, die Modelle für maschinelles Lernen, Business-Intelligence-Dashboards, Echtzeitanalysen und Betriebsberichte unterstützen.
┌─────────┐ ┌──────────┐ ┌──────────┐ ┌─────────────┐ ┌─────────┐
│ Data │ │ Data │ │ Data │ │ Data │ │ Data │
│ Sources │──▶│ Ingest │──▶│ Storage │──▶│ Transform │──▶│ Serving │
├─────────┤ ├──────────┤ ├──────────┤ ├─────────────┤ ├─────────┤
│ Apps │ │ Kafka │ │ Data Lake│ │ dbt, Spark │ │ BI │
│ DBs │ │ Kinesis │ │ Warehouse│ │ Airflow │ │ ML │
│ APIs │ │ Airbyte │ │ Lakehouse│ │ Flink │ │ Reverse │
│ Logs │ │ Fivetran │ │ │ │ │ │ ETL │
│ IoT │ │ Pub/Sub │ │ │ │ │ │ API │
└─────────┘ └──────────┘ └──────────┘ └─────────────┘ └─────────┘
Laden Sie Daten nach einem Zeitplan (stündlich, täglich):
# Airflow DAG — Daily ETL
dag_id: daily_orders_import
schedule: "0 2 * * *" # Daily at 2 AM
tasks:
- extract_from_postgres:
sql: >
COPY orders TO '/tmp/orders_export.csv'
WITH (FORMAT CSV, HEADER true)
WHERE created_at >= CURRENT_DATE - 1
- load_to_s3:
source: /tmp/orders_export.csv
destination: s3://data-lake/raw/orders/{{ ds }}/
- load_to_snowflake:
table: staging.orders
file: s3://data-lake/raw/orders/{{ ds }}/*
warehouse: etl_wh
Prozessdaten in Echtzeit:
# Kafka producer — Sending order events
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['kafka:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all'
)
def on_order_placed(order):
future = producer.send('orders', order)
future.get(timeout=10) # Wait for acknowledgment
print(f"Order {order['id']} sent")
# Kafka consumer — Processing stream
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'orders',
bootstrap_servers=['kafka:9092'],
group_id='order-processors',
auto_offset_reset='earliest',
enable_auto_commit=False
)
for message in consumer:
order = json.loads(message.value)
process_order(order)
consumer.commit() # Manual commit for reliability
| Werkzeug | Typ | Am besten für |
|---|---|---|
| Apache Kafka | Streamen | Ereignis-Streaming mit hohem Durchsatz |
| Apache Airbyte | ELT | Open Source, über 200 Konnektoren |
| Fivetran | SaaS ELT | Verwaltete, wartungsfreie Steckverbinder |
| AWS Kinesis | Streamen | AWS-Ökosystem |
| Google Pub/Sub | Streamen | GCP-Ökosystem |
| Azure Event Hubs | Streamen | Azure-Ökosystem |
| Stich | SaaS ELT | Einfache, kleine bis mittlere Datenmengen |
| Debezium | CDC | Erfassung von Datenbankänderungsdaten |
| System | Am besten für | Abfragetyp | Schema | Skalierung |
|---|---|---|---|---|
| Data Lake (S3, ADLS, GCS) | Rohdaten, jedes Format | Dateiebene | Schema beim Lesen | Unendlich |
| Data Warehouse (Snowflake, BigQuery, Redshift) | Strukturierte Analyse | SQL | Schema-on-Write | Elastisch |
| Lakehouse (Databricks, Iceberg) | ML + BI | SQL + Spark | Schemaentwicklung | Elastisch |
| NoSQL (DynamoDB, MongoDB, Cassandra) | Niedrige Latenz, halbstrukturiert | Schlüsselwert, Dokument | Flexibel | Horizontal |
data_storage_strategy:
# Raw data — never transformed
raw_layer: s3://data-lake/raw/ (Parquet format)
# Cleaned and validated data
staging_layer: s3://data-lake/staging/ (Delta Lake)
# Business-ready, modeled data
curated_layer: snowflake.prod.*
# Real-time operational data
operational: dynamodb-transactions
# ML features
feature_store: feast-feature-store
# Data sharing
analytics: looker-explores
┌─────────────────────────────────────────────┐
│ Data Sources │
│ Postgres │ Shopify │ Stripe │ GA4 │
└──────┬──────────┬──────────┬────────┬───────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────┐
│ Ingestion (Fivetran / Airbyte) │
├─────────────────────────────────────────────┤
│ Storage (Snowflake / BigQuery) │
├─────────────────────────────────────────────┤
│ Transformation (dbt) │
├─────────────────────────────────────────────┤
│ BI (Looker / Metabase) │
├─────────────────────────────────────────────┤
│ Reverse ETL (Hightouch / Census) │
└─────────────────────────────────────────────┘
| Aspekt | ETL (traditionell) | ELT (Modern) |
|---|---|---|
| Bestellen | Extrahieren → Transformieren → Laden | Extrahieren → Laden → Transformieren |
| Standort berechnen | Separater ETL-Server | Data Warehouse/See |
| Schema | Vor dem Laden transformieren | Nach dem Laden transformieren |
| Flexibilität | Schwer zu ändernde Transformationen | Einfach zu iterieren |
| Lagerung | Nur strukturiert | Rohdaten verfügbar |
| Werkzeuge | Informatica, Talend | dbt, Spark, SQL |
dbt ist der Industriestandard für SQL-basierte Transformationen im modernen Datenstapel:
-- models/staging/stg_orders.sql
-- Stage raw orders data
with source as (
select * from {{ source('shopify', 'orders') }}
),
renamed as (
select
id as order_id,
created_at,
customer_id,
total_price::decimal(10,2) as total_amount,
financial_status,
fulfillment_status,
currency
from source
where _fivetran_deleted = false -- Handle soft deletes
)
select * from renamed
-- models/marts/order_summary.sql
-- Business-ready order metrics
with orders as (
select * from {{ ref('stg_orders') }}
),
customer_totals as (
select
customer_id,
count(*) as total_orders,
sum(total_amount) as lifetime_value,
min(created_at) as first_order_date,
max(created_at) as most_recent_order_date,
datediff('month', min(created_at), current_date) as customer_tenure_months
from orders
group by 1
)
select
*,
lifetime_value / nullif(customer_tenure_months, 0) as avg_monthly_value,
case
when total_orders = 1 then 'new'
when total_orders between 2 and 5 then 'regular'
when total_orders > 5 then 'loyal'
end as customer_segment
from customer_totals
# dbt_project.yml
name: ecommerce_analytics
version: 1.0
profile: snowflake
models:
ecommerce_analytics:
staging:
+materialized: view
+schema: staging
marts:
+materialized: table
+schema: analytics
+incremental_strategy: merge
+unique_key: order_id
# schema.yml — Data quality tests
version: 2
models:
- name: stg_orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: total_amount
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 0
max_value: 1000000
- name: financial_status
tests:
- accepted_values:
values:
- pending
- authorized
- paid
- refunded
- cancelled
- name: order_summary
tests:
- dbt_utils.expression_is_true:
expression: "total_orders >= 1"
columns:
- name: customer_id
tests:
- relationships:
to: ref('stg_customers')
field: customer_id
Orchestrierungstools verwalten Abhängigkeiten und Planung:
# dags/data_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data_engineering',
'depends_on_past': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'daily_data_pipeline',
default_args=default_args,
description='Main ETL pipeline for analytics',
schedule_interval='0 6 * * *', # 6 AM daily
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['production']
) as dag:
# Task 1: Extract from source databases
extract_orders = PythonOperator(
task_id='extract_orders',
python_callable=extract_from_postgres,
)
# Task 2: Load to staging
load_raw_orders = SnowflakeOperator(
task_id='load_raw_orders',
sql='COPY INTO raw.orders FROM @s3_stage',
)
# Task 3: Transform with dbt
run_dbt_transform = BashOperator(
task_id='run_dbt',
bash_command='cd /app/dbt && dbt run --models marts',
)
# Task 4: Data quality checks
run_tests = BashOperator(
task_id='run_tests',
bash_command='cd /app/dbt && dbt test',
)
# Dependencies
extract_orders >> load_raw_orders >> run_dbt_transform >> run_tests
# Data freshness monitoring
def check_freshness(table: str, max_hours: int):
query = f"""
SELECT MAX(loaded_at) as last_updated
FROM monitoring.table_metadata
WHERE table_name = '{table}'
"""
result = run_query(query)
hours_since_update = (datetime.now() - result['last_updated']).total_seconds() / 3600
if hours_since_update > max_hours:
alert_pagerduty(f"DATA FRESHNESS: {table} is {hours_since_update:.1f}h stale")
| Werkzeug | Typ | Am besten für |
|---|---|---|
| Hingucker | SaaS | Große Unternehmen, LookML-Modellierungsschicht |
| Tableau | Desktop/SaaS | Visuelle Analyse, Self-Service |
| Power BI | Microsoft-Ökosystem | Office 365-Integration |
| Metabasis | Open Source | Selbstgehostete, einfache Analyse |
| Obermenge | Open Source | Cloud-nativ, Python |
Senden Sie transformierte Daten zurück an Betriebssysteme:
# Census reverse ETL configuration
syncs:
- name: customer_segments_to_hubspot
source: snowflake.analytics.customer_segments
destination: hubspot
mapping:
- source_column: email
destination_field: email
- source_column: customer_segment
destination_field: lifecycle_stage
- source_column: lifetime_value
destination_field: hsf_calculated_amount
schedule: every 6 hours
# Feast feature store — Define features
from datetime import timedelta
from feast import FeatureView, Entity, Field
from feast.types import Float32, Int32
customer = Entity(
name='customer_id',
value_type=Int32,
description='Customer identifier'
)
customer_features = FeatureView(
name='customer_features',
entities=['customer_id'],
ttl=timedelta(days=90),
schema=[
Field(name='total_orders', dtype=Int32),
Field(name='lifetime_value', dtype=Float32),
Field(name='avg_order_value', dtype=Float32),
Field(name='days_since_last_order', dtype=Int32),
Field(name='churn_risk_score', dtype=Float32),
],
source=BigQuerySource(
query="SELECT * FROM analytics.customer_features",
),
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder \
.appName("CustomerAnalytics") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# Read from data lake
df = spark.read.parquet("s3://data-lake/raw/orders/*/*.parquet")
# Transform
aggregated = df \
.filter(col("created_at") >= "2024-01-01") \
.groupBy("customer_id") \
.agg(
count("order_id").alias("total_orders"),
sum("total_amount").alias("lifetime_value"),
avg("total_amount").alias("avg_order_value"),
max("created_at").alias("last_order_date")
) \
.withColumn("segment",
when(col("total_orders") >= 10, "VIP")
.when(col("total_orders") >= 3, "Regular")
.otherwise("New")
)
# Write to warehouse
aggregated.write \
.mode("overwrite") \
.format("delta") \
.save("s3://data-lake/curated/customer_segments/")
| Aspekt | Charge | Streaming |
|---|---|---|
| Latenz | Minuten bis Stunden | Subsekunde bis Minuten |
| Datenvolumen | Terabyte+ pro Lauf | Kontinuierlich, unbegrenzt |
| Bearbeitung | Endliche Datensätze | Unendliche Ströme |
| Staat | Zustandslos pro Charge | Zustandsbehaftet (Windowing, Joins) |
| Komplexität | Niedriger | Höher (genau einmal, Wasserzeichen) |
| Kosten | Niedriger (kann platzen) | Höher (immer laufend) |
| Fehlertoleranz | Führen Sie den fehlgeschlagenen Stapel erneut aus | Checkpointing, Zustandsschnappschüsse |
| Anwendungsfälle | Tägliche Berichterstattung | Echtzeit-Dashboards, Warnungen |
# DataHub metadata
entities:
- name: orders
description: Customer order transactions
ownership:
- owner: data-engineering@company.com
type: TECHNICAL_OWNER
- owner: analytics@company.com
type: BUSINESS_OWNER
tags:
- pii: false
- classification: internal
- sla: 99.9% uptime
schema:
- column: email
type: string
description: Customer email
tags: [pii, sensitive]
Verfolgen Sie, wie Daten von der Quelle zum Dashboard fließen:
orders (Postgres)
└─► stg_orders (dbt view)
└─► order_summary (dbt table)
└─► customer_360 (dbt table)
├─► customer_segments (feature store)
└─► revenue_dashboard (Looker)
| Kompetenzbereich | Technologien |
|---|---|
| Programmierung | Python, SQL, Java/Scala |
| Orchestrierung | Airflow, Dagster, Präfekt |
| Stream-Verarbeitung | Kafka, Flink, Spark-Streaming |
| Data Warehousing | Snowflake, BigQuery, Redshift |
| Datenseen | S3, Deltasee, Eisberg |
| Verwandlung | dbt, Spark |
| Infrastruktur | Docker, Kubernetes, Terraform |
| Überwachung | Prometheus, Grafana, Datadog |
| Datenqualität | Große Erwartungen, DBT-Tests |
| CI/CD | GitHub-Aktionen, GitLab CI |
Data Engineering ist die Grundlage der datengesteuerten Organisation. Grundprinzipien:
Die besten Dateningenieure bauen Systeme, die langweilig sind – zuverlässig, vorhersehbar und einfach zu warten. Investieren Sie in Überwachung, Tests und Dokumentation, und Ihre Datenplattform passt sich der Größe Ihres Unternehmens an.
Noch keine freigegebenen Kommentare sichtbar. Neue Antworten können moderiert werden.