Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
trunk?? (The current trunk is on 5.1)
* Support ZSTD dictionary compression (CASSANDRA-17021)

5.1
* Update snakeyaml to 2.4 (CASSANDRA-20928)
* Update Netty to 4.1.125.Final (CASSANDRA-20925)
Expand Down
48 changes: 48 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,54 @@ counter_cache_save_period: 7200s
# Disabled by default, meaning all keys are going to be saved
# counter_cache_keys_to_save: 100

# Settings for ZSTD dictionary-based compression.
# They control the automatic training and caching of compression dictionaries
# for tables that use ZSTD dictionary compression.

# How often to refresh compression dictionaries across the cluster.
# During refresh, nodes will check for newer dictionary versions and update their caches.
# Min unit: s
compression_dictionary_refresh_interval: 3600s

# Initial delay before starting the first dictionary refresh cycle after node startup.
# This prevents all nodes from refreshing simultaneously when the cluster starts.
# Min unit: s
compression_dictionary_refresh_initial_delay: 10s

# Maximum number of compression dictionaries to cache per table.
# Each table using dictionary compression can have multiple dictionaries cached
# (current version plus recently used versions for reading older SSTables).
compression_dictionary_cache_size: 10

# How long to keep compression dictionaries in the cache before they expire.
# Expired dictionaries will be removed from memory but can be reloaded if needed.
# Min unit: s
compression_dictionary_cache_expire: 3600s

# Dictionary training configuration (advanced settings)
# These settings control how compression dictionaries are trained from sample data.

# Maximum size of a trained compression dictionary in bytes.
# Larger dictionaries may provide better compression but use more memory.
# Specified as a plain integer number of bytes (no unit suffix).
compression_dictionary_training_max_dictionary_size: 65536

# Maximum total size of sample data to collect for dictionary training.
# More sample data generally produces better dictionaries but takes longer to train.
# The recommended sample size is 100x the dictionary size.
# Specified as a plain integer number of bytes (no unit suffix).
compression_dictionary_training_max_total_sample_size: 10485760

# Enable automatic dictionary training based on sampling of write operations.
# When enabled, the system will automatically collect samples and train new dictionaries.
# Manual training via nodetool is always available regardless of this setting.
compression_dictionary_training_auto_train_enabled: false

# Sampling rate for automatic dictionary training (1-10000).
# Value of 100 means 1% of writes are sampled. Lower values reduce overhead but may
# result in less representative sample data for dictionary training.
compression_dictionary_training_sampling_rate: 100

# saved caches
# If not set, the default directory is $CASSANDRA_HOME/data/saved_caches.
# saved_caches_directory: /var/lib/cassandra/saved_caches
Expand Down
11 changes: 11 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,17 @@ public static class SSTableConfig
public volatile DurationSpec.IntSecondsBound counter_cache_save_period = new DurationSpec.IntSecondsBound("7200s");
public volatile int counter_cache_keys_to_save = Integer.MAX_VALUE;

// Compression dictionary refresh and cache settings
public volatile DurationSpec.IntSecondsBound compression_dictionary_refresh_interval = new DurationSpec.IntSecondsBound("3600s"); // 1 hour - TODO: re-assess whether daily (86400s) is more appropriate
public volatile DurationSpec.IntSecondsBound compression_dictionary_refresh_initial_delay = new DurationSpec.IntSecondsBound("10s"); // 10 seconds default
public volatile int compression_dictionary_cache_size = 10; // max dictionaries cached per table
public volatile DurationSpec.IntSecondsBound compression_dictionary_cache_expire = new DurationSpec.IntSecondsBound("3600s"); // 1 hour default

// Dictionary training settings
public volatile int compression_dictionary_training_max_dictionary_size = 65536; // 64 KiB, expressed in bytes
public volatile int compression_dictionary_training_max_total_sample_size = 10485760; // 10 MiB total, expressed in bytes
public volatile boolean compression_dictionary_training_auto_train_enabled = false; // automatic training is off by default
public volatile int compression_dictionary_training_sampling_rate = 100; // default samples 1%; kept as an int since random.nextInt is generally faster than random.nextDouble

public DataStorageSpec.LongMebibytesBound paxos_cache_size = null;

public DataStorageSpec.LongMebibytesBound consensus_migration_cache_size = null;
Expand Down
41 changes: 41 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4361,6 +4361,47 @@ public static void setCounterCacheKeysToSave(int counterCacheKeysToSave)
conf.counter_cache_keys_to_save = counterCacheKeysToSave;
}

/**
 * Reads the {@code compression_dictionary_refresh_interval} setting.
 *
 * @return the configured refresh interval converted to whole seconds
 */
public static int getCompressionDictionaryRefreshIntervalSeconds()
{
    final int refreshIntervalSeconds = conf.compression_dictionary_refresh_interval.toSeconds();
    return refreshIntervalSeconds;
}

/**
 * Reads the {@code compression_dictionary_refresh_initial_delay} setting.
 *
 * @return the configured initial refresh delay converted to whole seconds
 */
public static int getCompressionDictionaryRefreshInitialDelaySeconds()
{
    final int initialDelaySeconds = conf.compression_dictionary_refresh_initial_delay.toSeconds();
    return initialDelaySeconds;
}

/**
 * Reads the {@code compression_dictionary_cache_size} setting.
 *
 * @return the configured dictionary cache size, exactly as stored in the config
 */
public static int getCompressionDictionaryCacheSize()
{
    final int cacheSize = conf.compression_dictionary_cache_size;
    return cacheSize;
}

/**
 * Reads the {@code compression_dictionary_cache_expire} setting.
 *
 * @return the configured cache expiry converted to whole seconds
 */
public static int getCompressionDictionaryCacheExpireSeconds()
{
    final int cacheExpireSeconds = conf.compression_dictionary_cache_expire.toSeconds();
    return cacheExpireSeconds;
}

/**
 * Reads the {@code compression_dictionary_training_max_dictionary_size} setting.
 *
 * @return the configured maximum trained-dictionary size, exactly as stored in the config
 */
public static int getCompressionDictionaryTrainingMaxDictionarySize()
{
    final int maxDictionarySize = conf.compression_dictionary_training_max_dictionary_size;
    return maxDictionarySize;
}

/**
 * Reads the {@code compression_dictionary_training_max_total_sample_size} setting.
 *
 * @return the configured maximum total sample size, exactly as stored in the config
 */
public static int getCompressionDictionaryTrainingMaxTotalSampleSize()
{
    final int maxTotalSampleSize = conf.compression_dictionary_training_max_total_sample_size;
    return maxTotalSampleSize;
}

/**
 * Reads the {@code compression_dictionary_training_auto_train_enabled} flag.
 *
 * @return {@code true} when the flag is set in the config, {@code false} otherwise
 */
public static boolean getCompressionDictionaryTrainingAutoTrainEnabled()
{
    final boolean autoTrainEnabled = conf.compression_dictionary_training_auto_train_enabled;
    return autoTrainEnabled;
}


/**
 * Reads the {@code compression_dictionary_training_sampling_rate} setting.
 *
 * @return the configured sampling rate, exactly as stored in the config
 */
public static int getCompressionDictionaryTrainingSamplingRate()
{
    final int samplingRate = conf.compression_dictionary_training_sampling_rate;
    return samplingRate;
}

public static int getStreamingKeepAlivePeriod()
{
return conf.streaming_keep_alive_period.toSeconds();
Expand Down
12 changes: 12 additions & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.CompactionStrategyManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.compression.CompressionDictionaryManager;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
Expand Down Expand Up @@ -320,6 +321,7 @@ public enum FlushReason
public final TopPartitionTracker topPartitions;

private final SSTableImporter sstableImporter;
private final CompressionDictionaryManager compressionDictionaryManager;

private volatile boolean compactionSpaceCheck = true;

Expand Down Expand Up @@ -390,6 +392,7 @@ public void reload(TableMetadata tableMetadata)
cfs.crcCheckChance = new DefaultValue<>(tableMetadata.params.crcCheckChance);

compactionStrategyManager.maybeReloadParamsFromSchema(tableMetadata.params.compaction);
compressionDictionaryManager.maybeReloadFromSchema(tableMetadata.params.compression);

indexManager.reload(tableMetadata);

Expand Down Expand Up @@ -576,6 +579,7 @@ public ColumnFamilyStore(Keyspace keyspace,
streamManager = new CassandraStreamManager(this);
repairManager = new CassandraTableRepairManager(this);
sstableImporter = new SSTableImporter(this);
compressionDictionaryManager = new CompressionDictionaryManager(this, registerBookeeping);

if (DatabaseDescriptor.isClientOrToolInitialized() || SchemaConstants.isSystemKeyspace(getKeyspaceName()))
topPartitions = null;
Expand Down Expand Up @@ -733,6 +737,8 @@ public void invalidate(boolean expectMBean, boolean dropData)
invalidateCaches();
if (topPartitions != null)
topPartitions.close();

compressionDictionaryManager.close();
}

/**
Expand Down Expand Up @@ -3420,6 +3426,12 @@ public TableMetrics getMetrics()
return metric;
}

/**
 * Exposes the compression dictionary manager held by this store.
 *
 * @return the instance stored in this object's {@code compressionDictionaryManager} field
 */
@Override
public CompressionDictionaryManager compressionDictionaryManager()
{
    return this.compressionDictionaryManager;
}

public TableId getTableId()
{
return metadata().id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1796,6 +1796,7 @@ public static SSTableWriter createWriter(ColumnFamilyStore cfs,
.setSerializationHeader(sstable.header)
.addDefaultComponents(cfs.indexManager.listIndexGroups())
.setSecondaryIndexGroups(cfs.indexManager.listIndexGroups())
.setCompressionDictionaryManager(cfs.compressionDictionaryManager())
.build(txn, cfs);
}

Expand Down Expand Up @@ -1836,6 +1837,7 @@ public static SSTableWriter createWriterForAntiCompaction(ColumnFamilyStore cfs,
.setSerializationHeader(SerializationHeader.make(cfs.metadata(), sstables))
.addDefaultComponents(cfs.indexManager.listIndexGroups())
.setSecondaryIndexGroups(cfs.indexManager.listIndexGroups())
.setCompressionDictionaryManager(cfs.compressionDictionaryManager())
.build(txn, cfs);
}

Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/db/compaction/Upgrader.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ private SSTableWriter createCompactionWriter(StatsMetadata metadata)
.setSerializationHeader(SerializationHeader.make(cfs.metadata(), Sets.newHashSet(sstable)))
.addDefaultComponents(cfs.indexManager.listIndexGroups())
.setSecondaryIndexGroups(cfs.indexManager.listIndexGroups())
.setCompressionDictionaryManager(cfs.compressionDictionaryManager())
.build(transaction, cfs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ private SSTableWriter createWriter(Descriptor descriptor)
.setSerializationHeader(header)
.addDefaultComponents(indexGroups)
.setSecondaryIndexGroups(indexGroups)
.setCompressionDictionaryManager(cfs.compressionDictionaryManager())
.build(txn, cfs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ protected long getExpectedWriteSize()
.setRepairedAt(minRepairedAt)
.setPendingRepair(pendingRepair)
.setSecondaryIndexGroups(cfs.indexManager.listIndexGroups())
.addDefaultComponents(cfs.indexManager.listIndexGroups());
.addDefaultComponents(cfs.indexManager.listIndexGroups())
.setCompressionDictionaryManager(cfs.compressionDictionaryManager());
}
}
Loading