Alink ramble (22): source code analysis of clustering evaluation

0x00 summary

Alink is a new-generation machine learning algorithm platform developed by Alibaba on top of Flink, a real-time computing engine. It is the first machine learning platform in the industry to support batch algorithms and streaming algorithms at the same time. This article, together with the previous one, walks through the implementation of clustering evaluation in Alink.

0x01 background concept

1.1 what is clustering

Clustering, in plain words, is the idea that "birds of a feather flock together": similar objects end up in the same group.

Clustering is learning by observation rather than learning from labeled examples. Clustering can be used as an independent tool to obtain the distribution of the data, observe the characteristics of each cluster, and then focus on specific clusters for further analysis.

Cluster analysis can also be used as a preprocessing step for other data mining tasks (such as classification and association rules).

1.2 cluster analysis method

Cluster analysis can be roughly divided into the following methods:

Partitioning approach:

  • Construct various partitions and then evaluate them by some criterion, e.g., minimizing the sum of squared errors
  • Typical methods: k-means, k-medoids, CLARANS

Hierarchical approach:

  • Create a hierarchical decomposition of the set of data (or objects) using some criterion
  • Typical methods: DIANA, AGNES, BIRCH, CHAMELEON

Density based approach:

  • Based on connectivity and density functions
  • Typical methods: DBSCAN, OPTICS, DENCLUE

Grid based approach:

  • Based on multiple-level granularity structure
  • Typical methods: STING, WaveCluster, CLIQUE

Model based approach:

  • A model is hypothesized for each cluster, and the method tries to find the best fit of the data to the given model
  • Typical methods: EM, SOM, COBWEB

Frequent pattern based approach:

  • Based on the analysis of frequent patterns
  • Typical methods: p-Cluster

Constraint based approach:

  • Clustering by considering user-specified or application-specific constraints
  • Typical methods: COD (obstacles), constrained clustering

Link based approach:

  • Objects are often linked together in various ways
  • Massive links can be used to cluster objects: SimRank, LinkClus

1.3 cluster evaluation

Clustering evaluation estimates the feasibility of clustering on the data set and the quality of the results produced by the clustering method. Clustering evaluation mainly includes: estimating the clustering trend, determining the number of clusters in the data set, and measuring the clustering quality.

Estimating the clustering trend: for a given data set, evaluate whether it has a non-random structure. Blindly applying a clustering method to a data set will always return some clusters, but the mined clusters may be misleading. Cluster analysis is meaningful only if there is a non-random structure in the data.

Clustering trend assessment determines whether a given data set has a non-random structure that can lead to meaningful clustering. For a data set without any non-random structure, such as points uniformly distributed in the data space, a clustering algorithm can still return clusters, but those clusters are random and meaningless. Clustering requires a non-uniform distribution of the data.

Measuring the clustering quality: after using the clustering method on the data set, it is necessary to evaluate the quality of the result cluster.

There are two kinds of methods: extrinsic (external) methods and intrinsic (internal) methods.

  • Extrinsic method: a supervised method that requires benchmark (ground-truth) data; some measure is used to judge how consistent the clustering result is with the benchmark data.
  • Intrinsic method: an unsupervised method that needs no benchmark data; quality is judged by the degree of intra-cluster cohesion and inter-cluster separation.

0x02 evaluation indicators supported by alink

The Alink documentation says that clustering evaluation assesses the prediction results of a clustering algorithm and supports the following evaluation metrics. In fact, you can find more of them in its test code.

Compactness (CP): the lower the CP, the closer the points within each cluster are to their center.

$$\overline{CP_i}=\dfrac{1}{|C_i|}\sum_{x \in C_i}\|x_i-u_i\|$$

$$\overline{CP}=\dfrac{1}{k}\sum_{i=1}^{k}\overline{CP_i}$$

Seperation (SP): the higher the SP, the farther apart the cluster centers are.

$$SP=\dfrac{2}{k^2-k}\sum_{i=1}^{k}\sum_{j=i+1}^{k}\|u_i-u_j\|$$

Davies-Bouldin index (DB): the smaller the DB, the smaller the intra-cluster distances and the larger the inter-cluster distances.

$$DB=\dfrac{1}{k}\sum_{i=1}^{k}\max_{j \neq i}\left(\dfrac{\overline{CP_i}+\overline{CP_j}}{\|u_i-u_j\|}\right)$$

Calinski-Harabasz index (VRC): the larger the VRC, the better the clustering quality.

$$SSB=\sum_{i=1}^{k}n_i\|u_i-u\|^2$$

$$SSW=\sum_{i=1}^{k}\sum_{x \in C_i}\|x_i-u_i\|^2$$

(note that SSW here is a sum of squared distances, which matches the SSW and VRC values in the test code quoted below)

$$VRC=\dfrac{SSB}{SSW}\cdot\dfrac{N-k}{k-1}$$
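To make these formulas concrete, here is a minimal, self-contained Java sketch. It is not the Alink implementation; the class name and all variable names are invented for illustration. It computes CP, SP, DB, SSB, SSW and VRC for the six-point toy data set used later in section 0x03, accumulating SSW from squared distances as discussed above, which is what reproduces the values in the Alink test code quoted below.

import java.util.Locale;

public class ClusterIndexSketch {
    // Euclidean distance between two points
    static double dist(double[] a, double[] b) {
        double s = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            s += d * d;
        }
        return Math.sqrt(s);
    }

    public static void main(String[] args) {
        // toy data: two obvious clusters, mirroring the example in section 0x03
        double[][] points = {
            {0, 0, 0}, {0.1, 0.1, 0.1}, {0.2, 0.2, 0.2},
            {9, 9, 9}, {9.1, 9.1, 9.1}, {9.2, 9.2, 9.2}
        };
        int[] label = {0, 0, 0, 1, 1, 1};
        int k = 2, n = points.length, dim = points[0].length;

        // cluster centers u_i and the global center u
        double[][] u = new double[k][dim];
        double[] uAll = new double[dim];
        int[] cnt = new int[k];
        for (int p = 0; p < n; p++) {
            cnt[label[p]]++;
            for (int d = 0; d < dim; d++) {
                u[label[p]][d] += points[p][d];
                uAll[d] += points[p][d];
            }
        }
        for (int i = 0; i < k; i++) for (int d = 0; d < dim; d++) u[i][d] /= cnt[i];
        for (int d = 0; d < dim; d++) uAll[d] /= n;

        // CP_i: mean distance of cluster members to their center; SSW: sum of squared distances
        double[] cp = new double[k];
        double ssw = 0, ssb = 0;
        for (int p = 0; p < n; p++) {
            double d = dist(points[p], u[label[p]]);
            cp[label[p]] += d;
            ssw += d * d;
        }
        double cpMean = 0;
        for (int i = 0; i < k; i++) {
            cp[i] /= cnt[i];
            cpMean += cp[i];
            double dc = dist(u[i], uAll);
            ssb += cnt[i] * dc * dc;     // SSB: weighted squared distance of centers to the global center
        }
        cpMean /= k;

        // SP: mean pairwise distance between cluster centers
        double sp = 0;
        for (int i = 0; i < k; i++) for (int j = i + 1; j < k; j++) sp += dist(u[i], u[j]);
        sp *= 2.0 / (k * k - k);

        // DB: average over clusters of the worst (CP_i + CP_j) / ||u_i - u_j|| ratio
        double db = 0;
        for (int i = 0; i < k; i++) {
            double worst = 0;
            for (int j = 0; j < k; j++) {
                if (j != i) worst = Math.max(worst, (cp[i] + cp[j]) / dist(u[i], u[j]));
            }
            db += worst;
        }
        db /= k;

        // VRC (Calinski-Harabasz): separation / compactness, rescaled by (N - k) / (k - 1)
        double vrc = ssb / ssw * (n - k) / (k - 1.0);

        // Expected, matching the Alink test below:
        // CP=0.1155 SP=15.5885 DB=0.0148 SSB=364.50 SSW=0.12 VRC=12150.0
        System.out.printf(Locale.ROOT, "CP=%.4f SP=%.4f DB=%.4f SSB=%.2f SSW=%.2f VRC=%.1f%n",
            cpMean, sp, db, ssb, ssw, vrc);
    }
}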

From its test code, we can find more indicators:

Assert.assertEquals(metrics.getCalinskiHarabaz(), 12150.00, 0.01);
Assert.assertEquals(metrics.getCompactness(), 0.115, 0.01);
Assert.assertEquals(metrics.getCount().intValue(), 6);
Assert.assertEquals(metrics.getDaviesBouldin(), 0.014, 0.01);
Assert.assertEquals(metrics.getSeperation(), 15.58, 0.01);
Assert.assertEquals(metrics.getK().intValue(), 2);
Assert.assertEquals(metrics.getSsb(), 364.5, 0.01);
Assert.assertEquals(metrics.getSsw(), 0.119, 0.01);
Assert.assertEquals(metrics.getPurity(), 1.0, 0.01);
Assert.assertEquals(metrics.getNmi(), 1.0, 0.01);
Assert.assertEquals(metrics.getAri(), 1.0, 0.01);
Assert.assertEquals(metrics.getRi(), 1.0, 0.01);
Assert.assertEquals(metrics.getSilhouetteCoefficient(), 0.99,0.01);

Several of these indicators deserve a brief introduction.

2.1 Silhouette coefficient

For each object o in D, calculate:

  • a(o): the average distance between o and the other objects in the cluster to which o belongs.
  • b(o): the minimum, taken over all clusters that do not contain o, of the average distance from o to the objects of that cluster.

The silhouette coefficient is defined as:

$$s(o)=\dfrac{b(o)-a(o)}{\max\{a(o),b(o)\}}$$

The value of the silhouette coefficient lies between -1 and 1.

The value of a(o) reflects the compactness of the cluster to which o belongs; the smaller the value, the more compact the cluster.

The value of b(o) captures how well o is separated from the other clusters; the larger b(o), the better separated o is from them.

When the silhouette coefficient of o is close to 1, the cluster containing o is compact and o is far away from the other clusters, which is the desirable case.

When the silhouette coefficient is negative, o is, on average, closer to objects of another cluster than to objects of its own cluster; in many cases this is a bad sign and should be avoided.
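As a quick worked example on the toy data from section 0x03, computed by hand with plain Euclidean distances: take o = (0, 0, 0). Its own cluster also contains (0.1, 0.1, 0.1) and (0.2, 0.2, 0.2), and the other cluster contains the three points around (9.1, 9.1, 9.1), so

$$a(o)=\frac{0.1\sqrt{3}+0.2\sqrt{3}}{2}\approx 0.26, \qquad b(o)=\frac{9\sqrt{3}+9.1\sqrt{3}+9.2\sqrt{3}}{3}\approx 15.76, \qquad s(o)\approx 1-\frac{0.26}{15.76}\approx 0.98$$

Based on the code shown later in section 6.3, Alink's Euclidean branch works from per-cluster summaries and effectively uses mean squared distances rather than mean distances, which is why the average value reported for this data set in section 0x03 (about 0.9998) is even closer to 1.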

2.2 Calinski-Harabaz(CH)

The CH index measures within-cluster compactness by the sum of squared distances between each point and its cluster center, and measures separation by the weighted sum of squared distances between each cluster center and the center of the whole data set. CH is obtained from the ratio of separation to compactness; thus, the larger the CH, the tighter each cluster is and the more spread out the clusters are from one another, i.e., the better the clustering result.

Both CH and the silhouette coefficient are applicable when the true category information is unknown.

2.3 Davies-Bouldin index (DBI)

The Davies-Bouldin index (DBI), also known as the classification appropriateness index, was proposed by David L. Davies and Donald W. Bouldin to evaluate the quality of clustering algorithms.

DBI computes the ratio of within-cluster distances to between-cluster distances. It can be used to guide the choice of K and to avoid the local optimum that results from only optimizing the objective function Wn in the K-means algorithm.

2.4 Rand index (RI), adjusted Rand index (ARI)

Let C denote the ground-truth category assignment and K the clustering result. Let a be the number of pairs of elements that fall into the same class in both C and K, and b the number of pairs of elements that fall into different classes in both C and K.

The RI value range is [0, 1]. The larger the value, the more consistent the clustering result is with the ground truth; a larger RI means a more accurate clustering and a higher purity within each cluster.

To make the index close to zero when the clustering result is generated randomly, the adjusted Rand index was proposed, which has a higher discriminative power.

The value range of ARI is [-1, 1]. The larger the value, the more consistent the clustering result is with the ground truth. Broadly speaking, ARI measures the degree of agreement between two partitions of the data.
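For reference, the standard pair-counting definitions, which are also what the code in section 5.2 computes, are:

$$RI=\dfrac{a+b}{\binom{n}{2}}$$

$$ARI=\dfrac{\sum_{ij}\binom{n_{ij}}{2}-\dfrac{\sum_i\binom{a_i}{2}\sum_j\binom{b_j}{2}}{\binom{n}{2}}}{\dfrac{1}{2}\left[\sum_i\binom{a_i}{2}+\sum_j\binom{b_j}{2}\right]-\dfrac{\sum_i\binom{a_i}{2}\sum_j\binom{b_j}{2}}{\binom{n}{2}}}$$

Here n is the number of samples, n_{ij} are the entries of the confusion (contingency) matrix between C and K, and a_i, b_j are its row and column sums. In extractParamsFromConfusionMatrix (section 5.2) these quantities appear as tp, tpFpSum, tpFnSum and totalCombination.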

0x03 example code

The cluster evaluation example code is as follows:

// The imports below assume the usual Alink/Flink package layout; adjust them if your Alink version differs.
import com.alibaba.alink.operator.batch.evaluation.EvalClusterBatchOp;
import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
import com.alibaba.alink.operator.common.evaluation.ClusterMetrics;
import com.alibaba.alink.pipeline.clustering.KMeans;
import org.apache.flink.types.Row;

import java.util.Arrays;

public class EvalClusterBatchOpExp {
    public static void main(String[] args) throws Exception {
        Row[] rows = new Row[] {
                Row.of(0, "0,0,0"),
                Row.of(0, "0.1,0.1,0.1"),
                Row.of(0, "0.2,0.2,0.2"),
                Row.of(1, "9,9,9"),
                Row.of(1, "9.1,9.1,9.1"),
                Row.of(1, "9.2,9.2,9.2")
        };

        MemSourceBatchOp inOp = new MemSourceBatchOp(Arrays.asList(rows), new String[] {"label", "Y"});

        KMeans train = new KMeans()
                .setVectorCol("Y")
                .setPredictionCol("pred")
                .setK(2);

        ClusterMetrics metrics = new EvalClusterBatchOp()
                .setPredictionCol("pred")
                .setVectorCol("Y")
                .setLabelCol("label")
                .linkFrom(train.fit(inOp).transform(inOp))
                .collectMetrics();

        System.out.println(metrics.getCalinskiHarabaz());
        System.out.println(metrics.getCompactness());
        System.out.println(metrics.getCount());
        System.out.println(metrics.getDaviesBouldin());
        System.out.println(metrics.getSeperation());
        System.out.println(metrics.getK());
        System.out.println(metrics.getSsb());
        System.out.println(metrics.getSsw());
        System.out.println(metrics.getPurity());
        System.out.println(metrics.getNmi());
        System.out.println(metrics.getAri());
        System.out.println(metrics.getRi());
        System.out.println(metrics.getSilhouetteCoefficient());
    }
}

Output is:

12150.000000000042
0.11547005383792497
6
0.014814814814814791
15.588457268119896
2
364.5
0.1199999999999996
1.0
1.0
1.0
1.0
0.9997530305375205

0x04 overall logic

The overall logic of the code is as follows:

  • Label-related metric calculation
    • Use calLocalPredResult to process each partition
      • flatMap 1 breaks up the Row and gets the label y
      • flatMap 2 breaks up the Row and gets y_hat; these first two steps produce the maps for y and y_hat, which are broadcast to CalLocalPredResult for later use
      • Call CalLocalPredResult to build the confusion matrix
    • Use reduce to merge the results of these partition operations
    • Use extractParamsFromConfusionMatrix to compute purity, NMI and other metrics from the confusion matrix
  • Vector-related metric calculation
    • Group the data by cluster
    • Within each group, call CalcClusterMetricsSummary to compute the vector-related statistics in a distributed manner
      • Traverse the rows and accumulate them into sumVector
      • Loop over the vectors to compute some statistics
    • Call ReduceBaseMetrics to merge the per-cluster results into a BaseMetricsSummary
    • Call calSilhouetteCoefficient to compute the silhouette coefficient
    • Store the data as Params
  • Merge and output
    • Union labelMetrics and vectorMetrics, then merge them
    • Group and merge
    • Output to the final table

The specific codes are as follows:

public EvalClusterBatchOp linkFrom(BatchOperator<?>... inputs) {
    BatchOperator in = checkAndGetFirst(inputs);
    String labelColName = this.getLabelCol();
    String predResultColName = this.getPredictionCol();
    String vectorColName = this.getVectorCol();
    DistanceType distanceType = getDistanceType();
    ContinuousDistance distance = distanceType.getFastDistance();

    DataSet<Params> empty = MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment().fromElements(
        new Params());
    DataSet<Params> labelMetrics = empty, vectorMetrics;

    if (null != labelColName) { // For label operation
        // get data
        DataSet<Row> data = in.select(new String[] {labelColName, predResultColName}).getDataSet();
        // Use calLocalPredResult to process each partition
        labelMetrics = calLocalPredResult(data)
            .reduce(new ReduceFunction<LongMatrix>() { // Use reduce to merge the results of these partition operations
                @Override
                public LongMatrix reduce(LongMatrix value1, LongMatrix value2) {
                    value1.plusEqual(value2);
                    return value1;
                }
            })
            .map(new MapFunction<LongMatrix, Params>() { 
                @Override
                public Params map(LongMatrix value) {
                    // Use extractParamsFromConfusionMatrix to compute purity, NMI and other metrics from the confusion matrix
                    return ClusterEvaluationUtil.extractParamsFromConfusionMatrix(value);
                }
            });
    }
    if (null != vectorColName) {
        // get data
        DataSet<Row> data = in.select(new String[] {predResultColName, vectorColName}).getDataSet();
      
        DataSet<BaseMetricsSummary> metricsSummary = data
            .groupBy(0) // Group data by category
            .reduceGroup(new CalcClusterMetricsSummary(distance)) // Distributed computing vector related indicators
            .reduce(new EvaluationUtil.ReduceBaseMetrics());// Merge
        DataSet<Tuple1<Double>> silhouetteCoefficient = data.map(  // Calculate silhouette
            new RichMapFunction<Row, Tuple1<Double>>() {
                @Override
                public Tuple1<Double> map(Row value) {
                    List<BaseMetricsSummary> list = getRuntimeContext().getBroadcastVariable(METRICS_SUMMARY);
                    return ClusterEvaluationUtil.calSilhouetteCoefficient(value,
                        (ClusterMetricsSummary)list.get(0));
                }
            }).withBroadcastSet(metricsSummary, METRICS_SUMMARY)
            .aggregate(Aggregations.SUM, 0);

        // Store data as Params
        vectorMetrics = metricsSummary.map(new ClusterEvaluationUtil.SaveDataAsParams()).withBroadcastSet( 
            silhouetteCoefficient, SILHOUETTE_COEFFICIENT);
    } else {
        vectorMetrics = in.select(predResultColName)
            .getDataSet()
            .reduceGroup(new BasicClusterParams());
    }

    DataSet<Row> out = labelMetrics
        .union(vectorMetrics) // Combine labelMetrics and vectorMetrics
        .reduceGroup(new GroupReduceFunction<Params, Row>() { // Grouping merging
            @Override
            public void reduce(Iterable<Params> values, Collector<Row> out) {
                Params params = new Params();
                for (Params p : values) {
                    params.merge(p);
                }
                out.collect(Row.of(params.toJson()));
            }
        });
    // Output to last table
    this.setOutputTable(DataSetConversionUtil.toTable(getMLEnvironmentId(),
        out, new TableSchema(new String[] {EVAL_RESULT}, new TypeInformation[] {Types.STRING}) 
    ));
    return this;
}

0x05 label-related operations

5.1 calLocalPredResult

Because the input here is DataSet<Row> data = in.select(new String[] {labelColName, predResultColName}).getDataSet();, what we are dealing with are y and y_hat.

There are two flatMaps chained together.

  • flatMap 1 is to break up the Row and get Label y
  • flatMap 2 is to break up Row and get y_hat

Both flatMaps are followed by DistinctLabelIndexMap and project(0). DistinctLabelIndexMap assigns each distinct label an ID and returns a map from label to ID; project(0) extracts that map.

So the first two steps produce the maps for y and y_hat. These two maps are broadcast to CalLocalPredResult for later use.

The third step is to call CalLocalPredResult to establish the confusion matrix.

The specific codes are as follows:

private static DataSet<LongMatrix> calLocalPredResult(DataSet<Row> data) {

    // Break up Row and get Label y
    DataSet<Tuple1<Map<String, Integer>>> labels = data.flatMap(new FlatMapFunction<Row, String>() {
        @Override
        public void flatMap(Row row, Collector<String> collector) {
            if (EvaluationUtil.checkRowFieldNotNull(row)) {
                collector.collect(row.getField(0).toString());
            }
        }
    }).reduceGroup(new EvaluationUtil.DistinctLabelIndexMap(false, null)).project(0);
    // Break up Row and get y_hat
    DataSet<Tuple1<Map<String, Integer>>> predictions = data.flatMap(new FlatMapFunction<Row, String>() {
        @Override
        public void flatMap(Row row, Collector<String> collector) {
            if (EvaluationUtil.checkRowFieldNotNull(row)) {
                collector.collect(row.getField(1).toString());
            }
        }
    }).reduceGroup(new EvaluationUtil.DistinctLabelIndexMap(false, null)).project(0);

    // The first two steps are to get y and y_ map of hat. These two will be broadcast to CalLocalPredResult for use
    // Build the confusion matrix.
    DataSet<LongMatrix> statistics = data
        .rebalance()
        .mapPartition(new CalLocalPredResult())
        .withBroadcastSet(labels, LABELS)
        .withBroadcastSet(predictions, PREDICTIONS);

    return statistics;
}

CalLocalPredResult establishes the confusion matrix.

  • In the open function, the label and prediction maps (for y and y_hat) are obtained from the broadcast variables.
  • In the mapPartition function, the confusion matrix is built.
matrix = {long[2][]@10707} 
 0 = {long[2]@10709} 
  0 = 0
  1 = 0
 1 = {long[2]@10710} 
  0 = 1
  1 = 0

The code is:

static class CalLocalPredResult extends RichMapPartitionFunction<Row, LongMatrix> {
    private Map<String, Integer> labels, predictions;

    @Override
    public void open(Configuration parameters) throws Exception {
        List<Tuple1<Map<String, Integer>>> list = getRuntimeContext().getBroadcastVariable(LABELS);
        this.labels = list.get(0).f0;
        list = getRuntimeContext().getBroadcastVariable(PREDICTIONS);
        this.predictions = list.get(0).f0;
    }

    @Override
    public void mapPartition(Iterable<Row> rows, Collector<LongMatrix> collector) {
        long[][] matrix = new long[predictions.size()][labels.size()];
        for (Row r : rows) {
            if (EvaluationUtil.checkRowFieldNotNull(r)) {
                int label = labels.get(r.getField(0).toString());
                int pred = predictions.get(r.getField(1).toString());
                matrix[pred][label] += 1;
            }
        }
        collector.collect(new LongMatrix(matrix));
    }
}

5.2 extractParamsFromConfusionMatrix

extractParamsFromConfusionMatrix computes purity, NMI and other metrics from the confusion matrix.

public static Params extractParamsFromConfusionMatrix(LongMatrix longMatrix) {
    long[][] matrix = longMatrix.getMatrix();
    long[] actualLabel = longMatrix.getColSums();
    long[] predictLabel = longMatrix.getRowSums();
    long total = longMatrix.getTotal();

    double entropyActual = 0.0;
    double entropyPredict = 0.0;
    double mutualInfor = 0.0;
    double purity = 0.0;
    long tp = 0L;
    long tpFpSum = 0L;
    long tpFnSum = 0L;
    for (long anActualLabel : actualLabel) {
        entropyActual += entropy(anActualLabel, total);
        tpFpSum += combination(anActualLabel);
    }
    entropyActual /= -Math.log(2);
    for (long aPredictLabel : predictLabel) {
        entropyPredict += entropy(aPredictLabel, total);
        tpFnSum += combination(aPredictLabel);
    }
    entropyPredict /= -Math.log(2);
    for (int i = 0; i < matrix.length; i++) {
        long max = 0;
        for (int j = 0; j < matrix[0].length; j++) {
            max = Math.max(max, matrix[i][j]);
            mutualInfor += (0 == matrix[i][j] ? 0.0 :
                1.0 * matrix[i][j] / total * Math.log(1.0 * total * matrix[i][j] / predictLabel[i] / actualLabel[j]));
            tp += combination(matrix[i][j]);
        }
        purity += max;
    }
    purity /= total;
    mutualInfor /= Math.log(2);
    long fp = tpFpSum - tp;
    long fn = tpFnSum - tp;
    long totalCombination = combination(total);
    long tn = totalCombination - tp - fn - fp;
    double expectedIndex = 1.0 * tpFpSum * tpFnSum / totalCombination;
    double maxIndex = 1.0 * (tpFpSum + tpFnSum) / 2;
    double ri = 1.0 * (tp + tn) / (tp + tn + fp + fn);
    return new Params()
        .set(ClusterMetrics.NMI, 2.0 * mutualInfor / (entropyActual + entropyPredict))
        .set(ClusterMetrics.PURITY, purity)
        .set(ClusterMetrics.RI, ri)
        .set(ClusterMetrics.ARI, (tp - expectedIndex) / (maxIndex - expectedIndex));
}
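As a sanity check, take the example from section 0x03 and assume KMeans splits the six points into the two obvious groups, so the confusion matrix is diag(3, 3). Working through the code above by hand reproduces the asserted test values:

$$\begin{aligned}
purity &= \tfrac{3+3}{6}=1\\
tp &= \tbinom{3}{2}+\tbinom{3}{2}=6,\quad tpFpSum=tpFnSum=6,\quad \tbinom{6}{2}=15\\
fp &= fn = 0,\quad tn = 15-6=9,\quad RI=\tfrac{6+9}{15}=1\\
expectedIndex &= \tfrac{6\cdot 6}{15}=2.4,\quad maxIndex=6,\quad ARI=\tfrac{6-2.4}{6-2.4}=1\\
H(C) &= H(K)=1\ \text{bit},\quad I(C;K)=1\ \text{bit},\quad NMI=\tfrac{2\cdot 1}{1+1}=1
\end{aligned}$$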

0x06 vector-related operations

The first two steps are distributed computing and merging:

DataSet<BaseMetricsSummary> metricsSummary = data
    .groupBy(0)
    .reduceGroup(new CalcClusterMetricsSummary(distance))
    .reduce(new EvaluationUtil.ReduceBaseMetrics());

6.1 CalcClusterMetricsSummary

CalcClusterMetricsSummary calls ClusterEvaluationUtil.getClusterStatistics to do the actual calculation.

public static class CalcClusterMetricsSummary implements GroupReduceFunction<Row, BaseMetricsSummary> {
    private ContinuousDistance distance;

    public CalcClusterMetricsSummary(ContinuousDistance distance) {
        this.distance = distance;
    }

    @Override
    public void reduce(Iterable<Row> rows, Collector<BaseMetricsSummary> collector) {
        collector.collect(ClusterEvaluationUtil.getClusterStatistics(rows, distance));
    }
}

ClusterEvaluationUtil.getClusterStatistics is as follows

public static ClusterMetricsSummary getClusterStatistics(Iterable<Row> rows, ContinuousDistance distance) {
    List<Vector> list = new ArrayList<>();
    int total = 0;
    String clusterId;
    DenseVector sumVector;

    Iterator<Row> iterator = rows.iterator();
    Row row = null;
    while (iterator.hasNext() && !EvaluationUtil.checkRowFieldNotNull(row)) {
        // Take out the first item that is not empty
        row = iterator.next();
    }
    if (EvaluationUtil.checkRowFieldNotNull(row)) {
        clusterId = row.getField(0).toString(); // Take out the clusterId
        Vector vec = VectorUtil.getVector(row.getField(1)); // Take out the Vector
        sumVector = DenseVector.zeros(vec.size()); // initialization
    } else {
        return null;
    }

    while (null != row) { // Traverse rows and accumulate to sumVector
        if (EvaluationUtil.checkRowFieldNotNull(row)) {
            Vector vec = VectorUtil.getVector(row.getField(1));
            list.add(vec);
            if (distance instanceof EuclideanDistance) {
                sumVector.plusEqual(vec);
            } else {
                vec.scaleEqual(1.0 / vec.normL2());
                sumVector.plusEqual(vec);
            }
            total++;
        }
        row = iterator.hasNext() ? iterator.next() : null;
    }

    DenseVector meanVector = sumVector.scale(1.0 / total); // Take mean

// runtime variables. Here is the second set of vectors  
list = {ArrayList@10654}  size = 3
 0 = {DenseVector@10661} "9.0 9.0 9.0"
 1 = {DenseVector@10662} "9.1 9.1 9.1"
 2 = {DenseVector@10663} "9.2 9.2 9.2"  
  
    double distanceSum = 0.0;
    double distanceSquareSum = 0.0;
    double vectorNormL2Sum = 0.0;
    for (Vector vec : list) { // Loop to calculate several statistics
        double d = distance.calc(meanVector, vec);
        distanceSum += d;
        distanceSquareSum += d * d;
        vectorNormL2Sum += vec.normL2Square();
    }
  
// runtime variable
sumVector = {DenseVector@10656} "27.3 27.3 27.3"
meanVector = {DenseVector@10657} "9.1 9.1 9.1"
distanceSum = 0.34641016151377424
distanceSquareSum = 0.059999999999999575
vectorNormL2Sum = 745.3499999999999  
  
    return new ClusterMetricsSummary(clusterId, total, distanceSum / total, distanceSquareSum, vectorNormL2Sum,
        meanVector, distance);
}

6.2 ReduceBaseMetrics

Here, the per-cluster results are merged into a BaseMetricsSummary.

/**
 * Merge the BaseMetrics calculated locally.
 */
public static class ReduceBaseMetrics implements ReduceFunction<BaseMetricsSummary> {
    @Override
    public BaseMetricsSummary reduce(BaseMetricsSummary t1, BaseMetricsSummary t2) throws Exception {
        return null == t1 ? t2 : t1.merge(t2);
    }
}

6.3 calSilhouetteCoefficient

The third step is to call calSilhouetteCoefficient to compute the silhouette coefficient. Each row is mapped against the broadcast ClusterMetricsSummary, and the per-row values are summed:

DataSet<Tuple1<Double>> silhouetteCoefficient = data.map(
    new RichMapFunction<Row, Tuple1<Double>>() {
        @Override
        public Tuple1<Double> map(Row value) {
            List<BaseMetricsSummary> list = getRuntimeContext().getBroadcastVariable(METRICS_SUMMARY);
            return ClusterEvaluationUtil.calSilhouetteCoefficient(value,
                (ClusterMetricsSummary)list.get(0));
        }
    }).withBroadcastSet(metricsSummary, METRICS_SUMMARY)
    .aggregate(Aggregations.SUM, 0);

The calculation follows the silhouette formula given in section 2.1.

public static Tuple1<Double> calSilhouetteCoefficient(Row row, ClusterMetricsSummary clusterMetricsSummary) {
    if (!EvaluationUtil.checkRowFieldNotNull(row)) {
        return Tuple1.of(0.);
    }
    String clusterId = row.getField(0).toString();
    Vector vec = VectorUtil.getVector(row.getField(1));
    double currentClusterDissimilarity = 0.0;
    double neighboringClusterDissimilarity = Double.MAX_VALUE;
    if (clusterMetricsSummary.distance instanceof EuclideanDistance) {
        double normSquare = vec.normL2Square();
        for (int i = 0; i < clusterMetricsSummary.k; i++) {
            double dissimilarity = clusterMetricsSummary.clusterCnt.get(i) * normSquare
                - 2 * clusterMetricsSummary.clusterCnt.get(i) * MatVecOp.dot(vec, clusterMetricsSummary.meanVector.get(i)) + clusterMetricsSummary.vectorNormL2Sum.get(i);
            if (clusterId.equals(clusterMetricsSummary.clusterId.get(i))) {
                if (clusterMetricsSummary.clusterCnt.get(i) > 1) {
                    currentClusterDissimilarity = dissimilarity / (clusterMetricsSummary.clusterCnt.get(i) - 1);
                }
            } else {
                neighboringClusterDissimilarity = Math.min(neighboringClusterDissimilarity,
                    dissimilarity / clusterMetricsSummary.clusterCnt.get(i));
            }
        }
    } else {
        for (int i = 0; i < clusterMetricsSummary.k; i++) {
            double dissimilarity = 1.0 - MatVecOp.dot(vec, clusterMetricsSummary.meanVector.get(i));
            if (clusterId.equals(clusterMetricsSummary.clusterId.get(i))) {
                if (clusterMetricsSummary.clusterCnt.get(i) > 1) {
                    currentClusterDissimilarity = dissimilarity * clusterMetricsSummary.clusterCnt.get(i) / (clusterMetricsSummary.clusterCnt.get(i) - 1);
                }
            } else {
                neighboringClusterDissimilarity = Math.min(neighboringClusterDissimilarity,
                    dissimilarity);
            }
        }
    }
    return Tuple1.of(currentClusterDissimilarity < neighboringClusterDissimilarity ?
        1 - (currentClusterDissimilarity / neighboringClusterDissimilarity) :
        (neighboringClusterDissimilarity / currentClusterDissimilarity) - 1);
}
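A short note on why the Euclidean branch only needs the per-cluster count, mean vector and sum of squared norms (this is a reading of the code above, not something stated explicitly in the source): for a point x and a cluster C with n points and mean vector μ,

$$\sum_{y \in C}\|x-y\|^2 = n\|x\|^2 - 2n\,x\cdot\mu + \sum_{y \in C}\|y\|^2$$

which is exactly the dissimilarity computed in the code. Dividing by n (or by n - 1 for x's own cluster, since the y = x term contributes 0) gives the average squared distance that the code uses in place of the average distance in the textbook formula.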

6.4 SaveDataAsParams

The fourth step is to store the data as Params

public static class SaveDataAsParams extends RichMapFunction<BaseMetricsSummary, Params> {
    @Override
    public Params map(BaseMetricsSummary t) throws Exception {
        Params params = t.toMetrics().getParams();
        List<Tuple1<Double>> silhouetteCoefficient = getRuntimeContext().getBroadcastVariable(
            EvalClusterBatchOp.SILHOUETTE_COEFFICIENT);
        params.set(ClusterMetrics.SILHOUETTE_COEFFICIENT,
            silhouetteCoefficient.get(0).f0 / params.get(ClusterMetrics.COUNT));
        return params;
    }
}

0x07 merge output

In this step, a union combines labelMetrics and vectorMetrics; the results are then merged and output to the final table.

DataSet<Row> out = labelMetrics
    .union(vectorMetrics)
    .reduceGroup(new GroupReduceFunction<Params, Row>() {
        @Override
        public void reduce(Iterable<Params> values, Collector<Row> out) {
            Params params = new Params();
            for (Params p : values) {
                params.merge(p);
            }
            out.collect(Row.of(params.toJson()));
        }
    });

this.setOutputTable(DataSetConversionUtil.toTable(getMLEnvironmentId(),
    out, new TableSchema(new String[] {EVAL_RESULT}, new TypeInformation[] {Types.STRING})
));

