大数据实时计算高性能解决方案

大数据
后台-插件-广告管理-内容页头部广告(手机)

大数据实时计算高性能解决方案

假设有这样一个场景:百万级数据量,每天千万级增量,几十字段匹配,单值/多值/范围,毫秒级别耗时,同时需要考虑后续数据量递增情况下,整体性能不能线性下降。我们应该采用哪种索引技术来实现匹配计算呢?

业务场景

以机票搜索场景为例,有效数据量百万+,每天增量千万+,单docker实例TPS要求1000+。下面的表格是计算机票所需要的一种规则数据,每条规则涉及到许多字段需要进行索引匹配,其中字段有多种数据类型:单值、多值、范围,多字段匹配关系支持:且、或、非。

规则数据

 
  • 行程类型,总共有三种枚举值:1-单程,2-往返,3-不限
  • 开票航司,支持多个,用/分隔
  • 出发地/目的地/排除出发地/排除目的地,支持多个,用/分隔,支持多维度,同时支持全球(GLOBAL)
  • 班期,支持多个,用/分隔
  • 舱位,支持多个,用/分隔,同时支持全部(ALL)
  • 旅行日期,支持日期范围

索引选型

索引选择原则:算法稳定、查询时间复杂度低、空间复杂度适中、支持多维查询。

 

综合对比,我们选择倒排索引+BitMap组合方式来构建索引。

BitMap特点

  • BitMap是一种数据结构,通常用于存储大量的布尔值或者整数。它将每个元素映射到一个二进制位上,使用0或1来表示该元素的状态。在实际使用中,BitMap通常被用来解决一些需要高效查找或者去重的问题。
  • BitMap的基本操作包括设置某个元素为1或0,查询某个元素的状态,以及对多个BitMap进行位运算,例如求并集、交集、差集等。由于BitMap的实现采用了位运算,因此它的操作非常高效,可以在常数时间内完成。
  • 在计算机科学中,BitMap通常使用一个长整型数组来实现。在Java中,可以使用BitSet类来实现BitMap,它提供了一系列高效的操作方法,例如set、get、and、or等等。
  • BitMap的应用非常广泛,例如在数据库中用于去重、在网络中用于IP地址的过滤、在图像处理中用于表示像素的颜色等等。

索引构建

基于规则数据,在顺序创建索引时,先采用哈希表进行双向映射,便于后续提取规则ID,结构如下:

 

按属性名逐一构建索引模型,结构如下:

 
  • 航程类型属性,按枚举拆分成0-不限,1-单程,2-往返,三个值索引,其中不限单独做为一个值进行索引
  • 开票航司属性,按/分隔符进行拆分,拆分为AA,CI,LH,MH,四个值索引
  • 出发地属性,按/分割符进行拆分,拆分为SHA,CN,HKG,GLOBAL,四个值索引,区域各层级单独作为一个值进行索引
  • 目的地属性,按/分割符进行拆分,拆分为US,TPE,EUR,NOA,GLOBAL,五个值索引,同出发地属性
  • 排除出发地属性,按/分隔符进行拆分,拆分为DFW,CN两个值索引,没有值不建索引,同出发地属性
  • 排除目的地属性,按/分隔符进行拆分,拆分为CN一个值索引,没有值不建索引,同出发地属性
  • 舱位属性,按/分隔符进行拆分,拆分为N,S,V,L,ALL,五个值索引,其中不限单独作为一个值进行索引
  • 旅行日期属性,按范围拆分到天

索引匹配

所有规则数据创建索引完成后,那如何进行索引匹配的呢,下面我们以正常搜索场景为例来进行介绍:

 

例如查询请求3月25日从上海(SHA)飞往洛杉矶(LAX)的单程航班,有一条达美航空(AA)的舱位代码为V舱的运价。按照请求参数逐一取对应属性名的索引模型,用传入数据进行BitMap索引匹配:

 
  • 航程类型:值为单程(1)和不限(0)进行匹配,匹配结果分别为1001、0010,再做或运算,结果为1011
  • 出发地:值为SHA、CN、GLOBAL进行匹配,匹配结果分别为1100、0010、0001,再做或运算,结果为1111
  • 排除出发地:值为SHA、CN、GLOBAL进行匹配,匹配结果为0001,再做取反运算,结果为1110
  • 目的地:值为LAX、US、NOA、GLOBAL进行匹配,匹配结果分别为1000、0010、0001,再做或运算,结果为1011
  • 排除目的地:值为LAX、US、NOA、GLOBAL进行匹配,没有对应的索引模型,忽略当前属性
  • 出发日期:值为2023-03-25进行匹配,匹配结果为1111开票航司:值为AA进行匹配,匹配结果为1010舱位:值为V、ALL进行匹配,分别匹配结果为1000、0011,做或运算,结果为1011单属性计算结果分别为:航程类型-1011,出发地-1110,目的地-1011,出发日期-1111,开票航司-1010,舱位-1011所有属性进行与运算,计算公式为:1011 & 1110 & 1011 & 1111 & 1010 & 1011,最终位运算结果为:1010最终取号位为0和2的规则ID,分别为100001和100003,作为当前搜索命中的两条规则ID。

以上就是索引匹配逻辑,涉及到BitMap的and、or、not等操作,通过位运算来提升匹配性能,降低数据量对查询性能线性影响。

代码实现

1.规则模型

Rule类:提供规则字段。

@Datapublic class Rule implements IndexKey {    /** 规则ID */    private String ruleId;    /** 开票航司 */    @Index(name = "AIRLINE")    private String airline;    /** 出发地 */    @Index(name = "DEP_CITY")    private String depCity;    /** 排除出发地 */    @Index(name = "FORBID_DEP_CITY")    private String forbidDepCity;    /** 目的地 */    @Index(name = "ARR_CITY")    private String arrCity;    /** 排除目的地 */    @Index(name = "FORBID_ARR_CITY")    private String forbidArrCity;    /** 起飞日期范围 */    @Index(name = "DEP_DATE")    private String depDate;    /** 舱位 */    @Index(name = "CABIN")    private String cabin;    public Rule(String ruleId) {        this.ruleId = ruleId;    }    @Override    public String getIndexKey() {        return ruleId;    }}

2.索引数据仓库接口

Repository接口:提供索引新增、删除和查询服务。

public interface Repository<T> {    /**     * 保存 索引 和 数据 的关系     */    void put(Integer index, T data);    /**     * 删除Key     */    void remove(String key);    /**     * 根据数据ID查找对应的索引ID     */    Integer findIndexId(String key);    /**     * 根据索引ID查找对应的数据ID     */    String findDataId(Integer index);}

3.索引数据仓库接口实现类

默认实现类:DefaultRepository,数据默认存储在内存。

public class DefaultRepository<T extends IndexKey> implements Repository<T> {    private final Map<Integer, String> dataMap = Maps.newConcurrentMap();    private final Map<String, Integer> indexMap = Maps.newConcurrentMap();        @Override    public void put(Integer index, T data) {        dataMap.put(index, data.getIndexKey());        indexMap.put(data.getIndexKey(), index);    }    @Override    public void remove(String key) {        Integer index = indexMap.get(key);        dataMap.remove(index);        indexMap.remove(key);    }    @Override    public Integer findIndexId(String key) {        return indexMap.get(key);    }    @Override    public String findDataId(Integer index) {        return dataMap.get(index);    }}

4.索引模型接口

IndexModel接口:提供指定属性值构建、删除和查询索引,同时提供索引and、andIn、or和orIn位运算操作。

public interface IndexModel {        /**     * 构建 属性值对应的索引模型     */    void createIndex(String fieldValue, int index);    /**     * 清除 指定属性值对应的索引模型     */    void clear(String fieldValue, Integer index);    /**     * 获取 属性值 对应的索引模型     */    BitMapWrapper get(String fieldValue);    /**     * 与 运算     */    BitMapWrapper and(BitMapWrapper left, String fieldValue);    /**     * 与 运算     */    BitMapWrapper and(BitMapWrapper left, BitMapWrapper right);    /**     * 与 运算     */    BitMapWrapper and(BitMapWrapper left, String fieldValue, BitMapWrapper defaultLeft, BitMapWrapper defaultRight);    /**     * 与 运算     */    BitMapWrapper and(BitMapWrapper left, BitMapWrapper right, BitMapWrapper defaultLeft, BitMapWrapper defaultRight);    /**     * 或 运算     */    BitMapWrapper or(BitMapWrapper left, String fieldValue);    /**     * 或 运算     */    BitMapWrapper or(BitMapWrapper left, BitMapWrapper right);    /**     * 或 运算     */    BitMapWrapper or(BitMapWrapper left, String fieldValue, BitMapWrapper defaultLeft, BitMapWrapper defaultRight);    /**     * 或 运算     */    BitMapWrapper or(BitMapWrapper left, BitMapWrapper right, BitMapWrapper defaultLeft, BitMapWrapper defaultRight);}

5.索引模型接口实现类

默认实现类:DefaultIndexModel

public class DefaultIndexModel implements IndexModel {    private final Map<String, BitMapWrapper> valueMap = new ConcurrentHashMap<>();        @Override    public void createIndex(String fieldValue, int index) {        valueMap.computeIfAbsent(fieldValue, k -> buildBitMapWrapper()).set(index);    }    @Override    public void clear(String fieldValue, Integer index) {        if (!valueMap.containsKey(fieldValue)) {            return;        }        valueMap.get(fieldValue).clear(index);    }    @Override    public BitMapWrapper get(String fieldValue) {        return valueMap.get(fieldValue);    }    @Override    public BitMapWrapper and(BitMapWrapper left, String fieldValue) {        return and(left, getBitMapWrapper(fieldValue));    }    @Override    public BitMapWrapper and(BitMapWrapper left, BitMapWrapper right) {        if (right == null) {            return left;        }        if (left == null) {            left = buildBitMapWrapper();            left.or(right);        } else {            left.and(right);        }        return left;    }    @Override    public BitMapWrapper and(BitMapWrapper left, String fieldValue, BitMapWrapper defaultLeft, BitMapWrapper defaultRight) {        BitMapWrapper right = valueMap.getOrDefault(fieldValue,defaultRight);        if (left == null) {            left = defaultLeft;            left.or(right);        } else {            left.and(right);        }        return left;    }    @Override    public BitMapWrapper and(BitMapWrapper left, BitMapWrapper right, BitMapWrapper defaultLeft, BitMapWrapper defaultRight) {        if (right == null) {            right = defaultRight;        }        if (left == null) {            left = defaultLeft;            left.or(right);        } else {            left.and(right);        }        return left;    }    @Override    public BitMapWrapper or(BitMapWrapper left, String fieldValue) {        return or(left, getBitMapWrapper(fieldValue));    }    @Override    public BitMapWrapper or(BitMapWrapper left, BitMapWrapper right) {        if (right == null) {            return left;        }        if (left == null) {            left = buildBitMapWrapper();        }        left.or(right);        return left;    }    @Override    public BitMapWrapper or(BitMapWrapper left, String fieldValue, BitMapWrapper defaultLeft, BitMapWrapper defaultRight) {        if (left == null) {            left = defaultLeft;        }        if (!valueMap.containsKey(fieldValue)) {            return left;        }        BitMapWrapper right = valueMap.get(fieldValue);        left.or(right);        return left;    }    @Override    public BitMapWrapper or(BitMapWrapper left, BitMapWrapper right, BitMapWrapper defaultLeft, BitMapWrapper defaultRight) {        if (left == null) {            left = defaultLeft;        }        if (right == null) {            return left;        }        left.or(right);        return left;    }    public BitMapWrapper getBitMapWrapper(String fieldValue) {        return valueMap.computeIfAbsent(fieldValue,k -> buildBitMapWrapper());    }    private BitMapWrapper buildBitMapWrapper() {        return new DefaultBitMapWrapper();    }}

6.BitMap包装类接口

BitMapWrapper接口:提供BitSet位运算基础操作,且、或、翻转等运算,同时提供设置、清除、查询业务数据ID等服务。

public interface BitMapWrapper {    /**     * 且 运算     */    void and(BitMapWrapper wrapper);    /**     * 或 运算     */    void or(BitMapWrapper wrapper);    /**     * 翻转 运算     */    void flip(int fromIndex, int toIndex);    /**     * 指定号位 设置 为1     */    void set(int bitIndex);    /**     * 清除指定号位(设置 为0)     */    void clear(int bitIndex);    /**     * 清除 BitMap     */    void clear();    /**     * 判断是否为空     */    boolean isEmpty();    /**     * 获取源目标对象     */    Object getTarget();    /**     * 获取业务数据ID     */    void getDataId(Consumer<Integer> consumer);   }

7.BitMap包装类接口实现类

默认实现类:DefaultBitMapWrapper

public class DefaultBitMapWrapper implements BitMapWrapper {    private final BitSet bitSet = new BitSet();        @Override    public void and(BitMapWrapper wrapper) {        bitSet.and((BitSet) wrapper.getTarget());    }    @Override    public void or(BitMapWrapper wrapper) {        bitSet.or((BitSet) wrapper.getTarget());    }    @Override    public void flip(int fromIndex, int toIndex) {        bitSet.flip(fromIndex, toIndex);    }    @Override    public void set(int bitIndex) {        bitSet.set(bitIndex);    }    @Override    public void clear(int bitIndex) {        bitSet.clear(bitIndex);    }    @Override    public void clear() {        bitSet.clear();    }    @Override    public boolean isEmpty() {        return bitSet.isEmpty();    }    @Override    public Object getTarget() {        return bitSet;    }    @Override    public void getDataId(Consumer<Integer> consumer) {        int i = bitSet.nextSetBit(0);        if (i != -1) {            consumer.accept(i);            for (i = bitSet.nextSetBit(i + 1); i >= 0; i = bitSet.nextSetBit(i + 1)) {                int endOfRun = bitSet.nextClearBit(i);                do {                    consumer.accept(i);                } while (++i < endOfRun);            }        }    }}

8.索引构建器接口

Build接口:提供创建、获取当前规则所有属性名对应的索引模型和查询当前索引长度。

public interface Build<T> {    /**     * 创建索引     */    void create(T data);    /**     * 获取所有属性名对应的索引模型     */    Map<String,IndexModel> getIndexModels();    /**     * 获取当前索引长度     */    AtomicInteger getIndexLength();}

9.索引构建器接口实现类:

抽象实现类:AbstractBuild,实现接口相关服务,提供添加索引模型抽象方法,需自定义实现。

public abstract class AbstractBuild<T extends IndexKey> implements Build<T> {    private final Map<String, IndexModel> INDEX_MODELS = Maps.newConcurrentMap();    private final AtomicInteger  indexGenerator  = new AtomicInteger();    private Repository<T> repository;    public AbstractBuild(Repository<T> repository){        this.repository = repository;    }        @Override    public void create(T data) {        doCreateIndex(data,indexGenerator.getAndIncrement());    }    /**     * Do create index.     *     * @param data  the data     * @param index the index     */    private void doCreateIndex(T data, int index){        // 获取当前模型所有属性,根据标记索引属性名构建对应的索引模型,添加到INDEX_MODELS中        putIndexModel(data,index);        repository.put(index,data);    }    /**     * 添加索引模型,此处不提供默认实现     *     * @param data  the data     * @param index the index     */    protected abstract void putIndexModel(T data, int index);    @Override    public Map<String, IndexModel> getIndexModels() {        return INDEX_MODELS;    }    @Override    public AtomicInteger getIndexLength() {        return indexGenerator;    }}

10.索引匹配接口

Match接口:提供单值和多值的与、或BitMap运算服务,以及获取最终匹配的业务数据ID集合,

public interface Match {    /**     * 与 运算操作     *     * @param wrapper    左值     * @param fieldName  属性名     * @param fieldValue 属性值     * @return the bit map wrapper     */    BitMapWrapper and(BitMapWrapper wrapper, String fieldName, String fieldValue);    /**     * in 与 运算操作     *     * @param wrapper     左值     * @param fieldName   属性名     * @param fieldValues 属性值     * @return the bit map wrapper     */    BitMapWrapper andIn(BitMapWrapper wrapper, String fieldName, String... fieldValues);    /**     * Not in 与 运算操作     *     * @param wrapper     左值     * @param fieldName   属性名     * @param fieldValues 属性值     * @return the bit map wrapper     */    BitMapWrapper notIn(BitMapWrapper wrapper, String fieldName, String... fieldValues);    /**     * 或 运算操作     *     * @param wrapper    左值     * @param fieldName  属性名     * @param fieldValue 属性值     * @return the bit map wrapper     */    BitMapWrapper or(BitMapWrapper wrapper, String fieldName, String fieldValue);    /**     * in 或 运算操作     *     * @param wrapper     左值     * @param fieldName   属性名     * @param fieldValues 属性值     * @return the bit map wrapper     */    BitMapWrapper orIn(BitMapWrapper wrapper, String fieldName, String... fieldValues);    /**     * 提交运算,计算出索引匹配后的数据ID集合     *     * @param bitMap the bit map     * @return the list     */    List<String> commit(BitMapWrapper bitMap);}

11 索引匹配接口实现类

默认实现类:DefaultMatch

@Slf4jpublic class DefaultMatch implements Match {    /**     * 索引模型集合,Key:属性名,Val:索引模型     */    private Map<String, IndexModel> indexModels;    /**     * 当前规则索引自增记录器     */    private AtomicInteger indexGenerator;    /**     * 当前运算结果缓存     */    private BitMapWrapper resultWrapper;    /**     * OR运算结果缓存     */    private  BitMapWrapper orResultWrapper;    /**     * 空结果缓存     */    private BitMapWrapper  emptyWrapper;    /**     * 索引&业务数据仓库     */    private Repository repository;    /**     * Instantiates a new Default match.     *     * @param build      the build     * @param repository the repository     */    public DefaultMatch(Build build, Repository repository){        this.resultWrapper = new DefaultBitMapWrapper();        this.orResultWrapper = new DefaultBitMapWrapper();        this.emptyWrapper = new DefaultBitMapWrapper();        this.indexModels = build.getIndexModels();        this.repository = repository;        this.indexGenerator = build.getIndexLength();    }    @Override    public BitMapWrapper and(BitMapWrapper wrapper, String fieldName, String fieldValue) {        if (StringUtils.isBlank(fieldValue)) {            return wrapper;        }        wrapper = andIn(wrapper, fieldName, fieldValue);        return wrapper;    }    @Override    public BitMapWrapper andIn(BitMapWrapper wrapper, String fieldName, String... fieldValues) {        IndexModel indexModel = indexModels.get(fieldName);        if (indexModel == null) {            if (wrapper == null) {                wrapper = resultWrapper;            }            wrapper.clear();            return wrapper;        }        if (wrapper != null && wrapper.isEmpty()) {            return wrapper;        }        try {            // 属性值集合做 或 运算            for (String value : fieldValues) {                orResultWrapper = indexModel.or(orResultWrapper, value, null, emptyWrapper);            }            // 原结果 和 翻转后结果做 与 运算            wrapper = indexModel.and(wrapper, orResultWrapper, resultWrapper, emptyWrapper);            return wrapper;        } finally {            orResultWrapper.clear();        }    }    @Override    public BitMapWrapper notIn(BitMapWrapper wrapper, String fieldName, String... fieldValues) {        IndexModel indexModel = indexModels.get(fieldName);        if (indexModel == null) {            return wrapper;        }        if (wrapper != null && wrapper.isEmpty()) {            return wrapper;        }        try {            // 属性值集合做 或 运算            for (String value : fieldValues) {                orResultWrapper = indexModel.or(orResultWrapper, value, null, emptyWrapper);            }            // 对结果翻转            orResultWrapper.flip(0, indexGenerator.get());            // 原结果 和 翻转后结果做 与 运算            wrapper = indexModel.and(wrapper, orResultWrapper, resultWrapper, emptyWrapper);            return wrapper;        } finally {            orResultWrapper.clear();        }    }    @Override    public BitMapWrapper or(BitMapWrapper wrapper, String fieldName, String fieldValue) {        if (StringUtils.isBlank(fieldValue)) {            return wrapper;        }        wrapper = orIn(wrapper, fieldName, fieldValue);        return wrapper;    }    @Override    public BitMapWrapper orIn(BitMapWrapper wrapper, String fieldName, String... fieldValues) {        IndexModel indexModel = indexModels.get(fieldName);        if (indexModel == null) {            return wrapper;        }        // 属性值集合做 或 运算        for (String value : fieldValues) {            wrapper = indexModel.or(wrapper, value, resultWrapper, emptyWrapper);        }        return wrapper;    }    @Override    public List<String> commit(BitMapWrapper bitMap) {        try {            if (bitMap == null) {                return null;            }            if (bitMap.isEmpty()) {                return null;            }            // 提取当前索引运算后业务数据ID集合            return keyMapping(bitMap);        } finally {            resultWrapper.clear();        }    }    /**     * 提取当前索引运算后业务数据ID集合     *     * @param bitMap the bit map     * @return the list     */    private List<String> keyMapping(BitMapWrapper bitMap) {        List<String> keys = new ArrayList<>();        bitMap.getDataId(i -> {            CollectionUtils.addIgnoreNull(keys, repository.findDataId(i));        });        return keys;    }}

12 测试类:Test

public class Test {    public static void main(String[] args) {        // 数据仓库        Repository repository = new DefaultRepository();                // 索引构建器        Build build = new AbstractBuild(repository) {            @Override            protected void putIndexModel(IndexKey data, int index) {                // TODO            }        };        // 创建索引(模拟)        build.create(new Rule("100001"));        build.create(new Rule("100002"));        build.create(new Rule("100003"));        build.create(new Rule("100004"));                // 索引匹配        Match match = new DefaultMatch(build,repository);        // 航程类型,枚举值:1-单程,0-不限        BitMapWrapper wrapper = match.andIn(null,"TRAVEL_TYPE","1","0");        // 开票航司,枚举值:AA        match.and(wrapper,"AIRLINE","AA");        // 起飞地,枚举值:上海(SHA),中国(CN),全球(GLOBAL)        match.andIn(wrapper,"DEP_CITY","SHA","CN","GLOBAL");        // 排除起飞地,枚举值:上海(SHA),中国(CN),全球(GLOBAL)        match.notIn(wrapper,"FORBID_DEP_CITY","SHA","CN","GLOBAL");        // 目的地,枚举值:洛杉矶(LAX),美国(US),北美(NOA),全球(GLOBAL)        match.andIn(wrapper,"ARR_CITY","LAX","US","NOA","GLOBAL");        // 排除目的地,枚举值:洛杉矶(LAX),美国(US),北美(NOA),全球(GLOBAL)        match.notIn(wrapper,"FORBID_ARR_CITY","LAX","US","NOA","GLOBAL");        // 出发日期,枚举值:2023-03-25        match.and(wrapper,"DEP_DATE","2023-03-25");        // 舱位,枚举值:V,不限(ALL)        match.andIn(wrapper,"CABIN","V","ALL");        List<String> ruleIds = match.commit(wrapper);        System.out.println(ruleIds);    }}

写在最后

  • 没有完美架构方案,只有最适合解决方案,大家需结合业务特点甄别筛选。
  • 如何大家喜欢这篇文章或者感觉对自己有所帮助,盼大家多多点赞并关注。
  • 如果大家有任何疑问和建议,欢迎大家评论区留言!!!

后台-插件-广告管理-内容页尾部广告(手机)
标签:

评论留言

我要留言

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。