TaskBroker.kt
package com.blizzard.content.filter.chatreplay.sink
import com.blizzard.content.filter.chatreplay.config.CHATLOG_PREFIX
import com.blizzard.content.filter.chatreplay.config.MODE
import com.blizzard.content.filter.chatreplay.config.TIME
import com.blizzard.content.filter.chatreplay.config.TRANSFORM
import com.blizzard.content.filter.chatreplay.domain.ChatLog
import com.blizzard.content.filter.chatreplay.metrics.timerTags
import com.blizzard.content.filter.chatreplay.metrics.toSpec
import com.blizzard.content.filter.chatreplay.sink.task.ChatLogTask
import com.blizzard.content.filter.chatreplay.sink.task.PredicateTask
import com.blizzard.content.filter.chatreplay.sink.task.ProcessTask
import com.blizzard.content.filter.chatreplay.sink.task.ReductionTask
import com.blizzard.content.filter.chatreplay.sink.task.TransformTask
import com.blizzard.content.filter.chatreplay.source.LogStreamService
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Metrics
import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.Timer
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Service
import java.util.concurrent.TimeUnit
import kotlin.system.measureNanoTime
/**
* Some work on ChatLogs must be done individually, per ChatLog
*
* This class centralizes the dispatching of those individual ChatLog tasks into Kotlin coroutines for better concurrent performance.
*
* For work that is done on the stream of ChatLogs from LogStreamService, see other subscribers.
*
* @see ChatLogTask
*/
@Service
class TaskBroker(
private val logStreamService: LogStreamService,
private val chatLogTasks: List<ChatLogTask>,
meterRegistry: MeterRegistry,
) {
private val log = LoggerFactory.getLogger(javaClass)
init {
log.info("Brokering ChatLogs for Tasks:")
chatLogTasks.forEach {
log.info("...Task: ${it.javaClass.simpleName}")
}
}
@Value("\${chat-replay.task-broker.enable-brokering}")
private var brokeringEnabled: Boolean = true
private val brokerTags = Tags.of(Tag.of(MODE, "taskbroker"))
/**
* When this field is created using `Metrics.counter()` instead, the metrics are slightly off in EndToEndIT and TaskBrokerMetricsIT tests.
* When created with `registry.counter()`, it matches perfectly.
* /shrug
*/
private val brokeredTotalCounter = meterRegistry.counter("$CHATLOG_PREFIX.taskbroker.brokered", brokerTags)
@EventListener(condition = "@environment.getProperty('chat-replay.subscribers.task-broker.enable')")
fun onApplicationEvent(readyEvent: ApplicationReadyEvent) {
log.info("ApplicationReady: ${javaClass.simpleName}")
CoroutineScope(Dispatchers.Default).launch {
log.info("${LogStreamService::class.java.simpleName} is ready")
brokerChatLogs()
}
}
private fun brokerChatLogs() {
logStreamService.logStream()
.doOnSubscribe {
if (brokeringEnabled) log.info("Now brokering chat logs")
}
.subscribe { chatLog ->
brokeredTotalCounter.increment()
if (brokeringEnabled) {
forEachChatLog(chatLog)
}
}
}
// https://micrometer.io/docs/concepts#_global_registry
private fun forEachChatLog(chatLog: ChatLog) {
if (log.isDebugEnabled) log.debug("Brokering: ${chatLog.id}")
chatLogTasks.asSequence().forEach { task ->
task.taskScope().launch {
with(TaskResult()) {
taskDuration = measureNanoTime {
when (task) {
is ProcessTask -> {
val processed: String = task.process(chatLog)
timerSpec = task.toSpec(/*Too much cardinality to put the processed value here*/)
}
is PredicateTask -> {
val predicate: Boolean = task.process(chatLog)
timerSpec = task.toSpec("detection", "$predicate")
}
is ReductionTask -> {
val reduced: Long = task.process(chatLog)
timerSpec = task.toSpec("reduction", "$reduced")
}
is TransformTask -> {
task.process(chatLog)
.wasModified.toString().let {
Metrics.counter(TRANSFORM, brokerTags.and(task.timerTags()).and("transformed", it)).increment()
timerSpec = task.toSpec("transformed", it)
}
}
}
}
this
}.apply {
taskTimerPlus(brokerTags).record(taskDuration, TimeUnit.NANOSECONDS)
}
}
}
}
inner class TaskResult {
var taskDuration: Long = 1
var timerSpec: Pair<String, Tags> = Pair("GenericTaskResultName", Tags.empty())
fun name() = "${timerSpec.first}$TIME"
fun taskTimerPlus(otherTags: Tags): Timer = Metrics.timer(name(), timerSpec.second.and(otherTags))
}
}