使用 python 的 spark 模块,即 pyspark,通过 spark sql 从 hive 或 mysql 等存储层读取相关数据,对应 spark 的 dataframe 对象,经过一系列的数据变化操作后(若需要),将最终数据通过 pipeline,并利用 spark 的分布式形式分批写入 redis。
推荐解决方式
假设 dataframe 数据中 id 为其中一列,其余为对应的统计数据列,以 hash 方式写入每行数据,基于 pyspark 封装了如下函数:
from pyspark.sql import DataFrame
from redis import Redis
from time import sleep
def df_2_redis(df: DataFrame, num_partitions=2, batch_size=500, sleep_secs=0.2):
"""
将 spark sql 的 DataFrame 数据分布批量写入 Redis
``df`` 要写入的 spark DataFrame 对象数据。
``num_partitions`` int 类型,指定数据的分区数,默认为 2。
``batch_size`` int 类型,指定每个 worker 批量执行的数据条数,默认为 500。
``sleep_secs`` float 类型,指定每个 worker 批量执行数据后的睡眠时间,单位为秒,默认为 0.2。
"""
count = df.count()
if count == 0:
print('df count 0')
return
col_names = df.columns
def _save_2_redis(ite):
redis_conn = Redis(
host='xxxx.xxxxx.xxx',
port=6379,
password='xxxxxxxx'
)
pipe = redis_conn.pipeline(transaction=False)
# 7 天有效期
ex = 60 * 60 * 24 * 7
idx = 1
for i, row in enumerate(ite, start=1):
idx = i
redis_key = 'id_' + str(row.id)
fv = {}
for x in col_names:
if x == 'id':
continue
val = getattr(row, x, None)
if val is not None:
fv[x] = val
pipe.hmset(redis_key, fv)
pipe.expire(redis_key, ex)
if i % batch_size == 0:
pipe.execute()
# 批量提交后,睡眠指定时间,控制写入频率
sleep(sleep_secs)
if idx % batch_size != 0:
# 最后不够 batch_size 的数据批量提交
pipe.execute()
# 分布批量写入
df.repartition(num_partitions).foreachPartition(_save_2_redis)