If the search system you are building serves query-based search, building a query table is the most fundamental piece of the work: it lays the groundwork for the query-analysis tasks that follow, such as tokenization, chunk analysis, intent classification, and term weighting.
Purpose of the query table
Building a query table mainly serves the following purposes:
- Annotating the correct tokenization, chunking, intent class, term weights, and so on for each query; these annotations can also be used as training data for the corresponding models;
- Collecting the business-specific terms and phrase groupings (chunks) that can serve as the tokenizer's custom dictionary, which makes the inverted index more accurate (a minimal example follows this list).
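For the second point, here is a minimal sketch of feeding such terms and chunks to a Chinese tokenizer as a custom dictionary, using jieba as an example; the file name custom_terms.txt and the sample entries are hypothetical:

import jieba

# each line of a jieba user dictionary is "<term> [freq] [pos-tag]"; freq and tag are optional
jieba.load_userdict('custom_terms.txt')
# terms or chunks can also be registered programmatically
jieba.add_word('无线蓝牙耳机', freq=20000)  # "wireless bluetooth earphones" kept as one token

print(list(jieba.cut('无线蓝牙耳机 降噪')))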
Steps to build the table
Here are the steps the author follows to build the query table:
- Collect the top queries over a given time window: read the search-term event-tracking logs, sort queries by occurrence count, and cut off at a threshold (e.g. only sync queries that appear 5 times or more);
- Preprocess the aggregated queries (e.g. strip leading/trailing whitespace, collapse internal runs of whitespace into a single space, lowercase English characters, and so on);
- Finally, for each processed query, first check whether it has already been written to the table and insert it only if it has not (an assumed schema for this table is sketched right after this list);
- Repeat the three steps above every day, so that a query sync mechanism is established.
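The sync code below assumes the query table already exists in MySQL. A minimal sketch of what that table could look like is given here; the schema (column names, lengths, the table name tb_query taken from the code below) is an assumption for illustration, and the UNIQUE KEY is one way to guard against duplicate rows:

import pymysql

# assumed minimal schema for the query table used by the sync job;
# the UNIQUE KEY on query prevents duplicates even if the existence check races
DDL = """
CREATE TABLE IF NOT EXISTS tb_query (
    id          BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
    query       VARCHAR(64)     NOT NULL COMMENT 'normalized search query',
    create_time DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    UNIQUE KEY uk_query (query)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4
"""

conn = pymysql.connect(host='xxx.xx.xx.xxx', port=3306, user='xxxx',
                       password='xxx', database='search', charset='utf8mb4')
with conn.cursor() as cursor:
    cursor.execute(DDL)
conn.commit()
conn.close()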
Code implementation
The search-term event-tracking data usually sits on HDFS and is read through Hive, while the query table lives in MySQL. We run the sync as a daily scheduled Spark job; example code follows:
"""
同步 query 任务
"""
import logging
import sys
import traceback
from datetime import datetime, timedelta
from time import sleep
import pymysql
from pyspark.sql import SparkSession
from pyspark.sql.functions import trim, col, regexp_replace, desc, lower
def get_and_save_query(session: SparkSession, start_date: str, end_date: str, min_cnt: int, batch_size=1000, sleep_sec=1.0):
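    """Pull top queries from the Hive tracking table, normalize them, and write new ones to MySQL.

    :param session:    active SparkSession
    :param start_date: start partition date, 'YYYY-MM-DD'
    :param end_date:   end partition date, 'YYYY-MM-DD'
    :param min_cnt:    minimum occurrence count for a query to be kept
    :param batch_size: number of processed queries between throttling pauses
    :param sleep_sec:  seconds to sleep after every batch_size queries
    """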
sql = """
SELECT
query,
COUNT(1) AS cnt
FROM (
SELECT
query
FROM xxx.maidian_log
WHERE dt >= '{start_date}'
AND dt <= '{end_date}'
AND action_id = 188
) a
GROUP BY query
HAVING cnt >= {min_cnt}
ORDER BY cnt DESC
""".format(start_date=start_date, end_date=end_date, min_cnt=min_cnt)
logging.info(sql)
df = session.sql(sql)
logging.info('df schema ===> %s', df)
    # strip leading/trailing whitespace
    df = df.withColumn('query', trim(col('query')))
    # collapse runs of internal whitespace into a single space
    df = df.withColumn('query', regexp_replace('query', r'\s+', ' '))
    # lowercase English characters
    df = df.withColumn('query', lower(col('query')))
df = df.filter('query is not null')
df = df.orderBy(desc('cnt'))
df.cache()
df_count = df.count()
logging.info("df count %s", df_count)
    count = 0
    # open the MySQL connection once and reuse it for every row
    conn = connect_db(None)
    for row in df.collect():
if row.query == '':
            logging.info('query empty')
continue
if len(row.query) > 30:
logging.info('query %s length > 30', row.query)
continue
if 'http' in row.query:
logging.info('query %s contain http', row.query)
continue
if '¥' in row.query:
logging.info('query %s contain ¥', row.query)
continue
if '@' in row.query:
logging.info('query %s contain @', row.query)
continue
        save_2_mysql(conn, row.query)
count += 1
if count % batch_size == 0:
logging.info('count %s, sleep %s sec', count, sleep_sec)
sleep(sleep_sec)
# noinspection PyBroadException
try:
conn.close()
logging.info('connection close successfully')
except Exception:
traceback.print_exc()
def connect_db(conn=None):
    """Create a new MySQL connection unless an existing one is passed in."""
    if conn is None:
        conn = pymysql.connect(
            host='xxx.xx.xx.xxx',
            port=3306,
            user='xxxx',
            password='xxx',
            database='search',
            charset='utf8mb4'
        )
    return conn
def save_2_mysql(conn, query):
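    """Insert the query into tb_query if it is not already there."""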
conn.ping(reconnect=True)
cursor = conn.cursor()
# noinspection PyBroadException
try:
        cursor.execute('SELECT * FROM tb_query WHERE query = %s', (query,))
select_result = cursor.fetchone()
if select_result:
logging.info('exists ===> %s', query)
else:
            cursor.execute('INSERT INTO tb_query SET query = %s', (query,))
conn.commit()
logging.info('insert =========> %s', query)
except Exception:
traceback.print_exc()
cursor.close()
if __name__ == '__main__':
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(filename)s : %(levelname)s %(message)s',
)
logging.info(sys.argv)
    # dp: the data partition date to sync (typically yesterday), e.g. '2024-01-01'
    dp = sys.argv[1]
    # look back 6 extra days, i.e. aggregate a 7-day window ending at dp
    delta_day = 6
    # only sync queries that appeared at least 5 times in the window
    min_count = 5
yesterday_dt = datetime.strptime(dp, "%Y-%m-%d")
start_dt_str = (yesterday_dt - timedelta(days=delta_day)).strftime("%Y-%m-%d")
spark_session = SparkSession.builder.appName('sync_query').getOrCreate()
spark_session.sparkContext.setLogLevel('ERROR')
get_and_save_query(spark_session, start_dt_str, dp, min_count, batch_size=1000, sleep_sec=1.0)
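Assuming the script is saved as sync_query.py (a hypothetical file name), it can be scheduled daily by a workflow engine or a plain crontab entry, passing the partition date as the only argument, e.g. spark-submit sync_query.py 2024-01-01.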