LogStreamService.kt
package com.blizzard.content.filter.chatreplay.source
import com.blizzard.content.filter.chatreplay.config.CHATLOG_PREFIX
import com.blizzard.content.filter.chatreplay.config.METRICS_PREFIX
import com.blizzard.content.filter.chatreplay.config.SourceProperties
import com.blizzard.content.filter.chatreplay.config.flashbackInstant
import com.blizzard.content.filter.chatreplay.domain.ChatLog
import com.blizzard.content.filter.chatreplay.domain.hadOcurredAt
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Gauge
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Metrics
import io.micrometer.core.instrument.Tags
import org.jetbrains.annotations.NotNull
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Service
import reactor.core.Disposable
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import reactor.core.scheduler.Schedulers
import reactor.util.retry.Retry
import java.time.Duration.ofMillis
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.random.Random.Default.nextDouble
@Service
class LogStreamService(
    meterRegistry: MeterRegistry,
    private val sourceProperties: SourceProperties,
    @Value("\${git.commit.id}")
    private val commitId: String// = "unknown"
) {
    private val log = LoggerFactory.getLogger(javaClass)
    private val driverLog = LoggerFactory.getLogger("${javaClass.name}.Driver")
    private var logDriver: Disposable? = null

    // FIFO buffer of collected chat logs waiting to be replayed; drained by the driver,
    // filled by addAll(). Note: ConcurrentLinkedQueue.size is O(n), but it is only read
    // by the gauge below and by log statements.
    private val logStorage = ConcurrentLinkedQueue<ChatLog>()
    private val driverScheduler = Schedulers.newBoundedElastic(2, 1000, "driver")
    private val multiScheduler = Schedulers.newBoundedElastic(10, 1000, "multic")

    // directBestEffort: emissions to slow/absent subscribers are dropped rather than
    // blocking or buffering on the driver side.
    private val logSink: Sinks.Many<ChatLog> = Sinks.many().multicast().directBestEffort()
    private val publishCounter: Counter = meterRegistry.counter("$CHATLOG_PREFIX.logstream.published")

    init {
        Gauge.builder("$CHATLOG_PREFIX.fetch.queuesize", logStorage::size)
            .description("Count of chatlogs collected, waiting to be processed.")
            .register(meterRegistry)
        // Increment by a random value in 20.0..120.0 so the reported counter is very
        // likely different from the one reported by the previous restart.
        meterRegistry
            .counter("$METRICS_PREFIX.application.restart", Tags.of("commitid", commitId))
            .increment((nextDouble(0.0, 100.0) + 20.0).also {
                log.info("Reporting restart: $it $commitId")
            })
    }

    /** Starts the emission driver once the application context is fully ready. */
    @EventListener
    fun onApplicationEvent(@Suppress("UNUSED_PARAMETER") readyEvent: ApplicationReadyEvent) {
        // See function comments
        startDriver()
    }

    /**
     * This flux interval drives sink emission: every 50 ms it drains every queued log
     * whose time "had occurred" (per [hadOccurred]) and pushes it into [logSink].
     *
     * The effectively-infinite backoff retry keeps the driver alive across failures,
     * counting each retry as a metric.
     */
    private fun startDriver() {
        Metrics.counter("$METRICS_PREFIX.logstream.driver.start").increment()
        logDriver = Flux.interval(ofMillis(50))
            .publishOn(driverScheduler)
            .retryWhen(Retry.backoff(Long.MAX_VALUE, ofMillis(100))
                .doBeforeRetry {
                    driverLog.error("Driver Retrying attempt [${it.totalRetries()}] due to ${it.failure()} : ${it.failure().message}")
                    Metrics.counter("$METRICS_PREFIX.logstream.driver.retry").increment()
                })
            .subscribe {
                while (logStorage.peek()?.hadOccurred() == true) {
                    // poll() can return null here: addAll() may clear()/re-add the queue
                    // concurrently (its @Synchronized guards only other @Synchronized
                    // methods, not this subscriber). Passing null to emitNext would NPE,
                    // so bail out of the drain loop instead.
                    val next = logStorage.poll() ?: break
                    // EmitFailureHandler returning true retries the emission until it
                    // succeeds; with directBestEffort this should not spin indefinitely.
                    logSink.emitNext(next) { _, _ -> true }
                    publishCounter.increment()
                }
            }
        driverLog.info("ChatLog driver started.")
    }

    /**
     * Returns a shared, best-effort view of the replayed chat-log stream.
     * Subscribers that fall behind may miss emissions (see [logSink]).
     */
    @Synchronized
    fun logStream(): Flux<ChatLog> {
        return logSink
            .asFlux()
            .retryWhen(Retry.backoff(Long.MAX_VALUE, ofMillis(100))
                .doBeforeRetry {
                    log.error("Retrying attempt [${it.totalRetries()}] due to ${it.failure()} : ${it.failure().message}")
                    Metrics.counter("$CHATLOG_PREFIX.logstream.retry").increment()
                })
            .publishOn(multiScheduler)
    }

    /**
     * Appends [incomingLogs] to the replay queue in timestamp order, repairing the
     * queue ordering if an older batch arrives after a newer one is already queued.
     */
    @Synchronized
    fun addAll(@NotNull incomingLogs: Collection<ChatLog>) {
        // If, somehow, incomingLogs are empty
        if (incomingLogs.isEmpty()) {
            log.error("No chatLogs received to add. Queue size: ${logStorage.size}")
            return
        }
        val sortedIncomingLogs = incomingLogs.sorted()
        /*
        In the infrequent case where incoming logs (which should already be in the queue) are
        just now arriving (ie on app startup, if the first query takes significantly longer than the second),
        then the arrangement will resemble this example:
        last() |=========> peek() first()[0]
        logStorage: 9 8 7 6 5
        incomingLogs: 4 3 2 1 0 ^
        Earlier logs are being inserted "after" later logs.
        Time will need to pass to "9" before logs with timestamp "0" will be processed.
        Unless the queue is checked and sorted first.
        */
        // Capture the head once via peek(): the driver drains this queue concurrently,
        // so isNotEmpty()-then-first() could throw NoSuchElementException mid-check.
        val head = logStorage.peek()
        if (head != null && sortedIncomingLogs.last().timestamp < head.timestamp) {
            val totalBefore = incomingLogs.size + logStorage.size
            log.warn("Inverted insertion order detected in logStorage. Correcting...")
            val fromLogStorage = logStorage.sorted()
            logStorage.clear()
            logStorage.addAll((fromLogStorage + sortedIncomingLogs).sorted())
            // Size mismatch means the driver consumed entries while we were rebuilding.
            when (logStorage.size) {
                totalBefore -> log.info("Correction completed. Sizes match")
                else -> log.error("Correction completed. Sizes do not match.")
            }
        } else {
            logStorage.addAll(sortedIncomingLogs)
        }
        log.info("Added ${incomingLogs.size} chatLogs. New queue size: ${logStorage.size}")
    }

    // Presumably true once the log's timestamp has passed relative to the
    // flashback-shifted "now" — confirm against the domain's hadOcurredAt.
    private fun ChatLog.hadOccurred() = hadOcurredAt(sourceProperties.flashbackInstant())
}