Optimizing nebula-importer load performance

Here is an example data model for loading information about files and their references to the sites they were downloaded from.

We are trying to optimize load speed by experimenting with the number of client connections, the buffer size, and the batch size.
What would be the optimal parameters for this use case?

The Nebula cluster has 4 nodes, each with 64 cores, 256 GB RAM, and 4 x 3.5 TB NVMe disks.
The Nebula database version is 1.2.0.

Below are the script definitions and a simple template trick for generating scripts that load large dump files in parallel or sequentially, so that we can control the load velocity and resume the load when something breaks.

NebulaDB Data Model

CREATE SPACE IF NOT EXISTS fgraph(partition_num=128, replica_factor=3);
USE fgraph;
CREATE TAG info(sha1 string, record_timestamp int);
CREATE TAG file(file_name string, file_size int);
CREATE TAG domain(source_domain string);
CREATE EDGE reference();
CREATE TAG INDEX source_domain_idx ON domain(source_domain);
CREATE TAG INDEX info_timestamp_idx ON info(record_timestamp);

files-template.yaml

version: v1
description: Files info and size - Template
removeTempFiles: false
clientSettings:
  retry: 5
  concurrency: 16
  channelBufferSize: 1024
  space: fgraph
  connection:
    user: root
    password: nebula
    address: alt-nebuladb01:9669,alt-nebuladb02:9669,alt-nebuladb03:9669,alt-nebuladb04:9669
    afterPeriod: 3s
logPath: /data01/dumps/err/files-XY.log
files:
  - path: /data01/dumps/files/files-XY.csv
    failDataPath: /data01/dumps/err/files-XY.csv
    batchSize: 32
    inOrder: false
    type: csv
    csv:
      withHeader: false
      withLabel: false
      delimiter: ","
    schema:
      type: vertex
      vertex:
        vid:
          index: 0
        tags:
          - name: info
            props:
              - name: sha1
                type: string
                index: 1
              - name: record_timestamp
                type: int
                index: 2
          - name: file
            props:
              - name: file_name
                type: string
                index: 3
              - name: file_size
                type: int
                index: 4
for id in $(seq 0 255); do sed "s/XY/$(printf '%02x' $id)/" /data01/dumps/scripts/files-template.yaml > /data01/dumps/scripts/files-$(printf '%02x' $id).yaml; done

for id in `seq 0 255`; do /home/nebula/nebula-importer/nebula-importer --config /data01/dumps/scripts/files-$( printf "%02x" $id ).yaml ; sleep 10 ; done

domains.yaml

version: v1
description: Domains
removeTempFiles: false
clientSettings:
  retry: 5
  concurrency: 16 
  channelBufferSize: 1024
  space: fgraph
  connection:
    user: root
    password: nebula
    address: alt-nebuladb01:9669,alt-nebuladb02:9669,alt-nebuladb03:9669,alt-nebuladb04:9669
    afterPeriod: 3s
logPath: /data01/dumps/err/domains.log
files:
  - path: /data01/dumps/domains/domains.csv
    failDataPath: /data01/dumps/err/domains.csv
    batchSize: 16
    inOrder: false
    type: csv
    csv:
      withHeader: false
      withLabel: false
      delimiter: ","
    schema:
      type: vertex
      vertex:
        vid:
          index: 0
        tags:
          - name: domain
            props:
              - name: source_domain
                type: string
                index: 1
/home/nebula/nebula-importer/nebula-importer --config /data01/dumps/scripts/domains.yaml

references-template.yaml

version: v1
description: References - Template
removeTempFiles: false
clientSettings:
  retry: 5
  concurrency: 16
  channelBufferSize: 1024
  space: fgraph
  connection:
    user: root
    password: nebula
    address: alt-nebuladb01:9669,alt-nebuladb02:9669,alt-nebuladb03:9669,alt-nebuladb04:9669
    afterPeriod: 3s
logPath: /data01/dumps/err/references-X.log
files:
  - path: /data01/dumps/domains/references-X.csv
    failDataPath: /data01/dumps/err/references-X.csv
    batchSize: 32
    inOrder: false
    type: csv
    csv:
      withHeader: false
      withLabel: false
      delimiter: ","
    schema:
      type: edge
      edge:
        name: reference
        withRanking: false
        srcVID:
          index: 0
        dstVID:
          index: 1
for id in $(seq 0 15); do sed "s/X/$(printf '%1x' $id)/" /data01/dumps/scripts/references-template.yaml > /data01/dumps/scripts/references-$(printf '%1x' $id).yaml; done

for id in `seq 0 15`; do /home/nebula/nebula-importer/nebula-importer --config /data01/dumps/scripts/references-$( printf "%1x" $id ).yaml ; sleep 10 ; done
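The post mentions resuming the load when something breaks. One simple sketch (paths and marker convention are assumptions, not part of nebula-importer itself) is to drop a `.done` marker next to each config after a successful run, so a restarted loop skips shards that already finished:

```shell
#!/bin/sh
# Sketch of a resumable shard loop: a .done marker records completed shards.
SCRIPTS=${SCRIPTS:-/data01/dumps/scripts}
IMPORTER=${IMPORTER:-/home/nebula/nebula-importer/nebula-importer}

run_shard() {
  cfg="$1"
  marker="${cfg}.done"
  if [ -f "$marker" ]; then
    echo "skip $cfg"                     # finished in a previous run
    return 0
  fi
  # Only mark the shard done if the importer exits successfully.
  "$IMPORTER" --config "$cfg" && touch "$marker"
}

# Resumable sequential load (run manually):
# for id in $(seq 0 15); do run_shard "$SCRIPTS/references-$(printf '%1x' $id).yaml"; done
```

On a crash, simply re-run the loop; only the unfinished shards are retried, and the importer's own `failDataPath` still captures any rejected rows.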

@yee Please help look at this.

Hi @goranc

I have a question about your data loading process: do you create the index in Nebula and then start importing the CSV data? If so, you should delete the index first and rebuild it after the data loading finishes. In Nebula 1.0 we have not optimized insertion performance when an index is present.

I have tried both with and without pre-created indexes.

But after the load finishes, rebuilding the index in the background takes a long time for my dataset
(many days, even weeks), and in the meantime I need to start loading new streaming data from a Kafka system.
I assume that for streaming data arriving in the space, the index is automatically updated for the new data.

So I decided that a lower ingestion speed is more acceptable than rebuilding the index afterwards.
Are there any issues with having the indexes in advance, other than slowing down the load speed?

This way I am able to run a query that fetches progress and monitors which records are being ingested into the graph database, almost in real time.

For the described data model I can execute this query:

lookup on info where info.record_timestamp >= now()-60 | \
fetch prop on info,file $-.VertexID yield info.sha1 as sha1, info.record_timestamp as record_timestamp, file.file_name as file_name, file.file_size as file_size | \
yield $-.sha1 as sha1, (timestamp)$-.record_timestamp as record_timestamp, $-.file_name as file_name, $-.file_size as file_size where $-.file_size > 1024000 | order by record_timestamp

Hi @goranc, sorry for the late reply.

If you want to connect a streaming system to Nebula Graph, it is not a good idea to rebuild the index in real time. But when you load the data for the first time, you can handle the index as I described. For incremental data, it is not necessary to rebuild the whole index again, and you can keep the index in place if the lower load speed meets your requirements.

In addition, you are welcome to try Nebula 2.0: we have optimized data loading for schemas with indexes, and we will release it this month.

OK, I will try creating the index after the initial load.

Rebuilding the index will take a long time, so if we start the real-time loaders before the index rebuild is finished,
will the new data from the real-time loaders be available to queries that use the index, e.g. lookup queries?

Another question is about the nebula-importer settings: what is the optimal ratio of parallel loaders to batch size?
And how do we determine how many parallel loaders can run before the Nebula cluster gets overloaded?

Yes, the index will also be updated for new real-time data after the index rebuild, so you can look the data up once it is inserted successfully.

The concurrency configuration item sets how many clients connect to Nebula Graph in parallel. I suggest setting it equal to the number of cores on your machine.

The batchSize is the number of vertices or edges sent to Nebula at the same time. The storaged service stores this data in batches to avoid too many RPCs. Insertion speed is also affected by your disks, so run some tests to find a suitable value.
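A small harness for those tests can be sketched as below: copy one shard config, patch the `batchSize` value with sed, and time each run. The candidate values, paths, and sweep range are assumptions to adapt, not recommendations:

```shell
#!/bin/sh
# Sketch: rewrite batchSize in a copy of an existing importer config.
make_cfg() {
  # $1 = source config, $2 = batchSize to try, $3 = output path
  sed "s/batchSize: [0-9]*/batchSize: $2/" "$1" > "$3"
}

# Sweep a few batch sizes against one shard and compare wall-clock times
# (run manually; the importer path is the one used elsewhere in this post):
# for b in 32 64 128 256; do
#   make_cfg /data01/dumps/scripts/files-00.yaml "$b" /tmp/files-b$b.yaml
#   time /home/nebula/nebula-importer/nebula-importer --config /tmp/files-b$b.yaml
# done
```

The same sed trick works for `concurrency`, so one script can sweep both knobs and find the point where storaged stops scaling.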