⿊马头条推荐项⽬知识点总结(⼆)
在上述步骤中,我们已经将业务数据和⽤户⾏为数据同步到了推荐系统数据库当中,接下来,我们就要对⽂章数据和⽤户数据进⾏分析,构建⽂章画像和⽤户画像。
本⽂我们主要讲解如何构建⽂章画像。⽂章画像由关键词和主题词组成,我们将每个词的 IDF 权重和 TextRank 权重的乘积作为关键词权重,筛选出权重最⾼的 K 个词作为关键词;将 TextRank 权重最⾼的 K 个词与 TF-IDF 权重最⾼的 K 个词的共现词作为主题词。
⾸先,在 Hive 中创建⽂章数据库 article,⽤于存放离线⽂章画像计算过程的中间表数据和最后的结果数据article_profile.
黑马python入门教程
# 新建article数据库,存放⽂章画像相关的中间表和最后的结果表,
create database if not exists article comment "artcile information" location '/user/hive/warehouse/article.db/';
在开始计算前,先定义⼀个基类,设置spark初始化常⽤的公共部分,⽐如appName,执⾏的核数,启动⽅式等。避免每次都重复写那么多的代码,新建⼀个__init__.py⽂件(⽂章画像所有的⼯程⽂件都放在offline⽂件夹中),添加如下内容
from pyspark import SparkConf
from pyspark.sql import SparkSession
class SparkSessionBase(object):
SPARK_APP_NAME = None
SPARK_URL ="yarn"
SPARK_EXECUTOR_MEMORY ="2g"
SPARK_EXECUTOR_CORES = 2
SPARK_EXECUTOR_INSTANCES = 2
ENABLE_HIVE_SUPPORT = False
matlab求导后画图def _create_spark_session(self):
conf = SparkConf()# 创建spark config对象
config =(
("spark.app.name", self.SPARK_APP_NAME),  # 设置启动的spark的app名称,没有提供,将随机产⽣⼀个名称
("", self.SPARK_EXECUTOR_MEMORY),  # 设置该app启动时占⽤的内存⽤量,默认2g
("spark.master", self.SPARK_URL),  # spark master的地址
("s", self.SPARK_EXECUTOR_CORES),  # 设置spark executor使⽤的CPU核⼼数,默认是1核⼼
("utor.instances", self.SPARK_EXECUTOR_INSTANCES)
)
conf.setAll(config)
# 利⽤config对象,创建spark session
郑州袭击徐州
if self.ENABLE_HIVE_SUPPORT:
return fig(conf=conf).enableHiveSupport().getOrCreate()
else:
return fig(conf=conf).getOrCreate()
⼀、计算⽂章完整信息
为了计算⽂章画像,需要将⽂章信息表(news_article_basic)、⽂章内容表(news_article_content)及频道表(news_channel)进⾏合并,从⽽得到完整的⽂章信息,通常使⽤ Spark SQL 进⾏处理。
1.在hive中新建⼀张表article_data⽤于接收我们合并的完整⽂章信息
# 新建artice_data,存放合并后的内容,其中sentence:⽂章标题+内容+频道名字的合并结果
CREATE TABLE article_data(
article_id BIGINT comment "article_id",
channel_id INT comment "channel_id",
channel_name STRING comment "channel_name",
title STRING comment "title",
content STRING comment "content",
sentence STRING comment "sentence")
COMMENT "toutiao news_channel"
LOCATION '/user/hive/warehouse/article.db/article_data';
2.利⽤spark sql语句操作hive数仓,进⾏表合并⼯作
from offline import SparkSessionBase  # 导⼊基类
class OriginArticleData(SparkSessionBase):  #新建⼀个对象,继承基类
SPARK_APP_NAME ="mergeArticle"
modbustcpip地址
SPARK_URL ="yarn"
ENABLE_HIVE_SUPPORT = True
def __init__(self):
self.spark = self._create_spark_session()
oa = OriginArticleData()# 新建OriginArticleData的对象,这样就可以调⽤这个对象的⽅法spark
oa.spark.sql("use toutiao")# 调⽤的spark使⽤sql语句的形式是spark.sql(这⾥写sql语句),使⽤头条数据库
# 根据⽂章的id,将hive中的前两个⽂章相关的表:news_article_basic中的article_id,channel_id,title和news_article_content中的content合并
# 这⾥采⽤内连接,避免两张表不同步,我们只取两张表都有内容的⽂章
basic_content = oa.spark.sql(
"select a.article_id, a.channel_id, a.title, b.content from news_article_basic a inner join news_article_
content b on a.article_id=b.article_id)
到这⼀步,实现了需要的字段在basic_content中,注意spark sql⽣成的是DataFrame
baisc_content结果如下所⽰
⽬前channel只有id,但是我们后续处理需要的是名字,所以还要通过这个表left join news_channel那张表得到channel_name
# Spark DataFrame提供了registerTempTable这样的接⼝,可以将DataFrame注册成临时表,便于后续的各种查询操作等。如select, join等。
isterTempTable("temparticle")
channel_basic_content = oa.spark.sql("select t.*, n.channel_name from temparticle t left join news_channel n on t.channel_id=n.channel_id")
channel_basic_content 结果如下所⽰
3.合并相关字段内容⾄⼀个字段中
利⽤ concat_ws() ⽅法,将 channel_name, title, content 这 3 列数据合并为⼀列 sentence,并将结果写⼊⽂章完整信息表 article_data 中
# 利⽤concat_ws⽅法,将多列数据合并为⼀个长⽂本内容(频道,标题以及内容合并)
import pyspark.sql.functions as F
import gc
oa.spark.sql("use article")# 切换⾄hive数仓的article数据库(这个数据库我们之前创建⽤于存放⽂章画像的结果表和中间表)
# spark sql的DaraFrame查看数据的语法:df.select()
# pyspark.sql.functions有个函数concat_ws,可以将多个列合并为⼀列,第⼀个参数:指定列合并后之间的分隔符,后⾯就是要合并的列
# alias 给concat_ws合并后的新列起个名字
sentence_df = channel_basic_content.select("article_id", "channel_id", "channel_name", "title", "content", \
",",
channel_basic_content.channel_name,
channel_basic_content.title,
channel_t
).alias("sentence")
)
sentence_df.write.insertInto("article_data")
sentence_df 结果如下所⽰,⽂章完整信息包括 article_id, channel_id, channel_name, title, content, sentence,其中 sentence 为channel_name, title, content 合并⽽成的长⽂本内容
⼆、计算tf-idf
前⾯我们得到了⽂章的完整内容信息,接下来,我们要先对⽂章进⾏分词,然后计算每个词的 TF-IDF 权重,将 TF-IDF 权重最⾼的 K 个词作为⽂章的关键词。⾸先,先回顾⼀下tf-idf⾹⽠内容
------------------------------------------回顾TF-IDF-------------------------------------------
TF-IDF(Term Frequency-Inverse Document Frequency, 词频-逆⽂件频率)是⼀种统计⽅法,⽤以评估⼀个词语对于⼀个⽂件集或⼀个语料库中的⼀份⽂件的重要程度,其原理可概括为:
⼀个词语在⼀篇⽂章中出现次数越多,同时在所有⽂档中出现次数越少,越能够代表该⽂章
计算公式:TF-IDF = TF * IDF,其中:
vuejavascript框架词频(TF):⼀个词或短句在该⽂档中出现的次数
逆⽂档频率(IDF):总⽂档数除以出现该词的⽂档数,结果取log值
----------------------------------------回顾结束------------------------------------------------
1. 读取⽂章信息
# (1)新建对象
from offline import SparkSessionBase
class KeywordsToTfidf(SparkSessionBase):  # 新建对象,进⾏tfidf计算的对象
SPARK_APP_NAME ="keywordsByTFIDF"
SPARK_EXECUTOR_MEMORY ="7g"
ENABLE_HIVE_SUPPORT = True
def __init__(self):
rubywoo是几号self.spark = self._create_spark_session()
# (2)
ktt = KeywordsToTfidf()
#(3)spark sql读取hive数据
ktt.spark.sql("use article")# 使⽤artice数据库
article_dataframe = ktt.spark.sql("select * from article_data limit 20")
2.处理数据之分词
利⽤ mapPartitions() ⽅法,对每篇⽂章进⾏分词,这⾥使⽤的是 jieba 分词器
# spark.sql(‘select xxx’)读取出来的是DataFrame,使⽤⾃定义函数不太⽅便,流程较多,⼀般都是通过转换为rdd,处理结束后,再转为DataFrame # mapPartitions()内传了⼀个函数,这个函数的作⽤是——
words_df = article_dataframe.rdd.mapPartitions(segmentation).toDF(["article_id","channel_id", "words"])
# 第⼀步:创建分词函数segmentation
def segmentation(partition):
import os
import re
import jieba
import jieba.analyse
import jieba.posseg as pseg
import codecs
abspath ="/root/words"
# 第⼆步:结巴加载⽤户词典
userDict_path = os.path.join(abspath, "")
jieba.load_userdict(userDict_path)# 加载⾃定义词典
# 第三步:停⽤词⽂本
# ⼀些没有意义的词,例如我们,这个,边,这时,有点等,这些词要从我们最后的结果中去除,然后再计算tfidf
stopwords_path = os.path.join(abspath, "")#给出停⽤词⽂本位置
def get_stopwords_list():    # 定义⼀个停⽤词函数,将中的所有停⽤词来,以list形式返回
"""返回stopwords列表"""
# 读取⽂件:codecs.open(filepath,method,encoding)
# filepath:⽂件路径
# method:打开⽅式,r为读,w为写,rw为读写
# encoding:⽂件的编码,中⽂⽂件使⽤utf-8
# readlines:⼀次性读取整个⽂件,⾃动将⽂件内容分析成⼀个列表
# strip():⽤于移除字符串头尾指定的字符(默认为空格或换⾏符
stopwords_list =[i.strip()for i in codecs.open(stopwords_path).readlines()]
return stopwords_list
# 第五步:所有的停⽤词列表
stopwords_list = get_stopwords_list()# 调⽤函数,得到停⽤词列表
# 分词
def cut_sentence(sentence):
def cut_sentence(sentence):
"""对切割之后的词语进⾏过滤,去除停⽤词,保留名词,英⽂和⾃定义词库中的词,长度⼤于2的词"""
# print(sentence,"*"*100)
# jieba.lcut和pseg.lcut的区别在于,后者可以给出词性
# eg:[pair('今天', 't'), pair('有', 'd'), pair('雾', 'n'), pair('霾', 'g')]
seg_list = pseg.lcut(sentence)
seg_list =[i for i in seg_list if i.flag not in stopwords_list]# 去除停⽤词列表中的词
filtered_words_list =[]# ⾃定义⼀些条件,再去除⼀些词
for seg in seg_list:
# print(seg)
if len(seg.word)<= 1:  # 去除长度⼩于等于1 的词
continue
elif seg.flag =="eng":# 对于词性是eng,分情况
if len(seg.word)<= 2:  # 词长度⼩于等于2,去除
continue
else:
filtered_words_list.append(seg.word)# 词长度⼤于2,保留
elif seg.flag.startswith("n"):  # 词性是以n开头的词性,保留
filtered_words_list.append(seg.word)
elif seg.flag in["x", "eng"]:  # 是⾃定⼀个词语或者是英⽂单词
filtered_words_list.append(seg.word)
return filtered_words_list  # 返回这篇⽂章的sentence经过处理后list
# 第六步:
for row in partition:  # 利⽤for in取出的每⼀个row,实际就是每篇⽂章的相关信息
# python中,通过内嵌集成re模块,实现正则匹配
# re.sub⽤于替换字符串中的匹配项,
sentence = re.sub("<.*?>", "", row.sentence)# 替换掉每⼀⾏数据中的sentence中的标签数据
words = cut_sentence(sentence)#每篇⽂章的sentence列调⽤cut_sentence函数,得到处理后的分词列表
yield row.article_id, row.channel_id, words
words_df 结果如下所⽰,words 为将 sentence 分词后的单词列表
利⽤分词结果对词频统计模型(CountVectorizer)进⾏词频统计训练,并将 CountVectorizer 模型保存到 HDFS 中
# 词语与词频统计
from pyspark.ml.feature import CountVectorizer
# 统计所有⽂章不同的词,组成⼀个word_list=[java,python,javascript,传统……],按照词频从⾼到低排序
# vocabSize则限制了这个word_list是否完整,如果设置⼩于实际词个数,则排序较低的词不会出现,也就不会被统计,如果不希望丢词,那这个值设⼤⼀些
# min_DF也是词是否出现的限制,如果某个词出现的⽂档次数⼩于设置的阈值,也不会出现在word_list
# CountVectorizer会将该篇⽂本中的词语转换为词频矩阵,它通过fit_transform函数计算各个词语出现的次数
# inputCol:传⼊的数据中的word列,outputCol:⾃⼰定义⼀个名字,vocabSize:
# ①构建词频统计模型
cv =CountVectorizer(inputCol="words", outputCol="countFeatures", vocabSize=200*10000, minDF=1.0)
# ②训练词频统计模型
cv_model = cv.fit(words_df)
cv_model.write().overwrite().save("hdfs://hadoop-master:9000/headlines/del")# 这个模型得到的实际是就是⼀个词库,所有⽂档中出现的不重复的词库,可以通过vocabulary⽅法看⼀下结果
cv_model.vocabulary