跳转至

导入 Neo4j 数据

本文以一个示例说明如何使用 Exchange 将存储在 Neo4j 的数据导入 Nebula Graph。

实现方法

Exchange 使用 Neo4j Driver 4.0.1 实现对 Neo4j 数据的读取。执行批量导出之前,用户需要在配置文件中写入针对标签(label)和关系类型(Relationship Type)自动执行的 Cypher 语句,以及 Spark 分区数,提高数据导出性能。

Exchange 读取 Neo4j 数据时需要完成以下工作:

  1. Exchange 中的 Reader 会将配置文件中exec部分的 CypherRETURN语句后面的语句替换为COUNT(*),并执行这个语句,从而获取数据总量,再根据 Spark 分区数量计算每个分区的起始偏移量和大小。

  2. (可选)如果用户配置了check_point_path目录,Reader 会读取目录中的文件。如果处于续传状态,Reader 会计算每个 Spark 分区应该有的偏移量和大小。

  3. 在每个 Spark 分区里,Exchange 中的 Reader 会在 Cypher 语句后面添加不同的SKIPLIMIT语句,调用 Neo4j Driver 并行执行,将数据分布到不同的 Spark 分区中。

  4. Reader 最后将返回的数据处理成 DataFrame。

至此,Exchange 即完成了对 Neo4j 数据的导出。之后,数据被并行写入 Nebula Graph 数据库中。

整个过程如下图所示。

Nebula Graph® Exchange 从 Neo4j 数据库中导出数据再并行导入 Nebula Graph 数据库中

数据集

本文以 basketballplayer 数据集为例。

环境配置

本文示例在 MacOS 下完成,以下是相关的环境配置信息:

  • 硬件规格:

    • CPU:Intel(R) Xeon(R) CPU E5-2697 v3 @ 2.60GHz
    • CPU 内核数:14
    • 内存:251 GB
  • Spark:单机版,2.4.6 pre-build for Hadoop 2.7
  • Neo4j:3.5.20 Community Edition

前提条件

开始导入数据之前,用户需要确认以下信息:

  • 已经安装部署 Nebula Graph 并获取如下信息:

    • Graph 服务和 Meta 服务的的 IP 地址和端口。
    • 拥有 Nebula Graph 写权限的用户名和密码。
  • 已经编译 Exchange。详情请参见编译 Exchange。本示例中使用 Exchange 3.0.0。
  • 已经安装 Spark。
  • 了解 Nebula Graph 中创建 Schema 的信息,包括 Tag 和 Edge type 的名称、属性等。

操作步骤

步骤 1:在 Nebula Graph 中创建 Schema

分析数据,按以下步骤在 Nebula Graph 中创建 Schema:

  1. 确认 Schema 要素。Nebula Graph 中的 Schema 要素如下表所示。

    要素 名称 属性
    Tag player name string, age int
    Tag team name string
    Edge Type follow degree int
    Edge Type serve start_year int, end_year int
  2. 使用 Nebula Console 创建一个图空间 basketballplayer,并创建一个 Schema,如下所示。

    ## 创建图空间
    nebula> CREATE SPACE basketballplayer \
            (partition_num = 10, \
            replica_factor = 1, \
            vid_type = FIXED_STRING(30));
    
    ## 选择图空间 basketballplayer
    nebula> USE basketballplayer;
    
    ## 创建 Tag player
    nebula> CREATE TAG player(name string, age int);
    
    ## 创建 Tag team
    nebula> CREATE TAG team(name string);
    
    ## 创建 Edge type follow
    nebula> CREATE EDGE follow(degree int);
    
    ## 创建 Edge type serve
    nebula> CREATE EDGE serve(start_year int, end_year int);
    

更多信息,请参见快速开始

步骤 2:配置源数据

为了提高 Neo4j 数据的导出速度,在 Neo4j 数据库中为相应属性创建索引。详细信息,参考 Neo4j 用户手册

步骤 3:修改配置文件

编译 Exchange 后,复制target/classes/application.conf文件设置数据源相关的配置。在本示例中,复制的文件名为neo4j_application.conf。各个配置项的详细说明请参见配置说明

{
  # Spark 相关配置
  spark: {
    app: {
      name: Nebula Exchange 3.0.0
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    executor: {
        memory:1G
    }

    cores:{
      max: 16
    }
  }

  # Nebula Graph 相关配置
  nebula: {
    address:{
      graph:["127.0.0.1:9669"]
      meta:["127.0.0.1:9559"]
    }
    user: root
    pswd: nebula
    space: basketballplayer

    connection: {
      timeout: 3000
      retry: 3
    }

    execution: {
      retry: 3
    }

    error: {
      max: 32
      output: /tmp/errors
    }

    rate: {
      limit: 1024
      timeout: 1000
    }
  }

  # 处理点
  tags: [

    # 设置 Tag player 相关信息。
    {
      name: player
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password:neo4j
      database:neo4j
      exec: "match (n:player) return n.id as id, n.age as age, n.name as name"
      fields: [age,name]
      nebula.fields: [age,name]
      vertex: {
        field:id
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
   }
  # 设置 Tag team 相关信息。
  {
      name: team
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password:neo4j
      database:neo4j
      exec: "match (n:team) return n.id as id,n.name as name"
      fields: [name]
      nebula.fields: [name]
      vertex: {
        field:id
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
   }
  ]

  # 处理边数据
  edges: [
    # 设置 Edge type follow 相关信息
    {
      name: follow
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password:neo4j
      database:neo4j
      exec: "match (a:player)-[r:follow]->(b:player) return a.id as src, b.id as dst, r.degree as degree  order by id(r)"
      fields: [degree]
      nebula.fields: [degree]
      source: {
        field: src
      }
      target: {
        field: dst
      }
      #ranking: rank
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
   # 设置 Edge type serve 相关信息
   {
      name: serve
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password:neo4j
      database:neo4j
      exec: "match (a:player)-[r:serve]->(b:team) return a.id as src, b.id as dst, r.start_year as start_year, r.end_year as end_year  order by id(r)"
      fields: [start_year,end_year]
      nebula.fields: [start_year,end_year]
      source: {
        field: src
      }
      target: {
        field: dst
      }
      #ranking: rank
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
   ]
}

exec 配置说明

在配置tags.exec或者edges.exec参数时,需要填写 Cypher 查询语句。为了保证每次查询结果排序一致,并且为了防止在导入时丢失数据,强烈建议在 Cypher 查询语句中加入ORDER BY子句,同时,为了提高数据导入效率,最好选取有索引的属性作为排序的属性。如果没有索引,用户也可以观察默认的排序,选择合适的属性用于排序,以提高效率。如果默认的排序找不到规律,用户可以根据点或关系的 ID 进行排序,并且将partition设置为一个尽量小的值,减轻 Neo4j 的排序压力。

说明:使用ORDER BY子句会延长数据导入的时间。

另外,Exchange 需要在不同 Spark 分区执行不同SKIPLIMIT的 Cypher 语句,所以在tags.execedges.exec对应的 Cypher 语句中不能含有SKIPLIMIT子句。

tags.vertex 或 edges.vertex 配置说明

Nebula Graph 在创建点和边时会将 ID 作为唯一主键,如果主键已存在则会覆盖该主键中的数据。所以,假如将某个 Neo4j 属性值作为 Nebula Graph 的 ID,而这个属性值在 Neo4j 中是有重复的,就会导致重复 ID,它们对应的数据有且只有一条会存入 Nebula Graph 中,其它的则会被覆盖掉。由于数据导入过程是并发地往 Nebula Graph 中写数据,最终保存的数据并不能保证是 Neo4j 中最新的数据。

check_point_path 配置说明

如果启用了断点续传功能,为避免数据丢失,在断点和续传之间,数据库不应该改变状态,例如不能添加数据或删除数据,同时,不能更改partition数量配置。

步骤 4:向 Nebula Graph 导入数据

运行如下命令将文件数据导入到 Nebula Graph 中。关于参数的说明,请参见导入命令参数

${SPARK_HOME}/bin/spark-submit --master "local" --class com.vesoft.nebula.exchange.Exchange <nebula-exchange-3.0.0.jar_path> -c <neo4j_application.conf_path> 

Note

JAR 包有两种获取方式:自行编译或者从 maven 仓库下载。

示例:

${SPARK_HOME}/bin/spark-submit  --master "local" --class com.vesoft.nebula.exchange.Exchange  /root/nebula-exchange/nebula-exchange/target/nebula-exchange-3.0.0.jar  -c /root/nebula-exchange/nebula-exchange/target/classes/neo4j_application.conf

用户可以在返回信息中搜索batchSuccess.<tag_name/edge_name>,确认成功的数量。例如batchSuccess.follow: 300

步骤 5:(可选)验证数据

用户可以在 Nebula Graph 客户端(例如 Nebula Graph Studio)中执行查询语句,确认数据是否已导入。例如:

GO FROM "player100" OVER follow;

用户也可以使用命令 SHOW STATS 查看统计数据。

步骤 6:(如有)在 Nebula Graph 中重建索引

导入数据后,用户可以在 Nebula Graph 中重新创建并重建索引。详情请参见索引介绍


最后更新: March 7, 2023