Commit 2fbd06c3 authored by seykron's avatar seykron

Renames project from eye-of-cthulhu to iris

* Moves the bot commands from Antifaz to Iris
parent 577d8315
......@@ -10,9 +10,9 @@ RUN useradd \
ARG JAR_FILE
# Add the service itself
ADD target/$JAR_FILE /usr/share/eye-of-cthulhu/eye-of-cthulhu.jar
ADD target/$JAR_FILE /usr/share/iris/iris.jar
# Drop privs
USER app
ENTRYPOINT ["java", "-jar", "/usr/share/eye-of-cthulhu/eye-of-cthulhu.jar"]
ENTRYPOINT ["java", "-jar", "/usr/share/iris/iris.jar"]
version: '3'
services:
h2:
image: "oscarfonts/h2"
ports:
- 7777:1521
volumes:
- /tmp/h2-data-iris:/opt/h2-data:rw
......@@ -3,15 +3,15 @@
<modelVersion>4.0.0</modelVersion>
<groupId>be.rlab</groupId>
<artifactId>eye-of-cthulhu</artifactId>
<artifactId>iris</artifactId>
<packaging>jar</packaging>
<version>1.2.2-SNAPSHOT</version>
<!-- SCM -->
<scm>
<connection>scm:git:git@git.rlab.be:seykron/eye-of-cthulhu.git</connection>
<developerConnection>scm:git:git@git.rlab.be:seykron/eye-of-cthulhu.git</developerConnection>
<url>https://git.rlab.be/seykron/eye-of-cthulhu</url>
<connection>scm:git:git@git.rlab.be:seykron/iris.git</connection>
<developerConnection>scm:git:git@git.rlab.be:seykron/iris.git</developerConnection>
<url>https://git.rlab.be/seykron/iris</url>
<tag>HEAD</tag>
</scm>
......@@ -25,7 +25,7 @@
<reactor-test.version>3.2.5.RELEASE</reactor-test.version>
<assertj.version>3.11.1</assertj.version>
<mockito.version>2.28.2</mockito.version>
<mockito-kotlin.version>1.6.0</mockito-kotlin.version>
<mockito-kotlin.version>2.1.0</mockito-kotlin.version>
<bytebuddy.version>[1.9,)</bytebuddy.version>
<qos.logback.logback-classic.version>1.0.11</qos.logback.logback-classic.version>
......@@ -38,6 +38,7 @@
<netty.version>0.8.9.RELEASE</netty.version>
<tehanu.version>2.0.2</tehanu.version>
<h2.version>1.4.197</h2.version>
<zero-alloc-hash.version>0.9</zero-alloc-hash.version>
<kotlin.version>1.3.40</kotlin.version>
<kotlin-coroutines.version>1.1.1</kotlin-coroutines.version>
......@@ -121,7 +122,7 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.nhaarman</groupId>
<groupId>com.nhaarman.mockitokotlin2</groupId>
<artifactId>mockito-kotlin</artifactId>
<version>${mockito-kotlin.version}</version>
<scope>test</scope>
......@@ -193,6 +194,11 @@
<artifactId>config</artifactId>
<version>${typesafe.version}</version>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
<version>${zero-alloc-hash.version}</version>
</dependency>
<!-- Web & HTTP-->
<dependency>
......@@ -222,7 +228,7 @@
</dependencies>
<build>
<finalName>eye-of-cthulhu</finalName>
<finalName>iris</finalName>
<sourceDirectory>${project.basedir}/src/main/kotlin</sourceDirectory>
<testSourceDirectory>${project.basedir}/src/test/kotlin</testSourceDirectory>
<plugins>
......@@ -286,7 +292,7 @@
<configuration>
<archive>
<manifest>
<mainClass>be.rlab.eoc.ApplicationKt</mainClass>
<mainClass>be.rlab.iris.ApplicationKt</mainClass>
</manifest>
</archive>
<descriptorRefs>
......@@ -317,7 +323,7 @@
</execution>
</executions>
<configuration>
<repository>registry.rlab.be/seykron/eye-of-cthulhu</repository>
<repository>registry.rlab.be/seykron/iris</repository>
<tag>${project.version}</tag>
<buildArgs>
<JAR_FILE>${project.build.finalName}-jar-with-dependencies.jar</JAR_FILE>
......
package be.rlab.eoc.domain
import be.rlab.eoc.domain.model.Notification
import be.rlab.tehanu.domain.Memory
class NotificationService(
memory: Memory
) {
private var notifications: List<Notification> by memory.slot("notifications", listOf<Notification>())
fun push(notification: Notification) {
notifications = notifications + notification
}
}
package be.rlab.eoc
package be.rlab.iris
import be.rlab.eoc.config.DomainBeans
import be.rlab.eoc.config.WebConfig
import be.rlab.iris.config.CommandBeans
import be.rlab.iris.config.DomainBeans
import be.rlab.iris.config.WebConfig
import be.rlab.iris.domain.NotificationService
import be.rlab.tehanu.config.BotBeans
import be.rlab.tehanu.config.DataSourceBeans
import be.rlab.tehanu.config.MemoryBeans
import be.rlab.tehanu.domain.Tehanu
import be.rlab.tehanu.util.persistence.DataSourceInitializer
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.bridge.SLF4JBridgeHandler
import org.springframework.beans.factory.getBean
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.http.server.reactive.HttpHandler
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter
import org.springframework.web.server.adapter.WebHttpHandlerBuilder
import reactor.netty.http.server.HttpServer
import java.time.Duration
class Application(port: Int = 8090) {
......@@ -26,6 +30,8 @@ class Application(port: Int = 8090) {
DataSourceBeans.beans().initialize(this)
MemoryBeans.beans().initialize(this)
DomainBeans.beans().initialize(this)
BotBeans.beans().initialize(this)
CommandBeans.beans().initialize(this)
register(WebConfig::class.java)
refresh()
}
......@@ -34,6 +40,10 @@ class Application(port: Int = 8090) {
val dataSourceInitializer: DataSourceInitializer = context.getBean()
dataSourceInitializer.init()
logger.info("Starting notification polling")
val notificationService: NotificationService = context.getBean()
notificationService.poll()
httpHandler = WebHttpHandlerBuilder
.applicationContext(context)
.build()
......@@ -41,12 +51,20 @@ class Application(port: Int = 8090) {
HttpServer.create()
.port(port)
.handle(ReactorHttpHandlerAdapter(httpHandler))
.bindUntilJavaShutdown(Duration.ZERO) {
logger.info("Application started")
}
.bindNow()
logger.info("Application started")
val tehanu: Tehanu = context.getBean()
logger.info("Iris is entering Telegram")
tehanu.start()
}
}
fun main(args: Array<String>) {
// Sets up jul-to-slf4j bridge. I have not idea
// why the logging.properties strategy doesn't work.
SLF4JBridgeHandler.removeHandlersForRootLogger()
SLF4JBridgeHandler.install()
Application()
}
package be.rlab.eoc.application
package be.rlab.iris.application
import be.rlab.eoc.application.model.NotificationDTO
import be.rlab.eoc.domain.NotificationService
import be.rlab.iris.application.model.NotificationDTO
import be.rlab.iris.domain.NotificationService
import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.*
......
package be.rlab.eoc.application.model
package be.rlab.iris.application.model
import be.rlab.eoc.domain.model.Notification
import be.rlab.iris.domain.model.Notification
data class NotificationDTO(
val userName: String,
......
package be.rlab.iris.config
import be.rlab.iris.domain.command.*
import be.rlab.tehanu.domain.model.ChatType
import org.springframework.context.support.beans
object CommandBeans {
fun beans() = beans {
bean {
ListSources(
name = "/list_sources",
scope = listOf(ChatType.PRIVATE),
timeout = -1
)
}
bean {
Subscribe(
name = "/subscribe",
scope = listOf(ChatType.PRIVATE, ChatType.GROUP),
timeout = 60000,
inboxService = ref()
)
}
bean {
EnableNotifications(
name = "/enable_notifications",
scope = listOf(ChatType.GROUP),
timeout = 60000,
memory = ref()
)
}
bean {
InboxRead(
name = "/inbox_read",
scope = listOf(ChatType.PRIVATE, ChatType.GROUP),
timeout = 60000,
inboxService = ref(),
accessControl = ref()
)
}
bean {
InboxCount(
name = "/inbox_count",
scope = listOf(ChatType.PRIVATE, ChatType.GROUP),
timeout = -1,
inboxService = ref()
)
}
}
}
package be.rlab.eoc.config
package be.rlab.iris.config
import be.rlab.eoc.domain.NotificationService
import be.rlab.eoc.util.ObjectMapperFactory
import be.rlab.iris.domain.ContentService
import be.rlab.iris.domain.InboxService
import be.rlab.iris.domain.NotificationService
import be.rlab.iris.domain.UserSubscriptionListener
import be.rlab.iris.util.ObjectMapperFactory
import be.rlab.tehanu.domain.MessageListenerDefinition
import be.rlab.tehanu.domain.model.ChatMessage
import org.springframework.context.support.beans
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter
object DomainBeans {
fun beans() = beans {
// Services
bean<InboxService>()
bean<ContentService>()
bean<NotificationService>()
bean {
MessageListenerDefinition(
messageType = ChatMessage::class,
listener = UserSubscriptionListener(
accessControl = ref(),
inboxService = ref()
)
)
}
bean {
MappingJackson2HttpMessageConverter(ObjectMapperFactory.snakeCaseMapper)
......
package be.rlab.eoc.config
package be.rlab.iris.config
import be.rlab.eoc.util.ObjectMapperFactory
import be.rlab.iris.util.ObjectMapperFactory
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration
import org.springframework.http.codec.ServerCodecConfigurer
......@@ -11,7 +11,7 @@ import org.springframework.web.reactive.config.WebFluxConfigurer
@Configuration
@EnableWebFlux
@ComponentScan("be.rlab.eoc.application")
@ComponentScan("be.rlab.iris.application")
open class WebConfig : WebFluxConfigurer {
override fun configureHttpMessageCodecs(configurer: ServerCodecConfigurer) {
configurer.defaultCodecs().jackson2JsonEncoder(Jackson2JsonEncoder(ObjectMapperFactory.snakeCaseMapper))
......
package be.rlab.iris.domain
import be.rlab.tehanu.domain.Memory
import be.rlab.iris.domain.MemorySlots.CONTENT_SLOT
import be.rlab.iris.domain.model.Content
import be.rlab.iris.domain.model.ContentRecord
import net.openhft.hashing.LongHashFunction
import org.springframework.util.Base64Utils
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
class ContentService(memory: Memory) {
private var contents: Map<Long, ContentRecord> by memory.slot(CONTENT_SLOT, mapOf<Long, ContentRecord>())
fun store(content: Content): Long {
val data: String = serialize(content.data)
val contentId: Long = contentId(data)
return if (contents.containsKey(contentId)) {
contentId
} else {
val record = ContentRecord.new(
contentId = contentId,
contentType = content.contentType,
encoding = content.encoding,
data = data
)
contents = contents + (contentId to record)
contentId
}
}
fun retrieve(contentId: Long): Content? {
return contents[contentId]?.let { record ->
Content(
contentType = record.contentType,
encoding = record.encoding,
data = deserialize(record.data)
)
}
}
private fun contentId(data: String): Long =
LongHashFunction.xx().hashBytes(data.toByteArray())
private fun serialize(data: InputStream): String {
val out = ByteArrayOutputStream()
GZIPOutputStream(out).use { gzip ->
gzip.write(data.readBytes())
}
return Base64Utils.encodeToString(out.toByteArray())
}
private fun deserialize(
data: String
): InputStream {
return GZIPInputStream(
Base64Utils.decodeFromString(data).inputStream()
)
}
}
package be.rlab.iris.domain
import be.rlab.tehanu.domain.AccessControl
import be.rlab.tehanu.domain.Memory
import be.rlab.tehanu.domain.model.User
import be.rlab.iris.domain.MemorySlots.INBOXES_SLOT
import be.rlab.iris.domain.MemorySlots.MESSAGES_SLOT
import be.rlab.iris.domain.MemorySlots.SUBSCRIPTIONS_SLOT
import be.rlab.iris.domain.model.*
import org.joda.time.DateTime
import java.util.*
class InboxService(
memory: Memory,
private val accessControl: AccessControl,
private val contentService: ContentService
) {
companion object {
private const val INBOX_TAG: String = "INBOX"
}
private var messages: List<Message> by memory.slot(MESSAGES_SLOT, listOf<Message>())
private var inboxes: List<Inbox> by memory.slot(INBOXES_SLOT, listOf<Inbox>())
private var subscriptions: Map<Long, List<Subscription>> by memory.slot(
SUBSCRIPTIONS_SLOT,
mapOf<Long, List<Subscription>>()
)
fun listMessages(
userId: Long
): List<Message> {
return listMessages(userId) { _, messages ->
messages
}.flatten()
}
fun<T> listMessages(
userId: Long,
callback: (Inbox, List<Message>) -> T
): List<T> {
return subscriptions[userId]?.map { subscription ->
val inbox: Inbox = inboxes.find { inbox ->
inbox.id == subscription.inboxId
} ?: throw RuntimeException("Inbox not found for subscription: $subscription")
callback(inbox, listMessages(subscription.inboxId))
} ?: emptyList()
}
fun listSubscriptors(source: Source): List<User> {
return subscriptions.filterValues { subscriptions ->
subscriptions.any { subscription ->
subscription.source == source
}
}.map { (userId, _) ->
accessControl.findUserById(userId)
}.requireNoNulls()
}
fun getMessage(
userId: Long,
index: Int
): Message? {
return listMessages(userId).find { message ->
message.index == index
}
}
fun readContent(message: Message): Content {
return contentService.retrieve(message.contentId)
?: throw RuntimeException("Content for message ${message.messageId} not found")
}
fun count(userId: Long): Map<Source, Int> {
return subscriptions[userId]?.map { subscription ->
subscription.source to listMessages(subscription.inboxId).size
}?.toMap() ?: emptyMap()
}
fun isSubscribed(
userId: Long,
source: Source
): Boolean {
return subscriptions[userId]?.any { subscription ->
subscription.source == source
} ?: false
}
fun subscribe(
userId: Long,
source: Source
) {
val exists: Boolean = subscriptions[userId]?.any { subscription ->
subscription.source == source
} ?: false
if (!exists) {
val inbox: Inbox = resolveInbox(userId, source, Inbox.new(
userId = userId,
source = source
))
val newSubscriptions: List<Subscription> = subscriptions.getOrDefault(userId, emptyList()) +
Subscription.new(
inboxId = inbox.id,
source = source
)
subscriptions = subscriptions + (userId to newSubscriptions)
}
}
fun unsubscribe(
userId: Long,
source: Source
) {
val inbox: Inbox = resolveInbox(userId, source)
val currentSubscriptions: List<Subscription> = subscriptions.getValue(userId)
val newSubscriptions: List<Subscription> = currentSubscriptions.filter { subscription ->
subscription.inboxId != inbox.id &&
subscription.source != source
}
subscriptions = subscriptions + (userId to newSubscriptions)
}
fun broadcast(
chatId: UUID,
from: String,
subject: String,
content: Content,
timestamp: DateTime
) {
val contentId: Long = contentService.store(content)
val source: Source = Source.from(accessControl.findChatById(chatId)
?: throw RuntimeException("Chat $chatId not found"))
accessControl.listUsers(chatId).forEach { user ->
notify(
userId = user.id,
source = source,
from = from,
subject = subject,
contentId = contentId,
timestamp = timestamp
)
}
}
fun broadcast(
source: Source,
from: String,
subject: String,
content: Content,
timestamp: DateTime
) {
val contentId: Long = contentService.store(content)
subscriptions.filterValues { subscriptions ->
subscriptions.any { subscription ->
subscription.source == source
}
}.forEach { (userId, _) ->
notify(
userId = userId,
source = source,
from = from,
subject = subject,
contentId = contentId,
timestamp = timestamp
)
}
}
private fun notify(
userId: Long,
source: Source,
from: String,
subject: String,
contentId: Long,
timestamp: DateTime
) {
val inbox: Inbox = resolveInbox(userId, source)
val index: Int = listMessages(userId).maxBy { message ->
message.index
}?.index ?: 0
messages = messages + Message.new(
inboxId = inbox.id,
index = index + 1,
from = from,
subject = subject,
contentId = contentId,
tags = listOf(INBOX_TAG),
timestamp = timestamp
)
}
private fun resolveInbox(
userId: Long,
source: Source,
defaultValue: Inbox? = null
): Inbox {
return inboxes.find { inbox ->
inbox.userId == userId &&
inbox.source == source
} ?: defaultValue?.let {
inboxes = inboxes + defaultValue
defaultValue
} ?: throw RuntimeException("Cannot resolve inbox for user/source: $userId/$source")
}
private fun listMessages(inboxId: UUID): List<Message> {
return messages.filter { message ->
message.inboxId == inboxId
}
}
}
package be.rlab.iris.domain
object MemorySlots {
const val MESSAGES_SLOT: String = "messages"
const val INBOXES_SLOT: String = "inboxes"
const val SUBSCRIPTIONS_SLOT: String = "subscriptions"
const val CONTENT_SLOT: String = "content"
const val NOTIFICATIONS: String = "notifications"
const val CHANNELS_SUBSCRIPTIONS: String = "channels_subscriptions"
}
package be.rlab.iris.domain
import be.rlab.iris.domain.MemorySlots.CHANNELS_SUBSCRIPTIONS
import be.rlab.iris.domain.MemorySlots.NOTIFICATIONS
import be.rlab.iris.domain.model.Content
import be.rlab.iris.domain.model.DefaultSources
import be.rlab.iris.domain.model.Flag
import be.rlab.iris.domain.model.Notification
import be.rlab.tehanu.domain.BotAware
import be.rlab.tehanu.domain.Memory
import be.rlab.tehanu.domain.Tehanu
import be.rlab.tehanu.util.ObjectMapperFactory
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import java.util.*
class NotificationService(
memory: Memory,
private val inboxService: InboxService
): BotAware {
companion object {
private const val POLLING_DELAY: Long = 3000
}
private var notifications: List<Notification> by memory.slot(NOTIFICATIONS, listOf<Notification>())
private var channelSubscriptions: Map<UUID, List<DefaultSources>> by memory.slot(CHANNELS_SUBSCRIPTIONS, mapOf<UUID, List<DefaultSources>>())
override lateinit var tehanu: Tehanu
private val notificationHandlers: Map<DefaultSources, (Notification) -> Unit> = mapOf(
DefaultSources.SERVICE_STATUS to this::serviceStatus
)
fun push(notification: Notification) {
notifications = notifications + notification
}
fun poll() = GlobalScope.launch {
while (true) {
delay(POLLING_DELAY)
notifications = notifications.filter { notification ->
notification.flags.contains(Flag.UNSEEN)
}.map { notification ->
val defaultSource: DefaultSources = DefaultSources.fromEventType(notification.eventType)
val handler: (Notification) -> Unit = notificationHandlers.getOrDefault(
defaultSource, this@NotificationService::broadcast
)
handler(notification)
notification.markAsSeen()
}
}
}
private fun broadcast(notification: Notification) {
val defaultSource: DefaultSources = DefaultSources.fromEventType(notification.eventType)
if (defaultSource != DefaultSources.UNKNOWN) {
inboxService.broadcast(
source = defaultSource.toSource(),
from = notification.userName,
subject = notification.subject,
content = Content.yaml(
ObjectMapperFactory.yamlMapper.writeValueAsString(notification.additionalData)
),
timestamp = notification.timestamp
)
inboxService.listSubscriptors(defaultSource.toSource()).forEach { subscriptor ->
tehanu.sendMessage(subscriptor, "Tenés un mensaje nuevo desde: $defaultSource")
}
}
}
private fun serviceStatus(notification: Notification) {
channelSubscriptions.filterValues { subscriptions ->
subscriptions.contains(DefaultSources.SERVICE_STATUS)
}.forEach { (chatId, _) ->
val summary: String = notification.summary ?: notification.additionalData.entries.joinToString("; ")
tehanu.sendMessage(chatId, "Servicios caidos: $summary")
}
broadcast(notification)
}
}
package be.rlab.iris.domain
import be.rlab.iris.domain.model.Source
import be.rlab.tehanu.domain.AccessControl
import be.rlab.tehanu.domain.MessageContext