OchRSocketService.kt
package com.blizzard.content.filter.chatreplay.sink
import com.blizzard.content.filter.chatreplay.config.EXECUTION
import com.blizzard.content.filter.chatreplay.config.ExternalServices
import com.blizzard.content.filter.chatreplay.config.INTERACTION
import com.blizzard.content.filter.chatreplay.config.METRICS_PREFIX
import com.blizzard.content.filter.chatreplay.config.MODE
import com.blizzard.content.filter.chatreplay.config.SERVICETARGET
import com.blizzard.content.filter.chatreplay.config.TRANSFORM
import com.blizzard.content.filter.chatreplay.config.TRANSPORT
import com.blizzard.content.filter.chatreplay.domain.TransformResponse
import com.blizzard.content.filter.chatreplay.domain.failedTransformResponse
import com.blizzard.content.filter.chatreplay.domain.log
import com.blizzard.content.filter.chatreplay.domain.parseId
import com.blizzard.content.filter.chatreplay.metrics.toMap
import com.blizzard.content.filter.chatreplay.metrics.toTags
import com.blizzard.content.filter.chatreplay.source.LogStreamService
import com.newrelic.api.agent.NewRelic
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Metrics
import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.Tags
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onEmpty
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.launch
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactor.awaitSingle
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.DisposableBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.event.EventListener
import org.springframework.core.NestedRuntimeException
import org.springframework.messaging.rsocket.RSocketRequester
import org.springframework.messaging.rsocket.retrieveFlow
import org.springframework.stereotype.Service
import org.springframework.util.MimeTypeUtils
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.util.retry.Retry
import java.time.Duration.ofMillis
import kotlin.random.Random
/**
 * Path component of the remote RSocket endpoint. In this file it is only appended to
 * `host:port` in the "connected" log line; presumably it is also used when resolving the
 * WebSocket URI (TODO confirm).
 */
const val PATH_RSOCKET = "/rsocket-api"

/** RSocket route used for the REQUEST_CHANNEL transform interaction. */
const val ROUTE_TRANSFORM_CHANNEL = "replace-channel-controller"

/**
 * RSocket route for the REQUEST_RESPONSE variant of the transform interaction.
 * Not referenced in this file; presumably used by another client.
 */
const val ROUTE_TRANSFORM_REQUEST_RESPONSE = "replace-request-response-controller"
/**
 * Streams chat-log text to the remote "och" service over an RSocket REQUEST_CHANNEL
 * ([ROUTE_TRANSFORM_CHANNEL]) and records metrics for every [TransformResponse] received.
 *
 * - The channel is only opened once the application is ready (see [onApplicationEvent]).
 * - Both directions are retried indefinitely with capped exponential backoff. The outbound
 *   [LogStreamService.logStream] publisher is retried on its own. The inbound response flow is
 *   retried by re-collecting it, which re-issues the channel request.
 * - The coroutine scopes are cancelled in [destroy], so nothing keeps reconnecting after shutdown.
 *
 * References:
 * - [https://rsocket.io]
 * - [https://docs.spring.io/spring-boot/docs/2.4.3/reference/html/spring-boot-features.html#boot-features-rsocket]
 * - [https://docs.spring.io/spring/docs/5.2.9.RELEASE/spring-framework-reference/web-reactive.html#rsocket-spring]
 *
 * See file rest-client/metrics.http
 */
@Service
@ConditionalOnProperty("chat-replay.subscribers.rsocket-service.enable")
class OchRSocketService(
    private val rSocketReqesterBuilder: RSocketRequester.Builder,
    private val logStreamService: LogStreamService,
    private val registry: MeterRegistry,
    externalProperties: ExternalServices,
) : DisposableBean {
    private val cff = externalProperties.och.contentFilterFunctions
    private val log = LoggerFactory.getLogger(OchRSocketService::class.java)

    /** Tags shared by every meter this service registers. */
    private val localTags = Tags.of(
        Tag.of(EXECUTION, "remote"),
        Tag.of(SERVICETARGET, "och"),
        Tag.of(TRANSPORT, "rsocket"),
        Tag.of(MODE, "streamlistener"),
        Tag.of(INTERACTION, "request-channel"),
    )

    // Registered on the injected registry (the same one handed to configureConnector) rather than
    // Metrics.globalRegistry, so the counters are still published when the global registry is not wired up.
    private val transformedCounter = registry.counter(TRANSFORM, localTags.and("transformed", "true"))
    private val untransformedCounter = registry.counter(TRANSFORM, localTags.and("transformed", "false"))
    private val retryCounter = registry.counter("$METRICS_PREFIX.rsocket.request-channel.retry", localTags)
    private val logStreamRetryCounter =
        registry.counter("$METRICS_PREFIX.rsocket.request-channel.logstream.retry", localTags)

    /** Scope that sets up the connection (I/O dispatcher); a failing child does not cancel siblings. */
    private val rSocketScope: CoroutineScope = CoroutineScope(
        SupervisorJob() + Dispatchers.IO + CoroutineExceptionHandler { _, e ->
            log.error("Exception from rSocketScope handler: ${e.message}", e)
        },
    )

    /** Scope in which the response flow is collected (default dispatcher). */
    private val handlingScope: CoroutineScope = CoroutineScope(
        SupervisorJob() + Dispatchers.Default + CoroutineExceptionHandler { _, e ->
            log.error("Exception from handlingScope handler: ${e.message}", e)
        },
    )

    /** Cached requester; stays empty until [launchChannelConnection] first runs. */
    private var requesterMono = Mono.empty<RSocketRequester>()

    @EventListener(condition = "@environment.getProperty('chat-replay.subscribers.rsocket-service.enable')")
    fun onApplicationEvent(readyEvent: ApplicationReadyEvent) {
        log.info("ApplicationReady: ${javaClass.simpleName}")
        launchChannelConnection()
    }

    /**
     * Cancels both coroutine scopes on bean destruction. This stops response collection and the
     * infinite reconnect loop instead of leaving them running after the context closes.
     */
    override fun destroy() {
        log.info("Destroying: ${javaClass.simpleName}")
        handlingScope.cancel()
        rSocketScope.cancel()
    }

    /** Builds the WebSocket-based [RSocketRequester] used for the REQUEST_CHANNEL, wrapped in a [Mono]. */
    private fun createRequesterMono(): Mono<RSocketRequester> {
        log.info("Caching RSocketRequester for REQUEST_CHANNEL")
        return Mono.just(
            rSocketReqesterBuilder
                .rsocketConnector { it.configureConnector(log, localTags, registry) }
                .dataMimeType(MimeTypeUtils.APPLICATION_JSON)
                .websocket(
                    resolveWsUri(cff).also {
                        log.info("RSocket REQUEST_CHANNEL connecting to $it")
                    },
                ),
        )
    }

    /**
     * Outbound side of the channel: the text of each chat log (empty string when absent).
     *
     * The log stream is re-subscribed on error with exponential backoff. The backoff is capped,
     * because Reactor's default cap is effectively unbounded and would eventually schedule retries
     * years apart. It is also reset once the stream emits again (`transientErrors`).
     */
    private fun outboundLogText() =
        logStreamService.logStream()
            .retryWhen(
                Retry.backoff(Long.MAX_VALUE, ofMillis(RETRY_MIN_BACKOFF_MS))
                    .maxBackoff(ofMillis(RETRY_MAX_BACKOFF_MS))
                    .transientErrors(true)
                    .doBeforeRetry {
                        log.error("Retrying [logStream] attempt [${it.totalRetries()}] due to ${it.failure()} : ${it.failure().message}")
                        logStreamRetryCounter.increment()
                    },
            )
            .map { chatLog -> chatLog.text ?: "" }
            .addReactiveMetrics()

    /**
     * Opens the REQUEST_CHANNEL and launches collection of its responses in [handlingScope].
     *
     * The returned job completes as soon as the collection has been launched. The stream itself
     * runs until [handlingScope] is cancelled or an error escapes to the `catch` stage. That stage
     * sits downstream of `retryWhen`, so such errors are not retried.
     */
    private fun launchChannelConnection() =
        rSocketScope.launch {
            log.info("Starting RSocket Channel connection...")
            /* If we initialize this connection before app is ready, we can "steal" the
               connection from the previously running instance. And if this app fails to start up,
               no one is happy.
             */
            if (requesterMono.awaitFirstOrNull() == null) {
                requesterMono = createRequesterMono()
            }
            requesterMono.awaitSingle()
                .route(ROUTE_TRANSFORM_CHANNEL)
                .data(outboundLogText())
                /* End of RSocket config, beginning of Kotlin Suspending Flow processing */
                .retrieveFlow<TransformResponse>()
                .retryWhen { cause, attempt ->
                    log.error("Retrying attempt [$attempt] due to $cause : ${cause.message}")
                    retryCounter.increment()
                    // Back off before reconnecting. Without a delay, a persistent failure
                    // (e.g. the remote service being down) turns into a tight reconnect loop.
                    delay(channelRetryDelayMillis(attempt))
                    true
                }
                .flowOn(Dispatchers.IO)
                .onEach(::incrementCounters)
                .onEach { if (log.isDebugEnabled) log.debug("Handling : ${it.parseId()} ${it.log()}") }
                .transform { response ->
                    // Per-element work goes inside the try. emit() stays outside it so that
                    // downstream failures are not swallowed (Flow exception transparency) and
                    // cancellation still propagates.
                    val handled = try {
                        // occasionallyThrowException(/* To demonstrate error-handling */)
                        true
                    } catch (e: CancellationException) {
                        throw e
                    } catch (e: Exception) {
                        recordError(e, "Error")
                        false // Skip this element, continue with the next one
                    }
                    if (handled) emit(response)
                }
                .catch { e ->
                    // Some other exception from the stages above; the stream ends after this.
                    recordError(e, "Actual exception")
                    emit(failedTransformResponse)
                }
                .onStart { log.info("RSocket REQUEST_CHANNEL connected to ${cff.host}:${cff.port}$PATH_RSOCKET") }
                .onCompletion { log.info("Stream complete") }
                .onEmpty { log.info("onEmpty") }
                .launchIn(handlingScope)
        }

    /**
     * Delay before the [attempt]-th (0-based) re-collection of the response flow: 100 ms,
     * doubling per attempt, capped at 10 s. Flow's `retryWhen` never resets [attempt], so once
     * the cap is reached every later reconnect waits the full cap.
     */
    private fun channelRetryDelayMillis(attempt: Long): Long {
        // Bound the shift so the multiplication cannot overflow; 100 shl 7 already exceeds the cap.
        val shift = attempt.coerceIn(0L, 16L).toInt()
        return (RETRY_MIN_BACKOFF_MS shl shift).coerceAtMost(RETRY_MAX_BACKOFF_MS)
    }

    /** Throws a [RuntimeException] roughly one call in ten; only used to demonstrate error handling. */
    private fun occasionallyThrowException(): Unit =
        if (Random.nextInt(10) == 0) throw RuntimeException("Random Exception") else Unit

    /** Counts [response] as transformed or untransformed according to [TransformResponse.wasModified]. */
    private fun incrementCounters(response: TransformResponse) {
        val counter = if (response.wasModified) transformedCounter else untransformedCounter
        counter.increment()
    }

    /**
     * Logs [t] with its stack trace, increments the `$TRANSFORM.error` counter and reports
     * [t] to New Relic.
     *
     * Metric tags carry only exception class names. Free-form messages are sent to New Relic as
     * attributes only, because using them as tag values would create an unbounded number of
     * meter time series.
     *
     * @param t the failure to record
     * @param s log-message prefix; the exception message is appended after ": "
     */
    private fun recordError(t: Throwable, s: String) {
        log.error("$s: ${t.message}", t)
        val cause = (t as? NestedRuntimeException)?.mostSpecificCause
        // Low-cardinality values: safe as metric tags.
        val metricTags: MutableMap<String, String> = mutableMapOf("class" to t.javaClass.simpleName)
        // Unbounded values: New Relic attributes only.
        val details: MutableMap<String, String> = mutableMapOf("message" to "${t.message}")
        if (cause != null) {
            metricTags["cause.class"] = cause.javaClass.simpleName
            details["cause.message"] = "${cause.message}"
        }
        registry.counter("$TRANSFORM.error", localTags + metricTags.toTags()).increment()
        NewRelic.noticeError(t, localTags.toMap() + metricTags + details)
    }

    /**
     * Names this [Flux] [TRANSFORM], tags it with [localTags] and enables Reactor's
     * publisher metrics.
     *
     * - [https://projectreactor.io/docs/core/milestone/reference/#_publisher_metrics]
     * - [https://projectreactor.io/docs/core/milestone/reference/#metrics]
     *
     * These metrics result in metric names of:
     *
     *   name + ".onNext.delay"
     *   name + ".requested"
     *   name + ".subscribed"
     */
    private fun <T : Any> Flux<T>.addReactiveMetrics(): Flux<T> =
        localTags
            .fold(name(TRANSFORM)) { flux, tag -> flux.tag(tag.key, tag.value) }
            .metrics()

    private companion object {
        /** Initial backoff before re-subscribing after an error. */
        const val RETRY_MIN_BACKOFF_MS = 100L

        /** Upper bound for the exponential backoff between re-subscriptions. */
        const val RETRY_MAX_BACKOFF_MS = 10_000L
    }
}