跳转至

导入 Neo4j 数据

本文以一个示例说明如何使用 Exchange 将存储在 Neo4j 的数据导入NebulaGraph。

实现方法

Exchange 使用 Neo4j Driver 4.0.1 实现对 Neo4j 数据的读取。执行批量导出之前,用户需要在配置文件中写入针对标签(label)和关系类型(Relationship Type)自动执行的 Cypher 语句,以及 Spark 分区数,提高数据导出性能。

Exchange 读取 Neo4j 数据时需要完成以下工作:

  1. Exchange 中的 Reader 会将配置文件中exec部分的 CypherRETURN语句后面的语句替换为COUNT(*),并执行这个语句,从而获取数据总量,再根据 Spark 分区数量计算每个分区的起始偏移量和大小。

  2. (可选)如果用户配置了check_point_path目录,Reader 会读取目录中的文件。如果处于续传状态,Reader 会计算每个 Spark 分区应该有的偏移量和大小。

  3. 在每个 Spark 分区里,Exchange 中的 Reader 会在 Cypher 语句后面添加不同的SKIPLIMIT语句,调用 Neo4j Driver 并行执行,将数据分布到不同的 Spark 分区中。

  4. Reader 最后将返回的数据处理成 DataFrame。

至此,Exchange 即完成了对 Neo4j 数据的导出。之后,数据被并行写入NebulaGraph数据库中。

整个过程如下图所示。

NebulaGraph® Exchange 从 Neo4j 数据库中导出数据再并行导入NebulaGraph数据库中

数据集

本文以 basketballplayer 数据集为例。

环境配置

本文示例在 MacOS 下完成,以下是相关的环境配置信息:

  • 硬件规格:

    • CPU:Intel(R) Xeon(R) CPU E5-2697 v3 @ 2.60GHz
    • CPU 内核数:14
    • 内存:251 GB
  • Spark:单机版,2.4.6 pre-build for Hadoop 2.7
  • Neo4j:3.5.20 Community Edition
  • NebulaGraph:3.6.0。

前提条件

开始导入数据之前,用户需要确认以下信息:

  • 已经安装部署NebulaGraph 并获取如下信息:

    • Graph 服务和 Meta 服务的的 IP 地址和端口。
    • 拥有NebulaGraph写权限的用户名和密码。
  • 已经编译 Exchange。详情请参见编译 Exchange。本示例中使用 Exchange 3.6.0。
  • 已经安装 Spark。
  • 了解NebulaGraph中创建 Schema 的信息,包括 Tag 和 Edge type 的名称、属性等。

操作步骤

步骤 1:在NebulaGraph中创建 Schema

分析数据,按以下步骤在NebulaGraph中创建 Schema:

  1. 确认 Schema 要素。NebulaGraph中的 Schema 要素如下表所示。

    要素 名称 属性
    Tag player name string, age int
    Tag team name string
    Edge Type follow degree int
    Edge Type serve start_year int, end_year int
  2. 使用 NebulaGraph Console 创建一个图空间 basketballplayer,并创建一个 Schema,如下所示。

    ## 创建图空间
    nebula> CREATE SPACE basketballplayer \
            (partition_num = 10, \
            replica_factor = 1, \
            vid_type = FIXED_STRING(30));
    
    ## 选择图空间 basketballplayer
    nebula> USE basketballplayer;
    
    ## 创建 Tag player
    nebula> CREATE TAG player(name string, age int);
    
    ## 创建 Tag team
    nebula> CREATE TAG team(name string);
    
    ## 创建 Edge type follow
    nebula> CREATE EDGE follow(degree int);
    
    ## 创建 Edge type serve
    nebula> CREATE EDGE serve(start_year int, end_year int);
    

更多信息,请参见快速开始

步骤 2:配置源数据

为了提高 Neo4j 数据的导出速度,在 Neo4j 数据库中为相应属性创建索引。详细信息,参考 Neo4j 用户手册

步骤 3:修改配置文件

编译 Exchange 后,复制target/classes/application.conf文件设置数据源相关的配置。在本示例中,复制的文件名为neo4j_application.conf。各个配置项的详细说明请参见配置说明

{
  # Spark 相关配置
  spark: {
    app: {
      name: NebulaGraph Exchange 3.6.0
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    executor: {
        memory:1G
    }

    cores:{
      max: 16
    }
  }

  # NebulaGraph 相关配置
  nebula: {
    address:{
      graph:["127.0.0.1:9669"]
      #任意一个 Meta 服务的地址。
      #如果您的 NebulaGraph 在虚拟网络中,如k8s,请配置 Leader Meta的地址。
      meta:["127.0.0.1:9559"]
    }
    user: root
    pswd: nebula
    space: basketballplayer

    connection: {
      timeout: 3000
      retry: 3
    }

    execution: {
      retry: 3
    }

    error: {
      max: 32
      output: /tmp/errors
    }

    rate: {
      limit: 1024
      timeout: 1000
    }
  }

  # 处理点
  tags: [

    # 设置 Tag player 相关信息。
    {
      name: player
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password:neo4j
      database:neo4j
      exec: "match (n:player) return n.id as id, n.age as age, n.name as name"
      fields: [age,name]
      nebula.fields: [age,name]
      vertex: {
        field:id
      # udf:{
      #            separator:"_"
      #            oldColNames:[field-0,field-1,field-2]
      #            newColName:new-field
      #        }
      # 为 VID 增加指定的前缀。例如 VID 为 12345,增加前缀 tag1 后为 tag1_12345。下划线无法修改。
      # prefix:"tag1"
      # 对 string 类型的 VID 进行哈希化操作。
      # policy:hash
      }

      # 批量操作类型,包括 INSERT、UPDATE 和 DELETE。默认为 INSERT。
      #writeMode: INSERT

      # 批量删除时是否删除该点关联的出边和入边。`writeMode`为`DELETE`时该参数生效。
      #deleteEdge: false

      partition: 10
      batch: 1000
      check_point_path: /tmp/test
   }
  # 设置 Tag team 相关信息。
  {
      name: team
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password:neo4j
      # bolt 3 does not support `select database`, please do not config database
      # database:neo4j
      exec: "match (n:team) return n.id as id,n.name as name order by id(n)"
      fields: [name]
      nebula.fields: [name]
      vertex: {
        field:id
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
   }
  ]

  # 处理边数据
  edges: [
    # 设置 Edge type follow 相关信息
    {
      name: follow
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password:neo4j
      # bolt 3 不支持多数据库,请勿配置数据库名。4 及以上可以配置数据库名。
      # database:neo4j
      exec: "match (a:player)-[r:follow]->(b:player) return a.id as src, b.id as dst, r.degree as degree  order by id(r)"
      fields: [degree]
      nebula.fields: [degree]
      source: {
        field: src
      # udf:{
      #            separator:"_"
      #            oldColNames:[field-0,field-1,field-2]
      #            newColName:new-field
      #        }
      # 为 VID 增加指定的前缀。例如 VID 为 12345,增加前缀 tag1 后为 tag1_12345。下划线无法修改。
      # prefix:"tag1"
      # 对 string 类型的 VID 进行哈希化操作。
      # policy:hash
      }
      target: {
        field: dst
      # udf:{
      #            separator:"_"
      #            oldColNames:[field-0,field-1,field-2]
      #            newColName:new-field
      #        }
      # 为 VID 增加指定的前缀。例如 VID 为 12345,增加前缀 tag1 后为 tag1_12345。下划线无法修改。
      # prefix:"tag1"
      # 对 string 类型的 VID 进行哈希化操作。
      # policy:hash
      }
      #ranking: rank
      # 批量操作类型,包括 INSERT、UPDATE 和 DELETE。默认为 INSERT。
      #writeMode: INSERT
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
   # 设置 Edge type serve 相关信息
   {
      name: serve
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password:neo4j
      exec: "match (a:player)-[r:serve]->(b:team) return a.id as src, b.id as dst, r.start_year as start_year, r.end_year as end_year  order by id(r)"
      fields: [start_year,end_year]
      nebula.fields: [start_year,end_year]
      source: {
        field: src
      }
      target: {
        field: dst
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
   ]
}

exec 配置说明

在配置tags.exec或者edges.exec参数时,需要填写 Cypher 查询语句。为了保证每次查询结果排序一致,并且为了防止在导入时丢失数据,强烈建议在 Cypher 查询语句中加入ORDER BY子句,同时,为了提高数据导入效率,最好选取有索引的属性作为排序的属性。如果没有索引,用户也可以观察默认的排序,选择合适的属性用于排序,以提高效率。如果默认的排序找不到规律,用户可以根据点或关系的 ID 进行排序,并且将partition设置为一个尽量小的值,减轻 Neo4j 的排序压力。

说明:使用ORDER BY子句会延长数据导入的时间。

另外,Exchange 需要在不同 Spark 分区执行不同SKIPLIMIT的 Cypher 语句,所以在tags.execedges.exec对应的 Cypher 语句中不能含有SKIPLIMIT子句。

tags.vertex 或 edges.vertex 配置说明

NebulaGraph在创建点和边时会将 ID 作为唯一主键,如果主键已存在则会覆盖该主键中的数据。所以,假如将某个 Neo4j 属性值作为NebulaGraph的 ID,而这个属性值在 Neo4j 中是有重复的,就会导致重复 ID,它们对应的数据有且只有一条会存入NebulaGraph中,其它的则会被覆盖掉。由于数据导入过程是并发地往NebulaGraph中写数据,最终保存的数据并不能保证是 Neo4j 中最新的数据。

check_point_path 配置说明

如果启用了断点续传功能,为避免数据丢失,在断点和续传之间,数据库不应该改变状态,例如不能添加数据或删除数据,同时,不能更改partition数量配置。

步骤 4:向NebulaGraph导入数据

运行如下命令将文件数据导入到NebulaGraph中。关于参数的说明,请参见导入命令参数

${SPARK_HOME}/bin/spark-submit --master "local" --class com.vesoft.nebula.exchange.Exchange <nebula-exchange-3.6.0.jar_path> -c <neo4j_application.conf_path> 

Note

JAR 包有两种获取方式:自行编译或者从 maven 仓库下载。

示例:

${SPARK_HOME}/bin/spark-submit  --master "local" --class com.vesoft.nebula.exchange.Exchange  /root/nebula-exchange/nebula-exchange/target/nebula-exchange-3.6.0.jar  -c /root/nebula-exchange/nebula-exchange/target/classes/neo4j_application.conf

用户可以在返回信息中搜索batchSuccess.<tag_name/edge_name>,确认成功的数量。例如batchSuccess.follow: 300

步骤 5:(可选)验证数据

用户可以在NebulaGraph客户端(例如 NebulaGraph Studio)中执行查询语句,确认数据是否已导入。例如:

LOOKUP ON player YIELD id(vertex);

用户也可以使用命令 SHOW STATS 查看统计数据。

步骤 6:(如有)在NebulaGraph中重建索引

导入数据后,用户可以在NebulaGraph中重新创建并重建索引。详情请参见索引介绍


最后更新: October 17, 2023