Title: 分布式机器学习原理及实战(Pyspark) · Issue #17 · aialgorithm/Blog · GitHub
Open Graph Title: 分布式机器学习原理及实战(Pyspark) · Issue #17 · aialgorithm/Blog
X Title: 分布式机器学习原理及实战(Pyspark) · Issue #17 · aialgorithm/Blog
Description: 一、大数据框架及Spark介绍 1.1 大数据框架 2003年,Google公布了3篇大数据奠基性论文,为大数据存储及分布式处理的核心问题提供了思路:非结构化文件分布式存储(GFS)、分布式计算(MapReduce)及结构化数据存储(BigTable),并奠定了现代大数据技术的理论基础,而后大数据技术便快速发展,诞生了很多日新月异的技术。 归纳现有大数据框架解决的核心问题及相关技术主要为: 分布式存储的问题:有GFS,HDFS等,使得大量的数据能横跨成百上千台机器; 大...
Open Graph Description: 一、大数据框架及Spark介绍 1.1 大数据框架 2003年,Google公布了3篇大数据奠基性论文,为大数据存储及分布式处理的核心问题提供了思路:非结构化文件分布式存储(GFS)、分布式计算(MapReduce)及结构化数据存储(BigTable),并奠定了现代大数据技术的理论基础,而后大数据技术便快速发展,诞生了很多日新月异的技术。 归纳现有大数据框架解决的核心问题及相关技术主要为: ...
X Description: 一、大数据框架及Spark介绍 1.1 大数据框架 2003年,Google公布了3篇大数据奠基性论文,为大数据存储及分布式处理的核心问题提供了思路:非结构化文件分布式存储(GFS)、分布式计算(MapReduce)及结构化数据存储(BigTable),并奠定了现代大数据技术的理论基础,而后大数据技术便快速发展,诞生了很多日新月异的技术。 归纳现有大数据框架解决的核心问题及相关技术主要为: ...
Opengraph URL: https://github.com/aialgorithm/Blog/issues/17
X: @github
Domain: github.com
{"@context":"https://schema.org","@type":"DiscussionForumPosting","headline":"分布式机器学习原理及实战(Pyspark)","articleBody":"# 一、大数据框架及Spark介绍\r\n## 1.1 大数据框架\r\n2003年,Google公布了3篇大数据奠基性论文,为大数据存储及分布式处理的核心问题提供了思路:非结构化文件分布式存储(GFS)、分布式计算(MapReduce)及结构化数据存储(BigTable),并奠定了现代大数据技术的理论基础,而后大数据技术便快速发展,诞生了很多日新月异的技术。\r\n\r\n归纳现有大数据框架解决的核心问题及相关技术主要为:\r\n- 分布式存储的问题:有GFS,HDFS等,使得大量的数据能横跨成百上千台机器;\r\n- 大数据计算的问题:有MapReduce、Spark批处理、Flink流处理等,可以分配计算任务给各个计算节点(机器);\r\n- 结构化数据存储及查询的问题:有Hbase、Bigtable等,可以快速获取/存储结构化的键值数据;\r\n- 大数据挖掘的问题:有Hadoop的mahout,spark的ml等,可以使用分布式机器学习算法挖掘信息;\r\n\r\n\r\n## 1.2 Spark的介绍\r\n\r\nSpark是一个分布式内存批计算处理框架,Spark集群由Driver, Cluster Manager(Standalone,Yarn 或 Mesos),以及Worker Node组成。对于每个Spark应用程序,Worker Node上存在一个Executor进程,Executor进程中包括多个Task线程。\r\n\r\n在执行具体的程序时,Spark会将程序拆解成一个任务DAG(有向无环图),再根据DAG决定程序各步骤执行的方法。该程序先分别从textFile和HadoopFile读取文件,经过一些列操作后再进行join,最终得到处理结果。\r\n\r\nPySpark是Spark的Python API,通过Pyspark可以方便地使用 Python编写 Spark 应用程序, 其支持 了Spark 的大部分功能,例如 Spark SQL、DataFrame、Streaming、MLLIB(ML)和 Spark Core。\r\n\r\n\r\n\r\n\r\n\r\n# 二、PySpark分布式机器学习\r\n\r\n## 2.1 PySpark机器学习库\r\nPyspark中支持两个机器学习库:mllib及ml,区别在于ml主要操作的是DataFrame,而mllib操作的是RDD,即二者面向的数据集不一样。相比于mllib在RDD提供的基础操作,ml在DataFrame上的抽象级别更高,数据和操作耦合度更低。\r\n\u003e 注:mllib在后面的版本中可能被废弃,本文示例使用的是ml库。\r\n\r\npyspark.ml训练机器学习库有三个主要的抽象类:Transformer、Estimator、Pipeline。\r\n\r\n- Transformer主要对应feature子模块,实现了算法训练前的一系列的特征预处理工作,例如MinMaxScaler、word2vec、onehotencoder等,对应操作为transform;\r\n``` \r\n# 举例:特征加工\r\nfrom pyspark.ml.feature import VectorAssembler\r\nfeaturesCreator = VectorAssembler(\r\n inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],\r\n outputCol='features'\r\n)\r\n\r\n```\r\n- Estimator对应各种机器学习算法,主要为分类、回归、聚类和推荐算法4大类,具体可选算法大多在sklearn中均有对应,对应操作为fit;\r\n``` \r\n# 举例:分类模型\r\nfrom pyspark.ml.classification import LogisticRegression\r\n\r\nlogistic = LogisticRegression(featuresCol=featuresCreator.getOutputCol(),\r\n labelCol='INFANT_ALIVE_AT_REPORT')\r\n```\r\n- Pipeline可将一些列转换和训练过程串联形成流水线。\r\n```\r\n# 举例:创建流水线\r\nfrom pyspark.ml import Pipeline\r\n\r\npipeline = Pipeline(stages=[encoder, featuresCreator, logistic]) # 特征编码,特征加工,载入LR模型\r\n# 拟合模型\r\ntrain, test = data.randomSplit([0.7,0.3],seed=123)\r\nmodel = pipeline.fit(train)\r\n```\r\n\r\n## 2.2 PySpark分布式机器学习原理\r\n在分布式训练中,用于训练模型的工作负载会在多个微型处理器之间进行拆分和共享,这些处理器称为工作器节点,通过这些工作器节点并行工作以加速模型训练。 分布式训练可用于传统的 ML 模型,但更适用于计算和时间密集型任务,如用于训练深度神经网络。分布式训练有两种主要类型:数据并行及模型并行,主要代表有Spark ML,Parameter Server和TensorFlow。\r\n\r\n\r\nspark的分布式训练的实现为数据并行:按行对数据进行分区,从而可以对数百万甚至数十亿个实例进行分布式训练。 以其核心的梯度下降算法为例:\r\n1、首先对数据划分至各计算节点;\r\n2、把当前的模型参数广播到各个计算节点(当模型参数量较大时会比较耗带宽资源);\r\n3、各计算节点进行数据抽样得到mini batch的数据,分别计算梯度,再通过treeAggregate操作汇总梯度,得到最终梯度gradientSum;\r\n4、利用gradientSum更新模型权重(这里采用的阻断式的梯度下降方式,当各节点有数据倾斜时,每轮的时间起决于最慢的节点。这是Spark并行训练效率较低的主要原因)。\r\n\r\n\r\n# PySpark项目实战\r\n\u003e注:单纯拿Pyspark练练手,可无需配置Pyspark集群,直接本地配置下单机Pyspark,也可以使用线上spark集群(如: community.cloud.databricks.com)。\r\n\r\n本项目通过PySpark实现机器学习建模全流程:数据的载入,数据分析,特征加工,二分类模型训练及评估。\r\n```\r\n#!/usr/bin/env python\r\n# coding: utf-8\r\n\r\n\r\n# 初始化SparkSession\r\nfrom pyspark.sql import SparkSession\r\n\r\nspark = SparkSession.builder.appName(\"Python Spark RF example\").config(\"spark.some.config.option\", \"some-value\").getOrCreate()\r\n\r\n# 加载数据\r\ndf = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(\"./data.csv\",header=True)\r\n\r\nfrom pyspark.sql.functions import *\r\n# 数据基本信息分析\r\n\r\ndf.dtypes # Return df column names and data types\r\ndf.show() #Display the content of df\r\ndf.head() #Return first n rows\r\ndf.first() #Return first row \r\ndf.take(2) #Return the first n rows\r\ndf.schema # Return the schema of df\r\ndf.columns # Return the columns of df\r\ndf.count() #Count the number of rows in df\r\ndf.distinct().count() #Count the number of distinct rows in df\r\ndf.printSchema() #Print the schema of df\r\ndf.explain() #Print the (logical and physical) plans\r\ndf.describe().show() #Compute summary statistics \r\n\r\ndf.groupBy('Survived').agg(avg(\"Age\"),avg(\"Fare\")).show() # 聚合分析\r\ndf.select(df.Sex, df.Survived==1).show() # 带条件查询 \r\ndf.sort(\"Age\", ascending=False).collect() # 排序\r\n\r\ndf = df.dropDuplicates() # 删除重复值\r\n\r\ndf = df.na.fill(value=0) # 缺失填充值\r\ndf = df.na.drop() # 或者删除缺失值\r\n\r\n\r\ndf = df.withColumn('isMale', when(df['Sex']=='male',1).otherwise(0)) # 新增列:性别0 1\r\ndf = df.drop('_c0','Name','Sex') # 删除姓名、性别、索引列\r\n\r\n# 设定特征/标签列\r\nfrom pyspark.ml.feature import VectorAssembler\r\nignore=['Survived']\r\nvectorAssembler = VectorAssembler(inputCols=[x for x in df.columns \r\n if x not in ignore], outputCol = 'features')\r\nnew_df = vectorAssembler.transform(df)\r\nnew_df = new_df.select(['features', 'Survived'])\r\n\r\n# 划分测试集训练集\r\ntrain, test = new_df.randomSplit([0.75, 0.25], seed = 12345)\r\n\r\n# 模型训练\r\nfrom pyspark.ml.classification import LogisticRegression\r\n\r\nlr = LogisticRegression(featuresCol = 'features', \r\n labelCol='Survived')\r\nlr_model = lr.fit(test)\r\n\r\n# 模型评估\r\nfrom pyspark.ml.evaluation import BinaryClassificationEvaluator\r\n\r\npredictions = lr_model.transform(test)\r\nauc = BinaryClassificationEvaluator().setLabelCol('Survived')\r\nprint('AUC of the model:' + str(auc.evaluate(predictions)))\r\nprint('features weights', lr_model.coefficientMatrix)\r\n```\r\n---\r\n文章首发于算法进阶,公众号阅读原文可访问[GitHub项目源码](https://github.com/aialgorithm/Blog)\r\n\r\n","author":{"url":"https://github.com/aialgorithm","@type":"Person","name":"aialgorithm"},"datePublished":"2021-06-07T12:35:59.000Z","interactionStatistic":{"@type":"InteractionCounter","interactionType":"https://schema.org/CommentAction","userInteractionCount":0},"url":"https://github.com/17/Blog/issues/17"}
| route-pattern | /_view_fragments/issues/show/:user_id/:repository/:id/issue_layout(.:format) |
| route-controller | voltron_issues_fragments |
| route-action | issue_layout |
| fetch-nonce | v2:1475160e-2ee9-65ba-0389-2ce56ee8e1d7 |
| current-catalog-service-hash | 81bb79d38c15960b92d99bca9288a9108c7a47b18f2423d0f6438c5b7bcd2114 |
| request-id | 915E:3B638A:A77227:E2EE43:6969E957 |
| html-safe-nonce | 8f9743844f4127f904f100df90fcc70fd2ff8bef3039956a7e6a73fdeb6c8cf9 |
| visitor-payload | eyJyZWZlcnJlciI6IiIsInJlcXVlc3RfaWQiOiI5MTVFOjNCNjM4QTpBNzcyMjc6RTJFRTQzOjY5NjlFOTU3IiwidmlzaXRvcl9pZCI6IjE1NjE0MjMyNzE4OTY0MTA0NTUiLCJyZWdpb25fZWRnZSI6ImlhZCIsInJlZ2lvbl9yZW5kZXIiOiJpYWQifQ== |
| visitor-hmac | 82556beae3efb6ed0e4a01f80fb1eb73f834c8864b0e0be5c2466641866141df |
| hovercard-subject-tag | issue:913501071 |
| github-keyboard-shortcuts | repository,issues,copilot |
| google-site-verification | Apib7-x98H0j5cPqHWwSMm6dNU4GmODRoqxLiDzdx9I |
| octolytics-url | https://collector.github.com/github/collect |
| analytics-location | / |
| fb:app_id | 1401488693436528 |
| apple-itunes-app | app-id=1477376905, app-argument=https://github.com/_view_fragments/issues/show/aialgorithm/Blog/17/issue_layout |
| twitter:image | https://opengraph.githubassets.com/b2f6d44743fcd63c33bace45924c4355804d84ed9015422e47aebc8707a17cf1/aialgorithm/Blog/issues/17 |
| twitter:card | summary_large_image |
| og:image | https://opengraph.githubassets.com/b2f6d44743fcd63c33bace45924c4355804d84ed9015422e47aebc8707a17cf1/aialgorithm/Blog/issues/17 |
| og:image:alt | 一、大数据框架及Spark介绍 1.1 大数据框架 2003年,Google公布了3篇大数据奠基性论文,为大数据存储及分布式处理的核心问题提供了思路:非结构化文件分布式存储(GFS)、分布式计算(MapReduce)及结构化数据存储(BigTable),并奠定了现代大数据技术的理论基础,而后大数据技术便快速发展,诞生了很多日新月异的技术。 归纳现有大数据框架解决的核心问题及相关技术主要为: ... |
| og:image:width | 1200 |
| og:image:height | 600 |
| og:site_name | GitHub |
| og:type | object |
| og:author:username | aialgorithm |
| hostname | github.com |
| expected-hostname | github.com |
| None | 7b32f1c7c4549428ee399213e8345494fc55b5637195d3fc5f493657579235e8 |
| turbo-cache-control | no-preview |
| go-import | github.com/aialgorithm/Blog git https://github.com/aialgorithm/Blog.git |
| octolytics-dimension-user_id | 33707637 |
| octolytics-dimension-user_login | aialgorithm |
| octolytics-dimension-repository_id | 147093233 |
| octolytics-dimension-repository_nwo | aialgorithm/Blog |
| octolytics-dimension-repository_public | true |
| octolytics-dimension-repository_is_fork | false |
| octolytics-dimension-repository_network_root_id | 147093233 |
| octolytics-dimension-repository_network_root_nwo | aialgorithm/Blog |
| turbo-body-classes | logged-out env-production page-responsive |
| disable-turbo | false |
| browser-stats-url | https://api.github.com/_private/browser/stats |
| browser-errors-url | https://api.github.com/_private/browser/errors |
| release | bdde15ad1b403e23b08bbd89b53fbe6bdf688cad |
| ui-target | full |
| theme-color | #1e2327 |
| color-scheme | light dark |
Links:
Viewport: width=device-width