做有温度的教育!
全国统一咨询热线:18003608300
北京
校区
首页> 知识宝藏分享,为您打开学习之门 > 技术分享

Spark--数据读取与保存

时间:2019-04-22
浏览:5992
发布:来源:
赞:5992

今天给大家介绍一下大数据开发中Spark的数据读取与保存,希望大家在开发中可以了解这个过程!


1、动机

有时候数据量会大到本机可能无法存储,这时就需要探索别的读取和保存方法了。

Spark支持很多种输入源和输出源。一部分原因是Spark本身是基于Hadoop生态圈二构建的,so spark可以通过Hadoop MapReduce 所使用的InputFormat 和 OutPutFormat 接口访问数据,而大部分常见的文件格式与存储系统(S3,HDFS,Cassandra,H等)都支持这种接口。

Spark所支持的三种常见数据源:

· 文本格式与文件系统

· SparkSQL

· 数据库与键值存储


2、文件格式:

结构化

- 文本文件:不是结构化,普通的文本文件,每一行一条记录

- JSON: 半结构化,常见的基于文本的格式,半结构化;大多数库都要求每行一条记录。

- CSV:结构化,非常常见的基于文本的格式,通常在电子表格应用中使用。

- SequenceFile:结构化,一种用于键值对数据的常见Hadoop文件格式。

- Protocol buffers:结构化,一种快速的,节约空间的跨语言格式。

- 对象文件:用来将Spark作业中的数据存储下来以让共享的代码读取。改变类的时候它会失效,因为他依赖于java序列化。

(1) 文本文件

当我们将一个文本文件读取为RDD时,输入的每一行都会成为RDD的每一个元素。也可以将多个文本文件读取为一个pair RDD,其中键是文件名,值是文件内容。

scala> val input = sc.textFile("/Users/mac/Documents/javascala/VNCluster/500points.txt ")input: org.apache.spark.rdd.RDD[String] = /Users/mac/Documents/javascala/VNCluster/500points.txt MapPartitionsRDD[1] at textFile at:24


同样可以指定分区数

scala> val input = sc.textFile("/Users/mac/Documents/javascala/VNCluster/500points.txt ",4)input: org.apache.spark.rdd.RDD[String] = /Users/mac/Documents/javascala/VNCluster/500points.txt MapPartitionsRDD[3] at textFile at:24

我是读入一个数据点文本文件,然后对数据做一些操作存储在源目录中:

scala> val data = input.map(x => (x.split(" ")(0),x.split(" ")(1)))data: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[6] at map at:26

scala> data.firstres3: (String, String) = (0.2489,0.7091)

scala> data.filter{case(x,y) => y.toDouble > 0.5}res5: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[10] at filter at:29

scala> val afterDeal = data.filter{case(x,y) => y.toDouble > 0.5}afterDeal: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[11] at filter at:28

scala> afterDeal.firstres7: (String, String) = (0.2489,0.7091)

scala> afterDeal.saveAsTextFile("/Users/mac/Documents/javascala/VNCluster/rdd.txt")

(2)JSON

· 读取JSON文件:将数据作为文本文件,然后对JSON数据进行解析,这样的方法可以在所有支持的编程语言中使用。

· 保存JSON文件:不需要考虑格式错误的数据,并且也知道要写出的数据类型。

(3)逗号分隔值与制表符分隔值

逗号分隔值(CSV)文件每行都有固定数目的字段,字段间用逗号隔开。记录通常是一行一条,有时也可以跨行。

· 读取CSV:

val conf = new SparkConf().setMaster("local").setAppName("My app")

val sc = new SparkContext(conf)

val input = sc.textFile("inputFile")

val result = input.map{ line =>

val reader = new CSVReader(new StringBuilder(line))

reader.readNext()

· 保存CSV:和保存保存文本文件基本一样

scala> val input = sc.textFile("/Users/mac/Desktop/500points.csv")input: org.apache.spark.rdd.RDD[String] = /Users/mac/Desktop/500points.csv MapPartitionsRDD[1] at textFile at:24

scala> input.firstres0: String = 237.09,712.1

scala> val data = input.map(x => (x.split(",")(0),x.split(",")(1)))data: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at:26

scala> data.firstres1: (String, String) = (237.09,712.1)

scala> val deal = data.mapValues(x => x.toDouble / x.max.toDouble)deal: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[3] at mapValues at:28

scala> deal.saveAsTextFile("/Users/mac/Desktop/500pointsresult.csv" )


【版权与免责声明】如发现内容存在版权问题,烦请提供相关信息联系我们,我们将及时沟通与处理。本站内容除非来源注明甲骨文华育兴业,否则均为网友转载,涉及言论、版权与本站无关。

精彩推荐

友情链接: bif

Copyright ©2016-2020. All Rights Reserved. 京ICP备17018991号-4