Presto 사용자 정의 함수 만들기

Presto는 SQL on Hadoop 계열의 오픈소스 솔루션으로 Facebook 에서 개발하였으며, 현재 Netflix 등에서 사용되고 있는 솔루션입니다. 분산된 서버에서 실행되며 SQL 처리 중 발생하는 중간 데이터는 메모리 기반으로 처리함으로써 기존 Hive on MR 보다 아주 빠른 성능으로 SQL을 처리할 수 있습니다. 또한 다양한 스토리지에 저장된 데이터를 SQL을 이용하여 Join 할 수 있는 장점이 있습니다. 자세한 내용은 다음 사이트 참고하세요.

https://prestodb.io/

http://www.popit.kr/tag/presto/

SQL처리 엔진들은 대부분 문자열 처리나 시간 처리를 위해 내장 함수를 제공하고 있지만 이 내장 함수이외에 특별한 기능을 수행하는 함수가 필요한 경우도 있습니다. 예를 들면 IP를 입력으로 받아 좌표나 주소를 반환하는 기능을 수행하는 함수가 대표적인 사용가 정의 함수가 될 수 있습니다.

Presto에서는 기본적으로는 사용자 정의 함수 기능을 제공하지 않습니다(0.15 버전 기준). 일반적으로 SQL을 사용하여 데이터를 처리하는 솔루션에서 사용자 정의 함수를 제공하기 위해서는 다음 기능이 제공되어야 사용자는 불편함 없이 사용할 수 있습니다.

  • 사용자 정의 함수 등록 및 삭제
    • Global 함수나 세션별 함수
    • DDL을 이용하여 함수 등록(Hive의 경우 Create function 명령 이용)
  • 운영 중인 시스템의 중지 없이 배포 가능

첫번째 항목을 구현하는 것은 그렇게 어렵지는 않습니다. 두번째 항목인 분산된 환경에서 수십/수백대의 서버에 수많은 질의가 실행되는 도중에 새로운 버전의 로직을 배포하고 로딩하는 것은 쉽지 않습니다. Hive는 실제 질의가 실행될 때 Task를 fork 하는 방식이기 때문에 상대적으로 쉽게 구현할 수 있었습니다. 다음은 Hive가 사용자 정의 함수 모듈을 배포, 로딩하는 과정입니다.

  • 질의 실행 전에 사용자 정의 함수의 라이브러리 HDFS에 저장
  • 각 서버에 있는 Task는 실행 전 HDFS에 저장된 라이브러리 파일을 복사
  • 질의 실행하는 Task는 이 라이브러리를 Classpath에 추가하여 실행

Presto는 위와 같은 기능의 사용자 정의 함수를 제공하지 않습니다. 그나마 다행인 것은 Presto 의 코어 코드를 수정하지 않고 사용자 정의 함수를 만들 수 있는 방법은 제공하고 있습니다. Presto 사용자 정의 함수는 Presto의 Plugin 기능을 이용합니다. Plugin은 주로 새로운 저장소를 Presto에 지원하기 위해 사용하는 기능으로 저장소의 Connector를 만들때 사용합니다. 기본적으로 제공되는 Connector는 Hive, Cassandra, Mongo, JDBC Basic, MySQL, Postgresql, Kafka, Raptor, Orc, Redis 등이 있습니다.

이 Plugin 기능을 이용하여 Connector 부분은 제외하고 FunctionFactory 를 새로 정의하여 사용자 정의 함수를 구현할 수 있습니다. 다음은 간단한 사용자 정의 함수를 위한 클래스 및 설명입니다.

사용자 정의 함수 만들기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class BabokimPlugin implements Plugin {
  private Map<String, String> optionalConfig = ImmutableMap.of();
  private TypeManager typeManager;
  @Override
  public void setOptionalConfig(Map<String, String> optionalConfig) {
    this.optionalConfig = ImmutableMap.copyOf(requireNonNull(optionalConfig, “optionalConfig is null”));
  }
  @Inject
  public synchronized void setTypeManager(TypeManager typeManager) {
    this.typeManager = requireNonNull(typeManager, “typeManager is null”);
  }
  @Override
  public <T> List<T> getServices(Class<T> type) {
    if (type == FunctionFactory.class) {
      return ImmutableList.of(type.cast(new BabokimFunctionFactory(typeManager)));
    }
    return ImmutableList.of();
  }
}

Plugin 클래스의 getServices() 메소드에서 Connector, FunctionFactor 등을 전달합니다. 예를 들어 Connector가 추가 되었으면 다음과 같이 할 수 있다.

1
2
3
if (type == ConnectorFactory.class) {
    return ImmutableList.of(type.cast(new BabokimConnectorFactory(“babokim”, optionalConfig)));
}

다음은 FunctionFactory를 다음과 같이 만든다.

1
2
3
4
5
6
7
8
9
10
11
public class BabokimFunctionFactory implements FunctionFactory {
  private final TypeManager typeManager;
  public BabokimFunctionFactory(TypeManager typeManager) {
    this.typeManager = requireNonNull(typeManager, “typeManager is null”);
  }
  public List<SqlFunction> listFunctions() {
    return new FunctionListBuilder(typeManager)
        .scalar(EchoFunctions.class)
        .getFunctions();
  }
}

이제 사용자 정의 함수를 만들 준비가 되었다. listFunctions() 에서 반환하고 있는 EchoFunctions 클래스 만들고 여기에 사용자 정의 함수를 구현하면 된다. 예제의 경우 간단하게 두개의 함수만 구현하기 위해 EchoFunctions 클래스만 반환하는데 여러 개의 클래스를 반환할 수도 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class EchoFunctions {
  public EchoFunctions() {
  }
  @Description(“Echo message”)
  @ScalarFunction(“echo”)
  @SqlType(StandardTypes.VARCHAR)
  public static String echo(@SqlType(StandardTypes.VARCHAR) Slice str) {
    return str.toStringUtf8();
  }
  @Description(“Hello message”)
  @ScalarFunction(“hello”)
  @SqlType(StandardTypes.VARCHAR)
  public static String hello(@SqlType(StandardTypes.VARCHAR) Slice str) {
    return “Hello ” + str.toStringUtf8();
  }
}

여기서 마지막으로 META-INF/services에 다음과 같은 파일을 만들어서 plugin 정보를 저장한다.

  • 파일명: com.facebook.presto.spi.Plugin
  • 내용: plugin class
    • 예: kr.popit.BabokimPlugin

이렇게 한 다음 jar 파일을 만들고 presto 서버의 plugin 디렉토리(babokim)에 배포한다. Presto를 재시작 한 다음 콘솔에서 다음과 같이 입력한다.

  • select echo(‘test’);
  • select hello(‘test’);

어떤 경우 사용자 정의 함수에서 특정 데이터베이스의 테이블에 저장된 코드 정보를 조회하여 메모리에 올려놓고 처리하는 경우가 있는데 이 경우 DB 접속 정보 등 환경설정이 필요하다. Connector의 경우 etc/catalog 디렉토리에 property 파일 만들고 관련 정보를 저장할 수 있지만 아직 Function에서 설정 정보를 가져오는 부분은 찾지 못했다.  그래서 약간 우회 전략으로 다음과 같이 처리하였다.

Plugin 클래스의 setOptionalConfig() 메소드의 파라미터로 전달되는 config 정보에서 다음 key 값을 읽어 오면 Presto의 etc/config.properties 파일에 대한 pull path가 전달되는데 이 path를 이용하여 etc 디렉토리의 위치를 가져온 다음에 처리하도록 하였다.

1
2
3
4
5
6
7
8
9
10
11
12
public void setOptionalConfig(Map<String, String> optionalConfig) {
  String babokimConfigFile = null;
  if (optionalConfig != null) {
    String configFilePath = optionalConfig.get(“config”);
    if (configFilePath != null) {
      babokimConfigFile = (new File(configFilePath)).getParentFile().getAbsolutePath();
    }
  }
  this.optionalConfig = ImmutableMap.copyOf(requireNonNull(optionalConfig, “optionalConfig is null”));
  File myConfigFile = new File(babokimConfigFile + “/” + “babokim.properties”);
  //Load property
}

Popit은 페이스북 댓글만 사용하고 있습니다. 페이스북 로그인 후 글을 보시면 댓글이 나타납니다.