跳转至

导入 HIVE 数据Graph

本文以一个示例说明如何使用 Exchange 将存储在 HIVE 的数据导入 NebulaGraph。

数据集Graph

本文以美国 Stanford Network Analysis Platform (SNAP) 提供的 Graph 以及由公开网络上获取的不重复的 97 个课程名称作为示例数据集,包括:

  • 两类点(usercourse),共计 7,144 个点。
  • 一种关系(action),共计 411,749 条边。

详细的数据集,您可以从 Graph 仓库中下载。

在本示例中,该数据集已经存入 HIVE 中名为 mooc 的数据库中,以 userscoursesactions 三个表存储了所有点和边的信息。以下为各个表的结构。

scala> sql("describe mooc.users").show
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|  userid|   bigint|   null|
+--------+---------+-------+

scala> sql("describe mooc.courses").show
+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|  courseid|   bigint|   null|
|coursename|   string|   null|
+----------+---------+-------+

scala> sql("describe mooc.actions").show
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|actionid|   bigint|   null|
|   srcid|   bigint|   null|
|   dstid|   string|   null|
|duration|   double|   null|
|feature0|   double|   null|
|feature1|   double|   null|
|feature2|   double|   null|
|feature3|   double|   null|
|   label|  boolean|   null|
+--------+---------+-------+

说明:Hive 的 bigint 与 NebulaGraph 的 int 对应。

环境配置Graph

本文示例在 MacOS 下完成,以下是相关的环境配置信息:

  • 硬件规格:
    • CPU:1.7 GHz Quad-Core Intel Core i7
    • 内存:16 GB
  • Spark:2.4.7,单机版
  • Hadoop:2.9.2,伪分布式部署
  • HIVE:2.3.7,Hive Metastore 数据库为 MySQL 8.0.22
  • NebulaGraph:V1.2.0,使用 Docker Compose 部署。详细信息,参考 Graph

前提条件Graph

开始导入数据之前,您需要确认以下信息:

  • 已经完成 Exchange 编译。详细信息,参考 Graph。本示例中使用 Exchange v1.2.1。
  • 已经安装 Spark。
  • 已经安装并开启 Hadoop 服务,并已启动 Hive Metastore 数据库(本示例中为 MySQL)。
  • 已经部署并启动 NebulaGraph,并获取:
    • Graph 服务、Meta 服务所在机器的 IP 地址和端口信息。
    • NebulaGraph 数据库的拥有写权限的用户名及其密码。
  • 在 NebulaGraph 中创建图数据模式(Schema)所需的所有信息,包括标签和边类型的名称、属性等。

操作步骤Graph

步骤 1. 在 NebulaGraph 中创建 SchemaGraph

按以下步骤在 NebulaGraph 中创建 Schema:

  1. 确认 Schema 要素:NebulaGraph 中的 Schema 要素如下表所示。

    要素 名称 属性
    标签(Tag) user userId int
    标签(Tag) course courseId int, courseName string
    边类型(Edge Type) action actionId int, duration double, label bool, feature0 double, feature1 double, feature2 double, feature3 double
  2. 在 NebulaGraph 里创建一个图空间 hive,并创建一个 Schema,如下所示。

    -- 创建图空间
    CREATE SPACE hive(partition_num=10, replica_factor=1);
    
    -- 选择图空间 hive
    USE hive;
    
    -- 创建标签 user
    CREATE TAG user(userId int);
    
    -- 创建标签 course
    CREATE TAG course(courseId int, courseName string);
    
    -- 创建边类型 action
    CREATE EDGE action (actionId int, duration double, label bool, feature0 double, feature1 double, feature2 double, feature3 double);
    

关于 NebulaGraph 构图的更多信息,参考《NebulaGraph Database 手册》的 Graph。

步骤 2. 使用 Spark SQL 确认 HIVE SQL 语句Graph

启动 spark-shell 环境后,依次运行以下语句,确认 Spark 能读取 HIVE 中的数据。

scala> sql("select userid from mooc.users").show
scala> sql("select courseid, coursename from mooc.courses").show
scala> sql("select actionid, srcid, dstid, duration, feature0, feature1, feature2, feature3, label from mooc.actions").show

以下为 mooc.actions 表中读出的结果。

+--------+-----+--------------------+--------+------------+------------+-----------+-----------+-----+
|actionid|srcid|               dstid|duration|    feature0|    feature1|   feature2|   feature3|label|
+--------+-----+--------------------+--------+------------+------------+-----------+-----------+-----+
|       0|    0|Environmental Dis...|     0.0|-0.319991479|-0.435701433|0.106783779|-0.06730924|false|
|       1|    0|  History of Ecology|     6.0|-0.319991479|-0.435701433|0.106783779|-0.06730924|false|
|       2|    0|      Women in Islam|    41.0|-0.319991479|-0.435701433|0.106783779|-0.06730924|false|
|       3|    0|  History of Ecology|    49.0|-0.319991479|-0.435701433|0.106783779|-0.06730924|false|
|       4|    0|      Women in Islam|    51.0|-0.319991479|-0.435701433|0.106783779|-0.06730924|false|
|       5|    0|Legacies of the A...|    55.0|-0.319991479|-0.435701433|0.106783779|-0.06730924|false|
|       6|    0|          ITP Core 2|    59.0|-0.319991479|-0.435701433|0.106783779|-0.06730924|false|
|       7|    0|The Research Pape...|    62.0|-0.319991479|-0.435701433|0.106783779|-0.06730924|false|
|       8|    0|        Neurobiology|    65.0|-0.319991479|-0.435701433|0.106783779|-0.06730924|false|
|       9|    0|           Wikipedia|   113.0|-0.319991479|-0.435701433|1.108826104|12.77723482|false|
|      10|    0|Media History and...|   226.0|-0.319991479|-0.435701433|0.607804941|149.4512115|false|
|      11|    0|             WIKISOO|   974.0|-0.319991479|-0.435701433|1.108826104|3.344522776|false|
|      12|    0|Environmental Dis...|  1000.0|-0.319991479|-0.435701433|0.106783779|-0.06730924|false|
|      13|    0|             WIKISOO|  1172.0|-0.319991479|-0.435701433|1.108826104|1.136866766|false|
|      14|    0|      Women in Islam|  1182.0|-0.319991479|-0.435701433|0.106783779|-0.06730924|false|
|      15|    0|  History of Ecology|  1185.0|-0.319991479|-0.435701433|0.106783779|-0.06730924|false|
|      16|    0|Human Development...|  1687.0|-0.319991479|-0.435701433|0.106783779|-0.06730924|false|
|      17|    1|Human Development...|  7262.0|-0.319991479|-0.435701433|0.106783779|-0.06730924|false|
|      18|    1|  History of Ecology|  7266.0|-0.319991479|-0.435701433|0.106783779|-0.06730924|false|
|      19|    1|      Women in Islam|  7273.0|-0.319991479|-0.435701433|0.607804941|0.936170765|false|
+--------+-----+--------------------+--------+------------+------------+-----------+-----------+-----+
only showing top 20 rows

步骤 3. 修改配置文件Graph

完成 Exchange 编译后,进入 nebula-java/tools/exchange 目录,根据 target/classes/application.conf 文件修改 HIVE 数据源相关的配置文件。在本示例中,文件被重命名为 hive_application.conf。以下仅详细说明点和边数据的配置信息,本次示例中未使用的配置项已被注释,但是提供了配置说明。Spark 和 NebulaGraph 相关配置,参考 Graph。

{
  # Spark 相关配置
  spark: {
    app: {
      name: Spark Writer
    }
    driver: {
      cores: 1
      maxResultSize: 1G
    }
    cores {
      max: 16
    }
  }
  # NebulaGraph 相关配置
  nebula: {
    address:{
      # 以下为 NebulaGraph 的 Graph 服务和 Meta 服务所在机器的 IP 地址及端口
      # 如果有多个地址,格式为 "ip1:port","ip2:port","ip3:port"
      # 不同地址之间以英文逗号 (,) 隔开
      graph:["127.0.0.1:3699"]
      meta:["127.0.0.1:45500"]
    }
    # 填写的账号必须拥有 NebulaGraph 相应图空间的写数据权限
    user: user
    pswd: password
    # 填写 NebulaGraph 中需要写入数据的图空间名称
    space: hive
    connection {
      timeout: 3000
      retry: 3
    }
    execution {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors
    }
    rate: {
      limit: 1024
      timeout: 1000
    }
  }
  # 处理标签
  tags: [
    # 设置标签相关信息
    {
      # NebulaGraph 中对应的标签名称。
      name: user
      type: {
        # 指定数据源文件格式,设置为 hive。
        source: hive
        # 指定点数据导入 NebulaGraph 的方式,
        # 可以设置为:client(以客户端形式导入)和 sst(以 SST 文件格式导入)。
        # 关于 SST 文件导入配置,参考文档:导入 SST 文件(https://
        # docs.nebula-graph.com.cn/nebula-exchange/
        # use-exchange/ex-ug-import-sst/)。
        sink: client
      }

      # 设置读取数据库 mooc 中 users 表数据的 SQL 语句
      exec: "select userid from mooc.users"

      # 在 fields 里指定 users 表中的列名称,其对应的 value
      # 会作为 NebulaGraph 中指定属性 userId (nebula.fields) 的数据源
      # fields 和 nebula.fields 里的配置必须一一对应
      # 如果需要指定多个列名称,用英文逗号(,)隔开
      fields: [userid]
      nebula.fields: [userId]

      # 指定表中某一列数据为 NebulaGraph 中点 VID 的来源。
      # vertex.field 的值必须与上述 fields 中的列名保持一致。
      # 如果数据不是 int 类型,则添加 vertex.policy 指定 VID 映射策略,建议设置为 "hash",参考以下 course 标签的设置。
      vertex: userid

      # 单次写入 NebulaGraph 的最大点数据量。
      batch: 256

      # Spark 分区数量
      partition: 32

      # isImplicit 的设置说明参考:https://github.com/vesoft-inc/nebula-java/
      # blob/v1.0/tools/exchange/src/main/resources/application.conf
      isImplicit: true
    }
    {
      name: course
      type: {
        source: hive
        sink: client
      }
      exec: "select courseid, coursename from mooc.courses"
      fields: [courseid, coursename]
      nebula.fields: [courseId, courseName]

      # 指定表中某一列数据为 NebulaGraph 中点 VID 的来源。
      # vertex.field 的值必须与上述 fields 中的列名保持一致。
      # 如果数据不是 int 类型,则添加 vertex.policy 指定 VID 映射策略,建议设置为 "hash"。
      vertex: {
        field: coursename
        policy: "hash"
      }
      batch: 256
      partition: 32
      isImplicit: true
    }

  ]

  # 处理边数据
  edges: [
    # 设置边类型 action 相关信息
    {
      # NebulaGraph 中对应的边类型名称。
      name: action

      type: {
        # 指定数据源文件格式,设置为 hive。
        source: hive

        # 指定边数据导入 NebulaGraph 的方式,
        # 可以设置为:client(以客户端形式导入)和 sst(以 SST 文件格式导入)。
        # 关于 SST 文件导入配置,参考文档:导入 SST 文件(https://
        # docs.nebula-graph.com.cn/nebula-exchange/
        # use-exchange/ex-ug-import-sst/)。
        sink: client
      }

      # 设置读取数据库 mooc 中 actions 表数据的 SQL 语句
      exec: "select actionid, srcid, dstid, duration, feature0, feature1, feature2, feature3, label from mooc.actions"

      # 在 fields 里指定 actions 表中的列名称,其对应的 value
      # 会作为 NebulaGraph 中 action 的属性(nebula.fields)值来源
      # fields 和 nebula.fields 里的配置必须一一对应
      # 如果需要指定多个列名称,用英文逗号(,)隔开
      fields: [actionid, duration, feature0, feature1, feature2, feature3, label]
      nebula.fields: [actionId, duration, feature0, feature1, feature2, feature3, label]

      # 在 source 里,将 actions 表中某一列作为边起点数据源
      # 在 target 里,将 actions 表中某一列作为边终点数据源
      # 如果数据源是 int 或 long 类型,直接指定列名
      # 如果数据源不是 int 类型,则添加 vertex.policy 指定 VID 映射策略,建议设置为 "hash"
      source: srcid
      target: {
        field: dstid
        policy: "hash"
      }

      # 单次写入 NebulaGraph 的最大点数据量。
      batch: 256

      # Spark 分区数量
      partition: 32
    }
  ]
}

步骤 4. (可选)检查配置文件是否正确Graph

完成配置后,运行以下命令检查配置文件格式是否正确。关于参数的说明,参考 Graph。

$SPARK_HOME/bin/spark-submit --master "local" --class com.vesoft.nebula.tools.importer.Exchange /path/to/exchange-1.2.1.jar -c /path/to/conf/hive_application.conf -h -D

步骤 5. 向 NebulaGraph 导入数据Graph

运行以下命令将 HIVE 中的数据导入到 NebulaGraph 中。关于参数的说明,参考 Graph。

$SPARK_HOME/bin/spark-submit --master "local" --class com.vesoft.nebula.tools.importer.Exchange /path/to/exchange-1.2.1.jar -c /path/to/conf/hive_application.conf -h

步骤 6. (可选)验证数据Graph

您可以在 NebulaGraph 客户端(例如 NebulaGraph Studio)里执行语句,确认数据是否已导入,例如:

GO FROM 1 OVER action;

如果返回边终点(action._dst)即表明数据已导入。

您也可以使用 db_dump 工具统计数据是否已经全部导入。详细的使用信息参考 Graph。

步骤 7. (可选)在 NebulaGraph 中重构索引Graph

导入数据后,您可以在 NebulaGraph 中重新创建并重构索引。详细信息,参考Graph。


最后更新: 2021年4月15日