kyrie
kyrie
发布于 2025-11-20 / 6 阅读
0
0

GA 优化 无人机任务分配 + 航线

数据: 10 个目标点, 1 个基站,最大续航 20 分钟,风场为方向惩罚。

目标:最小化 \frac{总飞行时间}{风险惩罚} ;越界罚 \times10

编码:[序列 |分隔符】(分隔符切分多段表示补给返回),或“指派向量 + 局部 2-opt "。

算子:OX交叉 + 2-opt /交换变异;精英保留 1-2 个。

可视化:

。地图叠加路径(每 10 代更新一次);

。曲线;代数-总时间;代数-多样性指数。

思考题:风向改变后,需要如何调参( p_m精英数2-opt 强度 )?

代码

import numpy as np
import matplotlib.pyplot as plt

# ========= 中文正常显示 =========
plt.rcParams['font.sans-serif'] = ['SimHei']  # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False    # 用来正常显示负号

# ===================== 1. 问题设置:10 个目标 + 1 个基站 =====================
np.random.seed(0)

num_targets = 10
num_points  = num_targets + 1  # + 基站
MAX_TRIPS   = 2                # 最多 2 段出航(1 个分隔符)
SEP         = 0                # 分隔符基因,用 0 表示

# 基站设在中心 (50, 50),目标点随机撒在 [0, 100] × [0, 100]
base = np.array([[50.0, 50.0]])
targets = np.random.rand(num_targets, 2) * 100
points = np.vstack([base, targets])   # points[0] 是基站,1..10 是目标点

# ---------- 风场参数:统一风向 + 方向惩罚 ----------
# 设一个大致向右上角吹的风
wind_dir = np.array([1.0, 0.5], dtype=float)
wind_dir = wind_dir / np.linalg.norm(wind_dir)
alpha = 0.3   # 风对时间的影响强度:0 表示无风

# ---------- 风险场:离中心越远越危险 ----------
center = np.array([50.0, 50.0], dtype=float)
max_radius = np.linalg.norm(np.array([0.0, 0.0], dtype=float) - center)
gamma = 0.5   # 风险程度系数:0.5 → 最危险区域的风险因子约为 0.5

# ---------- 续航约束 ----------
time_per_unit = 0.04        # 每 1 个距离单位耗时 0.04 分钟
endurance_minutes = 20.0    # 最大续航时间
boundary_penalty_factor = 10.0  # 越界罚 10x(作用在时间上)


# ========== 染色体解码 & 评价 ==========
def decode_trips(ind, sep=SEP):
    """
    将染色体 decode 为若干段出航序列。
    例如 ind = [3,8,5,7,0,2,1,9,10,4,6]
    返回 [[3,8,5,7], [2,1,9,10,4,6]]
    """
    trips = []
    cur = []
    for g in ind:
        if g == sep:
            if cur:
                trips.append(cur)
                cur = []
        else:
            cur.append(int(g))
    if cur:
        trips.append(cur)
    return trips


def compute_segment_metrics(perm):
    """
    计算单段出航(Base -> perm -> Base)的指标:
    - raw_time:真实飞行时间(考虑风,但不含罚)
    - eff_time:用于优化的“有效时间”(若 >20 分钟则乘 10)
    - risk_factor:风险因子 (0,1],越小越危险
    """
    path_idx = np.concatenate([[0], perm, [0]])
    coords = points[path_idx]

    total_dist = 0.0
    raw_time = 0.0
    risk_factors = []

    for i in range(len(coords) - 1):
        p = coords[i]
        q = coords[i + 1]
        seg_vec = q - p
        dist = float(np.linalg.norm(seg_vec))
        if dist == 0.0:
            continue

        total_dist += dist

        # ----- 风场方向惩罚 -----
        d_unit = seg_vec / dist
        cos_theta = float(np.dot(d_unit, wind_dir))
        cos_theta = max(-1.0, min(1.0, cos_theta))
        # cos_theta = 1 顺风;cos_theta = -1 逆风
        wind_factor = 1.0 + alpha * (1.0 - cos_theta)
        seg_time = dist * time_per_unit * wind_factor
        raw_time += seg_time

        # ----- 风险:离中心越远越危险 -----
        mid = 0.5 * (p + q)
        r = float(np.linalg.norm(mid - center) / max_radius)
        r = max(0.0, min(1.0, r))
        seg_risk = 1.0 - gamma * r  # ∈ [1-gamma,1]
        risk_factors.append(seg_risk)

    if risk_factors:
        risk_factor = sum(risk_factors) / len(risk_factors)
    else:
        risk_factor = 1.0

    # 超过续航时间 → 时间乘以 10
    if raw_time > endurance_minutes:
        eff_time = raw_time * boundary_penalty_factor
    else:
        eff_time = raw_time

    return total_dist, raw_time, eff_time, risk_factor


def compute_route_metrics_multi(ind):
    """
    计算多段出航的总指标:
    返回:
    - total_dist: 总距离
    - total_raw_time: 不含罚的总时间(分钟)
    - global_risk: 平均风险因子
    - objective: 目标值 = 总有效时间 / global_risk
    """
    trips = decode_trips(ind)
    if not trips:
        return 0.0, 0.0, 1.0, 1e9

    total_dist = 0.0
    total_raw_time = 0.0
    total_eff_time = 0.0
    risk_sum = 0.0
    seg_cnt = 0

    for seg in trips:
        perm = np.array(seg, dtype=int)
        dist, raw_t, eff_t, risk = compute_segment_metrics(perm)
        total_dist += dist
        total_raw_time += raw_t
        total_eff_time += eff_t
        risk_sum += risk
        seg_cnt += 1

    global_risk = risk_sum / seg_cnt if seg_cnt > 0 else 1.0
    objective = total_eff_time / global_risk

    return total_dist, total_raw_time, global_risk, objective


# ===================== 2. GA 算法([序列|分隔符] + OX 交叉 + 交换变异) =====================
pop_size = 80
n_generations = 200
pc = 0.9   # 交叉概率
pm = 0.2   # 变异概率


def init_population():
    """
    染色体基因为 1..num_targets 再加 (MAX_TRIPS-1) 个分隔符 0,
    然后做一次全排列。
    MAX_TRIPS=2 时只有一个分隔符,对 OX 交叉友好(基因不重复)。
    """
    genes = np.concatenate([
        np.arange(1, num_points, dtype=int),
        np.array([SEP] * (MAX_TRIPS - 1), dtype=int)
    ])
    pop = []
    for _ in range(pop_size):
        perm = np.random.permutation(genes)
        pop.append(perm)
    return np.array(pop, dtype=int)


def fitness(ind):
    """
    适应度:目标值越小越好,所以取其倒数。
    """
    _, _, _, obj = compute_route_metrics_multi(ind)
    return 1.0 / (obj + 1e-6)


def tournament_selection(pop, k=3):
    idx = np.random.randint(0, len(pop), size=k)
    cand = pop[idx]
    fit = np.array([fitness(ind) for ind in cand])
    return cand[fit.argmax()].copy()


def ordered_crossover(p1, p2):
    """
    OX 交叉:在 [序列|分隔符] 编码上也可用(前提是基因不重复)。
    """
    n = len(p1)
    c1, c2 = np.sort(np.random.choice(n, 2, replace=False))
    child = -np.ones(n, dtype=int)

    # 中段来自 p1
    child[c1:c2 + 1] = p1[c1:c2 + 1]

    # 剩余位置按顺序从 p2 填充
    rest = [x for x in p2 if x not in child]
    j = 0
    for i in range(n):
        if child[i] == -1:
            child[i] = rest[j]
            j += 1

    return child


def swap_mutation(ind):
    """
    简单交换变异:随机交换两个位置的基因(包括可能交换分隔符)。
    """
    a, b = np.random.choice(len(ind), 2, replace=False)
    ind[a], ind[b] = ind[b], ind[a]


def population_diversity(pop):
    """
    多样性指数:所有个体两两平均汉明距离 / 染色体长度 ∈ [0,1]
    """
    pop_arr = np.array(pop)
    n, L = pop_arr.shape
    if n < 2:
        return 0.0
    dist_sum = 0
    cnt = 0
    for i in range(n):
        for j in range(i + 1, n):
            dist_sum += np.sum(pop_arr[i] != pop_arr[j])
            cnt += 1
    return dist_sum / (cnt * L)


# ===================== 3. 主循环:记录用于可视化的数据 =====================
pop = init_population()

best_times = []      # 每代最优的“总飞行时间(未罚)”
best_objs  = []      # 每代最优目标值(含罚)
diversities = []     # 每代多样性
snapshots = {}       # 每 10 代保存一条最优解,画地图用

# 👉 一定要在这里初始化全局最优
global_best_obj  = float("inf")
global_best_time = None
global_best_dist = None
global_best_risk = None
global_best_ind  = None
global_best_gen  = None

for gen in range(n_generations):
    metrics = [compute_route_metrics_multi(ind) for ind in pop]
    times = np.array([m[1] for m in metrics])  # total_raw_time
    objs  = np.array([m[3] for m in metrics])  # objective

    best_idx = objs.argmin()
    best_times.append(times[best_idx])
    best_objs.append(objs[best_idx])
    diversities.append(population_diversity(pop))

    # ---- 更新全局最优解(跨所有世代)----
    if objs[best_idx] < global_best_obj:
        global_best_obj  = objs[best_idx]
        global_best_time = times[best_idx]
        global_best_dist = metrics[best_idx][0]   # total_dist
        global_best_risk = metrics[best_idx][2]   # global_risk
        global_best_ind  = pop[best_idx].copy()
        global_best_gen  = gen

    if gen % 10 == 0:
        snapshots[gen] = pop[best_idx].copy()

    # ---- 精英保留 1 个 ----
    elite = pop[best_idx].copy()

    # ---- 选择 + 交叉 + 变异 ----
    new_pop = [elite]   # 把精英先塞进去
    while len(new_pop) < pop_size:
        p1 = tournament_selection(pop)
        p2 = tournament_selection(pop)

        if np.random.rand() < pc:
            c1 = ordered_crossover(p1, p2)
            c2 = ordered_crossover(p2, p1)
        else:
            c1, c2 = p1.copy(), p2.copy()

        if np.random.rand() < pm:
            swap_mutation(c1)
        if np.random.rand() < pm:
            swap_mutation(c2)

        new_pop.append(c1)
        if len(new_pop) < pop_size:
            new_pop.append(c2)

    pop = np.array(new_pop, dtype=int)

# ===================== 打印更具体的计算信息 =====================
print("======== 全局最优解统计结果 ========")
print(f"出现世代: {global_best_gen}")
print(f"最优染色体编码: {global_best_ind}")
print(f"出航分段(decode_trips): {decode_trips(global_best_ind)}")
print(f"总飞行距离: {global_best_dist:.3f}")
print(f"总飞行时间(未罚): {global_best_time:.3f} 分钟")
print(f"平均风险因子: {global_best_risk:.3f}")
print(f"最终最优目标值: {global_best_obj:.3f}")

print("\n---- 各段出航详细信息 ----")
trips = decode_trips(global_best_ind)

sum_seg_dist = 0.0
sum_seg_time = 0.0

for i, seg in enumerate(trips, start=1):
    seg_list = [int(x) for x in seg]          # 纯 Python int,用来打印
    seg_arr  = np.array(seg_list, dtype=int)  # Numpy 数组,用来计算

    seg_dist, seg_raw_t, seg_eff_t, seg_risk = compute_segment_metrics(seg_arr)
    penalized = "是" if seg_eff_t > seg_raw_t + 1e-9 else "否"

    sum_seg_dist += seg_dist
    sum_seg_time += seg_raw_t

    print(f"第 {i} 段: 访问顺序 {seg_list}")
    print(f"    本段总距离 = {seg_dist:.3f}")
    print(f"    本段原始时间 = {seg_raw_t:.3f} 分钟")
    print(f"    本段有效时间(含超续航罚) = {seg_eff_t:.3f} 分钟, 是否触发10x罚: {penalized}")
    print(f"    本段平均风险因子 = {seg_risk:.3f}")

    # ================== 这里是你要的“子段细节” ==================
    path_idx = np.concatenate([[0], seg_arr, [0]])   # Base -> ... -> Base 的索引序列
    coords = points[path_idx]

    print("    子段明细:")
    for j in range(len(coords) - 1):
        p_idx = path_idx[j]
        q_idx = path_idx[j + 1]
        p = coords[j]
        q = coords[j + 1]

        vec = q - p
        dist = float(np.linalg.norm(vec))
        if dist == 0.0:
            sub_time = 0.0
            sub_risk = 0.0
        else:
            # 和 compute_segment_metrics 完全一致的时间 / 风险计算
            d_unit = vec / dist
            cos_theta = float(np.dot(d_unit, wind_dir))
            cos_theta = max(-1.0, min(1.0, cos_theta))
            wind_factor = 1.0 + alpha * (1.0 - cos_theta)
            sub_time = dist * time_per_unit * wind_factor

            mid = 0.5 * (p + q)
            r = float(np.linalg.norm(mid - center) / max_radius)
            r = max(0.0, min(1.0, r))
            sub_risk = 1.0 - gamma * r

        from_label = "Base" if p_idx == 0 else str(p_idx)
        to_label   = "Base" if q_idx == 0 else str(q_idx)
        print(f"        {from_label} -> {to_label}: 距离 = {dist:.3f}, 时间 = {sub_time:.3f} 分钟, 风险因子 = {sub_risk:.3f}")
    # ==========================================================

    print(f"    当前累计距离 = {sum_seg_dist:.3f},当前累计时间 = {sum_seg_time:.3f} 分钟\n")

print("---- 按段累加的汇总值(用于和全局结果对比) ----")
print(f"分段距离之和 = {sum_seg_dist:.3f},全局总飞行距离 = {global_best_dist:.3f}")
print(f"分段时间之和 = {sum_seg_time:.3f},全局总飞行时间 = {global_best_time:.3f} 分钟")

# ===================== 4. 可视化 =====================

# 4.1 地图叠加路径(每 10 代一条)
gens = sorted(snapshots.keys())
colors = plt.cm.viridis(np.linspace(0, 1, len(gens)))

fig, ax = plt.subplots(figsize=(6, 6))

# 基站和目标点
base_scatter = ax.scatter(points[0, 0], points[0, 1], marker='s', s=120)
targets_scatter = ax.scatter(points[1:, 0], points[1:, 1])

# ==== 在这里给每个点加编号 ====
for i, (x, y) in enumerate(points):
    if i == 0:
        label = "B"        # 或者写成 "0"、"Base" 都行
    else:
        label = str(i)     # 目标点 1..10
    ax.text(x + 1, y + 1,   # 略微偏移一点,避免挡住点
            label,
            fontsize=9,
            color="black",
            ha="left", va="bottom",
            zorder=5)
# =================================

# 每 10 代一条路径(可能包含多段出航)
for gen, color in zip(gens, colors):
    ind = snapshots[gen]
    trips = decode_trips(ind)
    for seg in trips:
        path_idx = np.concatenate([[0], seg, [0]])
        ax.plot(points[path_idx, 0],
                points[path_idx, 1],
                color=color,
                alpha=0.5)

ax.set_title("地图叠加路径(每 10 代一条)")

# 图例只显示“基站 / 目标”
ax.legend([base_scatter, targets_scatter],
          ["基站 Base", "目标点 Targets"],
          loc="upper left")

# 用 colorbar 表示是哪一代的路径
sm = plt.cm.ScalarMappable(cmap=plt.cm.viridis,
                           norm=plt.Normalize(vmin=min(gens), vmax=max(gens)))
sm.set_array([])
cbar = plt.colorbar(sm, ax=ax)
cbar.set_label("世代编号(每 10 代一条)")

plt.tight_layout()

# 4.2 代数-总飞行时间(未罚)
plt.figure()
plt.plot(best_times)
plt.xlabel("世代")
plt.ylabel("最优总时间 / 分钟")
plt.title("代数-总时间")

# 4.3 代数-多样性指数
plt.figure() 
plt.plot(diversities)
plt.xlabel("世代")
plt.ylabel("多样性指数")
plt.title("代数-多样性指数")

plt.show()
======== 全局最优解统计结果 ========
出现世代: 45
最优染色体编码: [ 3  1 10  7  4  9  8  5  6  0  2]
出航分段(decode_trips): [[3, 1, 10, 7, 4, 9, 8, 5, 6], [2]]
总飞行距离: 378.846
总飞行时间(未罚): 19.700 分钟
平均风险因子: 0.869
最终最优目标值: 22.667

---- 各段出航详细信息 ----
第 1 段: 访问顺序 [3, 1, 10, 7, 4, 9, 8, 5, 6]
    本段总距离 = 356.419
    本段原始时间 = 18.534 分钟
    本段有效时间(含超续航罚) = 18.534 分钟, 是否触发10x罚: 否
    本段平均风险因子 = 0.778
    子段明细:
        Base -> 3: 距离 = 16.466, 时间 = 0.860 分钟, 风险因子 = 0.942
        3 -> 1: 距离 = 14.306, 时间 = 0.572 分钟, 风险因子 = 0.872
        1 -> 10: 距离 = 27.671, 时间 = 1.110 分钟, 风险因子 = 0.763
        10 -> 7: 距离 = 21.734, 时间 = 1.326 分钟, 风险因子 = 0.693
        7 -> 4: 距离 = 13.477, 时间 = 0.859 分钟, 风险因子 = 0.711
        4 -> 9: 距离 = 42.154, 时间 = 2.672 分钟, 风险因子 = 0.680
        9 -> 8: 距离 = 74.722, 时间 = 4.231 分钟, 风险因子 = 0.677
        8 -> 5: 距离 = 94.052, 时间 = 3.774 分钟, 风险因子 = 0.812
        5 -> 6: 距离 = 22.521, 时间 = 1.278 分钟, 风险因子 = 0.731
        6 -> Base: 距离 = 29.315, 时间 = 1.853 分钟, 风险因子 = 0.896
    当前累计距离 = 356.419,当前累计时间 = 18.534 分钟

第 2 段: 访问顺序 [2]
    本段总距离 = 22.427
    本段原始时间 = 1.166 分钟
    本段有效时间(含超续航罚) = 1.166 分钟, 是否触发10x罚: 否
    本段平均风险因子 = 0.960
    子段明细:
        Base -> 2: 距离 = 11.214, 时间 = 0.449 分钟, 风险因子 = 0.960
        2 -> Base: 距离 = 11.214, 时间 = 0.717 分钟, 风险因子 = 0.960
    当前累计距离 = 378.846,当前累计时间 = 19.700 分钟

---- 按段累加的汇总值(用于和全局结果对比) ----
分段距离之和 = 378.846,全局总飞行距离 = 378.846
分段时间之和 = 19.700,全局总飞行时间 = 19.700 分钟

可视化

思考题

  1. 提高变异率( P_m

    • 风向一变,原来“顺风段”可能变成“逆风段”,旧解质量大幅下降。

    • 适当 提高变异概率、或在若干代里加大变异幅度,让种群跳出旧的路径模式,多尝试新的航线组合。

  2. 暂时减少精英数

    • 之前的精英个体是在旧风向下得到的,很可能已经“过时”。

    • 减少精英保留(比如由 2 个减到 0–1 个),避免旧精英长期占据大量选择概率,妨碍种群向新环境适应。

  3. 先减弱,后加强 2-opt 强度

    • 2-opt 是局部搜索 / 局部优化算子,强度越大,当前解越容易被“磨”进一个局部最优。

    • 风向刚变时:先把 2-opt 做得弱一些,让 GA 先多探索新路径;

    • 若干代之后:种群已经在新风向下收敛到一个区域,再把 2-opt 强度调回去(甚至略微增强),精细打磨航线。


评论