Presto SQL을 이용하여 Kafka topic 데이터 조회하기
Presto는 다양한 저장소를 지원하고 있는데 데이터 수집을 위해 Queue로 많이 사용되고 있는 Kafka도 기본적으로 제공하고 있습니다. 즉, Kafka의 topic에 저장된 데이터를 Presto SQL을 이용하여 조회할 수 있으며 Hadoop, S3, MySQL 등에 저장된 테이블과 Join 연산도 가능하다는 의미입니다. Presto와 Kafka를 연결하려는 이유는 저희 로그 데이터는 S3에 저장되는데 S3의 경우 Block Storage가 아닌 Object Storage라서 저장하는 Client가 로컬의 임시 파일에 저장하고 close() 처리를 하면서 S3로 업로드 하고 있습니다. 이 때문에 close 되는 시점에 데이터를 조회할 수 있기 때문에 실시간으로 로그 데이터를 조회하기 어려운 단점이 있습니다. Kafka에 저장된 데이터를 직접 조회하면 S3 에 저장되기 이전에 조회할 수 있기 때문에 실시간 데이터를 바로 분석할 수 있습니다. 다음과 같은 아키텍처를 간단하게 구현할 수 있지 않을까 하는 생각에서 진행해 보고 있습니다.
(http://www.benstopford.com/2015/04/28/elements-of-scale-composing-and-scaling-data-platforms/)
Kafka Connector에 대한 자세한 내용은 다음 Document를 참고하시면 됩니다.
https://prestodb.io/docs/current/connector/kafka.html
설정 방법은 etc/catalog/kafka.properties 파일에 다음과 같은 기본 정보를 추가합니다.
1 2 3 4
connector.name=kafka kafka.nodes=localhost:9092 kafka.table-names=sample.test_topic kafka.hide-internal-columns=true
kafka.table-names 설정에서 sample.test_topic 라고 하면 실제 presto에서는 schema는 kafka, databases는 sample, table 명은 test_topic로 나타납니다. 여러 topic을 Presto에서 사용할 경우 kafka.table-names 에 ',' 구분자를 이용하여 여러 개의 topic을 지정합니다.
여기서 주의해야 하는 사항이 presto의 테이블 명에는 하이픈(-)을 사용할 수 없기 때문에(kafka의 topic 이름에서는 사용가능하지만) 하이픈이 아닌 ‘_’ 를 사용하시면 됩니다.
Presto는 schema less가 아니기 때문에 테이블에 대한 schema 정의가 반드시 필요합니다. 위 설정만 하는 경우 Presto에서 test_topic 테이블에 대한 schema는 다음과 같이 나타납니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
presto> desc kafka.sample.test_topic; Column | Type | Comment ——————-+———+——————————————— _partition_id | bigint | Partition Id _partition_offset | bigint | Offset for the message within the partition _segment_start | bigint | Segment start offset _segment_end | bigint | Segment end offset _segment_count | bigint | Running message count per segment _key | varchar | Key text _key_corrupt | boolean | Key data is corrupt _key_length | bigint | Total number of key bytes _message | varchar | Message text _message_corrupt | boolean | Message data is corrupt _message_length | bigint | Total number of message bytes (11 rows)
이 스키마는 kafka의 모든 topic이 동일하게 나타나며 실제 사용자 메시지는 _message 컬럼에 존재하게 됩니다. 위 설정에서 kafka.hide-internal-columns=true라고 하면 위 컬럼 정보는 나타나지 않습니다.
_message의 data type이 varchar이기 때문에 Presto에서 제공하는 String 함수를 이용하여 사용가능합니다. 하지만 SQL 이라면 컬럼을 정의하는 것이 좋기 때문에 각 Topic에 대해 schema 정의를 하는 것이 좋습니다.
Schema 정의를 위해서는 etc/kafka 디렉토리를 생성하고 <kafka.table-names의 데이터베이스.테이블명>.json 파일에 다음과 같이 컬럼 정보를 입력합니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
{ “tableName”: “test_topic”, “schemaName”: “sample”, “topicName”: “sample.test_topic”, “key”: { “dataFormat”: “raw”, “fields”: [ { “name”: “kafka_key”, “dataFormat”: “LONG”, “type”: “BIGINT”, “hidden”: “false” }] }, “message”: { “dataFormat”: “json”, “fields”: [ { “name”: “col1”, “mapping”: “col1”, “type”: “VARCHAR” }, { “name”: “col2”, “mapping”: “col2”, “type”: “VARCHAR”}, { “name”: “col3”, “mapping”: “col3”, “dataFormat”: “json”, “type”: “JSON”} ] } }
key에 대한 정의는 kafka connector의 internal column인 _key에 대한 매핑으로 kafka의 각 레코드에 대한 key 값이 생성되는 경우 이 key와 매핑된다고 하는데 필자의 경우 null 값만 반환되는데 이 부분은 kafka에 대해 좀 더 살펴본 다음에 사용해야 할 것 같습니다.
실제 사용자 데이터에 대한 column 정의는 message 영역에서 정의해준는데 여기의 dateFormat는 row에 대한 decoder를 정의입니다. 필자가 사용한 예제 데이터는 json 포맷이라서 dataFormat을 json으로 정의하였습니다. 이외에도 raw, csv 등이 있습니다. fields 에서는 각 필드에 대한 정의를 할 수 있으며 nested json의 경우 col3와 같이 정의하면 Presto의 json type으로 반환됩니다. json data type인 경우 다음과 같이 map type으로 cast 하여 사용 가능합니다. Presto에서 hive 연결하는 경우 테이블 생성 시 data type에 map type과 serde에 JsonSerde를 설정함으로써 한번에 해결할 수 있었는데 kafka connector에서는 이 기능을 제공하지 않습니다. 질의는 다음과 같이 할 수 있습니다.
select col1, cast(col3 as map<varchar, varchar>)[‘some_key’] from kafka.sample.test_topic where col2 is not null;
Kafka의 topic 중 특정 offset 이후부터 조회하는 기능 등은 제공하지 않는데 _partition_id, _segment_count 등 시스템 컬럼을 잘 이용하면 가능할 것도 같습니다. 이제 설정하는 단계라 실제 환경에서 사용한 다음에 성능 등에 대한 리포팅은 드리겠습니다.