mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2026-05-26 20:49:55 +00:00
Compare commits
68 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b6e9dc3d98 | |||
| cfcd6eb1a6 | |||
| 88e7e50daf | |||
| 8345ea2cd3 | |||
| add2e0e9ee | |||
| 43e6c27bb7 | |||
| c7c9f0e4a6 | |||
| ef2d307c15 | |||
| f761d4d5c9 | |||
| 16b07ae3ec | |||
| 62d80b97e6 | |||
| fda8b36809 | |||
| f6dfc9538f | |||
| f80d85e107 | |||
| 9158edfb7c | |||
| 04656a7886 | |||
| 442bb9889c | |||
| 62180897c0 | |||
| 80277f6aa2 | |||
| d32534164c | |||
| b3271e0d65 | |||
| 106bcd30b7 | |||
| da4b94d80d | |||
| 32f990fc72 | |||
| 5e59ce37c4 | |||
| a774afe837 | |||
| ffe3b0faf2 | |||
| bd6d4bc58f | |||
| b4d22bd05e | |||
| 7ce782ddf4 | |||
| 4add39d0fe | |||
| ea49b60273 | |||
| 2fa9621f3a | |||
| 09bc71caab | |||
| 6983798487 | |||
| a4ef04cd14 | |||
| 4e0cedbe51 | |||
| 4ff1155bf0 | |||
| e161e5dd61 | |||
| f698254c41 | |||
| 69837671bb | |||
| ff8bbd4cfa | |||
| 1a8482b3b4 | |||
| 31c2968bb2 | |||
| 3c8376d897 | |||
| 50acfe7832 | |||
| eb7d893c86 | |||
| 936161d89e | |||
| 329925c661 | |||
| af399fd517 | |||
| ad0b0af955 | |||
| 2c5af902a3 | |||
| 2f449ba47d | |||
| a567e314e9 | |||
| ed3cd99781 | |||
| 99fe88c21e | |||
| ffd0fd4242 | |||
| b2a565b0b4 | |||
| c516a8df3e | |||
| 94d786ac12 | |||
| 677316631a | |||
| 2b730a30ad | |||
| 98f9570547 | |||
| 13335042b7 | |||
| 6db8df5e23 | |||
| d0b4a619af | |||
| 4a2d0d35bc | |||
| 3e0ff2dc84 |
@@ -0,0 +1,87 @@
|
||||
# taken from https://github.com/gitattributes/gitattributes/blob/46a8961ad73f5bd4d8d193708840fbc9e851d702/Rust.gitattributes
|
||||
# Auto detect text files and perform normalization
|
||||
* text=auto
|
||||
|
||||
*.rs text diff=rust
|
||||
*.toml text diff=toml
|
||||
Cargo.lock text
|
||||
|
||||
# taken from https://github.com/gitattributes/gitattributes/blob/46a8961ad73f5bd4d8d193708840fbc9e851d702/Common.gitattributes
|
||||
# Documents
|
||||
*.bibtex text diff=bibtex
|
||||
*.doc diff=astextplain
|
||||
*.DOC diff=astextplain
|
||||
*.docx diff=astextplain
|
||||
*.DOCX diff=astextplain
|
||||
*.dot diff=astextplain
|
||||
*.DOT diff=astextplain
|
||||
*.pdf diff=astextplain
|
||||
*.PDF diff=astextplain
|
||||
*.rtf diff=astextplain
|
||||
*.RTF diff=astextplain
|
||||
*.md text diff=markdown
|
||||
*.mdx text diff=markdown
|
||||
*.tex text diff=tex
|
||||
*.adoc text
|
||||
*.textile text
|
||||
*.mustache text
|
||||
*.csv text eol=crlf
|
||||
*.tab text
|
||||
*.tsv text
|
||||
*.txt text
|
||||
*.sql text
|
||||
*.epub diff=astextplain
|
||||
|
||||
# Graphics
|
||||
*.png binary
|
||||
*.jpg binary
|
||||
*.jpeg binary
|
||||
*.gif binary
|
||||
*.tif binary
|
||||
*.tiff binary
|
||||
*.ico binary
|
||||
# SVG treated as text by default.
|
||||
*.svg text
|
||||
*.eps binary
|
||||
|
||||
# Scripts
|
||||
*.bash text eol=lf
|
||||
*.fish text eol=lf
|
||||
*.ksh text eol=lf
|
||||
*.sh text eol=lf
|
||||
*.zsh text eol=lf
|
||||
# These are explicitly windows files and should use crlf
|
||||
*.bat text eol=crlf
|
||||
*.cmd text eol=crlf
|
||||
*.ps1 text eol=crlf
|
||||
|
||||
# Serialisation
|
||||
*.json text
|
||||
*.toml text
|
||||
*.xml text
|
||||
*.yaml text
|
||||
*.yml text
|
||||
|
||||
# Archives
|
||||
*.7z binary
|
||||
*.bz binary
|
||||
*.bz2 binary
|
||||
*.bzip2 binary
|
||||
*.gz binary
|
||||
*.lz binary
|
||||
*.lzma binary
|
||||
*.rar binary
|
||||
*.tar binary
|
||||
*.taz binary
|
||||
*.tbz binary
|
||||
*.tbz2 binary
|
||||
*.tgz binary
|
||||
*.tlz binary
|
||||
*.txz binary
|
||||
*.xz binary
|
||||
*.Z binary
|
||||
*.zip binary
|
||||
*.zst binary
|
||||
|
||||
# Text files where line endings should be preserved
|
||||
*.patch -text
|
||||
@@ -128,7 +128,7 @@ jobs:
|
||||
- name: Restore and cache Nix store
|
||||
# we want a fresh-state when we do releases/tags to avoid potential cache poisoning attacks impacting
|
||||
# releases and tags
|
||||
if: ${{ !startsWith(github.ref, 'refs/tags/') }}
|
||||
#if: ${{ !startsWith(github.ref, 'refs/tags/') }}
|
||||
uses: nix-community/cache-nix-action@v5.1.0
|
||||
with:
|
||||
# restore and save a cache using this key
|
||||
@@ -191,14 +191,14 @@ jobs:
|
||||
- name: Run sccache-cache
|
||||
# we want a fresh-state when we do releases/tags to avoid potential cache poisoning attacks impacting
|
||||
# releases and tags
|
||||
if: ${{ (env.SCCACHE_GHA_ENABLED == 'true') && !startsWith(github.ref, 'refs/tags/') }}
|
||||
#if: ${{ (env.SCCACHE_GHA_ENABLED == 'true') && !startsWith(github.ref, 'refs/tags/') }}
|
||||
uses: mozilla-actions/sccache-action@main
|
||||
|
||||
# use rust-cache
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
# we want a fresh-state when we do releases/tags to avoid potential cache poisoning attacks impacting
|
||||
# releases and tags
|
||||
if: ${{ !startsWith(github.ref, 'refs/tags/') }}
|
||||
#if: ${{ !startsWith(github.ref, 'refs/tags/') }}
|
||||
with:
|
||||
cache-all-crates: "true"
|
||||
cache-on-failure: "true"
|
||||
@@ -323,7 +323,7 @@ jobs:
|
||||
- name: Restore and cache Nix store
|
||||
# we want a fresh-state when we do releases/tags to avoid potential cache poisoning attacks impacting
|
||||
# releases and tags
|
||||
if: ${{ !startsWith(github.ref, 'refs/tags/') }}
|
||||
#if: ${{ !startsWith(github.ref, 'refs/tags/') }}
|
||||
uses: nix-community/cache-nix-action@v5.1.0
|
||||
with:
|
||||
# restore and save a cache using this key
|
||||
@@ -379,14 +379,14 @@ jobs:
|
||||
- name: Run sccache-cache
|
||||
# we want a fresh-state when we do releases/tags to avoid potential cache poisoning attacks impacting
|
||||
# releases and tags
|
||||
if: ${{ (env.SCCACHE_GHA_ENABLED == 'true') && !startsWith(github.ref, 'refs/tags/') }}
|
||||
#if: ${{ (env.SCCACHE_GHA_ENABLED == 'true') && !startsWith(github.ref, 'refs/tags/') }}
|
||||
uses: mozilla-actions/sccache-action@main
|
||||
|
||||
# use rust-cache
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
# we want a fresh-state when we do releases/tags to avoid potential cache poisoning attacks impacting
|
||||
# releases and tags
|
||||
if: ${{ !startsWith(github.ref, 'refs/tags/') }}
|
||||
#if: ${{ !startsWith(github.ref, 'refs/tags/') }}
|
||||
with:
|
||||
cache-all-crates: "true"
|
||||
cache-on-failure: "true"
|
||||
@@ -679,7 +679,7 @@ jobs:
|
||||
- name: Run sccache-cache
|
||||
# we want a fresh-state when we do releases/tags to avoid potential cache poisoning attacks impacting
|
||||
# releases and tags
|
||||
if: ${{ (env.SCCACHE_GHA_ENABLED == 'true') && !startsWith(github.ref, 'refs/tags/') }}
|
||||
#if: ${{ (env.SCCACHE_GHA_ENABLED == 'true') && !startsWith(github.ref, 'refs/tags/') }}
|
||||
uses: mozilla-actions/sccache-action@main
|
||||
|
||||
# use rust-cache
|
||||
|
||||
Generated
+386
-19
@@ -26,6 +26,12 @@ dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aligned-vec"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4aa90d7ce82d4be67b64039a3d588d38dbcc6736577de4a847025ce5b0c468d1"
|
||||
|
||||
[[package]]
|
||||
name = "alloc-no-stdlib"
|
||||
version = "2.0.4"
|
||||
@@ -53,12 +59,29 @@ version = "1.0.95"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04"
|
||||
|
||||
[[package]]
|
||||
name = "arbitrary"
|
||||
version = "1.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dde20b3d026af13f561bdd0f15edf01fc734f0dafcedbaf42bba506a9517f223"
|
||||
|
||||
[[package]]
|
||||
name = "arc-swap"
|
||||
version = "1.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
|
||||
|
||||
[[package]]
|
||||
name = "arg_enum_proc_macro"
|
||||
version = "0.3.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ae92a5119aa49cdbcf6b9f893fe4e1d98b04ccbf82ee0584ad948a44a734dea"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.96",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "argon2"
|
||||
version = "0.5.3"
|
||||
@@ -173,6 +196,29 @@ version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
|
||||
|
||||
[[package]]
|
||||
name = "av1-grain"
|
||||
version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6678909d8c5d46a42abcf571271e15fdbc0a225e3646cf23762cd415046c78bf"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arrayvec",
|
||||
"log",
|
||||
"nom",
|
||||
"num-rational",
|
||||
"v_frame",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "avif-serialize"
|
||||
version = "0.8.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e335041290c43101ca215eed6f43ec437eb5a42125573f600fc3fa42b9bddd62"
|
||||
dependencies = [
|
||||
"arrayvec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aws-lc-rs"
|
||||
version = "1.12.1"
|
||||
@@ -385,6 +431,12 @@ dependencies = [
|
||||
"which",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bit_field"
|
||||
version = "0.10.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dc827186963e592360843fb5ba4b973e145841266c1357f7180c43526f2e5b61"
|
||||
|
||||
[[package]]
|
||||
name = "bitflags"
|
||||
version = "1.3.2"
|
||||
@@ -397,6 +449,12 @@ version = "2.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36"
|
||||
|
||||
[[package]]
|
||||
name = "bitstream-io"
|
||||
version = "2.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6099cdc01846bc367c4e7dd630dc5966dccf36b652fae7a74e17b640411a91b2"
|
||||
|
||||
[[package]]
|
||||
name = "blake2"
|
||||
version = "0.10.6"
|
||||
@@ -415,6 +473,15 @@ dependencies = [
|
||||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "blurhash"
|
||||
version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e79769241dcd44edf79a732545e8b5cec84c247ac060f5252cd51885d093a8fc"
|
||||
dependencies = [
|
||||
"image",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "brotli"
|
||||
version = "7.0.0"
|
||||
@@ -436,6 +503,12 @@ dependencies = [
|
||||
"alloc-stdlib",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "built"
|
||||
version = "0.7.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c360505aed52b7ec96a3636c3f039d99103c37d1d9b4f7a8c743d3ea9ffcd03b"
|
||||
|
||||
[[package]]
|
||||
name = "bumpalo"
|
||||
version = "3.16.0"
|
||||
@@ -513,6 +586,16 @@ dependencies = [
|
||||
"nom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cfg-expr"
|
||||
version = "0.15.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d067ad48b8650848b989a59a86c6c36a995d02d2bf778d45c3c5d57bc2718f02"
|
||||
dependencies = [
|
||||
"smallvec",
|
||||
"target-lexicon",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cfg-if"
|
||||
version = "1.0.0"
|
||||
@@ -685,6 +768,7 @@ dependencies = [
|
||||
"http-body-util",
|
||||
"hyper",
|
||||
"ipaddress",
|
||||
"itertools 0.13.0",
|
||||
"log",
|
||||
"rand",
|
||||
"reqwest",
|
||||
@@ -821,6 +905,7 @@ dependencies = [
|
||||
"arrayvec",
|
||||
"async-trait",
|
||||
"base64 0.22.1",
|
||||
"blurhash",
|
||||
"bytes",
|
||||
"conduwuit_core",
|
||||
"conduwuit_database",
|
||||
@@ -844,6 +929,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"serde_yaml",
|
||||
"sha2",
|
||||
"smallvec",
|
||||
"termimad",
|
||||
"tokio",
|
||||
"tracing",
|
||||
@@ -1069,6 +1155,12 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crunchy"
|
||||
version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "43da5946c66ffcc7745f48db692ffbb10a83bfe0afd96235c5c2a4fb23994929"
|
||||
|
||||
[[package]]
|
||||
name = "crypto-common"
|
||||
version = "0.1.6"
|
||||
@@ -1250,7 +1342,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"windows-sys 0.52.0",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1273,6 +1365,21 @@ dependencies = [
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "exr"
|
||||
version = "1.73.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f83197f59927b46c04a183a619b7c29df34e63e63c7869320862268c0ef687e0"
|
||||
dependencies = [
|
||||
"bit_field",
|
||||
"half",
|
||||
"lebe",
|
||||
"miniz_oxide",
|
||||
"rayon-core",
|
||||
"smallvec",
|
||||
"zune-inflate",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fdeflate"
|
||||
version = "0.3.7"
|
||||
@@ -1517,6 +1624,16 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "half"
|
||||
version = "2.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"crunchy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hardened_malloc-rs"
|
||||
version = "0.1.2+12"
|
||||
@@ -1971,10 +2088,16 @@ dependencies = [
|
||||
"bytemuck",
|
||||
"byteorder-lite",
|
||||
"color_quant",
|
||||
"exr",
|
||||
"gif",
|
||||
"image-webp",
|
||||
"num-traits",
|
||||
"png",
|
||||
"qoi",
|
||||
"ravif",
|
||||
"rayon",
|
||||
"rgb",
|
||||
"tiff",
|
||||
"zune-core",
|
||||
"zune-jpeg",
|
||||
]
|
||||
@@ -1989,6 +2112,12 @@ dependencies = [
|
||||
"quick-error 2.0.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "imgref"
|
||||
version = "1.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d0263a3d970d5c054ed9312c0057b4f3bde9c0b33836d3637361d4a9e6e7a408"
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "1.9.3"
|
||||
@@ -2022,6 +2151,17 @@ version = "3.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
|
||||
|
||||
[[package]]
|
||||
name = "interpolate_name"
|
||||
version = "0.2.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c34819042dc3d3971c46c2190835914dfbe0c3c13f61449b2997f4e9722dfa60"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.96",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ipaddress"
|
||||
version = "0.1.3"
|
||||
@@ -2087,6 +2227,12 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jpeg-decoder"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f5d4a7da358eff58addd2877a45865158f0d78c911d43a5784ceb7bbf52833b0"
|
||||
|
||||
[[package]]
|
||||
name = "js-sys"
|
||||
version = "0.3.77"
|
||||
@@ -2170,12 +2316,28 @@ version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "lebe"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "03087c2bad5e1034e8cace5926dec053fb3790248370865f5117a7d0213354c8"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.169"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a"
|
||||
|
||||
[[package]]
|
||||
name = "libfuzzer-sys"
|
||||
version = "0.4.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cf78f52d400cf2d84a3a973a78a592b4adc535739e0a5597a0da6f0c357adc75"
|
||||
dependencies = [
|
||||
"arbitrary",
|
||||
"cc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libloading"
|
||||
version = "0.8.6"
|
||||
@@ -2183,7 +2345,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"windows-targets 0.48.5",
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2241,6 +2403,15 @@ dependencies = [
|
||||
"futures-sink",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "loop9"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0fae87c125b03c1d2c0150c90365d7d6bcc53fb73a9acaef207d2d065860f062"
|
||||
dependencies = [
|
||||
"imgref",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lru-cache"
|
||||
version = "0.1.2"
|
||||
@@ -2319,6 +2490,16 @@ version = "0.7.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
|
||||
|
||||
[[package]]
|
||||
name = "maybe-rayon"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8ea1f30cedd69f0a2954655f7188c6a834246d2bcf1e315e2ac40c4b24dc9519"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"rayon",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memchr"
|
||||
version = "2.7.4"
|
||||
@@ -2432,6 +2613,12 @@ version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7"
|
||||
|
||||
[[package]]
|
||||
name = "noop_proc_macro"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0676bb32a98c1a483ce53e500a81ad9c3d5b3f7c920c28c24e9cb0980d0b5bc8"
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.46.0"
|
||||
@@ -2481,6 +2668,17 @@ version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
|
||||
|
||||
[[package]]
|
||||
name = "num-derive"
|
||||
version = "0.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.96",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-integer"
|
||||
version = "0.1.46"
|
||||
@@ -2905,6 +3103,25 @@ dependencies = [
|
||||
"yansi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "profiling"
|
||||
version = "1.0.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "afbdc74edc00b6f6a218ca6a5364d6226a259d4b8ea1af4a0ea063f27e179f4d"
|
||||
dependencies = [
|
||||
"profiling-procmacros",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "profiling-procmacros"
|
||||
version = "1.0.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a65f2e60fbf1063868558d69c6beacf412dc755f9fc020f514b7955fc914fe30"
|
||||
dependencies = [
|
||||
"quote",
|
||||
"syn 2.0.96",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost"
|
||||
version = "0.13.4"
|
||||
@@ -2955,6 +3172,15 @@ version = "0.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "007d8adb5ddab6f8e3f491ac63566a7d5002cc7ed73901f72057943fa71ae1ae"
|
||||
|
||||
[[package]]
|
||||
name = "qoi"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7f6d64c71eb498fe9eae14ce4ec935c555749aef511cca85b5568910d6e48001"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-error"
|
||||
version = "1.2.3"
|
||||
@@ -3016,7 +3242,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"socket2",
|
||||
"tracing",
|
||||
"windows-sys 0.52.0",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3058,6 +3284,76 @@ dependencies = [
|
||||
"getrandom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rav1e"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cd87ce80a7665b1cce111f8a16c1f3929f6547ce91ade6addf4ec86a8dda5ce9"
|
||||
dependencies = [
|
||||
"arbitrary",
|
||||
"arg_enum_proc_macro",
|
||||
"arrayvec",
|
||||
"av1-grain",
|
||||
"bitstream-io",
|
||||
"built",
|
||||
"cfg-if",
|
||||
"interpolate_name",
|
||||
"itertools 0.12.1",
|
||||
"libc",
|
||||
"libfuzzer-sys",
|
||||
"log",
|
||||
"maybe-rayon",
|
||||
"new_debug_unreachable",
|
||||
"noop_proc_macro",
|
||||
"num-derive",
|
||||
"num-traits",
|
||||
"once_cell",
|
||||
"paste",
|
||||
"profiling",
|
||||
"rand",
|
||||
"rand_chacha",
|
||||
"simd_helpers",
|
||||
"system-deps",
|
||||
"thiserror 1.0.69",
|
||||
"v_frame",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ravif"
|
||||
version = "0.11.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2413fd96bd0ea5cdeeb37eaf446a22e6ed7b981d792828721e74ded1980a45c6"
|
||||
dependencies = [
|
||||
"avif-serialize",
|
||||
"imgref",
|
||||
"loop9",
|
||||
"quick-error 2.0.1",
|
||||
"rav1e",
|
||||
"rayon",
|
||||
"rgb",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon"
|
||||
version = "1.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
|
||||
dependencies = [
|
||||
"either",
|
||||
"rayon-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon-core"
|
||||
version = "1.12.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
|
||||
dependencies = [
|
||||
"crossbeam-deque",
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.5.8"
|
||||
@@ -3170,6 +3466,12 @@ dependencies = [
|
||||
"quick-error 1.2.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rgb"
|
||||
version = "0.8.50"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57397d16646700483b67d2dd6511d79318f9d057fdbd21a4066aeac8b41d310a"
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
version = "0.17.8"
|
||||
@@ -3188,7 +3490,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma"
|
||||
version = "0.10.1"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
|
||||
dependencies = [
|
||||
"assign",
|
||||
"js_int",
|
||||
@@ -3210,7 +3512,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-appservice-api"
|
||||
version = "0.10.0"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
|
||||
dependencies = [
|
||||
"js_int",
|
||||
"ruma-common",
|
||||
@@ -3222,7 +3524,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-client-api"
|
||||
version = "0.18.0"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
|
||||
dependencies = [
|
||||
"as_variant",
|
||||
"assign",
|
||||
@@ -3245,7 +3547,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-common"
|
||||
version = "0.13.0"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
|
||||
dependencies = [
|
||||
"as_variant",
|
||||
"base64 0.22.1",
|
||||
@@ -3276,7 +3578,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-events"
|
||||
version = "0.28.1"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
|
||||
dependencies = [
|
||||
"as_variant",
|
||||
"indexmap 2.7.0",
|
||||
@@ -3301,7 +3603,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-federation-api"
|
||||
version = "0.9.0"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"http",
|
||||
@@ -3319,7 +3621,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-identifiers-validation"
|
||||
version = "0.9.5"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
|
||||
dependencies = [
|
||||
"js_int",
|
||||
"thiserror 2.0.11",
|
||||
@@ -3328,7 +3630,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-identity-service-api"
|
||||
version = "0.9.0"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
|
||||
dependencies = [
|
||||
"js_int",
|
||||
"ruma-common",
|
||||
@@ -3338,7 +3640,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-macros"
|
||||
version = "0.13.0"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"proc-macro-crate",
|
||||
@@ -3353,7 +3655,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-push-gateway-api"
|
||||
version = "0.9.0"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
|
||||
dependencies = [
|
||||
"js_int",
|
||||
"ruma-common",
|
||||
@@ -3365,7 +3667,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-server-util"
|
||||
version = "0.3.0"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
|
||||
dependencies = [
|
||||
"headers",
|
||||
"http",
|
||||
@@ -3378,7 +3680,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-signatures"
|
||||
version = "0.15.0"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"ed25519-dalek",
|
||||
@@ -3394,7 +3696,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-state-res"
|
||||
version = "0.11.0"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
|
||||
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
|
||||
dependencies = [
|
||||
"futures-util",
|
||||
"js_int",
|
||||
@@ -3409,7 +3711,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "rust-librocksdb-sys"
|
||||
version = "0.32.0+9.10.0"
|
||||
source = "git+https://github.com/girlbossceo/rust-rocksdb-zaidoon1?rev=1f032427d3a0e7b0f13c04b4e34712bd8610291b#1f032427d3a0e7b0f13c04b4e34712bd8610291b"
|
||||
source = "git+https://github.com/girlbossceo/rust-rocksdb-zaidoon1?rev=7b0e1bbe395a41ba8a11347a4921da590e3ad0d9#7b0e1bbe395a41ba8a11347a4921da590e3ad0d9"
|
||||
dependencies = [
|
||||
"bindgen",
|
||||
"bzip2-sys",
|
||||
@@ -3426,7 +3728,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "rust-rocksdb"
|
||||
version = "0.36.0"
|
||||
source = "git+https://github.com/girlbossceo/rust-rocksdb-zaidoon1?rev=1f032427d3a0e7b0f13c04b4e34712bd8610291b#1f032427d3a0e7b0f13c04b4e34712bd8610291b"
|
||||
source = "git+https://github.com/girlbossceo/rust-rocksdb-zaidoon1?rev=7b0e1bbe395a41ba8a11347a4921da590e3ad0d9#7b0e1bbe395a41ba8a11347a4921da590e3ad0d9"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"rust-librocksdb-sys",
|
||||
@@ -3477,7 +3779,7 @@ dependencies = [
|
||||
"errno",
|
||||
"libc",
|
||||
"linux-raw-sys",
|
||||
"windows-sys 0.52.0",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3943,6 +4245,15 @@ version = "0.3.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe"
|
||||
|
||||
[[package]]
|
||||
name = "simd_helpers"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "95890f873bec569a0362c235787f3aca6e1e887302ba4840839bcc6459c42da6"
|
||||
dependencies = [
|
||||
"quote",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "siphasher"
|
||||
version = "0.3.11"
|
||||
@@ -4094,6 +4405,25 @@ dependencies = [
|
||||
"syn 2.0.96",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "system-deps"
|
||||
version = "6.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a3e535eb8dded36d55ec13eddacd30dec501792ff23a0b1682c38601b8cf2349"
|
||||
dependencies = [
|
||||
"cfg-expr",
|
||||
"heck",
|
||||
"pkg-config",
|
||||
"toml",
|
||||
"version-compare",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "target-lexicon"
|
||||
version = "0.12.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1"
|
||||
|
||||
[[package]]
|
||||
name = "tendril"
|
||||
version = "0.4.3"
|
||||
@@ -4203,6 +4533,17 @@ dependencies = [
|
||||
"threadpool",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tiff"
|
||||
version = "0.9.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ba1310fcea54c6a9a4fd1aad794ecc02c31682f6bfbecdf460bf19533eed1e3e"
|
||||
dependencies = [
|
||||
"flate2",
|
||||
"jpeg-decoder",
|
||||
"weezl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tikv-jemalloc-ctl"
|
||||
version = "0.6.0"
|
||||
@@ -4742,6 +5083,17 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "v_frame"
|
||||
version = "0.3.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d6f32aaa24bacd11e488aa9ba66369c7cd514885742c9fe08cfe85884db3e92b"
|
||||
dependencies = [
|
||||
"aligned-vec",
|
||||
"num-traits",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "valuable"
|
||||
version = "0.1.1"
|
||||
@@ -4754,6 +5106,12 @@ version = "0.2.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
|
||||
|
||||
[[package]]
|
||||
name = "version-compare"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "852e951cb7832cb45cb1169900d19760cfa39b82bc0ea9c0e5a14ae88411c98b"
|
||||
|
||||
[[package]]
|
||||
name = "version_check"
|
||||
version = "0.9.5"
|
||||
@@ -5322,6 +5680,15 @@ version = "0.4.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3f423a2c17029964870cfaabb1f13dfab7d092a62a29a89264f4d36990ca414a"
|
||||
|
||||
[[package]]
|
||||
name = "zune-inflate"
|
||||
version = "0.2.54"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "73ab332fe2f6680068f3582b16a24f90ad7096d5d39b974d1c0aff0125116f02"
|
||||
dependencies = [
|
||||
"simd-adler32",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zune-jpeg"
|
||||
version = "0.4.14"
|
||||
|
||||
+12
-3
@@ -127,12 +127,13 @@ version = "0.6.2"
|
||||
default-features = false
|
||||
features = [
|
||||
"add-extension",
|
||||
"catch-panic",
|
||||
"cors",
|
||||
"sensitive-headers",
|
||||
"set-header",
|
||||
"timeout",
|
||||
"trace",
|
||||
"util",
|
||||
"catch-panic",
|
||||
]
|
||||
|
||||
[workspace.dependencies.rustls]
|
||||
@@ -178,7 +179,7 @@ version = "0.5.3"
|
||||
features = ["alloc", "rand"]
|
||||
default-features = false
|
||||
|
||||
# Used to generate thumbnails for images
|
||||
# Used to generate thumbnails for images & blurhashes
|
||||
[workspace.dependencies.image]
|
||||
version = "0.25.5"
|
||||
default-features = false
|
||||
@@ -189,6 +190,14 @@ features = [
|
||||
"webp",
|
||||
]
|
||||
|
||||
[workspace.dependencies.blurhash]
|
||||
version = "0.2.3"
|
||||
default-features = false
|
||||
features = [
|
||||
"fast-linear-to-srgb",
|
||||
"image",
|
||||
]
|
||||
|
||||
# logging
|
||||
[workspace.dependencies.log]
|
||||
version = "0.4.22"
|
||||
@@ -333,7 +342,7 @@ version = "0.1.2"
|
||||
[workspace.dependencies.ruma]
|
||||
git = "https://github.com/girlbossceo/ruwuma"
|
||||
#branch = "conduwuit-changes"
|
||||
rev = "b560338b2a50dbf61ecfe80808b9b095ad4cec00"
|
||||
rev = "f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
|
||||
features = [
|
||||
"compat",
|
||||
"rand",
|
||||
|
||||
+14
-1
@@ -7,7 +7,20 @@ RequiresMountsFor=/var/lib/private/conduwuit
|
||||
|
||||
[Service]
|
||||
DynamicUser=yes
|
||||
Type=notify
|
||||
Type=notify-reload
|
||||
ReloadSignal=SIGUSR1
|
||||
|
||||
TTYPath=/dev/tty25
|
||||
DeviceAllow=char-tty
|
||||
StandardInput=tty-force
|
||||
StandardOutput=tty
|
||||
StandardError=journal+console
|
||||
TTYReset=yes
|
||||
# uncomment to allow buffer to be cleared every restart
|
||||
TTYVTDisallocate=no
|
||||
|
||||
TTYColumns=120
|
||||
TTYRows=40
|
||||
|
||||
AmbientCapabilities=
|
||||
CapabilityBoundingSet=
|
||||
|
||||
@@ -36,7 +36,6 @@ pushd "$toplevel" > /dev/null
|
||||
|
||||
#bin/nix-build-and-cache just .#linux-complement
|
||||
bin/nix-build-and-cache just .#complement
|
||||
#nom build .#complement
|
||||
|
||||
docker load < result
|
||||
popd > /dev/null
|
||||
|
||||
+60
-2
@@ -377,6 +377,26 @@
|
||||
#
|
||||
#pusher_idle_timeout = 15
|
||||
|
||||
# Maximum time to receive a request from a client (seconds).
|
||||
#
|
||||
#client_receive_timeout = 75
|
||||
|
||||
# Maximum time to process a request received from a client (seconds).
|
||||
#
|
||||
#client_request_timeout = 180
|
||||
|
||||
# Maximum time to transmit a response to a client (seconds)
|
||||
#
|
||||
#client_response_timeout = 120
|
||||
|
||||
# Grace period for clean shutdown of client requests (seconds).
|
||||
#
|
||||
#client_shutdown_timeout = 10
|
||||
|
||||
# Grace period for clean shutdown of federation requests (seconds).
|
||||
#
|
||||
#sender_shutdown_timeout = 5
|
||||
|
||||
# Enables registration. If set to false, no users can register on this
|
||||
# server.
|
||||
#
|
||||
@@ -406,8 +426,9 @@
|
||||
#
|
||||
#registration_token =
|
||||
|
||||
# Path to a file on the system that gets read for the registration token.
|
||||
# this config option takes precedence/priority over "registration_token".
|
||||
# Path to a file on the system that gets read for additional registration
|
||||
# tokens. Multiple tokens can be added if you separate them with
|
||||
# whitespace
|
||||
#
|
||||
# conduwuit must be able to access the file, and it must not be empty
|
||||
#
|
||||
@@ -897,6 +918,13 @@
|
||||
#
|
||||
#rocksdb_paranoid_file_checks = false
|
||||
|
||||
# Enables or disables checksum verification in rocksdb at runtime.
|
||||
# Checksums are usually hardware accelerated with low overhead; they are
|
||||
# enabled in rocksdb by default. Older or slower platforms may see gains
|
||||
# from disabling.
|
||||
#
|
||||
#rocksdb_checksums = true
|
||||
|
||||
# Database repair mode (for RocksDB SST corruption).
|
||||
#
|
||||
# Use this option when the server reports corruption while running or
|
||||
@@ -1355,6 +1383,13 @@
|
||||
#
|
||||
#admin_execute_errors_ignore = false
|
||||
|
||||
# List of admin commands to execute on SIGUSR2.
|
||||
#
|
||||
# Similar to admin_execute, but these commands are executed when the
|
||||
# server receives SIGUSR2 on supporting platforms.
|
||||
#
|
||||
#admin_signal_execute = []
|
||||
|
||||
# Controls the max log level for admin command log captures (logs
|
||||
# generated from running admin commands). Defaults to "info" on release
|
||||
# builds, else "debug" on debug builds.
|
||||
@@ -1517,6 +1552,11 @@
|
||||
#
|
||||
#listening = true
|
||||
|
||||
# Enables configuration reload when the server receives SIGUSR1 on
|
||||
# supporting platforms.
|
||||
#
|
||||
#config_reload_signal = true
|
||||
|
||||
[global.tls]
|
||||
|
||||
# Path to a valid TLS certificate file.
|
||||
@@ -1567,3 +1607,21 @@
|
||||
# This item is undocumented. Please contribute documentation for it.
|
||||
#
|
||||
#support_mxid =
|
||||
|
||||
[global.blurhashing]
|
||||
|
||||
# blurhashing x component, 4 is recommended by https://blurha.sh/
|
||||
#
|
||||
#components_x = 4
|
||||
|
||||
# blurhashing y component, 3 is recommended by https://blurha.sh/
|
||||
#
|
||||
#components_y = 3
|
||||
|
||||
# Max raw size that the server will blurhash, this is the size of the
|
||||
# image after converting it to raw data, it should be higher than the
|
||||
# upload limit but not too high. The higher it is the higher the
|
||||
# potential load will be for clients requesting blurhashes. The default
|
||||
# is 33.55MB. Setting it to 0 disables blurhashing.
|
||||
#
|
||||
#blurhash_max_raw_size = 33554432
|
||||
|
||||
Vendored
+14
-1
@@ -8,7 +8,20 @@ Documentation=https://conduwuit.puppyirl.gay/
|
||||
DynamicUser=yes
|
||||
User=conduwuit
|
||||
Group=conduwuit
|
||||
Type=notify
|
||||
Type=notify-reload
|
||||
ReloadSignal=SIGUSR1
|
||||
|
||||
TTYPath=/dev/tty25
|
||||
DeviceAllow=char-tty
|
||||
StandardInput=tty-force
|
||||
StandardOutput=tty
|
||||
StandardError=journal+console
|
||||
TTYReset=yes
|
||||
# uncomment to allow buffer to be cleared every restart
|
||||
TTYVTDisallocate=no
|
||||
|
||||
TTYColumns=120
|
||||
TTYRows=40
|
||||
|
||||
Environment="CONDUWUIT_CONFIG=/etc/conduwuit/conduwuit.toml"
|
||||
|
||||
|
||||
Vendored
+1
-1
@@ -27,7 +27,7 @@ malloc-usable-size = ["rust-rocksdb/malloc-usable-size"]
|
||||
|
||||
[dependencies.rust-rocksdb]
|
||||
git = "https://github.com/girlbossceo/rust-rocksdb-zaidoon1"
|
||||
rev = "1f032427d3a0e7b0f13c04b4e34712bd8610291b"
|
||||
rev = "7b0e1bbe395a41ba8a11347a4921da590e3ad0d9"
|
||||
#branch = "master"
|
||||
default-features = false
|
||||
|
||||
|
||||
+45
-18
@@ -86,6 +86,7 @@ env DIRENV_DEVSHELL=all-features \
|
||||
direnv exec . \
|
||||
cargo doc \
|
||||
--workspace \
|
||||
--locked \
|
||||
--profile test \
|
||||
--all-features \
|
||||
--no-deps \
|
||||
@@ -100,8 +101,8 @@ script = """
|
||||
direnv exec . \
|
||||
cargo clippy \
|
||||
--workspace \
|
||||
--locked \
|
||||
--profile test \
|
||||
--all-targets \
|
||||
--color=always \
|
||||
-- \
|
||||
-D warnings
|
||||
@@ -115,8 +116,8 @@ env DIRENV_DEVSHELL=all-features \
|
||||
direnv exec . \
|
||||
cargo clippy \
|
||||
--workspace \
|
||||
--locked \
|
||||
--profile test \
|
||||
--all-targets \
|
||||
--all-features \
|
||||
--color=always \
|
||||
-- \
|
||||
@@ -124,33 +125,37 @@ env DIRENV_DEVSHELL=all-features \
|
||||
"""
|
||||
|
||||
[[task]]
|
||||
name = "clippy/jemalloc"
|
||||
name = "clippy/no-features"
|
||||
group = "lints"
|
||||
script = """
|
||||
env DIRENV_DEVSHELL=no-features \
|
||||
direnv exec . \
|
||||
cargo clippy \
|
||||
--workspace \
|
||||
--locked \
|
||||
--profile test \
|
||||
--no-default-features \
|
||||
--color=always \
|
||||
-- \
|
||||
-D warnings
|
||||
"""
|
||||
|
||||
[[task]]
|
||||
name = "clippy/other-features"
|
||||
group = "lints"
|
||||
script = """
|
||||
direnv exec . \
|
||||
cargo clippy \
|
||||
--workspace \
|
||||
--locked \
|
||||
--profile test \
|
||||
--features jemalloc \
|
||||
--all-targets \
|
||||
--no-default-features \
|
||||
--features=console,systemd,element_hacks,direct_tls,perf_measurements,brotli_compression,blurhashing \
|
||||
--color=always \
|
||||
-- \
|
||||
-D warnings
|
||||
"""
|
||||
|
||||
#[[task]]
|
||||
#name = "clippy/hardened_malloc"
|
||||
#group = "lints"
|
||||
#script = """
|
||||
#cargo clippy \
|
||||
# --workspace \
|
||||
# --features hardened_malloc \
|
||||
# --all-targets \
|
||||
# --color=always \
|
||||
# -- \
|
||||
# -D warnings
|
||||
#"""
|
||||
|
||||
[[task]]
|
||||
name = "lychee"
|
||||
group = "lints"
|
||||
@@ -169,8 +174,10 @@ env DIRENV_DEVSHELL=all-features \
|
||||
direnv exec . \
|
||||
cargo test \
|
||||
--workspace \
|
||||
--locked \
|
||||
--profile test \
|
||||
--all-targets \
|
||||
--no-fail-fast \
|
||||
--all-features \
|
||||
--color=always \
|
||||
-- \
|
||||
@@ -185,8 +192,28 @@ env DIRENV_DEVSHELL=default \
|
||||
direnv exec . \
|
||||
cargo test \
|
||||
--workspace \
|
||||
--locked \
|
||||
--profile test \
|
||||
--all-targets \
|
||||
--no-fail-fast \
|
||||
--color=always \
|
||||
-- \
|
||||
--color=always
|
||||
"""
|
||||
|
||||
[[task]]
|
||||
name = "cargo/no-features"
|
||||
group = "tests"
|
||||
script = """
|
||||
env DIRENV_DEVSHELL=no-features \
|
||||
direnv exec . \
|
||||
cargo test \
|
||||
--workspace \
|
||||
--locked \
|
||||
--profile test \
|
||||
--all-targets \
|
||||
--no-fail-fast \
|
||||
--no-default-features \
|
||||
--color=always \
|
||||
-- \
|
||||
--color=always
|
||||
|
||||
@@ -170,20 +170,8 @@
|
||||
# used for rust caching in CI to speed it up
|
||||
sccache
|
||||
]
|
||||
# valgrind is unavailable in static contexts
|
||||
# used for CI and complement
|
||||
++ (if !stdenv.hostPlatform.isStatic then [ "valgrind" ] else [])
|
||||
# liburing is Linux-exclusive
|
||||
++ lib.optional stdenv.hostPlatform.isLinux liburing
|
||||
# needed to build Rust applications on macOS
|
||||
++ lib.optionals stdenv.hostPlatform.isDarwin [
|
||||
# https://github.com/NixOS/nixpkgs/issues/206242
|
||||
# ld: library not found for -liconv
|
||||
libiconv
|
||||
# https://stackoverflow.com/questions/69869574/properly-adding-darwin-apple-sdk-to-a-nix-shell
|
||||
# https://discourse.nixos.org/t/compile-a-rust-binary-on-macos-dbcrossbar/8612
|
||||
pkgsBuildHost.darwin.apple_sdk.frameworks.Security
|
||||
])
|
||||
++ lib.optional stdenv.hostPlatform.isLinux liburing)
|
||||
++ scope.main.buildInputs
|
||||
++ scope.main.propagatedBuildInputs
|
||||
++ scope.main.nativeBuildInputs;
|
||||
|
||||
@@ -22,6 +22,7 @@ prune_missing_media = true
|
||||
log_colors = false
|
||||
admin_room_notices = false
|
||||
allow_check_for_updates = false
|
||||
intentionally_unknown_config_option_for_testing = true
|
||||
rocksdb_log_level = "debug"
|
||||
rocksdb_max_log_files = 1
|
||||
rocksdb_recovery_mode = 0
|
||||
@@ -29,6 +30,7 @@ rocksdb_paranoid_file_checks = true
|
||||
log_guest_registrations = false
|
||||
allow_legacy_media = true
|
||||
startup_netburst = true
|
||||
startup_netburst_keep = -1
|
||||
|
||||
# valgrind makes things so slow
|
||||
dns_timeout = 60
|
||||
|
||||
@@ -9,30 +9,21 @@
|
||||
, openssl
|
||||
, stdenv
|
||||
, tini
|
||||
, valgrind
|
||||
, writeShellScriptBin
|
||||
}:
|
||||
|
||||
let
|
||||
main' = main.override {
|
||||
#profile = "test";
|
||||
profile = "release-debuginfo";
|
||||
profile = "test";
|
||||
all_features = true;
|
||||
disable_release_max_log_level = true;
|
||||
disable_features = [
|
||||
"jemalloc"
|
||||
"jemalloc_stats"
|
||||
"jemalloc_prof"
|
||||
"jemalloc_conf"
|
||||
"io_uring"
|
||||
# console/CLI stuff isn't used or relevant for complement
|
||||
"console"
|
||||
"tokio_console"
|
||||
# sentry telemetry isn't useful for complement, disabled by default anyways
|
||||
"sentry_telemetry"
|
||||
"perf_measurements"
|
||||
# the containers don't use or need systemd signal support
|
||||
#"systemd"
|
||||
# this is non-functional on nix for some reason
|
||||
"hardened_malloc"
|
||||
# dont include experimental features
|
||||
@@ -47,50 +38,8 @@ let
|
||||
"url_preview"
|
||||
];
|
||||
};
|
||||
# TODO: figure out why a suspicious amounnt of complement tests fail with valgrind only under complement.
|
||||
# maybe issue with direct TLS mode?
|
||||
#${lib.getExe' valgrind "valgrind"} \
|
||||
#--leak-check=no \
|
||||
#--undef-value-errors=no \
|
||||
#--exit-on-first-error=yes \
|
||||
#--error-exitcode=1 \
|
||||
|
||||
# valgrind only works in non-static ocntexts
|
||||
start = if !stdenv.hostPlatform.isStatic then writeShellScriptBin "start" ''
|
||||
set -euxo pipefail
|
||||
|
||||
${lib.getExe openssl} genrsa -out private_key.key 2048
|
||||
${lib.getExe openssl} req \
|
||||
-new \
|
||||
-sha256 \
|
||||
-key private_key.key \
|
||||
-subj "/C=US/ST=CA/O=MyOrg, Inc./CN=$SERVER_NAME" \
|
||||
-out signing_request.csr
|
||||
cp ${./v3.ext} v3.ext
|
||||
echo "DNS.1 = $SERVER_NAME" >> v3.ext
|
||||
echo "IP.1 = $(${lib.getExe gawk} 'END{print $1}' /etc/hosts)" \
|
||||
>> v3.ext
|
||||
${lib.getExe openssl} x509 \
|
||||
-req \
|
||||
-extfile v3.ext \
|
||||
-in signing_request.csr \
|
||||
-CA /complement/ca/ca.crt \
|
||||
-CAkey /complement/ca/ca.key \
|
||||
-CAcreateserial \
|
||||
-out certificate.crt \
|
||||
-days 1 \
|
||||
-sha256
|
||||
|
||||
${lib.getExe' coreutils "env"} \
|
||||
CONDUWUIT_SERVER_NAME="$SERVER_NAME" \
|
||||
TMPDIR="/" \
|
||||
${lib.getExe' valgrind "valgrind"} \
|
||||
--leak-check=no \
|
||||
--undef-value-errors=no \
|
||||
--exit-on-first-error=yes \
|
||||
--error-exitcode=1 \
|
||||
${lib.getExe main'}
|
||||
'' else writeShellScriptBin "start" ''
|
||||
start = writeShellScriptBin "start" ''
|
||||
set -euxo pipefail
|
||||
|
||||
${lib.getExe openssl} genrsa -out private_key.key 2048
|
||||
@@ -135,7 +84,6 @@ dockerTools.buildImage {
|
||||
coreutils
|
||||
main'
|
||||
start
|
||||
valgrind
|
||||
];
|
||||
};
|
||||
|
||||
|
||||
@@ -103,6 +103,7 @@ buildDepsOnlyEnv =
|
||||
++ [ "-DPORTABLE=haswell" ]) else ([ "-DPORTABLE=1" ])
|
||||
)
|
||||
++ old.cmakeFlags;
|
||||
|
||||
# outputs has "tools" which we dont need or use
|
||||
outputs = [ "out" ];
|
||||
|
||||
@@ -199,17 +200,7 @@ commonAttrs = {
|
||||
# differing values for `NIX_CFLAGS_COMPILE`, which contributes to spurious
|
||||
# rebuilds of bindgen and its depedents.
|
||||
jq
|
||||
]
|
||||
# needed to build Rust applications on macOS
|
||||
++ lib.optionals stdenv.hostPlatform.isDarwin [
|
||||
# https://github.com/NixOS/nixpkgs/issues/206242
|
||||
# ld: library not found for -liconv
|
||||
libiconv
|
||||
|
||||
# https://stackoverflow.com/questions/69869574/properly-adding-darwin-apple-sdk-to-a-nix-shell
|
||||
# https://discourse.nixos.org/t/compile-a-rust-binary-on-macos-dbcrossbar/8612
|
||||
pkgsBuildHost.darwin.apple_sdk.frameworks.Security
|
||||
];
|
||||
];
|
||||
};
|
||||
in
|
||||
|
||||
|
||||
@@ -36,6 +36,7 @@ dockerTools.buildLayeredImage {
|
||||
"org.opencontainers.image.documentation" = "https://conduwuit.puppyirl.gay/";
|
||||
"org.opencontainers.image.licenses" = "Apache-2.0";
|
||||
"org.opencontainers.image.revision" = inputs.self.rev or inputs.self.dirtyRev or "";
|
||||
"org.opencontainers.image.source" = "https://github.com/girlbossceo/conduwuit";
|
||||
"org.opencontainers.image.title" = main.pname;
|
||||
"org.opencontainers.image.url" = "https://conduwuit.puppyirl.gay/";
|
||||
"org.opencontainers.image.vendor" = "girlbossceo";
|
||||
|
||||
+68
-28
@@ -6,10 +6,14 @@ use std::{
|
||||
};
|
||||
|
||||
use conduwuit::{
|
||||
debug_error, err, info, trace, utils, utils::string::EMPTY, warn, Error, PduEvent, PduId,
|
||||
RawPduId, Result,
|
||||
debug_error, err, info, trace, utils,
|
||||
utils::{
|
||||
stream::{IterStream, ReadyExt},
|
||||
string::EMPTY,
|
||||
},
|
||||
warn, Error, PduEvent, PduId, RawPduId, Result,
|
||||
};
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use futures::{FutureExt, StreamExt, TryStreamExt};
|
||||
use ruma::{
|
||||
api::{client::error::ErrorKind, federation::event::get_room_state},
|
||||
events::room::message::RoomMessageEventContent,
|
||||
@@ -54,7 +58,7 @@ pub(super) async fn get_auth_chain(
|
||||
.rooms
|
||||
.auth_chain
|
||||
.event_ids_iter(room_id, once(event_id.as_ref()))
|
||||
.await?
|
||||
.ready_filter_map(Result::ok)
|
||||
.count()
|
||||
.await;
|
||||
|
||||
@@ -327,11 +331,10 @@ pub(super) async fn get_room_state(
|
||||
.services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.room_state_full(&room_id)
|
||||
.await?
|
||||
.values()
|
||||
.map(PduEvent::to_state_event)
|
||||
.collect();
|
||||
.room_state_full_pdus(&room_id)
|
||||
.map_ok(PduEvent::into_state_event)
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
||||
if room_state.is_empty() {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
@@ -554,7 +557,7 @@ pub(super) async fn first_pdu_in_room(
|
||||
.services
|
||||
.rooms
|
||||
.state_cache
|
||||
.server_in_room(&self.services.server.config.server_name, &room_id)
|
||||
.server_in_room(&self.services.server.name, &room_id)
|
||||
.await
|
||||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
@@ -583,7 +586,7 @@ pub(super) async fn latest_pdu_in_room(
|
||||
.services
|
||||
.rooms
|
||||
.state_cache
|
||||
.server_in_room(&self.services.server.config.server_name, &room_id)
|
||||
.server_in_room(&self.services.server.name, &room_id)
|
||||
.await
|
||||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
@@ -613,7 +616,7 @@ pub(super) async fn force_set_room_state_from_server(
|
||||
.services
|
||||
.rooms
|
||||
.state_cache
|
||||
.server_in_room(&self.services.server.config.server_name, &room_id)
|
||||
.server_in_room(&self.services.server.name, &room_id)
|
||||
.await
|
||||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
@@ -640,6 +643,7 @@ pub(super) async fn force_set_room_state_from_server(
|
||||
room_id: room_id.clone().into(),
|
||||
event_id: first_pdu.event_id.clone(),
|
||||
})
|
||||
.boxed()
|
||||
.await?;
|
||||
|
||||
for pdu in remote_state_response.pdus.clone() {
|
||||
@@ -648,6 +652,7 @@ pub(super) async fn force_set_room_state_from_server(
|
||||
.rooms
|
||||
.event_handler
|
||||
.parse_incoming_pdu(&pdu)
|
||||
.boxed()
|
||||
.await
|
||||
{
|
||||
| Ok(t) => t,
|
||||
@@ -711,6 +716,7 @@ pub(super) async fn force_set_room_state_from_server(
|
||||
.rooms
|
||||
.event_handler
|
||||
.resolve_state(&room_id, &room_version, state)
|
||||
.boxed()
|
||||
.await?;
|
||||
|
||||
info!("Forcing new room state");
|
||||
@@ -756,8 +762,7 @@ pub(super) async fn get_signing_keys(
|
||||
notary: Option<Box<ServerName>>,
|
||||
query: bool,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
let server_name =
|
||||
server_name.unwrap_or_else(|| self.services.server.config.server_name.clone().into());
|
||||
let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone().into());
|
||||
|
||||
if let Some(notary) = notary {
|
||||
let signing_keys = self
|
||||
@@ -793,8 +798,7 @@ pub(super) async fn get_verify_keys(
|
||||
&self,
|
||||
server_name: Option<Box<ServerName>>,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
let server_name =
|
||||
server_name.unwrap_or_else(|| self.services.server.config.server_name.clone().into());
|
||||
let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone().into());
|
||||
|
||||
let keys = self
|
||||
.services
|
||||
@@ -824,7 +828,7 @@ pub(super) async fn resolve_true_destination(
|
||||
));
|
||||
}
|
||||
|
||||
if server_name == self.services.server.config.server_name {
|
||||
if server_name == self.services.server.name {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Not allowed to send federation requests to ourselves. Please use `get-pdu` for \
|
||||
fetching local PDUs.",
|
||||
@@ -948,21 +952,57 @@ pub(super) async fn database_stats(
|
||||
property: Option<String>,
|
||||
map: Option<String>,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
let property = property.unwrap_or_else(|| "rocksdb.stats".to_owned());
|
||||
let map_name = map.as_ref().map_or(EMPTY, String::as_str);
|
||||
let property = property.unwrap_or_else(|| "rocksdb.stats".to_owned());
|
||||
self.services
|
||||
.db
|
||||
.iter()
|
||||
.filter(|(&name, _)| map_name.is_empty() || map_name == name)
|
||||
.try_stream()
|
||||
.try_for_each(|(&name, map)| {
|
||||
let res = map.property(&property).expect("invalid property");
|
||||
writeln!(self, "##### {name}:\n```\n{}\n```", res.trim())
|
||||
})
|
||||
.await?;
|
||||
|
||||
let mut out = String::new();
|
||||
for (&name, map) in self.services.db.iter() {
|
||||
if !map_name.is_empty() && map_name != name {
|
||||
continue;
|
||||
}
|
||||
Ok(RoomMessageEventContent::notice_plain(""))
|
||||
}
|
||||
|
||||
let res = map.property(&property)?;
|
||||
let res = res.trim();
|
||||
writeln!(out, "##### {name}:\n```\n{res}\n```")?;
|
||||
}
|
||||
#[admin_command]
|
||||
pub(super) async fn database_files(
|
||||
&self,
|
||||
map: Option<String>,
|
||||
level: Option<i32>,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
let mut files: Vec<_> = self.services.db.db.file_list().collect::<Result<_>>()?;
|
||||
|
||||
Ok(RoomMessageEventContent::notice_markdown(out))
|
||||
files.sort_by_key(|f| f.name.clone());
|
||||
|
||||
writeln!(self, "| lev | sst | keys | dels | size | column |").await?;
|
||||
writeln!(self, "| ---: | :--- | ---: | ---: | ---: | :--- |").await?;
|
||||
files
|
||||
.into_iter()
|
||||
.filter(|file| {
|
||||
map.as_deref()
|
||||
.is_none_or(|map| map == file.column_family_name)
|
||||
})
|
||||
.filter(|file| level.as_ref().is_none_or(|&level| level == file.level))
|
||||
.try_stream()
|
||||
.try_for_each(|file| {
|
||||
writeln!(
|
||||
self,
|
||||
"| {} | {:<13} | {:7}+ | {:4}- | {:9} | {} |",
|
||||
file.level,
|
||||
file.name,
|
||||
file.num_entries,
|
||||
file.num_deletions,
|
||||
file.size,
|
||||
file.column_family_name,
|
||||
)
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(RoomMessageEventContent::notice_plain(""))
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
|
||||
@@ -226,6 +226,14 @@ pub(super) enum DebugCommand {
|
||||
/// - Trim memory usage
|
||||
TrimMemory,
|
||||
|
||||
/// - List database files
|
||||
DatabaseFiles {
|
||||
map: Option<String>,
|
||||
|
||||
#[arg(long)]
|
||||
level: Option<i32>,
|
||||
},
|
||||
|
||||
/// - Developer test stubs
|
||||
#[command(subcommand)]
|
||||
#[allow(non_snake_case)]
|
||||
|
||||
@@ -92,7 +92,7 @@ pub(super) async fn remote_user_in_rooms(
|
||||
&self,
|
||||
user_id: Box<UserId>,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
if user_id.server_name() == self.services.server.config.server_name {
|
||||
if user_id.server_name() == self.services.server.name {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"User belongs to our server, please use `list-joined-rooms` user admin command \
|
||||
instead.",
|
||||
|
||||
@@ -41,7 +41,7 @@ async fn changes_since(
|
||||
let results: Vec<_> = self
|
||||
.services
|
||||
.account_data
|
||||
.changes_since(room_id.as_deref(), &user_id, since)
|
||||
.changes_since(room_id.as_deref(), &user_id, since, None)
|
||||
.collect()
|
||||
.await;
|
||||
let query_time = timer.elapsed();
|
||||
|
||||
@@ -413,7 +413,7 @@ async fn get_to_device_events(
|
||||
let result = self
|
||||
.services
|
||||
.users
|
||||
.get_to_device_events(&user_id, &device_id)
|
||||
.get_to_device_events(&user_id, &device_id, None, None)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
let query_time = timer.elapsed();
|
||||
|
||||
@@ -72,7 +72,7 @@ pub(super) async fn reprocess(
|
||||
))),
|
||||
};
|
||||
match command {
|
||||
| RoomAliasCommand::Set { force, room_id, .. } =>
|
||||
| RoomAliasCommand::Set { force, room_id, .. } => {
|
||||
match (force, services.rooms.alias.resolve_local_alias(&room_alias).await) {
|
||||
| (true, Ok(id)) => {
|
||||
match services.rooms.alias.set_alias(
|
||||
@@ -106,8 +106,9 @@ pub(super) async fn reprocess(
|
||||
))),
|
||||
}
|
||||
},
|
||||
},
|
||||
| RoomAliasCommand::Remove { .. } =>
|
||||
}
|
||||
},
|
||||
| RoomAliasCommand::Remove { .. } => {
|
||||
match services.rooms.alias.resolve_local_alias(&room_alias).await {
|
||||
| Ok(id) => match services
|
||||
.rooms
|
||||
@@ -124,15 +125,17 @@ pub(super) async fn reprocess(
|
||||
},
|
||||
| Err(_) =>
|
||||
Ok(RoomMessageEventContent::text_plain("Alias isn't in use.")),
|
||||
},
|
||||
| RoomAliasCommand::Which { .. } =>
|
||||
}
|
||||
},
|
||||
| RoomAliasCommand::Which { .. } => {
|
||||
match services.rooms.alias.resolve_local_alias(&room_alias).await {
|
||||
| Ok(id) => Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Alias resolves to {id}"
|
||||
))),
|
||||
| Err(_) =>
|
||||
Ok(RoomMessageEventContent::text_plain("Alias isn't in use.")),
|
||||
},
|
||||
}
|
||||
},
|
||||
| RoomAliasCommand::List { .. } => unreachable!(),
|
||||
}
|
||||
},
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::{fmt::Write, path::PathBuf, sync::Arc};
|
||||
|
||||
use conduwuit::{info, utils::time, warn, Config, Err, Result};
|
||||
use conduwuit::{info, utils::time, warn, Err, Result};
|
||||
use ruma::events::room::message::RoomMessageEventContent;
|
||||
|
||||
use crate::admin_command;
|
||||
@@ -33,12 +33,7 @@ pub(super) async fn reload_config(
|
||||
path: Option<PathBuf>,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
let path = path.as_deref().into_iter();
|
||||
let config = Config::load(path).and_then(|raw| Config::new(&raw))?;
|
||||
if config.server_name != self.services.server.config.server_name {
|
||||
return Err!("You can't change the server name.");
|
||||
}
|
||||
|
||||
let _old = self.services.server.config.update(config)?;
|
||||
self.services.config.reload(path)?;
|
||||
|
||||
Ok(RoomMessageEventContent::text_plain("Successfully reconfigured."))
|
||||
}
|
||||
@@ -97,7 +92,7 @@ pub(super) async fn clear_caches(&self) -> Result<RoomMessageEventContent> {
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn list_backups(&self) -> Result<RoomMessageEventContent> {
|
||||
let result = self.services.globals.db.backup_list()?;
|
||||
let result = self.services.db.db.backup_list()?;
|
||||
|
||||
if result.is_empty() {
|
||||
Ok(RoomMessageEventContent::text_plain("No backups found."))
|
||||
@@ -108,31 +103,24 @@ pub(super) async fn list_backups(&self) -> Result<RoomMessageEventContent> {
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn backup_database(&self) -> Result<RoomMessageEventContent> {
|
||||
let globals = Arc::clone(&self.services.globals);
|
||||
let db = Arc::clone(&self.services.db);
|
||||
let mut result = self
|
||||
.services
|
||||
.server
|
||||
.runtime()
|
||||
.spawn_blocking(move || match globals.db.backup() {
|
||||
.spawn_blocking(move || match db.db.backup() {
|
||||
| Ok(()) => String::new(),
|
||||
| Err(e) => e.to_string(),
|
||||
})
|
||||
.await?;
|
||||
|
||||
if result.is_empty() {
|
||||
result = self.services.globals.db.backup_list()?;
|
||||
result = self.services.db.db.backup_list()?;
|
||||
}
|
||||
|
||||
Ok(RoomMessageEventContent::notice_markdown(result))
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn list_database_files(&self) -> Result<RoomMessageEventContent> {
|
||||
let result = self.services.globals.db.file_list()?;
|
||||
|
||||
Ok(RoomMessageEventContent::notice_markdown(result))
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn admin_notice(&self, message: Vec<String>) -> Result<RoomMessageEventContent> {
|
||||
let message = message.join(" ");
|
||||
|
||||
@@ -46,9 +46,6 @@ pub(super) enum ServerCommand {
|
||||
/// - List database backups
|
||||
ListBackups,
|
||||
|
||||
/// - List database files
|
||||
ListDatabaseFiles,
|
||||
|
||||
/// - Send a message to the admin room.
|
||||
AdminNotice {
|
||||
message: Vec<String>,
|
||||
|
||||
@@ -50,6 +50,7 @@ http.workspace = true
|
||||
http-body-util.workspace = true
|
||||
hyper.workspace = true
|
||||
ipaddress.workspace = true
|
||||
itertools.workspace = true
|
||||
log.workspace = true
|
||||
rand.workspace = true
|
||||
reqwest.workspace = true
|
||||
|
||||
+18
-10
@@ -1,6 +1,6 @@
|
||||
use axum::extract::State;
|
||||
use conduwuit::{
|
||||
at, deref_at, err, ref_at,
|
||||
at, err, ref_at,
|
||||
utils::{
|
||||
future::TryExtExt,
|
||||
stream::{BroadbandExt, ReadyExt, TryIgnore, WidebandExt},
|
||||
@@ -10,10 +10,10 @@ use conduwuit::{
|
||||
};
|
||||
use futures::{
|
||||
future::{join, join3, try_join3, OptionFuture},
|
||||
FutureExt, StreamExt, TryFutureExt,
|
||||
FutureExt, StreamExt, TryFutureExt, TryStreamExt,
|
||||
};
|
||||
use ruma::{api::client::context::get_context, events::StateEventType, OwnedEventId, UserId};
|
||||
use service::rooms::{lazy_loading, lazy_loading::Options};
|
||||
use service::rooms::{lazy_loading, lazy_loading::Options, short::ShortStateKey};
|
||||
|
||||
use crate::{
|
||||
client::message::{event_filter, ignored_filter, lazy_loading_witness, visibility_filter},
|
||||
@@ -132,21 +132,29 @@ pub(crate) async fn get_context_route(
|
||||
.state_accessor
|
||||
.pdu_shortstatehash(state_at)
|
||||
.or_else(|_| services.rooms.state.get_room_shortstatehash(room_id))
|
||||
.and_then(|shortstatehash| services.rooms.state_accessor.state_full_ids(shortstatehash))
|
||||
.map_ok(|shortstatehash| {
|
||||
services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_ids(shortstatehash)
|
||||
.map(Ok)
|
||||
})
|
||||
.map_err(|e| err!(Database("State not found: {e}")))
|
||||
.try_flatten_stream()
|
||||
.try_collect()
|
||||
.boxed();
|
||||
|
||||
let (lazy_loading_witnessed, state_ids) = join(lazy_loading_witnessed, state_ids).await;
|
||||
|
||||
let state_ids = state_ids?;
|
||||
let state_ids: Vec<(ShortStateKey, OwnedEventId)> = state_ids?;
|
||||
let shortstatekeys = state_ids.iter().map(at!(0)).stream();
|
||||
let shorteventids = state_ids.iter().map(ref_at!(1)).stream();
|
||||
let lazy_loading_witnessed = lazy_loading_witnessed.unwrap_or_default();
|
||||
let shortstatekeys = state_ids.iter().stream().map(deref_at!(0));
|
||||
|
||||
let state: Vec<_> = services
|
||||
.rooms
|
||||
.short
|
||||
.multi_get_statekey_from_short(shortstatekeys)
|
||||
.zip(state_ids.iter().stream().map(at!(1)))
|
||||
.zip(shorteventids)
|
||||
.ready_filter_map(|item| Some((item.0.ok()?, item.1)))
|
||||
.ready_filter_map(|((event_type, state_key), event_id)| {
|
||||
if filter.lazy_load_options.is_enabled()
|
||||
@@ -162,9 +170,9 @@ pub(crate) async fn get_context_route(
|
||||
Some(event_id)
|
||||
})
|
||||
.broad_filter_map(|event_id: &OwnedEventId| {
|
||||
services.rooms.timeline.get_pdu(event_id).ok()
|
||||
services.rooms.timeline.get_pdu(event_id.as_ref()).ok()
|
||||
})
|
||||
.map(|pdu| pdu.to_state_event())
|
||||
.map(PduEvent::into_state_event)
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
|
||||
+16
-7
@@ -57,19 +57,28 @@ pub(crate) async fn create_content_route(
|
||||
let filename = body.filename.as_deref();
|
||||
let content_type = body.content_type.as_deref();
|
||||
let content_disposition = make_content_disposition(None, content_type, filename);
|
||||
let mxc = Mxc {
|
||||
let ref mxc = Mxc {
|
||||
server_name: services.globals.server_name(),
|
||||
media_id: &utils::random_string(MXC_LENGTH),
|
||||
};
|
||||
|
||||
services
|
||||
.media
|
||||
.create(&mxc, Some(user), Some(&content_disposition), content_type, &body.file)
|
||||
.await
|
||||
.map(|()| create_content::v3::Response {
|
||||
content_uri: mxc.to_string().into(),
|
||||
blurhash: None,
|
||||
})
|
||||
.create(mxc, Some(user), Some(&content_disposition), content_type, &body.file)
|
||||
.await?;
|
||||
|
||||
let blurhash = body.generate_blurhash.then(|| {
|
||||
services
|
||||
.media
|
||||
.create_blurhash(&body.file, content_type, filename)
|
||||
.ok()
|
||||
.flatten()
|
||||
});
|
||||
|
||||
Ok(create_content::v3::Response {
|
||||
content_uri: mxc.to_string().into(),
|
||||
blurhash: blurhash.flatten(),
|
||||
})
|
||||
}
|
||||
|
||||
/// # `GET /_matrix/client/v1/media/thumbnail/{serverName}/{mediaId}`
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::{
|
||||
borrow::Borrow,
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
iter::once,
|
||||
net::IpAddr,
|
||||
sync::Arc,
|
||||
};
|
||||
@@ -8,14 +9,14 @@ use std::{
|
||||
use axum::extract::State;
|
||||
use axum_client_ip::InsecureClientIp;
|
||||
use conduwuit::{
|
||||
debug, debug_info, debug_warn, err, info,
|
||||
at, debug, debug_info, debug_warn, err, info,
|
||||
pdu::{gen_event_id_canonical_json, PduBuilder},
|
||||
result::FlatOk,
|
||||
trace,
|
||||
utils::{self, shuffle, IterStream, ReadyExt},
|
||||
warn, Err, PduEvent, Result,
|
||||
};
|
||||
use futures::{join, FutureExt, StreamExt};
|
||||
use futures::{join, FutureExt, StreamExt, TryFutureExt};
|
||||
use ruma::{
|
||||
api::{
|
||||
client::{
|
||||
@@ -45,7 +46,10 @@ use ruma::{
|
||||
use service::{
|
||||
appservice::RegistrationInfo,
|
||||
pdu::gen_event_id,
|
||||
rooms::{state::RoomMutexGuard, state_compressor::HashSetCompressStateEvent},
|
||||
rooms::{
|
||||
state::RoomMutexGuard,
|
||||
state_compressor::{CompressedState, HashSetCompressStateEvent},
|
||||
},
|
||||
Services,
|
||||
};
|
||||
|
||||
@@ -765,11 +769,12 @@ pub(crate) async fn get_member_events_route(
|
||||
.rooms
|
||||
.state_accessor
|
||||
.room_state_full(&body.room_id)
|
||||
.await?
|
||||
.iter()
|
||||
.filter(|(key, _)| key.0 == StateEventType::RoomMember)
|
||||
.map(|(_, pdu)| pdu.to_member_event())
|
||||
.collect(),
|
||||
.ready_filter_map(Result::ok)
|
||||
.ready_filter(|((ty, _), _)| *ty == StateEventType::RoomMember)
|
||||
.map(at!(1))
|
||||
.map(PduEvent::into_member_event)
|
||||
.collect()
|
||||
.await,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1167,7 +1172,7 @@ async fn join_room_by_id_helper_remote(
|
||||
}
|
||||
|
||||
info!("Compressing state from send_join");
|
||||
let compressed: HashSet<_> = services
|
||||
let compressed: CompressedState = services
|
||||
.rooms
|
||||
.state_compressor
|
||||
.compress_state_events(state.iter().map(|(ssk, eid)| (ssk, eid.borrow())))
|
||||
@@ -1215,7 +1220,7 @@ async fn join_room_by_id_helper_remote(
|
||||
.append_pdu(
|
||||
&parsed_join_pdu,
|
||||
join_event,
|
||||
vec![(*parsed_join_pdu.event_id).to_owned()],
|
||||
once(parsed_join_pdu.event_id.borrow()),
|
||||
&state_lock,
|
||||
)
|
||||
.await?;
|
||||
@@ -1707,9 +1712,6 @@ pub async fn leave_room(
|
||||
room_id: &RoomId,
|
||||
reason: Option<String>,
|
||||
) -> Result<()> {
|
||||
//use conduwuit::utils::stream::OptionStream;
|
||||
use futures::TryFutureExt;
|
||||
|
||||
// Ask a remote server if we don't have this room and are not knocking on it
|
||||
if !services
|
||||
.rooms
|
||||
@@ -2197,7 +2199,7 @@ async fn knock_room_helper_local(
|
||||
.append_pdu(
|
||||
&parsed_knock_pdu,
|
||||
knock_event,
|
||||
vec![(*parsed_knock_pdu.event_id).to_owned()],
|
||||
once(parsed_knock_pdu.event_id.borrow()),
|
||||
&state_lock,
|
||||
)
|
||||
.await?;
|
||||
@@ -2341,7 +2343,7 @@ async fn knock_room_helper_remote(
|
||||
}
|
||||
|
||||
info!("Compressing state from send_knock");
|
||||
let compressed: HashSet<_> = services
|
||||
let compressed: CompressedState = services
|
||||
.rooms
|
||||
.state_compressor
|
||||
.compress_state_events(state_map.iter().map(|(ssk, eid)| (ssk, eid.borrow())))
|
||||
@@ -2396,7 +2398,7 @@ async fn knock_room_helper_remote(
|
||||
.append_pdu(
|
||||
&parsed_knock_pdu,
|
||||
knock_event,
|
||||
vec![(*parsed_knock_pdu.event_id).to_owned()],
|
||||
once(parsed_knock_pdu.event_id.borrow()),
|
||||
&state_lock,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -6,9 +6,9 @@ use conduwuit::{
|
||||
stream::{BroadbandExt, TryIgnore, WidebandExt},
|
||||
IterStream, ReadyExt,
|
||||
},
|
||||
Event, PduCount, Result,
|
||||
Event, PduCount, PduEvent, Result,
|
||||
};
|
||||
use futures::{future::OptionFuture, pin_mut, FutureExt, StreamExt};
|
||||
use futures::{future::OptionFuture, pin_mut, FutureExt, StreamExt, TryFutureExt};
|
||||
use ruma::{
|
||||
api::{
|
||||
client::{filter::RoomEventFilter, message::get_message_events},
|
||||
@@ -220,8 +220,8 @@ async fn get_member_event(
|
||||
.rooms
|
||||
.state_accessor
|
||||
.room_state_get(room_id, &StateEventType::RoomMember, user_id.as_str())
|
||||
.map_ok(PduEvent::into_state_event)
|
||||
.await
|
||||
.map(|member_event| member_event.to_state_event())
|
||||
.ok()
|
||||
}
|
||||
|
||||
|
||||
@@ -37,7 +37,7 @@ pub(crate) async fn create_openid_token_route(
|
||||
Ok(account::request_openid_token::v3::Response {
|
||||
access_token,
|
||||
token_type: TokenType::Bearer,
|
||||
matrix_server_name: services.server.config.server_name.clone(),
|
||||
matrix_server_name: services.server.name.clone(),
|
||||
expires_in: Duration::from_secs(expires_in),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -50,7 +50,7 @@ pub(crate) async fn report_room_route(
|
||||
if !services
|
||||
.rooms
|
||||
.state_cache
|
||||
.server_in_room(&services.server.config.server_name, &body.room_id)
|
||||
.server_in_room(&services.server.name, &body.room_id)
|
||||
.await
|
||||
{
|
||||
return Err!(Request(NotFound(
|
||||
|
||||
@@ -71,7 +71,7 @@ pub(crate) async fn create_room_route(
|
||||
let room_id: OwnedRoomId = if let Some(custom_room_id) = &body.room_id {
|
||||
custom_room_id_check(&services, custom_room_id)?
|
||||
} else {
|
||||
RoomId::new(&services.server.config.server_name)
|
||||
RoomId::new(&services.server.name)
|
||||
};
|
||||
|
||||
// check if room ID doesn't already exist instead of erroring on auth check
|
||||
|
||||
@@ -2,7 +2,7 @@ use axum::extract::State;
|
||||
use conduwuit::{
|
||||
at,
|
||||
utils::{stream::TryTools, BoolExt},
|
||||
Err, Result,
|
||||
Err, PduEvent, Result,
|
||||
};
|
||||
use futures::TryStreamExt;
|
||||
use ruma::api::client::room::initial_sync::v3::{PaginationChunk, Request, Response};
|
||||
@@ -39,10 +39,9 @@ pub(crate) async fn room_initial_sync_route(
|
||||
.rooms
|
||||
.state_accessor
|
||||
.room_state_full_pdus(room_id)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|pdu| pdu.to_state_event())
|
||||
.collect();
|
||||
.map_ok(PduEvent::into_state_event)
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
||||
let messages = PaginationChunk {
|
||||
start: events.last().map(at!(0)).as_ref().map(ToString::to_string),
|
||||
|
||||
@@ -7,7 +7,7 @@ use conduwuit::{
|
||||
utils::{stream::ReadyExt, IterStream},
|
||||
Err, PduEvent, Result,
|
||||
};
|
||||
use futures::{future::OptionFuture, FutureExt, StreamExt, TryFutureExt};
|
||||
use futures::{future::OptionFuture, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
|
||||
use ruma::{
|
||||
api::client::search::search_events::{
|
||||
self,
|
||||
@@ -181,15 +181,15 @@ async fn category_room_events(
|
||||
}
|
||||
|
||||
async fn procure_room_state(services: &Services, room_id: &RoomId) -> Result<RoomState> {
|
||||
let state_map = services
|
||||
let state = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.room_state_full(room_id)
|
||||
.room_state_full_pdus(room_id)
|
||||
.map_ok(PduEvent::into_state_event)
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
||||
let state_events = state_map.values().map(PduEvent::to_state_event).collect();
|
||||
|
||||
Ok(state_events)
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
async fn check_room_visible(
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use axum::extract::State;
|
||||
use conduwuit::{err, pdu::PduBuilder, utils::BoolExt, Err, PduEvent, Result};
|
||||
use futures::TryStreamExt;
|
||||
use ruma::{
|
||||
api::client::state::{get_state_events, get_state_events_for_key, send_state_event},
|
||||
events::{
|
||||
@@ -82,11 +83,10 @@ pub(crate) async fn get_state_events_route(
|
||||
room_state: services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.room_state_full(&body.room_id)
|
||||
.await?
|
||||
.values()
|
||||
.map(PduEvent::to_state_event)
|
||||
.collect(),
|
||||
.room_state_full_pdus(&body.room_id)
|
||||
.map_ok(PduEvent::into_state_event)
|
||||
.try_collect()
|
||||
.await?,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -133,7 +133,7 @@ pub(crate) async fn get_state_events_for_key_route(
|
||||
|
||||
Ok(get_state_events_for_key::v3::Response {
|
||||
content: event_format.or(|| event.get_content_as_value()),
|
||||
event: event_format.then(|| event.to_state_event_value()),
|
||||
event: event_format.then(|| event.into_state_event_value()),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
+324
-299
@@ -1,22 +1,22 @@
|
||||
use std::{
|
||||
cmp::{self},
|
||||
collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use axum::extract::State;
|
||||
use conduwuit::{
|
||||
at, err, error, extract_variant, is_equal_to,
|
||||
pdu::EventHash,
|
||||
at, err, error, extract_variant, is_equal_to, pair_of,
|
||||
pdu::{Event, EventHash},
|
||||
ref_at,
|
||||
result::FlatOk,
|
||||
utils::{
|
||||
self,
|
||||
future::OptionExt,
|
||||
math::ruma_from_u64,
|
||||
stream::{BroadbandExt, Tools, WidebandExt},
|
||||
stream::{BroadbandExt, Tools, TryExpect, WidebandExt},
|
||||
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
|
||||
},
|
||||
Error, PduCount, PduEvent, Result,
|
||||
PduCount, PduEvent, Result,
|
||||
};
|
||||
use conduwuit_service::{
|
||||
rooms::{
|
||||
@@ -28,7 +28,7 @@ use conduwuit_service::{
|
||||
};
|
||||
use futures::{
|
||||
future::{join, join3, join4, join5, try_join, try_join4, OptionFuture},
|
||||
FutureExt, StreamExt, TryFutureExt,
|
||||
FutureExt, StreamExt, TryFutureExt, TryStreamExt,
|
||||
};
|
||||
use ruma::{
|
||||
api::client::{
|
||||
@@ -45,7 +45,7 @@ use ruma::{
|
||||
uiaa::UiaaResponse,
|
||||
},
|
||||
events::{
|
||||
presence::PresenceEvent,
|
||||
presence::{PresenceEvent, PresenceEventContent},
|
||||
room::member::{MembershipState, RoomMemberEventContent},
|
||||
AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType,
|
||||
TimelineEventType::*,
|
||||
@@ -53,6 +53,7 @@ use ruma::{
|
||||
serde::Raw,
|
||||
uint, DeviceId, EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
|
||||
};
|
||||
use service::rooms::short::{ShortEventId, ShortStateKey};
|
||||
|
||||
use super::{load_timeline, share_encrypted_room};
|
||||
use crate::{client::ignored_filter, Ruma, RumaResponse};
|
||||
@@ -62,11 +63,12 @@ struct StateChanges {
|
||||
heroes: Option<Vec<OwnedUserId>>,
|
||||
joined_member_count: Option<u64>,
|
||||
invited_member_count: Option<u64>,
|
||||
joined_since_last_sync: bool,
|
||||
state_events: Vec<PduEvent>,
|
||||
device_list_updates: HashSet<OwnedUserId>,
|
||||
left_encrypted_users: HashSet<OwnedUserId>,
|
||||
}
|
||||
|
||||
type PresenceUpdates = HashMap<OwnedUserId, PresenceEvent>;
|
||||
type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;
|
||||
|
||||
/// # `GET /_matrix/client/r0/sync`
|
||||
///
|
||||
@@ -285,20 +287,20 @@ pub(crate) async fn build_sync_events(
|
||||
|
||||
let account_data = services
|
||||
.account_data
|
||||
.changes_since(None, sender_user, since)
|
||||
.changes_since(None, sender_user, since, Some(next_batch))
|
||||
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
|
||||
.collect();
|
||||
|
||||
// Look for device list updates of this account
|
||||
let keys_changed = services
|
||||
.users
|
||||
.keys_changed(sender_user, since, None)
|
||||
.keys_changed(sender_user, since, Some(next_batch))
|
||||
.map(ToOwned::to_owned)
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
let to_device_events = services
|
||||
.users
|
||||
.get_to_device_events(sender_user, sender_device)
|
||||
.get_to_device_events(sender_user, sender_device, Some(since), Some(next_batch))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let device_one_time_keys_count = services
|
||||
@@ -325,18 +327,16 @@ pub(crate) async fn build_sync_events(
|
||||
|
||||
// If the user doesn't share an encrypted room with the target anymore, we need
|
||||
// to tell them
|
||||
let device_list_left = left_encrypted_users
|
||||
let device_list_left: HashSet<_> = left_encrypted_users
|
||||
.into_iter()
|
||||
.stream()
|
||||
.broad_filter_map(|user_id| async move {
|
||||
let no_shared_encrypted_room =
|
||||
!share_encrypted_room(services, sender_user, &user_id, None).await;
|
||||
no_shared_encrypted_room.then_some(user_id)
|
||||
})
|
||||
.ready_fold(HashSet::new(), |mut device_list_left, user_id| {
|
||||
device_list_left.insert(user_id);
|
||||
device_list_left
|
||||
share_encrypted_room(services, sender_user, &user_id, None)
|
||||
.await
|
||||
.eq(&false)
|
||||
.then_some(user_id)
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
let response = sync_events::v3::Response {
|
||||
@@ -351,9 +351,11 @@ pub(crate) async fn build_sync_events(
|
||||
next_batch: next_batch.to_string(),
|
||||
presence: Presence {
|
||||
events: presence_updates
|
||||
.unwrap_or_default()
|
||||
.into_values()
|
||||
.map(|v| Raw::new(&v).expect("PresenceEvent always serializes successfully"))
|
||||
.into_iter()
|
||||
.flat_map(IntoIterator::into_iter)
|
||||
.map(|(sender, content)| PresenceEvent { content, sender })
|
||||
.map(|ref event| Raw::new(event))
|
||||
.filter_map(Result::ok)
|
||||
.collect(),
|
||||
},
|
||||
rooms: Rooms {
|
||||
@@ -390,45 +392,8 @@ async fn process_presence_updates(
|
||||
.map_ok(move |event| (user_id, event))
|
||||
.ok()
|
||||
})
|
||||
.ready_fold(PresenceUpdates::new(), |mut updates, (user_id, event)| {
|
||||
match updates.entry(user_id.into()) {
|
||||
| Entry::Vacant(slot) => {
|
||||
let mut new_event = event;
|
||||
new_event.content.last_active_ago = match new_event.content.currently_active {
|
||||
| Some(true) => None,
|
||||
| _ => new_event.content.last_active_ago,
|
||||
};
|
||||
|
||||
slot.insert(new_event);
|
||||
},
|
||||
| Entry::Occupied(mut slot) => {
|
||||
let curr_event = slot.get_mut();
|
||||
let curr_content = &mut curr_event.content;
|
||||
let new_content = event.content;
|
||||
|
||||
// Update existing presence event with more info
|
||||
curr_content.presence = new_content.presence;
|
||||
curr_content.status_msg = new_content
|
||||
.status_msg
|
||||
.or_else(|| curr_content.status_msg.take());
|
||||
curr_content.displayname = new_content
|
||||
.displayname
|
||||
.or_else(|| curr_content.displayname.take());
|
||||
curr_content.avatar_url = new_content
|
||||
.avatar_url
|
||||
.or_else(|| curr_content.avatar_url.take());
|
||||
curr_content.currently_active = new_content
|
||||
.currently_active
|
||||
.or(curr_content.currently_active);
|
||||
curr_content.last_active_ago = match curr_content.currently_active {
|
||||
| Some(true) => None,
|
||||
| _ => new_content.last_active_ago.or(curr_content.last_active_ago),
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
updates
|
||||
})
|
||||
.map(|(user_id, event)| (user_id.to_owned(), event.content))
|
||||
.collect()
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -503,16 +468,20 @@ async fn handle_left_room(
|
||||
|
||||
let mut left_state_events = Vec::new();
|
||||
|
||||
let since_shortstatehash = services
|
||||
.rooms
|
||||
.user
|
||||
.get_token_shortstatehash(room_id, since)
|
||||
.await;
|
||||
let since_shortstatehash = services.rooms.user.get_token_shortstatehash(room_id, since);
|
||||
|
||||
let since_state_ids = match since_shortstatehash {
|
||||
| Ok(s) => services.rooms.state_accessor.state_full_ids(s).await?,
|
||||
| Err(_) => HashMap::new(),
|
||||
};
|
||||
let since_state_ids: HashMap<_, OwnedEventId> = since_shortstatehash
|
||||
.map_ok(|since_shortstatehash| {
|
||||
services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_ids(since_shortstatehash)
|
||||
.map(Ok)
|
||||
})
|
||||
.try_flatten_stream()
|
||||
.try_collect()
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
let Ok(left_event_id): Result<OwnedEventId> = services
|
||||
.rooms
|
||||
@@ -534,11 +503,12 @@ async fn handle_left_room(
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let mut left_state_ids = services
|
||||
let mut left_state_ids: HashMap<_, _> = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_ids(left_shortstatehash)
|
||||
.await?;
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
let leave_shortstatekey = services
|
||||
.rooms
|
||||
@@ -652,6 +622,40 @@ async fn load_joined_room(
|
||||
.await?;
|
||||
|
||||
let (timeline_pdus, limited) = timeline;
|
||||
let initial = since_shortstatehash.is_none();
|
||||
let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled()
|
||||
|| filter.room.timeline.lazy_load_options.is_enabled();
|
||||
|
||||
let lazy_loading_context = &lazy_loading::Context {
|
||||
user_id: sender_user,
|
||||
device_id: sender_device,
|
||||
room_id,
|
||||
token: Some(since),
|
||||
options: Some(&filter.room.state.lazy_load_options),
|
||||
};
|
||||
|
||||
// Reset lazy loading because this is an initial sync
|
||||
let lazy_load_reset: OptionFuture<_> = initial
|
||||
.then(|| services.rooms.lazy_loading.reset(lazy_loading_context))
|
||||
.into();
|
||||
|
||||
lazy_load_reset.await;
|
||||
let witness: OptionFuture<_> = lazy_loading_enabled
|
||||
.then(|| {
|
||||
let witness: Witness = timeline_pdus
|
||||
.iter()
|
||||
.map(ref_at!(1))
|
||||
.map(Event::sender)
|
||||
.map(Into::into)
|
||||
.chain(receipt_events.keys().map(Into::into))
|
||||
.collect();
|
||||
|
||||
services
|
||||
.rooms
|
||||
.lazy_loading
|
||||
.witness_retain(witness, lazy_loading_context)
|
||||
})
|
||||
.into();
|
||||
|
||||
let last_notification_read: OptionFuture<_> = timeline_pdus
|
||||
.is_empty()
|
||||
@@ -663,10 +667,6 @@ async fn load_joined_room(
|
||||
})
|
||||
.into();
|
||||
|
||||
let no_state_changes = timeline_pdus.is_empty()
|
||||
&& (since_shortstatehash.is_none()
|
||||
|| since_shortstatehash.is_some_and(is_equal_to!(current_shortstatehash)));
|
||||
|
||||
let since_sender_member: OptionFuture<_> = since_shortstatehash
|
||||
.map(|short| {
|
||||
services
|
||||
@@ -677,125 +677,85 @@ async fn load_joined_room(
|
||||
})
|
||||
.into();
|
||||
|
||||
let (last_notification_read, since_sender_member, witness) =
|
||||
join3(last_notification_read, since_sender_member, witness).await;
|
||||
|
||||
let joined_since_last_sync =
|
||||
since_sender_member
|
||||
.await
|
||||
.flatten()
|
||||
.is_none_or(|content: RoomMemberEventContent| {
|
||||
content.membership != MembershipState::Join
|
||||
});
|
||||
|
||||
let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled()
|
||||
|| filter.room.timeline.lazy_load_options.is_enabled();
|
||||
|
||||
let generate_witness =
|
||||
lazy_loading_enabled && (since_shortstatehash.is_none() || joined_since_last_sync);
|
||||
|
||||
let lazy_reset = lazy_loading_enabled && since_shortstatehash.is_none();
|
||||
|
||||
let lazy_loading_context = &lazy_loading::Context {
|
||||
user_id: sender_user,
|
||||
device_id: sender_device,
|
||||
room_id,
|
||||
token: None,
|
||||
options: Some(&filter.room.state.lazy_load_options),
|
||||
};
|
||||
|
||||
// Reset lazy loading because this is an initial sync
|
||||
let lazy_load_reset: OptionFuture<_> = lazy_reset
|
||||
.then(|| services.rooms.lazy_loading.reset(lazy_loading_context))
|
||||
.into();
|
||||
|
||||
lazy_load_reset.await;
|
||||
let witness: Option<Witness> = generate_witness.then(|| {
|
||||
timeline_pdus
|
||||
.iter()
|
||||
.map(|(_, pdu)| pdu.sender.clone())
|
||||
.chain(receipt_events.keys().cloned())
|
||||
.collect()
|
||||
});
|
||||
|
||||
let witness: OptionFuture<_> = witness
|
||||
.map(|witness| {
|
||||
services
|
||||
.rooms
|
||||
.lazy_loading
|
||||
.witness_retain(witness, lazy_loading_context)
|
||||
})
|
||||
.into();
|
||||
|
||||
let witness = witness.await;
|
||||
let mut device_list_updates = HashSet::<OwnedUserId>::new();
|
||||
let mut left_encrypted_users = HashSet::<OwnedUserId>::new();
|
||||
let StateChanges {
|
||||
heroes,
|
||||
joined_member_count,
|
||||
invited_member_count,
|
||||
mut state_events,
|
||||
mut device_list_updates,
|
||||
left_encrypted_users,
|
||||
} = calculate_state_changes(
|
||||
services,
|
||||
sender_user,
|
||||
room_id,
|
||||
full_state,
|
||||
filter,
|
||||
since_shortstatehash,
|
||||
current_shortstatehash,
|
||||
joined_since_last_sync,
|
||||
state_events,
|
||||
} = if no_state_changes {
|
||||
StateChanges::default()
|
||||
} else {
|
||||
calculate_state_changes(
|
||||
services,
|
||||
sender_user,
|
||||
room_id,
|
||||
full_state,
|
||||
filter,
|
||||
&mut device_list_updates,
|
||||
&mut left_encrypted_users,
|
||||
since_shortstatehash,
|
||||
current_shortstatehash,
|
||||
joined_since_last_sync,
|
||||
witness.as_ref(),
|
||||
)
|
||||
.boxed()
|
||||
.await?
|
||||
witness.as_ref(),
|
||||
)
|
||||
.boxed()
|
||||
.await?;
|
||||
|
||||
let is_sender_membership = |pdu: &PduEvent| {
|
||||
pdu.kind == StateEventType::RoomMember.into()
|
||||
&& pdu
|
||||
.state_key
|
||||
.as_deref()
|
||||
.is_some_and(is_equal_to!(sender_user.as_str()))
|
||||
};
|
||||
|
||||
let joined_sender_member: Option<_> = (joined_since_last_sync && timeline_pdus.is_empty())
|
||||
.then(|| {
|
||||
state_events
|
||||
.iter()
|
||||
.position(is_sender_membership)
|
||||
.map(|pos| state_events.swap_remove(pos))
|
||||
})
|
||||
.flatten();
|
||||
|
||||
let prev_batch = timeline_pdus.first().map(at!(0)).or_else(|| {
|
||||
joined_sender_member
|
||||
.is_some()
|
||||
.then_some(since)
|
||||
.map(Into::into)
|
||||
});
|
||||
|
||||
let room_events = timeline_pdus
|
||||
.into_iter()
|
||||
.stream()
|
||||
.wide_filter_map(|item| ignored_filter(services, item, sender_user))
|
||||
.map(at!(1))
|
||||
.chain(joined_sender_member.into_iter().stream())
|
||||
.map(|pdu| pdu.to_sync_room_event())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let account_data_events = services
|
||||
.account_data
|
||||
.changes_since(Some(room_id), sender_user, since)
|
||||
.changes_since(Some(room_id), sender_user, since, Some(next_batch))
|
||||
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
|
||||
.collect();
|
||||
|
||||
// Look for device list updates in this room
|
||||
let device_updates = services
|
||||
.users
|
||||
.room_keys_changed(room_id, since, None)
|
||||
.room_keys_changed(room_id, since, Some(next_batch))
|
||||
.map(|(user_id, _)| user_id)
|
||||
.map(ToOwned::to_owned)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let room_events = timeline_pdus
|
||||
.iter()
|
||||
.stream()
|
||||
.wide_filter_map(|item| ignored_filter(services, item.clone(), sender_user))
|
||||
.map(|(_, pdu)| pdu.to_sync_room_event())
|
||||
.collect();
|
||||
|
||||
let typing_events = services
|
||||
.rooms
|
||||
.typing
|
||||
.last_typing_update(room_id)
|
||||
.and_then(|count| async move {
|
||||
if count <= since {
|
||||
return Ok(Vec::<Raw<AnySyncEphemeralRoomEvent>>::new());
|
||||
}
|
||||
|
||||
let typings = services
|
||||
.rooms
|
||||
.typing
|
||||
.typings_all(room_id, sender_user)
|
||||
.await?;
|
||||
|
||||
Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?])
|
||||
})
|
||||
.unwrap_or(Vec::new());
|
||||
|
||||
let send_notification_counts = last_notification_read
|
||||
.is_none_or(|&count| count > since)
|
||||
.await;
|
||||
let send_notification_counts = last_notification_read.is_none_or(|count| count > since);
|
||||
|
||||
let notification_count: OptionFuture<_> = send_notification_counts
|
||||
.then(|| {
|
||||
@@ -819,8 +779,27 @@ async fn load_joined_room(
|
||||
})
|
||||
.into();
|
||||
|
||||
let events = join3(room_events, account_data_events, typing_events);
|
||||
let typing_events = services
|
||||
.rooms
|
||||
.typing
|
||||
.last_typing_update(room_id)
|
||||
.and_then(|count| async move {
|
||||
if count <= since {
|
||||
return Ok(Vec::<Raw<AnySyncEphemeralRoomEvent>>::new());
|
||||
}
|
||||
|
||||
let typings = services
|
||||
.rooms
|
||||
.typing
|
||||
.typings_all(room_id, sender_user)
|
||||
.await?;
|
||||
|
||||
Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?])
|
||||
})
|
||||
.unwrap_or(Vec::new());
|
||||
|
||||
let unread_notifications = join(notification_count, highlight_count);
|
||||
let events = join3(room_events, account_data_events, typing_events);
|
||||
let (unread_notifications, events, device_updates) =
|
||||
join3(unread_notifications, events, device_updates)
|
||||
.boxed()
|
||||
@@ -877,12 +856,8 @@ async fn load_joined_room(
|
||||
unread_notifications: UnreadNotificationsCount { highlight_count, notification_count },
|
||||
timeline: Timeline {
|
||||
limited: limited || joined_since_last_sync,
|
||||
prev_batch: prev_batch.as_ref().map(ToString::to_string),
|
||||
events: room_events,
|
||||
prev_batch: timeline_pdus
|
||||
.first()
|
||||
.map(at!(0))
|
||||
.as_ref()
|
||||
.map(ToString::to_string),
|
||||
},
|
||||
state: RoomState {
|
||||
events: state_events
|
||||
@@ -914,14 +889,12 @@ async fn calculate_state_changes(
|
||||
room_id: &RoomId,
|
||||
full_state: bool,
|
||||
filter: &FilterDefinition,
|
||||
device_list_updates: &mut HashSet<OwnedUserId>,
|
||||
left_encrypted_users: &mut HashSet<OwnedUserId>,
|
||||
since_shortstatehash: Option<ShortStateHash>,
|
||||
current_shortstatehash: ShortStateHash,
|
||||
joined_since_last_sync: bool,
|
||||
witness: Option<&Witness>,
|
||||
) -> Result<StateChanges> {
|
||||
if since_shortstatehash.is_none() || joined_since_last_sync {
|
||||
if since_shortstatehash.is_none() {
|
||||
calculate_state_initial(
|
||||
services,
|
||||
sender_user,
|
||||
@@ -939,11 +912,10 @@ async fn calculate_state_changes(
|
||||
room_id,
|
||||
full_state,
|
||||
filter,
|
||||
device_list_updates,
|
||||
left_encrypted_users,
|
||||
since_shortstatehash,
|
||||
current_shortstatehash,
|
||||
joined_since_last_sync,
|
||||
witness,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -956,39 +928,32 @@ async fn calculate_state_initial(
|
||||
sender_user: &UserId,
|
||||
room_id: &RoomId,
|
||||
full_state: bool,
|
||||
filter: &FilterDefinition,
|
||||
_filter: &FilterDefinition,
|
||||
current_shortstatehash: ShortStateHash,
|
||||
witness: Option<&Witness>,
|
||||
) -> Result<StateChanges> {
|
||||
let state_events = services
|
||||
let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_ids(current_shortstatehash)
|
||||
.await?;
|
||||
|
||||
let shortstatekeys = state_events.keys().copied().stream();
|
||||
.unzip()
|
||||
.await;
|
||||
|
||||
let state_events = services
|
||||
.rooms
|
||||
.short
|
||||
.multi_get_statekey_from_short(shortstatekeys)
|
||||
.zip(state_events.values().cloned().stream())
|
||||
.multi_get_statekey_from_short(shortstatekeys.into_iter().stream())
|
||||
.zip(event_ids.into_iter().stream())
|
||||
.ready_filter_map(|item| Some((item.0.ok()?, item.1)))
|
||||
.ready_filter_map(|((event_type, state_key), event_id)| {
|
||||
let lazy_load_enabled = filter.room.state.lazy_load_options.is_enabled()
|
||||
|| filter.room.timeline.lazy_load_options.is_enabled();
|
||||
|
||||
if lazy_load_enabled
|
||||
let lazy = !full_state
|
||||
&& event_type == StateEventType::RoomMember
|
||||
&& !full_state
|
||||
&& state_key.as_str().try_into().is_ok_and(|user_id: &UserId| {
|
||||
sender_user != user_id
|
||||
&& witness.is_some_and(|witness| !witness.contains(user_id))
|
||||
}) {
|
||||
return None;
|
||||
}
|
||||
});
|
||||
|
||||
Some(event_id)
|
||||
lazy.or_some(event_id)
|
||||
})
|
||||
.broad_filter_map(|event_id: OwnedEventId| async move {
|
||||
services.rooms.timeline.get_pdu(&event_id).await.ok()
|
||||
@@ -1007,128 +972,170 @@ async fn calculate_state_initial(
|
||||
heroes,
|
||||
joined_member_count,
|
||||
invited_member_count,
|
||||
joined_since_last_sync: true,
|
||||
state_events,
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "incremental", level = "trace", skip_all)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn calculate_state_incremental(
|
||||
async fn calculate_state_incremental<'a>(
|
||||
services: &Services,
|
||||
sender_user: &UserId,
|
||||
sender_user: &'a UserId,
|
||||
room_id: &RoomId,
|
||||
full_state: bool,
|
||||
_filter: &FilterDefinition,
|
||||
device_list_updates: &mut HashSet<OwnedUserId>,
|
||||
left_encrypted_users: &mut HashSet<OwnedUserId>,
|
||||
since_shortstatehash: Option<ShortStateHash>,
|
||||
current_shortstatehash: ShortStateHash,
|
||||
joined_since_last_sync: bool,
|
||||
witness: Option<&'a Witness>,
|
||||
) -> Result<StateChanges> {
|
||||
// Incremental /sync
|
||||
let since_shortstatehash =
|
||||
since_shortstatehash.expect("missing since_shortstatehash on incremental sync");
|
||||
let since_shortstatehash = since_shortstatehash.unwrap_or(current_shortstatehash);
|
||||
|
||||
let mut delta_state_events = Vec::new();
|
||||
|
||||
if since_shortstatehash != current_shortstatehash {
|
||||
let current_state_ids = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_ids(current_shortstatehash);
|
||||
|
||||
let since_state_ids = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_ids(since_shortstatehash);
|
||||
|
||||
let (current_state_ids, since_state_ids): (
|
||||
HashMap<_, OwnedEventId>,
|
||||
HashMap<_, OwnedEventId>,
|
||||
) = try_join(current_state_ids, since_state_ids).await?;
|
||||
|
||||
current_state_ids
|
||||
.iter()
|
||||
.stream()
|
||||
.ready_filter(|(key, id)| full_state || since_state_ids.get(key) != Some(id))
|
||||
.wide_filter_map(|(_, id)| services.rooms.timeline.get_pdu(id).ok())
|
||||
.ready_for_each(|pdu| delta_state_events.push(pdu))
|
||||
.await;
|
||||
}
|
||||
let state_changed = since_shortstatehash != current_shortstatehash;
|
||||
|
||||
let encrypted_room = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
|
||||
.is_ok();
|
||||
.is_ok()
|
||||
.await;
|
||||
|
||||
let since_encryption = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_get(since_shortstatehash, &StateEventType::RoomEncryption, "")
|
||||
.is_ok();
|
||||
|
||||
let (encrypted_room, since_encryption) = join(encrypted_room, since_encryption).await;
|
||||
|
||||
// Calculations:
|
||||
let new_encrypted_room = encrypted_room && !since_encryption;
|
||||
|
||||
let send_member_count = delta_state_events
|
||||
.iter()
|
||||
.any(|event| event.kind == RoomMember);
|
||||
|
||||
if encrypted_room {
|
||||
for state_event in &delta_state_events {
|
||||
if state_event.kind != RoomMember {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(state_key) = &state_event.state_key {
|
||||
let user_id = UserId::parse(state_key)
|
||||
.map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
|
||||
|
||||
if user_id == sender_user {
|
||||
continue;
|
||||
}
|
||||
|
||||
let content: RoomMemberEventContent = state_event.get_content()?;
|
||||
|
||||
match content.membership {
|
||||
| MembershipState::Join => {
|
||||
// A new user joined an encrypted room
|
||||
if !share_encrypted_room(services, sender_user, user_id, Some(room_id))
|
||||
.await
|
||||
{
|
||||
device_list_updates.insert(user_id.into());
|
||||
}
|
||||
},
|
||||
| MembershipState::Leave => {
|
||||
// Write down users that have left encrypted rooms we are in
|
||||
left_encrypted_users.insert(user_id.into());
|
||||
},
|
||||
| _ => {},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if joined_since_last_sync && encrypted_room || new_encrypted_room {
|
||||
let updates: Vec<OwnedUserId> = services
|
||||
let state_get_shorteventid = |user_id: &'a UserId| {
|
||||
services
|
||||
.rooms
|
||||
.state_cache
|
||||
.room_members(room_id)
|
||||
.ready_filter(|user_id| sender_user != *user_id)
|
||||
.filter_map(|user_id| {
|
||||
share_encrypted_room(services, sender_user, user_id, Some(room_id))
|
||||
.map(|res| res.or_some(user_id.to_owned()))
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
.state_accessor
|
||||
.state_get_shortid(
|
||||
current_shortstatehash,
|
||||
&StateEventType::RoomMember,
|
||||
user_id.as_str(),
|
||||
)
|
||||
.ok()
|
||||
};
|
||||
|
||||
// If the user is in a new encrypted room, give them all joined users
|
||||
device_list_updates.extend(updates);
|
||||
}
|
||||
let lazy_state_ids: OptionFuture<_> = witness
|
||||
.filter(|_| !full_state && !encrypted_room)
|
||||
.map(|witness| {
|
||||
witness
|
||||
.iter()
|
||||
.stream()
|
||||
.broad_filter_map(|user_id| state_get_shorteventid(user_id))
|
||||
.into_future()
|
||||
})
|
||||
.into();
|
||||
|
||||
let state_diff: OptionFuture<_> = (!full_state && state_changed)
|
||||
.then(|| {
|
||||
services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_added((since_shortstatehash, current_shortstatehash))
|
||||
.boxed()
|
||||
.into_future()
|
||||
})
|
||||
.into();
|
||||
|
||||
let current_state_ids: OptionFuture<_> = full_state
|
||||
.then(|| {
|
||||
services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_shortids(current_shortstatehash)
|
||||
.expect_ok()
|
||||
.boxed()
|
||||
.into_future()
|
||||
})
|
||||
.into();
|
||||
|
||||
let lazy_state_ids = lazy_state_ids
|
||||
.map(|opt| {
|
||||
opt.map(|(curr, next)| {
|
||||
let opt = curr;
|
||||
let iter = Option::into_iter(opt);
|
||||
IterStream::stream(iter).chain(next)
|
||||
})
|
||||
})
|
||||
.map(Option::into_iter)
|
||||
.map(IterStream::stream)
|
||||
.flatten_stream()
|
||||
.flatten();
|
||||
|
||||
let state_diff_ids = state_diff
|
||||
.map(|opt| {
|
||||
opt.map(|(curr, next)| {
|
||||
let opt = curr;
|
||||
let iter = Option::into_iter(opt);
|
||||
IterStream::stream(iter).chain(next)
|
||||
})
|
||||
})
|
||||
.map(Option::into_iter)
|
||||
.map(IterStream::stream)
|
||||
.flatten_stream()
|
||||
.flatten();
|
||||
|
||||
let state_events = current_state_ids
|
||||
.map(|opt| {
|
||||
opt.map(|(curr, next)| {
|
||||
let opt = curr;
|
||||
let iter = Option::into_iter(opt);
|
||||
IterStream::stream(iter).chain(next)
|
||||
})
|
||||
})
|
||||
.map(Option::into_iter)
|
||||
.map(IterStream::stream)
|
||||
.flatten_stream()
|
||||
.flatten()
|
||||
.chain(state_diff_ids)
|
||||
.broad_filter_map(|(shortstatekey, shorteventid)| async move {
|
||||
if witness.is_none() || encrypted_room {
|
||||
return Some(shorteventid);
|
||||
}
|
||||
|
||||
lazy_filter(services, sender_user, shortstatekey, shorteventid).await
|
||||
})
|
||||
.chain(lazy_state_ids)
|
||||
.broad_filter_map(|shorteventid| {
|
||||
services
|
||||
.rooms
|
||||
.short
|
||||
.get_eventid_from_short(shorteventid)
|
||||
.ok()
|
||||
})
|
||||
.broad_filter_map(|event_id: OwnedEventId| async move {
|
||||
services.rooms.timeline.get_pdu(&event_id).await.ok()
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
let (device_list_updates, left_encrypted_users) = state_events
|
||||
.iter()
|
||||
.stream()
|
||||
.ready_filter(|_| encrypted_room)
|
||||
.ready_filter(|state_event| state_event.kind == RoomMember)
|
||||
.ready_filter_map(|state_event| {
|
||||
let content: RoomMemberEventContent = state_event.get_content().ok()?;
|
||||
let user_id: OwnedUserId = state_event.state_key.as_ref()?.parse().ok()?;
|
||||
|
||||
Some((content, user_id))
|
||||
})
|
||||
.fold_default(|(mut dlu, mut leu): pair_of!(HashSet<_>), (content, user_id)| async move {
|
||||
use MembershipState::*;
|
||||
|
||||
let shares_encrypted_room =
|
||||
|user_id| share_encrypted_room(services, sender_user, user_id, Some(room_id));
|
||||
|
||||
match content.membership {
|
||||
| Leave => leu.insert(user_id),
|
||||
| Join if joined_since_last_sync || !shares_encrypted_room(&user_id).await =>
|
||||
dlu.insert(user_id),
|
||||
| _ => false,
|
||||
};
|
||||
|
||||
(dlu, leu)
|
||||
})
|
||||
.await;
|
||||
|
||||
let send_member_count = state_events.iter().any(|event| event.kind == RoomMember);
|
||||
|
||||
let (joined_member_count, invited_member_count, heroes) = if send_member_count {
|
||||
calculate_counts(services, room_id, sender_user).await?
|
||||
@@ -1140,11 +1147,29 @@ async fn calculate_state_incremental(
|
||||
heroes,
|
||||
joined_member_count,
|
||||
invited_member_count,
|
||||
joined_since_last_sync,
|
||||
state_events: delta_state_events,
|
||||
state_events,
|
||||
device_list_updates,
|
||||
left_encrypted_users,
|
||||
})
|
||||
}
|
||||
|
||||
async fn lazy_filter(
|
||||
services: &Services,
|
||||
sender_user: &UserId,
|
||||
shortstatekey: ShortStateKey,
|
||||
shorteventid: ShortEventId,
|
||||
) -> Option<ShortEventId> {
|
||||
let (event_type, state_key) = services
|
||||
.rooms
|
||||
.short
|
||||
.get_statekey_from_short(shortstatekey)
|
||||
.await
|
||||
.ok()?;
|
||||
|
||||
(event_type != StateEventType::RoomMember || state_key == sender_user.as_str())
|
||||
.then_some(shorteventid)
|
||||
}
|
||||
|
||||
async fn calculate_counts(
|
||||
services: &Services,
|
||||
room_id: &RoomId,
|
||||
|
||||
@@ -153,7 +153,7 @@ pub(crate) async fn sync_events_v4_route(
|
||||
if body.extensions.account_data.enabled.unwrap_or(false) {
|
||||
account_data.global = services
|
||||
.account_data
|
||||
.changes_since(None, sender_user, globalsince)
|
||||
.changes_since(None, sender_user, globalsince, Some(next_batch))
|
||||
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
|
||||
.collect()
|
||||
.await;
|
||||
@@ -164,7 +164,7 @@ pub(crate) async fn sync_events_v4_route(
|
||||
room.clone(),
|
||||
services
|
||||
.account_data
|
||||
.changes_since(Some(&room), sender_user, globalsince)
|
||||
.changes_since(Some(&room), sender_user, globalsince, Some(next_batch))
|
||||
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
|
||||
.collect()
|
||||
.await,
|
||||
@@ -241,13 +241,15 @@ pub(crate) async fn sync_events_v4_route(
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_ids(current_shortstatehash)
|
||||
.await?;
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
let since_state_ids = services
|
||||
let since_state_ids: HashMap<_, _> = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_ids(since_shortstatehash)
|
||||
.await?;
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
for (key, id) in current_state_ids {
|
||||
if since_state_ids.get(&key) != Some(&id) {
|
||||
@@ -529,7 +531,7 @@ pub(crate) async fn sync_events_v4_route(
|
||||
room_id.to_owned(),
|
||||
services
|
||||
.account_data
|
||||
.changes_since(Some(room_id), sender_user, *roomsince)
|
||||
.changes_since(Some(room_id), sender_user, *roomsince, Some(next_batch))
|
||||
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
|
||||
.collect()
|
||||
.await,
|
||||
@@ -777,7 +779,12 @@ pub(crate) async fn sync_events_v4_route(
|
||||
Some(sync_events::v4::ToDevice {
|
||||
events: services
|
||||
.users
|
||||
.get_to_device_events(sender_user, &sender_device)
|
||||
.get_to_device_events(
|
||||
sender_user,
|
||||
&sender_device,
|
||||
Some(globalsince),
|
||||
Some(next_batch),
|
||||
)
|
||||
.collect()
|
||||
.await,
|
||||
next_batch: next_batch.to_string(),
|
||||
|
||||
@@ -390,7 +390,7 @@ async fn process_rooms(
|
||||
room_id.to_owned(),
|
||||
services
|
||||
.account_data
|
||||
.changes_since(Some(room_id), sender_user, *roomsince)
|
||||
.changes_since(Some(room_id), sender_user, *roomsince, Some(next_batch))
|
||||
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
|
||||
.collect()
|
||||
.await,
|
||||
@@ -644,7 +644,7 @@ async fn collect_account_data(
|
||||
|
||||
account_data.global = services
|
||||
.account_data
|
||||
.changes_since(None, sender_user, globalsince)
|
||||
.changes_since(None, sender_user, globalsince, None)
|
||||
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
|
||||
.collect()
|
||||
.await;
|
||||
@@ -655,7 +655,7 @@ async fn collect_account_data(
|
||||
room.clone(),
|
||||
services
|
||||
.account_data
|
||||
.changes_since(Some(room), sender_user, globalsince)
|
||||
.changes_since(Some(room), sender_user, globalsince, None)
|
||||
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
|
||||
.collect()
|
||||
.await,
|
||||
@@ -748,13 +748,15 @@ async fn collect_e2ee<'a>(
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_ids(current_shortstatehash)
|
||||
.await?;
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
let since_state_ids = services
|
||||
let since_state_ids: HashMap<_, _> = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_ids(since_shortstatehash)
|
||||
.await?;
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
for (key, id) in current_state_ids {
|
||||
if since_state_ids.get(&key) != Some(&id) {
|
||||
@@ -874,7 +876,7 @@ async fn collect_to_device(
|
||||
next_batch: next_batch.to_string(),
|
||||
events: services
|
||||
.users
|
||||
.get_to_device_events(sender_user, sender_device)
|
||||
.get_to_device_events(sender_user, sender_device, None, Some(next_batch))
|
||||
.collect()
|
||||
.await,
|
||||
})
|
||||
|
||||
+16
-12
@@ -10,6 +10,7 @@ use ruma::{
|
||||
},
|
||||
to_device::DeviceIdOrAllDevices,
|
||||
};
|
||||
use service::sending::EduBuf;
|
||||
|
||||
use crate::Ruma;
|
||||
|
||||
@@ -42,18 +43,21 @@ pub(crate) async fn send_event_to_device_route(
|
||||
messages.insert(target_user_id.clone(), map);
|
||||
let count = services.globals.next_count()?;
|
||||
|
||||
services.sending.send_edu_server(
|
||||
target_user_id.server_name(),
|
||||
serde_json::to_vec(&federation::transactions::edu::Edu::DirectToDevice(
|
||||
DirectDeviceContent {
|
||||
sender: sender_user.clone(),
|
||||
ev_type: body.event_type.clone(),
|
||||
message_id: count.to_string().into(),
|
||||
messages,
|
||||
},
|
||||
))
|
||||
.expect("DirectToDevice EDU can be serialized"),
|
||||
)?;
|
||||
let mut buf = EduBuf::new();
|
||||
serde_json::to_writer(
|
||||
&mut buf,
|
||||
&federation::transactions::edu::Edu::DirectToDevice(DirectDeviceContent {
|
||||
sender: sender_user.clone(),
|
||||
ev_type: body.event_type.clone(),
|
||||
message_id: count.to_string().into(),
|
||||
messages,
|
||||
}),
|
||||
)
|
||||
.expect("DirectToDevice EDU can be serialized");
|
||||
|
||||
services
|
||||
.sending
|
||||
.send_edu_server(target_user_id.server_name(), buf)?;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ pub(crate) async fn turn_server_route(
|
||||
let user = body.sender_user.unwrap_or_else(|| {
|
||||
UserId::parse_with_server_name(
|
||||
utils::random_string(RANDOM_USER_ID_LENGTH).to_lowercase(),
|
||||
&services.server.config.server_name,
|
||||
&services.server.name,
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::{borrow::Borrow, iter::once};
|
||||
|
||||
use axum::extract::State;
|
||||
use conduwuit::{Error, Result};
|
||||
use conduwuit::{utils::stream::ReadyExt, Error, Result};
|
||||
use futures::StreamExt;
|
||||
use ruma::{
|
||||
api::{client::error::ErrorKind, federation::authorization::get_event_authorization},
|
||||
@@ -48,7 +48,7 @@ pub(crate) async fn get_event_authorization_route(
|
||||
.rooms
|
||||
.auth_chain
|
||||
.event_ids_iter(room_id, once(body.event_id.borrow()))
|
||||
.await?
|
||||
.ready_filter_map(Result::ok)
|
||||
.filter_map(|id| async move { services.rooms.timeline.get_pdu_json(&id).await.ok() })
|
||||
.then(|pdu| services.sending.convert_to_outgoing_federation_event(pdu))
|
||||
.collect()
|
||||
|
||||
@@ -13,7 +13,7 @@ use crate::{Error, Result, Ruma};
|
||||
/// # `POST /_matrix/federation/v1/publicRooms`
|
||||
///
|
||||
/// Lists the public rooms on this server.
|
||||
#[tracing::instrument(skip_all, fields(%client), name = "publicrooms")]
|
||||
#[tracing::instrument(name = "publicrooms", level = "debug", skip_all, fields(%client))]
|
||||
pub(crate) async fn get_public_rooms_filtered_route(
|
||||
State(services): State<crate::State>,
|
||||
InsecureClientIp(client): InsecureClientIp,
|
||||
@@ -51,7 +51,7 @@ pub(crate) async fn get_public_rooms_filtered_route(
|
||||
/// # `GET /_matrix/federation/v1/publicRooms`
|
||||
///
|
||||
/// Lists the public rooms on this server.
|
||||
#[tracing::instrument(skip_all, fields(%client), "publicrooms")]
|
||||
#[tracing::instrument(name = "publicrooms", level = "debug", skip_all, fields(%client))]
|
||||
pub(crate) async fn get_public_rooms_route(
|
||||
State(services): State<crate::State>,
|
||||
InsecureClientIp(client): InsecureClientIp,
|
||||
|
||||
+363
-261
@@ -3,17 +3,27 @@ use std::{collections::BTreeMap, net::IpAddr, time::Instant};
|
||||
use axum::extract::State;
|
||||
use axum_client_ip::InsecureClientIp;
|
||||
use conduwuit::{
|
||||
debug, debug_warn, err, error, result::LogErr, trace, utils::ReadyExt, warn, Err, Error,
|
||||
Result,
|
||||
debug,
|
||||
debug::INFO_SPAN_LEVEL,
|
||||
debug_warn, err, error,
|
||||
result::LogErr,
|
||||
trace,
|
||||
utils::{
|
||||
stream::{automatic_width, BroadbandExt, TryBroadbandExt},
|
||||
IterStream, ReadyExt,
|
||||
},
|
||||
warn, Err, Error, Result,
|
||||
};
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
|
||||
use itertools::Itertools;
|
||||
use ruma::{
|
||||
api::{
|
||||
client::error::ErrorKind,
|
||||
federation::transactions::{
|
||||
edu::{
|
||||
DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent,
|
||||
ReceiptContent, SigningKeyUpdateContent, TypingContent,
|
||||
PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap, SigningKeyUpdateContent,
|
||||
TypingContent,
|
||||
},
|
||||
send_transaction_message,
|
||||
},
|
||||
@@ -21,27 +31,28 @@ use ruma::{
|
||||
events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
|
||||
serde::Raw,
|
||||
to_device::DeviceIdOrAllDevices,
|
||||
OwnedEventId, ServerName,
|
||||
CanonicalJsonObject, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, ServerName, UserId,
|
||||
};
|
||||
use serde_json::value::RawValue as RawJsonValue;
|
||||
use service::{
|
||||
sending::{EDU_LIMIT, PDU_LIMIT},
|
||||
Services,
|
||||
};
|
||||
use utils::millis_since_unix_epoch;
|
||||
|
||||
use crate::{
|
||||
utils::{self},
|
||||
Ruma,
|
||||
};
|
||||
|
||||
type ResolvedMap = BTreeMap<OwnedEventId, Result<()>>;
|
||||
type ResolvedMap = BTreeMap<OwnedEventId, Result>;
|
||||
type Pdu = (OwnedRoomId, OwnedEventId, CanonicalJsonObject);
|
||||
|
||||
/// # `PUT /_matrix/federation/v1/send/{txnId}`
|
||||
///
|
||||
/// Push EDUs and PDUs to this server.
|
||||
#[tracing::instrument(
|
||||
name = "send",
|
||||
level = "debug",
|
||||
name = "txn",
|
||||
level = INFO_SPAN_LEVEL,
|
||||
skip_all,
|
||||
fields(
|
||||
%client,
|
||||
@@ -73,91 +84,41 @@ pub(crate) async fn send_transaction_message_route(
|
||||
|
||||
let txn_start_time = Instant::now();
|
||||
trace!(
|
||||
pdus = ?body.pdus.len(),
|
||||
edus = ?body.edus.len(),
|
||||
pdus = body.pdus.len(),
|
||||
edus = body.edus.len(),
|
||||
elapsed = ?txn_start_time.elapsed(),
|
||||
id = ?body.transaction_id,
|
||||
origin =?body.origin(),
|
||||
"Starting txn",
|
||||
);
|
||||
|
||||
let resolved_map =
|
||||
handle_pdus(&services, &client, &body.pdus, body.origin(), &txn_start_time)
|
||||
.boxed()
|
||||
.await?;
|
||||
let pdus = body
|
||||
.pdus
|
||||
.iter()
|
||||
.stream()
|
||||
.broad_then(|pdu| services.rooms.event_handler.parse_incoming_pdu(pdu))
|
||||
.inspect_err(|e| debug_warn!("Could not parse PDU: {e}"))
|
||||
.ready_filter_map(Result::ok);
|
||||
|
||||
handle_edus(&services, &client, &body.edus, body.origin())
|
||||
.boxed()
|
||||
.await;
|
||||
let edus = body
|
||||
.edus
|
||||
.iter()
|
||||
.map(|edu| edu.json().get())
|
||||
.map(serde_json::from_str)
|
||||
.filter_map(Result::ok)
|
||||
.stream();
|
||||
|
||||
let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?;
|
||||
|
||||
debug!(
|
||||
pdus = ?body.pdus.len(),
|
||||
edus = ?body.edus.len(),
|
||||
pdus = body.pdus.len(),
|
||||
edus = body.edus.len(),
|
||||
elapsed = ?txn_start_time.elapsed(),
|
||||
id = ?body.transaction_id,
|
||||
origin =?body.origin(),
|
||||
"Finished txn",
|
||||
);
|
||||
|
||||
Ok(send_transaction_message::v1::Response {
|
||||
pdus: resolved_map
|
||||
.into_iter()
|
||||
.map(|(e, r)| (e, r.map_err(error::sanitized_message)))
|
||||
.collect(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn handle_pdus(
|
||||
services: &Services,
|
||||
_client: &IpAddr,
|
||||
pdus: &[Box<RawJsonValue>],
|
||||
origin: &ServerName,
|
||||
txn_start_time: &Instant,
|
||||
) -> Result<ResolvedMap> {
|
||||
let mut parsed_pdus = Vec::with_capacity(pdus.len());
|
||||
for pdu in pdus {
|
||||
parsed_pdus.push(match services.rooms.event_handler.parse_incoming_pdu(pdu).await {
|
||||
| Ok(t) => t,
|
||||
| Err(e) => {
|
||||
debug_warn!("Could not parse PDU: {e}");
|
||||
continue;
|
||||
},
|
||||
});
|
||||
|
||||
// We do not add the event_id field to the pdu here because of signature
|
||||
// and hashes checks
|
||||
}
|
||||
|
||||
let mut resolved_map = BTreeMap::new();
|
||||
for (event_id, value, room_id) in parsed_pdus {
|
||||
services.server.check_running()?;
|
||||
let pdu_start_time = Instant::now();
|
||||
let mutex_lock = services
|
||||
.rooms
|
||||
.event_handler
|
||||
.mutex_federation
|
||||
.lock(&room_id)
|
||||
.await;
|
||||
|
||||
let result = services
|
||||
.rooms
|
||||
.event_handler
|
||||
.handle_incoming_pdu(origin, &room_id, &event_id, value, true)
|
||||
.boxed()
|
||||
.await
|
||||
.map(|_| ());
|
||||
|
||||
drop(mutex_lock);
|
||||
debug!(
|
||||
pdu_elapsed = ?pdu_start_time.elapsed(),
|
||||
txn_elapsed = ?txn_start_time.elapsed(),
|
||||
"Finished PDU {event_id}",
|
||||
);
|
||||
|
||||
resolved_map.insert(event_id, result);
|
||||
}
|
||||
|
||||
for (id, result) in &resolved_map {
|
||||
for (id, result) in &results {
|
||||
if let Err(e) = result {
|
||||
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
|
||||
warn!("Incoming PDU failed {id}: {e:?}");
|
||||
@@ -165,39 +126,117 @@ async fn handle_pdus(
|
||||
}
|
||||
}
|
||||
|
||||
Ok(resolved_map)
|
||||
Ok(send_transaction_message::v1::Response {
|
||||
pdus: results
|
||||
.into_iter()
|
||||
.map(|(e, r)| (e, r.map_err(error::sanitized_message)))
|
||||
.collect(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn handle_edus(
|
||||
async fn handle(
|
||||
services: &Services,
|
||||
client: &IpAddr,
|
||||
edus: &[Raw<Edu>],
|
||||
origin: &ServerName,
|
||||
) {
|
||||
for edu in edus
|
||||
.iter()
|
||||
.filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok())
|
||||
{
|
||||
match edu {
|
||||
| Edu::Presence(presence) => {
|
||||
handle_edu_presence(services, client, origin, presence).await;
|
||||
},
|
||||
| Edu::Receipt(receipt) =>
|
||||
handle_edu_receipt(services, client, origin, receipt).await,
|
||||
| Edu::Typing(typing) => handle_edu_typing(services, client, origin, typing).await,
|
||||
| Edu::DeviceListUpdate(content) => {
|
||||
handle_edu_device_list_update(services, client, origin, content).await;
|
||||
},
|
||||
| Edu::DirectToDevice(content) => {
|
||||
handle_edu_direct_to_device(services, client, origin, content).await;
|
||||
},
|
||||
| Edu::SigningKeyUpdate(content) => {
|
||||
handle_edu_signing_key_update(services, client, origin, content).await;
|
||||
},
|
||||
| Edu::_Custom(ref _custom) => {
|
||||
debug_warn!(?edus, "received custom/unknown EDU");
|
||||
},
|
||||
}
|
||||
started: Instant,
|
||||
pdus: impl Stream<Item = Pdu> + Send,
|
||||
edus: impl Stream<Item = Edu> + Send,
|
||||
) -> Result<ResolvedMap> {
|
||||
// group pdus by room
|
||||
let pdus = pdus
|
||||
.collect()
|
||||
.map(|mut pdus: Vec<_>| {
|
||||
pdus.sort_by(|(room_a, ..), (room_b, ..)| room_a.cmp(room_b));
|
||||
pdus.into_iter()
|
||||
.into_grouping_map_by(|(room_id, ..)| room_id.clone())
|
||||
.collect()
|
||||
})
|
||||
.await;
|
||||
|
||||
// we can evaluate rooms concurrently
|
||||
let results: ResolvedMap = pdus
|
||||
.into_iter()
|
||||
.try_stream()
|
||||
.broad_and_then(|(room_id, pdus): (_, Vec<_>)| {
|
||||
handle_room(services, client, origin, started, room_id, pdus.into_iter())
|
||||
.map_ok(Vec::into_iter)
|
||||
.map_ok(IterStream::try_stream)
|
||||
})
|
||||
.try_flatten()
|
||||
.try_collect()
|
||||
.boxed()
|
||||
.await?;
|
||||
|
||||
// evaluate edus after pdus, at least for now.
|
||||
edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu))
|
||||
.boxed()
|
||||
.await;
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
async fn handle_room(
|
||||
services: &Services,
|
||||
_client: &IpAddr,
|
||||
origin: &ServerName,
|
||||
txn_start_time: Instant,
|
||||
room_id: OwnedRoomId,
|
||||
pdus: impl Iterator<Item = Pdu> + Send,
|
||||
) -> Result<Vec<(OwnedEventId, Result)>> {
|
||||
let _room_lock = services
|
||||
.rooms
|
||||
.event_handler
|
||||
.mutex_federation
|
||||
.lock(&room_id)
|
||||
.await;
|
||||
|
||||
let room_id = &room_id;
|
||||
pdus.try_stream()
|
||||
.and_then(|(_, event_id, value)| async move {
|
||||
services.server.check_running()?;
|
||||
let pdu_start_time = Instant::now();
|
||||
let result = services
|
||||
.rooms
|
||||
.event_handler
|
||||
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
|
||||
.await
|
||||
.map(|_| ());
|
||||
|
||||
debug!(
|
||||
pdu_elapsed = ?pdu_start_time.elapsed(),
|
||||
txn_elapsed = ?txn_start_time.elapsed(),
|
||||
"Finished PDU {event_id}",
|
||||
);
|
||||
|
||||
Ok((event_id, result))
|
||||
})
|
||||
.try_collect()
|
||||
.await
|
||||
}
|
||||
|
||||
async fn handle_edu(services: &Services, client: &IpAddr, origin: &ServerName, edu: Edu) {
|
||||
match edu {
|
||||
| Edu::Presence(presence) if services.server.config.allow_incoming_presence =>
|
||||
handle_edu_presence(services, client, origin, presence).await,
|
||||
|
||||
| Edu::Receipt(receipt) if services.server.config.allow_incoming_read_receipts =>
|
||||
handle_edu_receipt(services, client, origin, receipt).await,
|
||||
|
||||
| Edu::Typing(typing) if services.server.config.allow_incoming_typing =>
|
||||
handle_edu_typing(services, client, origin, typing).await,
|
||||
|
||||
| Edu::DeviceListUpdate(content) =>
|
||||
handle_edu_device_list_update(services, client, origin, content).await,
|
||||
|
||||
| Edu::DirectToDevice(content) =>
|
||||
handle_edu_direct_to_device(services, client, origin, content).await,
|
||||
|
||||
| Edu::SigningKeyUpdate(content) =>
|
||||
handle_edu_signing_key_update(services, client, origin, content).await,
|
||||
|
||||
| Edu::_Custom(ref _custom) => debug_warn!(?edu, "received custom/unknown EDU"),
|
||||
|
||||
| _ => trace!(?edu, "skipped"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -207,32 +246,41 @@ async fn handle_edu_presence(
|
||||
origin: &ServerName,
|
||||
presence: PresenceContent,
|
||||
) {
|
||||
if !services.globals.allow_incoming_presence() {
|
||||
presence
|
||||
.push
|
||||
.into_iter()
|
||||
.stream()
|
||||
.for_each_concurrent(automatic_width(), |update| {
|
||||
handle_edu_presence_update(services, origin, update)
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn handle_edu_presence_update(
|
||||
services: &Services,
|
||||
origin: &ServerName,
|
||||
update: PresenceUpdate,
|
||||
) {
|
||||
if update.user_id.server_name() != origin {
|
||||
debug_warn!(
|
||||
%update.user_id, %origin,
|
||||
"received presence EDU for user not belonging to origin"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
for update in presence.push {
|
||||
if update.user_id.server_name() != origin {
|
||||
debug_warn!(
|
||||
%update.user_id, %origin,
|
||||
"received presence EDU for user not belonging to origin"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
services
|
||||
.presence
|
||||
.set_presence(
|
||||
&update.user_id,
|
||||
&update.presence,
|
||||
Some(update.currently_active),
|
||||
Some(update.last_active_ago),
|
||||
update.status_msg.clone(),
|
||||
)
|
||||
.await
|
||||
.log_err()
|
||||
.ok();
|
||||
}
|
||||
services
|
||||
.presence
|
||||
.set_presence(
|
||||
&update.user_id,
|
||||
&update.presence,
|
||||
Some(update.currently_active),
|
||||
Some(update.last_active_ago),
|
||||
update.status_msg.clone(),
|
||||
)
|
||||
.await
|
||||
.log_err()
|
||||
.ok();
|
||||
}
|
||||
|
||||
async fn handle_edu_receipt(
|
||||
@@ -241,66 +289,94 @@ async fn handle_edu_receipt(
|
||||
origin: &ServerName,
|
||||
receipt: ReceiptContent,
|
||||
) {
|
||||
if !services.globals.allow_incoming_read_receipts() {
|
||||
receipt
|
||||
.receipts
|
||||
.into_iter()
|
||||
.stream()
|
||||
.for_each_concurrent(automatic_width(), |(room_id, room_updates)| {
|
||||
handle_edu_receipt_room(services, origin, room_id, room_updates)
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn handle_edu_receipt_room(
|
||||
services: &Services,
|
||||
origin: &ServerName,
|
||||
room_id: OwnedRoomId,
|
||||
room_updates: ReceiptMap,
|
||||
) {
|
||||
if services
|
||||
.rooms
|
||||
.event_handler
|
||||
.acl_check(origin, &room_id)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
debug_warn!(
|
||||
%origin, %room_id,
|
||||
"received read receipt EDU from ACL'd server"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
for (room_id, room_updates) in receipt.receipts {
|
||||
if services
|
||||
.rooms
|
||||
.event_handler
|
||||
.acl_check(origin, &room_id)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
debug_warn!(
|
||||
%origin, %room_id,
|
||||
"received read receipt EDU from ACL'd server"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
let room_id = &room_id;
|
||||
room_updates
|
||||
.read
|
||||
.into_iter()
|
||||
.stream()
|
||||
.for_each_concurrent(automatic_width(), |(user_id, user_updates)| async move {
|
||||
handle_edu_receipt_room_user(services, origin, room_id, &user_id, user_updates).await;
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
for (user_id, user_updates) in room_updates.read {
|
||||
if user_id.server_name() != origin {
|
||||
debug_warn!(
|
||||
%user_id, %origin,
|
||||
"received read receipt EDU for user not belonging to origin"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
if services
|
||||
.rooms
|
||||
.state_cache
|
||||
.room_members(&room_id)
|
||||
.ready_any(|member| member.server_name() == user_id.server_name())
|
||||
.await
|
||||
{
|
||||
for event_id in &user_updates.event_ids {
|
||||
let user_receipts =
|
||||
BTreeMap::from([(user_id.clone(), user_updates.data.clone())]);
|
||||
let receipts = BTreeMap::from([(ReceiptType::Read, user_receipts)]);
|
||||
let receipt_content = BTreeMap::from([(event_id.to_owned(), receipts)]);
|
||||
let event = ReceiptEvent {
|
||||
content: ReceiptEventContent(receipt_content),
|
||||
room_id: room_id.clone(),
|
||||
};
|
||||
|
||||
services
|
||||
.rooms
|
||||
.read_receipt
|
||||
.readreceipt_update(&user_id, &room_id, &event)
|
||||
.await;
|
||||
}
|
||||
} else {
|
||||
debug_warn!(
|
||||
%user_id, %room_id, %origin,
|
||||
"received read receipt EDU from server who does not have a member in the room",
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
async fn handle_edu_receipt_room_user(
|
||||
services: &Services,
|
||||
origin: &ServerName,
|
||||
room_id: &RoomId,
|
||||
user_id: &UserId,
|
||||
user_updates: ReceiptData,
|
||||
) {
|
||||
if user_id.server_name() != origin {
|
||||
debug_warn!(
|
||||
%user_id, %origin,
|
||||
"received read receipt EDU for user not belonging to origin"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if !services
|
||||
.rooms
|
||||
.state_cache
|
||||
.server_in_room(origin, room_id)
|
||||
.await
|
||||
{
|
||||
debug_warn!(
|
||||
%user_id, %room_id, %origin,
|
||||
"received read receipt EDU from server who does not have a member in the room",
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let data = &user_updates.data;
|
||||
user_updates
|
||||
.event_ids
|
||||
.into_iter()
|
||||
.stream()
|
||||
.for_each_concurrent(automatic_width(), |event_id| async move {
|
||||
let user_data = [(user_id.to_owned(), data.clone())];
|
||||
let receipts = [(ReceiptType::Read, BTreeMap::from(user_data))];
|
||||
let content = [(event_id.clone(), BTreeMap::from(receipts))];
|
||||
services
|
||||
.rooms
|
||||
.read_receipt
|
||||
.readreceipt_update(user_id, room_id, &ReceiptEvent {
|
||||
content: ReceiptEventContent(content.into()),
|
||||
room_id: room_id.to_owned(),
|
||||
})
|
||||
.await;
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn handle_edu_typing(
|
||||
@@ -309,10 +385,6 @@ async fn handle_edu_typing(
|
||||
origin: &ServerName,
|
||||
typing: TypingContent,
|
||||
) {
|
||||
if !services.server.config.allow_incoming_typing {
|
||||
return;
|
||||
}
|
||||
|
||||
if typing.user_id.server_name() != origin {
|
||||
debug_warn!(
|
||||
%typing.user_id, %origin,
|
||||
@@ -335,41 +407,38 @@ async fn handle_edu_typing(
|
||||
return;
|
||||
}
|
||||
|
||||
if services
|
||||
if !services
|
||||
.rooms
|
||||
.state_cache
|
||||
.is_joined(&typing.user_id, &typing.room_id)
|
||||
.await
|
||||
{
|
||||
if typing.typing {
|
||||
let timeout = utils::millis_since_unix_epoch().saturating_add(
|
||||
services
|
||||
.server
|
||||
.config
|
||||
.typing_federation_timeout_s
|
||||
.saturating_mul(1000),
|
||||
);
|
||||
services
|
||||
.rooms
|
||||
.typing
|
||||
.typing_add(&typing.user_id, &typing.room_id, timeout)
|
||||
.await
|
||||
.log_err()
|
||||
.ok();
|
||||
} else {
|
||||
services
|
||||
.rooms
|
||||
.typing
|
||||
.typing_remove(&typing.user_id, &typing.room_id)
|
||||
.await
|
||||
.log_err()
|
||||
.ok();
|
||||
}
|
||||
} else {
|
||||
debug_warn!(
|
||||
%typing.user_id, %typing.room_id, %origin,
|
||||
"received typing EDU for user not in room"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if typing.typing {
|
||||
let secs = services.server.config.typing_federation_timeout_s;
|
||||
let timeout = millis_since_unix_epoch().saturating_add(secs.saturating_mul(1000));
|
||||
|
||||
services
|
||||
.rooms
|
||||
.typing
|
||||
.typing_add(&typing.user_id, &typing.room_id, timeout)
|
||||
.await
|
||||
.log_err()
|
||||
.ok();
|
||||
} else {
|
||||
services
|
||||
.rooms
|
||||
.typing
|
||||
.typing_remove(&typing.user_id, &typing.room_id)
|
||||
.await
|
||||
.log_err()
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -398,7 +467,12 @@ async fn handle_edu_direct_to_device(
|
||||
origin: &ServerName,
|
||||
content: DirectDeviceContent,
|
||||
) {
|
||||
let DirectDeviceContent { sender, ev_type, message_id, messages } = content;
|
||||
let DirectDeviceContent {
|
||||
ref sender,
|
||||
ref ev_type,
|
||||
ref message_id,
|
||||
messages,
|
||||
} = content;
|
||||
|
||||
if sender.server_name() != origin {
|
||||
debug_warn!(
|
||||
@@ -411,60 +485,88 @@ async fn handle_edu_direct_to_device(
|
||||
// Check if this is a new transaction id
|
||||
if services
|
||||
.transaction_ids
|
||||
.existing_txnid(&sender, None, &message_id)
|
||||
.existing_txnid(sender, None, message_id)
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
for (target_user_id, map) in &messages {
|
||||
for (target_device_id_maybe, event) in map {
|
||||
let Ok(event) = event.deserialize_as().map_err(|e| {
|
||||
err!(Request(InvalidParam(error!("To-Device event is invalid: {e}"))))
|
||||
}) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let ev_type = ev_type.to_string();
|
||||
match target_device_id_maybe {
|
||||
| DeviceIdOrAllDevices::DeviceId(target_device_id) => {
|
||||
services
|
||||
.users
|
||||
.add_to_device_event(
|
||||
&sender,
|
||||
target_user_id,
|
||||
target_device_id,
|
||||
&ev_type,
|
||||
event,
|
||||
)
|
||||
.await;
|
||||
},
|
||||
|
||||
| DeviceIdOrAllDevices::AllDevices => {
|
||||
let (sender, ev_type, event) = (&sender, &ev_type, &event);
|
||||
services
|
||||
.users
|
||||
.all_device_ids(target_user_id)
|
||||
.for_each(|target_device_id| {
|
||||
services.users.add_to_device_event(
|
||||
sender,
|
||||
target_user_id,
|
||||
target_device_id,
|
||||
ev_type,
|
||||
event.clone(),
|
||||
)
|
||||
})
|
||||
.await;
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
// process messages concurrently for different users
|
||||
let ev_type = ev_type.to_string();
|
||||
messages
|
||||
.into_iter()
|
||||
.stream()
|
||||
.for_each_concurrent(automatic_width(), |(target_user_id, map)| {
|
||||
handle_edu_direct_to_device_user(services, target_user_id, sender, &ev_type, map)
|
||||
})
|
||||
.await;
|
||||
|
||||
// Save transaction id with empty data
|
||||
services
|
||||
.transaction_ids
|
||||
.add_txnid(&sender, None, &message_id, &[]);
|
||||
.add_txnid(sender, None, message_id, &[]);
|
||||
}
|
||||
|
||||
async fn handle_edu_direct_to_device_user<Event: Send + Sync>(
|
||||
services: &Services,
|
||||
target_user_id: OwnedUserId,
|
||||
sender: &UserId,
|
||||
ev_type: &str,
|
||||
map: BTreeMap<DeviceIdOrAllDevices, Raw<Event>>,
|
||||
) {
|
||||
for (target_device_id_maybe, event) in map {
|
||||
let Ok(event) = event
|
||||
.deserialize_as()
|
||||
.map_err(|e| err!(Request(InvalidParam(error!("To-Device event is invalid: {e}")))))
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
|
||||
handle_edu_direct_to_device_event(
|
||||
services,
|
||||
&target_user_id,
|
||||
sender,
|
||||
target_device_id_maybe,
|
||||
ev_type,
|
||||
event,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_edu_direct_to_device_event(
|
||||
services: &Services,
|
||||
target_user_id: &UserId,
|
||||
sender: &UserId,
|
||||
target_device_id_maybe: DeviceIdOrAllDevices,
|
||||
ev_type: &str,
|
||||
event: serde_json::Value,
|
||||
) {
|
||||
match target_device_id_maybe {
|
||||
| DeviceIdOrAllDevices::DeviceId(ref target_device_id) => {
|
||||
services
|
||||
.users
|
||||
.add_to_device_event(sender, target_user_id, target_device_id, ev_type, event)
|
||||
.await;
|
||||
},
|
||||
|
||||
| DeviceIdOrAllDevices::AllDevices => {
|
||||
services
|
||||
.users
|
||||
.all_device_ids(target_user_id)
|
||||
.for_each(|target_device_id| {
|
||||
services.users.add_to_device_event(
|
||||
sender,
|
||||
target_user_id,
|
||||
target_device_id,
|
||||
ev_type,
|
||||
event.clone(),
|
||||
)
|
||||
})
|
||||
.await;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_edu_signing_key_update(
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
#![allow(deprecated)]
|
||||
|
||||
use std::{borrow::Borrow, collections::HashMap};
|
||||
use std::borrow::Borrow;
|
||||
|
||||
use axum::extract::State;
|
||||
use conduwuit::{
|
||||
err,
|
||||
at, err,
|
||||
pdu::gen_event_id_canonical_json,
|
||||
utils::stream::{IterStream, TryBroadbandExt},
|
||||
warn, Err, Result,
|
||||
@@ -211,14 +211,16 @@ async fn create_join_event(
|
||||
|
||||
drop(mutex_lock);
|
||||
|
||||
let state_ids: HashMap<_, OwnedEventId> = services
|
||||
let state_ids: Vec<OwnedEventId> = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_ids(shortstatehash)
|
||||
.await?;
|
||||
.map(at!(1))
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
let state = state_ids
|
||||
.values()
|
||||
.iter()
|
||||
.try_stream()
|
||||
.broad_and_then(|event_id| services.rooms.timeline.get_pdu_json(event_id))
|
||||
.broad_and_then(|pdu| {
|
||||
@@ -231,13 +233,11 @@ async fn create_join_event(
|
||||
.boxed()
|
||||
.await?;
|
||||
|
||||
let starting_events = state_ids.values().map(Borrow::borrow);
|
||||
let starting_events = state_ids.iter().map(Borrow::borrow);
|
||||
let auth_chain = services
|
||||
.rooms
|
||||
.auth_chain
|
||||
.event_ids_iter(room_id, starting_events)
|
||||
.await?
|
||||
.map(Ok)
|
||||
.broad_and_then(|event_id| async move {
|
||||
services.rooms.timeline.get_pdu_json(&event_id).await
|
||||
})
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::{borrow::Borrow, iter::once};
|
||||
|
||||
use axum::extract::State;
|
||||
use conduwuit::{err, result::LogErr, utils::IterStream, Result};
|
||||
use conduwuit::{at, err, utils::IterStream, Result};
|
||||
use futures::{FutureExt, StreamExt, TryStreamExt};
|
||||
use ruma::{api::federation::event::get_room_state, OwnedEventId};
|
||||
|
||||
@@ -35,11 +35,9 @@ pub(crate) async fn get_room_state_route(
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_ids(shortstatehash)
|
||||
.await
|
||||
.log_err()
|
||||
.map_err(|_| err!(Request(NotFound("PDU state IDs not found."))))?
|
||||
.into_values()
|
||||
.collect();
|
||||
.map(at!(1))
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
let pdus = state_ids
|
||||
.iter()
|
||||
@@ -58,8 +56,6 @@ pub(crate) async fn get_room_state_route(
|
||||
.rooms
|
||||
.auth_chain
|
||||
.event_ids_iter(&body.room_id, once(body.event_id.borrow()))
|
||||
.await?
|
||||
.map(Ok)
|
||||
.and_then(|id| async move { services.rooms.timeline.get_pdu_json(&id).await })
|
||||
.and_then(|pdu| {
|
||||
services
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use std::{borrow::Borrow, iter::once};
|
||||
|
||||
use axum::extract::State;
|
||||
use conduwuit::{err, Result};
|
||||
use futures::StreamExt;
|
||||
use conduwuit::{at, err, Result};
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use ruma::{api::federation::event::get_room_state_ids, OwnedEventId};
|
||||
|
||||
use super::AccessCheck;
|
||||
@@ -36,19 +36,16 @@ pub(crate) async fn get_room_state_ids_route(
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_ids(shortstatehash)
|
||||
.await
|
||||
.map_err(|_| err!(Request(NotFound("State ids not found"))))?
|
||||
.into_values()
|
||||
.collect();
|
||||
.map(at!(1))
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
let auth_chain_ids = services
|
||||
.rooms
|
||||
.auth_chain
|
||||
.event_ids_iter(&body.room_id, once(body.event_id.borrow()))
|
||||
.await?
|
||||
.map(|id| (*id).to_owned())
|
||||
.collect()
|
||||
.await;
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
||||
Ok(get_room_state_ids::v1::Response { auth_chain_ids, pdu_ids })
|
||||
}
|
||||
|
||||
+19
-12
@@ -8,6 +8,7 @@ use std::{
|
||||
};
|
||||
|
||||
use arrayvec::ArrayVec;
|
||||
use const_str::concat_bytes;
|
||||
use tikv_jemalloc_ctl as mallctl;
|
||||
use tikv_jemalloc_sys as ffi;
|
||||
use tikv_jemallocator as jemalloc;
|
||||
@@ -20,18 +21,24 @@ use crate::{
|
||||
|
||||
#[cfg(feature = "jemalloc_conf")]
|
||||
#[unsafe(no_mangle)]
|
||||
pub static malloc_conf: &[u8] = b"\
|
||||
metadata_thp:always\
|
||||
,percpu_arena:percpu\
|
||||
,background_thread:true\
|
||||
,max_background_threads:-1\
|
||||
,lg_extent_max_active_fit:4\
|
||||
,oversize_threshold:16777216\
|
||||
,tcache_max:2097152\
|
||||
,dirty_decay_ms:16000\
|
||||
,muzzy_decay_ms:144000\
|
||||
,prof_active:false\
|
||||
\0";
|
||||
pub static malloc_conf: &[u8] = concat_bytes!(
|
||||
"lg_extent_max_active_fit:4",
|
||||
",oversize_threshold:16777216",
|
||||
",tcache_max:2097152",
|
||||
",dirty_decay_ms:16000",
|
||||
",muzzy_decay_ms:144000",
|
||||
",percpu_arena:percpu",
|
||||
",metadata_thp:always",
|
||||
",background_thread:true",
|
||||
",max_background_threads:-1",
|
||||
MALLOC_CONF_PROF,
|
||||
0
|
||||
);
|
||||
|
||||
#[cfg(all(feature = "jemalloc_conf", feature = "jemalloc_prof"))]
|
||||
const MALLOC_CONF_PROF: &str = ",prof_active:false";
|
||||
#[cfg(all(feature = "jemalloc_conf", not(feature = "jemalloc_prof")))]
|
||||
const MALLOC_CONF_PROF: &str = "";
|
||||
|
||||
#[global_allocator]
|
||||
static JEMALLOC: jemalloc::Jemalloc = jemalloc::Jemalloc;
|
||||
|
||||
@@ -6,8 +6,24 @@ use figment::Figment;
|
||||
use super::DEPRECATED_KEYS;
|
||||
use crate::{debug, debug_info, debug_warn, error, warn, Config, Err, Result, Server};
|
||||
|
||||
/// Performs check() with additional checks specific to reloading old config
|
||||
/// with new config.
|
||||
pub fn reload(old: &Config, new: &Config) -> Result {
|
||||
check(new)?;
|
||||
|
||||
if new.server_name != old.server_name {
|
||||
return Err!(Config(
|
||||
"server_name",
|
||||
"You can't change the server's name from {:?}.",
|
||||
old.server_name
|
||||
));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::cognitive_complexity)]
|
||||
pub fn check(config: &Config) -> Result<()> {
|
||||
pub fn check(config: &Config) -> Result {
|
||||
if cfg!(debug_assertions) {
|
||||
warn!("Note: conduwuit was built without optimisations (i.e. debug build)");
|
||||
}
|
||||
@@ -22,7 +38,7 @@ pub fn check(config: &Config) -> Result<()> {
|
||||
));
|
||||
}
|
||||
|
||||
if cfg!(all(feature = "hardened_malloc", feature = "jemalloc")) {
|
||||
if cfg!(all(feature = "hardened_malloc", feature = "jemalloc", not(target_env = "msvc"))) {
|
||||
debug_warn!(
|
||||
"hardened_malloc and jemalloc compile-time features are both enabled, this causes \
|
||||
jemalloc to be used."
|
||||
|
||||
+107
-3
@@ -52,7 +52,7 @@ use crate::{err, error::Error, utils::sys, Result};
|
||||
### For more information, see:
|
||||
### https://conduwuit.puppyirl.gay/configuration.html
|
||||
"#,
|
||||
ignore = "catchall well_known tls"
|
||||
ignore = "catchall well_known tls blurhashing"
|
||||
)]
|
||||
pub struct Config {
|
||||
/// The server_name is the pretty name of this server. It is used as a
|
||||
@@ -480,6 +480,36 @@ pub struct Config {
|
||||
#[serde(default = "default_pusher_idle_timeout")]
|
||||
pub pusher_idle_timeout: u64,
|
||||
|
||||
/// Maximum time to receive a request from a client (seconds).
|
||||
///
|
||||
/// default: 75
|
||||
#[serde(default = "default_client_receive_timeout")]
|
||||
pub client_receive_timeout: u64,
|
||||
|
||||
/// Maximum time to process a request received from a client (seconds).
|
||||
///
|
||||
/// default: 180
|
||||
#[serde(default = "default_client_request_timeout")]
|
||||
pub client_request_timeout: u64,
|
||||
|
||||
/// Maximum time to transmit a response to a client (seconds)
|
||||
///
|
||||
/// default: 120
|
||||
#[serde(default = "default_client_response_timeout")]
|
||||
pub client_response_timeout: u64,
|
||||
|
||||
/// Grace period for clean shutdown of client requests (seconds).
|
||||
///
|
||||
/// default: 10
|
||||
#[serde(default = "default_client_shutdown_timeout")]
|
||||
pub client_shutdown_timeout: u64,
|
||||
|
||||
/// Grace period for clean shutdown of federation requests (seconds).
|
||||
///
|
||||
/// default: 5
|
||||
#[serde(default = "default_sender_shutdown_timeout")]
|
||||
pub sender_shutdown_timeout: u64,
|
||||
|
||||
/// Enables registration. If set to false, no users can register on this
|
||||
/// server.
|
||||
///
|
||||
@@ -510,8 +540,9 @@ pub struct Config {
|
||||
/// display: sensitive
|
||||
pub registration_token: Option<String>,
|
||||
|
||||
/// Path to a file on the system that gets read for the registration token.
|
||||
/// this config option takes precedence/priority over "registration_token".
|
||||
/// Path to a file on the system that gets read for additional registration
|
||||
/// tokens. Multiple tokens can be added if you separate them with
|
||||
/// whitespace
|
||||
///
|
||||
/// conduwuit must be able to access the file, and it must not be empty
|
||||
///
|
||||
@@ -1049,6 +1080,15 @@ pub struct Config {
|
||||
#[serde(default)]
|
||||
pub rocksdb_paranoid_file_checks: bool,
|
||||
|
||||
/// Enables or disables checksum verification in rocksdb at runtime.
|
||||
/// Checksums are usually hardware accelerated with low overhead; they are
|
||||
/// enabled in rocksdb by default. Older or slower platforms may see gains
|
||||
/// from disabling.
|
||||
///
|
||||
/// default: true
|
||||
#[serde(default = "true_fn")]
|
||||
pub rocksdb_checksums: bool,
|
||||
|
||||
/// Database repair mode (for RocksDB SST corruption).
|
||||
///
|
||||
/// Use this option when the server reports corruption while running or
|
||||
@@ -1545,6 +1585,15 @@ pub struct Config {
|
||||
#[serde(default)]
|
||||
pub admin_execute_errors_ignore: bool,
|
||||
|
||||
/// List of admin commands to execute on SIGUSR2.
|
||||
///
|
||||
/// Similar to admin_execute, but these commands are executed when the
|
||||
/// server receives SIGUSR2 on supporting platforms.
|
||||
///
|
||||
/// default: []
|
||||
#[serde(default)]
|
||||
pub admin_signal_execute: Vec<String>,
|
||||
|
||||
/// Controls the max log level for admin command log captures (logs
|
||||
/// generated from running admin commands). Defaults to "info" on release
|
||||
/// builds, else "debug" on debug builds.
|
||||
@@ -1733,6 +1782,16 @@ pub struct Config {
|
||||
#[serde(default = "true_fn")]
|
||||
pub listening: bool,
|
||||
|
||||
/// Enables configuration reload when the server receives SIGUSR1 on
|
||||
/// supporting platforms.
|
||||
///
|
||||
/// default: true
|
||||
#[serde(default = "true_fn")]
|
||||
pub config_reload_signal: bool,
|
||||
|
||||
// external structure; separate section
|
||||
#[serde(default)]
|
||||
pub blurhashing: BlurhashConfig,
|
||||
#[serde(flatten)]
|
||||
#[allow(clippy::zero_sized_map_values)]
|
||||
// this is a catchall, the map shouldn't be zero at runtime
|
||||
@@ -1783,6 +1842,31 @@ pub struct WellKnownConfig {
|
||||
pub support_mxid: Option<OwnedUserId>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Deserialize, Default)]
|
||||
#[allow(rustdoc::broken_intra_doc_links, rustdoc::bare_urls)]
|
||||
#[config_example_generator(filename = "conduwuit-example.toml", section = "global.blurhashing")]
|
||||
pub struct BlurhashConfig {
|
||||
/// blurhashing x component, 4 is recommended by https://blurha.sh/
|
||||
///
|
||||
/// default: 4
|
||||
#[serde(default = "default_blurhash_x_component")]
|
||||
pub components_x: u32,
|
||||
/// blurhashing y component, 3 is recommended by https://blurha.sh/
|
||||
///
|
||||
/// default: 3
|
||||
#[serde(default = "default_blurhash_y_component")]
|
||||
pub components_y: u32,
|
||||
/// Max raw size that the server will blurhash, this is the size of the
|
||||
/// image after converting it to raw data, it should be higher than the
|
||||
/// upload limit but not too high. The higher it is the higher the
|
||||
/// potential load will be for clients requesting blurhashes. The default
|
||||
/// is 33.55MB. Setting it to 0 disables blurhashing.
|
||||
///
|
||||
/// default: 33554432
|
||||
#[serde(default = "default_blurhash_max_raw_size")]
|
||||
pub blurhash_max_raw_size: u64,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Clone, Debug)]
|
||||
#[serde(transparent)]
|
||||
struct ListeningPort {
|
||||
@@ -2144,3 +2228,23 @@ fn default_stream_width_default() -> usize { 32 }
|
||||
fn default_stream_width_scale() -> f32 { 1.0 }
|
||||
|
||||
fn default_stream_amplification() -> usize { 1024 }
|
||||
|
||||
fn default_client_receive_timeout() -> u64 { 75 }
|
||||
|
||||
fn default_client_request_timeout() -> u64 { 180 }
|
||||
|
||||
fn default_client_response_timeout() -> u64 { 120 }
|
||||
|
||||
fn default_client_shutdown_timeout() -> u64 { 15 }
|
||||
|
||||
fn default_sender_shutdown_timeout() -> u64 { 5 }
|
||||
|
||||
// blurhashing defaults recommended by https://blurha.sh/
|
||||
// 2^25
|
||||
pub(super) fn default_blurhash_max_raw_size() -> u64 { 33_554_432 }
|
||||
|
||||
pub(super) fn default_blurhash_x_component() -> u32 { 4 }
|
||||
|
||||
pub(super) fn default_blurhash_y_component() -> u32 { 3 }
|
||||
|
||||
// end recommended & blurhashing defaults
|
||||
|
||||
+21
-4
@@ -1,9 +1,10 @@
|
||||
#![allow(clippy::disallowed_macros)]
|
||||
|
||||
use std::{any::Any, panic};
|
||||
use std::{any::Any, env, panic, sync::LazyLock};
|
||||
|
||||
// Export debug proc_macros
|
||||
pub use conduwuit_macros::recursion_depth;
|
||||
use tracing::Level;
|
||||
|
||||
// Export all of the ancillary tools from here as well.
|
||||
pub use crate::{result::DebugInspect, utils::debug::*};
|
||||
@@ -51,16 +52,32 @@ macro_rules! debug_info {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_panic_trap() {
|
||||
pub const INFO_SPAN_LEVEL: Level = if cfg!(debug_assertions) {
|
||||
Level::INFO
|
||||
} else {
|
||||
Level::DEBUG
|
||||
};
|
||||
|
||||
pub static DEBUGGER: LazyLock<bool> =
|
||||
LazyLock::new(|| env::var("_").unwrap_or_default().ends_with("gdb"));
|
||||
|
||||
#[cfg_attr(debug_assertions, crate::ctor)]
|
||||
#[cfg_attr(not(debug_assertions), allow(dead_code))]
|
||||
fn set_panic_trap() {
|
||||
if !*DEBUGGER {
|
||||
return;
|
||||
}
|
||||
|
||||
let next = panic::take_hook();
|
||||
panic::set_hook(Box::new(move |info| {
|
||||
panic_handler(info, &next);
|
||||
}));
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
#[cold]
|
||||
#[inline(never)]
|
||||
#[allow(deprecated_in_future)]
|
||||
fn panic_handler(info: &panic::PanicHookInfo<'_>, next: &dyn Fn(&panic::PanicHookInfo<'_>)) {
|
||||
pub fn panic_handler(info: &panic::PanicHookInfo<'_>, next: &dyn Fn(&panic::PanicHookInfo<'_>)) {
|
||||
trap();
|
||||
next(info);
|
||||
}
|
||||
|
||||
@@ -106,6 +106,7 @@ pub(super) fn io_error_code(kind: std::io::ErrorKind) -> StatusCode {
|
||||
| ErrorKind::TimedOut => StatusCode::GATEWAY_TIMEOUT,
|
||||
| ErrorKind::FileTooLarge => StatusCode::PAYLOAD_TOO_LARGE,
|
||||
| ErrorKind::StorageFull => StatusCode::INSUFFICIENT_STORAGE,
|
||||
| ErrorKind::Interrupted => StatusCode::SERVICE_UNAVAILABLE,
|
||||
| _ => StatusCode::INTERNAL_SERVER_ERROR,
|
||||
}
|
||||
}
|
||||
|
||||
+71
-6
@@ -1,3 +1,5 @@
|
||||
use std::{env, io, sync::LazyLock};
|
||||
|
||||
use tracing::{
|
||||
field::{Field, Visit},
|
||||
Event, Level, Subscriber,
|
||||
@@ -7,12 +9,59 @@ use tracing_subscriber::{
|
||||
fmt,
|
||||
fmt::{
|
||||
format::{Compact, DefaultVisitor, Format, Full, Pretty, Writer},
|
||||
FmtContext, FormatEvent, FormatFields,
|
||||
FmtContext, FormatEvent, FormatFields, MakeWriter,
|
||||
},
|
||||
registry::LookupSpan,
|
||||
};
|
||||
|
||||
use crate::{Config, Result};
|
||||
use crate::{apply, Config, Result};
|
||||
|
||||
static SYSTEMD_MODE: LazyLock<bool> =
|
||||
LazyLock::new(|| env::var("SYSTEMD_EXEC_PID").is_ok() && env::var("JOURNAL_STREAM").is_ok());
|
||||
|
||||
pub struct ConsoleWriter {
|
||||
stdout: io::Stdout,
|
||||
stderr: io::Stderr,
|
||||
_journal_stream: [u64; 2],
|
||||
use_stderr: bool,
|
||||
}
|
||||
|
||||
impl ConsoleWriter {
|
||||
#[must_use]
|
||||
pub fn new(_config: &Config) -> Self {
|
||||
let journal_stream = get_journal_stream();
|
||||
Self {
|
||||
stdout: io::stdout(),
|
||||
stderr: io::stderr(),
|
||||
_journal_stream: journal_stream.into(),
|
||||
use_stderr: journal_stream.0 != 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> MakeWriter<'a> for ConsoleWriter {
|
||||
type Writer = &'a Self;
|
||||
|
||||
fn make_writer(&'a self) -> Self::Writer { self }
|
||||
}
|
||||
|
||||
impl io::Write for &'_ ConsoleWriter {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
if self.use_stderr {
|
||||
self.stderr.lock().write(buf)
|
||||
} else {
|
||||
self.stdout.lock().write(buf)
|
||||
}
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
if self.use_stderr {
|
||||
self.stderr.lock().flush()
|
||||
} else {
|
||||
self.stdout.lock().flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ConsoleFormat {
|
||||
_compact: Format<Compact>,
|
||||
@@ -20,10 +69,6 @@ pub struct ConsoleFormat {
|
||||
pretty: Format<Pretty>,
|
||||
}
|
||||
|
||||
struct ConsoleVisitor<'a> {
|
||||
visitor: DefaultVisitor<'a>,
|
||||
}
|
||||
|
||||
impl ConsoleFormat {
|
||||
#[must_use]
|
||||
pub fn new(config: &Config) -> Self {
|
||||
@@ -68,6 +113,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
struct ConsoleVisitor<'a> {
|
||||
visitor: DefaultVisitor<'a>,
|
||||
}
|
||||
|
||||
impl<'writer> FormatFields<'writer> for ConsoleFormat {
|
||||
fn format_fields<R>(&self, writer: Writer<'writer>, fields: R) -> Result<(), std::fmt::Error>
|
||||
where
|
||||
@@ -92,3 +141,19 @@ impl Visit for ConsoleVisitor<'_> {
|
||||
self.visitor.record_debug(field, value);
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
fn get_journal_stream() -> (u64, u64) {
|
||||
is_systemd_mode()
|
||||
.then(|| env::var("JOURNAL_STREAM").ok())
|
||||
.flatten()
|
||||
.as_deref()
|
||||
.and_then(|s| s.split_once(':'))
|
||||
.map(apply!(2, str::parse))
|
||||
.map(apply!(2, Result::unwrap_or_default))
|
||||
.unwrap_or((0, 0))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[must_use]
|
||||
pub fn is_systemd_mode() -> bool { *SYSTEMD_MODE }
|
||||
|
||||
+2
-2
@@ -2,14 +2,14 @@
|
||||
|
||||
pub mod capture;
|
||||
pub mod color;
|
||||
mod console;
|
||||
pub mod console;
|
||||
pub mod fmt;
|
||||
pub mod fmt_span;
|
||||
mod reload;
|
||||
mod suppress;
|
||||
|
||||
pub use capture::Capture;
|
||||
pub use console::ConsoleFormat;
|
||||
pub use console::{is_systemd_mode, ConsoleFormat, ConsoleWriter};
|
||||
pub use reload::{LogLevelReloadHandles, ReloadHandle};
|
||||
pub use suppress::Suppress;
|
||||
pub use tracing::Level;
|
||||
|
||||
@@ -19,8 +19,6 @@ pub struct Metrics {
|
||||
runtime_intervals: std::sync::Mutex<Option<RuntimeIntervals>>,
|
||||
|
||||
// TODO: move stats
|
||||
pub requests_spawn_active: AtomicU32,
|
||||
pub requests_spawn_finished: AtomicU32,
|
||||
pub requests_handle_active: AtomicU32,
|
||||
pub requests_handle_finished: AtomicU32,
|
||||
pub requests_panic: AtomicU32,
|
||||
@@ -48,8 +46,6 @@ impl Metrics {
|
||||
#[cfg(tokio_unstable)]
|
||||
runtime_intervals: std::sync::Mutex::new(runtime_intervals),
|
||||
|
||||
requests_spawn_active: AtomicU32::new(0),
|
||||
requests_spawn_finished: AtomicU32::new(0),
|
||||
requests_handle_active: AtomicU32::new(0),
|
||||
requests_handle_finished: AtomicU32::new(0),
|
||||
requests_panic: AtomicU32::new(0),
|
||||
|
||||
@@ -90,3 +90,21 @@ pub fn copy_redacts(&self) -> (Option<OwnedEventId>, Box<RawJsonValue>) {
|
||||
|
||||
(self.redacts.clone(), self.content.clone())
|
||||
}
|
||||
|
||||
#[implement(super::Pdu)]
|
||||
#[must_use]
|
||||
pub fn redacts_id(&self, room_version: &RoomVersionId) -> Option<OwnedEventId> {
|
||||
use RoomVersionId::*;
|
||||
|
||||
if self.kind != TimelineEventType::RoomRedaction {
|
||||
return None;
|
||||
}
|
||||
|
||||
match *room_version {
|
||||
| V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 => self.redacts.clone(),
|
||||
| _ =>
|
||||
self.get_content::<RoomRedactionEventContent>()
|
||||
.ok()?
|
||||
.redacts,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -116,7 +116,7 @@ pub fn to_message_like_event(&self) -> Raw<AnyMessageLikeEvent> {
|
||||
|
||||
#[must_use]
|
||||
#[implement(super::Pdu)]
|
||||
pub fn to_state_event_value(&self) -> JsonValue {
|
||||
pub fn into_state_event_value(self) -> JsonValue {
|
||||
let mut json = json!({
|
||||
"content": self.content,
|
||||
"type": self.kind,
|
||||
@@ -127,7 +127,7 @@ pub fn to_state_event_value(&self) -> JsonValue {
|
||||
"state_key": self.state_key,
|
||||
});
|
||||
|
||||
if let Some(unsigned) = &self.unsigned {
|
||||
if let Some(unsigned) = self.unsigned {
|
||||
json["unsigned"] = json!(unsigned);
|
||||
}
|
||||
|
||||
@@ -136,8 +136,8 @@ pub fn to_state_event_value(&self) -> JsonValue {
|
||||
|
||||
#[must_use]
|
||||
#[implement(super::Pdu)]
|
||||
pub fn to_state_event(&self) -> Raw<AnyStateEvent> {
|
||||
serde_json::from_value(self.to_state_event_value()).expect("Raw::from_value always works")
|
||||
pub fn into_state_event(self) -> Raw<AnyStateEvent> {
|
||||
serde_json::from_value(self.into_state_event_value()).expect("Raw::from_value always works")
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
@@ -188,7 +188,7 @@ pub fn to_stripped_spacechild_state_event(&self) -> Raw<HierarchySpaceChildEvent
|
||||
|
||||
#[must_use]
|
||||
#[implement(super::Pdu)]
|
||||
pub fn to_member_event(&self) -> Raw<StateEvent<RoomMemberEventContent>> {
|
||||
pub fn into_member_event(self) -> Raw<StateEvent<RoomMemberEventContent>> {
|
||||
let mut json = json!({
|
||||
"content": self.content,
|
||||
"type": self.kind,
|
||||
@@ -200,7 +200,7 @@ pub fn to_member_event(&self) -> Raw<StateEvent<RoomMemberEventContent>> {
|
||||
"state_key": self.state_key,
|
||||
});
|
||||
|
||||
if let Some(unsigned) = &self.unsigned {
|
||||
if let Some(unsigned) = self.unsigned {
|
||||
json["unsigned"] = json!(unsigned);
|
||||
}
|
||||
|
||||
|
||||
+22
-5
@@ -6,12 +6,17 @@ use std::{
|
||||
time::SystemTime,
|
||||
};
|
||||
|
||||
use ruma::OwnedServerName;
|
||||
use tokio::{runtime, sync::broadcast};
|
||||
|
||||
use crate::{config, config::Config, err, log::Log, metrics::Metrics, Err, Result};
|
||||
use crate::{config, config::Config, log::Log, metrics::Metrics, Err, Result};
|
||||
|
||||
/// Server runtime state; public portion
|
||||
pub struct Server {
|
||||
/// Configured name of server. This is the same as the one in the config
|
||||
/// but developers can (and should) reference this string instead.
|
||||
pub name: OwnedServerName,
|
||||
|
||||
/// Server-wide configuration instance
|
||||
pub config: config::Manager,
|
||||
|
||||
@@ -46,6 +51,7 @@ impl Server {
|
||||
#[must_use]
|
||||
pub fn new(config: Config, runtime: Option<runtime::Handle>, log: Log) -> Self {
|
||||
Self {
|
||||
name: config.server_name.clone(),
|
||||
config: config::Manager::new(config),
|
||||
started: SystemTime::now(),
|
||||
stopping: AtomicBool::new(false),
|
||||
@@ -63,6 +69,10 @@ impl Server {
|
||||
return Err!("Reloading not enabled");
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "systemd", target_os = "linux"))]
|
||||
sd_notify::notify(true, &[sd_notify::NotifyState::Reloading])
|
||||
.expect("failed to notify systemd of reloading state");
|
||||
|
||||
if self.reloading.swap(true, Ordering::AcqRel) {
|
||||
return Err!("Reloading already in progress");
|
||||
}
|
||||
@@ -77,7 +87,7 @@ impl Server {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn restart(&self) -> Result<()> {
|
||||
pub fn restart(&self) -> Result {
|
||||
if self.restarting.swap(true, Ordering::AcqRel) {
|
||||
return Err!("Restart already in progress");
|
||||
}
|
||||
@@ -87,7 +97,11 @@ impl Server {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn shutdown(&self) -> Result<()> {
|
||||
pub fn shutdown(&self) -> Result {
|
||||
#[cfg(all(feature = "systemd", target_os = "linux"))]
|
||||
sd_notify::notify(true, &[sd_notify::NotifyState::Stopping])
|
||||
.expect("failed to notify systemd of stopping state");
|
||||
|
||||
if self.stopping.swap(true, Ordering::AcqRel) {
|
||||
return Err!("Shutdown already in progress");
|
||||
}
|
||||
@@ -106,7 +120,7 @@ impl Server {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn until_shutdown(self: Arc<Self>) {
|
||||
pub async fn until_shutdown(self: &Arc<Self>) {
|
||||
while self.running() {
|
||||
self.signal.subscribe().recv().await.ok();
|
||||
}
|
||||
@@ -121,9 +135,12 @@ impl Server {
|
||||
|
||||
#[inline]
|
||||
pub fn check_running(&self) -> Result {
|
||||
use std::{io, io::ErrorKind::Interrupted};
|
||||
|
||||
self.running()
|
||||
.then_some(())
|
||||
.ok_or_else(|| err!(debug_warn!("Server is shutting down.")))
|
||||
.ok_or_else(|| io::Error::new(Interrupted, "Server shutting down"))
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
||||
@@ -84,6 +84,17 @@ macro_rules! apply {
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! pair_of {
|
||||
($decl:ty) => {
|
||||
($decl, $decl)
|
||||
};
|
||||
|
||||
($init:expr) => {
|
||||
($init, $init)
|
||||
};
|
||||
}
|
||||
|
||||
/// Functor for truthy
|
||||
#[macro_export]
|
||||
macro_rules! is_true {
|
||||
|
||||
@@ -35,6 +35,13 @@ where
|
||||
Fut: Future<Output = Option<U>> + Send,
|
||||
U: Send;
|
||||
|
||||
fn broadn_flat_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
|
||||
where
|
||||
N: Into<Option<usize>>,
|
||||
F: Fn(Item) -> Fut + Send,
|
||||
Fut: Stream<Item = U> + Send + Unpin,
|
||||
U: Send;
|
||||
|
||||
fn broadn_then<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
|
||||
where
|
||||
N: Into<Option<usize>>,
|
||||
@@ -70,6 +77,16 @@ where
|
||||
self.broadn_filter_map(None, f)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn broad_flat_map<F, Fut, U>(self, f: F) -> impl Stream<Item = U> + Send
|
||||
where
|
||||
F: Fn(Item) -> Fut + Send,
|
||||
Fut: Stream<Item = U> + Send + Unpin,
|
||||
U: Send,
|
||||
{
|
||||
self.broadn_flat_map(None, f)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn broad_then<F, Fut, U>(self, f: F) -> impl Stream<Item = U> + Send
|
||||
where
|
||||
@@ -122,6 +139,17 @@ where
|
||||
.ready_filter_map(identity)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn broadn_flat_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
|
||||
where
|
||||
N: Into<Option<usize>>,
|
||||
F: Fn(Item) -> Fut + Send,
|
||||
Fut: Stream<Item = U> + Send + Unpin,
|
||||
U: Send,
|
||||
{
|
||||
self.flat_map_unordered(n.into().unwrap_or_else(automatic_width), f)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn broadn_then<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
|
||||
where
|
||||
|
||||
+18
-5
@@ -22,7 +22,7 @@ pub(crate) fn from_slice<'a, T>(buf: &'a [u8]) -> Result<T>
|
||||
where
|
||||
T: Deserialize<'a>,
|
||||
{
|
||||
let mut deserializer = Deserializer { buf, pos: 0, seq: false };
|
||||
let mut deserializer = Deserializer { buf, pos: 0, rec: 0, seq: false };
|
||||
|
||||
T::deserialize(&mut deserializer).debug_inspect(|_| {
|
||||
deserializer
|
||||
@@ -35,6 +35,7 @@ where
|
||||
pub(crate) struct Deserializer<'de> {
|
||||
buf: &'de [u8],
|
||||
pos: usize,
|
||||
rec: usize,
|
||||
seq: bool,
|
||||
}
|
||||
|
||||
@@ -107,7 +108,7 @@ impl<'de> Deserializer<'de> {
|
||||
/// consumed None is returned instead.
|
||||
#[inline]
|
||||
fn record_peek_byte(&self) -> Option<u8> {
|
||||
let started = self.pos != 0;
|
||||
let started = self.pos != 0 || self.rec > 0;
|
||||
let buf = &self.buf[self.pos..];
|
||||
debug_assert!(
|
||||
!started || buf[0] == Self::SEP,
|
||||
@@ -121,13 +122,14 @@ impl<'de> Deserializer<'de> {
|
||||
/// the start of the next record. (Case for some sequences)
|
||||
#[inline]
|
||||
fn record_start(&mut self) {
|
||||
let started = self.pos != 0;
|
||||
let started = self.pos != 0 || self.rec > 0;
|
||||
debug_assert!(
|
||||
!started || self.buf[self.pos] == Self::SEP,
|
||||
"Missing expected record separator at current position"
|
||||
);
|
||||
|
||||
self.inc_pos(started.into());
|
||||
self.inc_rec(1);
|
||||
}
|
||||
|
||||
/// Consume all remaining bytes, which may include record separators,
|
||||
@@ -157,6 +159,9 @@ impl<'de> Deserializer<'de> {
|
||||
debug_assert!(self.pos <= self.buf.len(), "pos out of range");
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn inc_rec(&mut self, n: usize) { self.rec = self.rec.saturating_add(n); }
|
||||
|
||||
/// Unconsumed input bytes.
|
||||
#[inline]
|
||||
fn remaining(&self) -> Result<usize> {
|
||||
@@ -270,8 +275,16 @@ impl<'a, 'de: 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
|
||||
}
|
||||
|
||||
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
|
||||
fn deserialize_option<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> {
|
||||
unhandled!("deserialize Option not implemented")
|
||||
fn deserialize_option<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> {
|
||||
if self
|
||||
.buf
|
||||
.get(self.pos)
|
||||
.is_none_or(|b| *b == Deserializer::SEP)
|
||||
{
|
||||
visitor.visit_none()
|
||||
} else {
|
||||
visitor.visit_some(self)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
|
||||
|
||||
@@ -30,12 +30,13 @@ use crate::{
|
||||
};
|
||||
|
||||
pub struct Engine {
|
||||
pub(super) read_only: bool,
|
||||
pub(super) secondary: bool,
|
||||
corks: AtomicU32,
|
||||
pub(crate) db: Db,
|
||||
pub(crate) pool: Arc<Pool>,
|
||||
pub(crate) ctx: Arc<Context>,
|
||||
pub(super) read_only: bool,
|
||||
pub(super) secondary: bool,
|
||||
pub(crate) checksums: bool,
|
||||
corks: AtomicU32,
|
||||
}
|
||||
|
||||
pub(crate) type Db = DBWithThreadMode<MultiThreaded>;
|
||||
|
||||
@@ -72,6 +72,13 @@ fn descriptor_cf_options(
|
||||
opts.set_options_from_string("{{arena_block_size=2097152;}}")
|
||||
.map_err(map_err)?;
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
opts.set_options_from_string(
|
||||
"{{paranoid_checks=true;paranoid_file_checks=true;force_consistency_checks=true;\
|
||||
verify_sst_unique_id_in_manifest=true;}}",
|
||||
)
|
||||
.map_err(map_err)?;
|
||||
|
||||
Ok(opts)
|
||||
}
|
||||
|
||||
|
||||
@@ -83,7 +83,7 @@ pub(crate) static RANDOM: Descriptor = Descriptor {
|
||||
write_size: 1024 * 1024 * 32,
|
||||
cache_shards: 128,
|
||||
compression_level: -3,
|
||||
bottommost_level: Some(4),
|
||||
bottommost_level: Some(-1),
|
||||
compressed_index: true,
|
||||
..BASE
|
||||
};
|
||||
@@ -94,8 +94,8 @@ pub(crate) static SEQUENTIAL: Descriptor = Descriptor {
|
||||
level_size: 1024 * 1024 * 32,
|
||||
file_size: 1024 * 1024 * 2,
|
||||
cache_shards: 128,
|
||||
compression_level: -1,
|
||||
bottommost_level: Some(6),
|
||||
compression_level: -2,
|
||||
bottommost_level: Some(-1),
|
||||
compression_shape: [0, 0, 1, 1, 1, 1, 1],
|
||||
compressed_index: false,
|
||||
..BASE
|
||||
@@ -111,7 +111,7 @@ pub(crate) static RANDOM_SMALL: Descriptor = Descriptor {
|
||||
block_size: 512,
|
||||
cache_shards: 64,
|
||||
compression_level: -4,
|
||||
bottommost_level: Some(1),
|
||||
bottommost_level: Some(-1),
|
||||
compression_shape: [0, 0, 0, 0, 0, 1, 1],
|
||||
compressed_index: false,
|
||||
..RANDOM
|
||||
@@ -126,8 +126,8 @@ pub(crate) static SEQUENTIAL_SMALL: Descriptor = Descriptor {
|
||||
block_size: 512,
|
||||
cache_shards: 64,
|
||||
block_index_hashing: Some(false),
|
||||
compression_level: -2,
|
||||
bottommost_level: Some(4),
|
||||
compression_level: -4,
|
||||
bottommost_level: Some(-2),
|
||||
compression_shape: [0, 0, 0, 0, 1, 1, 1],
|
||||
compressed_index: false,
|
||||
..SEQUENTIAL
|
||||
|
||||
@@ -1,32 +1,15 @@
|
||||
use std::fmt::Write;
|
||||
|
||||
use conduwuit::{implement, Result};
|
||||
use rocksdb::LiveFile as SstFile;
|
||||
|
||||
use super::Engine;
|
||||
use crate::util::map_err;
|
||||
|
||||
#[implement(Engine)]
|
||||
pub fn file_list(&self) -> Result<String> {
|
||||
match self.db.live_files() {
|
||||
| Err(e) => Ok(String::from(e)),
|
||||
| Ok(mut files) => {
|
||||
files.sort_by_key(|f| f.name.clone());
|
||||
let mut res = String::new();
|
||||
writeln!(res, "| lev | sst | keys | dels | size | column |")?;
|
||||
writeln!(res, "| ---: | :--- | ---: | ---: | ---: | :--- |")?;
|
||||
for file in files {
|
||||
writeln!(
|
||||
res,
|
||||
"| {} | {:<13} | {:7}+ | {:4}- | {:9} | {} |",
|
||||
file.level,
|
||||
file.name,
|
||||
file.num_entries,
|
||||
file.num_deletions,
|
||||
file.size,
|
||||
file.column_family_name,
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(res)
|
||||
},
|
||||
}
|
||||
pub fn file_list(&self) -> impl Iterator<Item = Result<SstFile>> + Send {
|
||||
self.db
|
||||
.live_files()
|
||||
.map_err(map_err)
|
||||
.into_iter()
|
||||
.flat_map(Vec::into_iter)
|
||||
.map(Ok)
|
||||
}
|
||||
|
||||
@@ -56,12 +56,13 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
|
||||
);
|
||||
|
||||
Ok(Arc::new(Self {
|
||||
db,
|
||||
pool: ctx.pool.clone(),
|
||||
ctx: ctx.clone(),
|
||||
read_only: config.rocksdb_read_only,
|
||||
secondary: config.rocksdb_secondary,
|
||||
checksums: config.rocksdb_checksums,
|
||||
corks: AtomicU32::new(0),
|
||||
pool: ctx.pool.clone(),
|
||||
db,
|
||||
ctx,
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
+11
-8
@@ -9,6 +9,8 @@ mod keys_from;
|
||||
mod keys_prefix;
|
||||
mod open;
|
||||
mod options;
|
||||
mod qry;
|
||||
mod qry_batch;
|
||||
mod remove;
|
||||
mod rev_keys;
|
||||
mod rev_keys_from;
|
||||
@@ -37,28 +39,29 @@ pub(crate) use self::options::{
|
||||
cache_iter_options_default, cache_read_options_default, iter_options_default,
|
||||
read_options_default, write_options_default,
|
||||
};
|
||||
pub use self::{get_batch::Get, qry_batch::Qry};
|
||||
use crate::{watchers::Watchers, Engine};
|
||||
|
||||
pub struct Map {
|
||||
name: &'static str,
|
||||
db: Arc<Engine>,
|
||||
cf: Arc<ColumnFamily>,
|
||||
watchers: Watchers,
|
||||
write_options: WriteOptions,
|
||||
cf: Arc<ColumnFamily>,
|
||||
db: Arc<Engine>,
|
||||
read_options: ReadOptions,
|
||||
cache_read_options: ReadOptions,
|
||||
write_options: WriteOptions,
|
||||
}
|
||||
|
||||
impl Map {
|
||||
pub(crate) fn open(db: &Arc<Engine>, name: &'static str) -> Result<Arc<Self>> {
|
||||
Ok(Arc::new(Self {
|
||||
name,
|
||||
db: db.clone(),
|
||||
cf: open::open(db, name),
|
||||
watchers: Watchers::default(),
|
||||
write_options: write_options_default(),
|
||||
read_options: read_options_default(),
|
||||
cache_read_options: cache_read_options_default(),
|
||||
cf: open::open(db, name),
|
||||
db: db.clone(),
|
||||
read_options: read_options_default(db),
|
||||
cache_read_options: cache_read_options_default(db),
|
||||
write_options: write_options_default(db),
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
+1
-51
@@ -1,65 +1,15 @@
|
||||
use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc};
|
||||
use std::{convert::AsRef, fmt::Debug, sync::Arc};
|
||||
|
||||
use arrayvec::ArrayVec;
|
||||
use conduwuit::{err, implement, utils::result::MapExpect, Err, Result};
|
||||
use futures::{future::ready, Future, FutureExt, TryFutureExt};
|
||||
use rocksdb::{DBPinnableSlice, ReadOptions};
|
||||
use serde::Serialize;
|
||||
use tokio::task;
|
||||
|
||||
use crate::{
|
||||
keyval::KeyBuf,
|
||||
ser,
|
||||
util::{is_incomplete, map_err, or_else},
|
||||
Handle,
|
||||
};
|
||||
|
||||
/// Fetch a value from the database into cache, returning a reference-handle
|
||||
/// asynchronously. The key is serialized into an allocated buffer to perform
|
||||
/// the query.
|
||||
#[implement(super::Map)]
|
||||
#[inline]
|
||||
pub fn qry<K>(self: &Arc<Self>, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
|
||||
where
|
||||
K: Serialize + ?Sized + Debug,
|
||||
{
|
||||
let mut buf = KeyBuf::new();
|
||||
self.bqry(key, &mut buf)
|
||||
}
|
||||
|
||||
/// Fetch a value from the database into cache, returning a reference-handle
|
||||
/// asynchronously. The key is serialized into a fixed-sized buffer to perform
|
||||
/// the query. The maximum size is supplied as const generic parameter.
|
||||
#[implement(super::Map)]
|
||||
#[inline]
|
||||
pub fn aqry<const MAX: usize, K>(
|
||||
self: &Arc<Self>,
|
||||
key: &K,
|
||||
) -> impl Future<Output = Result<Handle<'_>>> + Send
|
||||
where
|
||||
K: Serialize + ?Sized + Debug,
|
||||
{
|
||||
let mut buf = ArrayVec::<u8, MAX>::new();
|
||||
self.bqry(key, &mut buf)
|
||||
}
|
||||
|
||||
/// Fetch a value from the database into cache, returning a reference-handle
|
||||
/// asynchronously. The key is serialized into a user-supplied Writer.
|
||||
#[implement(super::Map)]
|
||||
#[tracing::instrument(skip(self, buf), level = "trace")]
|
||||
pub fn bqry<K, B>(
|
||||
self: &Arc<Self>,
|
||||
key: &K,
|
||||
buf: &mut B,
|
||||
) -> impl Future<Output = Result<Handle<'_>>> + Send
|
||||
where
|
||||
K: Serialize + ?Sized + Debug,
|
||||
B: Write + AsRef<[u8]>,
|
||||
{
|
||||
let key = ser::serialize(buf, key).expect("failed to serialize query key");
|
||||
self.get(key)
|
||||
}
|
||||
|
||||
/// Fetch a value from the database into cache, returning a reference-handle
|
||||
/// asynchronously. The key is referenced directly to perform the query.
|
||||
#[implement(super::Map)]
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::{convert::AsRef, fmt::Debug, sync::Arc};
|
||||
use std::{convert::AsRef, sync::Arc};
|
||||
|
||||
use conduwuit::{
|
||||
implement,
|
||||
@@ -10,43 +10,34 @@ use conduwuit::{
|
||||
};
|
||||
use futures::{Stream, StreamExt, TryStreamExt};
|
||||
use rocksdb::{DBPinnableSlice, ReadOptions};
|
||||
use serde::Serialize;
|
||||
|
||||
use super::get::{cached_handle_from, handle_from};
|
||||
use crate::{keyval::KeyBuf, ser, Handle};
|
||||
use crate::Handle;
|
||||
|
||||
#[implement(super::Map)]
|
||||
#[tracing::instrument(skip(self, keys), level = "trace")]
|
||||
pub fn qry_batch<'a, S, K>(
|
||||
self: &'a Arc<Self>,
|
||||
keys: S,
|
||||
) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
|
||||
pub trait Get<'a, K, S>
|
||||
where
|
||||
Self: Sized,
|
||||
S: Stream<Item = K> + Send + 'a,
|
||||
K: Serialize + Debug + 'a,
|
||||
K: AsRef<[u8]> + Send + Sync + 'a,
|
||||
{
|
||||
use crate::pool::Get;
|
||||
fn get(self, map: &'a Arc<super::Map>) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a;
|
||||
}
|
||||
|
||||
keys.ready_chunks(automatic_amplification())
|
||||
.widen_then(automatic_width(), |chunk| {
|
||||
let keys = chunk
|
||||
.iter()
|
||||
.map(ser::serialize_to::<KeyBuf, _>)
|
||||
.map(|result| result.expect("failed to serialize query key"))
|
||||
.map(Into::into)
|
||||
.collect();
|
||||
|
||||
self.db
|
||||
.pool
|
||||
.execute_get(Get { map: self.clone(), key: keys, res: None })
|
||||
})
|
||||
.map_ok(|results| results.into_iter().stream())
|
||||
.try_flatten()
|
||||
impl<'a, K, S> Get<'a, K, S> for S
|
||||
where
|
||||
Self: Sized,
|
||||
S: Stream<Item = K> + Send + 'a,
|
||||
K: AsRef<[u8]> + Send + Sync + 'a,
|
||||
{
|
||||
#[inline]
|
||||
fn get(self, map: &'a Arc<super::Map>) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a {
|
||||
map.get_batch(self)
|
||||
}
|
||||
}
|
||||
|
||||
#[implement(super::Map)]
|
||||
#[tracing::instrument(skip(self, keys), level = "trace")]
|
||||
pub fn get_batch<'a, S, K>(
|
||||
pub(crate) fn get_batch<'a, S, K>(
|
||||
self: &'a Arc<Self>,
|
||||
keys: S,
|
||||
) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
|
||||
|
||||
@@ -22,7 +22,7 @@ where
|
||||
pub fn raw_keys(self: &Arc<Self>) -> impl Stream<Item = Result<Key<'_>>> + Send {
|
||||
use crate::pool::Seek;
|
||||
|
||||
let opts = super::iter_options_default();
|
||||
let opts = super::iter_options_default(&self.db);
|
||||
let state = stream::State::new(self, opts);
|
||||
if is_cached(self) {
|
||||
let state = state.init_fwd(None);
|
||||
|
||||
@@ -53,7 +53,7 @@ where
|
||||
{
|
||||
use crate::pool::Seek;
|
||||
|
||||
let opts = super::iter_options_default();
|
||||
let opts = super::iter_options_default(&self.db);
|
||||
let state = stream::State::new(self, opts);
|
||||
if is_cached(self, from) {
|
||||
return stream::Keys::<'_>::from(state.init_fwd(from.as_ref().into())).boxed();
|
||||
|
||||
@@ -30,8 +30,5 @@ pub(super) fn open(db: &Arc<Engine>, name: &str) -> Arc<ColumnFamily> {
|
||||
// lifetime parameter. We should not hold this handle, even in its Arc, after
|
||||
// closing the database (dropping `Engine`). Since `Arc<Engine>` is a sibling
|
||||
// member along with this handle in `Map`, that is prevented.
|
||||
unsafe {
|
||||
Arc::increment_strong_count(cf_ptr);
|
||||
Arc::from_raw(cf_ptr)
|
||||
}
|
||||
unsafe { Arc::from_raw(cf_ptr) }
|
||||
}
|
||||
|
||||
+29
-21
@@ -1,35 +1,43 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use rocksdb::{ReadOptions, ReadTier, WriteOptions};
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn iter_options_default() -> ReadOptions {
|
||||
let mut options = read_options_default();
|
||||
options.set_background_purge_on_iterator_cleanup(true);
|
||||
//options.set_pin_data(true);
|
||||
options
|
||||
}
|
||||
use crate::Engine;
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn cache_iter_options_default() -> ReadOptions {
|
||||
let mut options = cache_read_options_default();
|
||||
options.set_background_purge_on_iterator_cleanup(true);
|
||||
//options.set_pin_data(true);
|
||||
options
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn cache_read_options_default() -> ReadOptions {
|
||||
let mut options = read_options_default();
|
||||
pub(crate) fn cache_iter_options_default(db: &Arc<Engine>) -> ReadOptions {
|
||||
let mut options = iter_options_default(db);
|
||||
options.set_read_tier(ReadTier::BlockCache);
|
||||
options.fill_cache(false);
|
||||
options
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn read_options_default() -> ReadOptions {
|
||||
let mut options = ReadOptions::default();
|
||||
options.set_total_order_seek(true);
|
||||
pub(crate) fn iter_options_default(db: &Arc<Engine>) -> ReadOptions {
|
||||
let mut options = read_options_default(db);
|
||||
options.set_background_purge_on_iterator_cleanup(true);
|
||||
options
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn write_options_default() -> WriteOptions { WriteOptions::default() }
|
||||
pub(crate) fn cache_read_options_default(db: &Arc<Engine>) -> ReadOptions {
|
||||
let mut options = read_options_default(db);
|
||||
options.set_read_tier(ReadTier::BlockCache);
|
||||
options.fill_cache(false);
|
||||
options
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn read_options_default(db: &Arc<Engine>) -> ReadOptions {
|
||||
let mut options = ReadOptions::default();
|
||||
options.set_total_order_seek(true);
|
||||
|
||||
if !db.checksums {
|
||||
options.set_verify_checksums(false);
|
||||
}
|
||||
|
||||
options
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn write_options_default(_db: &Arc<Engine>) -> WriteOptions { WriteOptions::default() }
|
||||
|
||||
@@ -0,0 +1,54 @@
|
||||
use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc};
|
||||
|
||||
use arrayvec::ArrayVec;
|
||||
use conduwuit::{implement, Result};
|
||||
use futures::Future;
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::{keyval::KeyBuf, ser, Handle};
|
||||
|
||||
/// Fetch a value from the database into cache, returning a reference-handle
|
||||
/// asynchronously. The key is serialized into an allocated buffer to perform
|
||||
/// the query.
|
||||
#[implement(super::Map)]
|
||||
#[inline]
|
||||
pub fn qry<K>(self: &Arc<Self>, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
|
||||
where
|
||||
K: Serialize + ?Sized + Debug,
|
||||
{
|
||||
let mut buf = KeyBuf::new();
|
||||
self.bqry(key, &mut buf)
|
||||
}
|
||||
|
||||
/// Fetch a value from the database into cache, returning a reference-handle
|
||||
/// asynchronously. The key is serialized into a fixed-sized buffer to perform
|
||||
/// the query. The maximum size is supplied as const generic parameter.
|
||||
#[implement(super::Map)]
|
||||
#[inline]
|
||||
pub fn aqry<const MAX: usize, K>(
|
||||
self: &Arc<Self>,
|
||||
key: &K,
|
||||
) -> impl Future<Output = Result<Handle<'_>>> + Send
|
||||
where
|
||||
K: Serialize + ?Sized + Debug,
|
||||
{
|
||||
let mut buf = ArrayVec::<u8, MAX>::new();
|
||||
self.bqry(key, &mut buf)
|
||||
}
|
||||
|
||||
/// Fetch a value from the database into cache, returning a reference-handle
|
||||
/// asynchronously. The key is serialized into a user-supplied Writer.
|
||||
#[implement(super::Map)]
|
||||
#[tracing::instrument(skip(self, buf), level = "trace")]
|
||||
pub fn bqry<K, B>(
|
||||
self: &Arc<Self>,
|
||||
key: &K,
|
||||
buf: &mut B,
|
||||
) -> impl Future<Output = Result<Handle<'_>>> + Send
|
||||
where
|
||||
K: Serialize + ?Sized + Debug,
|
||||
B: Write + AsRef<[u8]>,
|
||||
{
|
||||
let key = ser::serialize(buf, key).expect("failed to serialize query key");
|
||||
self.get(key)
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
use std::{fmt::Debug, sync::Arc};
|
||||
|
||||
use conduwuit::{
|
||||
implement,
|
||||
utils::{
|
||||
stream::{automatic_amplification, automatic_width, WidebandExt},
|
||||
IterStream,
|
||||
},
|
||||
Result,
|
||||
};
|
||||
use futures::{Stream, StreamExt, TryStreamExt};
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::{keyval::KeyBuf, ser, Handle};
|
||||
|
||||
pub trait Qry<'a, K, S>
|
||||
where
|
||||
S: Stream<Item = K> + Send + 'a,
|
||||
K: Serialize + Debug,
|
||||
{
|
||||
fn qry(self, map: &'a Arc<super::Map>) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a;
|
||||
}
|
||||
|
||||
impl<'a, K, S> Qry<'a, K, S> for S
|
||||
where
|
||||
Self: 'a,
|
||||
S: Stream<Item = K> + Send + 'a,
|
||||
K: Serialize + Debug + 'a,
|
||||
{
|
||||
#[inline]
|
||||
fn qry(self, map: &'a Arc<super::Map>) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a {
|
||||
map.qry_batch(self)
|
||||
}
|
||||
}
|
||||
|
||||
#[implement(super::Map)]
|
||||
#[tracing::instrument(skip(self, keys), level = "trace")]
|
||||
pub(crate) fn qry_batch<'a, S, K>(
|
||||
self: &'a Arc<Self>,
|
||||
keys: S,
|
||||
) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
|
||||
where
|
||||
S: Stream<Item = K> + Send + 'a,
|
||||
K: Serialize + Debug + 'a,
|
||||
{
|
||||
use crate::pool::Get;
|
||||
|
||||
keys.ready_chunks(automatic_amplification())
|
||||
.widen_then(automatic_width(), |chunk| {
|
||||
let keys = chunk
|
||||
.iter()
|
||||
.map(ser::serialize_to::<KeyBuf, _>)
|
||||
.map(|result| result.expect("failed to serialize query key"))
|
||||
.map(Into::into)
|
||||
.collect();
|
||||
|
||||
self.db
|
||||
.pool
|
||||
.execute_get(Get { map: self.clone(), key: keys, res: None })
|
||||
})
|
||||
.map_ok(|results| results.into_iter().stream())
|
||||
.try_flatten()
|
||||
}
|
||||
@@ -22,7 +22,7 @@ where
|
||||
pub fn rev_raw_keys(self: &Arc<Self>) -> impl Stream<Item = Result<Key<'_>>> + Send {
|
||||
use crate::pool::Seek;
|
||||
|
||||
let opts = super::iter_options_default();
|
||||
let opts = super::iter_options_default(&self.db);
|
||||
let state = stream::State::new(self, opts);
|
||||
if is_cached(self) {
|
||||
let state = state.init_rev(None);
|
||||
|
||||
@@ -61,7 +61,7 @@ where
|
||||
{
|
||||
use crate::pool::Seek;
|
||||
|
||||
let opts = super::iter_options_default();
|
||||
let opts = super::iter_options_default(&self.db);
|
||||
let state = stream::State::new(self, opts);
|
||||
if is_cached(self, from) {
|
||||
return stream::KeysRev::<'_>::from(state.init_rev(from.as_ref().into())).boxed();
|
||||
|
||||
@@ -31,7 +31,7 @@ where
|
||||
pub fn rev_raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> + Send {
|
||||
use crate::pool::Seek;
|
||||
|
||||
let opts = super::iter_options_default();
|
||||
let opts = super::iter_options_default(&self.db);
|
||||
let state = stream::State::new(self, opts);
|
||||
if is_cached(self) {
|
||||
let state = state.init_rev(None);
|
||||
@@ -66,7 +66,7 @@ pub fn rev_raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>
|
||||
fields(%map),
|
||||
)]
|
||||
pub(super) fn is_cached(map: &Arc<super::Map>) -> bool {
|
||||
let opts = super::cache_iter_options_default();
|
||||
let opts = super::cache_iter_options_default(&map.db);
|
||||
let state = stream::State::new(map, opts).init_rev(None);
|
||||
|
||||
!state.is_incomplete()
|
||||
|
||||
@@ -80,7 +80,7 @@ where
|
||||
{
|
||||
use crate::pool::Seek;
|
||||
|
||||
let opts = super::iter_options_default();
|
||||
let opts = super::iter_options_default(&self.db);
|
||||
let state = stream::State::new(self, opts);
|
||||
if is_cached(self, from) {
|
||||
let state = state.init_rev(from.as_ref().into());
|
||||
@@ -118,7 +118,7 @@ pub(super) fn is_cached<P>(map: &Arc<super::Map>, from: &P) -> bool
|
||||
where
|
||||
P: AsRef<[u8]> + ?Sized,
|
||||
{
|
||||
let cache_opts = super::cache_iter_options_default();
|
||||
let cache_opts = super::cache_iter_options_default(&map.db);
|
||||
let cache_status = stream::State::new(map, cache_opts)
|
||||
.init_rev(from.as_ref().into())
|
||||
.status();
|
||||
|
||||
@@ -30,7 +30,7 @@ where
|
||||
pub fn raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> + Send {
|
||||
use crate::pool::Seek;
|
||||
|
||||
let opts = super::iter_options_default();
|
||||
let opts = super::iter_options_default(&self.db);
|
||||
let state = stream::State::new(self, opts);
|
||||
if is_cached(self) {
|
||||
let state = state.init_fwd(None);
|
||||
@@ -65,7 +65,7 @@ pub fn raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> +
|
||||
fields(%map),
|
||||
)]
|
||||
pub(super) fn is_cached(map: &Arc<super::Map>) -> bool {
|
||||
let opts = super::cache_iter_options_default();
|
||||
let opts = super::cache_iter_options_default(&map.db);
|
||||
let state = stream::State::new(map, opts).init_fwd(None);
|
||||
|
||||
!state.is_incomplete()
|
||||
|
||||
@@ -77,7 +77,7 @@ where
|
||||
{
|
||||
use crate::pool::Seek;
|
||||
|
||||
let opts = super::iter_options_default();
|
||||
let opts = super::iter_options_default(&self.db);
|
||||
let state = stream::State::new(self, opts);
|
||||
if is_cached(self, from) {
|
||||
let state = state.init_fwd(from.as_ref().into());
|
||||
@@ -115,7 +115,7 @@ pub(super) fn is_cached<P>(map: &Arc<super::Map>, from: &P) -> bool
|
||||
where
|
||||
P: AsRef<[u8]> + ?Sized,
|
||||
{
|
||||
let opts = super::cache_iter_options_default();
|
||||
let opts = super::cache_iter_options_default(&map.db);
|
||||
let state = stream::State::new(map, opts).init_fwd(from.as_ref().into());
|
||||
|
||||
!state.is_incomplete()
|
||||
|
||||
+1
-1
@@ -30,7 +30,7 @@ pub use self::{
|
||||
deserialized::Deserialized,
|
||||
handle::Handle,
|
||||
keyval::{serialize_key, serialize_val, KeyVal, Slice},
|
||||
map::{compact, Map},
|
||||
map::{compact, Get, Map, Qry},
|
||||
ser::{serialize, serialize_to, serialize_to_vec, Cbor, Interfix, Json, Separator, SEP},
|
||||
};
|
||||
pub(crate) use self::{
|
||||
|
||||
@@ -13,7 +13,7 @@ use std::{
|
||||
use async_channel::{QueueStrategy, Receiver, RecvError, Sender};
|
||||
use conduwuit::{
|
||||
debug, debug_warn, err, error, implement,
|
||||
result::{DebugInspect, LogDebugErr},
|
||||
result::DebugInspect,
|
||||
trace,
|
||||
utils::sys::compute::{get_affinity, nth_core_available, set_affinity},
|
||||
Error, Result, Server,
|
||||
@@ -290,9 +290,12 @@ fn worker_init(&self, id: usize) {
|
||||
// affinity is empty (no-op) if there's only one queue
|
||||
set_affinity(affinity.clone());
|
||||
|
||||
#[cfg(feature = "jemalloc")]
|
||||
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
|
||||
if affinity.clone().count() == 1 && conduwuit::alloc::je::is_affine_arena() {
|
||||
use conduwuit::alloc::je::this_thread::{arena_id, set_arena};
|
||||
use conduwuit::{
|
||||
alloc::je::this_thread::{arena_id, set_arena},
|
||||
result::LogDebugErr,
|
||||
};
|
||||
|
||||
let id = affinity.clone().next().expect("at least one id");
|
||||
|
||||
|
||||
+158
-1
@@ -3,7 +3,7 @@
|
||||
use std::fmt::Debug;
|
||||
|
||||
use arrayvec::ArrayVec;
|
||||
use conduwuit::ruma::{serde::Raw, RoomId, UserId};
|
||||
use conduwuit::ruma::{serde::Raw, EventId, RoomId, UserId};
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::{
|
||||
@@ -389,3 +389,160 @@ fn de_complex() {
|
||||
|
||||
assert_eq!(arr, key, "deserialization of serialization does not match");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_tuple_option_value_some() {
|
||||
let room_id: &RoomId = "!room:example.com".try_into().unwrap();
|
||||
let user_id: &UserId = "@user:example.com".try_into().unwrap();
|
||||
|
||||
let mut aa = Vec::<u8>::new();
|
||||
aa.extend_from_slice(room_id.as_bytes());
|
||||
aa.push(0xFF);
|
||||
aa.extend_from_slice(user_id.as_bytes());
|
||||
|
||||
let bb: (&RoomId, Option<&UserId>) = (room_id, Some(user_id));
|
||||
let bbs = serialize_to_vec(&bb).expect("failed to serialize tuple");
|
||||
assert_eq!(aa, bbs);
|
||||
|
||||
let cc: (&RoomId, Option<&UserId>) =
|
||||
de::from_slice(&bbs).expect("failed to deserialize tuple");
|
||||
|
||||
assert_eq!(bb.1, cc.1);
|
||||
assert_eq!(cc.0, bb.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_tuple_option_value_none() {
|
||||
let room_id: &RoomId = "!room:example.com".try_into().unwrap();
|
||||
|
||||
let mut aa = Vec::<u8>::new();
|
||||
aa.extend_from_slice(room_id.as_bytes());
|
||||
aa.push(0xFF);
|
||||
|
||||
let bb: (&RoomId, Option<&UserId>) = (room_id, None);
|
||||
let bbs = serialize_to_vec(&bb).expect("failed to serialize tuple");
|
||||
assert_eq!(aa, bbs);
|
||||
|
||||
let cc: (&RoomId, Option<&UserId>) =
|
||||
de::from_slice(&bbs).expect("failed to deserialize tuple");
|
||||
|
||||
assert_eq!(None, cc.1);
|
||||
assert_eq!(cc.0, bb.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_tuple_option_none_value() {
|
||||
let user_id: &UserId = "@user:example.com".try_into().unwrap();
|
||||
|
||||
let mut aa = Vec::<u8>::new();
|
||||
aa.push(0xFF);
|
||||
aa.extend_from_slice(user_id.as_bytes());
|
||||
|
||||
let bb: (Option<&RoomId>, &UserId) = (None, user_id);
|
||||
let bbs = serialize_to_vec(&bb).expect("failed to serialize tuple");
|
||||
assert_eq!(aa, bbs);
|
||||
|
||||
let cc: (Option<&RoomId>, &UserId) =
|
||||
de::from_slice(&bbs).expect("failed to deserialize tuple");
|
||||
|
||||
assert_eq!(None, cc.0);
|
||||
assert_eq!(cc.1, bb.1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_tuple_option_some_value() {
|
||||
let room_id: &RoomId = "!room:example.com".try_into().unwrap();
|
||||
let user_id: &UserId = "@user:example.com".try_into().unwrap();
|
||||
|
||||
let mut aa = Vec::<u8>::new();
|
||||
aa.extend_from_slice(room_id.as_bytes());
|
||||
aa.push(0xFF);
|
||||
aa.extend_from_slice(user_id.as_bytes());
|
||||
|
||||
let bb: (Option<&RoomId>, &UserId) = (Some(room_id), user_id);
|
||||
let bbs = serialize_to_vec(&bb).expect("failed to serialize tuple");
|
||||
assert_eq!(aa, bbs);
|
||||
|
||||
let cc: (Option<&RoomId>, &UserId) =
|
||||
de::from_slice(&bbs).expect("failed to deserialize tuple");
|
||||
|
||||
assert_eq!(bb.0, cc.0);
|
||||
assert_eq!(cc.1, bb.1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_tuple_option_some_some() {
|
||||
let room_id: &RoomId = "!room:example.com".try_into().unwrap();
|
||||
let user_id: &UserId = "@user:example.com".try_into().unwrap();
|
||||
|
||||
let mut aa = Vec::<u8>::new();
|
||||
aa.extend_from_slice(room_id.as_bytes());
|
||||
aa.push(0xFF);
|
||||
aa.extend_from_slice(user_id.as_bytes());
|
||||
|
||||
let bb: (Option<&RoomId>, Option<&UserId>) = (Some(room_id), Some(user_id));
|
||||
let bbs = serialize_to_vec(&bb).expect("failed to serialize tuple");
|
||||
assert_eq!(aa, bbs);
|
||||
|
||||
let cc: (Option<&RoomId>, Option<&UserId>) =
|
||||
de::from_slice(&bbs).expect("failed to deserialize tuple");
|
||||
|
||||
assert_eq!(cc.0, bb.0);
|
||||
assert_eq!(bb.1, cc.1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_tuple_option_none_none() {
|
||||
let aa = vec![0xFF];
|
||||
|
||||
let bb: (Option<&RoomId>, Option<&UserId>) = (None, None);
|
||||
let bbs = serialize_to_vec(&bb).expect("failed to serialize tuple");
|
||||
assert_eq!(aa, bbs);
|
||||
|
||||
let cc: (Option<&RoomId>, Option<&UserId>) =
|
||||
de::from_slice(&bbs).expect("failed to deserialize tuple");
|
||||
|
||||
assert_eq!(cc.0, bb.0);
|
||||
assert_eq!(None, cc.1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_tuple_option_some_none_some() {
|
||||
let room_id: &RoomId = "!room:example.com".try_into().unwrap();
|
||||
let user_id: &UserId = "@user:example.com".try_into().unwrap();
|
||||
|
||||
let mut aa = Vec::<u8>::new();
|
||||
aa.extend_from_slice(room_id.as_bytes());
|
||||
aa.push(0xFF);
|
||||
aa.push(0xFF);
|
||||
aa.extend_from_slice(user_id.as_bytes());
|
||||
|
||||
let bb: (Option<&RoomId>, Option<&EventId>, Option<&UserId>) =
|
||||
(Some(room_id), None, Some(user_id));
|
||||
|
||||
let bbs = serialize_to_vec(&bb).expect("failed to serialize tuple");
|
||||
assert_eq!(aa, bbs);
|
||||
|
||||
let cc: (Option<&RoomId>, Option<&EventId>, Option<&UserId>) =
|
||||
de::from_slice(&bbs).expect("failed to deserialize tuple");
|
||||
|
||||
assert_eq!(bb.0, cc.0);
|
||||
assert_eq!(None, cc.1);
|
||||
assert_eq!(bb.1, cc.1);
|
||||
assert_eq!(bb.2, cc.2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_tuple_option_none_none_none() {
|
||||
let aa = vec![0xFF, 0xFF];
|
||||
|
||||
let bb: (Option<&RoomId>, Option<&EventId>, Option<&UserId>) = (None, None, None);
|
||||
let bbs = serialize_to_vec(&bb).expect("failed to serialize tuple");
|
||||
assert_eq!(aa, bbs);
|
||||
|
||||
let cc: (Option<&RoomId>, Option<&EventId>, Option<&UserId>) =
|
||||
de::from_slice(&bbs).expect("failed to deserialize tuple");
|
||||
|
||||
assert_eq!(None, cc.0);
|
||||
assert_eq!(bb, cc);
|
||||
}
|
||||
|
||||
@@ -49,6 +49,9 @@ default = [
|
||||
"zstd_compression",
|
||||
]
|
||||
|
||||
blurhashing = [
|
||||
"conduwuit-service/blurhashing",
|
||||
]
|
||||
brotli_compression = [
|
||||
"conduwuit-api/brotli_compression",
|
||||
"conduwuit-core/brotli_compression",
|
||||
|
||||
+2
-2
@@ -3,7 +3,7 @@ use std::sync::Arc;
|
||||
use conduwuit::{
|
||||
config::Config,
|
||||
debug_warn, err,
|
||||
log::{capture, fmt_span, ConsoleFormat, LogLevelReloadHandles},
|
||||
log::{capture, fmt_span, ConsoleFormat, ConsoleWriter, LogLevelReloadHandles},
|
||||
result::UnwrapOrErr,
|
||||
Result,
|
||||
};
|
||||
@@ -30,7 +30,7 @@ pub(crate) fn init(
|
||||
.with_span_events(console_span_events)
|
||||
.event_format(ConsoleFormat::new(config))
|
||||
.fmt_fields(ConsoleFormat::new(config))
|
||||
.map_writer(|w| w);
|
||||
.with_writer(ConsoleWriter::new(config));
|
||||
|
||||
let (console_reload_filter, console_reload_handle) =
|
||||
reload::Layer::new(console_filter.clone());
|
||||
|
||||
+8
-9
@@ -8,13 +8,11 @@ use std::{
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
|
||||
use conduwuit::result::LogDebugErr;
|
||||
use conduwuit::{
|
||||
is_true,
|
||||
result::LogDebugErr,
|
||||
utils::{
|
||||
available_parallelism,
|
||||
sys::compute::{nth_core_available, set_affinity},
|
||||
},
|
||||
utils::sys::compute::{nth_core_available, set_affinity},
|
||||
Result,
|
||||
};
|
||||
use tokio::runtime::Builder;
|
||||
@@ -25,6 +23,7 @@ const WORKER_NAME: &str = "conduwuit:worker";
|
||||
const WORKER_MIN: usize = 2;
|
||||
const WORKER_KEEPALIVE: u64 = 36;
|
||||
const MAX_BLOCKING_THREADS: usize = 1024;
|
||||
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
|
||||
const DISABLE_MUZZY_THRESHOLD: usize = 4;
|
||||
|
||||
static WORKER_AFFINITY: OnceLock<bool> = OnceLock::new();
|
||||
@@ -122,7 +121,7 @@ fn set_worker_affinity() {
|
||||
set_worker_mallctl(id);
|
||||
}
|
||||
|
||||
#[cfg(feature = "jemalloc")]
|
||||
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
|
||||
fn set_worker_mallctl(id: usize) {
|
||||
use conduwuit::alloc::je::{
|
||||
is_affine_arena,
|
||||
@@ -137,13 +136,13 @@ fn set_worker_mallctl(id: usize) {
|
||||
.get()
|
||||
.expect("GC_MUZZY initialized by runtime::new()");
|
||||
|
||||
let muzzy_auto_disable = available_parallelism() >= DISABLE_MUZZY_THRESHOLD;
|
||||
let muzzy_auto_disable = conduwuit::utils::available_parallelism() >= DISABLE_MUZZY_THRESHOLD;
|
||||
if matches!(muzzy_option, Some(false) | None if muzzy_auto_disable) {
|
||||
set_muzzy_decay(-1).log_debug_err().ok();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "jemalloc"))]
|
||||
#[cfg(any(not(feature = "jemalloc"), target_env = "msvc"))]
|
||||
fn set_worker_mallctl(_: usize) {}
|
||||
|
||||
#[tracing::instrument(
|
||||
@@ -189,7 +188,7 @@ fn thread_park() {
|
||||
}
|
||||
|
||||
fn gc_on_park() {
|
||||
#[cfg(feature = "jemalloc")]
|
||||
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
|
||||
conduwuit::alloc::je::this_thread::decay()
|
||||
.log_debug_err()
|
||||
.ok();
|
||||
|
||||
+3
-3
@@ -46,14 +46,14 @@ impl Server {
|
||||
.and_then(|raw| crate::clap::update(raw, args))
|
||||
.and_then(|raw| Config::new(&raw))?;
|
||||
|
||||
#[cfg(feature = "sentry_telemetry")]
|
||||
let sentry_guard = crate::sentry::init(&config);
|
||||
|
||||
let (tracing_reload_handle, tracing_flame_guard, capture) =
|
||||
crate::logging::init(&config)?;
|
||||
|
||||
config.check()?;
|
||||
|
||||
#[cfg(feature = "sentry_telemetry")]
|
||||
let sentry_guard = crate::sentry::init(&config);
|
||||
|
||||
#[cfg(unix)]
|
||||
sys::maximize_fd_limit()
|
||||
.expect("Unable to increase maximum soft and hard file descriptor limit");
|
||||
|
||||
@@ -16,6 +16,8 @@ pub(super) async fn signal(server: Arc<Server>) {
|
||||
|
||||
let mut quit = unix::signal(SignalKind::quit()).expect("SIGQUIT handler");
|
||||
let mut term = unix::signal(SignalKind::terminate()).expect("SIGTERM handler");
|
||||
let mut usr1 = unix::signal(SignalKind::user_defined1()).expect("SIGUSR1 handler");
|
||||
let mut usr2 = unix::signal(SignalKind::user_defined2()).expect("SIGUSR2 handler");
|
||||
loop {
|
||||
trace!("Installed signal handlers");
|
||||
let sig: &'static str;
|
||||
@@ -23,6 +25,8 @@ pub(super) async fn signal(server: Arc<Server>) {
|
||||
_ = signal::ctrl_c() => { sig = "SIGINT"; },
|
||||
_ = quit.recv() => { sig = "SIGQUIT"; },
|
||||
_ = term.recv() => { sig = "SIGTERM"; },
|
||||
_ = usr1.recv() => { sig = "SIGUSR1"; },
|
||||
_ = usr2.recv() => { sig = "SIGUSR2"; },
|
||||
}
|
||||
|
||||
warn!("Received {sig}");
|
||||
|
||||
+32
-23
@@ -5,7 +5,7 @@ use axum::{
|
||||
Router,
|
||||
};
|
||||
use axum_client_ip::SecureClientIpSource;
|
||||
use conduwuit::{error, Result, Server};
|
||||
use conduwuit::{debug, error, Result, Server};
|
||||
use conduwuit_api::router::state::Guard;
|
||||
use conduwuit_service::Services;
|
||||
use http::{
|
||||
@@ -18,6 +18,7 @@ use tower_http::{
|
||||
cors::{self, CorsLayer},
|
||||
sensitive_headers::SetSensitiveHeadersLayer,
|
||||
set_header::SetResponseHeaderLayer,
|
||||
timeout::{RequestBodyTimeoutLayer, ResponseBodyTimeoutLayer, TimeoutLayer},
|
||||
trace::{DefaultOnFailure, DefaultOnRequest, DefaultOnResponse, TraceLayer},
|
||||
};
|
||||
use tracing::Level;
|
||||
@@ -48,9 +49,9 @@ pub(crate) fn build(services: &Arc<Services>) -> Result<(Router, Guard)> {
|
||||
))]
|
||||
let layers = layers.layer(compression_layer(server));
|
||||
|
||||
let services_ = services.clone();
|
||||
let layers = layers
|
||||
.layer(SetSensitiveHeadersLayer::new([header::AUTHORIZATION]))
|
||||
.layer(axum::middleware::from_fn_with_state(Arc::clone(services), request::spawn))
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.make_span_with(tracing_span::<_>)
|
||||
@@ -60,6 +61,9 @@ pub(crate) fn build(services: &Arc<Services>) -> Result<(Router, Guard)> {
|
||||
)
|
||||
.layer(axum::middleware::from_fn_with_state(Arc::clone(services), request::handle))
|
||||
.layer(SecureClientIpSource::ConnectInfo.into_extension())
|
||||
.layer(ResponseBodyTimeoutLayer::new(Duration::from_secs(server.config.client_response_timeout)))
|
||||
.layer(RequestBodyTimeoutLayer::new(Duration::from_secs(server.config.client_receive_timeout)))
|
||||
.layer(TimeoutLayer::new(Duration::from_secs(server.config.client_request_timeout)))
|
||||
.layer(SetResponseHeaderLayer::if_not_present(
|
||||
HeaderName::from_static("origin-agent-cluster"), // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Origin-Agent-Cluster
|
||||
HeaderValue::from_static("?1"),
|
||||
@@ -86,7 +90,7 @@ pub(crate) fn build(services: &Arc<Services>) -> Result<(Router, Guard)> {
|
||||
))
|
||||
.layer(cors_layer(server))
|
||||
.layer(body_limit_layer(server))
|
||||
.layer(CatchPanicLayer::custom(catch_panic));
|
||||
.layer(CatchPanicLayer::custom(move |panic| catch_panic(panic, services_.clone())));
|
||||
|
||||
let (router, guard) = router::build(services);
|
||||
Ok((router.layer(layers), guard))
|
||||
@@ -164,15 +168,14 @@ fn body_limit_layer(server: &Server) -> DefaultBodyLimit {
|
||||
#[allow(clippy::needless_pass_by_value)]
|
||||
fn catch_panic(
|
||||
err: Box<dyn Any + Send + 'static>,
|
||||
services: Arc<Services>,
|
||||
) -> http::Response<http_body_util::Full<bytes::Bytes>> {
|
||||
//TODO: XXX
|
||||
/*
|
||||
conduwuit_service::services()
|
||||
.server
|
||||
.metrics
|
||||
.requests_panic
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Release);
|
||||
*/
|
||||
services
|
||||
.server
|
||||
.metrics
|
||||
.requests_panic
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Release);
|
||||
|
||||
let details = if let Some(s) = err.downcast_ref::<String>() {
|
||||
s.clone()
|
||||
} else if let Some(s) = err.downcast_ref::<&str>() {
|
||||
@@ -196,20 +199,26 @@ fn catch_panic(
|
||||
}
|
||||
|
||||
fn tracing_span<T>(request: &http::Request<T>) -> tracing::Span {
|
||||
let path = request.extensions().get::<MatchedPath>().map_or_else(
|
||||
|| {
|
||||
request
|
||||
.uri()
|
||||
.path_and_query()
|
||||
.expect("all requests have a path")
|
||||
.as_str()
|
||||
},
|
||||
truncated_matched_path,
|
||||
);
|
||||
let path = request
|
||||
.extensions()
|
||||
.get::<MatchedPath>()
|
||||
.map_or_else(|| request_path_str(request), truncated_matched_path);
|
||||
|
||||
let method = request.method();
|
||||
tracing::span! {
|
||||
parent: None,
|
||||
debug::INFO_SPAN_LEVEL,
|
||||
"router",
|
||||
method = %request.method(),
|
||||
%path,
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug_span!(parent: None, "router", %method, %path)
|
||||
fn request_path_str<T>(request: &http::Request<T>) -> &str {
|
||||
request
|
||||
.uri()
|
||||
.path_and_query()
|
||||
.expect("all requests have a path")
|
||||
.as_str()
|
||||
}
|
||||
|
||||
fn truncated_matched_path(path: &MatchedPath) -> &str {
|
||||
|
||||
+79
-72
@@ -1,4 +1,8 @@
|
||||
use std::sync::{atomic::Ordering, Arc};
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
sync::{atomic::Ordering, Arc},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use axum::{
|
||||
extract::State,
|
||||
@@ -6,82 +10,18 @@ use axum::{
|
||||
};
|
||||
use conduwuit::{debug, debug_error, debug_warn, err, error, trace, Result};
|
||||
use conduwuit_service::Services;
|
||||
use futures::FutureExt;
|
||||
use http::{Method, StatusCode, Uri};
|
||||
use tokio::time::sleep;
|
||||
use tracing::Span;
|
||||
|
||||
#[tracing::instrument(
|
||||
parent = None,
|
||||
level = "trace",
|
||||
skip_all,
|
||||
fields(
|
||||
handled = %services
|
||||
.server
|
||||
.metrics
|
||||
.requests_spawn_finished
|
||||
.fetch_add(1, Ordering::Relaxed),
|
||||
active = %services
|
||||
.server
|
||||
.metrics
|
||||
.requests_spawn_active
|
||||
.fetch_add(1, Ordering::Relaxed),
|
||||
)
|
||||
)]
|
||||
pub(crate) async fn spawn(
|
||||
State(services): State<Arc<Services>>,
|
||||
req: http::Request<axum::body::Body>,
|
||||
next: axum::middleware::Next,
|
||||
) -> Result<Response, StatusCode> {
|
||||
let server = &services.server;
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
conduwuit::defer! {{
|
||||
_ = server
|
||||
.metrics
|
||||
.requests_spawn_active
|
||||
.fetch_sub(1, Ordering::Relaxed);
|
||||
}};
|
||||
|
||||
if !server.running() {
|
||||
debug_warn!("unavailable pending shutdown");
|
||||
return Err(StatusCode::SERVICE_UNAVAILABLE);
|
||||
}
|
||||
|
||||
let fut = next.run(req);
|
||||
let task = server.runtime().spawn(fut);
|
||||
task.await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
level = "debug",
|
||||
skip_all,
|
||||
fields(
|
||||
handled = %services
|
||||
.server
|
||||
.metrics
|
||||
.requests_handle_finished
|
||||
.fetch_add(1, Ordering::Relaxed),
|
||||
active = %services
|
||||
.server
|
||||
.metrics
|
||||
.requests_handle_active
|
||||
.fetch_add(1, Ordering::Relaxed),
|
||||
)
|
||||
)]
|
||||
#[tracing::instrument(name = "request", level = "debug", skip_all)]
|
||||
pub(crate) async fn handle(
|
||||
State(services): State<Arc<Services>>,
|
||||
req: http::Request<axum::body::Body>,
|
||||
next: axum::middleware::Next,
|
||||
) -> Result<Response, StatusCode> {
|
||||
let server = &services.server;
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
conduwuit::defer! {{
|
||||
_ = server
|
||||
.metrics
|
||||
.requests_handle_active
|
||||
.fetch_sub(1, Ordering::Relaxed);
|
||||
}};
|
||||
|
||||
if !server.running() {
|
||||
if !services.server.running() {
|
||||
debug_warn!(
|
||||
method = %req.method(),
|
||||
uri = %req.uri(),
|
||||
@@ -93,14 +33,74 @@ pub(crate) async fn handle(
|
||||
|
||||
let uri = req.uri().clone();
|
||||
let method = req.method().clone();
|
||||
let result = next.run(req).await;
|
||||
handle_result(&method, &uri, result)
|
||||
let services_ = services.clone();
|
||||
let parent = Span::current();
|
||||
let task = services.server.runtime().spawn(async move {
|
||||
tokio::select! {
|
||||
response = execute(&services_, req, next, parent) => response,
|
||||
response = services_.server.until_shutdown()
|
||||
.then(|()| {
|
||||
let timeout = services_.server.config.client_shutdown_timeout;
|
||||
let timeout = Duration::from_secs(timeout);
|
||||
sleep(timeout)
|
||||
})
|
||||
.map(|()| StatusCode::SERVICE_UNAVAILABLE)
|
||||
.map(IntoResponse::into_response) => response,
|
||||
}
|
||||
});
|
||||
|
||||
task.await
|
||||
.map_err(unhandled)
|
||||
.and_then(move |result| handle_result(&method, &uri, result))
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
name = "handle",
|
||||
level = "debug",
|
||||
parent = parent,
|
||||
skip_all,
|
||||
fields(
|
||||
active = %services
|
||||
.server
|
||||
.metrics
|
||||
.requests_handle_active
|
||||
.fetch_add(1, Ordering::Relaxed),
|
||||
handled = %services
|
||||
.server
|
||||
.metrics
|
||||
.requests_handle_finished
|
||||
.load(Ordering::Relaxed),
|
||||
)
|
||||
)]
|
||||
async fn execute(
|
||||
// we made a safety contract that Services will not go out of scope
|
||||
// during the request; this ensures a reference is accounted for at
|
||||
// the base frame of the task regardless of its detachment.
|
||||
services: &Arc<Services>,
|
||||
req: http::Request<axum::body::Body>,
|
||||
next: axum::middleware::Next,
|
||||
parent: Span,
|
||||
) -> Response {
|
||||
#[cfg(debug_assertions)]
|
||||
conduwuit::defer! {{
|
||||
_ = services.server
|
||||
.metrics
|
||||
.requests_handle_finished
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
_ = services.server
|
||||
.metrics
|
||||
.requests_handle_active
|
||||
.fetch_sub(1, Ordering::Relaxed);
|
||||
}};
|
||||
|
||||
next.run(req).await
|
||||
}
|
||||
|
||||
fn handle_result(method: &Method, uri: &Uri, result: Response) -> Result<Response, StatusCode> {
|
||||
let status = result.status();
|
||||
let reason = status.canonical_reason().unwrap_or("Unknown Reason");
|
||||
let code = status.as_u16();
|
||||
|
||||
if status.is_server_error() {
|
||||
error!(method = ?method, uri = ?uri, "{code} {reason}");
|
||||
} else if status.is_client_error() {
|
||||
@@ -117,3 +117,10 @@ fn handle_result(method: &Method, uri: &Uri, result: Response) -> Result<Respons
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
#[cold]
|
||||
fn unhandled<Error: Debug>(e: Error) -> StatusCode {
|
||||
error!("unhandled error or panic during request: {e:?}");
|
||||
|
||||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
}
|
||||
|
||||
+9
-26
@@ -9,6 +9,7 @@ use std::{
|
||||
|
||||
use axum_server::Handle as ServerHandle;
|
||||
use conduwuit::{debug, debug_error, debug_info, error, info, Error, Result, Server};
|
||||
use futures::FutureExt;
|
||||
use service::Services;
|
||||
use tokio::{
|
||||
sync::broadcast::{self, Sender},
|
||||
@@ -99,46 +100,28 @@ pub(crate) async fn stop(services: Arc<Services>) -> Result<()> {
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "systemd", target_os = "linux"))]
|
||||
sd_notify::notify(true, &[sd_notify::NotifyState::Stopping])
|
||||
.expect("failed to notify systemd of stopping state");
|
||||
|
||||
info!("Shutdown complete.");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn signal(server: Arc<Server>, tx: Sender<()>, handle: axum_server::Handle) {
|
||||
loop {
|
||||
let sig: &'static str = server
|
||||
.signal
|
||||
.subscribe()
|
||||
.recv()
|
||||
.await
|
||||
.expect("channel error");
|
||||
|
||||
if !server.running() {
|
||||
handle_shutdown(&server, &tx, &handle, sig).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
server
|
||||
.clone()
|
||||
.until_shutdown()
|
||||
.then(move |()| handle_shutdown(server, tx, handle))
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn handle_shutdown(
|
||||
server: &Arc<Server>,
|
||||
tx: &Sender<()>,
|
||||
handle: &axum_server::Handle,
|
||||
sig: &str,
|
||||
) {
|
||||
debug!("Received signal {sig}");
|
||||
async fn handle_shutdown(server: Arc<Server>, tx: Sender<()>, handle: axum_server::Handle) {
|
||||
if let Err(e) = tx.send(()) {
|
||||
error!("failed sending shutdown transaction to channel: {e}");
|
||||
}
|
||||
|
||||
let timeout = Duration::from_secs(36);
|
||||
let timeout = server.config.client_shutdown_timeout;
|
||||
let timeout = Duration::from_secs(timeout);
|
||||
debug!(
|
||||
?timeout,
|
||||
spawn_active = ?server.metrics.requests_spawn_active.load(Ordering::Relaxed),
|
||||
handle_active = ?server.metrics.requests_handle_active.load(Ordering::Relaxed),
|
||||
"Notifying for graceful shutdown"
|
||||
);
|
||||
|
||||
@@ -24,27 +24,20 @@ pub(super) async fn serve(
|
||||
info!("Listening on {addrs:?}");
|
||||
while join_set.join_next().await.is_some() {}
|
||||
|
||||
let spawn_active = server.metrics.requests_spawn_active.load(Ordering::Relaxed);
|
||||
let handle_active = server
|
||||
.metrics
|
||||
.requests_handle_active
|
||||
.load(Ordering::Relaxed);
|
||||
debug_info!(
|
||||
spawn_finished = server
|
||||
.metrics
|
||||
.requests_spawn_finished
|
||||
.load(Ordering::Relaxed),
|
||||
handle_finished = server
|
||||
.metrics
|
||||
.requests_handle_finished
|
||||
.load(Ordering::Relaxed),
|
||||
panics = server.metrics.requests_panic.load(Ordering::Relaxed),
|
||||
spawn_active,
|
||||
handle_active,
|
||||
"Stopped listening on {addrs:?}",
|
||||
);
|
||||
|
||||
debug_assert!(spawn_active == 0, "active request tasks are not joined");
|
||||
debug_assert!(handle_active == 0, "active request handles still pending");
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -17,10 +17,9 @@ pub(super) async fn serve(
|
||||
addrs: Vec<SocketAddr>,
|
||||
) -> Result {
|
||||
let tls = &server.config.tls;
|
||||
let certs = tls
|
||||
.certs
|
||||
.as_ref()
|
||||
.ok_or_else(|| err!(Config("tls.certs", "Missing required value in tls config section")))?;
|
||||
let certs = tls.certs.as_ref().ok_or_else(|| {
|
||||
err!(Config("tls.certs", "Missing required value in tls config section"))
|
||||
})?;
|
||||
let key = tls
|
||||
.key
|
||||
.as_ref()
|
||||
|
||||
@@ -159,7 +159,12 @@ async fn fini(server: &Arc<Server>, listener: UnixListener, mut tasks: JoinSet<(
|
||||
drop(listener);
|
||||
|
||||
debug!("Waiting for requests to finish...");
|
||||
while server.metrics.requests_spawn_active.load(Ordering::Relaxed) > 0 {
|
||||
while server
|
||||
.metrics
|
||||
.requests_handle_active
|
||||
.load(Ordering::Relaxed)
|
||||
.gt(&0)
|
||||
{
|
||||
tokio::select! {
|
||||
task = tasks.join_next() => if task.is_none() { break; },
|
||||
() = sleep(FINI_POLL_INTERVAL) => {},
|
||||
|
||||
@@ -44,6 +44,7 @@ url_preview = [
|
||||
zstd_compression = [
|
||||
"reqwest/zstd",
|
||||
]
|
||||
blurhashing = ["dep:image","dep:blurhash"]
|
||||
|
||||
[dependencies]
|
||||
arrayvec.workspace = true
|
||||
@@ -74,6 +75,7 @@ serde_json.workspace = true
|
||||
serde.workspace = true
|
||||
serde_yaml.workspace = true
|
||||
sha2.workspace = true
|
||||
smallvec.workspace = true
|
||||
termimad.workspace = true
|
||||
termimad.optional = true
|
||||
tokio.workspace = true
|
||||
@@ -81,6 +83,8 @@ tracing.workspace = true
|
||||
url.workspace = true
|
||||
webpage.workspace = true
|
||||
webpage.optional = true
|
||||
blurhash.workspace = true
|
||||
blurhash.optional = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
@@ -5,7 +5,7 @@ use conduwuit::{
|
||||
utils::{result::LogErr, stream::TryIgnore, ReadyExt},
|
||||
Err, Result,
|
||||
};
|
||||
use database::{Deserialized, Handle, Interfix, Json, Map};
|
||||
use database::{Deserialized, Handle, Ignore, Json, Map};
|
||||
use futures::{Stream, StreamExt, TryFutureExt};
|
||||
use ruma::{
|
||||
events::{
|
||||
@@ -131,18 +131,20 @@ pub fn changes_since<'a>(
|
||||
room_id: Option<&'a RoomId>,
|
||||
user_id: &'a UserId,
|
||||
since: u64,
|
||||
to: Option<u64>,
|
||||
) -> impl Stream<Item = AnyRawAccountDataEvent> + Send + 'a {
|
||||
let prefix = (room_id, user_id, Interfix);
|
||||
let prefix = database::serialize_key(prefix).expect("failed to serialize prefix");
|
||||
type Key<'a> = (Option<&'a RoomId>, &'a UserId, u64, Ignore);
|
||||
|
||||
// Skip the data that's exactly at since, because we sent that last time
|
||||
let first_possible = (room_id, user_id, since.saturating_add(1));
|
||||
|
||||
self.db
|
||||
.roomuserdataid_accountdata
|
||||
.stream_from_raw(&first_possible)
|
||||
.stream_from(&first_possible)
|
||||
.ignore_err()
|
||||
.ready_take_while(move |(k, _)| k.starts_with(&prefix))
|
||||
.ready_take_while(move |((room_id_, user_id_, count, _), _): &(Key<'_>, _)| {
|
||||
room_id == *room_id_ && user_id == *user_id_ && to.is_none_or(|to| *count <= to)
|
||||
})
|
||||
.map(move |(_, v)| {
|
||||
match room_id {
|
||||
| Some(_) => serde_json::from_slice::<Raw<AnyRoomAccountDataEvent>>(v)
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
#![cfg(feature = "console")]
|
||||
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use conduwuit::{debug, defer, error, log, Server};
|
||||
use conduwuit::{debug, defer, error, log, log::is_systemd_mode, Server};
|
||||
use futures::future::{AbortHandle, Abortable};
|
||||
use ruma::events::room::message::RoomMessageEventContent;
|
||||
use rustyline_async::{Readline, ReadlineError, ReadlineEvent};
|
||||
@@ -123,7 +124,7 @@ impl Console {
|
||||
}
|
||||
|
||||
async fn readline(self: &Arc<Self>) -> Result<ReadlineEvent, ReadlineError> {
|
||||
let _suppression = log::Suppress::new(&self.server);
|
||||
let _suppression = (!is_systemd_mode()).then(|| log::Suppress::new(&self.server));
|
||||
|
||||
let (mut readline, _writer) = Readline::new(PROMPT.to_owned())?;
|
||||
let self_ = Arc::clone(self);
|
||||
|
||||
@@ -2,6 +2,8 @@ use conduwuit::{debug, debug_info, error, implement, info, Err, Result};
|
||||
use ruma::events::room::message::RoomMessageEventContent;
|
||||
use tokio::time::{sleep, Duration};
|
||||
|
||||
pub(super) const SIGNAL: &str = "SIGUSR2";
|
||||
|
||||
/// Possibly spawn the terminal console at startup if configured.
|
||||
#[implement(super::Service)]
|
||||
pub(super) async fn console_auto_start(&self) {
|
||||
@@ -22,7 +24,7 @@ pub(super) async fn console_auto_stop(&self) {
|
||||
|
||||
/// Execute admin commands after startup
|
||||
#[implement(super::Service)]
|
||||
pub(super) async fn startup_execute(&self) -> Result<()> {
|
||||
pub(super) async fn startup_execute(&self) -> Result {
|
||||
// List of comamnds to execute
|
||||
let commands = &self.services.server.config.admin_execute;
|
||||
|
||||
@@ -36,7 +38,7 @@ pub(super) async fn startup_execute(&self) -> Result<()> {
|
||||
sleep(Duration::from_millis(500)).await;
|
||||
|
||||
for (i, command) in commands.iter().enumerate() {
|
||||
if let Err(e) = self.startup_execute_command(i, command.clone()).await {
|
||||
if let Err(e) = self.execute_command(i, command.clone()).await {
|
||||
if !errors {
|
||||
return Err(e);
|
||||
}
|
||||
@@ -59,16 +61,38 @@ pub(super) async fn startup_execute(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Execute one admin command after startup
|
||||
/// Execute admin commands after signal
|
||||
#[implement(super::Service)]
|
||||
async fn startup_execute_command(&self, i: usize, command: String) -> Result<()> {
|
||||
debug!("Startup command #{i}: executing {command:?}");
|
||||
pub(super) async fn signal_execute(&self) -> Result {
|
||||
// List of comamnds to execute
|
||||
let commands = self.services.server.config.admin_signal_execute.clone();
|
||||
|
||||
// When true, errors are ignored and execution continues.
|
||||
let ignore_errors = self.services.server.config.admin_execute_errors_ignore;
|
||||
|
||||
for (i, command) in commands.iter().enumerate() {
|
||||
if let Err(e) = self.execute_command(i, command.clone()).await {
|
||||
if !ignore_errors {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Execute one admin command after startup or signal
|
||||
#[implement(super::Service)]
|
||||
async fn execute_command(&self, i: usize, command: String) -> Result {
|
||||
debug!("Execute command #{i}: executing {command:?}");
|
||||
|
||||
match self.command_in_place(command, None).await {
|
||||
| Ok(Some(output)) => Self::startup_command_output(i, &output),
|
||||
| Err(output) => Self::startup_command_error(i, &output),
|
||||
| Ok(Some(output)) => Self::execute_command_output(i, &output),
|
||||
| Err(output) => Self::execute_command_error(i, &output),
|
||||
| Ok(None) => {
|
||||
info!("Startup command #{i} completed (no output).");
|
||||
info!("Execute command #{i} completed (no output).");
|
||||
Ok(())
|
||||
},
|
||||
}
|
||||
@@ -76,28 +100,28 @@ async fn startup_execute_command(&self, i: usize, command: String) -> Result<()>
|
||||
|
||||
#[cfg(feature = "console")]
|
||||
#[implement(super::Service)]
|
||||
fn startup_command_output(i: usize, content: &RoomMessageEventContent) -> Result<()> {
|
||||
debug_info!("Startup command #{i} completed:");
|
||||
fn execute_command_output(i: usize, content: &RoomMessageEventContent) -> Result {
|
||||
debug_info!("Execute command #{i} completed:");
|
||||
super::console::print(content.body());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(feature = "console")]
|
||||
#[implement(super::Service)]
|
||||
fn startup_command_error(i: usize, content: &RoomMessageEventContent) -> Result<()> {
|
||||
fn execute_command_error(i: usize, content: &RoomMessageEventContent) -> Result {
|
||||
super::console::print_err(content.body());
|
||||
Err!(debug_error!("Startup command #{i} failed."))
|
||||
Err!(debug_error!("Execute command #{i} failed."))
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "console"))]
|
||||
#[implement(super::Service)]
|
||||
fn startup_command_output(i: usize, content: &RoomMessageEventContent) -> Result<()> {
|
||||
info!("Startup command #{i} completed:\n{:#}", content.body());
|
||||
fn execute_command_output(i: usize, content: &RoomMessageEventContent) -> Result {
|
||||
info!("Execute command #{i} completed:\n{:#}", content.body());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "console"))]
|
||||
#[implement(super::Service)]
|
||||
fn startup_command_error(i: usize, content: &RoomMessageEventContent) -> Result<()> {
|
||||
Err!(error!("Startup command #{i} failed:\n{:#}", content.body()))
|
||||
fn execute_command_error(i: usize, content: &RoomMessageEventContent) -> Result {
|
||||
Err!(error!("Execute command #{i} failed:\n{:#}", content.body()))
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
pub mod console;
|
||||
mod create;
|
||||
mod execute;
|
||||
mod grant;
|
||||
mod startup;
|
||||
|
||||
use std::{
|
||||
future::Future,
|
||||
@@ -183,7 +183,11 @@ impl Service {
|
||||
.map(|complete| complete(command))
|
||||
}
|
||||
|
||||
async fn handle_signal(&self, #[allow(unused_variables)] sig: &'static str) {
|
||||
async fn handle_signal(&self, sig: &'static str) {
|
||||
if sig == execute::SIGNAL {
|
||||
self.signal_execute().await.ok();
|
||||
}
|
||||
|
||||
#[cfg(feature = "console")]
|
||||
self.console.handle_signal(sig).await;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
use std::{iter, ops::Deref, path::Path, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use conduwuit::{
|
||||
config::{check, Config},
|
||||
error, implement, Result, Server,
|
||||
};
|
||||
|
||||
pub struct Service {
|
||||
server: Arc<Server>,
|
||||
}
|
||||
|
||||
const SIGNAL: &str = "SIGUSR1";
|
||||
|
||||
#[async_trait]
|
||||
impl crate::Service for Service {
|
||||
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
Ok(Arc::new(Self { server: args.server.clone() }))
|
||||
}
|
||||
|
||||
async fn worker(self: Arc<Self>) -> Result {
|
||||
while self.server.running() {
|
||||
if self.server.signal.subscribe().recv().await == Ok(SIGNAL) {
|
||||
if let Err(e) = self.handle_reload() {
|
||||
error!("Failed to reload config: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
||||
}
|
||||
|
||||
impl Deref for Service {
|
||||
type Target = Arc<Config>;
|
||||
|
||||
#[inline]
|
||||
fn deref(&self) -> &Self::Target { &self.server.config }
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
fn handle_reload(&self) -> Result {
|
||||
if self.server.config.config_reload_signal {
|
||||
#[cfg(all(feature = "systemd", target_os = "linux"))]
|
||||
sd_notify::notify(true, &[sd_notify::NotifyState::Reloading])
|
||||
.expect("failed to notify systemd of reloading state");
|
||||
|
||||
self.reload(iter::empty())?;
|
||||
|
||||
#[cfg(all(feature = "systemd", target_os = "linux"))]
|
||||
sd_notify::notify(true, &[sd_notify::NotifyState::Ready])
|
||||
.expect("failed to notify systemd of ready state");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub fn reload<'a, I>(&self, paths: I) -> Result<Arc<Config>>
|
||||
where
|
||||
I: Iterator<Item = &'a Path>,
|
||||
{
|
||||
let old = self.server.config.clone();
|
||||
let new = Config::load(paths).and_then(|raw| Config::new(&raw))?;
|
||||
|
||||
check::reload(&old, &new)?;
|
||||
self.server.config.update(new)
|
||||
}
|
||||
@@ -1,9 +1,9 @@
|
||||
use std::mem;
|
||||
use std::{fmt::Debug, mem};
|
||||
|
||||
use bytes::Bytes;
|
||||
use conduwuit::{
|
||||
debug, debug_error, debug_warn, err, error::inspect_debug_log, implement, trace,
|
||||
utils::string::EMPTY, Err, Error, Result,
|
||||
debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err, error::inspect_debug_log,
|
||||
implement, trace, utils::string::EMPTY, Err, Error, Result,
|
||||
};
|
||||
use http::{header::AUTHORIZATION, HeaderValue};
|
||||
use ipaddress::IPAddress;
|
||||
@@ -20,82 +20,110 @@ use ruma::{
|
||||
|
||||
use crate::resolver::actual::ActualDest;
|
||||
|
||||
impl super::Service {
|
||||
#[tracing::instrument(
|
||||
level = "debug"
|
||||
/// Sends a request to a federation server
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(skip_all, name = "request", level = "debug")]
|
||||
pub async fn execute<T>(&self, dest: &ServerName, request: T) -> Result<T::IncomingResponse>
|
||||
where
|
||||
T: OutgoingRequest + Debug + Send,
|
||||
{
|
||||
let client = &self.services.client.federation;
|
||||
self.execute_on(client, dest, request).await
|
||||
}
|
||||
|
||||
/// Like execute() but with a very large timeout
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(skip_all, name = "synapse", level = "debug")]
|
||||
pub async fn execute_synapse<T>(
|
||||
&self,
|
||||
dest: &ServerName,
|
||||
request: T,
|
||||
) -> Result<T::IncomingResponse>
|
||||
where
|
||||
T: OutgoingRequest + Debug + Send,
|
||||
{
|
||||
let client = &self.services.client.synapse;
|
||||
self.execute_on(client, dest, request).await
|
||||
}
|
||||
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(
|
||||
name = "fed",
|
||||
level = INFO_SPAN_LEVEL,
|
||||
skip(self, client, request),
|
||||
)]
|
||||
pub async fn send<T>(
|
||||
&self,
|
||||
client: &Client,
|
||||
dest: &ServerName,
|
||||
request: T,
|
||||
) -> Result<T::IncomingResponse>
|
||||
where
|
||||
T: OutgoingRequest + Send,
|
||||
pub async fn execute_on<T>(
|
||||
&self,
|
||||
client: &Client,
|
||||
dest: &ServerName,
|
||||
request: T,
|
||||
) -> Result<T::IncomingResponse>
|
||||
where
|
||||
T: OutgoingRequest + Send,
|
||||
{
|
||||
if !self.services.server.config.allow_federation {
|
||||
return Err!(Config("allow_federation", "Federation is disabled."));
|
||||
}
|
||||
|
||||
if self
|
||||
.services
|
||||
.server
|
||||
.config
|
||||
.forbidden_remote_server_names
|
||||
.contains(dest)
|
||||
{
|
||||
if !self.server.config.allow_federation {
|
||||
return Err!(Config("allow_federation", "Federation is disabled."));
|
||||
}
|
||||
|
||||
if self
|
||||
.server
|
||||
.config
|
||||
.forbidden_remote_server_names
|
||||
.contains(dest)
|
||||
{
|
||||
return Err!(Request(Forbidden(debug_warn!(
|
||||
"Federation with {dest} is not allowed."
|
||||
))));
|
||||
}
|
||||
|
||||
let actual = self.services.resolver.get_actual_dest(dest).await?;
|
||||
let request = into_http_request::<T>(&actual, request)?;
|
||||
let request = self.prepare(dest, request)?;
|
||||
self.execute::<T>(dest, &actual, request, client).await
|
||||
return Err!(Request(Forbidden(debug_warn!("Federation with {dest} is not allowed."))));
|
||||
}
|
||||
|
||||
async fn execute<T>(
|
||||
&self,
|
||||
dest: &ServerName,
|
||||
actual: &ActualDest,
|
||||
request: Request,
|
||||
client: &Client,
|
||||
) -> Result<T::IncomingResponse>
|
||||
where
|
||||
T: OutgoingRequest + Send,
|
||||
{
|
||||
let url = request.url().clone();
|
||||
let method = request.method().clone();
|
||||
let actual = self.services.resolver.get_actual_dest(dest).await?;
|
||||
let request = into_http_request::<T>(&actual, request)?;
|
||||
let request = self.prepare(dest, request)?;
|
||||
self.perform::<T>(dest, &actual, request, client).await
|
||||
}
|
||||
|
||||
debug!(?method, ?url, "Sending request");
|
||||
match client.execute(request).await {
|
||||
| Ok(response) => handle_response::<T>(dest, actual, &method, &url, response).await,
|
||||
| Err(error) =>
|
||||
Err(handle_error(actual, &method, &url, error).expect_err("always returns error")),
|
||||
#[implement(super::Service)]
|
||||
async fn perform<T>(
|
||||
&self,
|
||||
dest: &ServerName,
|
||||
actual: &ActualDest,
|
||||
request: Request,
|
||||
client: &Client,
|
||||
) -> Result<T::IncomingResponse>
|
||||
where
|
||||
T: OutgoingRequest + Send,
|
||||
{
|
||||
let url = request.url().clone();
|
||||
let method = request.method().clone();
|
||||
|
||||
debug!(?method, ?url, "Sending request");
|
||||
match client.execute(request).await {
|
||||
| Ok(response) => handle_response::<T>(dest, actual, &method, &url, response).await,
|
||||
| Err(error) =>
|
||||
Err(handle_error(actual, &method, &url, error).expect_err("always returns error")),
|
||||
}
|
||||
}
|
||||
|
||||
#[implement(super::Service)]
|
||||
fn prepare(&self, dest: &ServerName, mut request: http::Request<Vec<u8>>) -> Result<Request> {
|
||||
self.sign_request(&mut request, dest);
|
||||
|
||||
let request = Request::try_from(request)?;
|
||||
self.validate_url(request.url())?;
|
||||
self.services.server.check_running()?;
|
||||
|
||||
Ok(request)
|
||||
}
|
||||
|
||||
#[implement(super::Service)]
|
||||
fn validate_url(&self, url: &Url) -> Result<()> {
|
||||
if let Some(url_host) = url.host_str() {
|
||||
if let Ok(ip) = IPAddress::parse(url_host) {
|
||||
trace!("Checking request URL IP {ip:?}");
|
||||
self.services.resolver.validate_ip(&ip)?;
|
||||
}
|
||||
}
|
||||
|
||||
fn prepare(&self, dest: &ServerName, mut request: http::Request<Vec<u8>>) -> Result<Request> {
|
||||
self.sign_request(&mut request, dest);
|
||||
|
||||
let request = Request::try_from(request)?;
|
||||
self.validate_url(request.url())?;
|
||||
self.server.check_running()?;
|
||||
|
||||
Ok(request)
|
||||
}
|
||||
|
||||
fn validate_url(&self, url: &Url) -> Result<()> {
|
||||
if let Some(url_host) = url.host_str() {
|
||||
if let Ok(ip) = IPAddress::parse(url_host) {
|
||||
trace!("Checking request URL IP {ip:?}");
|
||||
self.services.resolver.validate_ip(&ip)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_response<T>(
|
||||
@@ -195,7 +223,7 @@ fn sign_request(&self, http_request: &mut http::Request<Vec<u8>>, dest: &ServerN
|
||||
type Value = CanonicalJsonValue;
|
||||
type Object = CanonicalJsonObject;
|
||||
|
||||
let origin = self.services.globals.server_name();
|
||||
let origin = &self.services.server.name;
|
||||
let body = http_request.body();
|
||||
let uri = http_request
|
||||
.uri()
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user