// TaskBroker.kt

package com.blizzard.content.filter.chatreplay.sink


import com.blizzard.content.filter.chatreplay.config.CHATLOG_PREFIX

import com.blizzard.content.filter.chatreplay.config.MODE

import com.blizzard.content.filter.chatreplay.config.TIME

import com.blizzard.content.filter.chatreplay.config.TRANSFORM

import com.blizzard.content.filter.chatreplay.domain.ChatLog

import com.blizzard.content.filter.chatreplay.metrics.timerTags

import com.blizzard.content.filter.chatreplay.metrics.toSpec

import com.blizzard.content.filter.chatreplay.sink.task.ChatLogTask

import com.blizzard.content.filter.chatreplay.sink.task.PredicateTask

import com.blizzard.content.filter.chatreplay.sink.task.ProcessTask

import com.blizzard.content.filter.chatreplay.sink.task.ReductionTask

import com.blizzard.content.filter.chatreplay.sink.task.TransformTask

import com.blizzard.content.filter.chatreplay.source.LogStreamService

import io.micrometer.core.instrument.MeterRegistry

import io.micrometer.core.instrument.Metrics

import io.micrometer.core.instrument.Tag

import io.micrometer.core.instrument.Tags

import io.micrometer.core.instrument.Timer

import kotlinx.coroutines.CoroutineScope

import kotlinx.coroutines.Dispatchers

import kotlinx.coroutines.launch

import org.slf4j.LoggerFactory

import org.springframework.beans.factory.annotation.Value

import org.springframework.boot.context.event.ApplicationReadyEvent

import org.springframework.context.event.EventListener

import org.springframework.stereotype.Service

import java.util.concurrent.TimeUnit

import kotlin.system.measureNanoTime


/**
 * Dispatches every [ChatLog] emitted by [LogStreamService] to each registered [ChatLogTask].
 *
 * Some work on ChatLogs must be done individually, per ChatLog. This class centralizes the
 * dispatching of those individual ChatLog tasks into Kotlin coroutines for better concurrent
 * performance: every (chat log, task) pair is launched in the scope returned by the task's
 * `taskScope()`, timed, and recorded as a Micrometer timer.
 *
 * For work that is done on the stream of ChatLogs from [LogStreamService] as a whole, see the
 * other subscribers.
 *
 * @see ChatLogTask
 */
@Service
class TaskBroker(
    private val logStreamService: LogStreamService,
    private val chatLogTasks: List<ChatLogTask>,
    meterRegistry: MeterRegistry,
) {

    private val log = LoggerFactory.getLogger(javaClass)

    init {
        // Log the injected task list once at construction time.
        log.info("Brokering ChatLogs for Tasks:")
        chatLogTasks.forEach { task ->
            log.info("...Task: ${task.javaClass.simpleName}")
        }
    }

    /**
     * Switch for running tasks. When `false`, incoming chat logs are still counted by
     * [brokeredTotalCounter] but no task is launched for them.
     */
    @Value("\${chat-replay.task-broker.enable-brokering}")
    private var brokeringEnabled: Boolean = true

    /** Tags attached to every meter this broker records. */
    private val brokerTags = Tags.of(Tag.of(MODE, "taskbroker"))

    /**
     * Counts every chat log received from the stream, whether or not brokering is enabled.
     *
     * When this field is created using `Metrics.counter()` instead, the metrics are slightly off
     * in EndToEndIT and TaskBrokerMetricsIT tests. When created with `registry.counter()`, it
     * matches perfectly.
     *
     * NOTE(review): `Metrics.counter()` registers with Micrometer's global registry rather than
     * the injected [MeterRegistry]; presumably that is the source of the mismatch - confirm.
     */
    private val brokeredTotalCounter = meterRegistry.counter("$CHATLOG_PREFIX.taskbroker.brokered", brokerTags)

    /**
     * Starts brokering once the application is ready.
     *
     * The listener's SpEL `condition` gates it on the
     * `chat-replay.subscribers.task-broker.enable` property. The subscription is set up from a
     * coroutine on [Dispatchers.Default], so this method does not wait for it.
     *
     * NOTE(review): the [CoroutineScope] created here is not retained, so nothing in this class
     * can cancel it later - confirm that is acceptable for shutdown.
     *
     * @param readyEvent the Spring event that triggered this listener (not otherwise used).
     */
    @EventListener(condition = "@environment.getProperty('chat-replay.subscribers.task-broker.enable')")
    fun onApplicationEvent(readyEvent: ApplicationReadyEvent) {
        log.info("ApplicationReady: ${javaClass.simpleName}")
        CoroutineScope(Dispatchers.Default).launch {
            log.info("${LogStreamService::class.java.simpleName} is ready")
            brokerChatLogs()
        }
    }

    /**
     * Subscribes to the chat-log stream, counting every element and forwarding it to
     * [forEachChatLog] when [brokeringEnabled] is set.
     *
     * NOTE(review): no error callback is passed to `subscribe`; how a stream error surfaces
     * depends on the reactive library behind `logStream()` - verify.
     */
    private fun brokerChatLogs() {
        logStreamService.logStream()
            .doOnSubscribe {
                if (brokeringEnabled) log.info("Now brokering chat logs")
            }
            .subscribe { chatLog ->
                // Counted before the enabled check so the total reflects everything received.
                brokeredTotalCounter.increment()
                if (brokeringEnabled) {
                    forEachChatLog(chatLog)
                }
            }
    }

    /**
     * Launches one coroutine per registered task for [chatLog], in the scope returned by that
     * task's `taskScope()`. Each coroutine runs the task, measures how long it took and records
     * the duration on a timer named and tagged after the task's outcome.
     *
     * The timed section covers the task's `process` call as well as building the timer spec
     * (and, for [TransformTask], incrementing the transform counter).
     *
     * Timers and the transform counter are obtained through the static [Metrics] facade, i.e.
     * Micrometer's global registry: https://micrometer.io/docs/concepts#_global_registry
     *
     * NOTE(review): an exception thrown by a task propagates out of the launched coroutine and
     * no timer is recorded for it; the effect on the task's scope depends on how `taskScope()`
     * is built - verify.
     *
     * @param chatLog the chat log to hand to every task.
     */
    private fun forEachChatLog(chatLog: ChatLog) {
        if (log.isDebugEnabled) log.debug("Brokering: ${chatLog.id}")
        chatLogTasks.forEach { task ->
            task.taskScope().launch {
                val result = TaskResult()
                result.taskDuration = measureNanoTime {
                    // A task matching none of these types keeps TaskResult's default timer spec.
                    when (task) {
                        is ProcessTask -> {
                            // The processed value is deliberately not used as a tag:
                            // too much cardinality.
                            task.process(chatLog)
                            result.timerSpec = task.toSpec()
                        }
                        is PredicateTask -> {
                            val predicate: Boolean = task.process(chatLog)
                            result.timerSpec = task.toSpec("detection", "$predicate")
                        }
                        is ReductionTask -> {
                            val reduced: Long = task.process(chatLog)
                            result.timerSpec = task.toSpec("reduction", "$reduced")
                        }
                        is TransformTask -> {
                            val transformed = task.process(chatLog).wasModified.toString()
                            Metrics.counter(
                                TRANSFORM,
                                brokerTags.and(task.timerTags()).and("transformed", transformed),
                            ).increment()
                            result.timerSpec = task.toSpec("transformed", transformed)
                        }
                    }
                }
                result.taskTimerPlus(brokerTags).record(result.taskDuration, TimeUnit.NANOSECONDS)
            }
        }
    }

    /**
     * Mutable holder for the outcome of one task run: how long it took and which timer
     * (name plus tags) the duration should be recorded on.
     */
    inner class TaskResult {
        /** Measured duration in nanoseconds; starts at 1 until a measurement is stored. */
        var taskDuration: Long = 1

        /** Timer base name and tags; the default is used when no task-specific spec is set. */
        var timerSpec: Pair<String, Tags> = Pair("GenericTaskResultName", Tags.empty())

        /** Full timer name: the spec's base name followed by the `TIME` suffix constant. */
        fun name() = "${timerSpec.first}$TIME"

        /**
         * Looks up (or creates) the timer for this result in Micrometer's global registry.
         *
         * @param otherTags extra tags combined with the spec's own tags.
         * @return the timer named [name] carrying the combined tags.
         */
        fun taskTimerPlus(otherTags: Tags): Timer = Metrics.timer(name(), timerSpec.second.and(otherTags))
    }
}