Skip to content

Commit b506e98

Browse files
authored
Add communication layer and set Akka for local deployments (#373)
* Add initial abstractions for the communication layer * Fix compilation errors * Add topic as separated entity * Fix compilation errors * Drop EndPoint abstraction for sharding topics * Fix bug blocking the queries * Add AkkaConnector implementation * Fix bug when actor is not registered yet * Replace akka serialiser by Kryo * Update Akka based Global Factories * Add support for one builder * Remove commented out method * Reorder Component class * Replace PulsarController by PulsarConnector * Rename back PulsarConnector to make everything work * Rename 'queries' topic to 'submissions' * Add configuration parameters for the pulsar topics * Add check to use Akka for local deployments * Set Monix scheduler as global scheduler * Set queryTrack and submissions topics to use Akka * Avoid using Zookeeper for local deployments * Drop GlobalFactory classes * Avoid configuring vertex messages channel when there is only one partition * Add DoNotDocument tag to new classes * Create communication package * Fix broken imports * Remove unused imports of PulsarKryoSerialiser * Rename PulsarKryoSerialiser to KryoSerialiser * Use listener id as subscription name in PulsarConnector * Update listeners ids to be unique * Drop AkkaScheduler implementation * Review pending TODOs * Rebuild message checking inside QueryHandler * Rename endedQueries topic to CompletedQueries * Remove commented out code * Add comment to AkkaTopicRepository * Add TODO to QueryHandler * Make LocalIDManager thread-safe * Rename producers to endPoints in VertexMessageHandler * Add comments for the global watermark problem * Update application.conf variables to match keys * Fix bug reading application.conf * Add missing dependency
1 parent 56d0d25 commit b506e98

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1351
-575
lines changed

build.sbt

+25-20
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,24 @@ sonatypeCredentialHost := "s01.oss.sonatype.org"
1212
sonatypeRepository := "https://s01.oss.sonatype.org/service/local"
1313

1414
ThisBuild / scmInfo := Some(
15-
ScmInfo(
16-
url("https://github.com/Raphtory/Raphtory"),
17-
"scm:git@github.com:Raphtory/Raphtory.git"
18-
)
15+
ScmInfo(
16+
url("https://github.com/Raphtory/Raphtory"),
17+
"scm:git@github.com:Raphtory/Raphtory.git"
18+
)
1919
)
2020
ThisBuild / developers := List(
21-
Developer(
22-
id = "miratepuffin",
23-
name = "Ben Steer",
24-
email = "[email protected]",
25-
url = url("https://twitter.com/miratepuffin")
26-
)
21+
Developer(
22+
id = "miratepuffin",
23+
name = "Ben Steer",
24+
email = "[email protected]",
25+
url = url("https://twitter.com/miratepuffin")
26+
)
2727
)
2828

2929
ThisBuild / description := "A Distributed Temporal Graph Processing System"
30-
ThisBuild / licenses := List("Apache 2" -> new URL("http://www.apache.org/licenses/LICENSE-2.0.txt"))
30+
ThisBuild / licenses := List(
31+
"Apache 2" -> new URL("http://www.apache.org/licenses/LICENSE-2.0.txt")
32+
)
3133
ThisBuild / homepage := Some(url("https://github.com/Raphtory/Raphtory"))
3234

3335
// Remove all additional repository other than Maven Central from POM
@@ -89,7 +91,8 @@ lazy val core = (project in file("core"))
8991
twitterChill,
9092
twittered,
9193
typesafeConfig,
92-
zookeeper
94+
zookeeper,
95+
akkaTyped
9396
),
9497
libraryDependencies ~= { _.map(_.exclude("org.slf4j", "slf4j-log4j12")) }
9598
)
@@ -112,14 +115,16 @@ lazy val core = (project in file("core"))
112115

113116
// EXAMPLE PROJECTS
114117

115-
lazy val examplesEnron = (project in file("examples/raphtory-example-enron")).dependsOn(core)
116-
lazy val examplesEthereum = (project in file("examples/raphtory-example-ethereum")).dependsOn(core)
117-
lazy val examplesFacebook = (project in file("examples/raphtory-example-facebook")).dependsOn(core)
118-
lazy val examplesGab = (project in file("examples/raphtory-example-gab")).dependsOn(core)
119-
lazy val examplesLotr = (project in file("examples/raphtory-example-lotr")).dependsOn(core)
120-
lazy val examplesPresto = (project in file("examples/raphtory-example-presto")).dependsOn(core)
121-
lazy val examplesTwitter = (project in file("examples/raphtory-example-twitter")).dependsOn(core)
122-
lazy val examplesTwitterCircles = (project in file("examples/raphtory-example-twittercircles")).dependsOn(core)
118+
lazy val examplesEnron = (project in file("examples/raphtory-example-enron")).dependsOn(core)
119+
lazy val examplesEthereum = (project in file("examples/raphtory-example-ethereum")).dependsOn(core)
120+
lazy val examplesFacebook = (project in file("examples/raphtory-example-facebook")).dependsOn(core)
121+
lazy val examplesGab = (project in file("examples/raphtory-example-gab")).dependsOn(core)
122+
lazy val examplesLotr = (project in file("examples/raphtory-example-lotr")).dependsOn(core)
123+
lazy val examplesPresto = (project in file("examples/raphtory-example-presto")).dependsOn(core)
124+
lazy val examplesTwitter = (project in file("examples/raphtory-example-twitter")).dependsOn(core)
125+
126+
lazy val examplesTwitterCircles =
127+
(project in file("examples/raphtory-example-twittercircles")).dependsOn(core)
123128

124129
// SETTINGS
125130

core/src/main/resources/application.conf

+196-37
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,203 @@ raphtory {
1717
}
1818
retention {
1919
time = 0
20-
time = ${?RAPHTORY_RETENTION_TIME}
20+
time = ${?RAPHTORY_PULSAR_RETENTION_TIME}
2121
size = 0
22-
size = ${?RAPHTORY_RETENTION_SIZE}
23-
22+
size = ${?RAPHTORY_PULSAR_RETENTION_SIZE}
23+
}
24+
topics {
25+
spout {
26+
tenant = "public"
27+
tenant = ${?RAPHTORY_PULSAR_TOPICS_TENANT}
28+
tenant = ${?RAPHTORY_PULSAR_TOPICS_SPOUT_TENANT}
29+
namespace = ${raphtory.deploy.id}
30+
namespace = ${?RAPHTORY_PULSAR_TOPICS_NAMESPACE}
31+
namespace = ${?RAPHTORY_PULSAR_TOPICS_SPOUT_NAMESPACE}
32+
persistence = true
33+
persistence = ${?RAPHTORY_PULSAR_TOPICS_SPOUT_PERSISTENCE}
34+
retention {
35+
time = ${raphtory.pulsar.retention.time}
36+
time = ${?RAPHTORY_PULSAR_TOPICS_SPOUT_RETENTION_TIME}
37+
size = ${raphtory.pulsar.retention.size}
38+
size = ${?RAPHTORY_PULSAR_TOPICS_SPOUT_RETENTION_SIZE}
39+
}
40+
}
41+
graph.updates {
42+
tenant = "public"
43+
tenant = ${?RAPHTORY_PULSAR_TOPICS_TENANT}
44+
tenant = ${?RAPHTORY_PULSAR_TOPICS_GRAPH_UPDATES_TENANT}
45+
namespace = ${raphtory.deploy.id}
46+
namespace = ${?RAPHTORY_PULSAR_TOPICS_NAMESPACE}
47+
namespace = ${?RAPHTORY_PULSAR_TOPICS_GRAPH_UPDATES_NAMESPACE}
48+
persistence = true
49+
persistence = ${?RAPHTORY_PULSAR_TOPICS_GRAPH_UPDATES_PERSISTENCE}
50+
retention {
51+
time = ${raphtory.pulsar.retention.time}
52+
time = ${?RAPHTORY_PULSAR_TOPICS_GRAPH_UPDATES_RETENTION_TIME}
53+
size = ${raphtory.pulsar.retention.size}
54+
size = ${?RAPHTORY_PULSAR_TOPICS_GRAPH_UPDATES_RETENTION_SIZE}
55+
}
56+
}
57+
graph.sync {
58+
tenant = "public"
59+
tenant = ${?RAPHTORY_PULSAR_TOPICS_TENANT}
60+
tenant = ${?RAPHTORY_PULSAR_TOPICS_GRAPH_SYNC_TENANT}
61+
namespace = ${raphtory.deploy.id}
62+
namespace = ${?RAPHTORY_PULSAR_TOPICS_NAMESPACE}
63+
namespace = ${?RAPHTORY_PULSAR_TOPICS_GRAPH_SYNC_NAMESPACE}
64+
persistence = true
65+
persistence = ${?RAPHTORY_PULSAR_TOPICS_GRAPH_SYNC_PERSISTENCE}
66+
retention {
67+
time = ${raphtory.pulsar.retention.time}
68+
time = ${?RAPHTORY_PULSAR_TOPICS_GRAPH_SYNC_RETENTION_TIME}
69+
size = ${raphtory.pulsar.retention.size}
70+
size = ${?RAPHTORY_PULSAR_TOPICS_GRAPH_SYNC_RETENTION_SIZE}
71+
}
72+
}
73+
submissions {
74+
tenant = "public"
75+
tenant = ${?RAPHTORY_PULSAR_TOPICS_TENANT}
76+
tenant = ${?RAPHTORY_PULSAR_TOPICS_SUBMISSIONS_TENANT}
77+
namespace = ${raphtory.deploy.id}
78+
namespace = ${?RAPHTORY_PULSAR_TOPICS_NAMESPACE}
79+
namespace = ${?RAPHTORY_PULSAR_TOPICS_SUBMISSIONS_NAMESPACE}
80+
persistence = true
81+
persistence = ${?RAPHTORY_PULSAR_TOPICS_SUBMISSIONS_PERSISTENCE}
82+
retention {
83+
time = ${raphtory.pulsar.retention.time}
84+
time = ${?RAPHTORY_PULSAR_TOPICS_SUBMISSIONS_RETENTION_TIME}
85+
size = ${raphtory.pulsar.retention.size}
86+
size = ${?RAPHTORY_PULSAR_TOPICS_SUBMISSIONS_RETENTION_SIZE}
87+
}
88+
}
89+
completed.queries {
90+
tenant = "public"
91+
tenant = ${?RAPHTORY_PULSAR_TOPICS_TENANT}
92+
tenant = ${?RAPHTORY_PULSAR_TOPICS_COMPLETED_QUERIES_TENANT}
93+
namespace = ${raphtory.deploy.id}
94+
namespace = ${?RAPHTORY_PULSAR_TOPICS_NAMESPACE}
95+
namespace = ${?RAPHTORY_PULSAR_TOPICS_COMPLETED_QUERIES_NAMESPACE}
96+
persistence = true
97+
persistence = ${?RAPHTORY_PULSAR_TOPICS_COMPLETED_QUERIES_PERSISTENCE}
98+
retention {
99+
time = ${raphtory.pulsar.retention.time}
100+
time = ${?RAPHTORY_PULSAR_TOPICS_COMPLETED_QUERIES_RETENTION_TIME}
101+
size = ${raphtory.pulsar.retention.size}
102+
size = ${?RAPHTORY_PULSAR_TOPICS_COMPLETED_QUERIES_RETENTION_SIZE}
103+
}
104+
}
105+
watermark {
106+
tenant = "public"
107+
tenant = ${?RAPHTORY_PULSAR_TOPICS_TENANT}
108+
tenant = ${?RAPHTORY_PULSAR_TOPICS_WATERMARK_TENANT}
109+
namespace = ${raphtory.deploy.id}
110+
namespace = ${?RAPHTORY_PULSAR_TOPICS_NAMESPACE}
111+
namespace = ${?RAPHTORY_PULSAR_TOPICS_WATERMARK_NAMESPACE}
112+
persistence = true
113+
persistence = ${?RAPHTORY_PULSAR_TOPICS_WATERMARK_PERSISTENCE}
114+
retention {
115+
time = ${raphtory.pulsar.retention.time}
116+
time = ${?RAPHTORY_PULSAR_TOPICS_WATERMARK_RETENTION_TIME}
117+
size = ${raphtory.pulsar.retention.size}
118+
size = ${?RAPHTORY_PULSAR_TOPICS_WATERMARK_RETENTION_SIZE}
119+
}
120+
}
121+
query.prep {
122+
tenant = "public"
123+
tenant = ${?RAPHTORY_PULSAR_TOPICS_TENANT}
124+
tenant = ${?RAPHTORY_PULSAR_TOPICS_QUERY_PREP_TENANT}
125+
namespace = ${raphtory.deploy.id}
126+
namespace = ${?RAPHTORY_PULSAR_TOPICS_NAMESPACE}
127+
namespace = ${?RAPHTORY_PULSAR_TOPICS_QUERY_PREP_NAMESPACE}
128+
persistence = true
129+
persistence = ${?RAPHTORY_PULSAR_TOPICS_QUERY_PREP_PERSISTENCE}
130+
retention {
131+
time = ${raphtory.pulsar.retention.time}
132+
time = ${?RAPHTORY_PULSAR_TOPICS_QUERY_PREP_RETENTION_TIME}
133+
size = ${raphtory.pulsar.retention.size}
134+
size = ${?RAPHTORY_PULSAR_TOPICS_QUERY_PREP_RETENTION_SIZE}
135+
}
136+
}
137+
query.track {
138+
tenant = "public"
139+
tenant = ${?RAPHTORY_PULSAR_TOPICS_TENANT}
140+
tenant = ${?RAPHTORY_PULSAR_TOPICS_QUERY_TRACK_TENANT}
141+
namespace = ${raphtory.deploy.id}
142+
namespace = ${?RAPHTORY_PULSAR_TOPICS_NAMESPACE}
143+
namespace = ${?RAPHTORY_PULSAR_TOPICS_QUERY_TRACK_NAMESPACE}
144+
persistence = true
145+
persistence = ${?RAPHTORY_PULSAR_TOPICS_QUERY_TRACK_PERSISTENCE}
146+
retention {
147+
time = ${raphtory.pulsar.retention.time}
148+
time = ${?RAPHTORY_PULSAR_TOPICS_QUERY_TRACK_RETENTION_TIME}
149+
size = ${raphtory.pulsar.retention.size}
150+
size = ${?RAPHTORY_PULSAR_TOPICS_QUERY_TRACK_RETENTION_SIZE}
151+
}
152+
}
153+
rechecks {
154+
tenant = "public"
155+
tenant = ${?RAPHTORY_PULSAR_TOPICS_TENANT}
156+
tenant = ${?RAPHTORY_PULSAR_TOPICS_RECHECKS_TENANT}
157+
namespace = ${raphtory.deploy.id}
158+
namespace = ${?RAPHTORY_PULSAR_TOPICS_NAMESPACE}
159+
namespace = ${?RAPHTORY_PULSAR_TOPICS_RECHECKS_NAMESPACE}
160+
persistence = true
161+
persistence = ${?RAPHTORY_PULSAR_TOPICS_RECHECKS_PERSISTENCE}
162+
retention {
163+
time = ${raphtory.pulsar.retention.time}
164+
time = ${?RAPHTORY_PULSAR_TOPICS_RECHECKS_RETENTION_TIME}
165+
size = ${raphtory.pulsar.retention.size}
166+
size = ${?RAPHTORY_PULSAR_TOPICS_RECHECKS_RETENTION_SIZE}
167+
}
168+
}
169+
job.status {
170+
tenant = "public"
171+
tenant = ${?RAPHTORY_PULSAR_TOPICS_TENANT}
172+
tenant = ${?RAPHTORY_PULSAR_TOPICS_JOB_STATUS_TENANT}
173+
namespace = ${raphtory.deploy.id}
174+
namespace = ${?RAPHTORY_PULSAR_TOPICS_NAMESPACE}
175+
namespace = ${?RAPHTORY_PULSAR_TOPICS_JOB_STATUS_NAMESPACE}
176+
persistence = true
177+
persistence = ${?RAPHTORY_PULSAR_TOPICS_JOB_STATUS_PERSISTENCE}
178+
retention {
179+
time = ${raphtory.pulsar.retention.time}
180+
time = ${?RAPHTORY_PULSAR_TOPICS_JOB_STATUS_RETENTION_TIME}
181+
size = ${raphtory.pulsar.retention.size}
182+
size = ${?RAPHTORY_PULSAR_TOPICS_JOB_STATUS_RETENTION_SIZE}
183+
}
184+
}
185+
vertex.messages {
186+
tenant = "public"
187+
tenant = ${?RAPHTORY_PULSAR_TOPICS_TENANT}
188+
tenant = ${?RAPHTORY_PULSAR_TOPICS_VERTEX_MESSAGES_TENANT}
189+
namespace = ${raphtory.deploy.id}
190+
namespace = ${?RAPHTORY_PULSAR_TOPICS_NAMESPACE}
191+
namespace = ${?RAPHTORY_PULSAR_TOPICS_VERTEX_MESSAGES_NAMESPACE}
192+
persistence = true
193+
persistence = ${?RAPHTORY_PULSAR_TOPICS_VERTEX_MESSAGES_PERSISTENCE}
194+
retention {
195+
time = ${raphtory.pulsar.retention.time}
196+
time = ${?RAPHTORY_PULSAR_TOPICS_VERTEX_MESSAGES_RETENTION_TIME}
197+
size = ${raphtory.pulsar.retention.size}
198+
size = ${?RAPHTORY_PULSAR_TOPICS_VERTEX_MESSAGES_RETENTION_SIZE}
199+
}
200+
}
201+
job.operations {
202+
tenant = "public"
203+
tenant = ${?RAPHTORY_PULSAR_TOPICS_TENANT}
204+
tenant = ${?RAPHTORY_PULSAR_TOPICS_JOB_OPERATIONS_TENANT}
205+
namespace = ${raphtory.deploy.id}
206+
namespace = ${?RAPHTORY_PULSAR_TOPICS_NAMESPACE}
207+
namespace = ${?RAPHTORY_PULSAR_TOPICS_JOB_OPERATIONS_NAMESPACE}
208+
persistence = true
209+
persistence = ${?RAPHTORY_PULSAR_TOPICS_JOB_OPERATIONS_PERSISTENCE}
210+
retention {
211+
time = ${raphtory.pulsar.retention.time}
212+
time = ${?RAPHTORY_PULSAR_TOPICS_JOB_OPERATIONS_RETENTION_TIME}
213+
size = ${raphtory.pulsar.retention.size}
214+
size = ${?RAPHTORY_PULSAR_TOPICS_JOB_OPERATIONS_RETENTION_SIZE}
215+
}
216+
}
24217
}
25218
}
26219

@@ -48,34 +241,12 @@ raphtory {
48241
}
49242
query {
50243
status = "NO QUERY TO EXECUTE"
51-
tenant = "public"
52-
tenant = ${?RAPHTORY_TENANT}
53-
tenant = ${?RAPHTORY_QUERY_TENANT}
54-
namespace = ${raphtory.deploy.id}
55-
namespace = ${?RAPHTORY_NAMESPACE}
56-
namespace = ${?RAPHTORY_QUERY_NAMESPACE}
57-
retentionTime = ${raphtory.pulsar.retention.time}
58-
retentionSize = ${raphtory.pulsar.retention.size}
59-
persistence = true
60-
persistence = ${?RAPHTORY_QUERY_PERSISTENCE}
61244
timeFormat = "yyyy-MM-dd[ HH:mm:ss[.SSS]]"
62245
timeFormat = ${?RAPHTORY_QUERY_TIMEFORMAT}
63246
}
64247
spout {
65248
topic = "raphtory_data_raw"
66249
topic = ${?RAPHTORY_SPOUT_TOPIC}
67-
tenant = "public"
68-
tenant = ${?RAPHTORY_TENANT}
69-
tenant = ${?RAPHTORY_SPOUT_TENANT}
70-
namespace = ${raphtory.deploy.id}
71-
namespace = ${?RAPHTORY_NAMESPACE}
72-
namespace = ${?RAPHTORY_SPOUT_NAMESPACE}
73-
retentionTime = ${raphtory.pulsar.retention.time}
74-
retentionTime = ${?RAPHTORY_SPOUT_RETENTION_TIME}
75-
retentionSize = ${raphtory.pulsar.retention.size}
76-
retentionSize = ${?RAPHTORY_SPOUT_RETENTION_SIZE}
77-
persistence = true
78-
persistence = ${?RAPHTORY_SPOUT_PERSISTENCE}
79250
copyFiles = ${?RAPHTORY_SPOUT_COPY_FILES}
80251
failOnError = true
81252
failOnError = ${?RAPHTORY_SPOUT_FAIL_ON_ERROR}
@@ -117,18 +288,6 @@ raphtory {
117288
builders {
118289
countPerServer = 1
119290
countPerServer = ${?RAPHTORY_BUILDERS_COUNTPERSERVER}
120-
tenant = "public"
121-
tenant = ${?RAPHTORY_TENANT}
122-
tenant = ${?RAPHTORY_BUILDERS_TENANT}
123-
namespace = ${raphtory.deploy.id}
124-
namespace = ${?RAPHTORY_NAMESPACE}
125-
namespace = ${?RAPHTORY_BUILDERS_NAMESPACE}
126-
retentionTime = ${raphtory.pulsar.retention.time}
127-
retentionTime = ${?RAPHTORY_BUILDERS_RETENTION_TIME}
128-
retentionSize = ${raphtory.pulsar.retention.size}
129-
retentionSize = ${?RAPHTORY_BUILDERS_RETENTION_SIZE}
130-
persistence = true
131-
persistence = ${?RAPHTORY_BUILDERS_PERSISTENCE}
132291
failOnError = true
133292
failOnError = ${?RAPHTORY_BUILDERS_FAIL_ON_ERROR}
134293
}

core/src/main/scala/com/raphtory/client/GraphDeployment.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ import com.raphtory.components.graphbuilder.GraphBuilder
44
import com.raphtory.components.spout.Spout
55
import com.raphtory.config.ComponentFactory
66
import com.raphtory.config.Partitions
7+
import com.raphtory.config.Scheduler
78
import com.raphtory.config.ThreadedWorker
89
import com.raphtory.config.ZookeeperIDManager
910
import com.typesafe.config.Config
10-
import monix.execution.Scheduler
1111
import io.prometheus.client.exporter.HTTPServer
1212

1313
import java.io.IOException

core/src/main/scala/com/raphtory/client/QuerySender.scala

+4-9
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,25 @@
11
package com.raphtory.client
22

3+
import com.raphtory.communication.TopicRepository
34
import com.raphtory.components.querymanager.Query
45
import com.raphtory.components.querytracker.QueryProgressTracker
56
import com.raphtory.config.ComponentFactory
6-
import com.raphtory.config.PulsarController
7-
import com.raphtory.serialisers.PulsarKryoSerialiser
8-
import monix.execution.Scheduler
9-
import org.apache.pulsar.client.api.Schema
7+
import com.raphtory.config.Scheduler
108

119
import scala.util.Random
1210

1311
/** @DoNotDocument */
1412
class QuerySender(
1513
private val componentFactory: ComponentFactory,
1614
private val scheduler: Scheduler,
17-
private val pulsarController: PulsarController
15+
private val topics: TopicRepository
1816
) {
1917

20-
val kryo = PulsarKryoSerialiser()
21-
implicit private val schema: Schema[Array[Byte]] = Schema.BYTES
22-
2318
def submit(query: Query, customJobName: String = ""): QueryProgressTracker = {
2419
val jobName = if (customJobName.nonEmpty) customJobName else getDefaultName(query)
2520
val jobID = jobName + "_" + Random.nextLong().abs
2621
val outputQuery = query.copy(name = jobID)
27-
pulsarController.toQueryManagerProducer sendAsync kryo.serialise(outputQuery)
22+
topics.submissions.endPoint sendAsync outputQuery
2823
componentFactory.queryProgressTracker(jobID, scheduler)
2924
}
3025

0 commit comments

Comments
 (0)