관리 메뉴

IT.FARMER

spark sql 본문

BigData/Spark

spark sql

아이티.파머 2020. 4. 6. 09:30
반응형

2020/04/03 - [BigData/Spark] - spark linux install (Master / Worker)

2020/04/06 - [BigData/Spark] - spark sql

2020/04/06 - [BigData/Spark] - Spark 모니터링

2020/04/06 - [BigData/Spark] - Spark Dataset

2020/04/06 - [BigData/Spark] - Spark Dataset 데이터 조회 예제

2020/04/06 - [BigData/Spark] - Dataset DataFrame Convert

2020/04/06 - [BigData/Spark] - Spark submit

Spark SQL

Spark SQL 은 구조화된 데이터 작업을 위한 모듈 이다.

Spark Sql을 사용하면 Sql 또는 익숙한 DataFrameAPI를 사용하여 Spark 프로그램 내에서 구조화된 데이터를 쿼리 할 수 있다.

Spark SQL의 한가지 용도는 SQL 쿼리를 실행 하는 것이다. SparkSQL을 이용하여 기존 Hive 설치에서 데이터를 읽을수도 있고, 다른 프로그래밍 언어에서 SQL을 실행하여 결과를 Dataset/DataFrame 으로 반환하여 사용한다.

JDBC또는 ODBC를 통해 DB에 연결 할 수 있다.

 

Spark java example

package com.grp.calculate.spark;

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * <pre>
 * Description :
 *
 *
 * </pre>
 *
 * @author skan
 * @version Copyright (C) 2020 by skan. All right reserved.
 * @since 2020-02-12
 */
@Slf4j
public class SparkSql {

    {
        System.setProperty("hadoop.home.dir", "D:\\spark-data");
    }

    private static SparkSession sparkSession;

    private SparkSession createSession() {
        // session 설정.
        synchronized (SparkSql.class) {
            if (sparkSession == null) {
                SparkSql.sparkSession = SparkSession
                        .builder()
                        .master("local[*]")
                        .appName("Java Spark SQL basic example")
                        .config("spark.sql.shuffle.partitions", 6)
                        .getOrCreate();
            }
        }

        return sparkSession;
    }

    ;


    /**
     * 테이블 전체 데이터 조회
     *
     * @param table
     * @return
     */
    public Dataset<Row> selectTable(String table) {

        // session 설정.
        SparkSession spark = SparkSessionPool.getSparkSession();
        // db 및 테이블 설정
        String url = "jdbc:mysql://IP:3306/grp?characterEncoding=UTF-8&serverTimezone=UTC";
        Dataset<Row> tableRowDataset = spark
                .read()
                .format("jdbc")
                .option("driver", "com.mysql.cj.jdbc.Driver")
                .option("url", url)
                .option("dbtable", table)
                .option("user", "user_id")
                .option("password", "password")
                .load();

        return tableRowDataset;
    }

    /**
     * sql 문으로 filtering 된 데이터셋 조회
     *
     * @param sql
     * @param sql
     * @return
     */
    public Dataset<Row> selectSql(String sql) {

        // session 설정.
        SparkSession spark = SparkSessionPool.getSparkSession();

        // db 및 테이블 설정
        String url = "jdbc:mysql://IP:3306/grp?characterEncoding=UTF-8&serverTimezone=UTC";

        return spark
                .read()
                .format("jdbc")
                .option("driver", "com.mysql.cj.jdbc.Driver")
                .option("url", url)
                .option("user", "user_id")
                .option("password", "password")
                .option("query", sql)
                .load();
    }


    /**
     * 광고 상품 원본 데이터 조회
     *
     * @return
     */
    public Dataset<Row> adProductSelect(String tableName, String version, String productId) {

        Dataset<Row> rowDataset = this.selectTable(tableName);
        rowDataset = rowDataset.select("*")
                .where(new Column("VERSION").isin(version).and(new Column("PRODUCT_ID").isin(productId)));

        return rowDataset;
    }

    /**
     *
     * @param data
     * @param tableName
     * @param saveMode @link SaveMode
     * @param <T>
     */
    public <T> void insert(Dataset<T> data, String tableName, SaveMode saveMode) {

        Properties properties = new Properties();
        properties.put("user", "user_id");
        properties.put("password", "password");

        String url = "jdbc:mysql://IP:3306/grp?characterEncoding=UTF-8&serverTimezone=UTC";
        data
                .write()
                .mode(saveMode)
                .jdbc(url, tableName, properties);

    }

    /**
     * List 파일을 Dataset 으로 변형 하기
     * @param list
     * @param tClass
     * @param <T>
     * @param <D>
     * @return
     */
    public <T,D> Dataset<T> createDataSet (List<T> list, Class<T> tClass) {
        SparkSession sparkSession = this.createSession();
        Encoder<T> personEncoder = Encoders.bean(tClass);

        return sparkSession.createDataset(list,personEncoder);

    }


}
반응형

'BigData > Spark' 카테고리의 다른 글

Spark Dataset  (0) 2020.04.06
Spark 모니터링  (0) 2020.04.06
spark linux install (Master / Worker)  (0) 2020.04.03
Spark ML pipeline  (0) 2017.06.16
Spark 설치 및 실습  (0) 2017.06.16