跳转至

NebulaGraph Spark Connector

NebulaGraph Spark Connector 是一个 Spark 连接器,提供通过 Spark 标准形式读写 NebulaGraph 数据的能力。NebulaGraph Spark Connector 由 Reader 和 Writer 两部分组成。

  • Reader

    提供一个 Spark SQL 接口,用户可以使用该接口编程读取 NebulaGraph 图数据,单次读取一个点或 Edge type 的数据,并将读取的结果组装成 Spark 的 DataFrame。

  • Writer

    提供一个 Spark SQL 接口,用户可以使用该接口编程将 DataFrame 格式的数据逐条或批量写入 NebulaGraph。

更多使用说明请参见 NebulaGraph Spark Connector

版本兼容性

NebulaGraph Spark Connector、NebulaGraph 内核版本和 Spark 版本对应关系如下。

Spark Connector 版本 NebulaGraph 版本 Spark 版本
nebula-spark-connector_3.0-3.0-SNAPSHOT.jar nightly 3.x
nebula-spark-connector_2.2-3.0-SNAPSHOT.jar nightly 2.2.x
nebula-spark-connector-3.0-SNAPSHOT.jar nightly 2.4.x
nebula-spark-connector_2.2-3.4.0.jar 3.x 2.2.x
nebula-spark-connector-3.4.0.jar 3.x 2.4.x
nebula-spark-connector_2.2-3.3.0.jar 3.x 2.2.x
nebula-spark-connector-3.3.0.jar 3.x 2.4.x
nebula-spark-connector-3.0.0.jar 3.x 2.4.x
nebula-spark-connector-2.6.1.jar 2.6.0, 2.6.1 2.4.x
nebula-spark-connector-2.6.0.jar 2.6.0, 2.6.1 2.4.x
nebula-spark-connector-2.5.1.jar 2.5.0, 2.5.1 2.4.x
nebula-spark-connector-2.5.0.jar 2.5.0, 2.5.1 2.4.x
nebula-spark-connector-2.1.0.jar 2.0.0, 2.0.1 2.4.x
nebula-spark-connector-2.0.1.jar 2.0.0, 2.0.1 2.4.x
nebula-spark-connector-2.0.0.jar 2.0.0, 2.0.1 2.4.x

适用场景

NebulaGraph Spark Connector 适用于以下场景:

  • 在不同的 NebulaGraph 集群之间迁移数据。
  • 在同一个 NebulaGraph 集群内不同图空间之间迁移数据。
  • NebulaGraph 与其他数据源之间迁移数据。

特性

NebulaGraph Spark Connector 3.4.0版本特性如下:

  • 提供多种连接配置项,如超时时间、连接重试次数、执行重试次数等。
  • 提供多种数据配置项,如写入数据时设置对应列为点 ID、起始点 ID、目的点 ID 或属性。
  • Reader 支持无属性读取和全属性读取。
  • Reader 支持将 NebulaGraph 数据读取成 Graphx 的 VertexRDD 和 EdgeRDD,支持非 Long 型点 ID。
  • 统一了 SparkSQL 的扩展数据源,统一采用 DataSourceV2 进行 NebulaGraph 数据扩展。
  • 支持insertupdatedelete三种写入模式。insert模式会插入(覆盖)数据,update模式仅会更新已存在的数据,delete模式只删除数据。
  • 支持与 NebulaGraph 之间的 SSL 加密连接。

更新说明

Release notes

获取 NebulaGraph Spark Connector

编译打包

  1. 克隆仓库nebula-spark-connector

    $ git clone -b release-3.4 https://github.com/vesoft-inc/nebula-spark-connector.git
    
  2. 进入目录nebula-spark-connector

  3. 编译打包。不同版本的 Spark 命令略有不同。

    Note

    需已安装对应版本 Spark。

    • Spark 2.4

      $ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true -pl nebula-spark-connector -am -Pscala-2.11 -Pspark-2.4
      
    • Spark 2.2

      $ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true -pl nebula-spark-connector_2.2 -am -Pscala-2.11 -Pspark-2.2
      
    • Spark 3.x

      $ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true -pl nebula-spark-connector_3.0 -am -Pscala-2.12 -Pspark-3.0
      

编译完成后,在目录的文件夹target下生成类似文件nebula-spark-connector-3.4.0-SHANPSHOT.jar

Maven 远程仓库下载

下载地址

使用方法

使用 NebulaGraph Spark Connector 读写 NebulaGraph 数据库时,只需要编写以下代码即可实现。

#  NebulaGraph 读取点边数据
spark.read.nebula().loadVerticesToDF()
spark.read.nebula().loadEdgesToDF()

#  dataframe 数据作为点和边写入 NebulaGraph 
dataframe.write.nebula().writeVertices()
dataframe.write.nebula().writeEdges()

nebula()接收两个配置参数,包括连接配置和读写配置。

从 NebulaGraph 读取数据

val config = NebulaConnectionConfig
  .builder()
  .withMetaAddress("127.0.0.1:9559")
  .withConenctionRetry(2)
  .withExecuteRetry(2)
  .withTimeout(6000)
  .build()

val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("test")
  .withLabel("person")
  .withNoColumn(false)
  .withReturnCols(List("birthday"))
  .withLimit(10)
  .withPartitionNum(10)
  .build()
val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()

val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("test")
  .withLabel("knows")
  .withNoColumn(false)
  .withReturnCols(List("degree"))
  .withLimit(10)
  .withPartitionNum(10)
  .build()
val edge = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
  • NebulaConnectionConfig是连接 NebulaGraph 的配置,说明如下。

    参数 是否必须 说明
    withMetaAddress 所有 Meta 服务的地址,多个地址用英文逗号(,)隔开,格式为ip1:port1,ip2:port2,...。读取数据不需要配置withGraphAddress
    withConnectionRetry NebulaGraph Java Client 连接 NebulaGraph 的重试次数。默认值为1
    withExecuteRetry NebulaGraph Java Client 执行查询语句的重试次数。默认值为1
    withTimeout NebulaGraph Java Client 请求响应的超时时间。默认值为6000,单位:毫秒(ms)。
  • ReadNebulaConfig是读取 NebulaGraph 数据的配置,说明如下。

    参数 是否必须 说明
    withSpace NebulaGraph 图空间名称。
    withLabel NebulaGraph 图空间内的 Tag 或 Edge type 名称。
    withNoColumn 是否不读取属性。默认值为false,表示读取属性。取值为true时,表示不读取属性,此时withReturnCols配置无效。
    withReturnCols 配置要读取的点或边的属性集。格式为List(property1,property2,...),默认值为List(),表示读取全部属性。
    withLimit 配置 NebulaGraph Java Storage Client 一次从服务端读取的数据行数。默认值为 1000。
    withPartitionNum 配置读取 NebulaGraph 数据时 Spark 的分区数。默认值为 100。该值的配置最好不超过图空间的的分片数量(partition_num)。

向 NebulaGraph 写入数据

Note

DataFrame 中的列会自动作为属性写入 NebulaGraph。

val config = NebulaConnectionConfig
  .builder()
  .withMetaAddress("127.0.0.1:9559")
  .withGraphAddress("127.0.0.1:9669")
  .withConenctionRetry(2)
  .build()

val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig      
  .builder()
  .withSpace("test")
  .withTag("person")
  .withVidField("id")
  .withVidPolicy("hash")
  .withVidAsProp(true)
  .withUser("root")
  .withPasswd("nebula")
  .withBatch(512)
  .build()    
df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()

val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig      
  .builder()
  .withSpace("test")
  .withEdge("friend")
  .withSrcIdField("src")
  .withSrcPolicy(null)
  .withDstIdField("dst")
  .withDstPolicy(null)
  .withRankField("degree")
  .withSrcAsProperty(true)
  .withDstAsProperty(true)
  .withRankAsProperty(true)
  .withUser("root")
  .withPasswd("nebula")
  .withBatch(512)
  .build()
df.write.nebula(config, nebulaWriteEdgeConfig).writeEdges()

默认写入模式为insert,可以通过withWriteMode配置修改为updatedelete

val config = NebulaConnectionConfig
  .builder()
  .withMetaAddress("127.0.0.1:9559")
  .withGraphAddress("127.0.0.1:9669")
  .build()
val nebulaWriteVertexConfig = WriteNebulaVertexConfig
  .builder()
  .withSpace("test")
  .withTag("person")
  .withVidField("id")
  .withVidAsProp(true)
  .withBatch(512)
  .withWriteMode(WriteMode.UPDATE)
  .build()
df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
  • NebulaConnectionConfig是连接 NebulaGraph 的配置,说明如下。

    参数 是否必须 说明
    withMetaAddress 所有 Meta 服务的地址,多个地址用英文逗号(,)隔开,格式为ip1:port1,ip2:port2,...
    withGraphAddress Graph 服务的地址,多个地址用英文逗号(,)隔开,格式为ip1:port1,ip2:port2,...
    withConnectionRetry NebulaGraph Java Client 连接 NebulaGraph 的重试次数。默认值为1
  • WriteNebulaVertexConfig是写入点的配置,说明如下。

    参数 是否必须 说明
    withSpace NebulaGraph 图空间名称。
    withTag 写入点时需要关联的 Tag 名称。
    withVidField DataFrame 中作为点 ID 的列。
    withVidPolicy 写入点 ID 时,采用的映射函数,NebulaGraph 仅支持 HASH。默认不做映射。
    withVidAsProp DataFrame 中作为点 ID 的列是否也作为属性写入。默认值为false。如果配置为true,请确保 Tag 中有和VidField相同的属性名。
    withUser NebulaGraph 用户名。若未开启身份验证,无需配置用户名和密码。
    withPasswd NebulaGraph 用户名对应的密码。
    withBatch 一次写入的数据行数,默认值为512。当withWriteModeupdate时,该参数的最大值为512
    withWriteMode 写入模式。可选值为insertupdatedelete。默认为insert
    withDeleteEdge 删除点时是否删除该点关联的边。默认为false。当withWriteModedelete时生效。
  • WriteNebulaEdgeConfig是写入边的配置,说明如下。

    参数 是否必须 说明
    withSpace NebulaGraph 图空间名称。
    withEdge 写入边时需要关联的 Edge type 名称。
    withSrcIdField DataFrame 中作为起始点的列。
    withSrcPolicy 写入起始点时,采用的映射函数,NebulaGraph 仅支持 HASH。默认不做映射。
    withDstIdField DataFrame 中作为目的点的列。
    withDstPolicy 写入目的点时,采用的映射函数,NebulaGraph 仅支持 HASH。默认不做映射。
    withRankField DataFrame 中作为 rank 的列。默认不写入 rank。
    withSrcAsProperty DataFrame 中作为起始点的列是否也作为属性写入。默认值为false。如果配置为true,请确保 Edge type 中有和SrcIdField相同的属性名。
    withDstAsProperty DataFrame 中作为目的点的列是否也作为属性写入。默认值为false。如果配置为true,请确保 Edge type 中有和DstIdField相同的属性名。
    withRankAsProperty DataFrame 中作为 rank 的列是否也作为属性写入。默认值为false。如果配置为true,请确保 Edge type 中有和RankField相同的属性名。
    withUser NebulaGraph 用户名。若未开启身份验证,无需配置用户名和密码。
    withPasswd NebulaGraph 用户名对应的密码。
    withBatch 一次写入的数据行数,默认值为512。当withWriteModeupdate时,该参数的最大值为512
    withWriteMode 写入模式。可选值为insertupdatedelete。默认为insert

示例代码

详细的使用方式参见 示例代码


最后更新: August 7, 2023