Understanding Merkle-Damgård in One Article
The Merkle-Damgård construction was designed by Ralph Merkle in his 1979 Ph.D. thesis, and Merkle and Ivan Damgård later (independently, in 1989) proved that the construction is sound. The structure looks like this:
If you have worked through MD5, SHA-1, or SHA-256, the structure above should look familiar: apart from the compression function f, those algorithms follow essentially the same flow. This article takes a closer look at the story behind this structure.
Preliminaries
First, let's review some basics about hash functions. The notation in this article follows the book "Cryptography: Theory and Practice".
Definition of a Hash Function
A hash function can be described by the triple below. We do not consider keyed hash functions here, so the book's definition is trimmed a little (an unkeyed hash function is really a special case of a keyed hash family; for simplicity, this article only covers the unkeyed case). A concrete example follows the list.
- $\mathcal{X}$ is the set of all possible messages; in practice this is the set of arbitrary binary strings.
- $\mathcal{Y}$ is the finite set of all possible message digests.
- There is a hash function $h: \mathcal{X} \rightarrow \mathcal{Y}$, and $h(x)$ is efficiently computable for every $x \in \mathcal{X}$.
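As a concrete illustration (my own, not from the book): for SHA-256, $\mathcal{X}$ is the set of bit strings shorter than $2^{64}$ bits and $\mathcal{Y} = \{0,1\}^{256}$. The short Python snippet below uses only the standard hashlib module; the helper name sha256_digest is just for illustration.

import hashlib

def sha256_digest(message: bytes) -> bytes:
    """h: X -> Y, where X is (nearly) arbitrary byte strings and Y is {0,1}^256."""
    return hashlib.sha256(message).digest()

d = sha256_digest(b"hello merkle-damgard")
print(len(d) * 8)  # 256 -- every digest lands in the fixed, finite set Y
print(d.hex())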
Security of Hash Functions
For a hash function to be considered secure, each of the following three problems should be computationally hard. (A toy experiment after the three definitions shows how quickly they all fail when the digest is too short.)
Preimage Resistance
- Instance: a hash function $h: \mathcal{X} \rightarrow \mathcal{Y}$ and a digest $y \in \mathcal{Y}$.
- Find: $x \in \mathcal{X}$ s.t. $h(x) = y$.
In plain language: given a digest, the attacker should not be able to find any message that hashes to it.
Second-Preimage Resistance
- Instance: a hash function $h: \mathcal{X} \rightarrow \mathcal{Y}$ and a message $x \in \mathcal{X}$.
- Find: $x' \in \mathcal{X}$ s.t. $h(x') = h(x)$ and $x' \neq x$.
In plain language: given a message, the attacker should not be able to find a different message with the same digest.
Collision Resistance
- Instance: a hash function $h: \mathcal{X} \rightarrow \mathcal{Y}$.
- Find: $x, x' \in \mathcal{X}$ s.t. $h(x) = h(x')$ and $x \neq x'$.
In plain language: the attacker should not be able to find any two distinct messages with the same digest.
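To get a feel for why the digest space has to be large, here is a small, self-contained experiment (my own illustration, not from the book): we truncate SHA-256 to 16 bits and watch collision resistance collapse under a birthday-style search. The helper name tiny_hash is purely illustrative.

import hashlib
from itertools import count

def tiny_hash(message: bytes) -> bytes:
    """A deliberately weak 16-bit 'hash': SHA-256 truncated to 2 bytes."""
    return hashlib.sha256(message).digest()[:2]

# Birthday-style search: with only 2^16 possible digests, a collision
# typically shows up after a few hundred inputs.
seen = {}
for i in count():
    m = f"msg-{i}".encode()
    d = tiny_hash(m)
    if d in seen and seen[d] != m:
        print("collision:", seen[d], m, d.hex())
        break
    seen[d] = m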
Iterated Hash Functions
Suppose we have a compression function $\text{compress}: \{0,1\}^{m+t} \rightarrow \{0,1\}^{m}$ (with $t \geq 1$). From this compression function we build an iterated hash function $h$ that accepts inputs of (almost) arbitrary length.
Evaluating an iterated hash function consists of the following three steps (a small code sketch follows them).
0x1 Preprocessing
Given an input bit string $x$ with $|x| \geq m + t + 1$, use a public algorithm to construct a string $y$ such that $|y| \equiv 0 \pmod{t}$, written as $y = y_1 \parallel y_2 \parallel \cdots \parallel y_r$ with $|y_i| = t$ for each block.
0x2 Processing
IV is a public initial bit string of length $m$. The computation proceeds as $z_0 = \text{IV}$ and $z_i = \text{compress}(z_{i-1} \parallel y_i)$ for $1 \leq i \leq r$.
0x3 Output Transformation
This step is optional: we can output $z_r$ directly as the result, or apply some other public function $g$ and output $h(x) = g(z_r)$.
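A minimal sketch of the three steps, assuming a throwaway XOR-based compress with m = t = 8 bytes (purely illustrative and of course not a secure hash):

def toy_compress(state: bytes, block: bytes) -> bytes:
    """Illustrative only: compress maps {0,1}^(m+t) to {0,1}^m via byte-wise XOR."""
    return bytes(a ^ b for a, b in zip(state, block))

def iterated_hash(x: bytes, iv: bytes = bytes(8), t: int = 8) -> bytes:
    # 0x1 Preprocessing: zero-pad so the length is a multiple of t (no MD strengthening yet)
    y = x + b"\x00" * (-len(x) % t)
    # 0x2 Processing: z_0 = IV, z_i = compress(z_{i-1} || y_i)
    z = iv
    for i in range(0, len(y), t):
        z = toy_compress(z, y[i:i + t])
    # 0x3 Output transformation: here we simply return z_r
    return z

print(iterated_hash(b"hello world").hex())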
The Merkle-Damgård Construction
Given the iterated hash function described above, the Merkle-Damgård construction is only a small tweak away: all we change is the preprocessing step, which now becomes $y = x \parallel \text{pad}(x)$, where $\text{pad}(x)$ encodes the length of $x$ (the so-called MD strengthening).
The concrete padding scheme is shown in the figure at the top of this article, so I won't spell it out again; anyone who has read through one of these algorithms will recognize it (a sketch of a typical padding routine follows below). It can be proven that if compress is collision resistant, then h is collision resistant as well; equivalently, any collision in h can be turned into a collision in compress. I won't reproduce the proof here; interested readers can find the full argument in the relevant sections of Chapter 4 of "Cryptography: Theory and Practice". And with that, the theory is done; time to move on to code.
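As a hedged sketch of what such a padding step typically looks like (modeled on the MD5/SHA-1 style: a 0x80 byte, zeros, then the 64-bit message length in bits; the byte order of the length field differs between algorithms):

import struct

def md_pad(message: bytes, block_size: int = 64, length_be: bool = True) -> bytes:
    """MD strengthening: append 0x80, zeros, then the 64-bit bit-length of the message."""
    bit_len = len(message) * 8
    padded = message + b"\x80"
    padded += b"\x00" * ((block_size - 8 - len(padded)) % block_size)
    padded += struct.pack(">Q" if length_be else "<Q", bit_len)
    return padded

assert len(md_pad(b"abc")) % 64 == 0
assert len(md_pad(b"a" * 56)) % 64 == 0  # a message that spills into a second block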
Code Implementation
Because this construction is not tied to any particular hash function but to a whole family of them, the code here is only a skeleton. Implement the compress function inside this skeleton and you effectively get a whole batch of hash functions.
Python
Let's start with a Python version. I implemented TigerHash a while ago, so stripping out its compress function leaves exactly this skeleton. The one thing that really needs attention is byte order: check whether the algorithm you are implementing reads message words as little-endian or big-endian; I got bitten by this once because I wasn't paying attention. A quick demonstration of the difference comes first, followed by the skeleton itself.
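A tiny illustration of the byte-order pitfall, using nothing beyond the standard struct module (MD5 and Tiger encode the length little-endian, SHA-1/SHA-2 big-endian):

import struct

# The same 64-bit value, packed with the two byte orders:
print(struct.pack("<Q", 0x0123456789ABCDEF).hex())  # efcdab8967452301 (little-endian)
print(struct.pack(">Q", 0x0123456789ABCDEF).hex())  # 0123456789abcdef (big-endian)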
import io
import struct
from typing import List
def compress(state: List[int], block: bytes) -> None:
    """Algorithm-specific compression function: mixes one message block into the state in place."""
    pass
class MerkleDamgardConstruction:
def __init__(self):
        # Initial digest variables (the IV); a concrete algorithm fills these in
self._h = [
]
        # bytes object with 0 <= len < 64 used to store the tail of the message
        # when the data processed so far is not a multiple of the block size
self._unprocessed = b''
# Length in bytes of all data that has been processed so far
self._message_byte_length = 0
self._digest = None
self.block_size = 64
    def digest(self) -> bytes:
        if self._digest is None:
            self.final()
        return self._digest
def hexdigest(self):
"""Produce the final hash value (big-endian) as a hex string"""
return self.digest().hex()
def update(self, message: bytes):
if isinstance(message, (bytes, bytearray)):
message = io.BytesIO(message)
block = self._unprocessed + message.read(self.block_size - len(self._unprocessed))
# Read the rest of the data, 64 bytes at a time
while len(block) == self.block_size:
compress(self._h, block)
self._message_byte_length += self.block_size
block = message.read(self.block_size)
self._unprocessed = block
return self
def final(self):
"""Return finalized digest variables for the data processed so far."""
# Pre-processing:
message = self._unprocessed
message_byte_length = self._message_byte_length + len(message)
        # append the padding marker byte (Tiger uses 0x01 here; MD5/SHA-1 use 0x80)
        message += b'\x01'
        # append 0 <= k < 512 bits of '0', so that the resulting message length (in bytes)
        # is congruent to 56 (mod 64)
        message += b'\x00' * ((56 - (message_byte_length + 1) % self.block_size) % self.block_size)
        # append the length of the original message, in bits, as a 64-bit little-endian
        # integer (big-endian algorithms would use '>Q' here instead)
        message_bit_length = message_byte_length * 8
        message += struct.pack(b'<Q', message_bit_length)
# Process the final chunk
# At this point, the length of the message is either 64 or 128 bytes.
compress(self._h, message[:self.block_size])
if len(message) == self.block_size:
self._digest = struct.pack(f"<{len(self._h)}Q", *self._h)
return
compress(self._h, message[self.block_size:])
self._digest = struct.pack(f"<{len(self._h)}Q", *self._h)
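To see the skeleton above in action, here is a hedged usage sketch: xor_compress and the 3-word state are invented purely for demonstration (they do not form a secure hash); a real algorithm such as Tiger would supply its own compression function and IV in their place.

# A purely illustrative compress: XOR each 64-bit message word into the state.
def xor_compress(state: List[int], block: bytes) -> None:
    words = struct.unpack("<8Q", block)
    for i in range(len(state)):
        state[i] ^= words[i % len(words)]

compress = xor_compress  # rebind the module-level stub for this demonstration

md = MerkleDamgardConstruction()
md._h = [0, 0, 0]        # toy 3-word (192-bit) state, the same shape Tiger uses
print(md.update(b"hello merkle-damgard").hexdigest())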
Go
For Go, the official implementation is already quite good, so here I simply lift the structure from the standard library's crypto/md5 package. I don't produce code, I merely move it around. The same byte-order caveat applies.
import (
"encoding/binary"
"hash"
)
// Size is the size of an MD5 checksum in bytes.
const Size = 16
// BlockSize is the block size of MD5 in bytes.
const BlockSize = 64
// digest represents the partial evaluation of a checksum.
type digest struct {
s [4]uint32
x [BlockSize]byte
nx int
len uint64
}
func (d *digest) Reset() {
	// init iv: set d.s to the algorithm-specific initial values
d.nx = 0
d.len = 0
}
// New returns a new hash.Hash computing the MD5 checksum. The Hash also
// implements encoding.BinaryMarshaler and encoding.BinaryUnmarshaler to
// marshal and unmarshal the internal state of the hash.
func New() hash.Hash {
d := new(digest)
d.Reset()
return d
}
func (d *digest) Write(p []byte) (nn int, err error) {
// Note that we currently call block or blockGeneric
// directly (guarded using haveAsm) because this allows
// escape analysis to see that p and d don't escape.
nn = len(p)
d.len += uint64(nn)
if d.nx > 0 {
n := copy(d.x[d.nx:], p)
d.nx += n
if d.nx == BlockSize {
compress(d, d.x[:])
d.nx = 0
}
p = p[n:]
}
if len(p) >= BlockSize {
n := len(p) &^ (BlockSize - 1)
compress(d, p[:n])
p = p[n:]
}
if len(p) > 0 {
d.nx = copy(d.x[:], p)
}
return
}
func (d *digest) Sum(in []byte) []byte {
// Make a copy of d so that caller can keep writing and summing.
d0 := *d
hash := d0.checkSum()
return append(in, hash[:]...)
}
func (d *digest) checkSum() [Size]byte {
// Append 0x80 to the end of the message and then append zeros
// until the length is a multiple of 56 bytes. Finally append
// 8 bytes representing the message length in bits.
//
// 1 byte end marker :: 0-63 padding bytes :: 8 byte length
tmp := [1 + 63 + 8]byte{0x80}
pad := (55 - d.len) % 64 // calculate number of padding bytes
binary.LittleEndian.PutUint64(tmp[1+pad:], d.len<<3) // append length in bits
d.Write(tmp[:1+pad+8])
// The previous write ensures that a whole number of
// blocks (i.e. a multiple of 64 bytes) have been hashed.
if d.nx != 0 {
panic("d.nx != 0")
}
	var digest [Size]byte
	// finalize: serialize the state words into the digest (MD5 is little-endian)
	binary.LittleEndian.PutUint32(digest[0:], d.s[0])
	binary.LittleEndian.PutUint32(digest[4:], d.s[1])
	binary.LittleEndian.PutUint32(digest[8:], d.s[2])
	binary.LittleEndian.PutUint32(digest[12:], d.s[3])
	return digest
}
// Sum returns the MD5 checksum of the data.
func Sum(data []byte) [Size]byte {
var d digest
d.Reset()
d.Write(data)
return d.checkSum()
}
func (d *digest) Size() int { return Size }
func (d *digest) BlockSize() int { return BlockSize }
Rust
For Rust, the RustCrypto project provides the de facto standard implementations, so here I again play the code mover and lift the structure from its sha1 crate.
pub use digest::{self, Digest};
use core::{fmt, slice::from_ref};
use digest::{
block_buffer::Eager,
core_api::{
AlgorithmName, Block, BlockSizeUser, Buffer, BufferKindUser, CoreWrapper, FixedOutputCore,
OutputSizeUser, Reset, UpdateCore,
},
typenum::{Unsigned, U20, U64},
HashMarker, Output,
};
mod compress;
#[cfg(feature = "compress")]
pub use compress::compress;
#[cfg(not(feature = "compress"))]
use compress::compress;
const STATE_LEN: usize = 5;
/// Core Merkle-Damgård hasher state (adapted from the RustCrypto sha1 crate).
#[derive(Clone)]
pub struct MerkleDamgardCore {
h: [u32; STATE_LEN],
block_len: u64,
}
impl HashMarker for MerkleDamgardCore {}
impl BlockSizeUser for MerkleDamgardCore {
type BlockSize = U64;
}
impl BufferKindUser for MerkleDamgardCore {
type BufferKind = Eager;
}
impl OutputSizeUser for MerkleDamgardCore {
type OutputSize = U20;
}
impl UpdateCore for MerkleDamgardCore {
#[inline]
fn update_blocks(&mut self, blocks: &[Block<Self>]) {
self.block_len += blocks.len() as u64;
compress(&mut self.h, blocks);
}
}
impl FixedOutputCore for MerkleDamgardCore {
#[inline]
fn finalize_fixed_core(&mut self, buffer: &mut Buffer<Self>, out: &mut Output<Self>) {
let bs = Self::BlockSize::U64;
let bit_len = 8 * (buffer.get_pos() as u64 + bs * self.block_len);
let mut h = self.h;
buffer.len64_padding_be(bit_len, |b| compress(&mut h, from_ref(b)));
for (chunk, v) in out.chunks_exact_mut(4).zip(h.iter()) {
chunk.copy_from_slice(&v.to_be_bytes());
}
}
}
impl Default for MerkleDamgardCore {
// init iv
#[inline]
fn default() -> Self {
Self {
h: [...],
block_len: 0,
}
}
}
impl Reset for MerkleDamgardCore {
#[inline]
fn reset(&mut self) {
*self = Default::default();
}
}
impl AlgorithmName for MerkleDamgardCore {
fn write_alg_name(f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("MerkleDamgardCore")
}
}
impl fmt::Debug for MerkleDamgardCore {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("MerkleDamgardCore { ... }")
}
}
/// Merkle-Damgård hasher state (CoreWrapper adds buffering and the user-facing Digest API).
pub type MerkleDamgardConstruction = CoreWrapper<MerkleDamgardCore>;
C
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#define HASH_BLOCK_SIZE 64
#define HASH_SIZE 16
typedef struct {
uint32_t state[4];
uint64_t size;
uint8_t buffer[HASH_BLOCK_SIZE];
uint8_t digest[HASH_SIZE];
} MerkleDamgard_CTX;
void compress(MerkleDamgard_CTX *ctx, const uint8_t data[]) {
    // algorithm-specific compression function: mix one 64-byte block into ctx->state
}
void MerkleDamgardInit(MerkleDamgard_CTX *ctx) {
    ctx->size = (uint64_t) 0;
    // init iv: set ctx->state to the algorithm-specific initial values
}
void MerkleDamgardUpdate(MerkleDamgard_CTX *ctx, const uint8_t *data, size_t message_len) {
uint64_t offset = ctx->size % 64;
ctx->size += (uint64_t) message_len;
for (size_t i = 0; i < message_len; ++i) {
ctx->buffer[offset++] = data[i];
if (offset % 64 == 0) {
compress(ctx, ctx->buffer);
offset = 0;
}
}
}
void MerkleDamgardFinal(MerkleDamgard_CTX *ctx) {
    uint64_t remain_len = ctx->size % HASH_BLOCK_SIZE;
    uint64_t index = remain_len;
    if (remain_len < 56) {
        ctx->buffer[index++] = 0x80;
        while (index < 56) {
            ctx->buffer[index++] = 0x00;
        }
    } else {
        ctx->buffer[index++] = 0x80;
        while (index < HASH_BLOCK_SIZE) {
            ctx->buffer[index++] = 0x00;
        }
        compress(ctx, ctx->buffer);
        memset(ctx->buffer, 0, 56);
    }
    // Append the total message length in bits; shown little-endian (MD5-style),
    // big-endian algorithms such as SHA-1/SHA-2 reverse the byte order.
    uint64_t bit_len = ctx->size * 8;
    for (int i = 0; i < 8; ++i) {
        ctx->buffer[56 + i] = (uint8_t) (bit_len >> (8 * i));
    }
    compress(ctx, ctx->buffer);
    // output: serialize ctx->state into ctx->digest (byte order is algorithm-specific)
}
I'll only give the C version and skip C++: the C code above compiles as C++ anyway, and I'm honestly not familiar with C++-specific features.
JavaScript
For JS I likewise borrow from an existing implementation (crypto-es, linked in the references) and happily play the code mover once more.
/**
* Abstract hasher template.
*
* @property {number} blockSize
*
* The number of 32-bit words this hasher operates on. Default: 16 (512 bits)
*/
// Base, BufferedBlockAlgorithm and HMAC are imported from the crypto-es core modules
// (see the repository linked in the references).
export class Hasher extends BufferedBlockAlgorithm {
constructor(cfg) {
super();
this.blockSize = 512 / 32;
/**
* Configuration options.
*/
this.cfg = Object.assign(new Base(), cfg);
// Set initial values
this.reset();
}
/**
* Creates a shortcut function to a hasher's object interface.
*
* @param {Hasher} SubHasher The hasher to create a helper for.
*
* @return {Function} The shortcut function.
*
* @static
*
* @example
*
* var SHA256 = CryptoJS.lib.Hasher._createHelper(CryptoJS.algo.SHA256);
*/
static _createHelper(SubHasher) {
return (message, cfg) => new SubHasher(cfg).finalize(message);
}
/**
* Creates a shortcut function to the HMAC's object interface.
*
* @param {Hasher} SubHasher The hasher to use in this HMAC helper.
*
* @return {Function} The shortcut function.
*
* @static
*
* @example
*
* var HmacSHA256 = CryptoJS.lib.Hasher._createHmacHelper(CryptoJS.algo.SHA256);
*/
static _createHmacHelper(SubHasher) {
return (message, key) => new HMAC(SubHasher, key).finalize(message);
}
/**
* Resets this hasher to its initial state.
*
* @example
*
* hasher.reset();
*/
reset() {
// Reset data buffer
super.reset.call(this);
// Perform concrete-hasher logic
this._doReset();
}
/**
* Updates this hasher with a message.
*
* @param {WordArray|string} messageUpdate The message to append.
*
* @return {Hasher} This hasher.
*
* @example
*
* hasher.update('message');
* hasher.update(wordArray);
*/
update(messageUpdate) {
// Append
this._append(messageUpdate);
// Update the hash
this._process();
// Chainable
return this;
}
/**
* Finalizes the hash computation.
* Note that the finalize operation is effectively a destructive, read-once operation.
*
* @param {WordArray|string} messageUpdate (Optional) A final message update.
*
* @return {WordArray} The hash.
*
* @example
*
* var hash = hasher.finalize();
* var hash = hasher.finalize('message');
* var hash = hasher.finalize(wordArray);
*/
finalize(messageUpdate) {
// Final message update
if (messageUpdate) {
this._append(messageUpdate);
}
// Perform concrete-hasher logic
const hash = this._doFinalize();
return hash;
}
}
Because of how this construction works, once you have it down, learning a new hash function mostly reduces to studying its compression function; the surrounding code barely changes, and the complete hash function falls out almost for free. That's it for this article. Thanks for reading all the way to the end.
References
- 密码学原理与实践 (Cryptography: Theory and Practice), Third Edition, Douglas R. Stinson
- https://justcryptography.com/merkle-damgard-construction/
- https://github.com/RustCrypto/hashes
- https://go.dev/
- https://github.com/B-Con/crypto-algorithms/blob/master/md5.c
- https://github.com/entronad/crypto-es/blob/master/lib/md5.js
Originally published on the WeChat public account Coder小Q: 【密码学】一文读懂Merkle-Damgård