【密码学】一文读懂Merkle-Damgård

admin 2022年8月20日14:52:52评论167 views字数 13074阅读43分34秒阅读模式

一文读懂Merkle-Damgård

【密码学】一文读懂Merkle-Damgård

Merkle-Damgård构造是Ralph Merkle在1979年设计的,然后由Merkle和Ivan Damgårrd共同完成了结构正确性的证明,具体结构如下:

【密码学】一文读懂Merkle-Damgård

看到上面的结构,相信看过md5、sha1、sha256等算法的读者应该都不会陌生,因为这些算法除了f函数不太一样之外,整个算法的流程和上面的过程如出一辙,本文就来具体的聊一聊这个结构背后的故事。

预备知识

首先来回顾一下有关哈希函数的基础知识,本文的符号均参考自 「密码学原理与实践」一书。

哈希函数的定义

给定如下的三元组即可表示一个哈希函数,这里我们不考虑带密钥的哈希函数,因此对书中的内容有一部分的删改(不带密钥的哈希函数实际上是带密钥哈希函数的一个子集,为了简化,本文只考虑这一种)。

  1. 是所有的消息集合,一般来说,是指的任意的二进制消息
  2. 是所有的消息摘要构成的集合
  3. 存在一个哈希函数

哈希函数安全性

对于哈希函数的安全性,对于如下三种情况应该都是难解的。

抗原像攻击

  • 实例: 给定Hash函数 h:
  • 找出: s.t.

说人话,就是给定一个计算之后的哈希值,能够找到计算哈希值之前的消息。

抗第二原像攻击

  • 实例: 给定Hash函数 h:
  • 找出 s.t. ,并且

说人话,实际上就是能够找到一个不同于原来的消息,使得他们计算之后的摘要相同。

抗碰撞攻击

  • 实例: 给定Hash函数 h :
  • 找出: s.t. 并且

说人话,就是要我们找到两个不同的消息,使得他们计算之后的摘要相同。

迭代哈希函数

假设压缩函数 compress: ,基于这个压缩函数构造出来一个迭代压缩函数

迭代哈希函数的求值主要是分为如下的三步

0x1 预处理

给定任意一个输入的比特串消息x,其中 ,用一个公开的算法构造一个串y,s.t. ,记作:

0x2 处理

IV是一个初始的公开的比特串,计算过程如下:

0x3 输出变换

这一步实际上是可选的,我们可以直接输出 当结果,也可以对 再次做一个别的函数变换。

Merkle-Damgård结构

通过上面我们对于迭代哈希函数的讲解,我们很容易的可以得到Merkle-Damgård结构,其中我们只需要对前面预处理的过程做一点点的微调,我们对于预处理的方式采用如下的方式:

其中具体的padding方案,参考最上面的图,这里就不过多解释了,相信看过其他算法的读者应该对于这个padding的算法不会陌生,经过证明,如果compress是碰撞稳固的,那么h也是碰撞稳固的,反之亦然,对于具体的详细证明,这里就不给展开讲了,有兴趣的读者可以自行翻阅一下 「密码学原理与实践」一书,第四章的相关章节,里面给出了详细的证明过程,好了这块结构就到这里了,好了,到这里,我又水了这么多字,溜了溜了。

代码实现

因为这个结构并不是针对某种特定的哈希函数而来的,而是针对于一系列的哈希函数,因此呢,这里的代码我也只提供一个框架,在这个框架下面实现compress函数之后,其实就可以实现出来一大批的哈希函数了。

Python

这里还是先给一个Python的版本吧,因为之前写过了TigerHash,因此只要删掉compress的实现,这个结构实际上就出来了,溜了溜了。不过这里唯一需要注意的一点是大小端的问题,看一下对应的算法在提取消息的过程当中是大端还是小端,我之前没注意在实现的时候被坑了一把。

import io
import struct
from typing import List


def compress(state: List[int], block):
    pass


class MerkleDamgardConstruction:
    def __init__(self):
        # Initial digest variables
        self._h = [
        ]

        # bytes object with 0 <= len < 64 used to store the end of the message
        # if the message length is not congruent to 64
        self._unprocessed = b''
        # Length in bytes of all data that has been processed so far
        self._message_byte_length = 0
        self._digest = None
        self.block_size = 64

    def digest(self) -> bytes:
        self.final()
        return self._digest

    def hexdigest(self):
        """Produce the final hash value (big-endian) as a hex string"""
        return self.digest().hex()

    def update(self, message: bytes):
        if isinstance(message, (bytes, bytearray)):
            message = io.BytesIO(message)

        block = self._unprocessed + message.read(self.block_size - len(self._unprocessed))

        # Read the rest of the data, 64 bytes at a time
        while len(block) == self.block_size:
            compress(self._h, block)
            self._message_byte_length += self.block_size
            block = message.read(self.block_size)
        self._unprocessed = block
        return self

    def final(self):
        """Return finalized digest variables for the data processed so far."""
        # Pre-processing:
        message = self._unprocessed
        message_byte_length = self._message_byte_length + len(message)

        # append the bit '1' to the message
        message += b'x01'

        # append 0 <= k < 512 bits '0', so that the resulting message length (in bytes)
        # is congruent to 56 (mod 64)
        message += b'x00' * ((56 - (message_byte_length + 1) % self.block_size) % self.block_size)

        # append length of message (before pre-processing), in bits, as 64-bit big-endian integer
        message_bit_length = message_byte_length * 8
        message += struct.pack(b'<Q', message_bit_length)

        # Process the final chunk
        # At this point, the length of the message is either 64 or 128 bytes.
        compress(self._h, message[:self.block_size])
        if len(message) == self.block_size:
            self._digest = struct.pack(f"<{len(self._h)}Q", *self._h)
            return
        compress(self._h, message[self.block_size:])

        self._digest = struct.pack(f"<{len(self._h)}Q", *self._h)

Go

对于go语言,官方的实现其实就不错,这里我直接搬运下官方源码当中对于这个的实现了。我不生产代码,我只是代码的搬运工,这里同样的需要处理下字节序的问题。

import (
  "encoding/binary"
  "hash"
)

// Size The size of an MD5 checksum in bytes.
const Size = 16

// BlockSize The blocksize of MD5 in bytes.
const BlockSize = 64

// digest represents the partial evaluation of a checksum.
type digest struct {
  s   [4]uint32
  x   [BlockSize]byte
  nx  int
  len uint64
}

func (d *digest) Reset() {
  // init iv
  d.nx = 0
  d.len = 0
}

// New returns a new hash.Hash computing the MD5 checksum. The Hash also
// implements encoding.BinaryMarshaler and encoding.BinaryUnmarshaler to
// marshal and unmarshal the internal state of the hash.
func New() hash.Hash {
  d := new(digest)
  d.Reset()
  return d
}

func (d *digest) Write(p []byte) (nn int, err error) {
  // Note that we currently call block or blockGeneric
  // directly (guarded using haveAsm) because this allows
  // escape analysis to see that p and d don't escape.
  nn = len(p)
  d.len += uint64(nn)
  if d.nx > 0 {
    n := copy(d.x[d.nx:], p)
    d.nx += n
    if d.nx == BlockSize {
      compress(d, d.x[:])
      d.nx = 0
    }
    p = p[n:]
  }
  if len(p) >= BlockSize {
    n := len(p) &^ (BlockSize - 1)

    compress(d, p[:n])
    p = p[n:]
  }
  if len(p) > 0 {
    d.nx = copy(d.x[:], p)
  }
  return
}

func (d *digest) Sum(in []byte) []byte {
  // Make a copy of d so that caller can keep writing and summing.
  d0 := *d
  hash := d0.checkSum()
  return append(in, hash[:]...)
}

func (d *digest) checkSum() [Size]byte {
  // Append 0x80 to the end of the message and then append zeros
  // until the length is a multiple of 56 bytes. Finally append
  // 8 bytes representing the message length in bits.
  //
  // 1 byte end marker :: 0-63 padding bytes :: 8 byte length
  tmp := [1 + 63 + 8]byte{0x80}
  pad := (55 - d.len) % 64                             // calculate number of padding bytes
  binary.LittleEndian.PutUint64(tmp[1+pad:], d.len<<3// append length in bits
  d.Write(tmp[:1+pad+8])

  // The previous write ensures that a whole number of
  // blocks (i.e. a multiple of 64 bytes) have been hashed.
  if d.nx != 0 {
    panic("d.nx != 0")
  }

  var digest [Size]byte
  // finalialize
  return digest
}

// Sum returns the MD5 checksum of the data.
func Sum(data []byte) [Size]byte {
  var d digest
  d.Reset()
  d.Write(data)
  return d.checkSum()
}

func (d *digest) Size() int { return Size }

func (d *digest) BlockSize() int { return BlockSize }

Rust

Rust官方实际上也是提供了标准实现的,这里依然来搬运一下标准的实现。

pub use digest::{self, Digest};

use core::{fmt, slice::from_ref};
use digest::{
    block_buffer::Eager,
    core_api::{
        AlgorithmName, Block, BlockSizeUser, Buffer, BufferKindUser, CoreWrapper, FixedOutputCore,
        OutputSizeUser, Reset, UpdateCore,
    },
    typenum::{Unsigned, U20, U64},
    HashMarker, Output,
};

mod compress;

#[cfg(feature = "compress")]
pub use compress::compress;
#[cfg(not(feature = "compress"))]
use compress::compress;

const STATE_LEN: usize = 5;

/// Core SHA-1 hasher state.
#[derive(Clone)]
pub struct MerkleDamgardCore {
    h: [u32; STATE_LEN],
    block_len: u64,
}

impl HashMarker for MerkleDamgardCore {}

impl BlockSizeUser for MerkleDamgardCore {
    type BlockSize = U64;
}

impl BufferKindUser for MerkleDamgardCore {
    type BufferKind = Eager;
}

impl OutputSizeUser for MerkleDamgardCore {
    type OutputSize = U20;
}

impl UpdateCore for MerkleDamgardCore {
    #[inline]
    fn update_blocks(&mut self, blocks: &[Block<Self>]) {
        self.block_len += blocks.len() as u64;
        compress(&mut self.h, blocks);
    }
}

impl FixedOutputCore for MerkleDamgardCore {
    #[inline]
    fn finalize_fixed_core(&mut self, buffer: &mut Buffer<Self>, out: &mut Output<Self>) {
        let bs = Self::BlockSize::U64;
        let bit_len = 8 * (buffer.get_pos() as u64 + bs * self.block_len);

        let mut h = self.h;
        buffer.len64_padding_be(bit_len, |b| compress(&mut h, from_ref(b)));
        for (chunk, v) in out.chunks_exact_mut(4).zip(h.iter()) {
            chunk.copy_from_slice(&v.to_be_bytes());
        }
    }
}

impl Default for MerkleDamgardCore {
    // init iv
    #[inline]
    fn default() -> Self {
        Self {
            h: [...],
            block_len: 0,
        }
    }
}

impl Reset for MerkleDamgardCore {
    #[inline]
    fn reset(&mut self) {
        *self = Default::default();
    }
}

impl AlgorithmName for MerkleDamgardCore {
    fn write_alg_name(f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("MerkleDamgardCore")
    }
}

impl fmt::Debug for MerkleDamgardCore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("MerkleDamgardCore { ... }")
    }
}

/// SHA-1 hasher state.
pub type MerkleDamgardConstruction = CoreWrapper<MerkleDamgardCore>;

C

#include <stdlib.h>
#include <memory.h>

#define HASH_BLOCK_SIZE 64
#define HASH_SIZE 16

typedef struct {
    uint32_t state[4];
    uint64_t size;
    uint8_t buffer[HASH_BLOCK_SIZE];
    uint8_t digest[HASH_SIZE];
} MerkleDamgard_CTX;

void compress(MerkleDamgard_CTX *ctx, const uint8_t data[]) {
    // compress function
}

void MerkleDamgardInit(MerkleDamgard_CTX *ctx) {
    ctx->size = (uint64_t0;
    // init iv
}

void MD5Update(MerkleDamgard_CTX *ctx, const uint8_t *data, size_t message_len) {
    uint64_t offset = ctx->size % 64;
    ctx->size += (uint64_t) message_len;

    for (size_t i = 0; i < message_len; ++i) {
        ctx->buffer[offset++] = data[i];
        if (offset % 64 == 0) {
            compress(ctx, ctx->buffer);
            offset = 0;
        }
    }
}

void MD5Final(MerkleDamgard_CTX *ctx) {
    uint64_t remain_len = ctx->size % HASH_BLOCK_SIZE;
    uint64_t index = remain_len;
    if (remain_len < 56) {
        ctx->buffer[index++] = 0x80;
        while (index < 56) {
            ctx->buffer[index++] = 0x00;
        }
    } else if (remain_len >= 56) {
        ctx->buffer[index++] = 0x80;
        while (index < HASH_BLOCK_SIZE) {
            ctx->buffer[index++] = 0x00;
        }
        compress(ctx, ctx->buffer);
        memset(ctx->buffer, 056);
    }

    // Append to the padding the total message's length in bits and transform.
    uint64_t bit_len = ctx->size * 8;
    // finalization function
    compress(ctx, ctx->buffer);

    // output
}

这里只给出一下C的实现了,C++就不写了,因为C的代码是可以运行在C++之上的,我对于C++的特性其实一点也不熟悉。

JavaScript

js同样借鉴一下大佬们的实现,开心的做一个代码的搬运工。

/**
 * Abstract hasher template.
 *
 * @property {number} blockSize
 *
 *     The number of 32-bit words this hasher operates on. Default: 16 (512 bits)
 */

export class Hasher extends BufferedBlockAlgorithm {
    constructor(cfg) {
        super();

        this.blockSize = 512 / 32;

        /**
         * Configuration options.
         */

        this.cfg = Object.assign(new Base(), cfg);

        // Set initial values
        this.reset();
    }

    /**
     * Creates a shortcut function to a hasher's object interface.
     *
     * @param {Hasher} SubHasher The hasher to create a helper for.
     *
     * @return {Function} The shortcut function.
     *
     * @static
     *
     * @example
     *
     *     var SHA256 = CryptoJS.lib.Hasher._createHelper(CryptoJS.algo.SHA256);
     */

    static _createHelper(SubHasher) {
        return (message, cfg) => new SubHasher(cfg).finalize(message);
    }

    /**
     * Creates a shortcut function to the HMAC's object interface.
     *
     * @param {Hasher} SubHasher The hasher to use in this HMAC helper.
     *
     * @return {Function} The shortcut function.
     *
     * @static
     *
     * @example
     *
     *     var HmacSHA256 = CryptoJS.lib.Hasher._createHmacHelper(CryptoJS.algo.SHA256);
     */

    static _createHmacHelper(SubHasher) {
        return (message, key) => new HMAC(SubHasher, key).finalize(message);
    }

    /**
     * Resets this hasher to its initial state.
     *
     * @example
     *
     *     hasher.reset();
     */

    reset() {
        // Reset data buffer
        super.reset.call(this);

        // Perform concrete-hasher logic
        this._doReset();
    }

    /**
     * Updates this hasher with a message.
     *
     * @param {WordArray|string} messageUpdate The message to append.
     *
     * @return {Hasher} This hasher.
     *
     * @example
     *
     *     hasher.update('message');
     *     hasher.update(wordArray);
     */

    update(messageUpdate) {
        // Append
        this._append(messageUpdate);

        // Update the hash
        this._process();

        // Chainable
        return this;
    }

    /**
     * Finalizes the hash computation.
     * Note that the finalize operation is effectively a destructive, read-once operation.
     *
     * @param {WordArray|string} messageUpdate (Optional) A final message update.
     *
     * @return {WordArray} The hash.
     *
     * @example
     *
     *     var hash = hasher.finalize();
     *     var hash = hasher.finalize('message');
     *     var hash = hasher.finalize(wordArray);
     */

    finalize(messageUpdate) {
        // Final message update
        if (messageUpdate) {
            this._append(messageUpdate);
        }

        // Perform concrete-hasher logic
        const hash = this._doFinalize();

        return hash;
    }
}

因为这个结构的特征,因此呢,如果能掌握好这个结构之后,后面去学习相关的哈希函数,这里就只需要去看一下对应的压缩函数就好了,其他代码实现基本上不太用变,直接就可以出来整个哈希函数,好了,本文到这里其实就结束了,感谢耐心读到这里的读者。

参考资料

  • 密码学原理与实践(第三版) Cryptography Theory and Practice Third Edition [加] Dauglas R. Stinson
  • https://justcryptography.com/merkle-damgard-construction/
  • https://github.com/RustCrypto/hashes
  • https://go.dev/
  • https://github.com/B-Con/crypto-algorithms/blob/master/md5.c
  • https://github.com/entronad/crypto-es/blob/master/lib/md5.js


原文始发于微信公众号(Coder小Q):【密码学】一文读懂Merkle-Damgård

  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2022年8月20日14:52:52
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   【密码学】一文读懂Merkle-Damgårdhttps://cn-sec.com/archives/1244322.html

发表评论

匿名网友 填写信息