import random

# 1. 환경 설정
WORKERS = [f'Worker_{i}' for i in range(1, 11)] # 10명의 작업자
PROCESSES = [
    'Singulation_Picking', 
    'Multi_Picking', 
    'Rebin', 
    'Singulation_Packing', 
    'Multi_Packing'
]

# 공정별 남은 작업량 (단위: 건수)
WORKLOAD = {
    'Singulation_Picking': 1000,
    'Multi_Picking': 1500,
    'Rebin': 1500,           # Multi_Picking 물량이 여기로 들어옴
    'Singulation_Packing': 1000,
    'Multi_Packing': 1500
}

# 작업자별 숙련도 랜덤 생성 (실무에서는 과거 데이터를 기반으로 함)
random.seed(42)
SKILLS = {w: {p: round(random.uniform(0.7, 1.5), 2) for p in PROCESSES} for w in WORKERS}

# 2. 적합도 함수: 전체 공정 마감 시간(Makespan)의 역수
def get_fitness(individual):
    # individual: [공정1, 공정2, ..., 공정10] (각 작업자의 할당 공정)
    capacity = {p: 0 for p in PROCESSES}
    
    # 1. 각 공정의 총 처리 능력 계산
    for i, worker_name in enumerate(WORKERS):
        assigned_process = individual[i]
        capacity[assigned_process] += SKILLS[worker_name][assigned_process]
    
    # 2. 각 공정별 소요 시간 계산
    completion_times = []
    for p in PROCESSES:
        if capacity[p] > 0:
            # 실무에서는 Rebin이 끝나야 Multi_Packing이 가능하지만, 
            # 여기서는 단순화를 위해 각 공정의 독립적 완료 시간을 계산합니다.
            completion_times.append(WORKLOAD[p] / capacity[p])
        else:
            # 필수 공정에 인원이 없으면 아주 큰 페널티 부여
            completion_times.append(1e9) 
            
    # 전체 공정 중 가장 늦게 끝나는 시간이 병목(Bottleneck)
    makespan = max(completion_times)
    return 1 / makespan

# 3. 유전 알고리즘 메커니즘
def create_individual():
    return [random.choice(PROCESSES) for _ in range(len(WORKERS))]

def crossover(p1, p2):
    point = random.randint(1, len(WORKERS)-1)
    return p1[:point] + p2[point:]

def mutate(ind):
    if random.random() < 0.1:
        ind[random.randint(0, len(WORKERS)-1)] = random.choice(PROCESSES)
    return ind

# 4. 진화 실행
pop_size = 50
population = [create_individual() for _ in range(pop_size)]

for gen in range(100):
    # 성적순 정렬
    population = sorted(population, key=lambda x: get_fitness(x), reverse=True)
    
    # 다음 세대 생성 (엘리트주의 적용)
    next_gen = population[:5] 
    while len(next_gen) < pop_size:
        p1, p2 = random.sample(population[:20], 2)
        child = mutate(crossover(p1, p2))
        next_gen.append(child)
    population = next_gen

# 5. 최종 결과 출력
best = population[0]
print("--- [최적 인력 배치 결과] ---")
allocation = {p: [] for p in PROCESSES}
for i, worker in enumerate(WORKERS):
    allocation[best[i]].append(worker)

for p, workers in allocation.items():
    print(f"{p:20}: {', '.join(workers)} (인원: {len(workers)}명)")

makespan = 1 / get_fitness(best)
print(f"--- 예상 전체 마감 시간: {makespan:.2f}분 ---")

 

조금 더 고도화시

import random

# 1. 환경 설정 및 작업량
PROCESSES = ['Singulation_Picking', 'Multi_Picking', 'Rebin', 'Singulation_Packing', 'Multi_Packing']
WORKERS = [f'Worker_{i}' for i in range(1, 13)] # 12명의 작업자

# 현재 공정별 남은 작업량 (실시간 모니터링 데이터)
current_workload = {
    'Singulation_Picking': 800,
    'Multi_Picking': 1200,
    'Rebin': 300,            # 현재 Rebin 대기 물량
    'Singulation_Packing': 400,
    'Multi_Packing': 200     # 현재 Packing 대기 물량
}

# 작업자 숙련도 (고정 데이터)
random.seed(42)
SKILLS = {w: {p: round(random.uniform(0.8, 1.6), 2) for p in PROCESSES} for w in WORKERS}

# 2. 선후 관계가 반영된 적합도 함수 (Core Logic)
def get_fitness(individual):
    cap = {p: 0 for p in PROCESSES}
    for i, worker_name in enumerate(WORKERS):
        cap[individual[i]] += SKILLS[worker_name][individual[i]]
    
    # 각 공정별 순수 처리 시간
    t = {p: current_workload[p] / cap[p] if cap[p] > 0 else 1e6 for p in PROCESSES}
    
    # 선후 관계 반영 (누적 시간 계산)
    # 1. Singulation: Picking -> Packing
    time_singulation = t['Singulation_Picking'] + t['Singulation_Packing']
    
    # 2. Multi: Picking -> Rebin -> Packing (앞 공정이 끝나야 뒷 공정 병목 해소)
    # 실무적으로는 파이프라인 구조이므로, 가장 느린 공정이 전체 속도를 지배함
    time_multi = t['Multi_Picking'] + t['Rebin'] + t['Multi_Packing']
    
    # 전체 센터가 모든 주문을 처리하고 마감하는 시간 (Makespan)
    total_max_time = max(time_singulation, time_multi)
    
    return 1 / total_max_time

# 3. 유전 알고리즘 함수
def create_individual():
    return [random.choice(PROCESSES) for _ in range(len(WORKERS))]

def evolve(current_allocation=None):
    pop_size = 50
    # 현재 배치가 있다면 초기 집단에 포함시켜 '개선'을 유도
    population = [create_individual() for _ in range(pop_size)]
    if current_allocation:
        population[0] = current_allocation 

    for gen in range(100):
        population = sorted(population, key=lambda x: get_fitness(x), reverse=True)
        next_gen = population[:5]
        while len(next_gen) < pop_size:
            p1, p2 = random.sample(population[:20], 2)
            # Crossover
            point = random.randint(1, len(WORKERS)-1)
            child = p1[:point] + p2[point:]
            # Mutation
            if random.random() < 0.1:
                child[random.randint(0, len(WORKERS)-1)] = random.choice(PROCESSES)
            next_gen.append(child)
        population = next_gen
    return population[0]

# 4. 실행: 현재 상태 분석 및 재배치 제안
print("--- [Step 1: 현재 운영 상태 분석] ---")
# 임의의 현재 배치 (비효율적인 상태 가정)
current_status = ['Singulation_Picking']*6 + ['Multi_Picking']*6 
current_time = 1 / get_fitness(current_status)
print(f"현재 예상 마감 시간: {current_time:.2f}분")

print("\n--- [Step 2: 최적 재배치 알고리즘 가동...] ---")
best_reassignment = evolve(current_status)

# 5. 결과 시각화 및 SQL 저장용 데이터 구조
print("\n--- [Step 3: 최종 재배치 지시서] ---")
new_allocation = {p: [] for p in PROCESSES}
sql_data = [] # DB에 저장할 결과값

for i, worker in enumerate(WORKERS):
    proc = best_reassignment[i]
    new_allocation[proc].append(worker)
    sql_data.append((worker, proc))

for p, workers in new_allocation.items():
    print(f"▶ {p:20}: {len(workers)}명 ({', '.join(workers)})")

new_time = 1 / get_fitness(best_reassignment)
print(f"\n최적화 후 예상 마감 시간: {new_time:.2f}분 (약 {current_time - new_time:.1f}분 단축)")

+ Recent posts