Implementation Guide

Complete Python Implementations with Best Practices

CardinalityKit Documentation

CardinalityKit Architecture

CardinalityKit provides complete implementations of cardinality estimation algorithms with a focus on both research and practical applications. The codebase follows consistent patterns and best practices distilled from the comparative analysis of these algorithms.

🏗️ Project Structure

  • Core Algorithms: Individual implementations of each algorithm
  • Extended Versions: Demographic-aware variants
  • Panel Integration: TV panel to sketch conversion
  • Simulation Framework: Comprehensive testing and comparison

Core Implementation Patterns

```python
# Standard algorithm implementation pattern from CardinalityKit
import hashlib


class AlgorithmSketch:
    def __init__(self, b_m, additional_params=None):
        """
        Initialize sketch with bucket bits and parameters

        Args:
            b_m: Number of bits for bucket indexing (2^b_m buckets)
            additional_params: Algorithm-specific parameters
        """
        self.b_m = b_m
        self.m = 2 ** b_m
        self.registers = self._initialize_registers()

    def _initialize_registers(self):
        """Initialize data structures based on algorithm type"""
        # HLL: initialize to 0 (for max rank)
        # HR: initialize to 1.0 (for min value)
        pass

    def _hash_function(self, data):
        """
        Double hashing for increased entropy
        Standard pattern across all CardinalityKit implementations
        """
        inner_hash = hashlib.sha256(str(data).encode('utf-8')).hexdigest()
        outer_hash = hashlib.sha256(str(inner_hash).encode('utf-8')).hexdigest()
        # Convert to 256-bit binary with zero padding
        hashed_ = '{:256b}'.format(int(outer_hash, 16)).replace(' ', '0')
        # Handle bit length constraints
        off = 32
        if self.b_m * 4 + off > 256:
            hashed = hashed_
        else:
            hashed = hashed_[1:self.b_m * 3]
        return hashed

    def update_sketch(self, data):
        """Process single data point - algorithm-specific implementation"""
        pass

    def get_sketch(self):
        """Return internal data structures"""
        return self.registers

    def get_cardinality_estimate(self):
        """Calculate and return cardinality estimate"""
        pass
```
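Before wiring this pattern into a concrete algorithm, it helps to confirm what the double-hash step actually produces. The standalone snippet below (an illustrative check, not part of CardinalityKit) verifies that the SHA256(SHA256(·)) conversion yields a deterministic, zero-padded 256-character bit string:

```python
import hashlib

def double_hash_bits(data):
    # Double SHA-256, then format as a zero-padded 256-character bit string
    inner = hashlib.sha256(str(data).encode('utf-8')).hexdigest()
    outer = hashlib.sha256(inner.encode('utf-8')).hexdigest()
    return '{:256b}'.format(int(outer, 16)).replace(' ', '0')

bits = double_hash_bits("user_42")
print(len(bits))                # 256
print(set(bits) <= {'0', '1'})  # True: only binary digits after padding
```

Because `'{:256b}'.format` pads with spaces on the left, the `.replace(' ', '0')` step is what guarantees a full-width binary string regardless of the hash value's leading zeros.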

HyperLogLog Implementation

```python
import math
import sys
import hashlib


class HyperLogLogSketch:
    def __init__(self, b_m):
        self.b_m = b_m
        self.m = 2 ** b_m
        self.registers = [0] * self.m

    def _hash_function(self, data):
        """Double hashing for increased entropy"""
        inner_hash = hashlib.sha256(str(data).encode('utf-8')).hexdigest()
        outer_hash = hashlib.sha256(str(inner_hash).encode('utf-8')).hexdigest()
        hashed_ = '{:256b}'.format(int(outer_hash, 16)).replace(' ', '0')
        return hashed_

    def update_sketch(self, data):
        """Add element to HyperLogLog sketch"""
        x = self._hash_function(data)
        # Extract bucket index (first b_m bits)
        j = int(str(x)[:self.b_m], 2)
        # Calculate rank from remaining bits (count of trailing zeros + 1)
        remaining_bits = str(x)[self.b_m:]
        w = 1
        for c in reversed(remaining_bits):
            if c == "0":
                w += 1
            else:
                break
        # Update bucket with maximum rank
        self.registers[j] = max(self.registers[j], w)

    def get_cardinality_estimate(self):
        """Calculate HyperLogLog cardinality estimate"""
        # Harmonic mean calculation
        total = sum(2 ** (-1 * bucket) for bucket in self.registers)
        harmonic_mean = total ** -1
        # Raw estimate
        estimate = (self.m ** 2) * harmonic_mean
        # Bias correction (standard alpha constants)
        if self.b_m <= 4:
            BIAS = 0.673
        elif self.b_m == 5:
            BIAS = 0.697
        elif self.b_m == 6:
            BIAS = 0.709
        else:
            BIAS = 0.7213 / (1 + (1.079 / self.m))
        estimate = BIAS * estimate
        # Small range correction
        if estimate < ((5 / 2) * self.m):
            zeros = sum(1 for bucket in self.registers if bucket == 0)
            if zeros != 0:
                estimate = self.m * math.log(self.m / zeros)
        # Large range correction
        elif estimate > ((2 ** 32) / 30):
            estimate = -1 * (2 ** 32) * math.log(1 - (estimate / (2 ** 32)))
        return estimate

    def get_memory_usage(self):
        """Return memory usage in bytes"""
        return sys.getsizeof(self.registers)
```
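As a sanity check, the estimator math can be exercised without the class. The simulation below (an illustrative stand-in, not CardinalityKit code: it uses integer bit operations and a single SHA-256 with a 64-bit rank window rather than the string-based double hashing above) should land within a few percent of the true cardinality:

```python
import hashlib

b, m, n = 12, 2 ** 12, 100_000
registers = [0] * m

for i in range(n):
    h = int(hashlib.sha256(f"user_{i}".encode()).hexdigest(), 16)
    j = h % m                           # bucket index from the low b bits
    rest = (h >> b) & ((1 << 64) - 1)   # 64 independent bits for the rank
    rank = 64 - rest.bit_length() + 1   # leading zeros + 1
    registers[j] = max(registers[j], rank)

alpha = 0.7213 / (1 + 1.079 / m)        # bias correction for large m
estimate = alpha * m ** 2 / sum(2.0 ** -r for r in registers)
print(f"true={n}, estimate={estimate:.0f}, error={abs(estimate - n) / n:.2%}")
```

With m = 4096 buckets the expected relative error is roughly 1.04/√m ≈ 1.6%, so the printed error should be small.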

HyperReal Implementation

```python
import sys
import hashlib


class HyperRealSketch:
    def __init__(self, b_m):
        self.b_m = b_m
        self.m = 2 ** b_m
        self.registers = [1.0] * self.m  # Initialize to 1.0 for minimum values

    def _hash_function(self, data):
        """Same double hashing as HLL"""
        inner_hash = hashlib.sha256(str(data).encode('utf-8')).hexdigest()
        outer_hash = hashlib.sha256(str(inner_hash).encode('utf-8')).hexdigest()
        hashed_ = '{:256b}'.format(int(outer_hash, 16)).replace(' ', '0')
        return hashed_

    def update_sketch(self, data):
        """Add element to HyperReal sketch"""
        x = self._hash_function(data)
        # Extract bucket index (first b_m bits)
        j = int(str(x)[:self.b_m], 2)
        # Normalize hash to [0,1] range
        int_val = int(hashlib.sha256(str(data).encode()).hexdigest()[:8], 16)
        w = int_val / (16 ** 8 - 1)
        # Update bucket with minimum value
        self.registers[j] = min(self.registers[j], w)

    def get_cardinality_estimate(self):
        """Calculate HyperReal cardinality estimate (unbiased)"""
        # Simple formula - no bias correction needed
        estimate = (self.m ** 2) / sum(self.registers)
        return estimate

    def get_memory_usage(self):
        """Return memory usage in bytes"""
        return sys.getsizeof(self.registers)
```
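The "no bias correction" claim can be checked in a few lines. The simulation below (illustrative only: bucket index and [0,1] value are extracted with integer bit operations rather than the class's string hashing) applies the bare m²/Σ formula to 100,000 distinct items:

```python
import hashlib

m, n = 2 ** 12, 100_000
registers = [1.0] * m

for i in range(n):
    h = int(hashlib.sha256(f"user_{i}".encode()).hexdigest(), 16)
    j = h % m                                  # bucket index from low bits
    w = ((h >> 12) % 2 ** 32) / (2 ** 32 - 1)  # other hash bits mapped to [0, 1]
    if w < registers[j]:
        registers[j] = w

estimate = m ** 2 / sum(registers)  # no bias-correction branches needed
print(f"true={n}, estimate={estimate:.0f}, error={abs(estimate - n) / n:.2%}")
```

Note the single estimation formula: unlike HyperLogLog, there are no small-range or large-range correction branches to get wrong.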

Extended Algorithm Implementation

```python
import hashlib


class ExtendedHyperRealSketch:
    def __init__(self, b_m, b_s):
        self.b_m = b_m  # Bucket bits
        self.b_s = b_s  # Attribute bits
        self.m = 2 ** b_m
        self.s = 2 ** b_s
        # Core data structures
        self.registers = [1.0] * self.m
        self.frequency_counts = [0] * self.m
        self.attribute_samples = [None] * self.m

    def _hash_function(self, data):
        """Enhanced hash function for extended algorithms"""
        inner_hash = hashlib.sha256(str(data).encode('utf-8')).hexdigest()
        outer_hash = hashlib.sha256(str(inner_hash).encode('utf-8')).hexdigest()
        hashed_ = '{:256b}'.format(int(outer_hash, 16)).replace(' ', '0')
        # Ensure sufficient bits for bucket + attribute indexing
        off = 32
        if (self.b_m + self.b_s) * 4 + off > 256:
            hashed = hashed_
        else:
            hashed = hashed_[1:(self.b_m + self.b_s) * 3]
        return hashed

    def update_sketch(self, event):
        """
        Update sketch with user ID and attribute

        Args:
            event: dict with 'id_to_count' and 'attribute' keys
        """
        user_id = event['id_to_count']
        user_attr = event['attribute']
        x = self._hash_function(user_id)
        j = int(str(x)[:self.b_m], 2)  # Bucket index
        # Normalize hash to [0,1] range
        int_val = int(hashlib.sha256(str(user_id).encode()).hexdigest()[:8], 16)
        w = int_val / (16 ** 8 - 1)
        # Update bucket (simpler than HLL due to float precision)
        if self.registers[j] > w:
            self.frequency_counts[j] = 1
            self.attribute_samples[j] = user_attr
        elif self.registers[j] == w:  # Extremely rare with floats
            self.frequency_counts[j] += 1
            if self.attribute_samples[j] != user_attr:
                self.attribute_samples[j] = user_attr
        self.registers[j] = min(self.registers[j], w)

    def get_cardinality_estimate(self):
        """Total cardinality estimate"""
        return (self.m ** 2) / sum(self.registers)

    def get_frequency_for_attr(self, attr=None):
        """Get cardinality estimate for specific attribute or all attributes"""
        # Collect attribute distribution from buckets
        attr_distribution = {}
        for count, attr_sample in zip(self.frequency_counts, self.attribute_samples):
            if attr_sample:
                if attr_sample in attr_distribution:
                    attr_distribution[attr_sample] += count
                else:
                    attr_distribution[attr_sample] = count
        # Normalize to probabilities
        total = sum(attr_distribution.values())
        if total > 0:
            for attr_sample in attr_distribution:
                attr_distribution[attr_sample] /= total
        # Scale by total cardinality estimate
        total_cardinality = self.get_cardinality_estimate()
        for attr_sample in attr_distribution:
            attr_distribution[attr_sample] *= total_cardinality
        # Return specific attribute or full distribution
        if attr:
            return attr_distribution.get(attr, 0)
        else:
            return attr_distribution

    def get_sketch(self):
        """Return all internal data structures"""
        return self.registers, self.frequency_counts, self.attribute_samples
```
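The attribute logic works because the item achieving the minimum hash value in each bucket is a uniform random sample of that bucket's distinct items, so the stored attributes across buckets approximate the population's attribute shares. The simplified simulation below (not the library class; bucket and value extraction via integer bit operations is an assumption made for brevity) recovers a synthetic 70/30 attribute split:

```python
import hashlib

m = 2 ** 12
registers = [1.0] * m
attribute_samples = [None] * m

# Synthetic population: 70% of users have attribute 'A', 30% have 'B'
n = 50_000
for i in range(n):
    attr = 'A' if i % 10 < 7 else 'B'
    h = int(hashlib.sha256(f"user_{i}".encode()).hexdigest(), 16)
    j = h % m
    w = ((h >> 12) % 2 ** 32) / (2 ** 32 - 1)
    if w < registers[j]:
        registers[j] = w
        attribute_samples[j] = attr  # min-hash item is a uniform sample

shares = {a: attribute_samples.count(a) / m for a in ('A', 'B')}
print(shares)  # roughly {'A': 0.70, 'B': 0.30}
```

Scaling these shares by the total cardinality estimate, as `get_frequency_for_attr` does, then yields per-attribute cardinalities.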

Best Practices from CardinalityKit

✅ Implementation Best Practices

  • Double Hashing: Always use SHA256(SHA256(data)) for increased entropy
  • Binary Conversion: Use 256-bit binary with proper zero padding
  • Memory Pre-allocation: Initialize lists with known sizes ([0] * m, [1.0] * m)
  • Error Handling: Use try/except for attribute processing with graceful degradation
  • Progress Tracking: Use tqdm for long-running operations
  • Memory Tracking: Return sys.getsizeof() for memory analysis

```python
# Best practices implementation example
import hashlib
import sys

from tqdm import tqdm


class OptimizedSketch:
    def __init__(self, precision=14):
        # Validate parameters
        if precision < 4 or precision > 20:
            raise ValueError("Precision must be between 4 and 20")
        self.precision = precision
        self.m = 2 ** precision
        # Pre-allocate memory efficiently
        self.registers = [1.0] * self.m
        # Track statistics
        self.elements_processed = 0

    def _consistent_hash(self, element):
        """Ensure consistent hashing across different runs"""
        # Convert to string for consistent hashing
        element_str = str(element)
        # Double hashing for entropy
        inner = hashlib.sha256(element_str.encode('utf-8')).hexdigest()
        outer = hashlib.sha256(inner.encode('utf-8')).hexdigest()
        return outer

    def add(self, element):
        """Add a single element (HyperReal-style minimum update)"""
        h = int(self._consistent_hash(element), 16)
        bucket = h % self.m
        # Map remaining hash bits into [0, 1]
        value = ((h >> self.precision) % 2 ** 32) / (2 ** 32 - 1)
        if value < self.registers[bucket]:
            self.registers[bucket] = value
        self.elements_processed += 1

    def add_batch(self, elements):
        """Process elements in batch with progress tracking"""
        for element in tqdm(elements, desc="Processing elements"):
            try:
                self.add(element)
            except Exception as e:
                # Graceful error handling
                print(f"Error processing element {element}: {e}")
                continue

    def get_memory_usage(self):
        """Comprehensive memory usage analysis"""
        return {
            'registers': sys.getsizeof(self.registers),
            'total_objects': sys.getsizeof(self),
            'elements_processed': self.elements_processed
        }
```

Performance Optimization

🚀 Speed Optimizations

  • Pre-compile hash functions
  • Use numpy arrays for large sketches
  • Batch processing with vectorization
  • Minimize string operations
  • Cache frequently used calculations
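As a concrete example of "minimize string operations": the bucket index obtained by slicing a zero-padded binary string is identical to a single right shift on the hash integer, which avoids building a 256-character string per element. The snippet below (illustrative; it uses a single SHA-256 for brevity rather than the double hashing used elsewhere) verifies the equivalence:

```python
import hashlib

def bucket_via_string(data, b):
    # Binary-string approach used in the reference implementations
    h = int(hashlib.sha256(str(data).encode()).hexdigest(), 16)
    bits = '{:256b}'.format(h).replace(' ', '0')
    return int(bits[:b], 2)

def bucket_via_shift(data, b):
    # Same bucket index via integer bit operations - no string building
    h = int(hashlib.sha256(str(data).encode()).hexdigest(), 16)
    return h >> (256 - b)

for item in ("alice", "bob", 12345):
    assert bucket_via_string(item, 14) == bucket_via_shift(item, 14)
print("bucket extraction methods agree")
```

The shift works because the zero-padded string is exactly 256 characters long, so its first b characters are the top b bits of the hash integer.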

💾 Memory Optimizations

  • Use appropriate data types (int32 vs int64)
  • Pre-allocate arrays with known sizes
  • Avoid unnecessary object creation
  • Use generators for large datasets
  • Implement memory pooling for sketches

```python
# Performance-optimized implementation
import numpy as np
from numba import jit


@jit(nopython=True)
def _fast_bucket_update(registers, bucket_idx, value):
    """JIT-compiled bucket update for speed (module-level function:
    numba's nopython mode cannot compile bound instance methods)"""
    if registers[bucket_idx] > value:
        registers[bucket_idx] = value


class HighPerformanceHyperReal:
    def __init__(self, precision=14):
        self.precision = precision
        self.m = 2 ** precision
        # Use numpy arrays for better performance
        self.registers = np.ones(self.m, dtype=np.float32)
        # Cache for frequently hashed elements
        self._hash_cache = {}

    def add_batch_vectorized(self, elements):
        """Vectorized batch processing"""
        # Convert to numpy array for vectorized operations
        element_array = np.array(elements)
        # Vectorized hashing (simplified example; Python's hash() is not
        # stable across runs unless PYTHONHASHSEED is fixed)
        hashes = np.array([hash(str(e)) for e in element_array], dtype=np.int64)
        buckets = hashes % self.m
        # Map hash bits into [0, 1] for the min-value registers
        values = ((hashes % (2 ** 32)) / (2 ** 32 - 1)).astype(np.float32)
        # Element-wise minimum via the JIT-compiled helper
        for bucket, value in zip(buckets, values):
            _fast_bucket_update(self.registers, bucket, value)

    def get_cardinality_estimate(self):
        """Optimized cardinality calculation"""
        # Use numpy for faster computation
        return (self.m ** 2) / np.sum(self.registers)
```

Testing and Validation

```python
# Comprehensive testing framework from CardinalityKit
import statistics

import pandas as pd


class SketchTester:
    def __init__(self, algorithm_class):
        self.algorithm_class = algorithm_class
        self.results = []

    def run_accuracy_test(self, dataset, true_cardinality, k_values=None):
        """Test accuracy across different parameter values"""
        if k_values is None:
            k_values = range(8, 17)
        for k in k_values:
            estimates = []
            # Multiple runs with different hash offsets
            for iteration in range(8):
                sketch = self.algorithm_class(k)
                # Process dataset with hash offset
                for item in dataset:
                    modified_item = f"{item}_{iteration}"  # Hash offset
                    sketch.update_sketch(modified_item)
                estimate = sketch.get_cardinality_estimate()
                estimates.append(estimate)
            # Calculate statistics
            mean_estimate = statistics.mean(estimates)
            std_estimate = statistics.pstdev(estimates)
            error = (mean_estimate - true_cardinality) / true_cardinality
            self.results.append({
                'k': k,
                'buckets': 2 ** k,
                'true_cardinality': true_cardinality,
                'mean_estimate': mean_estimate,
                'std_estimate': std_estimate,
                'error_percent': error * 100,
                'memory_usage': sketch.get_memory_usage()
            })

    def run_performance_test(self, dataset_sizes):
        """Test performance across different dataset sizes"""
        import time
        performance_results = []
        for size in dataset_sizes:
            dataset = [f"item_{i}" for i in range(size)]
            start_time = time.time()
            sketch = self.algorithm_class(14)  # Standard k=14
            for item in dataset:
                sketch.update_sketch(item)
            processing_time = time.time() - start_time
            performance_results.append({
                'dataset_size': size,
                'processing_time': processing_time,
                'items_per_second': size / processing_time,
                'memory_usage': sketch.get_memory_usage()
            })
        return performance_results

    def generate_report(self):
        """Generate comprehensive test report"""
        df = pd.DataFrame(self.results)
        report = {
            'summary': {
                'best_accuracy_k': df.loc[df['error_percent'].abs().idxmin(), 'k'],
                'mean_error': df['error_percent'].mean(),
                'std_error': df['error_percent'].std(),
                'memory_range': f"{df['memory_usage'].min()} - {df['memory_usage'].max()} bytes"
            },
            'detailed_results': df
        }
        return report


# Usage example
tester = SketchTester(HyperRealSketch)
dataset = generate_test_dataset(1000000)  # 1M unique items
tester.run_accuracy_test(dataset, 1000000)
report = tester.generate_report()
```

Common Pitfalls and Solutions

⚠️ Common Implementation Mistakes

  • Hash Inconsistency: Using different hash functions across platforms
  • Bit Extraction Errors: Incorrect string slicing for bucket indexing
  • Bias Correction Bugs: Wrong formulas or missing range corrections
  • Memory Leaks: Not properly managing large datasets
  • Precision Loss: Using inappropriate data types for calculations

```python
# Common pitfalls and their solutions
import hashlib

# WRONG: Inconsistent hashing
def bad_hash(data):
    return hashlib.md5(str(data).encode()).hexdigest()  # Different algorithm

# CORRECT: Consistent double hashing
def good_hash(data):
    inner = hashlib.sha256(str(data).encode('utf-8')).hexdigest()
    outer = hashlib.sha256(inner.encode('utf-8')).hexdigest()
    return '{:256b}'.format(int(outer, 16)).replace(' ', '0')

# WRONG: Incorrect bit extraction
bucket = int(hash_binary[0:k])  # Missing base conversion

# CORRECT: Proper bit extraction
bucket = int(hash_binary[:k], 2)  # Convert binary string to int

# WRONG: Missing bias correction
estimate = (m ** 2) * harmonic_mean  # Raw estimate only

# CORRECT: Proper bias correction
if k <= 4:
    BIAS = 0.673
elif k == 5:
    BIAS = 0.697
else:
    BIAS = 0.7213 / (1 + (1.079 / m))
estimate = BIAS * (m ** 2) * harmonic_mean

# WRONG: Memory inefficient
registers = []
for i in range(m):
    registers.append(0)  # Slow list building

# CORRECT: Pre-allocated memory
registers = [0] * m  # Fast pre-allocation
```

Production Deployment Considerations

| Aspect | Recommendation | Implementation |
|---|---|---|
| Precision | k=14 for most applications | 64KB memory, <1% error |
| Algorithm Choice | HyperReal for new projects | Better accuracy, simpler implementation |
| Serialization | Protocol Buffers or MessagePack | Efficient sketch storage and transmission |
| Monitoring | Track accuracy and performance | Log estimates vs ground truth when available |
| Scaling | Distributed sketch merging | Map-reduce pattern for large datasets |
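The "Scaling" row relies on sketches being mergeable: for min-value registers (HyperReal-style), the sketch of a union is the element-wise minimum of the individual sketches (element-wise maximum for HLL rank registers). The sketch below is a minimal illustration of distributed merging, assuming a simplified register array rather than the classes above:

```python
import hashlib

def hr_sketch(items, m=2 ** 12):
    # Minimal HyperReal-style register array (simplified demo, not the library class)
    regs = [1.0] * m
    for item in items:
        h = int(hashlib.sha256(str(item).encode()).hexdigest(), 16)
        j = h % m
        w = ((h >> 12) % 2 ** 32) / (2 ** 32 - 1)
        if w < regs[j]:
            regs[j] = w
    return regs

def merge(a, b):
    # Union of two sketches = element-wise minimum (use max for HLL ranks)
    return [min(x, y) for x, y in zip(a, b)]

# Two "sites" with overlapping user populations; true union = 75,000 users
m = 2 ** 12
site_a = hr_sketch(f"user_{i}" for i in range(0, 50_000))
site_b = hr_sketch(f"user_{i}" for i in range(25_000, 75_000))
merged = merge(site_a, site_b)
estimate = m ** 2 / sum(merged)
print(f"union estimate: {estimate:.0f} (true union = 75000)")
```

Because min is associative and commutative, the merged registers are exactly what a single sketch of the union would contain, which is why the map-reduce pattern introduces no additional estimation error.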