Load Data from Hive Data Warehouse
A Hive data warehouse is built on the HDFS of a Hadoop cluster, so the data in the Hive data warehouse is also stored in HDFS. SynxDB supports loading tables from a Hive cluster into SynxDB by combining the Hive Connector extension with the method described in Load Data from Object Storage and HDFS.
The Hive Connector registers tables from the Hive cluster as foreign tables in SynxDB; these foreign tables record the paths to the data in HDFS. datalake_fdw reads the data through these foreign tables, which loads the data from Hive into SynxDB.
This document explains how to use the Hive Connector and datalake_fdw to load tables from a Hive cluster into SynxDB.
Supported Hive file formats
You can load files in TEXT, CSV, ORC, PARQUET, Iceberg, or Hudi formats from Hive into SynxDB.
Usage limitations
Synchronizing Hive external tables is not supported.
Synchronizing Hive table statistics is not supported.
SynxDB can read data from HDFS and write data to HDFS, but the written data cannot be read by Hive.
Note
Q: How are writes and updates on HDFS synchronized to SynxDB? Are there any limitations?
A: The data remains in HDFS, and the foreign data wrapper only reads the data from HDFS.
Steps
The general steps to use the Hive Connector are as follows:
Create a configuration file on the SynxDB node, specifying the target Hive cluster and HDFS information in the file. See Step 1. Create a configuration file on database cluster.
Create the foreign data wrapper and Hive Connector extension.
Create the server and user mapping.
Load Hive objects into SynxDB. You can load a single table from Hive or load an entire database.
Step 1. Create a configuration file on database cluster
Create configuration files on the SynxDB coordinator and standby nodes, specifying the target Hive cluster and HDFS information in the files.
Configure Hive cluster information
The Hive Connector supports Hive v2.x and v3.x. You need to create a configuration file named gphive.conf
on the coordinator and standby nodes of the SynxDB data warehouse.
Configuration items
| Item name | Description | Default value |
|---|---|---|
| uris | Address of the Hive Metastore Service (HMS hostname). | / |
| auth_method | Authentication method for the Hive Metastore Service: simple or kerberos. | simple |
| krb_service_principal | The service principal needed for Kerberos authentication of the Hive Metastore Service. If you use HMS HA, set the instance part of the principal to _HOST, for example hive/_HOST@SYNXDB.CN. | / |
| krb_client_principal | The client principal needed for Kerberos authentication of the Hive Metastore Service. | / |
| krb_client_keytab | The keytab file corresponding to the client principal for Kerberos authentication of the Hive Metastore Service. | / |
| debug | Hive Connector debug flag: true or false. | false |
Note
When Kerberos authentication is enabled, you need to renew the Kerberos ticket periodically. If the ticket is not renewed, database performance will gradually degrade until the database eventually crashes and becomes unavailable. Add a renewal command to an operating-system scheduler (such as crontab
) or to your orchestration platform, for example:
kinit -kt /yourpath/greenplum.keytab bdms_greenplum/dev@BDMS.COM
In the command above, /yourpath/greenplum.keytab is the path to your keytab file. The principal bdms_greenplum/dev@BDMS.COM is an example, where bdms_greenplum is the user, dev is the instance, and BDMS.COM is the realm. Replace these with your actual values.
It is recommended to add a renewal script to monitoring to ensure that the ticket remains valid. For more examples of ticket-renewal configuration, see the Greenplum documentation: <https://techdocs.broadcom.com/us/en/vmware-tanzu/data-solutions/tanzu-greenplum-platform-extension-framework/6-7/gp-pxf/pxf_kerbhdfs.html>.
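As a minimal sketch, assuming the example keytab path and principal above, a crontab entry for the gpadmin user that renews the ticket every 8 hours could look like the following. Adjust the schedule to your ticket lifetime and install it on both the coordinator and standby nodes.
# Renews the Kerberos ticket every 8 hours (example schedule).
0 */8 * * * kinit -kt /yourpath/greenplum.keytab bdms_greenplum/dev@BDMS.COM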
gphive.conf configuration example
Create the gphive.conf
file on the coordinator and standby nodes of the SynxDB data warehouse using the following content. Replace example.net:8083
with the address of your Hive Metastore Service.
hive-cluster-1: # connector name
  uris: thrift://example.net:8083
  auth_method: simple
Configure multiple Hive clusters
To configure multiple Hive clusters, add more entries in gphive.conf
. The following example adds a new Hive cluster named hive-cluster-2
, which requires Kerberos authentication, and another Hive HA cluster named hive-cluster-3
, which also requires Kerberos authentication.
hive-cluster-1: #simple auth
  uris: thrift://example1.net:9083
  auth_method: simple

hive-cluster-2: #kerberos auth
  uris: thrift://example2.net:9083
  auth_method: kerberos
  krb_service_principal: hive/synxdb@SYNXDB.CN
  krb_client_principal: user/synxdb@SYNXDB.CN
  krb_client_keytab: /home/gpadmin/user.keytab

hive-cluster-3: #kerberos auth (HMS HA)
  uris: thrift://hms-primary.example2.net:9083,thrift://hms-standby.example2.net:9083
  auth_method: kerberos
  krb_service_principal: hive/_HOST@SYNXDB.CN
  krb_client_principal: user/synxdb@SYNXDB.CN
  krb_client_keytab: /home/gpadmin/user.keytab
Configure HDFS cluster information
The Hive Connector needs information about the HDFS cluster that backs the Hive cluster in order to create foreign tables and read them using the datalake_fdw plugin. Therefore, you need to provide a configuration file named gphdfs.conf on the coordinator and standby nodes of SynxDB.
Configuration Options
| Option name | Description | Default value |
|---|---|---|
| hdfs_namenode_host | The HDFS NameNode host, for example, paa_cluster_master in the configuration example below. | / |
| hdfs_namenode_port | The HDFS NameNode port. If not set, it defaults to 9000. | 9000 |
| hdfs_auth_method | The HDFS authentication method: simple or kerberos. | / |
| krb_principal | The Kerberos principal. Set this when hdfs_auth_method is kerberos. | / |
| krb_principal_keytab | The location of the keytab file generated for the user. | / |
| hadoop_rpc_protection | Must match the hadoop.rpc.protection setting of the HDFS cluster (core-site.xml). | / |
| data_transfer_protocol | When Kerberos is configured for the HDFS cluster, there are two methods: 1. privileged resources; 2. SASL RPC data transfer protection and SSL for HTTP. If the second method ("SASL") is used, set data_transfer_protocol to true. | / |
| is_ha_supported | Indicates whether to use hdfs-ha (HDFS high availability). When set to true, the hdfs-ha options described below are read. | false |
hdfs-ha Configuration Instructions
The program reads the HA configuration only when is_ha_supported
is set to true
. Users should provide the hdfs-ha
configuration information in key-value format in the configuration file. The program will read all HA configurations in sequence, and each HA configuration must match the corresponding configuration in the HDFS cluster. The value of each configuration item must be in lowercase; if it is in uppercase, it must be converted to lowercase before configuration. The configuration is as shown in the table below:
| Option name | Description | Default value |
|---|---|---|
| dfs.nameservices | The nameservices name of the HDFS cluster, referred to as <nameservices> below. | / |
| dfs.ha.namenodes.<nameservices> | The list of NameNodes in the cluster whose nameservice is <nameservices>. | / |
| dfs.namenode.rpc-address.<nameservices>.<namenode> | The RPC address of the NameNode named <namenode>. | / |
| dfs.namenode.http-address.<nameservices>.<namenode> | The HTTP address of the NameNode named <namenode>. | / |
| dfs.client.failover.proxy.provider.<nameservices> | The Java class used by the HDFS client to communicate with the active NameNode. | / |
HDFS cluster configuration example
The following configuration file contains the configurations for three HDFS clusters: paa_cluster
, pab_cluster
, and pac_cluster
. Among them, paa_cluster
does not use Kerberos authentication and does not use hdfs-ha
. pab_cluster
uses Kerberos authentication but does not use hdfs-ha
. pac_cluster
uses Kerberos authentication and is configured with a two-node hdfs-ha
cluster.
paa_cluster:
  # namenode host
  hdfs_namenode_host: paa_cluster_master
  # namenode port
  hdfs_namenode_port: 9000
  # authentication method
  hdfs_auth_method: simple
  # rpc protection
  hadoop_rpc_protection: privacy
  data_transfer_protocol: true

pab_cluster:
  hdfs_namenode_host: pab_cluster_master
  hdfs_namenode_port: 9000
  hdfs_auth_method: kerberos
  krb_principal: gpadmin/hdw-68212b9b-master0@GPADMINCLUSTER2.COM
  krb_principal_keytab: /home/gpadmin/hadoop.keytab
  hadoop_rpc_protection: privacy
  data_transfer_protocol: true

pac_cluster:
  hdfs_namenode_host: pac_cluster_master
  hdfs_namenode_port: 9000
  hdfs_auth_method: kerberos
  krb_principal: gpadmin/hdw-68212b9b-master0@GPADMINCLUSTER2.COM
  krb_principal_keytab: /home/gpadmin/hadoop.keytab
  hadoop_rpc_protection: privacy
  is_ha_supported: true
  dfs.nameservices: mycluster
  dfs.ha.namenodes.mycluster: nn1,nn2
  dfs.namenode.rpc-address.mycluster.nn1: 192.168.111.70:8020
  dfs.namenode.rpc-address.mycluster.nn2: 192.168.111.71:8020
  dfs.namenode.http-address.mycluster.nn1: 192.168.111.70:50070
  dfs.namenode.http-address.mycluster.nn2: 192.168.111.71:50070
  dfs.client.failover.proxy.provider.mycluster: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
Step 2. Create foreign data wrapper and Hive Connector plugin
Before synchronization, load the datalake_fdw
plugin used for reading HDFS, and create the foreign data wrapper for reading external tables.
Create the foreign data wrapper.
CREATE EXTENSION datalake_fdw;

CREATE FOREIGN DATA WRAPPER datalake_fdw
    HANDLER datalake_fdw_handler
    VALIDATOR datalake_fdw_validator
    OPTIONS (mpp_execute 'all segments');
Before calling the synchronization functions in the following steps, you also need to create the Hive Connector extension.
CREATE EXTENSION hive_connector;
Step 3. Create server and user mapping
After creating the foreign data wrapper and Hive Connector, you need to create the server and user mapping, as shown in the following example:
SELECT public.create_foreign_server('sync_server', 'gpadmin', 'datalake_fdw', 'hdfs-cluster-1');
In the above example, the create_foreign_server
function takes the form as follows:
create_foreign_server(serverName,
userMapName,
dataWrapName,
hdfsClusterName);
This function creates a server and user mapping pointing to an HDFS cluster, which can be used by the Hive Connector to create foreign tables. The datalake_fdw
uses the server configuration to read data from the corresponding HDFS cluster when accessing external tables.
The parameters in the function are explained as follows:
- serverName: The name of the server to be created.
- userMapName: The name of the user to be created on the server.
- dataWrapName: The name of the data wrapper used for reading HDFS data.
- hdfsClusterName: The name of the HDFS cluster where the Hive cluster is located, as specified in the configuration file.
Executing this function is equivalent to executing:
CREATE SERVER serverName FOREIGN DATA WRAPPER dataWrapName OPTIONS (......);
CREATE USER MAPPING FOR userMapName SERVER serverName OPTIONS (user 'userMapName');
The content of OPTIONS (......) is read from the configuration specified by hdfsClusterName in the configuration file.
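To confirm that the server and user mapping have been created, you can query the standard PostgreSQL system catalogs. The following check is a minimal sketch and assumes the sync_server name used in the example above:
-- Shows the foreign server and the options generated from the HDFS configuration.
SELECT srvname, srvoptions FROM pg_foreign_server WHERE srvname = 'sync_server';

-- Shows the user mapping created for the server.
SELECT * FROM pg_user_mappings WHERE srvname = 'sync_server';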
Step 4. Sync Hive objects to the database cluster
Sync a Hive table
To sync a table from Hive to SynxDB, see the following example:
-- Syncs Hive tables in psql.
gpadmin=# select public.sync_hive_table('hive-cluster-1', 'mytestdb', 'weblogs', 'hdfs-cluster-1', 'mytestdb.weblogs', 'sync_server');
sync_hive_table
-----------------
t
(1 row)
The above example uses the sync_hive_table
function to perform the synchronization. The general form of the function is as follows:
sync_hive_table(hiveClusterName,
hiveDatabaseName,
hiveTableName,
hdfsClusterName,
destTableName,
serverName);
sync_hive_table(hiveClusterName,
hiveDatabaseName,
hiveTableName,
hdfsClusterName,
destTableName,
serverName,
forceSync);
This function syncs a table to SynxDB, with both non-forced and forced modes available. When forceSync is set to true, the sync is forced: if a table with the same name already exists in SynxDB, the existing table is dropped before syncing. If the forceSync parameter is not provided or is set to false, the sync is not forced, and an error occurs if a table with the same name already exists. An example of a forced sync follows the parameter list below.
The parameters are explained as follows:
- hiveClusterName: The name of the Hive cluster where the table to be synced is located, as specified in the configuration file.
- hiveDatabaseName: The name of the database in Hive to which the table to be synced belongs.
- hiveTableName: The name of the table to be synced.
- hdfsClusterName: The name of the HDFS cluster where the Hive cluster is located, as specified in the configuration file.
- destTableName: The name of the table in SynxDB where the data will be synced.
- serverName: The name of the server to be used when creating the foreign table with the datalake_fdw plugin.
- forceSync: Indicates whether the sync should be forced. If yes, set to true; otherwise, set to false.
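For example, to force a re-sync of the weblogs table from the earlier example, pass true as the forceSync argument (a sketch reusing the cluster and server names from the examples above):
-- Force-syncs the Hive table in psql; an existing table with the same name is dropped first.
gpadmin=# select public.sync_hive_table('hive-cluster-1', 'mytestdb', 'weblogs', 'hdfs-cluster-1', 'mytestdb.weblogs', 'sync_server', true);
 sync_hive_table
-----------------
 t
(1 row)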
Sync a Hive database
The following example shows how to sync a Hive database to SynxDB:
gpadmin=# select public.sync_hive_database('hive-cluster-1', 'default', 'hdfs-cluster-1', 'mytestdb', 'sync_server');
 sync_hive_database
--------------------
 t
(1 row)
The above example uses the sync_hive_database
function to perform the synchronization. The general form of the function is as follows:
sync_hive_database(hiveClusterName,
hiveDatabaseName,
hdfsClusterName,
destSchemaName,
serverName);
sync_hive_database(hiveClusterName,
hiveDatabaseName,
hdfsClusterName,
destSchemaName,
serverName,
forceSync);
This function syncs a Hive database into a schema in SynxDB, similar to syncing a single table. It supports both non-forced and forced modes. When forceSync is set to true, the sync is forced: if tables with the same names already exist in SynxDB, the existing tables are dropped before syncing. If the forceSync parameter is not provided or is set to false, the sync is not forced, and an error occurs if tables with the same names exist. An example of a forced database sync follows the note below.
The parameters are explained as follows:
- hiveClusterName: The name of the Hive cluster as specified in the configuration file.
- hiveDatabaseName: The name of the database to be synced.
- hdfsClusterName: The name of the HDFS cluster where the Hive cluster is located, as specified in the configuration file.
- destSchemaName: The name of the schema in SynxDB where the database is synced.
- serverName: The name of the server to be used when creating the foreign tables with the datalake_fdw plugin.

Note
The interfaces used by the above functions are as follows:
- sync_hive_table calls the HMS Thrift getTable interface.
- sync_hive_database calls the HMS Thrift getTables and getTable interfaces.
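Similarly, a forced database sync adds true as the last argument (a sketch based on the earlier example, with unchanged cluster, schema, and server names):
gpadmin=# select public.sync_hive_database('hive-cluster-1', 'default', 'hdfs-cluster-1', 'mytestdb', 'sync_server', true);
 sync_hive_database
--------------------
 t
(1 row)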
Examples of syncing tables
The following examples show only the commands for creating a table in Hive and syncing it to SynxDB, that is, the commands in Step 4. Sync Hive objects to the database cluster. A complete synchronization also requires the steps that precede Step 4.
Sync a Hive text table
Create the following text table in Hive.
-- Creates the Hive table in Beeline.
CREATE TABLE weblogs
(
    client_ip         STRING,
    full_request_date STRING,
    day               STRING,
    month             STRING,
    month_num         INT,
    year              STRING,
    referrer          STRING,
    user_agent        STRING
) STORED AS TEXTFILE;
Sync the text table to SynxDB.
-- Syncs the Hive table in psql.
gpadmin=# select public.sync_hive_table('hive-cluster-1', 'mytestdb', 'weblogs', 'hdfs-cluster-1', 'mytestdb.weblogs', 'sync_server');
 sync_hive_table
-----------------
 t
(1 row)
Sync a Hive ORC table
Create an ORC table in Hive.
-- Creates the Hive table in Beeline.
CREATE TABLE test_all_type
(
    column_a tinyint,
    column_b smallint,
    column_c int,
    column_d bigint,
    column_e float,
    column_f double,
    column_g string,
    column_h timestamp,
    column_i date,
    column_j char(20),
    column_k varchar(20),
    column_l decimal(20, 10)
) STORED AS ORC;
Sync the ORC table to SynxDB:
-- Syncs the Hive table in psql.
gpadmin=# select public.sync_hive_table('hive-cluster-1', 'mytestdb', 'test_all_type', 'hdfs-cluster-1', 'mytestdb.test_all_type', 'sync_server');
 sync_hive_table
-----------------
 t
(1 row)
Sync a Hive ORC partitioned table
Create an ORC partitioned table in Hive.
-- Creates the Hive table in Beeline.
CREATE TABLE test_partition_1_int
(
    a tinyint,
    b smallint,
    c int,
    d bigint,
    e float,
    f double,
    g string,
    h timestamp,
    i date,
    j char(20),
    k varchar(20),
    l decimal(20, 10)
)
PARTITIONED BY (m int)
STORED AS ORC;

INSERT INTO test_partition_1_int VALUES (1, 1, 1, 1, 1, 1, '1', '2020-01-01 01:01:01', '2020-01-01', '1', '1', 10.01, 1);
INSERT INTO test_partition_1_int VALUES (2, 2, 2, 2, 2, 2, '2', '2020-02-02 02:02:02', '2020-02-01', '2', '2', 11.01, 2);
INSERT INTO test_partition_1_int VALUES (3, 3, 3, 3, 3, 3, '3', '2020-03-03 03:03:03', '2020-03-01', '3', '3', 12.01, 3);
INSERT INTO test_partition_1_int VALUES (4, 4, 4, 4, 4, 4, '4', '2020-04-04 04:04:04', '2020-04-01', '4', '4', 13.01, 4);
INSERT INTO test_partition_1_int VALUES (5, 5, 5, 5, 5, 5, '5', '2020-05-05 05:05:05', '2020-05-01', '5', '5', 14.01, 5);
Sync the ORC partitioned table to SynxDB.
-- psql syncs the Hive partitioned tables as one foreign table.
gpadmin=# select public.sync_hive_table('hive-cluster-1', 'mytestdb', 'test_partition_1_int', 'hdfs-cluster-1', 'mytestdb.test_partition_1_int', 'sync_server');
 sync_hive_table
-----------------
 t
(1 row)
View the sync result.
gpadmin=# \d mytestdb.test_partition_1_int;
                    Foreign table "mytestdb.test_partition_1_int"
 Column |            Type             | Collation | Nullable | Default | FDW options
--------+-----------------------------+-----------+----------+---------+-------------
 a      | smallint                    |           |          |         |
 b      | smallint                    |           |          |         |
 c      | integer                     |           |          |         |
 d      | bigint                      |           |          |         |
 e      | double precision            |           |          |         |
 f      | double precision            |           |          |         |
 g      | text                        |           |          |         |
 h      | timestamp without time zone |           |          |         |
 i      | date                        |           |          |         |
 j      | character(20)               |           |          |         |
 k      | character varying(20)       |           |          |         |
 l      | numeric(20,10)              |           |          |         |
 m      | integer                     |           |          |         |
Server: sync_server
FDW options: (filepath '/opt/hadoop/apache-hive-3.1.0-bin/user/hive/warehouse/mytestdb.db/test_partition_1_int', hive_cluster_name 'hive-cluster-1', datasource 'mytestdb.test_partition_1_int', hdfs_cluster_name 'hdfs-cluster-1', enablecache 'true', transactional 'false', partitionkeys 'm', format 'orc')
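After the sync, the foreign table can be queried like a regular table. The following is a minimal check against the partitioned table created above:
-- Reads the partitioned data through the foreign table.
gpadmin=# SELECT a, g, m FROM mytestdb.test_partition_1_int ORDER BY m;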
Sync a sample Hive database
Sync the Hive database to SynxDB.
gpadmin=# select public.sync_hive_database('hive-cluster-1', 'default', 'hdfs-cluster-1', 'mytestdb', 'sync_server');
 sync_hive_database
--------------------
 t
(1 row)
View the result.
gpadmin=# \d mytestdb.*
                        List of relations
  Schema  |          Name           |     Type      |  Owner  | Storage
----------+-------------------------+---------------+---------+---------
 mytestdb | test_all_type           | foreign table | gpadmin |
 mytestdb | weblogs                 | foreign table | gpadmin |
 mytestdb | test_csv_default_option | foreign table | gpadmin |
 mytestdb | test_partition_1_int    | foreign table | gpadmin |
(4 rows)
Sync tables in Iceberg and Hudi formats
Apache Iceberg (hereafter referred to as Iceberg) is an open-source table format designed to improve the storage, access, and processing of big data. It is tailored for large-scale data warehouse scenarios, providing efficient data storage and query optimization. Apache Hudi (hereafter referred to as Hudi) is a library that offers efficient storage management for data lakes, aiming to simplify incremental data processing and stream processing.
When Hive was initially designed, it did not account for some of the modern data lake requirements, such as real-time data processing and more granular control over updates. However, Iceberg and Hudi offer Hive-compatible interfaces. Iceberg and Hudi provide efficient and flexible data management capabilities for modern big data platforms. Compared to traditional Hive data warehouses, they deliver higher performance and advanced data management features when handling large datasets. By integrating with Hive, they offer a smooth upgrade path, enabling you to transition from traditional data warehouse architectures to more modern and efficient data platform solutions.
The Hive Connector and datalake_fdw
support loading tables in Iceberg and Hudi formats into SynxDB.
Load Iceberg tables
Create a table in Iceberg format on Hive (using Hive v2.3.2 as an example).
CREATE DATABASE icebergdb;
USE icebergdb;

CREATE TABLE iceberg_table1 (
    id int,
    name string,
    age int,
    address string
) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
Create the corresponding foreign table in SynxDB and import the data.
CREATE FOREIGN TABLE iceberg_table1 (
    id int,
    name text,
    age int,
    address text
)
server sync_server
OPTIONS (filePath 'icebergdb.iceberg_table1', catalog_type 'hive', server_name 'hive-cluster-1', hdfs_cluster_name 'hdfs-cluster-1', table_identifier 'icebergdb.iceberg_table1', format 'iceberg');
The table creation parameters are as follows:
- catalog_type: Specifies either hive or hadoop.
- filePath:
  - If the catalog_type value is hive, specify <database_name>.<table_name>.
  - If the catalog_type value is hadoop, specify the path of the table in HDFS, such as /user/hadoop/hudidata/.
- table_identifier: Specifies <database_name>.<table_name>.
- format: Specifies iceberg.
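Once the foreign table is created, the Iceberg data can be read with an ordinary query. This is a minimal example against the foreign table defined above:
-- Reads the Iceberg table through the foreign table.
SELECT id, name, age, address FROM iceberg_table1 LIMIT 10;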
Load Hudi tables
Create a table in Hudi format on Hive (using Spark v2.4.4 as an example).
CREATE DATABASE hudidb;
USE hudidb;

------ hudi_table1 ------
CREATE TABLE hudi_table1 (
    id int,
    name string,
    age int,
    address string
) using hudi;
Create the corresponding foreign table in SynxDB and import the data.
CREATE FOREIGN TABLE hudi_table1 (
    id int,
    name text,
    age int,
    address text
)
server sync_server
OPTIONS (filePath 'hudidb.hudi_table1', catalog_type 'hive', server_name 'hive-cluster-1', hdfs_cluster_name 'hdfs-cluster-1', table_identifier 'hudidb.hudi_table1', format 'hudi');
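The table creation parameters are the same as for Iceberg, except that format is set to hudi. The Hudi data can then be read with an ordinary query against the foreign table (a minimal example based on the table above):
-- Reads the Hudi table through the foreign table.
SELECT id, name, age, address FROM hudi_table1 LIMIT 10;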
Data type mapping
The following table shows the one-to-one mapping between table data types on a Hive cluster and table data types in SynxDB.
| Hive | SynxDB |
|---|---|
| binary | bytea |
| tinyint | smallint |
| smallint | smallint |
| int | int |
| bigint | bigint |
| float | float4 |
| double | double precision |
| string | text |
| timestamp | timestamp |
| date | date |
| char | char |
| varchar | varchar |
| decimal | decimal |
Known issues
When the SynxDB coordinator and standby nodes are on the same machine, there might be port conflicts because of shared configurations. This might cause the dlagent
process to continuously restart, leading to high CPU usage.
Solution
1. Create a config folder in the standby node's working directory (/home/gpadmin/workspace/cbdb_dev/gpAux/gpdemo/datadirs/standby/).
2. In the config directory, create a configuration file application.properties. Modify the port using server.port, change the log file name with logging.file.name, and update the log file path with logging.file.path.
3. The application.properties file is as follows:

# Expose health, info, shutdown, metrics, and prometheus endpoints by default
# 1. health: returns the status of the application {"status":"UP"}
# 2. info: returns information about the build {"build":{"version":"X.X.X","artifact":"dlagent","name":"dlagent","time":"timestamp"}}
# 3. shutdown: allows shutting down the application
# 4. metrics: shows 'metrics' information for the application
# 5. prometheus: exposes metrics in a format that can be scraped by a Prometheus server
management.endpoints.web.exposure.include=health,info,shutdown,metrics,prometheus
management.endpoint.shutdown.enabled=true
management.endpoint.health.probes.enabled=true

# common tags applied to all metrics
management.metrics.tags.application=dlagent

# dlagent-specific metrics
dlagent.metrics.partition.enabled=true
dlagent.metrics.report-frequency=1000

spring.profiles.active=default
server.port=5888

# Whitelabel error options
server.error.include-message=always
server.error.include-stacktrace=on_param
server.error.include-exception=false
server.server-header=DlAgent Server
server.max-http-header-size=1048576

# tomcat specific
server.tomcat.threads.max=200
server.tomcat.accept-count=100
server.tomcat.connection-timeout=5m
server.tomcat.mbeanregistry.enabled=true
dlagent.tomcat.max-header-count=30000
dlagent.tomcat.disable-upload-timeout=false
dlagent.tomcat.connection-upload-timeout=5m

# timeout (ms) for the request - 1 day
spring.mvc.async.request-timeout=86400000

dlagent.task.thread-name-prefix=dlagent-response-
dlagent.task.pool.allow-core-thread-timeout=false
dlagent.task.pool.core-size=8
dlagent.task.pool.max-size=200
dlagent.task.pool.queue-capacity=0

# logging
dlagent.log.level=info
logging.config=classpath:log4j2-dlagent.xml
logging.file.name=${MASTER_DATA_DIRECTORY:/home/gpadmin/workspace/cbdb_dev/gpAux/gpdemo/datadirs/standby/demoDataDir-1}/pg_log/dlagent.log
logging.file.path=${MASTER_DATA_DIRECTORY:/home/gpadmin/workspace/cbdb_dev/gpAux/gpdemo/datadirs/standby/demoDataDir-1}/pg_log