• Hive Beta
    • Supported Hive Versions
      • Dependencies
    • Connecting To Hive
    • Supported Types
      • Limitations

    Hive Beta

    Apache Hive has established itself as a focal point of the data warehousing ecosystem. It serves not only as a SQL engine for big data analytics and ETL, but also as a data management platform, where data is discovered, defined, and evolved.

    Flink offers a two-fold integration with Hive. The first is to leverage Hive’s Metastore as a persistent catalog for storing Flink-specific metadata across sessions. The second is to offer Flink as an alternative engine for reading and writing Hive tables.

    The HiveCatalog is designed to be “out of the box” compatible with existing Hive installations. You do not need to modify your existing Hive Metastore or change the data placement or partitioning of your tables.

    Supported Hive Versions

    Flink supports Hive 2.3.4 and 1.2.1 and relies on Hive’s compatibility guarantees for other minor versions.

    If you use a different minor Hive version such as 1.2.2 or 2.3.1, it should also be fine to choose the closest supported version as a workaround: 1.2.1 (for 1.2.2) or 2.3.4 (for 2.3.1). For example, to use Flink with a Hive 2.3.1 installation in the SQL Client, just set hive-version to 2.3.4 in the YAML config. Similarly, pass that version string when creating a HiveCatalog instance via the Table API.

    Users are welcome to try out different versions with this workaround. Since only 2.3.4 and 1.2.1 have been tested, there might be unexpected issues. We will test and support more versions in future releases.
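    As an illustration of the workaround above, a SQL Client catalog entry for a Hive 2.3.1 installation could pin hive-version to the closest tested release (the hive-conf-dir path here is illustrative):

    catalogs:
      - name: myhive
        type: hive
        property-version: 1
        hive-conf-dir: /opt/hive-conf   # illustrative path to your Hive config
        hive-version: 2.3.4             # closest tested version; actual installation is 2.3.1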

    Dependencies

    To integrate with Hive, users need the following dependencies in their project.

    Hive 2.3.4:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hive_2.11</artifactId>
      <version>1.9.0</version>
      <scope>provided</scope>
    </dependency>

    <!-- Hadoop Dependencies -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hadoop-compatibility_2.11</artifactId>
      <version>1.9.0</version>
      <scope>provided</scope>
    </dependency>

    <!-- Hive 2.3.4 is built with Hadoop 2.7.2. We pick 2.7.5, which flink-shaded-hadoop is pre-built with,
         but users can pick their own Hadoop version, as long as it's compatible with Hadoop 2.7.2 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-shaded-hadoop-2-uber</artifactId>
      <version>2.7.5-8.0</version>
      <scope>provided</scope>
    </dependency>

    <!-- Hive Metastore -->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>2.3.4</version>
    </dependency>
    Hive 1.2.1:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hive_2.11</artifactId>
      <version>1.9.0</version>
      <scope>provided</scope>
    </dependency>

    <!-- Hadoop Dependencies -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hadoop-compatibility_2.11</artifactId>
      <version>1.9.0</version>
      <scope>provided</scope>
    </dependency>

    <!-- Hive 1.2.1 is built with Hadoop 2.6.0. We pick 2.6.5, which flink-shaded-hadoop is pre-built with,
         but users can pick their own Hadoop version, as long as it's compatible with Hadoop 2.6.0 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-shaded-hadoop-2-uber</artifactId>
      <version>2.6.5-8.0</version>
      <scope>provided</scope>
    </dependency>

    <!-- Hive Metastore -->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-metastore</artifactId>
      <version>1.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>1.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.thrift</groupId>
      <artifactId>libfb303</artifactId>
      <version>0.9.3</version>
    </dependency>

    Connecting To Hive

    Connect to an existing Hive installation using the HiveCatalog, either through the table environment or via YAML configuration.

    Java:

    String name = "myhive";
    String defaultDatabase = "mydatabase";
    String hiveConfDir = "/opt/hive-conf";
    String version = "2.3.4"; // or 1.2.1

    HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
    tableEnv.registerCatalog("myhive", hive);
    Scala:

    val name = "myhive"
    val defaultDatabase = "mydatabase"
    val hiveConfDir = "/opt/hive-conf"
    val version = "2.3.4" // or 1.2.1

    val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
    tableEnv.registerCatalog("myhive", hive)
    YAML:

    catalogs:
      - name: myhive
        type: hive
        property-version: 1
        hive-conf-dir: /opt/hive-conf
        hive-version: 2.3.4 # or 1.2.1
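    Once registered, the catalog can be made current and queried like any other catalog. A minimal sketch in Java (the table name mytable is hypothetical, and this assumes a TableEnvironment named tableEnv with the dependencies above on the classpath):

    // Make the registered Hive catalog and its default database current
    tableEnv.useCatalog("myhive");
    tableEnv.useDatabase("mydatabase");

    // Query an existing Hive table; "mytable" is a hypothetical table name
    Table result = tableEnv.sqlQuery("SELECT * FROM mytable");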

    Supported Types

    Currently HiveCatalog supports most Flink data types with the following mapping:

    Flink Data Type   Hive Data Type
    CHAR(p)           CHAR(p)
    VARCHAR(p)        VARCHAR(p)
    STRING            STRING
    BOOLEAN           BOOLEAN
    TINYINT           TINYINT
    SMALLINT          SMALLINT
    INT               INT
    BIGINT            BIGINT
    FLOAT             FLOAT
    DOUBLE            DOUBLE
    DECIMAL(p, s)     DECIMAL(p, s)
    DATE              DATE
    BYTES             BINARY
    ARRAY<T>          ARRAY<T>
    MAP<K, V>         MAP<K, V>
    ROW               STRUCT

    Limitations

    The following limitations in Hive’s data types impact the mapping between Flink and Hive:

    • CHAR(p) has a maximum length of 255
    • VARCHAR(p) has a maximum length of 65535
    • Hive’s MAP only supports primitive key types while Flink’s MAP can be any data type
    • Hive’s UNION type is not supported
    • Flink’s INTERVAL type cannot be mapped to Hive’s INTERVAL type
    • Flink’s TIMESTAMP_WITH_TIME_ZONE and TIMESTAMP_WITH_LOCAL_TIME_ZONE are not supported by Hive
    • Flink’s TIMESTAMP_WITHOUT_TIME_ZONE type cannot be mapped to Hive’s TIMESTAMP type due to a precision difference
    • Flink’s MULTISET is not supported by Hive