CardinalityKit Architecture
CardinalityKit provides a complete implementation of cardinality estimation algorithms with a focus on research and practical applications. The codebase follows consistent patterns and best practices distilled from analysis of the reference implementations.
🏗️ Project Structure
- Core Algorithms: Individual implementations of each algorithm
- Extended Versions: Demographic-aware variants
- Panel Integration: TV panel to sketch conversion
- Simulation Framework: Comprehensive testing and comparison
Core Implementation Patterns
# Standard algorithm implementation pattern from CardinalityKit
class AlgorithmSketch:
    """Template for all CardinalityKit sketches.

    Concrete algorithms (HLL, HyperReal, extended variants) follow this
    shape: fixed-size register array indexed by the first b_m hash bits,
    a double-SHA256 hash, and three public operations
    (update_sketch / get_sketch / get_cardinality_estimate).
    """

    def __init__(self, b_m, additional_params=None):
        """
        Initialize sketch with bucket bits and parameters
        Args:
            b_m: Number of bits for bucket indexing (2^b_m buckets)
            additional_params: Algorithm-specific parameters
        """
        self.b_m = b_m
        self.m = 2 ** b_m
        self.registers = self._initialize_registers()

    def _initialize_registers(self):
        """Initialize data structures based on algorithm type.

        Subclasses override; the initial value must be the identity of the
        bucket update operation.
        """
        # HLL: initialize to 0 (for max rank)
        # HR: initialize to 1.0 (for min value)
        pass

    def _hash_function(self, data):
        """
        Double hashing for increased entropy
        Standard pattern across all CardinalityKit implementations

        Returns a string of '0'/'1' characters derived from
        SHA256(SHA256(data)).
        """
        inner_hash = hashlib.sha256(str(data).encode('utf-8')).hexdigest()
        outer_hash = hashlib.sha256(str(inner_hash).encode('utf-8')).hexdigest()
        # Convert to 256-bit binary with zero padding
        # ('{:256b}' left-pads with spaces; replace turns them into '0's)
        hashed_ = '{:256b}'.format(int(outer_hash, 16)).replace(' ', '0')
        # Handle bit length constraints
        # NOTE(review): the guard checks b_m * 4 + off bits but the slice
        # keeps b_m * 3 - 1 characters (it also drops bit 0) — the two
        # don't obviously agree; confirm against the concrete
        # implementations before relying on this template.
        off = 32
        if (self.b_m) * 4 + off > 256:
            hashed = hashed_
        else:
            hashed = hashed_[1:(self.b_m) * 3]
        return hashed

    def update_sketch(self, data):
        """Process single data point - algorithm-specific implementation"""
        pass

    def get_sketch(self):
        """Return internal data structures"""
        return self.registers

    def get_cardinality_estimate(self):
        """Calculate and return cardinality estimate"""
        pass
HyperLogLog Implementation
import math
import sys
import hashlib
class HyperLogLogSketch:
    """HyperLogLog cardinality estimator (Flajolet et al., 2007).

    Keeps m = 2**b_m small integer registers; each register stores the
    maximum "rank" observed among elements hashed into its bucket.
    """

    def __init__(self, b_m):
        """
        Args:
            b_m: Number of bits used for bucket indexing (m = 2**b_m buckets).
        """
        self.b_m = b_m
        self.m = 2 ** b_m
        # 0 is the identity for the max-rank update (any observed rank >= 1).
        self.registers = [0] * self.m

    def _hash_function(self, data):
        """Double SHA-256 hash of data, as a zero-padded 256-char bit string."""
        inner_hash = hashlib.sha256(str(data).encode('utf-8')).hexdigest()
        outer_hash = hashlib.sha256(str(inner_hash).encode('utf-8')).hexdigest()
        # '0256b' zero-pads directly (equivalent to the old
        # '{:256b}' + replace(' ', '0') idiom, without the detour).
        return format(int(outer_hash, 16), '0256b')

    def update_sketch(self, data):
        """Add a single element to the sketch."""
        x = self._hash_function(data)
        # Bucket index: first b_m bits of the hash.
        j = int(x[:self.b_m], 2)
        # Rank: count of trailing zero bits in the remainder, plus one.
        # (Trailing zeros of a uniform bit string follow the same geometric
        # distribution as the leading zeros used in the paper, so the
        # estimator is unaffected.)
        remaining_bits = x[self.b_m:]
        w = 1
        for c in reversed(remaining_bits):
            if c == "0":
                w += 1
            else:
                break
        # Each bucket keeps the maximum rank ever seen.
        self.registers[j] = max(self.registers[j], w)

    def get_cardinality_estimate(self):
        """Return the bias-corrected HyperLogLog cardinality estimate."""
        # Harmonic mean of 2**register over all buckets.
        total = sum(2 ** (-1 * bucket) for bucket in self.registers)
        harmonic_mean = total ** -1
        estimate = (self.m ** 2) * harmonic_mean
        # Alpha constants from Flajolet et al. (2007): exact tabulated values
        # for m = 16, 32 and 64; the closed form applies only for m >= 128.
        if self.b_m <= 4:
            BIAS = 0.673
        elif self.b_m == 5:
            BIAS = 0.697
        elif self.b_m == 6:
            # FIX: this case was missing — the formula below is the
            # m >= 128 approximation, not the m = 64 constant.
            BIAS = 0.709
        else:
            BIAS = 0.7213 / (1 + (1.079 / self.m))
        estimate = BIAS * estimate
        # Small range correction: linear counting while empty buckets remain.
        if estimate < ((5 / 2) * self.m):
            zeros = sum(1 for bucket in self.registers if bucket == 0)
            if zeros != 0:
                estimate = self.m * math.log(self.m / zeros)
        # Large range correction for 32-bit hash space saturation.
        elif estimate > ((2 ** 32) / 30):
            estimate = -1 * (2 ** 32) * math.log(1 - (estimate / (2 ** 32)))
        return estimate

    def get_memory_usage(self):
        """Return shallow memory usage of the register list in bytes."""
        return sys.getsizeof(self.registers)
HyperReal Implementation
class HyperRealSketch:
    """Min-value sketch: each bucket tracks the smallest normalized hash seen.

    The estimator m**2 / sum(registers) needs no bias correction.
    """

    def __init__(self, b_m):
        self.b_m = b_m
        self.m = 2 ** b_m
        # 1.0 is the identity for a running minimum over values in [0, 1].
        self.registers = [1.0] * self.m

    def _hash_function(self, data):
        """Double SHA-256, returned as a zero-padded 256-char bit string."""
        digest = hashlib.sha256(str(data).encode('utf-8')).hexdigest()
        digest = hashlib.sha256(str(digest).encode('utf-8')).hexdigest()
        return '{:256b}'.format(int(digest, 16)).replace(' ', '0')

    def update_sketch(self, data):
        """Fold one element into the sketch."""
        bits = self._hash_function(data)
        # Bucket index comes from the leading b_m bits of the double hash.
        bucket = int(str(bits)[:self.b_m], 2)
        # The observation itself is an independent single hash of the same
        # element, normalized into [0, 1].
        raw = int(hashlib.sha256(str(data).encode()).hexdigest()[:8], 16)
        sample = raw / (16 ** 8 - 1)
        # Keep the per-bucket minimum.
        if sample < self.registers[bucket]:
            self.registers[bucket] = sample

    def get_cardinality_estimate(self):
        """Unbiased estimate: m squared over the sum of bucket minima."""
        return (self.m ** 2) / sum(self.registers)

    def get_memory_usage(self):
        """Shallow size of the register list, in bytes."""
        return sys.getsizeof(self.registers)
Extended Algorithm Implementation
class ExtendedHyperRealSketch:
    """Attribute-aware HyperReal sketch.

    Besides the per-bucket minimum hash value, each bucket remembers how
    many elements achieved that minimum (frequency_counts) and one sampled
    attribute for the minimizing element (attribute_samples); the attribute
    distribution over buckets is then scaled by the cardinality estimate.
    """

    def __init__(self, b_m, b_s):
        self.b_m = b_m  # Bucket bits
        self.b_s = b_s  # Attribute bits
        self.m = 2 ** b_m
        self.s = 2 ** b_s
        # Per-bucket state, pre-allocated.
        self.registers = [1.0] * self.m
        self.frequency_counts = [0] * self.m
        self.attribute_samples = [None] * self.m

    def _hash_function(self, data):
        """Double SHA-256 bit string, truncated for bucket+attribute bits."""
        first = hashlib.sha256(str(data).encode('utf-8')).hexdigest()
        second = hashlib.sha256(str(first).encode('utf-8')).hexdigest()
        bits = '{:256b}'.format(int(second, 16)).replace(' ', '0')
        # Keep the full string only when the requested bit budget would not
        # fit; otherwise truncate (preserves the original slicing exactly).
        off = 32
        if (self.b_m + self.b_s) * 4 + off > 256:
            return bits
        return bits[1:(self.b_m + self.b_s) * 3]

    def update_sketch(self, event):
        """
        Update sketch with user ID and attribute
        Args:
            event: dict with 'id_to_count' and 'attribute' keys
        """
        user_id = event['id_to_count']
        user_attr = event['attribute']
        bucket = int(str(self._hash_function(user_id))[:self.b_m], 2)
        # Normalized observation in [0, 1] from a single hash of the id.
        raw = int(hashlib.sha256(str(user_id).encode()).hexdigest()[:8], 16)
        value = raw / (16 ** 8 - 1)
        current = self.registers[bucket]
        if value < current:
            # New minimum: restart the tie count and resample the attribute.
            self.registers[bucket] = value
            self.frequency_counts[bucket] = 1
            self.attribute_samples[bucket] = user_attr
        elif value == current:
            # Exact tie (vanishingly rare with float hashes).
            self.frequency_counts[bucket] += 1
            self.attribute_samples[bucket] = user_attr

    def get_cardinality_estimate(self):
        """Total cardinality estimate (same formula as plain HyperReal)."""
        return (self.m ** 2) / sum(self.registers)

    def get_frequency_for_attr(self, attr=None):
        """Cardinality estimate per attribute, or for one attribute."""
        # Weight each sampled attribute by its bucket's tie count.
        distribution = {}
        for count, sample in zip(self.frequency_counts, self.attribute_samples):
            if sample:
                distribution[sample] = distribution.get(sample, 0) + count
        # Turn counts into proportions (skipped when nothing was sampled).
        weight = sum(distribution.values())
        if weight > 0:
            distribution = {key: val / weight for key, val in distribution.items()}
        # Scale proportions by the overall cardinality estimate.
        scale = self.get_cardinality_estimate()
        distribution = {key: val * scale for key, val in distribution.items()}
        if attr:
            return distribution.get(attr, 0)
        return distribution

    def get_sketch(self):
        """Return all internal data structures"""
        return self.registers, self.frequency_counts, self.attribute_samples
Best Practices from CardinalityKit
✅ Implementation Best Practices
- Double Hashing: Always use SHA256(SHA256(data)) for increased entropy
- Binary Conversion: Use 256-bit binary with proper zero padding
- Memory Pre-allocation: Initialize lists with known sizes ([0] * m, [1.0] * m)
- Error Handling: Use try/except for attribute processing with graceful degradation
- Progress Tracking: Use tqdm for long-running operations
- Memory Tracking: Return sys.getsizeof() for memory analysis
# Best practices implementation example
import hashlib
import sys
from tqdm import tqdm
class OptimizedSketch:
    """HyperReal-style min-value sketch following the best-practice checklist.

    Fix: ``add_batch`` previously called ``self.add``, which did not exist,
    so every element fell through to the error handler and
    ``elements_processed`` was never incremented. ``add`` is now implemented
    on top of the existing ``_consistent_hash``.
    """

    def __init__(self, precision=14):
        """
        Args:
            precision: number of bucket-index bits (4..20); m = 2**precision.

        Raises:
            ValueError: if precision is outside [4, 20].
        """
        # Validate parameters
        if precision < 4 or precision > 20:
            raise ValueError("Precision must be between 4 and 20")
        self.precision = precision
        self.m = 2 ** precision
        # Pre-allocate memory efficiently; 1.0 is the min-update identity.
        self.registers = [1.0] * self.m
        # Track statistics
        self.elements_processed = 0

    def _consistent_hash(self, element):
        """Deterministic double SHA-256 hex digest of `element`."""
        # Convert to string for consistent hashing
        element_str = str(element)
        # Double hashing for entropy
        inner = hashlib.sha256(element_str.encode('utf-8')).hexdigest()
        outer = hashlib.sha256(inner.encode('utf-8')).hexdigest()
        return outer

    def add(self, element):
        """Add a single element to the sketch (previously missing).

        Bucket index and observation are carved from disjoint parts of the
        same digest, so one hash call serves both.
        """
        digest = self._consistent_hash(element)
        bucket = int(digest[:8], 16) % self.m
        value = int(digest[8:16], 16) / (16 ** 8 - 1)
        if value < self.registers[bucket]:
            self.registers[bucket] = value
        self.elements_processed += 1

    def add_batch(self, elements):
        """Process elements in batch with progress tracking."""
        for element in tqdm(elements, desc="Processing elements"):
            try:
                self.add(element)
            except Exception as e:
                # Graceful error handling: skip the bad element, keep going.
                print(f"Error processing element {element}: {e}")
                continue

    def get_memory_usage(self):
        """Comprehensive memory usage analysis.

        Note: sys.getsizeof is shallow — it does not follow references.
        """
        return {
            'registers': sys.getsizeof(self.registers),
            'total_objects': sys.getsizeof(self),
            'elements_processed': self.elements_processed
        }
Performance Optimization
🚀 Speed Optimizations
- Pre-compile hash functions
- Use numpy arrays for large sketches
- Batch processing with vectorization
- Minimize string operations
- Cache frequently used calculations
💾 Memory Optimizations
- Use appropriate data types (int32 vs int64)
- Pre-allocate arrays with known sizes
- Avoid unnecessary object creation
- Use generators for large datasets
- Implement memory pooling for sketches
# Performance-optimized implementation
import numpy as np
from numba import jit
class HighPerformanceHyperReal:
    """NumPy-backed HyperReal sketch for high-throughput batch ingestion.

    Fixes vs. the previous version:
    - ``@jit(nopython=True)`` on a bound method is not supported by numba
      (``self`` has no nopython type), so the decorated update raised at
      call time; the method is now plain Python and the batch path is truly
      vectorized with ``np.minimum.at`` instead of a per-element loop.
    - Built-in ``hash()`` returns signed 64-bit values, so the old
      ``hashes / (2**32 - 1)`` produced values far outside [0, 1] (breaking
      the min-sketch invariant); hashes are now masked to 32 bits first.
    """

    def __init__(self, precision=14):
        self.precision = precision
        self.m = 2 ** precision
        # float32 halves the footprint of float64; values in [0, 1] need no
        # more precision than this.
        self.registers = np.ones(self.m, dtype=np.float32)
        # Reserved for memoized hashing (kept for interface compatibility).
        self._hash_cache = {}

    def _fast_bucket_update(self, bucket_idx, value):
        """Update one bucket with a candidate minimum."""
        if self.registers[bucket_idx] > value:
            self.registers[bucket_idx] = value

    def add_batch_vectorized(self, elements):
        """Vectorized batch processing.

        NOTE: built-in ``hash`` on strings is salted per process
        (PYTHONHASHSEED), so sketches are only comparable within one run;
        use a stable hash for persisted/merged sketches.
        """
        # Mask to 32 bits so the normalization below lands in [0, 1].
        hashes = np.array([hash(str(e)) & 0xFFFFFFFF for e in elements],
                          dtype=np.uint64)
        buckets = (hashes % self.m).astype(np.int64)
        values = (hashes / (2 ** 32 - 1)).astype(np.float32)
        # Unbuffered per-bucket minimum: correct even when the same bucket
        # index appears multiple times in one batch.
        np.minimum.at(self.registers, buckets, values)

    def get_cardinality_estimate(self):
        """Estimate: m**2 over the register sum, computed in NumPy."""
        return (self.m ** 2) / np.sum(self.registers)
Testing and Validation
# Comprehensive testing framework from CardinalityKit
import statistics
import pandas as pd
class SketchTester:
    """Accuracy and throughput harness for any sketch class exposing the
    CardinalityKit interface (constructor taking k, update_sketch,
    get_cardinality_estimate, get_memory_usage)."""

    def __init__(self, algorithm_class):
        self.algorithm_class = algorithm_class
        self.results = []

    def run_accuracy_test(self, dataset, true_cardinality, k_values=None):
        """Measure estimation accuracy for each k, averaging 8 offset runs."""
        for k in (k_values if k_values is not None else range(8, 17)):
            estimates = []
            for run in range(8):
                sketch = self.algorithm_class(k)
                # Suffixing the run index acts as a hash offset, so each
                # repetition effectively sees an independent hash function.
                for item in dataset:
                    sketch.update_sketch(f"{item}_{run}")
                estimates.append(sketch.get_cardinality_estimate())
            avg = statistics.mean(estimates)
            spread = statistics.pstdev(estimates)
            rel_error = (avg - true_cardinality) / true_cardinality
            self.results.append({
                'k': k,
                'buckets': 2 ** k,
                'true_cardinality': true_cardinality,
                'mean_estimate': avg,
                'std_estimate': spread,
                'error_percent': rel_error * 100,
                # memory of the last run's sketch (representative for fixed k)
                'memory_usage': sketch.get_memory_usage(),
            })

    def run_performance_test(self, dataset_sizes):
        """Measure ingestion throughput for each dataset size (fixed k=14)."""
        import time
        rows = []
        for size in dataset_sizes:
            items = [f"item_{i}" for i in range(size)]
            started = time.time()
            sketch = self.algorithm_class(14)  # Standard k=14
            for item in items:
                sketch.update_sketch(item)
            elapsed = time.time() - started
            rows.append({
                'dataset_size': size,
                'processing_time': elapsed,
                'items_per_second': size / elapsed,
                'memory_usage': sketch.get_memory_usage(),
            })
        return rows

    def generate_report(self):
        """Summarize the accumulated accuracy results."""
        df = pd.DataFrame(self.results)
        summary = {
            'best_accuracy_k': df.loc[df['error_percent'].abs().idxmin(), 'k'],
            'mean_error': df['error_percent'].mean(),
            'std_error': df['error_percent'].std(),
            'memory_range': f"{df['memory_usage'].min()} - {df['memory_usage'].max()} bytes",
        }
        return {'summary': summary, 'detailed_results': df}
# Usage example
# NOTE(review): generate_test_dataset is not defined in this file — it is
# assumed to come from the surrounding test utilities; confirm before running.
tester = SketchTester(HyperRealSketch)
dataset = generate_test_dataset(1000000)  # 1M unique items
tester.run_accuracy_test(dataset, 1000000)
report = tester.generate_report()
Common Pitfalls and Solutions
⚠️ Common Implementation Mistakes
- Hash Inconsistency: Using different hash functions across platforms
- Bit Extraction Errors: Incorrect string slicing for bucket indexing
- Bias Correction Bugs: Wrong formulas or missing range corrections
- Memory Leaks: Not properly managing large datasets
- Precision Loss: Using inappropriate data types for calculations
# Common pitfalls and their solutions
# NOTE: the "WRONG" snippets below are deliberately broken, and the loose
# statements use illustrative free variables (hash_binary, k, m,
# harmonic_mean) — this section is reference material, not runnable code.

# WRONG: Inconsistent hashing
def bad_hash(data):
    return hashlib.md5(str(data).encode()).hexdigest()  # Different algorithm

# CORRECT: Consistent double hashing
def good_hash(data):
    inner = hashlib.sha256(str(data).encode('utf-8')).hexdigest()
    outer = hashlib.sha256(inner.encode('utf-8')).hexdigest()
    # '{:256b}' space-pads to 256 chars; replace turns the pad into zeros
    return '{:256b}'.format(int(outer, 16)).replace(' ', '0')

# WRONG: Incorrect bit extraction
bucket = int(hash_binary[0:k])  # Missing base conversion

# CORRECT: Proper bit extraction
bucket = int(hash_binary[:k], 2)  # Convert binary string to int

# WRONG: Missing bias correction
estimate = (m ** 2) * harmonic_mean  # Raw estimate only

# CORRECT: Proper bias correction
BIAS = 0.7213 / (1 + (1.079 / m)) if k >= 6 else 0.673
estimate = BIAS * (m ** 2) * harmonic_mean

# WRONG: Memory inefficient
registers = []
for i in range(m):
    registers.append(0)  # Slow list building

# CORRECT: Pre-allocated memory
registers = [0] * m  # Fast pre-allocation
Production Deployment Considerations
| Aspect | Recommendation | Implementation |
|---|---|---|
| Precision | k=14 for most applications | 64KB memory, <1% error |
| Algorithm Choice | HyperReal for new projects | Better accuracy, simpler implementation |
| Serialization | Protocol Buffers or MessagePack | Efficient sketch storage and transmission |
| Monitoring | Track accuracy and performance | Log estimates vs ground truth when available |
| Scaling | Distributed sketch merging | Map-reduce pattern for large datasets |