Hive에서 Druid로 주저 없이 데이터 적재하기[번외:airflow]
Hive에 있는 데이터 Druid에 적재하려면?
다음과 같은 다양한 삽질 방법이 존재한다.
- Hive에 metastore로부터 hdfs location을 파악한 후 orc format이나 parquet format이냐에 따라 그에 맞는 hadoop ingestion spec을 작성해야함. orc인 경우 struct type정보를 잘 파악해야하며, parquet인 경우에는 avro schema를 읽기 때문에 orc에 비해 덜 번거롭다. 자세한 내용은 요기를 참고: ORC ingestion spec , Parquest ingestion spec -> 여기서 문제점 하나가 발생 partition column의 경우는 어떻게 ingestion하지?
partition column이 dimension이나 metric 또는 timestamp로 들어가는 경우가 있어서 이를 위해서는 별도 패치가 필요하다. 다행히 내부 브랜치에서 이런 기능을 구현하여 사용중
- Hive CLI를 통해 hdfs에 데이터를 내린후 hdfs데이터를 ingestion한다. 좀 수고스럽긴 하지만 parquet, orc, partition column등의 문제를 회피할 수 있는 방법은 Hive에서 필요 데이터를 join이나 쿼리 연산으로 처리 후csv,tsv 타입으로 hdfs패스에 내리고 로딩하는 방식이다. 무척이나 번거롭다.
- 필자의 삽질 Hive thrift server를 띄우고 oozie를 통해 데이터를 질의한 후 hdfs write하고 이를 인덱싱한다. 이미 기존에 푸던 삽질 방식이다. 관련된 내용이 궁금하다면 여길 참고하길
1,2,3 뭘 해도 깔끔한 방법이 없다. 2번의 경우 SQL의 자유도를 주기 위해 SQL로 데이터를 내리고 druid에 적재한후 해당 데이터는 깔끔하게 삭제해 주어야 한다. 이번 글에서는 hive to druid를 한방에 끝낼 수 있는 airflow plugin에 대해 이야기 해보려 한다.
먼저 간단하게 나마 airflow에 대해 설명하고 넘어가자
Airflow
oozie나 azkaban은 좀 들어봤는데 airflow는 뭐지? Apache Airflow는 프로그래밍 가능한 워크플로우 플랫폼이다. Airflow의 가장 큰 특징은 task에 대한 DAGs(Directed Acyclic Graphs)를 지원하는 워크 플로우이다. 사용자가 정의한 DAG pipeline을 통해 task가 수행되며 workflow 는 python code로 구성되어 있어 파이선 개발자에겐 상당히 유용한 워크플로우 플랫폼 이다.
Airflow가 쿨한 이유 [https://airflow.apache.org]
- Dynamic: Airflow pipeline은 code로 설정 가능하고 이는 dynamic 한 pipeline생성을 허용한다.
- Extensible: task를 수행하기 위한 operator를 쉽게 정의 할 수 있고 library를 확장하여 환경에 적합하게 확장성을 가진다.
- Elegant: 내부적으로 Jinja template engine을 사용하고 있어 수행하는 script의 매개변수를 손쉽게 관리할 수 있다.
- Scalable: 임의의 worker를 띄우고 job queue를 관리하고 있어 규모있는 운영이 가능하다.
Druid Hook 과 Hive to Druid Operator
airflow에 대해 더 깊게 이야기 하고 싶지만 이 글의 주제가 airflow는 아닌 관계로 중간 생략하고 hive에서 druid로 바로 적재하는 방식에 대해 알아보자. 다음과 같이 두개의 파일만 참고하면 된다.
https://github.com/apache/incubator-airflow/blob/master/airflow/hooks/druid_hook.py
https://github.com/apache/incubator-airflow/blob/master/airflow/operators/hive_to_druid.py
한줄 요약하면 druid hook을 통해 druid overload에 연결하고 hive to druid가 실제로 hive에 데이터를 로딩하여 druid로 인덱싱하게 만드는 operator이다. 아래 hive_to_druid의 일부 코드를 살펴보자. python code가 독해가 되시는 분은 술술 읽히실 것이다. hive cli hook을 통해 hive로 접속한 후 해당 쿼리를 질의하여 temp table을 생성하여 데이터를 넣는다. temp table이 생성되고 나서 해당 hdfs location을 읽어온 후 druid hook을 통해 hadoop ingestion을 수행하고 색인이 끝나면 table을 drop한다. 깔끔한 마무리이;;;;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
def execute(self, context): hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id) self.log.info("Extracting data from Hive") hive_table = 'druid.' + context['task_instance_key_str'].replace('.', '_') sql = self.sql.strip().strip(';') tblproperties = ''.join([", '{}' = '{}'".format(k, v) for k, v in self.hive_tblproperties.items()]) hql = """\ SET mapred.output.compress=false; SET hive.exec.compress.output=false; DROP TABLE IF EXISTS {hive_table}; CREATE TABLE {hive_table} ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE TBLPROPERTIES ('serialization.null.format' = ''{tblproperties}) AS {sql} """.format(**locals()) self.log.info("Running command:\n %s", hql) hive.run_cli(hql) m = HiveMetastoreHook(self.metastore_conn_id) # Get the Hive table and extract the columns t = m.get_table(hive_table) columns = [col.name for col in t.sd.cols] # Get the path on hdfs hdfs_uri = m.get_table(hive_table).sd.location pos = hdfs_uri.find('/user') static_path = hdfs_uri[pos:] schema, table = hive_table.split('.') druid = DruidHook(druid_ingest_conn_id=self.druid_ingest_conn_id) try: index_spec = self.construct_ingest_query( static_path=static_path, columns=columns, ) self.log.info("Inserting rows into Druid, hdfs path: %s", static_path) druid.submit_indexing_job(index_spec) self.log.info("Load seems to have succeeded!") finally: self.log.info( "Cleaning up by dropping the temp Hive table %s", hive_table ) hql = "DROP TABLE IF EXISTS {}".format(hive_table) hive.run_cli(hql)
해당 Operator를 수행하기 위한 python script는 다음과 같다. 그 전에 airflow에서 druid overload 와 hive관련 설정은 미리 되어 있어야 한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
default_args = { 'owner': 'jerry', 'depends_on_past': False, 'start_date': datetime(2017, 11, 10), 'email': ['jerry@go.go'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG('daily_druid', <code class="java plain"> schedule_interval=</code><code class="java string">'0 12 * * *'</code><code class="java plain">,</code> default_args=default_args) HiveToDruidTransfer(task_id="load_dummy_test", sql="select * from default.sales \ limit 10;", druid_datasource="airflow_test", intervals=["2017-11-01/2017-11-10"], ts_dim="create_date", dag=dag )
airflow의 dag 이름은 daily_druid이고 task이름은 load_dummy_test이다. 위의 내용처럼 load_dummy_test task는 default db의 sales table을 질의하여 timestamp filed는 create_date로 지정하여 druid에 인덱싱하는 task이다. airflow는 확장 가능한 모듈 구조로 되어 있기때문에 자세한 설명은 따로 없다. ㅎㅎ 자세한 파라미터는 https://pythonhosted.org/airflow/_modules/hive_to_druid.html 의 코드를 확인한다. 여기까지 읽었는데 Druid가 뭐지라고 생각하신다면 이미 낚이셨;;;; Druid 가 낯설다면 Druid 소개 글을 먼저 읽어보실 :)
만에 하나 Timezone변경 및 hadoop property 변경 등으로 hive_to_druid.py 의 ingest_query_dict를 변경해야 할 필요가 있다. 삽집을 해보시고 안되시면 연락을 May be force be with you!