카프카 커넥트와 오프셋 관리(2)

지난 글에 이어 이번에는 카프카 커넥트 오프셋 관리 기능에 대해 설명하겠습니다. 혹시라도 1부 글을 읽지 않으신 분들은 1부 글과 이어지는 내용이 있으니, 1부 글을 먼저 읽고 오시는 편이 좋을 것 같습니다.

지난 글에서 소스 커넥터는 프로듀서와 유사하고, 싱크 커넥터는 컨슈머와 유사하다는 말씀을 드린 적이 있습니다. 이번에 주로 다룰 내용은 싱크 커넥터와 오프셋 관련된 부분이므로 참고하시기 바랍니다.

싱크 커넥터와 오프셋

싱크 커넥터는 컨슈머와 유사한 기능을 하며, 카프카의 토픽에서 데이터를 읽은 뒤 싱크 시스템으로 전송하는 역할을 합니다. 컨슈머의 경우 카프카의 토픽에서 메시지를 읽어가게 되면, 컨슈머의 점검, 컨슈머의 확장, 컨슈머의 재시작 등을 유연하게 대응하기 위해 컨슈머가 메시지를 어디까지 읽었는지 위치를 표시하게 됩니다. 이러한 위치를 카프카에서는 오프셋이라고 하고, 컨슈머 그룹마다 오프셋 위치를 별도의 공간에 저장하게 됩니다. 컨슈머 그룹의 오프셋은 초기 카프카 버전에서는 주키퍼의 지노드에 저장하였습니다. 이는 성능상의 이슈로 현재 버전의 카프카에서는 카프카의 내부 토픽인 __consumer_offset 토픽에 저장하고 있습니다.

컨슈머와 유사한 기능을 하는 싱크 커넥터 역시 오프셋 위치를 저장해야 합니다. 오프셋이 저장된 내용을 통해 커넥터의 장애 및 재시작 시 데이터를 처리할 위치를 조절하게 됩니다.

카프카 커넥트의 경우 총 3개의 내부 토픽을 사용하게 되는데, 각각의 토픽과 용도는 다음과 같습니다.

  • config.storage.topic: 커넥트의 구성과 관련된 정보 저장
  • offset.storage.topic: 커넥트의 오프셋 정보 저장
  • status.storage.topic: 커넥트의 상태 정보 저장

따라서, 카프카 커넥트의 싱크 커넥터는 __consumer_offset 토픽에 오프셋 정보를 저장하지 않고, 커넥트에서 내부적으로 사용하는 offset.storage.topic 토픽에 저장하게 됩니다.

만약 테스트나 개발 목적으로 카프카 커넥트를 단독모드로 구성하는 경우, 오프셋 정보 등은 로컬에 위치하는 별도의 파일에 저장하며, 카프카 커넥트를 분산모드로 구성하는 경우에만 커넥트에서 내부적으로 자체 관리하는 토픽에 저장합니다.

이렇게 별도의 토픽으로 오프셋을 관리하다 보니, 지금까지는 카프카 클라이언트를 이용한 컨슈머 그룹 관리도구등을 사용할 수 없었고, 관리자 또는 개발자 입장에서는 다소 불편한 점들이 있었습니다.

하지만, 카프카 3.5 버전부터는 오프셋을 관리할 수 있는 REST API 기능이 추가되어, 관리자 또는 개발자 입장에서 매우 유연하게 대응할 수 있게 되었습니다.

  • GET /connectors/{connector}/offsets
  • PUT /connectors/{connector}/stop

지금부터 오프셋을 관리할 수 있는 REST API 기능에 대해 설명하겠습니다. 이 글에서는 카프카 설치와 커넥트 설치등의 내용은 따로 다루지 않고, REST API를 이용하여 오프셋을 조정하는 방법에 대해서만 설명하는 점 양해 부탁드리겠습니다.

오프셋 관리 예제

  • 테스트 환경은 다음과 같이 구성되어 있다고 가정하겠습니다.
  • 카프카 버전: 3.6.1
  • 브로커 이름: peter-kafka01.foo.bar
  • 데이터가 저장된 토픽명: sqlite-sample-Models
  • 사용할 커넥터: FileStreamSinkConnector
  • 파일명: sink.txt
  • 시나리오: FileStreamSinkConnector커넥터를 이용해 카프카의 sqlite-sample-Models토픽에서 레코드를 전부 가져왔으나, 로컬 파일의 손상 등으로 특정 오프셋으로 조정한 후 다시 레코드를 가져옴

해당 시나리오를 그림으로 표현하면 다음과 같습니다.

file-connector

리눅스 명령어인 curl을 이용해 카프카 커넥트를 다루는 REST API와 출력 결과를 예제로 같이 보면서 설명하겠습니다. 가장 먼저 카프카 커넥트의 상태를 확인합니다.

1
2
3
4
5
6
$ curl http://peter-kafka01.foo.bar:8083 | jq
 {
 "version": "3.6.1",
 "commit": "5e3c2b738d253ff5",
 "kafka_cluster_id": "QrqgwH_BR8-md0QbhKrxpQ"
 }

카프카 커넥트는 3.6.1이며, 카프카의 아이디가 표시되고 있습니다.

FileStreamSinkConnector를 실행하여 sqlite-sample-Models 토픽의 레코드를 sink.txt로 저장합니다.

1
2
3
4
5
6
7
$ curl -s -X POST \
 -H "Content-Type: application/json" \ 
 --data '{   "name": "File-Sink-Connector",   
"config": {     "topics": "sqlite-sample-Models",     
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",    
"value.converter": "org.apache.kafka.connect.json.JsonConverter",     
"file": "sink.txt"   } }' http://peter-kafka01.foo.bar:8083/connectors

FileStreamSinkConnector 커넥터가 정상적으로 실행되었는지 상태를 확인합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$curl -s http://peter-kafka01.foo.bar:8083/connectors/File-Sink-Connector/status | jq
{
"name": "File-Sink-Connector",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.0.4:8083"
},
"tasks": [
{
 "id": 0,
 "state": "RUNNING",
 "worker_id": "192.168.0.4:8083"
 }
 ],
 "type": "sink"
}

커넥터의 상태는 RUNNING이며, 정상인 것을 확인할 수 있습니다. 그럼 sqlite-sample-Models 토픽의 레코드가 모두 sink.txt 파일에 저장되었는지 파일의 내용을 확인합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ cat sink.txt
Struct{model_id=1,model_name=The Blonde,model_base_price=23000,brand_id=1}
Struct{model_id=2,model_name=The Brunette,model_base_price=25000,brand_id=1}
Struct{model_id=3,model_name=The Red Head,model_base_price=29000,brand_id=1}
Struct{model_id=4,model_name=Hat,model_base_price=22000,brand_id=2}
Struct{model_id=5,model_name=Sweater,model_base_price=25000,brand_id=2}
Struct{model_id=6,model_name=T-Shirt,model_base_price=27000,brand_id=2}
Struct{model_id=7,model_name=Orange,model_base_price=15000,brand_id=3}
Struct{model_id=8,model_name=Blue,model_base_price=12000,brand_id=3}
Struct{model_id=9,model_name=Green,model_base_price=17000,brand_id=3}
Struct{model_id=10,model_name=LaFerrari,model_base_price=125000,brand_id=4}
Struct{model_id=11,model_name=450,model_base_price=75000,brand_id=4}
Struct{model_id=12,model_name=F12 Berlinetta,model_base_price=110000,brand_id=4}
Struct{model_id=13,model_name=F40,model_base_price=100000,brand_id=4}
Struct{model_id=14,model_name=Extra,model_base_price=30000,brand_id=5}
Struct{model_id=15,model_name=Too Much,model_base_price=35000,brand_id=5}
Struct{model_id=16,model_name=Beats,model_base_price=24000,brand_id=6}
Struct{model_id=17,model_name=Bars,model_base_price=35000,brand_id=6}

sink.txt 파일에 모든 내용이 저장된 것을 확인하였습니다. FileStreamSinkConnector 커넥터의 오프셋을 확인합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ curl -s -X GET http://peter-kafka01.foo.bar:8083/connectors/File-Sink-Connector/offsets | jq
{
 "offsets": [
 {
 "partition": {
 "kafka_partition": 0,
 "kafka_topic": "sqlite-sample-Models"
 },
 "offset": {
 "kafka_offset": 17
 }
 }
 ]
}

현재 오프셋은 17인 것을 알 수 있습니다. 그럼 오류가 생겼다고 가정하고, 오프셋을 10으로 변경해 보겠습니다. 오프셋을 변경하기 전에 커넥터를 중지한 후 오프셋을 변경합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ curl -s -X PUT http://peter-kafka01.foo.bar:8083/connectors/File-Sink-Connector/stop
$ curl -s -X PATCH \
-H "Content-Type: application/json" \
--data '{
 "offsets": [
 {
 "partition": {
 "kafka_topic": "sqlite-sample-Models",
 "kafka_partition": 0
 },
 "offset": {
 "kafka_offset": 10
 }
 }
 ]
}' http://kpeter-kafka01.foo.bar:8083/connectors/File-Sink-Connector/offsets
{"message":"The offsets for this connector have been altered successfully"}

응답 메시지를 통해 오프셋이 정상적으로 변경된 것을 확인할 수 있습니다. 다시 한번 FileStreamSinkConnector 커넥터의 변경된 오프셋을 확인합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ curl -s -X GET http://peter-kafka01.foo.bar:8083/connectors/File-Sink-Connector/offsets | jq
{
 "offsets": [
 {
 "partition": {
 "kafka_partition": 0,
 "kafka_topic": "sqlite-sample-Models"
 },
 "offset": {
 "kafka_offset": 10
 }
 }
 ]
}

오프셋이 10으로 변경된 것을 확인하였습니다. 이제 FileStreamSinkConnector 커넥터를 재시작합니다.

1
$ curl -s -X PUT http://peter-kafka01.foo.bar:8083/connectors/File-Sink-Connector/resume

다음으로 파일의 내용을 확인해 보겠습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
$ cat sink.txt
Struct{model_id=1,model_name=The Blonde,model_base_price=23000,brand_id=1}
Struct{model_id=2,model_name=The Brunette,model_base_price=25000,brand_id=1}
Struct{model_id=3,model_name=The Red Head,model_base_price=29000,brand_id=1}
Struct{model_id=4,model_name=Hat,model_base_price=22000,brand_id=2}
Struct{model_id=5,model_name=Sweater,model_base_price=25000,brand_id=2}
Struct{model_id=6,model_name=T-Shirt,model_base_price=27000,brand_id=2}
Struct{model_id=7,model_name=Orange,model_base_price=15000,brand_id=3}
Struct{model_id=8,model_name=Blue,model_base_price=12000,brand_id=3}
Struct{model_id=9,model_name=Green,model_base_price=17000,brand_id=3}
Struct{model_id=10,model_name=LaFerrari,model_base_price=125000,brand_id=4}
Struct{model_id=11,model_name=450,model_base_price=75000,brand_id=4}
Struct{model_id=12,model_name=F12 Berlinetta,model_base_price=110000,brand_id=4}
Struct{model_id=13,model_name=F40,model_base_price=100000,brand_id=4}
Struct{model_id=14,model_name=Extra,model_base_price=30000,brand_id=5}
Struct{model_id=15,model_name=Too Much,model_base_price=35000,brand_id=5}
Struct{model_id=16,model_name=Beats,model_base_price=24000,brand_id=6}
Struct{model_id=17,model_name=Bars,model_base_price=35000,brand_id=6}
Struct{model_id=11,model_name=450,model_base_price=75000,brand_id=4}
Struct{model_id=12,model_name=F12 Berlinetta,model_base_price=110000,brand_id=4}
Struct{model_id=13,model_name=F40,model_base_price=100000,brand_id=4}
Struct{model_id=14,model_name=Extra,model_base_price=30000,brand_id=5}
Struct{model_id=15,model_name=Too Much,model_base_price=35000,brand_id=5}
Struct{model_id=16,model_name=Beats,model_base_price=24000,brand_id=6}
Struct{model_id=17,model_name=Bars,model_base_price=35000,brand_id=6}

파일의 내용을 확인해 보면, model_id=17 이후에 model_id=11부터 다시 추가된 것을 확인할 수 있습니다. 커넥터의 상태를 확인하느라 다소 REST API 호출이 많았지만, 결과적으로 커넥터의 오프셋을 변경한 후 레코드도 재처리되었습니다.

정리

지금까지 카프카 커넥트의 오프셋 관리 기능을 활용하여, 손상된 데이터를 다시 처리하거나 특정 상황에서 원하는 오프셋으로 재조정할 수 있음을 확인했습니다. 특히 REST API를 통해 손쉽게 커넥터의 상태를 점검하고, 오프셋을 관리할 수 있어 관리자 입장에서 유연하게 대응할 수 있습니다.

앞서 설명드린 내용은 단순한 테스트 시나리오에 불과하지만, 실제 운영 환경에서도 유사한 문제들이 발생할 수 있기 때문에 이를 사전에 숙지하고, 적절히 대응한다면 좋은 결과가 있을 것입니다.

앞으로 카프카 커넥트를 더욱 효율적으로 사용하기를 바라며, 이상으로 카프카 커넥트에 대한 글을 마치도록 하겠습니다. 감사합니다.


Popit은 페이스북 댓글만 사용하고 있습니다. 페이스북 로그인 후 글을 보시면 댓글이 나타납니다.