大数据实时计算高性能解决方案
假设有这样一个场景:百万级数据量,每天千万级增量,几十字段匹配,单值/多值/范围,毫秒级别耗时,同时需要考虑后续数据量递增情况下,整体性能不能线性下降。我们应该采用哪种索引技术来实现匹配计算呢?
业务场景
以机票搜索场景为例,有效数据量百万+,每天增量千万+,单docker实例TPS要求1000+。下面的表格是计算机票所需要的一种规则数据,每条规则涉及到许多字段需要进行索引匹配,其中字段有多种数据类型:单值、多值、范围,多字段匹配关系支持:且、或、非。
规则数据
- 行程类型,总共有三种枚举值:1-单程,2-往返,3-不限
- 开票航司,支持多个,用/分隔
- 出发地/目的地/排除出发地/排除目的地,支持多个,用/分隔,支持多维度,同时支持全球(GLOBAL)
- 班期,支持多个,用/分隔
- 舱位,支持多个,用/分隔,同时支持全部(ALL)
- 旅行日期,支持日期范围
索引选型
索引选择原则:算法稳定、查询时间复杂度低、空间复杂度适中、支持多维查询。
综合对比,我们选择倒排索引+BitMap组合方式来构建索引。
BitMap特点
- BitMap是一种数据结构,通常用于存储大量的布尔值或者整数。它将每个元素映射到一个二进制位上,使用0或1来表示该元素的状态。在实际使用中,BitMap通常被用来解决一些需要高效查找或者去重的问题。
- BitMap的基本操作包括设置某个元素为1或0,查询某个元素的状态,以及对多个BitMap进行位运算,例如求并集、交集、差集等。由于BitMap的实现采用了位运算,因此它的操作非常高效,可以在常数时间内完成。
- 在计算机科学中,BitMap通常使用一个长整型数组来实现。在Java中,可以使用BitSet类来实现BitMap,它提供了一系列高效的操作方法,例如set、get、and、or等等。
- BitMap的应用非常广泛,例如在数据库中用于去重、在网络中用于IP地址的过滤、在图像处理中用于表示像素的颜色等等。
索引构建
基于规则数据,在顺序创建索引时,先采用哈希表进行双向映射,便于后续提取规则ID,结构如下:
按属性名逐一构建索引模型,结构如下:
- 航程类型属性,按枚举拆分成0-不限,1-单程,2-往返,三个值索引,其中不限单独做为一个值进行索引
- 开票航司属性,按/分隔符进行拆分,拆分为AA,CI,LH,MH,四个值索引
- 出发地属性,按/分割符进行拆分,拆分为SHA,CN,HKG,GLOBAL,四个值索引,区域各层级单独作为一个值进行索引
- 目的地属性,按/分割符进行拆分,拆分为US,TPE,EUR,NOA,GLOBAL,五个值索引,同出发地属性
- 排除出发地属性,按/分隔符进行拆分,拆分为DFW,CN两个值索引,没有值不建索引,同出发地属性
- 排除目的地属性,按/分隔符进行拆分,拆分为CN一个值索引,没有值不建索引,同出发地属性
- 舱位属性,按/分隔符进行拆分,拆分为N,S,V,L,ALL,五个值索引,其中不限单独作为一个值进行索引
- 旅行日期属性,按范围拆分到天
索引匹配
所有规则数据创建索引完成后,那如何进行索引匹配的呢,下面我们以正常搜索场景为例来进行介绍:
例如查询请求3月25日从上海(SHA)飞往洛杉矶(LAX)的单程航班,有一条达美航空(AA)的舱位代码为V舱的运价。按照请求参数逐一取对应属性名的索引模型,用传入数据进行BitMap索引匹配:
- 航程类型:值为单程(1)和不限(0)进行匹配,匹配结果分别为1001、0010,再做或运算,结果为1011
- 出发地:值为SHA、CN、GLOBAL进行匹配,匹配结果分别为1100、0010、0001,再做或运算,结果为1111
- 排除出发地:值为SHA、CN、GLOBAL进行匹配,匹配结果为0001,再做取反运算,结果为1110
- 目的地:值为LAX、US、NOA、GLOBAL进行匹配,匹配结果分别为1000、0010、0001,再做或运算,结果为1011
- 排除目的地:值为LAX、US、NOA、GLOBAL进行匹配,没有对应的索引模型,忽略当前属性
- 出发日期:值为2023-03-25进行匹配,匹配结果为1111开票航司:值为AA进行匹配,匹配结果为1010舱位:值为V、ALL进行匹配,分别匹配结果为1000、0011,做或运算,结果为1011单属性计算结果分别为:航程类型-1011,出发地-1110,目的地-1011,出发日期-1111,开票航司-1010,舱位-1011所有属性进行与运算,计算公式为:1011 & 1110 & 1011 & 1111 & 1010 & 1011,最终位运算结果为:1010最终取号位为0和2的规则ID,分别为100001和100003,作为当前搜索命中的两条规则ID。
以上就是索引匹配逻辑,涉及到BitMap的and、or、not等操作,通过位运算来提升匹配性能,降低数据量对查询性能线性影响。
代码实现
1.规则模型
Rule类:提供规则字段。
@Datapublic class Rule implements IndexKey { /** 规则ID */ private String ruleId; /** 开票航司 */ @Index(name = "AIRLINE") private String airline; /** 出发地 */ @Index(name = "DEP_CITY") private String depCity; /** 排除出发地 */ @Index(name = "FORBID_DEP_CITY") private String forbidDepCity; /** 目的地 */ @Index(name = "ARR_CITY") private String arrCity; /** 排除目的地 */ @Index(name = "FORBID_ARR_CITY") private String forbidArrCity; /** 起飞日期范围 */ @Index(name = "DEP_DATE") private String depDate; /** 舱位 */ @Index(name = "CABIN") private String cabin; public Rule(String ruleId) { this.ruleId = ruleId; } @Override public String getIndexKey() { return ruleId; }}
2.索引数据仓库接口
Repository接口:提供索引新增、删除和查询服务。
public interface Repository<T> { /** * 保存 索引 和 数据 的关系 */ void put(Integer index, T data); /** * 删除Key */ void remove(String key); /** * 根据数据ID查找对应的索引ID */ Integer findIndexId(String key); /** * 根据索引ID查找对应的数据ID */ String findDataId(Integer index);}
3.索引数据仓库接口实现类
默认实现类:DefaultRepository,数据默认存储在内存。
public class DefaultRepository<T extends IndexKey> implements Repository<T> { private final Map<Integer, String> dataMap = Maps.newConcurrentMap(); private final Map<String, Integer> indexMap = Maps.newConcurrentMap(); @Override public void put(Integer index, T data) { dataMap.put(index, data.getIndexKey()); indexMap.put(data.getIndexKey(), index); } @Override public void remove(String key) { Integer index = indexMap.get(key); dataMap.remove(index); indexMap.remove(key); } @Override public Integer findIndexId(String key) { return indexMap.get(key); } @Override public String findDataId(Integer index) { return dataMap.get(index); }}
4.索引模型接口
IndexModel接口:提供指定属性值构建、删除和查询索引,同时提供索引and、andIn、or和orIn位运算操作。
public interface IndexModel { /** * 构建 属性值对应的索引模型 */ void createIndex(String fieldValue, int index); /** * 清除 指定属性值对应的索引模型 */ void clear(String fieldValue, Integer index); /** * 获取 属性值 对应的索引模型 */ BitMapWrapper get(String fieldValue); /** * 与 运算 */ BitMapWrapper and(BitMapWrapper left, String fieldValue); /** * 与 运算 */ BitMapWrapper and(BitMapWrapper left, BitMapWrapper right); /** * 与 运算 */ BitMapWrapper and(BitMapWrapper left, String fieldValue, BitMapWrapper defaultLeft, BitMapWrapper defaultRight); /** * 与 运算 */ BitMapWrapper and(BitMapWrapper left, BitMapWrapper right, BitMapWrapper defaultLeft, BitMapWrapper defaultRight); /** * 或 运算 */ BitMapWrapper or(BitMapWrapper left, String fieldValue); /** * 或 运算 */ BitMapWrapper or(BitMapWrapper left, BitMapWrapper right); /** * 或 运算 */ BitMapWrapper or(BitMapWrapper left, String fieldValue, BitMapWrapper defaultLeft, BitMapWrapper defaultRight); /** * 或 运算 */ BitMapWrapper or(BitMapWrapper left, BitMapWrapper right, BitMapWrapper defaultLeft, BitMapWrapper defaultRight);}
5.索引模型接口实现类
默认实现类:DefaultIndexModel
public class DefaultIndexModel implements IndexModel { private final Map<String, BitMapWrapper> valueMap = new ConcurrentHashMap<>(); @Override public void createIndex(String fieldValue, int index) { valueMap.computeIfAbsent(fieldValue, k -> buildBitMapWrapper()).set(index); } @Override public void clear(String fieldValue, Integer index) { if (!valueMap.containsKey(fieldValue)) { return; } valueMap.get(fieldValue).clear(index); } @Override public BitMapWrapper get(String fieldValue) { return valueMap.get(fieldValue); } @Override public BitMapWrapper and(BitMapWrapper left, String fieldValue) { return and(left, getBitMapWrapper(fieldValue)); } @Override public BitMapWrapper and(BitMapWrapper left, BitMapWrapper right) { if (right == null) { return left; } if (left == null) { left = buildBitMapWrapper(); left.or(right); } else { left.and(right); } return left; } @Override public BitMapWrapper and(BitMapWrapper left, String fieldValue, BitMapWrapper defaultLeft, BitMapWrapper defaultRight) { BitMapWrapper right = valueMap.getOrDefault(fieldValue,defaultRight); if (left == null) { left = defaultLeft; left.or(right); } else { left.and(right); } return left; } @Override public BitMapWrapper and(BitMapWrapper left, BitMapWrapper right, BitMapWrapper defaultLeft, BitMapWrapper defaultRight) { if (right == null) { right = defaultRight; } if (left == null) { left = defaultLeft; left.or(right); } else { left.and(right); } return left; } @Override public BitMapWrapper or(BitMapWrapper left, String fieldValue) { return or(left, getBitMapWrapper(fieldValue)); } @Override public BitMapWrapper or(BitMapWrapper left, BitMapWrapper right) { if (right == null) { return left; } if (left == null) { left = buildBitMapWrapper(); } left.or(right); return left; } @Override public BitMapWrapper or(BitMapWrapper left, String fieldValue, BitMapWrapper defaultLeft, BitMapWrapper defaultRight) { if (left == null) { left = defaultLeft; } if (!valueMap.containsKey(fieldValue)) { return left; } BitMapWrapper right = valueMap.get(fieldValue); left.or(right); return left; } @Override public BitMapWrapper or(BitMapWrapper left, BitMapWrapper right, BitMapWrapper defaultLeft, BitMapWrapper defaultRight) { if (left == null) { left = defaultLeft; } if (right == null) { return left; } left.or(right); return left; } public BitMapWrapper getBitMapWrapper(String fieldValue) { return valueMap.computeIfAbsent(fieldValue,k -> buildBitMapWrapper()); } private BitMapWrapper buildBitMapWrapper() { return new DefaultBitMapWrapper(); }}
6.BitMap包装类接口
BitMapWrapper接口:提供BitSet位运算基础操作,且、或、翻转等运算,同时提供设置、清除、查询业务数据ID等服务。
public interface BitMapWrapper { /** * 且 运算 */ void and(BitMapWrapper wrapper); /** * 或 运算 */ void or(BitMapWrapper wrapper); /** * 翻转 运算 */ void flip(int fromIndex, int toIndex); /** * 指定号位 设置 为1 */ void set(int bitIndex); /** * 清除指定号位(设置 为0) */ void clear(int bitIndex); /** * 清除 BitMap */ void clear(); /** * 判断是否为空 */ boolean isEmpty(); /** * 获取源目标对象 */ Object getTarget(); /** * 获取业务数据ID */ void getDataId(Consumer<Integer> consumer); }
7.BitMap包装类接口实现类
默认实现类:DefaultBitMapWrapper
public class DefaultBitMapWrapper implements BitMapWrapper { private final BitSet bitSet = new BitSet(); @Override public void and(BitMapWrapper wrapper) { bitSet.and((BitSet) wrapper.getTarget()); } @Override public void or(BitMapWrapper wrapper) { bitSet.or((BitSet) wrapper.getTarget()); } @Override public void flip(int fromIndex, int toIndex) { bitSet.flip(fromIndex, toIndex); } @Override public void set(int bitIndex) { bitSet.set(bitIndex); } @Override public void clear(int bitIndex) { bitSet.clear(bitIndex); } @Override public void clear() { bitSet.clear(); } @Override public boolean isEmpty() { return bitSet.isEmpty(); } @Override public Object getTarget() { return bitSet; } @Override public void getDataId(Consumer<Integer> consumer) { int i = bitSet.nextSetBit(0); if (i != -1) { consumer.accept(i); for (i = bitSet.nextSetBit(i + 1); i >= 0; i = bitSet.nextSetBit(i + 1)) { int endOfRun = bitSet.nextClearBit(i); do { consumer.accept(i); } while (++i < endOfRun); } } }}
8.索引构建器接口
Build接口:提供创建、获取当前规则所有属性名对应的索引模型和查询当前索引长度。
public interface Build<T> { /** * 创建索引 */ void create(T data); /** * 获取所有属性名对应的索引模型 */ Map<String,IndexModel> getIndexModels(); /** * 获取当前索引长度 */ AtomicInteger getIndexLength();}
9.索引构建器接口实现类:
抽象实现类:AbstractBuild,实现接口相关服务,提供添加索引模型抽象方法,需自定义实现。
public abstract class AbstractBuild<T extends IndexKey> implements Build<T> { private final Map<String, IndexModel> INDEX_MODELS = Maps.newConcurrentMap(); private final AtomicInteger indexGenerator = new AtomicInteger(); private Repository<T> repository; public AbstractBuild(Repository<T> repository){ this.repository = repository; } @Override public void create(T data) { doCreateIndex(data,indexGenerator.getAndIncrement()); } /** * Do create index. * * @param data the data * @param index the index */ private void doCreateIndex(T data, int index){ // 获取当前模型所有属性,根据标记索引属性名构建对应的索引模型,添加到INDEX_MODELS中 putIndexModel(data,index); repository.put(index,data); } /** * 添加索引模型,此处不提供默认实现 * * @param data the data * @param index the index */ protected abstract void putIndexModel(T data, int index); @Override public Map<String, IndexModel> getIndexModels() { return INDEX_MODELS; } @Override public AtomicInteger getIndexLength() { return indexGenerator; }}
10.索引匹配接口
Match接口:提供单值和多值的与、或BitMap运算服务,以及获取最终匹配的业务数据ID集合,
public interface Match { /** * 与 运算操作 * * @param wrapper 左值 * @param fieldName 属性名 * @param fieldValue 属性值 * @return the bit map wrapper */ BitMapWrapper and(BitMapWrapper wrapper, String fieldName, String fieldValue); /** * in 与 运算操作 * * @param wrapper 左值 * @param fieldName 属性名 * @param fieldValues 属性值 * @return the bit map wrapper */ BitMapWrapper andIn(BitMapWrapper wrapper, String fieldName, String... fieldValues); /** * Not in 与 运算操作 * * @param wrapper 左值 * @param fieldName 属性名 * @param fieldValues 属性值 * @return the bit map wrapper */ BitMapWrapper notIn(BitMapWrapper wrapper, String fieldName, String... fieldValues); /** * 或 运算操作 * * @param wrapper 左值 * @param fieldName 属性名 * @param fieldValue 属性值 * @return the bit map wrapper */ BitMapWrapper or(BitMapWrapper wrapper, String fieldName, String fieldValue); /** * in 或 运算操作 * * @param wrapper 左值 * @param fieldName 属性名 * @param fieldValues 属性值 * @return the bit map wrapper */ BitMapWrapper orIn(BitMapWrapper wrapper, String fieldName, String... fieldValues); /** * 提交运算,计算出索引匹配后的数据ID集合 * * @param bitMap the bit map * @return the list */ List<String> commit(BitMapWrapper bitMap);}
11 索引匹配接口实现类
默认实现类:DefaultMatch
@Slf4jpublic class DefaultMatch implements Match { /** * 索引模型集合,Key:属性名,Val:索引模型 */ private Map<String, IndexModel> indexModels; /** * 当前规则索引自增记录器 */ private AtomicInteger indexGenerator; /** * 当前运算结果缓存 */ private BitMapWrapper resultWrapper; /** * OR运算结果缓存 */ private BitMapWrapper orResultWrapper; /** * 空结果缓存 */ private BitMapWrapper emptyWrapper; /** * 索引&业务数据仓库 */ private Repository repository; /** * Instantiates a new Default match. * * @param build the build * @param repository the repository */ public DefaultMatch(Build build, Repository repository){ this.resultWrapper = new DefaultBitMapWrapper(); this.orResultWrapper = new DefaultBitMapWrapper(); this.emptyWrapper = new DefaultBitMapWrapper(); this.indexModels = build.getIndexModels(); this.repository = repository; this.indexGenerator = build.getIndexLength(); } @Override public BitMapWrapper and(BitMapWrapper wrapper, String fieldName, String fieldValue) { if (StringUtils.isBlank(fieldValue)) { return wrapper; } wrapper = andIn(wrapper, fieldName, fieldValue); return wrapper; } @Override public BitMapWrapper andIn(BitMapWrapper wrapper, String fieldName, String... fieldValues) { IndexModel indexModel = indexModels.get(fieldName); if (indexModel == null) { if (wrapper == null) { wrapper = resultWrapper; } wrapper.clear(); return wrapper; } if (wrapper != null && wrapper.isEmpty()) { return wrapper; } try { // 属性值集合做 或 运算 for (String value : fieldValues) { orResultWrapper = indexModel.or(orResultWrapper, value, null, emptyWrapper); } // 原结果 和 翻转后结果做 与 运算 wrapper = indexModel.and(wrapper, orResultWrapper, resultWrapper, emptyWrapper); return wrapper; } finally { orResultWrapper.clear(); } } @Override public BitMapWrapper notIn(BitMapWrapper wrapper, String fieldName, String... fieldValues) { IndexModel indexModel = indexModels.get(fieldName); if (indexModel == null) { return wrapper; } if (wrapper != null && wrapper.isEmpty()) { return wrapper; } try { // 属性值集合做 或 运算 for (String value : fieldValues) { orResultWrapper = indexModel.or(orResultWrapper, value, null, emptyWrapper); } // 对结果翻转 orResultWrapper.flip(0, indexGenerator.get()); // 原结果 和 翻转后结果做 与 运算 wrapper = indexModel.and(wrapper, orResultWrapper, resultWrapper, emptyWrapper); return wrapper; } finally { orResultWrapper.clear(); } } @Override public BitMapWrapper or(BitMapWrapper wrapper, String fieldName, String fieldValue) { if (StringUtils.isBlank(fieldValue)) { return wrapper; } wrapper = orIn(wrapper, fieldName, fieldValue); return wrapper; } @Override public BitMapWrapper orIn(BitMapWrapper wrapper, String fieldName, String... fieldValues) { IndexModel indexModel = indexModels.get(fieldName); if (indexModel == null) { return wrapper; } // 属性值集合做 或 运算 for (String value : fieldValues) { wrapper = indexModel.or(wrapper, value, resultWrapper, emptyWrapper); } return wrapper; } @Override public List<String> commit(BitMapWrapper bitMap) { try { if (bitMap == null) { return null; } if (bitMap.isEmpty()) { return null; } // 提取当前索引运算后业务数据ID集合 return keyMapping(bitMap); } finally { resultWrapper.clear(); } } /** * 提取当前索引运算后业务数据ID集合 * * @param bitMap the bit map * @return the list */ private List<String> keyMapping(BitMapWrapper bitMap) { List<String> keys = new ArrayList<>(); bitMap.getDataId(i -> { CollectionUtils.addIgnoreNull(keys, repository.findDataId(i)); }); return keys; }}
12 测试类:Test
public class Test { public static void main(String[] args) { // 数据仓库 Repository repository = new DefaultRepository(); // 索引构建器 Build build = new AbstractBuild(repository) { @Override protected void putIndexModel(IndexKey data, int index) { // TODO } }; // 创建索引(模拟) build.create(new Rule("100001")); build.create(new Rule("100002")); build.create(new Rule("100003")); build.create(new Rule("100004")); // 索引匹配 Match match = new DefaultMatch(build,repository); // 航程类型,枚举值:1-单程,0-不限 BitMapWrapper wrapper = match.andIn(null,"TRAVEL_TYPE","1","0"); // 开票航司,枚举值:AA match.and(wrapper,"AIRLINE","AA"); // 起飞地,枚举值:上海(SHA),中国(CN),全球(GLOBAL) match.andIn(wrapper,"DEP_CITY","SHA","CN","GLOBAL"); // 排除起飞地,枚举值:上海(SHA),中国(CN),全球(GLOBAL) match.notIn(wrapper,"FORBID_DEP_CITY","SHA","CN","GLOBAL"); // 目的地,枚举值:洛杉矶(LAX),美国(US),北美(NOA),全球(GLOBAL) match.andIn(wrapper,"ARR_CITY","LAX","US","NOA","GLOBAL"); // 排除目的地,枚举值:洛杉矶(LAX),美国(US),北美(NOA),全球(GLOBAL) match.notIn(wrapper,"FORBID_ARR_CITY","LAX","US","NOA","GLOBAL"); // 出发日期,枚举值:2023-03-25 match.and(wrapper,"DEP_DATE","2023-03-25"); // 舱位,枚举值:V,不限(ALL) match.andIn(wrapper,"CABIN","V","ALL"); List<String> ruleIds = match.commit(wrapper); System.out.println(ruleIds); }}
写在最后
- 没有完美架构方案,只有最适合解决方案,大家需结合业务特点甄别筛选。
- 如何大家喜欢这篇文章或者感觉对自己有所帮助,盼大家多多点赞并关注。
- 如果大家有任何疑问和建议,欢迎大家评论区留言!!!
评论留言