Compare commits

..

37 Commits

Author SHA1 Message Date
Ginger c376fce725 chore(sync/v3): Remove unused imports 2025-12-03 16:04:32 +00:00
Ginger da36604163 fix(sync/v3): Don't send rejected invites on initial syncs 2025-12-03 16:04:32 +00:00
Ginger fa74747ab1 refactor(sync/v3): Extract left room timeline logic into its own function 2025-12-03 16:04:32 +00:00
Ginger 07199f9f17 fix(sync/v3): Don't send dummy leaves on an initial sync 2025-12-03 16:04:32 +00:00
Ginger 2f38de16f6 chore: Formatting 2025-12-03 16:04:32 +00:00
ginger 5c162fdb3a fix: Nitpicky comment reword 2025-12-03 16:04:32 +00:00
Ginger 9e60bfa365 fix: Bump max startup time to ten minutes in the systemd unit 2025-12-03 16:04:32 +00:00
Ginger 5b959fca1c chore(sync/v3): More goat sacrifices 2025-12-03 16:04:32 +00:00
Ginger a6d325440c refactor(sync/v3): Split load_joined_room into smaller functions 2025-12-03 16:04:32 +00:00
ginger 6246c11265 fix: Correct error message 2025-12-03 16:04:32 +00:00
Ginger 852bf99d34 fix(sync/v3): Add a workaround for matrix-js-sdk/5071 2025-12-03 16:04:32 +00:00
Ginger d2cc2fb19b fix(sync/v3): Stop ignoring leave cache deserialization failures 2025-12-03 16:04:32 +00:00
Ginger c2449bde74 fix(sync/v3): Do not include the last membership event when syncing left rooms 2025-12-03 16:04:32 +00:00
Ginger c89aa4503e chore(sync/v3): Sacrifice a goat to clippy 2025-12-03 16:04:32 +00:00
Ginger f71cfd18a5 fix(sync/v3): Cache shortstatehashes to speed up migration 2025-12-03 16:04:32 +00:00
Ginger 9a27bccc8e fix(sync/v3): Implement a migration for the userroomid_leftstate table 2025-12-03 16:04:32 +00:00
Ginger fb66356154 fix(sync/v3): Fix invite filtering for federated invites 2025-12-03 16:04:32 +00:00
Ginger 3b8b9d4b5c feat(sync/v3): Remove TL size config option in favor of using the sync filter 2025-12-03 16:04:32 +00:00
Ginger b20000fcf3 chore(sync/v3): Fix clippy lints 2025-12-03 16:04:32 +00:00
Ginger fe1efe0787 fix(sync/v3): Remove mysterious membership event manipulation code 2025-12-03 16:04:32 +00:00
Ginger 08213038a9 fix(sync/v3): Properly sync room heroes 2025-12-03 16:04:32 +00:00
Ginger ad2118e371 chore(sync/v3): Use "build_*" terminology instead of "calculate_*" 2025-12-03 16:04:32 +00:00
Ginger be743ec70a chore(sync/v3): Use more descriptive names for SyncContext properties 2025-12-03 16:04:32 +00:00
Ginger eba5f16e09 chore: Remove unneeded comment 2025-12-03 16:04:32 +00:00
Ginger 5fb49d8668 fix: Use prepare_lazily_loaded_members for joined rooms
Also, don't take read receipts into consideration for lazy loading.
Synapse doesn't do this and they're making initial syncs very large.
2025-12-03 16:04:32 +00:00
Ginger 19e895b57f chore: Clippy fixes 2025-12-03 16:04:32 +00:00
Jade Ellis 5932efa92d feat: Typing notifications in simplified sliding sync
What's missing? Being able to use separate rooms & lists for typing
indicators.
At the moment, we use the same ones as we use for the timeline, as
todo_rooms is quite intertwined. We need to disentangle this to get that
functionality, although I'm not sure if clients use it.
2025-12-03 16:04:32 +00:00
Ginger 1afa8413a2 feat: Add a config option to change the max TL size for legacy sync 2025-12-03 16:04:32 +00:00
Ginger 31cc888119 fix: Set limited to true for newly joined rooms again 2025-12-03 16:04:32 +00:00
Ginger 1ad60df7a6 fix: Properly sync left rooms
- Remove most usages of `update_membership` in favor
  of directly calling the `mark_as_*` functions
- Store the leave membership event as the value in the
  `userroomid_leftstate` table
- Use the `userroomid_leftstate` table to synchronize the
  timeline and state for left rooms if possible
2025-12-03 16:04:32 +00:00
Ginger afd115eedc fix: Properly sync newly joined rooms 2025-12-03 16:04:32 +00:00
Ginger 1444f43fa7 fix(sync/v3): Further cleanup + improve incremental sync consistency 2025-12-03 16:04:32 +00:00
Ginger 91d07a9bfc fix: Correctly send limited timelines again 2025-12-03 16:04:32 +00:00
Ginger c85b5bb122 refactor: Split sync v3 into multiple files 2025-12-03 16:04:32 +00:00
Ginger 99aadff38e feat: Drop support for MSC3575 (legacy sliding sync) 2025-12-03 16:04:32 +00:00
Ginger d2d996d306 chore: Clippy fixes 2025-12-03 16:04:32 +00:00
Ginger 26fa73841b fix(sync/v3): Cleanup part 1: mostly fix redundant data in state 2025-12-03 16:04:32 +00:00
65 changed files with 548 additions and 2212 deletions
@@ -32,13 +32,11 @@ outputs:
runs:
using: composite
steps:
- run: mkdir -p digests
shell: bash
- name: Download digests
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
uses: forgejo/download-artifact@v4
with:
path: digests
path: /tmp/digests
pattern: ${{ inputs.digest_pattern }}
merge-multiple: true
@@ -80,7 +78,7 @@ runs:
- name: Create manifest list and push
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
working-directory: digests
working-directory: /tmp/digests
shell: bash
env:
IMAGES: ${{ inputs.images }}
+1 -2
View File
@@ -35,7 +35,6 @@ jobs:
uses: actions/checkout@v6
with:
fetch-depth: 0
ref: ${{ github.ref_name }}
- name: Cache Cargo registry
uses: actions/cache@v4
@@ -127,7 +126,7 @@ jobs:
[ -f /etc/conduwuit/conduwuit.toml ] && echo "✅ Config file installed"
- name: Upload deb artifact
uses: forgejo/upload-artifact@v4
uses: actions/upload-artifact@v5
with:
name: continuwuity-${{ steps.debian-version.outputs.distribution }}
path: ${{ steps.cargo-deb.outputs.path }}
+2 -3
View File
@@ -33,7 +33,6 @@ jobs:
uses: actions/checkout@v6
with:
fetch-depth: 0
ref: ${{ github.ref_name }}
- name: Cache DNF packages
@@ -239,13 +238,13 @@ jobs:
cp $BIN_RPM upload-bin/
- name: Upload binary RPM
uses: forgejo/upload-artifact@v4
uses: actions/upload-artifact@v5
with:
name: continuwuity
path: upload-bin/
- name: Upload debug RPM artifact
uses: forgejo/upload-artifact@v4
uses: actions/upload-artifact@v5
with:
name: continuwuity-debug
path: artifacts/*debuginfo*.rpm
+1 -3
View File
@@ -2,7 +2,6 @@ AlexPewMaster <git@alex.unbox.at> <68469103+AlexPewMaster@users.noreply.github.c
Daniel Wiesenberg <weasy@hotmail.de> <weasy666@gmail.com>
Devin Ragotzy <devin.ragotzy@gmail.com> <d6ragotzy@wmich.edu>
Devin Ragotzy <devin.ragotzy@gmail.com> <dragotzy7460@mail.kvcc.edu>
Ginger <ginger@gingershaped.computer> <75683114+gingershaped@users.noreply.github.com>
Jonas Platte <jplatte+git@posteo.de> <jplatte+gitlab@posteo.de>
Jonas Zohren <git-pbkyr@jzohren.de> <gitlab-jfowl-0ux98@sh14.de>
Jonathan de Jong <jonathan@automatia.nl> <jonathandejong02@gmail.com>
@@ -13,6 +12,5 @@ Olivia Lee <olivia@computer.surgery> <benjamin@computer.surgery>
Rudi Floren <rudi.floren@gmail.com> <rudi.floren@googlemail.com>
Tamara Schmitz <tamara.zoe.schmitz@posteo.de> <15906939+tamara-schmitz@users.noreply.github.com>
Timo Kösters <timo@koesters.xyz>
nexy7574 <git@nexy7574.co.uk> <nex@noreply.forgejo.ellis.link>
nexy7574 <git@nexy7574.co.uk> <nex@noreply.localhost>
x4u <xi.zhu@protonmail.ch> <14617923-x4u@users.noreply.gitlab.com>
Ginger <ginger@gingershaped.computer> <75683114+gingershaped@users.noreply.github.com>
Generated
+23 -23
View File
@@ -940,7 +940,7 @@ dependencies = [
[[package]]
name = "conduwuit"
version = "0.5.0"
version = "0.5.0-rc.8.1"
dependencies = [
"clap",
"conduwuit_admin",
@@ -972,7 +972,7 @@ dependencies = [
[[package]]
name = "conduwuit_admin"
version = "0.5.0"
version = "0.5.0-rc.8.1"
dependencies = [
"clap",
"conduwuit_api",
@@ -994,7 +994,7 @@ dependencies = [
[[package]]
name = "conduwuit_api"
version = "0.5.0"
version = "0.5.0-rc.8.1"
dependencies = [
"async-trait",
"axum 0.7.9",
@@ -1027,14 +1027,14 @@ dependencies = [
[[package]]
name = "conduwuit_build_metadata"
version = "0.5.0"
version = "0.5.0-rc.8.1"
dependencies = [
"built",
]
[[package]]
name = "conduwuit_core"
version = "0.5.0"
version = "0.5.0-rc.8.1"
dependencies = [
"argon2",
"arrayvec",
@@ -1095,7 +1095,7 @@ dependencies = [
[[package]]
name = "conduwuit_database"
version = "0.5.0"
version = "0.5.0-rc.8.1"
dependencies = [
"async-channel",
"conduwuit_core",
@@ -1114,7 +1114,7 @@ dependencies = [
[[package]]
name = "conduwuit_macros"
version = "0.5.0"
version = "0.5.0-rc.8.1"
dependencies = [
"itertools 0.14.0",
"proc-macro2",
@@ -1124,7 +1124,7 @@ dependencies = [
[[package]]
name = "conduwuit_router"
version = "0.5.0"
version = "0.5.0-rc.8.1"
dependencies = [
"axum 0.7.9",
"axum-client-ip",
@@ -1159,7 +1159,7 @@ dependencies = [
[[package]]
name = "conduwuit_service"
version = "0.5.0"
version = "0.5.0-rc.8.1"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -1200,7 +1200,7 @@ dependencies = [
[[package]]
name = "conduwuit_web"
version = "0.5.0"
version = "0.5.0-rc.8.1"
dependencies = [
"askama",
"axum 0.7.9",
@@ -4063,7 +4063,7 @@ checksum = "88f8660c1ff60292143c98d08fc6e2f654d722db50410e3f3797d40baaf9d8f3"
[[package]]
name = "ruma"
version = "0.10.1"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
dependencies = [
"assign",
"js_int",
@@ -4083,7 +4083,7 @@ dependencies = [
[[package]]
name = "ruma-appservice-api"
version = "0.10.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
dependencies = [
"js_int",
"ruma-common",
@@ -4095,7 +4095,7 @@ dependencies = [
[[package]]
name = "ruma-client-api"
version = "0.18.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
dependencies = [
"as_variant",
"assign",
@@ -4118,7 +4118,7 @@ dependencies = [
[[package]]
name = "ruma-common"
version = "0.13.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
dependencies = [
"as_variant",
"base64 0.22.1",
@@ -4150,7 +4150,7 @@ dependencies = [
[[package]]
name = "ruma-events"
version = "0.28.1"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
dependencies = [
"as_variant",
"indexmap",
@@ -4175,7 +4175,7 @@ dependencies = [
[[package]]
name = "ruma-federation-api"
version = "0.9.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
dependencies = [
"bytes",
"headers",
@@ -4197,7 +4197,7 @@ dependencies = [
[[package]]
name = "ruma-identifiers-validation"
version = "0.9.5"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
dependencies = [
"js_int",
"thiserror 2.0.17",
@@ -4206,7 +4206,7 @@ dependencies = [
[[package]]
name = "ruma-identity-service-api"
version = "0.9.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
dependencies = [
"js_int",
"ruma-common",
@@ -4216,7 +4216,7 @@ dependencies = [
[[package]]
name = "ruma-macros"
version = "0.13.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
dependencies = [
"cfg-if",
"proc-macro-crate",
@@ -4231,7 +4231,7 @@ dependencies = [
[[package]]
name = "ruma-push-gateway-api"
version = "0.9.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
dependencies = [
"js_int",
"ruma-common",
@@ -4243,7 +4243,7 @@ dependencies = [
[[package]]
name = "ruma-signatures"
version = "0.15.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
dependencies = [
"base64 0.22.1",
"ed25519-dalek",
@@ -6204,7 +6204,7 @@ dependencies = [
[[package]]
name = "xtask"
version = "0.5.0"
version = "0.5.0-rc.8.1"
dependencies = [
"clap",
"serde",
@@ -6213,7 +6213,7 @@ dependencies = [
[[package]]
name = "xtask-generate-commands"
version = "0.5.0"
version = "0.5.0-rc.8.1"
dependencies = [
"clap-markdown",
"clap_builder",
+63 -63
View File
@@ -21,7 +21,7 @@ license = "Apache-2.0"
readme = "README.md"
repository = "https://forgejo.ellis.link/continuwuation/continuwuity"
rust-version = "1.86.0"
version = "0.5.0"
version = "0.5.0-rc.8.1"
[workspace.metadata.crane]
name = "conduwuit"
@@ -33,11 +33,11 @@ features = ["serde"]
[workspace.dependencies.smallvec]
version = "1.14.0"
features = [
"const_generics",
"const_new",
"serde",
"union",
"write",
"const_generics",
"const_new",
"serde",
"union",
"write",
]
[workspace.dependencies.smallstr]
@@ -96,13 +96,13 @@ version = "1.11.1"
version = "0.7.9"
default-features = false
features = [
"form",
"http1",
"http2",
"json",
"matched-path",
"tokio",
"tracing",
"form",
"http1",
"http2",
"json",
"matched-path",
"tokio",
"tracing",
]
[workspace.dependencies.axum-extra]
@@ -149,10 +149,10 @@ features = ["aws_lc_rs"]
version = "0.12.15"
default-features = false
features = [
"rustls-tls-native-roots",
"socks",
"hickory-dns",
"http2",
"rustls-tls-native-roots",
"socks",
"hickory-dns",
"http2",
]
[workspace.dependencies.serde]
@@ -188,18 +188,18 @@ default-features = false
version = "0.25.5"
default-features = false
features = [
"jpeg",
"png",
"gif",
"webp",
"jpeg",
"png",
"gif",
"webp",
]
[workspace.dependencies.blurhash]
version = "0.2.3"
default-features = false
features = [
"fast-linear-to-srgb",
"image",
"fast-linear-to-srgb",
"image",
]
# logging
@@ -229,13 +229,13 @@ default-features = false
version = "4.5.35"
default-features = false
features = [
"derive",
"env",
"error-context",
"help",
"std",
"string",
"usage",
"derive",
"env",
"error-context",
"help",
"std",
"string",
"usage",
]
[workspace.dependencies.futures]
@@ -247,15 +247,15 @@ features = ["std", "async-await"]
version = "1.44.2"
default-features = false
features = [
"fs",
"net",
"macros",
"sync",
"signal",
"time",
"rt-multi-thread",
"io-util",
"tracing",
"fs",
"net",
"macros",
"sync",
"signal",
"time",
"rt-multi-thread",
"io-util",
"tracing",
]
[workspace.dependencies.tokio-metrics]
@@ -280,18 +280,18 @@ default-features = false
version = "1.6.0"
default-features = false
features = [
"server",
"http1",
"http2",
"server",
"http1",
"http2",
]
[workspace.dependencies.hyper-util]
version = "=0.1.17"
default-features = false
features = [
"server-auto",
"server-graceful",
"tokio",
"server-auto",
"server-graceful",
"tokio",
]
# to support multiple variations of setting a config option
@@ -310,9 +310,9 @@ features = ["env", "toml"]
version = "0.25.1"
default-features = false
features = [
"serde",
"system-config",
"tokio",
"serde",
"system-config",
"tokio",
]
# Used for conduwuit::Error type
@@ -351,7 +351,7 @@ version = "0.1.2"
# Used for matrix spec type definitions and helpers
[workspace.dependencies.ruma]
git = "https://forgejo.ellis.link/continuwuation/ruwuma"
rev = "3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
rev = "50b2a91b2ab8f9830eea80b9911e11234e0eac66"
features = [
"compat",
"rand",
@@ -381,13 +381,13 @@ features = [
"unstable-msc4095",
"unstable-msc4121",
"unstable-msc4125",
"unstable-msc4155",
"unstable-msc4155",
"unstable-msc4186",
"unstable-msc4203", # sending to-device events to appservices
"unstable-msc4210", # remove legacy mentions
"unstable-extensible-events",
"unstable-pdu",
"unstable-msc4155"
"unstable-msc4155"
]
[workspace.dependencies.rust-rocksdb]
@@ -395,11 +395,11 @@ git = "https://forgejo.ellis.link/continuwuation/rust-rocksdb-zaidoon1"
rev = "61d9d23872197e9ace4a477f2617d5c9f50ecb23"
default-features = false
features = [
"multi-threaded-cf",
"mt_static",
"lz4",
"zstd",
"bzip2",
"multi-threaded-cf",
"mt_static",
"lz4",
"zstd",
"bzip2",
]
[workspace.dependencies.sha2]
@@ -458,16 +458,16 @@ git = "https://forgejo.ellis.link/continuwuation/jemallocator"
rev = "82af58d6a13ddd5dcdc7d4e91eae3b63292995b8"
default-features = false
features = [
"background_threads_runtime_support",
"unprefixed_malloc_on_supported_platforms",
"background_threads_runtime_support",
"unprefixed_malloc_on_supported_platforms",
]
[workspace.dependencies.tikv-jemallocator]
git = "https://forgejo.ellis.link/continuwuation/jemallocator"
rev = "82af58d6a13ddd5dcdc7d4e91eae3b63292995b8"
default-features = false
features = [
"background_threads_runtime_support",
"unprefixed_malloc_on_supported_platforms",
"background_threads_runtime_support",
"unprefixed_malloc_on_supported_platforms",
]
[workspace.dependencies.tikv-jemalloc-ctl]
git = "https://forgejo.ellis.link/continuwuation/jemallocator"
@@ -491,9 +491,9 @@ default-features = false
version = "0.1.2"
default-features = false
features = [
"static",
"gcc",
"light",
"static",
"gcc",
"light",
]
[workspace.dependencies.rustyline-async]
+9 -13
View File
@@ -586,13 +586,10 @@
#allow_unstable_room_versions = true
# Default room version continuwuity will create rooms with.
# Note that this has to be a string since the room version is a string
# rather than an integer. Forgetting the quotes will make the server fail
# to start!
#
# Per spec, room version "11" is the default.
# Per spec, room version 11 is the default.
#
#default_room_version = "11"
#default_room_version = 11
# Enable OpenTelemetry OTLP tracing export. This replaces the deprecated
# Jaeger exporter. Traces will be sent via OTLP to a collector (such as
@@ -1515,11 +1512,16 @@
#
#block_non_admin_invites = false
# This item is undocumented. Please contribute documentation for it.
# Enable or disable making requests to MSC4284 Policy Servers.
# It is recommended you keep this enabled unless you experience frequent
# connectivity issues, such as in a restricted networking environment.
#
#enable_msc4284_policy_servers = true
# This item is undocumented. Please contribute documentation for it.
# Enable running locally generated events through configured MSC4284
# policy servers. You may wish to disable this if your server is
# single-user for a slight speed benefit in some rooms, but otherwise
# should leave it enabled.
#
#policy_server_check_own_events = true
@@ -1734,12 +1736,6 @@
#
#ldap = false
# Configuration for protocol experiments that enable experimental
# features. Each one is associated with a matrix spec proposal, a list of
# which are published at https://spec.matrix.org/proposals/
#
#experiments = false
[global.tls]
# Path to a valid TLS certificate file.
+1 -1
View File
@@ -134,7 +134,7 @@ You can also [view the file on Foregejo](https://forgejo.ellis.link/continuwuati
## Creating the Continuwuity configuration file
Now you need to create the Continuwuity configuration file in
`/etc/conduwuit/conduwuit.toml`. You can find an example configuration at
`/etc/continuwuity/continuwuity.toml`. You can find an example configuration at
[conduwuit-example.toml](../reference/config.mdx).
**Please take a moment to read the config. You need to change at least the
@@ -6,10 +6,12 @@
"message": "Welcome to Continuwuity! Important announcements about the project will appear here."
},
{
"id": 6,
"mention_room": true,
"date": "2025-12-22",
"message": "Continuwuity v0.5.0 has been released. **The release contains a fix for the critical vulnerability [GHSA-22fw-4jq7-g8r8](https://github.com/continuwuity/continuwuity/security/advisories/GHSA-22fw-4jq7-g8r8). Update as soon as possible.**\n\nThis has been *actively exploited* to create fake leave events in the Continuwuity rooms. Please leave and rejoin the rooms to fix any issues this may have caused. \n\n - [Continuwuity (space)](https://matrix.to/#/!PxtzompFuodlyzdCDtV5lzjXs10XIHeOOaq_FYodHyk?via=ellis.link&via=gingershaped.computer&via=continuwuity.org)\n - [Continuwuity](https://matrix.to/#/!kn3VQSLcgWGUFm0FFRid4MinJ_aeZPjHQ0irXbHa3bU?via=ellis.link&via=gingershaped.computer&via=continuwuity.org)\n - [Continuwuity Announcements](https://matrix.to/#/!d7zDZg1Vu5nhkCi50jNfOIObD5fpfGhfl48SZWZek7k?via=ellis.link)\n - [Continuwuity Offtopic](https://matrix.to/#/!QlOomq-suHC9rJHfDFVdbcGg4HS2ojSQ0bo4W2JOGMM?via=ellis.link&via=gingershaped.computer&via=continuwuity.org)\n - [Continuwuity Development](https://matrix.to/#/!aAvealFbgiKTJGzumNbjuwDgt1tOkBKwiyfYqE3ouk0?via=ellis.link&via=explodie.org&via=continuwuity.org)\n"
"id": 3,
"message": "_taps microphone_ The Continuwuity 0.5.0-rc.7 release is now available, and it's better than ever! **177 commits**, **35 pull requests**, **11 contributors,** and a lot of new stuff!\n\nFor highlights, we've got:\n\n* 🕵️ Full Policy Server support to fight spam!\n* 🚀 Smarter room & space upgrades.\n* 🚫 User suspension tools for better moderation.\n* 🤖 reCaptcha support for safer open registration.\n* 🔍 Ability to disable read receipts & typing indicators.\n* ⚡ Sweeping performance improvements!\n\nGet the [full changelog and downloads on our Forgejo](https://forgejo.ellis.link/continuwuation/continuwuity/releases/tag/v0.5.0-rc.7) - and make sure you're in the [Announcements room](https://matrix.to/#/!releases:continuwuity.org/$hN9z6L2_dTAlPxFLAoXVfo_g8DyYXu4cpvWsSrWhmB0) to get stuff like this sooner."
},
{
"id": 5,
"message": "It's a bird! It's a plane! No, it's 0.5.0-rc.8.1!\n\nThis is a minor bugfix update to the rc8 which backports some important fixes from the latest main branch. If you still haven't updated to rc8, you should skip to main. Otherwise, you should upgrade to this bugfix release as soon as possible.\n\nBugfixes backported to this version:\n\n- Resolved several issues with state resolution v2.1 (room version 12)\n- Fixed issues with the `restricted` and `knock_restricted` join rules that would sometimes incorrectly disallow a valid join\n- Fixed the automatic support contact listing being a no-op\n- Fixed upgrading pre-v12 rooms to v12 rooms\n- Fixed policy servers sending the incorrect JSON objects (resulted in false positives)\n- Fixed debug build panic during MSC4133 migration\n\nIt is recommended, if you can and are comfortable with doing so, following updates to the main branch - we're in the run up to the full 0.5.0 release, and more and more bugfixes and new features are being pushed constantly. Please don't forget to join [#announcements:continuwuity.org](https://matrix.to/#/#announcements:continuwuity.org) to receive this news faster and be alerted to other important updates!"
}
]
}
+1 -1
View File
@@ -1 +1 @@
{"m.homeserver":{"base_url": "https://matrix.continuwuity.org"},"org.matrix.msc3575.proxy":{"url": "https://matrix.continuwuity.org"},"org.matrix.msc4143.rtc_foci":[{"type":"livekit","livekit_service_url":"https://livekit.ellis.link"}]}
{"m.homeserver":{"base_url": "https://matrix.continuwuity.org"},"org.matrix.msc3575.proxy":{"url": "https://matrix.continuwuity.org"}}
Generated
+27 -27
View File
@@ -3,11 +3,11 @@
"advisory-db": {
"flake": false,
"locked": {
"lastModified": 1766324728,
"narHash": "sha256-9C+WyE5U3y5w4WQXxmb0ylRyMMsPyzxielWXSHrcDpE=",
"lastModified": 1761112158,
"narHash": "sha256-RIXu/7eyKpQHjsPuAUODO81I4ni8f+WYSb7K4mTG6+0=",
"owner": "rustsec",
"repo": "advisory-db",
"rev": "c88b88c62bda077be8aa621d4e89d8701e39cb5d",
"rev": "58f3aaec0e1776f4a900737be8cd7cb00972210d",
"type": "github"
},
"original": {
@@ -18,11 +18,11 @@
},
"crane": {
"locked": {
"lastModified": 1766194365,
"narHash": "sha256-4AFsUZ0kl6MXSm4BaQgItD0VGlEKR3iq7gIaL7TjBvc=",
"lastModified": 1760924934,
"narHash": "sha256-tuuqY5aU7cUkR71sO2TraVKK2boYrdW3gCSXUkF4i44=",
"owner": "ipetkov",
"repo": "crane",
"rev": "7d8ec2c71771937ab99790b45e6d9b93d15d9379",
"rev": "c6b4d5308293d0d04fcfeee92705017537cad02f",
"type": "github"
},
"original": {
@@ -39,11 +39,11 @@
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
"lastModified": 1766299592,
"narHash": "sha256-7u+q5hexu2eAxL2VjhskHvaUKg+GexmelIR2ve9Nbb4=",
"lastModified": 1761115517,
"narHash": "sha256-Fev/ag/c3Fp3JBwHfup3lpA5FlNXfkoshnQ7dssBgJ0=",
"owner": "nix-community",
"repo": "fenix",
"rev": "381579dee168d5ced412e2990e9637ecc7cf1c5d",
"rev": "320433651636186ea32b387cff05d6bbfa30cea7",
"type": "github"
},
"original": {
@@ -55,11 +55,11 @@
"flake-compat": {
"flake": false,
"locked": {
"lastModified": 1765121682,
"narHash": "sha256-4VBOP18BFeiPkyhy9o4ssBNQEvfvv1kXkasAYd0+rrA=",
"lastModified": 1747046372,
"narHash": "sha256-CIVLLkVgvHYbgI2UpXvIIBJ12HWgX+fjA8Xf8PUmqCY=",
"owner": "edolstra",
"repo": "flake-compat",
"rev": "65f23138d8d09a92e30f1e5c87611b23ef451bf3",
"rev": "9100a0f413b0c601e0533d1d94ffd501ce2e7885",
"type": "github"
},
"original": {
@@ -74,11 +74,11 @@
"nixpkgs-lib": "nixpkgs-lib"
},
"locked": {
"lastModified": 1765835352,
"narHash": "sha256-XswHlK/Qtjasvhd1nOa1e8MgZ8GS//jBoTqWtrS1Giw=",
"lastModified": 1760948891,
"narHash": "sha256-TmWcdiUUaWk8J4lpjzu4gCGxWY6/Ok7mOK4fIFfBuU4=",
"owner": "hercules-ci",
"repo": "flake-parts",
"rev": "a34fae9c08a15ad73f295041fec82323541400a9",
"rev": "864599284fc7c0ba6357ed89ed5e2cd5040f0c04",
"type": "github"
},
"original": {
@@ -89,11 +89,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1766070988,
"narHash": "sha256-G/WVghka6c4bAzMhTwT2vjLccg/awmHkdKSd2JrycLc=",
"lastModified": 1760878510,
"narHash": "sha256-K5Osef2qexezUfs0alLvZ7nQFTGS9DL2oTVsIXsqLgs=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "c6245e83d836d0433170a16eb185cefe0572f8b8",
"rev": "5e2a59a5b1a82f89f2c7e598302a9cacebb72a67",
"type": "github"
},
"original": {
@@ -105,11 +105,11 @@
},
"nixpkgs-lib": {
"locked": {
"lastModified": 1765674936,
"narHash": "sha256-k00uTP4JNfmejrCLJOwdObYC9jHRrr/5M/a/8L2EIdo=",
"lastModified": 1754788789,
"narHash": "sha256-x2rJ+Ovzq0sCMpgfgGaaqgBSwY+LST+WbZ6TytnT9Rk=",
"owner": "nix-community",
"repo": "nixpkgs.lib",
"rev": "2075416fcb47225d9b68ac469a5c4801a9c4dd85",
"rev": "a73b9c743612e4244d865a2fdee11865283c04e6",
"type": "github"
},
"original": {
@@ -132,11 +132,11 @@
"rust-analyzer-src": {
"flake": false,
"locked": {
"lastModified": 1766253897,
"narHash": "sha256-ChK07B1aOlJ4QzWXpJo+y8IGAxp1V9yQ2YloJ+RgHRw=",
"lastModified": 1761077270,
"narHash": "sha256-O1uTuvI/rUlubJ8AXKyzh1WSWV3qCZX0huTFUvWLN4E=",
"owner": "rust-lang",
"repo": "rust-analyzer",
"rev": "765b7bdb432b3740f2d564afccfae831d5a972e4",
"rev": "39990a923c8bca38f5bd29dc4c96e20ee7808d5d",
"type": "github"
},
"original": {
@@ -153,11 +153,11 @@
]
},
"locked": {
"lastModified": 1766000401,
"narHash": "sha256-+cqN4PJz9y0JQXfAK5J1drd0U05D5fcAGhzhfVrDlsI=",
"lastModified": 1760945191,
"narHash": "sha256-ZRVs8UqikBa4Ki3X4KCnMBtBW0ux1DaT35tgsnB1jM4=",
"owner": "numtide",
"repo": "treefmt-nix",
"rev": "42d96e75aa56a3f70cab7e7dc4a32868db28e8fd",
"rev": "f56b1934f5f8fcab8deb5d38d42fd692632b47c2",
"type": "github"
},
"original": {
+92 -118
View File
@@ -6,69 +6,6 @@
pkgs,
...
}:
let
baseTestScript =
pkgs.writers.writePython3Bin "do_test" { libraries = [ pkgs.python3Packages.matrix-nio ]; }
''
import asyncio
import nio
async def main() -> None:
# Connect to continuwuity
client = nio.AsyncClient("http://continuwuity:6167", "alice")
# Register as user alice
response = await client.register("alice", "my-secret-password")
# Log in as user alice
response = await client.login("my-secret-password")
# Create a new room
response = await client.room_create(federate=False)
print("Matrix room create response:", response)
assert isinstance(response, nio.RoomCreateResponse)
room_id = response.room_id
# Join the room
response = await client.join(room_id)
print("Matrix join response:", response)
assert isinstance(response, nio.JoinResponse)
# Send a message to the room
response = await client.room_send(
room_id=room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": "Hello continuwuity!"
}
)
print("Matrix room send response:", response)
assert isinstance(response, nio.RoomSendResponse)
# Sync responses
response = await client.sync(timeout=30000)
print("Matrix sync response:", response)
assert isinstance(response, nio.SyncResponse)
# Check the message was received by continuwuity
last_message = response.rooms.join[room_id].timeline.events[-1].body
assert last_message == "Hello continuwuity!"
# Leave the room
response = await client.room_leave(room_id)
print("Matrix room leave response:", response)
assert isinstance(response, nio.RoomLeaveResponse)
# Close the client
await client.close()
if __name__ == "__main__":
asyncio.run(main())
'';
in
{
# run some nixos tests as checks
checks = lib.pipe self'.packages [
@@ -81,69 +18,106 @@
# this test was initially yoinked from
#
# https://github.com/NixOS/nixpkgs/blob/960ce26339661b1b69c6f12b9063ca51b688615f/nixos/tests/matrix/continuwuity.nix
(builtins.concatMap (
name:
builtins.map
(
{ config, suffix }:
{
name = "test-${name}-${suffix}";
value = pkgs.testers.runNixOSTest {
inherit name;
(builtins.map (name: {
name = "test-${name}";
value = pkgs.testers.runNixOSTest {
inherit name;
nodes = {
continuwuity = {
services.matrix-continuwuity = {
enable = true;
package = self'.packages.${name};
settings = config;
extraEnvironment.RUST_BACKTRACE = "yes";
};
networking.firewall.allowedTCPPorts = [ 6167 ];
};
client.environment.systemPackages = [ baseTestScript ];
};
testScript = ''
start_all()
with subtest("start continuwuity"):
continuwuity.wait_for_unit("continuwuity.service")
continuwuity.wait_for_open_port(6167)
with subtest("ensure messages can be exchanged"):
client.succeed("${lib.getExe baseTestScript} >&2")
'';
};
}
)
[
{
suffix = "base";
config = {
global = {
nodes = {
continuwuity = {
services.matrix-continuwuity = {
enable = true;
package = self'.packages.${name};
settings.global = {
server_name = name;
address = [ "0.0.0.0" ];
allow_registration = true;
yes_i_am_very_very_sure_i_want_an_open_registration_server_prone_to_abuse = true;
};
extraEnvironment.RUST_BACKTRACE = "yes";
};
}
{
suffix = "with-room-version";
config = {
global = {
server_name = name;
address = [ "0.0.0.0" ];
allow_registration = true;
yes_i_am_very_very_sure_i_want_an_open_registration_server_prone_to_abuse = true;
default_room_version = "12";
};
networking.firewall.allowedTCPPorts = [ 6167 ];
};
client =
{ pkgs, ... }:
{
environment.systemPackages = [
(pkgs.writers.writePython3Bin "do_test" { libraries = [ pkgs.python3Packages.matrix-nio ]; } ''
import asyncio
import nio
async def main() -> None:
# Connect to continuwuity
client = nio.AsyncClient("http://continuwuity:6167", "alice")
# Register as user alice
response = await client.register("alice", "my-secret-password")
# Log in as user alice
response = await client.login("my-secret-password")
# Create a new room
response = await client.room_create(federate=False)
print("Matrix room create response:", response)
assert isinstance(response, nio.RoomCreateResponse)
room_id = response.room_id
# Join the room
response = await client.join(room_id)
print("Matrix join response:", response)
assert isinstance(response, nio.JoinResponse)
# Send a message to the room
response = await client.room_send(
room_id=room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": "Hello continuwuity!"
}
)
print("Matrix room send response:", response)
assert isinstance(response, nio.RoomSendResponse)
# Sync responses
response = await client.sync(timeout=30000)
print("Matrix sync response:", response)
assert isinstance(response, nio.SyncResponse)
# Check the message was received by continuwuity
last_message = response.rooms.join[room_id].timeline.events[-1].body
assert last_message == "Hello continuwuity!"
# Leave the room
response = await client.room_leave(room_id)
print("Matrix room leave response:", response)
assert isinstance(response, nio.RoomLeaveResponse)
# Close the client
await client.close()
if __name__ == "__main__":
asyncio.run(main())
'')
];
};
}
]
))
};
testScript = ''
start_all()
with subtest("start continuwuity"):
continuwuity.wait_for_unit("continuwuity.service")
continuwuity.wait_for_open_port(6167)
with subtest("ensure messages can be exchanged"):
client.succeed("do_test >&2")
'';
};
}))
builtins.listToAttrs
];
};
+2 -2
View File
@@ -31,7 +31,7 @@ pub(super) async fn last(&self, room_id: OwnedRoomOrAliasId) -> Result {
.services
.rooms
.timeline
.last_timeline_count(&room_id)
.last_timeline_count(None, &room_id)
.await?;
self.write_str(&format!("{result:#?}")).await
@@ -52,7 +52,7 @@ pub(super) async fn pdus(
.services
.rooms
.timeline
.pdus_rev(&room_id, from)
.pdus_rev(None, &room_id, from)
.try_take(limit.unwrap_or(3))
.try_collect()
.await?;
+3 -24
View File
@@ -30,31 +30,10 @@ pub(super) async fn show_config(&self) -> Result {
#[admin_command]
pub(super) async fn reload_config(&self, path: Option<PathBuf>) -> Result {
// The path argument is only what's optionally passed via the admin command,
// so we need to merge it with the existing paths if any were given at startup.
let mut paths = Vec::new();
let path = path.as_deref().into_iter();
self.services.config.reload(path)?;
// Add previously saved paths to the argument list
self.services
.config
.config_paths
.clone()
.unwrap_or_default()
.iter()
.for_each(|p| paths.push(p.to_owned()));
// If a path is given, and it's not already in the list,
// add it last, so that it overrides earlier files
if let Some(p) = path {
if !paths.contains(&p) {
paths.push(p);
}
}
self.services.config.reload(&paths)?;
self.write_str(&format!("Successfully reconfigured from paths: {paths:?}"))
.await
self.write_str("Successfully reconfigured.").await
}
#[admin_command]
+4 -30
View File
@@ -59,7 +59,7 @@ pub(crate) async fn get_context_route(
.rooms
.timeline
.get_pdu(event_id)
.map_err(|_| err!(Request(NotFound("Event not found."))));
.map_err(|_| err!(Request(NotFound("Base event not found."))));
let visible = services
.rooms
@@ -70,7 +70,7 @@ pub(crate) async fn get_context_route(
let (base_id, base_pdu, visible) = try_join3(base_id, base_pdu, visible).await?;
if base_pdu.room_id_or_hash() != *room_id || base_pdu.event_id != *event_id {
return Err!(Request(NotFound("Event not found.")));
return Err!(Request(NotFound("Base event not found.")));
}
if !visible {
@@ -82,25 +82,11 @@ pub(crate) async fn get_context_route(
let base_event = ignored_filter(&services, (base_count, base_pdu), sender_user);
// PDUs are used to get seen user IDs and then returned in response.
let events_before = services
.rooms
.timeline
.pdus_rev(room_id, Some(base_count))
.pdus_rev(Some(sender_user), room_id, Some(base_count))
.ignore_err()
.then(async |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
})
.ready_filter_map(|item| event_filter(item, filter))
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
@@ -110,20 +96,8 @@ pub(crate) async fn get_context_route(
let events_after = services
.rooms
.timeline
.pdus(room_id, Some(base_count))
.pdus(Some(sender_user), room_id, Some(base_count))
.ignore_err()
.then(async |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
})
.ready_filter_map(|item| event_filter(item, filter))
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
+1 -12
View File
@@ -13,7 +13,6 @@ use ruma::{
room::member::{MembershipState, RoomMemberEventContent},
},
};
use serde_json::value::RawValue;
use service::Services;
use super::banned_room_check;
@@ -147,17 +146,7 @@ pub(crate) async fn invite_helper(
)
.await?;
let invite_room_state = services
.rooms
.state
.summary(&pdu, room_id)
.await
.into_iter()
.map(|evt| RawValue::from_string(evt.json().get().to_owned()))
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| {
err!(Request(BadJson(warn!("Could not clone invite state event: {e}"))))
})?;
let invite_room_state = services.rooms.state.summary_stripped(&pdu, room_id).await;
drop(state_lock);
-8
View File
@@ -44,7 +44,6 @@ use service::{
rooms::{
state::RoomMutexGuard,
state_compressor::{CompressedState, HashSetCompressStateEvent},
timeline::pdu_fits,
},
};
@@ -574,13 +573,6 @@ async fn join_room_by_id_helper_remote(
return state;
},
};
if !pdu_fits(&mut value.clone()) {
warn!(
"dropping incoming PDU {event_id} in room {room_id} from room join because \
it exceeds 65535 bytes or is otherwise too large."
);
return state;
}
services.rooms.outlier.add_pdu_outlier(&event_id, &value);
if let Some(state_key) = &pdu.state_key {
let shortstatekey = services
+3 -22
View File
@@ -1,7 +1,6 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, at, debug_warn,
Err, Result, at,
matrix::{
event::{Event, Matches},
pdu::PduCount,
@@ -71,7 +70,6 @@ const LIMIT_DEFAULT: usize = 10;
/// where the user was joined, depending on `history_visibility`)
pub(crate) async fn get_message_events_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<get_message_events::v3::Request>,
) -> Result<get_message_events::v3::Response> {
debug_assert!(IGNORED_MESSAGE_TYPES.is_sorted(), "IGNORED_MESSAGE_TYPES is not sorted");
@@ -80,11 +78,6 @@ pub(crate) async fn get_message_events_route(
let room_id = &body.room_id;
let filter = &body.filter;
services
.users
.update_device_last_seen(sender_user, sender_device, client_ip)
.await;
if !services.rooms.metadata.exists(room_id).await {
return Err!(Request(Forbidden("Room does not exist to this server")));
}
@@ -122,14 +115,14 @@ pub(crate) async fn get_message_events_route(
| Direction::Forward => services
.rooms
.timeline
.pdus(room_id, Some(from))
.pdus(Some(sender_user), room_id, Some(from))
.ignore_err()
.boxed(),
| Direction::Backward => services
.rooms
.timeline
.pdus_rev(room_id, Some(from))
.pdus_rev(Some(sender_user), room_id, Some(from))
.ignore_err()
.boxed(),
};
@@ -140,18 +133,6 @@ pub(crate) async fn get_message_events_route(
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
.take(limit)
.then(async |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
})
.collect()
.await;
-6
View File
@@ -1,7 +1,6 @@
use std::collections::BTreeMap;
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, PduCount, Result, err};
use ruma::{
MilliSecondsSinceUnixEpoch,
@@ -119,14 +118,9 @@ pub(crate) async fn set_read_marker_route(
/// Sets private read marker and public read receipt EDU.
pub(crate) async fn create_receipt_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<create_receipt::v3::Request>,
) -> Result<create_receipt::v3::Response> {
let sender_user = body.sender_user();
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
.await;
if matches!(
&body.receipt_type,
-6
View File
@@ -1,5 +1,4 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, matrix::pdu::PduBuilder};
use ruma::{
api::client::redact::redact_event, events::room::redaction::RoomRedactionEventContent,
@@ -14,14 +13,9 @@ use crate::Ruma;
/// - TODO: Handle txn id
pub(crate) async fn redact_event_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<redact_event::v3::Request>,
) -> Result<redact_event::v3::Response> {
let sender_user = body.sender_user();
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
.await;
let body = &body.body;
if services.users.is_suspended(sender_user).await? {
// TODO: Users can redact their own messages while suspended
+38 -26
View File
@@ -1,6 +1,6 @@
use axum::extract::State;
use conduwuit::{
Err, Result, at, debug_warn,
Result, at,
matrix::{Event, event::RelationTypeEqual, pdu::PduCount},
utils::{IterStream, ReadyExt, result::FlatOk, stream::WidebandExt},
};
@@ -109,16 +109,6 @@ async fn paginate_relations_with_filter(
recurse: bool,
dir: Direction,
) -> Result<get_relating_events::v1::Response> {
if !services
.rooms
.state_accessor
.user_can_see_event(sender_user, room_id, target)
.await
{
debug_warn!(req_evt = ?target, ?room_id, "Event relations requested by {sender_user} but is not allowed to see it, returning 404");
return Err!(Request(NotFound("Event not found.")));
}
let start: PduCount = from
.map(str::parse)
.transpose()?
@@ -139,6 +129,11 @@ async fn paginate_relations_with_filter(
// Spec (v1.10) recommends depth of at least 3
let depth: u8 = if recurse { 3 } else { 1 };
// Check if this is a thread request
let is_thread = filter_rel_type
.as_ref()
.is_some_and(|rel| *rel == RelationType::Thread);
let events: Vec<_> = services
.rooms
.pdu_metadata
@@ -157,24 +152,40 @@ async fn paginate_relations_with_filter(
})
.stream()
.ready_take_while(|(count, _)| Some(*count) != to)
.take(limit)
.wide_filter_map(|item| visibility_filter(services, sender_user, item))
.then(async |mut pdu| {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations to relation: {e}");
}
pdu
})
.take(limit)
.collect()
.await;
// For threads, check if we should include the root event
let mut root_event = None;
if is_thread && dir == Direction::Backward {
// Check if we've reached the beginning of the thread
// (fewer events than requested means we've exhausted the thread)
if events.len() < limit {
// Try to get the thread root event
if let Ok(root_pdu) = services.rooms.timeline.get_pdu(target).await {
// Check visibility
if services
.rooms
.state_accessor
.user_can_see_event(sender_user, room_id, target)
.await
{
// Store the root event to add to the response
root_event = Some(root_pdu);
}
}
}
}
// Determine if there are more events to fetch
let has_more = events.len() >= limit;
let has_more = if root_event.is_some() {
false // We've included the root, no more events
} else {
// Check if we got a full page of results (might be more)
events.len() >= limit
};
let next_batch = if has_more {
match dir {
@@ -186,10 +197,11 @@ async fn paginate_relations_with_filter(
None
};
let chunk: Vec<_> = events
// Build the response chunk with thread root if needed
let chunk: Vec<_> = root_event
.into_iter()
.map(at!(1))
.map(Event::into_format)
.chain(events.into_iter().map(at!(1)).map(Event::into_format))
.collect();
Ok(get_relating_events::v1::Response {
+2 -11
View File
@@ -1,5 +1,5 @@
use axum::extract::State;
use conduwuit::{Err, Event, Result, debug_warn, err};
use conduwuit::{Err, Event, Result, err};
use futures::{FutureExt, TryFutureExt, future::try_join};
use ruma::api::client::room::get_room_event;
@@ -33,16 +33,7 @@ pub(crate) async fn get_room_event_route(
return Err!(Request(Forbidden("You don't have permission to view this event.")));
}
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(body.sender_user(), &mut event)
.await
{
debug_warn!("Failed to add bundled aggregations to event: {e}");
}
event.set_unsigned(body.sender_user.as_deref());
event.add_age().ok();
Ok(get_room_event::v3::Response { event: event.into_format() })
}
+2 -18
View File
@@ -1,6 +1,6 @@
use axum::extract::State;
use conduwuit::{
Err, Event, Result, at, debug_warn,
Err, Event, Result, at,
utils::{BoolExt, stream::TryTools},
};
use futures::{FutureExt, TryStreamExt, future::try_join4};
@@ -40,28 +40,12 @@ pub(crate) async fn room_initial_sync_route(
.map_ok(Event::into_format)
.try_collect::<Vec<_>>();
// Events are returned in body
let limit = LIMIT_MAX;
let events = services
.rooms
.timeline
.pdus_rev(room_id, None)
.pdus_rev(None, room_id, None)
.try_take(limit)
.and_then(async |mut pdu| {
pdu.1.set_unsigned(body.sender_user.as_deref());
if let Some(sender_user) = body.sender_user.as_deref() {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
}
Ok(pdu)
})
.try_collect::<Vec<_>>();
let (membership, visibility, state, events) =
+3 -19
View File
@@ -2,7 +2,7 @@ use std::collections::BTreeMap;
use axum::extract::State;
use conduwuit::{
Err, Result, at, debug_warn, is_true,
Err, Result, at, is_true,
matrix::Event,
result::FlatOk,
utils::{IterStream, stream::ReadyExt},
@@ -50,7 +50,7 @@ pub(crate) async fn search_events_route(
Ok(Response {
search_categories: ResultCategories {
room_events: Box::pin(room_events_result)
room_events: room_events_result
.await
.unwrap_or_else(|| Ok(ResultRoomEvents::default()))?,
},
@@ -110,12 +110,7 @@ async fn category_room_events(
limit,
};
let (count, results) = services
.rooms
.search
.search_pdus(&query, sender_user)
.await
.ok()?;
let (count, results) = services.rooms.search.search_pdus(&query).await.ok()?;
results
.collect::<Vec<_>>()
@@ -149,17 +144,6 @@ async fn category_room_events(
.map(at!(2))
.flatten()
.stream()
.then(|mut pdu| async {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu)
.await
{
debug_warn!("Failed to add bundled aggregations to search result: {e}");
}
pdu
})
.map(Event::into_format)
.map(|result| SearchResult {
rank: None,
-7
View File
@@ -1,7 +1,6 @@
use std::collections::BTreeMap;
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, err, matrix::pdu::PduBuilder, utils};
use ruma::{api::client::message::send_message_event, events::MessageLikeEventType};
use serde_json::from_str;
@@ -19,7 +18,6 @@ use crate::Ruma;
/// allowed
pub(crate) async fn send_message_event_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<send_message_event::v3::Request>,
) -> Result<send_message_event::v3::Response> {
let sender_user = body.sender_user();
@@ -29,11 +27,6 @@ pub(crate) async fn send_message_event_route(
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
.await;
// Forbid m.room.encrypted if encryption is disabled
if MessageLikeEventType::RoomEncrypted == body.event_type && !services.config.allow_encryption
{
+4 -11
View File
@@ -1,5 +1,4 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, err,
matrix::{Event, pdu::PduBuilder},
@@ -8,7 +7,7 @@ use conduwuit::{
use conduwuit_service::Services;
use futures::{FutureExt, TryStreamExt};
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId,
OwnedEventId, RoomId, UserId,
api::client::state::{get_state_events, get_state_events_for_key, send_state_event},
events::{
AnyStateEventContent, StateEventType,
@@ -31,14 +30,9 @@ use crate::{Ruma, RumaResponse};
/// Sends a state event into the room.
pub(crate) async fn send_state_event_for_key_route(
State(services): State<crate::State>,
InsecureClientIp(ip): InsecureClientIp,
body: Ruma<send_state_event::v3::Request>,
) -> Result<send_state_event::v3::Response> {
let sender_user = body.sender_user();
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), ip)
.await;
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
@@ -67,10 +61,9 @@ pub(crate) async fn send_state_event_for_key_route(
/// Sends a state event into the room.
pub(crate) async fn send_state_event_for_empty_key_route(
State(services): State<crate::State>,
InsecureClientIp(ip): InsecureClientIp,
body: Ruma<send_state_event::v3::Request>,
) -> Result<RumaResponse<send_state_event::v3::Response>> {
send_state_event_for_key_route(State(services), InsecureClientIp(ip), body)
send_state_event_for_key_route(State(services), body)
.boxed()
.await
.map(RumaResponse)
@@ -158,7 +151,7 @@ pub(crate) async fn get_state_events_for_key_route(
"content": event.content(),
"event_id": event.event_id(),
"origin_server_ts": event.origin_server_ts(),
"room_id": event.room_id_or_hash(),
"room_id": event.room_id(),
"sender": event.sender(),
"state_key": event.state_key(),
"type": event.kind(),
@@ -192,7 +185,7 @@ async fn send_state_event_for_key_helper(
event_type: &StateEventType,
json: &Raw<AnyStateEventContent>,
state_key: &str,
timestamp: Option<MilliSecondsSinceUnixEpoch>,
timestamp: Option<ruma::MilliSecondsSinceUnixEpoch>,
) -> Result<OwnedEventId> {
allowed_to_send_state_event(services, room_id, event_type, state_key, json).await?;
let state_lock = services.rooms.state.mutex.lock(room_id).await;
+12 -34
View File
@@ -4,7 +4,7 @@ mod v5;
use std::collections::VecDeque;
use conduwuit::{
Event, PduCount, Result, debug_warn, err,
Event, PduCount, Result, err,
matrix::pdu::PduEvent,
ref_at, trace,
utils::stream::{BroadbandExt, ReadyExt, TryIgnore},
@@ -53,7 +53,7 @@ async fn load_timeline(
let last_timeline_count = services
.rooms
.timeline
.last_timeline_count(room_id)
.last_timeline_count(Some(sender_user), room_id)
.await
.map_err(|err| {
err!(Database(warn!("Failed to fetch end of room timeline: {}", err)))
@@ -71,24 +71,13 @@ async fn load_timeline(
services
.rooms
.timeline
.pdus_rev(room_id, ending_count.map(|count| count.saturating_add(1)))
.pdus_rev(
Some(sender_user),
room_id,
ending_count.map(|count| count.saturating_add(1)),
)
.ignore_err()
.ready_take_while(move |&(pducount, _)| pducount > starting_count)
.map(move |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
pdu
})
.then(async move |mut pdu| {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
})
.boxed()
},
| None => {
@@ -97,23 +86,12 @@ async fn load_timeline(
services
.rooms
.timeline
.pdus_rev(room_id, ending_count.map(|count| count.saturating_add(1)))
.pdus_rev(
Some(sender_user),
room_id,
ending_count.map(|count| count.saturating_add(1)),
)
.ignore_err()
.map(move |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
pdu
})
.then(async move |mut pdu| {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
})
.boxed()
},
};
-8
View File
@@ -9,7 +9,6 @@ use std::{
};
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Result, extract_variant,
utils::{
@@ -181,7 +180,6 @@ type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;
)]
pub(crate) async fn sync_events_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<sync_events::v3::Request>,
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
let (sender_user, sender_device) = body.sender();
@@ -194,12 +192,6 @@ pub(crate) async fn sync_events_route(
.await?;
}
// Increment the "device last active" metadata
services
.users
.update_device_last_seen(sender_user, Some(sender_device), client_ip)
.await;
// Setup watchers, so if there's no response, we can wait for them
let watcher = services.sync.watch(sender_user, sender_device);
+1 -1
View File
@@ -127,7 +127,7 @@ pub(super) async fn build_state_incremental<'a>(
let last_pdu_of_last_sync = services
.rooms
.timeline
.pdus_rev(room_id, Some(last_sync_end_count.saturating_add(1)))
.pdus_rev(Some(sender_user), room_id, Some(last_sync_end_count.saturating_add(1)))
.boxed()
.next()
.await
-8
View File
@@ -6,7 +6,6 @@ use std::{
};
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Error, Result, at, error, extract_variant, is_equal_to,
matrix::{Event, TypeStateKey, pdu::PduCount},
@@ -62,18 +61,11 @@ type KnownRooms = BTreeMap<String, BTreeMap<OwnedRoomId, u64>>;
/// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186
pub(crate) async fn sync_events_v5_route(
State(ref services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<sync_events::v5::Request>,
) -> Result<sync_events::v5::Response> {
debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted");
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_ref().expect("user is authenticated");
services
.users
.update_device_last_seen(sender_user, Some(sender_device), client_ip)
.await;
let mut body = body.body;
// Setup watchers, so if there's no response, we can wait for them
+1 -12
View File
@@ -1,6 +1,6 @@
use axum::extract::State;
use conduwuit::{
Result, at, debug_warn,
Result, at,
matrix::{
Event,
pdu::{PduCount, PduEvent},
@@ -45,17 +45,6 @@ pub(crate) async fn get_threads_route(
.await
.then_some((count, pdu))
})
.then(|(count, mut pdu)| async move {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(body.sender_user(), &mut pdu)
.await
{
debug_warn!("Failed to add bundled aggregations to thread: {e}");
}
(count, pdu)
})
.collect()
.await;
-6
View File
@@ -1,5 +1,4 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, utils, utils::math::Tried};
use ruma::api::client::typing::create_typing_event;
@@ -10,15 +9,10 @@ use crate::Ruma;
/// Sets the typing state of the sender user.
pub(crate) async fn create_typing_event_route(
State(services): State<crate::State>,
InsecureClientIp(ip): InsecureClientIp,
body: Ruma<create_typing_event::v3::Request>,
) -> Result<create_typing_event::v3::Response> {
use create_typing_event::v3::Typing;
let sender_user = body.sender_user();
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), ip)
.await;
if sender_user != body.user_id && body.appservice_info.is_none() {
return Err!(Request(Forbidden("You cannot update typing status of other users.")));
+1 -11
View File
@@ -3,7 +3,6 @@ use std::cmp;
use axum::extract::State;
use conduwuit::{
Event, PduCount, Result,
result::LogErr,
utils::{IterStream, ReadyExt, stream::TryTools},
};
use futures::{FutureExt, StreamExt, TryStreamExt};
@@ -63,7 +62,7 @@ pub(crate) async fn get_backfill_route(
pdus: services
.rooms
.timeline
.pdus_rev(&body.room_id, Some(from.saturating_add(1)))
.pdus_rev(None, &body.room_id, Some(from.saturating_add(1)))
.try_take(limit)
.try_filter_map(|(_, pdu)| async move {
Ok(services
@@ -73,15 +72,6 @@ pub(crate) async fn get_backfill_route(
.await
.then_some(pdu))
})
.and_then(async |mut pdu| {
// Strip the transaction ID, as that is private
pdu.remove_transaction_id().log_err().ok();
// Add age, as this is specified
pdu.add_age().log_err().ok();
// It's not clear if we should strip or add any more data, leave as is.
// In particular: Redaction?
Ok(pdu)
})
.try_filter_map(|pdu| async move {
Ok(services
.rooms
+51 -354
View File
@@ -1,327 +1,29 @@
use std::collections::HashMap;
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use base64::{Engine as _, engine::general_purpose};
use conduwuit::{
Err, Error, EventTypeExt, PduEvent, Result, RoomVersion, debug, debug_warn, err,
matrix::{Event, StateKey, event::gen_event_id},
trace,
Err, Error, PduEvent, Result, err,
matrix::{Event, event::gen_event_id},
utils::{self, hash::sha256},
warn,
};
use ruma::{
CanonicalJsonValue, OwnedUserId, RoomId, RoomVersionId, ServerName, UserId,
CanonicalJsonValue, OwnedUserId, UserId,
api::{client::error::ErrorKind, federation::membership::create_invite},
events::{
StateEventType,
room::{
create::RoomCreateEventContent,
member::{MembershipState, RoomMemberEventContent},
},
},
serde::JsonObject,
};
use serde_json::value::RawValue;
use service::{Services, rooms::timeline::pdu_fits};
use crate::Ruma;
/// Ensures that the state received from the invite endpoint is sane, correct,
/// and complies with the room version's requirements.
async fn check_invite_state(
services: &Services,
stripped_state: &Vec<Box<RawValue>>,
room_id: &RoomId,
room_version_id: &RoomVersionId,
) -> Result<()> {
let room_version = RoomVersion::new(room_version_id).map_err(|e| {
err!(Request(UnsupportedRoomVersion("Invalid room version provided: {e}")))
})?;
let mut room_state: HashMap<(StateEventType, StateKey), PduEvent> = HashMap::new();
// Build the room state from the provided state events,
// ensuring that there's no duplicates. We need to check that m.room.create is
// present and lines up with the other things we've been told, and then verify
// any signatures present to ensure this isn't forged.
for raw_event in stripped_state {
trace!("Processing invite state event: {:?}", raw_event);
let canonical = utils::to_canonical_object(raw_event)?;
let event_id = gen_event_id(&canonical, room_version_id)?;
let event = PduEvent::from_id_val(&event_id, canonical.clone())
.map_err(|e| err!(Request(InvalidParam("Invite state event is invalid: {e}"))))?;
if event.state_key().is_none() {
return Err!(Request(InvalidParam("Invite state event missing event type.")));
}
let key = event
.event_type()
.with_state_key(event.state_key().unwrap());
if room_state.contains_key(&key) {
return Err!(Request(InvalidParam("Duplicate state event found for {key:?}")));
}
// verify the event
if !pdu_fits(&canonical) {
return Err!(Request(InvalidParam(
"An invite state event ({event_id}) is too large"
)));
}
services
.server_keys
.verify_event(&canonical, Some(room_version_id))
.await
.map_err(|e| {
err!(Request(InvalidParam(
"Signature failed verification on event {event_id}: {e}"
)))
})?;
// Ensure all events are in the same room
if event.room_id_or_hash() != room_id {
return Err!(Request(InvalidParam(
"State event room ID for {} does not match the expected room ID {}.",
event.event_id,
room_id,
)));
}
room_state.insert(key, event);
}
// verify m.room.create is present, has a matching room ID, and a matching room
// version.
let create_event = room_state
.get(&(StateEventType::RoomCreate, "".into()))
.ok_or_else(|| err!(Request(MissingParam("Missing m.room.create in stripped state."))))?;
let create_event_content: RoomCreateEventContent = create_event
.get_content()
.map_err(|e| err!(Request(InvalidParam("Invalid m.room.create content: {e}"))))?;
// Room v12 removed room IDs over federation, so we'll need to see if the event
// ID matches the room ID instead.
if room_version.room_ids_as_hashes {
let given_room_id = create_event.event_id().as_str().replace('$', "!");
if given_room_id != room_id.as_str() {
return Err!(Request(InvalidParam(
"m.room.create event ID does not match the room ID."
)));
}
} else if create_event.room_id().unwrap() != room_id {
return Err!(Request(InvalidParam("m.room.create room ID does not match the room ID.")));
}
// Make sure the room version matches
if &create_event_content.room_version != room_version_id {
return Err!(Request(InvalidParam(
"m.room.create room version does not match the given room version."
)));
}
// Looks solid
Ok(())
}
/// Ensures that the invite event received from the invite endpoint is sane,
/// correct, and complies with the room version's requirements.
/// Returns the invited user ID on success.
async fn check_invite_event(
services: &Services,
invite_event: &PduEvent,
origin: &ServerName,
room_id: &RoomId,
room_version_id: &RoomVersionId,
) -> Result<OwnedUserId> {
// Check: The event sender is not a user ID on the origin server.
if invite_event.sender.server_name() != origin {
return Err!(Request(InvalidParam(
"Invite event sender's server does not match the origin server."
)));
}
// Check: The `state_key` is not a user ID on the receiving server.
let state_key: &UserId = invite_event
.state_key()
.ok_or_else(|| err!(Request(MissingParam("Invite event missing state_key."))))?
.try_into()
.map_err(|e| err!(Request(InvalidParam("Invalid state_key property: {e}"))))?;
if !services.globals.server_is_ours(state_key.server_name()) {
return Err!(Request(InvalidParam(
"Invite event state_key does not belong to this homeserver."
)));
}
// Check: The event's room ID matches the expected room ID.
if let Some(evt_room_id) = invite_event.room_id() {
if evt_room_id != room_id {
return Err!(Request(InvalidParam(
"Invite event room ID does not match the expected room ID."
)));
}
} else {
return Err!(Request(MissingParam("Invite event missing room ID.")));
}
// Check: the membership really is "invite"
let content = invite_event.get_content::<RoomMemberEventContent>()?;
if content.membership != MembershipState::Invite {
return Err!(Request(InvalidParam("Invite event is not a membership invite.")));
}
// Check: signature is valid
let as_obj = &mut utils::to_canonical_object(invite_event)?;
// remove the event_id before verification
as_obj.remove("event_id");
services
.server_keys
.verify_event(as_obj, Some(room_version_id))
.await
.map_err(|e| {
err!(Request(InvalidParam("Invite event signature failed verification: {e}")))
})?;
Ok(state_key.to_owned())
}
/// Performs only legacy checks on the invite, for use when the requesting
/// server doesn't support matrix v1.16 invites.
/// This is significantly less secure than the full checks and should only be
/// used if the user has allowed it.
async fn legacy_check_invite(
services: &Services,
origin: &ServerName,
invite_event: &PduEvent,
stripped_state: &Vec<Box<RawValue>>,
room_id: &RoomId,
room_version_id: &RoomVersionId,
) -> Result<OwnedUserId> {
// Ensure the sender is from origin, the state key is a user ID that points at a
// local user, the event type is m.room.member with membership "invite", and
// the room ID matches.
if invite_event.sender().server_name() != origin {
return Err!(Request(InvalidParam(
"Invite event sender's server does not match the origin server."
)));
}
let state_key: &UserId = invite_event
.state_key()
.ok_or_else(|| err!(Request(MissingParam("Invite event missing state_key."))))?
.try_into()
.map_err(|e| err!(Request(InvalidParam("Invalid state_key property: {e}"))))?;
if !services.globals.server_is_ours(state_key.server_name()) {
return Err!(Request(InvalidParam(
"Invite event state_key does not belong to this homeserver."
)));
}
if let Some(evt_room_id) = invite_event.room_id() {
if evt_room_id != room_id {
return Err!(Request(InvalidParam(
"Invite event room ID does not match the expected room ID."
)));
}
} else {
return Err!(Request(MissingParam("Invite event missing room ID.")));
}
let content = invite_event.get_content::<RoomMemberEventContent>()?;
if content.membership != MembershipState::Invite {
return Err!(Request(InvalidParam("Invite event is not a membership invite.")));
}
// We can also opportunistically check that the m.room.create event is present
// and matches the room version, to avoid accepting invites to rooms that
// don't match.
let mut has_create = false;
for raw_event in stripped_state {
let canonical = utils::to_canonical_object(raw_event)?;
if canonical.get("room_id").is_none() {
// This is a stripped event, skip
continue;
}
if let Some(event_type) = canonical.get("type") {
if event_type.as_str().unwrap_or_default() == "m.room.create" {
has_create = true;
let event_id = gen_event_id(&canonical, room_version_id)?;
let event = PduEvent::from_id_val(&event_id, canonical.clone()).map_err(|e| {
err!(Request(InvalidParam("Invite state event is invalid: {e}")))
})?;
// We can verify that the room ID is correct now
let version = RoomVersion::new(room_version_id)?;
if version.room_ids_as_hashes {
let given_room_id = event.event_id().as_str().replace('$', "!");
if given_room_id != room_id.as_str() {
return Err!(Request(InvalidParam(
"m.room.create event ID does not match the room ID."
)));
}
} else if event.room_id().unwrap() != room_id {
return Err!(Request(InvalidParam(
"m.room.create room ID does not match the room ID."
)));
}
// Everything's as fine as we're getting with this event
break;
}
}
}
if !has_create {
warn!(
"federated invite is missing m.room.create event in stripped state, the remote \
server is either outdated or trying something fishy."
);
}
Ok(state_key.to_owned())
}
/// Checks the incoming event is allowed and not forged.
/// If the MSC4311 enforcement experiment is enabled, performs full checks,
/// otherwise performs legacy checks only.
async fn check_invite(
services: &Services,
invite_event: &PduEvent,
stripped_state: &Vec<Box<RawValue>>,
origin: &ServerName,
room_id: &RoomId,
room_version_id: &RoomVersionId,
) -> Result<OwnedUserId> {
if services.config.experiments.enforce_msc4311 {
debug!("Checking invite event validity");
let user = check_invite_event(services, invite_event, origin, room_id, room_version_id)
.await
.inspect_err(|e| {
debug_warn!("Invite event validity check failed: {e}");
})?;
debug!("Checking invite state validity");
check_invite_state(services, stripped_state, room_id, room_version_id)
.await
.inspect_err(|e| {
debug_warn!("Invite state validity check failed: {e}");
})?;
Ok(user)
} else {
debug!("Performing legacy invite checks");
legacy_check_invite(
services,
origin,
invite_event,
stripped_state,
room_id,
room_version_id,
)
.await
.inspect_err(|e| {
debug_warn!("Legacy invite validity check failed: {e}");
})
}
}
/// # `PUT /_matrix/federation/v2/invite/{roomId}/{eventId}`
///
/// Invites a remote user to a room.
#[tracing::instrument(skip_all, fields(%client, room_id=?body.room_id), name = "invite")]
#[tracing::instrument(skip_all, fields(%client), name = "invite")]
pub(crate) async fn create_invite_route(
State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp,
body: Ruma<create_invite::v2::Request>,
) -> Result<create_invite::v2::Response> {
debug!("Received invite request from {}: {:?}", body.room_id, body.origin());
// ACL check origin
services
.rooms
@@ -330,7 +32,6 @@ pub(crate) async fn create_invite_route(
.await?;
if !services.server.supported_room_version(&body.room_version) {
debug_warn!("Unsupported room version: {}", body.room_version);
return Err(Error::BadRequest(
ErrorKind::IncompatibleRoomVersion { room_version: body.room_version.clone() },
"Server does not support this room version.",
@@ -339,7 +40,6 @@ pub(crate) async fn create_invite_route(
if let Some(server) = body.room_id.server_name() {
if services.moderation.is_remote_server_forbidden(server) {
warn!("Received invite to room created by a banned server: {}. Rejecting.", server);
return Err!(Request(Forbidden("Server is banned on this homeserver.")));
}
}
@@ -349,7 +49,7 @@ pub(crate) async fn create_invite_route(
.is_remote_server_forbidden(body.origin())
{
warn!(
"Received invite from banned server {} for room ID {}. Rejecting.",
"Received federated/remote invite from banned server {} for room ID {}. Rejecting.",
body.origin(),
body.room_id
);
@@ -360,42 +60,17 @@ pub(crate) async fn create_invite_route(
let mut signed_event = utils::to_canonical_object(&body.event)
.map_err(|_| err!(Request(InvalidParam("Invite event is invalid."))))?;
// We need to hash and sign the event before we can generate the event ID.
// It is important that this signed event does not get sent back to the caller
// until we've verified this isn't incorrect.
trace!(event=?signed_event, "Hashing & signing invite event");
services
.server_keys
.hash_and_sign_event(&mut signed_event, &body.room_version)
.map_err(|e| err!(Request(InvalidParam("Failed to sign event: {e}"))))?;
let event_id = gen_event_id(&signed_event.clone(), &body.room_version)?;
if event_id != body.event_id {
warn!("Event ID mismatch: expected {}, got {}", event_id, body.event_id);
return Err!(Request(InvalidParam("Event ID does not match the generated event ID.")));
}
let recipient_user: OwnedUserId = signed_event
.get("state_key")
.try_into()
.map(UserId::to_owned)
.map_err(|e| err!(Request(InvalidParam("Invalid state_key property: {e}"))))?;
let pdu = PduEvent::from_id_val(&event_id, signed_event.clone())
.map_err(|e| err!(Request(InvalidParam("Invite event is invalid: {e}"))))?;
let recipient_user = check_invite(
&services,
&pdu,
&body.invite_room_state,
body.origin(),
&body.room_id,
&body.room_version,
)
.await?;
// Make sure the room isn't banned and we allow invites
if services.config.block_non_admin_invites && !services.users.is_admin(&recipient_user).await
if !services
.globals
.server_is_ours(recipient_user.server_name())
{
return Err!(Request(Forbidden("This server does not allow room invites.")));
}
if services.rooms.metadata.is_banned(&body.room_id).await
&& !services.users.is_admin(&recipient_user).await
{
return Err!(Request(Forbidden("This room is banned on this homeserver.")));
return Err!(Request(InvalidParam("User does not belong to this homeserver.")));
}
// Make sure we're not ACL'ed from their room.
@@ -405,9 +80,45 @@ pub(crate) async fn create_invite_route(
.acl_check(recipient_user.server_name(), &body.room_id)
.await?;
services
.server_keys
.hash_and_sign_event(&mut signed_event, &body.room_version)
.map_err(|e| err!(Request(InvalidParam("Failed to sign event: {e}"))))?;
// Generate event id
let event_id = gen_event_id(&signed_event, &body.room_version)?;
// Add event_id back
signed_event.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.to_string()));
let sender_user: &UserId = signed_event
.get("sender")
.try_into()
.map_err(|e| err!(Request(InvalidParam("Invalid sender property: {e}"))))?;
if services.rooms.metadata.is_banned(&body.room_id).await
&& !services.users.is_admin(&recipient_user).await
{
return Err!(Request(Forbidden("This room is banned on this homeserver.")));
}
if services.config.block_non_admin_invites && !services.users.is_admin(&recipient_user).await
{
return Err!(Request(Forbidden("This server does not allow room invites.")));
}
let mut invite_state = body.invite_room_state.clone();
let mut event: JsonObject = serde_json::from_str(body.event.get())
.map_err(|e| err!(Request(BadJson("Invalid invite event PDU: {e}"))))?;
event.insert("event_id".to_owned(), "$placeholder".into());
let pdu: PduEvent = serde_json::from_value(event.into())
.map_err(|e| err!(Request(BadJson("Invalid invite event PDU: {e}"))))?;
invite_state.push(pdu.to_format());
// If we are active in the room, the remote server will notify us about the
// join/invite through /send. If we are not in the room, we need to manually
// record the invited state for client /sync through update_membership(), and
@@ -418,19 +129,6 @@ pub(crate) async fn create_invite_route(
.server_in_room(services.globals.server_name(), &body.room_id)
.await
{
let mut invite_state: Vec<CanonicalJsonValue> = body
.invite_room_state
.clone()
.into_iter()
.map(|v| utils::to_canonical_object(&v).unwrap().into())
.collect();
invite_state.push(pdu.to_canonical_object().into());
let sender_user: &UserId = signed_event
.get("sender")
.try_into()
.map_err(|e| err!(Request(InvalidParam("Invalid sender property: {e}"))))?;
debug!("Marking user {} as invited to remote room {}", recipient_user, body.room_id);
services
.rooms
.state_cache
@@ -469,7 +167,6 @@ pub(crate) async fn create_invite_route(
}
}
debug!("Invite is valid, returning signed event");
Ok(create_invite::v2::Response {
event: services
.sending
+5 -1
View File
@@ -175,7 +175,11 @@ pub(crate) async fn create_knock_event_v1_route(
.send_pdu_room(&body.room_id, &pdu_id)
.await?;
let knock_room_state = services.rooms.state.summary(&pdu, &body.room_id).await;
let knock_room_state = services
.rooms
.state
.summary_stripped(&pdu, &body.room_id)
.await;
Ok(send_knock::v1::Response { knock_room_state })
}
-25
View File
@@ -1,25 +0,0 @@
mod msc4284_policy_servers;
use conduwuit_macros::config_example_generator;
use serde::Deserialize;
#[derive(Clone, Debug, Default, Deserialize)]
#[config_example_generator(filename = "conduwuit-example.toml", section = "global.experiments")]
pub struct Experiments {
/// Enforce MSC4311's updated requirements on all incoming invites.
///
/// This drastically increases the security and filtering capabilities
/// when processing invites over federation, at the cost of compatibility.
/// Servers that do not implement MSC4311 will be unable to send invites
/// to your server when this is enabled, including continuwuity 0.5.0 and
/// below.
///
/// default: false
/// Introduced in: (unreleased)
#[serde(default)]
pub enforce_msc4311: bool,
/// MSC4284 Policy Server support configuration.
#[serde(default)]
pub msc4284: msc4284_policy_servers::MSC4248,
}
@@ -1,58 +0,0 @@
use conduwuit_macros::config_example_generator;
use serde::Deserialize;
fn true_fn() -> bool { true }
fn default_federation_timeout() -> u64 { 25 }
#[derive(Clone, Debug, Default, Deserialize)]
#[config_example_generator(
filename = "conduwuit-example.toml",
section = "global.experiments.msc4284"
)]
pub struct MSC4248 {
/// Enable or disable making requests to MSC4284 Policy Servers.
/// It is recommended you keep this enabled unless you experience frequent
/// connectivity issues, such as in a restricted networking environment.
///
/// default: true
/// Introduced in: 0.5.0
#[serde(default = "true_fn")]
pub enabled: bool,
/// Enable running locally generated events through configured MSC4284
/// policy servers. You may wish to disable this if your server is
/// single-user for a slight speed benefit in some rooms, but otherwise
/// should leave it enabled.
///
/// If the room's policy server configuration requires event signatures,
/// this option is effectively ignored, as otherwise local events would
/// be rejected for missing the policy server's signature.
///
/// default: true
/// Introduced in: 0.5.0
#[serde(default = "true_fn")]
pub check_own_events: bool,
/// MSC4284 Policy server request timeout (seconds). Generally policy
/// servers should respond near instantly, however may slow down under
/// load. If a policy server doesn't respond in a short amount of time, the
/// room it is configured in may become unusable if this limit is set too
/// high. 25 seconds is a good default, however should be raised if you
/// experience too many connection issues.
///
/// Please be aware that policy requests are *NOT* currently re-tried, so if
/// a spam check request fails, the event will be assumed to be not spam,
/// which in some cases may result in spam being sent to or received from
/// the room that would typically be prevented.
///
/// If your request timeout is too low, and the policy server requires
/// signatures, you may find that you are unable to send events that are
/// accepted regardless.
///
/// About policy servers: https://matrix.org/blog/2025/04/introducing-policy-servers/
/// default: 25
/// Introduced in: 0.5.0
#[serde(default = "default_federation_timeout")]
pub request_timeout: u64,
}
+19 -27
View File
@@ -1,13 +1,12 @@
#![allow(clippy::doc_link_with_quotes)]
pub mod check;
pub mod experiments;
pub mod manager;
pub mod proxy;
use std::{
collections::{BTreeMap, BTreeSet},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
path::PathBuf,
path::{Path, PathBuf},
};
use conduwuit_macros::config_example_generator;
@@ -54,13 +53,9 @@ use crate::{Result, err, error::Error, utils::sys};
### For more information, see:
### https://continuwuity.org/configuration.html
"#,
ignore = "config_paths catchall well_known tls blurhashing allow_invalid_tls_certificates_yes_i_know_what_the_fuck_i_am_doing_with_this_and_i_know_this_is_insecure"
ignore = "catchall well_known tls blurhashing allow_invalid_tls_certificates_yes_i_know_what_the_fuck_i_am_doing_with_this_and_i_know_this_is_insecure"
)]
pub struct Config {
// Paths to config file(s). Not supposed to be set manually in the config file,
// only updated dynamically from the --config option given at runtime.
pub config_paths: Option<Vec<PathBuf>>,
/// The server_name is the pretty name of this server. It is used as a
/// suffix for user and room IDs/aliases.
///
@@ -708,13 +703,10 @@ pub struct Config {
pub allow_unstable_room_versions: bool,
/// Default room version continuwuity will create rooms with.
/// Note that this has to be a string since the room version is a string
/// rather than an integer. Forgetting the quotes will make the server fail
/// to start!
///
/// Per spec, room version "11" is the default.
/// Per spec, room version 11 is the default.
///
/// default: "11"
/// default: 11
#[serde(default = "default_default_room_version")]
pub default_room_version: RoomVersionId,
@@ -1735,9 +1727,16 @@ pub struct Config {
#[serde(default)]
pub block_non_admin_invites: bool,
/// Enable or disable making requests to MSC4284 Policy Servers.
/// It is recommended you keep this enabled unless you experience frequent
/// connectivity issues, such as in a restricted networking environment.
#[serde(default = "true_fn")]
pub enable_msc4284_policy_servers: bool,
/// Enable running locally generated events through configured MSC4284
/// policy servers. You may wish to disable this if your server is
/// single-user for a slight speed benefit in some rooms, but otherwise
/// should leave it enabled.
#[serde(default = "true_fn")]
pub policy_server_check_own_events: bool,
@@ -1997,13 +1996,6 @@ pub struct Config {
// external structure; separate section
#[serde(default)]
pub blurhashing: BlurhashConfig,
/// Configuration for protocol experiments that enable experimental
/// features. Each one is associated with a matrix spec proposal, a list of
/// which are published at https://spec.matrix.org/proposals/
#[serde(default)]
pub experiments: experiments::Experiments,
#[serde(flatten)]
#[allow(clippy::zero_sized_map_values)]
// this is a catchall, the map shouldn't be zero at runtime
@@ -2217,7 +2209,7 @@ struct ListeningAddr {
addrs: Either<IpAddr, Vec<IpAddr>>,
}
const DEPRECATED_KEYS: &[&str; 11] = &[
const DEPRECATED_KEYS: &[&str; 9] = &[
"cache_capacity",
"conduit_cache_capacity_modifier",
"max_concurrent_requests",
@@ -2227,30 +2219,30 @@ const DEPRECATED_KEYS: &[&str; 11] = &[
"well_known_support_role",
"well_known_support_email",
"well_known_support_mxid",
"enable_msc4284_policy_servers",
"policy_server_check_own_events",
];
impl Config {
/// Pre-initialize config
pub fn load(paths: &[PathBuf]) -> Result<Figment> {
pub fn load<'a, I>(paths: I) -> Result<Figment>
where
I: Iterator<Item = &'a Path>,
{
let envs = [
Env::var("CONDUIT_CONFIG"),
Env::var("CONDUWUIT_CONFIG"),
Env::var("CONTINUWUITY_CONFIG"),
];
let mut config = envs
let config = envs
.into_iter()
.flatten()
.map(Toml::file)
.chain(paths.iter().cloned().map(Toml::file))
.chain(paths.map(Toml::file))
.fold(Figment::new(), |config, file| config.merge(file.nested()))
.merge(Env::prefixed("CONDUIT_").global().split("__"))
.merge(Env::prefixed("CONDUWUIT_").global().split("__"))
.merge(Env::prefixed("CONTINUWUITY_").global().split("__"));
config = config.join(("config_paths", paths));
Ok(config)
}
+1 -2
View File
@@ -14,12 +14,11 @@ pub const STABLE_ROOM_VERSIONS: &[RoomVersionId] = &[
RoomVersionId::V9,
RoomVersionId::V10,
RoomVersionId::V11,
RoomVersionId::V12,
];
/// Experimental, partially supported room versions
pub const UNSTABLE_ROOM_VERSIONS: &[RoomVersionId] =
&[RoomVersionId::V3, RoomVersionId::V4, RoomVersionId::V5];
&[RoomVersionId::V3, RoomVersionId::V4, RoomVersionId::V5, RoomVersionId::V12];
type RoomVersion = (RoomVersionId, RoomVersionStability);
+2 -2
View File
@@ -56,7 +56,7 @@ impl<'a, E: Event> From<Ref<'a, E>> for Raw<AnyTimelineEvent> {
"content": content,
"event_id": event.event_id(),
"origin_server_ts": event.origin_server_ts(),
"room_id": event.room_id_or_hash(),
"room_id": event.room_id(),
"sender": event.sender(),
"type": event.kind(),
});
@@ -117,7 +117,7 @@ impl<'a, E: Event> From<Ref<'a, E>> for Raw<AnyStateEvent> {
"content": event.content(),
"event_id": event.event_id(),
"origin_server_ts": event.origin_server_ts(),
"room_id": event.room_id_or_hash(),
"room_id": event.room_id(),
"sender": event.sender(),
"state_key": event.state_key(),
"type": event.kind(),
+2 -15
View File
@@ -1,23 +1,10 @@
use std::{borrow::Borrow, collections::BTreeMap};
use std::collections::BTreeMap;
use ruma::MilliSecondsSinceUnixEpoch;
use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value};
use super::Pdu;
use crate::{Result, err, implement, result::LogErr};
/// Set the `unsigned` field of the PDU using only information in the PDU.
/// Some unsigned data is already set within the database (eg. prev events,
/// threads). Once this is done, other data must be calculated from the database
/// (eg. relations) This is for server-to-client events.
/// Backfill handles this itself.
#[implement(Pdu)]
pub fn set_unsigned(&mut self, user_id: Option<&ruma::UserId>) {
if Some(self.sender.borrow()) != user_id {
self.remove_transaction_id().log_err().ok();
}
self.add_age().log_err().ok();
}
use crate::{Result, err, implement};
#[implement(Pdu)]
pub fn remove_transaction_id(&mut self) -> Result {
+5
View File
@@ -177,6 +177,11 @@ where
// [synapse] do_sig_check check the event has valid signatures for member events
// TODO do_size_check is false when called by `iterative_auth_check`
// do_size_check is also mostly accomplished by ruma with the exception of
// checking event_type, state_key, and json are below a certain size (255 and
// 65_536 respectively)
let sender = incoming_event.sender();
// Implementation of https://spec.matrix.org/latest/rooms/v1/#authorization-rules
+8 -3
View File
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{path::PathBuf, sync::Arc};
use conduwuit_core::{
Error, Result,
@@ -38,9 +38,14 @@ impl Server {
) -> Result<Arc<Self>, Error> {
let _runtime_guard = runtime.map(runtime::Handle::enter);
let config_paths = args.config.clone().unwrap_or_default();
let config_paths = args
.config
.as_deref()
.into_iter()
.flat_map(<[_]>::iter)
.map(PathBuf::as_path);
let config = Config::load(&config_paths)
let config = Config::load(config_paths)
.and_then(|raw| update(raw, args))
.and_then(|raw| Config::new(&raw))?;
+6 -4
View File
@@ -1,4 +1,4 @@
use std::{ops::Deref, path::PathBuf, sync::Arc};
use std::{iter, ops::Deref, path::Path, sync::Arc};
use async_trait::async_trait;
use conduwuit::{
@@ -51,8 +51,7 @@ fn handle_reload(&self) -> Result {
])
.expect("failed to notify systemd of reloading state");
let config_paths = self.server.config.config_paths.clone().unwrap_or_default();
self.reload(&config_paths)?;
self.reload(iter::empty())?;
#[cfg(all(feature = "systemd", target_os = "linux"))]
sd_notify::notify(false, &[sd_notify::NotifyState::Ready])
@@ -63,7 +62,10 @@ fn handle_reload(&self) -> Result {
}
#[implement(Service)]
pub fn reload(&self, paths: &[PathBuf]) -> Result<Arc<Config>> {
pub fn reload<'a, I>(&self, paths: I) -> Result<Arc<Config>>
where
I: Iterator<Item = &'a Path>,
{
let old = self.server.config.clone();
let new = Config::load(paths).and_then(|raw| Config::new(&raw))?;
+4 -5
View File
@@ -31,13 +31,12 @@ impl crate::Service for Service {
let turn_secret = config.turn_secret_file.as_ref().map_or_else(
|| config.turn_secret.clone(),
|path| match std::fs::read_to_string(path) {
| Ok(secret) => secret.trim().to_owned(),
| Err(e) => {
|path| {
std::fs::read_to_string(path).unwrap_or_else(|e| {
error!("Failed to read the TURN secret file: {e}");
config.turn_secret.clone()
},
})
},
);
@@ -50,7 +49,7 @@ impl crate::Service for Service {
return config.registration_token.clone();
};
Some(token.trim().to_owned())
Some(token)
},
);
+16 -24
View File
@@ -590,10 +590,6 @@ async fn fix_readreceiptid_readreceipt_duplicates(services: &Services) -> Result
const FIXED_CORRUPT_MSC4133_FIELDS_MARKER: &[u8] = b"fix_corrupt_msc4133_fields";
async fn fix_corrupt_msc4133_fields(services: &Services) -> Result {
// Due to an old bug, some conduwuit databases have `us.cloke.msc4175.tz` user
// profile fields with raw strings instead of quoted JSON ones.
// This migration fixes that.
use serde_json::{Value, from_slice};
type KeyVal<'a> = ((OwnedUserId, String), &'a [u8]);
@@ -610,28 +606,24 @@ async fn fix_corrupt_msc4133_fields(services: &Services) -> Result {
async |(mut total, mut fixed),
((user, key), value): KeyVal<'_>|
-> Result<(usize, usize)> {
match from_slice::<Value>(value) {
// corrupted timezone field
| Err(_) if key == "us.cloke.msc4175.tz" => {
let new_value = Value::String(String::from_utf8(value.to_vec())?);
useridprofilekey_value.put((user, key), Json(new_value));
fixed = fixed.saturating_add(1);
},
// corrupted value for some other key
| Err(error) => {
warn!(
"deleting MSC4133 key {} for user {} due to deserialization \
failure: {}",
key, user, error
if let Err(error) = from_slice::<Value>(value) {
// Due to an old bug, some conduwuit databases have `us.cloke.msc4175.tz` user
// profile fields with raw strings instead of quoted JSON ones.
// This migration fixes that.
let new_value = if key == "us.cloke.msc4175.tz" {
Value::String(String::from_utf8(value.to_vec())?)
} else {
return Err!(
"failed to deserialize msc4133 key {} of user {}: {}",
key,
user,
error
);
useridprofilekey_value.del((user, key));
},
// other key with no issues
| Ok(_) => {
// do nothing
},
}
};
useridprofilekey_value.put((user, key), Json(new_value));
fixed = fixed.saturating_add(1);
}
total = total.saturating_add(1);
Ok((total, fixed))
@@ -14,7 +14,7 @@ use futures::{
use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UserId, events::StateEventType};
use tracing::debug;
use crate::rooms::timeline::{RawPduId, pdu_fits};
use crate::rooms::timeline::RawPduId;
/// When receiving an event one needs to:
/// 0. Check the server is in the room
@@ -62,13 +62,6 @@ pub async fn handle_incoming_pdu<'a>(
if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await {
return Ok(Some(pdu_id));
}
if !pdu_fits(&mut value.clone()) {
warn!(
"dropping incoming PDU {event_id} in room {room_id} from {origin} because it \
exceeds 65535 bytes or is otherwise too large."
);
return Err!(Request(TooLarge("PDU is too large")));
}
// 1.1 Check the server is in the room
let meta_exists = self.services.metadata.exists(room_id).map(Ok);
@@ -1,8 +1,7 @@
use std::collections::{BTreeMap, HashMap, hash_map};
use conduwuit::{
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res,
trace, warn,
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res, trace,
};
use futures::future::ready;
use ruma::{
@@ -11,7 +10,6 @@ use ruma::{
};
use super::{check_room_id, get_room_version_id, to_room_version};
use crate::rooms::timeline::pdu_fits;
#[implement(super::Service)]
#[allow(clippy::too_many_arguments)]
@@ -27,13 +25,6 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
where
Pdu: Event + Send + Sync,
{
if !pdu_fits(&mut value.clone()) {
warn!(
"dropping incoming PDU {event_id} in room {room_id} from {origin} because it \
exceeds 65535 bytes or is otherwise too large."
);
return Err!(Request(TooLarge("PDU is too large")));
}
// 1. Remove unsigned field
value.remove("unsigned");
+20 -175
View File
@@ -3,21 +3,14 @@
//! This module implements a check against a room-specific policy server, as
//! described in the relevant Matrix spec proposal (see: https://github.com/matrix-org/matrix-spec-proposals/pull/4284).
use std::{collections::BTreeMap, time::Duration};
use std::time::Duration;
use conduwuit::{
Err, Event, PduEvent, Result, debug, debug_error, debug_info, debug_warn, implement, trace,
warn,
};
use conduwuit::{Err, Event, PduEvent, Result, debug, debug_info, implement, trace, warn};
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, KeyId, RoomId, ServerName, SigningKeyId,
api::federation::room::{
policy_check::unstable::Request as PolicyCheckRequest,
policy_sign::unstable::Request as PolicySignRequest,
},
CanonicalJsonObject, RoomId, ServerName,
api::federation::room::policy::v1::Request as PolicyRequest,
events::{StateEventType, room::policy::RoomPolicyEventContent},
};
use serde_json::value::RawValue;
/// Asks a remote policy server if the event is allowed.
///
@@ -31,18 +24,25 @@ use serde_json::value::RawValue;
/// contacted for whatever reason, Err(e) is returned, which generally is a
/// fail-open operation.
#[implement(super::Service)]
#[tracing::instrument(skip(self, pdu, pdu_json, room_id))]
#[tracing::instrument(skip_all, level = "debug")]
pub async fn ask_policy_server(
&self,
pdu: &PduEvent,
pdu_json: &mut CanonicalJsonObject,
pdu_json: &CanonicalJsonObject,
room_id: &RoomId,
incoming: bool,
) -> Result<bool> {
if !self.services.server.config.enable_msc4284_policy_servers {
trace!("policy server checking is disabled");
return Ok(true); // don't ever contact policy servers
}
if self.services.server.config.policy_server_check_own_events
&& pdu.origin.is_some()
&& self
.services
.server
.is_ours(pdu.origin.as_ref().unwrap().as_str())
{
return Ok(true); // don't contact policy servers for locally generated events
}
if *pdu.event_type() == StateEventType::RoomPolicy.into() {
debug!(
@@ -52,29 +52,16 @@ pub async fn ask_policy_server(
);
return Ok(true);
}
let Ok(policyserver) = self
.services
.state_accessor
.room_state_get_content(room_id, &StateEventType::RoomPolicy, "")
.await
.inspect_err(|e| debug_error!("failed to load room policy server state event: {e}"))
.map(|c: RoomPolicyEventContent| c)
else {
debug!("room has no policy server configured");
return Ok(true);
};
if self.services.server.config.policy_server_check_own_events
&& !incoming
&& policyserver.public_key.is_none()
{
// don't contact policy servers for locally generated events, but only when the
// policy server does not require signatures
trace!("won't contact policy server for locally generated event");
return Ok(true);
}
let via = match policyserver.via {
| Some(ref via) => ServerName::parse(via)?,
| None => {
@@ -88,6 +75,7 @@ pub async fn ask_policy_server(
}
if !self.services.state_cache.server_in_room(via, room_id).await {
debug!(
room_id = %room_id,
via = %via,
"Policy server is not in the room, skipping spam check"
);
@@ -98,43 +86,17 @@ pub async fn ask_policy_server(
.sending
.convert_to_outgoing_federation_event(pdu_json.clone())
.await;
if policyserver.public_key.is_some() {
if !incoming {
debug_info!(
via = %via,
outgoing = ?pdu_json,
"Getting policy server signature on event"
);
return self
.fetch_policy_server_signature(pdu, pdu_json, via, outgoing, room_id)
.await;
}
// for incoming events, is it signed by <via> with the key
// "ed25519:policy_server"?
if let Some(CanonicalJsonValue::Object(sigs)) = pdu_json.get("signatures") {
if let Some(CanonicalJsonValue::Object(server_sigs)) = sigs.get(via.as_str()) {
let wanted_key_id: &KeyId<ruma::SigningKeyAlgorithm, ruma::Base64PublicKey> =
SigningKeyId::parse("ed25519:policy_server")?;
if let Some(CanonicalJsonValue::String(_sig_value)) =
server_sigs.get(wanted_key_id.as_str())
{
// TODO: verify signature
}
}
}
debug!(
"Event is not local and has no policy server signature, performing legacy spam check"
);
}
debug_info!(
room_id = %room_id,
via = %via,
"Checking event for spam with policy server via legacy check"
outgoing = ?pdu_json,
"Checking event for spam with policy server"
);
let response = tokio::time::timeout(
Duration::from_secs(self.services.server.config.policy_server_request_timeout),
self.services
.sending
.send_federation_request(via, PolicyCheckRequest {
.send_federation_request(via, PolicyRequest {
event_id: pdu.event_id().to_owned(),
pdu: Some(outgoing),
}),
@@ -180,120 +142,3 @@ pub async fn ask_policy_server(
Ok(true)
}
/// Asks a remote policy server for a signature on this event.
/// If the policy server signs this event, the original data is mutated.
#[implement(super::Service)]
#[tracing::instrument(skip_all, fields(event_id=%pdu.event_id(), via=%via))]
pub async fn fetch_policy_server_signature(
&self,
pdu: &PduEvent,
pdu_json: &mut CanonicalJsonObject,
via: &ServerName,
outgoing: Box<RawValue>,
room_id: &RoomId,
) -> Result<bool> {
debug!("Requesting policy server signature");
let response = tokio::time::timeout(
Duration::from_secs(self.services.server.config.policy_server_request_timeout),
self.services
.sending
.send_federation_request(via, PolicySignRequest { pdu: outgoing }),
)
.await;
let response = match response {
| Ok(Ok(response)) => {
debug!("Response from policy server: {:?}", response);
response
},
| Ok(Err(e)) => {
warn!(
via = %via,
event_id = %pdu.event_id(),
room_id = %room_id,
"Failed to contact policy server: {e}"
);
// Network or policy server errors are treated as non-fatal: event is allowed by
// default.
return Err(e);
},
| Err(elapsed) => {
warn!(
%via,
event_id = %pdu.event_id(),
%room_id,
%elapsed,
"Policy server request timed out after 10 seconds"
);
return Err!("Request to policy server timed out");
},
};
if response.signatures.is_none() {
debug!("Policy server refused to sign event");
return Ok(false);
}
let sigs: ruma::Signatures<ruma::OwnedServerName, ruma::ServerSigningKeyVersion> =
response.signatures.unwrap();
if !sigs.contains_key(via) {
debug_warn!(
"Policy server returned signatures, but did not include the expected server name \
'{}': {:?}",
via,
sigs
);
return Ok(false);
}
let keypairs = sigs.get(via).unwrap();
let wanted_key_id = KeyId::parse("ed25519:policy_server")?;
if !keypairs.contains_key(wanted_key_id) {
debug_warn!(
"Policy server returned signature, but did not use the key ID \
'ed25519:policy_server'."
);
return Ok(false);
}
let signatures_entry = pdu_json
.entry("signatures".to_owned())
.or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::default()));
if let CanonicalJsonValue::Object(signatures_map) = signatures_entry {
let sig_value = keypairs.get(wanted_key_id).unwrap().to_owned();
match signatures_map.get_mut(via.as_str()) {
| Some(CanonicalJsonValue::Object(inner_map)) => {
trace!("inserting PS signature: {}", sig_value);
inner_map.insert(
"ed25519:policy_server".to_owned(),
CanonicalJsonValue::String(sig_value),
);
},
| Some(_) => {
debug_warn!(
"Existing `signatures[{}]` field is not an object; cannot insert policy \
signature",
via
);
return Ok(false);
},
| None => {
let mut inner = BTreeMap::new();
inner.insert(
"ed25519:policy_server".to_owned(),
CanonicalJsonValue::String(sig_value.clone()),
);
trace!(
"created new signatures object for {via} with the signature {}",
sig_value
);
signatures_map.insert(via.as_str().to_owned(), CanonicalJsonValue::Object(inner));
},
}
} else {
debug_warn!(
"Existing `signatures` field is not an object; cannot insert policy signature"
);
return Ok(false);
}
Ok(true)
}
@@ -256,12 +256,7 @@ where
if incoming_pdu.state_key.is_none() {
debug!(event_id = %incoming_pdu.event_id, "Checking policy server for event");
match self
.ask_policy_server(
&incoming_pdu,
&mut incoming_pdu.to_canonical_object(),
room_id,
true,
)
.ask_policy_server(&incoming_pdu, &incoming_pdu.to_canonical_object(), room_id)
.await
{
| Ok(false) => {
@@ -1,746 +0,0 @@
use conduwuit::{Event, PduEvent, Result, err};
use ruma::{
UserId,
api::Direction,
events::relation::{BundledMessageLikeRelations, BundledReference, ReferenceChunk},
};
use crate::rooms::timeline::PdusIterItem;
const MAX_BUNDLED_RELATIONS: usize = 50;
impl super::Service {
/// Gets bundled aggregations for an event according to the Matrix
/// specification.
/// - m.replace relations are bundled to include the most recent replacement
/// event.
/// - m.reference relations are bundled to include a chunk of event IDs.
#[tracing::instrument(skip(self), level = "debug")]
pub async fn get_bundled_aggregations(
&self,
user_id: &UserId,
pdu: &PduEvent,
) -> Result<Option<BundledMessageLikeRelations<Box<serde_json::value::RawValue>>>> {
// Events that can never get bundled aggregations
if pdu.state_key().is_some() || Self::is_replacement_event(pdu) {
return Ok(None);
}
let relations = self
.get_relations(
user_id,
&pdu.room_id_or_hash(),
pdu.event_id(),
conduwuit::PduCount::max(),
MAX_BUNDLED_RELATIONS,
0,
Direction::Backward,
)
.await;
// The relations database code still handles the basic unsigned data
// We don't want to recursively fetch relations
if relations.is_empty() {
return Ok(None);
}
// Partition relations by type
let (replace_events, reference_events): (Vec<_>, Vec<_>) = relations
.iter()
.filter_map(|relation| {
let pdu = &relation.1;
let content = pdu.get_content_as_value();
content
.get("m.relates_to")
.and_then(|relates_to| relates_to.get("rel_type"))
.and_then(|rel_type| rel_type.as_str())
.and_then(|rel_type_str| match rel_type_str {
| "m.replace" => Some(RelationType::Replace(relation)),
| "m.reference" => Some(RelationType::Reference(relation)),
| _ => None, /* Ignore other relation types (threads are in DB but not
* handled here) */
})
})
.fold((Vec::new(), Vec::new()), |(mut replaces, mut references), rel_type| {
match rel_type {
| RelationType::Replace(r) => replaces.push(r),
| RelationType::Reference(r) => references.push(r),
}
(replaces, references)
});
// If no relations to bundle, return None
if replace_events.is_empty() && reference_events.is_empty() {
return Ok(None);
}
let mut bundled = BundledMessageLikeRelations::<Box<serde_json::value::RawValue>>::new();
// Handle m.replace relations - find the most recent valid one (lazy load
// original event)
if !replace_events.is_empty() {
if let Some(replacement) = self
.find_most_recent_valid_replacement(user_id, pdu, &replace_events)
.await?
{
bundled.replace = Some(Self::serialize_replacement(replacement)?);
}
}
// Handle m.reference relations - collect event IDs
if !reference_events.is_empty() {
let reference_chunk: Vec<_> = reference_events
.into_iter()
.map(|relation| BundledReference::new(relation.1.event_id().to_owned()))
.collect();
if !reference_chunk.is_empty() {
bundled.reference = Some(Box::new(ReferenceChunk::new(reference_chunk)));
}
}
// TODO: Handle other relation types (m.annotation, etc.) when specified
Ok(Some(bundled))
}
/// Serialize a replacement event to the bundled format
fn serialize_replacement(pdu: &PduEvent) -> Result<Box<Box<serde_json::value::RawValue>>> {
let replacement_json = serde_json::to_string(pdu)
.map_err(|e| err!(Database("Failed to serialize replacement event: {e}")))?;
let raw_value = serde_json::value::RawValue::from_string(replacement_json)
.map_err(|e| err!(Database("Failed to create RawValue: {e}")))?;
Ok(Box::new(raw_value))
}
/// Find the most recent valid replacement event based on origin_server_ts
/// and lexicographic event_id ordering
async fn find_most_recent_valid_replacement<'a>(
&self,
user_id: &UserId,
original_event: &PduEvent,
replacement_events: &[&'a PdusIterItem],
) -> Result<Option<&'a PduEvent>> {
// Filter valid replacements and find the maximum in a single pass
let mut result: Option<&PduEvent> = None;
for relation in replacement_events {
let pdu = &relation.1;
// Validate replacement
if !Self::is_valid_replacement_event(original_event, pdu).await? {
continue;
}
let next = match result {
| None => Some(pdu),
| Some(current) => {
// Compare by origin_server_ts first, then event_id lexicographically
match pdu.origin_server_ts().cmp(&current.origin_server_ts()) {
| std::cmp::Ordering::Greater => Some(pdu),
| std::cmp::Ordering::Equal if pdu.event_id() > current.event_id() =>
Some(pdu),
| _ => None,
}
},
};
if let Some(pdu) = next
&& self
.services
.state_accessor
.user_can_see_event(user_id, &pdu.room_id_or_hash(), pdu.event_id())
.await
{
result = Some(pdu);
}
}
Ok(result)
}
/// Adds bundled aggregations to a PDU's unsigned field
#[tracing::instrument(skip(self, pdu), level = "debug")]
pub async fn add_bundled_aggregations_to_pdu(
&self,
user_id: &UserId,
pdu: &mut PduEvent,
) -> Result<()> {
if pdu.is_redacted() {
return Ok(());
}
let bundled_aggregations = self.get_bundled_aggregations(user_id, pdu).await?;
if let Some(aggregations) = bundled_aggregations {
let aggregations_json = serde_json::to_value(aggregations)
.map_err(|e| err!(Database("Failed to serialize bundled aggregations: {e}")))?;
Self::add_bundled_aggregations_to_unsigned(pdu, aggregations_json)?;
}
Ok(())
}
/// Helper method to add bundled aggregations to a PDU's unsigned field
fn add_bundled_aggregations_to_unsigned(
pdu: &mut PduEvent,
aggregations_json: serde_json::Value,
) -> Result<()> {
use serde_json::{
Map, Value as JsonValue,
value::{RawValue as RawJsonValue, to_raw_value},
};
let mut unsigned: Map<String, JsonValue> = pdu
.unsigned
.as_deref()
.map(RawJsonValue::get)
.map_or_else(|| Ok(Map::new()), serde_json::from_str)
.map_err(|e| err!(Database("Invalid unsigned in pdu event: {e}")))?;
let relations = unsigned
.entry("m.relations")
.or_insert_with(|| JsonValue::Object(Map::new()))
.as_object_mut()
.ok_or_else(|| err!(Database("m.relations is not an object")))?;
if let JsonValue::Object(aggregations_map) = aggregations_json {
relations.extend(aggregations_map);
}
pdu.unsigned = Some(to_raw_value(&unsigned)?);
Ok(())
}
/// Validates that an event is acceptable as a replacement for another event
/// See C/S spec "Validity of replacement events"
#[tracing::instrument(level = "debug")]
async fn is_valid_replacement_event(
original_event: &PduEvent,
replacement_event: &PduEvent,
) -> Result<bool> {
Ok(
// 1. Same room_id
original_event.room_id() == replacement_event.room_id()
// 2. Same sender
&& original_event.sender() == replacement_event.sender()
// 3. Same type
&& original_event.event_type() == replacement_event.event_type()
// 4. Neither event should have a state_key property
&& original_event.state_key().is_none()
&& replacement_event.state_key().is_none()
// 5. Original event must not have rel_type of m.replace
&& !Self::is_replacement_event(original_event)
// 6. Replacement event must have m.new_content property (skip for encrypted)
&& Self::has_new_content_or_encrypted(replacement_event),
)
}
/// Check if an event is itself a replacement
#[inline]
fn is_replacement_event(event: &PduEvent) -> bool {
event
.get_content_as_value()
.get("m.relates_to")
.and_then(|relates_to| relates_to.get("rel_type"))
.and_then(|rel_type| rel_type.as_str())
.is_some_and(|rel_type| rel_type == "m.replace")
}
/// Check if event has m.new_content or is encrypted (where m.new_content
/// would be in the encrypted payload)
#[inline]
fn has_new_content_or_encrypted(event: &PduEvent) -> bool {
event.event_type() == &ruma::events::TimelineEventType::RoomEncrypted
|| event.get_content_as_value().get("m.new_content").is_some()
}
}
/// Helper enum for partitioning relations
enum RelationType<'a> {
Replace(&'a PdusIterItem),
Reference(&'a PdusIterItem),
}
#[cfg(test)]
mod tests {
use conduwuit_core::pdu::{EventHash, PduEvent};
use ruma::{UInt, events::TimelineEventType, owned_event_id, owned_room_id, owned_user_id};
use serde_json::{Value as JsonValue, json, value::to_raw_value};
fn create_test_pdu(unsigned_content: Option<JsonValue>) -> PduEvent {
PduEvent {
event_id: owned_event_id!("$test:example.com"),
room_id: Some(owned_room_id!("!test:example.com")),
sender: owned_user_id!("@test:example.com"),
origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(),
kind: TimelineEventType::RoomMessage,
content: to_raw_value(&json!({"msgtype": "m.text", "body": "test"})).unwrap(),
state_key: None,
prev_events: vec![],
depth: UInt::from(1_u32),
auth_events: vec![],
redacts: None,
unsigned: unsigned_content.map(|content| to_raw_value(&content).unwrap()),
hashes: EventHash { sha256: "test_hash".to_owned() },
signatures: None,
origin: None,
}
}
fn create_bundled_aggregations() -> JsonValue {
json!({
"m.replace": {
"event_id": "$replace:example.com",
"origin_server_ts": 1_234_567_890,
"sender": "@replacer:example.com"
},
"m.reference": {
"count": 5,
"chunk": [
"$ref1:example.com",
"$ref2:example.com"
]
}
})
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_no_existing_unsigned() {
let mut pdu = create_test_pdu(None);
let aggregations = create_bundled_aggregations();
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
&mut pdu,
aggregations.clone(),
);
assert!(result.is_ok(), "Should succeed when no unsigned field exists");
assert!(pdu.unsigned.is_some(), "Unsigned field should be created");
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
assert!(unsigned.get("m.relations").is_some(), "m.relations should exist");
assert_eq!(
unsigned["m.relations"], aggregations,
"Relations should match the aggregations"
);
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_overwrite_same_relation_type() {
let existing_unsigned = json!({
"m.relations": {
"m.replace": {
"event_id": "$old_replace:example.com",
"origin_server_ts": 1_111_111_111,
"sender": "@old_replacer:example.com"
}
}
});
let mut pdu = create_test_pdu(Some(existing_unsigned));
let new_aggregations = create_bundled_aggregations();
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
&mut pdu,
new_aggregations.clone(),
);
assert!(result.is_ok(), "Should succeed when overwriting same relation type");
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
let relations = &unsigned["m.relations"];
assert_eq!(
relations["m.replace"], new_aggregations["m.replace"],
"m.replace should be updated"
);
assert_eq!(
relations["m.replace"]["event_id"], "$replace:example.com",
"Should have new event_id"
);
assert!(relations.get("m.reference").is_some(), "New m.reference should be added");
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_preserve_other_unsigned_fields() {
// Test case: Other unsigned fields should be preserved
let existing_unsigned = json!({
"age": 98765,
"prev_content": {"msgtype": "m.text", "body": "old message"},
"redacted_because": {"event_id": "$redaction:example.com"},
"m.relations": {
"m.annotation": {"count": 1}
}
});
let mut pdu = create_test_pdu(Some(existing_unsigned));
let new_aggregations = json!({
"m.replace": {"event_id": "$new:example.com"}
});
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
&mut pdu,
new_aggregations,
);
assert!(result.is_ok(), "Should succeed while preserving other fields");
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
// Verify all existing fields are preserved
assert_eq!(unsigned["age"], 98765, "age should be preserved");
assert!(unsigned.get("prev_content").is_some(), "prev_content should be preserved");
assert!(
unsigned.get("redacted_because").is_some(),
"redacted_because should be preserved"
);
// Verify relations were merged correctly
let relations = &unsigned["m.relations"];
assert!(
relations.get("m.annotation").is_some(),
"Existing m.annotation should be preserved"
);
assert!(relations.get("m.replace").is_some(), "New m.replace should be added");
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_invalid_existing_unsigned() {
// Test case: Invalid JSON in existing unsigned should result in error
let mut pdu = create_test_pdu(None);
// Manually set invalid unsigned data
pdu.unsigned = Some(to_raw_value(&"invalid json").unwrap());
let aggregations = create_bundled_aggregations();
let result =
super::super::Service::add_bundled_aggregations_to_unsigned(&mut pdu, aggregations);
assert!(result.is_err(), "fails when existing unsigned is invalid");
// Should we ignore the error and overwrite anyway?
}
// Test helper function to create test PDU events
fn create_test_event(
event_id: &str,
room_id: &str,
sender: &str,
event_type: TimelineEventType,
content: &JsonValue,
state_key: Option<&str>,
) -> PduEvent {
PduEvent {
event_id: event_id.try_into().unwrap(),
room_id: Some(room_id.try_into().unwrap()),
sender: sender.try_into().unwrap(),
origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(),
kind: event_type,
content: to_raw_value(&content).unwrap(),
state_key: state_key.map(Into::into),
prev_events: vec![],
depth: UInt::from(1_u32),
auth_events: vec![],
redacts: None,
unsigned: None,
hashes: EventHash { sha256: "test_hash".to_owned() },
signatures: None,
origin: None,
}
}
/// Test that a valid replacement event passes validation
#[tokio::test]
async fn test_valid_replacement_event() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message",
"m.new_content": {
"msgtype": "m.text",
"body": "edited message"
},
"m.relates_to": {
"rel_type": "m.replace",
"event_id": "$original:example.com"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(result.unwrap(), "Valid replacement event should be accepted");
}
/// Test replacement event with different room ID is rejected
#[tokio::test]
async fn test_replacement_event_different_room() {
let original = create_test_event(
"$original:example.com",
"!room1:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room2:example.com", // Different room
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message",
"m.new_content": {
"msgtype": "m.text",
"body": "edited message"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Different room ID should be rejected");
}
/// Test replacement event with different sender is rejected
#[tokio::test]
async fn test_replacement_event_different_sender() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user1:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user2:example.com", // Different sender
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message",
"m.new_content": {
"msgtype": "m.text",
"body": "edited message"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Different sender should be rejected");
}
/// Test replacement event with different type is rejected
#[tokio::test]
async fn test_replacement_event_different_type() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomTopic, // Different event type
&json!({
"topic": "new topic",
"m.new_content": {
"topic": "new topic"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Different event type should be rejected");
}
/// Test replacement event with state key is rejected
#[tokio::test]
async fn test_replacement_event_with_state_key() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomName,
&json!({"name": "room name"}),
Some(""), // Has state key
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomName,
&json!({
"name": "new room name",
"m.new_content": {
"name": "new room name"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Event with state key should be rejected");
}
/// Test replacement of an event that is already a replacement is rejected
#[tokio::test]
async fn test_replacement_event_original_is_replacement() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message",
"m.relates_to": {
"rel_type": "m.replace", // Original is already a replacement
"event_id": "$some_other:example.com"
}
}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited again",
"m.new_content": {
"msgtype": "m.text",
"body": "edited again"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Replacement of replacement should be rejected");
}
/// Test replacement event missing m.new_content is rejected
#[tokio::test]
async fn test_replacement_event_missing_new_content() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message"
// Missing m.new_content
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Missing m.new_content should be rejected");
}
/// Test encrypted replacement event without m.new_content is accepted
#[tokio::test]
async fn test_replacement_event_encrypted_missing_new_content_is_valid() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomEncrypted,
&json!({
"algorithm": "m.megolm.v1.aes-sha2",
"ciphertext": "encrypted_payload_base64",
"sender_key": "sender_key",
"session_id": "session_id"
}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomEncrypted,
&json!({
"algorithm": "m.megolm.v1.aes-sha2",
"ciphertext": "encrypted_replacement_payload_base64",
"sender_key": "sender_key",
"session_id": "session_id",
"m.relates_to": {
"rel_type": "m.replace",
"event_id": "$original:example.com"
}
// No m.new_content in cleartext - this is valid for encrypted events
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(
result.unwrap(),
"Encrypted replacement without cleartext m.new_content should be accepted"
);
}
}
+7 -5
View File
@@ -3,6 +3,7 @@ use std::{mem::size_of, sync::Arc};
use conduwuit::{
arrayvec::ArrayVec,
matrix::{Event, PduCount},
result::LogErr,
utils::{
ReadyExt,
stream::{TryIgnore, WidebandExt},
@@ -14,11 +15,10 @@ use futures::{Stream, StreamExt};
use ruma::{EventId, RoomId, UserId, api::Direction};
use crate::{
Dep,
Dep, rooms,
rooms::{
self,
short::{ShortEventId, ShortRoomId},
timeline::{PduId, PdusIterItem, RawPduId},
timeline::{PduId, RawPduId},
},
};
@@ -60,7 +60,7 @@ impl Data {
target: ShortEventId,
from: PduCount,
dir: Direction,
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
) -> impl Stream<Item = (PduCount, impl Event)> + Send + 'a {
// Query from exact position then filter excludes it (saturating_inc could skip
// events at min/max boundaries)
let from_unsigned = from.into_unsigned();
@@ -92,7 +92,9 @@ impl Data {
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
pdu.as_mut_pdu().set_unsigned(Some(user_id));
if pdu.sender() != user_id {
pdu.as_mut_pdu().remove_transaction_id().log_err().ok();
}
Some((shorteventid, pdu))
})
+6 -10
View File
@@ -1,16 +1,15 @@
mod bundled_aggregations;
mod data;
use std::sync::Arc;
use conduwuit::{Result, matrix::PduCount};
use conduwuit::{
Result,
matrix::{Event, PduCount},
};
use futures::{StreamExt, future::try_join};
use ruma::{EventId, RoomId, UserId, api::Direction};
use self::data::Data;
use crate::{
Dep,
rooms::{self, timeline::PdusIterItem},
};
use crate::{Dep, rooms};
pub struct Service {
services: Services,
@@ -20,7 +19,6 @@ pub struct Service {
struct Services {
short: Dep<rooms::short::Service>,
timeline: Dep<rooms::timeline::Service>,
state_accessor: Dep<rooms::state_accessor::Service>,
}
impl crate::Service for Service {
@@ -29,8 +27,6 @@ impl crate::Service for Service {
services: Services {
short: args.depend::<rooms::short::Service>("rooms::short"),
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
state_accessor: args
.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
},
db: Data::new(&args),
}))
@@ -60,7 +56,7 @@ impl Service {
limit: usize,
max_depth: u8,
dir: Direction,
) -> Vec<PdusIterItem> {
) -> Vec<(PduCount, impl Event)> {
let room_id = self.services.short.get_shortroomid(room_id);
let target = self.services.timeline.get_pdu_count(target);
+4 -23
View File
@@ -1,9 +1,9 @@
use std::sync::Arc;
use conduwuit::{
PduCount, PduEvent, Result,
PduCount, Result,
arrayvec::ArrayVec,
debug_warn, implement,
implement,
matrix::event::{Event, Matches},
utils::{
ArrayVecExt, IterStream, ReadyExt, set,
@@ -35,7 +35,6 @@ struct Services {
short: Dep<rooms::short::Service>,
state_accessor: Dep<rooms::state_accessor::Service>,
timeline: Dep<rooms::timeline::Service>,
pdu_metadata: Dep<rooms::pdu_metadata::Service>,
}
#[derive(Clone, Debug)]
@@ -62,7 +61,6 @@ impl crate::Service for Service {
state_accessor: args
.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
pdu_metadata: args.depend::<rooms::pdu_metadata::Service>("rooms::pdu_metadata"),
},
}))
}
@@ -106,8 +104,7 @@ pub fn deindex_pdu(&self, shortroomid: ShortRoomId, pdu_id: &RawPduId, message_b
pub async fn search_pdus<'a>(
&'a self,
query: &'a RoomQuery<'a>,
sender_user: &'a UserId,
) -> Result<(usize, impl Stream<Item = PduEvent> + Send + 'a)> {
) -> Result<(usize, impl Stream<Item = impl Event + use<>> + Send + 'a)> {
let pdu_ids: Vec<_> = self.search_pdu_ids(query).await?.collect().await;
let filter = &query.criteria.filter;
@@ -132,23 +129,7 @@ pub async fn search_pdus<'a>(
.then_some(pdu)
})
.skip(query.skip)
.take(query.limit)
.map(move |mut pdu| {
pdu.set_unsigned(query.user_id);
pdu
})
.then(async move |mut pdu| {
if let Err(e) = self
.services
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
});
.take(query.limit);
Ok((count, pdus))
}
-2
View File
@@ -264,8 +264,6 @@ fn get_space_child_events<'a>(
if content.via.is_empty() {
return None;
}
} else {
return None;
}
if RoomId::parse(&state_key).is_err() {
+7 -16
View File
@@ -19,7 +19,8 @@ use futures::{
use ruma::{
EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, UserId,
events::{
AnyStateEvent, StateEventType, TimelineEventType, room::create::RoomCreateEventContent,
AnyStrippedStateEvent, StateEventType, TimelineEventType,
room::create::RoomCreateEventContent,
},
serde::Raw,
};
@@ -306,22 +307,12 @@ impl Service {
}
}
/// Get a summary of the room state for invites and knock responses.
///
/// This used to return stripped state, but now returns complete events.
///
/// Returns:
///
/// - m.room.create
/// - m.room.join_rules
/// - m.room.canonical_alias
/// - m.room.name
/// - m.room.avatar
/// - m.room.member (of the event sender)
/// - m.room.encryption
/// - m.room.topic
#[tracing::instrument(skip_all, level = "debug")]
pub async fn summary<'a, E>(&self, event: &'a E, room_id: &RoomId) -> Vec<Raw<AnyStateEvent>>
pub async fn summary_stripped<'a, E>(
&self,
event: &'a E,
room_id: &RoomId,
) -> Vec<Raw<AnyStrippedStateEvent>>
where
E: Event + Send + Sync,
&'a E: Event + Send,
+3 -3
View File
@@ -1,10 +1,10 @@
use std::collections::HashSet;
use conduwuit::{Err, Event, Pdu, Result, implement, is_not_empty, utils::ReadyExt};
use conduwuit::{Err, Event, Pdu, Result, implement, is_not_empty, utils::ReadyExt, warn};
use database::{Json, serialize_key};
use futures::StreamExt;
use ruma::{
CanonicalJsonValue, OwnedServerName, RoomId, UserId,
OwnedServerName, RoomId, UserId,
events::{
AnyStrippedStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType,
StateEventType,
@@ -334,7 +334,7 @@ pub async fn mark_as_invited(
user_id: &UserId,
room_id: &RoomId,
sender_user: &UserId,
last_state: Option<Vec<CanonicalJsonValue>>,
last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>,
invite_via: Option<Vec<OwnedServerName>>,
) -> Result<()> {
// return an error for blocked invites. ignored invites aren't handled here
+3 -1
View File
@@ -163,7 +163,9 @@ impl Service {
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
let pdu_id: PduId = pdu_id.into();
pdu.as_mut_pdu().set_unsigned(Some(user_id));
if pdu.sender() != user_id {
pdu.as_mut_pdu().remove_transaction_id().ok();
}
Some((pdu_id.shorteventid, pdu))
});
-4
View File
@@ -347,10 +347,6 @@ where
| _ => {},
}
// CONCERN: If we receive events with a relation out-of-order, we never write
// their relation / thread. We need some kind of way to trigger when we receive
// this event, and potentially a way to rebuild the table entirely.
if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>() {
if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await {
self.services
+15 -44
View File
@@ -23,40 +23,6 @@ use serde_json::value::{RawValue, to_raw_value};
use super::RoomMutexGuard;
pub fn pdu_fits(owned_obj: &CanonicalJsonObject) -> bool {
// room IDs, event IDs, senders, types, and state keys must all be <= 255 bytes
if let Some(CanonicalJsonValue::String(room_id)) = owned_obj.get("room_id") {
if room_id.len() > 255 {
return false;
}
}
if let Some(CanonicalJsonValue::String(event_id)) = owned_obj.get("event_id") {
if event_id.len() > 255 {
return false;
}
}
if let Some(CanonicalJsonValue::String(sender)) = owned_obj.get("sender") {
if sender.len() > 255 {
return false;
}
}
if let Some(CanonicalJsonValue::String(kind)) = owned_obj.get("type") {
if kind.len() > 255 {
return false;
}
}
if let Some(CanonicalJsonValue::String(state_key)) = owned_obj.get("state_key") {
if state_key.len() > 255 {
return false;
}
}
// Now check the full PDU size
match serde_json::to_string(owned_obj) {
| Ok(s) => s.len() <= 65535,
| Err(_) => false,
}
}
#[implement(super::Service)]
pub async fn create_hash_and_sign_event(
&self,
@@ -182,6 +148,19 @@ pub async fn create_hash_and_sign_event(
}
}
// if event_type != TimelineEventType::RoomCreate && prev_events.is_empty() {
// return Err!(Request(Unknown("Event incorrectly had zero prev_events.")));
// }
// if state_key.is_none() && depth.lt(&uint!(2)) {
// // The first two events in a room are always m.room.create and
// m.room.member, // so any other events with that same depth are illegal.
// warn!(
// "Had unsafe depth {depth} when creating non-state event in {}. Cowardly
// aborting", room_id.expect("room_id is Some here").as_str()
// );
// return Err!(Request(Unknown("Unsafe depth for non-state event.")));
// }
let mut pdu = PduEvent {
event_id: ruma::event_id!("$thiswillbefilledinlater").into(),
room_id: room_id.map(ToOwned::to_owned),
@@ -290,16 +269,8 @@ pub async fn create_hash_and_sign_event(
}
// Generate event id
pdu.event_id = gen_event_id(&pdu_json, &room_version_id)?;
pdu_json.insert("event_id".into(), CanonicalJsonValue::String(pdu.event_id.clone().into()));
// Verify that the *full* PDU isn't over 64KiB.
// Ruma only validates that it's under 64KiB before signing and hashing.
// Has to be cloned to prevent mutating pdu_json itself :(
if !pdu_fits(&mut pdu_json.clone()) {
// feckin huge PDU mate
return Err!(Request(TooLarge("Message/PDU is too long (exceeds 65535 bytes)")));
}
// Check with the policy server
pdu_json.insert("event_id".into(), CanonicalJsonValue::String(pdu.event_id.clone().into()));
if room_id.is_some() {
trace!(
"Checking event in room {} with policy server",
@@ -308,7 +279,7 @@ pub async fn create_hash_and_sign_event(
match self
.services
.event_handler
.ask_policy_server(&pdu, &mut pdu_json, pdu.room_id().expect("has room ID"), false)
.ask_policy_server(&pdu, &pdu_json, pdu.room_id().expect("has room ID"))
.await
{
| Ok(true) => {},
+27 -11
View File
@@ -1,13 +1,13 @@
use std::sync::Arc;
use std::{borrow::Borrow, sync::Arc};
use conduwuit::{
Err, PduCount, PduEvent, Result, at, err,
result::NotFound,
result::{LogErr, NotFound},
utils::{self, stream::TryReadyExt},
};
use database::{Database, Deserialized, Json, KeyVal, Map};
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, api::Direction};
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction};
use super::{PduId, RawPduId};
use crate::{Dep, rooms, rooms::short::ShortRoomId};
@@ -45,8 +45,12 @@ impl Data {
}
#[inline]
pub(super) async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
pub(super) async fn last_timeline_count(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduCount> {
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
pin_mut!(pdus_rev);
let last_count = pdus_rev
@@ -60,8 +64,12 @@ impl Data {
}
#[inline]
pub(super) async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
pub(super) async fn latest_pdu_in_room(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduEvent> {
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
pin_mut!(pdus_rev);
pdus_rev
@@ -213,6 +221,7 @@ impl Data {
/// order.
pub(super) fn pdus_rev<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
until: PduCount,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
@@ -222,13 +231,14 @@ impl Data {
self.pduid_pdu
.rev_raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(Self::from_json_slice)
.ready_and_then(move |item| Self::each_pdu(item, user_id))
})
.try_flatten_stream()
}
pub(super) fn pdus<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
from: PduCount,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
@@ -238,15 +248,21 @@ impl Data {
self.pduid_pdu
.raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(Self::from_json_slice)
.ready_and_then(move |item| Self::each_pdu(item, user_id))
})
.try_flatten_stream()
}
fn from_json_slice((pdu_id, pdu): KeyVal<'_>) -> Result<PdusIterItem> {
fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
let pdu_id: RawPduId = pdu_id.into();
let pdu = serde_json::from_slice::<PduEvent>(pdu)?;
let mut pdu = serde_json::from_slice::<PduEvent>(pdu)?;
if Some(pdu.sender.borrow()) != user_id {
pdu.remove_transaction_id().log_err().ok();
}
pdu.add_age().log_err().ok();
Ok((pdu_id.pdu_count(), pdu))
}
+17 -9
View File
@@ -20,13 +20,13 @@ use conduwuit_core::{
};
use futures::{Future, Stream, TryStreamExt, pin_mut};
use ruma::{
CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, RoomId,
CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
events::room::encrypted::Relation,
};
use serde::Deserialize;
use self::data::Data;
pub use self::{create::pdu_fits, data::PdusIterItem};
pub use self::data::PdusIterItem;
use crate::{
Dep, account_data, admin, appservice, globals, pusher, rooms, sending, server_keys, users,
};
@@ -138,7 +138,7 @@ impl Service {
#[tracing::instrument(skip(self), level = "debug")]
pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, impl Event)> {
let pdus = self.pdus(room_id, None);
let pdus = self.pdus(None, room_id, None);
pin_mut!(pdus);
pdus.try_next()
@@ -148,12 +148,16 @@ impl Service {
#[tracing::instrument(skip(self), level = "debug")]
pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<impl Event> {
self.db.latest_pdu_in_room(room_id).await
self.db.latest_pdu_in_room(None, room_id).await
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
self.db.last_timeline_count(room_id).await
pub async fn last_timeline_count(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduCount> {
self.db.last_timeline_count(sender_user, room_id).await
}
/// Returns the `count` of this pdu's id.
@@ -231,29 +235,33 @@ impl Service {
#[inline]
pub fn all_pdus<'a>(
&'a self,
user_id: &'a UserId,
room_id: &'a RoomId,
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
self.pdus(room_id, None).ignore_err()
self.pdus(Some(user_id), room_id, None).ignore_err()
}
/// Reverse iteration starting after `until`.
#[tracing::instrument(skip(self), level = "debug")]
pub fn pdus_rev<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
until: Option<PduCount>,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.db
.pdus_rev(room_id, until.unwrap_or_else(PduCount::max))
.pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max))
}
/// Forward iteration starting after `from`.
#[tracing::instrument(skip(self), level = "debug")]
pub fn pdus<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
from: Option<PduCount>,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.db.pdus(room_id, from.unwrap_or_else(PduCount::min))
self.db
.pdus(user_id, room_id, from.unwrap_or_else(PduCount::min))
}
}
+1 -1
View File
@@ -781,7 +781,7 @@ impl Service {
for pdu in pdus {
// Redacted events are not notification targets (we don't send push for them)
if pdu.is_redacted() {
if pdu.contains_unsigned_property("redacted_because", serde_json::Value::is_string) {
continue;
}
+10 -33
View File
@@ -11,7 +11,7 @@ use database::{Deserialized, Json, Map};
use ruma::{
CanonicalJsonValue, DeviceId, OwnedDeviceId, OwnedUserId, UserId,
api::client::{
error::{ErrorKind, StandardErrorBody},
error::ErrorKind,
uiaa::{AuthData, AuthType, Password, UiaaInfo, UserIdentifier},
},
};
@@ -104,7 +104,6 @@ pub fn create(
}
#[implement(Service)]
#[allow(clippy::useless_let_if_seq)]
pub async fn try_auth(
&self,
user_id: &UserId,
@@ -164,39 +163,17 @@ pub async fn try_auth(
let user_id = user_id_from_username;
// Check if password is correct
let mut password_verified = false;
// First try local password hash verification
if let Ok(hash) = self.services.users.password_hash(&user_id).await {
password_verified = hash::verify_password(password, &hash).is_ok();
}
// If local password verification failed, try LDAP authentication
#[cfg(feature = "ldap")]
if !password_verified && self.services.config.ldap.enable {
// Search for user in LDAP to get their DN
if let Ok(dns) = self.services.users.search_ldap(&user_id).await {
if let Some((user_dn, _is_admin)) = dns.first() {
// Try to authenticate with LDAP
password_verified = self
.services
.users
.auth_ldap(user_dn, password)
.await
.is_ok();
}
let hash_matches = hash::verify_password(password, &hash).is_ok();
if !hash_matches {
uiaainfo.auth_error = Some(ruma::api::client::error::StandardErrorBody {
kind: ErrorKind::forbidden(),
message: "Invalid username or password.".to_owned(),
});
return Ok((false, uiaainfo));
}
}
if !password_verified {
uiaainfo.auth_error = Some(StandardErrorBody {
kind: ErrorKind::forbidden(),
message: "Invalid username or password.".to_owned(),
});
return Ok((false, uiaainfo));
}
// Password was correct! Let's add it to `completed`
uiaainfo.completed.push(AuthType::Password);
},
@@ -220,7 +197,7 @@ pub async fn try_auth(
},
| Err(e) => {
error!("ReCaptcha verification failed: {e:?}");
uiaainfo.auth_error = Some(StandardErrorBody {
uiaainfo.auth_error = Some(ruma::api::client::error::StandardErrorBody {
kind: ErrorKind::forbidden(),
message: "ReCaptcha verification failed.".to_owned(),
});
@@ -233,7 +210,7 @@ pub async fn try_auth(
if tokens.contains(t.token.trim()) {
uiaainfo.completed.push(AuthType::RegistrationToken);
} else {
uiaainfo.auth_error = Some(StandardErrorBody {
uiaainfo.auth_error = Some(ruma::api::client::error::StandardErrorBody {
kind: ErrorKind::forbidden(),
message: "Invalid registration token.".to_owned(),
});
+1 -41
View File
@@ -1,6 +1,6 @@
#[cfg(feature = "ldap")]
use std::collections::HashMap;
use std::{collections::BTreeMap, mem, net::IpAddr, sync::Arc};
use std::{collections::BTreeMap, mem, sync::Arc};
#[cfg(feature = "ldap")]
use conduwuit::result::LogErr;
@@ -25,7 +25,6 @@ use ruma::{
invite_permission_config::{FilterLevel, InvitePermissionConfigEvent},
},
serde::Raw,
uint,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
@@ -981,7 +980,6 @@ impl Service {
.await;
}
/// Updates device metadata and increments the device list version.
pub async fn update_device_metadata(
&self,
user_id: &UserId,
@@ -989,51 +987,13 @@ impl Service {
device: &Device,
) -> Result<()> {
increment(&self.db.userid_devicelistversion, user_id.as_bytes());
self.update_device_metadata_no_increment(user_id, device_id, device)
.await
}
// Updates device metadata without incrementing the device list version.
// This is namely used for updating the last_seen_ip and last_seen_ts values,
// as those do not need a device list version bump due to them not being
// relevant to other consumers.
pub async fn update_device_metadata_no_increment(
&self,
user_id: &UserId,
device_id: &DeviceId,
device: &Device,
) -> Result<()> {
let key = (user_id, device_id);
self.db.userdeviceid_metadata.put(key, Json(device));
Ok(())
}
pub async fn update_device_last_seen(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
ip: IpAddr,
) {
let now = MilliSecondsSinceUnixEpoch::now();
if let Some(device_id) = device_id {
if let Ok(mut device) = self.get_device_metadata(user_id, device_id).await {
device.last_seen_ip = Some(ip.to_string());
// If the last update was less than 10 seconds ago, don't update the timestamp
if let Some(prev) = device.last_seen_ts {
if now.get().saturating_sub(prev.get()) < uint!(10_000) {
return;
}
}
device.last_seen_ts = Some(now);
self.update_device_metadata_no_increment(user_id, device_id, &device)
.await
.ok();
}
}
}
/// Get device metadata.
pub async fn get_device_metadata(
&self,