This commit is contained in:
2026-02-20 17:28:12 +03:00
parent 1cfad2ca4e
commit 8d6a76ccb5
4 changed files with 460 additions and 463 deletions
@@ -15,6 +15,7 @@ import io.ktor.client.request.setBody
import io.ktor.client.statement.bodyAsText
import io.ktor.http.ContentType
import io.ktor.http.HttpStatusCode
import io.ktor.http.contentLength
import io.ktor.http.contentType
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
@@ -26,6 +27,7 @@ import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.json.JSONObject
import ru.risdeveau.pixeldragon.AccountData
import ru.risdeveau.pixeldragon.AccountData.syncLastBatch
import ru.risdeveau.pixeldragon.baseUrl
import ru.risdeveau.pixeldragon.client
@@ -48,18 +50,28 @@ class MatrixSyncService {
// }
fun startSync() {
if (AccountData.token == null)
Log.wtf("MatrixSyncService", "Token is null")
if (AccountData.userId == null)
Log.wtf("MatrixSyncService", "User ID is null")
if (AccountData.homeserver == null)
Log.wtf("MatrixSyncService", "Homeserver is null")
Log.i("MatrixSyncService", "Start syncing")
// if (!isInitialized)
// throw IllegalStateException("Sync service not initialized")
if (syncJob?.isActive == true) return
syncJob = CoroutineScope(Dispatchers.IO).launch {
if (syncLastBatch == "") {
if (syncLastBatch == null) {
Log.i("MatrixSyncService", "Init syncing")
_syncState.value = SyncState.Syncing
try {
processSyncResponse(initialSync())
initialSync()
_syncState.value = SyncState.Idle
} catch (e: Exception) {
Log.e("MatrixSyncService", "Initial sync error", e)
_syncState.value = SyncState.Error(e.message)
}
}
@@ -101,44 +113,44 @@ class MatrixSyncService {
fun isSyncActive(): Boolean = syncJob?.isActive == true
private suspend fun processSyncResponse(response: JSONObject) {
// Log.d("syncResponse", response.toString(2))
// val newMessages = mutableListOf<Message>()
// val roomUpdates = mutableListOf<RoomUpdate>()
//
// response.rooms?.join?.forEach { (roomId, roomData) ->
// // Process timeline events (messages)
// roomData.timeline?.events?.forEach { event ->
// val message = event.toMessage(roomId)
// database.messageDao().insertMessage(message)
// newMessages.add(message)
// }
//
// // Process room state updates
// roomData.state?.events?.forEach { event ->
// when (event.type) {
// "m.room.name", "m.room.avatar" -> {
// roomUpdates.add(RoomUpdate(roomId, event.type, event.content))
// }
// }
// }
//
// // Process ephemeral events (typing, receipts)
// roomData.ephemeral?.events?.forEach { event ->
// when (event.type) {
// "m.typing" -> handleTypingEvent(roomId, event)
// "m.receipt" -> handleReceiptEvent(roomId, event)
// }
// }
// }
//
// // Notify the app about new data
// if (newMessages.isNotEmpty()) {
// _newMessages.emit(newMessages)
// }
// if (roomUpdates.isNotEmpty()) {
// _roomUpdates.emit(roomUpdates)
// }
private suspend fun parseSyncResponse(response: JSONObject) {
Log.v("syncResponse", response.toString(2))
val newMessages = mutableListOf<Message>()
val roomUpdates = mutableListOf<RoomUpdate>()
response.rooms?.join?.forEach { (roomId, roomData) ->
// Process timeline events (messages)
roomData.timeline?.events?.forEach { event ->
val message = event.toMessage(roomId)
database.messageDao().insertMessage(message)
newMessages.add(message)
}
// Process room state updates
roomData.state?.events?.forEach { event ->
when (event.type) {
"m.room.name", "m.room.avatar" -> {
roomUpdates.add(RoomUpdate(roomId, event.type, event.content))
}
}
}
// Process ephemeral events (typing, receipts)
roomData.ephemeral?.events?.forEach { event ->
when (event.type) {
"m.typing" -> handleTypingEvent(roomId, event)
"m.receipt" -> handleReceiptEvent(roomId, event)
}
}
}
// Notify the app about new data
if (newMessages.isNotEmpty()) {
_newMessages.emit(newMessages)
}
if (roomUpdates.isNotEmpty()) {
_roomUpdates.emit(roomUpdates)
}
}
}
@@ -156,70 +168,42 @@ suspend fun sync(): JSONObject {
}
suspend fun initialSync(): JSONObject {
val initialFilter = """
fetchRoomMeta()
}
/**
* Fetch rooms metadata during initial sync
*/
suspend fun fetchRoomMeta() {
val filterId = getFilterId("""
{
"room": {
"state": {
"types": [
"m.room.name",
"m.room.avatar",
"m.room.canonical_alias",
"m.room.encryption",
"m.room.tombstone",
"m.room.power_levels",
"m.room.member"
],
"lazy_load_members": true,
"not_types": []
"room": {
"state": {
"types": [
"m.room.name",
"m.room.avatar",
"m.room.canonical_alias",
"m.room.member"
],
"lazy_load_members": true
},
"timeline": {
"limit": 0,
"types": []
},
"ephemeral": {
"types": []
},
"include_leave": false
},
"timeline": {
"limit": 10,
"types": ["m.room.message"],
"not_types": [
"m.room.name",
"m.room.avatar",
"m.room.canonical_alias",
"m.room.encryption",
"m.room.tombstone",
"m.room.power_levels",
"m.room.member",
"m.call.*"
]
},
"ephemeral": {
"types": [],
"not_types": ["m.typing", "m.receipt"]
},
"include_leave": false
},
"presence": {
"types": [],
"not_types": ["*"]
},
"event_format": "client",
"event_fields": [
"type",
"content",
"sender",
"state_key",
"room_id",
"origin_server_ts"
]
"presence": {
"types": []
}
}
""".trimIndent()
var r = client.post("$baseUrl/user/${ME!!.userId}/filter") {
setBody(initialFilter)
contentType(ContentType.Application.Json)
bearerAuth(token)
}
if (r.status != HttpStatusCode.OK)
throw IllegalStateException("Failed to create a filter")
val filterId = JSONObject(r.bodyAsText()).getString("filter_id")
""".trimIndent())
// val filterId = "vmNk"
/*val*/ r = client.get("$baseUrl/sync") {
val r = client.get("$baseUrl/sync") {
bearerAuth(token)
url {
parameter("filter", filterId)
@@ -228,10 +212,31 @@ suspend fun initialSync(): JSONObject {
if (r.status != HttpStatusCode.OK)
throw IllegalStateException("Failed to sync")
Log.d("initialSync", "Response size: ${r.bodyAsText().length}")
Log.v("initialSync", "Response size: ${r.contentLength()}")
r.contentLength()?.let {
if (it >= 50*1024*1024)
Log.w("initialSync", "Response size is too large")
}
val json = JSONObject(r.bodyAsText())
syncLastBatch = json.getString("next_batch")
}
return json
/**
* Send a filter to the server and get its id
* @param filter JSON Filter
* @return Filter ID
*/
suspend fun getFilterId(filter: String): String {
val userId = AccountData.userId
if (userId == null)
Log.wtf("getFilter", "user_id is not defined")
val r = client.post("$baseUrl/user/$userId/filter") {
setBody(filter)
contentType(ContentType.Application.Json)
bearerAuth(token)
}
return JSONObject(r.bodyAsText()).getString("filter_id")
}