개요

springboot 환경에서 몽고디비 벌크인서트를 해본다.

 

코드

@Component
class MongoDbInitializer(
    private val mongoTemplate: MongoTemplate
) : CommandLineRunner {

    private val log = LoggerFactory.getLogger(javaClass)

    override fun run(vararg args: String?) {
        this.bulkInsertUsers()
    }

    private fun bulkInsertUsers() {
        userRepository.deleteAll()

        val users = (1..31234).map { sequence ->
            User(
                userUniqueId = UUID.randomUUID().toString(),
                name = "홍길동-${UUID.randomUUID().toString().substring(1, 5)}",
                sequence = sequence.toLong()
            )
        }

		// (1) User 타입에 대한 중간 에러를 무시한 벌크모드 설정
        val bulkOperation = mongoTemplate.bulkOps(BulkMode.UNORDERED, User::class.java)
        
        // (2) users collection 을 인서트
        bulkOperation.insert(users)
        
        // (3) 실제 트랜잭션 실행
        val bulkOperationResult = bulkOperation.execute()

		// (4) 트랜잭션 결과에 따른 인서트된 로우수 로깅
        log.debug("insertedCount : ${bulkOperationResult.insertedCount}")
    }
}

 

BulkMode 에는 UNORDERED / ORDERED 가 있다. : ORDERED 를 먼저 살핀다.

  • ORDERED : 인서트를 수행하는 컬렉션의 요소 중 하나라도 문제가 생긴다면, 거기서 멈춘다. 에러가 발생하기전에 들어간 요소들은 트랜잭션 처리된다.
  • UNORDERED : 인서트를 수행하는 컬렉션의 요소 중 하나라도 문제가 생기더라도 무시하고, 나머지 요소들을 처리한다.

mongoTemplate.bulkOps() 객체 자체는 스레드 세이프하지 않기 때문에 병렬처리를 할 수 없다. 결국 아래와 같이 쓸 수 없다. 쓰고 싶다면 각각의 forEach {} 블럭 내부에서 mongoTemplate.bulkOps() 객체를 따로따로 만들어서 실행시켜주어야 한다.

val documents = mutableListOf<Member>()
File(fileName).forEachLine { line -> documents.add(Member(line)) }

val bulkOps = mongoTemplate.bulkOps(BulkOperations.BulkMode.ORDERED, "member")

documents.chunked(50).parallelStream().forEach { chunk ->
    bulkOps.insert(chunk)
    bulkOps.execute()
}

 

Posted by doubler
,