
La ingeniería de datos es la práctica de diseñar, construir y mantener sistemas que recopilan, almacenan, procesan y analizan datos a escala. Mientras que los científicos de datos se centran en extraer conocimientos, los ingenieros de datos construyen la infraestructura confiable y escalable que hace posible esos conocimientos.
La ingeniería de datos es la práctica de diseñar, construir y mantener sistemas que recopilan, almacenan, procesan y analizan datos a escala. Mientras que los científicos de datos se centran en extraer conocimientos, los ingenieros de datos construyen la infraestructura confiable y escalable que hace posible esos conocimientos.
En 2026, los ingenieros de datos serán la columna vertebral de las organizaciones basadas en datos. Crean los canales que impulsan los modelos de aprendizaje automático, paneles de inteligencia empresarial, análisis en tiempo real e informes operativos.
┌─────────┐ ┌──────────┐ ┌──────────┐ ┌─────────────┐ ┌─────────┐
│ Data │ │ Data │ │ Data │ │ Data │ │ Data │
│ Sources │──▶│ Ingest │──▶│ Storage │──▶│ Transform │──▶│ Serving │
├─────────┤ ├──────────┤ ├──────────┤ ├─────────────┤ ├─────────┤
│ Apps │ │ Kafka │ │ Data Lake│ │ dbt, Spark │ │ BI │
│ DBs │ │ Kinesis │ │ Warehouse│ │ Airflow │ │ ML │
│ APIs │ │ Airbyte │ │ Lakehouse│ │ Flink │ │ Reverse │
│ Logs │ │ Fivetran │ │ │ │ │ │ ETL │
│ IoT │ │ Pub/Sub │ │ │ │ │ │ API │
└─────────┘ └──────────┘ └──────────┘ └─────────────┘ └─────────┘
Cargue datos en un horario (cada hora, diariamente):
# Airflow DAG — Daily ETL
dag_id: daily_orders_import
schedule: "0 2 * * *" # Daily at 2 AM
tasks:
- extract_from_postgres:
sql: >
COPY orders TO '/tmp/orders_export.csv'
WITH (FORMAT CSV, HEADER true)
WHERE created_at >= CURRENT_DATE - 1
- load_to_s3:
source: /tmp/orders_export.csv
destination: s3://data-lake/raw/orders/{{ ds }}/
- load_to_snowflake:
table: staging.orders
file: s3://data-lake/raw/orders/{{ ds }}/*
warehouse: etl_wh
Procesar datos en tiempo real:
# Kafka producer — Sending order events
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['kafka:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all'
)
def on_order_placed(order):
future = producer.send('orders', order)
future.get(timeout=10) # Wait for acknowledgment
print(f"Order {order['id']} sent")
# Kafka consumer — Processing stream
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'orders',
bootstrap_servers=['kafka:9092'],
group_id='order-processors',
auto_offset_reset='earliest',
enable_auto_commit=False
)
for message in consumer:
order = json.loads(message.value)
process_order(order)
consumer.commit() # Manual commit for reliability
| Herramienta | Tipo | Mejor para |
|---|---|---|
| Apache Kafka | corriente | Transmisión de eventos de alto rendimiento |
| Apache Airbyte | ELT | Código abierto, más de 200 conectores |
| Fivetran | ELT SaaS | Conectores administrados y sin mantenimiento |
| AWS Kinesis | corriente | Ecosistema AWS |
| Google Pub/Sub | corriente | Ecosistema de PCG |
| Centros de eventos de Azure | corriente | Ecosistema azul |
| Puntada | ELT SaaS | Volúmenes de datos simples, pequeños y medianos |
| Debecio | CDC | Captura de datos de cambios de base de datos |
| Sistema | Mejor para | Tipo de consulta | esquema | Escalado |
|---|---|---|---|---|
| Lago de datos (S3, ADLS, GCS) | Datos sin procesar, cualquier formato | Nivel de archivo | Esquema en lectura | infinito |
| Almacén de datos (Snowflake, BigQuery, Redshift) | Análisis estructurado | SQL | Esquema en escritura | Elástico |
| Casa del lago (Ladrillos de datos, Iceberg) | AA + BI | SQL + chispa | Evolución del esquema | Elástico |
| NoSQL (DynamoDB, MongoDB, Cassandra) | Baja latencia, semiestructurada. | Valor-clave, documento | Flexibles | Horizontales |
data_storage_strategy:
# Raw data — never transformed
raw_layer: s3://data-lake/raw/ (Parquet format)
# Cleaned and validated data
staging_layer: s3://data-lake/staging/ (Delta Lake)
# Business-ready, modeled data
curated_layer: snowflake.prod.*
# Real-time operational data
operational: dynamodb-transactions
# ML features
feature_store: feast-feature-store
# Data sharing
analytics: looker-explores
┌─────────────────────────────────────────────┐
│ Data Sources │
│ Postgres │ Shopify │ Stripe │ GA4 │
└──────┬──────────┬──────────┬────────┬───────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────┐
│ Ingestion (Fivetran / Airbyte) │
├─────────────────────────────────────────────┤
│ Storage (Snowflake / BigQuery) │
├─────────────────────────────────────────────┤
│ Transformation (dbt) │
├─────────────────────────────────────────────┤
│ BI (Looker / Metabase) │
├─────────────────────────────────────────────┤
│ Reverse ETL (Hightouch / Census) │
└─────────────────────────────────────────────┘
| Aspecto | ETL (tradicional) | ELT (moderno) |
|---|---|---|
| Orden | Extraer → Transformar → Cargar | Extraer → Cargar → Transformar |
| Calcular ubicación | Servidor ETL separado | Almacén de datos/lago |
| esquema | Transformar antes de cargar | Transformar después de cargar |
| Flexibilidad | Transformaciones difíciles de cambiar. | Fácil de iterar |
| Almacenamiento | Sólo estructurado | Datos brutos disponibles |
| Herramientas | Informática, Talend | DBT, chispa, SQL |
dbt es el estándar de la industria para transformaciones basadas en SQL en la pila de datos moderna:
-- models/staging/stg_orders.sql
-- Stage raw orders data
with source as (
select * from {{ source('shopify', 'orders') }}
),
renamed as (
select
id as order_id,
created_at,
customer_id,
total_price::decimal(10,2) as total_amount,
financial_status,
fulfillment_status,
currency
from source
where _fivetran_deleted = false -- Handle soft deletes
)
select * from renamed
-- models/marts/order_summary.sql
-- Business-ready order metrics
with orders as (
select * from {{ ref('stg_orders') }}
),
customer_totals as (
select
customer_id,
count(*) as total_orders,
sum(total_amount) as lifetime_value,
min(created_at) as first_order_date,
max(created_at) as most_recent_order_date,
datediff('month', min(created_at), current_date) as customer_tenure_months
from orders
group by 1
)
select
*,
lifetime_value / nullif(customer_tenure_months, 0) as avg_monthly_value,
case
when total_orders = 1 then 'new'
when total_orders between 2 and 5 then 'regular'
when total_orders > 5 then 'loyal'
end as customer_segment
from customer_totals
# dbt_project.yml
name: ecommerce_analytics
version: 1.0
profile: snowflake
models:
ecommerce_analytics:
staging:
+materialized: view
+schema: staging
marts:
+materialized: table
+schema: analytics
+incremental_strategy: merge
+unique_key: order_id
# schema.yml — Data quality tests
version: 2
models:
- name: stg_orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: total_amount
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 0
max_value: 1000000
- name: financial_status
tests:
- accepted_values:
values:
- pending
- authorized
- paid
- refunded
- cancelled
- name: order_summary
tests:
- dbt_utils.expression_is_true:
expression: "total_orders >= 1"
columns:
- name: customer_id
tests:
- relationships:
to: ref('stg_customers')
field: customer_id
Las herramientas de orquestación gestionan las dependencias y la programación:
# dags/data_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data_engineering',
'depends_on_past': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'daily_data_pipeline',
default_args=default_args,
description='Main ETL pipeline for analytics',
schedule_interval='0 6 * * *', # 6 AM daily
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['production']
) as dag:
# Task 1: Extract from source databases
extract_orders = PythonOperator(
task_id='extract_orders',
python_callable=extract_from_postgres,
)
# Task 2: Load to staging
load_raw_orders = SnowflakeOperator(
task_id='load_raw_orders',
sql='COPY INTO raw.orders FROM @s3_stage',
)
# Task 3: Transform with dbt
run_dbt_transform = BashOperator(
task_id='run_dbt',
bash_command='cd /app/dbt && dbt run --models marts',
)
# Task 4: Data quality checks
run_tests = BashOperator(
task_id='run_tests',
bash_command='cd /app/dbt && dbt test',
)
# Dependencies
extract_orders >> load_raw_orders >> run_dbt_transform >> run_tests
# Data freshness monitoring
def check_freshness(table: str, max_hours: int):
query = f"""
SELECT MAX(loaded_at) as last_updated
FROM monitoring.table_metadata
WHERE table_name = '{table}'
"""
result = run_query(query)
hours_since_update = (datetime.now() - result['last_updated']).total_seconds() / 3600
if hours_since_update > max_hours:
alert_pagerduty(f"DATA FRESHNESS: {table} is {hours_since_update:.1f}h stale")
| Herramienta | Tipo | Mejor para |
|---|---|---|
| Mirador | SaaS | Grandes empresas, capa de modelado LookML |
| Cuadro | Escritorio/SaaS | Análisis visual, autoservicio |
| Power BI | Ecosistema de Microsoft | Integración con Office 365 |
| Metabase | Código abierto | Análisis simples y autohospedados |
| Superconjunto | Código abierto | Nativo de la nube, Python |
Envíe datos transformados a los sistemas operativos:
# Census reverse ETL configuration
syncs:
- name: customer_segments_to_hubspot
source: snowflake.analytics.customer_segments
destination: hubspot
mapping:
- source_column: email
destination_field: email
- source_column: customer_segment
destination_field: lifecycle_stage
- source_column: lifetime_value
destination_field: hsf_calculated_amount
schedule: every 6 hours
# Feast feature store — Define features
from datetime import timedelta
from feast import FeatureView, Entity, Field
from feast.types import Float32, Int32
customer = Entity(
name='customer_id',
value_type=Int32,
description='Customer identifier'
)
customer_features = FeatureView(
name='customer_features',
entities=['customer_id'],
ttl=timedelta(days=90),
schema=[
Field(name='total_orders', dtype=Int32),
Field(name='lifetime_value', dtype=Float32),
Field(name='avg_order_value', dtype=Float32),
Field(name='days_since_last_order', dtype=Int32),
Field(name='churn_risk_score', dtype=Float32),
],
source=BigQuerySource(
query="SELECT * FROM analytics.customer_features",
),
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder \
.appName("CustomerAnalytics") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# Read from data lake
df = spark.read.parquet("s3://data-lake/raw/orders/*/*.parquet")
# Transform
aggregated = df \
.filter(col("created_at") >= "2024-01-01") \
.groupBy("customer_id") \
.agg(
count("order_id").alias("total_orders"),
sum("total_amount").alias("lifetime_value"),
avg("total_amount").alias("avg_order_value"),
max("created_at").alias("last_order_date")
) \
.withColumn("segment",
when(col("total_orders") >= 10, "VIP")
.when(col("total_orders") >= 3, "Regular")
.otherwise("New")
)
# Write to warehouse
aggregated.write \
.mode("overwrite") \
.format("delta") \
.save("s3://data-lake/curated/customer_segments/")
| Aspecto | lote | Transmisión |
|---|---|---|
| Latencia | Minutos a horas | Subsegundo a minutos |
| Volumen de datos | Terabytes+ por ejecución | Continuo, ilimitado |
| Procesamiento | Conjuntos de datos finitos | Corrientes infinitas |
| Estado | Sin estado por lote | Con estado (ventanas, uniones) |
| Complejidad | inferior | Mayor (exactamente una vez, marcas de agua) |
| Costo | Más bajo (puede reventar) | Más alto (siempre funcionando) |
| Tolerancia a fallos | Volver a ejecutar el lote fallido | Puntos de control, instantáneas de estado |
| Casos de uso | Informes diarios | Paneles de control en tiempo real, alertas |
# DataHub metadata
entities:
- name: orders
description: Customer order transactions
ownership:
- owner: data-engineering@company.com
type: TECHNICAL_OWNER
- owner: analytics@company.com
type: BUSINESS_OWNER
tags:
- pii: false
- classification: internal
- sla: 99.9% uptime
schema:
- column: email
type: string
description: Customer email
tags: [pii, sensitive]
Realice un seguimiento de cómo fluyen los datos desde la fuente al panel:
orders (Postgres)
└─► stg_orders (dbt view)
└─► order_summary (dbt table)
└─► customer_360 (dbt table)
├─► customer_segments (feature store)
└─► revenue_dashboard (Looker)
| Área de habilidades | Tecnologías |
|---|---|
| Programación | Python, SQL, Java/Scala |
| Orquestación | Flujo de aire, Dagster, Prefecto |
| Procesamiento de flujo | Kafka, Flink, Spark Streaming |
| Almacenamiento de datos | Copo de nieve, BigQuery, desplazamiento al rojo |
| Lagos de datos | T3, Lago Delta, Iceberg |
| Transformación | dbt, chispa |
| Infraestructura | Docker, Kubernetes, Terraform |
| Monitoreo | Prometeo, Grafana, Datadog |
| Calidad de los datos | Grandes expectativas, pruebas dbt. |
| CI/CD | Acciones de GitHub, GitLab CI |
La ingeniería de datos es la base de la organización basada en datos. Principios clave:
Los mejores ingenieros de datos construyen sistemas aburridos: confiables, predecibles y fáciles de mantener. Invierta en monitoreo, pruebas y documentación, y su plataforma de datos escalará con su organización.
Todavía no hay comentarios aprobados. Las respuestas nuevas pueden esperar moderación.