Skip to content

Commit 56d0d25

Browse files
authored
DEV-875 Creating Grafana Dashboards and adding labels to metrics/more refactoring (#389)
* Building Grafana dashboards * Adding labels to Grafana metrics * Cleaning up and adding graph builder total updates metric * Adding spout labels to metrics
1 parent 0f40074 commit 56d0d25

20 files changed

+279
-295
lines changed

core/src/main/scala/com/raphtory/components/Component.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.raphtory.components
22

33
import com.raphtory.config.PulsarController
4+
import com.raphtory.config.telemetry.ComponentTelemetryHandler
45
import com.raphtory.serialisers.PulsarKryoSerialiser
56
import com.typesafe.config.Config
67
import com.typesafe.scalalogging.Logger
@@ -15,8 +16,8 @@ import scala.reflect.runtime.universe._
1516
abstract class Component[T](conf: Config, private val pulsarController: PulsarController)
1617
extends Runnable {
1718

18-
val logger: Logger = Logger(LoggerFactory.getLogger(this.getClass))
19-
19+
val logger: Logger = Logger(LoggerFactory.getLogger(this.getClass))
20+
val telemetry = ComponentTelemetryHandler
2021
val pulsarAddress: String = conf.getString("raphtory.pulsar.broker.address")
2122
val pulsarAdminAddress: String = conf.getString("raphtory.pulsar.admin.address")
2223
val spoutTopic: String = conf.getString("raphtory.spout.topic")

core/src/main/scala/com/raphtory/components/graphbuilder/BuilderExecutor.scala

+2-11
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,6 @@ import scala.reflect.runtime.universe._
1919
class BuilderExecutor[T: ClassTag](
2020
name: Int,
2121
deploymentID: String,
22-
vertexAddCounter: Counter,
23-
vertexDeleteCounter: Counter,
24-
edgeAddCounter: Counter,
25-
edgeDeleteCounter: Counter,
2622
graphBuilder: GraphBuilder[T],
2723
conf: Config,
2824
pulsarController: PulsarController
@@ -31,16 +27,11 @@ class BuilderExecutor[T: ClassTag](
3127
safegraphBuilder
3228
.setBuilderMetaData(
3329
name,
34-
deploymentID,
35-
vertexAddCounter,
36-
vertexDeleteCounter,
37-
edgeAddCounter,
38-
edgeDeleteCounter
30+
deploymentID
3931
)
4032
private val producers = pulsarController.toWriterProducers
4133
private val failOnError: Boolean = conf.getBoolean("raphtory.builders.failOnError")
4234
private var messagesProcessed = 0
43-
private val graphUpdateCounter = BuilderTelemetry.totalGraphBuilderUpdates(deploymentID)
4435
var cancelableConsumer: Option[Consumer[Array[Byte]]] = None
4536

4637
override def run(): Unit = {
@@ -72,7 +63,7 @@ class BuilderExecutor[T: ClassTag](
7263
.getUpdates(msg)(failOnError = failOnError)
7364
.foreach { message =>
7465
sendUpdate(message)
75-
graphUpdateCounter.inc()
66+
telemetry.graphBuilderUpdatesCounter.labels(deploymentID).inc()
7667
}
7768

7869
protected def sendUpdate(graphUpdate: GraphUpdate): Unit = {

core/src/main/scala/com/raphtory/components/graphbuilder/GraphBuilder.scala

+12-24
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import com.raphtory.components.partition.BatchWriter
44
import com.typesafe.scalalogging.Logger
55
import net.openhft.hashing.LongHashFunction
66
import org.slf4j.LoggerFactory
7-
87
import Properties._
8+
import com.raphtory.config.telemetry.ComponentTelemetryHandler
99
import io.prometheus.client.Counter
1010

1111
import scala.collection.mutable
@@ -140,10 +140,6 @@ trait GraphBuilder[T] extends Serializable {
140140
private var batchWriters: mutable.Map[Int, BatchWriter[T]] = _
141141
private var builderID: Int = _
142142
private var deploymentID: String = _
143-
private var vertexAddCounter: Counter = _
144-
private var vertexDeleteCounter: Counter = _
145-
private var edgeAddCounter: Counter = _
146-
private var edgeDeleteCounter: Counter = _
147143
private var batching: Boolean = false
148144
private var totalPartitions: Int = 1
149145

@@ -176,18 +172,10 @@ trait GraphBuilder[T] extends Serializable {
176172

177173
private[raphtory] def setBuilderMetaData(
178174
builderID: Int,
179-
deploymentID: String,
180-
vertexAddCounter: Counter,
181-
vertexDeleteCounter: Counter,
182-
edgeAddCounter: Counter,
183-
edgeDeleteCounter: Counter
175+
deploymentID: String
184176
) = {
185177
this.builderID = builderID
186178
this.deploymentID = deploymentID
187-
this.vertexAddCounter = vertexAddCounter
188-
this.vertexDeleteCounter = vertexDeleteCounter
189-
this.edgeAddCounter = edgeAddCounter
190-
this.edgeDeleteCounter = edgeDeleteCounter
191179
}
192180

193181
private[raphtory] def setupBatchIngestion(
@@ -204,19 +192,19 @@ trait GraphBuilder[T] extends Serializable {
204192
protected def addVertex(updateTime: Long, srcId: Long): Unit = {
205193
val update = VertexAdd(updateTime, srcId, Properties(), None)
206194
handleVertexAdd(update)
207-
vertexAddCounter.inc()
195+
ComponentTelemetryHandler.vertexAddCounter.labels(deploymentID).inc()
208196
}
209197

210198
protected def addVertex(updateTime: Long, srcId: Long, properties: Properties): Unit = {
211199
val update = VertexAdd(updateTime, srcId, properties, None)
212200
handleVertexAdd(update)
213-
vertexAddCounter.inc()
201+
ComponentTelemetryHandler.vertexAddCounter.labels(deploymentID).inc()
214202
}
215203

216204
protected def addVertex(updateTime: Long, srcId: Long, vertexType: Type): Unit = {
217205
val update = VertexAdd(updateTime, srcId, Properties(), Some(vertexType))
218206
handleVertexAdd(update)
219-
vertexAddCounter.inc()
207+
ComponentTelemetryHandler.vertexAddCounter.labels(deploymentID).inc()
220208
}
221209

222210
protected def addVertex(
@@ -227,18 +215,18 @@ trait GraphBuilder[T] extends Serializable {
227215
): Unit = {
228216
val update = VertexAdd(updateTime, srcId, properties, Some(vertexType))
229217
handleVertexAdd(update)
230-
vertexAddCounter.inc()
218+
ComponentTelemetryHandler.vertexAddCounter.labels(deploymentID).inc()
231219
}
232220

233221
protected def deleteVertex(updateTime: Long, srcId: Long): Unit = {
234222
updates += VertexDelete(updateTime, srcId)
235-
vertexDeleteCounter.inc()
223+
ComponentTelemetryHandler.vertexDeleteCounter.labels(deploymentID).inc()
236224
}
237225

238226
protected def addEdge(updateTime: Long, srcId: Long, dstId: Long): Unit = {
239227
val update = EdgeAdd(updateTime, srcId, dstId, Properties(), None)
240228
handleEdgeAdd(update)
241-
edgeAddCounter.inc()
229+
ComponentTelemetryHandler.edgeAddCounter.labels(deploymentID).inc()
242230
}
243231

244232
protected def addEdge(
@@ -249,13 +237,13 @@ trait GraphBuilder[T] extends Serializable {
249237
): Unit = {
250238
val update = EdgeAdd(updateTime, srcId, dstId, properties, None)
251239
handleEdgeAdd(update)
252-
edgeAddCounter.inc()
240+
ComponentTelemetryHandler.edgeAddCounter.labels(deploymentID).inc()
253241
}
254242

255243
protected def addEdge(updateTime: Long, srcId: Long, dstId: Long, edgeType: Type): Unit = {
256244
val update = EdgeAdd(updateTime, srcId, dstId, Properties(), Some(edgeType))
257245
handleEdgeAdd(update)
258-
edgeAddCounter.inc()
246+
ComponentTelemetryHandler.edgeAddCounter.labels(deploymentID).inc()
259247
}
260248

261249
protected def addEdge(
@@ -267,12 +255,12 @@ trait GraphBuilder[T] extends Serializable {
267255
): Unit = {
268256
val update = EdgeAdd(updateTime, srcId, dstId, properties, Some(edgeType))
269257
handleEdgeAdd(update)
270-
edgeAddCounter.inc()
258+
ComponentTelemetryHandler.edgeAddCounter.labels(deploymentID).inc()
271259
}
272260

273261
protected def deleteEdge(updateTime: Long, srcId: Long, dstId: Long): Unit = {
274262
updates += EdgeDelete(updateTime, srcId, dstId)
275-
edgeDeleteCounter.inc()
263+
ComponentTelemetryHandler.edgeDeleteCounter.labels(deploymentID).inc()
276264
}
277265

278266
private def handleVertexAdd(update: VertexAdd) =

core/src/main/scala/com/raphtory/components/partition/BatchWriter.scala

+8-25
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,8 @@
11
package com.raphtory.components.partition
22

3-
import com.raphtory.components.Component
4-
import com.raphtory.components.graphbuilder.BatchAddRemoteEdge
5-
import com.raphtory.components.graphbuilder.EdgeAdd
6-
import com.raphtory.components.graphbuilder.EdgeDelete
7-
import com.raphtory.components.graphbuilder.GraphAlteration
8-
import com.raphtory.config.telemetry.PartitionTelemetry
9-
import com.raphtory.components.graphbuilder.GraphBuilder
10-
import com.raphtory.components.graphbuilder.SyncExistingEdgeAdd
11-
import com.raphtory.components.graphbuilder.SyncExistingEdgeRemoval
12-
import com.raphtory.components.graphbuilder.SyncNewEdgeAdd
13-
import com.raphtory.components.graphbuilder.SyncNewEdgeRemoval
14-
import com.raphtory.components.graphbuilder.VertexAdd
15-
import com.raphtory.components.spout.Spout
16-
import com.raphtory.config.PulsarController
3+
import com.raphtory.components.graphbuilder._
4+
import com.raphtory.config.telemetry.ComponentTelemetryHandler
175
import com.raphtory.graph._
18-
import com.typesafe.config.Config
196
import com.typesafe.scalalogging.Logger
207
import org.slf4j.LoggerFactory
218

@@ -27,18 +14,13 @@ class BatchWriter[T: ClassTag](
2714
partitionID: Int,
2815
storage: GraphPartition
2916
) {
17+
val telemetry = ComponentTelemetryHandler
3018

3119
def getStorage() = storage
3220

3321
private var processedMessages = 0
3422
val logger: Logger = Logger(LoggerFactory.getLogger(this.getClass))
3523

36-
val batchWriterVertexAdditions = PartitionTelemetry.batchWriterVertexAdditions(partitionID)
37-
val batchWriterEdgeAdditions = PartitionTelemetry.batchWriterEdgeAdditions(partitionID)
38-
39-
val batchWriterRemoteEdgeAdditions =
40-
PartitionTelemetry.batchWriterRemoteEdgeAdditions(partitionID)
41-
4224
def handleMessage(msg: GraphAlteration): Unit = {
4325
msg match {
4426
//Updates from the Graph Builder
@@ -65,7 +47,7 @@ class BatchWriter[T: ClassTag](
6547
logger.trace(s"Partition $partitionID: Received VertexAdd message '$update'.")
6648
storage.addVertex(update.updateTime, update.srcId, update.properties, update.vType)
6749
storage.timings(update.updateTime)
68-
batchWriterVertexAdditions.inc()
50+
telemetry.batchWriterVertexAdditionsCollector.labels(partitionID.toString).inc()
6951
}
7052

7153
def processEdgeAdd(update: EdgeAdd): Unit = {
@@ -78,7 +60,7 @@ class BatchWriter[T: ClassTag](
7860
update.properties,
7961
update.eType
8062
)
81-
batchWriterEdgeAdditions.inc()
63+
telemetry.batchWriterEdgeAdditionsCollector.labels(partitionID.toString).inc()
8264
}
8365

8466
def processRemoteEdgeAdd(req: BatchAddRemoteEdge): Unit = {
@@ -87,7 +69,7 @@ class BatchWriter[T: ClassTag](
8769
storage.timings(req.msgTime)
8870
storage
8971
.batchAddRemoteEdge(req.msgTime, req.srcId, req.dstId, req.properties, req.vType)
90-
batchWriterRemoteEdgeAdditions.inc()
72+
telemetry.batchWriterRemoteEdgeAdditionsCollector.labels(partitionID.toString).inc()
9173
}
9274

9375
def processSyncNewEdgeRemoval(req: SyncNewEdgeRemoval): Unit = {
@@ -96,13 +78,14 @@ class BatchWriter[T: ClassTag](
9678
)
9779
storage.timings(req.msgTime)
9880
storage.syncNewEdgeRemoval(req.msgTime, req.srcId, req.dstId, req.removals)
99-
batchWriterRemoteEdgeAdditions.inc()
81+
telemetry.batchWriterRemoteEdgeDeletionsCollector.labels(partitionID.toString).inc()
10082
}
10183

10284
def processEdgeDelete(update: EdgeDelete): Unit = {
10385
logger.trace(s"Partition $partitionID: Received EdgeDelete message '$update'.")
10486
storage.timings(update.updateTime)
10587
storage.removeEdge(update.updateTime, update.srcId, update.dstId)
88+
telemetry.batchWriterEdgeDeletionsCollector.labels(partitionID.toString).inc()
10689
}
10790

10891
def printUpdateCount() = {

core/src/main/scala/com/raphtory/components/partition/LocalBatchHandler.scala

+1-10
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,12 @@ class LocalBatchHandler[T: ClassTag](
2626
scheduler: Scheduler
2727
) extends Component[GraphAlteration](conf: Config, pulsarController: PulsarController) {
2828

29-
private val vertexAddCounter = BuilderTelemetry.totalVertexAdds(deploymentID)
30-
private val vertexDeleteCounter = BuilderTelemetry.totalVertexDeletes(deploymentID)
31-
private val edgeAddCounter = BuilderTelemetry.totalEdgeAdds(deploymentID)
32-
private val edgeDeleteCounter = BuilderTelemetry.totalEdgeDeletes(deploymentID)
33-
3429
graphBuilder.setupBatchIngestion(partitionIDs, batchWriters, totalPartitions)
3530

3631
// TODO get builderID to pull from zookeeper once stream and batch can run synchro
3732
graphBuilder.setBuilderMetaData(
3833
builderID = 0,
39-
deploymentID,
40-
vertexAddCounter,
41-
vertexDeleteCounter,
42-
edgeAddCounter,
43-
edgeDeleteCounter
34+
deploymentID
4435
)
4536

4637
private val rescheduler: Runnable = new Runnable {

core/src/main/scala/com/raphtory/components/partition/QueryExecutor.scala

-6
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import com.raphtory.components.querymanager.TableFunctionComplete
3232
import com.raphtory.components.querymanager.VertexMessage
3333
import com.raphtory.components.querymanager.VertexMessageBatch
3434
import com.raphtory.config.PulsarController
35-
import com.raphtory.config.telemetry.StorageTelemetry
3635
import com.raphtory.graph.GraphPartition
3736
import com.raphtory.graph.LensInterface
3837
import com.raphtory.output.PulsarOutputFormat
@@ -134,11 +133,6 @@ class QueryExecutor(
134133
s"${jobID}_partitionID_${partitionID}_time_${timestamp}_window_$value"
135134
case None => s"${jobID}_partitionID_${partitionID}_time_$timestamp"
136135
}
137-
StorageTelemetry
138-
.pojoLensGraphSize(
139-
id
140-
)
141-
.set(graphLens.getFullGraphSize)
142136
logger.debug(
143137
s"Job '$jobID' at Partition '$partitionID': Created perspective at time '$timestamp' with window '$window'. in ${System
144138
.currentTimeMillis() - time}ms"

core/src/main/scala/com/raphtory/components/partition/Reader.scala

+8-15
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import com.raphtory.components.querymanager.EstablishExecutor
66
import com.raphtory.components.querymanager.QueryManagement
77
import com.raphtory.components.querymanager.WatermarkTime
88
import com.raphtory.config.PulsarController
9-
import com.raphtory.config.telemetry.PartitionTelemetry
109
import com.raphtory.graph.GraphPartition
1110
import com.typesafe.config.Config
11+
import io.prometheus.client.Gauge
1212
import monix.execution.Cancelable
1313
import monix.execution.Scheduler
1414
import org.apache.pulsar.client.api.Consumer
@@ -32,16 +32,6 @@ class Reader(
3232
var scheduledWatermark: Option[Cancelable] = None
3333
private var lastWatermark = WatermarkTime(partitionID, Long.MaxValue, Long.MinValue, false)
3434

35-
val lastWaterMarkProcessed =
36-
PartitionTelemetry.lastWaterMarkProcessed(
37-
s"partitionID_${partitionID}_deploymentID_$deploymentID"
38-
)
39-
40-
val queryExecutorMapCounter =
41-
PartitionTelemetry.queryExecutorMapCounter(
42-
s"partitionID_${partitionID}_deploymentID_$deploymentID"
43-
)
44-
4535
private val watermarking = new Runnable {
4636
override def run(): Unit = createWatermark()
4737
}
@@ -75,9 +65,10 @@ class Reader(
7565
msg match {
7666
case req: EstablishExecutor =>
7767
val jobID = req.jobID
78-
val queryExecutor = new QueryExecutor(partitionID, storage, jobID, conf, pulsarController)
68+
val queryExecutor =
69+
new QueryExecutor(partitionID, storage, jobID, conf, pulsarController)
7970
scheduler.execute(queryExecutor)
80-
queryExecutorMapCounter.inc()
71+
telemetry.queryExecutorCollector.labels(partitionID.toString, deploymentID).inc()
8172
executorMap += ((jobID, queryExecutor))
8273

8374
case req: EndQuery =>
@@ -86,7 +77,7 @@ class Reader(
8677
try {
8778
executorMap(req.jobID).stop()
8879
executorMap.remove(req.jobID)
89-
queryExecutorMapCounter.dec()
80+
telemetry.queryExecutorCollector.labels(partitionID.toString, deploymentID).dec()
9081
}
9182
catch {
9283
case e: Exception =>
@@ -143,7 +134,9 @@ class Reader(
143134
)
144135
}
145136
lastWatermark = watermark
146-
lastWaterMarkProcessed.set(finalTime)
137+
telemetry.lastWatermarkProcessedCollector
138+
.labels(partitionID.toString, deploymentID)
139+
.set(finalTime)
147140
}
148141
scheduleWaterMarker()
149142
}

0 commit comments

Comments
 (0)