What can be done with Spark and PySpark on top of Nebula Graph? This post covers everything you need to know.
In this article, I walk you through all three Spark projects of Nebula Graph with runnable hands-on examples. I also managed to make PySpark work with the Nebula Spark Connector, which will be contributed to the docs later.
1 The three Spark projects for Nebula Graph
I previously drew a sketch of all the data importing methods of Nebula Graph, where the three Spark-based Nebula Graph projects were already briefly introduced. In this article, I will instead take a slightly deeper dive into each of them, based on my recent work on them.
TL;DR
Nebula Spark Connector is a Spark library that enables Spark applications to read from and write to Nebula Graph in the form of DataFrames.
Nebula Exchange, built on top of Nebula Spark Connector, is a Spark library and application to exchange data with different data sources (MySQL, Neo4j, PostgreSQL, ClickHouse, Hive, etc.). For the open-source version the exchange is one way (write to Nebula Graph), whereas the enterprise version is bidirectional. Besides writing directly to Nebula Graph, it can optionally generate SST files to be ingested into Nebula Graph, which offloads the storage computation outside of the Nebula Graph cluster.
Nebula Algorithm, built on top of Nebula Spark Connector and GraphX, is a Spark library and application to run common graph algorithms (PageRank, LPA, etc.) on a graph read from Nebula Graph.
Now, let's go through the long version of those Spark projects, from a how-to perspective.
To read data from Nebula Graph, e.g. vertices, Nebula Spark Connector scans all storage instances holding the given label (TAG): withLabel("player"). We can optionally specify which vertex properties to return: withReturnCols(List("name", "age")).
With the needed configuration provided, a call to spark.read.nebula(...).loadVerticesToDF() returns a DataFrame of the vertex scan against Nebula Graph:
def readVertex(spark: SparkSession): Unit = {
  LOG.info("start to read nebula vertices")
  val config = NebulaConnectionConfig
    .builder()
    .withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
    .withConenctionRetry(2)
    .build()
  val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
    .builder()
    .withSpace("basketballplayer")
    .withLabel("player")
    .withNoColumn(false)
    .withReturnCols(List("name", "age"))
    .withLimit(10)
    .withPartitionNum(10)
    .build()
  val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
  vertex.printSchema()
  vertex.show(20)
  println("vertex count: " + vertex.count())
}
The writer part is similar; one big difference is that the writing path goes through graphd, as the underlying Spark Connector issues nGQL INSERT queries.
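To make that concrete, here is a toy sketch (plain Python, not connector code; the function name is mine) of the kind of nGQL INSERT VERTEX statement the writer path ends up sending to graphd:

```python
def build_insert_vertex(tag, prop_names, rows):
    """Build an nGQL INSERT VERTEX statement.
    rows: list of (vertex_id, [property literal strings]) pairs."""
    props = ", ".join(prop_names)
    values = ", ".join(
        '"{}":({})'.format(vid, ", ".join(vals)) for vid, vals in rows
    )
    return "INSERT VERTEX {}({}) VALUES {}".format(tag, props, values)

stmt = build_insert_vertex(
    "player", ["name", "age"],
    [("player100", ['"Tim Duncan"', "42"]), ("player101", ['"Tony Parker"', "36"])],
)
# stmt:
# INSERT VERTEX player(name, age) VALUES "player100":("Tim Duncan", 42), "player101":("Tony Parker", 36)
```

The real connector batches rows per partition and escapes values for us; the sketch only shows the statement shape.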
Then let’s do the hands-on end-to-end practice.
2.2 Hands-on Spark Connector
Prerequisites: it's assumed the procedure below is run on a Linux machine with an internet connection, ideally with Docker and Docker Compose preinstalled.
2.2.1 Bootstrap a Nebula Graph Cluster
First, let's deploy Nebula Graph Core v3.0 and Nebula Studio with Nebula-Up. It will try to install Docker and Docker Compose for us; in case that fails, please install Docker and Docker Compose on your own first.
With the above one-line command, we created a container named spark-master-0 with built-in Hadoop 2.7 and Spark 2.4.5, connected to the Nebula Graph cluster in its Docker network named nebula-docker-compose_nebula-net, with the current path mapped to /root of the Spark container.
Then, we could access the spark env container with:
docker exec -it spark-master-0 bash
Optionally, we could install mvn inside the container:
docker exec -it spark-master-0 bash
# in the container shell
export MAVEN_VERSION=3.5.4
export MAVEN_HOME=/usr/lib/mvn
export PATH=$MAVEN_HOME/bin:$PATH
wget http://archive.apache.org/dist/maven/maven-3/$MAVEN_VERSION/binaries/apache-maven-$MAVEN_VERSION-bin.tar.gz && \
    tar -zxvf apache-maven-$MAVEN_VERSION-bin.tar.gz && \
    rm apache-maven-$MAVEN_VERSION-bin.tar.gz && \
    mv apache-maven-$MAVEN_VERSION /usr/lib/mvn
2.2.3 Run the Spark Connector example
Let's clone the connector and the example codebase, and build the connector (or place the prebuilt connector JAR package):
22/04/19 07:29:34 INFO DAGScheduler: Job 1 finished: show at NebulaSparkReaderExample.scala:57, took 0.199310 s
+---------+------------------+---+
|_vertexId| name|age|
+---------+------------------+---+
|player105| Danny Green| 31|
|player109| Tiago Splitter| 34|
|player111| David West| 38|
|player118| Russell Westbrook| 30|
|player143|Kristaps Porzingis| 23|
|player114| Tracy McGrady| 39|
|player150| Luka Doncic| 20|
|player103| Rudy Gay| 32|
|player113| Dejounte Murray| 29|
|player121| Chris Paul| 33|
|player128| Carmelo Anthony| 34|
|player130| Joel Embiid| 25|
|player136| Steve Nash| 45|
|player108| Boris Diaw| 36|
|player122| DeAndre Jordan| 30|
|player123| Ricky Rubio| 28|
|player139| Marc Gasol| 34|
|player142| Klay Thompson| 29|
|player145| JaVale McGee| 31|
|player102| LaMarcus Aldridge| 33|
+---------+------------------+---+
only showing top 20 rows
22/04/19 07:29:36 INFO DAGScheduler: Job 4 finished: show at NebulaSparkReaderExample.scala:82, took 0.135543 s
+---------+---------+-----+------+
| _srcId| _dstId|_rank|degree|
+---------+---------+-----+------+
|player105|player100| 0| 70|
|player105|player104| 0| 83|
|player105|player116| 0| 80|
|player109|player100| 0| 80|
|player109|player125| 0| 90|
|player118|player120| 0| 90|
|player118|player131| 0| 90|
|player143|player150| 0| 90|
|player114|player103| 0| 90|
|player114|player115| 0| 90|
|player114|player140| 0| 90|
|player150|player120| 0| 80|
|player150|player137| 0| 90|
|player150|player143| 0| 90|
|player103|player102| 0| 70|
|player113|player100| 0| 99|
|player113|player101| 0| 99|
|player113|player104| 0| 99|
|player113|player105| 0| 99|
|player113|player106| 0| 99|
+---------+---------+-----+------+
only showing top 20 rows
There are more examples in the repo, notably one for GraphX, which you can explore yourself. Please note that GraphX assumes vertex IDs to be numeric, so for string-typed vertex IDs an on-the-fly conversion is needed; refer to the example in Nebula Algorithm on how to mitigate that.
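As a rough illustration of that conversion (plain Python rather than the Scala/GraphX code Nebula Algorithm uses; the function name is mine), one common approach is to build a dense string-to-long mapping, run the algorithm on the long IDs, and map the results back afterwards:

```python
def encode_vertex_ids(edges):
    """Map string vertex IDs to dense long IDs, since GraphX requires
    numeric vertex IDs. Returns (encoded_edges, id2str) so algorithm
    results can be mapped back to the original string IDs."""
    str2id = {}

    def to_long(v):
        if v not in str2id:
            str2id[v] = len(str2id)
        return str2id[v]

    encoded = [(to_long(s), to_long(d)) for s, d in edges]
    id2str = {i: s for s, i in str2id.items()}
    return encoded, id2str

edges = [("player105", "player100"), ("player105", "player104")]
encoded, id2str = encode_vertex_ids(edges)
# encoded == [(0, 1), (0, 2)]; id2str maps the longs back, e.g. id2str[0] == "player105"
```

In Spark, the same idea is usually expressed with a zipWithIndex (or hash) over the distinct vertex IDs, joined back onto the edge DataFrame.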
Nebula Exchange is a Spark library/application to read data from multiple sources and then write it either directly to Nebula Graph or into Nebula Graph SST files.
To leverage Nebula Exchange, first create a configuration file telling Exchange how the data should be fetched and written, then call the Exchange package with that conf file specified.
Now let’s do a hands-on test with the same environment created in the last chapter.
3.1 Hands-on Exchange
Here, we use Exchange to consume a CSV data source, where the first column is the vertex ID, and the second and third columns are the "name" and "age" properties.
{
  # Spark related config
  spark: {
    app: {
      name: Nebula Exchange
    }

    master: local

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    executor: {
      memory: 1G
    }

    cores: {
      max: 16
    }
  }

  # Nebula Graph related config
  nebula: {
    address: {
      graph: ["graphd:9669"]
      meta: ["metad0:9559", "metad1:9559", "metad2:9559"]
    }
    user: root
    pswd: nebula
    space: basketballplayer

    # parameters for SST import, not required
    path: {
      local: "/tmp"
      remote: "/sst"
      hdfs.namenode: "hdfs://localhost:9000"
    }

    # nebula client connection parameters
    connection {
      # socket connect & execute timeout, unit: millisecond
      timeout: 30000
    }

    error: {
      # max number of failures; if the number of failures is bigger than max, the application exits.
      max: 32
      # failed import jobs will be recorded in the output path
      output: /tmp/errors
    }

    # use Google's RateLimiter to limit the requests sent to NebulaGraph
    rate: {
      # the stable throughput of RateLimiter
      limit: 1024
      # acquire a permit from RateLimiter, unit: MILLISECONDS
      # if it can't be obtained within the specified timeout, the request is given up
      timeout: 1000
    }
  }

  # Processing tags
  # There are tag config examples for different data sources.
  tags: [
    # HDFS CSV
    # Import mode is client; change type.sink to sst if you want to use SST import mode.
    {
      name: player
      type: {
        source: csv
        sink: client
      }
      path: "file:///root/player.csv"
      # if your csv file has no header, use _c0, _c1, _c2, ... to indicate fields
      fields: [_c1, _c2]
      nebula.fields: [name, age]
      vertex: {
        field: _c0
      }
      separator: ","
      header: false
      batch: 256
      partition: 32
    }
  ]
}
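To make the rate section concrete, here is a minimal sketch (plain Python, not Exchange code; class and method names are mine) of the token-bucket behavior behind Google's RateLimiter that Exchange relies on: permits are handed out at a stable rate of `limit` per second, and a request is given up if its permit can't be obtained within `timeout` milliseconds:

```python
import time

class RateLimiterSketch:
    """Minimal stable-throughput rate limiter sketch, analogous to the
    Guava RateLimiter Exchange uses: `limit` permits per second;
    try_acquire gives up if the permit isn't free within timeout_ms."""

    def __init__(self, limit):
        self.interval = 1.0 / limit        # seconds between permits
        self.next_free = time.monotonic()  # when the next permit is available

    def try_acquire(self, timeout_ms):
        now = time.monotonic()
        wait = self.next_free - now
        if wait > timeout_ms / 1000.0:
            return False                   # can't get a permit in time: give up
        self.next_free = max(self.next_free, now) + self.interval
        if wait > 0:
            time.sleep(wait)               # smooth requests to the stable rate
        return True

limiter = RateLimiterSketch(limit=1024)    # mirrors rate.limit above
limiter.try_acquire(timeout_ms=1000)       # mirrors rate.timeout above
```

With limit: 1024 and timeout: 1000, Exchange's writes are throttled to roughly 1024 requests per second, and a write that can't obtain a permit within a second counts toward the error section.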
Finally, let's create player.csv next to exchange.conf and run the Exchange application; the import log should look like the following:
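The exact CSV content isn't shown here; a hypothetical two-row player.csv consistent with the conf above (no header; _c0 is the vertex ID, _c1 and _c2 are name and age) could look like:

```csv
player800,Andy,42
player801,Barbara,39
```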
...
22/04/19 08:22:08 INFO Exchange$: import for tag player cost time: 1.32 s
22/04/19 08:22:08 INFO Exchange$: Client-Import: batchSuccess.player: 2
22/04/19 08:22:08 INFO Exchange$: Client-Import: batchFailure.player: 0
...
Please refer to the documentation and the conf examples for more data sources. For hands-on Exchange writing to SST files, you can refer to both the documentation and the Nebula Exchange SST 2.x Hands-on Guide.
Calling Nebula Algorithm with spark-submit is, from a how-to perspective, quite similar to Exchange. This post already showed how to do that in action.
4.2 Calling as a lib in code
On the other hand, we can call Nebula Algorithm in Spark as a library; the gains are:
More control/customization of the algorithm's output format
The possibility to run algorithms on graphs with non-numerical vertex IDs, see here
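As a toy illustration of what the library computes under the hood (plain Python rather than GraphX; the function and its defaults are mine, not the library's API), a bare-bones PageRank over an edge list:

```python
def pagerank(edges, damping=0.85, iters=20):
    """Bare-bones PageRank on a directed edge list; ranks sum to 1.
    Assumes every node has at least one outgoing edge (no dangling nodes)."""
    nodes = {n for e in edges for n in e}
    out_deg = {n: 0 for n in nodes}
    for s, _ in edges:
        out_deg[s] += 1
    rank = {n: 1.0 / len(nodes) for n in nodes}
    for _ in range(iters):
        contrib = {n: 0.0 for n in nodes}
        for s, d in edges:
            contrib[d] += rank[s] / out_deg[s]  # spread rank over out-edges
        rank = {n: (1 - damping) / len(nodes) + damping * contrib[n]
                for n in nodes}
    return rank

# A 3-node cycle: by symmetry every node converges to rank 1/3.
ranks = pagerank([("a", "b"), ("b", "c"), ("c", "a")])
```

Calling the library instead of this sketch gives you the same computation on GraphX at scale, plus the DataFrame output you can then reshape however you like.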
5 PySpark for Nebula Graph
PySpark comes with the capability to call Java/Scala packages from Python, so it's also quite easy to use the Spark Connector from Python.
Here I am doing this from the PySpark entry point at /spark/bin/pyspark, with the connector's JAR package specified via --driver-class-path and --jars.
Then, rather than passing NebulaConnectionConfig and ReadNebulaConfig to spark.read.nebula, we should instead call spark.read.format("com.vesoft.nebula.connector.NebulaDataSource") and set the corresponding options, which we can find in the connector's source:
def loadVerticesToDF(): DataFrame = {
  assert(connectionConfig != null && readConfig != null,
         "nebula config is not set, please call nebula() before loadVerticesToDF")
  val dfReader = reader
    .format(classOf[NebulaDataSource].getName)
    .option(NebulaOptions.TYPE, DataTypeEnum.VERTEX.toString)
    .option(NebulaOptions.SPACE_NAME, readConfig.getSpace)
    .option(NebulaOptions.LABEL, readConfig.getLabel)
    .option(NebulaOptions.PARTITION_NUMBER, readConfig.getPartitionNum)
    .option(NebulaOptions.RETURN_COLS, readConfig.getReturnCols.mkString(","))
    .option(NebulaOptions.NO_COLUMN, readConfig.getNoColumn)
    .option(NebulaOptions.LIMIT, readConfig.getLimit)
    .option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
    .option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
    .option(NebulaOptions.CONNECTION_RETRY, connectionConfig.getConnectionRetry)
    .option(NebulaOptions.EXECUTION_RETRY, connectionConfig.getExecRetry)
    .option(NebulaOptions.ENABLE_META_SSL, connectionConfig.getEnableMetaSSL)
    .option(NebulaOptions.ENABLE_STORAGE_SSL, connectionConfig.getEnableStorageSSL)

  if (connectionConfig.getEnableStorageSSL || connectionConfig.getEnableMetaSSL) {
    dfReader.option(NebulaOptions.SSL_SIGN_TYPE, connectionConfig.getSignType)
    SSLSignType.withName(connectionConfig.getSignType) match {
      case SSLSignType.CA =>
        dfReader.option(NebulaOptions.CA_SIGN_PARAM, connectionConfig.getCaSignParam)
      case SSLSignType.SELF =>
        dfReader.option(NebulaOptions.SELF_SIGN_PARAM, connectionConfig.getSelfSignParam)
    }
  }
  dfReader.load()
}
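Putting it together from the PySpark side, here is a sketch of the equivalent read. The option keys ("type", "spaceName", "label", etc.) mirror the NebulaOptions constants in the Scala source above; verify them against the connector version you actually run, and note the helper names below are mine:

```python
# Run inside a pyspark shell started with --jars/--driver-class-path
# pointing at the nebula-spark-connector JAR.
NEBULA_SOURCE = "com.vesoft.nebula.connector.NebulaDataSource"

def vertex_read_options(space, label, return_cols, meta_address, partitions=1):
    """Build the option map for spark.read.format(NEBULA_SOURCE);
    keys assumed to match NebulaOptions in the connector source."""
    return {
        "type": "vertex",
        "spaceName": space,
        "label": label,
        "returnCols": ",".join(return_cols),
        "metaAddress": meta_address,
        "partitionNumber": str(partitions),
    }

def load_vertices(spark, opts):
    """Scan vertices of the given tag into a DataFrame."""
    return spark.read.format(NEBULA_SOURCE).options(**opts).load()

opts = vertex_read_options("basketballplayer", "player",
                           ["name", "age"], "metad0:9559")
# In the pyspark shell: df = load_vertices(spark, opts); df.show(2)
```

Because PySpark only proxies strings and primitives into the JVM, everything the Scala builder expressed through NebulaConnectionConfig/ReadNebulaConfig has to be flattened into these string options.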