Compare commits

...

68 Commits

Author SHA1 Message Date
strawberry b6e9dc3d98 comment out borked ci thing for now
Signed-off-by: strawberry <strawberry@puppygock.gay>
2025-02-09 10:17:28 -05:00
strawberry cfcd6eb1a6 bump ruwuma to stop erroring on empty push response body
Signed-off-by: strawberry <strawberry@puppygock.gay>
2025-02-07 18:00:58 -05:00
strawberry 88e7e50daf add missing source OCI image label metadata
Signed-off-by: strawberry <strawberry@puppygock.gay>
2025-02-07 11:49:00 -05:00
strawberry 8345ea2cd3 add --locked and --no-fail-fast to cargo test, add other feature test
Signed-off-by: strawberry <strawberry@puppygock.gay>
2025-02-06 19:02:14 -05:00
strawberry add2e0e9ee bump rust-rocksdb
Signed-off-by: strawberry <strawberry@puppygock.gay>
2025-02-06 18:27:36 -05:00
strawberry 43e6c27bb7 misc nix tweaks to maybe speedup ci
Signed-off-by: strawberry <strawberry@puppygock.gay>
2025-02-06 18:27:36 -05:00
strawberry c7c9f0e4a6 catch clippy lints for --no-default-features builds
Signed-off-by: strawberry <strawberry@puppygock.gay>
2025-02-06 18:27:36 -05:00
strawberry ef2d307c15 fix warnings and errors when building with no features
Signed-off-by: strawberry <strawberry@puppygock.gay>
2025-02-06 18:27:32 -05:00
strawberry f761d4d5c9 bump db version to 17, cleanup, rerun old migrations for users who downgraded
Signed-off-by: strawberry <strawberry@puppygock.gay>
2025-02-06 16:48:19 -05:00
strawberry 16b07ae3ec add default systemd support for a TTY to use console mode from
Signed-off-by: strawberry <strawberry@puppygock.gay>
2025-02-06 16:47:10 -05:00
Jason Volk 62d80b97e6 add systemd unit logging mode
Signed-off-by: Jason Volk <jason@zemos.net>
2025-02-06 03:16:56 +00:00
strawberry fda8b36809 add more systemd notify integration with stopping/reloading/ready states
Signed-off-by: strawberry <strawberry@puppygock.gay>
2025-02-05 07:32:00 +00:00
strawberry f6dfc9538f bump ruwuma to stop erroring on duplicate yaml values on appservice EDUs (we dont implement this atm anyways)
Signed-off-by: strawberry <strawberry@puppygock.gay>
2025-02-05 07:32:00 +00:00
strawberry f80d85e107 add SIGUSR1 systemctl reload config support to systemd units
Signed-off-by: strawberry <strawberry@puppygock.gay>
2025-02-05 07:32:00 +00:00
Jason Volk 9158edfb7c fix empty join timeline bug
Signed-off-by: Jason Volk <jason@zemos.net>
2025-02-05 07:32:00 +00:00
Jason Volk 04656a7886 fix spaces pagination bug
Signed-off-by: Jason Volk <jason@zemos.net>
2025-02-05 07:32:00 +00:00
Jason Volk 442bb9889c improvements on blurhashing feature
Signed-off-by: Jason Volk <jason@zemos.net>
2025-02-04 21:47:03 +00:00
Niko 62180897c0 Added blurhash.rs to fascilitate blurhashing.
Signed-off-by: Niko <cnotsomark@gmail.com>
2025-02-04 21:47:03 +00:00
Nineko 80277f6aa2 Adds .gitattributes to the projects to prevent LN and CLRF conflicts. (#681) 2025-02-04 16:46:00 -05:00
Jason Volk d32534164c fix soft-failed redaction regression (ff8bbd4cfa)
Signed-off-by: Jason Volk <jason@zemos.net>
2025-02-04 21:00:12 +00:00
Jason Volk b3271e0d65 split state_accessor
Signed-off-by: Jason Volk <jason@zemos.net>
2025-02-04 00:02:00 +00:00
Jason Volk 106bcd30b7 optimize incremental sync state diff
Signed-off-by: Jason Volk <jason@zemos.net>
2025-02-04 00:02:00 +00:00
Jason Volk da4b94d80d trap panics when running in gdb
Signed-off-by: Jason Volk <jason@zemos.net>
2025-02-04 00:02:00 +00:00
Jason Volk 32f990fc72 fix the panic counter in the tower layer
Signed-off-by: Jason Volk <jason@zemos.net>
2025-02-03 01:16:09 +00:00
Jason Volk 5e59ce37c4 snapshot sync results at next_batch upper-bound
Signed-off-by: Jason Volk <jason@zemos.net>
2025-02-02 16:30:55 +00:00
Jason Volk a774afe837 modernize remove_to_device_events
Signed-off-by: Jason Volk <jason@zemos.net>
2025-02-02 16:30:55 +00:00
Jason Volk ffe3b0faf2 make shutdown grace periods configurable
Signed-off-by: Jason Volk <jason@zemos.net>
2025-02-02 16:30:55 +00:00
Jason Volk bd6d4bc58f enforce timeout on request layers
Signed-off-by: Jason Volk <jason@zemos.net>
2025-02-02 16:30:55 +00:00
Jason Volk b4d22bd05e remove unnecessary cf arc refcnt workaround
log errors and panics propagating through the request task join

Signed-off-by: Jason Volk <jason@zemos.net>
2025-02-02 10:08:30 +00:00
Jason Volk 7ce782ddf4 fix jemalloc cfgs lacking msvc conditions
Signed-off-by: Jason Volk <jason@zemos.net>
2025-02-02 10:08:30 +00:00
Jason Volk 4add39d0fe cache compressed state in a sorted structure for logarithmic queries with partial keys
Signed-off-by: Jason Volk <jason@zemos.net>
2025-02-02 10:08:30 +00:00
Jason Volk ea49b60273 add Option support to database deserializer
Signed-off-by: Jason Volk <jason@zemos.net>
2025-02-02 00:09:18 +00:00
Jason Volk 2fa9621f3a flatten state_full_shortids
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-31 22:46:26 +00:00
Jason Volk 09bc71caab fix missed concurrent fetch opportunities in sender (ffd0fd4242)
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-31 22:46:26 +00:00
Jason Volk 6983798487 implement lazy-loading for incremental sync
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-31 22:46:26 +00:00
Jason Volk a4ef04cd14 fix room join completion taking wrong sync branch
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-31 22:46:26 +00:00
Jason Volk 4e0cedbe51 simplify v3 sync presence collecting
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-31 22:46:26 +00:00
Jason Volk 4ff1155bf0 reroll encrypted_room branch in incremental sync state
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-31 22:46:26 +00:00
Jason Volk e161e5dd61 add pair_of! macro
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-31 22:45:37 +00:00
morguldir f698254c41 make registration tokens reloadable, and allow configuring multiple
Signed-off-by: morguldir <morguldir@protonmail.com>
2025-01-31 03:09:02 +01:00
Jason Volk 69837671bb simplify request handler task base
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-30 18:58:32 +00:00
Jason Volk ff8bbd4cfa untwist the redaction check stanza
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-30 05:26:30 +00:00
Jason Volk 1a8482b3b4 refactor incoming extremities retention; broad filter, single pass
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-30 05:26:30 +00:00
Jason Volk 31c2968bb2 move db files command w/ filter args; misc related cleanup
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 23:21:08 +00:00
Jason Volk 3c8376d897 parallelize state-res pre-gathering
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 23:21:07 +00:00
Jason Volk 50acfe7832 flatten auth chain iterations
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 21:28:26 +00:00
Jason Volk eb7d893c86 fix malloc_conf feature-awareness
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 06:37:30 +00:00
Jason Volk 936161d89e reduce bottommost compression underrides
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 03:09:13 +00:00
Jason Volk 329925c661 additional info level span adjustments
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 03:09:13 +00:00
Jason Volk af399fd517 flatten state accessor iterations
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 01:18:08 +00:00
Jason Volk ad0b0af955 combine state_accessor data into mod
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 01:18:08 +00:00
Jason Volk 2c5af902a3 support executing configurable admin commands via SIGUSR2
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 01:18:08 +00:00
Jason Volk 2f449ba47d support reloading config via SIGUSR1
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 01:18:08 +00:00
Jason Volk a567e314e9 simplify shutdown signal handlers
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 01:18:08 +00:00
Jason Volk ed3cd99781 abstract the config reload checks
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 01:18:08 +00:00
Jason Volk 99fe88c21e use smallvec for the edu sending event buffer
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 01:18:08 +00:00
Jason Volk ffd0fd4242 pipeline pdu fetch for federation sending destination
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 01:18:08 +00:00
Jason Volk b2a565b0b4 propagate better error from server.check_running() 2025-01-29 01:18:08 +00:00
Jason Volk c516a8df3e fanout edu processing
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 01:18:08 +00:00
Jason Volk 94d786ac12 process rooms and edus concurrently
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 01:18:08 +00:00
Jason Volk 677316631a pipeline prologue of handle_incoming_pdu
simplify room_version/first_pdu_in_room argument passing

Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 01:18:08 +00:00
Jason Volk 2b730a30ad add broad_flat_map
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-29 01:18:08 +00:00
Jason Volk 98f9570547 add option to disable rocksdb checksums
reference runtime state for default option initialization

Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-28 18:38:47 +00:00
Jason Volk 13335042b7 enable the paranoid-checks options in debug mode
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-28 18:36:00 +00:00
Jason Volk 6db8df5e23 skip redundant acl check when sender is origin
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-28 18:36:00 +00:00
Jason Volk d0b4a619af furnish batch interface with trait
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-28 18:36:00 +00:00
Jason Volk 4a2d0d35bc split federation request from sending service
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-28 18:36:00 +00:00
Jason Volk 3e0ff2dc84 simplify references to server_name
Signed-off-by: Jason Volk <jason@zemos.net>
2025-01-28 18:36:00 +00:00
141 changed files with 4304 additions and 2687 deletions
+87
View File
@@ -0,0 +1,87 @@
# taken from https://github.com/gitattributes/gitattributes/blob/46a8961ad73f5bd4d8d193708840fbc9e851d702/Rust.gitattributes
# Auto detect text files and perform normalization
* text=auto
*.rs text diff=rust
*.toml text diff=toml
Cargo.lock text
# taken from https://github.com/gitattributes/gitattributes/blob/46a8961ad73f5bd4d8d193708840fbc9e851d702/Common.gitattributes
# Documents
*.bibtex text diff=bibtex
*.doc diff=astextplain
*.DOC diff=astextplain
*.docx diff=astextplain
*.DOCX diff=astextplain
*.dot diff=astextplain
*.DOT diff=astextplain
*.pdf diff=astextplain
*.PDF diff=astextplain
*.rtf diff=astextplain
*.RTF diff=astextplain
*.md text diff=markdown
*.mdx text diff=markdown
*.tex text diff=tex
*.adoc text
*.textile text
*.mustache text
*.csv text eol=crlf
*.tab text
*.tsv text
*.txt text
*.sql text
*.epub diff=astextplain
# Graphics
*.png binary
*.jpg binary
*.jpeg binary
*.gif binary
*.tif binary
*.tiff binary
*.ico binary
# SVG treated as text by default.
*.svg text
*.eps binary
# Scripts
*.bash text eol=lf
*.fish text eol=lf
*.ksh text eol=lf
*.sh text eol=lf
*.zsh text eol=lf
# These are explicitly windows files and should use crlf
*.bat text eol=crlf
*.cmd text eol=crlf
*.ps1 text eol=crlf
# Serialisation
*.json text
*.toml text
*.xml text
*.yaml text
*.yml text
# Archives
*.7z binary
*.bz binary
*.bz2 binary
*.bzip2 binary
*.gz binary
*.lz binary
*.lzma binary
*.rar binary
*.tar binary
*.taz binary
*.tbz binary
*.tbz2 binary
*.tgz binary
*.tlz binary
*.txz binary
*.xz binary
*.Z binary
*.zip binary
*.zst binary
# Text files where line endings should be preserved
*.patch -text
+7 -7
View File
@@ -128,7 +128,7 @@ jobs:
- name: Restore and cache Nix store
# we want a fresh-state when we do releases/tags to avoid potential cache poisoning attacks impacting
# releases and tags
if: ${{ !startsWith(github.ref, 'refs/tags/') }}
#if: ${{ !startsWith(github.ref, 'refs/tags/') }}
uses: nix-community/cache-nix-action@v5.1.0
with:
# restore and save a cache using this key
@@ -191,14 +191,14 @@ jobs:
- name: Run sccache-cache
# we want a fresh-state when we do releases/tags to avoid potential cache poisoning attacks impacting
# releases and tags
if: ${{ (env.SCCACHE_GHA_ENABLED == 'true') && !startsWith(github.ref, 'refs/tags/') }}
#if: ${{ (env.SCCACHE_GHA_ENABLED == 'true') && !startsWith(github.ref, 'refs/tags/') }}
uses: mozilla-actions/sccache-action@main
# use rust-cache
- uses: Swatinem/rust-cache@v2
# we want a fresh-state when we do releases/tags to avoid potential cache poisoning attacks impacting
# releases and tags
if: ${{ !startsWith(github.ref, 'refs/tags/') }}
#if: ${{ !startsWith(github.ref, 'refs/tags/') }}
with:
cache-all-crates: "true"
cache-on-failure: "true"
@@ -323,7 +323,7 @@ jobs:
- name: Restore and cache Nix store
# we want a fresh-state when we do releases/tags to avoid potential cache poisoning attacks impacting
# releases and tags
if: ${{ !startsWith(github.ref, 'refs/tags/') }}
#if: ${{ !startsWith(github.ref, 'refs/tags/') }}
uses: nix-community/cache-nix-action@v5.1.0
with:
# restore and save a cache using this key
@@ -379,14 +379,14 @@ jobs:
- name: Run sccache-cache
# we want a fresh-state when we do releases/tags to avoid potential cache poisoning attacks impacting
# releases and tags
if: ${{ (env.SCCACHE_GHA_ENABLED == 'true') && !startsWith(github.ref, 'refs/tags/') }}
#if: ${{ (env.SCCACHE_GHA_ENABLED == 'true') && !startsWith(github.ref, 'refs/tags/') }}
uses: mozilla-actions/sccache-action@main
# use rust-cache
- uses: Swatinem/rust-cache@v2
# we want a fresh-state when we do releases/tags to avoid potential cache poisoning attacks impacting
# releases and tags
if: ${{ !startsWith(github.ref, 'refs/tags/') }}
#if: ${{ !startsWith(github.ref, 'refs/tags/') }}
with:
cache-all-crates: "true"
cache-on-failure: "true"
@@ -679,7 +679,7 @@ jobs:
- name: Run sccache-cache
# we want a fresh-state when we do releases/tags to avoid potential cache poisoning attacks impacting
# releases and tags
if: ${{ (env.SCCACHE_GHA_ENABLED == 'true') && !startsWith(github.ref, 'refs/tags/') }}
#if: ${{ (env.SCCACHE_GHA_ENABLED == 'true') && !startsWith(github.ref, 'refs/tags/') }}
uses: mozilla-actions/sccache-action@main
# use rust-cache
Generated
+386 -19
View File
@@ -26,6 +26,12 @@ dependencies = [
"memchr",
]
[[package]]
name = "aligned-vec"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4aa90d7ce82d4be67b64039a3d588d38dbcc6736577de4a847025ce5b0c468d1"
[[package]]
name = "alloc-no-stdlib"
version = "2.0.4"
@@ -53,12 +59,29 @@ version = "1.0.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04"
[[package]]
name = "arbitrary"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dde20b3d026af13f561bdd0f15edf01fc734f0dafcedbaf42bba506a9517f223"
[[package]]
name = "arc-swap"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
name = "arg_enum_proc_macro"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ae92a5119aa49cdbcf6b9f893fe4e1d98b04ccbf82ee0584ad948a44a734dea"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.96",
]
[[package]]
name = "argon2"
version = "0.5.3"
@@ -173,6 +196,29 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
[[package]]
name = "av1-grain"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6678909d8c5d46a42abcf571271e15fdbc0a225e3646cf23762cd415046c78bf"
dependencies = [
"anyhow",
"arrayvec",
"log",
"nom",
"num-rational",
"v_frame",
]
[[package]]
name = "avif-serialize"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e335041290c43101ca215eed6f43ec437eb5a42125573f600fc3fa42b9bddd62"
dependencies = [
"arrayvec",
]
[[package]]
name = "aws-lc-rs"
version = "1.12.1"
@@ -385,6 +431,12 @@ dependencies = [
"which",
]
[[package]]
name = "bit_field"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc827186963e592360843fb5ba4b973e145841266c1357f7180c43526f2e5b61"
[[package]]
name = "bitflags"
version = "1.3.2"
@@ -397,6 +449,12 @@ version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36"
[[package]]
name = "bitstream-io"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6099cdc01846bc367c4e7dd630dc5966dccf36b652fae7a74e17b640411a91b2"
[[package]]
name = "blake2"
version = "0.10.6"
@@ -415,6 +473,15 @@ dependencies = [
"generic-array",
]
[[package]]
name = "blurhash"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e79769241dcd44edf79a732545e8b5cec84c247ac060f5252cd51885d093a8fc"
dependencies = [
"image",
]
[[package]]
name = "brotli"
version = "7.0.0"
@@ -436,6 +503,12 @@ dependencies = [
"alloc-stdlib",
]
[[package]]
name = "built"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c360505aed52b7ec96a3636c3f039d99103c37d1d9b4f7a8c743d3ea9ffcd03b"
[[package]]
name = "bumpalo"
version = "3.16.0"
@@ -513,6 +586,16 @@ dependencies = [
"nom",
]
[[package]]
name = "cfg-expr"
version = "0.15.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d067ad48b8650848b989a59a86c6c36a995d02d2bf778d45c3c5d57bc2718f02"
dependencies = [
"smallvec",
"target-lexicon",
]
[[package]]
name = "cfg-if"
version = "1.0.0"
@@ -685,6 +768,7 @@ dependencies = [
"http-body-util",
"hyper",
"ipaddress",
"itertools 0.13.0",
"log",
"rand",
"reqwest",
@@ -821,6 +905,7 @@ dependencies = [
"arrayvec",
"async-trait",
"base64 0.22.1",
"blurhash",
"bytes",
"conduwuit_core",
"conduwuit_database",
@@ -844,6 +929,7 @@ dependencies = [
"serde_json",
"serde_yaml",
"sha2",
"smallvec",
"termimad",
"tokio",
"tracing",
@@ -1069,6 +1155,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "crunchy"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43da5946c66ffcc7745f48db692ffbb10a83bfe0afd96235c5c2a4fb23994929"
[[package]]
name = "crypto-common"
version = "0.1.6"
@@ -1250,7 +1342,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d"
dependencies = [
"libc",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -1273,6 +1365,21 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "exr"
version = "1.73.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f83197f59927b46c04a183a619b7c29df34e63e63c7869320862268c0ef687e0"
dependencies = [
"bit_field",
"half",
"lebe",
"miniz_oxide",
"rayon-core",
"smallvec",
"zune-inflate",
]
[[package]]
name = "fdeflate"
version = "0.3.7"
@@ -1517,6 +1624,16 @@ dependencies = [
"tracing",
]
[[package]]
name = "half"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888"
dependencies = [
"cfg-if",
"crunchy",
]
[[package]]
name = "hardened_malloc-rs"
version = "0.1.2+12"
@@ -1971,10 +2088,16 @@ dependencies = [
"bytemuck",
"byteorder-lite",
"color_quant",
"exr",
"gif",
"image-webp",
"num-traits",
"png",
"qoi",
"ravif",
"rayon",
"rgb",
"tiff",
"zune-core",
"zune-jpeg",
]
@@ -1989,6 +2112,12 @@ dependencies = [
"quick-error 2.0.1",
]
[[package]]
name = "imgref"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0263a3d970d5c054ed9312c0057b4f3bde9c0b33836d3637361d4a9e6e7a408"
[[package]]
name = "indexmap"
version = "1.9.3"
@@ -2022,6 +2151,17 @@ version = "3.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
[[package]]
name = "interpolate_name"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c34819042dc3d3971c46c2190835914dfbe0c3c13f61449b2997f4e9722dfa60"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.96",
]
[[package]]
name = "ipaddress"
version = "0.1.3"
@@ -2087,6 +2227,12 @@ dependencies = [
"libc",
]
[[package]]
name = "jpeg-decoder"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f5d4a7da358eff58addd2877a45865158f0d78c911d43a5784ceb7bbf52833b0"
[[package]]
name = "js-sys"
version = "0.3.77"
@@ -2170,12 +2316,28 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "lebe"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03087c2bad5e1034e8cace5926dec053fb3790248370865f5117a7d0213354c8"
[[package]]
name = "libc"
version = "0.2.169"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a"
[[package]]
name = "libfuzzer-sys"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf78f52d400cf2d84a3a973a78a592b4adc535739e0a5597a0da6f0c357adc75"
dependencies = [
"arbitrary",
"cc",
]
[[package]]
name = "libloading"
version = "0.8.6"
@@ -2183,7 +2345,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
dependencies = [
"cfg-if",
"windows-targets 0.48.5",
"windows-targets 0.52.6",
]
[[package]]
@@ -2241,6 +2403,15 @@ dependencies = [
"futures-sink",
]
[[package]]
name = "loop9"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fae87c125b03c1d2c0150c90365d7d6bcc53fb73a9acaef207d2d065860f062"
dependencies = [
"imgref",
]
[[package]]
name = "lru-cache"
version = "0.1.2"
@@ -2319,6 +2490,16 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "maybe-rayon"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea1f30cedd69f0a2954655f7188c6a834246d2bcf1e315e2ac40c4b24dc9519"
dependencies = [
"cfg-if",
"rayon",
]
[[package]]
name = "memchr"
version = "2.7.4"
@@ -2432,6 +2613,12 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7"
[[package]]
name = "noop_proc_macro"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0676bb32a98c1a483ce53e500a81ad9c3d5b3f7c920c28c24e9cb0980d0b5bc8"
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@@ -2481,6 +2668,17 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-derive"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.96",
]
[[package]]
name = "num-integer"
version = "0.1.46"
@@ -2905,6 +3103,25 @@ dependencies = [
"yansi",
]
[[package]]
name = "profiling"
version = "1.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afbdc74edc00b6f6a218ca6a5364d6226a259d4b8ea1af4a0ea063f27e179f4d"
dependencies = [
"profiling-procmacros",
]
[[package]]
name = "profiling-procmacros"
version = "1.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a65f2e60fbf1063868558d69c6beacf412dc755f9fc020f514b7955fc914fe30"
dependencies = [
"quote",
"syn 2.0.96",
]
[[package]]
name = "prost"
version = "0.13.4"
@@ -2955,6 +3172,15 @@ version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "007d8adb5ddab6f8e3f491ac63566a7d5002cc7ed73901f72057943fa71ae1ae"
[[package]]
name = "qoi"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f6d64c71eb498fe9eae14ce4ec935c555749aef511cca85b5568910d6e48001"
dependencies = [
"bytemuck",
]
[[package]]
name = "quick-error"
version = "1.2.3"
@@ -3016,7 +3242,7 @@ dependencies = [
"once_cell",
"socket2",
"tracing",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -3058,6 +3284,76 @@ dependencies = [
"getrandom",
]
[[package]]
name = "rav1e"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd87ce80a7665b1cce111f8a16c1f3929f6547ce91ade6addf4ec86a8dda5ce9"
dependencies = [
"arbitrary",
"arg_enum_proc_macro",
"arrayvec",
"av1-grain",
"bitstream-io",
"built",
"cfg-if",
"interpolate_name",
"itertools 0.12.1",
"libc",
"libfuzzer-sys",
"log",
"maybe-rayon",
"new_debug_unreachable",
"noop_proc_macro",
"num-derive",
"num-traits",
"once_cell",
"paste",
"profiling",
"rand",
"rand_chacha",
"simd_helpers",
"system-deps",
"thiserror 1.0.69",
"v_frame",
"wasm-bindgen",
]
[[package]]
name = "ravif"
version = "0.11.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2413fd96bd0ea5cdeeb37eaf446a22e6ed7b981d792828721e74ded1980a45c6"
dependencies = [
"avif-serialize",
"imgref",
"loop9",
"quick-error 2.0.1",
"rav1e",
"rayon",
"rgb",
]
[[package]]
name = "rayon"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
dependencies = [
"crossbeam-deque",
"crossbeam-utils",
]
[[package]]
name = "redox_syscall"
version = "0.5.8"
@@ -3170,6 +3466,12 @@ dependencies = [
"quick-error 1.2.3",
]
[[package]]
name = "rgb"
version = "0.8.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57397d16646700483b67d2dd6511d79318f9d057fdbd21a4066aeac8b41d310a"
[[package]]
name = "ring"
version = "0.17.8"
@@ -3188,7 +3490,7 @@ dependencies = [
[[package]]
name = "ruma"
version = "0.10.1"
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
dependencies = [
"assign",
"js_int",
@@ -3210,7 +3512,7 @@ dependencies = [
[[package]]
name = "ruma-appservice-api"
version = "0.10.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
dependencies = [
"js_int",
"ruma-common",
@@ -3222,7 +3524,7 @@ dependencies = [
[[package]]
name = "ruma-client-api"
version = "0.18.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
dependencies = [
"as_variant",
"assign",
@@ -3245,7 +3547,7 @@ dependencies = [
[[package]]
name = "ruma-common"
version = "0.13.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
dependencies = [
"as_variant",
"base64 0.22.1",
@@ -3276,7 +3578,7 @@ dependencies = [
[[package]]
name = "ruma-events"
version = "0.28.1"
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
dependencies = [
"as_variant",
"indexmap 2.7.0",
@@ -3301,7 +3603,7 @@ dependencies = [
[[package]]
name = "ruma-federation-api"
version = "0.9.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
dependencies = [
"bytes",
"http",
@@ -3319,7 +3621,7 @@ dependencies = [
[[package]]
name = "ruma-identifiers-validation"
version = "0.9.5"
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
dependencies = [
"js_int",
"thiserror 2.0.11",
@@ -3328,7 +3630,7 @@ dependencies = [
[[package]]
name = "ruma-identity-service-api"
version = "0.9.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
dependencies = [
"js_int",
"ruma-common",
@@ -3338,7 +3640,7 @@ dependencies = [
[[package]]
name = "ruma-macros"
version = "0.13.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
dependencies = [
"cfg-if",
"proc-macro-crate",
@@ -3353,7 +3655,7 @@ dependencies = [
[[package]]
name = "ruma-push-gateway-api"
version = "0.9.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
dependencies = [
"js_int",
"ruma-common",
@@ -3365,7 +3667,7 @@ dependencies = [
[[package]]
name = "ruma-server-util"
version = "0.3.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
dependencies = [
"headers",
"http",
@@ -3378,7 +3680,7 @@ dependencies = [
[[package]]
name = "ruma-signatures"
version = "0.15.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
dependencies = [
"base64 0.22.1",
"ed25519-dalek",
@@ -3394,7 +3696,7 @@ dependencies = [
[[package]]
name = "ruma-state-res"
version = "0.11.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=b560338b2a50dbf61ecfe80808b9b095ad4cec00#b560338b2a50dbf61ecfe80808b9b095ad4cec00"
source = "git+https://github.com/girlbossceo/ruwuma?rev=f5667c6292adb43fbe4725d31d6b5127a0cf60ce#f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
dependencies = [
"futures-util",
"js_int",
@@ -3409,7 +3711,7 @@ dependencies = [
[[package]]
name = "rust-librocksdb-sys"
version = "0.32.0+9.10.0"
source = "git+https://github.com/girlbossceo/rust-rocksdb-zaidoon1?rev=1f032427d3a0e7b0f13c04b4e34712bd8610291b#1f032427d3a0e7b0f13c04b4e34712bd8610291b"
source = "git+https://github.com/girlbossceo/rust-rocksdb-zaidoon1?rev=7b0e1bbe395a41ba8a11347a4921da590e3ad0d9#7b0e1bbe395a41ba8a11347a4921da590e3ad0d9"
dependencies = [
"bindgen",
"bzip2-sys",
@@ -3426,7 +3728,7 @@ dependencies = [
[[package]]
name = "rust-rocksdb"
version = "0.36.0"
source = "git+https://github.com/girlbossceo/rust-rocksdb-zaidoon1?rev=1f032427d3a0e7b0f13c04b4e34712bd8610291b#1f032427d3a0e7b0f13c04b4e34712bd8610291b"
source = "git+https://github.com/girlbossceo/rust-rocksdb-zaidoon1?rev=7b0e1bbe395a41ba8a11347a4921da590e3ad0d9#7b0e1bbe395a41ba8a11347a4921da590e3ad0d9"
dependencies = [
"libc",
"rust-librocksdb-sys",
@@ -3477,7 +3779,7 @@ dependencies = [
"errno",
"libc",
"linux-raw-sys",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -3943,6 +4245,15 @@ version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe"
[[package]]
name = "simd_helpers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95890f873bec569a0362c235787f3aca6e1e887302ba4840839bcc6459c42da6"
dependencies = [
"quote",
]
[[package]]
name = "siphasher"
version = "0.3.11"
@@ -4094,6 +4405,25 @@ dependencies = [
"syn 2.0.96",
]
[[package]]
name = "system-deps"
version = "6.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e535eb8dded36d55ec13eddacd30dec501792ff23a0b1682c38601b8cf2349"
dependencies = [
"cfg-expr",
"heck",
"pkg-config",
"toml",
"version-compare",
]
[[package]]
name = "target-lexicon"
version = "0.12.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1"
[[package]]
name = "tendril"
version = "0.4.3"
@@ -4203,6 +4533,17 @@ dependencies = [
"threadpool",
]
[[package]]
name = "tiff"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba1310fcea54c6a9a4fd1aad794ecc02c31682f6bfbecdf460bf19533eed1e3e"
dependencies = [
"flate2",
"jpeg-decoder",
"weezl",
]
[[package]]
name = "tikv-jemalloc-ctl"
version = "0.6.0"
@@ -4742,6 +5083,17 @@ dependencies = [
"serde",
]
[[package]]
name = "v_frame"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6f32aaa24bacd11e488aa9ba66369c7cd514885742c9fe08cfe85884db3e92b"
dependencies = [
"aligned-vec",
"num-traits",
"wasm-bindgen",
]
[[package]]
name = "valuable"
version = "0.1.1"
@@ -4754,6 +5106,12 @@ version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version-compare"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "852e951cb7832cb45cb1169900d19760cfa39b82bc0ea9c0e5a14ae88411c98b"
[[package]]
name = "version_check"
version = "0.9.5"
@@ -5322,6 +5680,15 @@ version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f423a2c17029964870cfaabb1f13dfab7d092a62a29a89264f4d36990ca414a"
[[package]]
name = "zune-inflate"
version = "0.2.54"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73ab332fe2f6680068f3582b16a24f90ad7096d5d39b974d1c0aff0125116f02"
dependencies = [
"simd-adler32",
]
[[package]]
name = "zune-jpeg"
version = "0.4.14"
+12 -3
View File
@@ -127,12 +127,13 @@ version = "0.6.2"
default-features = false
features = [
"add-extension",
"catch-panic",
"cors",
"sensitive-headers",
"set-header",
"timeout",
"trace",
"util",
"catch-panic",
]
[workspace.dependencies.rustls]
@@ -178,7 +179,7 @@ version = "0.5.3"
features = ["alloc", "rand"]
default-features = false
# Used to generate thumbnails for images
# Used to generate thumbnails for images & blurhashes
[workspace.dependencies.image]
version = "0.25.5"
default-features = false
@@ -189,6 +190,14 @@ features = [
"webp",
]
[workspace.dependencies.blurhash]
version = "0.2.3"
default-features = false
features = [
"fast-linear-to-srgb",
"image",
]
# logging
[workspace.dependencies.log]
version = "0.4.22"
@@ -333,7 +342,7 @@ version = "0.1.2"
[workspace.dependencies.ruma]
git = "https://github.com/girlbossceo/ruwuma"
#branch = "conduwuit-changes"
rev = "b560338b2a50dbf61ecfe80808b9b095ad4cec00"
rev = "f5667c6292adb43fbe4725d31d6b5127a0cf60ce"
features = [
"compat",
"rand",
+14 -1
View File
@@ -7,7 +7,20 @@ RequiresMountsFor=/var/lib/private/conduwuit
[Service]
DynamicUser=yes
Type=notify
Type=notify-reload
ReloadSignal=SIGUSR1
TTYPath=/dev/tty25
DeviceAllow=char-tty
StandardInput=tty-force
StandardOutput=tty
StandardError=journal+console
TTYReset=yes
# uncomment to allow buffer to be cleared every restart
TTYVTDisallocate=no
TTYColumns=120
TTYRows=40
AmbientCapabilities=
CapabilityBoundingSet=
+2 -1
View File
@@ -34,7 +34,8 @@ toplevel="$(git rev-parse --show-toplevel)"
pushd "$toplevel" > /dev/null
bin/nix-build-and-cache just .#linux-complement
#bin/nix-build-and-cache just .#linux-complement
bin/nix-build-and-cache just .#complement
docker load < result
popd > /dev/null
+60 -2
View File
@@ -377,6 +377,26 @@
#
#pusher_idle_timeout = 15
# Maximum time to receive a request from a client (seconds).
#
#client_receive_timeout = 75
# Maximum time to process a request received from a client (seconds).
#
#client_request_timeout = 180
# Maximum time to transmit a response to a client (seconds)
#
#client_response_timeout = 120
# Grace period for clean shutdown of client requests (seconds).
#
#client_shutdown_timeout = 10
# Grace period for clean shutdown of federation requests (seconds).
#
#sender_shutdown_timeout = 5
# Enables registration. If set to false, no users can register on this
# server.
#
@@ -406,8 +426,9 @@
#
#registration_token =
# Path to a file on the system that gets read for the registration token.
# this config option takes precedence/priority over "registration_token".
# Path to a file on the system that gets read for additional registration
# tokens. Multiple tokens can be added if you separate them with
# whitespace
#
# conduwuit must be able to access the file, and it must not be empty
#
@@ -897,6 +918,13 @@
#
#rocksdb_paranoid_file_checks = false
# Enables or disables checksum verification in rocksdb at runtime.
# Checksums are usually hardware accelerated with low overhead; they are
# enabled in rocksdb by default. Older or slower platforms may see gains
# from disabling.
#
#rocksdb_checksums = true
# Database repair mode (for RocksDB SST corruption).
#
# Use this option when the server reports corruption while running or
@@ -1355,6 +1383,13 @@
#
#admin_execute_errors_ignore = false
# List of admin commands to execute on SIGUSR2.
#
# Similar to admin_execute, but these commands are executed when the
# server receives SIGUSR2 on supporting platforms.
#
#admin_signal_execute = []
# Controls the max log level for admin command log captures (logs
# generated from running admin commands). Defaults to "info" on release
# builds, else "debug" on debug builds.
@@ -1517,6 +1552,11 @@
#
#listening = true
# Enables configuration reload when the server receives SIGUSR1 on
# supporting platforms.
#
#config_reload_signal = true
[global.tls]
# Path to a valid TLS certificate file.
@@ -1567,3 +1607,21 @@
# This item is undocumented. Please contribute documentation for it.
#
#support_mxid =
[global.blurhashing]
# blurhashing x component, 4 is recommended by https://blurha.sh/
#
#components_x = 4
# blurhashing y component, 3 is recommended by https://blurha.sh/
#
#components_y = 3
# Max raw size that the server will blurhash, this is the size of the
# image after converting it to raw data, it should be higher than the
# upload limit but not too high. The higher it is the higher the
# potential load will be for clients requesting blurhashes. The default
# is 33.55MB. Setting it to 0 disables blurhashing.
#
#blurhash_max_raw_size = 33554432
+14 -1
View File
@@ -8,7 +8,20 @@ Documentation=https://conduwuit.puppyirl.gay/
DynamicUser=yes
User=conduwuit
Group=conduwuit
Type=notify
Type=notify-reload
ReloadSignal=SIGUSR1
TTYPath=/dev/tty25
DeviceAllow=char-tty
StandardInput=tty-force
StandardOutput=tty
StandardError=journal+console
TTYReset=yes
# uncomment to allow buffer to be cleared every restart
TTYVTDisallocate=no
TTYColumns=120
TTYRows=40
Environment="CONDUWUIT_CONFIG=/etc/conduwuit/conduwuit.toml"
+1 -1
View File
@@ -27,7 +27,7 @@ malloc-usable-size = ["rust-rocksdb/malloc-usable-size"]
[dependencies.rust-rocksdb]
git = "https://github.com/girlbossceo/rust-rocksdb-zaidoon1"
rev = "1f032427d3a0e7b0f13c04b4e34712bd8610291b"
rev = "7b0e1bbe395a41ba8a11347a4921da590e3ad0d9"
#branch = "master"
default-features = false
+45 -18
View File
@@ -86,6 +86,7 @@ env DIRENV_DEVSHELL=all-features \
direnv exec . \
cargo doc \
--workspace \
--locked \
--profile test \
--all-features \
--no-deps \
@@ -100,8 +101,8 @@ script = """
direnv exec . \
cargo clippy \
--workspace \
--locked \
--profile test \
--all-targets \
--color=always \
-- \
-D warnings
@@ -115,8 +116,8 @@ env DIRENV_DEVSHELL=all-features \
direnv exec . \
cargo clippy \
--workspace \
--locked \
--profile test \
--all-targets \
--all-features \
--color=always \
-- \
@@ -124,33 +125,37 @@ env DIRENV_DEVSHELL=all-features \
"""
[[task]]
name = "clippy/jemalloc"
name = "clippy/no-features"
group = "lints"
script = """
env DIRENV_DEVSHELL=no-features \
direnv exec . \
cargo clippy \
--workspace \
--locked \
--profile test \
--no-default-features \
--color=always \
-- \
-D warnings
"""
[[task]]
name = "clippy/other-features"
group = "lints"
script = """
direnv exec . \
cargo clippy \
--workspace \
--locked \
--profile test \
--features jemalloc \
--all-targets \
--no-default-features \
--features=console,systemd,element_hacks,direct_tls,perf_measurements,brotli_compression,blurhashing \
--color=always \
-- \
-D warnings
"""
#[[task]]
#name = "clippy/hardened_malloc"
#group = "lints"
#script = """
#cargo clippy \
# --workspace \
# --features hardened_malloc \
# --all-targets \
# --color=always \
# -- \
# -D warnings
#"""
[[task]]
name = "lychee"
group = "lints"
@@ -169,8 +174,10 @@ env DIRENV_DEVSHELL=all-features \
direnv exec . \
cargo test \
--workspace \
--locked \
--profile test \
--all-targets \
--no-fail-fast \
--all-features \
--color=always \
-- \
@@ -185,8 +192,28 @@ env DIRENV_DEVSHELL=default \
direnv exec . \
cargo test \
--workspace \
--locked \
--profile test \
--all-targets \
--no-fail-fast \
--color=always \
-- \
--color=always
"""
[[task]]
name = "cargo/no-features"
group = "tests"
script = """
env DIRENV_DEVSHELL=no-features \
direnv exec . \
cargo test \
--workspace \
--locked \
--profile test \
--all-targets \
--no-fail-fast \
--no-default-features \
--color=always \
-- \
--color=always
+1 -13
View File
@@ -169,21 +169,9 @@
# used for rust caching in CI to speed it up
sccache
# needed so we can get rid of gcc and other unused deps that bloat OCI images
removeReferencesTo
]
# liburing is Linux-exclusive
++ lib.optional stdenv.hostPlatform.isLinux liburing
# needed to build Rust applications on macOS
++ lib.optionals stdenv.hostPlatform.isDarwin [
# https://github.com/NixOS/nixpkgs/issues/206242
# ld: library not found for -liconv
libiconv
# https://stackoverflow.com/questions/69869574/properly-adding-darwin-apple-sdk-to-a-nix-shell
# https://discourse.nixos.org/t/compile-a-rust-binary-on-macos-dbcrossbar/8612
pkgsBuildHost.darwin.apple_sdk.frameworks.Security
])
++ lib.optional stdenv.hostPlatform.isLinux liburing)
++ scope.main.buildInputs
++ scope.main.propagatedBuildInputs
++ scope.main.nativeBuildInputs;
+17 -4
View File
@@ -17,19 +17,32 @@ ip_range_denylist = []
url_preview_domain_contains_allowlist = ["*"]
url_preview_domain_explicit_denylist = ["*"]
media_compat_file_link = false
media_startup_check = false
prune_missing_media = false
media_startup_check = true
prune_missing_media = true
log_colors = false
admin_room_notices = false
allow_check_for_updates = false
allow_unstable_room_versions = true
intentionally_unknown_config_option_for_testing = true
rocksdb_log_level = "debug"
rocksdb_max_log_files = 1
rocksdb_recovery_mode = 0
rocksdb_paranoid_file_checks = true
log_guest_registrations = false
allow_legacy_media = true
startup_netburst = false
startup_netburst = true
startup_netburst_keep = -1
# valgrind makes things so slow
dns_timeout = 60
dns_attempts = 20
request_conn_timeout = 60
request_timeout = 120
well_known_conn_timeout = 60
well_known_timeout = 60
federation_idle_timeout = 300
sender_timeout = 300
sender_idle_timeout = 300
sender_retry_backoff_limit = 300
[global.tls]
certs = "/certificate.crt"
-6
View File
@@ -18,18 +18,12 @@ let
all_features = true;
disable_release_max_log_level = true;
disable_features = [
# no reason to use jemalloc for complement, just has compatibility/build issues
"jemalloc"
"jemalloc_stats"
"jemalloc_prof"
# console/CLI stuff isn't used or relevant for complement
"console"
"tokio_console"
# sentry telemetry isn't useful for complement, disabled by default anyways
"sentry_telemetry"
"perf_measurements"
# the containers don't use or need systemd signal support
"systemd"
# this is non-functional on nix for some reason
"hardened_malloc"
# dont include experimental features
+30 -28
View File
@@ -82,7 +82,7 @@ rust-jemalloc-sys' = (rust-jemalloc-sys.override {
buildDepsOnlyEnv =
let
rocksdb' = (rocksdb.override {
jemalloc = rust-jemalloc-sys';
jemalloc = lib.optional (featureEnabled "jemalloc") rust-jemalloc-sys';
# rocksdb fails to build with prefixed jemalloc, which is required on
# darwin due to [1]. In this case, fall back to building rocksdb with
# libc malloc. This should not cause conflicts, because all of the
@@ -103,6 +103,12 @@ buildDepsOnlyEnv =
++ [ "-DPORTABLE=haswell" ]) else ([ "-DPORTABLE=1" ])
)
++ old.cmakeFlags;
# outputs has "tools" which we dont need or use
outputs = [ "out" ];
# preInstall hooks has stuff for messing with ldb/sst_dump which we dont need or use
preInstall = "";
});
in
{
@@ -156,6 +162,19 @@ commonAttrs = {
];
};
# This is redundant with CI
doCheck = false;
cargoTestCommand = "cargo test --locked ";
cargoExtraArgs = "--no-default-features --locked "
+ lib.optionalString
(features'' != [])
"--features " + (builtins.concatStringsSep "," features'');
cargoTestExtraArgs = "--no-default-features --locked "
+ lib.optionalString
(features'' != [])
"--features " + (builtins.concatStringsSep "," features'');
dontStrip = profile == "dev" || profile == "test";
dontPatchELF = profile == "dev" || profile == "test";
@@ -181,27 +200,7 @@ commonAttrs = {
# differing values for `NIX_CFLAGS_COMPILE`, which contributes to spurious
# rebuilds of bindgen and its depedents.
jq
# needed so we can get rid of gcc and other unused deps that bloat OCI images
removeReferencesTo
]
# needed to build Rust applications on macOS
++ lib.optionals stdenv.hostPlatform.isDarwin [
# https://github.com/NixOS/nixpkgs/issues/206242
# ld: library not found for -liconv
libiconv
# https://stackoverflow.com/questions/69869574/properly-adding-darwin-apple-sdk-to-a-nix-shell
# https://discourse.nixos.org/t/compile-a-rust-binary-on-macos-dbcrossbar/8612
pkgsBuildHost.darwin.apple_sdk.frameworks.Security
];
# for some reason gcc and other weird deps are added to OCI images and bloats it up
#
# <https://github.com/input-output-hk/haskell.nix/issues/829>
postInstall = with pkgsBuildHost; ''
find "$out" -type f -exec remove-references-to -t ${stdenv.cc} -t ${gcc} -t ${llvm} -t ${rustc.unwrapped} -t ${rustc} '{}' +
'';
];
};
in
@@ -210,15 +209,18 @@ craneLib.buildPackage ( commonAttrs // {
env = buildDepsOnlyEnv;
});
cargoExtraArgs = "--no-default-features "
# This is redundant with CI
doCheck = false;
cargoTestCommand = "cargo test --locked ";
cargoExtraArgs = "--no-default-features --locked "
+ lib.optionalString
(features'' != [])
"--features " + (builtins.concatStringsSep "," features'');
cargoTestExtraArgs = "--no-default-features --locked "
+ lib.optionalString
(features'' != [])
"--features " + (builtins.concatStringsSep "," features'');
# This is redundant with CI
cargoTestCommand = "";
cargoCheckCommand = "";
doCheck = false;
env = buildPackageEnv;
+1
View File
@@ -36,6 +36,7 @@ dockerTools.buildLayeredImage {
"org.opencontainers.image.documentation" = "https://conduwuit.puppyirl.gay/";
"org.opencontainers.image.licenses" = "Apache-2.0";
"org.opencontainers.image.revision" = inputs.self.rev or inputs.self.dirtyRev or "";
"org.opencontainers.image.source" = "https://github.com/girlbossceo/conduwuit";
"org.opencontainers.image.title" = main.pname;
"org.opencontainers.image.url" = "https://conduwuit.puppyirl.gay/";
"org.opencontainers.image.vendor" = "girlbossceo";
+68 -28
View File
@@ -6,10 +6,14 @@ use std::{
};
use conduwuit::{
debug_error, err, info, trace, utils, utils::string::EMPTY, warn, Error, PduEvent, PduId,
RawPduId, Result,
debug_error, err, info, trace, utils,
utils::{
stream::{IterStream, ReadyExt},
string::EMPTY,
},
warn, Error, PduEvent, PduId, RawPduId, Result,
};
use futures::{FutureExt, StreamExt};
use futures::{FutureExt, StreamExt, TryStreamExt};
use ruma::{
api::{client::error::ErrorKind, federation::event::get_room_state},
events::room::message::RoomMessageEventContent,
@@ -54,7 +58,7 @@ pub(super) async fn get_auth_chain(
.rooms
.auth_chain
.event_ids_iter(room_id, once(event_id.as_ref()))
.await?
.ready_filter_map(Result::ok)
.count()
.await;
@@ -327,11 +331,10 @@ pub(super) async fn get_room_state(
.services
.rooms
.state_accessor
.room_state_full(&room_id)
.await?
.values()
.map(PduEvent::to_state_event)
.collect();
.room_state_full_pdus(&room_id)
.map_ok(PduEvent::into_state_event)
.try_collect()
.await?;
if room_state.is_empty() {
return Ok(RoomMessageEventContent::text_plain(
@@ -554,7 +557,7 @@ pub(super) async fn first_pdu_in_room(
.services
.rooms
.state_cache
.server_in_room(&self.services.server.config.server_name, &room_id)
.server_in_room(&self.services.server.name, &room_id)
.await
{
return Ok(RoomMessageEventContent::text_plain(
@@ -583,7 +586,7 @@ pub(super) async fn latest_pdu_in_room(
.services
.rooms
.state_cache
.server_in_room(&self.services.server.config.server_name, &room_id)
.server_in_room(&self.services.server.name, &room_id)
.await
{
return Ok(RoomMessageEventContent::text_plain(
@@ -613,7 +616,7 @@ pub(super) async fn force_set_room_state_from_server(
.services
.rooms
.state_cache
.server_in_room(&self.services.server.config.server_name, &room_id)
.server_in_room(&self.services.server.name, &room_id)
.await
{
return Ok(RoomMessageEventContent::text_plain(
@@ -640,6 +643,7 @@ pub(super) async fn force_set_room_state_from_server(
room_id: room_id.clone().into(),
event_id: first_pdu.event_id.clone(),
})
.boxed()
.await?;
for pdu in remote_state_response.pdus.clone() {
@@ -648,6 +652,7 @@ pub(super) async fn force_set_room_state_from_server(
.rooms
.event_handler
.parse_incoming_pdu(&pdu)
.boxed()
.await
{
| Ok(t) => t,
@@ -711,6 +716,7 @@ pub(super) async fn force_set_room_state_from_server(
.rooms
.event_handler
.resolve_state(&room_id, &room_version, state)
.boxed()
.await?;
info!("Forcing new room state");
@@ -756,8 +762,7 @@ pub(super) async fn get_signing_keys(
notary: Option<Box<ServerName>>,
query: bool,
) -> Result<RoomMessageEventContent> {
let server_name =
server_name.unwrap_or_else(|| self.services.server.config.server_name.clone().into());
let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone().into());
if let Some(notary) = notary {
let signing_keys = self
@@ -793,8 +798,7 @@ pub(super) async fn get_verify_keys(
&self,
server_name: Option<Box<ServerName>>,
) -> Result<RoomMessageEventContent> {
let server_name =
server_name.unwrap_or_else(|| self.services.server.config.server_name.clone().into());
let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone().into());
let keys = self
.services
@@ -824,7 +828,7 @@ pub(super) async fn resolve_true_destination(
));
}
if server_name == self.services.server.config.server_name {
if server_name == self.services.server.name {
return Ok(RoomMessageEventContent::text_plain(
"Not allowed to send federation requests to ourselves. Please use `get-pdu` for \
fetching local PDUs.",
@@ -948,21 +952,57 @@ pub(super) async fn database_stats(
property: Option<String>,
map: Option<String>,
) -> Result<RoomMessageEventContent> {
let property = property.unwrap_or_else(|| "rocksdb.stats".to_owned());
let map_name = map.as_ref().map_or(EMPTY, String::as_str);
let property = property.unwrap_or_else(|| "rocksdb.stats".to_owned());
self.services
.db
.iter()
.filter(|(&name, _)| map_name.is_empty() || map_name == name)
.try_stream()
.try_for_each(|(&name, map)| {
let res = map.property(&property).expect("invalid property");
writeln!(self, "##### {name}:\n```\n{}\n```", res.trim())
})
.await?;
let mut out = String::new();
for (&name, map) in self.services.db.iter() {
if !map_name.is_empty() && map_name != name {
continue;
}
Ok(RoomMessageEventContent::notice_plain(""))
}
let res = map.property(&property)?;
let res = res.trim();
writeln!(out, "##### {name}:\n```\n{res}\n```")?;
}
#[admin_command]
pub(super) async fn database_files(
&self,
map: Option<String>,
level: Option<i32>,
) -> Result<RoomMessageEventContent> {
let mut files: Vec<_> = self.services.db.db.file_list().collect::<Result<_>>()?;
Ok(RoomMessageEventContent::notice_markdown(out))
files.sort_by_key(|f| f.name.clone());
writeln!(self, "| lev | sst | keys | dels | size | column |").await?;
writeln!(self, "| ---: | :--- | ---: | ---: | ---: | :--- |").await?;
files
.into_iter()
.filter(|file| {
map.as_deref()
.is_none_or(|map| map == file.column_family_name)
})
.filter(|file| level.as_ref().is_none_or(|&level| level == file.level))
.try_stream()
.try_for_each(|file| {
writeln!(
self,
"| {} | {:<13} | {:7}+ | {:4}- | {:9} | {} |",
file.level,
file.name,
file.num_entries,
file.num_deletions,
file.size,
file.column_family_name,
)
})
.await?;
Ok(RoomMessageEventContent::notice_plain(""))
}
#[admin_command]
+8
View File
@@ -226,6 +226,14 @@ pub(super) enum DebugCommand {
/// - Trim memory usage
TrimMemory,
/// - List database files
DatabaseFiles {
map: Option<String>,
#[arg(long)]
level: Option<i32>,
},
/// - Developer test stubs
#[command(subcommand)]
#[allow(non_snake_case)]
+1 -1
View File
@@ -92,7 +92,7 @@ pub(super) async fn remote_user_in_rooms(
&self,
user_id: Box<UserId>,
) -> Result<RoomMessageEventContent> {
if user_id.server_name() == self.services.server.config.server_name {
if user_id.server_name() == self.services.server.name {
return Ok(RoomMessageEventContent::text_plain(
"User belongs to our server, please use `list-joined-rooms` user admin command \
instead.",
+1 -1
View File
@@ -41,7 +41,7 @@ async fn changes_since(
let results: Vec<_> = self
.services
.account_data
.changes_since(room_id.as_deref(), &user_id, since)
.changes_since(room_id.as_deref(), &user_id, since, None)
.collect()
.await;
let query_time = timer.elapsed();
+1 -1
View File
@@ -413,7 +413,7 @@ async fn get_to_device_events(
let result = self
.services
.users
.get_to_device_events(&user_id, &device_id)
.get_to_device_events(&user_id, &device_id, None, None)
.collect::<Vec<_>>()
.await;
let query_time = timer.elapsed();
+9 -6
View File
@@ -72,7 +72,7 @@ pub(super) async fn reprocess(
))),
};
match command {
| RoomAliasCommand::Set { force, room_id, .. } =>
| RoomAliasCommand::Set { force, room_id, .. } => {
match (force, services.rooms.alias.resolve_local_alias(&room_alias).await) {
| (true, Ok(id)) => {
match services.rooms.alias.set_alias(
@@ -106,8 +106,9 @@ pub(super) async fn reprocess(
))),
}
},
},
| RoomAliasCommand::Remove { .. } =>
}
},
| RoomAliasCommand::Remove { .. } => {
match services.rooms.alias.resolve_local_alias(&room_alias).await {
| Ok(id) => match services
.rooms
@@ -124,15 +125,17 @@ pub(super) async fn reprocess(
},
| Err(_) =>
Ok(RoomMessageEventContent::text_plain("Alias isn't in use.")),
},
| RoomAliasCommand::Which { .. } =>
}
},
| RoomAliasCommand::Which { .. } => {
match services.rooms.alias.resolve_local_alias(&room_alias).await {
| Ok(id) => Ok(RoomMessageEventContent::text_plain(format!(
"Alias resolves to {id}"
))),
| Err(_) =>
Ok(RoomMessageEventContent::text_plain("Alias isn't in use.")),
},
}
},
| RoomAliasCommand::List { .. } => unreachable!(),
}
},
+6 -18
View File
@@ -1,6 +1,6 @@
use std::{fmt::Write, path::PathBuf, sync::Arc};
use conduwuit::{info, utils::time, warn, Config, Err, Result};
use conduwuit::{info, utils::time, warn, Err, Result};
use ruma::events::room::message::RoomMessageEventContent;
use crate::admin_command;
@@ -33,12 +33,7 @@ pub(super) async fn reload_config(
path: Option<PathBuf>,
) -> Result<RoomMessageEventContent> {
let path = path.as_deref().into_iter();
let config = Config::load(path).and_then(|raw| Config::new(&raw))?;
if config.server_name != self.services.server.config.server_name {
return Err!("You can't change the server name.");
}
let _old = self.services.server.config.update(config)?;
self.services.config.reload(path)?;
Ok(RoomMessageEventContent::text_plain("Successfully reconfigured."))
}
@@ -97,7 +92,7 @@ pub(super) async fn clear_caches(&self) -> Result<RoomMessageEventContent> {
#[admin_command]
pub(super) async fn list_backups(&self) -> Result<RoomMessageEventContent> {
let result = self.services.globals.db.backup_list()?;
let result = self.services.db.db.backup_list()?;
if result.is_empty() {
Ok(RoomMessageEventContent::text_plain("No backups found."))
@@ -108,31 +103,24 @@ pub(super) async fn list_backups(&self) -> Result<RoomMessageEventContent> {
#[admin_command]
pub(super) async fn backup_database(&self) -> Result<RoomMessageEventContent> {
let globals = Arc::clone(&self.services.globals);
let db = Arc::clone(&self.services.db);
let mut result = self
.services
.server
.runtime()
.spawn_blocking(move || match globals.db.backup() {
.spawn_blocking(move || match db.db.backup() {
| Ok(()) => String::new(),
| Err(e) => e.to_string(),
})
.await?;
if result.is_empty() {
result = self.services.globals.db.backup_list()?;
result = self.services.db.db.backup_list()?;
}
Ok(RoomMessageEventContent::notice_markdown(result))
}
#[admin_command]
pub(super) async fn list_database_files(&self) -> Result<RoomMessageEventContent> {
let result = self.services.globals.db.file_list()?;
Ok(RoomMessageEventContent::notice_markdown(result))
}
#[admin_command]
pub(super) async fn admin_notice(&self, message: Vec<String>) -> Result<RoomMessageEventContent> {
let message = message.join(" ");
-3
View File
@@ -46,9 +46,6 @@ pub(super) enum ServerCommand {
/// - List database backups
ListBackups,
/// - List database files
ListDatabaseFiles,
/// - Send a message to the admin room.
AdminNotice {
message: Vec<String>,
+1
View File
@@ -50,6 +50,7 @@ http.workspace = true
http-body-util.workspace = true
hyper.workspace = true
ipaddress.workspace = true
itertools.workspace = true
log.workspace = true
rand.workspace = true
reqwest.workspace = true
+18 -10
View File
@@ -1,6 +1,6 @@
use axum::extract::State;
use conduwuit::{
at, deref_at, err, ref_at,
at, err, ref_at,
utils::{
future::TryExtExt,
stream::{BroadbandExt, ReadyExt, TryIgnore, WidebandExt},
@@ -10,10 +10,10 @@ use conduwuit::{
};
use futures::{
future::{join, join3, try_join3, OptionFuture},
FutureExt, StreamExt, TryFutureExt,
FutureExt, StreamExt, TryFutureExt, TryStreamExt,
};
use ruma::{api::client::context::get_context, events::StateEventType, OwnedEventId, UserId};
use service::rooms::{lazy_loading, lazy_loading::Options};
use service::rooms::{lazy_loading, lazy_loading::Options, short::ShortStateKey};
use crate::{
client::message::{event_filter, ignored_filter, lazy_loading_witness, visibility_filter},
@@ -132,21 +132,29 @@ pub(crate) async fn get_context_route(
.state_accessor
.pdu_shortstatehash(state_at)
.or_else(|_| services.rooms.state.get_room_shortstatehash(room_id))
.and_then(|shortstatehash| services.rooms.state_accessor.state_full_ids(shortstatehash))
.map_ok(|shortstatehash| {
services
.rooms
.state_accessor
.state_full_ids(shortstatehash)
.map(Ok)
})
.map_err(|e| err!(Database("State not found: {e}")))
.try_flatten_stream()
.try_collect()
.boxed();
let (lazy_loading_witnessed, state_ids) = join(lazy_loading_witnessed, state_ids).await;
let state_ids = state_ids?;
let state_ids: Vec<(ShortStateKey, OwnedEventId)> = state_ids?;
let shortstatekeys = state_ids.iter().map(at!(0)).stream();
let shorteventids = state_ids.iter().map(ref_at!(1)).stream();
let lazy_loading_witnessed = lazy_loading_witnessed.unwrap_or_default();
let shortstatekeys = state_ids.iter().stream().map(deref_at!(0));
let state: Vec<_> = services
.rooms
.short
.multi_get_statekey_from_short(shortstatekeys)
.zip(state_ids.iter().stream().map(at!(1)))
.zip(shorteventids)
.ready_filter_map(|item| Some((item.0.ok()?, item.1)))
.ready_filter_map(|((event_type, state_key), event_id)| {
if filter.lazy_load_options.is_enabled()
@@ -162,9 +170,9 @@ pub(crate) async fn get_context_route(
Some(event_id)
})
.broad_filter_map(|event_id: &OwnedEventId| {
services.rooms.timeline.get_pdu(event_id).ok()
services.rooms.timeline.get_pdu(event_id.as_ref()).ok()
})
.map(|pdu| pdu.to_state_event())
.map(PduEvent::into_state_event)
.collect()
.await;
+16 -7
View File
@@ -57,19 +57,28 @@ pub(crate) async fn create_content_route(
let filename = body.filename.as_deref();
let content_type = body.content_type.as_deref();
let content_disposition = make_content_disposition(None, content_type, filename);
let mxc = Mxc {
let ref mxc = Mxc {
server_name: services.globals.server_name(),
media_id: &utils::random_string(MXC_LENGTH),
};
services
.media
.create(&mxc, Some(user), Some(&content_disposition), content_type, &body.file)
.await
.map(|()| create_content::v3::Response {
content_uri: mxc.to_string().into(),
blurhash: None,
})
.create(mxc, Some(user), Some(&content_disposition), content_type, &body.file)
.await?;
let blurhash = body.generate_blurhash.then(|| {
services
.media
.create_blurhash(&body.file, content_type, filename)
.ok()
.flatten()
});
Ok(create_content::v3::Response {
content_uri: mxc.to_string().into(),
blurhash: blurhash.flatten(),
})
}
/// # `GET /_matrix/client/v1/media/thumbnail/{serverName}/{mediaId}`
+18 -16
View File
@@ -1,6 +1,7 @@
use std::{
borrow::Borrow,
collections::{BTreeMap, HashMap, HashSet},
iter::once,
net::IpAddr,
sync::Arc,
};
@@ -8,14 +9,14 @@ use std::{
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
debug, debug_info, debug_warn, err, info,
at, debug, debug_info, debug_warn, err, info,
pdu::{gen_event_id_canonical_json, PduBuilder},
result::FlatOk,
trace,
utils::{self, shuffle, IterStream, ReadyExt},
warn, Err, PduEvent, Result,
};
use futures::{join, FutureExt, StreamExt};
use futures::{join, FutureExt, StreamExt, TryFutureExt};
use ruma::{
api::{
client::{
@@ -45,7 +46,10 @@ use ruma::{
use service::{
appservice::RegistrationInfo,
pdu::gen_event_id,
rooms::{state::RoomMutexGuard, state_compressor::HashSetCompressStateEvent},
rooms::{
state::RoomMutexGuard,
state_compressor::{CompressedState, HashSetCompressStateEvent},
},
Services,
};
@@ -765,11 +769,12 @@ pub(crate) async fn get_member_events_route(
.rooms
.state_accessor
.room_state_full(&body.room_id)
.await?
.iter()
.filter(|(key, _)| key.0 == StateEventType::RoomMember)
.map(|(_, pdu)| pdu.to_member_event())
.collect(),
.ready_filter_map(Result::ok)
.ready_filter(|((ty, _), _)| *ty == StateEventType::RoomMember)
.map(at!(1))
.map(PduEvent::into_member_event)
.collect()
.await,
})
}
@@ -1167,7 +1172,7 @@ async fn join_room_by_id_helper_remote(
}
info!("Compressing state from send_join");
let compressed: HashSet<_> = services
let compressed: CompressedState = services
.rooms
.state_compressor
.compress_state_events(state.iter().map(|(ssk, eid)| (ssk, eid.borrow())))
@@ -1215,7 +1220,7 @@ async fn join_room_by_id_helper_remote(
.append_pdu(
&parsed_join_pdu,
join_event,
vec![(*parsed_join_pdu.event_id).to_owned()],
once(parsed_join_pdu.event_id.borrow()),
&state_lock,
)
.await?;
@@ -1707,9 +1712,6 @@ pub async fn leave_room(
room_id: &RoomId,
reason: Option<String>,
) -> Result<()> {
//use conduwuit::utils::stream::OptionStream;
use futures::TryFutureExt;
// Ask a remote server if we don't have this room and are not knocking on it
if !services
.rooms
@@ -2197,7 +2199,7 @@ async fn knock_room_helper_local(
.append_pdu(
&parsed_knock_pdu,
knock_event,
vec![(*parsed_knock_pdu.event_id).to_owned()],
once(parsed_knock_pdu.event_id.borrow()),
&state_lock,
)
.await?;
@@ -2341,7 +2343,7 @@ async fn knock_room_helper_remote(
}
info!("Compressing state from send_knock");
let compressed: HashSet<_> = services
let compressed: CompressedState = services
.rooms
.state_compressor
.compress_state_events(state_map.iter().map(|(ssk, eid)| (ssk, eid.borrow())))
@@ -2396,7 +2398,7 @@ async fn knock_room_helper_remote(
.append_pdu(
&parsed_knock_pdu,
knock_event,
vec![(*parsed_knock_pdu.event_id).to_owned()],
once(parsed_knock_pdu.event_id.borrow()),
&state_lock,
)
.await?;
+3 -3
View File
@@ -6,9 +6,9 @@ use conduwuit::{
stream::{BroadbandExt, TryIgnore, WidebandExt},
IterStream, ReadyExt,
},
Event, PduCount, Result,
Event, PduCount, PduEvent, Result,
};
use futures::{future::OptionFuture, pin_mut, FutureExt, StreamExt};
use futures::{future::OptionFuture, pin_mut, FutureExt, StreamExt, TryFutureExt};
use ruma::{
api::{
client::{filter::RoomEventFilter, message::get_message_events},
@@ -220,8 +220,8 @@ async fn get_member_event(
.rooms
.state_accessor
.room_state_get(room_id, &StateEventType::RoomMember, user_id.as_str())
.map_ok(PduEvent::into_state_event)
.await
.map(|member_event| member_event.to_state_event())
.ok()
}
+1 -1
View File
@@ -37,7 +37,7 @@ pub(crate) async fn create_openid_token_route(
Ok(account::request_openid_token::v3::Response {
access_token,
token_type: TokenType::Bearer,
matrix_server_name: services.server.config.server_name.clone(),
matrix_server_name: services.server.name.clone(),
expires_in: Duration::from_secs(expires_in),
})
}
+1 -1
View File
@@ -50,7 +50,7 @@ pub(crate) async fn report_room_route(
if !services
.rooms
.state_cache
.server_in_room(&services.server.config.server_name, &body.room_id)
.server_in_room(&services.server.name, &body.room_id)
.await
{
return Err!(Request(NotFound(
+1 -1
View File
@@ -71,7 +71,7 @@ pub(crate) async fn create_room_route(
let room_id: OwnedRoomId = if let Some(custom_room_id) = &body.room_id {
custom_room_id_check(&services, custom_room_id)?
} else {
RoomId::new(&services.server.config.server_name)
RoomId::new(&services.server.name)
};
// check if room ID doesn't already exist instead of erroring on auth check
+4 -5
View File
@@ -2,7 +2,7 @@ use axum::extract::State;
use conduwuit::{
at,
utils::{stream::TryTools, BoolExt},
Err, Result,
Err, PduEvent, Result,
};
use futures::TryStreamExt;
use ruma::api::client::room::initial_sync::v3::{PaginationChunk, Request, Response};
@@ -39,10 +39,9 @@ pub(crate) async fn room_initial_sync_route(
.rooms
.state_accessor
.room_state_full_pdus(room_id)
.await?
.into_iter()
.map(|pdu| pdu.to_state_event())
.collect();
.map_ok(PduEvent::into_state_event)
.try_collect()
.await?;
let messages = PaginationChunk {
start: events.last().map(at!(0)).as_ref().map(ToString::to_string),
+6 -6
View File
@@ -7,7 +7,7 @@ use conduwuit::{
utils::{stream::ReadyExt, IterStream},
Err, PduEvent, Result,
};
use futures::{future::OptionFuture, FutureExt, StreamExt, TryFutureExt};
use futures::{future::OptionFuture, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use ruma::{
api::client::search::search_events::{
self,
@@ -181,15 +181,15 @@ async fn category_room_events(
}
async fn procure_room_state(services: &Services, room_id: &RoomId) -> Result<RoomState> {
let state_map = services
let state = services
.rooms
.state_accessor
.room_state_full(room_id)
.room_state_full_pdus(room_id)
.map_ok(PduEvent::into_state_event)
.try_collect()
.await?;
let state_events = state_map.values().map(PduEvent::to_state_event).collect();
Ok(state_events)
Ok(state)
}
async fn check_room_visible(
+6 -6
View File
@@ -1,5 +1,6 @@
use axum::extract::State;
use conduwuit::{err, pdu::PduBuilder, utils::BoolExt, Err, PduEvent, Result};
use futures::TryStreamExt;
use ruma::{
api::client::state::{get_state_events, get_state_events_for_key, send_state_event},
events::{
@@ -82,11 +83,10 @@ pub(crate) async fn get_state_events_route(
room_state: services
.rooms
.state_accessor
.room_state_full(&body.room_id)
.await?
.values()
.map(PduEvent::to_state_event)
.collect(),
.room_state_full_pdus(&body.room_id)
.map_ok(PduEvent::into_state_event)
.try_collect()
.await?,
})
}
@@ -133,7 +133,7 @@ pub(crate) async fn get_state_events_for_key_route(
Ok(get_state_events_for_key::v3::Response {
content: event_format.or(|| event.get_content_as_value()),
event: event_format.then(|| event.to_state_event_value()),
event: event_format.then(|| event.into_state_event_value()),
})
}
+324 -299
View File
@@ -1,22 +1,22 @@
use std::{
cmp::{self},
collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
collections::{BTreeMap, HashMap, HashSet},
time::Duration,
};
use axum::extract::State;
use conduwuit::{
at, err, error, extract_variant, is_equal_to,
pdu::EventHash,
at, err, error, extract_variant, is_equal_to, pair_of,
pdu::{Event, EventHash},
ref_at,
result::FlatOk,
utils::{
self,
future::OptionExt,
math::ruma_from_u64,
stream::{BroadbandExt, Tools, WidebandExt},
stream::{BroadbandExt, Tools, TryExpect, WidebandExt},
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
},
Error, PduCount, PduEvent, Result,
PduCount, PduEvent, Result,
};
use conduwuit_service::{
rooms::{
@@ -28,7 +28,7 @@ use conduwuit_service::{
};
use futures::{
future::{join, join3, join4, join5, try_join, try_join4, OptionFuture},
FutureExt, StreamExt, TryFutureExt,
FutureExt, StreamExt, TryFutureExt, TryStreamExt,
};
use ruma::{
api::client::{
@@ -45,7 +45,7 @@ use ruma::{
uiaa::UiaaResponse,
},
events::{
presence::PresenceEvent,
presence::{PresenceEvent, PresenceEventContent},
room::member::{MembershipState, RoomMemberEventContent},
AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType,
TimelineEventType::*,
@@ -53,6 +53,7 @@ use ruma::{
serde::Raw,
uint, DeviceId, EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
};
use service::rooms::short::{ShortEventId, ShortStateKey};
use super::{load_timeline, share_encrypted_room};
use crate::{client::ignored_filter, Ruma, RumaResponse};
@@ -62,11 +63,12 @@ struct StateChanges {
heroes: Option<Vec<OwnedUserId>>,
joined_member_count: Option<u64>,
invited_member_count: Option<u64>,
joined_since_last_sync: bool,
state_events: Vec<PduEvent>,
device_list_updates: HashSet<OwnedUserId>,
left_encrypted_users: HashSet<OwnedUserId>,
}
type PresenceUpdates = HashMap<OwnedUserId, PresenceEvent>;
type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;
/// # `GET /_matrix/client/r0/sync`
///
@@ -285,20 +287,20 @@ pub(crate) async fn build_sync_events(
let account_data = services
.account_data
.changes_since(None, sender_user, since)
.changes_since(None, sender_user, since, Some(next_batch))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
.collect();
// Look for device list updates of this account
let keys_changed = services
.users
.keys_changed(sender_user, since, None)
.keys_changed(sender_user, since, Some(next_batch))
.map(ToOwned::to_owned)
.collect::<HashSet<_>>();
let to_device_events = services
.users
.get_to_device_events(sender_user, sender_device)
.get_to_device_events(sender_user, sender_device, Some(since), Some(next_batch))
.collect::<Vec<_>>();
let device_one_time_keys_count = services
@@ -325,18 +327,16 @@ pub(crate) async fn build_sync_events(
// If the user doesn't share an encrypted room with the target anymore, we need
// to tell them
let device_list_left = left_encrypted_users
let device_list_left: HashSet<_> = left_encrypted_users
.into_iter()
.stream()
.broad_filter_map(|user_id| async move {
let no_shared_encrypted_room =
!share_encrypted_room(services, sender_user, &user_id, None).await;
no_shared_encrypted_room.then_some(user_id)
})
.ready_fold(HashSet::new(), |mut device_list_left, user_id| {
device_list_left.insert(user_id);
device_list_left
share_encrypted_room(services, sender_user, &user_id, None)
.await
.eq(&false)
.then_some(user_id)
})
.collect()
.await;
let response = sync_events::v3::Response {
@@ -351,9 +351,11 @@ pub(crate) async fn build_sync_events(
next_batch: next_batch.to_string(),
presence: Presence {
events: presence_updates
.unwrap_or_default()
.into_values()
.map(|v| Raw::new(&v).expect("PresenceEvent always serializes successfully"))
.into_iter()
.flat_map(IntoIterator::into_iter)
.map(|(sender, content)| PresenceEvent { content, sender })
.map(|ref event| Raw::new(event))
.filter_map(Result::ok)
.collect(),
},
rooms: Rooms {
@@ -390,45 +392,8 @@ async fn process_presence_updates(
.map_ok(move |event| (user_id, event))
.ok()
})
.ready_fold(PresenceUpdates::new(), |mut updates, (user_id, event)| {
match updates.entry(user_id.into()) {
| Entry::Vacant(slot) => {
let mut new_event = event;
new_event.content.last_active_ago = match new_event.content.currently_active {
| Some(true) => None,
| _ => new_event.content.last_active_ago,
};
slot.insert(new_event);
},
| Entry::Occupied(mut slot) => {
let curr_event = slot.get_mut();
let curr_content = &mut curr_event.content;
let new_content = event.content;
// Update existing presence event with more info
curr_content.presence = new_content.presence;
curr_content.status_msg = new_content
.status_msg
.or_else(|| curr_content.status_msg.take());
curr_content.displayname = new_content
.displayname
.or_else(|| curr_content.displayname.take());
curr_content.avatar_url = new_content
.avatar_url
.or_else(|| curr_content.avatar_url.take());
curr_content.currently_active = new_content
.currently_active
.or(curr_content.currently_active);
curr_content.last_active_ago = match curr_content.currently_active {
| Some(true) => None,
| _ => new_content.last_active_ago.or(curr_content.last_active_ago),
};
},
};
updates
})
.map(|(user_id, event)| (user_id.to_owned(), event.content))
.collect()
.await
}
@@ -503,16 +468,20 @@ async fn handle_left_room(
let mut left_state_events = Vec::new();
let since_shortstatehash = services
.rooms
.user
.get_token_shortstatehash(room_id, since)
.await;
let since_shortstatehash = services.rooms.user.get_token_shortstatehash(room_id, since);
let since_state_ids = match since_shortstatehash {
| Ok(s) => services.rooms.state_accessor.state_full_ids(s).await?,
| Err(_) => HashMap::new(),
};
let since_state_ids: HashMap<_, OwnedEventId> = since_shortstatehash
.map_ok(|since_shortstatehash| {
services
.rooms
.state_accessor
.state_full_ids(since_shortstatehash)
.map(Ok)
})
.try_flatten_stream()
.try_collect()
.await
.unwrap_or_default();
let Ok(left_event_id): Result<OwnedEventId> = services
.rooms
@@ -534,11 +503,12 @@ async fn handle_left_room(
return Ok(None);
};
let mut left_state_ids = services
let mut left_state_ids: HashMap<_, _> = services
.rooms
.state_accessor
.state_full_ids(left_shortstatehash)
.await?;
.collect()
.await;
let leave_shortstatekey = services
.rooms
@@ -652,6 +622,40 @@ async fn load_joined_room(
.await?;
let (timeline_pdus, limited) = timeline;
let initial = since_shortstatehash.is_none();
let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled()
|| filter.room.timeline.lazy_load_options.is_enabled();
let lazy_loading_context = &lazy_loading::Context {
user_id: sender_user,
device_id: sender_device,
room_id,
token: Some(since),
options: Some(&filter.room.state.lazy_load_options),
};
// Reset lazy loading because this is an initial sync
let lazy_load_reset: OptionFuture<_> = initial
.then(|| services.rooms.lazy_loading.reset(lazy_loading_context))
.into();
lazy_load_reset.await;
let witness: OptionFuture<_> = lazy_loading_enabled
.then(|| {
let witness: Witness = timeline_pdus
.iter()
.map(ref_at!(1))
.map(Event::sender)
.map(Into::into)
.chain(receipt_events.keys().map(Into::into))
.collect();
services
.rooms
.lazy_loading
.witness_retain(witness, lazy_loading_context)
})
.into();
let last_notification_read: OptionFuture<_> = timeline_pdus
.is_empty()
@@ -663,10 +667,6 @@ async fn load_joined_room(
})
.into();
let no_state_changes = timeline_pdus.is_empty()
&& (since_shortstatehash.is_none()
|| since_shortstatehash.is_some_and(is_equal_to!(current_shortstatehash)));
let since_sender_member: OptionFuture<_> = since_shortstatehash
.map(|short| {
services
@@ -677,125 +677,85 @@ async fn load_joined_room(
})
.into();
let (last_notification_read, since_sender_member, witness) =
join3(last_notification_read, since_sender_member, witness).await;
let joined_since_last_sync =
since_sender_member
.await
.flatten()
.is_none_or(|content: RoomMemberEventContent| {
content.membership != MembershipState::Join
});
let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled()
|| filter.room.timeline.lazy_load_options.is_enabled();
let generate_witness =
lazy_loading_enabled && (since_shortstatehash.is_none() || joined_since_last_sync);
let lazy_reset = lazy_loading_enabled && since_shortstatehash.is_none();
let lazy_loading_context = &lazy_loading::Context {
user_id: sender_user,
device_id: sender_device,
room_id,
token: None,
options: Some(&filter.room.state.lazy_load_options),
};
// Reset lazy loading because this is an initial sync
let lazy_load_reset: OptionFuture<_> = lazy_reset
.then(|| services.rooms.lazy_loading.reset(lazy_loading_context))
.into();
lazy_load_reset.await;
let witness: Option<Witness> = generate_witness.then(|| {
timeline_pdus
.iter()
.map(|(_, pdu)| pdu.sender.clone())
.chain(receipt_events.keys().cloned())
.collect()
});
let witness: OptionFuture<_> = witness
.map(|witness| {
services
.rooms
.lazy_loading
.witness_retain(witness, lazy_loading_context)
})
.into();
let witness = witness.await;
let mut device_list_updates = HashSet::<OwnedUserId>::new();
let mut left_encrypted_users = HashSet::<OwnedUserId>::new();
let StateChanges {
heroes,
joined_member_count,
invited_member_count,
mut state_events,
mut device_list_updates,
left_encrypted_users,
} = calculate_state_changes(
services,
sender_user,
room_id,
full_state,
filter,
since_shortstatehash,
current_shortstatehash,
joined_since_last_sync,
state_events,
} = if no_state_changes {
StateChanges::default()
} else {
calculate_state_changes(
services,
sender_user,
room_id,
full_state,
filter,
&mut device_list_updates,
&mut left_encrypted_users,
since_shortstatehash,
current_shortstatehash,
joined_since_last_sync,
witness.as_ref(),
)
.boxed()
.await?
witness.as_ref(),
)
.boxed()
.await?;
let is_sender_membership = |pdu: &PduEvent| {
pdu.kind == StateEventType::RoomMember.into()
&& pdu
.state_key
.as_deref()
.is_some_and(is_equal_to!(sender_user.as_str()))
};
let joined_sender_member: Option<_> = (joined_since_last_sync && timeline_pdus.is_empty())
.then(|| {
state_events
.iter()
.position(is_sender_membership)
.map(|pos| state_events.swap_remove(pos))
})
.flatten();
let prev_batch = timeline_pdus.first().map(at!(0)).or_else(|| {
joined_sender_member
.is_some()
.then_some(since)
.map(Into::into)
});
let room_events = timeline_pdus
.into_iter()
.stream()
.wide_filter_map(|item| ignored_filter(services, item, sender_user))
.map(at!(1))
.chain(joined_sender_member.into_iter().stream())
.map(|pdu| pdu.to_sync_room_event())
.collect::<Vec<_>>();
let account_data_events = services
.account_data
.changes_since(Some(room_id), sender_user, since)
.changes_since(Some(room_id), sender_user, since, Some(next_batch))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect();
// Look for device list updates in this room
let device_updates = services
.users
.room_keys_changed(room_id, since, None)
.room_keys_changed(room_id, since, Some(next_batch))
.map(|(user_id, _)| user_id)
.map(ToOwned::to_owned)
.collect::<Vec<_>>();
let room_events = timeline_pdus
.iter()
.stream()
.wide_filter_map(|item| ignored_filter(services, item.clone(), sender_user))
.map(|(_, pdu)| pdu.to_sync_room_event())
.collect();
let typing_events = services
.rooms
.typing
.last_typing_update(room_id)
.and_then(|count| async move {
if count <= since {
return Ok(Vec::<Raw<AnySyncEphemeralRoomEvent>>::new());
}
let typings = services
.rooms
.typing
.typings_all(room_id, sender_user)
.await?;
Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?])
})
.unwrap_or(Vec::new());
let send_notification_counts = last_notification_read
.is_none_or(|&count| count > since)
.await;
let send_notification_counts = last_notification_read.is_none_or(|count| count > since);
let notification_count: OptionFuture<_> = send_notification_counts
.then(|| {
@@ -819,8 +779,27 @@ async fn load_joined_room(
})
.into();
let events = join3(room_events, account_data_events, typing_events);
let typing_events = services
.rooms
.typing
.last_typing_update(room_id)
.and_then(|count| async move {
if count <= since {
return Ok(Vec::<Raw<AnySyncEphemeralRoomEvent>>::new());
}
let typings = services
.rooms
.typing
.typings_all(room_id, sender_user)
.await?;
Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?])
})
.unwrap_or(Vec::new());
let unread_notifications = join(notification_count, highlight_count);
let events = join3(room_events, account_data_events, typing_events);
let (unread_notifications, events, device_updates) =
join3(unread_notifications, events, device_updates)
.boxed()
@@ -877,12 +856,8 @@ async fn load_joined_room(
unread_notifications: UnreadNotificationsCount { highlight_count, notification_count },
timeline: Timeline {
limited: limited || joined_since_last_sync,
prev_batch: prev_batch.as_ref().map(ToString::to_string),
events: room_events,
prev_batch: timeline_pdus
.first()
.map(at!(0))
.as_ref()
.map(ToString::to_string),
},
state: RoomState {
events: state_events
@@ -914,14 +889,12 @@ async fn calculate_state_changes(
room_id: &RoomId,
full_state: bool,
filter: &FilterDefinition,
device_list_updates: &mut HashSet<OwnedUserId>,
left_encrypted_users: &mut HashSet<OwnedUserId>,
since_shortstatehash: Option<ShortStateHash>,
current_shortstatehash: ShortStateHash,
joined_since_last_sync: bool,
witness: Option<&Witness>,
) -> Result<StateChanges> {
if since_shortstatehash.is_none() || joined_since_last_sync {
if since_shortstatehash.is_none() {
calculate_state_initial(
services,
sender_user,
@@ -939,11 +912,10 @@ async fn calculate_state_changes(
room_id,
full_state,
filter,
device_list_updates,
left_encrypted_users,
since_shortstatehash,
current_shortstatehash,
joined_since_last_sync,
witness,
)
.await
}
@@ -956,39 +928,32 @@ async fn calculate_state_initial(
sender_user: &UserId,
room_id: &RoomId,
full_state: bool,
filter: &FilterDefinition,
_filter: &FilterDefinition,
current_shortstatehash: ShortStateHash,
witness: Option<&Witness>,
) -> Result<StateChanges> {
let state_events = services
let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.await?;
let shortstatekeys = state_events.keys().copied().stream();
.unzip()
.await;
let state_events = services
.rooms
.short
.multi_get_statekey_from_short(shortstatekeys)
.zip(state_events.values().cloned().stream())
.multi_get_statekey_from_short(shortstatekeys.into_iter().stream())
.zip(event_ids.into_iter().stream())
.ready_filter_map(|item| Some((item.0.ok()?, item.1)))
.ready_filter_map(|((event_type, state_key), event_id)| {
let lazy_load_enabled = filter.room.state.lazy_load_options.is_enabled()
|| filter.room.timeline.lazy_load_options.is_enabled();
if lazy_load_enabled
let lazy = !full_state
&& event_type == StateEventType::RoomMember
&& !full_state
&& state_key.as_str().try_into().is_ok_and(|user_id: &UserId| {
sender_user != user_id
&& witness.is_some_and(|witness| !witness.contains(user_id))
}) {
return None;
}
});
Some(event_id)
lazy.or_some(event_id)
})
.broad_filter_map(|event_id: OwnedEventId| async move {
services.rooms.timeline.get_pdu(&event_id).await.ok()
@@ -1007,128 +972,170 @@ async fn calculate_state_initial(
heroes,
joined_member_count,
invited_member_count,
joined_since_last_sync: true,
state_events,
..Default::default()
})
}
#[tracing::instrument(name = "incremental", level = "trace", skip_all)]
#[allow(clippy::too_many_arguments)]
async fn calculate_state_incremental(
async fn calculate_state_incremental<'a>(
services: &Services,
sender_user: &UserId,
sender_user: &'a UserId,
room_id: &RoomId,
full_state: bool,
_filter: &FilterDefinition,
device_list_updates: &mut HashSet<OwnedUserId>,
left_encrypted_users: &mut HashSet<OwnedUserId>,
since_shortstatehash: Option<ShortStateHash>,
current_shortstatehash: ShortStateHash,
joined_since_last_sync: bool,
witness: Option<&'a Witness>,
) -> Result<StateChanges> {
// Incremental /sync
let since_shortstatehash =
since_shortstatehash.expect("missing since_shortstatehash on incremental sync");
let since_shortstatehash = since_shortstatehash.unwrap_or(current_shortstatehash);
let mut delta_state_events = Vec::new();
if since_shortstatehash != current_shortstatehash {
let current_state_ids = services
.rooms
.state_accessor
.state_full_ids(current_shortstatehash);
let since_state_ids = services
.rooms
.state_accessor
.state_full_ids(since_shortstatehash);
let (current_state_ids, since_state_ids): (
HashMap<_, OwnedEventId>,
HashMap<_, OwnedEventId>,
) = try_join(current_state_ids, since_state_ids).await?;
current_state_ids
.iter()
.stream()
.ready_filter(|(key, id)| full_state || since_state_ids.get(key) != Some(id))
.wide_filter_map(|(_, id)| services.rooms.timeline.get_pdu(id).ok())
.ready_for_each(|pdu| delta_state_events.push(pdu))
.await;
}
let state_changed = since_shortstatehash != current_shortstatehash;
let encrypted_room = services
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
.is_ok();
.is_ok()
.await;
let since_encryption = services
.rooms
.state_accessor
.state_get(since_shortstatehash, &StateEventType::RoomEncryption, "")
.is_ok();
let (encrypted_room, since_encryption) = join(encrypted_room, since_encryption).await;
// Calculations:
let new_encrypted_room = encrypted_room && !since_encryption;
let send_member_count = delta_state_events
.iter()
.any(|event| event.kind == RoomMember);
if encrypted_room {
for state_event in &delta_state_events {
if state_event.kind != RoomMember {
continue;
}
if let Some(state_key) = &state_event.state_key {
let user_id = UserId::parse(state_key)
.map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
if user_id == sender_user {
continue;
}
let content: RoomMemberEventContent = state_event.get_content()?;
match content.membership {
| MembershipState::Join => {
// A new user joined an encrypted room
if !share_encrypted_room(services, sender_user, user_id, Some(room_id))
.await
{
device_list_updates.insert(user_id.into());
}
},
| MembershipState::Leave => {
// Write down users that have left encrypted rooms we are in
left_encrypted_users.insert(user_id.into());
},
| _ => {},
}
}
}
}
if joined_since_last_sync && encrypted_room || new_encrypted_room {
let updates: Vec<OwnedUserId> = services
let state_get_shorteventid = |user_id: &'a UserId| {
services
.rooms
.state_cache
.room_members(room_id)
.ready_filter(|user_id| sender_user != *user_id)
.filter_map(|user_id| {
share_encrypted_room(services, sender_user, user_id, Some(room_id))
.map(|res| res.or_some(user_id.to_owned()))
})
.collect()
.await;
.state_accessor
.state_get_shortid(
current_shortstatehash,
&StateEventType::RoomMember,
user_id.as_str(),
)
.ok()
};
// If the user is in a new encrypted room, give them all joined users
device_list_updates.extend(updates);
}
let lazy_state_ids: OptionFuture<_> = witness
.filter(|_| !full_state && !encrypted_room)
.map(|witness| {
witness
.iter()
.stream()
.broad_filter_map(|user_id| state_get_shorteventid(user_id))
.into_future()
})
.into();
let state_diff: OptionFuture<_> = (!full_state && state_changed)
.then(|| {
services
.rooms
.state_accessor
.state_added((since_shortstatehash, current_shortstatehash))
.boxed()
.into_future()
})
.into();
let current_state_ids: OptionFuture<_> = full_state
.then(|| {
services
.rooms
.state_accessor
.state_full_shortids(current_shortstatehash)
.expect_ok()
.boxed()
.into_future()
})
.into();
let lazy_state_ids = lazy_state_ids
.map(|opt| {
opt.map(|(curr, next)| {
let opt = curr;
let iter = Option::into_iter(opt);
IterStream::stream(iter).chain(next)
})
})
.map(Option::into_iter)
.map(IterStream::stream)
.flatten_stream()
.flatten();
let state_diff_ids = state_diff
.map(|opt| {
opt.map(|(curr, next)| {
let opt = curr;
let iter = Option::into_iter(opt);
IterStream::stream(iter).chain(next)
})
})
.map(Option::into_iter)
.map(IterStream::stream)
.flatten_stream()
.flatten();
let state_events = current_state_ids
.map(|opt| {
opt.map(|(curr, next)| {
let opt = curr;
let iter = Option::into_iter(opt);
IterStream::stream(iter).chain(next)
})
})
.map(Option::into_iter)
.map(IterStream::stream)
.flatten_stream()
.flatten()
.chain(state_diff_ids)
.broad_filter_map(|(shortstatekey, shorteventid)| async move {
if witness.is_none() || encrypted_room {
return Some(shorteventid);
}
lazy_filter(services, sender_user, shortstatekey, shorteventid).await
})
.chain(lazy_state_ids)
.broad_filter_map(|shorteventid| {
services
.rooms
.short
.get_eventid_from_short(shorteventid)
.ok()
})
.broad_filter_map(|event_id: OwnedEventId| async move {
services.rooms.timeline.get_pdu(&event_id).await.ok()
})
.collect::<Vec<_>>()
.await;
let (device_list_updates, left_encrypted_users) = state_events
.iter()
.stream()
.ready_filter(|_| encrypted_room)
.ready_filter(|state_event| state_event.kind == RoomMember)
.ready_filter_map(|state_event| {
let content: RoomMemberEventContent = state_event.get_content().ok()?;
let user_id: OwnedUserId = state_event.state_key.as_ref()?.parse().ok()?;
Some((content, user_id))
})
.fold_default(|(mut dlu, mut leu): pair_of!(HashSet<_>), (content, user_id)| async move {
use MembershipState::*;
let shares_encrypted_room =
|user_id| share_encrypted_room(services, sender_user, user_id, Some(room_id));
match content.membership {
| Leave => leu.insert(user_id),
| Join if joined_since_last_sync || !shares_encrypted_room(&user_id).await =>
dlu.insert(user_id),
| _ => false,
};
(dlu, leu)
})
.await;
let send_member_count = state_events.iter().any(|event| event.kind == RoomMember);
let (joined_member_count, invited_member_count, heroes) = if send_member_count {
calculate_counts(services, room_id, sender_user).await?
@@ -1140,11 +1147,29 @@ async fn calculate_state_incremental(
heroes,
joined_member_count,
invited_member_count,
joined_since_last_sync,
state_events: delta_state_events,
state_events,
device_list_updates,
left_encrypted_users,
})
}
async fn lazy_filter(
services: &Services,
sender_user: &UserId,
shortstatekey: ShortStateKey,
shorteventid: ShortEventId,
) -> Option<ShortEventId> {
let (event_type, state_key) = services
.rooms
.short
.get_statekey_from_short(shortstatekey)
.await
.ok()?;
(event_type != StateEventType::RoomMember || state_key == sender_user.as_str())
.then_some(shorteventid)
}
async fn calculate_counts(
services: &Services,
room_id: &RoomId,
+14 -7
View File
@@ -153,7 +153,7 @@ pub(crate) async fn sync_events_v4_route(
if body.extensions.account_data.enabled.unwrap_or(false) {
account_data.global = services
.account_data
.changes_since(None, sender_user, globalsince)
.changes_since(None, sender_user, globalsince, Some(next_batch))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
.collect()
.await;
@@ -164,7 +164,7 @@ pub(crate) async fn sync_events_v4_route(
room.clone(),
services
.account_data
.changes_since(Some(&room), sender_user, globalsince)
.changes_since(Some(&room), sender_user, globalsince, Some(next_batch))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect()
.await,
@@ -241,13 +241,15 @@ pub(crate) async fn sync_events_v4_route(
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.await?;
.collect()
.await;
let since_state_ids = services
let since_state_ids: HashMap<_, _> = services
.rooms
.state_accessor
.state_full_ids(since_shortstatehash)
.await?;
.collect()
.await;
for (key, id) in current_state_ids {
if since_state_ids.get(&key) != Some(&id) {
@@ -529,7 +531,7 @@ pub(crate) async fn sync_events_v4_route(
room_id.to_owned(),
services
.account_data
.changes_since(Some(room_id), sender_user, *roomsince)
.changes_since(Some(room_id), sender_user, *roomsince, Some(next_batch))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect()
.await,
@@ -777,7 +779,12 @@ pub(crate) async fn sync_events_v4_route(
Some(sync_events::v4::ToDevice {
events: services
.users
.get_to_device_events(sender_user, &sender_device)
.get_to_device_events(
sender_user,
&sender_device,
Some(globalsince),
Some(next_batch),
)
.collect()
.await,
next_batch: next_batch.to_string(),
+9 -7
View File
@@ -390,7 +390,7 @@ async fn process_rooms(
room_id.to_owned(),
services
.account_data
.changes_since(Some(room_id), sender_user, *roomsince)
.changes_since(Some(room_id), sender_user, *roomsince, Some(next_batch))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect()
.await,
@@ -644,7 +644,7 @@ async fn collect_account_data(
account_data.global = services
.account_data
.changes_since(None, sender_user, globalsince)
.changes_since(None, sender_user, globalsince, None)
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
.collect()
.await;
@@ -655,7 +655,7 @@ async fn collect_account_data(
room.clone(),
services
.account_data
.changes_since(Some(room), sender_user, globalsince)
.changes_since(Some(room), sender_user, globalsince, None)
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect()
.await,
@@ -748,13 +748,15 @@ async fn collect_e2ee<'a>(
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.await?;
.collect()
.await;
let since_state_ids = services
let since_state_ids: HashMap<_, _> = services
.rooms
.state_accessor
.state_full_ids(since_shortstatehash)
.await?;
.collect()
.await;
for (key, id) in current_state_ids {
if since_state_ids.get(&key) != Some(&id) {
@@ -874,7 +876,7 @@ async fn collect_to_device(
next_batch: next_batch.to_string(),
events: services
.users
.get_to_device_events(sender_user, sender_device)
.get_to_device_events(sender_user, sender_device, None, Some(next_batch))
.collect()
.await,
})
+16 -12
View File
@@ -10,6 +10,7 @@ use ruma::{
},
to_device::DeviceIdOrAllDevices,
};
use service::sending::EduBuf;
use crate::Ruma;
@@ -42,18 +43,21 @@ pub(crate) async fn send_event_to_device_route(
messages.insert(target_user_id.clone(), map);
let count = services.globals.next_count()?;
services.sending.send_edu_server(
target_user_id.server_name(),
serde_json::to_vec(&federation::transactions::edu::Edu::DirectToDevice(
DirectDeviceContent {
sender: sender_user.clone(),
ev_type: body.event_type.clone(),
message_id: count.to_string().into(),
messages,
},
))
.expect("DirectToDevice EDU can be serialized"),
)?;
let mut buf = EduBuf::new();
serde_json::to_writer(
&mut buf,
&federation::transactions::edu::Edu::DirectToDevice(DirectDeviceContent {
sender: sender_user.clone(),
ev_type: body.event_type.clone(),
message_id: count.to_string().into(),
messages,
}),
)
.expect("DirectToDevice EDU can be serialized");
services
.sending
.send_edu_server(target_user_id.server_name(), buf)?;
continue;
}
+1 -1
View File
@@ -38,7 +38,7 @@ pub(crate) async fn turn_server_route(
let user = body.sender_user.unwrap_or_else(|| {
UserId::parse_with_server_name(
utils::random_string(RANDOM_USER_ID_LENGTH).to_lowercase(),
&services.server.config.server_name,
&services.server.name,
)
.unwrap()
});
+2 -2
View File
@@ -1,7 +1,7 @@
use std::{borrow::Borrow, iter::once};
use axum::extract::State;
use conduwuit::{Error, Result};
use conduwuit::{utils::stream::ReadyExt, Error, Result};
use futures::StreamExt;
use ruma::{
api::{client::error::ErrorKind, federation::authorization::get_event_authorization},
@@ -48,7 +48,7 @@ pub(crate) async fn get_event_authorization_route(
.rooms
.auth_chain
.event_ids_iter(room_id, once(body.event_id.borrow()))
.await?
.ready_filter_map(Result::ok)
.filter_map(|id| async move { services.rooms.timeline.get_pdu_json(&id).await.ok() })
.then(|pdu| services.sending.convert_to_outgoing_federation_event(pdu))
.collect()
+2 -2
View File
@@ -13,7 +13,7 @@ use crate::{Error, Result, Ruma};
/// # `POST /_matrix/federation/v1/publicRooms`
///
/// Lists the public rooms on this server.
#[tracing::instrument(skip_all, fields(%client), name = "publicrooms")]
#[tracing::instrument(name = "publicrooms", level = "debug", skip_all, fields(%client))]
pub(crate) async fn get_public_rooms_filtered_route(
State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp,
@@ -51,7 +51,7 @@ pub(crate) async fn get_public_rooms_filtered_route(
/// # `GET /_matrix/federation/v1/publicRooms`
///
/// Lists the public rooms on this server.
#[tracing::instrument(skip_all, fields(%client), "publicrooms")]
#[tracing::instrument(name = "publicrooms", level = "debug", skip_all, fields(%client))]
pub(crate) async fn get_public_rooms_route(
State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp,
+363 -261
View File
@@ -3,17 +3,27 @@ use std::{collections::BTreeMap, net::IpAddr, time::Instant};
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
debug, debug_warn, err, error, result::LogErr, trace, utils::ReadyExt, warn, Err, Error,
Result,
debug,
debug::INFO_SPAN_LEVEL,
debug_warn, err, error,
result::LogErr,
trace,
utils::{
stream::{automatic_width, BroadbandExt, TryBroadbandExt},
IterStream, ReadyExt,
},
warn, Err, Error, Result,
};
use futures::{FutureExt, StreamExt};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use ruma::{
api::{
client::error::ErrorKind,
federation::transactions::{
edu::{
DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent,
ReceiptContent, SigningKeyUpdateContent, TypingContent,
PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap, SigningKeyUpdateContent,
TypingContent,
},
send_transaction_message,
},
@@ -21,27 +31,28 @@ use ruma::{
events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
serde::Raw,
to_device::DeviceIdOrAllDevices,
OwnedEventId, ServerName,
CanonicalJsonObject, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, ServerName, UserId,
};
use serde_json::value::RawValue as RawJsonValue;
use service::{
sending::{EDU_LIMIT, PDU_LIMIT},
Services,
};
use utils::millis_since_unix_epoch;
use crate::{
utils::{self},
Ruma,
};
type ResolvedMap = BTreeMap<OwnedEventId, Result<()>>;
type ResolvedMap = BTreeMap<OwnedEventId, Result>;
type Pdu = (OwnedRoomId, OwnedEventId, CanonicalJsonObject);
/// # `PUT /_matrix/federation/v1/send/{txnId}`
///
/// Push EDUs and PDUs to this server.
#[tracing::instrument(
name = "send",
level = "debug",
name = "txn",
level = INFO_SPAN_LEVEL,
skip_all,
fields(
%client,
@@ -73,91 +84,41 @@ pub(crate) async fn send_transaction_message_route(
let txn_start_time = Instant::now();
trace!(
pdus = ?body.pdus.len(),
edus = ?body.edus.len(),
pdus = body.pdus.len(),
edus = body.edus.len(),
elapsed = ?txn_start_time.elapsed(),
id = ?body.transaction_id,
origin =?body.origin(),
"Starting txn",
);
let resolved_map =
handle_pdus(&services, &client, &body.pdus, body.origin(), &txn_start_time)
.boxed()
.await?;
let pdus = body
.pdus
.iter()
.stream()
.broad_then(|pdu| services.rooms.event_handler.parse_incoming_pdu(pdu))
.inspect_err(|e| debug_warn!("Could not parse PDU: {e}"))
.ready_filter_map(Result::ok);
handle_edus(&services, &client, &body.edus, body.origin())
.boxed()
.await;
let edus = body
.edus
.iter()
.map(|edu| edu.json().get())
.map(serde_json::from_str)
.filter_map(Result::ok)
.stream();
let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?;
debug!(
pdus = ?body.pdus.len(),
edus = ?body.edus.len(),
pdus = body.pdus.len(),
edus = body.edus.len(),
elapsed = ?txn_start_time.elapsed(),
id = ?body.transaction_id,
origin =?body.origin(),
"Finished txn",
);
Ok(send_transaction_message::v1::Response {
pdus: resolved_map
.into_iter()
.map(|(e, r)| (e, r.map_err(error::sanitized_message)))
.collect(),
})
}
async fn handle_pdus(
services: &Services,
_client: &IpAddr,
pdus: &[Box<RawJsonValue>],
origin: &ServerName,
txn_start_time: &Instant,
) -> Result<ResolvedMap> {
let mut parsed_pdus = Vec::with_capacity(pdus.len());
for pdu in pdus {
parsed_pdus.push(match services.rooms.event_handler.parse_incoming_pdu(pdu).await {
| Ok(t) => t,
| Err(e) => {
debug_warn!("Could not parse PDU: {e}");
continue;
},
});
// We do not add the event_id field to the pdu here because of signature
// and hashes checks
}
let mut resolved_map = BTreeMap::new();
for (event_id, value, room_id) in parsed_pdus {
services.server.check_running()?;
let pdu_start_time = Instant::now();
let mutex_lock = services
.rooms
.event_handler
.mutex_federation
.lock(&room_id)
.await;
let result = services
.rooms
.event_handler
.handle_incoming_pdu(origin, &room_id, &event_id, value, true)
.boxed()
.await
.map(|_| ());
drop(mutex_lock);
debug!(
pdu_elapsed = ?pdu_start_time.elapsed(),
txn_elapsed = ?txn_start_time.elapsed(),
"Finished PDU {event_id}",
);
resolved_map.insert(event_id, result);
}
for (id, result) in &resolved_map {
for (id, result) in &results {
if let Err(e) = result {
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
warn!("Incoming PDU failed {id}: {e:?}");
@@ -165,39 +126,117 @@ async fn handle_pdus(
}
}
Ok(resolved_map)
Ok(send_transaction_message::v1::Response {
pdus: results
.into_iter()
.map(|(e, r)| (e, r.map_err(error::sanitized_message)))
.collect(),
})
}
async fn handle_edus(
async fn handle(
services: &Services,
client: &IpAddr,
edus: &[Raw<Edu>],
origin: &ServerName,
) {
for edu in edus
.iter()
.filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok())
{
match edu {
| Edu::Presence(presence) => {
handle_edu_presence(services, client, origin, presence).await;
},
| Edu::Receipt(receipt) =>
handle_edu_receipt(services, client, origin, receipt).await,
| Edu::Typing(typing) => handle_edu_typing(services, client, origin, typing).await,
| Edu::DeviceListUpdate(content) => {
handle_edu_device_list_update(services, client, origin, content).await;
},
| Edu::DirectToDevice(content) => {
handle_edu_direct_to_device(services, client, origin, content).await;
},
| Edu::SigningKeyUpdate(content) => {
handle_edu_signing_key_update(services, client, origin, content).await;
},
| Edu::_Custom(ref _custom) => {
debug_warn!(?edus, "received custom/unknown EDU");
},
}
started: Instant,
pdus: impl Stream<Item = Pdu> + Send,
edus: impl Stream<Item = Edu> + Send,
) -> Result<ResolvedMap> {
// group pdus by room
let pdus = pdus
.collect()
.map(|mut pdus: Vec<_>| {
pdus.sort_by(|(room_a, ..), (room_b, ..)| room_a.cmp(room_b));
pdus.into_iter()
.into_grouping_map_by(|(room_id, ..)| room_id.clone())
.collect()
})
.await;
// we can evaluate rooms concurrently
let results: ResolvedMap = pdus
.into_iter()
.try_stream()
.broad_and_then(|(room_id, pdus): (_, Vec<_>)| {
handle_room(services, client, origin, started, room_id, pdus.into_iter())
.map_ok(Vec::into_iter)
.map_ok(IterStream::try_stream)
})
.try_flatten()
.try_collect()
.boxed()
.await?;
// evaluate edus after pdus, at least for now.
edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu))
.boxed()
.await;
Ok(results)
}
async fn handle_room(
services: &Services,
_client: &IpAddr,
origin: &ServerName,
txn_start_time: Instant,
room_id: OwnedRoomId,
pdus: impl Iterator<Item = Pdu> + Send,
) -> Result<Vec<(OwnedEventId, Result)>> {
let _room_lock = services
.rooms
.event_handler
.mutex_federation
.lock(&room_id)
.await;
let room_id = &room_id;
pdus.try_stream()
.and_then(|(_, event_id, value)| async move {
services.server.check_running()?;
let pdu_start_time = Instant::now();
let result = services
.rooms
.event_handler
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
.await
.map(|_| ());
debug!(
pdu_elapsed = ?pdu_start_time.elapsed(),
txn_elapsed = ?txn_start_time.elapsed(),
"Finished PDU {event_id}",
);
Ok((event_id, result))
})
.try_collect()
.await
}
async fn handle_edu(services: &Services, client: &IpAddr, origin: &ServerName, edu: Edu) {
match edu {
| Edu::Presence(presence) if services.server.config.allow_incoming_presence =>
handle_edu_presence(services, client, origin, presence).await,
| Edu::Receipt(receipt) if services.server.config.allow_incoming_read_receipts =>
handle_edu_receipt(services, client, origin, receipt).await,
| Edu::Typing(typing) if services.server.config.allow_incoming_typing =>
handle_edu_typing(services, client, origin, typing).await,
| Edu::DeviceListUpdate(content) =>
handle_edu_device_list_update(services, client, origin, content).await,
| Edu::DirectToDevice(content) =>
handle_edu_direct_to_device(services, client, origin, content).await,
| Edu::SigningKeyUpdate(content) =>
handle_edu_signing_key_update(services, client, origin, content).await,
| Edu::_Custom(ref _custom) => debug_warn!(?edu, "received custom/unknown EDU"),
| _ => trace!(?edu, "skipped"),
}
}
@@ -207,32 +246,41 @@ async fn handle_edu_presence(
origin: &ServerName,
presence: PresenceContent,
) {
if !services.globals.allow_incoming_presence() {
presence
.push
.into_iter()
.stream()
.for_each_concurrent(automatic_width(), |update| {
handle_edu_presence_update(services, origin, update)
})
.await;
}
async fn handle_edu_presence_update(
services: &Services,
origin: &ServerName,
update: PresenceUpdate,
) {
if update.user_id.server_name() != origin {
debug_warn!(
%update.user_id, %origin,
"received presence EDU for user not belonging to origin"
);
return;
}
for update in presence.push {
if update.user_id.server_name() != origin {
debug_warn!(
%update.user_id, %origin,
"received presence EDU for user not belonging to origin"
);
continue;
}
services
.presence
.set_presence(
&update.user_id,
&update.presence,
Some(update.currently_active),
Some(update.last_active_ago),
update.status_msg.clone(),
)
.await
.log_err()
.ok();
}
services
.presence
.set_presence(
&update.user_id,
&update.presence,
Some(update.currently_active),
Some(update.last_active_ago),
update.status_msg.clone(),
)
.await
.log_err()
.ok();
}
async fn handle_edu_receipt(
@@ -241,66 +289,94 @@ async fn handle_edu_receipt(
origin: &ServerName,
receipt: ReceiptContent,
) {
if !services.globals.allow_incoming_read_receipts() {
receipt
.receipts
.into_iter()
.stream()
.for_each_concurrent(automatic_width(), |(room_id, room_updates)| {
handle_edu_receipt_room(services, origin, room_id, room_updates)
})
.await;
}
async fn handle_edu_receipt_room(
services: &Services,
origin: &ServerName,
room_id: OwnedRoomId,
room_updates: ReceiptMap,
) {
if services
.rooms
.event_handler
.acl_check(origin, &room_id)
.await
.is_err()
{
debug_warn!(
%origin, %room_id,
"received read receipt EDU from ACL'd server"
);
return;
}
for (room_id, room_updates) in receipt.receipts {
if services
.rooms
.event_handler
.acl_check(origin, &room_id)
.await
.is_err()
{
debug_warn!(
%origin, %room_id,
"received read receipt EDU from ACL'd server"
);
continue;
}
let room_id = &room_id;
room_updates
.read
.into_iter()
.stream()
.for_each_concurrent(automatic_width(), |(user_id, user_updates)| async move {
handle_edu_receipt_room_user(services, origin, room_id, &user_id, user_updates).await;
})
.await;
}
for (user_id, user_updates) in room_updates.read {
if user_id.server_name() != origin {
debug_warn!(
%user_id, %origin,
"received read receipt EDU for user not belonging to origin"
);
continue;
}
if services
.rooms
.state_cache
.room_members(&room_id)
.ready_any(|member| member.server_name() == user_id.server_name())
.await
{
for event_id in &user_updates.event_ids {
let user_receipts =
BTreeMap::from([(user_id.clone(), user_updates.data.clone())]);
let receipts = BTreeMap::from([(ReceiptType::Read, user_receipts)]);
let receipt_content = BTreeMap::from([(event_id.to_owned(), receipts)]);
let event = ReceiptEvent {
content: ReceiptEventContent(receipt_content),
room_id: room_id.clone(),
};
services
.rooms
.read_receipt
.readreceipt_update(&user_id, &room_id, &event)
.await;
}
} else {
debug_warn!(
%user_id, %room_id, %origin,
"received read receipt EDU from server who does not have a member in the room",
);
continue;
}
}
async fn handle_edu_receipt_room_user(
services: &Services,
origin: &ServerName,
room_id: &RoomId,
user_id: &UserId,
user_updates: ReceiptData,
) {
if user_id.server_name() != origin {
debug_warn!(
%user_id, %origin,
"received read receipt EDU for user not belonging to origin"
);
return;
}
if !services
.rooms
.state_cache
.server_in_room(origin, room_id)
.await
{
debug_warn!(
%user_id, %room_id, %origin,
"received read receipt EDU from server who does not have a member in the room",
);
return;
}
let data = &user_updates.data;
user_updates
.event_ids
.into_iter()
.stream()
.for_each_concurrent(automatic_width(), |event_id| async move {
let user_data = [(user_id.to_owned(), data.clone())];
let receipts = [(ReceiptType::Read, BTreeMap::from(user_data))];
let content = [(event_id.clone(), BTreeMap::from(receipts))];
services
.rooms
.read_receipt
.readreceipt_update(user_id, room_id, &ReceiptEvent {
content: ReceiptEventContent(content.into()),
room_id: room_id.to_owned(),
})
.await;
})
.await;
}
async fn handle_edu_typing(
@@ -309,10 +385,6 @@ async fn handle_edu_typing(
origin: &ServerName,
typing: TypingContent,
) {
if !services.server.config.allow_incoming_typing {
return;
}
if typing.user_id.server_name() != origin {
debug_warn!(
%typing.user_id, %origin,
@@ -335,41 +407,38 @@ async fn handle_edu_typing(
return;
}
if services
if !services
.rooms
.state_cache
.is_joined(&typing.user_id, &typing.room_id)
.await
{
if typing.typing {
let timeout = utils::millis_since_unix_epoch().saturating_add(
services
.server
.config
.typing_federation_timeout_s
.saturating_mul(1000),
);
services
.rooms
.typing
.typing_add(&typing.user_id, &typing.room_id, timeout)
.await
.log_err()
.ok();
} else {
services
.rooms
.typing
.typing_remove(&typing.user_id, &typing.room_id)
.await
.log_err()
.ok();
}
} else {
debug_warn!(
%typing.user_id, %typing.room_id, %origin,
"received typing EDU for user not in room"
);
return;
}
if typing.typing {
let secs = services.server.config.typing_federation_timeout_s;
let timeout = millis_since_unix_epoch().saturating_add(secs.saturating_mul(1000));
services
.rooms
.typing
.typing_add(&typing.user_id, &typing.room_id, timeout)
.await
.log_err()
.ok();
} else {
services
.rooms
.typing
.typing_remove(&typing.user_id, &typing.room_id)
.await
.log_err()
.ok();
}
}
@@ -398,7 +467,12 @@ async fn handle_edu_direct_to_device(
origin: &ServerName,
content: DirectDeviceContent,
) {
let DirectDeviceContent { sender, ev_type, message_id, messages } = content;
let DirectDeviceContent {
ref sender,
ref ev_type,
ref message_id,
messages,
} = content;
if sender.server_name() != origin {
debug_warn!(
@@ -411,60 +485,88 @@ async fn handle_edu_direct_to_device(
// Check if this is a new transaction id
if services
.transaction_ids
.existing_txnid(&sender, None, &message_id)
.existing_txnid(sender, None, message_id)
.await
.is_ok()
{
return;
}
for (target_user_id, map) in &messages {
for (target_device_id_maybe, event) in map {
let Ok(event) = event.deserialize_as().map_err(|e| {
err!(Request(InvalidParam(error!("To-Device event is invalid: {e}"))))
}) else {
continue;
};
let ev_type = ev_type.to_string();
match target_device_id_maybe {
| DeviceIdOrAllDevices::DeviceId(target_device_id) => {
services
.users
.add_to_device_event(
&sender,
target_user_id,
target_device_id,
&ev_type,
event,
)
.await;
},
| DeviceIdOrAllDevices::AllDevices => {
let (sender, ev_type, event) = (&sender, &ev_type, &event);
services
.users
.all_device_ids(target_user_id)
.for_each(|target_device_id| {
services.users.add_to_device_event(
sender,
target_user_id,
target_device_id,
ev_type,
event.clone(),
)
})
.await;
},
}
}
}
// process messages concurrently for different users
let ev_type = ev_type.to_string();
messages
.into_iter()
.stream()
.for_each_concurrent(automatic_width(), |(target_user_id, map)| {
handle_edu_direct_to_device_user(services, target_user_id, sender, &ev_type, map)
})
.await;
// Save transaction id with empty data
services
.transaction_ids
.add_txnid(&sender, None, &message_id, &[]);
.add_txnid(sender, None, message_id, &[]);
}
async fn handle_edu_direct_to_device_user<Event: Send + Sync>(
services: &Services,
target_user_id: OwnedUserId,
sender: &UserId,
ev_type: &str,
map: BTreeMap<DeviceIdOrAllDevices, Raw<Event>>,
) {
for (target_device_id_maybe, event) in map {
let Ok(event) = event
.deserialize_as()
.map_err(|e| err!(Request(InvalidParam(error!("To-Device event is invalid: {e}")))))
else {
continue;
};
handle_edu_direct_to_device_event(
services,
&target_user_id,
sender,
target_device_id_maybe,
ev_type,
event,
)
.await;
}
}
async fn handle_edu_direct_to_device_event(
services: &Services,
target_user_id: &UserId,
sender: &UserId,
target_device_id_maybe: DeviceIdOrAllDevices,
ev_type: &str,
event: serde_json::Value,
) {
match target_device_id_maybe {
| DeviceIdOrAllDevices::DeviceId(ref target_device_id) => {
services
.users
.add_to_device_event(sender, target_user_id, target_device_id, ev_type, event)
.await;
},
| DeviceIdOrAllDevices::AllDevices => {
services
.users
.all_device_ids(target_user_id)
.for_each(|target_device_id| {
services.users.add_to_device_event(
sender,
target_user_id,
target_device_id,
ev_type,
event.clone(),
)
})
.await;
},
}
}
async fn handle_edu_signing_key_update(
+8 -8
View File
@@ -1,10 +1,10 @@
#![allow(deprecated)]
use std::{borrow::Borrow, collections::HashMap};
use std::borrow::Borrow;
use axum::extract::State;
use conduwuit::{
err,
at, err,
pdu::gen_event_id_canonical_json,
utils::stream::{IterStream, TryBroadbandExt},
warn, Err, Result,
@@ -211,14 +211,16 @@ async fn create_join_event(
drop(mutex_lock);
let state_ids: HashMap<_, OwnedEventId> = services
let state_ids: Vec<OwnedEventId> = services
.rooms
.state_accessor
.state_full_ids(shortstatehash)
.await?;
.map(at!(1))
.collect()
.await;
let state = state_ids
.values()
.iter()
.try_stream()
.broad_and_then(|event_id| services.rooms.timeline.get_pdu_json(event_id))
.broad_and_then(|pdu| {
@@ -231,13 +233,11 @@ async fn create_join_event(
.boxed()
.await?;
let starting_events = state_ids.values().map(Borrow::borrow);
let starting_events = state_ids.iter().map(Borrow::borrow);
let auth_chain = services
.rooms
.auth_chain
.event_ids_iter(room_id, starting_events)
.await?
.map(Ok)
.broad_and_then(|event_id| async move {
services.rooms.timeline.get_pdu_json(&event_id).await
})
+4 -8
View File
@@ -1,7 +1,7 @@
use std::{borrow::Borrow, iter::once};
use axum::extract::State;
use conduwuit::{err, result::LogErr, utils::IterStream, Result};
use conduwuit::{at, err, utils::IterStream, Result};
use futures::{FutureExt, StreamExt, TryStreamExt};
use ruma::{api::federation::event::get_room_state, OwnedEventId};
@@ -35,11 +35,9 @@ pub(crate) async fn get_room_state_route(
.rooms
.state_accessor
.state_full_ids(shortstatehash)
.await
.log_err()
.map_err(|_| err!(Request(NotFound("PDU state IDs not found."))))?
.into_values()
.collect();
.map(at!(1))
.collect()
.await;
let pdus = state_ids
.iter()
@@ -58,8 +56,6 @@ pub(crate) async fn get_room_state_route(
.rooms
.auth_chain
.event_ids_iter(&body.room_id, once(body.event_id.borrow()))
.await?
.map(Ok)
.and_then(|id| async move { services.rooms.timeline.get_pdu_json(&id).await })
.and_then(|pdu| {
services
+7 -10
View File
@@ -1,8 +1,8 @@
use std::{borrow::Borrow, iter::once};
use axum::extract::State;
use conduwuit::{err, Result};
use futures::StreamExt;
use conduwuit::{at, err, Result};
use futures::{StreamExt, TryStreamExt};
use ruma::{api::federation::event::get_room_state_ids, OwnedEventId};
use super::AccessCheck;
@@ -36,19 +36,16 @@ pub(crate) async fn get_room_state_ids_route(
.rooms
.state_accessor
.state_full_ids(shortstatehash)
.await
.map_err(|_| err!(Request(NotFound("State ids not found"))))?
.into_values()
.collect();
.map(at!(1))
.collect()
.await;
let auth_chain_ids = services
.rooms
.auth_chain
.event_ids_iter(&body.room_id, once(body.event_id.borrow()))
.await?
.map(|id| (*id).to_owned())
.collect()
.await;
.try_collect()
.await?;
Ok(get_room_state_ids::v1::Response { auth_chain_ids, pdu_ids })
}
+19 -12
View File
@@ -8,6 +8,7 @@ use std::{
};
use arrayvec::ArrayVec;
use const_str::concat_bytes;
use tikv_jemalloc_ctl as mallctl;
use tikv_jemalloc_sys as ffi;
use tikv_jemallocator as jemalloc;
@@ -20,18 +21,24 @@ use crate::{
#[cfg(feature = "jemalloc_conf")]
#[unsafe(no_mangle)]
pub static malloc_conf: &[u8] = b"\
metadata_thp:always\
,percpu_arena:percpu\
,background_thread:true\
,max_background_threads:-1\
,lg_extent_max_active_fit:4\
,oversize_threshold:16777216\
,tcache_max:2097152\
,dirty_decay_ms:16000\
,muzzy_decay_ms:144000\
,prof_active:false\
\0";
pub static malloc_conf: &[u8] = concat_bytes!(
"lg_extent_max_active_fit:4",
",oversize_threshold:16777216",
",tcache_max:2097152",
",dirty_decay_ms:16000",
",muzzy_decay_ms:144000",
",percpu_arena:percpu",
",metadata_thp:always",
",background_thread:true",
",max_background_threads:-1",
MALLOC_CONF_PROF,
0
);
#[cfg(all(feature = "jemalloc_conf", feature = "jemalloc_prof"))]
const MALLOC_CONF_PROF: &str = ",prof_active:false";
#[cfg(all(feature = "jemalloc_conf", not(feature = "jemalloc_prof")))]
const MALLOC_CONF_PROF: &str = "";
#[global_allocator]
static JEMALLOC: jemalloc::Jemalloc = jemalloc::Jemalloc;
+18 -2
View File
@@ -6,8 +6,24 @@ use figment::Figment;
use super::DEPRECATED_KEYS;
use crate::{debug, debug_info, debug_warn, error, warn, Config, Err, Result, Server};
/// Performs check() with additional checks specific to reloading old config
/// with new config.
pub fn reload(old: &Config, new: &Config) -> Result {
check(new)?;
if new.server_name != old.server_name {
return Err!(Config(
"server_name",
"You can't change the server's name from {:?}.",
old.server_name
));
}
Ok(())
}
#[allow(clippy::cognitive_complexity)]
pub fn check(config: &Config) -> Result<()> {
pub fn check(config: &Config) -> Result {
if cfg!(debug_assertions) {
warn!("Note: conduwuit was built without optimisations (i.e. debug build)");
}
@@ -22,7 +38,7 @@ pub fn check(config: &Config) -> Result<()> {
));
}
if cfg!(all(feature = "hardened_malloc", feature = "jemalloc")) {
if cfg!(all(feature = "hardened_malloc", feature = "jemalloc", not(target_env = "msvc"))) {
debug_warn!(
"hardened_malloc and jemalloc compile-time features are both enabled, this causes \
jemalloc to be used."
+107 -3
View File
@@ -52,7 +52,7 @@ use crate::{err, error::Error, utils::sys, Result};
### For more information, see:
### https://conduwuit.puppyirl.gay/configuration.html
"#,
ignore = "catchall well_known tls"
ignore = "catchall well_known tls blurhashing"
)]
pub struct Config {
/// The server_name is the pretty name of this server. It is used as a
@@ -480,6 +480,36 @@ pub struct Config {
#[serde(default = "default_pusher_idle_timeout")]
pub pusher_idle_timeout: u64,
/// Maximum time to receive a request from a client (seconds).
///
/// default: 75
#[serde(default = "default_client_receive_timeout")]
pub client_receive_timeout: u64,
/// Maximum time to process a request received from a client (seconds).
///
/// default: 180
#[serde(default = "default_client_request_timeout")]
pub client_request_timeout: u64,
/// Maximum time to transmit a response to a client (seconds)
///
/// default: 120
#[serde(default = "default_client_response_timeout")]
pub client_response_timeout: u64,
/// Grace period for clean shutdown of client requests (seconds).
///
/// default: 10
#[serde(default = "default_client_shutdown_timeout")]
pub client_shutdown_timeout: u64,
/// Grace period for clean shutdown of federation requests (seconds).
///
/// default: 5
#[serde(default = "default_sender_shutdown_timeout")]
pub sender_shutdown_timeout: u64,
/// Enables registration. If set to false, no users can register on this
/// server.
///
@@ -510,8 +540,9 @@ pub struct Config {
/// display: sensitive
pub registration_token: Option<String>,
/// Path to a file on the system that gets read for the registration token.
/// this config option takes precedence/priority over "registration_token".
/// Path to a file on the system that gets read for additional registration
/// tokens. Multiple tokens can be added if you separate them with
/// whitespace
///
/// conduwuit must be able to access the file, and it must not be empty
///
@@ -1049,6 +1080,15 @@ pub struct Config {
#[serde(default)]
pub rocksdb_paranoid_file_checks: bool,
/// Enables or disables checksum verification in rocksdb at runtime.
/// Checksums are usually hardware accelerated with low overhead; they are
/// enabled in rocksdb by default. Older or slower platforms may see gains
/// from disabling.
///
/// default: true
#[serde(default = "true_fn")]
pub rocksdb_checksums: bool,
/// Database repair mode (for RocksDB SST corruption).
///
/// Use this option when the server reports corruption while running or
@@ -1545,6 +1585,15 @@ pub struct Config {
#[serde(default)]
pub admin_execute_errors_ignore: bool,
/// List of admin commands to execute on SIGUSR2.
///
/// Similar to admin_execute, but these commands are executed when the
/// server receives SIGUSR2 on supporting platforms.
///
/// default: []
#[serde(default)]
pub admin_signal_execute: Vec<String>,
/// Controls the max log level for admin command log captures (logs
/// generated from running admin commands). Defaults to "info" on release
/// builds, else "debug" on debug builds.
@@ -1733,6 +1782,16 @@ pub struct Config {
#[serde(default = "true_fn")]
pub listening: bool,
/// Enables configuration reload when the server receives SIGUSR1 on
/// supporting platforms.
///
/// default: true
#[serde(default = "true_fn")]
pub config_reload_signal: bool,
// external structure; separate section
#[serde(default)]
pub blurhashing: BlurhashConfig,
#[serde(flatten)]
#[allow(clippy::zero_sized_map_values)]
// this is a catchall, the map shouldn't be zero at runtime
@@ -1783,6 +1842,31 @@ pub struct WellKnownConfig {
pub support_mxid: Option<OwnedUserId>,
}
#[derive(Clone, Copy, Debug, Deserialize, Default)]
#[allow(rustdoc::broken_intra_doc_links, rustdoc::bare_urls)]
#[config_example_generator(filename = "conduwuit-example.toml", section = "global.blurhashing")]
pub struct BlurhashConfig {
/// blurhashing x component, 4 is recommended by https://blurha.sh/
///
/// default: 4
#[serde(default = "default_blurhash_x_component")]
pub components_x: u32,
/// blurhashing y component, 3 is recommended by https://blurha.sh/
///
/// default: 3
#[serde(default = "default_blurhash_y_component")]
pub components_y: u32,
/// Max raw size that the server will blurhash, this is the size of the
/// image after converting it to raw data, it should be higher than the
/// upload limit but not too high. The higher it is the higher the
/// potential load will be for clients requesting blurhashes. The default
/// is 33.55MB. Setting it to 0 disables blurhashing.
///
/// default: 33554432
#[serde(default = "default_blurhash_max_raw_size")]
pub blurhash_max_raw_size: u64,
}
#[derive(Deserialize, Clone, Debug)]
#[serde(transparent)]
struct ListeningPort {
@@ -2144,3 +2228,23 @@ fn default_stream_width_default() -> usize { 32 }
fn default_stream_width_scale() -> f32 { 1.0 }
fn default_stream_amplification() -> usize { 1024 }
fn default_client_receive_timeout() -> u64 { 75 }
fn default_client_request_timeout() -> u64 { 180 }
fn default_client_response_timeout() -> u64 { 120 }
fn default_client_shutdown_timeout() -> u64 { 15 }
fn default_sender_shutdown_timeout() -> u64 { 5 }
// blurhashing defaults recommended by https://blurha.sh/
// 2^25
pub(super) fn default_blurhash_max_raw_size() -> u64 { 33_554_432 }
pub(super) fn default_blurhash_x_component() -> u32 { 4 }
pub(super) fn default_blurhash_y_component() -> u32 { 3 }
// end recommended & blurhashing defaults
+21 -4
View File
@@ -1,9 +1,10 @@
#![allow(clippy::disallowed_macros)]
use std::{any::Any, panic};
use std::{any::Any, env, panic, sync::LazyLock};
// Export debug proc_macros
pub use conduwuit_macros::recursion_depth;
use tracing::Level;
// Export all of the ancillary tools from here as well.
pub use crate::{result::DebugInspect, utils::debug::*};
@@ -51,16 +52,32 @@ macro_rules! debug_info {
}
}
pub fn set_panic_trap() {
pub const INFO_SPAN_LEVEL: Level = if cfg!(debug_assertions) {
Level::INFO
} else {
Level::DEBUG
};
pub static DEBUGGER: LazyLock<bool> =
LazyLock::new(|| env::var("_").unwrap_or_default().ends_with("gdb"));
#[cfg_attr(debug_assertions, crate::ctor)]
#[cfg_attr(not(debug_assertions), allow(dead_code))]
fn set_panic_trap() {
if !*DEBUGGER {
return;
}
let next = panic::take_hook();
panic::set_hook(Box::new(move |info| {
panic_handler(info, &next);
}));
}
#[inline(always)]
#[cold]
#[inline(never)]
#[allow(deprecated_in_future)]
fn panic_handler(info: &panic::PanicHookInfo<'_>, next: &dyn Fn(&panic::PanicHookInfo<'_>)) {
pub fn panic_handler(info: &panic::PanicHookInfo<'_>, next: &dyn Fn(&panic::PanicHookInfo<'_>)) {
trap();
next(info);
}
+1
View File
@@ -106,6 +106,7 @@ pub(super) fn io_error_code(kind: std::io::ErrorKind) -> StatusCode {
| ErrorKind::TimedOut => StatusCode::GATEWAY_TIMEOUT,
| ErrorKind::FileTooLarge => StatusCode::PAYLOAD_TOO_LARGE,
| ErrorKind::StorageFull => StatusCode::INSUFFICIENT_STORAGE,
| ErrorKind::Interrupted => StatusCode::SERVICE_UNAVAILABLE,
| _ => StatusCode::INTERNAL_SERVER_ERROR,
}
}
+71 -6
View File
@@ -1,3 +1,5 @@
use std::{env, io, sync::LazyLock};
use tracing::{
field::{Field, Visit},
Event, Level, Subscriber,
@@ -7,12 +9,59 @@ use tracing_subscriber::{
fmt,
fmt::{
format::{Compact, DefaultVisitor, Format, Full, Pretty, Writer},
FmtContext, FormatEvent, FormatFields,
FmtContext, FormatEvent, FormatFields, MakeWriter,
},
registry::LookupSpan,
};
use crate::{Config, Result};
use crate::{apply, Config, Result};
static SYSTEMD_MODE: LazyLock<bool> =
LazyLock::new(|| env::var("SYSTEMD_EXEC_PID").is_ok() && env::var("JOURNAL_STREAM").is_ok());
pub struct ConsoleWriter {
stdout: io::Stdout,
stderr: io::Stderr,
_journal_stream: [u64; 2],
use_stderr: bool,
}
impl ConsoleWriter {
#[must_use]
pub fn new(_config: &Config) -> Self {
let journal_stream = get_journal_stream();
Self {
stdout: io::stdout(),
stderr: io::stderr(),
_journal_stream: journal_stream.into(),
use_stderr: journal_stream.0 != 0,
}
}
}
impl<'a> MakeWriter<'a> for ConsoleWriter {
type Writer = &'a Self;
fn make_writer(&'a self) -> Self::Writer { self }
}
impl io::Write for &'_ ConsoleWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if self.use_stderr {
self.stderr.lock().write(buf)
} else {
self.stdout.lock().write(buf)
}
}
fn flush(&mut self) -> io::Result<()> {
if self.use_stderr {
self.stderr.lock().flush()
} else {
self.stdout.lock().flush()
}
}
}
pub struct ConsoleFormat {
_compact: Format<Compact>,
@@ -20,10 +69,6 @@ pub struct ConsoleFormat {
pretty: Format<Pretty>,
}
struct ConsoleVisitor<'a> {
visitor: DefaultVisitor<'a>,
}
impl ConsoleFormat {
#[must_use]
pub fn new(config: &Config) -> Self {
@@ -68,6 +113,10 @@ where
}
}
struct ConsoleVisitor<'a> {
visitor: DefaultVisitor<'a>,
}
impl<'writer> FormatFields<'writer> for ConsoleFormat {
fn format_fields<R>(&self, writer: Writer<'writer>, fields: R) -> Result<(), std::fmt::Error>
where
@@ -92,3 +141,19 @@ impl Visit for ConsoleVisitor<'_> {
self.visitor.record_debug(field, value);
}
}
#[must_use]
fn get_journal_stream() -> (u64, u64) {
is_systemd_mode()
.then(|| env::var("JOURNAL_STREAM").ok())
.flatten()
.as_deref()
.and_then(|s| s.split_once(':'))
.map(apply!(2, str::parse))
.map(apply!(2, Result::unwrap_or_default))
.unwrap_or((0, 0))
}
#[inline]
#[must_use]
pub fn is_systemd_mode() -> bool { *SYSTEMD_MODE }
+2 -2
View File
@@ -2,14 +2,14 @@
pub mod capture;
pub mod color;
mod console;
pub mod console;
pub mod fmt;
pub mod fmt_span;
mod reload;
mod suppress;
pub use capture::Capture;
pub use console::ConsoleFormat;
pub use console::{is_systemd_mode, ConsoleFormat, ConsoleWriter};
pub use reload::{LogLevelReloadHandles, ReloadHandle};
pub use suppress::Suppress;
pub use tracing::Level;
-4
View File
@@ -19,8 +19,6 @@ pub struct Metrics {
runtime_intervals: std::sync::Mutex<Option<RuntimeIntervals>>,
// TODO: move stats
pub requests_spawn_active: AtomicU32,
pub requests_spawn_finished: AtomicU32,
pub requests_handle_active: AtomicU32,
pub requests_handle_finished: AtomicU32,
pub requests_panic: AtomicU32,
@@ -48,8 +46,6 @@ impl Metrics {
#[cfg(tokio_unstable)]
runtime_intervals: std::sync::Mutex::new(runtime_intervals),
requests_spawn_active: AtomicU32::new(0),
requests_spawn_finished: AtomicU32::new(0),
requests_handle_active: AtomicU32::new(0),
requests_handle_finished: AtomicU32::new(0),
requests_panic: AtomicU32::new(0),
+18
View File
@@ -90,3 +90,21 @@ pub fn copy_redacts(&self) -> (Option<OwnedEventId>, Box<RawJsonValue>) {
(self.redacts.clone(), self.content.clone())
}
#[implement(super::Pdu)]
#[must_use]
pub fn redacts_id(&self, room_version: &RoomVersionId) -> Option<OwnedEventId> {
use RoomVersionId::*;
if self.kind != TimelineEventType::RoomRedaction {
return None;
}
match *room_version {
| V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 => self.redacts.clone(),
| _ =>
self.get_content::<RoomRedactionEventContent>()
.ok()?
.redacts,
}
}
+6 -6
View File
@@ -116,7 +116,7 @@ pub fn to_message_like_event(&self) -> Raw<AnyMessageLikeEvent> {
#[must_use]
#[implement(super::Pdu)]
pub fn to_state_event_value(&self) -> JsonValue {
pub fn into_state_event_value(self) -> JsonValue {
let mut json = json!({
"content": self.content,
"type": self.kind,
@@ -127,7 +127,7 @@ pub fn to_state_event_value(&self) -> JsonValue {
"state_key": self.state_key,
});
if let Some(unsigned) = &self.unsigned {
if let Some(unsigned) = self.unsigned {
json["unsigned"] = json!(unsigned);
}
@@ -136,8 +136,8 @@ pub fn to_state_event_value(&self) -> JsonValue {
#[must_use]
#[implement(super::Pdu)]
pub fn to_state_event(&self) -> Raw<AnyStateEvent> {
serde_json::from_value(self.to_state_event_value()).expect("Raw::from_value always works")
pub fn into_state_event(self) -> Raw<AnyStateEvent> {
serde_json::from_value(self.into_state_event_value()).expect("Raw::from_value always works")
}
#[must_use]
@@ -188,7 +188,7 @@ pub fn to_stripped_spacechild_state_event(&self) -> Raw<HierarchySpaceChildEvent
#[must_use]
#[implement(super::Pdu)]
pub fn to_member_event(&self) -> Raw<StateEvent<RoomMemberEventContent>> {
pub fn into_member_event(self) -> Raw<StateEvent<RoomMemberEventContent>> {
let mut json = json!({
"content": self.content,
"type": self.kind,
@@ -200,7 +200,7 @@ pub fn to_member_event(&self) -> Raw<StateEvent<RoomMemberEventContent>> {
"state_key": self.state_key,
});
if let Some(unsigned) = &self.unsigned {
if let Some(unsigned) = self.unsigned {
json["unsigned"] = json!(unsigned);
}
+22 -5
View File
@@ -6,12 +6,17 @@ use std::{
time::SystemTime,
};
use ruma::OwnedServerName;
use tokio::{runtime, sync::broadcast};
use crate::{config, config::Config, err, log::Log, metrics::Metrics, Err, Result};
use crate::{config, config::Config, log::Log, metrics::Metrics, Err, Result};
/// Server runtime state; public portion
pub struct Server {
/// Configured name of server. This is the same as the one in the config
/// but developers can (and should) reference this string instead.
pub name: OwnedServerName,
/// Server-wide configuration instance
pub config: config::Manager,
@@ -46,6 +51,7 @@ impl Server {
#[must_use]
pub fn new(config: Config, runtime: Option<runtime::Handle>, log: Log) -> Self {
Self {
name: config.server_name.clone(),
config: config::Manager::new(config),
started: SystemTime::now(),
stopping: AtomicBool::new(false),
@@ -63,6 +69,10 @@ impl Server {
return Err!("Reloading not enabled");
}
#[cfg(all(feature = "systemd", target_os = "linux"))]
sd_notify::notify(true, &[sd_notify::NotifyState::Reloading])
.expect("failed to notify systemd of reloading state");
if self.reloading.swap(true, Ordering::AcqRel) {
return Err!("Reloading already in progress");
}
@@ -77,7 +87,7 @@ impl Server {
})
}
pub fn restart(&self) -> Result<()> {
pub fn restart(&self) -> Result {
if self.restarting.swap(true, Ordering::AcqRel) {
return Err!("Restart already in progress");
}
@@ -87,7 +97,11 @@ impl Server {
})
}
pub fn shutdown(&self) -> Result<()> {
pub fn shutdown(&self) -> Result {
#[cfg(all(feature = "systemd", target_os = "linux"))]
sd_notify::notify(true, &[sd_notify::NotifyState::Stopping])
.expect("failed to notify systemd of stopping state");
if self.stopping.swap(true, Ordering::AcqRel) {
return Err!("Shutdown already in progress");
}
@@ -106,7 +120,7 @@ impl Server {
}
#[inline]
pub async fn until_shutdown(self: Arc<Self>) {
pub async fn until_shutdown(self: &Arc<Self>) {
while self.running() {
self.signal.subscribe().recv().await.ok();
}
@@ -121,9 +135,12 @@ impl Server {
#[inline]
pub fn check_running(&self) -> Result {
use std::{io, io::ErrorKind::Interrupted};
self.running()
.then_some(())
.ok_or_else(|| err!(debug_warn!("Server is shutting down.")))
.ok_or_else(|| io::Error::new(Interrupted, "Server shutting down"))
.map_err(Into::into)
}
#[inline]
+11
View File
@@ -84,6 +84,17 @@ macro_rules! apply {
};
}
#[macro_export]
macro_rules! pair_of {
($decl:ty) => {
($decl, $decl)
};
($init:expr) => {
($init, $init)
};
}
/// Functor for truthy
#[macro_export]
macro_rules! is_true {
+28
View File
@@ -35,6 +35,13 @@ where
Fut: Future<Output = Option<U>> + Send,
U: Send;
fn broadn_flat_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
where
N: Into<Option<usize>>,
F: Fn(Item) -> Fut + Send,
Fut: Stream<Item = U> + Send + Unpin,
U: Send;
fn broadn_then<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
where
N: Into<Option<usize>>,
@@ -70,6 +77,16 @@ where
self.broadn_filter_map(None, f)
}
#[inline]
fn broad_flat_map<F, Fut, U>(self, f: F) -> impl Stream<Item = U> + Send
where
F: Fn(Item) -> Fut + Send,
Fut: Stream<Item = U> + Send + Unpin,
U: Send,
{
self.broadn_flat_map(None, f)
}
#[inline]
fn broad_then<F, Fut, U>(self, f: F) -> impl Stream<Item = U> + Send
where
@@ -122,6 +139,17 @@ where
.ready_filter_map(identity)
}
#[inline]
fn broadn_flat_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
where
N: Into<Option<usize>>,
F: Fn(Item) -> Fut + Send,
Fut: Stream<Item = U> + Send + Unpin,
U: Send,
{
self.flat_map_unordered(n.into().unwrap_or_else(automatic_width), f)
}
#[inline]
fn broadn_then<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
where
+18 -5
View File
@@ -22,7 +22,7 @@ pub(crate) fn from_slice<'a, T>(buf: &'a [u8]) -> Result<T>
where
T: Deserialize<'a>,
{
let mut deserializer = Deserializer { buf, pos: 0, seq: false };
let mut deserializer = Deserializer { buf, pos: 0, rec: 0, seq: false };
T::deserialize(&mut deserializer).debug_inspect(|_| {
deserializer
@@ -35,6 +35,7 @@ where
pub(crate) struct Deserializer<'de> {
buf: &'de [u8],
pos: usize,
rec: usize,
seq: bool,
}
@@ -107,7 +108,7 @@ impl<'de> Deserializer<'de> {
/// consumed None is returned instead.
#[inline]
fn record_peek_byte(&self) -> Option<u8> {
let started = self.pos != 0;
let started = self.pos != 0 || self.rec > 0;
let buf = &self.buf[self.pos..];
debug_assert!(
!started || buf[0] == Self::SEP,
@@ -121,13 +122,14 @@ impl<'de> Deserializer<'de> {
/// the start of the next record. (Case for some sequences)
#[inline]
fn record_start(&mut self) {
let started = self.pos != 0;
let started = self.pos != 0 || self.rec > 0;
debug_assert!(
!started || self.buf[self.pos] == Self::SEP,
"Missing expected record separator at current position"
);
self.inc_pos(started.into());
self.inc_rec(1);
}
/// Consume all remaining bytes, which may include record separators,
@@ -157,6 +159,9 @@ impl<'de> Deserializer<'de> {
debug_assert!(self.pos <= self.buf.len(), "pos out of range");
}
#[inline]
fn inc_rec(&mut self, n: usize) { self.rec = self.rec.saturating_add(n); }
/// Unconsumed input bytes.
#[inline]
fn remaining(&self) -> Result<usize> {
@@ -270,8 +275,16 @@ impl<'a, 'de: 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
}
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_option<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> {
unhandled!("deserialize Option not implemented")
fn deserialize_option<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> {
if self
.buf
.get(self.pos)
.is_none_or(|b| *b == Deserializer::SEP)
{
visitor.visit_none()
} else {
visitor.visit_some(self)
}
}
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
+4 -3
View File
@@ -30,12 +30,13 @@ use crate::{
};
pub struct Engine {
pub(super) read_only: bool,
pub(super) secondary: bool,
corks: AtomicU32,
pub(crate) db: Db,
pub(crate) pool: Arc<Pool>,
pub(crate) ctx: Arc<Context>,
pub(super) read_only: bool,
pub(super) secondary: bool,
pub(crate) checksums: bool,
corks: AtomicU32,
}
pub(crate) type Db = DBWithThreadMode<MultiThreaded>;
+7
View File
@@ -72,6 +72,13 @@ fn descriptor_cf_options(
opts.set_options_from_string("{{arena_block_size=2097152;}}")
.map_err(map_err)?;
#[cfg(debug_assertions)]
opts.set_options_from_string(
"{{paranoid_checks=true;paranoid_file_checks=true;force_consistency_checks=true;\
verify_sst_unique_id_in_manifest=true;}}",
)
.map_err(map_err)?;
Ok(opts)
}
+6 -6
View File
@@ -83,7 +83,7 @@ pub(crate) static RANDOM: Descriptor = Descriptor {
write_size: 1024 * 1024 * 32,
cache_shards: 128,
compression_level: -3,
bottommost_level: Some(4),
bottommost_level: Some(-1),
compressed_index: true,
..BASE
};
@@ -94,8 +94,8 @@ pub(crate) static SEQUENTIAL: Descriptor = Descriptor {
level_size: 1024 * 1024 * 32,
file_size: 1024 * 1024 * 2,
cache_shards: 128,
compression_level: -1,
bottommost_level: Some(6),
compression_level: -2,
bottommost_level: Some(-1),
compression_shape: [0, 0, 1, 1, 1, 1, 1],
compressed_index: false,
..BASE
@@ -111,7 +111,7 @@ pub(crate) static RANDOM_SMALL: Descriptor = Descriptor {
block_size: 512,
cache_shards: 64,
compression_level: -4,
bottommost_level: Some(1),
bottommost_level: Some(-1),
compression_shape: [0, 0, 0, 0, 0, 1, 1],
compressed_index: false,
..RANDOM
@@ -126,8 +126,8 @@ pub(crate) static SEQUENTIAL_SMALL: Descriptor = Descriptor {
block_size: 512,
cache_shards: 64,
block_index_hashing: Some(false),
compression_level: -2,
bottommost_level: Some(4),
compression_level: -4,
bottommost_level: Some(-2),
compression_shape: [0, 0, 0, 0, 1, 1, 1],
compressed_index: false,
..SEQUENTIAL
+9 -26
View File
@@ -1,32 +1,15 @@
use std::fmt::Write;
use conduwuit::{implement, Result};
use rocksdb::LiveFile as SstFile;
use super::Engine;
use crate::util::map_err;
#[implement(Engine)]
pub fn file_list(&self) -> Result<String> {
match self.db.live_files() {
| Err(e) => Ok(String::from(e)),
| Ok(mut files) => {
files.sort_by_key(|f| f.name.clone());
let mut res = String::new();
writeln!(res, "| lev | sst | keys | dels | size | column |")?;
writeln!(res, "| ---: | :--- | ---: | ---: | ---: | :--- |")?;
for file in files {
writeln!(
res,
"| {} | {:<13} | {:7}+ | {:4}- | {:9} | {} |",
file.level,
file.name,
file.num_entries,
file.num_deletions,
file.size,
file.column_family_name,
)?;
}
Ok(res)
},
}
pub fn file_list(&self) -> impl Iterator<Item = Result<SstFile>> + Send {
self.db
.live_files()
.map_err(map_err)
.into_iter()
.flat_map(Vec::into_iter)
.map(Ok)
}
+4 -3
View File
@@ -56,12 +56,13 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
);
Ok(Arc::new(Self {
db,
pool: ctx.pool.clone(),
ctx: ctx.clone(),
read_only: config.rocksdb_read_only,
secondary: config.rocksdb_secondary,
checksums: config.rocksdb_checksums,
corks: AtomicU32::new(0),
pool: ctx.pool.clone(),
db,
ctx,
}))
}
+11 -8
View File
@@ -9,6 +9,8 @@ mod keys_from;
mod keys_prefix;
mod open;
mod options;
mod qry;
mod qry_batch;
mod remove;
mod rev_keys;
mod rev_keys_from;
@@ -37,28 +39,29 @@ pub(crate) use self::options::{
cache_iter_options_default, cache_read_options_default, iter_options_default,
read_options_default, write_options_default,
};
pub use self::{get_batch::Get, qry_batch::Qry};
use crate::{watchers::Watchers, Engine};
pub struct Map {
name: &'static str,
db: Arc<Engine>,
cf: Arc<ColumnFamily>,
watchers: Watchers,
write_options: WriteOptions,
cf: Arc<ColumnFamily>,
db: Arc<Engine>,
read_options: ReadOptions,
cache_read_options: ReadOptions,
write_options: WriteOptions,
}
impl Map {
pub(crate) fn open(db: &Arc<Engine>, name: &'static str) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
name,
db: db.clone(),
cf: open::open(db, name),
watchers: Watchers::default(),
write_options: write_options_default(),
read_options: read_options_default(),
cache_read_options: cache_read_options_default(),
cf: open::open(db, name),
db: db.clone(),
read_options: read_options_default(db),
cache_read_options: cache_read_options_default(db),
write_options: write_options_default(db),
}))
}
+1 -51
View File
@@ -1,65 +1,15 @@
use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc};
use std::{convert::AsRef, fmt::Debug, sync::Arc};
use arrayvec::ArrayVec;
use conduwuit::{err, implement, utils::result::MapExpect, Err, Result};
use futures::{future::ready, Future, FutureExt, TryFutureExt};
use rocksdb::{DBPinnableSlice, ReadOptions};
use serde::Serialize;
use tokio::task;
use crate::{
keyval::KeyBuf,
ser,
util::{is_incomplete, map_err, or_else},
Handle,
};
/// Fetch a value from the database into cache, returning a reference-handle
/// asynchronously. The key is serialized into an allocated buffer to perform
/// the query.
#[implement(super::Map)]
#[inline]
pub fn qry<K>(self: &Arc<Self>, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
where
K: Serialize + ?Sized + Debug,
{
let mut buf = KeyBuf::new();
self.bqry(key, &mut buf)
}
/// Fetch a value from the database into cache, returning a reference-handle
/// asynchronously. The key is serialized into a fixed-sized buffer to perform
/// the query. The maximum size is supplied as const generic parameter.
#[implement(super::Map)]
#[inline]
pub fn aqry<const MAX: usize, K>(
self: &Arc<Self>,
key: &K,
) -> impl Future<Output = Result<Handle<'_>>> + Send
where
K: Serialize + ?Sized + Debug,
{
let mut buf = ArrayVec::<u8, MAX>::new();
self.bqry(key, &mut buf)
}
/// Fetch a value from the database into cache, returning a reference-handle
/// asynchronously. The key is serialized into a user-supplied Writer.
#[implement(super::Map)]
#[tracing::instrument(skip(self, buf), level = "trace")]
pub fn bqry<K, B>(
self: &Arc<Self>,
key: &K,
buf: &mut B,
) -> impl Future<Output = Result<Handle<'_>>> + Send
where
K: Serialize + ?Sized + Debug,
B: Write + AsRef<[u8]>,
{
let key = ser::serialize(buf, key).expect("failed to serialize query key");
self.get(key)
}
/// Fetch a value from the database into cache, returning a reference-handle
/// asynchronously. The key is referenced directly to perform the query.
#[implement(super::Map)]
+18 -27
View File
@@ -1,4 +1,4 @@
use std::{convert::AsRef, fmt::Debug, sync::Arc};
use std::{convert::AsRef, sync::Arc};
use conduwuit::{
implement,
@@ -10,43 +10,34 @@ use conduwuit::{
};
use futures::{Stream, StreamExt, TryStreamExt};
use rocksdb::{DBPinnableSlice, ReadOptions};
use serde::Serialize;
use super::get::{cached_handle_from, handle_from};
use crate::{keyval::KeyBuf, ser, Handle};
use crate::Handle;
#[implement(super::Map)]
#[tracing::instrument(skip(self, keys), level = "trace")]
pub fn qry_batch<'a, S, K>(
self: &'a Arc<Self>,
keys: S,
) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
pub trait Get<'a, K, S>
where
Self: Sized,
S: Stream<Item = K> + Send + 'a,
K: Serialize + Debug + 'a,
K: AsRef<[u8]> + Send + Sync + 'a,
{
use crate::pool::Get;
fn get(self, map: &'a Arc<super::Map>) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a;
}
keys.ready_chunks(automatic_amplification())
.widen_then(automatic_width(), |chunk| {
let keys = chunk
.iter()
.map(ser::serialize_to::<KeyBuf, _>)
.map(|result| result.expect("failed to serialize query key"))
.map(Into::into)
.collect();
self.db
.pool
.execute_get(Get { map: self.clone(), key: keys, res: None })
})
.map_ok(|results| results.into_iter().stream())
.try_flatten()
impl<'a, K, S> Get<'a, K, S> for S
where
Self: Sized,
S: Stream<Item = K> + Send + 'a,
K: AsRef<[u8]> + Send + Sync + 'a,
{
#[inline]
fn get(self, map: &'a Arc<super::Map>) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a {
map.get_batch(self)
}
}
#[implement(super::Map)]
#[tracing::instrument(skip(self, keys), level = "trace")]
pub fn get_batch<'a, S, K>(
pub(crate) fn get_batch<'a, S, K>(
self: &'a Arc<Self>,
keys: S,
) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
+1 -1
View File
@@ -22,7 +22,7 @@ where
pub fn raw_keys(self: &Arc<Self>) -> impl Stream<Item = Result<Key<'_>>> + Send {
use crate::pool::Seek;
let opts = super::iter_options_default();
let opts = super::iter_options_default(&self.db);
let state = stream::State::new(self, opts);
if is_cached(self) {
let state = state.init_fwd(None);
+1 -1
View File
@@ -53,7 +53,7 @@ where
{
use crate::pool::Seek;
let opts = super::iter_options_default();
let opts = super::iter_options_default(&self.db);
let state = stream::State::new(self, opts);
if is_cached(self, from) {
return stream::Keys::<'_>::from(state.init_fwd(from.as_ref().into())).boxed();
+1 -4
View File
@@ -30,8 +30,5 @@ pub(super) fn open(db: &Arc<Engine>, name: &str) -> Arc<ColumnFamily> {
// lifetime parameter. We should not hold this handle, even in its Arc, after
// closing the database (dropping `Engine`). Since `Arc<Engine>` is a sibling
// member along with this handle in `Map`, that is prevented.
unsafe {
Arc::increment_strong_count(cf_ptr);
Arc::from_raw(cf_ptr)
}
unsafe { Arc::from_raw(cf_ptr) }
}
+29 -21
View File
@@ -1,35 +1,43 @@
use std::sync::Arc;
use rocksdb::{ReadOptions, ReadTier, WriteOptions};
#[inline]
pub(crate) fn iter_options_default() -> ReadOptions {
let mut options = read_options_default();
options.set_background_purge_on_iterator_cleanup(true);
//options.set_pin_data(true);
options
}
use crate::Engine;
#[inline]
pub(crate) fn cache_iter_options_default() -> ReadOptions {
let mut options = cache_read_options_default();
options.set_background_purge_on_iterator_cleanup(true);
//options.set_pin_data(true);
options
}
#[inline]
pub(crate) fn cache_read_options_default() -> ReadOptions {
let mut options = read_options_default();
pub(crate) fn cache_iter_options_default(db: &Arc<Engine>) -> ReadOptions {
let mut options = iter_options_default(db);
options.set_read_tier(ReadTier::BlockCache);
options.fill_cache(false);
options
}
#[inline]
pub(crate) fn read_options_default() -> ReadOptions {
let mut options = ReadOptions::default();
options.set_total_order_seek(true);
pub(crate) fn iter_options_default(db: &Arc<Engine>) -> ReadOptions {
let mut options = read_options_default(db);
options.set_background_purge_on_iterator_cleanup(true);
options
}
#[inline]
pub(crate) fn write_options_default() -> WriteOptions { WriteOptions::default() }
pub(crate) fn cache_read_options_default(db: &Arc<Engine>) -> ReadOptions {
let mut options = read_options_default(db);
options.set_read_tier(ReadTier::BlockCache);
options.fill_cache(false);
options
}
#[inline]
pub(crate) fn read_options_default(db: &Arc<Engine>) -> ReadOptions {
let mut options = ReadOptions::default();
options.set_total_order_seek(true);
if !db.checksums {
options.set_verify_checksums(false);
}
options
}
#[inline]
pub(crate) fn write_options_default(_db: &Arc<Engine>) -> WriteOptions { WriteOptions::default() }
+54
View File
@@ -0,0 +1,54 @@
use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc};
use arrayvec::ArrayVec;
use conduwuit::{implement, Result};
use futures::Future;
use serde::Serialize;
use crate::{keyval::KeyBuf, ser, Handle};
/// Fetch a value from the database into cache, returning a reference-handle
/// asynchronously. The key is serialized into an allocated buffer to perform
/// the query.
#[implement(super::Map)]
#[inline]
pub fn qry<K>(self: &Arc<Self>, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
where
K: Serialize + ?Sized + Debug,
{
let mut buf = KeyBuf::new();
self.bqry(key, &mut buf)
}
/// Fetch a value from the database into cache, returning a reference-handle
/// asynchronously. The key is serialized into a fixed-sized buffer to perform
/// the query. The maximum size is supplied as const generic parameter.
#[implement(super::Map)]
#[inline]
pub fn aqry<const MAX: usize, K>(
self: &Arc<Self>,
key: &K,
) -> impl Future<Output = Result<Handle<'_>>> + Send
where
K: Serialize + ?Sized + Debug,
{
let mut buf = ArrayVec::<u8, MAX>::new();
self.bqry(key, &mut buf)
}
/// Fetch a value from the database into cache, returning a reference-handle
/// asynchronously. The key is serialized into a user-supplied Writer.
#[implement(super::Map)]
#[tracing::instrument(skip(self, buf), level = "trace")]
pub fn bqry<K, B>(
self: &Arc<Self>,
key: &K,
buf: &mut B,
) -> impl Future<Output = Result<Handle<'_>>> + Send
where
K: Serialize + ?Sized + Debug,
B: Write + AsRef<[u8]>,
{
let key = ser::serialize(buf, key).expect("failed to serialize query key");
self.get(key)
}
+63
View File
@@ -0,0 +1,63 @@
use std::{fmt::Debug, sync::Arc};
use conduwuit::{
implement,
utils::{
stream::{automatic_amplification, automatic_width, WidebandExt},
IterStream,
},
Result,
};
use futures::{Stream, StreamExt, TryStreamExt};
use serde::Serialize;
use crate::{keyval::KeyBuf, ser, Handle};
pub trait Qry<'a, K, S>
where
S: Stream<Item = K> + Send + 'a,
K: Serialize + Debug,
{
fn qry(self, map: &'a Arc<super::Map>) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a;
}
impl<'a, K, S> Qry<'a, K, S> for S
where
Self: 'a,
S: Stream<Item = K> + Send + 'a,
K: Serialize + Debug + 'a,
{
#[inline]
fn qry(self, map: &'a Arc<super::Map>) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a {
map.qry_batch(self)
}
}
#[implement(super::Map)]
#[tracing::instrument(skip(self, keys), level = "trace")]
pub(crate) fn qry_batch<'a, S, K>(
self: &'a Arc<Self>,
keys: S,
) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
where
S: Stream<Item = K> + Send + 'a,
K: Serialize + Debug + 'a,
{
use crate::pool::Get;
keys.ready_chunks(automatic_amplification())
.widen_then(automatic_width(), |chunk| {
let keys = chunk
.iter()
.map(ser::serialize_to::<KeyBuf, _>)
.map(|result| result.expect("failed to serialize query key"))
.map(Into::into)
.collect();
self.db
.pool
.execute_get(Get { map: self.clone(), key: keys, res: None })
})
.map_ok(|results| results.into_iter().stream())
.try_flatten()
}
+1 -1
View File
@@ -22,7 +22,7 @@ where
pub fn rev_raw_keys(self: &Arc<Self>) -> impl Stream<Item = Result<Key<'_>>> + Send {
use crate::pool::Seek;
let opts = super::iter_options_default();
let opts = super::iter_options_default(&self.db);
let state = stream::State::new(self, opts);
if is_cached(self) {
let state = state.init_rev(None);
+1 -1
View File
@@ -61,7 +61,7 @@ where
{
use crate::pool::Seek;
let opts = super::iter_options_default();
let opts = super::iter_options_default(&self.db);
let state = stream::State::new(self, opts);
if is_cached(self, from) {
return stream::KeysRev::<'_>::from(state.init_rev(from.as_ref().into())).boxed();
+2 -2
View File
@@ -31,7 +31,7 @@ where
pub fn rev_raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> + Send {
use crate::pool::Seek;
let opts = super::iter_options_default();
let opts = super::iter_options_default(&self.db);
let state = stream::State::new(self, opts);
if is_cached(self) {
let state = state.init_rev(None);
@@ -66,7 +66,7 @@ pub fn rev_raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>
fields(%map),
)]
pub(super) fn is_cached(map: &Arc<super::Map>) -> bool {
let opts = super::cache_iter_options_default();
let opts = super::cache_iter_options_default(&map.db);
let state = stream::State::new(map, opts).init_rev(None);
!state.is_incomplete()
+2 -2
View File
@@ -80,7 +80,7 @@ where
{
use crate::pool::Seek;
let opts = super::iter_options_default();
let opts = super::iter_options_default(&self.db);
let state = stream::State::new(self, opts);
if is_cached(self, from) {
let state = state.init_rev(from.as_ref().into());
@@ -118,7 +118,7 @@ pub(super) fn is_cached<P>(map: &Arc<super::Map>, from: &P) -> bool
where
P: AsRef<[u8]> + ?Sized,
{
let cache_opts = super::cache_iter_options_default();
let cache_opts = super::cache_iter_options_default(&map.db);
let cache_status = stream::State::new(map, cache_opts)
.init_rev(from.as_ref().into())
.status();
+2 -2
View File
@@ -30,7 +30,7 @@ where
pub fn raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> + Send {
use crate::pool::Seek;
let opts = super::iter_options_default();
let opts = super::iter_options_default(&self.db);
let state = stream::State::new(self, opts);
if is_cached(self) {
let state = state.init_fwd(None);
@@ -65,7 +65,7 @@ pub fn raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> +
fields(%map),
)]
pub(super) fn is_cached(map: &Arc<super::Map>) -> bool {
let opts = super::cache_iter_options_default();
let opts = super::cache_iter_options_default(&map.db);
let state = stream::State::new(map, opts).init_fwd(None);
!state.is_incomplete()
+2 -2
View File
@@ -77,7 +77,7 @@ where
{
use crate::pool::Seek;
let opts = super::iter_options_default();
let opts = super::iter_options_default(&self.db);
let state = stream::State::new(self, opts);
if is_cached(self, from) {
let state = state.init_fwd(from.as_ref().into());
@@ -115,7 +115,7 @@ pub(super) fn is_cached<P>(map: &Arc<super::Map>, from: &P) -> bool
where
P: AsRef<[u8]> + ?Sized,
{
let opts = super::cache_iter_options_default();
let opts = super::cache_iter_options_default(&map.db);
let state = stream::State::new(map, opts).init_fwd(from.as_ref().into());
!state.is_incomplete()
+1 -1
View File
@@ -30,7 +30,7 @@ pub use self::{
deserialized::Deserialized,
handle::Handle,
keyval::{serialize_key, serialize_val, KeyVal, Slice},
map::{compact, Map},
map::{compact, Get, Map, Qry},
ser::{serialize, serialize_to, serialize_to_vec, Cbor, Interfix, Json, Separator, SEP},
};
pub(crate) use self::{
+6 -3
View File
@@ -13,7 +13,7 @@ use std::{
use async_channel::{QueueStrategy, Receiver, RecvError, Sender};
use conduwuit::{
debug, debug_warn, err, error, implement,
result::{DebugInspect, LogDebugErr},
result::DebugInspect,
trace,
utils::sys::compute::{get_affinity, nth_core_available, set_affinity},
Error, Result, Server,
@@ -290,9 +290,12 @@ fn worker_init(&self, id: usize) {
// affinity is empty (no-op) if there's only one queue
set_affinity(affinity.clone());
#[cfg(feature = "jemalloc")]
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
if affinity.clone().count() == 1 && conduwuit::alloc::je::is_affine_arena() {
use conduwuit::alloc::je::this_thread::{arena_id, set_arena};
use conduwuit::{
alloc::je::this_thread::{arena_id, set_arena},
result::LogDebugErr,
};
let id = affinity.clone().next().expect("at least one id");
+158 -1
View File
@@ -3,7 +3,7 @@
use std::fmt::Debug;
use arrayvec::ArrayVec;
use conduwuit::ruma::{serde::Raw, RoomId, UserId};
use conduwuit::ruma::{serde::Raw, EventId, RoomId, UserId};
use serde::Serialize;
use crate::{
@@ -389,3 +389,160 @@ fn de_complex() {
assert_eq!(arr, key, "deserialization of serialization does not match");
}
#[test]
fn serde_tuple_option_value_some() {
let room_id: &RoomId = "!room:example.com".try_into().unwrap();
let user_id: &UserId = "@user:example.com".try_into().unwrap();
let mut aa = Vec::<u8>::new();
aa.extend_from_slice(room_id.as_bytes());
aa.push(0xFF);
aa.extend_from_slice(user_id.as_bytes());
let bb: (&RoomId, Option<&UserId>) = (room_id, Some(user_id));
let bbs = serialize_to_vec(&bb).expect("failed to serialize tuple");
assert_eq!(aa, bbs);
let cc: (&RoomId, Option<&UserId>) =
de::from_slice(&bbs).expect("failed to deserialize tuple");
assert_eq!(bb.1, cc.1);
assert_eq!(cc.0, bb.0);
}
#[test]
fn serde_tuple_option_value_none() {
let room_id: &RoomId = "!room:example.com".try_into().unwrap();
let mut aa = Vec::<u8>::new();
aa.extend_from_slice(room_id.as_bytes());
aa.push(0xFF);
let bb: (&RoomId, Option<&UserId>) = (room_id, None);
let bbs = serialize_to_vec(&bb).expect("failed to serialize tuple");
assert_eq!(aa, bbs);
let cc: (&RoomId, Option<&UserId>) =
de::from_slice(&bbs).expect("failed to deserialize tuple");
assert_eq!(None, cc.1);
assert_eq!(cc.0, bb.0);
}
#[test]
fn serde_tuple_option_none_value() {
let user_id: &UserId = "@user:example.com".try_into().unwrap();
let mut aa = Vec::<u8>::new();
aa.push(0xFF);
aa.extend_from_slice(user_id.as_bytes());
let bb: (Option<&RoomId>, &UserId) = (None, user_id);
let bbs = serialize_to_vec(&bb).expect("failed to serialize tuple");
assert_eq!(aa, bbs);
let cc: (Option<&RoomId>, &UserId) =
de::from_slice(&bbs).expect("failed to deserialize tuple");
assert_eq!(None, cc.0);
assert_eq!(cc.1, bb.1);
}
#[test]
fn serde_tuple_option_some_value() {
let room_id: &RoomId = "!room:example.com".try_into().unwrap();
let user_id: &UserId = "@user:example.com".try_into().unwrap();
let mut aa = Vec::<u8>::new();
aa.extend_from_slice(room_id.as_bytes());
aa.push(0xFF);
aa.extend_from_slice(user_id.as_bytes());
let bb: (Option<&RoomId>, &UserId) = (Some(room_id), user_id);
let bbs = serialize_to_vec(&bb).expect("failed to serialize tuple");
assert_eq!(aa, bbs);
let cc: (Option<&RoomId>, &UserId) =
de::from_slice(&bbs).expect("failed to deserialize tuple");
assert_eq!(bb.0, cc.0);
assert_eq!(cc.1, bb.1);
}
#[test]
fn serde_tuple_option_some_some() {
let room_id: &RoomId = "!room:example.com".try_into().unwrap();
let user_id: &UserId = "@user:example.com".try_into().unwrap();
let mut aa = Vec::<u8>::new();
aa.extend_from_slice(room_id.as_bytes());
aa.push(0xFF);
aa.extend_from_slice(user_id.as_bytes());
let bb: (Option<&RoomId>, Option<&UserId>) = (Some(room_id), Some(user_id));
let bbs = serialize_to_vec(&bb).expect("failed to serialize tuple");
assert_eq!(aa, bbs);
let cc: (Option<&RoomId>, Option<&UserId>) =
de::from_slice(&bbs).expect("failed to deserialize tuple");
assert_eq!(cc.0, bb.0);
assert_eq!(bb.1, cc.1);
}
#[test]
fn serde_tuple_option_none_none() {
let aa = vec![0xFF];
let bb: (Option<&RoomId>, Option<&UserId>) = (None, None);
let bbs = serialize_to_vec(&bb).expect("failed to serialize tuple");
assert_eq!(aa, bbs);
let cc: (Option<&RoomId>, Option<&UserId>) =
de::from_slice(&bbs).expect("failed to deserialize tuple");
assert_eq!(cc.0, bb.0);
assert_eq!(None, cc.1);
}
#[test]
fn serde_tuple_option_some_none_some() {
let room_id: &RoomId = "!room:example.com".try_into().unwrap();
let user_id: &UserId = "@user:example.com".try_into().unwrap();
let mut aa = Vec::<u8>::new();
aa.extend_from_slice(room_id.as_bytes());
aa.push(0xFF);
aa.push(0xFF);
aa.extend_from_slice(user_id.as_bytes());
let bb: (Option<&RoomId>, Option<&EventId>, Option<&UserId>) =
(Some(room_id), None, Some(user_id));
let bbs = serialize_to_vec(&bb).expect("failed to serialize tuple");
assert_eq!(aa, bbs);
let cc: (Option<&RoomId>, Option<&EventId>, Option<&UserId>) =
de::from_slice(&bbs).expect("failed to deserialize tuple");
assert_eq!(bb.0, cc.0);
assert_eq!(None, cc.1);
assert_eq!(bb.1, cc.1);
assert_eq!(bb.2, cc.2);
}
#[test]
fn serde_tuple_option_none_none_none() {
let aa = vec![0xFF, 0xFF];
let bb: (Option<&RoomId>, Option<&EventId>, Option<&UserId>) = (None, None, None);
let bbs = serialize_to_vec(&bb).expect("failed to serialize tuple");
assert_eq!(aa, bbs);
let cc: (Option<&RoomId>, Option<&EventId>, Option<&UserId>) =
de::from_slice(&bbs).expect("failed to deserialize tuple");
assert_eq!(None, cc.0);
assert_eq!(bb, cc);
}
+3
View File
@@ -49,6 +49,9 @@ default = [
"zstd_compression",
]
blurhashing = [
"conduwuit-service/blurhashing",
]
brotli_compression = [
"conduwuit-api/brotli_compression",
"conduwuit-core/brotli_compression",
+2 -2
View File
@@ -3,7 +3,7 @@ use std::sync::Arc;
use conduwuit::{
config::Config,
debug_warn, err,
log::{capture, fmt_span, ConsoleFormat, LogLevelReloadHandles},
log::{capture, fmt_span, ConsoleFormat, ConsoleWriter, LogLevelReloadHandles},
result::UnwrapOrErr,
Result,
};
@@ -30,7 +30,7 @@ pub(crate) fn init(
.with_span_events(console_span_events)
.event_format(ConsoleFormat::new(config))
.fmt_fields(ConsoleFormat::new(config))
.map_writer(|w| w);
.with_writer(ConsoleWriter::new(config));
let (console_reload_filter, console_reload_handle) =
reload::Layer::new(console_filter.clone());
+8 -9
View File
@@ -8,13 +8,11 @@ use std::{
time::Duration,
};
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
use conduwuit::result::LogDebugErr;
use conduwuit::{
is_true,
result::LogDebugErr,
utils::{
available_parallelism,
sys::compute::{nth_core_available, set_affinity},
},
utils::sys::compute::{nth_core_available, set_affinity},
Result,
};
use tokio::runtime::Builder;
@@ -25,6 +23,7 @@ const WORKER_NAME: &str = "conduwuit:worker";
const WORKER_MIN: usize = 2;
const WORKER_KEEPALIVE: u64 = 36;
const MAX_BLOCKING_THREADS: usize = 1024;
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
const DISABLE_MUZZY_THRESHOLD: usize = 4;
static WORKER_AFFINITY: OnceLock<bool> = OnceLock::new();
@@ -122,7 +121,7 @@ fn set_worker_affinity() {
set_worker_mallctl(id);
}
#[cfg(feature = "jemalloc")]
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
fn set_worker_mallctl(id: usize) {
use conduwuit::alloc::je::{
is_affine_arena,
@@ -137,13 +136,13 @@ fn set_worker_mallctl(id: usize) {
.get()
.expect("GC_MUZZY initialized by runtime::new()");
let muzzy_auto_disable = available_parallelism() >= DISABLE_MUZZY_THRESHOLD;
let muzzy_auto_disable = conduwuit::utils::available_parallelism() >= DISABLE_MUZZY_THRESHOLD;
if matches!(muzzy_option, Some(false) | None if muzzy_auto_disable) {
set_muzzy_decay(-1).log_debug_err().ok();
}
}
#[cfg(not(feature = "jemalloc"))]
#[cfg(any(not(feature = "jemalloc"), target_env = "msvc"))]
fn set_worker_mallctl(_: usize) {}
#[tracing::instrument(
@@ -189,7 +188,7 @@ fn thread_park() {
}
fn gc_on_park() {
#[cfg(feature = "jemalloc")]
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
conduwuit::alloc::je::this_thread::decay()
.log_debug_err()
.ok();
+3 -3
View File
@@ -46,14 +46,14 @@ impl Server {
.and_then(|raw| crate::clap::update(raw, args))
.and_then(|raw| Config::new(&raw))?;
#[cfg(feature = "sentry_telemetry")]
let sentry_guard = crate::sentry::init(&config);
let (tracing_reload_handle, tracing_flame_guard, capture) =
crate::logging::init(&config)?;
config.check()?;
#[cfg(feature = "sentry_telemetry")]
let sentry_guard = crate::sentry::init(&config);
#[cfg(unix)]
sys::maximize_fd_limit()
.expect("Unable to increase maximum soft and hard file descriptor limit");
+4
View File
@@ -16,6 +16,8 @@ pub(super) async fn signal(server: Arc<Server>) {
let mut quit = unix::signal(SignalKind::quit()).expect("SIGQUIT handler");
let mut term = unix::signal(SignalKind::terminate()).expect("SIGTERM handler");
let mut usr1 = unix::signal(SignalKind::user_defined1()).expect("SIGUSR1 handler");
let mut usr2 = unix::signal(SignalKind::user_defined2()).expect("SIGUSR2 handler");
loop {
trace!("Installed signal handlers");
let sig: &'static str;
@@ -23,6 +25,8 @@ pub(super) async fn signal(server: Arc<Server>) {
_ = signal::ctrl_c() => { sig = "SIGINT"; },
_ = quit.recv() => { sig = "SIGQUIT"; },
_ = term.recv() => { sig = "SIGTERM"; },
_ = usr1.recv() => { sig = "SIGUSR1"; },
_ = usr2.recv() => { sig = "SIGUSR2"; },
}
warn!("Received {sig}");
+32 -23
View File
@@ -5,7 +5,7 @@ use axum::{
Router,
};
use axum_client_ip::SecureClientIpSource;
use conduwuit::{error, Result, Server};
use conduwuit::{debug, error, Result, Server};
use conduwuit_api::router::state::Guard;
use conduwuit_service::Services;
use http::{
@@ -18,6 +18,7 @@ use tower_http::{
cors::{self, CorsLayer},
sensitive_headers::SetSensitiveHeadersLayer,
set_header::SetResponseHeaderLayer,
timeout::{RequestBodyTimeoutLayer, ResponseBodyTimeoutLayer, TimeoutLayer},
trace::{DefaultOnFailure, DefaultOnRequest, DefaultOnResponse, TraceLayer},
};
use tracing::Level;
@@ -48,9 +49,9 @@ pub(crate) fn build(services: &Arc<Services>) -> Result<(Router, Guard)> {
))]
let layers = layers.layer(compression_layer(server));
let services_ = services.clone();
let layers = layers
.layer(SetSensitiveHeadersLayer::new([header::AUTHORIZATION]))
.layer(axum::middleware::from_fn_with_state(Arc::clone(services), request::spawn))
.layer(
TraceLayer::new_for_http()
.make_span_with(tracing_span::<_>)
@@ -60,6 +61,9 @@ pub(crate) fn build(services: &Arc<Services>) -> Result<(Router, Guard)> {
)
.layer(axum::middleware::from_fn_with_state(Arc::clone(services), request::handle))
.layer(SecureClientIpSource::ConnectInfo.into_extension())
.layer(ResponseBodyTimeoutLayer::new(Duration::from_secs(server.config.client_response_timeout)))
.layer(RequestBodyTimeoutLayer::new(Duration::from_secs(server.config.client_receive_timeout)))
.layer(TimeoutLayer::new(Duration::from_secs(server.config.client_request_timeout)))
.layer(SetResponseHeaderLayer::if_not_present(
HeaderName::from_static("origin-agent-cluster"), // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Origin-Agent-Cluster
HeaderValue::from_static("?1"),
@@ -86,7 +90,7 @@ pub(crate) fn build(services: &Arc<Services>) -> Result<(Router, Guard)> {
))
.layer(cors_layer(server))
.layer(body_limit_layer(server))
.layer(CatchPanicLayer::custom(catch_panic));
.layer(CatchPanicLayer::custom(move |panic| catch_panic(panic, services_.clone())));
let (router, guard) = router::build(services);
Ok((router.layer(layers), guard))
@@ -164,15 +168,14 @@ fn body_limit_layer(server: &Server) -> DefaultBodyLimit {
#[allow(clippy::needless_pass_by_value)]
fn catch_panic(
err: Box<dyn Any + Send + 'static>,
services: Arc<Services>,
) -> http::Response<http_body_util::Full<bytes::Bytes>> {
//TODO: XXX
/*
conduwuit_service::services()
.server
.metrics
.requests_panic
.fetch_add(1, std::sync::atomic::Ordering::Release);
*/
services
.server
.metrics
.requests_panic
.fetch_add(1, std::sync::atomic::Ordering::Release);
let details = if let Some(s) = err.downcast_ref::<String>() {
s.clone()
} else if let Some(s) = err.downcast_ref::<&str>() {
@@ -196,20 +199,26 @@ fn catch_panic(
}
fn tracing_span<T>(request: &http::Request<T>) -> tracing::Span {
let path = request.extensions().get::<MatchedPath>().map_or_else(
|| {
request
.uri()
.path_and_query()
.expect("all requests have a path")
.as_str()
},
truncated_matched_path,
);
let path = request
.extensions()
.get::<MatchedPath>()
.map_or_else(|| request_path_str(request), truncated_matched_path);
let method = request.method();
tracing::span! {
parent: None,
debug::INFO_SPAN_LEVEL,
"router",
method = %request.method(),
%path,
}
}
tracing::debug_span!(parent: None, "router", %method, %path)
fn request_path_str<T>(request: &http::Request<T>) -> &str {
request
.uri()
.path_and_query()
.expect("all requests have a path")
.as_str()
}
fn truncated_matched_path(path: &MatchedPath) -> &str {
+79 -72
View File
@@ -1,4 +1,8 @@
use std::sync::{atomic::Ordering, Arc};
use std::{
fmt::Debug,
sync::{atomic::Ordering, Arc},
time::Duration,
};
use axum::{
extract::State,
@@ -6,82 +10,18 @@ use axum::{
};
use conduwuit::{debug, debug_error, debug_warn, err, error, trace, Result};
use conduwuit_service::Services;
use futures::FutureExt;
use http::{Method, StatusCode, Uri};
use tokio::time::sleep;
use tracing::Span;
#[tracing::instrument(
parent = None,
level = "trace",
skip_all,
fields(
handled = %services
.server
.metrics
.requests_spawn_finished
.fetch_add(1, Ordering::Relaxed),
active = %services
.server
.metrics
.requests_spawn_active
.fetch_add(1, Ordering::Relaxed),
)
)]
pub(crate) async fn spawn(
State(services): State<Arc<Services>>,
req: http::Request<axum::body::Body>,
next: axum::middleware::Next,
) -> Result<Response, StatusCode> {
let server = &services.server;
#[cfg(debug_assertions)]
conduwuit::defer! {{
_ = server
.metrics
.requests_spawn_active
.fetch_sub(1, Ordering::Relaxed);
}};
if !server.running() {
debug_warn!("unavailable pending shutdown");
return Err(StatusCode::SERVICE_UNAVAILABLE);
}
let fut = next.run(req);
let task = server.runtime().spawn(fut);
task.await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
}
#[tracing::instrument(
level = "debug",
skip_all,
fields(
handled = %services
.server
.metrics
.requests_handle_finished
.fetch_add(1, Ordering::Relaxed),
active = %services
.server
.metrics
.requests_handle_active
.fetch_add(1, Ordering::Relaxed),
)
)]
#[tracing::instrument(name = "request", level = "debug", skip_all)]
pub(crate) async fn handle(
State(services): State<Arc<Services>>,
req: http::Request<axum::body::Body>,
next: axum::middleware::Next,
) -> Result<Response, StatusCode> {
let server = &services.server;
#[cfg(debug_assertions)]
conduwuit::defer! {{
_ = server
.metrics
.requests_handle_active
.fetch_sub(1, Ordering::Relaxed);
}};
if !server.running() {
if !services.server.running() {
debug_warn!(
method = %req.method(),
uri = %req.uri(),
@@ -93,14 +33,74 @@ pub(crate) async fn handle(
let uri = req.uri().clone();
let method = req.method().clone();
let result = next.run(req).await;
handle_result(&method, &uri, result)
let services_ = services.clone();
let parent = Span::current();
let task = services.server.runtime().spawn(async move {
tokio::select! {
response = execute(&services_, req, next, parent) => response,
response = services_.server.until_shutdown()
.then(|()| {
let timeout = services_.server.config.client_shutdown_timeout;
let timeout = Duration::from_secs(timeout);
sleep(timeout)
})
.map(|()| StatusCode::SERVICE_UNAVAILABLE)
.map(IntoResponse::into_response) => response,
}
});
task.await
.map_err(unhandled)
.and_then(move |result| handle_result(&method, &uri, result))
}
#[tracing::instrument(
name = "handle",
level = "debug",
parent = parent,
skip_all,
fields(
active = %services
.server
.metrics
.requests_handle_active
.fetch_add(1, Ordering::Relaxed),
handled = %services
.server
.metrics
.requests_handle_finished
.load(Ordering::Relaxed),
)
)]
async fn execute(
// we made a safety contract that Services will not go out of scope
// during the request; this ensures a reference is accounted for at
// the base frame of the task regardless of its detachment.
services: &Arc<Services>,
req: http::Request<axum::body::Body>,
next: axum::middleware::Next,
parent: Span,
) -> Response {
#[cfg(debug_assertions)]
conduwuit::defer! {{
_ = services.server
.metrics
.requests_handle_finished
.fetch_add(1, Ordering::Relaxed);
_ = services.server
.metrics
.requests_handle_active
.fetch_sub(1, Ordering::Relaxed);
}};
next.run(req).await
}
fn handle_result(method: &Method, uri: &Uri, result: Response) -> Result<Response, StatusCode> {
let status = result.status();
let reason = status.canonical_reason().unwrap_or("Unknown Reason");
let code = status.as_u16();
if status.is_server_error() {
error!(method = ?method, uri = ?uri, "{code} {reason}");
} else if status.is_client_error() {
@@ -117,3 +117,10 @@ fn handle_result(method: &Method, uri: &Uri, result: Response) -> Result<Respons
Ok(result)
}
#[cold]
fn unhandled<Error: Debug>(e: Error) -> StatusCode {
error!("unhandled error or panic during request: {e:?}");
StatusCode::INTERNAL_SERVER_ERROR
}
+9 -26
View File
@@ -9,6 +9,7 @@ use std::{
use axum_server::Handle as ServerHandle;
use conduwuit::{debug, debug_error, debug_info, error, info, Error, Result, Server};
use futures::FutureExt;
use service::Services;
use tokio::{
sync::broadcast::{self, Sender},
@@ -99,46 +100,28 @@ pub(crate) async fn stop(services: Arc<Services>) -> Result<()> {
);
}
#[cfg(all(feature = "systemd", target_os = "linux"))]
sd_notify::notify(true, &[sd_notify::NotifyState::Stopping])
.expect("failed to notify systemd of stopping state");
info!("Shutdown complete.");
Ok(())
}
#[tracing::instrument(skip_all)]
async fn signal(server: Arc<Server>, tx: Sender<()>, handle: axum_server::Handle) {
loop {
let sig: &'static str = server
.signal
.subscribe()
.recv()
.await
.expect("channel error");
if !server.running() {
handle_shutdown(&server, &tx, &handle, sig).await;
break;
}
}
server
.clone()
.until_shutdown()
.then(move |()| handle_shutdown(server, tx, handle))
.await;
}
async fn handle_shutdown(
server: &Arc<Server>,
tx: &Sender<()>,
handle: &axum_server::Handle,
sig: &str,
) {
debug!("Received signal {sig}");
async fn handle_shutdown(server: Arc<Server>, tx: Sender<()>, handle: axum_server::Handle) {
if let Err(e) = tx.send(()) {
error!("failed sending shutdown transaction to channel: {e}");
}
let timeout = Duration::from_secs(36);
let timeout = server.config.client_shutdown_timeout;
let timeout = Duration::from_secs(timeout);
debug!(
?timeout,
spawn_active = ?server.metrics.requests_spawn_active.load(Ordering::Relaxed),
handle_active = ?server.metrics.requests_handle_active.load(Ordering::Relaxed),
"Notifying for graceful shutdown"
);
-7
View File
@@ -24,27 +24,20 @@ pub(super) async fn serve(
info!("Listening on {addrs:?}");
while join_set.join_next().await.is_some() {}
let spawn_active = server.metrics.requests_spawn_active.load(Ordering::Relaxed);
let handle_active = server
.metrics
.requests_handle_active
.load(Ordering::Relaxed);
debug_info!(
spawn_finished = server
.metrics
.requests_spawn_finished
.load(Ordering::Relaxed),
handle_finished = server
.metrics
.requests_handle_finished
.load(Ordering::Relaxed),
panics = server.metrics.requests_panic.load(Ordering::Relaxed),
spawn_active,
handle_active,
"Stopped listening on {addrs:?}",
);
debug_assert!(spawn_active == 0, "active request tasks are not joined");
debug_assert!(handle_active == 0, "active request handles still pending");
Ok(())
+4 -5
View File
@@ -17,14 +17,13 @@ pub(super) async fn serve(
addrs: Vec<SocketAddr>,
) -> Result {
let tls = &server.config.tls;
let certs = tls
.certs
.as_ref()
.ok_or(err!(Config("tls.certs", "Missing required value in tls config section")))?;
let certs = tls.certs.as_ref().ok_or_else(|| {
err!(Config("tls.certs", "Missing required value in tls config section"))
})?;
let key = tls
.key
.as_ref()
.ok_or(err!(Config("tls.key", "Missing required value in tls config section")))?;
.ok_or_else(|| err!(Config("tls.key", "Missing required value in tls config section")))?;
// we use ring for ruma and hashing state, but aws-lc-rs is the new default.
// without this, TLS mode will panic.
+6 -1
View File
@@ -159,7 +159,12 @@ async fn fini(server: &Arc<Server>, listener: UnixListener, mut tasks: JoinSet<(
drop(listener);
debug!("Waiting for requests to finish...");
while server.metrics.requests_spawn_active.load(Ordering::Relaxed) > 0 {
while server
.metrics
.requests_handle_active
.load(Ordering::Relaxed)
.gt(&0)
{
tokio::select! {
task = tasks.join_next() => if task.is_none() { break; },
() = sleep(FINI_POLL_INTERVAL) => {},
+4
View File
@@ -44,6 +44,7 @@ url_preview = [
zstd_compression = [
"reqwest/zstd",
]
blurhashing = ["dep:image","dep:blurhash"]
[dependencies]
arrayvec.workspace = true
@@ -74,6 +75,7 @@ serde_json.workspace = true
serde.workspace = true
serde_yaml.workspace = true
sha2.workspace = true
smallvec.workspace = true
termimad.workspace = true
termimad.optional = true
tokio.workspace = true
@@ -81,6 +83,8 @@ tracing.workspace = true
url.workspace = true
webpage.workspace = true
webpage.optional = true
blurhash.workspace = true
blurhash.optional = true
[lints]
workspace = true
+7 -5
View File
@@ -5,7 +5,7 @@ use conduwuit::{
utils::{result::LogErr, stream::TryIgnore, ReadyExt},
Err, Result,
};
use database::{Deserialized, Handle, Interfix, Json, Map};
use database::{Deserialized, Handle, Ignore, Json, Map};
use futures::{Stream, StreamExt, TryFutureExt};
use ruma::{
events::{
@@ -131,18 +131,20 @@ pub fn changes_since<'a>(
room_id: Option<&'a RoomId>,
user_id: &'a UserId,
since: u64,
to: Option<u64>,
) -> impl Stream<Item = AnyRawAccountDataEvent> + Send + 'a {
let prefix = (room_id, user_id, Interfix);
let prefix = database::serialize_key(prefix).expect("failed to serialize prefix");
type Key<'a> = (Option<&'a RoomId>, &'a UserId, u64, Ignore);
// Skip the data that's exactly at since, because we sent that last time
let first_possible = (room_id, user_id, since.saturating_add(1));
self.db
.roomuserdataid_accountdata
.stream_from_raw(&first_possible)
.stream_from(&first_possible)
.ignore_err()
.ready_take_while(move |(k, _)| k.starts_with(&prefix))
.ready_take_while(move |((room_id_, user_id_, count, _), _): &(Key<'_>, _)| {
room_id == *room_id_ && user_id == *user_id_ && to.is_none_or(|to| *count <= to)
})
.map(move |(_, v)| {
match room_id {
| Some(_) => serde_json::from_slice::<Raw<AnyRoomAccountDataEvent>>(v)
+3 -2
View File
@@ -1,10 +1,11 @@
#![cfg(feature = "console")]
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
};
use conduwuit::{debug, defer, error, log, Server};
use conduwuit::{debug, defer, error, log, log::is_systemd_mode, Server};
use futures::future::{AbortHandle, Abortable};
use ruma::events::room::message::RoomMessageEventContent;
use rustyline_async::{Readline, ReadlineError, ReadlineEvent};
@@ -123,7 +124,7 @@ impl Console {
}
async fn readline(self: &Arc<Self>) -> Result<ReadlineEvent, ReadlineError> {
let _suppression = log::Suppress::new(&self.server);
let _suppression = (!is_systemd_mode()).then(|| log::Suppress::new(&self.server));
let (mut readline, _writer) = Readline::new(PROMPT.to_owned())?;
let self_ = Arc::clone(self);
@@ -2,6 +2,8 @@ use conduwuit::{debug, debug_info, error, implement, info, Err, Result};
use ruma::events::room::message::RoomMessageEventContent;
use tokio::time::{sleep, Duration};
pub(super) const SIGNAL: &str = "SIGUSR2";
/// Possibly spawn the terminal console at startup if configured.
#[implement(super::Service)]
pub(super) async fn console_auto_start(&self) {
@@ -22,7 +24,7 @@ pub(super) async fn console_auto_stop(&self) {
/// Execute admin commands after startup
#[implement(super::Service)]
pub(super) async fn startup_execute(&self) -> Result<()> {
pub(super) async fn startup_execute(&self) -> Result {
// List of comamnds to execute
let commands = &self.services.server.config.admin_execute;
@@ -36,7 +38,7 @@ pub(super) async fn startup_execute(&self) -> Result<()> {
sleep(Duration::from_millis(500)).await;
for (i, command) in commands.iter().enumerate() {
if let Err(e) = self.startup_execute_command(i, command.clone()).await {
if let Err(e) = self.execute_command(i, command.clone()).await {
if !errors {
return Err(e);
}
@@ -59,16 +61,38 @@ pub(super) async fn startup_execute(&self) -> Result<()> {
Ok(())
}
/// Execute one admin command after startup
/// Execute admin commands after signal
#[implement(super::Service)]
async fn startup_execute_command(&self, i: usize, command: String) -> Result<()> {
debug!("Startup command #{i}: executing {command:?}");
pub(super) async fn signal_execute(&self) -> Result {
// List of comamnds to execute
let commands = self.services.server.config.admin_signal_execute.clone();
// When true, errors are ignored and execution continues.
let ignore_errors = self.services.server.config.admin_execute_errors_ignore;
for (i, command) in commands.iter().enumerate() {
if let Err(e) = self.execute_command(i, command.clone()).await {
if !ignore_errors {
return Err(e);
}
}
tokio::task::yield_now().await;
}
Ok(())
}
/// Execute one admin command after startup or signal
#[implement(super::Service)]
async fn execute_command(&self, i: usize, command: String) -> Result {
debug!("Execute command #{i}: executing {command:?}");
match self.command_in_place(command, None).await {
| Ok(Some(output)) => Self::startup_command_output(i, &output),
| Err(output) => Self::startup_command_error(i, &output),
| Ok(Some(output)) => Self::execute_command_output(i, &output),
| Err(output) => Self::execute_command_error(i, &output),
| Ok(None) => {
info!("Startup command #{i} completed (no output).");
info!("Execute command #{i} completed (no output).");
Ok(())
},
}
@@ -76,28 +100,28 @@ async fn startup_execute_command(&self, i: usize, command: String) -> Result<()>
#[cfg(feature = "console")]
#[implement(super::Service)]
fn startup_command_output(i: usize, content: &RoomMessageEventContent) -> Result<()> {
debug_info!("Startup command #{i} completed:");
fn execute_command_output(i: usize, content: &RoomMessageEventContent) -> Result {
debug_info!("Execute command #{i} completed:");
super::console::print(content.body());
Ok(())
}
#[cfg(feature = "console")]
#[implement(super::Service)]
fn startup_command_error(i: usize, content: &RoomMessageEventContent) -> Result<()> {
fn execute_command_error(i: usize, content: &RoomMessageEventContent) -> Result {
super::console::print_err(content.body());
Err!(debug_error!("Startup command #{i} failed."))
Err!(debug_error!("Execute command #{i} failed."))
}
#[cfg(not(feature = "console"))]
#[implement(super::Service)]
fn startup_command_output(i: usize, content: &RoomMessageEventContent) -> Result<()> {
info!("Startup command #{i} completed:\n{:#}", content.body());
fn execute_command_output(i: usize, content: &RoomMessageEventContent) -> Result {
info!("Execute command #{i} completed:\n{:#}", content.body());
Ok(())
}
#[cfg(not(feature = "console"))]
#[implement(super::Service)]
fn startup_command_error(i: usize, content: &RoomMessageEventContent) -> Result<()> {
Err!(error!("Startup command #{i} failed:\n{:#}", content.body()))
fn execute_command_error(i: usize, content: &RoomMessageEventContent) -> Result {
Err!(error!("Execute command #{i} failed:\n{:#}", content.body()))
}
+6 -2
View File
@@ -1,7 +1,7 @@
pub mod console;
mod create;
mod execute;
mod grant;
mod startup;
use std::{
future::Future,
@@ -183,7 +183,11 @@ impl Service {
.map(|complete| complete(command))
}
async fn handle_signal(&self, #[allow(unused_variables)] sig: &'static str) {
async fn handle_signal(&self, sig: &'static str) {
if sig == execute::SIGNAL {
self.signal_execute().await.ok();
}
#[cfg(feature = "console")]
self.console.handle_signal(sig).await;
}
+70
View File
@@ -0,0 +1,70 @@
use std::{iter, ops::Deref, path::Path, sync::Arc};
use async_trait::async_trait;
use conduwuit::{
config::{check, Config},
error, implement, Result, Server,
};
pub struct Service {
server: Arc<Server>,
}
const SIGNAL: &str = "SIGUSR1";
#[async_trait]
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self { server: args.server.clone() }))
}
async fn worker(self: Arc<Self>) -> Result {
while self.server.running() {
if self.server.signal.subscribe().recv().await == Ok(SIGNAL) {
if let Err(e) = self.handle_reload() {
error!("Failed to reload config: {e}");
}
}
}
Ok(())
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Deref for Service {
type Target = Arc<Config>;
#[inline]
fn deref(&self) -> &Self::Target { &self.server.config }
}
#[implement(Service)]
fn handle_reload(&self) -> Result {
if self.server.config.config_reload_signal {
#[cfg(all(feature = "systemd", target_os = "linux"))]
sd_notify::notify(true, &[sd_notify::NotifyState::Reloading])
.expect("failed to notify systemd of reloading state");
self.reload(iter::empty())?;
#[cfg(all(feature = "systemd", target_os = "linux"))]
sd_notify::notify(true, &[sd_notify::NotifyState::Ready])
.expect("failed to notify systemd of ready state");
}
Ok(())
}
#[implement(Service)]
pub fn reload<'a, I>(&self, paths: I) -> Result<Arc<Config>>
where
I: Iterator<Item = &'a Path>,
{
let old = self.server.config.clone();
let new = Config::load(paths).and_then(|raw| Config::new(&raw))?;
check::reload(&old, &new)?;
self.server.config.update(new)
}
@@ -1,9 +1,9 @@
use std::mem;
use std::{fmt::Debug, mem};
use bytes::Bytes;
use conduwuit::{
debug, debug_error, debug_warn, err, error::inspect_debug_log, implement, trace,
utils::string::EMPTY, Err, Error, Result,
debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err, error::inspect_debug_log,
implement, trace, utils::string::EMPTY, Err, Error, Result,
};
use http::{header::AUTHORIZATION, HeaderValue};
use ipaddress::IPAddress;
@@ -20,82 +20,110 @@ use ruma::{
use crate::resolver::actual::ActualDest;
impl super::Service {
#[tracing::instrument(
level = "debug"
/// Sends a request to a federation server
#[implement(super::Service)]
#[tracing::instrument(skip_all, name = "request", level = "debug")]
pub async fn execute<T>(&self, dest: &ServerName, request: T) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug + Send,
{
let client = &self.services.client.federation;
self.execute_on(client, dest, request).await
}
/// Like execute() but with a very large timeout
#[implement(super::Service)]
#[tracing::instrument(skip_all, name = "synapse", level = "debug")]
pub async fn execute_synapse<T>(
&self,
dest: &ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug + Send,
{
let client = &self.services.client.synapse;
self.execute_on(client, dest, request).await
}
#[implement(super::Service)]
#[tracing::instrument(
name = "fed",
level = INFO_SPAN_LEVEL,
skip(self, client, request),
)]
pub async fn send<T>(
&self,
client: &Client,
dest: &ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Send,
pub async fn execute_on<T>(
&self,
client: &Client,
dest: &ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Send,
{
if !self.services.server.config.allow_federation {
return Err!(Config("allow_federation", "Federation is disabled."));
}
if self
.services
.server
.config
.forbidden_remote_server_names
.contains(dest)
{
if !self.server.config.allow_federation {
return Err!(Config("allow_federation", "Federation is disabled."));
}
if self
.server
.config
.forbidden_remote_server_names
.contains(dest)
{
return Err!(Request(Forbidden(debug_warn!(
"Federation with {dest} is not allowed."
))));
}
let actual = self.services.resolver.get_actual_dest(dest).await?;
let request = into_http_request::<T>(&actual, request)?;
let request = self.prepare(dest, request)?;
self.execute::<T>(dest, &actual, request, client).await
return Err!(Request(Forbidden(debug_warn!("Federation with {dest} is not allowed."))));
}
async fn execute<T>(
&self,
dest: &ServerName,
actual: &ActualDest,
request: Request,
client: &Client,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Send,
{
let url = request.url().clone();
let method = request.method().clone();
let actual = self.services.resolver.get_actual_dest(dest).await?;
let request = into_http_request::<T>(&actual, request)?;
let request = self.prepare(dest, request)?;
self.perform::<T>(dest, &actual, request, client).await
}
debug!(?method, ?url, "Sending request");
match client.execute(request).await {
| Ok(response) => handle_response::<T>(dest, actual, &method, &url, response).await,
| Err(error) =>
Err(handle_error(actual, &method, &url, error).expect_err("always returns error")),
#[implement(super::Service)]
async fn perform<T>(
&self,
dest: &ServerName,
actual: &ActualDest,
request: Request,
client: &Client,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Send,
{
let url = request.url().clone();
let method = request.method().clone();
debug!(?method, ?url, "Sending request");
match client.execute(request).await {
| Ok(response) => handle_response::<T>(dest, actual, &method, &url, response).await,
| Err(error) =>
Err(handle_error(actual, &method, &url, error).expect_err("always returns error")),
}
}
#[implement(super::Service)]
fn prepare(&self, dest: &ServerName, mut request: http::Request<Vec<u8>>) -> Result<Request> {
self.sign_request(&mut request, dest);
let request = Request::try_from(request)?;
self.validate_url(request.url())?;
self.services.server.check_running()?;
Ok(request)
}
#[implement(super::Service)]
fn validate_url(&self, url: &Url) -> Result<()> {
if let Some(url_host) = url.host_str() {
if let Ok(ip) = IPAddress::parse(url_host) {
trace!("Checking request URL IP {ip:?}");
self.services.resolver.validate_ip(&ip)?;
}
}
fn prepare(&self, dest: &ServerName, mut request: http::Request<Vec<u8>>) -> Result<Request> {
self.sign_request(&mut request, dest);
let request = Request::try_from(request)?;
self.validate_url(request.url())?;
self.server.check_running()?;
Ok(request)
}
fn validate_url(&self, url: &Url) -> Result<()> {
if let Some(url_host) = url.host_str() {
if let Ok(ip) = IPAddress::parse(url_host) {
trace!("Checking request URL IP {ip:?}");
self.services.resolver.validate_ip(&ip)?;
}
}
Ok(())
}
Ok(())
}
async fn handle_response<T>(
@@ -195,7 +223,7 @@ fn sign_request(&self, http_request: &mut http::Request<Vec<u8>>, dest: &ServerN
type Value = CanonicalJsonValue;
type Object = CanonicalJsonObject;
let origin = self.services.globals.server_name();
let origin = &self.services.server.name;
let body = http_request.body();
let uri = http_request
.uri()

Some files were not shown because too many files have changed in this diff Show More