跳转至

导入 Neo4j 数据

本文以一个示例说明如何使用 Exchange 将存储在 Neo4j 的数据导入 NebulaGraph 。

实现方法

Exchange 使用 Neo4j Driver 4.0.1 实现对 Neo4j 数据的读取。执行批量导出之前,用户需要在配置文件中写入针对标签(label)和关系类型(Relationship Type)自动执行的 Cypher 语句,以及 Spark 分区数,提高数据导出性能。

Exchange 读取 Neo4j 数据时需要完成以下工作:

  1. Exchange 中的 Reader 会将配置文件中exec部分的 CypherRETURN语句后面的语句替换为COUNT(*),并执行这个语句,从而获取数据总量,再根据 Spark 分区数量计算每个分区的起始偏移量和大小。

  2. (可选)如果用户配置了check_point_path目录,Reader 会读取目录中的文件。如果处于续传状态,Reader 会计算每个 Spark 分区应该有的偏移量和大小。

  3. 在每个 Spark 分区里,Exchange 中的 Reader 会在 Cypher 语句后面添加不同的SKIPLIMIT语句,调用 Neo4j Driver 并行执行,将数据分布到不同的 Spark 分区中。

  4. Reader 最后将返回的数据处理成 DataFrame。

至此,Exchange 即完成了对 Neo4j 数据的导出。之后,数据被并行写入 NebulaGraph 数据库中。

整个过程如下图所示。

NebulaGraph® Exchange 从 Neo4j 数据库中导出数据再并行导入 NebulaGraph 数据库中

数据集

本文以 basketballplayer 数据集为例。

环境配置

本文示例在 MacOS 下完成,以下是相关的环境配置信息:

  • 硬件规格:

    • CPU:Intel(R) Xeon(R) CPU E5-2697 v3 @ 2.60GHz
    • CPU 内核数:14
    • 内存:251 GB
  • Spark:单机版,2.4.6 pre-build for Hadoop 2.7
  • Neo4j:3.5.20 Community Edition
  • NebulaGraph :3.6.0。

前提条件

开始导入数据之前,用户需要确认以下信息:

  • 已经安装部署 NebulaGraph 并获取如下信息:

    • Graph 服务和 Meta 服务的的 IP 地址和端口。
    • 拥有 NebulaGraph 写权限的用户名和密码。
  • 已经编译 Exchange。详情请参见编译 Exchange。本示例中使用 Exchange 3.6.1。
  • 已经安装 Spark。
  • 了解 NebulaGraph 中创建 Schema 的信息,包括 Tag 和 Edge type 的名称、属性等。

操作步骤

步骤 1:在 NebulaGraph 中创建 Schema

分析数据,按以下步骤在 NebulaGraph 中创建 Schema:

  1. 确认 Schema 要素。 NebulaGraph 中的 Schema 要素如下表所示。

    要素 名称 属性
    Tag player name string, age int
    Tag team name string
    Edge Type follow degree int
    Edge Type serve start_year int, end_year int
  2. 使用 NebulaGraph Console 创建一个图空间 basketballplayer,并创建一个 Schema,如下所示。

    ## 创建图空间
    nebula> CREATE SPACE basketballplayer \
            (partition_num = 10, \
            replica_factor = 1, \
            vid_type = FIXED_STRING(30));
    
    ## 选择图空间 basketballplayer
    nebula> USE basketballplayer;
    
    ## 创建 Tag player
    nebula> CREATE TAG player(name string, age int);
    
    ## 创建 Tag team
    nebula> CREATE TAG team(name string);
    
    ## 创建 Edge type follow
    nebula> CREATE EDGE follow(degree int);
    
    ## 创建 Edge type serve
    nebula> CREATE EDGE serve(start_year int, end_year int);
    

更多信息,请参见快速开始

步骤 2:配置源数据

为了提高 Neo4j 数据的导出速度,在 Neo4j 数据库中为相应属性创建索引。详细信息,参考 Neo4j 用户手册

步骤 3:修改配置文件

编译 Exchange 后,复制target/classes/application.conf文件设置数据源相关的配置。在本示例中,复制的文件名为neo4j_application.conf。各个配置项的详细说明请参见配置说明

{
  # Spark 相关配置
  spark: {
    app: {
      name: NebulaGraph Exchange 3.6.1
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    executor: {
        memory:1G
    }

    cores:{
      max: 16
    }
  }

  # NebulaGraph 相关配置
  nebula: {
    address:{
      graph:["127.0.0.1:9669"]
      #任意一个 Meta 服务的地址。
      #如果您的 NebulaGraph 在虚拟网络中,如k8s,请配置 Leader Meta的地址。
      meta:["127.0.0.1:9559"]
    }
    user: root
    pswd: nebula
    space: basketballplayer

    connection: {
      timeout: 3000
      retry: 3
    }

    execution: {
      retry: 3
    }

    error: {
      max: 32
      output: /tmp/errors
    }

    rate: {
      limit: 1024
      timeout: 1000
    }
  }

  # 处理点
  tags: [

    # 设置 Tag player 相关信息。
    {
      name: player
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password:neo4j
      database:neo4j
      exec: "match (n:player) return n.id as id, n.age as age, n.name as name"
      fields: [age,name]
      nebula.fields: [age,name]
      vertex: {
        field:id
      # udf:{
      #            separator:"_"
      #            oldColNames:[field-0,field-1,field-2]
      #            newColName:new-field
      #        }
      # 为 VID 增加指定的前缀。例如 VID 为 12345,增加前缀 tag1 后为 tag1_12345。下划线无法修改。
      # prefix:"tag1"
      # 对 string 类型的 VID 进行哈希化操作。
      # policy:hash
      }

      # 批量操作类型,包括 INSERT、UPDATE 和 DELETE。默认为 INSERT。
      #writeMode: INSERT

      # 批量删除时是否删除该点关联的出边和入边。`writeMode`为`DELETE`时该参数生效。
      #deleteEdge: false

      partition: 10
      batch: 1000
      check_point_path: /tmp/test
   }
  # 设置 Tag team 相关信息。
  {
      name: team
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password:neo4j
      # bolt 3 does not support `select database`, please do not config database
      # database:neo4j
      exec: "match (n:team) return n.id as id,n.name as name order by id(n)"
      fields: [name]
      nebula.fields: [name]
      vertex: {
        field:id
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
   }
  ]

  # 处理边数据
  edges: [
    # 设置 Edge type follow 相关信息
    {
      name: follow
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password:neo4j
      # bolt 3 不支持多数据库,请勿配置数据库名。4 及以上可以配置数据库名。
      # database:neo4j
      exec: "match (a:player)-[r:follow]->(b:player) return a.id as src, b.id as dst, r.degree as degree  order by id(r)"
      fields: [degree]
      nebula.fields: [degree]
      source: {
        field: src
      # udf:{
      #            separator:"_"
      #            oldColNames:[field-0,field-1,field-2]
      #            newColName:new-field
      #        }
      # 为 VID 增加指定的前缀。例如 VID 为 12345,增加前缀 tag1 后为 tag1_12345。下划线无法修改。
      # prefix:"tag1"
      # 对 string 类型的 VID 进行哈希化操作。
      # policy:hash
      }
      target: {
        field: dst
      # udf:{
      #            separator:"_"
      #            oldColNames:[field-0,field-1,field-2]
      #            newColName:new-field
      #        }
      # 为 VID 增加指定的前缀。例如 VID 为 12345,增加前缀 tag1 后为 tag1_12345。下划线无法修改。
      # prefix:"tag1"
      # 对 string 类型的 VID 进行哈希化操作。
      # policy:hash
      }
      #ranking: rank
      # 批量操作类型,包括 INSERT、UPDATE 和 DELETE。默认为 INSERT。
      #writeMode: INSERT
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
   # 设置 Edge type serve 相关信息
   {
      name: serve
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password:neo4j
      exec: "match (a:player)-[r:serve]->(b:team) return a.id as src, b.id as dst, r.start_year as start_year, r.end_year as end_year  order by id(r)"
      fields: [start_year,end_year]
      nebula.fields: [start_year,end_year]
      source: {
        field: src
      }
      target: {
        field: dst
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
   ]
}

exec 配置说明

在配置tags.exec或者edges.exec参数时,需要填写 Cypher 查询语句。为了保证每次查询结果排序一致,并且为了防止在导入时丢失数据,强烈建议在 Cypher 查询语句中加入ORDER BY子句,同时,为了提高数据导入效率,最好选取有索引的属性作为排序的属性。如果没有索引,用户也可以观察默认的排序,选择合适的属性用于排序,以提高效率。如果默认的排序找不到规律,用户可以根据点或关系的 ID 进行排序,并且将partition设置为一个尽量小的值,减轻 Neo4j 的排序压力。

说明:使用ORDER BY子句会延长数据导入的时间。

另外,Exchange 需要在不同 Spark 分区执行不同SKIPLIMIT的 Cypher 语句,所以在tags.execedges.exec对应的 Cypher 语句中不能含有SKIPLIMIT子句。

tags.vertex 或 edges.vertex 配置说明

NebulaGraph 在创建点和边时会将 ID 作为唯一主键,如果主键已存在则会覆盖该主键中的数据。所以,假如将某个 Neo4j 属性值作为 NebulaGraph 的 ID,而这个属性值在 Neo4j 中是有重复的,就会导致重复 ID,它们对应的数据有且只有一条会存入 NebulaGraph 中,其它的则会被覆盖掉。由于数据导入过程是并发地往 NebulaGraph 中写数据,最终保存的数据并不能保证是 Neo4j 中最新的数据。

check_point_path 配置说明

如果启用了断点续传功能,为避免数据丢失,在断点和续传之间,数据库不应该改变状态,例如不能添加数据或删除数据,同时,不能更改partition数量配置。

步骤 4:向 NebulaGraph 导入数据

运行如下命令将文件数据导入到 NebulaGraph 中。关于参数的说明,请参见导入命令参数

${SPARK_HOME}/bin/spark-submit --master "local" --class com.vesoft.nebula.exchange.Exchange <nebula-exchange-3.6.1.jar_path> -c <neo4j_application.conf_path> 

Note

JAR 包有两种获取方式:自行编译或者从 maven 仓库下载。

示例:

${SPARK_HOME}/bin/spark-submit  --master "local" --class com.vesoft.nebula.exchange.Exchange  /root/nebula-exchange/nebula-exchange/target/nebula-exchange-3.6.1.jar  -c /root/nebula-exchange/nebula-exchange/target/classes/neo4j_application.conf

用户可以在返回信息中搜索batchSuccess.<tag_name/edge_name>,确认成功的数量。例如batchSuccess.follow: 300

步骤 5:(可选)验证数据

用户可以在 NebulaGraph 客户端(例如 NebulaGraph Studio)中执行查询语句,确认数据是否已导入。例如:

LOOKUP ON player YIELD id(vertex);

用户也可以使用命令 SHOW STATS 查看统计数据。

步骤 6:(如有)在 NebulaGraph 中重建索引

导入数据后,用户可以在 NebulaGraph 中重新创建并重建索引。详情请参见索引介绍


最后更新: July 2, 2024