Spark 2.0.1:CSV文件转换为Parquet文件

import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.types._
/** Converts a CSV file (4 string columns) to Parquet using Spark 2.x. */
object Demo {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      // Usage errors go to stderr and exit with a NON-zero status so that
      // schedulers / shell scripts can detect the failure (the original
      // exited with 0, signalling success on bad arguments).
      System.err.println("jar args: inputFiles outPath ")
      System.exit(1)
    }
    val inputPath = args(0)
    val outPath = args(1)

    /**
      * Pre-Spark-2.0 style, kept for reference:
      * val conf = new SparkConf().setAppName("Demo")
      * val sc = new SparkContext(conf)
      * val sqlContext = new SQLContext(sc)
      */
    // SparkSession is the single entry point since 2.0; enableHiveSupport
    // requires a Hive-enabled build — NOTE(review): drop it if no Hive metastore.
    val spark = SparkSession.builder().appName("Demo").enableHiveSupport().getOrCreate()
    val sqlContext = spark.sqlContext
    // All columns read as non-nullable strings; no type inference is attempted.
    val schema = StructType(Array(
      StructField("usernumber", DataTypes.StringType, false),
      StructField("lasttime", DataTypes.StringType, false),
      StructField("createtime", DataTypes.StringType, false),
      StructField("modifytime", DataTypes.StringType, false)))
    convert(sqlContext, inputPath, schema, outPath)
  }

  /**
    * Reads CSV at `inputpath` with the given schema and writes it as Parquet
    * to `outpath`.
    *
    * @param sqlContext Spark SQL entry point
    * @param inputpath  CSV input path/glob
    * @param schema     explicit column schema applied to the CSV
    * @param outpath    Parquet output directory (must not already exist)
    */
  def convert(sqlContext: SQLContext, inputpath: String, schema: StructType, outpath: String) {
    // Spark 2.x ships a built-in CSV source ("csv"); the external
    // com.databricks.spark.csv package is unnecessary and its _2.11 artifact
    // conflicts with _2.10 Spark artifacts in the pom.
    val df = sqlContext.read.format("csv").
      schema(schema).option("delimiter", ",").load(inputpath)
    // Write out as Parquet (fails if outpath exists — default SaveMode.ErrorIfExists).
    df.write.parquet(outpath)
  }
}


./bin/spark-submit --master yarn --deploy-mode cluster --class Demo /home/SparkTest.jar  /csv/* /parquet

 
// BUGFIX: the original line read "SparkSessioe" — a typo that breaks compilation.
import org.apache.spark.sql.SparkSession

/** Reads a Parquet path, registers it as a temp view and shows 10 rows. */
object read_parquet {

  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      // Guard against ArrayIndexOutOfBoundsException on missing argument.
      System.err.println("jar args: parquetPath")
      System.exit(1)
    }
    val parquetFile = args(0)
    // Clear any default session so getOrCreate() builds a fresh one.
    SparkSession.clearDefaultSession()
    val spark = SparkSession.builder().appName("read_parquet").enableHiveSupport().getOrCreate().sqlContext
    val parquet = spark.read.parquet(parquetFile)
    // Register as SQL view; cache before querying so repeated scans hit memory.
    parquet.createOrReplaceTempView("test")
    parquet.cache()
    val results = spark.sql("select * from test")
    results.show(10)
  }

}
 
./bin/spark-submit --master yarn --deploy-mode client --class read_parquet /home/read_parquet.jar /parquet/*






<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>SparkTest</groupId>
    <artifactId>SparkTest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>2.0.1</spark.version>
        <!-- Scala BINARY version; must match the _N.NN suffix of every Scala dependency. -->
        <scala.version>2.11</scala.version>
        <hadoop.version>2.7.2</hadoop.version>
    </properties>
    <dependencies>
        <!-- FIX: spark-core/spark-sql were hard-coded to _2.10 while spark-csv was
             _2.11 — mixing Scala binary versions fails at runtime. All Scala
             artifacts now use ${scala.version}, and versions come from properties
             instead of being duplicated inline. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- Optional on Spark 2.x: the csv source is built in; kept for compatibility. -->
        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-csv_${scala.version}</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.ecfront</groupId>
            <artifactId>ez-fs</artifactId>
            <version>0.9</version>
        </dependency>

    </dependencies>
</project>
 

Administrator

知人不必言尽,留三分余地与人,留些口德与己。 责人不必苛尽,留三分余地与人,留些肚量与己。 才能不必傲尽,留三分余地与人,留些内涵与己。 锋芒不必露尽,留三分余地与人,留些深敛与己。 有功不必邀尽,留三分余地与人,留些谦让与己。

发表评论

电子邮件地址不会被公开。 必填项已用*标注