관리 메뉴

IT.FARMER

Spring R2DBC CRUD 예제 본문

Spring/Spring WebFlux

Spring R2DBC CRUD 예제

아이티.파머 2021. 4. 29. 20:00
반응형

Spring R2DBC CRUD 예제

WebFlux 를 사용한다면, 반쪽짜리 NonBlocking 서버가 되지 않기 위해선 persisent layer 의 영역도 reactive 하게 변경해주어야 한다. 다행이 spring-r2dbc-data 와 mysql 드라이버를 지원함으로 spring r2dbc 기반 Repository를 구현 및 테스트 할수 있다.

pom.xml 에 다음 라이브러리를 넣어 준다.

 


<!-- Spring r2dbc data -->
<dependency>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    <groupId>org.springframework.boot</groupId>
</dependency>

<!-- r2dbc mysql driver -->
<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <scope>runtime</scope>
</dependency>

 

<참고> 

데이상 dev.miku 는 동작하지 않는다, 메인터런스를 포기하고  com.github.jasync-sql로 옮겼다.    mysql 의 r2dbc library 는 다음 라이브러리를 참고한다. 

 

<!-- r2dbc mysql library -->
<dependency>
    <groupId>com.github.jasync-sql</groupId>
    <artifactId>jasync-r2dbc-mysql</artifactId>
    <version>2.1.23</version>
</dependency>

 

 

 

r2dbc의 datasource를 application 에서 간단하게 설정 할수도 있고 java configuration 으로 수동으로 설정 할 수도 있다. 보통 configuration을 넣는 경우는 multiple data source 를 사용할때 이용할 수 있다.

application.yml 을 통한 r2dbc 설정

--
spring:
  profiles: test
  r2dbc:
    url: r2dbc:mysql://127.0.0.1:3306/my_db?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Seoul
    username: root
    password: mydb****
    pool:
      max-size: 100
      validation-query: SELECT 1

# log 설정 
logging.level.org.springframework.r2dbc=DEBUG

Bean 을 새로 생성해서 사용하고자 할경우 AbstractR2dbcConfiguration 을 상속받아 구현 한다.

java configuration 을 사용한 r2dbc 설정

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryMetadata;
import org.reactivestreams.Publisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;

import javax.annotation.Nonnull;

/**
 * <pre>
 * Description :
 *
 *
 * </pre>
 *
 * @author skan
 * @version Copyright (C) 2021 by CJENM|MezzoMedia. All right reserved.
 * @since 2021-02-18
 */
@Configuration
@EnableTransactionManagement
public class DataSourceR2DBCConfig extends AbstractR2dbcConfiguration {

    @Nonnull
    @Override
    @Bean("customR2ConnectionFactory")
    @Primary
    public ConnectionFactory connectionFactory() {
        return ConnectionFactories.get("r2dbc:mysql://root:password_insert@127.0.0.1:3306/my_db?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Seoul");
    }

        @Bean
    ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new R2dbcTransactionManager(connectionFactory);
    }
}

JPA를 사용한적이 잇는 경험이 있다면 r2dbc-data 를 접하는데 큰 어려움은 없어 보인다. 다른 persistence layer 를 볼때도 ConnectionFaceory 와 DataSource를 살펴 보게 되는데 여기서도 r2dbc에서도 두가지를 주입받아 사용하게 된다.

  • ConnectionFactory : 커넥션 팩토리 생성 클래스
  • DatabaseClient :
  • R2dbcEntityTemplate을 사용해도됨, Spring Data R2dbc의 중심 진입점 이며, 데이터 쿼리, 삽입 업데이트 및 삭제와 같은 일반적인 임시 사례를 위한 엔티티 지향 메서드와 인터페이스를 제공 한다.
  • POJO 간의 통합된 매핑을 상숑하여 일반적인 R2dbc작업을 수행할 때 생산성을 높이는 엔티티바인딩 작업의 중심 클레스
private final ConnectionFactory connectionFactory;
private final DatabaseClient databaseClient;
private final R2dbcEntityTemplate r2dbcEntityTemplate;

public TargetR2Repository(ConnectionFactory connectionFactory, DatabaseClient databaseClient, R2dbcEntityTemplate r2dbcEntityTemplate) {
    this.connectionFactory = connectionFactory;
    this.databaseClient = databaseClient;
    this.r2dbcEntityTemplate = r2dbcEntityTemplate;
}

// 인스턴스 생성시 참고 -> 다음과 같이 생성자에 팩토리와 클라이언트를 넣어 사용함.
DatabaseClient.create(connectionFactory)
R2dbcEntityTemplate(datadaseClient)

다음은 예제를 통해 어떻게 CRUD가 이루어 지는지 알아보자

일반 사용 , PoJo 기반 으로 사용

Pojo 기반으로 사용시 .as(UserConnection.class) 를 활성화 한다.

// Select
public Flux<Map<String, Object>> findByUserConnections(String userId) {
        return databaseClient.
                execute("select * from user_connection where user_id = :user_id")
                .bind("user_id", userId)
                //.as(UserConnection.class)
                .fetch()
                .all()
                .switchIfEmpty(Mono.defer(Mono::empty))
                .doOnError(Throwable::printStackTrace)
                ;
    }

// Update
public Mono<Void> update(String name, String id) {
        return databaseClient
                .execute("update users set user_name = :user_name where user_id = :user_id")
                .bind("user_name", name)
                .bind("user_id", id)
                .then()
                ;
    }

// insert pojo
public void insertCriteria(String name) {
        databaseClient
                .insert()
                .into(Users.class).using(new Users("4", name))
                .then();
        ;
    }

// update pojo
public Mono<Void> update(String name, String id) {
    return databaseClient
            .execute("update users set user_name = :user_name where user_id = :user_id")
            .bind("user_name", name)
            .bind("user_id", id)
            .then()
            ;
}

// update criteria
public Mono<Void> updateCriteria(String id, String name) {
    return databaseClient
            .update()
            .table(Users.class).using(new Users(id, name))
            .then()

Transation 사용

/**
 * 프로그램 트랜잭션
 * @param id
 * @param name
 * @return
 */
public Mono<Void> insertProgramTransaction(@NonNull String id, @Nullable String name) {

    R2dbcTransactionManager tm = new R2dbcTransactionManager(connectionFactory);
    TransactionalOperator operator = TransactionalOperator.create(tm);

    return databaseClient.execute("select * from users where user_id =:user_id")
            .bind("user_id", id)
            .fetch()
            .one()
            .then(databaseClient.execute("update users set user_name = :user_name where user_id = :user_id")
                    .bind("user_id", id)
                    .bind("user_name", name)
                    .fetch().rowsUpdated()
            )
            .then(
                    databaseClient.execute("INSERT INTO users (user_id, user_name) VALUES(:id, :name)")
                            .bind("id", "joe")
                            .bind("name", "Joe")
                            .fetch().rowsUpdated()
            )
            .then()
            .as(operator::transactional)

            ;
}

선언적 트렌젝션 사용 @Transaction

@Configrration으로 지정후 사용한다.

@Configuration
@EnableTransactionManagement
public class Config extends AbstractR2dbcConfiguration {

    @Override
    public ConnectionFactory connectionFactory() {
        return null;
    }

    @Bean
    ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new R2dbcTransactionManager(connectionFactory);
    }
}

TransactionalOperator 를 사용하지 않고 선언적 트랜잭션으로 쉽게 가능하다.

@Transactional
public Mono<Integer> declarativeInsertTransaction(@NonNull String id, @Nullable String name) {

  return
          databaseClient.execute("update users set user_name = :user_name where user_id = :user_id")
                  .bind("user_id", id)
                  .bind("user_name", name)
                  .fetch().rowsUpdated()

                  .then(
                          databaseClient.execute("INSERT INTO users (user_id, user_name) VALUES(:id, :name)")
                                  .bind("id", id)
                                  .bind("name", name)
                                  .fetch().rowsUpdated()
                  )
          ;
}

R2dbc Repository

기존에 사용하던 Spring Data JPA 와 사용 방법은 크게 다르지 않다. Entity Class를 만들어 주고

Identity (@Id, @EmbededId) 를 설정한뒤 ReactiveCrudRepository를 상속하여 사용한다.

.

public class Person {

  @Id
  private Long id;
  private String firstname;
  private String lastname;

  // … getters and setters omitted
}

public interface PersonRepository extends ReactiveCrudRepository<Person, Long> {

  // additional custom query methods go here
}

도메인 저장소는 ReactiveCrudRepository 를 확장하여 사용함으로 엔티티에 액서스 할수 있는 반응형 CRUD 작업을 제공한다.

Persion Entity 에 대한 페이징 엑세스 테스트

@ExtendWith(SpringExtension.class)
@ContextConfiguration
class PersonRepositoryTests {

  @Autowired
  PersonRepository repository;

  @Test
  void readsAllEntitiesCorrectly() {

    repository.findAll()
      .as(StepVerifier::create)
      .expectNextCount(1)
      .verifyComplete();
  }

  @Test
  void readsEntitiesByNameCorrectly() {

    repository.findByFirstname("Hello World")
      .as(StepVerifier::create)
      .expectNextCount(1)
      .verifyComplete();
  }
}

앞의 예제는 테스트 케이스에 주석 기반 종속성 주입을 수행하는 Spring의 단위 테스트 지원으로 애플리케이션 컨텍스트를 생성한다.

테스트 방법으로는 저장소를 사용하여 데이터 베이스를 쿼리한다. StepVeifier는 결과에 대한 우리의 기대치를 확인 하기 위한 테스트 도구이다.

  • execute : 실행할 SQL 문
  • fetch : 쿼리 실행 결과를 읽기
  • all : 모든 쿼리 생핼 결과를 가져오기
  • one : 결과 하나만 가져오기
  •  
반응형

'Spring > Spring WebFlux' 카테고리의 다른 글

spring webflux error 처리  (0) 2020.06.22
webflux null, empty 처리  (0) 2020.05.26