import random
# 1. 환경 설정
WORKERS = [f'Worker_{i}' for i in range(1, 11)] # 10명의 작업자
PROCESSES = [
'Singulation_Picking',
'Multi_Picking',
'Rebin',
'Singulation_Packing',
'Multi_Packing'
]
# 공정별 남은 작업량 (단위: 건수)
WORKLOAD = {
'Singulation_Picking': 1000,
'Multi_Picking': 1500,
'Rebin': 1500, # Multi_Picking 물량이 여기로 들어옴
'Singulation_Packing': 1000,
'Multi_Packing': 1500
}
# 작업자별 숙련도 랜덤 생성 (실무에서는 과거 데이터를 기반으로 함)
random.seed(42)
SKILLS = {w: {p: round(random.uniform(0.7, 1.5), 2) for p in PROCESSES} for w in WORKERS}
# 2. 적합도 함수: 전체 공정 마감 시간(Makespan)의 역수
def get_fitness(individual):
# individual: [공정1, 공정2, ..., 공정10] (각 작업자의 할당 공정)
capacity = {p: 0 for p in PROCESSES}
# 1. 각 공정의 총 처리 능력 계산
for i, worker_name in enumerate(WORKERS):
assigned_process = individual[i]
capacity[assigned_process] += SKILLS[worker_name][assigned_process]
# 2. 각 공정별 소요 시간 계산
completion_times = []
for p in PROCESSES:
if capacity[p] > 0:
# 실무에서는 Rebin이 끝나야 Multi_Packing이 가능하지만,
# 여기서는 단순화를 위해 각 공정의 독립적 완료 시간을 계산합니다.
completion_times.append(WORKLOAD[p] / capacity[p])
else:
# 필수 공정에 인원이 없으면 아주 큰 페널티 부여
completion_times.append(1e9)
# 전체 공정 중 가장 늦게 끝나는 시간이 병목(Bottleneck)
makespan = max(completion_times)
return 1 / makespan
# 3. 유전 알고리즘 메커니즘
def create_individual():
return [random.choice(PROCESSES) for _ in range(len(WORKERS))]
def crossover(p1, p2):
point = random.randint(1, len(WORKERS)-1)
return p1[:point] + p2[point:]
def mutate(ind):
if random.random() < 0.1:
ind[random.randint(0, len(WORKERS)-1)] = random.choice(PROCESSES)
return ind
# 4. 진화 실행
pop_size = 50
population = [create_individual() for _ in range(pop_size)]
for gen in range(100):
# 성적순 정렬
population = sorted(population, key=lambda x: get_fitness(x), reverse=True)
# 다음 세대 생성 (엘리트주의 적용)
next_gen = population[:5]
while len(next_gen) < pop_size:
p1, p2 = random.sample(population[:20], 2)
child = mutate(crossover(p1, p2))
next_gen.append(child)
population = next_gen
# 5. 최종 결과 출력
best = population[0]
print("--- [최적 인력 배치 결과] ---")
allocation = {p: [] for p in PROCESSES}
for i, worker in enumerate(WORKERS):
allocation[best[i]].append(worker)
for p, workers in allocation.items():
print(f"{p:20}: {', '.join(workers)} (인원: {len(workers)}명)")
makespan = 1 / get_fitness(best)
print(f"--- 예상 전체 마감 시간: {makespan:.2f}분 ---")
조금 더 고도화시
import random
# 1. 환경 설정 및 작업량
PROCESSES = ['Singulation_Picking', 'Multi_Picking', 'Rebin', 'Singulation_Packing', 'Multi_Packing']
WORKERS = [f'Worker_{i}' for i in range(1, 13)] # 12명의 작업자
# 현재 공정별 남은 작업량 (실시간 모니터링 데이터)
current_workload = {
'Singulation_Picking': 800,
'Multi_Picking': 1200,
'Rebin': 300, # 현재 Rebin 대기 물량
'Singulation_Packing': 400,
'Multi_Packing': 200 # 현재 Packing 대기 물량
}
# 작업자 숙련도 (고정 데이터)
random.seed(42)
SKILLS = {w: {p: round(random.uniform(0.8, 1.6), 2) for p in PROCESSES} for w in WORKERS}
# 2. 선후 관계가 반영된 적합도 함수 (Core Logic)
def get_fitness(individual):
cap = {p: 0 for p in PROCESSES}
for i, worker_name in enumerate(WORKERS):
cap[individual[i]] += SKILLS[worker_name][individual[i]]
# 각 공정별 순수 처리 시간
t = {p: current_workload[p] / cap[p] if cap[p] > 0 else 1e6 for p in PROCESSES}
# 선후 관계 반영 (누적 시간 계산)
# 1. Singulation: Picking -> Packing
time_singulation = t['Singulation_Picking'] + t['Singulation_Packing']
# 2. Multi: Picking -> Rebin -> Packing (앞 공정이 끝나야 뒷 공정 병목 해소)
# 실무적으로는 파이프라인 구조이므로, 가장 느린 공정이 전체 속도를 지배함
time_multi = t['Multi_Picking'] + t['Rebin'] + t['Multi_Packing']
# 전체 센터가 모든 주문을 처리하고 마감하는 시간 (Makespan)
total_max_time = max(time_singulation, time_multi)
return 1 / total_max_time
# 3. 유전 알고리즘 함수
def create_individual():
return [random.choice(PROCESSES) for _ in range(len(WORKERS))]
def evolve(current_allocation=None):
pop_size = 50
# 현재 배치가 있다면 초기 집단에 포함시켜 '개선'을 유도
population = [create_individual() for _ in range(pop_size)]
if current_allocation:
population[0] = current_allocation
for gen in range(100):
population = sorted(population, key=lambda x: get_fitness(x), reverse=True)
next_gen = population[:5]
while len(next_gen) < pop_size:
p1, p2 = random.sample(population[:20], 2)
# Crossover
point = random.randint(1, len(WORKERS)-1)
child = p1[:point] + p2[point:]
# Mutation
if random.random() < 0.1:
child[random.randint(0, len(WORKERS)-1)] = random.choice(PROCESSES)
next_gen.append(child)
population = next_gen
return population[0]
# 4. 실행: 현재 상태 분석 및 재배치 제안
print("--- [Step 1: 현재 운영 상태 분석] ---")
# 임의의 현재 배치 (비효율적인 상태 가정)
current_status = ['Singulation_Picking']*6 + ['Multi_Picking']*6
current_time = 1 / get_fitness(current_status)
print(f"현재 예상 마감 시간: {current_time:.2f}분")
print("\n--- [Step 2: 최적 재배치 알고리즘 가동...] ---")
best_reassignment = evolve(current_status)
# 5. 결과 시각화 및 SQL 저장용 데이터 구조
print("\n--- [Step 3: 최종 재배치 지시서] ---")
new_allocation = {p: [] for p in PROCESSES}
sql_data = [] # DB에 저장할 결과값
for i, worker in enumerate(WORKERS):
proc = best_reassignment[i]
new_allocation[proc].append(worker)
sql_data.append((worker, proc))
for p, workers in new_allocation.items():
print(f"▶ {p:20}: {len(workers)}명 ({', '.join(workers)})")
new_time = 1 / get_fitness(best_reassignment)
print(f"\n최적화 후 예상 마감 시간: {new_time:.2f}분 (약 {current_time - new_time:.1f}분 단축)")'코딩관련 개인 학습 및 기록 > 실무활용' 카테고리의 다른 글
| 재고배치 최적화(Slotting Optimization)를 유전 알고리즘으로 구현 (0) | 2026.01.11 |
|---|---|
| 쉐어포인트에 csv 파일 저장 (0) | 2026.01.10 |
| 상품명 분류하기 (tf-idf, truncatedSVD, minibatch_Kmeans, Normalizer, silhouette) (1) | 2026.01.09 |