数据湖架构设计
目录
概述
数据湖架构演进
传统数据仓库 数据湖1.0 数据湖2.0 (Lakehouse)
┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐
│ Oracle │ │ HDFS │ │ Delta Lake │
│ Teradata │ │ 原始文件存储 │ │ Iceberg │
│ 结构化 │ -> │ Schema-on- │ -> │ Hudi │
│ ETL复杂 │ │ Read │ │ ACID事务 │
│ 扩展困难 │ │ 无事务支持 │ │ 时间旅行 │
└─────────────┘ └─────────────┘ └─────────────────────┘
昂贵 灵活但 最佳实践
刚性 缺乏治理 BI + ML统一Lakehouse架构
┌──────────────────────────────────────────────────────────────┐
│ 应用层 │
├──────────┬──────────┬──────────┬──────────┬─────────────────┤
│ BI工具 │ ML训练 │ 实时分析 │ 数据科学 │ 流式处理 │
│ Tableau │ MLflow │ Presto │ Jupyter │ Flink │
└────┬─────┴────┬─────┴────┬─────┴────┬─────┴─────┬───────────┘
└──────────┴──────────┴──────────┴───────────┘
│
┌─────────────────────────▼─────────────────────────────────────┐
│ 查询引擎层 │
├──────────┬──────────┬──────────┬──────────┬──────────────────┤
│ Spark │ Presto │ Trino │ Hive │ Flink SQL │
└────┬─────┴────┬─────┴────┬─────┴────┬─────┴────┬─────────────┘
└──────────┴──────────┴──────────┴──────────┘
│
┌─────────────────────────▼─────────────────────────────────────┐
│ 表格式层 (Metadata Layer) │
├──────────────────┬────────────────┬───────────────────────────┤
│ Delta Lake │ Apache Iceberg │ Apache Hudi │
│ ┌───────────┐ │ ┌───────────┐ │ ┌───────────┐ │
│ │ Transaction│ │ │ Metadata │ │ │ Timeline │ │
│ │ Log │ │ │ Tree │ │ │ Service │ │
│ └───────────┘ │ └───────────┘ │ └───────────┘ │
└──────┬───────────┴────────┬───────┴──────────┬────────────────┘
│ │ │
┌──────▼────────────────────▼──────────────────▼────────────────┐
│ 存储层 │
├──────────────────┬────────────────┬───────────────────────────┤
│ 对象存储 │ HDFS │ 本地文件系统 │
│ S3/OSS/COS │ Hadoop │ Local FS │
│ ┌────┬────┬───┐│ ┌────┬────┐ │ ┌────┬────┐ │
│ │Par-│ORC │Avro││ │Par-│ORC │ │ │Par-│JSON│ │
│ │quet│ │ ││ │quet│ │ │ │quet│ │ │
│ └────┴────┴───┘│ └────┴────┘ │ └────┴────┘ │
└──────────────────┴────────────────┴───────────────────────────┘Delta Lake详解
Delta Lake架构
┌──────────────────────────────────────────────────────────────┐
│ Delta Lake Architecture │
├──────────────────────────────────────────────────────────────┤
│ │
│ 应用层 │
│ ┌──────────┬──────────┬──────────┬──────────┐ │
│ │ Spark SQL│ PySpark │ Delta DML│ Streaming│ │
│ └────┬─────┴────┬─────┴────┬─────┴────┬─────┘ │
│ └──────────┴──────────┴──────────┘ │
│ │ │
│ ┌───────────────────▼────────────────────────────────┐ │
│ │ Delta Lake Transaction Layer │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Transaction Log (_delta_log/) │ │ │
│ │ │ ┌─────┬─────┬─────┬─────┬─────┬─────┐ │ │ │
│ │ │ │ 0 │ 1 │ 2 │ ... │ 10 │ ... │ │ │ │
│ │ │ │.json│.json│.json│ │.chk │ │ │ │ │
│ │ │ └─────┴─────┴─────┴─────┴─────┴─────┘ │ │ │
│ │ │ Atomic Commits, Version Control │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Optimistic Concurrency Control │ │ │
│ │ │ - Read: Snapshot Isolation │ │ │
│ │ │ - Write: Conflict Detection │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────▼────────────────────────────────┐ │
│ │ Data Files (Parquet) │ │
│ │ ┌──────┬──────┬──────┬──────┬──────┐ │ │
│ │ │part-0│part-1│part-2│part-3│ ... │ │ │
│ │ │.parq │.parq │.parq │.parq │ │ │ │
│ │ └──────┴──────┴──────┴──────┴──────┘ │ │
│ └────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────┘Delta Lake核心特性
1. ACID事务
scala
import org.apache.spark.sql.SparkSession
import io.delta.tables._
val spark = SparkSession.builder()
.appName("DeltaLakeExample")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
import spark.implicits._
// 创建Delta表
val data = Seq(
(1, "Alice", 1000),
(2, "Bob", 2000),
(3, "Charlie", 3000)
).toDF("id", "name", "amount")
data.write
.format("delta")
.mode("overwrite")
.save("/delta/users")
// ACID更新操作
val deltaTable = DeltaTable.forPath(spark, "/delta/users")
// 1. UPDATE
deltaTable.update(
condition = expr("id = 1"),
set = Map("amount" -> expr("amount + 500"))
)
// 2. DELETE
deltaTable.delete(condition = expr("amount < 1500"))
// 3. MERGE (UPSERT)
val updates = Seq(
(1, "Alice", 1600), // 更新
(4, "David", 4000) // 插入
).toDF("id", "name", "amount")
deltaTable.alias("target")
.merge(
updates.alias("source"),
"target.id = source.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
// 查询结果
spark.read.format("delta").load("/delta/users").show()
/*
+---+-------+------+
| id| name|amount|
+---+-------+------+
| 1| Alice| 1600|
| 3|Charlie| 3000|
| 4| David| 4000|
+---+-------+------+
*/2. 时间旅行
scala
// 按版本查询
spark.read
.format("delta")
.option("versionAsOf", 0)
.load("/delta/users")
.show()
// 按时间戳查询
spark.read
.format("delta")
.option("timestampAsOf", "2026-02-01 10:00:00")
.load("/delta/users")
.show()
// 查看历史版本
deltaTable.history().show()
/*
+-------+-------------------+------+--------+---------+--------------------+
|version| timestamp|userId|userName|operation| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+
| 3|2026-02-07 14:30:00| user1| alice| MERGE|{numTargetRowsIns...|
| 2|2026-02-07 14:20:00| user1| alice| DELETE|{numDeletedRows: 1} |
| 1|2026-02-07 14:10:00| user1| alice| UPDATE|{numUpdatedRows: 1} |
| 0|2026-02-07 14:00:00| user1| alice| WRITE|{numFiles: 3} |
+-------+-------------------+------+--------+---------+--------------------+
*/
// 回滚到指定版本
deltaTable.restoreToVersion(1)3. Schema演化
scala
// 自动Schema演化
val newData = Seq(
(5, "Eve", 5000, "[email protected]") // 新增email列
).toDF("id", "name", "amount", "email")
newData.write
.format("delta")
.mode("append")
.option("mergeSchema", "true") // 启用Schema合并
.save("/delta/users")
// 显式Schema演化
deltaTable.toDF
.withColumn("email", lit(null).cast("string"))
.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true")
.save("/delta/users")4. 数据压缩与优化
scala
// OPTIMIZE - 合并小文件
deltaTable.optimize()
.executeCompaction()
// Z-ORDER - 多维度聚类
deltaTable.optimize()
.where("date >= '2026-01-01'")
.executeZOrderBy("city", "category")
// VACUUM - 清理过期文件
deltaTable.vacuum(168) // 保留7天的历史版本
// 查看表详情
spark.sql("DESCRIBE DETAIL delta.`/delta/users`").show()
// 查看表统计信息
spark.sql("DESCRIBE EXTENDED delta.`/delta/users`").show()Delta Lake实战:CDC数据湖
python
from delta import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# 创建Spark会话
spark = SparkSession.builder \
.appName("CDC_DeltaLake") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# 读取CDC流数据
cdc_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka1:9092") \
.option("subscribe", "cdc.orders") \
.option("startingOffsets", "latest") \
.load()
# 解析CDC事件
parsed_stream = cdc_stream \
.selectExpr("CAST(value AS STRING) as json") \
.select(from_json(col("json"), schema).alias("data")) \
.select("data.*")
# CDC处理逻辑
def upsert_to_delta(batch_df, batch_id):
"""
处理CDC变更数据
op: c=create, u=update, d=delete, r=read
"""
# 创建临时视图
batch_df.createOrReplaceTempView("updates")
# 获取Delta表
delta_table = DeltaTable.forPath(spark, "/delta/orders")
# 分别处理INSERT/UPDATE/DELETE
# 1. DELETE操作
delete_df = batch_df.filter(col("op") == "d")
if delete_df.count() > 0:
delta_table.alias("target") \
.merge(
delete_df.alias("source"),
"target.order_id = source.before.order_id"
) \
.whenMatchedDelete() \
.execute()
# 2. INSERT/UPDATE操作
upsert_df = batch_df.filter(col("op").isin("c", "u", "r"))
if upsert_df.count() > 0:
delta_table.alias("target") \
.merge(
upsert_df.select("after.*").alias("source"),
"target.order_id = source.order_id"
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
print(f"Batch {batch_id} processed successfully")
# 启动流式写入
query = parsed_stream.writeStream \
.foreachBatch(upsert_to_delta) \
.outputMode("update") \
.option("checkpointLocation", "/delta/checkpoints/orders") \
.start()
query.awaitTermination()Apache Iceberg详解
Iceberg架构
┌──────────────────────────────────────────────────────────────┐
│ Apache Iceberg Architecture │
├──────────────────────────────────────────────────────────────┤
│ │
│ 查询引擎 │
│ ┌─────────┬─────────┬─────────┬─────────┬─────────┐ │
│ │ Spark │ Flink │ Presto │ Trino │ Hive │ │
│ └────┬────┴────┬────┴────┬────┴────┬────┴────┬────┘ │
│ └─────────┴─────────┴─────────┴─────────┘ │
│ │ │
│ ┌──────────────────────▼──────────────────────────────┐ │
│ │ Iceberg Table API │ │
│ └──────────────────────┬──────────────────────────────┘ │
│ │ │
│ ┌──────────────────────▼──────────────────────────────┐ │
│ │ Metadata Tree │ │
│ │ │ │
│ │ 当前元数据 (metadata.json) │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ schema, partition-spec, sort-order │ │ │
│ │ │ current-snapshot-id: 12345 │ │ │
│ │ │ snapshots: [...] │ │ │
│ │ └──────────────────┬─────────────────────────┘ │ │
│ │ │ │ │
│ │ 快照 (Snapshot) ▼ │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ snapshot-id: 12345 │ │ │
│ │ │ timestamp: 2026-02-07T14:30:00 │ │ │
│ │ │ manifest-list: s3://bucket/snap-12345.avro│ │ │
│ │ └──────────────────┬─────────────────────────┘ │ │
│ │ │ │ │
│ │ 清单列表 ▼ │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ Manifest List (snap-12345.avro) │ │ │
│ │ │ ┌──────────┬──────────┬──────────┐ │ │ │
│ │ │ │manifest-1│manifest-2│manifest-3│ │ │ │
│ │ │ └────┬─────┴────┬─────┴────┬─────┘ │ │ │
│ │ └──────┼──────────┼──────────┼──────────────┘ │ │
│ │ │ │ │ │ │
│ │ 清单文件▼ ▼ ▼ │ │
│ │ ┌───────────────────────────────────────────┐ │ │
│ │ │ Manifest Files (manifest-*.avro) │ │ │
│ │ │ ┌──────┬──────┬──────┬──────┐ │ │ │
│ │ │ │file-1│file-2│file-3│ ... │ │ │ │
│ │ │ │path │path │path │ │ │ │ │
│ │ │ │stats │stats │stats │ │ │ │ │
│ │ │ └──────┴──────┴──────┴──────┘ │ │ │
│ │ └───────────────────────────────────────────┘ │ │
│ └────────────────────┬──────────────────────────────┘ │
│ │ │
│ ┌────────────────────▼──────────────────────────────┐ │
│ │ Data Files (Parquet/ORC/Avro) │ │
│ │ ┌──────┬──────┬──────┬──────┬──────┐ │ │
│ │ │part-1│part-2│part-3│part-4│ ... │ │ │
│ │ └──────┴──────┴──────┴──────┴──────┘ │ │
│ └────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────┘Iceberg核心特性
1. 表创建与管理
sql
-- 使用Spark SQL创建Iceberg表
CREATE TABLE orders (
order_id BIGINT,
user_id BIGINT,
product_id BIGINT,
quantity INT,
price DECIMAL(10,2),
order_date DATE,
order_status STRING
)
USING iceberg
PARTITIONED BY (days(order_date))
TBLPROPERTIES (
'write.format.default'='parquet',
'write.parquet.compression-codec'='snappy',
'commit.retry.num-retries'='3'
);
-- 插入数据
INSERT INTO orders VALUES
(1, 101, 201, 2, 99.99, DATE '2026-02-07', 'paid'),
(2, 102, 202, 1, 199.99, DATE '2026-02-07', 'pending');
-- 查询表快照
SELECT * FROM orders.snapshots;
-- 查询表历史
SELECT * FROM orders.history;
-- 查询表文件
SELECT * FROM orders.files;2. 时间旅行与回滚
sql
-- 按快照ID查询
SELECT * FROM orders VERSION AS OF 3821550127947089987;
-- 按时间戳查询
SELECT * FROM orders TIMESTAMP AS OF '2026-02-07 10:00:00';
-- 增量读取
SELECT * FROM orders
WHERE _file_timestamp > TIMESTAMP '2026-02-06 00:00:00';
-- 回滚到指定快照
CALL catalog_name.system.rollback_to_snapshot('orders', 3821550127947089987);
-- 回滚到指定时间
CALL catalog_name.system.rollback_to_timestamp('orders', TIMESTAMP '2026-02-06');3. Schema演化
sql
-- 添加列
ALTER TABLE orders ADD COLUMN email STRING;
-- 删除列
ALTER TABLE orders DROP COLUMN email;
-- 重命名列
ALTER TABLE orders RENAME COLUMN order_status TO status;
-- 修改列类型 (向上兼容)
ALTER TABLE orders ALTER COLUMN quantity TYPE BIGINT;
-- 更新列注释
ALTER TABLE orders ALTER COLUMN price COMMENT '订单价格(单位:元)';4. 分区演化
sql
-- 更改分区策略 (无需重写数据)
ALTER TABLE orders
SET PARTITION SPEC (bucket(16, user_id), days(order_date));
-- 查看分区演化历史
SELECT * FROM orders.partition_specs;5. 表维护
scala
import org.apache.iceberg.spark.actions.SparkActions
val spark: SparkSession = ...
// 1. 合并小文件
SparkActions
.get(spark)
.rewriteDataFiles(
spark.table("orders")
)
.option("target-file-size-bytes", "536870912") // 512MB
.option("min-file-size-bytes", "134217728") // 128MB
.execute()
// 2. 删除过期快照
SparkActions
.get(spark)
.expireSnapshots(
spark.table("orders")
)
.expireOlderThan(System.currentTimeMillis() - 7 * 24 * 60 * 60 * 1000) // 7天
.execute()
// 3. 删除孤儿文件
SparkActions
.get(spark)
.deleteOrphanFiles(
spark.table("orders")
)
.olderThan(System.currentTimeMillis() - 3 * 24 * 60 * 60 * 1000) // 3天
.execute()
// 4. 重写Manifest文件
SparkActions
.get(spark)
.rewriteManifests(
spark.table("orders")
)
.execute()Iceberg实战:多引擎访问
python
# Spark写入
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.my_catalog.type", "hadoop") \
.config("spark.sql.catalog.my_catalog.warehouse", "s3://bucket/warehouse") \
.getOrCreate()
df = spark.read.parquet("s3://source/orders/")
df.writeTo("my_catalog.db.orders").create()
# Flink读写
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql("""
CREATE CATALOG my_catalog WITH (
'type' = 'iceberg',
'catalog-type' = 'hadoop',
'warehouse' = 's3://bucket/warehouse'
)
""")
table_env.execute_sql("""
INSERT INTO my_catalog.db.orders
SELECT * FROM kafka_source
""")
# Presto查询
# 在Presto配置catalog/iceberg.properties:
# connector.name=iceberg
# iceberg.catalog.type=hadoop
# iceberg.catalog.warehouse=s3://bucket/warehouse
# 然后在Presto中查询:
# SELECT * FROM iceberg.db.orders WHERE order_date = DATE '2026-02-07';Apache Hudi详解
Hudi架构
┌──────────────────────────────────────────────────────────────┐
│ Apache Hudi Architecture │
├──────────────────────────────────────────────────────────────┤
│ │
│ 应用层 │
│ ┌─────────┬─────────┬─────────┬─────────┐ │
│ │ Spark │ Flink │ Presto │ Hive │ │
│ └────┬────┴────┬────┴────┬────┴────┬────┘ │
│ └─────────┴─────────┴─────────┘ │
│ │ │
│ ┌─────────────────▼──────────────────────────────────┐ │
│ │ Hudi Table API │ │
│ │ ┌──────────────────────────────────────────┐ │ │
│ │ │ Table Types │ │ │
│ │ │ ┌────────────┐ ┌───────────────┐ │ │ │
│ │ │ │ Copy-on- │ │ Merge-on- │ │ │ │
│ │ │ │ Write(CoW) │ │ Read (MoR) │ │ │ │
│ │ │ └────────────┘ └───────────────┘ │ │ │
│ │ └──────────────────────────────────────────┘ │ │
│ └─────────────────┬──────────────────────────────────┘ │
│ │ │
│ ┌─────────────────▼──────────────────────────────────┐ │
│ │ Timeline Service │ │
│ │ ┌──────────────────────────────────────────┐ │ │
│ │ │ Timeline (时间轴) │ │ │
│ │ │ ┌────┬────┬────┬────┬────┬────┬────┐ │ │ │
│ │ │ │ C1 │ C2 │ C3 │ C4 │ C5 │ C6 │... │ │ │ │
│ │ │ │.cmt│.cmt│.cmt│.cmt│.cmt│.cmt│ │ │ │ │
│ │ │ └────┴────┴────┴────┴────┴────┴────┘ │ │ │
│ │ │ Instant: 提交时间点 │ │ │
│ │ └──────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────┐ │ │
│ │ │ Hoodie.properties (表元数据) │ │ │
│ │ │ - Table Type (CoW/MoR) │ │ │
│ │ │ - Record Key, Partition Path │ │ │
│ │ │ - Precombine Field │ │ │
│ │ └──────────────────────────────────────────┘ │ │
│ └─────────────────┬──────────────────────────────────┘ │
│ │ │
│ ┌─────────────────▼──────────────────────────────────┐ │
│ │ Data Files │ │
│ │ │ │
│ │ CoW模式: │ │
│ │ ┌──────────┬──────────┬──────────┐ │ │
│ │ │ Parquet │ Parquet │ Parquet │ │ │
│ │ │ File │ File │ File │ │ │
│ │ └──────────┴──────────┴──────────┘ │ │
│ │ │ │
│ │ MoR模式: │ │
│ │ ┌──────────────────────────────────────────┐ │ │
│ │ │ Base Files (Parquet) │ │ │
│ │ │ ┌──────┬──────┬──────┐ │ │ │
│ │ │ │base-1│base-2│base-3│ │ │ │
│ │ │ └──────┴──────┴──────┘ │ │ │
│ │ │ │ │ │
│ │ │ Log Files (Avro) │ │ │
│ │ │ ┌──────┬──────┬──────┐ │ │ │
│ │ │ │ log-1│ log-2│ log-3│ │ │ │
│ │ │ └──────┴──────┴──────┘ │ │ │
│ │ └──────────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────┘Hudi表类型对比
Copy-on-Write (CoW)
scala
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val spark = SparkSession.builder()
.appName("HudiCoWExample")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
import spark.implicits._
// 创建CoW表
val tableName = "hudi_cow_orders"
val basePath = "s3://bucket/hudi/orders"
val inserts = Seq(
(1, "Alice", 100, "2026-02-07"),
(2, "Bob", 200, "2026-02-07"),
(3, "Charlie", 300, "2026-02-07")
).toDF("id", "name", "amount", "date")
inserts.write
.format("hudi")
.option(RECORDKEY_FIELD.key, "id")
.option(PARTITIONPATH_FIELD.key, "date")
.option(PRECOMBINE_FIELD.key, "amount")
.option(TBL_NAME.key, tableName)
.option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
.mode("overwrite")
.save(basePath)
// 更新数据 (Copy-on-Write: 重写整个Parquet文件)
val updates = Seq(
(1, "Alice", 150, "2026-02-07"), // 更新
(4, "David", 400, "2026-02-07") // 新增
).toDF("id", "name", "amount", "date")
updates.write
.format("hudi")
.option(RECORDKEY_FIELD.key, "id")
.option(PARTITIONPATH_FIELD.key, "date")
.option(PRECOMBINE_FIELD.key, "amount")
.option(TBL_NAME.key, tableName)
.mode("append")
.save(basePath)
// 查询数据 (快速读取)
spark.read.format("hudi").load(basePath).show()
/*
特点:
- 写入慢: 每次更新都重写整个文件
- 读取快: 直接读取Parquet文件
- 适用场景: 读多写少
*/Merge-on-Read (MoR)
scala
// 创建MoR表
val tableNameMoR = "hudi_mor_orders"
val basePathMoR = "s3://bucket/hudi/orders_mor"
inserts.write
.format("hudi")
.option(RECORDKEY_FIELD.key, "id")
.option(PARTITIONPATH_FIELD.key, "date")
.option(PRECOMBINE_FIELD.key, "amount")
.option(TBL_NAME.key, tableNameMoR)
.option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
.option("hoodie.compact.inline", "false") // 禁用自动压缩
.mode("overwrite")
.save(basePathMoR)
// 更新数据 (Merge-on-Read: 追加到Log文件)
updates.write
.format("hudi")
.option(RECORDKEY_FIELD.key, "id")
.option(PARTITIONPATH_FIELD.key, "date")
.option(PRECOMBINE_FIELD.key, "amount")
.option(TBL_NAME.key, tableNameMoR)
.mode("append")
.save(basePathMoR)
// Snapshot查询 (合并Base + Log)
spark.read
.format("hudi")
.load(basePathMoR)
.show()
// Read Optimized查询 (只读Base文件)
spark.read
.format("hudi")
.option("hoodie.datasource.query.type", "read_optimized")
.load(basePathMoR)
.show()
// 增量查询
spark.read
.format("hudi")
.option("hoodie.datasource.query.type", "incremental")
.option("hoodie.datasource.read.begin.instanttime", "20260207000000")
.load(basePathMoR)
.show()
/*
特点:
- 写入快: 追加到Log文件
- 读取慢: 需要合并Base + Log
- 适用场景: 写多读少
*/Hudi表维护
scala
import org.apache.hudi.client.HoodieSparkCompactor
// 1. 压缩 (Compaction) - 合并Log到Base
spark.read.format("hudi")
.option("hoodie.compact.inline", "true")
.option("hoodie.compact.inline.max.delta.commits", "5")
.load(basePathMoR)
// 手动触发压缩
spark.sql(s"""
CALL compact('$tableNameMoR', 'schedule')
""")
// 2. 清理 (Cleaning) - 删除旧版本
spark.sql(s"""
CALL run_clean(table => '$tableNameMoR', retain_commits => 10)
""")
// 3. 聚簇 (Clustering) - 优化数据布局
spark.sql(s"""
CALL run_clustering(table => '$tableNameMoR', order => 'id')
""")
// 4. 归档 (Archiving) - 归档旧时间轴
// 自动归档,超过50次提交后归档
.option("hoodie.keep.min.commits", "30")
.option("hoodie.keep.max.commits", "50")表格式对比
功能对比表
┌────────────────┬──────────────┬──────────────┬──────────────┐
│ 特性 │ Delta Lake │ Iceberg │ Hudi │
├────────────────┼──────────────┼──────────────┼──────────────┤
│ ACID事务 │ ✓ │ ✓ │ ✓ │
│ 时间旅行 │ ✓ │ ✓ │ ✓ │
│ Schema演化 │ ✓ │ ✓ │ ✓ │
│ 分区演化 │ ✗ │ ✓ │ ✗ │
│ 行级更新 │ ✓ │ ✓ │ ✓ │
│ 增量读取 │ ✓ │ ✓ │ ✓ │
│ 多引擎支持 │ 中等 │ 最好 │ 中等 │
│ 流式写入 │ ✓ │ ✓ │ ✓ │
│ 写入性能 │ 好 │ 好 │ 最好 │
│ 读取性能 │ 好 │ 好 │ 中等 │
│ CDC支持 │ 好 │ 中等 │ 最好 │
│ 小文件问题 │ 需优化 │ 需优化 │ 自动处理 │
│ 成熟度 │ 高 │ 中 │ 中 │
│ 社区活跃度 │ 高 │ 高 │ 中 │
└────────────────┴──────────────┴──────────────┴──────────────┘选型建议
Delta Lake 最适合:
✓ Databricks平台用户
✓ 以Spark为主的工作负载
✓ 需要强ACID保证
✓ 批处理为主的场景
Apache Iceberg 最适合:
✓ 多引擎访问需求 (Spark/Flink/Presto/Trino)
✓ 需要分区演化
✓ 大规模数据湖
✓ 开放标准重视度高
Apache Hudi 最适合:
✓ 近实时数据摄入
✓ CDC场景
✓ 需要增量处理
✓ 流批一体架构数据湖vs数据仓库
架构对比
传统数据仓库 (Data Warehouse)
┌─────────────────────────────────────────────────────────┐
│ 应用层: BI工具, 报表 │
├─────────────────────────────────────────────────────────┤
│ 数据集市层 (Data Marts) │
│ ┌──────────┬──────────┬──────────┬──────────┐ │
│ │ 销售 │ 财务 │ HR │ 客服 │ │
│ └──────────┴──────────┴──────────┴──────────┘ │
├─────────────────────────────────────────────────────────┤
│ 数据仓库层 (DW) │
│ - 星型模型/雪花模型 │
│ - 严格Schema │
│ - SQL查询 │
├─────────────────────────────────────────────────────────┤
│ ETL层 │
│ - Extract, Transform, Load │
│ - 数据清洗、转换、加载 │
├─────────────────────────────────────────────────────────┤
│ 数据源 │
│ ┌──────────┬──────────┬──────────┐ │
│ │ OLTP DB │ ERP │ CRM │ │
│ └──────────┴──────────┴──────────┘ │
└─────────────────────────────────────────────────────────┘
缺点:
- 存储和计算耦合,扩展性差
- Schema刚性,难以适应变化
- ETL流程复杂,延迟高
- 成本高昂
数据湖 (Data Lake)
┌─────────────────────────────────────────────────────────┐
│ 应用层 │
│ ┌──────────┬──────────┬──────────┬──────────┐ │
│ │ BI │ 机器学习 │ 数据科学 │ 流式分析│ │
│ └──────────┴──────────┴──────────┴──────────┘ │
├─────────────────────────────────────────────────────────┤
│ 计算引擎层 │
│ ┌──────────┬──────────┬──────────┬──────────┐ │
│ │ Spark │ Presto │ Hive │ Flink │ │
│ └──────────┴──────────┴──────────┴──────────┘ │
├─────────────────────────────────────────────────────────┤
│ 表格式层 (Lakehouse) │
│ ┌──────────┬──────────┬──────────┐ │
│ │ Delta │ Iceberg │ Hudi │ │
│ └──────────┴──────────┴──────────┘ │
├─────────────────────────────────────────────────────────┤
│ 存储层 │
│ ┌────────────────────────────────────────────┐ │
│ │ 对象存储 (S3/OSS/COS/ADLS) │ │
│ │ ┌──────┬──────┬──────┬──────┬──────┐ │ │
│ │ │原始层 │清洗层 │加工层 │应用层 │归档层│ │ │
│ │ │ Raw │Bronze│Silver│ Gold │Archive│ │ │
│ │ └──────┴──────┴──────┴──────┴──────┘ │ │
│ └────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────────────┤
│ 数据源 (ELT) │
│ ┌──────────┬──────────┬──────────┬──────────┐ │
│ │ Database│ API │ 日志 │ IoT │ │
│ └──────────┴──────────┴──────────┴──────────┘ │
└─────────────────────────────────────────────────────────┘
优点:
- 存储计算分离,弹性扩展
- Schema灵活,支持半结构化/非结构化
- 成本低(对象存储)
- 支持多种工作负载选型决策树
开始
│
是否需要实时查询?
┌───────┴───────┐
是 否
│ │
数据量 > 100TB? 是否已有数仓?
┌─────┴─────┐ ┌─────┴─────┐
是 否 是 否
│ │ │ │
数据湖+ 传统 保持 看预算
Lakehouse 数仓 现状 ┌──┴──┐
│ 高 低
│ │ │
选择表格式 数仓 数据湖
┌──┴──┐
│ │
多引擎? Spark为主?
│ │
是 否
│ │
Iceberg Delta实战案例
案例1:构建实时数据湖
python
"""
需求:
1. 实时采集业务数据库变更
2. 存储到数据湖
3. 支持SQL分析
4. 支持时间旅行
"""
from pyspark.sql import SparkSession
from delta import *
# 1. 配置Spark
spark = SparkSession.builder \
.appName("RealtimeDataLake") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# 2. 读取CDC流
cdc_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "mysql.inventory.orders") \
.option("startingOffsets", "latest") \
.load()
# 3. 解析并转换
from pyspark.sql.functions import from_json, col
schema = "order_id LONG, user_id LONG, amount DECIMAL(10,2), status STRING, ts TIMESTAMP"
parsed = cdc_stream \
.selectExpr("CAST(value AS STRING) as json") \
.select(from_json(col("json"), schema).alias("data")) \
.select("data.*") \
.withWatermark("ts", "10 seconds")
# 4. 写入Delta Lake
query = parsed.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/tmp/checkpoint/orders") \
.option("mergeSchema", "true") \
.partitionBy("DATE(ts)") \
.start("/datalake/orders")
# 5. 实时查询
spark.sql("""
SELECT
DATE(ts) as date,
status,
COUNT(*) as order_count,
SUM(amount) as total_amount
FROM delta.`/datalake/orders`
WHERE DATE(ts) = CURRENT_DATE()
GROUP BY DATE(ts), status
""").show()
# 6. 时间旅行
spark.read \
.format("delta") \
.option("versionAsOf", 10) \
.load("/datalake/orders") \
.show()案例2:数据湖分层架构
/datalake/
├── bronze/ # 原始层 (Raw Data)
│ ├── orders/ # 原始订单数据
│ ├── users/ # 原始用户数据
│ └── products/ # 原始产品数据
│
├── silver/ # 清洗层 (Cleaned Data)
│ ├── orders/ # 清洗后订单
│ ├── users/ # 清洗后用户
│ └── products/ # 清洗后产品
│
└── gold/ # 应用层 (Business Data)
├── fact_orders/ # 订单事实表
├── dim_users/ # 用户维度表
└── agg_daily/ # 日汇总表python
# Bronze层: 原样存储原始数据
bronze_df.write \
.format("delta") \
.mode("append") \
.save("/datalake/bronze/orders")
# Silver层: 清洗转换
silver_df = spark.read.format("delta").load("/datalake/bronze/orders") \
.dropDuplicates(["order_id"]) \
.filter(col("amount") > 0) \
.withColumn("order_date", to_date(col("created_at")))
silver_df.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("order_date") \
.save("/datalake/silver/orders")
# Gold层: 业务聚合
gold_df = spark.sql("""
SELECT
order_date,
user_id,
COUNT(*) as order_count,
SUM(amount) as total_spent
FROM delta.`/datalake/silver/orders`
GROUP BY order_date, user_id
""")
gold_df.write \
.format("delta") \
.mode("overwrite") \
.save("/datalake/gold/user_daily_stats")总结
数据湖关键技术选型:
表格式选择
- Delta Lake: Databricks生态,成熟稳定
- Iceberg: 多引擎支持最好,分区演化
- Hudi: CDC场景最优,写入性能好
架构设计
- 分层存储: Bronze/Silver/Gold
- 计算存储分离: 弹性伸缩
- 元数据管理: Catalog统一管理
性能优化
- 分区策略: 合理分区,避免小文件
- 数据压缩: Snappy/Gzip/Zstd
- 文件格式: Parquet列式存储
- Z-Order/Clustering: 数据排序
数据治理
- Schema管理: 版本控制,演化策略
- 数据质量: 自动化检查
- 访问控制: 细粒度权限
- 审计日志: 操作追踪
数据湖 vs 数据仓库:
- 数据湖: 灵活、低成本、支持多种工作负载
- 数据仓库: 性能好、SQL优化、BI分析
- Lakehouse: 融合两者优势,统一架构
💬 讨论
使用 GitHub 账号登录后即可参与讨论