Spark练习json文件-统计问答数据
目录
题目
准备数据
分析数据
实现数据
总结
题目
- 计算不同分类的问题数量
- 统计问题中的热搜词,并获取top10的热搜词
准备数据
将数据上传到hdfs上
分析数据
读取数据
from pyspark import SparkContext import json import jiebasc = SparkContext()# 读取hdfs数据 rdd = sc.textFile('hdfs://node1:8020/data/baike_qa_valid.json')
对每行的json字符串转换为字典
# 对每行的json字符串转为字典 rdd_dict = rdd.map(lambda x:json.loads(x)) print(rdd_dict.take(1))
第一问:计算不同分类的问题数量
# 计算不同分类的问题数量 rdd_kv = rdd_dict.map(lambda x:(x['category'],1)).reduceByKey(lambda x,y:x+y)res3 = rdd_kv.collect() print(res3)
第二问:统计问题中的热搜词,并获取top10的热搜词
1-对title中的数据分词
# # 1-对title中的数据分词 rdd_cut = rdd_dict.map(lambda x:list(jieba.cut(x['title']))) print(rdd_cut.take(10))
2-将这些数据合并成一个单一的序列
# # 2-将这些数据合并成一个单一的序列 rdd_flatmap = rdd_cut.flatMap(lambda x:x) print(rdd_flatmap.take(20))
3-只保留长度大于1的单词
# # # 3-筛选条件,只保留长度大于1的单词 rdd_filter = rdd_flatmap.filter(lambda x:len(x)>1) print(rdd_filter.take(10))
4-将每个单词转换成键值对
# # 4-将每个单词转换成键值对 rdd_map = rdd_filter.map(lambda x:(x,1)) print(rdd_map.take(10))
5-对键值对进行聚合
# # 5-对键值对进行聚合 rdd_reduce = rdd_map.reduceByKey(lambda x,y:x+y) print(rdd_reduce.take(10))
6-对最后的数据进行排名,取出top10
# # 6-对数据进行排序 rdd_sort = rdd_reduce.sortBy(lambda x:x[1],ascending=False) print(rdd_sort.take(10))
实现数据
第一种:一步一步分开写
from pyspark import SparkContext
import json
import jiebasc = SparkContext()# 读取hdfs数据
rdd = sc.textFile('hdfs://node1:8020/data/baike_qa_valid.json')# 对每行的json字符串转为字典
rdd_dict = rdd.map(lambda x:json.loads(x))# 计算不同分类的问题数量
rdd_kv = rdd_dict.map(lambda x:(x['category'],1)).reduceByKey(lambda x,y:x+y)# # 对title中的数据分词
# # 1-对title中的数据分词
rdd_cut = rdd_dict.map(lambda x:list(jieba.cut(x['title'])))
# print(rdd_cut.take(10))
# # 2-将这些数据合并成一个单一的序列
rdd_flatmap = rdd_cut.flatMap(lambda x:x)
# print(rdd_flatmap.take(20))
# # # 3-筛选条件,只保留长度大于1的单词
rdd_filter = rdd_flatmap.filter(lambda x:len(x)>1)
# print(rdd_filter.take(10))
# # 4-将每个单词转换成键值对
rdd_map = rdd_filter.map(lambda x:(x,1))
# print(rdd_map.take(10))
# # 5-对键值对进行聚合
rdd_reduce = rdd_map.reduceByKey(lambda x,y:x+y)
# print(rdd_reduce.take(10))
# # 6-对数据进行排序
rdd_sort = rdd_reduce.sortBy(lambda x:x[1],ascending=False)
print(rdd_sort.take(10))
第二种:通过链式操作
from pyspark import SparkContext
import json
import jiebasc = SparkContext()# 读取hdfs数据
rdd = sc.textFile('hdfs://node1:8020/data/baike_qa_valid.json')# 对每行的json字符串转为字典
rdd_dict = rdd.map(lambda x:json.loads(x))# 计算不同分类的问题数量
rdd_kv = rdd_dict.map(lambda x:(x['category'],1)).reduceByKey(lambda x,y:x+y)# 统计问题中的热搜词,并获取的热搜词
# 对title中的数据分词
rdd_jieba = (rdd_dict.map(lambda x:list(jieba.cut(x['title']))).flatMap(lambda x:x).filter(lambda x:len(x)>1).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).sortBy(lambda x:x[1],ascending=False))
#
# # 查看读取的数据res3 = rdd_kv.collect()
print(res3)res4 = rdd_jieba.take(10)
print(res4)
总结
因为数据量过大,所以使用collect()将会出现下面错误,可以使用take(),只查看前几条