跳转至

Nebula Spark ConnectorGraph

Nebula Spark Connector是一个Spark连接器,提供通过Spark标准形式读写NebulaGraph数据的能力。Nebula Spark Connector由Reader和Writer两部分组成。

  • Reader

    提供一个Spark SQL接口,用户可以使用该接口编程读取NebulaGraph图数据,单次读取一个点或Edge type的数据,并将读取的结果组装成Spark的DataFrame。

  • Writer

    提供一个Spark SQL接口,用户可以使用该接口编程将DataFrame格式的数据逐条或批量写入NebulaGraph。

更多使用说明请参见Graph。

适用场景Graph

Nebula Spark Connector适用于以下场景:

  • 在不同的NebulaGraph集群之间迁移数据。
  • 在同一个NebulaGraph集群内不同图空间之间迁移数据。
  • NebulaGraph与其他数据源之间迁移数据。
  • 结合Graph进行图计算。

特性Graph

Nebula Spark Connector 2.6.0版本特性如下:

  • 提供多种连接配置项,如超时时间、连接重试次数、执行重试次数等。
  • 提供多种数据配置项,如写入数据时设置对应列为点ID、起始点ID、目的点ID或属性。
  • Reader支持无属性读取和全属性读取。
  • Reader支持将NebulaGraph数据读取成Graphx的VertexRDD和EdgeRDD,支持非Long型点ID。
  • 统一了SparkSQL的扩展数据源,统一采用DataSourceV2进行NebulaGraph数据扩展。
  • 支持insertupdatedelete三种写入模式。insert模式会插入(覆盖)数据,update模式仅会更新已存在的数据,delete模式只删除数据。

获取Nebula Spark ConnectorGraph

编译打包Graph

Note

安装 Spark 2.4.x 版本。

  1. 克隆仓库nebula-spark-connector

    $ git clone -b v2.6 https://github.com/vesoft-inc/nebula-spark-connector.git
    
  2. 进入目录nebula-spark-connector

    $ cd nebula-spark-connector/nebula-spark-connector
    
  3. 编译打包。

    $ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true
    

编译完成后,在目录nebula-spark-connector/nebula-spark-connector/target/下生成类似文件nebula-spark-connector-2.6.0-SHANPSHOT.jar

Maven远程仓库下载Graph

Graph

使用方法Graph

使用Nebula Spark Connector读写NebulaGraph数据库时,只需要编写以下代码即可实现。

# 从Nebula Graph读取点边数据
spark.read.nebula().loadVerticesToDF()
spark.read.nebula().loadEdgesToDF()

# 将dataframe数据作为点和边写入Nebula Graph中
dataframe.write.nebula().writeVertices()
dataframe.write.nebula().writeEdges()

nebula()接收两个配置参数,包括连接配置和读写配置。

从NebulaGraph读取数据Graph

val config = NebulaConnectionConfig
  .builder()
  .withMetaAddress("127.0.0.1:9559")
  .withConenctionRetry(2)
  .withExecuteRetry(2)
  .withTimeout(6000)
  .build()

val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("test")
  .withLabel("person")
  .withNoColumn(false)
  .withReturnCols(List("birthday"))
  .withLimit(10)
  .withPartitionNum(10)
  .build()
val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()

val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("test")
  .withLabel("knows")
  .withNoColumn(false)
  .withReturnCols(List("degree"))
  .withLimit(10)
  .withPartitionNum(10)
  .build()
val edge = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
  • NebulaConnectionConfig是连接NebulaGraph的配置,说明如下。

    参数 是否必须 说明
    withMetaAddress 所有Meta服务的地址,多个地址用英文逗号(,)隔开,格式为ip1:port1,ip2:port2,...。读取数据不需要配置withGraphAddress
    withConnectionRetry Nebula Java Client连接NebulaGraph的重试次数。默认值为1
    withExecuteRetry Nebula Java Client执行查询语句的重试次数。默认值为1
    withTimeout Nebula Java Client请求响应的超时时间。默认值为6000,单位:毫秒(ms)。
  • ReadNebulaConfig是读取NebulaGraph数据的配置,说明如下。

    参数 是否必须 说明
    withSpace NebulaGraph图空间名称。
    withLabel NebulaGraph图空间内的Tag或Edge type名称。
    withNoColumn 是否不读取属性。默认值为false,表示读取属性。取值为true时,表示不读取属性,此时withReturnCols配置无效。
    withReturnCols 配置要读取的点或边的属性集。格式为List(property1,property2,...),默认值为List(),表示读取全部属性。
    withLimit 配置Nebula Java Storage Client一次从服务端读取的数据行数。默认值为1000。
    withPartitionNum 配置读取NebulaGraph数据时Spark的分区数。默认值为100。该值的配置最好不超过图空间的的分片数量(partition_num)。

向NebulaGraph写入数据Graph

val config = NebulaConnectionConfig
  .builder()
  .withMetaAddress("127.0.0.1:9559")
  .withGraphAddress("127.0.0.1:9669")
  .withConenctionRetry(2)
  .build()

val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig      
  .builder()
  .withSpace("test")
  .withTag("person")
  .withVidField("id")
  .withVidPolicy("hash")
  .withVidAsProp(true)
  .withUser("root")
  .withPasswd("nebula")
  .withBatch(512)
  .build()    
df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()

val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig      
  .builder()
  .withSpace("test")
  .withEdge("friend")
  .withSrcIdField("src")
  .withSrcPolicy(null)
  .withDstIdField("dst")
  .withDstPolicy(null)
  .withRankField("degree")
  .withSrcAsProperty(true)
  .withDstAsProperty(true)
  .withRankAsProperty(true)
  .withUser("root")
  .withPasswd("nebula")
  .withBatch(512)
  .build()
df.write.nebula(config, nebulaWriteEdgeConfig).writeEdges()

默认写入模式为insert,可以通过withWriteMode配置修改为update

val config = NebulaConnectionConfig
  .builder()
  .withMetaAddress("127.0.0.1:9559")
  .withGraphAddress("127.0.0.1:9669")
  .build()
val nebulaWriteVertexConfig = WriteNebulaVertexConfig
  .builder()
  .withSpace("test")
  .withTag("person")
  .withVidField("id")
  .withVidAsProp(true)
  .withBatch(512)
  .withWriteMode(WriteMode.UPDATE)
  .build()
df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
  • NebulaConnectionConfig是连接NebulaGraph的配置,说明如下。

    参数 是否必须 说明
    withMetaAddress 所有Meta服务的地址,多个地址用英文逗号(,)隔开,格式为ip1:port1,ip2:port2,...
    withGraphAddress Graph服务的地址,多个地址用英文逗号(,)隔开,格式为ip1:port1,ip2:port2,...
    withConnectionRetry Nebula Java Client连接NebulaGraph的重试次数。默认值为1
  • WriteNebulaVertexConfig是写入点的配置,说明如下。

    参数 是否必须 说明
    withSpace NebulaGraph图空间名称。
    withTag 写入点时需要关联的Tag名称。
    withVidField DataFrame中作为点ID的列。
    withVidPolicy 写入点ID时,采用的映射函数,NebulaGraph 2.x仅支持HASH。默认不做映射。
    withVidAsProp DataFrame中作为点ID的列是否也作为属性写入。默认值为false。如果配置为true,请确保Tag中有和VidField相同的属性名。
    withUser NebulaGraph用户名。若未开启Graph,无需配置用户名和密码。
    withPasswd NebulaGraph用户名对应的密码。
    withBatch 一次写入的数据行数,默认值为1000。当withWriteModeupdate时,该参数的最大值为512
    withWriteMode 写入模式。可选值为insertupdate。默认为insert
  • WriteNebulaEdgeConfig是写入边的配置,说明如下。

    参数 是否必须 说明
    withSpace NebulaGraph图空间名称。
    withEdge 写入边时需要关联的Edge type名称。
    withSrcIdField DataFrame中作为起始点的列。
    withSrcPolicy 写入起始点时,采用的映射函数,NebulaGraph 2.x仅支持HASH。默认不做映射。
    withDstIdField DataFrame中作为目的点的列。
    withDstPolicy 写入目的点时,采用的映射函数,NebulaGraph 2.x仅支持HASH。默认不做映射。
    withRankField DataFrame中作为rank的列。默认不写入rank。
    withSrcAsProperty DataFrame中作为起始点的列是否也作为属性写入。默认值为false。如果配置为true,请确保Edge type中有和SrcIdField相同的属性名。
    withDstAsProperty DataFrame中作为目的点的列是否也作为属性写入。默认值为false。如果配置为true,请确保Edge type中有和DstIdField相同的属性名。
    withRankAsProperty DataFrame中作为rank的列是否也作为属性写入。默认值为false。如果配置为true,请确保Edge type中有和RankField相同的属性名。
    withUser NebulaGraph用户名。若未开启Graph,无需配置用户名和密码。
    withPasswd NebulaGraph用户名对应的密码。
    withBatch 一次写入的数据行数,默认值为1000。当withWriteModeupdate时,该参数的最大值为512
    withWriteMode 写入模式。可选值为insertupdate。默认为insert

最后更新: December 14, 2021
Back to top