MySQL 개요
- 오픈 소스 기반의 관계형 데이터베이스
- 데이터 읽기 쓰기 등에서 높은 성능 제공
- MVCC(Multi Version Concurrency Control) 기술을 통해 트랜잭션 간 충돌을 방지하고 읽기 성능 향상
- 자동으로 데드락을 감지하여 강제 종료하며 완료되지 못한 트랜잭션에 대한 복구 작업도 자동으로 수행
- 인덱스 및 다양한 튜닝 옵션 등을 통해 성능 향상 도모할 수 있음
- 트랜잭션 ACID 원칙 준수
- master-slave 구조를 통한 데이터 복제 및 replication 지원
- 대규모 트랜잭션 지원이 필요한 서비스에 적합
- 5.5 버전 이후부터는 InnoDB 엔진이 기본 엔진
- 5.5 버전 이전에는 MyISAM 엔진이 디폴트 엔진
JDBC, JPA는 non-blocking 지원 불가
- JDBC는 동기 blocking I/O 기반으로 설계됨
- 소켓에 대한 연결과 쿼리 실행 모두 동기 blocking으로 동작
- JPA 또한 JDBC 기반이기 때문에 비동기 non-blocking 지원 불가능
- 이에 따라 새로운 드라이버인 R2DBC를 Pivotal 사에서 2017년에 개발
R2DBC (Reactive Relational Database Connectivity)
- 비동기 non-blocking 관계형 데이터베이스 드라이버
- Reactive Streams 스펙을 제공하며 Project Reactor 기반으로 구현
1. R2DBC가 지원하는 데이터베이스
1.1 공식적으로 지원하는 데이터베이스
- r2dbc-h2
- r2dbc-mssql
- r2dbc-pool: Reactor pool로 커넥션 풀 제공
1.2 벤더사에서 지원하는 데이터베이스
- oracle-r2dbc
- r2dbc-mariadb
- r2dbc-postgresql
1.3 커뮤니티에서 지원하는 데이터베이스
- r2dbc-mysql
- 메인테이너가 2020년 이후로 업데이트를 안 하고 있음
- 이에 따라 https://github.com/asyncer-io/r2dbc-mysql로 포크 진행
- r2dbc-spi 1.0.0.RELEASE 지원
R2DBC MySQL 구조
- r2dbc-spi와 Reactor Netty 기반
- Reactor Netty를 이용하여 r2dbc-spi 스펙을 구현
- Reactor Netty client로 성능과 확장성 모두 제공
- r2dbc-spi 스펙을 구현하여 여러 데이터베이스 시스템과 호환
R2DBC SPI (Service Provider Interface)
- 리액티브 스트림을 통해 비동기적인 데이터베이스 접근을 지원하는 Java 기반의 API
- 다음과 같은 스펙을 지원
- Connection, ConnectionFactory 등 DB Connection 스펙
- R2dbcException, R2dbcTimeoutException, R2dbcBadGrammarException 등의 Exception 스펙
- Result, Row, RowMetadata 등 result 스펙
- Statement 등 statement 스펙
- etc.

1. R2DBC SPI Connection
- Closable을 구현하여 close 메서드로 conneciton을 닫을 수 있음 (try-with-resources 가능)
- ConnectionMetadata를 제공하여 db의 version과 productName 제공
- createStatement 메서드를 통해 sql을 넘기고 Statement 생성
- 트랜잭션 관련 기능 제공

2. R2DBC SPI ConnectionFactory
- DB 커넥션을 생성하는 팩토리
- ConnectionFactoryMetadata를 통해 name과 같은 ConnectionFactory 정보 제공

3. R2DBC SPI Statement
- Statement는 Connection으로부터 createStatement를 통해 생성
- bind: sql에 파라미터를 바인딩
- add: 이전까지 진행한 바인딩을 저장하고 새로운 바인딩 생성
- execute: 생성된 바인딩 수만큼 쿼리를 실행하고 Publisher로 반환

R2DBC MySqlConnection 구성요소
- Connection을 구현한 MySqlConnection
- ConnectionFactory를 구현한 MySqlConnectionFactory
- MySqlConnectionFactory를 생성할 때 필요한 MySqlConnectionConfiguration
- ConnectionMetadata를 구현한 MySqlConnectionMetadata
- Statement를 구현한 MySqlStatement
1. DB 연결 및 쿼리 실행 방법
- MySqlConnectionConfiguration 정보를 기반으로 MySqlConnectionFactory 생성
- MySqlConnectionFactory로 MySqlConnection 생성
- MySqlConnection으로 MySqlStatement 생성
- MySqlConnection으로 transaction start, rollback, commit
// MySQL 연결 설정 구성 | |
MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builder() | |
.host("localhost") // MySQL 서버 호스트 | |
.port(3306) // MySQL 서버 포트 (기본값: 3306) | |
.username("root") // MySQL 사용자 이름 | |
.password("password") // MySQL 사용자 비밀번호 | |
.database("test_db") // 사용할 데이터베이스 이름 | |
.build(); | |
// MySQL 연결 팩토리 생성 | |
MySqlConnectionFactory connectionFactory = MySqlConnectionFactory.from(configuration); | |
// 연결을 통해 트랜잭션 시작, 쿼리 실행, 커밋 또는 롤백 수행 | |
connectionFactory.create() | |
.flatMap(connection -> | |
connection.beginTransaction() // 트랜잭션 시작 | |
.then(connection.createStatement("INSERT INTO users (name, email) VALUES (?, ?)") | |
.bind(0, "John Doe") // 첫 번째 파라미터 바인딩 | |
.bind(1, "john.doe@example.com") // 두 번째 파라미터 바인딩 | |
.execute()) // 쿼리 실행 | |
.then(connection.commitTransaction()) // 성공 시 트랜잭션 커밋 | |
.onErrorResume(e -> | |
connection.rollbackTransaction() // 오류 발생 시 롤백 | |
.then(Mono.error(e)) // 오류 전달 | |
) | |
.then(connection.close()) // 작업 완료 후 연결 닫기 | |
).subscribe(); |
2. MySqlConnection 한계
- SQL 쿼리를 명시적으로 전달해야 하기 때문에 개발 편의성이 떨어지고 쿼리 재사용이 제한됨
- 반환된 결과를 수동으로 파싱 하거나 별도의 mapper를 만들어야 하므로 확장성이 제한됨
Spring Data R2DBC 구성요소
1. Entity
- DB에 하나의 Row와 매핑되는 클래스 (JPA의 Entity와 유사)
- Table, Row, Column에 필요한 데이터베이스 메타데이터를 어노테이션 등으로 제공
- R2dbcEntityTemplate, R2dbcRepository 등은 DB 요청을 보내고 그 결과를 Entity 형태로 반환
2. R2dbcEntityTemplate
- Spring data r2dbc의 추상화 클래스
- SQL 쿼리들을 문자열 형태로 넘기거나 결과를 처리하지 않더라도 메서드 체이닝을 통해 쿼리를 수행하고 결과를 entity 객체로 받을 수 있음
- ConnectionFactory를 제공하거나 R2dbcDialect, R2dbcConverter를 제공하여 생성자로 생성 가능
- RdbcEntityOperations를 구현
public class R2dbcEntityTemplate implements R2dbcEntityOperations, BeanFactoryAware, ApplicationContextAware { | |
private final DatabaseClient databaseClient; | |
private final ReactiveDataAccessStrategy dataAccessStrategy; | |
private final MappingContext<? extends RelationalPersistentEntity<?>, ? extends RelationalPersistentProperty> mappingContext; | |
private final SpelAwareProxyProjectionFactory projectionFactory; | |
@Nullable | |
private ReactiveEntityCallbacks entityCallbacks; | |
public R2dbcEntityTemplate(ConnectionFactory connectionFactory) { | |
Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); | |
R2dbcDialect dialect = DialectResolver.getDialect(connectionFactory); | |
this.databaseClient = DatabaseClient.builder().connectionFactory(connectionFactory).bindMarkers(dialect.getBindMarkersFactory()).build(); | |
this.dataAccessStrategy = new DefaultReactiveDataAccessStrategy(dialect); | |
this.mappingContext = this.dataAccessStrategy.getConverter().getMappingContext(); | |
this.projectionFactory = new SpelAwareProxyProjectionFactory(); | |
} | |
public R2dbcEntityTemplate(DatabaseClient databaseClient, R2dbcDialect dialect) { | |
this((DatabaseClient)databaseClient, (ReactiveDataAccessStrategy)(new DefaultReactiveDataAccessStrategy(dialect))); | |
} | |
public R2dbcEntityTemplate(DatabaseClient databaseClient, R2dbcDialect dialect, R2dbcConverter converter) { | |
this((DatabaseClient)databaseClient, (ReactiveDataAccessStrategy)(new DefaultReactiveDataAccessStrategy(dialect, converter))); | |
} | |
// 중략 | |
} |
3. R2dbcEntityOperations
- DatabaseClient와 R2dbcConverter를 제공
- DatabaseClient는 ConnectionFactory를 wrapping 하여 결과를 Map이나 Integer로 반환
- R2dbcConverter는 주어진 Row를 Entity로 변환하는 converter
- R2dbcEntityTemplate에서는 DatabaseClient와 R2dbcConverter로 쿼리를 실행하고 결과를 entity로 반환

4. DatabaseClient
- 내부에 포함된 ConnectionFactory에 접근 가능
- SQL 메서드를 통해 GenericExecuteSpec 반환
- GenericExecuteSpec은 bind 메서드를 통해 파라미터를 sql에 추가
- fetch를 통해 FetchSpec 반환


5. FetchSpec
- RowFetchSpec과 UpdatedRowsFetchSpec 상속
- RowFetchSpec은 one, first 그리고 all 메서드를 제공하여 결과를 Mono나 Flux 형태로 제공
- UpdateedRowsFetchSpec은 쿼리의 영향을 받은 row 수를 Mono로 제공
* 여태까지 내용을 종합한 DatabaseClient 실행 flow *
- sql을 실행하여 GenericExecuteSpec 반환
- GenericExecuteSpec에 bind를 한 후 fetch를 호출하여 FetchSpec 반환
- rowsUpdated를 호출하여 영향받은 row 수 조회 혹은 all을 호출하여 결과 row 조회
- 하지만 여전히 직접 mapping을 해야 한다는 한계점 존재
import org.springframework.r2dbc.core.DatabaseClient; | |
import reactor.core.publisher.Mono; | |
import reactor.core.publisher.Flux; | |
public class ExampleService { | |
private final DatabaseClient databaseClient; | |
public ExampleService(DatabaseClient databaseClient) { | |
this.databaseClient = databaseClient; | |
} | |
public Mono<Integer> updateRow(Long id, String newName) { | |
// 1. SQL을 실행하여 GenericExecuteSpec 반환 | |
DatabaseClient.GenericExecuteSpec executeSpec = databaseClient.sql("UPDATE users SET name = :name WHERE id = :id"); | |
// 2. GenericExecuteSpec에 bind를 한 후 fetch를 호출하여 FetchSpec 반환 | |
DatabaseClient.GenericExecuteSpec boundSpec = executeSpec.bind("name", newName).bind("id", id); | |
DatabaseClient.GenericExecuteSpec.FetchSpec<Integer> fetchSpec = boundSpec.fetch(); | |
// 3. rowsUpdated를 호출하여 영향 받은 row 수 조회 | |
return fetchSpec.rowsUpdated(); | |
} | |
public Flux<User> findAllUsers() { | |
// 1. SQL을 실행하여 GenericExecuteSpec 반환 | |
DatabaseClient.GenericExecuteSpec executeSpec = databaseClient.sql("SELECT * FROM users"); | |
// 2. fetch를 호출하여 FetchSpec 반환 | |
DatabaseClient.GenericExecuteSpec.FetchSpec<User> fetchSpec = executeSpec.fetch(); | |
// 3. all을 호출하여 결과 row 조회 (직접 매핑해야한다는 한계 존재) | |
return fetchSpec.all() | |
.map(row -> new User(row.get("id", Long.class) | |
, row.get("name", String.class) | |
, row.get("email", String.class))); | |
} | |
} |
6. R2dbcConverter
- 앞서 직접 mapping 해야 하는 한계점을 극복하기 위해 제공되는 구성요소
- EntityReader와 EntityWriter 상속하는 인터페이스
- MappingR2dbcConverter가 해당 인터페이스 구현체
- 다양한 전략을 통해 Object를 DB의 row로(Outbound Converter), DB의 row를 Object로 변환(Inbound Converter)
- 커스텀 converter로 매핑
- Spring data의 object 매핑
- convention 기반의 매핑
- metadata 기반의 매핑
6.1 Custom Converter Mapping
- Configuration 통해 converter 등록
- target 클래스를 지원하는 converter 탐색
- 이를 위해 두 개의 converter 필요
- row를 Target 클래스로 변환하는 converter
- target 클래스를 OutboundRow로 변환하는 converter
@Getter | |
@Setter | |
@NoArgsConstructor | |
@AllArgsConstructor | |
public class MyCustomType { | |
private final String name; | |
private final int value; | |
} | |
public class RowToMyCustomTypeConverter implements Converter<Row, MyCustomType> { | |
@Override | |
public MyCustomType convert(Row row) { | |
String name = row.get("name", String.class); | |
int value = row.get("value", Integer.class); | |
return new MyCustomType(name, value); | |
} | |
} | |
@WritingConverter | |
public class MyCustomTypeToOutboundRowConverter implements Converter<MyCustomType, OutboundRow> { | |
@Override | |
public OutboundRow convert(MyCustomType source) { | |
OutboundRow row = new OutboundRow(); | |
row.put(SqlIdentifier.unquoted("name"), Parameter.from(source.getName())); | |
row.put(SqlIdentifier.unquoted("value"), Parameter.from(source.getValue())); | |
return row; | |
} | |
} | |
@Configuration | |
public class R2dbcConfig extends AbstractR2dbcConfiguration { | |
@Override | |
protected List<Object> getCustomConverters() { | |
return List.of(new RowToMyCustomTypeConverter(), new MyCustomTypeToOutboundRowConverter()); | |
} | |
} | |
// Inbound Converter 예시 | |
databaseClient.sql("SELECT name, value FROM my_table WHERE id = :id") | |
.bind("id", id) | |
.map((row, rowMetadata) -> row.get("name", MyCustomType.class)) // Custom Type으로 매핑 | |
.one() | |
.subscribe(myCustomType -> { | |
// myCustomType 사용 | |
}); | |
// Outbound Converter 예시 | |
MyCustomType myCustomType = new MyCustomType("exampleName", 123); | |
databaseClient.insert() | |
.into(MyCustomType.class) | |
.using(myCustomType) | |
.then() | |
.subscribe(); |
6.2 ReadConverter
- row를 source로 Entity를 target으로 하는 converter
- row로부터 name 혹은 index로 column에 접근할 수 있고 변환하고자 하는 type을 Class로 전달
@ReadingConverter | |
public class RowToMyCustomTypeConverter implements Converter<Row, MyCustomType> { | |
@Override | |
public MyCustomType convert(Row source) { | |
String name = source.get("name", String.class); | |
Integer value = source.get("value", Integer.class); | |
return new MyCustomType(name, value); | |
} | |
} |
6.3 WriteConverter
- entity를 source로 row를 target으로 하는 converter
- OutboundRow에 값을 추가
- key에는 column명, value에는 Parameter.from을 이용해 entity의 속성을 전달
- DefaultDatabaseClient에서 OutboundRow를 이용해 SQL 생성
@WritingConverter | |
public class MyCustomTypeWriteConverter implements Converter<MyCustomType, OutboundRow> { | |
@Override | |
public String convert(MyCustomType source) { | |
OutboundRow row = new OutboundRow(); | |
row.put("name", Parameter.from(source.getName()); | |
row.put("value", Parameter.from(source.getValue()); | |
return row; | |
} | |
} |
Spring data의 object mapping
1. Entity Mapping 전체적인 flow
- target의 custom converter가 존재할 경우 custom converter 사용
- 만약 지원하는 converter가 없을 경우 MappingR2dbcConverter는 다음 과정을 거쳐 row를 entity로 변환
- Object Creation: constructor, factory method 등을 이용해 row의 column들로 Object 생성
- Property Population: direct set, setter, with 메서드 등을 이용해 row의 column을 Object에 주입
2. Object Creation
다음 순서로 체크하여 해당하는 알고리즘으로 row를 object로 변환합니다.
- 정적 팩토리 메서드 확인 (@PersistenceCreator 어노테이션을 갖는 경우)
- 먼저, 클래스에 @PersistenceCreator 어노테이션이 있는 정적 팩토리 메서드가 있는지 확인
- 만약 해당 메서드가 정확히 하나 존재한다면, 해당 메서드를 사용하여 객체를 생성
- 생성자 확인
- 만약 정적 팩토리 메서드가 없거나 사용되지 않는 경우, 클래스의 생성자에서 @PersistenceCreator 어노테이션이 있는 생성자를 확인
- @PersistenceCreator 어노테이션이 있는 생성자가 존재한다면, 해당 생성자를 사용하여 객체를 생성
- @PersistenceCreator 어노테이션이 있는 생성자가 여러 개가 존재한다면 가장 마지막 PersistenceCreator가 붙은 생성자를 사용하지만 혼동을 피하기 위해 가급적 PersistenceCreator 생성자는 하나만 선언하는 것을 권장
- 기본 생성자 확인 (인자가 없는 생성자)
- @PersistenceCreator 생성자가 없다면, 인자가 없는 기본 생성자가 있는지 확인
- 기본 생성자가 존재하면, 이를 사용하여 객체를 생성
- 하나의 생성자만 있는 경우
- 기본 생성자도 없고, @PersistenceCreator로 지정된 생성자도 없는 경우, 클래스에 하나의 생성자만 존재한다면 해당 생성자를 사용하여 객체를 생성
- 위에 언급한 어떠한 조건도 충족하지 않을 경우 exception 발생
3. Property Population
- Object Creation 이후의 필드 할당: 객체가 생성된 후, 데이터베이스에서 읽어온 결과를 객체의 필드에 할당하며 이는 주로 리플렉션(Reflection)을 사용하거나, 세터(setter) 메서드를 통해 이루어짐
- 필드 접근 또는 세터 메서드 사용: Property Population은 필드에 직접 접근하거나, 엔티티 클래스에 정의된 세터 메서드를 통해 값을 할당할 수 있으며 해당 방법은 객체의 상태를 초기화하는 데 사용
- Null 값 처리: 데이터베이스에서 반환된 값이 null인 경우, 해당 필드도 null로 설정되며 해당 부분에서 적절한 null-safety 처리가 필요할 수 있음
- 타입 변환: 데이터베이스에서 반환된 값과 엔티티 클래스의 필드 타입이 일치하지 않는 경우, 타입 변환이 필요할 수 있는데 R2DBC는 이러한 변환을 위해 컨버터를 사용할 수 있으며, 필요한 경우 커스텀 컨버터를 정의하여 데이터 형식을 맞출 수 있음
- @Transient: 특정 필드를 데이터베이스와 매핑에서 제외하고 싶을 때, 해당 필드에 @Transient 어노테이션을 적용하여 Property Population 과정에서 무시되도록 설정할 수 있음
* 리플렉션을 사용하기 때문에 r2dbc에서는 property가 mutable 할 때만 property population 적용
4. Object Mapping 최적화하는 방법
- 객체를 가능한 한 immutable 하게 선언하고 모든 property를 인자로 갖는 생성자를 제공하여 property population이 발생하지 않도록 하는 것을 권장
- 생성자만 호출하기 때문에 30% 정도 성능 향상 효과
R2dbcEntityOperations
- FluentR2dbcOperations를 상속
- FluentR2dbcOperations는 여러 Operations를 상속
- ReactiveSelectOperation: SELECT 쿼리와 관련된 메서드 제공
- ReactiveInsertOperation: INSERT 쿼리와 관련된 메서드 제공
- ReactiveUpdateOperation: UPDATE 쿼리와 관련된 메서드 제공
- ReactiveDeleteOperation: DELETE 쿼리와 관련된 메서드 제공
1. ReactiveSelectOperation
- ReactiveSelectOperation 인터페이스는 메서드 체이닝 방식으로 쿼리를 점진적으로 구성할 수 있게 하며, select, from, as, matching, all 또는 one 등의 메서드를 통해 데이터베이스 쿼리를 구성하고 실행할 수 있음
- ReactiveSelectOperation의 select부터 시작하여 TerminatingSelect의 count, exists, first, one, all 등으로 종료
1.1 ReactiveSelectOperation 주요 메서드
- select(Class<T> entityClass): 조회할 엔티티 클래스의 타입을 지정하며 데이터베이스에서 조회한 결과는 해당 엔티티 클래스의 객체로 매핑됨
- from(String tableName): 조회할 테이블명을 지정하며 지정된 테이블에서 데이터를 조회
- as(Class<R> resultType): 조회 결과를 매핑할 대상 클래스의 타입을 지정하며 주로 조회한 결과를 다른 형태의 DTO로 변환하고자 할 때 사용
- matching(Query query): 쿼리 조건을 지정하며 해당 메서드는 조회 조건을 설정하며, 필터링, 정렬 등을 처리 가능
- all(): 쿼리를 실행하여 결과를 Flux로 반환하며 이는 조회된 모든 결과를 비동기 스트림으로 반환
- count(): 쿼리를 실행하여 조건에 맞는 row의 개수를 Mono<Long>으로 반환
- exists(): 쿼리를 실행하여 조건에 맞는 row 존재 여부를 Mono<Boolean>으로 반환
- first(): 쿼리를 실행하여 조건에 맞는 첫 번째 row를 Mono로 반환
- one(): 쿼리를 실행하여 결과를 Mono로 반환하며 하나의 결과만 필요할 때 사용
- 만약 여러 개의 결과가 반환되면 오류가 발생
1.2 ReactiveSelectOperation 메서드 체이닝 조합
- 다양한 메서드 체이닝 조합을 사용하여 쿼리를 구성할 수 있음
1.2.1 select -> from -> as -> matching -> 실행
- 조회할 엔티티 클래스와 테이블을 지정하고, 결과를 특정 클래스 타입으로 변환한 후, 쿼리 조건을 적용하여 실행하는 방법
Mono<MyDTO> result = r2dbcEntityTemplate | |
.select(User.class) // 조회할 엔티티 클래스 지정 | |
.from("users") // 테이블 이름 지정 | |
.as(MyDTO.class) // 결과를 매핑할 클래스 지정 | |
.matching(query(where("name").is("jaimemin"))) // 쿼리 조건 지정 | |
.one(); // 쿼리 실행 후 결과를 Mono로 반환 |
1.2.2 select -> from -> matching -> 실행
- 조회할 엔티티 클래스와 테이블을 지정하고, 쿼리 조건을 적용한 후 실행하는 방법
Flux<User> results = r2dbcEntityTemplate | |
.select(User.class) // 조회할 엔티티 클래스 지정 | |
.from("users") // 테이블 이름 지정 | |
.matching(query(where("age").greaterThan(30))) // 쿼리 조건 지정 | |
.all(); // 쿼리 실행 후 결과를 Flux로 반환 |
1.2.3 select -> as -> matching -> 실행
- 조회할 엔티티 클래스를 지정하고, 결과를 특정 클래스 타입으로 변환한 후, 쿼리 조건을 적용하여 실행하는 방법
Flux<MyDTO> results = r2dbcEntityTemplate | |
.select(User.class) // 조회할 엔티티 클래스 지정 | |
.as(MyDTO.class) // 결과를 매핑할 클래스 지정 | |
.matching(query(where("status").is("ACTIVE"))) // 쿼리 조건 지정 | |
.all(); // 쿼리 실행 후 결과를 Flux로 반환 |
1.2.4 select -> matching -> 실행
- 조회할 엔티티 클래스를 지정하고, 쿼리 조건을 적용하여 실행하는 방법
- 테이블명이나 결과 타입을 명시적으로 지정하지 않은 경우, 기본값 사용
Mono<User> result = r2dbcEntityTemplate | |
.select(User.class) // 조회할 엔티티 클래스 지정 | |
.matching(query(where("id").is(1L))) // 쿼리 조건 지정 | |
.one(); // 쿼리 실행 후 결과를 Mono로 반환 |
1.2.5 select -> 실행
- 기본적으로 엔티티 클래스를 지정하고, 조건 없이 모든 데이터를 조회하는 방법
Flux<User> results = r2dbcEntityTemplate | |
.select(User.class) // 조회할 엔티티 클래스 지정 | |
.all(); // 쿼리 실행 후 모든 결과를 Flux로 반환 |
2. ReactiveInsertOperation
- ReactiveInsertOperation의 insert부터 시작하여 TerminatingInsert의 using으로 종료
2.1 ReactiveInsertOperation 주요 메서드
- into(Class<T>): 삽입할 엔티티의 클래스를 지정
- into(String): 삽입할 테이블의 이름을 지정
- using(T entity): 삽입할 객체(엔티티)를 지정
- using(Publisher<? extends T> entities): 여러 엔티티 객체를 Publisher 형태로 지정하여 삽입 가능
- execute(): 삽입 작업을 실제로 실행하고 결과를 반환
2.2 ReactiveInsertOperation 메서드 체이닝 조합
- 대표적으로 두 가지 메서드 체이닝 조합을 사용하여 쿼리를 구성할 수 있음
2.2.1 insert -> into -> using
- 데이터베이스에 데이터를 삽입할 때, 엔티티 클래스와 테이블 이름을 명시적으로 지정하고, 그다음 삽입할 데이터를 제공하여 실행하는 방법
Mono<User> result = r2dbcEntityTemplate | |
.insert(User.class) // 삽입할 엔티티 클래스 지정 | |
.into("users") // 테이블 이름 지정 | |
.using(new User("jaimemin", "min", "jaime.min@example.com")) // 삽입할 엔티티 객체 제공 | |
.execute(); // 삽입 작업 실행 |
2.2.2 insert -> using
- 단순히 엔티티 클래스를 지정하고 데이터만 제공하여 삽입 작업을 수행하는 방법
- 해당 방식은 기본적으로 클래스가 매핑된 테이블에 데이터를 삽입할 때 사용
Mono<User> result = r2dbcEntityTemplate | |
.insert(User.class) // 삽입할 엔티티 클래스 지정 | |
.using(new User("jaime", "min", "jaime.min@example.com")) // 삽입할 엔티티 객체 제공 | |
.execute(); // 삽입 작업 실행 |
3. ReactiveUpdateOperation
- ReactiveUpdateOperation의 update부터 시작하여 TerminatingUpdate의 apply로 종료
3.1 ReactiveUpdateOperation 주요 메서드
- update(Class<T> entityClass): 업데이트할 엔티티 클래스의 타입을 지정하며 해당 엔티티 클래스는 데이터베이스의 특정 테이블과 매핑
- inTable(String tableName): 업데이트할 테이블의 이름을 명시적으로 지정하며 해당 메서드를 통해 엔티티 클래스와 다른 테이블 이름을 지정할 수 있음
- matching(Query query): 쿼리 조건을 지정하며 해당 메서드를 사용하여 SQL의 WHERE 절과 같은 조건을 설정할 수 있음
- apply(Update update): 실제로 업데이트할 필드와 값을 지정하며 해당 메서드는 SQL의 SET 절과 같은 역할을 수행
- execute(): 업데이트 작업을 실제로 실행하며 해당 메서드는 영향을 받은 행(row)의 수를 Mono<Integer> 또는 Flux<Integer> 형태로 반환
3.2 ReactiveUpdateOperation 메서드 체이닝 조합
- 다양한 메서드 체이닝 조합을 사용하여 쿼리를 구성할 수 있음
3.2.1 update -> inTable -> matching -> apply
- 엔티티 클래스와 테이블 이름을 명시적으로 지정하고, 쿼리 조건을 설정한 후, 업데이트할 필드와 값을 지정하여 실행하는 방법
Mono<Integer> result = r2dbcEntityTemplate | |
.update(User.class) // 업데이트할 엔티티 클래스 지정 | |
.inTable("users") // 테이블 이름 지정 | |
.matching(query(where("id").is(1L))) // 업데이트할 조건 지정 | |
.apply(Update.update("name", "gudetama")) // 업데이트할 필드와 값 지정 | |
.execute(); // 업데이트 작업 실행 |
3.2.2 update -> inTable -> apply
- 엔티티 클래스와 테이블 이름을 명시적으로 지정하고, 조건 없이 모든 레코드에 대해 업데이트할 필드와 값을 지정하여 실행하는 방법
Mono<Integer> result = r2dbcEntityTemplate | |
.update(User.class) // 업데이트할 엔티티 클래스 지정 | |
.inTable("users") // 테이블 이름 지정 | |
.apply(Update.update("status", "INACTIVE")) // 업데이트할 필드와 값 지정 | |
.execute(); // 업데이트 작업 실행 |
3.2.3 update -> matching -> apply
- 엔티티 클래스와 쿼리 조건을 지정하고, 업데이트할 필드와 값을 지정하여 실행하는 방법
- 테이블 이름을 명시적으로 지정하지 않고, 기본 매핑된 테이블에서 업데이트가 이루어짐
Mono<Integer> result = r2dbcEntityTemplate | |
.update(User.class) // 업데이트할 엔티티 클래스 지정 | |
.matching(query(where("email").is("john.doe@example.com"))) // 업데이트할 조건 지정 | |
.apply(Update.update("status", "ACTIVE")) // 업데이트할 필드와 값 지정 | |
.execute(); // 업데이트 작업 실행 |
3.2.4 update -> apply
- 엔티티 클래스를 지정하고, 조건 없이 모든 레코드에 대해 업데이트할 필드와 값을 지정하여 실행하는 가장 간단한 방법
- 테이블 이름이나 조건을 명시적으로 지정하지 않고, 기본적으로 매핑된 테이블에 업데이트가 이루어짐
Mono<Integer> result = r2dbcEntityTemplate | |
.update(User.class) // 업데이트할 엔티티 클래스 지정 | |
.apply(Update.update("status", "ACTIVE")) // 업데이트할 필드와 값 지정 | |
.execute(); // 업데이트 작업 실행 |
4. ReactiveDeleteOperation
- ReactiveDeleteOperation의 delete부터 시작하여 TerminatingDelete의 all로 종료
4.1 ReactiveDeleteOperation 주요 메서드
- delete(Class<T> entityClass): 삭제할 엔티티 클래스의 타입을 지정하며 해당 엔티티 클래스는 데이터베이스의 특정 테이블과 매핑됨
- from(String tableName): 삭제할 테이블의 이름을 명시적으로 지정하며 해당 메서드를 통해 엔티티 클래스와 다른 테이블 이름을 지정 가능
- matching(Query query): 쿼리 조건을 지정하며 해당 메서드를 사용하여 SQL의 WHERE 절과 같은 조건을 설정할 수 있음
- all(): 삭제 작업을 실행하고, 삭제된 레코드의 수를 반환
4.2 ReactiveDeleteOperation 메서드 체이닝 조합
- 다양한 메서드 체이닝 조합을 사용하여 쿼리를 구성할 수 있음
4.2.1 delete -> from -> matching -> all
- 엔티티 클래스와 테이블 이름을 명시적으로 지정하고, 쿼리 조건을 설정한 후, 해당 조건에 맞는 데이터를 삭제하는 방법
Mono<Integer> result = r2dbcEntityTemplate | |
.delete(User.class) // 삭제할 엔티티 클래스 지정 | |
.from("users") // 테이블 이름 지정 | |
.matching(query(where("age").lessThan(18))) // 삭제할 조건 지정 | |
.all(); // 삭제 작업 실행 후 결과 반환 |
4.2.2 delete -> from -> all
- 엔티티 클래스와 테이블 이름을 명시적으로 지정하고, 조건 없이 해당 테이블의 모든 데이터를 삭제하는 방법
Mono<Integer> result = r2dbcEntityTemplate | |
.delete(User.class) // 삭제할 엔티티 클래스 지정 | |
.from("users") // 테이블 이름 지정 | |
.all(); // 테이블의 모든 데이터를 삭제 |
4.2.3 delete -> matching -> all
- 엔티티 클래스와 쿼리 조건을 지정하고, 해당 조건에 맞는 데이터를 삭제하는 방법
- 테이블 이름을 명시적으로 지정하지 않고, 기본 매핑된 테이블에서 삭제 작업이 이루어짐
Mono<Integer> result = r2dbcEntityTemplate | |
.delete(User.class) // 삭제할 엔티티 클래스 지정 | |
.matching(query(where("status").is("INACTIVE"))) // 삭제할 조건 지정 | |
.all(); // 삭제 작업 실행 후 결과 반환 |
4.2.4 delete -> all
- 엔티티 클래스를 지정하고, 조건 없이 해당 클래스와 매핑된 테이블의 모든 데이터를 삭제하는 가장 간단한 방법
- 테이블 이름이나 조건을 명시적으로 지정하지 않음
Mono<Integer> result = r2dbcEntityTemplate | |
.delete(User.class) // 삭제할 엔티티 클래스 지정 | |
.all(); // 모든 데이터를 삭제 |
R2dbcRepository
- ReactiveSortingRepository와 ReactiveQueryByExampleExecutor를 상속한 interface
- SimpleR2dbcRepository가 reactor.core에서 제공하는 R2dbcRepository 구현체
- Query method, Query By Example, 그리고 Entity callback 제공

1. R2dbcRepository 등록
- R2dbcRepositoriesAutoConfiguration이 활성화되어 있을 경우 SpringBootApplication 기준으로 AutoScan

- 혹은 EnableR2dbcRepositories 통해 repository scan
- 만약 여러 r2dbcEntityTemplate이 존재하거나 여러 DB를 사용하는 경우 basePackages, entityOperationRef 등을 통해 다른 경로, 다른 entityTemplate 설정 가능
@Configuration | |
@EnableR2dbcRepositories( | |
basePackages = "com.tistory.jaimemin.r2dbc.spring.repository", | |
entityOperationsRef = "r2dbcEntityTemplate" | |
) | |
public class R2dbcConfig extends AbstractR2dbcConfiguration { | |
} |
2. ReactiveCrudRepository
- Spring data reactive에서는 CrudRepository의 Reactive 버전인 ReactiveCrudRepository 지원하며 해당 repository는 CRUD에 집중
- 모든 결괏값 그리고 일부 인자들이 Publisher 지원
2.1 ReactiveCrudRepository save 메서드
- 하나의 entity를 저장하거나
- entity Iterable을 저장하거나
- entity Publisher를 인자로 받고 저장
- saveAll은 @Transactional 어노테이션을 사용해서 각각의 save를 하나의 트랜잭션으로 묶고 concatMap Operator를 통해 save를 순차적으로 수행

2.2 ReactiveCrudRepository find 메서드
- id 기반으로 하나 혹은 여러 개의 항목을 탐색하거나 존재 여부 확인
- 모든 항목을 탐색하거나 모든 항목의 개수 확인

2.3 ReactiveCrudRepository delete 메서드
- id 기반으로 하나 혹은 여러 개의 항목을 제거하거나
- 하나 혹은 여러 개의 entity를 기반으로 id를 추출하여 제거하거나
- 모두 제거

3. ReactiveSortingRepository
- ReactiveCrudRepository를 상속
- Spring Data의 Sort를 기반으로 여러 항목 탐색
- Sort 객체는 여러 Order 객체를 포함하며 이를 기반으로 쿼리에 sort 옵션 제공

4. SimpleR2dbcRepository
- R2dbcRepository 구현체
- R2dbcEntityOperations를 기반으로 SQL 쿼리를 실행하고 결과를 Entity로 매핑
- 기본적으로 모든 메서드에 @Transaction(readOnly = true) 적용
- 트랜잭션의 읽기 전용 모드
- 데이터베이스에 대한 쓰기 작업(INSERT, UPDATE, DELETE)이 제한되며 이러한 작업을 시도할 경우 예외가 발생할 수 있음
- 데이터베이스가 특정한 잠금(lock)을 사용하지 않거나 캐시를 더 효과적으로 사용함에 따라 성능 향상 효과

4.1 SimpleR2dbcRepository save 메서드
- save 메서드는 entityOperations의 update를 이용
- saveAll 메서드는 concatMap을 이용하여 save 메서드를 순차적으로 실행
- save가 완료되고 complete 이벤트가 발생하면 다음 save 수행
- @Transactional 어노테이션으로 묶여있어 예외가 발생하여 throw 되면 롤백
- JPA처럼 @Id 필드가 null 혹은 0일 경우 신규 entity로 간주하여 insert
- 1 이상일 경우 기존에 존재하던 entity로 간주하여 update

4.2 SimpleR2dbcRepository find 메서드
- findById, existsById, count 모두 R2dbcEntityOperations에서 제공하는 단축 메서드 사용
- selectOne, exists, count
@Transactional( | |
readOnly = true | |
) | |
public class SimpleR2dbcRepository<T, ID> implements R2dbcRepository<T, ID> { | |
// 중략 | |
public Mono<T> findById(ID id) { | |
Assert.notNull(id, "Id must not be null!"); | |
return this.entityOperations.selectOne(this.getIdQuery(id), this.entity.getJavaType()); | |
} | |
// 중략 | |
public Mono<Boolean> existsById(ID id) { | |
Assert.notNull(id, "Id must not be null!"); | |
return this.entityOperations.exists(this.getIdQuery(id), this.entity.getJavaType()); | |
} | |
// 중략 | |
public Mono<Long> count() { | |
return this.entityOperations.count(Query.empty(), this.entity.getJavaType()); | |
} | |
// 후략 | |
} |
4.3 SimpleR2dbcRepository delete 메서드
- R2dbcEntityOperations에서 제공하는 단축 메서드 delete 사용
@Transactional( | |
readOnly = true | |
) | |
public class SimpleR2dbcRepository<T, ID> implements R2dbcRepository<T, ID> { | |
// 중략 | |
@Transactional | |
public Mono<Void> deleteById(ID id) { | |
Assert.notNull(id, "Id must not be null!"); | |
return this.entityOperations.delete(this.getIdQuery(id), this.entity.getJavaType()).then(); | |
} | |
@Transactional | |
public Mono<Void> deleteById(Publisher<ID> idPublisher) { | |
Assert.notNull(idPublisher, "The Id Publisher must not be null!"); | |
return Flux.from(idPublisher).buffer().filter((ids) -> { | |
return !ids.isEmpty(); | |
}).concatMap((ids) -> { | |
if (ids.isEmpty()) { | |
return Flux.empty(); | |
} else { | |
String idProperty = this.getIdProperty().getName(); | |
return this.entityOperations.delete(Query.query(Criteria.where(idProperty).in(ids)), this.entity.getJavaType()); | |
} | |
}).then(); | |
} | |
// 중략 | |
@Transactional | |
public Mono<Void> deleteAll(Iterable<? extends T> iterable) { | |
Assert.notNull(iterable, "The iterable of Id's must not be null!"); | |
return this.deleteAll((Publisher)Flux.fromIterable(iterable)); | |
} | |
// 후략 | |
} |
4.5 R2dbcRepository의 한계
- 기본적으로 CRUD를 수행할 수 있는 메서드를 제공하지만 join이나 집계와 관련된 함수들은 제공되지 않음
- 제공되지 않는 함수들의 기능을 수행하기 위해서는 JPA처럼 @Query 메서드를 작성해줘야 함
Query 메서드
- R2dbcRepsitory를 상속한 repository 인터페이스에 메서드 추가
- 메서드명 기반으로 Query 생성
- 조회, 삭제 지원
- @Query 어노테이션을 사용해서 복잡한 쿼리나 update문도 실행 가능
- JpaRepository를 떠올리면 이해하기 쉬움
1. 쿼리 메서드 시작 키워드
- <시작 키워드>..By 형태로 제공
키워드 | 설명 |
find read get query search stream |
find 쿼리를 실행하고 결과를 Publisher<T>로 반환 ex) findByUsername(String username) ex) readByEmail(String email) ex) getById(Long id) ex) queryByStatus(String status) ex) searchByTitle(String title) ex) streamByCategory(String category) |
exists | find exists 쿼리를 실행하고 결과를 Publisher<Boolean>로 반환 ex) existsByUsername(String username) |
count | find count 쿼리를 실행하고 결과를 Publisher<Integer>로 반환 ex) countByStatus(String status) |
delete remove |
delete 쿼리를 실행하고 Publisher<Void> 혹은 Publisher<Integer>로 삭제된 개수 반환 ex) removeById(Long id) |
2. 쿼리 메서드 제한 키워드
키워드 | 설명 |
First<N> Top<N> |
쿼리의 limit을 N으로 설정 find와 By 사이 어디에든 등장 가능 ex) findFirst3ByOrderByCreatedDateDesc() ex) findTop5OrderByRatingDesc() |
Distinct | distinct 기능 제공 find와 By 사이 어디에든 등장 가능 |
3. 쿼리 메서드 반환 타입
반환 타입 | 설명 |
Mono | Reactor에서 제공 0개 혹은 하나의 값을 반환하는 Publisher 만약 결과가 2개 이상일 경우 IncorrectResultSizeDataAccessException 발생 |
Flux | Reactor에서 제공 0개 이상의 값을 반환하는 Publisher 끝이 없는 수의 결과를 반환 가능 |
Single | RxJava에서 제공 무조건 1개의 값을 반환하는 Publisher 만약 결과가 2개 이상일 경우 IncorrectResultSizeDataAccessException 발생 만약 결과가 0개라면 NoSuchElementException 발생 |
Maybe | RxJava에서 제공 0개 혹은 하나의 값을 반환하는 Publisher 만약 결과가 2개 이상일 경우 IncorrectResultSizeDataAccessException 발생 |
Flowable | RxJava에서 제공 0개 이상의 값을 반환하는 Publisher 끝이 없는 수의 결과를 반환 가능 |
4. @Query 어노테이션
- 쿼리가 메서드명으로 전부 표현이 되지 않는 경우 사용
- 쿼리 메서드 예약어에서 지원되지 않는 문법을 사용하는 경우 사용
- 복잡한 쿼리를 작성하는 경우 사용
@Data | |
@Entity | |
public class User { | |
@Id | |
private Long id; | |
private String username; | |
} | |
@Data | |
@Entity | |
public class Order { | |
@Id | |
private Long id; | |
private Long userId; | |
private String productName; | |
} | |
@Repository | |
public interface UserRepository extends ReactiveSortingRepository<User, Long> { | |
@Query("SELECT u.*, o.* " + | |
"FROM user u " + | |
"JOIN order o ON u.id = o.user_id " + | |
"WHERE u.username = :username") | |
Flux<Tuple2<User, Order>> findUsersWithOrdersByUsername(@Param("username") String username); | |
} |
Transaction 적용 방법
1. @Transactional
- @Transactional을 사용하여 여러 쿼리를 묶어서 진행
@Data | |
@Entity | |
public class User { | |
@Id | |
private Long id; | |
private String username; | |
private String email; | |
} | |
@Data | |
@Entity | |
public class Order { | |
@Id | |
private Long id; | |
private Long userId; | |
private String productName; | |
private Integer quantity; | |
} | |
@Repository | |
public interface UserRepository extends ReactiveCrudRepository<User, Long> { | |
} | |
@Repository | |
public interface OrderRepository extends ReactiveCrudRepository<Order, Long> { | |
} | |
@Service | |
public class UserService { | |
private final UserRepository userRepository; | |
private final OrderRepository orderRepository; | |
public UserService(UserRepository userRepository, OrderRepository orderRepository) { | |
this.userRepository = userRepository; | |
this.orderRepository = orderRepository; | |
} | |
@Transactional | |
public Mono<Void> createUserAndOrder(User user, Order order) { | |
return userRepository.save(user) | |
.flatMap(savedUser -> { | |
order.setUserId(savedUser.getId()); | |
return orderRepository.save(order); | |
}) | |
.then(); | |
} | |
} |
2. TransactionalOperator
- @Transactional 대신 프로그래밍 방식으로 트랜잭션을 관리
- transactional 메서드를 통해 주어진 Flux 혹은 Mono를 transaction 안에서 실행
- ex) Flux를 바로 반환하지 않고 transactionalOperator의 transactional로 wrapping 하여 전달
- ex) execute 메서드를 통해 TransactionCallback 형태로 실행
@Configuration | |
public class R2dbcConfig { | |
@Bean | |
public R2dbcTransactionManager transactionManager(ConnectionFactory connectionFactory) { | |
return new R2dbcTransactionManager(connectionFactory); | |
} | |
@Bean | |
public TransactionalOperator transactionalOperator(R2dbcTransactionManager transactionManager) { | |
return TransactionalOperator.create(transactionManager); | |
} | |
} | |
@Service | |
@RequiredArgsConstructor | |
public class UserService { | |
private final UserRepository userRepository; | |
private final OrderRepository orderRepository; | |
private final TransactionalOperator transactionalOperator; | |
public Mono<Void> createUserAndOrder(User user, Order order) { | |
return transactionalOperator.transactional( | |
userRepository.save(user) | |
.flatMap(savedUser -> { | |
order.setUserId(savedUser.getId()); | |
return orderRepository.save(order); | |
}) | |
.then() | |
); | |
} | |
public Mono<Void> updateUserEmailInTransaction(Long userId, String newEmail) { | |
return transactionalOperator.execute(status -> | |
userRepository.findById(userId) | |
.flatMap(user -> { | |
user.setEmail(newEmail); | |
return userRepository.save(user); | |
}) | |
.then() | |
); | |
} | |
} |
참고하면 좋은 글
R2DBC의 한계와 그 사용법
Webflux 기반으로 React 한 코드 — 비동기적, 넌 블럭킹-를 애플리케이션 전체에 적용하기 위해서 DB를 다루는 영역 또한 React 하게 처리되어야 한다.
yongkyu-jang.medium.com
참고
패스트 캠퍼스 - Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지
'Spring > Spring WebFlux' 카테고리의 다른 글
[Spring WebFlux] Server Sent Event (0) | 2024.08.14 |
---|