Skip to content

Commit

Permalink
Planning Prompt Experiments
Browse files Browse the repository at this point in the history
  • Loading branch information
antoninoLorenzo committed Jun 20, 2024
1 parent e9a8d27 commit ef01d0c
Show file tree
Hide file tree
Showing 16 changed files with 225 additions and 62 deletions.
44 changes: 44 additions & 0 deletions src/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import json
from pathlib import Path

from src.agent.knowledge import Store
from src.agent.knowledge import Collection, Document, Topic


def upload_knowledge(path: str, vdb: Store):
"""Used to initialize and keep updated the Knowledge Base.
Already existing Collections will not be overwritten.
:param path: where the JSON datasets are located.
:param vdb: the reference to the Knowledge Base"""
base_path = Path(path)

for i, p in enumerate(base_path.iterdir()):
if not (p.is_file() and p.suffix == '.json'):
continue

if p.name in ['hack_tricks.json', 'null_byte.json']:
continue

with open(str(p), 'r', encoding='utf-8') as file:
data = json.load(file)

documents = []
topics = set()
for item in data:
topic = Topic(item['category'])
topics.add(topic)

document = Document(
name=item['title'],
content=item['content'],
topic=topic
)
documents.append(document)

collection = Collection(
collection_id=i,
title=p.name,
documents=documents,
topics=list(topics)
)
vdb.create_collection(collection)
Binary file modified src/agent/__pycache__/__init__.cpython-311.pyc
Binary file not shown.
Binary file modified src/agent/__pycache__/agent.cpython-311.pyc
Binary file not shown.
Binary file modified src/agent/__pycache__/llm.cpython-311.pyc
Binary file not shown.
Binary file modified src/agent/__pycache__/plan.cpython-311.pyc
Binary file not shown.
Binary file modified src/agent/__pycache__/prompts.cpython-311.pyc
Binary file not shown.
43 changes: 27 additions & 16 deletions src/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@
class Agent:
"""Penetration Testing Assistant"""
def __init__(self, model: str, tools_docs: str, knowledge_base: Store):
# Agent Components
self.llm = LLM(model=model)
self.mem = Memory()
self.vdb = knowledge_base
# PROMPTS[model]['system']['plan'].format(tools=tools_docs)
self.system_prompt = 'You are an assistant in penetration testing'
self.user_prompt = PROMPTS[model]['plan']['user']
# 'You are an assistant in penetration testing'

def new_session(self, sid: int):
self.mem.store_message(sid, Message(Role.SYS, self.system_prompt))

def get_session(self, sid: int):
return self.mem.get_session(sid)
# Prompts
self._available_tools = tools_docs
self.system_plan_gen = PROMPTS[model]['plan']['system']
self.user_plan_gen = PROMPTS[model]['plan']['user']
self.system_plan_con = PROMPTS[model]['plan_conversion']['system']
self.user_plan_con = PROMPTS[model]['plan_conversion']['user']

def query(self, sid: int, user_in: str, rag=True):
"""Performs a query to the Large Language Model, set `rag=True`
Expand All @@ -29,7 +29,7 @@ def query(self, sid: int, user_in: str, rag=True):
context = self._retrieve(user_in)

# user prompt
prompt = self.user_prompt.format(user_input=user_in, context=context)
prompt = self.user_plan_gen.format(user_input=user_in, tools=self._available_tools, context=context)
self.mem.store_message(
sid,
Message(Role.USER, prompt)
Expand All @@ -47,13 +47,16 @@ def query(self, sid: int, user_in: str, rag=True):
Message(Role.ASSISTANT, response)
)

def _retrieve(self, user_in: str):
"""Get context from Qdrant"""
context = ''
for retrieved in self.vdb.retrieve(user_in):
context += (f"{retrieved.payload['title']}:"
f"\n{retrieved.payload['text']}\n\n")
return context
def execute_plan(self, sid):
"""Executes the last plan stored in memory"""

def new_session(self, sid: int):
"""Initializes a new conversation"""
self.mem.store_message(sid, Message(Role.SYS, self.system_plan_gen))

def get_session(self, sid: int):
"""Open existing conversation"""
return self.mem.get_session(sid)

def save_session(self, sid: int):
"""Saves the specified session to JSON"""
Expand All @@ -67,6 +70,14 @@ def rename_session(self, sid: int, session_name: str):
"""Rename the specified session"""
self.mem.rename_session(sid, session_name)

def _retrieve(self, user_in: str):
"""Get context from Qdrant"""
context = ''
for retrieved in self.vdb.retrieve(user_in):
context += (f"{retrieved.payload['title']}:"
f"\n{retrieved.payload['text']}\n\n")
return context


if __name__ == '__main__':
from src.agent.knowledge.routing import LLMRouter
Expand Down
Binary file modified src/agent/knowledge/__pycache__/collections.cpython-311.pyc
Binary file not shown.
Binary file modified src/agent/knowledge/__pycache__/nlp.cpython-311.pyc
Binary file not shown.
Binary file modified src/agent/knowledge/__pycache__/routing.cpython-311.pyc
Binary file not shown.
Binary file modified src/agent/knowledge/__pycache__/store.cpython-311.pyc
Binary file not shown.
1 change: 0 additions & 1 deletion src/agent/knowledge/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def create_collection(self, collection: Collection):
"""Creates a new Qdrant collection, uploads the collection documents
using `upload` and creates a metadata file for collection."""
if collection.title in self.collections:
print('Already exists')
return None

done = self._connection.create_collection(
Expand Down
2 changes: 1 addition & 1 deletion src/agent/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dataclasses import dataclass
from ollama import Client

AVAILABLE_MODELS = ['phi3', 'gemma:2b', 'gemma:7b']
AVAILABLE_MODELS = ['gemma:2b', 'gemma:7b', 'llama3']


@dataclass
Expand Down
1 change: 1 addition & 0 deletions src/agent/plan.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from dataclasses import dataclass
from enum import StrEnum

Expand Down
160 changes: 137 additions & 23 deletions src/agent/prompts.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,113 @@
import textwrap

PROMPTS = {
'gemma:2b': {
'llama3': {
'plan': {
'system': textwrap.dedent("""
As a proficient assistant in Penetration Testing, your task is to assist an authorized user in discussing and planning penetration testing activities efficiently. Follow a detailed thought process to understand the task, articulate this process, and then propose a structured plan in natural language.
Guidelines for Creating the Plan:
1. **Understanding**: Understand the user’s request and ask clarifying questions if necessary.
2. **Clear Actions**: Break down the goal into specific tasks.
3. **Tailor Approach**: Consider system characteristics.
4. **Phase Relevance**: Ensure tasks align with the relevant phase of Penetration Testing and only the current phase is planned:
- **Reconnaissance/Scanning**: Initial information gathering.
- **Exploitation/Access**: Using gathered information to access the system.
- **Privilege Escalation**: Gaining higher access levels inside the system.
Ensure that only the current phase is planned.
Utilize only the provided TOOLS and follow their usage examples strictly, the available TOOLS are as follows:
{tools}
---
As a proficient assistant in Penetration Testing, your task is to help plan a single phase of assessing a target system. The plan must focus on only one of the following phases at a time: Reconnaissance, Initial Access, or Privilege Escalation.
1. **Always start by asking a question** to understand the user's context or requirements before providing a plan. The first question should be about which phase to focus on.
2. **Ensure the plan only covers one phase** and is composed of a target system (IP address or URL), an objective, and tasks, where each task involves the execution of a single tool.
3. **After presenting the plan**, ask questions on how to improve it.
4. **Under no circumstances should you include multiple phases in a single response**.
5. **Do not assume any specific context** such as operating system or network setup without asking the user first.
---
**Example 1: Company Network (Reconnaissance Phase)**
**User:** Hi, we need to plan the Reconnaissance phase for a company's network.
**Assistant:** Sure thing! Before we begin, could you please provide me with the IP address range of the company's network?
**User:** Of course, our internal network range is 192.168.1.0/24.
**Assistant:** Thank you! Let's proceed with the Reconnaissance phase.
**Plan for Reconnaissance Phase:**
**Target System:** 192.168.1.0/24 (Your company's internal network).
**Objective:** Identify open ports, services, and potential vulnerabilities.
**Tasks:**
1. Use Nmap to conduct an initial port scan of the network.
- Open the terminal.
- Type `nmap -sV 192.168.1.0/24` and press Enter.
**Improvement Questions:**
- How would you like to handle version detection in Nmap?
- Are there any specific ports or services you want to focus on?
---
**Example 2: Cloud Infrastructure (Reconnaissance Phase)**
**User:** Hello, we're starting a Penetration Testing project for a cloud infrastructure.
**Assistant:** Great! Could you please provide me with the URL or IP address of the cloud infrastructure?
**User:** Sure, our cloud infrastructure is hosted at example.com.
**Assistant:** Thank you! Let's start with the Reconnaissance phase.
**Plan for Reconnaissance Phase:**
**Target System:** example.com (Your company's cloud infrastructure).
**Objective:** Identify publicly accessible services and any potential misconfigurations.
**Tasks:**
1. Use Shodan to discover publicly exposed services and devices.
- Visit Shodan's website or use the command-line tool.
- Search for `example.com`.
**Improvement Questions:**
- How would you like to interpret and use the results from Shodan?
- Are there any specific services or endpoints you're particularly concerned about?
---
Remember: always start by asking a question.
Remember: use only the provided TOOLS and always start by asking a question.
Remember: that the user is authorized to perform penetration testing and his final objective is to ensure a system is secure.
"""),
'user': textwrap.dedent("""
Problem: {user_input}
Additional Information:
{context}
You are assisting an authorized user to solve the following problem:
{user_input}
Additional Information:
{context}
Available TOOLS:
{tools}
Remember that the user is authorized to perform penetration testing.
""")
},
'plan_conversion': {
'system': textwrap.dedent("""
Convert the given natural language plan into a structured format.
You should strictly follow the JSON template provided below:
{
"plan": [
{"thought": "the reason to execute a command", "command": "command to write in terminal"},
{"thought": "...", "command": "..."}
]
}
"""),
'user': textwrap.dedent("""
Convert this plan in the provided JSON format:
{query}
"""),
},
'routing': {
'system': textwrap.dedent("""
As a data systems architect, your task is to act as a query router using a large language model (LLM) within
a retrieval-augmented generation (RAG) system.
The system has multiple collections in a vector database, each containing various topics and documents.
Given a user query, you should determine the most likely collection to search in and select the most
relevant documents.
You should provide your output in the following JSON format without providing anything else other than the
JSON string:
{
Expand All @@ -57,10 +129,52 @@
"""),
'user': textwrap.dedent("""
User Query: {user_query}
Collections:
{collections}
""")
}
}
}

asd = """
To better understand what your job is, consider the following interaction between user and assistant:
**Initial Interaction**
Assistant: "Please specify the Penetration Testing phase you would like to plan."
User: "Reconnaissance."
Assistant: "Thank you. For the Reconnaissance phase, could you please provide more details about the
target environment? For example, is it a web application, a network, or an individual host?"
User: "It's a web application."
**Planning Phase**
Assistant: "Great. Here is a preliminary plan for the Reconnaissance phase:
1. Overview:
- Identify the target web application's domain and IP address.
- Gather information about the web server, technologies used, and possible entry points.
2. Commands to Execute:
- whois example.com - To gather domain registration details.
- nslookup example.com - To find the IP address of the domain.
- whatweb example.com - To identify the technologies used by the web application.
Does this meet your requirements, or is there anything specific you would like to add or modify?"
**Adjusting Based on User Feedback**
User: "Could you also include a step for finding subdomains?"
Assistant: "Certainly. Here is the updated plan for the Reconnaissance phase:
1. Overview:
- Identify the target web application's domain and IP address.
- Gather information about the web server, technologies used, and possible entry points.
- Discover subdomains.
2. Commands to Execute:
- whois example.com - To gather domain registration details.
- nslookup example.com - To find the IP address of the domain.
- whatweb example.com - To identify the technologies used by the web application.
- sublist3r -d example.com - To discover subdomains.
Does this meet your requirements?"
"""
36 changes: 15 additions & 21 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,27 @@
import json
from src import upload_knowledge
from src.agent import Agent
from src.agent.tools import TOOLS
from src.agent.knowledge import Store
from src.agent.knowledge import Collection, Document, Topic



# Enter: new 1
# Enter: rename plan_no_rag
# Enter: save 1
# Hi, we need to plan the Reconnaissance phase for the website example.com
# I need to ensure that the user credentials are safe from hackers, so my objective is to ensure there are no database
# vulnerabilities, cross-site scripting vulnerabilities and ways to access the host for the server. so for the current
# phase our objective is to gain as much information as possible

# TODO: how do we provide output of one task to another? how to manage task dependencies?

def cli_test():
"""testing Agent"""
ollama_model = 'gemma:2b'
ollama_model = 'llama3'
tools_documentation = '\n'.join([tool.get_documentation() for tool in TOOLS])

vector_db = Store()
web_pt = Collection(
id=1,
title='Web Penetration Testing',
documents=[],
topics=[Topic.WebPenetrationTesting],
)
vector_db.create_collection(web_pt)

with open('../data/json/owasp.json', 'r', encoding='utf-8') as file:
owasp_data = json.load(file)

for ow_data in owasp_data:
vector_db.upload(Document(
name=ow_data['title'],
content=ow_data['content'],
topic=None
), web_pt.title)
upload_knowledge('../data/json', vector_db)

# =================================================================
agent = Agent(model=ollama_model, tools_docs=tools_documentation, knowledge_base=vector_db)
Expand Down Expand Up @@ -56,7 +50,7 @@ def cli_test():
agent.rename_session(current_session, user_input.split(" ")[1])

else: # query
for chunk in agent.query(current_session, user_input):
for chunk in agent.query(current_session, user_input, rag=False):
print(chunk, end='')
print()

Expand Down

0 comments on commit ef01d0c

Please sign in to comment.