관리 메뉴

IT.FARMER

Spark Dataset 데이터 조회 예제 본문

BigData/Spark

Spark Dataset 데이터 조회 예제

아이티.파머 2020. 4. 6. 09:37
반응형

2020/04/03 - [BigData/Spark] - spark linux install (Master / Worker)

2020/04/06 - [BigData/Spark] - spark sql

2020/04/06 - [BigData/Spark] - Spark 모니터링

2020/04/06 - [BigData/Spark] - Spark Dataset

2020/04/06 - [BigData/Spark] - Spark Dataset 데이터 조회 예제

2020/04/06 - [BigData/Spark] - Dataset DataFrame Convert

2020/04/06 - [BigData/Spark] - Spark submit

 

 

 

Apache Spark Dataset 예제

Dataset을 기본 형태인 Dataset<Row>로 사용해 보고, Java Bean으로 변환하여 사용하는 방법도 살펴본다.

기본적인 테이블 조회와, 조회된 Dataset을 조작하여 그룹핑 및 필터링 등을 수행하는 예제이다.

 

 

    
    
    import com.grp.code.GrpConst;
    import com.grp.domain.entity.ScenarioInformation;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.spark.sql.*;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    
    import java.util.List;
    
    import static org.apache.spark.sql.functions.sum;
    
    /**
     * Exploratory tests that run queries through {@link SparkSql} and print or log the results.
     *
     * <p>None of the tests assert on the returned data; they only demonstrate how a
     * {@code Dataset<Row>} can be shown, converted to a Java bean, filtered and grouped.
     * Every test reads over JDBC via {@link SparkSql}, so a reachable database with the
     * referenced tables is required.
     *
     * @author skan
     * @version Copyright (C) 2020 by skan. All right reserved.
     * @since 2020-02-13
     */
    @Slf4j
    class SparkSqlTest {

        /** Helper under test; re-created before every test by {@link #init()}. */
        private SparkSql sparkSql;

        /** Creates a fresh {@link SparkSql} for each test. */
        @BeforeEach
        void init() {
            sparkSql = new SparkSql();
        }

        /**
         * Loads SCENARIO_INFO rows for one scenario, converts them to {@link ScenarioInformation}
         * beans through a bean encoder and logs two fields of every bean.
         */
        @Test
        void encoder() {
            String scenarioId = "1";
            Encoder<ScenarioInformation> scenarioInformationEncoder = Encoders.bean(ScenarioInformation.class);
            Dataset<Row> rowDataset = sparkSql.selectSql(String.format("SELECT * FROM SCENARIO_INFO WHERE SCENARIO_ID = %s", scenarioId));
            rowDataset.show();

            // Row -> bean conversion; collectAsList() brings the typed rows to the driver.
            List<ScenarioInformation> scenarios = rowDataset.as(scenarioInformationEncoder).collectAsList();
            for (ScenarioInformation scenarioInformation : scenarios) {
                log.debug("SCENARIO_INFO campaign id = {} scenario id = {}", scenarioInformation.getCAMPAIGN_ID(), scenarioInformation.getSCENARIO_ID());
            }
        }

        /** Shows PRODUCT_DETAIL_INFO rows filtered by version and product id on the Spark side. */
        @Test
        void adProductSelect() {
            String version = "2020.1Q";
            String productId = "11";
            Dataset<Row> rowDataset = sparkSql.adProductSelect("PRODUCT_DETAIL_INFO", version, productId);
            rowDataset.show();
        }

        /** Shows PRODUCT_DETAIL_INFO rows filtered by version inside the SQL sent to the database. */
        @Test
        void selectTable() {
            String version = "2020.1Q";
            String sql = String.format("select * from PRODUCT_DETAIL_INFO where VERSION = '%s'", version);
            Dataset<Row> rowDataset = sparkSql.selectSql(sql);
            rowDataset.show();
        }

        /**
         * Demonstrates column selection and grouped sums on SCENARIO_SUM_INFO, then, for every
         * SCENARIO_PRODUCT_INFO row of the scenario, sums UV_1 of MEDIA_SUM_INFO per
         * (SCENARIO_ID, MEDIA_ID).
         */
        @Test
        void groupByTest() {
            // Uses the SparkSql instance prepared in init(); no second instance is needed here.
            String scenarioId = "1";
            Dataset<Row> step11Dataset = sparkSql.selectSql(String.format("SELECT * FROM `SCENARIO_SUM_INFO` WHERE SCENARIO_ID = %s ", scenarioId));

            step11Dataset
                    .select("UV_1", "UV_2", "UV_3")
                    .show();

            step11Dataset
                    .select("SCENARIO_ID", "UV_1", "UV_2", "UV_3")
                    .groupBy("SCENARIO_ID")
                    .agg(sum("UV_1").as("UV_1"), sum("UV_2").as("UV_2"), sum("UV_3").as("UV_3"))
                    .show();

            Dataset<Row> scenarioProductInformationDataset = sparkSql
                    .selectSql(String.format("SELECT * FROM SCENARIO_PRODUCT_INFO WHERE SCENARIO_ID = %s", scenarioId));

            // One JDBC query per product row: acceptable for a demo, but it is an N+1 access pattern.
            for (Row productRow : scenarioProductInformationDataset.collectAsList()) {
                int mediaId = productRow.getAs("MEDIA_ID");

                Dataset<Row> step10Dataset = sparkSql.selectSql(String.format("SELECT * FROM `MEDIA_SUM_INFO` WHERE SCENARIO_ID = %s AND MEDIA_ID =%s", scenarioId, mediaId));
                step10Dataset
                        .groupBy("SCENARIO_ID", "MEDIA_ID")
                        .agg(sum("UV_1").as("UV_1"))
                        .show();
            }
        }
    }

 

 

SparkSql.java 예제.

DB와 연동할 때마다 SparkSession을 새로 생성하고 DB 설정을 반복하는 작업을 줄이기 위해 사용한다.

package com.grp.calculate.spark;

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.*;

import com.grp.calculate.DataBaseConstants;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Helper that wraps Spark's JDBC data source so callers do not have to repeat the
 * session lookup and connection settings for every read or write.
 *
 * <p>Reads obtain their session from {@code SparkSessionPool} and connect with the URL,
 * user and password declared in {@link DataBaseConstants}.
 *
 * @author skan
 * @version Copyright (C) 2020 by SKAN. All right reserved.
 * @since 2020-02-12
 */
@Slf4j
public class SparkSql {

    /** JDBC driver class passed to every read issued by this helper. */
    private static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";

    static {
        // System properties are JVM-wide, so set this once at class load instead of on
        // every instantiation.
        // NOTE(review): hard-coded Windows path — presumably the local Hadoop/winutils
        // directory; confirm before running on another machine.
        System.setProperty("hadoop.home.dir", "D:\\spark-data");
    }

    private static SparkSession sparkSession;

    /**
     * Lazily creates the shared local {@link SparkSession}, guarded by the class lock.
     *
     * <p>NOTE(review): no method in this class calls this; the read methods take their
     * session from {@code SparkSessionPool} instead.
     *
     * @return the shared session
     */
    private SparkSession createSession() {
        synchronized (SparkSql.class) {
            if (sparkSession == null) {
                SparkSql.sparkSession = SparkSession
                        .builder()
                        .master("local[*]")
                        .appName("Java Spark SQL basic example")
                        .config("spark.sql.shuffle.partitions", 6)
                        .getOrCreate();
            }
        }

        return sparkSession;
    }

    /**
     * Builds a JDBC reader carrying the driver, URL and credentials shared by all reads.
     *
     * @return a reader that still needs a {@code dbtable} or {@code query} option before loading
     */
    private DataFrameReader jdbcReader() {
        return SparkSessionPool.getSparkSession()
                .read()
                .format("jdbc")
                .option("driver", JDBC_DRIVER)
                .option("url", DataBaseConstants.URL)
                .option("user", DataBaseConstants.USER_ID)
                .option("password", DataBaseConstants.PASSWORD);
    }

    /**
     * Reads an entire table.
     *
     * @param table name of the table, passed as the JDBC {@code dbtable} option
     * @return a dataset over all rows of the table
     */
    public Dataset<Row> selectTable(String table) {
        return jdbcReader()
                .option("dbtable", table)
                .load();
    }

    /**
     * Reads the rows produced by a SQL statement.
     *
     * <p>The statement is handed to the data source unchanged, so callers must not build it
     * from untrusted input.
     *
     * @param sql statement passed as the JDBC {@code query} option
     * @return a dataset over the rows returned by the statement
     */
    public Dataset<Row> selectSql(String sql) {
        return jdbcReader()
                .option("query", sql)
                .load();
    }

    /**
     * Reads advertising product rows for one version and product id.
     *
     * <p>The whole table is declared as the source and the filter is expressed as a Spark
     * column predicate on {@code VERSION} and {@code PRODUCT_ID}.
     *
     * @param tableName table to read from
     * @param version   value the {@code VERSION} column must match
     * @param productId value the {@code PRODUCT_ID} column must match
     * @return the filtered dataset
     */
    public Dataset<Row> adProductSelect(String tableName, String version, String productId) {
        Column matchesVersion = new Column("VERSION").isin(version);
        Column matchesProduct = new Column("PRODUCT_ID").isin(productId);

        // The former select("*") was a no-op projection and has been dropped.
        return this.selectTable(tableName).where(matchesVersion.and(matchesProduct));
    }

    /**
     * Writes a dataset to a table over JDBC.
     *
     * @param data      dataset to write
     * @param tableName target table
     * @param saveMode  how to treat existing data, see {@link SaveMode}
     * @param <T>       element type of the dataset
     */
    public <T> void insert(Dataset<T> data, String tableName, SaveMode saveMode) {
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", DataBaseConstants.USER_ID);
        connectionProperties.put("password", DataBaseConstants.PASSWORD);

        data
                .write()
                .mode(saveMode)
                .jdbc(DataBaseConstants.URL, tableName, connectionProperties);
    }
}

 

 

반응형

'BigData > Spark' 카테고리의 다른 글

Spark submit  (0) 2020.04.06
Dataset DataFrame Convert  (0) 2020.04.06
Spark Dataset  (0) 2020.04.06
Spark 모니터링  (0) 2020.04.06
spark sql  (0) 2020.04.06