Merge pull request #116 from arc53/code-ingestion

Code ingestion
This commit is contained in:
Alex
2023-02-22 18:46:50 +00:00
committed by GitHub
4 changed files with 476 additions and 1 deletions

126
scripts/code_docs_gen.py Normal file
View File

@@ -0,0 +1,126 @@
from pathlib import Path
from langchain.text_splitter import CharacterTextSplitter
import faiss
from langchain.vectorstores import FAISS
from langchain.embeddings import OpenAIEmbeddings
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
import pickle
import dotenv
import tiktoken
import sys
from argparse import ArgumentParser
import ast
dotenv.load_dotenv()
# Gather every Python file under inputs/ (recursively): `data` holds each
# file's text and `sources` holds the matching path, in the same order.
ps = list(Path("inputs").glob("**/*.py"))
data = [p.read_text() for p in ps]
sources = list(ps)
# with open('inputs/client.py', 'r') as f:
# tree = ast.parse(f.read())
# print(tree)
def get_functions_in_class(node):
    """Return the names and source code of the methods defined on a class.

    Args:
        node: An ``ast.ClassDef`` node (only its ``body`` list is inspected).

    Returns:
        A ``(names, sources)`` tuple of two parallel lists: each method's
        name and its source code as regenerated by ``ast.unparse``. Only
        methods defined directly in the class body are returned.
    """
    # ``async def`` methods are a separate node type; include them so they are
    # not silently left out of the generated documentation.
    methods = [
        child
        for child in node.body
        if isinstance(child, (ast.FunctionDef, ast.AsyncFunctionDef))
    ]
    functions = [method.name for method in methods]
    functions_code = [ast.unparse(method) for method in methods]
    return functions, functions_code
def get_classes_and_functions(source_code):
    """Map every top-level class in the source to its methods.

    Returns a dict shaped ``{class name: {method name: method source}}``.
    Only ``class`` statements at module level are considered.
    """
    module = ast.parse(source_code)
    result = {}
    for stmt in module.body:
        if not isinstance(stmt, ast.ClassDef):
            continue
        method_names, method_sources = get_functions_in_class(stmt)
        # Pair each method name with its source code.
        result[stmt.name] = dict(zip(method_names, method_sources))
    return result
import json

# Map each source path to {class name: {method name: method source}}.
# get_classes_and_functions parses the text itself, so pass the raw source
# rather than a pre-parsed tree, and pair paths with contents via zip instead
# of a hand-maintained index.
structure_dict = {
    str(source): get_classes_and_functions(code)
    for source, code in zip(sources, data)
}

# save the structure dict as json
with open('structure_dict.json', 'w') as f:
    json.dump(structure_dict, f)
# llm = OpenAI(temperature=0)
# prompt = PromptTemplate(
# input_variables=["code"],
# template="Code: {code}, Documentation: ",
# )
#
# print(prompt.format(code="print('hello world')"))
# print(llm(prompt.format(code="print('hello world')")))
# Generate documentation for every method of every class in structure_dict
# and append it to outputs/<source path without "inputs/">, with ".py"
# replaced by ".txt".
Path("outputs").mkdir(exist_ok=True)

# The prompt template and the model client are the same for every request,
# so build them once instead of once per function.
prompt = PromptTemplate(
    input_variables=["code"],
    template="Code: \n{code}, \nDocumentation: ",
)
llm = OpenAI(temperature=0)


def _append_section(path, text):
    """Write ``text`` to ``path``, preceded by a blank line if the file exists."""
    if path.exists():
        with open(path, "a") as f:
            f.write(f"\n\n{text}")
    else:
        with open(path, "w") as f:
            f.write(text)


c1 = len(structure_dict)
for c2, (source, classes) in enumerate(structure_dict.items(), start=1):
    print(f"Processing file {c2}/{c1}")
    source_w = source.replace("inputs/", "")
    source_w = source_w.replace(".py", ".txt")
    out_path = Path("outputs") / source_w
    # Input files may live in sub-packages; make sure the mirrored folder
    # exists before the first write.
    out_path.parent.mkdir(parents=True, exist_ok=True)

    f1 = len(classes)
    for f2, (class_name, functions) in enumerate(classes.items(), start=1):
        print(f"Processing class {f2}/{f1}")
        # The class name heads the block of its functions.
        _append_section(out_path, f"Class: {class_name}")

        b1 = len(functions)
        # enumerate keeps the progress counter advancing; it used to be reset
        # to 0 on every iteration.
        for b2, function_code in enumerate(functions.values(), start=1):
            print(f"Processing function {b2}/{b1}")
            response = llm(prompt.format(code=function_code))
            _append_section(
                out_path,
                f"Function: {function_code}, \nDocumentation: {response}",
            )

View File

@@ -1,10 +1,12 @@
from collections import defaultdict
import os
import sys
import nltk
import dotenv
import typer
import ast
from collections import defaultdict
from pathlib import Path
from typing import List, Optional
from langchain.text_splitter import RecursiveCharacterTextSplitter
@@ -12,6 +14,7 @@ from langchain.text_splitter import RecursiveCharacterTextSplitter
from parser.file.bulk import SimpleDirectoryReader
from parser.schema.base import Document
from parser.open_ai_func import call_openai_api, get_user_permission
from parser.py2doc import get_classes, get_functions, transform_to_docs
dotenv.load_dotenv()
@@ -78,5 +81,29 @@ def ingest(yes: bool = typer.Option(False, "-y", "--yes", prompt=False,
for directory, folder_name in zip(dir, folder_names):
process_one_docs(directory, folder_name)
@app.command()
def convert():
    """Generate documentation for the Python files under ``inputs``.

    Reads every ``*.py`` file found recursively under ``inputs``, collects the
    results of ``get_functions`` and ``get_classes`` for each file keyed by the
    file's path string, and hands both mappings to ``transform_to_docs``.
    """
    functions_dict = {}
    classes_dict = {}
    for path in Path("inputs").glob("**/*.py"):
        with open(path) as f:
            code = f.read()
        source = str(path)
        # Both helpers parse the source text themselves, so pass it as-is
        # instead of parsing it here as well.
        functions_dict[source] = get_functions(code)
        classes_dict[source] = get_classes(code)
    transform_to_docs(functions_dict, classes_dict)
# Script entry point: run `app` when this file is executed directly.
# NOTE(review): `app` is defined outside this view; presumably the Typer
# application the commands above are registered on — confirm.
if __name__ == "__main__":
    app()

167
scripts/outputs/test.md Normal file
View File

@@ -0,0 +1,167 @@
# Function name: get_functions_in_class
Function:
```
def get_functions_in_class(source_code, class_name):
tree = ast.parse(source_code)
functions = []
for node in tree.body:
if isinstance(node, ast.ClassDef):
if node.name == class_name:
for function in node.body:
if isinstance(function, ast.FunctionDef):
functions.append(function.name)
return functions
```
Documentation:
get_functions_in_class(source_code, class_name)
Inputs:
source_code (str): The source code of the program.
class_name (str): The name of the class.
Outputs:
functions (list): A list of the functions in the class.
Description:
This function takes in a source code and a class name and returns a list of the functions in the class. It uses the ast module to parse the source code and find the class definition. It then iterates through the body of the class and checks if each node is a function definition. If it is, it adds the name of the function to the list of functions.
# Function name: process_functions
Function:
```
def process_functions(functions_dict):
c1 = len(functions_dict)
c2 = 0
for (source, functions) in functions_dict.items():
c2 += 1
print(f'Processing file {c2}/{c1}')
f1 = len(functions)
f2 = 0
source_w = source.replace('inputs/', '')
source_w = source_w.replace('.py', '.md')
create_subfolder(source_w)
for (name, function) in functions.items():
f2 += 1
print(f'Processing function {f2}/{f1}')
response = generate_response(function)
write_output_file(source_w, name, function, response)
```
Documentation:
This function takes in a dictionary of functions and processes them. It takes the source file and the functions from the dictionary and creates a subfolder for the source file. It then generates a response for each function and writes the output file. The output file contains the function, the response, and the source file.
# Function name: get_functions_in_class
Function:
```
def get_functions_in_class(source_code, class_name):
tree = ast.parse(source_code)
functions = []
for node in tree.body:
if isinstance(node, ast.ClassDef):
if node.name == class_name:
for function in node.body:
if isinstance(function, ast.FunctionDef):
functions.append(function.name)
return functions
```
Documentation:
get_functions_in_class(source_code, class_name)
Inputs:
source_code (str): The source code of the program.
class_name (str): The name of the class.
Outputs:
functions (list): A list of the functions in the class.
Description:
This function takes in a source code and a class name and returns a list of the functions in the class. It uses the ast module to parse the source code and find the class definition. It then iterates through the body of the class and checks if each node is a function definition. If it is, it adds the name of the function to the list of functions.
# Function name: process_functions
Function:
```
def process_functions(functions_dict):
c1 = len(functions_dict)
c2 = 0
for (source, functions) in functions_dict.items():
c2 += 1
print(f'Processing file {c2}/{c1}')
f1 = len(functions)
f2 = 0
source_w = source.replace('inputs/', '')
source_w = source_w.replace('.py', '.md')
create_subfolder(source_w)
for (name, function) in functions.items():
f2 += 1
print(f'Processing function {f2}/{f1}')
response = generate_response(function)
write_output_file(source_w, name, function, response)
```
Documentation:
This function takes in a dictionary of functions and processes them. It takes the source file and the functions from the dictionary and creates a subfolder for the source file. It then generates a response for each function and writes the output file for each function.
# Function name: get_functions_in_class
Function:
```
def get_functions_in_class(source_code, class_name):
tree = ast.parse(source_code)
functions = []
for node in tree.body:
if isinstance(node, ast.ClassDef):
if node.name == class_name:
for function in node.body:
if isinstance(function, ast.FunctionDef):
functions.append(function.name)
return functions
```
Documentation:
get_functions_in_class(source_code, class_name)
Inputs:
source_code (str): The source code of the program.
class_name (str): The name of the class.
Outputs:
functions (list): A list of the functions in the class.
Description:
This function takes in a source code and a class name and returns a list of the functions in the class. It uses the ast module to parse the source code and find the class definition. It then iterates through the body of the class and checks if each node is a function definition. If it is, it adds the name of the function to the list of functions.
# Function name: process_functions
Function:
```
def process_functions(functions_dict):
c1 = len(functions_dict)
c2 = 0
for (source, functions) in functions_dict.items():
c2 += 1
print(f'Processing file {c2}/{c1}')
f1 = len(functions)
f2 = 0
source_w = source.replace('inputs/', '')
source_w = source_w.replace('.py', '.md')
create_subfolder(source_w)
for (name, function) in functions.items():
f2 += 1
print(f'Processing function {f2}/{f1}')
response = generate_response(function)
write_output_file(source_w, name, function, response)
```
Documentation:
This function takes in a dictionary of functions and processes them. It takes the source file and the functions from the dictionary and creates a subfolder for the source file. It then generates a response for each function and writes the output file for each function.

155
scripts/parser/py2doc.py Normal file
View File

@@ -0,0 +1,155 @@
from pathlib import Path
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
import dotenv
import ast
import typer
import tiktoken
dotenv.load_dotenv()
def get_functions(source_code):
    """Return the module-level functions of a Python module.

    Args:
        source_code: Python source text, or an already-parsed ``ast`` module
            tree (accepted explicitly so callers may pass either form).

    Returns:
        A dict mapping each top-level function name to its source code as
        regenerated by ``ast.unparse``. Functions nested inside classes or
        other functions are not included.

    Raises:
        SyntaxError: If ``source_code`` is text that is not valid Python.
    """
    if isinstance(source_code, ast.AST):
        tree = source_code
    else:
        tree = ast.parse(source_code)
    # ``async def`` is a separate node type; include it so coroutine
    # functions are documented as well.
    return {
        node.name: ast.unparse(node)
        for node in tree.body
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
    }
def get_functions_names(node):
    """Return the names of the functions defined directly in ``node``'s body.

    Args:
        node: An AST node with a ``body`` list, e.g. an ``ast.ClassDef``.

    Returns:
        A list of function names in definition order. ``async def``
        functions are included alongside regular ones.
    """
    return [
        child.name
        for child in node.body
        if isinstance(child, (ast.FunctionDef, ast.AsyncFunctionDef))
    ]
def get_classes(source_code):
    """Map each top-level class in the source to the names of its functions.

    Returns a dict shaped ``{class name: [function name, ...]}``, with the
    names obtained from ``get_functions_names``.
    """
    module = ast.parse(source_code)
    return {
        stmt.name: get_functions_names(stmt)
        for stmt in module.body
        if isinstance(stmt, ast.ClassDef)
    }
def get_functions_in_class(source_code, class_name):
    """Return the names of the methods of a top-level class.

    Args:
        source_code: Python source text to parse.
        class_name: Name of the class to look up among the module's
            top-level ``class`` statements.

    Returns:
        A list of method names in definition order; empty when no class of
        that name exists. If the name is defined more than once, the methods
        of every matching definition are returned.
    """
    tree = ast.parse(source_code)
    functions = []
    for node in tree.body:
        if not (isinstance(node, ast.ClassDef) and node.name == class_name):
            continue
        # Include ``async def`` methods, which are a separate node type.
        functions.extend(
            member.name
            for member in node.body
            if isinstance(member, (ast.FunctionDef, ast.AsyncFunctionDef))
        )
    return functions
def parse_functions(functions_dict):
    """Generate documentation for each function and append it to a Markdown file.

    For every source path the output file is ``outputs/`` plus the path with
    ``"inputs/"`` removed and ``".py"`` replaced by ``".md"``. Each function's
    source is sent to the OpenAI LLM and the reply is appended to that file as
    its own section; existing files are appended to, not overwritten.

    Args:
        functions_dict: ``{source path: {function name: function source}}``.
    """
    # The prompt template and the client do not depend on the function being
    # documented, so create them once instead of once per function.
    prompt = PromptTemplate(
        input_variables=["code"],
        template="Code: \n{code}, \nDocumentation: ",
    )
    llm = OpenAI(temperature=0)

    c1 = len(functions_dict)
    for c2, (source, functions) in enumerate(functions_dict.items(), start=1):
        print(f"Processing file {c2}/{c1}")
        source_w = source.replace("inputs/", "")
        source_w = source_w.replace(".py", ".md")
        out_path = Path("outputs") / source_w
        # Creates "outputs" and any nested subfolders mirroring the input tree.
        out_path.parent.mkdir(parents=True, exist_ok=True)

        f1 = len(functions)
        for f2, (name, function) in enumerate(functions.items(), start=1):
            print(f"Processing function {f2}/{f1}")
            response = llm(prompt.format(code=function))
            # Sections are separated by a blank line, but a new file must not
            # start with one.
            separator = "\n\n" if out_path.exists() else ""
            # The closing fence has to stand alone on its line: a fence
            # followed by other text does not close the code block.
            section = (
                f"# Function name: {name} \n\nFunction: \n```\n{function}\n```"
                f"\nDocumentation: \n{response}"
            )
            with open(out_path, "a") as f:
                f.write(separator + section)
def parse_classes(classes_dict):
    """Generate documentation for each class and append it to a Markdown file.

    For every source path the output file is ``outputs/`` plus the path with
    ``"inputs/"`` removed and ``".py"`` replaced by ``".md"``. The class name
    and its function names are sent to the OpenAI LLM and the reply is appended
    to that file as its own section; existing files are appended to, not
    overwritten.

    Args:
        classes_dict: ``{source path: {class name: [function name, ...]}}``.
    """
    # The prompt template and the client are the same for every class, so
    # create them once instead of once per class.
    prompt = PromptTemplate(
        input_variables=["class_name", "functions_names"],
        template="Class name: {class_name} \nFunctions: {functions_names}, \nDocumentation: ",
    )
    llm = OpenAI(temperature=0)

    c1 = len(classes_dict)
    for c2, (source, classes) in enumerate(classes_dict.items(), start=1):
        print(f"Processing file {c2}/{c1}")
        source_w = source.replace("inputs/", "")
        source_w = source_w.replace(".py", ".md")
        out_path = Path("outputs") / source_w
        # Creates "outputs" and any nested subfolders mirroring the input tree.
        out_path.parent.mkdir(parents=True, exist_ok=True)

        f1 = len(classes)
        # Count from 1 so progress reads "1/N" .. "N/N"; the counter used to
        # be printed before it was incremented.
        for f2, (name, function_names) in enumerate(classes.items(), start=1):
            print(f"Processing Class {f2}/{f1}")
            response = llm(prompt.format(class_name=name, functions_names=function_names))
            # Sections are separated by a blank line, but a new file must not
            # start with one.
            separator = "\n\n" if out_path.exists() else ""
            section = (
                f"# Class name: {name} \n\nFunctions: \n{function_names}, "
                f"\nDocumentation: \n{response}"
            )
            with open(out_path, "a") as f:
                f.write(separator + section)
#User permission
def transform_to_docs(functions_dict, classes_dict):
    """Show an approximate token cost, ask for consent, then generate the docs.

    The keys and values of both dicts are stringified and concatenated, the
    result is tokenised with tiktoken's ``cl100k_base`` encoding, and an
    approximate price of $0.02 per 1,000 tokens is printed. Answering "y"
    (any case) or just pressing Enter runs ``parse_functions`` followed by
    ``parse_classes``; any other answer leaves the API uncalled.
    """
    # Flatten both dicts into one string so its tokens can be counted.
    pieces = [str(src) + str(funcs) for src, funcs in functions_dict.items()]
    pieces += [str(src) + str(classes) for src, classes in classes_dict.items()]
    docs_content = "".join(pieces)

    token_count = len(tiktoken.get_encoding("cl100k_base").encode(docs_content))
    approx_cost = (token_count / 1000) * 0.02

    # Report the token count and estimated cost with thousands separators.
    print(f"Number of Tokens = {token_count:,d}")
    print(f"Approx Cost = ${approx_cost:,.2f}")

    # Nothing is sent to the API unless the user agrees; an empty answer
    # counts as agreement.
    answer = input("Price Okay? (Y/N) \n").lower()
    if answer not in ("y", ""):
        print("The API was not called. No money was spent.")
        return

    outputs_dir = Path("outputs")
    if not outputs_dir.exists():
        outputs_dir.mkdir()
    parse_functions(functions_dict)
    print("Functions done!")
    parse_classes(classes_dict)
    print("All done!")