Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve broker selector utils #14733

Merged
merged 5 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,20 +190,14 @@ protected void updateBrokerData()
}

public String getBroker(String... tableNames) {
List<String> brokers = null;
// If tableNames is not-null, filter out nulls
tableNames =
tableNames == null ? tableNames : Arrays.stream(tableNames).filter(Objects::nonNull).toArray(String[]::new);
if (!(tableNames == null || tableNames.length == 0)) {
// returning list of common brokers hosting all the tables.
brokers = BrokerSelectorUtils.getTablesCommonBrokers(Arrays.asList(tableNames),
_brokerData.getTableToBrokerMap());
tableNames = tableNames == null ? tableNames
: Arrays.stream(tableNames).filter(Objects::nonNull).toArray(String[]::new);
if (tableNames == null || tableNames.length == 0) {
List<String> brokers = _brokerData.getBrokers();
return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size()));
}

if (brokers == null || brokers.isEmpty()) {
brokers = _brokerData.getBrokers();
}
return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size()));
return BrokerSelectorUtils.getRandomBroker(Arrays.asList(tableNames), _brokerData.getTableToBrokerMap());
}

public List<String> getBrokers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ private void refresh() {
public String selectBroker(String... tableNames) {
if (!(tableNames == null || tableNames.length == 0 || tableNames[0] == null)) {
// getting list of brokers hosting all the tables.
List<String> list = BrokerSelectorUtils.getTablesCommonBrokers(Arrays.asList(tableNames),
String randomBroker = BrokerSelectorUtils.getRandomBroker(Arrays.asList(tableNames),
_tableToBrokerListMapRef.get());
if (list != null && !list.isEmpty()) {
return list.get(ThreadLocalRandom.current().nextInt(list.size()));
if (randomBroker != null) {
return randomBroker;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
package org.apache.pinot.client.utils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.pinot.client.ExternalViewReader;


Expand All @@ -34,35 +38,52 @@ private BrokerSelectorUtils() {
*
* @param tableNames: List of table names.
* @param brokerData: map holding data for table hosting on brokers.
* @return list of common brokers hosting all the tables.
* @return list of common brokers hosting all the tables or null if no common brokers found.
* @deprecated Use {@link #getTablesCommonBrokersSet(List, Map)} instead. It is more efficient and its semantics are
* clearer (ie it returns an empty set instead of null if no common brokers are found).
*/
public static List<String> getTablesCommonBrokers(List<String> tableNames, Map<String, List<String>> brokerData) {
List<List<String>> tablesBrokersList = new ArrayList<>();
for (String name: tableNames) {
String tableName = getTableNameWithoutSuffix(name);
int idx = tableName.indexOf('.');

if (brokerData.containsKey(tableName)) {
tablesBrokersList.add(brokerData.get(tableName));
} else if (idx > 0) {
// In case tableName is formatted as <db>.<table>
tableName = tableName.substring(idx + 1);
tablesBrokersList.add(brokerData.get(tableName));
}
@Nullable
@Deprecated
public static List<String> getTablesCommonBrokers(@Nullable List<String> tableNames,
Map<String, List<String>> brokerData) {
Set<String> tablesCommonBrokersSet = getTablesCommonBrokersSet(tableNames, brokerData);
if (tablesCommonBrokersSet == null || tablesCommonBrokersSet.isEmpty()) {
return null;
}
return new ArrayList<>(tablesCommonBrokersSet);
}

// return null if tablesBrokersList is empty or contains null
if (tablesBrokersList.isEmpty()
|| tablesBrokersList.stream().anyMatch(Objects::isNull)) {
/**
* Returns a random broker from the common brokers hosting all the tables.
*/
@Nullable
public static String getRandomBroker(@Nullable List<String> tableNames, Map<String, List<String>> brokerData) {
Set<String> tablesCommonBrokersSet = getTablesCommonBrokersSet(tableNames, brokerData);
if (tablesCommonBrokersSet.isEmpty()) {
return null;
}
return tablesCommonBrokersSet.stream()
.skip(ThreadLocalRandom.current().nextInt(tablesCommonBrokersSet.size()))
.findFirst()
.orElseThrow(() -> new IllegalStateException("No broker found"));
}

// Make a copy of the brokersList of the first table. retainAll does inplace modifications.
// So lists from brokerData should not be used directly.
List<String> commonBrokers = new ArrayList<>(tablesBrokersList.get(0));
for (int i = 1; i < tablesBrokersList.size(); i++) {
commonBrokers.retainAll(tablesBrokersList.get(i));
/**
*
* @param tableNames: List of table names.
* @param brokerData: map holding data for table hosting on brokers.
* @return set of common brokers hosting all the tables
*/
public static Set<String> getTablesCommonBrokersSet(
@Nullable List<String> tableNames, Map<String, List<String>> brokerData) {
if (tableNames == null || tableNames.isEmpty()) {
return Collections.emptySet();
}
HashSet<String> commonBrokers = getBrokers(tableNames.get(0), brokerData);
for (int i = 1; i < tableNames.size() && !commonBrokers.isEmpty(); i++) {
commonBrokers.retainAll(getBrokers(tableNames.get(i), brokerData));
}

return commonBrokers;
}

Expand All @@ -71,4 +92,28 @@ private static String getTableNameWithoutSuffix(String tableName) {
tableName.replace(ExternalViewReader.OFFLINE_SUFFIX, "").
replace(ExternalViewReader.REALTIME_SUFFIX, "");
}

/**
* Returns the brokers for the given table name.
*
* This means that an empty set is returned if there are no brokers for the given table name.
*/
private static HashSet<String> getBrokers(String tableName, Map<String, List<String>> brokerData) {
String tableNameWithoutSuffix = getTableNameWithoutSuffix(tableName);
int idx = tableNameWithoutSuffix.indexOf('.');

List<String> brokers = brokerData.get(tableNameWithoutSuffix);
if (brokers != null) {
return new HashSet<>(brokers);
} else if (idx > 0) {
// TODO: This is probably unnecessary and even wrong. `brokerData` should include the fully qualified name.
// In case tableNameWithoutSuffix is formatted as <db>.<table> and not found in the fully qualified name
tableNameWithoutSuffix = tableNameWithoutSuffix.substring(idx + 1);
List<String> brokersWithoutDb = brokerData.get(tableNameWithoutSuffix);
if (brokersWithoutDb != null) {
return new HashSet<>(brokersWithoutDb);
}
}
return new HashSet<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,24 @@ public void testCloseZkClient() {

Mockito.verify(_mockZkClient, times(1)).close();
}

@Test
public void testSelectBrokerWithInvalidTable() {
Map<String, List<String>> tableToBrokerListMap = new HashMap<>();
tableToBrokerListMap.put("table1", Collections.singletonList("broker1"));
when(_mockExternalViewReader.getTableToBrokersMap()).thenReturn(tableToBrokerListMap);
_dynamicBrokerSelectorUnderTest.handleDataChange("dataPath", "data");
String result = _dynamicBrokerSelectorUnderTest.selectBroker("invalidTable");
assertEquals(result, "broker1");
}

@Test
public void testSelectBrokerWithTwoTablesOneInvalid() {
Map<String, List<String>> tableToBrokerListMap = new HashMap<>();
tableToBrokerListMap.put("table1", Collections.singletonList("broker1"));
when(_mockExternalViewReader.getTableToBrokersMap()).thenReturn(tableToBrokerListMap);
_dynamicBrokerSelectorUnderTest.handleDataChange("dataPath", "data");
String result = _dynamicBrokerSelectorUnderTest.selectBroker("table1", "invalidTable");
assertEquals(result, "broker1");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.client.utils;

import java.util.HashMap;
import java.util.List;
import java.util.Set;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;


public class BrokerSelectorUtilsTest {

HashMap<String, List<String>> _brokerData = new HashMap<>();
@Test
public void getTablesCommonBrokersSetNullTables() {
Set<String> tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(null, _brokerData);
Assert.assertEquals(tableSet, Set.of());
}

@Test
public void getTablesCommonBrokersListNullTables() {
List<String> tableList = BrokerSelectorUtils.getTablesCommonBrokers(null, _brokerData);
Assert.assertNull(tableList);
}

@Test
public void getTablesCommonBrokersSetEmptyTables() {
Set<String> tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of(), _brokerData);
Assert.assertEquals(tableSet, Set.of());
}

@Test
public void getTablesCommonBrokersListEmptyTables() {
List<String> tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of(), _brokerData);
Assert.assertNull(tableList);
}

@Test
public void getTablesCommonBrokersSetNotExistentTable() {
Set<String> tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("notExistent"), _brokerData);
Assert.assertEquals(tableSet, Set.of());
}

@Test
public void getTablesCommonBrokersListNotExistentTable() {
List<String> tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("notExistent"), _brokerData);
Assert.assertNull(tableList);
}

@Test
public void getTablesCommonBrokersSetOneTable() {
_brokerData.put("table1", List.of("broker1"));
Set<String> tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("table1"), _brokerData);
Assert.assertEquals(tableSet, Set.of("broker1"));
}

@Test
public void getTablesCommonBrokersListOneTable() {
_brokerData.put("table1", List.of("broker1"));
List<String> tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("table1"), _brokerData);
Assert.assertNotNull(tableList);
Assert.assertEquals(tableList, List.of("broker1"));
}

@Test
public void getTablesCommonBrokersSetTwoTables() {
_brokerData.put("table1", List.of("broker1"));
_brokerData.put("table2", List.of("broker1"));
Set<String> tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("table1", "table2"), _brokerData);
Assert.assertNotNull(tableSet);
Assert.assertEquals(tableSet, Set.of("broker1"));
}

@Test
public void getTablesCommonBrokersListTwoTables() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does the data type of the result (List vs Set) matter ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't depend on the returned data type. It is just that I'm keeping the older method for background compatibility reasons.

Implementation of the older method is not correct. Apart of being unnecessary expensive by allocating too much, the main issue is that depending on some conditions it either returns null or an empty set when there are not common brokers.

I'm just keeping the old method in case someone is using it outside this repo (I know we do), but the idea is to change or remove it in future.

_brokerData.put("table1", List.of("broker1"));
_brokerData.put("table2", List.of("broker1"));
List<String> tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("table1", "table2"), _brokerData);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't a single assert be more readable ?
Assert.assertEquals(tableList, Array.asList("broker1"))

Assert.assertNotNull(tableList);
Assert.assertEquals(tableList, List.of("broker1"));
}

@Test
public void getTablesCommonBrokersSetTwoTablesDifferentBrokers() {
_brokerData.put("table1", List.of("broker1"));
_brokerData.put("table2", List.of("broker2"));
Set<String> tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("table1", "table2"), _brokerData);
Assert.assertEquals(tableSet, Set.of());
}

@Test
public void getTablesCommonBrokersListTwoTablesDifferentBrokers() {
_brokerData.put("table1", List.of("broker1"));
_brokerData.put("table2", List.of("broker2"));
List<String> tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("table1", "table2"), _brokerData);
Assert.assertNull(tableList);
}

@AfterMethod
public void tearDown() {
_brokerData.clear();
}
}
Loading