在本科的时候学习完了BFS,就想到可以用它来求解华容道的最短路径求解。当时是用C语言编写的,为了简化状态表示,还使用整数来表示每个状态。 因为C语言当时没有标准库,所以还是手工实现的Queue, Hashmap等一些辅助数据结构。这几天忽然又想到这个问题,所以用Python重新实现了一遍。

1. 状态表示


class State:
    def __init__(self, matrix, xy=None):
        if not isinstance(matrix, np.ndarray):
            matrix = np.array(matrix)
        self.matrix = matrix
        self.nm = matrix.shape
        self.xy = xy
        self.str_cache = None
        self.id_cache = None
        if xy is None:
            self.xy = self.find_zero()

    def find_zero(self):
        matrix = self.matrix
        n, m = self.nm
        for i in range(n):
            for j in range(m):
                if matrix[i][j] == 0:
                    return i, j

    def next_states(self):
        matrix = self.matrix
        x, y = self.xy
        n, m = self.nm
        states = []
        for dx, dy in ((0, 1), (0, -1), (1, 0), (-1, 0)):
            x2, y2 = x + dx, y + dy
            if 0 <= x2 < n and 0 <= y2 < m:
                matrix2 = np.copy(matrix)
                matrix2[x2][y2], matrix2[x][y] = matrix2[x][y], matrix2[x2][y2]
                state2 = State(matrix2, (x2, y2))
        return states

    def __str__(self):
        return self.to_string()

    def is_equal(self, other):
        return self.xy == other.xy and self.identity() == other.identity()

    def identity(self):
        return self.matrix.tobytes()

    def to_string(self):
        if self.str_cache is not None:
            return self.str_cache
        self.str_cache = str(self.matrix)
        return self.str_cache

2. 状态记录

每个状态会产生多个新状态,在BFS的时候需要判断这些状态之前是否看到过,所以我们需要有个对象进行状态记录。 另外每个状态最好可以用一个整数来对应,这样在BFS inqueue的时候就只需要放入整数就好。

class StateBK:
    def __init__(self):
        self.map = {}
        self.seq = []

    def get_index(self, st: State):
        if st.identity() in self.map:
            return self.map[st.identity()]
        index = len(self.seq)
        self.map[st.identity()] = index
        return index

    def query_index(self, st: State):
        return st.identity() in self.map

    def get_state(self, index):
        return self.seq[index]

3. naive BFS

下面这个算法是通过BFS进行搜索。这个算法有个问题是,如果路径很长的话,整个树需要展开很多层, 会涉及到许多状态的探索,时间就会非常长。最坏的情况是,如果没有路径的话,那么需要遍历所有状态。

# NOTE(yan): naive BFS
def search_path_1(source: State, dest: State):
    bk = StateBK()
    parents = {}
    Q = deque()

    idx = bk.get_index(source)
    parents[idx] = -1

    paths = []
    found = False
    while len(Q):
        idx = Q.popleft()
        state = bk.get_state(idx)
        if state.is_equal(dest):
            found = True
        next_states = state.next_states()
        for st in next_states:
            if bk.query_index(st):
            idx2 = bk.get_index(st)
            parents[idx2] = idx

    if found:
        idx = bk.get_index(dest)
        while idx != -1:
            idx = parents[idx]
        paths = paths[::-1]
    return paths

4. bidirectional BFS


如果最短路径的长度是20的话,因为每个状态会展开成为4个状态,那么最多会展开 4 ^ 20个状态(当然考虑到部分状态之前访问过,以及fanout没有这么大, 所以实际情况不会有这么多,但是大约是这个量级)。

但是如果是双向搜索的话,那么每个方向只需要搜索长度10的路径,那么最多会展开2 * (4 ^ 10)个状态,这个数量比之前的少很多。如果存在路径的话, 那么这种双向BFS会节省很多时间。

# NOTE(yan): bidirectional BFS
def search_path_2(source: State, dest: State):
    bk = [StateBK(), StateBK()]
    parents = [{}, {}]
    dists = [{}, {}]
    Q = [deque(), deque()]

    idx = bk[0].get_index(source)
    parents[0][idx] = -1
    dists[0][idx] = 0
    Q[0].append((idx, 0))

    idx = bk[1].get_index(dest)
    parents[1][idx] = -1
    dists[1][idx] = 0
    Q[1].append((idx, 0))

    depth = -1
    found = False

    # distance, pidx0, pidx1, direction
    opt = (1 << 30, None, None, 0)

    while True:
        depth += 1
        for i in range(2):
            while len(Q[i]):
                idx, d = Q[i].popleft()
                if d != depth:
                    Q[i].append((idx, d))

                state = bk[i].get_state(idx)
                if bk[1 - i].query_index(state):
                    pidx0 = idx
                    pidx1 = bk[1 - i].get_index(state)
                    dist = dists[i][pidx0] + dists[1 - i][pidx1]
                    if dist < opt[0]:
                        # print('min dist = {}, i = {}'.format(dist, i))
                        opt = (dist, pidx0, pidx1, i)
                        found = True

                next_states = state.next_states()
                for st in next_states:
                    if bk[i].query_index(st):
                    idx2 = bk[i].get_index(st)
                    parents[i][idx2] = idx
                    dists[i][idx2] = d + 1
                    Q[i].append((idx2, d + 1))
            if found: break
        if found or not len(Q[0]) or not len(Q[1]):

    if not found:
        return []

    dist, pidx0, pidx1, i = opt
    paths0 = []
    while pidx0 != -1:
        pidx0 = parents[i][pidx0]

    paths1 = []
    while pidx1 != -1:
        paths1.append(bk[1 - i].get_state(pidx1))
        pidx1 = parents[1 - i][pidx1]

    assert len(paths0) > 0
    assert len(paths1) > 0
    paths = paths0[::-1] + paths1[1:]
    if i: paths = paths[::-1]
    return paths

5. 速度对比

def main():
    source_matrix = [
        [0, 1, 2],
        [3, 4, 5],
        [6, 7, 8]
    dest_matrix = [
        [1, 2, 3],
        [4, 5, 6],
        [7, 8, 0]
    source = State(source_matrix)
    dest = State(dest_matrix)

    start = time.time()
    paths1 = search_path_1(source, dest)
    print('naive BFS ...')
    # print_paths(paths1)
    print('size = {}'.format(len(paths1)))
    end = time.time()
    print('timer = {}'.format(end - start))

    start = time.time()
    paths2 = search_path_2(source, dest)
    print('bidirectional BFS ...')
    # print_paths(paths2)
    print('size = {}'.format(len(paths2)))
    end = time.time()
    print('timer = {}'.format(end - start))

运行下来速度差别还是蛮大的,方法1是1.492s, 方法2是0.026s, 时间缩短了差不多98%.

➜  misc git:(master) ✗ python klotski.py
naive BFS ...
size = 23
timer = 1.4918239116668701
bidirectional BFS ...
size = 23
timer = 0.026241064071655273

6. UPDATE@202003

今天重新把这题目拿出来看看,我在网上找到了两个比较复杂的例子,测试了一下两者的时间差距。 这两个例子是在 这里 找到的。

def test_case(source_matrix):
    print('source_matrix = {}'.format(source_matrix))
    dest_matrix = [
        [1, 2, 3],
        [4, 5, 6],
        [7, 8, 0]
    source = State(source_matrix)
    dest = State(dest_matrix)

    start = time.time()
    paths1 = search_path_1(source, dest)
    print('naive BFS ...')
    # print_paths(paths1)
    print('size = {}'.format(len(paths1)))
    end = time.time()
    print('timer = {}'.format(end - start))

    start = time.time()
    paths2 = search_path_2(source, dest)
    print('bidirectional BFS ...')
    # print_paths(paths2)
    print('size = {}'.format(len(paths2)))
    end = time.time()
    print('timer = {}'.format(end - start))

def main():
    # simple one
    source_matrix = [[0, 1, 2], [3, 4, 5], [6, 7, 8]]

    # http://w01fe.com/blog/2009/01/the-hardest-eight-puzzle-instances-take-31-moves-to-solve/
    # hard one.
    source_matrix = [[8, 6, 7], [2, 5, 4], [3, 0, 1]]

    source_matrix = [[6,4,7], [8,5,0],[3,2,1]]


➜  misc git:(master) ✗ python3 klotski.py
source_matrix = [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
naive BFS ...
size = 23
timer = 2.9338197708129883
bidirectional BFS ...
size = 23
timer = 0.060230255126953125
source_matrix = [[8, 6, 7], [2, 5, 4], [3, 0, 1]]
naive BFS ...
size = 32
timer = 6.167062997817993
bidirectional BFS ...
size = 32
timer = 0.5203478336334229
source_matrix = [[6, 4, 7], [8, 5, 0], [3, 2, 1]]
naive BFS ...
size = 32
timer = 6.113824844360352
bidirectional BFS ...
size = 32
timer = 0.5001683235168457
