Skip to content

Commit b6b8391

Browse files
committed
fix: Save msgs to key-contacts migration state and run migration periodically (#6956)
Save: - (old contact id) -> (new contact id) mapping. - The message id starting from which all messages are already migrated. Run the migration from `housekeeping()` and make the latter run every 2.4 hours untill all messages are migrated. Migrate >= 1000 messages per run.
1 parent 9cdf9f6 commit b6b8391

File tree

5 files changed

+137
-50
lines changed

5 files changed

+137
-50
lines changed

src/constants.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,9 @@ pub(crate) const ASM_BODY: &str = "This is the Autocrypt Setup Message \
257257
If you see this message in a chatmail client (Delta Chat, Arcane Chat, Delta Touch ...), \
258258
use \"Settings / Add Second Device\" instead.";
259259

260+
/// Period between `sql::housekeeping()` runs.
261+
pub(crate) const HOUSEKEEPING_PERIOD: i64 = 24 * 60 * 60;
262+
260263
#[cfg(test)]
261264
mod tests {
262265
use num_traits::FromPrimitive;

src/context.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,6 +1041,13 @@ impl Context {
10411041
.await?
10421042
.to_string(),
10431043
);
1044+
res.insert(
1045+
"first_key_contacts_msg_id",
1046+
self.sql
1047+
.get_raw_config("first_key_contacts_msg_id")
1048+
.await?
1049+
.unwrap_or_default(),
1050+
);
10441051

10451052
let elapsed = time_elapsed(&self.creation_time);
10461053
res.insert("uptime", duration_to_str(elapsed));

src/scheduler.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use tokio_util::task::TaskTracker;
1515

1616
use self::connectivity::ConnectivityStore;
1717
use crate::config::{self, Config};
18+
use crate::constants;
1819
use crate::contact::{ContactId, RecentlySeenLoop};
1920
use crate::context::Context;
2021
use crate::download::{DownloadState, download_msg};
@@ -497,7 +498,8 @@ async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session)
497498

498499
match ctx.get_config_i64(Config::LastHousekeeping).await {
499500
Ok(last_housekeeping_time) => {
500-
let next_housekeeping_time = last_housekeeping_time.saturating_add(60 * 60 * 24);
501+
let next_housekeeping_time =
502+
last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
501503
if next_housekeeping_time <= time() {
502504
sql::housekeeping(ctx).await.log_err(ctx).ok();
503505
}

src/sql.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,12 @@ impl Sql {
581581
Ok(value)
582582
}
583583

584+
/// Removes the `key`'s value from the cache.
585+
pub(crate) async fn uncache_raw_config(&self, key: &str) {
586+
let mut lock = self.config_cache.write().await;
587+
lock.remove(key);
588+
}
589+
584590
/// Sets configuration for the given key to 32-bit signed integer value.
585591
pub async fn set_raw_config_int(&self, key: &str, value: i32) -> Result<()> {
586592
self.set_raw_config(key, Some(&format!("{value}"))).await
@@ -793,6 +799,12 @@ pub async fn housekeeping(context: &Context) -> Result<()> {
793799
.log_err(context)
794800
.ok();
795801

802+
migrations::msgs_to_key_contacts(context)
803+
.await
804+
.context("migrations::msgs_to_key_contacts")
805+
.log_err(context)
806+
.ok();
807+
796808
info!(context, "Housekeeping done.");
797809
Ok(())
798810
}

src/sql/migrations.rs

Lines changed: 112 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! Migrations module.
22
3+
use std::cmp;
34
use std::collections::BTreeMap;
45
use std::collections::BTreeSet;
56
use std::time::Instant;
@@ -12,7 +13,7 @@ use rusqlite::OptionalExtension;
1213

1314
use crate::config::Config;
1415
use crate::configure::EnteredLoginParam;
15-
use crate::constants::ShowEmails;
16+
use crate::constants::{self, ShowEmails};
1617
use crate::context::Context;
1718
use crate::imap;
1819
use crate::key::DcKey;
@@ -21,7 +22,7 @@ use crate::login_param::ConfiguredLoginParam;
2122
use crate::message::MsgId;
2223
use crate::provider::get_provider_by_domain;
2324
use crate::sql::Sql;
24-
use crate::tools::inc_and_check;
25+
use crate::tools::{Time, inc_and_check, time, time_elapsed};
2526

2627
const DBVERSION: i32 = 68;
2728
const VERSION_CFG: &str = "dbversion";
@@ -1245,6 +1246,10 @@ CREATE INDEX gossip_timestamp_index ON gossip_timestamp (chat_id, fingerprint);
12451246
"key-contacts migration took {:?} in total.",
12461247
start.elapsed()
12471248
);
1249+
// Schedule `msgs_to_key_contacts()`.
1250+
context
1251+
.set_config_internal(Config::LastHousekeeping, None)
1252+
.await?;
12481253
}
12491254

12501255
let new_version = sql
@@ -1830,62 +1835,120 @@ fn migrate_key_contacts(
18301835
}
18311836

18321837
// ======================= Step 5: =======================
1833-
// Rewrite `from_id` in messages
1838+
// Prepare for rewriting `from_id`, `to_id` in messages
18341839
{
1835-
let start = Instant::now();
1836-
1837-
let mut encrypted_msgs_stmt = transaction
1838-
.prepare(
1839-
"SELECT id, from_id, to_id
1840-
FROM msgs
1841-
WHERE chat_id>9
1842-
AND (param GLOB '*\nc=1*' OR param GLOB 'c=1*')
1843-
ORDER BY id DESC LIMIT 10000",
1840+
let mut contacts_map = autocrypt_key_contacts_with_reset_peerstate;
1841+
for (old, new) in autocrypt_key_contacts {
1842+
contacts_map.insert(old, new);
1843+
}
1844+
transaction
1845+
.execute(
1846+
"CREATE TABLE key_contacts_map (
1847+
old_id INTEGER PRIMARY KEY,
1848+
new_id INTEGER NOT NULL
1849+
) WITHOUT ROWID, STRICT",
1850+
(),
18441851
)
18451852
.context("Step 32")?;
1846-
let mut rewrite_msg_stmt = transaction
1847-
.prepare("UPDATE msgs SET from_id=?, to_id=? WHERE id=?")
1848-
.context("Step 32.1")?;
1849-
1850-
struct LoadedMsg {
1851-
id: u32,
1852-
from_id: u32,
1853-
to_id: u32,
1853+
{
1854+
let mut stmt = transaction
1855+
.prepare("INSERT INTO key_contacts_map (old_id, new_id) VALUES (?, ?)")
1856+
.context("Step 33")?;
1857+
for ids in contacts_map {
1858+
stmt.execute(ids).context("Step 34")?;
1859+
}
18541860
}
1861+
transaction
1862+
.execute(
1863+
"INSERT INTO config (keyname, value) VALUES (
1864+
'first_key_contacts_msg_id',
1865+
IFNULL((SELECT MAX(id)+1 FROM msgs), 0)
1866+
)",
1867+
(),
1868+
)
1869+
.context("Step 35")?;
1870+
}
18551871

1856-
let encrypted_msgs = encrypted_msgs_stmt
1857-
.query_map((), |row| {
1858-
let id: u32 = row.get(0)?;
1859-
let from_id: u32 = row.get(1)?;
1860-
let to_id: u32 = row.get(2)?;
1861-
Ok(LoadedMsg { id, from_id, to_id })
1862-
})
1863-
.context("Step 33")?;
1864-
1865-
for msg in encrypted_msgs {
1866-
let msg = msg.context("Step 34")?;
1867-
1868-
let new_from_id = *autocrypt_key_contacts
1869-
.get(&msg.from_id)
1870-
.or_else(|| autocrypt_key_contacts_with_reset_peerstate.get(&msg.from_id))
1871-
.unwrap_or(&msg.from_id);
1872-
1873-
let new_to_id = *autocrypt_key_contacts
1874-
.get(&msg.to_id)
1875-
.or_else(|| autocrypt_key_contacts_with_reset_peerstate.get(&msg.to_id))
1876-
.unwrap_or(&msg.to_id);
1872+
Ok(())
1873+
}
18771874

1878-
rewrite_msg_stmt
1879-
.execute((new_from_id, new_to_id, msg.id))
1880-
.context("Step 35")?;
1875+
/// Rewrite `from_id`, `to_id` in ~1000 messages starting from the newest ones, to key-contacts.
1876+
pub(crate) async fn msgs_to_key_contacts(context: &Context) -> Result<()> {
1877+
if context
1878+
.sql
1879+
.get_raw_config_int64("first_key_contacts_msg_id")
1880+
.await?
1881+
<= Some(0)
1882+
{
1883+
return Ok(());
1884+
}
1885+
let start = Time::now();
1886+
let trans_fn = |t: &mut rusqlite::Transaction| {
1887+
let mut first_key_contacts_msg_id: u64 = t
1888+
.query_one(
1889+
"SELECT CAST(value AS INTEGER) FROM config WHERE keyname='first_key_contacts_msg_id'",
1890+
(),
1891+
|row| row.get(0),
1892+
)
1893+
.context("Get first_key_contacts_msg_id")?;
1894+
let mut stmt = t
1895+
.prepare(
1896+
"UPDATE msgs SET
1897+
from_id=IFNULL(
1898+
(SELECT new_id FROM key_contacts_map WHERE old_id=msgs.from_id),
1899+
from_id
1900+
),
1901+
to_id=IFNULL(
1902+
(SELECT new_id FROM key_contacts_map WHERE old_id=msgs.to_id),
1903+
to_id
1904+
)
1905+
WHERE id>=? AND id<?
1906+
AND chat_id>9
1907+
AND (param GLOB '*\nc=1*' OR param GLOB 'c=1*')",
1908+
)
1909+
.context("Prepare stmt")?;
1910+
let msgs_to_migrate = 1000;
1911+
let mut msgs_migrated: u64 = 0;
1912+
while first_key_contacts_msg_id > 0 && msgs_migrated < msgs_to_migrate {
1913+
let start_msg_id = first_key_contacts_msg_id.saturating_sub(msgs_to_migrate);
1914+
let cnt: u64 = stmt
1915+
.execute((start_msg_id, first_key_contacts_msg_id))
1916+
.context("UPDATE msgs")?
1917+
.try_into()?;
1918+
msgs_migrated += cnt;
1919+
first_key_contacts_msg_id = start_msg_id;
18811920
}
1882-
info!(
1883-
context,
1884-
"Rewriting msgs to key-contacts took {:?}.",
1885-
start.elapsed()
1921+
t.execute(
1922+
"UPDATE config SET value=? WHERE keyname='first_key_contacts_msg_id'",
1923+
(first_key_contacts_msg_id,),
1924+
)
1925+
.context("Update first_key_contacts_msg_id")?;
1926+
Ok((msgs_migrated, first_key_contacts_msg_id))
1927+
};
1928+
let (msgs_migrated, first_key_contacts_msg_id) = context.sql.transaction(trans_fn).await?;
1929+
context
1930+
.sql
1931+
.uncache_raw_config("first_key_contacts_msg_id")
1932+
.await;
1933+
info!(
1934+
context,
1935+
"Rewriting {msgs_migrated} msgs to key-contacts took {:?}.",
1936+
time_elapsed(&start),
1937+
);
1938+
1939+
if first_key_contacts_msg_id > 0 {
1940+
let last_housekeeping = context.get_config_i64(Config::LastHousekeeping).await?;
1941+
let last_housekeeping = cmp::min(
1942+
last_housekeeping,
1943+
time().saturating_sub(constants::HOUSEKEEPING_PERIOD / 10 * 9),
18861944
);
1945+
context
1946+
.set_config_internal(
1947+
Config::LastHousekeeping,
1948+
Some(&last_housekeeping.to_string()),
1949+
)
1950+
.await?;
18871951
}
1888-
18891952
Ok(())
18901953
}
18911954

0 commit comments

Comments
 (0)