[오픈 소스 공부] babyagi 에 대해 알아보자
"Generative Agents: Interactive Simulacra of Human Behavior" 이라는 논문을 너무 재밌게 읽었고, (곧 리뷰하겠다) 구현해보고 싶다는 생각에 구글링을 하다가 langchain 이라는 오픈 소스를 찾았다. (곧 리뷰하겠다) langchain의 use cases 를 훑어보던 중 babyagi와 autogpt (너무 유명한) 프로젝트를 알게 되었는데, 두 프로젝트는 "autonomous agent" 라는 공통적인 목적을 갖고 있다.
autonomous agent에 대한 개념은 langchain 문서에 정리되어 있는 것을 빌려 말해보면, chatgpt와 같이 우리의 요구 사항에 대한 solution을 반환하는 어떤 주체를 agent 라고 하자. autonomous agent는 이러한 agent가 문제에 대한 좀 더 장기적인 목적 의식을 갖고 해당 문제를 풀기 위해 독립적으로 실행되는 형태를 의미한다. 이러한 형태의 예시는 아래와 같다.
"A에 대한 문제를 풀기 위해서는 B를 해야 한다. 근데 B를 하려면 C가 이뤄져야 한다.
이에 따라 A에 대한 문제를 풀고 싶어하는 것 같은데 우선 C부터 하고 C가 이뤄지면 그 다음 B에 대해 고민해봐라"
"oracle(인간 User) 이 A에 대해 알아보라고 한다. 위키피디아 서치로 A에 대해 알아볼 수 있으니,
해당 tool을 통해 알아본 후 알려줘야 겠다"
뭔가 내부적으로 oracle이 낸 문제에 대해 더 나은 답을 내리기 위한 고민을 하는 메커니즘이 포함된 것을 볼 수 있다. 내가 조금 공부를 하면서 느낀 것은 chatgpt는 엄청나게 두꺼운 사전이 있고, 내가 질문한 것에 대해 가장 적절한 페이지를 indexing해서 답을 내놓는 것이라면, autonomous agent는 일단 바탕이 chatgpt인데, 해당 문제를 더 잘 풀 수 있는 도구나 방법에 대해 생각해보고 반환하는 형태라고 생각할 수 있다.
도구라면 예시에서 말한 것처럼 구글 서치와 같은 형태가 될 수 있고 (LLM과 google search를 연결, 이걸 chain 이라고 하고 이걸 langchain에서 지원한다!) 방법이라면, 특정 문제에 대해 존재할 수 있는 다양한 방법에 대해 리스트 업을 하고, 이를 중요도나 우선 순위 별로 리스트 업 한 후, 각 방법을 달성하기 위한 방법을 또 알아보는 재귀적인 방법이 있을 수 있다.
아직 잘 알지 못하지만, 조금 알아보면서, 느끼고 배운 것을 써봤고 autonomous agent 에 대한 오픈 소스 프로젝트 중 하나인 babyagi 에 대해 알아본다.
babyagi
babyagi는 autonomous agent는 위와 같은 메커니즘으로 구현을 해뒀는데, 위 메커니즘을 소스 코드를 보면서 정리해보려 한다. 위 이미지를 보고 각 단계에서 해야 하는 것을 풀어서 요약하는 것으로 시작해보겠다.
아래의 설명에서 agent는 문제를 푸는 것을 도와주는 LLM이라고 생각하면 된다. (ex. chatgpt)
- Step 1 : Pull the first incomplete task : 풀어야 하는 문제를 정의하고, 이를 풀기 위한 방법에 대해 알아본다.
- Execution Agent : agent 한테 궁극적으로 풀려고 하는 문제의 목적(objective)와 이를 위해 달성해야 하는 과제 (task)를 부여, 이에 대한 solution을 받는다.
- Step 2 : Enrich result and store in Vector DB : Step 1에서 진행하고자 하는 해결책을 보조하는 역할을 하며, 이로 부터 나온 해결책을 기억해둔다.
- Context Agent : 최종적인 목적을 달성하기 위해, 과제들을 푸는 과정에서 agent가 달성해왔던 문제들을 agent에게 알려주는 역할을 한다. (ex. 우리가 이러한 문제를 풀어왔는데, 이 문제는 어떻게 푸는거야?)
- Step 3 : Create new tasks and reprioritize task list : Step 1~2에서 푼 과제의 해결책(result)를 받고, 해당 해결책 실행하기 위해 해결해야 하는 과제들을 정의한다.
- Task Creation Agent : Step 1~2을 통해 어떤 해결책을 받고, 해당 해결책을 행하기 위해 (execute) 해야 할 과제 항목들을 제시받는다.
- Prioritization Agent : Task Creation Agent 에서 받은 과제 항목(Task List) 들을 우선 순위로 정렬하여 다시 받는다.
babyagi 는 oracle이 제시하는 문제를 풀기 위해 문제와 문제에 대한 해결책을 재귀적인 구조로 탐구해가면서, 최종적으로 문제를 풀기 위한 해상도 높은 해결책을 제시한다. 깃헙에서 받을 수 있는 babyagi 프로젝트 상에서 해당 agent는 구글 서치와 같은 tool은 사용할 수 없으며, 이를 위해서는 langchain 과의 호환이 필요하다.
이제 코드 레벨에서 살펴보자!
앞으로 살펴볼 모든 코드는 오픈 소스의 babyagi.py 에 있는 것들이다. 참고로 사용하는 모델은 openai 의 gpt-3.5-turbo를 가정한다.
코드 상에서 180번째 줄까지는 환경 설정에 대한 것이다. 어떤 모델을 사용할 것이고, vectorstore는 무엇을 사용할 것인지 등등... 이는 생략한다.
VectorStore
첫 번째로 볼 것은 vectorstore에 대해서이다. embedding vector들을 database에 넣어두고, string이나 또 다른 embedding vector를 query로 사용해, 유사한 string 또는 embedding vector를 받기 위함이다. babyagi는 내부적으로 pinecone, weaviate, chromadb를 지원하며 디폴트는 chromadb이다. 본 포스팅에서는 chromadb 사용에 대해서만 다룬다.
chromadb는 add 함수로 embedding vector 와 prompt를 메타데이터와 함께 insert 할 수 있으며, query 함수로 저장된 데이터에서 vector search 를 할 수 있다. 아래 모듈은 chromadb가 지원하는 이런 기능들을 embedding model 선택과 함께, wrapping 해둔 형태이다.
어떤 데이터를 저장할 것이며 (add 함수 사용) 어떤 데이터와 유사한 vector들을 불러올 것인지 (query 함수 사용) 에 대해서는 아래에서 계속 알아본다.
마지막으로 코드 상에서 해당 모듈은 results_storage 라는 이름의 객체로 할당된다.
# Results storage using local ChromaDB
class DefaultResultsStorage:
def __init__(self):
logging.getLogger('chromadb').setLevel(logging.ERROR)
# Create Chroma collection
chroma_persist_dir = "chroma"
chroma_client = chromadb.Client(
settings=chromadb.config.Settings(
chroma_db_impl="duckdb+parquet",
persist_directory=chroma_persist_dir,
)
)
metric = "cosine"
if LLM_MODEL.startswith("llama"):
embedding_function = LlamaEmbeddingFunction()
else:
embedding_function = OpenAIEmbeddingFunction(api_key=OPENAI_API_KEY)
self.collection = chroma_client.get_or_create_collection(
name=RESULTS_STORE_NAME,
metadata={"hnsw:space": metric},
embedding_function=embedding_function,
)
def add(self, task: Dict, result: str, result_id: str):
# Break the function if LLM_MODEL starts with "human" (case-insensitive)
if LLM_MODEL.startswith("human"):
return
# Continue with the rest of the function
embeddings = llm_embed.embed(result) if LLM_MODEL.startswith("llama") else None
if (
len(self.collection.get(ids=[result_id], include=[])["ids"]) > 0
): # Check if the result already exists
self.collection.update(
ids=result_id,
embeddings=embeddings,
documents=result,
metadatas={"task": task["task_name"], "result": result},
)
else:
self.collection.add(
ids=result_id,
embeddings=embeddings,
documents=result,
metadatas={"task": task["task_name"], "result": result},
)
def query(self, query: str, top_results_num: int) -> List[dict]:
count: int = self.collection.count()
if count == 0:
return []
results = self.collection.query(
query_texts=query,
n_results=min(top_results_num, count),
include=["metadatas"]
)
# results 의 예시 : {'ids': [['result_1']],
# 'embeddings': None,
# 'documents': None,
# 'metadatas': [[{'task': 'Develop a task list',
# 'result': 'blahblah}]],
# 'distances': None}
return [item["task"] for item in results["metadatas"][0]]
# 우리가 context agent로부터 얻게 되는 것은 task로부터의 result가 아닌 task 자체이다.
results_storage = DefaultResultsStorage()
Task
위에서 짧게 얘기한 바와 같이, babyagi는 특정 목적 (objective)을 달성하기 위해 풀어야 하는 과제들을(task list) 나열 및 정렬하고, 그 과제들을 풀 수 있는 과제들을 알아가는 과정을 거친다. 이를 위해 과제들 (task list)을 모아두는 모듈이 필요하며, 이는 아래와 같다.
# Task storage supporting only a single instance of BabyAGI
class SingleTaskListStorage:
def __init__(self):
self.tasks = deque([])
self.task_id_counter = 0
def append(self, task: Dict):
self.tasks.append(task)
def replace(self, tasks: List[Dict]):
self.tasks = deque(tasks)
def popleft(self):
return self.tasks.popleft()
def is_empty(self):
return False if self.tasks else True
def next_task_id(self):
self.task_id_counter += 1
return self.task_id_counter
def get_task_names(self):
return [t["task_name"] for t in self.tasks]
Agent
우리가 사용할 모델은 "gpt-3.5-turbo" (chatgpt-based) 이다. 이는 아래 openai_call 함수의 else 에 해당하는 openai.ChatCompletion.create() 함수를 사용하게 된다. 입출력의 형태는 string 타입의 prompt가 들어가면, string 타입의response 가 나온다.
gpt 기반 모델 사용할 경우, tiktoken 모듈을 사용하여 string 타입의 prompt를 token으로 변환, 최대 길이를 맞춰 입력으로 넣어줄 수 있다.
!) 이때, 모델의 입력으로 [{"role": "system", "content": trimmed_prompt}] 가 들어가는데, 이 때 "system" 이라는 역할이 부여된다. 요즘 chatgpt 와 같은 agent에게 역할을 부여해줄 수 있는데, 이러한 형태가 내부적으로 어떻게 작동하기에 가능한 것인지 확인해봐야 한다.
def limit_tokens_from_string(string: str, model: str, limit: int) -> str:
"""Limits the string to a number of tokens (estimated)."""
try:
encoding = tiktoken.encoding_for_model(model)
except:
encoding = tiktoken.encoding_for_model('gpt2') # Fallback for others.
encoded = encoding.encode(string)
return encoding.decode(encoded[:limit])
def openai_call(
prompt: str,
model: str = LLM_MODEL,
temperature: float = OPENAI_TEMPERATURE,
max_tokens: int = 100,
):
while True:
try:
if model.lower().startswith("llama"):
result = llm(prompt[:CTX_MAX], stop=["### Human"], echo=False, temperature=0.2)
return str(result['choices'][0]['text'].strip())
elif model.lower().startswith("human"):
return user_input_await(prompt)
elif not model.lower().startswith("gpt-"):
# Use completion API
response = openai.Completion.create(
engine=model,
prompt=prompt,
temperature=temperature,
max_tokens=max_tokens,
top_p=1,
frequency_penalty=0,
presence_penalty=0,
)
return response.choices[0].text.strip()
else:
# Use 4000 instead of the real limit (4097) to give a bit of wiggle room for the encoding of roles.
# TODO: different limits for different models.
trimmed_prompt = limit_tokens_from_string(prompt, model, 4000 - max_tokens)
# Use chat completion API
messages = [{"role": "system", "content": trimmed_prompt}]
response = openai.ChatCompletion.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
n=1,
stop=None,
)
return response.choices[0].message.content.strip()
except openai.error.RateLimitError:
...
Context Agent
제일 처음 알아본 vectorstore에 대한 객체인 results_storage가 보인다. 그리고 query 함수를 사용하는 것을 보니, 입력으로 들어온 string 타입의 쿼리와 유사한 string 을 top_results_num 개를 리스트 타입으로 불러오는 것으로 보인다.
results_storage에는 completed task, 즉 이미 우리가 풀려고 한 과제 (task)와 이에 대한 해결책들이(result) 저장된다. 그리고 result_storage.query() 함수의 결과는 result가 아닌 task이다.
context_agent의 인자 중 query는 뒤에 나올 execution_agent에서 확인할 수 있 듯, 상수값인 OBJECTIVE이다. 즉, 처음 user가 부여한 값이기에, 프로세스가 진행되어도 바뀌지 않는다. 이는 user intention의 consistency를 보장하려는 것으로 보인다.
줄이면, context_agent는 유저가 제일 처음 한 질문과 유사한 과제 (task)들을 반환하여 이에 대한 결과를 안정적(consistency)이고 풍성(enrich)하게 만들어 준다.
# Get the top n completed tasks for the objective
def context_agent(query: str, top_results_num: int):
"""
Retrieves context for a given query from an index of tasks.
Args:
query (str): The query or objective for retrieving context.
top_results_num (int): The number of top results to retrieve.
Returns:
list: A list of tasks as context for the given query, sorted by relevance.
"""
results = results_storage.query(query=query, top_results_num=top_results_num)
print("The # of data in result_storage : ", results_storage.collection.count())
# print("***** RESULTS *****")
# print(results)
return results
Execution Agent
prompt engineering 을 해주는 영역으로 보인다. 해당 agent는 아래 요소들을 Prompt에 넣어 agent에게 질문한다.
- 우리의 궁극적 목적
- 풀어왔던 과제들
- 풀어야 하는 과제
가장 처음 질문을 할 때는 위 항목의 2번은 context가 빈 리스트일 것이기 때문에 없을 것이고 (if context), 3번은 사전에 정의된 "Define the task list" 가 된다.
prompt 예시는 아래 코드의 제일 밑에 주석으로 써뒀다.
# Execute a task based on the objective and five previous tasks
def execution_agent(objective: str, task: str) -> str:
"""
Executes a task based on the given objective and previous context.
Args:
objective (str): The objective or goal for the AI to perform the task.
task (str): The task to be executed by the AI.
Returns:
str: The response generated by the AI for the given task.
"""
context = context_agent(query=objective, top_results_num=5)
# print("\n*******RELEVANT CONTEXT******\n")
# print(context)
# print('')
prompt = f'Perform one task based on the following objective: {objective}.\n'
if context:
prompt += 'Take into account these previously completed tasks:' + '\n'.join(context)\
prompt += f'\nYour task: {task}\nResponse:'
return openai_call(prompt, max_tokens=2000), prompt
# 가장 처음 질문할 때 prompt 예시
# *****PROMPT FOR GETTING TASK RESULT*****
# Perform one task based on the following objective: Solve world hunger.
# Your task: Develop a task list
# Response:
# 두 번째 질문할 때 prompt 예시
# *****PROMPT FOR GETTING TASK RESULT*****
# Perform one task based on the following objective: Solve world hunger.
# Take into account these previously completed tasks:Develop a task list
# Your task: Research and identify regions and populations most affected by hunger
# Response:
Task Creation Agent
해당 부분 또한 prompt engineering 을 해주는 영역으로 보인다. 여기서 담는 정보는 아래와 같다.
- 궁극적으로 풀려고 하는 문제 (objective)
- 직전에 푼 과제(task)와 이에 대한 해결책 (result) -> execution agent가 제시
- task queue에 남아있는 미해결 과제 (incompleted task)
정리하면 "우리가 풀려는 objective는 이렇고, 최근 푼 문제와 이에 따른 해결책은 이렇다. 그리고 아직 우리가 못 푼 문제는 이렇다. 어쨋든 우리가 지금 이렇게 해오고 있는데, 같은 목적을 달성하기 위한 과제들을 또 제시해달라" 이다. 즉, Define the task list인데 그동안 부딪혀온 task list를 completed/incompleted 를 prompt 상에서 나눠서 memory의 형태로 전달한 것이다.
def task_creation_agent(
objective: str, result: Dict, task_description: str, task_list: List[str]
):
prompt = f"""
You are to use the result from an execution agent to create new tasks with the following objective: {objective}.
The last completed task has the result: \n{result["data"]}
This result was based on this task description: {task_description}.\n"""
if task_list:
prompt += f"These are incomplete tasks: {', '.join(task_list)}\n"
prompt += "Based on the result, create a list of new tasks to be completed in order to meet the objective. "
if task_list:
prompt += "These new tasks must not overlap with incomplete tasks. "
prompt += """
Return all the new tasks, with one task per line in your response. The result must be a numbered list in the format:
#. First task
#. Second task
The number of each entry must be followed by a period.
Do not include any headers before your numbered list. Do not follow your numbered list with any other output."""
print(f'\n************** TASK CREATION AGENT PROMPT *************\n{prompt}\n')
response = openai_call(prompt, max_tokens=2000)
print(f'\n************* TASK CREATION AGENT RESPONSE ************\n{response}\n')
new_tasks = response.split('\n')
new_tasks_list = []
for task_string in new_tasks:
task_parts = task_string.strip().split(".", 1)
if len(task_parts) == 2:
task_id = ''.join(s for s in task_parts[0] if s.isnumeric())
task_name = re.sub(r'[^\w\s_]+', '', task_parts[1]).strip()
if task_name.strip() and task_id.isnumeric():
new_tasks_list.append(task_name)
# print('New task created: ' + task_name)
out = [{"task_name": task_name} for task_name in new_tasks_list]
return out
Prioritization Agent
개인적으로는 babyagi에서 제일 신기했던 부분이다. 직전의 task_creation agent에서 task list를 받으면 그것을 우선 순위로 정렬해서 다시 받는 것이다. 그렇게 하는 이유는 전반적 mechanism 을 다루는 main.py 에서 보려 한다.
def prioritization_agent():
task_names = tasks_storage.get_task_names()
next_task_id = tasks_storage.next_task_id()
prompt = f"""
You are tasked with cleaning the format and re-prioritizing the following tasks: {', '.join(task_names)}.
Consider the ultimate objective of your team: {OBJECTIVE}.
Tasks should be sorted from highest to lowest priority.
Higher-priority tasks are those that act as pre-requisites or are more essential for meeting the objective.
Do not remove any tasks. Return the result as a numbered list in the format:
#. First task
#. Second task
The entries are consecutively numbered, starting with 1. The number of each entry must be followed by a period.
Do not include any headers before your numbered list. Do not follow your numbered list with any other output."""
print(f'\n************** TASK PRIORITIZATION AGENT PROMPT *************\n{prompt}\n')
response = openai_call(prompt, max_tokens=2000)
print(f'\n************* TASK PRIORITIZATION AGENT RESPONSE ************\n{response}\n')
new_tasks = response.split("\n") if "\n" in response else [response]
new_tasks_list = []
for task_string in new_tasks:
task_parts = task_string.strip().split(".", 1)
if len(task_parts) == 2:
task_id = ''.join(s for s in task_parts[0] if s.isnumeric())
task_name = re.sub(r'[^\w\s_]+', '', task_parts[1]).strip()
if task_name.strip():
new_tasks_list.append({"task_id": task_id, "task_name": task_name})
tasks_storage.replace(new_tasks_list)
Main
우선 prioritization_agent의 목적에 대해 보자.
# Step 1: Pull the first incomplete task
task = tasks_storage.popleft()
...
# 71번째 줄
# task_creation_agent가 만든 task들은 모두 task queue 안에 들어간다.
print('Adding new tasks to task_storage')
for new_task in new_tasks:
new_task.update({"task_id": tasks_storage.next_task_id()})
print(str(new_task))
tasks_storage.append(new_task)
# 이후 sorted by prioritization
if not JOIN_EXISTING_OBJECTIVE: prioritization_agent()
위 코드를 보면, tasks_storage에 가장 첫번째 element를 가져온다. 그리고 이는 prioritization_agent에서 얘기한 가장 우선 순위가 높은 task가 된다.
코드를 보면 while True로 반복문이 계속 도는데, 이러한 반복문은 계속해서 task queue에 in-completed task list를 누적시킬 것이다. 그리고 새로운 task list는 제시된 task list에서 가장 높은 우선 순위를 가진 task 에 의해서 생성된다. in-completed task와 중복되지 않는 new task list를 반환하라고 했기 때문에, 점점 new task list가 줄어들 것으로 보이긴 한다. (하지만 이를 위해서는 꽤 많은 iteration이 필요해 보이기에 적당한 반복 횟수를 정해야 할 것이다.)
아무튼, 이러한 반복 작업들은 최종 목적인 objective를 달성하기 위해 우리가 할 수 있는 일들을 나열하고, 그 일들이 새로운 일들을 계속 만들어나간다. 그리고 최종적으로는 이러한 task list를 추리고 정렬해서 반환하는 형태가 될 수 있을 것으로 보인다.
# Initialize tasks storage
tasks_storage = SingleTaskListStorage()
if COOPERATIVE_MODE in ['l', 'local']:
if can_import("extensions.ray_tasks"):
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parent))
from extensions.ray_tasks import CooperativeTaskListStorage
tasks_storage = CooperativeTaskListStorage(OBJECTIVE)
print("\nReplacing tasks storage: " + "\033[93m\033[1m" + "Ray" + "\033[0m\033[0m")
elif COOPERATIVE_MODE in ['d', 'distributed']:
pass
# Add the initial task if starting new objective
if not JOIN_EXISTING_OBJECTIVE:
initial_task = {
"task_id": tasks_storage.next_task_id(),
"task_name": INITIAL_TASK
}
tasks_storage.append(initial_task)
def main():
loop = True
while loop:
# As long as there are tasks in the storage...
if not tasks_storage.is_empty():
# Print the task list
print("\033[95m\033[1m" + "\n*****TASK LIST*****\n" + "\033[0m\033[0m")
for t in tasks_storage.get_task_names():
print(" • " + str(t))
# Step 1: Pull the first incomplete task
task = tasks_storage.popleft()
print("\033[92m\033[1m" + "\n*****NEXT TASK*****\n" + "\033[0m\033[0m")
print(str(task["task_name"]))
# Send to execution function to complete the task based on the context
result, prompt = execution_agent(OBJECTIVE, str(task["task_name"]))
print("\033[91m\033[1m" + "\n*****PROMPT FOR GETTING TASK RESULT*****\n" + "\033[0m\033[0m")
print(prompt)
print("\033[93m\033[1m" + "\n*****TASK RESULT*****\n" + "\033[0m\033[0m")
print(result)
# Step 2: Enrich result and store in the results storage
# This is where you should enrich the result if needed
enriched_result = {
"data": result
}
# extract the actual result from the dictionary
# since we don't do enrichment currently
# vector = enriched_result["data"]
result_id = f"result_{task['task_id']}"
results_storage.add(task, result, result_id)
# Step 3: Create new tasks and re-prioritize task list
# only the main instance in cooperative mode does that
new_tasks = task_creation_agent(
OBJECTIVE,
enriched_result,
task["task_name"],
tasks_storage.get_task_names(),
)
print('Adding new tasks to task_storage')
for new_task in new_tasks:
new_task.update({"task_id": tasks_storage.next_task_id()})
print(str(new_task))
tasks_storage.append(new_task)
if not JOIN_EXISTING_OBJECTIVE: prioritization_agent()
# Sleep a bit before checking the task list again
time.sleep(5)
else:
print('Done.')
loop = False
Conclusion
소스 코드를 위주로 살펴보아서, 해당 알고리즘이 어떤 형태로 task list들을 고도화해나가는 지를 예시로 살펴보기 좋은 자료로 해당 링크를 공유한다.