跳转至

Spark WriterGraph

概述Graph

Spark Writer 是 NebulaGraph 基于 Spark 的分布式数据导入工具,能够将多种数据仓库中的数据转化为图的点和边,并批量导入到图数据库中。目前支持的数据仓库有:

  • HDFS,包括 Parquet、JSON、ORC 和 CSV 格式的文件
  • HIVE

Spark Writer 支持并发导入多个 tag、edge,支持不同 tag/edge 配置不同的数据仓库。

软件要求Graph

注意: 为确保 NebulaGraph Spark Writer 正常使用,请确保你的机器已安装:

  • Spark 2.0 及以上版本
  • Hive 2.3 及以上版本
  • Hadoop 2.0 及以上版本

获取 Spark WriterGraph

编译源码Graph

git clone https://github.com/vesoft-inc/nebula.git
cd nebula/src/tools/spark-sstfile-generator
mvn compile package

或者直接下载

从云存储 OSS 下载Graph

wget https://oss-cdn.nebula-graph.com.cn/jar-packages/sst.generator-1.2.1.jar

使用流程Graph

基本流程分为以下几步:

  1. 在 NebulaGraph 中创建图模型,构图
  2. 编写数据文件
  3. 编写输入源映射文件
  4. 导入数据

构图Graph

构图请参考Graph中的示例构图。

注意:请先在 NebulaGraph 中完成构图(创建图空间和定义图数据 Schema),再通过本工具向 NebulaGraph 中写入数据。

数据示例Graph

点Graph

顶点数据文件由一行一行的数据组成,文件中每一行表示一个点和它的属性。一般来说,第一列为点的 ID ——此列的名称将在后文的映射文件中指定,其他列为点的属性。

  • player 顶点数据
{"id":100,"name":"Tim Duncan","age":42}
{"id":101,"name":"Tony Parker","age":36}
{"id":102,"name":"LaMarcus Aldridge","age":33}

边Graph

边数据文件由一行一行的数据组成,文件中每一行表示一条边和它的属性。一般来说,第一列为起点 ID,第二列为终点 ID,起点 ID 列及终点 ID 列会在映射文件中指定。其他列为边属性。下面以 JSON 格式为例进行说明。

以边 边 follow 的数据为例:

  • 无 rank 的边
{"source":100,"target":101,"likeness":95}
{"source":101,"target":100,"likeness":95}
{"source":101,"target":102,"likeness":90}
  • 有 rank 的边
{"source":100,"target":101,"likeness":95,"ranking":2}
{"source":101,"target":100,"likeness":95,"ranking":1}
{"source":101,"target":102,"likeness":90,"ranking":3}

含有地理位置 Geo 的数据Graph

Spark Writer 支持 Geo 数据导入,Geo 数据用 latitudelongitude 字段描述经纬度,数据类型为 double。

{"latitude":30.2822095,"longitude":120.0298785,"target":0,"dp_poi_name":"0"}
{"latitude":30.2813834,"longitude":120.0208692,"target":1,"dp_poi_name":"1"}
{"latitude":30.2807347,"longitude":120.0181162,"target":2,"dp_poi_name":"2"}
{"latitude":30.2812694,"longitude":120.0164896,"target":3,"dp_poi_name":"3"}

数据源文件Graph

目前 Spark Writer 支持的数据源有:

  • HDFS
  • HIVE
HDFS 文件Graph

支持的文件格式包括:

  • Parquet
  • JSON
  • CSV
  • ORC

Player 的 Parquet 示例如下:

+---+---+------------+
|age| id|        name|
+---+---+------------+
| 42|100| Tim Duncan |
| 36|101| Tony Parker|
+---+---+------------+

JSON 示例如下:

{"id":100,"name":"Tim Duncan","age":42}
{"id":101,"name":"Tony Parker","age":36}

CSV 示例如下:

age,id,name
42,100,Tim Duncan
36,101,Tony Parker
数据库Graph

Spark Writer 支持以数据库作为数据源,目前支持 HIVE。

Player 表结构如下:

col_name data_type comment
id int
name string
age int

编写配置文件Graph

配置文件由 Spark 相关信息,Nebula 相关信息,以及 tags 映射 和 edges 映射块组成。Spark 信息配置了 Spark 运行的相关参数,Nebula 相关信息配置了连接 NebulaGraph 的用户名和密码等信息。 tags 映射和 edges 映射分别对应多个 tag/edge 的输入源映射,描述每个 tag/edge 的数据源等基本信息,不同 tag/edge 可以来自不同数据源。

输入源的映射文件示例:

{
  # Spark 相关信息配置
  # 参见: http://spark.apache.org/docs/latest/configuration.html
  spark: {
    app: {
      name: Spark Writer
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    cores {
      max: 16
    }
  }

  # NebulaGraph 相关信息配置
  nebula: {
    # 查询引擎 IP 列表
    addresses: ["127.0.0.1:3699"]

    # 连接 NebulaGraph 服务的用户名和密码
    user: user
    pswd: password

    # NebulaGraph 图空间名称
    space: test

    # thrift 超时时长及重试次数
    # 如未设置,则默认值分别为 3000 和 3
    connection {
      timeout: 3000
      retry: 3
    }

  # nGQL 查询重试次数
  # 如未设置,则默认值为 3
    execution {
      retry: 3
    }
  }

  # 处理标签
  tags: [

    # 从 HDFS 文件加载数据, 此处数据类型为 Parquet
    # tag 名称为 tag name 0
    #  HDFS Parquet 文件的中的 field_0、field_1、field_2 将写入 tag_name_0
    # 节点列为 vertex_key_field
    {
      name: tag_name_0
      type: parquet
      path: hdfs path
      fields: {
        field_0: nebula_field_0,
        field_1: nebula_field_1,
        field_2: nebula_field_2
      }
      vertex: vertex_key_field
      batch : 16
    }

    # 与上述类似
    # 从 Hive 加载将执行命令 $ {exec} 作为数据集
    {
      name: tag_name_1
      type: hive
      exec: "select hive_field_0, hive_field_1, hive_field_2 from database.table"
      fields: {
        hive_field_0: nebula_field_0,
        hive_field_1: nebula_field_1,
        hive_field_2: nebula_field_2
      }
      vertex: vertex_id_field
    }
  ]

  # 处理边
  edges: [
    # 从 HDFS 加载数据,数据类型为 JSON
    # 边名称为 edge_name_0
    # HDFS JSON 文件中的 field_0、field_1、field 2 将被写入 edge_name_0
    # 起始字段为 source_field,终止字段为 target_field ,边权重字段为 ranking_field。
    {
      name: edge_name_0
      type: json
      path: hdfs_path
      fields: {
        field_0: nebula_field_0,
        field_1: nebula_field_1,
        field_2: nebula_field_2
      }
      source:  source_field
      target:  target_field
      ranking: ranking_field
    }

    # 从 Hive 加载将执行命令 $ {exec} 作为数据集
    # 边权重为可选
    {
      name: edge_name_1
      type: hive
      exec: "select hive_field_0, hive_field_1, hive_field_2 from database.table"
      fields: {
        hive_field_0: nebula_field_0,
        hive_field_1: nebula_field_1,
        hive_field_2: nebula_field_2
      }
      source:  source_id_field
      target:  target_id_field
    }
  ]
}

Spark 配置信息Graph

下表给出了一些示例,所有可配置项请见 Graph。

字段 默认值 是否必须 说明
spark.app.name Spark Writer app 名称
spark.driver.cores 1 驱动程序进程的核数,仅适用于群集模式
spark.driver.maxResultSize 1G 每个 Spark 操作(例如收集)中所有分区的序列化结果的上限(以字节为单位)。至少应为 1M,否则应为 0(无限制)
spark.cores.max (not set) 当以“粗粒度”共享模式在独立部署群集或 Mesos 群集上运行时,跨群集(而非从每台计算机)请求应用程序的最大 CPU 核数。如果未设置,则默认值为 Spark 的独立集群管理器上的 spark.deploy.defaultCores 或 Mesos 上的 infinite(所有可用的内核)

NebulaGraph 配置信息Graph

字段 默认值 是否必须 说明
nebula.addresses 查询引擎的地址列表,逗号分隔
nebula.user 数据库用户名,默认为 user
nebula.pswd 数据库用户名对应密码,默认 user 密码为 password
nebula.space 导入数据对应的 space,本例中为 test
nebula.connection.timeout 3000 Thrift 连接超时时间
nebula.connection.retry 3 Thrift 连接重试次数
nebula.execution.retry 3 nGQL 语句执行重试次数

tags 和 edges 映射信息Graph

tag 和 edge 映射的选项比较类似。下面先介绍相同的选项,再分别介绍 tag 映射edge 映射的特有选项。

  • 相同的选项
    • type 指定上文中提到的数据类型,目前支持 “Parquet”、"JSON"、"ORC" 和 “CSV”,大小写不敏感,必填
    • path 适用于 HDFS 数据源,指定HDFS 文件或目录的绝对路径,type 为 HDFS 时,必填
    • exec 适用于 Hive 数据源, 当执行查询语句 type 为 HIVE 时,必填
    • fields 将输入源列的列名映射为 tag / edge 的属性名,必填
  • tag 映射的特有选项
    • vertex 指定某一列作为点的 ID 列,必填
  • edge 映射的特有选项
    • source 指定输入源某一列作为源点的 ID 列,必填
    • target 指定某一列作为目标点的 ID 列,必填
    • 当插入边有 ranking 值, ranking 指定某一列作为边 ranking 列,选填

数据源映射Graph

  • HDFS Parquet 文件
    • type 指定输入源类型,当为 parquet 时大小写不敏感,必填
    • path 指定 HDFS 文件或目录的路径,必须是 HDFS 的绝对路径,必填
  • HDFS JSON 文件
    • type 指定输入源类型,当为 JSON 时大小写不敏感,必填
    • path 指定 HDFS 文件或目录的路径,必须是 HDFS 的绝对路径,必填
  • HIVE ORC 文件
    • type 指定输入源类型,当为 ORC时大小写不敏感,必填
    • path 指定 HDFS 文件或目录的路径,必须是 HDFS 的绝对路径,必填
  • HIVE CSV 文件
    • type 指定输入源类型,当为 CSV 时大小写不敏感,必填
    • path 指定 HDFS 文件或目录的路径,必须是 HDFS 的绝对路径,必填
  • HIVE
    • type 指定输入源类型,当为 HIVE 时大小写不敏感,必填
    • exec 指定 HIVE 执行查询的语句,必填

执行命令导入数据Graph

导入数据命令:

bin/spark-submit \
 --class com.vesoft.nebula.tools.generator.v2.SparkClientGenerator \
 --master ${MASTER-URL} \
 ${SPARK_WRITER_JAR_PACKAGE} -c conf/test.conf -h -d

参数说明:

Abbreviation Required Default Description 示例
--class yes 指定程序主类
--master yes 指定spark cluster master url,请参见 Graph e.g. spark://23.195.26.187:7077
-c / --config  yes 上文所编写的配置文件路径
-h / --hive no false 用于指定是否支持 Hive
-d / --directly no false true 为客户端方式插入;
false 为 sst 方式导入 (TODO)
-D / --dry no false 检查配置文件是否正确

性能测试结果Graph

三台物理机 (56 核,250G 内存,万兆网,SSD),写 1 亿条数据(每条数据三个字段,每个 batch 64 条记录),用时 4 分钟(40万条/秒)。


最后更新: 2021年4月15日