Dataframe使用

刘超 1年前 ⋅ 5161 阅读   编辑

目录

  1、创建
    a、创建空的dataframe
    b、使用toDF函数创建DataFrame
        本地seq + toDF创建DataFrame示例:
        通过case class + toDF创建DataFrame
    c、Spark中使用createDataFrame函数创建DataFrame
    d、通过文件直接创建DataFrame
         spark模糊匹配(glob语法)
    e、spark把资源文件生成dataframe(Schema(country,map(operator,operatorName)))

    备注:

- 使用包含null的序列创建dataframe会报错,可以使用withColumn、when,asInstanceOf特殊处理一下

  2、修改
    a、spark dataframe修改schema

 

  DataFrame是Spark中对带模式(schema)行列数据的抽象。DateFrame广泛应用于使用SQL处理大数据的各种场景。创建DataFrame有很多种方法,比如从本地List创建、从RDD创建或者从源数据创建,下面简要介绍创建DataFrame的几种方法。

方法一、创建空的dataframe

// 创建不带schema的dataframe
val df:DataFrame = spark.emptyDataFrame
// 创建空的带schema的dataframe
val priceSchema = new StructType().add("model",StringType).add("brand",StringType).add("release_date",StringType).add("price_group",StringType)
val df:DataFrame = spark.createDataFrame(spark.sparkContext.emptyRDD[Row],priceSchema)
方法二,使用toDF函数创建DataFrame
  通过导入(importing)Spark sql implicits, 就可以将本地序列(seq), 数组或者RDD转为DataFrame。只要这些数据的内容能指定数据类型即可。
  1、本地seq + toDF创建DataFrame示例:
import spark.implicits._
val df = Seq(
  (1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  (2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
).toDF("int_column", "string_column", "date_column")
  注意:如果直接用toDF()而不指定列名字,那么默认列名为"_1", "_2", ...
  2、通过case class + toDF创建DataFrame
import spark.implicits._

case class Person(name: String, age: Int)

val people = sc.textFile("src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
 
// 使用 sqlContext 执行 sql 语句.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
 
// 注:sql()函数的执行结果也是DataFrame,支持各种常用的RDD操作.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
方法三,Spark中使用createDataFrame函数创建DataFrame
在SqlContext中使用createDataFrame也可以创建DataFrame。跟toDF一样,这里创建DataFrame的数据形态也可以是本地数组或者RDD。
通过row+schema创建示例
import org.apache.spark.sql.types._
val schema = StructType(List(
    StructField("integer_column", IntegerType, nullable = false),
    StructField("string_column", StringType, nullable = true),
    StructField("date_column", DateType, nullable = true)
))
 
val rdd = sc.parallelize(Seq(
  Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
))
val df = sqlContext.createDataFrame(rdd, schema)
方法四,通过文件直接创建DataFrame
// 使用parquet文件创建
// val df = sqlContext.read.parquet("hdfs:/path/to/file")
// 使用json文件创建
val df = spark.read.json("src/main/resources/people.json")
 
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
// 使用csv文件,spark2.0+之后的版本可用
//然后使用SparkSessions对象加载CSV成为DataFrame
df = spark.read.format("com.databricks.spark.csv")
        .option("header", "true") //reading the headers
        .option("mode", "DROPMALFORMED")
        .load("csv/file/path"); //.csv("csv/file/path") //spark 2.0 api
 
df.show()


注意:本文归作者所有,未经作者允许,不得转载

全部评论: 0

    我有话说: