반응형
2020/04/03 - [BigData/Spark] - spark linux install (Master / Worker)
2020/04/06 - [BigData/Spark] - spark sql
2020/04/06 - [BigData/Spark] - Spark 모니터링
2020/04/06 - [BigData/Spark] - Spark Dataset
2020/04/06 - [BigData/Spark] - Spark Dataset 데이터 조회 예제
2020/04/06 - [BigData/Spark] - Dataset DataFrame Convert
2020/04/06 - [BigData/Spark] - Spark submit
Spark SQL
Spark SQL 은 구조화된 데이터 작업을 위한 모듈 이다.
Spark Sql을 사용하면 Sql 또는 익숙한 DataFrameAPI를 사용하여 Spark 프로그램 내에서 구조화된 데이터를 쿼리 할 수 있다.
Spark SQL의 한가지 용도는 SQL 쿼리를 실행 하는 것이다. SparkSQL을 이용하여 기존 Hive 설치에서 데이터를 읽을수도 있고, 다른 프로그래밍 언어에서 SQL을 실행하여 결과를 Dataset/DataFrame 으로 반환하여 사용한다.
JDBC또는 ODBC를 통해 DB에 연결 할 수 있다.
Spark java example
package com.grp.calculate.spark;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
* <pre>
* Description :
*
*
* </pre>
*
* @author skan
* @version Copyright (C) 2020 by skan. All right reserved.
* @since 2020-02-12
*/
@Slf4j
public class SparkSql {
{
System.setProperty("hadoop.home.dir", "D:\\spark-data");
}
private static SparkSession sparkSession;
private SparkSession createSession() {
// session 설정.
synchronized (SparkSql.class) {
if (sparkSession == null) {
SparkSql.sparkSession = SparkSession
.builder()
.master("local[*]")
.appName("Java Spark SQL basic example")
.config("spark.sql.shuffle.partitions", 6)
.getOrCreate();
}
}
return sparkSession;
}
;
/**
* 테이블 전체 데이터 조회
*
* @param table
* @return
*/
public Dataset<Row> selectTable(String table) {
// session 설정.
SparkSession spark = SparkSessionPool.getSparkSession();
// db 및 테이블 설정
String url = "jdbc:mysql://IP:3306/grp?characterEncoding=UTF-8&serverTimezone=UTC";
Dataset<Row> tableRowDataset = spark
.read()
.format("jdbc")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("url", url)
.option("dbtable", table)
.option("user", "user_id")
.option("password", "password")
.load();
return tableRowDataset;
}
/**
* sql 문으로 filtering 된 데이터셋 조회
*
* @param sql
* @param sql
* @return
*/
public Dataset<Row> selectSql(String sql) {
// session 설정.
SparkSession spark = SparkSessionPool.getSparkSession();
// db 및 테이블 설정
String url = "jdbc:mysql://IP:3306/grp?characterEncoding=UTF-8&serverTimezone=UTC";
return spark
.read()
.format("jdbc")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("url", url)
.option("user", "user_id")
.option("password", "password")
.option("query", sql)
.load();
}
/**
* 광고 상품 원본 데이터 조회
*
* @return
*/
public Dataset<Row> adProductSelect(String tableName, String version, String productId) {
Dataset<Row> rowDataset = this.selectTable(tableName);
rowDataset = rowDataset.select("*")
.where(new Column("VERSION").isin(version).and(new Column("PRODUCT_ID").isin(productId)));
return rowDataset;
}
/**
*
* @param data
* @param tableName
* @param saveMode @link SaveMode
* @param <T>
*/
public <T> void insert(Dataset<T> data, String tableName, SaveMode saveMode) {
Properties properties = new Properties();
properties.put("user", "user_id");
properties.put("password", "password");
String url = "jdbc:mysql://IP:3306/grp?characterEncoding=UTF-8&serverTimezone=UTC";
data
.write()
.mode(saveMode)
.jdbc(url, tableName, properties);
}
/**
* List 파일을 Dataset 으로 변형 하기
* @param list
* @param tClass
* @param <T>
* @param <D>
* @return
*/
public <T,D> Dataset<T> createDataSet (List<T> list, Class<T> tClass) {
SparkSession sparkSession = this.createSession();
Encoder<T> personEncoder = Encoders.bean(tClass);
return sparkSession.createDataset(list,personEncoder);
}
}
반응형
'BigData > Spark' 카테고리의 다른 글
Spark Dataset (0) | 2020.04.06 |
---|---|
Spark 모니터링 (0) | 2020.04.06 |
spark linux install (Master / Worker) (0) | 2020.04.03 |
Spark ML pipeline (0) | 2017.06.16 |
Spark 설치 및 실습 (0) | 2017.06.16 |