目录
1、创建
a、创建空的dataframe
b、使用toDF函数创建DataFrame
本地seq + toDF创建DataFrame示例:
通过case class + toDF创建DataFrame
c、Spark中使用createDataFrame函数创建DataFrame
d、通过文件直接创建DataFrame
spark模糊匹配(glob语法)
e、spark把资源文件生成dataframe(Schema(country,map(operator,operatorName)))
备注:
- 使用包含null的序列创建dataframe会报错,可以使用withColumn、when,asInstanceOf特殊处理一下
2、修改
a、spark dataframe修改schema
DataFrame是Spark中对带模式(schema)行列数据的抽象。DateFrame广泛应用于使用SQL处理大数据的各种场景。创建DataFrame有很多种方法,比如从本地List创建、从RDD创建或者从源数据创建,下面简要介绍创建DataFrame的几种方法。
方法一、创建空的dataframe
// 创建不带schema的dataframe
val df:DataFrame = spark.emptyDataFrame
// 创建空的带schema的dataframe
val priceSchema = new StructType().add("model",StringType).add("brand",StringType).add("release_date",StringType).add("price_group",StringType)
val df:DataFrame = spark.createDataFrame(spark.sparkContext.emptyRDD[Row],priceSchema)
方法二,使用toDF函数创建DataFrame
通过导入(importing)Spark sql implicits, 就可以将本地序列(seq), 数组或者RDD转为DataFrame。只要这些数据的内容能指定数据类型即可。
1、本地seq + toDF创建DataFrame示例:
import spark.implicits._
val df = Seq(
(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
).toDF("int_column", "string_column", "date_column")
注意:如果直接用toDF()而不指定列名字,那么默认列名为"_1", "_2", ...
2、通过case class + toDF创建DataFrame
import spark.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
// 使用 sqlContext 执行 sql 语句.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// 注:sql()函数的执行结果也是DataFrame,支持各种常用的RDD操作.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
方法三,Spark中使用createDataFrame函数创建DataFrame
在SqlContext中使用createDataFrame也可以创建DataFrame。跟toDF一样,这里创建DataFrame的数据形态也可以是本地数组或者RDD。
通过row+schema创建示例
import org.apache.spark.sql.types._
val schema = StructType(List(
StructField("integer_column", IntegerType, nullable = false),
StructField("string_column", StringType, nullable = true),
StructField("date_column", DateType, nullable = true)
))
val rdd = sc.parallelize(Seq(
Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
))
val df = sqlContext.createDataFrame(rdd, schema)
方法四,通过文件直接创建DataFrame
// 使用parquet文件创建
// val df = sqlContext.read.parquet("hdfs:/path/to/file")
// 使用json文件创建
val df = spark.read.json("src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// 使用csv文件,spark2.0+之后的版本可用
//然后使用SparkSessions对象加载CSV成为DataFrame
df = spark.read.format("com.databricks.spark.csv")
.option("header", "true") //reading the headers
.option("mode", "DROPMALFORMED")
.load("csv/file/path"); //.csv("csv/file/path") //spark 2.0 api
df.show()
注意:本文归作者所有,未经作者允许,不得转载