跳转至

NebulaGraph Importer

NebulaGraph Importer(简称 Importer)是一款 NebulaGraph 的 CSV 文件单机导入工具,可以读取并批量导入多种数据源的 CSV 文件数据,还支持批量更新和删除操作。

功能

  • 支持多种数据源,包括本地、S3、OSS、HDFS、FTP、SFTP、GCS。
  • 支持导入 CSV 格式文件的数据。单个文件内可以包含多种 Tag、多种 Edge type 或者二者混合的数据。
  • 支持过滤数据源数据。
  • 支持批量操作,包括导入、更新、删除。
  • 支持同时连接多个 Graph 服务进行导入并且动态负载均衡。
  • 支持失败后重连、重试。
  • 支持多维度显示统计信息,包括导入时间、导入百分比等。统计信息支持打印在 Console 或日志中。
  • 支持 SSL 加密。

优势

  • 轻量快捷:不需要复杂环境即可使用,快速导入数据。
  • 灵活筛选:通过配置文件可以实现对 CSV 文件数据的灵活筛选。

版本兼容性

NebulaGraph Importer 版本和 NebulaGraph 内核的版本对应关系如下。

NebulaGraph 版本 NebulaGraph Importer 版本
3.x.x 3.x.x、4.x.x
2.x.x 2.x.x、3.x.x

Note

Importer 4.0.0 对 Importer 进行了重做,性能得到了提高,但配置文件不兼容旧版本。建议使用新版 Importer。

更新说明

Release notes

前提条件

在使用 NebulaGraph Importer 之前,请确保:

  • NebulaGraph 中已创建 Schema,包括图空间、Tag 和 Edge type,或者通过参数manager.hooks.before.statements设置。

操作步骤

创建 CSV 文件

准备好待导入的 CSV 文件并配置 YAML 文件,即可使用本工具向 NebulaGraph 批量导入数据。

Note

YAML 配置文件说明请参见下文的配置文件说明

下载二进制包运行

  1. Release 页面下载和安装二进制包,并添加执行权限。

    Note

    使用 RPM/DEB 包安装的文件路径为/usr/bin/nebula-importer

  2. nebula-importer的安装目录下,执行以下命令导入数据。

    $ ./<binary_file_name> --config <yaml_config_file_path>
    

源码编译运行

编译源码需要部署 Golang 环境。详情请参见 Golang 环境搭建

  1. 克隆仓库。

    $ git clone -b release-4.1 https://github.com/vesoft-inc/nebula-importer.git
    

    Note

    请使用正确的分支。不同分支的 rpc 协议不同。

  2. 进入目录nebula-importer

    $ cd nebula-importer
    
  3. 编译源码。

    $ make build
    
  4. 开始导入数据。

    $ ./bin/nebula-importer --config <yaml_config_file_path>
    

Docker 方式运行

使用 Docker 可以不必在本地安装 Go 语言环境,只需要拉取 NebulaGraph Importer 的镜像,并将本地配置文件和 CSV 数据文件挂载到容器中。命令如下:

$ docker pull vesoft/nebula-importer:<version>
$ docker run --rm -ti \
      --network=host \
      -v <config_file>:<config_file> \
      -v <data_dir>:<data_dir> \
      vesoft/nebula-importer:<version> \
      --config <config_file>
  • <config_file>:填写 YAML 配置文件的绝对路径。
  • <data_dir>:填写 CSV 数据文件的绝对路径。如果文件不在本地,请忽略该参数。
  • <version>:填写 Importer 的版本号,请填写v4

Note

建议使用相对路径。如果使用本地绝对路径,请检查路径映射到 Docker 中的路径。

例如:

$ docker pull vesoft/nebula-importer:v4
$ docker run --rm -ti \
      --network=host \
      -v /home/user/config.yaml:/home/user/config.yaml \
      -v /home/user/data:/home/user/data \
      vesoft/nebula-importer:v4 \
      --config /home/user/config.yaml

配置文件说明

NebulaGraph Importer 的 Github 内提供多种示例配置文件。配置文件用来描述待导入文件信息、 NebulaGraph 服务器信息等。下文将分类介绍配置文件内的字段。

Note

如果用户下载的是二进制包,请手动创建配置文件。

Client 配置

Client 配置存储客户端连接 NebulaGraph 相关的配置。

示例配置如下:

client:
  version: v3
  address: "192.168.1.100:9669,192.168.1.101:9669"
  user: root
  password: nebula
  ssl:
    enable: true
    certPath: "/home/xxx/cert/importer.crt"
    keyPath: "/home/xxx/cert/importer.key"
    caPath: "/home/xxx/cert/root.crt"
    insecureSkipVerify: false
  concurrencyPerAddress: 10
  reconnectInitialInterval: 1s
  retry: 3
  retryInitialInterval: 1s
参数 默认值 是否必须 说明
client.version v3 指定连接的 NebulaGraph 的大版本。当前仅支持v3
client.address "127.0.0.1:9669" 指定连接的 NebulaGraph 地址。多个地址用英文逗号(,)分隔。
client.user root NebulaGraph 的用户名。
client.password nebula NebulaGraph 用户名对应的密码。
client.ssl.enable false 指定是否开启 SSL 认证。
client.ssl.certPath - 指定 SSL 公钥证书的存储路径。
开启 SSL 认证后该参数必填。
client.ssl.keyPath - 指定 SSL 密钥的存储路径。
开启 SSL 认证后该参数必填。
client.ssl.caPath - 指定 CA 根证书的存储路径。
开启 SSL 认证后该参数必填。
client.ssl.insecureSkipVerify false 指定是否跳过验证服务端的证书链和主机名。如果设置为true,则接受服务端提供的任何证书链和主机名。
client.concurrencyPerAddress 10 单个 Graph 服务的客户端并发连接数。
client.reconnectInitialInterval 1s 重连间隔时间。
client.retry 3 nGQL 语句执行失败的重试次数。
client.retryInitialInterval 1s 重试间隔时间。

Manager 配置

Manager 配置是连接数据库后的人为控制配置。

示例配置如下:

manager:
  spaceName: basic_string_examples
  batch: 128
  readerConcurrency: 50
  importerConcurrency: 512
  statsInterval: 10s
  hooks:
    before:
      - statements:
        - |
            DROP SPACE IF EXISTS basic_string_examples;
            CREATE SPACE IF NOT EXISTS basic_string_examples(partition_num=5, replica_factor=1, vid_type=int);
            USE basic_string_examples;
        wait: 10s
    after:
      - statements:
          - |
            SHOW SPACES;
参数 默认值 是否必须 说明
manager.spaceName - 指定数据要导入的图空间。不支持同时导入多个图空间。
manager.batch 128 执行语句的批处理量(全局配置)。
对某个数据源单独设置批处理量可以使用下文的sources.batch
manager.readerConcurrency 50 读取器读取数据源的并发数。
manager.importerConcurrency 512 生成待执行的 nGQL 语句的并发数,然后会调用客户端执行这些语句。
manager.statsInterval 10s 打印统计信息的时间间隔。
manager.hooks.before.[].statements - 导入前在图空间内执行的命令。
manager.hooks.before.[].wait - 执行statements语句后的等待时间。
manager.hooks.after.[].statements - 导入后在图空间内执行的命令。
manager.hooks.after.[].wait - 执行statements语句后的等待时间。

Log 配置

Log 配置是设置日志相关配置。

示例配置如下:

log:
  level: INFO
  console: true
  files:
    - logs/nebula-importer.log   
参数 默认值 是否必须 说明
log.level INFO 日志级别。可选值为DEBUGINFOWARNERRORPANICFATAL
log.console true 存储日志时是否将日志同步打印到 Console。
log.files - 日志文件路径。需手动创建日志文件目录。

Source 配置

Source 配置中需要配置数据源信息、数据处理方式和 Schema 映射。

示例配置如下:

sources:
  - path: ./person.csv  # 指定存储数据文件的路径。如果使用相对路径,则路径和当前配置文件目录拼接。也支持通配符文件名,例如:./follower-*.csv,请确保所有匹配的文件具有相同的架构。
#  - s3: # AWS S3
#      endpoint: endpoint    # 可选。S3 服务端点,如果使用 AWS S3 可以省略。
#      region: us-east-1     # 必填。S3 服务的区域。
#      bucket: gdelt-open-data    # 必填。S3 服务中的 bucket。
#      key: events/20190918.export.csv     # 必填。S3 服务中文件的 key。
#      accessKeyID: ""    # 可选。S3 服务的访问密钥。如果是公共数据,则无需配置。
#      accessKeySecret: ""     # 可选。S3 服务的密钥。如果是公共数据,则无需配置。
#  - oss:
#      endpoint: https://oss-cn-hangzhou.aliyuncs.com    # 必填。OSS 服务端点。
#      bucket: bucketName    # 必填。OSS 服务中的 bucket。
#      key: objectKey    # 必填。OSS 服务中文件的 object key。
#      accessKeyID: accessKey    # 必填。OSS 服务的访问密钥。
#      accessKeySecret: secretKey    # 必填。OSS 服务的秘钥。
#  - ftp:
#      host: 192.168.0.10    # 必填。FTP 服务的主机。
#      port: 21    # 必填。FTP 服务的端口。
#      user: user    # 必填。FTP 服务的用户名。
#      password: password    # 必填。FTP 服务的密码。
#      path: "/events/20190918.export.csv"    # FTP 服务中文件的路径。
#  - sftp:
#      host: 192.168.0.10    # 必填。SFTP 服务的主机。
#      port: 22    # 必填。SFTP 服务的端口。
#      user: user    # 必填。SFTP 服务的用户名。
#      password: password    # 可选。SFTP 服务的密码。
#      keyFile: keyFile    # 可选。SFTP 服务的 SSH 密钥文件路径。
#      keyData: keyData    $ 可选。SFTP 服务的 SSH 密钥文件内容。
#      passphrase: passphrase    # 可选。SFTP 服务的 SSH 密钥密码。
#      path: "/events/20190918.export.csv"    # 必填。SFTP 服务中文件的路径。
#  - hdfs:
#      address: "127.0.0.1:8020"    # 必填。HDFS 服务的地址。
#      user: "hdfs"    # 可选。HDFS 服务的用户名。
#      servicePrincipalName: <Kerberos Service Principal Name>  # 可选。启用 Kerberos 认证时,HDFS 服务的 Kerberos 服务实例名称。
#      krb5ConfigFile: <Kerberos config file>  # 可选。启用 Kerberos 认证时,HDFS 服务的 Kerberos 配置文件路径,默认为`/etc/krb5.conf`。
#      ccacheFile: <Kerberos ccache file>  # 可选。启用 Kerberos 认证时,HDFS 服务的 Kerberos ccache 文件路径。
#      keyTabFile: <Kerberos keytab file>  # 可选。启用 Kerberos 认证时,HDFS 服务的 Kerberos keytab 文件路径。
#      password: <Kerberos password>  # 可选。启用 Kerberos 认证时,HDFS 服务的 Kerberos 密码。
#      dataTransferProtection: <Kerberos Data Transfer Protection>  # 可选。启用 Kerberos 认证时的传输加密类型。可选值为`authentication`、`integrity`、`privacy`。
#      disablePAFXFAST: false  # 可选。是否禁止客户端使用预身份验证(PA_FX_FAST)。
#      path: "/events/20190918.export.csv"    # 必填。HDFS 服务中文件的路径。也支持通配符文件名,例如:/events/*.export.csv,请确保所有匹配的文件具有相同的架构。
#  - gcs: # Google Cloud Storage
#      bucket: chicago-crime-sample  # 必填。GCS 服务中的 bucket 名称。
#      key: stats/000000000000.csv  # 必填。GCS 服务中文件的路径。
#      withoutAuthentication: false  # 可选。是否匿名访问。默认为 false,即使用凭证访问。
#      # 使用凭证访问时,credentialsFile 和 credentialsJSON 参数二选一即可。
#      credentialsFile: "/path/to/your/credentials/file"  # 可选。GCS 服务的凭证文件路径。
#      credentialsJSON: '{  # 可选。GCS 服务的凭证 JSON 内容。
#        "type": "service_account",
#        "project_id": "your-project-id",
#        "private_key_id": "key-id",
#        "private_key": "-----BEGIN PRIVATE KEY-----\nxxxxx\n-----END PRIVATE KEY-----\n",
#        "client_email": "your-client@your-project-id.iam.gserviceaccount.com",
#        "client_id": "client-id",
#        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
#        "token_uri": "https://oauth2.googleapis.com/token",
#        "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
#        "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/your-client%40your-project-id.iam.gserviceaccount.com",
#        "universe_domain": "googleapis.com"
#      }'

    batch: 256
    csv:
      delimiter: "|"
      withHeader: false
      lazyQuotes: false
    tags:
    - name: Person
#      mode: INSERT
#      filter:  
#        expr: Record[1] == "XXX"
      id:
        type: "STRING"
        function: "hash"
#       index: 0        
        concatItems:
          - person_
          - 0
          - _id
      props:
        - name: "firstName"
          type: "STRING"
          index: 1
        - name: "lastName"
          type: "STRING"
          index: 2
        - name: "gender"
          type: "STRING"
          index: 3
          nullable: true
          defaultValue: female
        - name: "birthday"
          type: "DATE"
          index: 4
          nullable: true
          nullValue: _NULL_
        - name: "creationDate"
          type: "DATETIME"
          index: 5
        - name: "locationIP"
          type: "STRING"
          index: 6
        - name: "browserUsed"
          type: "STRING"
          index: 7
  - path: ./knows.csv
    batch: 256
    edges:
    - name: KNOWS # person_knows_person
#      mode: INSERT
#      filter:  
#        expr: Record[1] == "XXX"
      src:
        id:
          type: "STRING"
          concatItems:
            - person_
            - 0
            - _id
      dst:
        id:
          type: "STRING"
          concatItems:
            - person_
            - 1
            - _id
      props:
        - name: "creationDate"
          type: "DATETIME"
          index: 2
          nullable: true
          nullValue: _NULL_
          defaultValue: 0000-00-00T00:00:00

配置主要包括以下几个部分:

  • 指定数据源信息。
  • 指定执行语句的批处理量。
  • 指定 CSV 文件格式信息。
  • 指定 Tag 的模式映射。
  • 指定 Edge type 的模式映射。
参数 默认值 是否必须 说明
sources.path
sources.s3
sources.oss
sources.ftp
sources.sftp
sources.hdfs
- 指定数据源信息,例如本地文件、HDFS、S3 等。一个source只能配置一种数据源,配置多个数据源请在多个source内配置。
不同数据源的配置项说明请参见示例内的注释。
sources.batch 256 导入该数据源时执行语句的批处理量。优先级高于manager.batch
sources.csv.delimiter , CSV 文件的分隔符。仅支持 1 个字符的字符串分隔符。使用特殊字符做分隔符时需要进行转义。例如当分隔符为十六进制0x03Ctrl+C时,转义的写法为:"\x03""\u0003"。关于 yaml 格式特殊字符转义的细节请参见Escaped Characters
sources.csv.withHeader false 是否忽略 CSV 文件中的第一条记录。
sources.csv.lazyQuotes false 是否允许惰性解析引号。如果值为true,引号可以出现在非引号字段中,非双引号可以出现在引号字段中,而不会引发解析错误。
sources.tags.name - Tag 名称。
sources.tags.mode INSERT 批量操作类型,包括导入、更新和删除。可选值为INSERTUPDATEDELETE(NebulaGraph Importer 从 4.1.0 版本开始支持DELETE类型)。
sources.tags.filter.expr - 过滤数据,满足过滤条件的才会导入。支持的比较符为==!=<><=>=。支持的逻辑运算符为not(!)、and(&&)和or(||)。例如(Record[0] == "Mahinda" or Record[0] == "Michael") and Record[3] == "male"
sources.tags.id.type STRING VID 的类型。
sources.tags.id.function - 生成 VID 的函数。目前仅支持hash
sources.tags.id.index - VID 对应的数据文件中的列号。如果未配置sources.tags.id.concatItems,该参数必须配置。
sources.tags.id.concatItems - 用于连接两个或多个数组,连接项可以是stringint或者混合。string代表常量,int表示索引列。如果设置了该参数,sources.tags.id.index参数将不生效。
sources.tags.ignoreExistedIndex true 是否启用IGNORE_EXISTED_INDEX,即插入点后不更新索引。
sources.tags.props.name - VID 上的属性名称,必须与数据库中的属性相同。
sources.tags.props.type STRING VID 上属性的数据类型。目前支持BOOLINTFLOATDOUBLESTRINGTIMETIMESTAMPDATEDATETIMEGEOGRAPHYGEOGRAPHY(POINT)GEOGRAPHY(LINESTRING)GEOGRAPHY(POLYGON)
sources.tags.props.index - 属性值对应的数据文件中的列号。
sources.tags.props.nullable false 属性是否可以为NULL,可选true或者false
sources.tags.props.nullValue - nullable设置为true时,属性的值与nullValue相等则将该属性值设置为NULL
sources.tags.props.alternativeIndices - nullablefalse时忽略。该属性根据索引顺序从文件中获取,直到不等于nullValue
sources.tags.props.defaultValue - nullablefalse时忽略。根据indexalternativeIndices获取的所有值为nullValue时设置默认值。
sources.edges.name - Edge type 名称。
sources.edges.mode INSERT 批量操作类型,包括导入、更新和删除。可选值为INSERTUPDATEDELETE(NebulaGraph Importer 从 4.1.0 版本开始支持DELETE类型)。
sources.edges.filter.expr - 过滤数据,满足过滤条件的才会导入。支持的比较符为==!=<><=>=。支持的逻辑运算符为not(!)、and(&&)和or(||)。例如(Record[0] == "Mahinda" or Record[0] == "Michael") and Record[3] == "male"
sources.edges.src.id.type STRING 边上起点 VID 的数据类型。
sources.edges.src.id.index - 边上起点 VID 对应的数据文件中的列号。
sources.edges.dst.id.type STRING 边上终点 VID 的数据类型。
sources.edges.dst.id.index - 边上终点 VID 对应的数据文件中的列号。
sources.edges.rank.index - 边上 RANK 对应的数据文件中的列号。
sources.edges.ignoreExistedIndex true 是否启用IGNORE_EXISTED_INDEX,即插入点后不更新索引。
sources.edges.props.name - 边上属性的名称,必须与数据库中的属性相同。
sources.edges.props.type STRING 边上属性的数据类型。目前支持BOOLINTFLOATDOUBLESTRINGTIMETIMESTAMPDATEDATETIMEGEOGRAPHYGEOGRAPHY(POINT)GEOGRAPHY(LINESTRING)GEOGRAPHY(POLYGON)
sources.edges.props.index - 属性值对应的数据文件中的列号。
sources.edges.props.nullable - 属性是否可以为NULL,可选true或者false
sources.edges.props.nullValue - nullable设置为true时,属性的值与nullValue相等则将该属性值设置为NULL
sources.edges.props.defaultValue - nullablefalse时忽略。根据indexalternativeIndices获取的所有值为nullValue时设置默认值。

Note

CSV 文件中列的序号从 0 开始,即第一列的序号为 0,第二列的序号为 1。

社区用户实践

Note

阅读他人实践后倘若想按原文实践,请留意原文所用的内核和周边工具版本号,请确保你的软件环境和原文兼容。

常见问题

日志输出的字段含义是什么?

例如以下日志内容:

“msg”: “44m20s 2h7m10s 25.85%(129 GiB/498 GiB) Records{Finished: 302016726, Failed: 0, Rate: 113538.13/s}, Requests{Finished: 181786, Failed: 0, Latency: 4.046496736s/4.06694393s, Rate: 68.34/s}, Processed{Finished: 908575178, Failed: 0, Rate: 341563.62/s}”

字段说明如下:

  • 44m20s 2h7m10s 25.85%(129 GiB/498 GiB)对应基本信息,没有字段名称。
    • 已用时间。
    • 预计剩余时间。
    • 已处理百分比。
    • 已处理的数据大小。
    • 总数据大小。
  • Records对应 CSV 文件的记录。
    • Finished:完成的记录数。
    • Failed:失败的记录数。
    • Rate:每秒导入的记录数。
  • Requests对应请求。
    • Finished:完成的请求数。
    • Failed:失败的请求数。
    • Latency:服务端请求耗时/客户端请求耗时。
    • Rate:每秒处理的请求数。
  • Processed对应点边。
    • Finished:处理完成的点边数量。
    • Failed:处理失败的点边数量。
    • Rate:每秒处理的点边数量。

最后更新: September 6, 2024