# Connect to nebula with console
~/.nebula-up/console.sh
# Execute any queries like
~/.nebula-up/console.sh -e "SHOW HOSTS"
Load a dataset and run a graph query against it:
# Load the sample dataset
~/.nebula-up/load-basketballplayer-dataset.sh
# Wait for about one minute
# Make a graph query against the sample dataset
~/.nebula-up/console.sh -e 'USE basketballplayer; FIND ALL PATH FROM "player100" TO "team204" OVER * WHERE follow.degree is EMPTY or follow.degree >=0 YIELD path AS p;'
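To double-check that the dataset landed, we can also list the tags in the space from the same console; a small extra sanity check:

~/.nebula-up/console.sh -e 'USE basketballplayer; SHOW TAGS;'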
2.2.2 Enter the Spark Environment
Running the following line takes us into the Spark environment:
docker exec -it sparkmaster bash
If we want to compile things there, we can install mvn inside the container:
docker exec -it sparkmaster bash
# in the container shell
export MAVEN_VERSION=3.5.4
export MAVEN_HOME=/usr/lib/mvn
export PATH=$MAVEN_HOME/bin:$PATH
wget http://archive.apache.org/dist/maven/maven-3/$MAVEN_VERSION/binaries/apache-maven-$MAVEN_VERSION-bin.tar.gz && \
tar -zxvf apache-maven-$MAVEN_VERSION-bin.tar.gz && \
rm apache-maven-$MAVEN_VERSION-bin.tar.gz && \
mv apache-maven-$MAVEN_VERSION /usr/lib/mvn
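To confirm the installation, check the Maven version from the same container shell:

mvn -version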
2.2.3 Run the Spark Connector Examples
2.2.3.1 Option 1 (recommended): via PySpark
Enter the PySpark shell
~/.nebula-up/nebula-pyspark.sh
Call the Nebula Spark Reader
# call the Nebula Spark Connector Reader
df = spark.read.format("com.vesoft.nebula.connector.NebulaDataSource")\
    .option("type", "vertex")\
    .option("spaceName", "basketballplayer")\
    .option("label", "player")\
    .option("returnCols", "name,age")\
    .option("metaAddress", "metad0:9559")\
    .option("partitionNumber", 1)\
    .load()

# show the dataframe with a limit of 2
df.show(n=2)
To run the Scala example instead, we clone the Spark Connector repository on the host; the clone ends up inside the sparkmaster container under /root:

cd ~/.nebula-up/nebula-up/spark
git clone https://github.com/vesoft-inc/nebula-spark-connector.git
docker exec -it sparkmaster bash
cd /root/nebula-spark-connector
Replace the code of the example project
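# empty the example file first, then open it and paste in our own code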
echo > example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
vi example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
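After saving the file, we can compile and submit it with the mvn we installed earlier. This is a sketch under assumptions: the skip flags are the usual ones for Maven builds of this project, Spark is assumed to live at /spark inside the container, and the exact jar name under example/target/ (version suffix included) should be checked after the build finishes:

# build the connector and its example module, skipping tests/gpg/javadoc for speed
cd /root/nebula-spark-connector
mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true

# submit the example class to Spark; <version> is a placeholder, use the real jar name
/spark/bin/spark-submit --master local \
    --class com.vesoft.nebula.examples.connector.NebulaSparkReaderExample \
    example/target/example-<version>-jar-with-dependencies.jar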
Next comes the Nebula Exchange part; its configuration file, exchange.conf, describes a CSV-to-Nebula import of player.csv into the basketballplayer space:

{
  # Spark relation config
  spark: {
    app: {
      name: Nebula Exchange
    }

    master: local

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    executor: {
      memory: 1G
    }

    cores: {
      max: 16
    }
  }

  # Nebula Graph relation config
  nebula: {
    address: {
      graph: ["graphd:9669"]
      meta: ["metad0:9559", "metad1:9559", "metad2:9559"]
    }
    user: root
    pswd: nebula
    space: basketballplayer

    # parameters for SST import, not required
    path: {
      local: "/tmp"
      remote: "/sst"
      hdfs.namenode: "hdfs://localhost:9000"
    }

    # nebula client connection parameters
    connection {
      # socket connect & execute timeout, unit: millisecond
      timeout: 30000
    }

    error: {
      # max number of failures; if it is exceeded, the application exits
      max: 32
      # failed import jobs will be recorded in the output path
      output: /tmp/errors
    }

    # use Google's RateLimiter to limit the requests sent to NebulaGraph
    rate: {
      # the stable throughput of RateLimiter
      limit: 1024
      # time to wait for a permit from RateLimiter, unit: millisecond;
      # if a permit can't be obtained within the timeout, the request is given up
      timeout: 1000
    }
  }

  # Processing tags
  # There are tag config examples for different dataSources.
  tags: [
    # HDFS CSV
    # Import mode is client; change type.sink to sst if you want to use SST import mode instead.
    {
      name: player
      type: {
        source: csv
        sink: client
      }
      path: "file:///root/player.csv"
      # if your csv file has no header, use _c0, _c1, _c2, ... to refer to its fields
      fields: [_c1, _c2]
      nebula.fields: [name, age]
      vertex: {
        field: _c0
      }
      separator: ","
      header: false
      batch: 256
      partition: 32
    }
  ]
}
We should now see that the CSV data source and this configuration file sit in the same directory:
bash-5.0# ls -l
total 24
drwxrwxr-x    2 1000  1000    4096 Jun  1 04:26 download
-rw-rw-r--    1 1000  1000    1908 Jun  1 04:23 exchange.conf
-rw-rw-r--    1 1000  1000    2593 Jun  1 04:23 hadoop.env
drwxrwxr-x    7 1000  1000    4096 Jun  6 03:27 nebula-spark-connector
-rw-rw-r--    1 1000  1000      51 Jun  1 04:23 player.csv
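With the files in place, we can submit the Exchange job. Another hedged sketch: com.vesoft.nebula.exchange.Exchange is the entry class of Exchange 2.x, and the exact jar name under download/ depends on the release that was fetched, so adjust it to what ls shows:

# run the import against the config above; the jar path/name is an assumption
/spark/bin/spark-submit --master local \
    --class com.vesoft.nebula.exchange.Exchange \
    download/nebula-exchange.jar -c exchange.conf

The job log should then show the player tag being processed: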
22/06/06 03:56:26 INFO Exchange$: Processing Tag player
22/06/06 03:56:26 INFO Exchange$: field keys: _c1, _c2
22/06/06 03:56:26 INFO Exchange$: nebula keys: name, age
22/06/06 03:56:26 INFO Exchange$: Loading CSV files from file:///root/player.csv
...
22/06/06 03:56:41 INFO Exchange$: import for tag player cost time: 3.35 s
22/06/06 03:56:41 INFO Exchange$: Client-Import: batchSuccess.player: 2
22/06/06 03:56:41 INFO Exchange$: Client-Import: batchFailure.player: 0
...
For Nebula Algorithm, the relevant fragment of its configuration file defines where the graph data is read from and where results are written:

  data: {
    # data source, options: nebula, csv, json
    source: nebula
    # data sink: the algorithm result will be written into this sink, options: nebula, csv, text
    sink: csv
    # whether your algorithm needs edge weights
    hasWeight: false
  }

  nebula: {
    # algo's data source from Nebula. If data.source is nebula, this nebula.read config takes effect.
    read: {
      # Nebula metad server address; multiple addresses are separated by commas
      metaAddress: "metad0:9559"
      # Nebula space
      space: livejournal
      # Nebula edge types; multiple labels mean data from multiple edge types will be unioned together
      labels: ["follow"]
      # Nebula edge property name for each edge type; it will be used as the weight column for the algorithm.
      # Make sure the weightCols correspond to the labels.
      weightCols: []
    }