import os

import pandas as pd
from pyspark import SparkContext, SparkConf
from scipy import stats

file_path = ""
def set_spark_context(env):
    """Create a SparkContext for 'local', 'local_python_remote_spark', or 'yarn'."""
    global file_path
    sparkConf = SparkConf()
    sparkConf.setAppName("movie_recommend")
    if env == 'local':
        # Windows development box; raw strings keep the backslashes literal.
        os.environ["HADOOP_HOME"] = r"D:\green-soft\hadoop-2.7.3"
        os.environ["SPARK_HOME"] = r"D:\green-soft\spark-2.4.8-bin-hadoop2.7"
        file_path = r"file:///C:\Users\Qingyuan_Qu\PycharmProjects\pythonProject\data\\"
        sparkConf.setMaster("local")
    elif env == 'local_python_remote_spark':
        # Local Python driver submitting to a remote YARN cluster.
        os.environ["HADOOP_HOME"] = r"D:\green-soft\hadoop-2.7.3"
        os.environ["HADOOP_CONF_DIR"] = r"D:\green-soft\etc\hadoop"
        os.environ["YARN_CONF_DIR"] = r"D:\green-soft\etc\hadoop"
        sparkConf.setMaster("yarn")
        sparkConf.set("spark.submit.deployMode", "client")
        sparkConf.set("spark.eventLog.enabled", "true")
        sparkConf.set("spark.eventLog.dir", "hdfs://node0:9000/shared/spark-logs")
        sparkConf.set("spark.yarn.archive", "hdfs://node0:9000/shared/spark-archive")
    elif env == 'yarn':
        # Driver runs on the cluster itself; data is read from HDFS.
        sparkConf.set("spark.ui.showConsoleProgress", "true")
        os.environ["HADOOP_CONF_DIR"] = "/opt/bigdata/hadoop/default/etc/hadoop"
        os.environ["YARN_CONF_DIR"] = "/opt/bigdata/hadoop/default/etc/hadoop"
        file_path = "hdfs://node0:9000/data/"
        sparkConf.setMaster("yarn")
        sparkConf.set("spark.submit.deployMode", "client")
        sparkConf.set("spark.eventLog.enabled", "true")
        sparkConf.set("spark.eventLog.dir", "hdfs://node0:9000/shared/spark-logs")
        sparkConf.set("spark.yarn.archive", "hdfs://node0:9000/shared/spark-archive")
    sc = SparkContext(conf=sparkConf)
    sc.setLogLevel("INFO")
    return sc
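
# Usage sketch: callers select the environment by name, e.g.
#   sc = set_spark_context('local')
# The 'yarn' and 'local_python_remote_spark' branches assume the specific
# cluster layout (node0, HDFS paths) hard-coded above.
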
def sim(ur_ur):
    # ur_ur is (movie_b, [ratings_a, ratings_b]), where each ratings list
    # holds [user, rating] pairs. Pearson similarity is computed over the
    # users who rated both movies.
    u_r_1 = pd.DataFrame(ur_ur[1][0], columns=['user', 'rating_a'])
    u_r_2 = pd.DataFrame(ur_ur[1][1], columns=['user', 'rating_b'])
    u_r_1.set_index('user', inplace=True)
    u_r_2.set_index('user', inplace=True)
    u_r_r = u_r_1.join(u_r_2, how='inner')  # keep only the common raters
    # pearsonr needs at least two common raters; fall back to 0 similarity.
    if len(u_r_r) < 2:
        return [ur_ur[0], 0.0]
    pearson_value, p_value = stats.pearsonr(u_r_r['rating_a'].astype(float),
                                            u_r_r['rating_b'].astype(float))
    return [ur_ur[0], pearson_value]
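
# A minimal sketch of sim() on hypothetical data: two movies rated by the same
# three users. The users and ratings below are made up for illustration.
#   pair = ('m2', [[['A', '5'], ['B', '3'], ['C', '4']],
#                  [['A', '4'], ['B', '2'], ['C', '5']]])
#   sim(pair)  # -> ['m2', 0.6547...] (Pearson over the common raters)
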
def recommend(bc, m_sim):
    # bc broadcasts the target user's [movie, rating] list; m_sim is the list
    # of [movie, similarity] pairs for one candidate movie. The predicted
    # score is the dot product of the similarities with the user's ratings.
    bc_df = pd.DataFrame(bc.value, columns=['movie', 'value'])
    bc_df.set_index('movie', inplace=True)
    m_sim_df = pd.DataFrame(m_sim, columns=['movie', 'value'])
    m_sim_df.set_index('movie', inplace=True)
    # restrict the similarity vector to the movies the target user has rated
    m_sim_sub_df = m_sim_df.loc[bc_df.index.values]
    score = m_sim_sub_df.T.astype(float).dot(bc_df.astype(float))
    return score.loc['value']['value']
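
# A worked sketch of the scoring rule, with made-up numbers: if user 'A' rated
# m1=5 and m2=3, and candidate m9 has sim(m9, m1)=0.8 and sim(m9, m2)=0.2, then
# score(m9) = 0.8*5 + 0.2*3 = 4.6 -- the similarity-weighted sum that
# recommend() computes via the DataFrame dot product.
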
if __name__ == '__main__':
    # Item-based collaborative filtering, after "Data Mining and Machine
    # Learning" (《数据挖掘与机器学习》), p. 122.
    env = 'local'
    sc = set_spark_context(env)

    # (user, movie, rating) triples from the tab-separated ratings file
    raw_umr_rdd = sc.textFile(file_path + "movies.csv")
    u_m_r_rdd = raw_umr_rdd.map(lambda line: line.split("\t")[:3])

    # group each movie's raters: (movie, [[user, rating], ...])
    m_ur_rdd = u_m_r_rdd.map(lambda line: (line[1], [line[0], line[2]]))
    m_urs_rdd = m_ur_rdd.groupByKey().mapValues(list)

    # all ordered movie pairs, excluding a movie paired with itself
    m_urs_cartesian_rdd = m_urs_rdd.cartesian(m_urs_rdd)
    m_urs_cartesian_unique_rdd = m_urs_cartesian_rdd.filter(
        lambda m_urs: m_urs[0][0] != m_urs[1][0])

    # (movie_a, (movie_b, [ratings_a, ratings_b])) -> (movie_a, [[movie_b, sim], ...])
    m__m_rrs_rdd = m_urs_cartesian_unique_rdd.map(
        lambda m_urs_2: (m_urs_2[0][0], (m_urs_2[1][0], [m_urs_2[0][1], m_urs_2[1][1]])))
    m__m_sim_rdd = m__m_rrs_rdd.mapValues(sim)
    m__m_sim_s_rdd = m__m_sim_rdd.groupByKey().mapValues(list)

    # the target user's own ratings: (user, [[movie, rating], ...])
    u_mr_rdd = u_m_r_rdd.map(lambda line: (line[0], [line[1], line[2]]))
    u_mrs_rdd = u_mr_rdd.groupByKey().mapValues(list)
    m_r_d = u_mrs_rdd.filter(lambda x: x[0] == 'A').values()
    m_r_d_rdd = sc.parallelize(m_r_d.first())  # keyed by the movies 'A' rated
    bc = sc.broadcast(m_r_d.first())

    # score only the movies user 'A' has not rated, then rank descending
    sub_sim_rdd = m__m_sim_s_rdd.subtractByKey(m_r_d_rdd)
    m_score_rdd = sub_sim_rdd.mapValues(lambda x: recommend(bc, x))
    m_score_sorted_rdd = m_score_rdd.sortBy(lambda x: x[1], False)
    print(m_score_sorted_rdd.collect())
    print(m_score_sorted_rdd.take(3))
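
# Expected input (an assumption inferred from the parsing above): movies.csv is
# tab-separated with at least user \t movie \t rating per line, and 'A' is the
# target user, e.g.
#   A   m1   5
#   A   m2   3
#   B   m1   4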