Parallel Density-Based Spatial Clustering

By Yi Ge (Ellen)

Summary

This project focuses on optimizing the DBSCAN (Density-Based Spatial Clustering of Applications with Noise) algorithm for large datasets using various parallel computing strategies. Key challenges addressed include computational cost, random access inefficiencies, and workload imbalances in skewed datasets.

[Animations: two DBSCAN clustering examples]

Key Knowledge and Formulas of DBSCAN

DBSCAN groups points into clusters based on density: given a neighborhood radius eps and a threshold minPts, each point is labeled as a core, border, or noise point according to how many points fall within its eps-neighborhood.
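
For reference, the standard definitions from the original DBSCAN paper (Ester et al., 1996) can be written as:

\begin{aligned}
N_{\varepsilon}(p) &= \{\, q \in D \mid \operatorname{dist}(p, q) \le \varepsilon \,\}, \\
p \text{ is a core point} &\iff |N_{\varepsilon}(p)| \ge \mathit{minPts}, \\
p \text{ is a border point} &\iff p \text{ is not a core point but } p \in N_{\varepsilon}(q) \text{ for some core point } q, \\
p \text{ is noise} &\iff p \text{ is neither a core point nor a border point}.
\end{aligned}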

Methodology

| Method | Details | Time Complexity | Platform |
|---|---|---|---|
| NaiveDBScanner | A naive implementation: neighborhoods are built by checking every pair of points, and clusters are then found as connected components via breadth-first search (BFS). | O(n²) | C++, CPU |
| OpenMPDBScanner | A multi-threaded CPU version of NaiveDBScanner, implemented in C++ with OpenMP and SIMD instructions. | O(n²/p) (p: number of threads) | C++, CPU, OpenMP + SIMD |
| GraphDBScanner | A graph-based DBSCAN with GPU graph construction and a sequential CPU BFS. | Average: O(n log n / p); Best: O(n/p); Worst: O(n²/p); Theoretical: O(n²/p) (p: number of GPU cores) | C++, CUDA; GPU (graph construction) & CPU (BFS) |
| ParallelDBScanner | A CUDA version of GraphDBScanner that stores the graph as a compact adjacency list (CSR). Both graph construction (exclusive_scan with the Thrust library) and cluster identification (BFS with level synchronization) are parallelized. | Average: O(n log n / p); Best: O(n/p); Worst: O(n²/p); Theoretical: O(n²/p) (p: number of GPU cores) | C++, GPU, CUDA |
| SLScanner | Essentially a wrapper around sklearn.cluster.DBSCAN, included as a reference; it uses a k-d tree to build neighborhoods faster. The BFS procedure is optimized with Cython (a C extension for Python) for a fair comparison. Note: the DBSCAN implementation in Scikit-learn does not use a strict BFS; it employs a BFS-like cluster-expansion approach. | Average: O(n²); Empirical: O(n log n) | Python, Scikit-Learn, Cython |
| ParallelDBScanner+ | Optimizes the first stage of ParallelDBScanner with a grid structure; the BFS stage (second stage) is unchanged from ParallelDBScanner. | Theoretical: O(n log n / p); Average: O(n log n / p); Best: O(n/p) (p: number of GPU cores) | C++, GPU, CUDA |
| ParallelDBScanner++ | A new hybrid CUDA + MPI implementation (well suited to big-data platforms such as MapReduce or Spark). Phase 1: data partitioning; Phase 2: cell graph construction; Phase 3: cell graph merging. | A per-point (ε, ρ)-region query with an R*-tree or k-d tree costs O(log C), where C is the total number of cells in the two-level cell dictionary. | C++, GPU, CUDA, MPI |

Implementation

OpenMPDBScanner

#pragma omp parallel for num_threads(4)
for (size_t i = 0; i < points.size(); ++i) {
    // Each thread owns a distinct i, so neighbors[i] is written by one thread only
    // and no critical section is needed here.
    for (size_t j = 0; j < points.size(); ++j) {
        if (i != j && euclidean_distance(points[i], points[j]) <= eps) {
            neighbors[i].push_back(j);
        }
    }
}

Explanation: The omp parallel for directive is used to parallelize the outer loop for building neighborhoods. This enables multiple threads to work concurrently, reducing computation time significantly.

#pragma omp parallel for schedule(dynamic) num_threads(4)
for (size_t i = 0; i < points.size(); ++i) {
    if (points[i].cluster_id == 0) { // Unclassified point
        if (neighbors[i].size() >= min_pts) {
            #pragma omp critical
            {
                // Re-check inside the critical section: another thread's BFS may
                // have absorbed this point since the test above.
                if (points[i].cluster_id == 0) {
                    cluster_id++;
                    bfs_cluster(i, cluster_id, eps, min_pts, points, neighbors);
                }
            }
        } else {
            points[i].cluster_id = -1; // Mark as noise
        }
    }
}

Explanation: Using schedule(dynamic), the loop dynamically assigns chunks of iterations to threads, ensuring better load balancing for unpredictable workloads.

ParallelDBScanner

// Graph structure to represent the adjacency list in CSR format
struct Graph {
    thrust::device_vector<int> row_ptr;      // Row pointer (per-point offsets) for CSR format
    thrust::device_vector<int> col_indices;  // Column indices (neighbor ids) for CSR format
};

Explanation: The Graph structure uses Thrust device vectors so that the CSR adjacency list (row offsets plus column indices) lives directly in GPU memory.

// CUDA kernel that expands one BFS level for cluster identification.
// The host seeds the frontier with an unvisited core point, then swaps
// frontier/next_frontier and relaunches until no vertex is added (level synchronization).
__global__ void bfs_kernel(int num_points, const int* row_ptr, const int* col_indices,
                           int* cluster_ids, int* frontier, int* next_frontier,
                           int current_cluster_id) {
    int idx = blockDim.x * blockIdx.x + threadIdx.x;
    if (idx < num_points && frontier[idx]) {
        frontier[idx] = 0;
        // Visit every neighbor of this frontier vertex in the CSR adjacency list
        for (int i = row_ptr[idx]; i < row_ptr[idx + 1]; ++i) {
            int neighbor = col_indices[i];
            if (cluster_ids[neighbor] == 0) {    // Unvisited point
                cluster_ids[neighbor] = current_cluster_id;
                next_frontier[neighbor] = 1;     // Expand it in the next level
            }
        }
    }
}

Explanation: The kernel expands one BFS level over the CSR adjacency list; the host swaps the frontier buffers and relaunches the kernel until no new points are reached, which gives the level-synchronized BFS used for cluster identification.

// CUDA device function to compute Euclidean distance
__device__ float euclidean_distance(const Point& a, const Point& b) {
    float dx = a.x - b.x;
    float dy = a.y - b.y;
    return sqrtf(dx * dx + dy * dy);
}

// CUDA kernel to construct the adjacency list.
// row_ptr has already been filled by a Thrust exclusive_scan over the per-point degrees,
// so each thread writes its neighbors into its own contiguous slice of col_indices.
__global__ void construct_adjacency_list(const Point* points, int* row_ptr, int* col_indices,
                                         int num_points, float eps) {
    int idx = blockDim.x * blockIdx.x + threadIdx.x;
    if (idx < num_points) {
        int count = 0;
        for (int j = 0; j < num_points; ++j) {
            if (idx != j && euclidean_distance(points[idx], points[j]) <= eps) {
                col_indices[row_ptr[idx] + count++] = j;
            }
        }
    }
}

Explanation: Device functions and kernels compute distances and construct adjacency lists for graph-based DBSCAN.

ParallelDBScanner+

Binning Kernel

The binning kernel assigns each point to a specific cell in the grid for efficient neighbor identification.

__global__ void binning_kernel() {
    int v = blockIdx.x * blockDim.x + threadIdx.x;
    if (v >= cuConstParams.num_points) return;

    float2 point = cuConstParams.points[v];
    float side = cuConstParams.side;
    int col_idx = (point.x - cuConstParams.min_x) / side;
    int row_idx = (point.y - cuConstParams.min_y) / side;

    cuConstParams.bin_index[v] = row_idx * cuConstParams.num_cols + col_idx;
    cuConstParams.point_index[v] = v;
}

Explanation: Each point is mapped to a grid cell based on its coordinates. This spatial partitioning helps reduce search space for neighbor computation.

Sorting

After binning, points are sorted based on their cell index using thrust::sort_by_key.

// Use Thrust to sort points by their bin index
thrust::sort_by_key(d_bin_index.begin(), d_bin_index.end(), d_point_index.begin());

// Call the find_bin_start_kernel to determine the start and end indices for each bin
find_bin_start_kernel<<<num_blocks, block_size>>>();  // launch configuration chosen by the host

// Call the degree_kernel to calculate the degree for each point
degree_kernel<<<num_blocks, block_size>>>();

// Use Thrust for exclusive scan to determine the start index in the adjacency list
thrust::exclusive_scan(d_degree.begin(), d_degree.end(), d_degree.begin());

Explanation: Sorting points within bins organizes data for efficient neighbor searching and cluster identification.

Finding Bin Start/End Indices

The kernel identifies the start and end indices of each cell in the sorted list, allowing efficient access to all points in a cell.

__global__ void find_bin_start_kernel() {
    int v = blockIdx.x * blockDim.x + threadIdx.x;
    if (v >= cuConstParams.num_points) return;

    int bin_idx = cuConstParams.bin_index[v];
    if (v == 0) {
        cuConstParams.bin_start_index[bin_idx] = 0;
    } else {
        int last_bin_idx = cuConstParams.bin_index[v - 1];
        if (bin_idx != last_bin_idx) {
            cuConstParams.bin_start_index[bin_idx] = v;
            cuConstParams.bin_end_index[last_bin_idx] = v;
        }
    }
    if (v == cuConstParams.num_points - 1) {
        cuConstParams.bin_end_index[bin_idx] = v + 1;
    }
}

Explanation: Start and end indices for bins allow quick neighbor lookups within the grid, improving performance.

Degree Calculation

The degree_kernel calculates the number of neighbors for each point by examining its cell and adjacent cells.

__global__ void degree_kernel() {
    int v = blockIdx.x * blockDim.x + threadIdx.x;
    if (v >= cuConstParams.num_points) return;

    size_t pid = cuConstParams.point_index[v];
    float2 p1 = cuConstParams.points[pid];
    int bin_idx = cuConstParams.bin_index[v];
    size_t degree = 0;

    // Check the point's own cell and its 8 surrounding cells; degree_in_bin (defined
    // elsewhere) counts the neighbors of p1 within eps that fall inside one such cell.
    for (int neighbor_bin_id = 0; neighbor_bin_id < 9; ++neighbor_bin_id) {
        degree += degree_in_bin(p1, neighbor_bin_id);
    }
    cuConstParams.degree[pid] = degree;
}

Explanation: The degree kernel finds neighbors within the specified range by scanning adjacent cells, enabling efficient clustering.

ParallelDBScanner++

Phase 1: Work Partitioning

// Phase 1: Partitioning the dataset
t.reset();
initialize(points, eps, minPts);
build_global_graph(parameters, point_index, cell_start_index, cell_end_index);
std::vector<size_t> assigned_cells = random_distribute(numTasks, taskid);
size_t assigned_cell_count = assigned_cells.size();

double partitioningTime = t.elapsed();
if (taskid == MASTER) {
    printf("Step 1. Time taken for partitioning: %.6fms\n", partitioningTime);
}

Explanation: This step partitions the data into cells, with each cell assigned to an MPI worker. The partitions are split randomly across workers for load balancing.

Phase 2: Partial Graph Construction

// Phase 2: Construct a partial clustering graph
std::vector<size_t> point_ids;
std::vector<short>  is_core_points;
std::vector<size_t> adjacency_list;
std::vector<size_t> partition_data;

t.reset();
buildLocalCellGraph(local_cell_index,
                    point_ids,
                    is_core_points,
                    adjacency_list,
                    partition_data);

double graphConstructionTime = t.elapsed();
if (taskid == MASTER) {
    printf("Step 2. Time for constructing partial cell graph: %.6fms\n", graphConstructionTime);
}

size_t point_count = point_ids.size();
size_t adjacency_list_size = adjacency_list.size();

Explanation: Each worker constructs a local graph for its assigned cells. Core points and edges between cells are identified during this process.

Phase 3: Graph Merging and Point Labeling

// Phase 3: Combine clustering graphs
if (taskid != MASTER) {
    // Workers send their local graphs (sizes first, then the arrays) to the master node
    size_t partition_size = partition_data.size();
    MPI_Send(&point_count, 1, MPI_UNSIGNED_LONG, MASTER, 0, MPI_COMM_WORLD);
    MPI_Send(&adjacency_list_size, 1, MPI_UNSIGNED_LONG, MASTER, 1, MPI_COMM_WORLD);
    MPI_Send(&adjacency_list[0], adjacency_list_size, MPI_UNSIGNED_LONG, MASTER, 2, MPI_COMM_WORLD);
    MPI_Send(&point_ids[0], point_count, MPI_UNSIGNED_LONG, MASTER, 3, MPI_COMM_WORLD);
    MPI_Send(&is_core_points[0], point_count, MPI_SHORT, MASTER, 4, MPI_COMM_WORLD);
    MPI_Send(&partition_size, 1, MPI_UNSIGNED_LONG, MASTER, 5, MPI_COMM_WORLD);
    MPI_Send(&partition_data[0], partition_size, MPI_UNSIGNED_LONG, MASTER, 6, MPI_COMM_WORLD);
} else {
    // Master node gathers graphs from workers and performs merging
    for (int worker_id = 1; worker_id < numTasks; ++worker_id) {
        size_t received_point_count, received_adj_list_size, received_partition_size;
        MPI_Recv(&received_point_count, 1, MPI_UNSIGNED_LONG, worker_id, 0, MPI_COMM_WORLD, &status);
        MPI_Recv(&received_adj_list_size, 1, MPI_UNSIGNED_LONG, worker_id, 1, MPI_COMM_WORLD, &status);

        std::vector<size_t> received_adj_list(received_adj_list_size);
        MPI_Recv(&received_adj_list[0], received_adj_list_size, MPI_UNSIGNED_LONG, worker_id, 2, MPI_COMM_WORLD, &status);

        std::vector<size_t> received_point_ids(received_point_count);
        MPI_Recv(&received_point_ids[0], received_point_count, MPI_UNSIGNED_LONG, worker_id, 3, MPI_COMM_WORLD, &status);

        std::vector<short> received_is_core_points(received_point_count);
        MPI_Recv(&received_is_core_points[0], received_point_count, MPI_SHORT, worker_id, 4, MPI_COMM_WORLD, &status);

        MPI_Recv(&received_partition_size, 1, MPI_UNSIGNED_LONG, worker_id, 5, MPI_COMM_WORLD, &status);
        std::vector<size_t> received_partition(received_partition_size);
        MPI_Recv(&received_partition[0], received_partition_size, MPI_UNSIGNED_LONG, worker_id, 6, MPI_COMM_WORLD, &status);

        // Combine the received graph with the master's graph (merged in place)
        mergeGraphs(partition_data, adjacency_list,
                    received_partition, received_adj_list);
    }
}

// Assign labels to points
for (size_t idx = 0; idx < point_ids.size(); ++idx) {
    if (is_core_points[idx]) {
        assignCoreLabel(point_ids[idx]);
    } else {
        assignNonCoreLabel(point_ids[idx], adjacency_list);
    }
}

Explanation: In this phase, partial graphs from workers are sent to the master and merged into a global graph. Points are then labeled using BFS or adjacency information.

SLScanner

Perform BFS Clustering with KD-Tree

%%cython
import numpy as np
cimport numpy as np
from libc.math cimport sqrt
from sklearn.neighbors import KDTree

def bfs_cluster(int point_idx, int cluster_id, double eps, int min_pts, 
                double[:, :] points, long[:] labels, object kdtree):
    cdef int i, j, n_points
    cdef long[:] neighbors
    cdef double[:, :] point = np.empty((1, points.shape[1]), dtype=np.double)
    cdef list q = [point_idx]
    labels[point_idx] = cluster_id

    while q:
        current_point = q.pop(0)
        for j in range(points.shape[1]):
            point[0, j] = points[current_point, j]
        neighbors = kdtree.query_radius(point, r=eps)[0]

        if len(neighbors) >= min_pts:
            for i in neighbors:
                if labels[i] == 0:  # Unclassified
                    labels[i] = cluster_id
                    q.append(i)

def cython_dbscan(double[:, :] points, double eps, int min_pts):
    cdef int i, n_points = points.shape[0]
    cdef long[:] labels = np.zeros(n_points, dtype=np.int64)
    cdef int cluster_id = 0
    cdef object kdtree = KDTree(points, metric='euclidean')

    for i in range(n_points):
        if labels[i] == 0:  # Unclassified
            neighbors = kdtree.query_radius(points[i:i+1], r=eps)[0]
            if len(neighbors) >= min_pts:
                cluster_id += 1
                bfs_cluster(i, cluster_id, eps, min_pts, points, labels, kdtree)
            else:
                labels[i] = -1  # Noise

    return np.array(labels, dtype=np.int64)

Explanation: Scikit-Learn's DBSCAN uses a k-d tree so that each neighborhood query costs roughly O(log n) instead of O(n), which brings the average overall complexity from O(n²) toward O(n log n). The bfs_cluster function above uses the KDTree to find neighbors and label cluster members, while cython_dbscan orchestrates the clustering, seeds new clusters at unvisited core points, and marks non-core, non-reachable points as noise.
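
As a minimal usage sketch (the eps and min_samples values below are illustrative, and X is assumed to be a 2-D NumPy array of points), the reference SLScanner labels come straight from scikit-learn, and the Cython routine above is called the same way:

import numpy as np
from sklearn.cluster import DBSCAN

X = np.random.rand(1000, 2)                              # placeholder data
sk_labels = DBSCAN(eps=0.05, min_samples=5).fit_predict(X)  # -1 marks noise

# Equivalent call to the Cython routine defined above
cy_labels = cython_dbscan(X.astype(np.double), 0.05, 5)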

Analysis

Analysis was conducted on synthetic datasets (Moons, Blobs, and Chameleon) to evaluate runtime, scalability, and clustering accuracy. Our setup included an NVIDIA RTX 4080 Ti GPU and a quad-core Intel i7 CPU.
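
The Moons and Blobs datasets can be generated with scikit-learn's standard generators; a minimal sketch follows (the sample counts, noise level, and random seed are illustrative, and the Chameleon benchmark is assumed to be loaded from a local file whose path is hypothetical):

import numpy as np
from sklearn.datasets import make_moons, make_blobs

# Two interleaving half-circles ("Moons") and isotropic Gaussian clusters ("Blobs")
moons_X, moons_y = make_moons(n_samples=100_000, noise=0.05, random_state=0)
blobs_X, blobs_y = make_blobs(n_samples=100_000, centers=5, cluster_std=0.5, random_state=0)

# Chameleon is an external benchmark dataset, loaded here from a (hypothetical) text file
chameleon_X = np.loadtxt("data/chameleon.txt")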

Performance on Moons Dataset

| #samples | NaiveDBScanner | OpenMPDBScanner | SLScanner | ParallelDBScanner | ParallelDBScanner+ | ParallelDBScanner++ |
|---|---|---|---|---|---|---|
| 1,000 | 27 | 0.7 | 30.05 | 10.6 | 10.6 | 5.7 |
| 10,000 | 2,569.3 | 44.2 | 321.65 | 40 | 38.9 | 0.1 |
| 50,000 | 63,556.6 | 868.7 | 2,157.44 | 202.5 | 197.2 | 4 |
| 100,000 | 253,360 | 3,070.1 | 5,949.36 | 411.7 | 406.7 | 13 |
| 200,000 | 1,016,081 | 11,613.1 | 19,672.82 | 845.1 | 835.9 | 41.1 |

Analysis - Runtime

[Charts: runtime analysis, performance on the Moons dataset, and cross-method comparison]

Note: the charts use plt.xscale('log') and plt.yscale('log'). A logarithmic scale visualizes values that span several orders of magnitude far more effectively, so patterns and trends stay apparent even when runtimes grow steeply with dataset size.
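
A minimal plotting sketch for such a chart (the two runtime series below are copied from the table above; the remaining scanners are plotted the same way):

import matplotlib.pyplot as plt

samples = [1_000, 10_000, 50_000, 100_000, 200_000]
naive   = [27, 2_569.3, 63_556.6, 253_360, 1_016_081]
openmp  = [0.7, 44.2, 868.7, 3_070.1, 11_613.1]

plt.plot(samples, naive, marker="o", label="NaiveDBScanner")
plt.plot(samples, openmp, marker="o", label="OpenMPDBScanner")
plt.xscale("log")   # log-scale both axes so the wide range of values stays readable
plt.yscale("log")
plt.xlabel("#samples")
plt.ylabel("runtime")
plt.legend()
plt.show()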

Analysis - # Threads (OpenMPDBScanner - CPU)

[Chart: runtime versus number of OpenMP threads]

Analysis - Block Size (ParallelDBScanner - GPU & CUDA)

[Chart: runtime versus CUDA block size]

Block size: the number of threads in each CUDA thread block. Each CUDA Streaming Multiprocessor (SM) can manage and schedule only a limited number of resident threads (typically 1024 or 2048), so the block size must respect hardware limits to fully utilize GPU resources. For compute-intensive kernels, a larger block size often increases parallelism; for memory-intensive kernels, a smaller block size may reduce contention for shared memory and registers.

Performance Comparison on Dataset Blobs

[Chart: performance comparison on the Blobs dataset]

Performance Comparison on Dataset Chameleon

[Chart: performance comparison on the Chameleon dataset]

Fine-tuning

To fine-tune the parameters of ParallelDBScanner++ and evaluate the clustering quality, follow these steps (a minimal evaluation sketch follows the list):

  1. Select Parameter Ranges: Define the parameters to experiment with. The key parameters are eps (neighborhood radius) and min_samples (minimum number of points required to form a core point); choose a reasonable range of values for each.
  2. Run Clustering: For each set of parameter values, run the ParallelDBScanner++ to perform clustering.
  3. Calculate Evaluation Metrics:
    • Silhouette Coefficient: This metric assesses the quality of the clustering, ranging from -1 to 1. A higher value indicates better clustering quality.
    • Adjusted Rand Index (ARI): This metric evaluates the consistency between the clustering results and the true labels. A higher ARI indicates a closer match between the clustering results and the true classification.
    • Proportion of Noise Points: Calculate the proportion of points labeled as noise in the clustering results. A higher proportion might indicate that the eps value is set too low, or that the data contains many outliers.
  4. Analyze and Select Optimal Parameters: Compare the evaluation metrics across different parameter settings. Select the parameter combination that yields the highest Silhouette Coefficient, the highest ARI, and a reasonable proportion of noise points as the optimal parameters.
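
A minimal evaluation sketch for these steps, assuming run_parallel_dbscan_pp is a hypothetical Python wrapper around ParallelDBScanner++ that returns one label per point (with -1 for noise), and that X and the true labels y_true come from the synthetic datasets above:

import numpy as np
from sklearn.metrics import silhouette_score, adjusted_rand_score

param_grid = [(eps, min_samples) for eps in (0.02, 0.05, 0.1) for min_samples in (5, 10, 20)]

results = []
for eps, min_samples in param_grid:
    labels = np.asarray(run_parallel_dbscan_pp(X, eps=eps, min_samples=min_samples))  # hypothetical wrapper
    noise_ratio = np.mean(labels == -1)
    mask = labels != -1  # silhouette is only defined for clustered (non-noise) points
    sil = silhouette_score(X[mask], labels[mask]) if mask.sum() > 1 and len(set(labels[mask])) > 1 else float("nan")
    ari = adjusted_rand_score(y_true, labels)
    results.append((eps, min_samples, sil, ari, noise_ratio))

# Rank by ARI, then silhouette; the noise ratio is inspected manually as a sanity check
best = max(results, key=lambda r: (r[3], r[2]))
print("best (eps, min_samples):", best[:2])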

Fine-tuning (Moons: 1k–200k; Blobs & Chameleon: 200k) on the best-performing scanner (ParallelDBScanner++)

[Charts: fine-tuning results on the Moons, Blobs, and Chameleon datasets]

References

  1. G. Andrade, G. Ramos, D. Madeira, R. Sachetto, R. Ferreira, and L. Rocha. G-DBSCAN: A GPU Accelerated Algorithm for Density-Based Clustering. Procedia Computer Science, 18:369–378, 2013.
  2. M. Ester, H.-P. Kriegel, J. Sander, and X. Xu. A Density-Based Algorithm for Discovering Clusters in Large Spatial Databases with Noise. In KDD, volume 96, pages 226–231, 1996.
  3. H. Song and J.-G. Lee. RP-DBSCAN: A Superfast Parallel DBSCAN Algorithm Based on Random Partitioning. In Proceedings of the 2018 International Conference on Management of Data, pages 1173–1187, 2018.
  4. Y. He, H. Tan, W. Luo, S. Feng, and J. Fan. MR-DBSCAN: A Scalable MapReduce-Based DBSCAN Algorithm for Heavily Skewed Data. Frontiers of Computer Science, 8:83–99, 2014.
  5. A. Lulli, M. Dell'Amico, P. Michiardi, and L. Ricci. NG-DBSCAN: Scalable Density-Based Clustering for Arbitrary Data. Proceedings of the VLDB Endowment, 10(3):157–168, 2016.
  6. I. Cordova and T. S. Moh. DBSCAN on Resilient Distributed Datasets. In 2015 International Conference on High Performance Computing & Simulation (HPCS), pages 531–540. IEEE, 2015.