Nebula Algorithm

  • Nebula Graph 2.0.0
  • Deployment type (Single-Host)
  • Hardware info
    • Disk in use (HDD)
    • CPU: Intel(R) Xeon(R) CPU E5-2630 v2 @ 2.60GHz * 10
    • Ram: 64 G
  • +----+------------------+------------------+----------------+---------+------------+--------------------+-------------+-----------+ | ID | Name | Partition Number | Replica Factor | Charset | Collate | Vid Type | Atomic Edge | Group | +----+------------------+------------------+----------------+---------+------------+--------------------+-------------+-----------+ | 1 | "CallConnection" | 10 | 1 | "utf8" | "utf8_bin" | "FIXED_STRING(80)" | "false" | "default" | +----+------------------+------------------+----------------+---------+------------+--------------------+-------------+-----------+
    I wanna use pagerank algorithm. I use below configuration:
{
  # Spark relation config
  spark: {
    app: {
        name: LPA
        # spark.app.partitionNum
        partitionNum:100
    }
    master:local
  }

  data: {
    # data source. optional of nebula,csv,json
    source: nebula
    # data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text
    sink: nebula
    # if your algorithm needs weight
    hasWeight: false
  }

  # Nebula Graph relation config
  nebula: {
    # algo's data source from Nebula. If data.source is nebula, then this nebula.read config can be valid.
    read: {
        # Nebula metad server address, multiple addresses are split by English comma
        metaAddress: "127.0.0.1:9559"
        # Nebula space
        space: CallConnection
        # Nebula edge types, multiple labels means that data from multiple edges will union together
        labels: ["callTO"]
        # Nebula edge property name for each edge type, this property will be as weight col for algorithm.
        # Make sure the weightCols are corresponding to labels.
        # weightCols: ["start_year"]
    }

    # algo result sink into Nebula. If data.sink is nebula, then this nebula.write config can be valid.
    write:{
        # Nebula graphd server address, multiple addresses are split by English comma
        graphAddress: "127.0.0.1:9669"
        # Nebula metad server address, multiple addresses are split by English comma
        metaAddress: "127.0.0.1:9559,127.0.0.1:9560"
        user:user
        pswd:password
        # Nebula space name
        space:CallConnection
        # Nebula tag name, the algorithm result will be write into this tag
        tag:pagerank
    }
  }

#   local: {
#     # algo's data source from Nebula. If data.source is csv or json, then this local.read can be valid.
#     read:{
#         filePath: "hdfs://127.0.0.1:9000/edge/work_for.csv"
#         # srcId column
#         srcId:"_c0"
#         # dstId column
#         dstId:"_c1"
#         # weight column
#         #weight: "col3"
#         # if csv file has header
#         header: false
#         # csv file's delimiter
#         delimiter:","
#     }

#     # algo result sink into local file. If data.sink is csv or text, then this local.write can be valid.
#     write:{
#         resultPath:/tmp/
#     }
#   }

  algorithm: {
    # the algorithm that you are going to execute,pick one from [pagerank, louvain, connectedcomponent,
    # labelpropagation, shortestpaths, degreestatic, kcore, stronglyconnectedcomponent, trianglecount,
    # betweenness]
    executeAlgo: pagerank

    # PageRank parameter
    pagerank: {
        maxIter: 2
        resetProb: 0.15  # default 0.15
    }

 }
}

after run spark-submit as this

spark-submit --master local --class com.vesoft.nebula.algorithm.Main nebula-algorithm-2.0.0.jar -p classes/Algorithm.conf

after about several minutes and use all CPU but not RAM,

here is the spark UI

spark-submit --master local --class com.vesoft.nebula.algorithm.Main nebula-algorithm-2.0.0.jar -p classes/Algorithm.conf 
21/05/19 06:19:53 WARN Utils: Your hostname, orientserver resolves to a loopback address: 127.0.1.1; using 10.10.1.121 instead (on interface ens33)
21/05/19 06:19:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/05/19 06:20:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
log4j:WARN No appenders could be found for logger (com.vesoft.nebula.algorithm.Main$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/05/19 06:20:00 INFO SparkContext: Running Spark version 2.4.7
21/05/19 06:20:00 INFO SparkContext: Submitted application: LPA
21/05/19 06:20:00 INFO SecurityManager: Changing view acls to: orient
21/05/19 06:20:00 INFO SecurityManager: Changing modify acls to: orient
21/05/19 06:20:00 INFO SecurityManager: Changing view acls groups to: 
21/05/19 06:20:00 INFO SecurityManager: Changing modify acls groups to: 
21/05/19 06:20:00 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(orient); groups with view permissions: Set(); users  with modify permissions: Set(orient); groups with modify permissions: Set()
21/05/19 06:20:00 INFO Utils: Successfully started service 'sparkDriver' on port 36273.
21/05/19 06:20:00 INFO SparkEnv: Registering MapOutputTracker
21/05/19 06:20:00 INFO SparkEnv: Registering BlockManagerMaster
21/05/19 06:20:00 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/05/19 06:20:00 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/05/19 06:20:00 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-e15b4c05-c314-463e-9f06-a1fca904f7e7
21/05/19 06:20:00 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
21/05/19 06:20:01 INFO SparkEnv: Registering OutputCommitCoordinator
21/05/19 06:20:01 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/05/19 06:20:01 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.10.1.121:4040
21/05/19 06:20:01 INFO SparkContext: Added JAR file:/home/orient/NebulaJavaTools/nebula-spark-utils/nebula-algorithm/target/nebula-algorithm-2.0.0.jar at spark://10.10.1.121:36273/jars/nebula-algorithm-2.0.0.jar with timestamp 1621405201288
21/05/19 06:20:01 INFO Executor: Starting executor ID driver on host localhost
21/05/19 06:20:01 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34449.
21/05/19 06:20:01 INFO NettyBlockTransferService: Server created on 10.10.1.121:34449
21/05/19 06:20:01 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/05/19 06:20:01 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.10.1.121, 34449, None)
21/05/19 06:20:01 INFO BlockManagerMasterEndpoint: Registering block manager 10.10.1.121:34449 with 366.3 MB RAM, BlockManagerId(driver, 10.10.1.121, 34449, None)
21/05/19 06:20:01 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.10.1.121, 34449, None)
21/05/19 06:20:01 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.10.1.121, 34449, None)
21/05/19 06:20:01 INFO ReadNebulaConfig$: NebulaReadConfig={space=CallConnection,label=callTO,returnCols=List(),noColumn=true,partitionNum=10}
21/05/19 06:20:02 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/home/orient/NebulaJavaTools/nebula-spark-utils/nebula-algorithm/target/spark-warehouse').
21/05/19 06:20:02 INFO SharedState: Warehouse path is 'file:/home/orient/NebulaJavaTools/nebula-spark-utils/nebula-algorithm/target/spark-warehouse'.
21/05/19 06:20:02 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
21/05/19 06:20:02 INFO NebulaDataSource: create reader
21/05/19 06:20:02 INFO NebulaDataSource: options {spacename=CallConnection, nocolumn=true, metaaddress=127.0.0.1:9559, label=callTO, type=edge, connectionretry=2, timeout=6000, executionretry=1, paths=[], limit=1000, returncols=, partitionnumber=10}
21/05/19 06:20:02 INFO NebulaDataSourceEdgeReader: dataset's schema: StructType(StructField(_srcId,StringType,false), StructField(_dstId,StringType,false), StructField(_rank,LongType,false))
21/05/19 06:20:04 INFO NebulaDataSource: create reader
21/05/19 06:20:04 INFO NebulaDataSource: options {spacename=CallConnection, nocolumn=true, metaaddress=127.0.0.1:9559, label=callTO, type=edge, connectionretry=2, timeout=6000, executionretry=1, paths=[], limit=1000, returncols=, partitionnumber=10}
21/05/19 06:20:04 INFO DataSourceV2Strategy: 
Pushing operators to class com.vesoft.nebula.connector.NebulaDataSource
Pushed Filters: 
Post-Scan Filters: 
Output: _srcId#0, _dstId#1, _rank#2L
         
21/05/19 06:20:05 INFO CodeGenerator: Code generated in 205.90032 ms
21/05/19 06:20:05 INFO SparkContext: Starting job: fold at VertexRDDImpl.scala:90
21/05/19 06:20:05 INFO DAGScheduler: Registering RDD 9 (mapPartitions at VertexRDD.scala:356) as input to shuffle 3
21/05/19 06:20:05 INFO DAGScheduler: Registering RDD 27 (mapPartitions at VertexRDDImpl.scala:247) as input to shuffle 1
21/05/19 06:20:05 INFO DAGScheduler: Registering RDD 23 (mapPartitions at VertexRDDImpl.scala:247) as input to shuffle 0
21/05/19 06:20:05 INFO DAGScheduler: Registering RDD 31 (mapPartitions at GraphImpl.scala:208) as input to shuffle 2
21/05/19 06:20:05 INFO DAGScheduler: Got job 0 (fold at VertexRDDImpl.scala:90) with 10 output partitions
21/05/19 06:20:05 INFO DAGScheduler: Final stage: ResultStage 4 (fold at VertexRDDImpl.scala:90)
21/05/19 06:20:05 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0, ShuffleMapStage 3)
21/05/19 06:20:05 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0, ShuffleMapStage 3)
21/05/19 06:20:05 INFO DAGScheduler: Submitting ShuffleMapStage 0 (VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[9] at mapPartitions at VertexRDD.scala:356), which has no missing parents
21/05/19 06:20:05 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 16.4 KB, free 366.3 MB)
21/05/19 06:20:05 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 7.6 KB, free 366.3 MB)
21/05/19 06:20:05 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.10.1.121:34449 (size: 7.6 KB, free: 366.3 MB)
21/05/19 06:20:05 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1184
21/05/19 06:20:05 INFO DAGScheduler: Submitting 10 missing tasks from ShuffleMapStage 0 (VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[9] at mapPartitions at VertexRDD.scala:356) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
21/05/19 06:20:05 INFO TaskSchedulerImpl: Adding task set 0.0 with 10 tasks
21/05/19 06:20:06 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 9761 bytes)
21/05/19 06:20:06 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
21/05/19 06:20:06 INFO Executor: Fetching spark://10.10.1.121:36273/jars/nebula-algorithm-2.0.0.jar with timestamp 1621405201288
21/05/19 06:20:06 INFO TransportClientFactory: Successfully created connection to /10.10.1.121:36273 after 44 ms (0 ms spent in bootstraps)
21/05/19 06:20:06 INFO Utils: Fetching spark://10.10.1.121:36273/jars/nebula-algorithm-2.0.0.jar to /tmp/spark-17e5e989-b080-4a41-aac3-f9fd56f03752/userFiles-693448ab-9c10-468b-a2a9-c926b9e62800/fetchFileTemp3358155586060883895.tmp
21/05/19 06:20:06 INFO Executor: Adding file:/tmp/spark-17e5e989-b080-4a41-aac3-f9fd56f03752/userFiles-693448ab-9c10-468b-a2a9-c926b9e62800/nebula-algorithm-2.0.0.jar to class loader
21/05/19 06:20:07 INFO NebulaEdgePartitionReader: partition index: 1, scanParts: List(1)
21/05/19 06:20:07 INFO CodeGenerator: Code generated in 23.996116 ms

after some minutes,


just do nothing and all task gone!

How many edges in your callTO, and please change the config of spark.app.partitionNum to 10 (Nebula space partition number ).

Ps: If you want to write the algorithm’s result back to nebula, the default column name of result should be changed. Now the default name starts with _, but Nebula does not support a property name that starts with _.

about 700 million links

ps: which line do you mean? I want to write back to nebula