
Table of Contents
1. Introduction
1.1. What Is a Bloom Filter?
1.2. Visual Example
1.3. Equations
2. Code
2.1. sbt
2.2. Scala code
2.3. Build and run
3. Results
4. Summary
You may have already heard of this term. A Bloom filter is a data structure used to test whether an element is a member of a set, with a small risk of false positives but no false negatives. That makes it an excellent candidate for a WHERE/filter clause in SQL to limit the input before an expensive operation like a join. Let me explain in visual form how it works.
1.2. Visual Example
Here you can see an example of populating and querying a Bloom filter. The bit array size is 16 and you use 3 hash functions. The implementation of these functions can be anything; what matters is that their output is the position of a bit in the array.
At the beginning you have an empty bit array:
Bit Array (size = 16):
Index: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
Bits: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
Then you encode “toughcoding” into the array by computing the values of 3 different hash functions:
"toughcoding" → h1=2, h2=6, h3=12
Bit Array:
Index: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
Bits: 0 0 1 0 0 0 1 0 0 0 0 0 1 0 0 0
^ ^ ^
h1=2 h2=6 h3=12
To look up “toughcoding”, you execute the hash functions again and check whether the expected positions in the array hold a 1. If yes, it means this string may already have been encoded into the array in the past.
"toughcoding" → h1=2, h2=6, h3=12
Check bits at 2, 6, 12 → [1, 1, 1] → all 3 positions = 1 → Possibly Present ✅
If you check “medium”:
"medium" → h1=2, h2=5, h3=12
Check bits at 2, 5, 12 → [1, 0, 1] → At least one 0 → Definitely NOT Present ❌
- A query passes only if all corresponding bits are 1.
- False positives happen when an unrelated value happens to hit positions that are all set to 1 by coincidence. For example:
"false_positive" → h1=6, h2=12, h3=2
Check bits at 2, 6, 12 → [1, 1, 1] → reported as Possibly Present although it was never added → false positive ❌
A bit array used as a filter allows hash collisions by its nature: multiple hash functions operating on different inputs can produce the same bit position as output. Different words can therefore be encoded the same way in the bit array, and because you do not store the original words anywhere, you cannot later distinguish whether a value was really “added” (encoded) into the array or not, hence the false positive effect.
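The walkthrough above can be condensed into a runnable sketch, written here in Python for brevity. The three salted MD5-based hash functions are stand-ins of my own choosing; any functions that map a string to a bit position would do.

```python
import hashlib

M = 16          # bit array size, as in the example above
bits = [0] * M  # empty bit array

def positions(word):
    # Three illustrative hash functions: the same digest with different salts.
    return [int(hashlib.md5((salt + word).encode()).hexdigest(), 16) % M
            for salt in ("h1:", "h2:", "h3:")]

def add(word):
    # Encode the word by setting all three bit positions to 1.
    for p in positions(word):
        bits[p] = 1

def might_contain(word):
    # All bits set -> "possibly present"; any 0 -> "definitely not present".
    return all(bits[p] == 1 for p in positions(word))

add("toughcoding")
print(might_contain("toughcoding"))  # True -- an added word is always reported present
print(might_contain("medium"))       # a False proves absence; a True would be a false positive
```

Note that `might_contain` can never return `False` for a word that was added, which is the no-false-negatives guarantee that makes the filter safe to use as a pre-join filter.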
1.3. Equations
The false positive rate is approximately given by
\( P = \left(1 - \left(1 - \frac{1}{m}\right)^{kn} \right)^k \)
Where:
– \( P \): Probability of false positive
– \( m \): Number of bits in the Bloom filter
– \( k \): Number of hash functions
– \( n \): Number of inserted elements
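The formula is straightforward to evaluate. As a sanity check, here it is applied to the 16-bit, 3-hash-function filter from the visual example with a single inserted item (the helper name `false_positive_rate` is mine):

```python
def false_positive_rate(m, k, n):
    # P = (1 - (1 - 1/m)^(k*n))^k
    return (1 - (1 - 1 / m) ** (k * n)) ** k

# The 16-bit, 3-hash-function filter from the visual example, one item inserted:
print(round(false_positive_rate(m=16, k=3, n=1), 4))  # 0.0055 -> ~0.55% false positive chance
```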
To minimize the false positive probability in a Bloom filter:
– The optimal size of the bit array is
\(m = -\frac{n \cdot \ln p}{(\ln 2)^2}\)
– The optimal number of hash functions is
\(k = \frac{m}{n} \cdot \ln 2\)
Where:
– \( n \): Number of inserted elements
– \( p \): Desired false positive probability
– \( m \): Number of bits in the Bloom filter
– \( k \): Number of hash functions
For example, suppose you want to store 5,000 items with a 0.1% false positive rate.
Given:
– \( n = 5000 \)
– \( p = 0.001 \)
Step 1: Calculate optimal bit array size \( m \)
\(
m = -\frac{n \cdot \ln p}{(\ln 2)^2}
= -\frac{5000 \cdot \ln(0.001)}{(\ln 2)^2}
= \frac{5000 \cdot 6.9078}{0.4805}
\approx 71888 \text{ bits}
\)
Step 2: Calculate optimal number of hash functions \( k \)
\(
k = \frac{m}{n} \cdot \ln 2
= \frac{71888}{5000} \cdot 0.6931
\approx 14.38 \cdot 0.6931
\approx 9.97 \approx 10 \text{ hash functions}
\)
A Bloom filter with ~71,900 bits (about 8.8 KB) and 10 hash functions is needed to store 5,000 items with a 0.1% false positive rate.
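Since it is easy to slip on this arithmetic, here is a quick check of both formulas, sketched in Python (the helper name `bloom_sizing` is my own):

```python
import math

def bloom_sizing(n, p):
    # m = -n * ln(p) / (ln 2)^2 ;  k = (m / n) * ln 2
    m = -n * math.log(p) / (math.log(2) ** 2)
    k = (m / n) * math.log(2)
    return m, k

m, k = bloom_sizing(n=5000, p=0.001)
print(round(m))  # 71888 bits
print(round(k))  # 10 hash functions
```

Substituting the first formula into the second shows that \( k = -\log_2 p \), so the optimal number of hash functions depends only on the desired false positive rate (here \( -\log_2 0.001 \approx 9.97 \)).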
2. Code
You can copy-paste the commands below to create a small Scala project that executes 2 joins, with and without Bloom filtering. The Java Development Kit (JDK) version is 11.
2.1. sbt
cat <<'EOF' > build.sbt
ThisBuild / scalaVersion := "2.13.12"
name := "SparkBloomFilterDemo"
version := "0.1"
resolvers += "Spark Packages Repo" at "https://repos.spark-packages.org"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.3.0",
  "org.apache.spark" %% "spark-sql" % "3.3.0",
  "org.apache.hadoop" % "hadoop-client" % "3.3.4"
)
EOF
2.2. Scala code
mkdir -p src/main/scala
cat <<'EOF' > src/main/scala/BloomFilterJoin.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.util.sketch.BloomFilter
object BloomFilterJoin {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Bloom Filter Join")
      .master("local[*]")
      .config("spark.driver.memory", "8g")
      .config("spark.executor.memory", "8g")
      .config("spark.sql.autoBroadcastJoinThreshold", -1)
      .getOrCreate()

    import spark.implicits._

    val numCustomers = 200000
    val numLogs = 1000000

    // Smaller table: 200K customer IDs, cached in memory.
    val customersDF = spark.range(1, numCustomers + 1)
      .withColumnRenamed("id", "customer_id")
      .cache()

    // Larger table: 1M log rows with random customer IDs.
    val logsDF = spark.range(1, numLogs + 1)
      .withColumn("customer_id", (rand() * numCustomers).cast("long") + 1)
      .drop("id")

    // Join 1: plain sort-merge join, no Bloom filter.
    val start1 = System.nanoTime()
    val joinedDF1 = logsDF.join(customersDF, "customer_id")
    val count1 = joinedDF1.count()
    val duration1 = (System.nanoTime() - start1) / 1e9d
    println(f"⏱️ Join WITHOUT Bloom filter: $duration1%.2f seconds, Count: $count1")
    joinedDF1.explain()

    // Build a Bloom filter over the customer IDs.
    val expectedNumItems = customersDF.count()
    val fpp = 0.01
    val bloom = customersDF
      .select("customer_id")
      .rdd
      .map(_.getLong(0))
      .aggregate(BloomFilter.create(expectedNumItems, fpp))(
        (bf, id) => { bf.put(id); bf },
        (bf1, bf2) => bf1.mergeInPlace(bf2)
      )

    // Join 2: pre-filter the logs with the broadcast Bloom filter, then join.
    val broadcastBF = spark.sparkContext.broadcast(bloom)
    val mightContain = udf((id: Long) => broadcastBF.value.mightContain(id))

    val start2 = System.nanoTime()
    val filteredLogsDF = logsDF.filter(mightContain($"customer_id"))
    val joinedDF2 = filteredLogsDF.join(customersDF, "customer_id")
    val count2 = joinedDF2.count()
    val duration2 = (System.nanoTime() - start2) / 1e9d
    println(f"⏱️ Join WITH Bloom filter: $duration2%.2f seconds, Count: $count2")
    joinedDF2.explain()

    val gain = duration1 - duration2
    val percentGain = (gain / duration1) * 100
    println(f"✅ Bloom filter saved $gain%.2f seconds (~$percentGain%.2f%% faster)")

    spark.stop()
    System.exit(0)
  }
}
EOF
2.3. Build and run
sbt clean compile run
2.3.1. Dockerize
You can run it with Docker if your Scala/sbt/Java environment is not ready.
docker run --rm -it \
-v "$(pwd)":/app \
-w /app \
sbtscala/scala-sbt:eclipse-temurin-11.0.17_8_1.8.2_2.13.10 \
sbt run
If you are a Windows user running through Git Bash, you have to add winpty and adjust the paths:
winpty docker run --rm -it \
-v //c/tmp/bloom_example_or_your_path/.://app \
-w //app \
sbtscala/scala-sbt:eclipse-temurin-11.0.17_8_1.8.2_2.13.10 \
sbt run
3. Results
3.1. Join Without Bloom Filter
In the first scenario, the join operation is performed without a Bloom filter:
- Execution Time: 2.35 seconds
- Row Count: 1,000,000
- Left Side (Larger Table – 1M rows):
  - Generates 1 million random customer IDs with a rand function.
  - Ensures all IDs are not null.
  - Projected, filtered, sorted, and repartitioned.
- Right Side (Smaller Table – 200K rows):
  - Precomputed and stored in memory (InMemoryRelation).
  - Range from 1 to 200,000 mapped to customer_id.
  - Sorted and hash-partitioned to match the join strategy.
- Join Strategy:
  - Uses a Sort-Merge Join after both datasets are sorted and partitioned.
  - No Bloom filter optimization is applied.
Physical Plan:
⏱️ Join WITHOUT Bloom filter: 2.35 seconds, Count: 1000000
AdaptiveSparkPlan (isFinalPlan = false)
└── Project [customer_id]
└── SortMergeJoin [customer_id], [customer_id], Inner
├── Sort [customer_id ASC]
│ └── Exchange (hashpartitioning(customer_id, 200))
│ └── Filter isnotnull(customer_id)
│ └── Project [(cast(rand * 200000) + 1) AS customer_id]
│ └── Range (1 to 1,000,000)
└── Sort [customer_id ASC]
└── Exchange (hashpartitioning(customer_id, 200))
└── InMemoryTableScan [customer_id]
└── InMemoryRelation [customer_id]
└── Project [id AS customer_id]
└── Range (1 to 200,000)
- SortMergeJoin: The join is executed using a SortMergeJoin on customer_id.
- Exchange: Data is exchanged between partitions using hash partitioning on customer_id.
- Filter: A filter is applied to ensure that customer_id is not null.
- Project: A projection is applied to generate the customer_id column.
- Range: A range operation generates a sequence of numbers to simulate data.
3.2. Join With Bloom Filter
In the second scenario, the join operation is performed with a Bloom filter:
- Execution Time: 0.93 seconds
- Row Count: 1,000,000
Physical Plan:
⏱️ Join WITH Bloom filter: 0.93 seconds, Count: 1000000
AdaptiveSparkPlan (isFinalPlan = false)
└── Project [customer_id]
└── SortMergeJoin [customer_id], [customer_id], Inner
├── Sort [customer_id ASC]
│ └── Exchange (hashpartitioning(customer_id, 200))
│ └── Filter isnotnull(customer_id) AND BloomFilterUDF(customer_id)
│ └── Project [(cast(rand * 200000) + 1) AS customer_id]
│ └── Range (1 to 1,000,000)
└── Sort [customer_id ASC]
└── Exchange (hashpartitioning(customer_id, 200))
└── Filter BloomFilterUDF(customer_id)
└── InMemoryTableScan [customer_id], [BloomFilterUDF(customer_id)]
└── InMemoryRelation [customer_id]
└── Project [id AS customer_id]
└── Range (1 to 200,000)
✅ Bloom filter saved 1.42 seconds (~60.51% faster)
- SortMergeJoin: The join is executed using a SortMergeJoin on customer_id.
- Exchange: Data is exchanged between partitions using hash partitioning on customer_id.
- Filter: A filter is applied to ensure that customer_id is not null and that it passes the Bloom filter check (via a user-defined function).
- Project: A projection is applied to generate the customer_id column.
- Range: A range operation generates a sequence of numbers to simulate data.
3.3. How the Bloom Filter Enhances Performance
Key Differences between plans:
Feature | Without Bloom Filter | With Bloom Filter |
---|---|---|
Join Type | Sort-Merge Join | Sort-Merge Join |
Optimization | None | Bloom filter on both sides |
Filter on Left Side | isnotnull(customer_id) | isnotnull(customer_id) + BloomFilterUDF |
Filter on Right Side | None | BloomFilterUDF |
Execution Time | 2.35 seconds | 0.93 seconds ✅ |
Speed-up | – | ~60.5% faster |
Performance Comparison
Aspect | Without Bloom Filter | With Bloom Filter |
---|---|---|
Execution Time | 2.35 seconds | 0.93 seconds |
Performance Gain | — | ~60.51% faster |
Data Shuffling | Higher | Reduced |
CPU Utilization | Higher | Lower |
Shuffle Bytes Written | Higher | Lower |
- The Bloom filter is applied as a UDF to both sides of the join.
- It filters out non-matching customer_id values _before_ the sort and shuffle, significantly reducing I/O and CPU cost.
- This results in fewer rows being sorted and joined.
By applying a Bloom filter, Spark can pre-filter data, reducing the amount of data shuffled across the network. This leads to:
- Reduced Shuffle Bytes: Less data is transferred between nodes.
- Lower CPU Usage: Less computation is needed to process the data.
- Faster Execution: Overall query execution time is reduced.
This optimization is particularly beneficial in scenarios where one side of the join is significantly smaller than the other, allowing the Bloom filter to effectively eliminate non-matching rows early in the process.
4. Summary
In this knowledge article you have learned the principle behind Bloom filters: how they work in theory and how to apply them in a real scenario using Scala and Spark. From now on you can keep using them to optimize execution wherever you can implement them.
Happy coding!