1.背景
Apache ShardingSphere支持数据加密。通过解析用户的SQL输入并根据用户的加密规则重写SQL,将原始数据加密并与基础数据库中的密文数据一起存储。
>当用户查询数据时,它只从数据库中获取密文数据,解密它,最后将解密的原始数据返回给用户。但是,由于加密算法加密整个字符串,因此无法实现模糊查询。
但是,在数据加密后,许多企业仍然需要模糊的查询。 In version 5.3.0,Apache shardingsphere为用户提供默认模糊查询算法,支持对加密字段的模糊查询。该算法还支持热插入,可以由用户自定义,并且可以通过配置来实现模糊查询。
2.如何在加密场景中实现模糊查询?
2.1将数据加载到内存数据库(IMDB)
将所有数据加载到IMDB中以解密其;然后,它将就像查询原始数据一样。此方法可以实现模糊的查询。如果数据量很小,则此方法将被证明是简单且具有成本效益的,而另一方面,如果数据量很大,则结果将是一场灾难。
2.2实施加密和解密功能与数据库程序一致
第二种方法是修改模糊查询条件,并使用数据库解密函数首先解密数据,然后实现模糊查询。这种方法的优势是实施和开发成本低以及使用成本。
用户只需要稍微修改先前的模糊查询条件即可。但是,密文和加密函数存储在数据库中,该功能无法应对帐户数据泄漏的问题。
Native SQL: select * from user where name like "%xxx%"
After implementing the decryption function: ѕеlесt * frоm uѕеr whеrе dесоdе(namе) lіkе "%ххх%"
2.3数据掩盖后存储
在密文上实现数据掩码,然后将其存储在模糊查询列中。这种方法可能缺乏精度。
For example, mobile number 13012345678 becomes 130****5678 after the masking algorithm is performed.
2.4执行令牌和组合后执行加密存储
此方法在密文数据上执行令牌化和组合,然后通过对固定长度进行分组并将一个字段分为多个字段来对结果集进行加密。例如,我们将四个英语字符和两个汉字作为查询条件:
ningyu1
使用4个字符作为一个组来加密,因此第一组是ning
,第二组ingy
,第三组ngyu
,第四组gyu1
等。所有字符均已加密并存储在模糊查询列中。如果要检索包含四个字符的所有数据,例如ingy
,请加密字符并使用键like"%partial%"
查询。
缺点:
- 增加存储成本:免费分组将增加数据的量,并且在加密后将增加数据长度。
- 模糊查询的长度有限:由于安全问题,自由分组的长度不能太短,否则rainbow table很容易破解。就像我上面提到的示例一样,模糊查询字符的长度必须大于或等于4个字母/数字,或2个汉字。
2.5单个字符摘要算法(在shardingsphere version 5.3.0中提供的默认模糊查询算法)
尽管上述方法都是可行的,但自然而然地想知道是否有更好的选择。在我们的社区中,我们发现单字符加密和存储可以平衡性能和查询,但无法满足安全要求。
那么理想的解决方案是什么?受掩盖算法和加密哈希功能的启发,我们发现数据丢失和单向功能可以使用。
加密哈希功能应具有以下四个功能:
- 对于任何给定的消息,应该很容易计算哈希值。
- 应该很难从已知的哈希值中推断出原始消息。
- 在不更改哈希值的情况下修改消息不应该是可行的。
- 只有两个不同的消息产生相同的哈希值的机会只有很低的机会。 安全性:由于具有单向功能,因此无法推断原始消息。为了提高模糊查询的准确性,我们想加密一个字符,但会被彩虹桌破解。
因此,我们采用一个单向功能(以确保加密后每个字符都相同)并增加碰撞的频率(以确保每个字符串都是1:n向后),从而极大地增强了安全性。
3.模糊查询算法
apache shardingsphere使用以下单字符摘要算法org.apache.shardingsphere.encrypt.algorithm.like.CharDigestLikeEncryptAlgorithm
实现通用模糊查询算法。
public final class CharDigestLikeEncryptAlgorithm implements LikeEncryptAlgorithm<Object, String> {
private static final String DELTA = "delta";
private static final String MASK = "mask";
private static final String START = "start";
private static final String DICT = "dict";
private static final int DEFAULT_DELTA = 1;
private static final int DEFAULT_MASK = 0b1111_0111_1101;
private static final int DEFAULT_START = 0x4e00;
private static final int MAX_NUMERIC_LETTER_CHAR = 255;
@Getter
private Properties props;
private int delta;
private int mask;
private int start;
private Map<Character, Integer> charIndexes;
@Override
public void init(final Properties props) {
this.props = props;
delta = createDelta(props);
mask = createMask(props);
start = createStart(props);
charIndexes = createCharIndexes(props);
}
private int createDelta(final Properties props) {
if (props.containsKey(DELTA)) {
String delta = props.getProperty(DELTA);
try {
return Integer.parseInt(delta);
} catch (NumberFormatException ex) {
throw new EncryptAlgorithmInitializationException("CHAR_DIGEST_LIKE", "delta can only be a decimal number");
}
}
return DEFAULT_DELTA;
}
private int createMask(final Properties props) {
if (props.containsKey(MASK)) {
String mask = props.getProperty(MASK);
try {
return Integer.parseInt(mask);
} catch (NumberFormatException ex) {
throw new EncryptAlgorithmInitializationException("CHAR_DIGEST_LIKE", "mask can only be a decimal number");
}
}
return DEFAULT_MASK;
}
private int createStart(final Properties props) {
if (props.containsKey(START)) {
String start = props.getProperty(START);
try {
return Integer.parseInt(start);
} catch (NumberFormatException ex) {
throw new EncryptAlgorithmInitializationException("CHAR_DIGEST_LIKE", "start can only be a decimal number");
}
}
return DEFAULT_START;
}
private Map<Character, Integer> createCharIndexes(final Properties props) {
String dictContent = props.containsKey(DICT) && !Strings.isNullOrEmpty(props.getProperty(DICT)) ? props.getProperty(DICT) : initDefaultDict();
Map<Character, Integer> result = new HashMap<>(dictContent.length(), 1);
for (int index = 0; index < dictContent.length(); index++) {
result.put(dictContent.charAt(index), index);
}
return result;
}
@SneakyThrows
private String initDefaultDict() {
InputStream inputStream = CharDigestLikeEncryptAlgorithm.class.getClassLoader().getResourceAsStream("algorithm/like/common_chinese_character.dict");
LineProcessor<String> lineProcessor = new LineProcessor<String>() {
private final StringBuilder builder = new StringBuilder();
@Override
public boolean processLine(final String line) {
if (line.startsWith("#") || 0 == line.length()) {
return true;
} else {
builder.append(line);
return false;
}
}
@Override
public String getResult() {
return builder.toString();
}
};
return CharStreams.readLines(new InputStreamReader(inputStream, Charsets.UTF_8), lineProcessor);
}
@Override
public String encrypt(final Object plainValue, final EncryptContext encryptContext) {
return null == plainValue ? null : digest(String.valueOf(plainValue));
}
private String digest(final String plainValue) {
StringBuilder result = new StringBuilder(plainValue.length());
for (char each : plainValue.toCharArray()) {
char maskedChar = getMaskedChar(each);
if ('%' == maskedChar) {
result.append(each);
} else {
result.append(maskedChar);
}
}
return result.toString();
}
private char getMaskedChar(final char originalChar) {
if ('%' == originalChar) {
return originalChar;
}
if (originalChar <= MAX_NUMERIC_LETTER_CHAR) {
return (char) ((originalChar + delta) & mask);
}
if (charIndexes.containsKey(originalChar)) {
return (char) (((charIndexes.get(originalChar) + delta) & mask) + start);
}
return (char) (((originalChar + delta) & mask) + start);
}
@Override
public String getType() {
return "CHAR_DIGEST_LIKE";
}
}
- 将二进制
mask
代码定义为失去精度0b1111_0111_1101
(mask)。 - 像
map
词典一样,保存具有破坏顺序的普通汉字。 - 获取一串
Unicode
,用于数字,英语和拉丁语。 - 获取属于字典的汉字的
index
。 - 其他字符获取单字符串的
Unicode
。 - 将
1 (delta)
添加到上面不同类型的数字中,以防止任何原始文本出现在数据库中。 - 然后将偏移
Unicode
转换为二进制,并使用mask
执行AND
操作,然后造成2位数字损失。 - 失去精度后,直接输出数字,英语和拉丁语。
- 在失去精确度后,其余字符被转换为十进制和输出,并用共同的字符
start
代码。 ## 4.模糊算法开发进度 ### 4.1第一版 只需使用Unicode
和mask
常见字符代码执行AND
操作即可。
Mask: 0b11111111111001111101
The original character: 0b1000101110101111讯
After encryption: 0b1000101000101101設
Assuming we know the key and encryption algorithm, the original string after a backward pass is:
1.0b1000101100101101 謭
2.0b1000101100101111 謯
3.0b1000101110101101 训
4.0b1000101110101111 讯
5.0b1000101010101101 読
6.0b1000101010101111 誯
7.0b1000101000101111 訯
8.0b1000101000101101 設
我们发现,基于缺失的位,每个字符串都可以派生为2^n
中文字符。当普通汉字的Unicode
是十进制的小数时,它们的间隔很大。请注意,向后推断的汉字不是普通字符,并且更有可能推断原始字符。
4.2第二版
由于普通汉字的间隔Unicode
是不规则的,因此我们计划留下最后几位汉字Unicode
,并将其转换为小数为index
,以获取一些常见的汉字。这样,当已知算法时,倒退后不常见的字符就不会出现,并且干扰器不再容易消除。
如果我们留下最后几位汉字Unicode
,它与模糊查询的准确性与抗注定复杂性之间的关系有关。精度越高,解密难度越低。
让我们看一下我们算法下的普通汉字的碰撞程度:
- 当
mask
= 0b0011_1111_1111:
- 当
mask
= 0b0001_1111_1111:
对于汉字的曼蒂萨(Mantissa),请留下10和9位数字。 10位查询更准确,因为其碰撞要弱得多。然而,如果已知算法和键,则可以向后派生1:1的原始文本。
9位数的查询不太准确,因为9位碰撞相对较强,但是1:1个字符较少。我们发现,尽管无论是留下10个数字还是9位数字,我们都会改变碰撞,但由于汉字的不规则Unicode,分布非常不平衡,并且无法控制整体碰撞概率。
4.3第三版
响应第二版中发现的不均匀分布的问题,我们以中断的顺序为词典表。
-
首先,加密的文本在排序词典表中查找
index
。我们使用index
和下标无需规则替换Unicode
。
如果不常见的字符,请使用Unicode
。 (注意:均匀分发代码要尽可能地计算。) -
下一步是用
mask
执行AND
操作,并丢失2位精度以增加碰撞的频率。
让我们看一下我们算法下的普通汉字的碰撞程度:
- 当
mask
= 0b1111_1011_1101:
- 当
mask
= 0b0111_1011_1101:
当mask
留下11位时,您会看到碰撞分布集中在1:4。当mask
留下10位时,数字变为1:8。目前,我们只需要调整精度损失的数量即可控制碰撞是否为1:2、1:4或1:8。
如果选择了mask
为1,并且已知算法和键,则会有1:1的汉字,因为我们目前计算的是常见字符的碰撞程度。如果我们在汉字的16位二进制之前添加缺少的4位,则情况变为2^5=32
案例。
由于我们对整个文本进行了加密,即使向后推断单个字符,也不会对整体安全产生影响,也不会导致质量数据泄漏。同时,向后通行证的前提是了解算法,键,delta
和字典,因此无法从数据库中的数据中实现。
5.如何使用模糊查询
模糊查询需要encryptors
(加密算法配置),likeQueryColumn
(模糊查询列名称)和likeQueryEncryptorName
(Fuzzy查询列的加密算法)中的配置。
请参阅以下配置。添加您自己的碎片算法和数据源。
dataSources:
ds_0:
dataSourceClassName: com.zaxxer.hikari.HikariDataSource
driverClassName: com.mysql.jdbc.Driver
jdbcUrl: jdbc:mysql://127.0.0.1:3306/test?allowPublicKeyRetrieval=true
username: root
password: root
rules:
- !ENCRYPT
encryptors:
like_encryptor:
type: CHAR_DIGEST_LIKE
aes_encryptor:
type: AES
props:
aes-key-value: 123456abc
tables:
user:
columns:
name:
cipherColumn: name
encryptorName: aes_encryptor
assistedQueryColumn: name_ext
assistedQueryEncryptorName: aes_encryptor
likeQueryColumn: name_like
likeQueryEncryptorName: like_encryptor
phone:
cipherColumn: phone
encryptorName: aes_encryptor
likeQueryColumn: phone_like
likeQueryEncryptorName: like_encryptor
queryWithCipherColumn: true
props:
sql-show: true
插入
Logic SQL: insert into user ( id, name, phone, sex) values ( 1, '熊高祥', '13012345678', '男')
Actual SQL: ds_0 ::: insert into user ( id, name, name_ext, name_like, phone, phone_like, sex) values (1, 'gyVPLyhIzDIZaWDwTl3n4g==', 'gyVPLyhIzDIZaWDwTl3n4g==', '佹堝偀', 'qEmE7xRzW0d7EotlOAt6ww==', '04101454589', '男')
更新
Logic SQL: update user set name = '熊高祥123', sex = '男1' where sex ='男' and phone like '130%'
Actual SQL: ds_0 ::: update user set name = 'K22HjufsPPy4rrf4PD046A==', name_ext = 'K22HjufsPPy4rrf4PD046A==', name_like = '佹堝偀014', sex = '男1' where sex ='男' and phone_like like '041%'
选择
Logic SQL: select * from user where (id = 1 or phone = '13012345678') and name like '熊%'
Actual SQL: ds_0 ::: select `user`.`id`, `user`.`name` AS `name`, `user`.`sex`, `user`.`phone` AS `phone`, `user`.`create_time` from user where (id = 1 or phone = 'qEmE7xRzW0d7EotlOAt6ww==') and name_like like '佹%'
选择:联合表格子查询
Logic SQL: select * from user LEFT JOIN user_ext on user.id=user_ext.id where user.id in (select id from user where sex = '男' and name like '熊%')
Actual SQL: ds_0 ::: select `user`.`id`, `user`.`name` AS `name`, `user`.`sex`, `user`.`phone` AS `phone`, `user`.`create_time`, `user_ext`.`id`, `user_ext`.`address` from user LEFT JOIN user_ext on user.id=user_ext.id where user.id in (select id from user where sex = '男' and name_like like '佹%')
删除
Logic SQL: delete from user where sex = '男' and name like '熊%'
Actual SQL: ds_0 ::: delete from user where sex = '男' and name_like like '佹%'
上面的示例演示了模糊查询列如何在不同的SQL语法中重写SQL以支持模糊查询。
本博客文章向您介绍了模糊查询的工作原理,并使用了特定示例来演示如何使用它。我们希望通过本文,您将获得对模糊查询的基本了解。
链接
作者
xiong gaoxiang是Iflytek的工程师和Shardingsphere的贡献者,负责数据加密和数据掩盖R&D。