Nebula Spark ConnectorGraph
Nebula Spark Connector是一个Spark连接器,提供通过Spark标准形式读写NebulaGraph数据的能力。Nebula Spark Connector由Reader和Writer两部分组成。
-
Reader
提供一个Spark SQL接口,用户可以使用该接口编程读取NebulaGraph图数据,单次读取一个点或Edge type的数据,并将读取的结果组装成Spark的DataFrame。
-
Writer
提供一个Spark SQL接口,用户可以使用该接口编程将DataFrame格式的数据逐条或批量写入NebulaGraph。
更多使用说明请参见Graph。
适用场景Graph
Nebula Spark Connector适用于以下场景:
- 在不同的NebulaGraph集群之间迁移数据。
- 在同一个NebulaGraph集群内不同图空间之间迁移数据。
- NebulaGraph与其他数据源之间迁移数据。
- 结合Graph进行图计算。
特性Graph
Nebula Spark Connector 2.1.0版本特性如下:
- 提供多种连接配置项,如超时时间、连接重试次数、执行重试次数等。
- 提供多种数据配置项,如写入数据时设置对应列为点ID、起始点ID、目的点ID或属性。
- Reader支持无属性读取和全属性读取。
- Reader支持将NebulaGraph数据读取成Graphx的VertexRDD和EdgeRDD,支持非Long型点ID。
- Nebula Spark Connector 2.0统一了SparkSQL的扩展数据源,统一采用DataSourceV2进行NebulaGraph数据扩展。
- 支持
insert和update两种写入模式。insert模式会插入(覆盖)数据,update模式仅会更新已存在的数据。
获取Nebula Spark ConnectorGraph
编译打包Graph
-
克隆仓库
nebula-spark-utils。$ git clone -b 2.1.0 https://github.com/vesoft-inc/nebula-spark-utils.git -
进入目录
nebula-spark-connector。$ cd nebula-spark-utils/nebula-spark-connector -
编译打包。
$ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true
编译完成后,在目录nebula-spark-connector/target下生成类似文件nebula-spark-connector-2.1.0-SHANPSHOT.jar。
Maven远程仓库下载Graph
Graph
使用方法Graph
使用Nebula Spark Connector读写NebulaGraph数据库时,只需要编写以下代码即可实现。
# 从Nebula Graph读取点边数据。
spark.read.nebula().loadVerticesToDF()
spark.read.nebula().loadEdgesToDF()
# 将dataframe数据作为点和边写入Nebula Graph中。
dataframe.write.nebula().writeVertices()
dataframe.write.nebula().writeEdges()
nebula()接收两个配置参数,包括连接配置和读写配置。
从NebulaGraph读取数据Graph
val config = NebulaConnectionConfig
.builder()
.withMetaAddress("127.0.0.1:9559")
.withConenctionRetry(2)
.withExecuteRetry(2)
.withTimeout(6000)
.build()
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("test")
.withLabel("person")
.withNoColumn(false)
.withReturnCols(List("birthday"))
.withLimit(10)
.withPartitionNum(10)
.build()
val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("test")
.withLabel("knows")
.withNoColumn(false)
.withReturnCols(List("degree"))
.withLimit(10)
.withPartitionNum(10)
.build()
val edge = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
-
NebulaConnectionConfig是连接NebulaGraph的配置,说明如下。参数 是否必须 说明 withMetaAddress是 所有Meta服务的地址,多个地址用英文逗号(,)隔开,格式为 ip1:port1,ip2:port2,...。读取数据不需要配置withGraphAddress。withConnectionRetry否 Nebula Java Client连接NebulaGraph的重试次数。默认值为 1。withExecuteRetry否 Nebula Java Client执行查询语句的重试次数。默认值为 1。withTimeout否 Nebula Java Client请求响应的超时时间。默认值为 6000,单位:毫秒(ms)。
-
ReadNebulaConfig是读取NebulaGraph数据的配置,说明如下。参数 是否必须 说明 withSpace是 NebulaGraph图空间名称。 withLabel是 NebulaGraph图空间内的Tag或Edge type名称。 withNoColumn否 是否不读取属性。默认值为 false,表示读取属性。取值为true时,表示不读取属性,此时withReturnCols配置无效。withReturnCols否 配置要读取的点或边的属性集。格式为 List(property1,property2,...),默认值为List(),表示读取全部属性。withLimit否 配置Nebula Java Storage Client一次从服务端读取的数据行数。默认值为1000。 withPartitionNum否 配置读取NebulaGraph数据时Spark的分区数。默认值为100。该值的配置最好不超过图空间的的分片数量(partition_num)。
向NebulaGraph写入数据Graph
val config = NebulaConnectionConfig
.builder()
.withMetaAddress("127.0.0.1:9559")
.withGraphAddress("127.0.0.1:9669")
.withConenctionRetry(2)
.build()
val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
.builder()
.withSpace("test")
.withTag("person")
.withVidField("id")
.withVidPolicy("hash")
.withVidAsProp(true)
.withUser("root")
.withPasswd("nebula")
.withBatch(1000)
.build()
df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig
.builder()
.withSpace("test")
.withEdge("friend")
.withSrcIdField("src")
.withSrcPolicy(null)
.withDstIdField("dst")
.withDstPolicy(null)
.withRankField("degree")
.withSrcAsProperty(true)
.withDstAsProperty(true)
.withRankAsProperty(true)
.withUser("root")
.withPasswd("nebula")
.withBatch(1000)
.build()
df.write.nebula(config, nebulaWriteEdgeConfig).writeEdges()
默认写入模式为insert,可以通过withWriteMode配置修改为update:
val config = NebulaConnectionConfig
.builder()
.withMetaAddress("127.0.0.1:9559")
.withGraphAddress("127.0.0.1:9669")
.build()
val nebulaWriteVertexConfig = WriteNebulaVertexConfig
.builder()
.withSpace("test")
.withTag("person")
.withVidField("id")
.withVidAsProp(true)
.withBatch(1000)
.withWriteMode(WriteMode.UPDATE)
.build()
df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
-
NebulaConnectionConfig是连接NebulaGraph的配置,说明如下。参数 是否必须 说明 withMetaAddress是 所有Meta服务的地址,多个地址用英文逗号(,)隔开,格式为 ip1:port1,ip2:port2,...。withGraphAddress是 Graph服务的地址,多个地址用英文逗号(,)隔开,格式为 ip1:port1,ip2:port2,...。withConnectionRetry否 Nebula Java Client连接NebulaGraph的重试次数。默认值为 1。
-
WriteNebulaVertexConfig是写入点的配置,说明如下。参数 是否必须 说明 withSpace是 NebulaGraph图空间名称。 withTag是 写入点时需要关联的Tag名称。 withVidField是 DataFrame中作为点ID的列。 withVidPolicy否 写入点ID时,采用的映射函数,NebulaGraph 2.0仅支持HASH。默认不做映射。 withVidAsProp否 DataFrame中作为点ID的列是否也作为属性写入。默认值为 false。如果配置为true,请确保Tag中有和VidField相同的属性名。withUser否 NebulaGraph用户名。若未开启Graph,无需配置用户名和密码。 withPasswd否 NebulaGraph用户名对应的密码。 withBatch是 一次写入的数据行数。默认值为 1000.withWriteMode否 写入模式。可选值为 insert和update。默认为insert。
-
WriteNebulaEdgeConfig是写入边的配置,说明如下。参数 是否必须 说明 withSpace是 NebulaGraph图空间名称。 withEdge是 写入边时需要关联的Edge type名称。 withSrcIdField是 DataFrame中作为起始点的列。 withSrcPolicy否 写入起始点时,采用的映射函数,NebulaGraph 2.0仅支持HASH。默认不做映射。 withDstIdField是 DataFrame中作为目的点的列。 withDstPolicy否 写入目的点时,采用的映射函数,NebulaGraph 2.0仅支持HASH。默认不做映射。 withRankField否 DataFrame中作为rank的列。默认不写入rank。 withSrcAsProperty否 DataFrame中作为起始点的列是否也作为属性写入。默认值为 false。如果配置为true,请确保Edge type中有和SrcIdField相同的属性名。withDstAsProperty否 DataFrame中作为目的点的列是否也作为属性写入。默认值为 false。如果配置为true,请确保Edge type中有和DstIdField相同的属性名。withRankAsProperty否 DataFrame中作为rank的列是否也作为属性写入。默认值为 false。如果配置为true,请确保Edge type中有和RankField相同的属性名。withUser否 NebulaGraph用户名。若未开启Graph,无需配置用户名和密码。 withPasswd否 NebulaGraph用户名对应的密码。 withBatch是 一次写入的数据行数。默认值为 1000.withWriteMode否 写入模式。可选值为 insert和update。默认为insert。