Openvidu server builds kurento load balancing mechanism

Why extend the streaming media server (kurento)

We pull down the code from the openvidu official website. After startup, the default is an openvidu server to a kurento server. Deal with signaling and streaming media respectively. According to the official document, under the condition that there are 7 sessions for one session and participants, the pressure that servers with different configurations can withstand is shown in the figure below:

Yes, when we use 4c8g server, theoretically we can only process 7 session s, that is, only 7 rooms exist at the same time! This is simply unbearable in production. Because the signaling server itself has little pressure, we must expand the media server (kurento) to increase its affordability.

How to extend kurento

openvidu is divided into CE version and Pro version. We use CE version, that is, the code of kurento will not be given to us in the source code. How can we expand the source code so that it can load multiple kurentos? First, let's take a look at the architecture of openvidupro kurento

Yes, as shown in the figure above, an openvidu server extends three kurento servers. Since the bottleneck is kurento, if openviduserver is not considered, our affordability will be multiplied by 3 in theory! Next, let's analyze the source code of openvidu.

KMS_URIS=["ws://116.196.10.***:8888/kurento"]
io.openvidu.server.config.OpenviduConfig:

...
 public List<String> checkKmsUris() {

        String property = "KMS_URIS";

        return asKmsUris(property, getValue(property));

  }
...

As you can see from here, the code entry is here. But if we're in KMS_ When multiple kurento addresses are added to URIs, it will only take the first kurento address in the address, that is, it does not handle load operations. The reasons are as follows:

public class FixedOneKmsManager extends KmsManager {

    @Override
    public List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure) throws Exception {
        KmsProperties firstProps = kmsProperties.get(0);
        KurentoClient kClient = null;
        Kms kms = new Kms(firstProps, loadManager);
        try {
            kClient = KurentoClient.create(firstProps.getUri(), this.generateKurentoConnectionListener(kms.getId()));
            this.addKms(kms);
            kms.setKurentoClient(kClient);

            // TODO: This should be done in KurentoClient connected event
            kms.setKurentoClientConnected(true);
            kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());

        } catch (KurentoException e) {
            log.error("KMS in {} is not reachable by OpenVidu Server", firstProps.getUri());
            if (kClient != null) {
                kClient.destroy();
            }
            throw new Exception();
        }
        return Arrays.asList(kms);
    }

    @Override
    @PostConstruct
    protected void postConstructInitKurentoClients() {
        try {
            List<KmsProperties> kmsProps = new ArrayList<>();
            for (String kmsUri : this.openviduConfig.getKmsUris()) {
                String kmsId = KmsManager.generateKmsId();
                kmsProps.add(new KmsProperties(kmsId, kmsUri));
            }
            this.initializeKurentoClients(kmsProps, true);
        } catch (Exception e) {
            // Some KMS wasn't reachable
            log.error("Shutting down OpenVidu Server");
            System.exit(1);
        }
    }

}

The initializeKurentoClients method clearly states that only kms will be taken_ The 0-th data of URIs. If we want to process multiple items, we need to return a collection here. Very simply, we define our own KmsManager class and override the initializeKurentoClients method. As follows:

public class FixedJDKmsManager extends KmsManager {

    @Override
    public List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure) throws Exception {

        ArrayList<Kms> results = new ArrayList<>();

        for(KmsProperties kmsp:kmsProperties){
            KurentoClient kClient = null;
            Kms kms = new Kms(kmsp, loadManager);
            try {
                kClient = KurentoClient.create(kmsp.getUri(), this.generateKurentoConnectionListener(kms.getId()));
                this.addKms(kms);
                kms.setKurentoClient(kClient);

                // TODO: This should be done in KurentoClient connected event
                kms.setKurentoClientConnected(true);
                kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());

                results.add(kms);
            } catch (KurentoException e) {
                log.error("KMS in {} is not reachable by OpenVidu Server", kmsp.getUri());
                if (kClient != null) {
                    kClient.destroy();
                }
                throw new Exception();
            }
        }

        return results;
    }

    @Override
    @PostConstruct
    protected void postConstructInitKurentoClients() {
        try {
            List<KmsProperties> kmsProps = new ArrayList<>();
            for (String kmsUri : this.openviduConfig.getKmsUris()) {
                String kmsId = KmsManager.generateKmsId();
                kmsProps.add(new KmsProperties(kmsId, kmsUri));
            }
            this.initializeKurentoClients(kmsProps, true);
        } catch (Exception e) {
            // Some KMS wasn't reachable
            log.error("Shutting down OpenVidu Server");
            System.exit(1);
        }
    }

}

Configure and use the kmsManager defined by yourself

How to realize weighted polling load balancing

And kms_urls similarly, we can add kms to the configuration file_ weights:

KMS_WEIGHT=[1]

Moreover, the KMS is artificially controlled_ size and kms of URLs_ Weight equivalent

Then, in openviduConfig, compare it with kms_urls is initialized in a similar way. The core code is as follows:

public class OpenviduConfig{

    ...

    private List<Integer> kmsWeights;


    public List<Integer> getKmsWeights() {
        return kmsWeights;
    }

    public void setKmsWeights(List<Integer> weights) {
        this.kmsWeights = weights;
        log.info("=====kms Reset to weight:{}", this.kmsUrisList);
    }

    public List<Integer> initWeight() {

        String property = "KMS_WEIGHT";

        return asKmsWeights(property, getValue(property));
    }


    protected void checkConfigurationProperties(boolean loadDotenv) {

      ...
      kmsUrisList = checkKmsUris();

      kmsWeights = initWeight();
      ...

    }

  ...

}

At this point, we put the weighting parameters we want into the container, but openvidu has not selected kurento. Next, we need to find the place where openvidu looks for the corresponding kurento and get the corresponding url according to the weight:

After reading the source code, you can find that in kurentosessionmanager In the joinroom method, call the getLessLoadedConnectedAndRunningKms method, which is the method to select kms. Similarly, we can rewrite a method to select kms according to our own logic. I write as follows:

public synchronized Kms getLoadBalanceConnectedAndRunningKms() throws NoSuchElementException {
        List<KmsLoad> kmsLoads = getKmsLoads().stream().filter(kmsLoad -> kmsLoad.kms.isKurentoClientConnected()
                && mediaNodeStatusManager.isRunning(kmsLoad.kms.getId()) ).collect(Collectors.toList());

        if (kmsLoads.isEmpty()) {
            throw new NoSuchElementException();
        } else {
            //todo writes the load balancing of kms here
            this.openviduConfig.getKmsWeights();
//            Kms kms = kmsLoads.get(Integer.parseInt(kmsNode)).kms;

            //First time: initialize data
            KmsWeightRobin.initKmsLoads(kmsLoads,this.openviduConfig.getKmsWeights());
            KmsLoad kmsWeightRobin = KmsWeightRobin.getKmsWeightRobin();
            log.info("=========>>>>>>After weighted polling,choice kms:{}",kmsWeightRobin.kms.getUri());
            return  kmsWeightRobin.getKms();

        }
    }
public class KmsWeightRobin {

//    static Map<String,Integer> ipMap=new HashMap<>();

    static Map<KmsManager.KmsLoad,Integer> kmsMap=new HashMap<>();

    static List<KmsManager.KmsLoad> kmsLoads = null;

    static List<Integer> weights = null;

    public  static synchronized void initMap() {

        kmsMap.clear();

        for(int i = 0;i<kmsLoads.size();i++){
            kmsMap.put(kmsLoads.get(i),weights.get(i));
        }
    }



    static Integer pos=0;
    public static KmsManager.KmsLoad getKmsWeightRobin(){
        Map<KmsManager.KmsLoad,Integer> ipServerMap=new ConcurrentHashMap<>();
        ipServerMap.putAll(kmsMap);

        Set<KmsManager.KmsLoad> ipSet=ipServerMap.keySet();
        Iterator<KmsManager.KmsLoad> ipIterator=ipSet.iterator();

        //Define a list for all server s
        ArrayList<KmsManager.KmsLoad> ipArrayList=new ArrayList<KmsManager.KmsLoad>();

        //Loop set, know the value in the map according to the in set, and add the corresponding number of server s to the list
        while (ipIterator.hasNext()){
            KmsManager.KmsLoad serverName=ipIterator.next();
            Integer weight=ipServerMap.get(serverName);
            for (int i = 0;i < weight ;i++){
                ipArrayList.add(serverName);
            }
        }
        KmsManager.KmsLoad serverName=null;
        if (pos>=ipArrayList.size()){
            pos=0;
        }
        serverName=ipArrayList.get(pos);
        //Polling + 1
        pos ++;
        return  serverName;
    }


//    public static Boolean initMapFlag = false;

    public static void initKmsLoads(List<KmsManager.KmsLoad> kmsLoads,List<Integer> weights){
        if(ObjectUtils.isEmpty(KmsWeightRobin.kmsLoads)){
            KmsWeightRobin.kmsLoads = kmsLoads;
        }
        if(ObjectUtils.isEmpty(KmsWeightRobin.weights)){
            KmsWeightRobin.weights = weights;
        }
//        if(!initMapFlag){
            initMap();
//            initMapFlag = true;
//        }


    }


    public static void updateKmsLoads(List<KmsManager.KmsLoad> kmsLoads,List<Integer> weights){


        return;
    }


}

In this way, when selecting kms, the container can be selected according to the weight.

Tags: webrtc

Posted by AcousticJames on Wed, 25 May 2022 08:07:39 +0300