Apache Spark Dataset Examples
We use Dataset both in its default Dataset<Row> form and converted to a Java Bean.
The examples cover basic table queries and manipulating the returned Dataset for grouping, filtering, and so on.
import com.grp.code.GrpConst;
import com.grp.domain.entity.ScenarioInformation;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.*;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.apache.spark.sql.functions.sum;
/**
 * <pre>
 * Description : SparkSql query tests
 * </pre>
 *
 * @author skan
 * @version Copyright (C) 2020 by skan. All rights reserved.
 * @since 2020-02-13
 */
@Slf4j
class SparkSqlTest {

    private SparkSql sparkSql;

    @BeforeEach
    void init() {
        sparkSql = new SparkSql();
    }
    @Test
    void encoder() {
        String scenarioId = "1";
        // Bean encoder that maps result columns onto ScenarioInformation properties.
        Encoder<ScenarioInformation> scenarioInformationEncoder = Encoders.bean(ScenarioInformation.class);
        Dataset<Row> rowDataset = sparkSql.selectSql(String.format("SELECT * FROM SCENARIO_INFO WHERE SCENARIO_ID = %s", scenarioId));
        rowDataset.show();
        // Convert Dataset<Row> to Dataset<ScenarioInformation>, then collect to the driver.
        List<ScenarioInformation> scenDataSet = rowDataset.as(scenarioInformationEncoder).collectAsList();
        scenDataSet.forEach(scenarioInformation ->
                log.debug("SCENARIO_INFO campaign id = {} scenario id = {}", scenarioInformation.getCAMPAIGN_ID(), scenarioInformation.getSCENARIO_ID()));
    }
    @Test
    void adProductSelect() {
        String version = "2020.1Q";
        String productId = "11";
        Dataset<Row> rowDataset = sparkSql.adProductSelect("PRODUCT_DETAIL_INFO", version, productId);
        rowDataset.show();
    }

    @Test
    void selectTable() {
        String version = "2020.1Q";
        String sql = String.format("select * from PRODUCT_DETAIL_INFO where VERSION = '%s'", version);
        Dataset<Row> rowDataset = sparkSql.selectSql(sql);
        rowDataset.show();
    }
    @Test
    void groupByTest() {
        String scenarioId = "1";
        Dataset<Row> step11Dataset = sparkSql.selectSql(String.format("SELECT * FROM `SCENARIO_SUM_INFO` WHERE SCENARIO_ID = %s", scenarioId));
        step11Dataset
                .select("UV_1", "UV_2", "UV_3")
                .show();
        // Group by scenario and sum each UV column.
        step11Dataset
                .select("SCENARIO_ID", "UV_1", "UV_2", "UV_3")
                .groupBy("SCENARIO_ID")
                .agg(sum(new Column("UV_1")).as("UV_1"), sum(new Column("UV_2")).as("UV_2"), sum(new Column("UV_3")).as("UV_3"))
                .show();
        Dataset<Row> scenarioProductInformationDataset = sparkSql
                .selectSql(String.format("SELECT * FROM SCENARIO_PRODUCT_INFO WHERE SCENARIO_ID = %s", scenarioId));
        // For each media id of the scenario, aggregate its UV sum.
        scenarioProductInformationDataset.collectAsList().forEach(t -> {
            int mediaId = t.getAs("MEDIA_ID");
            Dataset<Row> step10Dataset = sparkSql.selectSql(String.format("SELECT * FROM `MEDIA_SUM_INFO` WHERE SCENARIO_ID = %s AND MEDIA_ID = %s", scenarioId, mediaId));
            step10Dataset
                    .groupBy("SCENARIO_ID", "MEDIA_ID")
                    .agg(sum("UV_1").as("UV_1"))
                    .show();
        });
    }
}
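The encoder test above relies on Encoders.bean(), which needs ScenarioInformation to be a plain Java Bean whose property names match the columns of SCENARIO_INFO. The entity class itself is not shown in this post; below is a minimal sketch of what it could look like, limited to the two columns the test reads, and with assumed types (the real entity presumably has more fields).

package com.grp.domain.entity;

import lombok.Data;

import java.io.Serializable;

// Hypothetical sketch: only the two columns referenced by the test are shown,
// and their types are assumptions. Lombok's @Data generates the getCAMPAIGN_ID()
// and getSCENARIO_ID() accessors the test calls.
@Data
public class ScenarioInformation implements Serializable {
    private Long SCENARIO_ID;   // assumed numeric, since the test binds it unquoted
    private String CAMPAIGN_ID; // assumed type
}

Keeping the field names in upper snake case lets the bean properties line up with the result column names without any extra column-to-field mapping.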
SparkSql.java example.
This helper is used to cut down the repetitive work of creating a SparkSession and configuring the DB connection on every call.
package com.grp.calculate.spark;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.*;
import com.grp.calculate.DataBaseConstants;
import java.util.Properties;
/**
 * <pre>
 * Description : SparkSession / JDBC query helper
 * </pre>
 *
 * @author skan
 * @version Copyright (C) 2020 by SKAN. All rights reserved.
 * @since 2020-02-12
 */
@Slf4j
public class SparkSql {

    static {
        // Point Spark at the local Hadoop binaries (winutils) when running on Windows.
        System.setProperty("hadoop.home.dir", "D:\\spark-data");
    }

    private static SparkSession sparkSession;

    private SparkSession createSession() {
        // Create the shared session once; every subsequent call reuses it.
        synchronized (SparkSql.class) {
            if (sparkSession == null) {
                SparkSql.sparkSession = SparkSession
                        .builder()
                        .master("local[*]")
                        .appName("Java Spark SQL basic example")
                        .config("spark.sql.shuffle.partitions", 6)
                        .getOrCreate();
            }
        }
        return sparkSession;
    }
    /**
     * Query all rows of a table.
     *
     * @param table table name
     * @return the table contents as a Dataset<Row>
     */
    public Dataset<Row> selectTable(String table) {
        // Reuse the shared session.
        SparkSession spark = this.createSession();
        // DB and table settings.
        String url = DataBaseConstants.URL;
        return spark
                .read()
                .format("jdbc")
                .option("driver", "com.mysql.cj.jdbc.Driver")
                .option("url", url)
                .option("dbtable", table)
                .option("user", DataBaseConstants.USER_ID)
                .option("password", DataBaseConstants.PASSWORD)
                .load();
    }
    /**
     * Query a Dataset filtered by a SQL statement.
     *
     * @param sql the SQL statement to run
     * @return the query result as a Dataset<Row>
     */
    public Dataset<Row> selectSql(String sql) {
        // Reuse the shared session.
        SparkSession spark = this.createSession();
        // DB settings; the "query" option pushes the SQL down to the database.
        String url = DataBaseConstants.URL;
        return spark
                .read()
                .format("jdbc")
                .option("driver", "com.mysql.cj.jdbc.Driver")
                .option("url", url)
                .option("user", DataBaseConstants.USER_ID)
                .option("password", DataBaseConstants.PASSWORD)
                .option("query", sql)
                .load();
    }
    /**
     * Query the raw ad product data, filtered by version and product id.
     *
     * @param tableName table to read
     * @param version   product version, e.g. "2020.1Q"
     * @param productId product id
     * @return the filtered rows
     */
    public Dataset<Row> adProductSelect(String tableName, String version, String productId) {
        Dataset<Row> rowDataset = this.selectTable(tableName);
        return rowDataset
                .where(new Column("VERSION").isin(version).and(new Column("PRODUCT_ID").isin(productId)));
    }
    /**
     * Write a Dataset to a JDBC table.
     *
     * @param data      the Dataset to write
     * @param tableName target table
     * @param saveMode  {@link SaveMode} (e.g. Append, Overwrite)
     * @param <T>       row type of the Dataset
     */
    public <T> void insert(Dataset<T> data, String tableName, SaveMode saveMode) {
        Properties properties = new Properties();
        properties.put("user", DataBaseConstants.USER_ID);
        properties.put("password", DataBaseConstants.PASSWORD);
        String url = DataBaseConstants.URL;
        data
                .write()
                .mode(saveMode)
                .jdbc(url, tableName, properties);
    }
}
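SparkSql depends on a DataBaseConstants class that is not shown in this post. A minimal sketch, assuming a MySQL connection; all three values are placeholders to replace with your own connection info.

package com.grp.calculate;

// Hypothetical sketch: the real constants class is not shown in this post.
public final class DataBaseConstants {
    public static final String URL = "jdbc:mysql://localhost:3306/grp?serverTimezone=UTC";
    public static final String USER_ID = "grp_user";
    public static final String PASSWORD = "grp_password";

    private DataBaseConstants() {
    }
}

The insert() method is defined above but not exercised by the tests. As a usage sketch, written as another test method in SparkSqlTest (reusing its static import of sum), the aggregate from groupByTest could be written back to a summary table; SCENARIO_SUM_AGG is a hypothetical table name.

    @Test
    void insertAggregate() {
        Dataset<Row> aggregated = sparkSql
                .selectSql("SELECT * FROM SCENARIO_SUM_INFO WHERE SCENARIO_ID = 1")
                .groupBy("SCENARIO_ID")
                .agg(sum("UV_1").as("UV_1"));
        // Append the aggregate to a (hypothetical) summary table.
        sparkSql.insert(aggregated, "SCENARIO_SUM_AGG", SaveMode.Append);
    }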