excel文档上传预处理
- 刚进excel对话界面需要上传excel文件
- 选好文件之后,点击上传
- 前端抓包,获得请求路径
\DB-GPT-main\dbgpt\app\openapi\api_v1\api_v1.py(方法具体位置)
@router.post("/v1/chat/mode/params/file/load")
async def params_load(
conv_uid: str,
chat_mode: str,
model_name: str,
user_name: Optional[str] = None,
sys_code: Optional[str] = None,
doc_file: UploadFile = File(...),
):
print(f"params_load: {conv_uid},{chat_mode},{model_name}")
try:
if doc_file:
# Save the uploaded file
upload_dir = os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, chat_mode)
os.makedirs(upload_dir, exist_ok=True)
upload_path = os.path.join(upload_dir, doc_file.filename)
async with aiofiles.open(upload_path, "wb") as f:
await f.write(await doc_file.read())
# Prepare the chat
dialogue = ConversationVo(
conv_uid=conv_uid,
chat_mode=chat_mode,
select_param=doc_file.filename,
model_name=model_name,
user_name=user_name,
sys_code=sys_code,
)
# 有excel_reader参数,读取了excel的内容
chat: BaseChat = await get_chat_instance(dialogue)
resp = await chat.prepare()
print('-------------------------')
rres = Result.succ(get_hist_messages(conv_uid))
print(rres)
# Refresh messages
return rres
except Exception as e:
logger.error("excel load error!", e)
return Result.failed(code="E000X", msg=f"File Load Error {str(e)}")
- debug发现chat: BaseChat = await get_chat_instance(dialogue)读取了excel的内容
一路debug找到了读取excel文件的关键代码
D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\chat_data\chat_excel\excel_reader.py
class ExcelReader:
def __init__(self, file_path):
file_name = os.path.basename(file_path)
self.file_name_without_extension = os.path.splitext(file_name)[0]
encoding, confidence = detect_encoding(file_path)
logger.info(f"Detected Encoding: {encoding} (Confidence: {confidence})")
self.excel_file_name = file_name
self.extension = os.path.splitext(file_name)[1]
# read excel file
if file_path.endswith(".xlsx") or file_path.endswith(".xls"):
#初步读取的 DataFrame,用于检查和处理列名
df_tmp = pd.read_excel(file_path, index_col=False)
#使用 converters 参数格式化每列的数据格式
self.df = pd.read_excel(
file_path,
index_col=False,
converters={i: csv_colunm_foramt for i in range(df_tmp.shape[1])},
)
elif file_path.endswith(".csv"):
df_tmp = pd.read_csv(file_path, index_col=False, encoding=encoding)
self.df = pd.read_csv(
file_path,
index_col=False,
encoding=encoding,
# csv_colunm_foramt 可以修改更多,只是针对美元人民币符号,假如是“你好¥¥¥”则会报错!
converters={i: csv_colunm_foramt for i in range(df_tmp.shape[1])},
)
else:
raise ValueError("Unsupported file format.")
self.df.replace("", np.nan, inplace=True)
#进行清洗和处理
unnamed_columns_tmp = [
col
for col in df_tmp.columns
if col.startswith("Unnamed") and df_tmp[col].isnull().all()
]
df_tmp.drop(columns=unnamed_columns_tmp, inplace=True)
self.df = self.df[df_tmp.columns.values]
#
self.columns_map = {}
for column_name in df_tmp.columns:
self.df[column_name] = self.df[column_name].astype(str)
self.columns_map.update({column_name: excel_colunm_format(column_name)})
try:
self.df[column_name] = pd.to_datetime(self.df[column_name]).dt.strftime(
"%Y-%m-%d"
)
except ValueError:
try:
self.df[column_name] = pd.to_numeric(self.df[column_name])
except ValueError:
try:
self.df[column_name] = self.df[column_name].astype(str)
except Exception:
print("Can't transform column: " + column_name)
self.df = self.df.rename(columns=lambda x: x.strip().replace(" ", "_"))
# 连接 DuckDB 数据库
self.db = duckdb.connect(database=":memory:", read_only=False)
self.table_name = "excel_data"
# write data in duckdb
self.db.register(self.table_name, self.df)
# 获取结果并打印表结构信息
result = self.db.execute(f"DESCRIBE {self.table_name}")
columns = result.fetchall()
for column in columns:
print(column)
def run(self, sql):
try:
if f'"{self.table_name}"' in sql:
sql = sql.replace(f'"{self.table_name}"', self.table_name)
sql = add_quotes_to_chinese_columns(sql)
print(f"excute sql:{sql}")
results = self.db.execute(sql)
colunms = []
for descrip in results.description:
colunms.append(descrip[0])
return colunms, results.fetchall()
except Exception as e:
logger.error(f"excel sql run error!, {str(e)}")
raise ValueError(f"Data Query Exception!\\nSQL[{sql}].\\nError:{str(e)}")
#通过 SQL 查询将结果转换为 DataFrame 返回
def get_df_by_sql_ex(self, sql):
colunms, values = self.run(sql)
return pd.DataFrame(values, columns=colunms)
def get_sample_data(self):
return self.run(f"SELECT * FROM {self.table_name} LIMIT 5;")
- chat: BaseChat = await get_chat_instance(dialogue)获取了一个对话实例
在上传文档的时候是用的下面的提示词模板
D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\chat_data\chat_excel\excel_analyze\prompt.py
_PROMPT_SCENE_DEFINE_ZH = """你是一个数据分析专家!"""
_DEFAULT_TEMPLATE_ZH = """
请使用历史对话中的数据结构信息,在满足下面约束条件下通过duckdb sql数据分析回答用户的问题。
约束条件:
1.请充分理解用户的问题,使用duckdb sql的方式进行分析, 分析内容按下面要求的输出格式返回,sql请输出在对应的sql参数中
2.请从如下给出的展示方式种选择最优的一种用以进行数据渲染,将类型名称放入返回要求格式的name参数值种,如果找不到最合适的则使用'Table'作为展示方式,可用数据展示方式如下: {display_type}
3.SQL中需要使用的表名是: {table_name},请检查你生成的sql,不要使用没在数据结构中的列名
4.优先使用数据分析的方式回答,如果用户问题不涉及数据分析内容,你可以按你的理解进行回答
5.输出内容中sql部分转换为:<api-call><name>[数据显示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api- call> 这样的格式,参考返回格式要求
请一步一步思考,给出回答,并确保你的回答内容格式如下:
对用户说的想法摘要.<api-call><name>[数据展示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api-call>
用户问题:{user_input}
"""
还有一个提示词是,在读取了文件内容之后,用另外一个提示词
里面data_example的内容是
def get_sample_data(self):
return self.run(f"SELECT * FROM {self.table_name} LIMIT 5;")
从duckdb取的样例数据
prompt
_DEFAULT_TEMPLATE_ZH = """
下面是用户文件{file_name}的一部分数据,请学习理解该数据的结构和内容,按要求输出解析结果:
{data_example}
分析各列数据的含义和作用,并对专业术语进行简单明了的解释, 如果是时间类型请给出时间格式类似:yyyy-MM-dd HH:MM:ss.
将列名作为属性名,分析解释作为属性值,组成json数组,并输出在返回json内容的ColumnAnalysis属性中.
请不要修改或者翻译列名,确保和给出数据列名一致.
针对数据从不同维度提供一些有用的分析思路给用户。
请一步一步思考,确保只以JSON格式回答,具体格式如下:
{response}
"""
_RESPONSE_FORMAT_SIMPLE_ZH = {
"DataAnalysis": "数据内容分析总结",
"ColumnAnalysis": [{"column name": "字段1介绍,专业术语解释(请尽量简单明了)"}],
"AnalysisProgram": ["1.分析方案1", "2.分析方案2"],
}
[ModelMessage(role='system', content='你是一个数据分析专家. \n下面是用户文件故障记录表2.csv的一部分数据,请学习理解该数据的结构和内容,按要求输出解析结果:\n [["\\u65f6\\u95f4", "\\u673a\\u7ec4", "\\u8bbe\\u5907", "\\u6545\\u969c\\u539f\\u56e0"], ["2024-09-23", "A\\u7ec4", "\\u53d1\\u7535\\u673a1", "\\u77ed\\u8def "], ["2024-09-23", "B\\u7ec4", "\\u53d8\\u538b\\u56682", "\\u8fc7\\u70ed "], ["2024-09-23", "C\\u7ec4", "\\u51b7\\u5374\\u7cfb\\u7edf", "\\u6f0f\\u6c34 "], ["2024-09-23", "A\\u7ec4", "\\u63a7\\u5236\\u677f", "\\u8f6f\\u4ef6\\u6545\\u969c "], ["2024-09-23", "D\\u7ec4", "\\u7535\\u52a8\\u673a3", "\\u8f74\\u627f\\u635f\\u574f "]]\n分析各列数据的含义和作用,并对专业术语进行简单明了的解释, 如果是时间类型请给出时间格式类似:yyyy-MM-dd HH:MM:ss.\n将列名作为属性名,分析解释作为属性值,组成json数组,并输出在返回json内容的ColumnAnalysis属性中.\n请不要修改或者翻译列名,确保和给出数据列名一致.\n针对数据从不同维度提供一些有用的分析思路给用户。\n\n请一步一步思考,确保只以JSON格式回答,具体格式如下:\n "{\\n \\"DataAnalysis\\": \\"数据内容分析总结\\",\\n \\"ColumnAnalysis\\": [\\n {\\n \\"column name\\": \\"字段1介绍,专业术语解释(请尽量简单明了)\\"\\n }\\n ],\\n \\"AnalysisProgram\\": [\\n \\"1.分析方案1\\",\\n \\"2.分析方案2\\"\\n ]\\n}"\n', round_index=0), ModelMessage(role='human', content='故障记录表2.csv', round_index=0)]
然后交给模型处理
D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\base_chat.py
async def _no_streaming_call_with_retry(self, payload):
with root_tracer.start_span("BaseChat.invoke_worker_manager.generate"):
model_output = await self.call_llm_operator(payload)
ai_response_text = self.prompt_template.output_parser.parse_model_nostream_resp(
model_output, self.prompt_template.sep
)
prompt_define_response = (
self.prompt_template.output_parser.parse_prompt_response(ai_response_text)
)
metadata = {
"model_output": model_output.to_dict(),
"ai_response_text": ai_response_text,
"prompt_define_response": self._parse_prompt_define_response(
prompt_define_response
),
}
with root_tracer.start_span("BaseChat.do_action", metadata=metadata):
result = await blocking_func_to_async(
self._executor, self.do_action, prompt_define_response
)
speak_to_user = self.get_llm_speak(prompt_define_response)
view_message = await blocking_func_to_async(
self._executor,
self.prompt_template.output_parser.parse_view_response,
speak_to_user,
result,
prompt_define_response,
)
return ai_response_text, view_message.replace("\n", "\\n")
在view_message中又做了格式处理
D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\chat_data\chat_excel\excel_learning\out_parser.py
def parse_view_response(self, speak, data, prompt_response) -> str:
if data and not isinstance(data, str):
### tool out data to table view
html_title = f"### **Data Summary**\n{data.desciption} "
html_colunms = self.__build_colunms_html(data.clounms)
html_plans = self.__build_plans_html(data.plans)
html = f"""{html_title}\n{html_colunms}\n{html_plans}"""
return html
else:
return speak
解析完这个html的结果是
html解析结果
### **Data Summary**
这是一个关于用户设备故障的记录表,包含了故障发生的时间、涉及的机群、具体的设备以及故障的原由。
### **Data Structure**
- **1.[column name]** _时间(Time)_
- **1.[description]** _记录了故障发生的日期,格式为年-月-日(yyyy-MM-dd)_
- **2.[column name]** _机群(Cluster)_
- **2.[description]** _故障发生的设备所属的机群,例如A组、B组等_
- **3.[column name]** _设备(Equipment)_
- **3.[description]** _发生故障的具体设备,比如发电机组编号、变压器编号等_
- **4.[column name]** _故障原因(Cause)_
- **4.[description]** _导致设备故障的具体原因,例如短路、过热、漏水、软件故障、机械故障等_
### **Analysis plans**
1. 分析故障趋势:通过时间(Time)查看故障发生的季节性、周期性规律,以预测可能的故障高发时段。
2. 机群故障分析:统计各机群(Cluster)的故障频率,找出故障率较高的机群,进行重点维护。
3. 设备故障分析:按设备(Equipment)类型分析故障频率,识别出故障频发的设备,评估设备可靠性。
4. 故障原因分类:对故障原因(Cause)进行统计,识别主要故障类型,优化设备设计或改进维护流程。
5. 故障预防模型:结合历史数据,建立机器学习模型,预测未来可能出现的故障,提前进行预防措施。
6. 故障响应时间分析:计算从故障发生到记录的时间,评估故障报告的及时性,优化故障响应流程
最后呈现在页面上的内容
对话
使用的提示词模板
_DEFAULT_TEMPLATE_ZH = """
请使用历史对话中的数据结构信息,在满足下面约束条件下通过duckdb sql数据分析回答用户的问题。
约束条件:
1.请充分理解用户的问题,使用duckdb sql的方式进行分析, 分析内容按下面要求的输出格式返回,sql请输出在对应的sql参数中
2.请从如下给出的展示方式种选择最优的一种用以进行数据渲染,将类型名称放入返回要求格式的name参数值种,如果找不到最合适的则使用'Table'作为展示方式,可用数据展示方式如下: {display_type}
3.SQL中需要使用的表名是: {table_name},请检查你生成的sql,不要使用没在数据结构中的列名
4.优先使用数据分析的方式回答,如果用户问题不涉及数据分析内容,你可以按你的理解进行回答
5.输出内容中sql部分转换为:<api-call><name>[数据显示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api- call> 这样的格式,参考返回格式要求
请一步一步思考,给出回答,并确保你的回答内容格式如下:
对用户说的想法摘要.<api-call><name>[数据展示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api-call>
用户问题:{user_input}
"""
role='system' content="""你是一个数据分析专家!
请使用历史对话中的数据结构信息,
在满足下面约束条件下通过duckdb sql数据分析回答用户的问题。
约束条件:\n\t1.
请充分理解用户的问题,使用duckdb sql的方式进行分析, 分析内容按下面要求的输出格式返回,
sql请输出在对应的sql参数中
\t2.请从如下给出的展示方式种选择最优的一种用以进行数据渲染,
将类型名称放入返回要求格式的name参数值种,如果找不到最合适的则使用'Table'作为展示方式,
可用数据展示方式如下: response_line_chart:used to display comparative trend analysis data
response_pie_chart:suitable for scenarios such as proportion and distribution tatistics
response_table:
suitable for display with many display columns or non-numeric columns
response_scatter_plot:Suitable for exploring relationships between variables, detecting outliers, etc.
response_bubble_chart:Suitable for relationships between multiple variables,
highlighting outliers or special situations, etc.
response_donut_chart:Suitable for hierarchical structure representation, category proportion display and highlighting key categories, etc.
response_area_chart:Suitable for visualization of time series data, comparison of multiple groups of data, analysis of data change trends, etc.
response_heatmap:Suitable for visual analysis of time series data, large-scale data sets, distribution of classified data, etc.
\t3.SQL中需要使用的表名是: excel_data,请检查你生成的sql,不要使用没在数据结构中的列名
\t4.优先使用数据分析的方式回答,如果用户问题不涉及数据分析内容,你可以按你的理解进行回答
\t5.输出内容中sql部分转换为:<api-call><name>[数据显示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api- call> 这样的格式,参考返回格式要求
请一步一步思考,给出回答,并确保你的回答内容格式如下:
对用户说的想法摘要.<api-call><name>[数据展示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api-call>\n\n用户问题:帮我查询断路器发生了什么故障\n""" round_index=0
D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\openapi\api_v1\api_v1.py
直接进对话框
@router.post("/v1/chat/completions")
async def chat_completions(
dialogue: ConversationVo = Body(),
flow_service: FlowService = Depends(get_chat_flow),
):
#这里是选择的进行流式输出
return StreamingResponse(
stream_generator(chat, dialogue.incremental, dialogue.model_name),
headers=headers,
media_type="text/plain",
)
async def stream_generator(chat, incremental: bool, model_name: str):
"""Generate streaming responses
Our goal is to generate an openai-compatible streaming responses.
Currently, the incremental response is compatible, and the full response will be transformed in the future.
Args:
chat (BaseChat): Chat instance.
incremental (bool): Used to control whether the content is returned incrementally or in full each time.
model_name (str): The model name
Yields:
_type_: streaming responses
"""
span = root_tracer.start_span("stream_generator")
msg = "[LLM_ERROR]: llm server has no output, maybe your prompt template is wrong."
previous_response = ""
async for chunk in chat.stream_call():
if chunk:
msg = chunk.replace("\ufffd", "")
if incremental:
incremental_output = msg[len(previous_response) :]
choice_data = ChatCompletionResponseStreamChoice(
index=0,
delta=DeltaMessage(role="assistant", content=incremental_output),
)
chunk = ChatCompletionStreamResponse(
id=chat.chat_session_id, choices=[choice_data], model=model_name
)
json_chunk = model_to_json(
chunk, exclude_unset=True, ensure_ascii=False
)
yield f"data: {json_chunk}\n\n"
else:
# TODO generate an openai-compatible streaming responses
msg = msg.replace("\n", "\\n")
yield f"data:{msg}\n\n"
previous_response = msg
await asyncio.sleep(0.02)
if incremental:
yield "data: [DONE]\n\n"
span.end()
进行流式回答
D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\base_chat.py
async for chunk in chat.stream_call():
async def stream_call(self):
# TODO Retry when server connection error
payload = await self._build_model_request()
logger.info(f"payload request: \n{payload}")
ai_response_text = ""
span = root_tracer.start_span(
"BaseChat.stream_call", metadata=payload.to_dict()
)
payload.span_id = span.span_id
try:
async for output in self.call_streaming_operator(payload):
# Plugin research in result generation
msg = self.prompt_template.output_parser.parse_model_stream_resp_ex(
output, 0
)
#模型的输出为
"""
msg = "为了找出断路器发生了哪些故障,我们需要从数据中筛选出所有涉及“断路器”的故障记录,并查看这些记录中的故障原因。
<api-call><name>response_table</name><args><sql>SELECT 故障原因, COUNT(*) AS 故障次数 FROM excel_data WHERE 设备 LIKE '%断路器%' GROUP BY 故障原因</sql></args></api-call>"
"""
# 给出答案
view_msg = self.stream_plugin_call(msg)
"""
在经过处理之后
view_msg =
为了找出断路器发生了哪些故障,我们需要从数据中筛选出所有涉及“断路器”的故障记录,并查看这些记录中的故障原因。 <chart-view content="{"type": "response_table", "sql": "SELECT 故障原因, COUNT(*) AS 故障次数 FROM excel_data WHERE 设备 LIKE '%断路器%' GROUP BY 故障原因", "data": [{"故障原因": "跳闸 ", "故障次数": 1}]}">\n</chart-view>
"""
view_msg = view_msg.replace("\n", "\\n")
yield view_msg
self.current_message.add_ai_message(msg)
view_msg = self.stream_call_reinforce_fn(view_msg)
self.current_message.add_view_message(view_msg)
span.end()
except Exception as e:
print(traceback.format_exc())
logger.error("model response parse failed!" + str(e))
self.current_message.add_view_message(
f"""<span style=\"color:red\">ERROR!</span>{str(e)}\n {ai_response_text} """
)
### store current conversation
span.end(metadata={"error": str(e)})
await blocking_func_to_async(
self._executor, self.current_message.end_current_round
)
D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\chat_data\chat_excel\excel_analyze\chat.py
def stream_plugin_call(self, text):
text = (
text.replace("\\n", " ")
.replace("\n", " ")
.replace("\_", "_")
.replace("\\", " ")
)
with root_tracer.start_span(
"ChatExcel.stream_plugin_call.run_display_sql", metadata={"text": text}
):
res = self.api_call.display_sql_llmvis(
text, self.excel_reader.get_df_by_sql_ex
)
return res
"""
res = self.api_call.display_sql_llmvis(
text, self.excel_reader.get_df_by_sql_ex
)
"""
这里对于display_sql_llmvis方法的一个说明
\Code\Code_jupyter\DB-GPT-main\dbgpt\agent\util\api_call.py
def display_sql_llmvis(self, llm_text, sql_run_func):
"""Render charts using the Antv standard protocol.
Args:
llm_text: LLM response text
sql_run_func: sql run function
Returns:
ChartView protocol text
"""
try:
if self._is_need_wait_plugin_call(
llm_text
) and self.check_last_plugin_call_ready(llm_text):
# wait api call generate complete
self.update_from_context(llm_text)
for key, value in self.plugin_status_map.items():
if value.status == Status.TODO.value:
value.status = Status.RUNNING.value
logger.info(f"SQL execution:{value.name},{value.args}")
try:
sql = value.args["sql"]
if sql is not None and len(sql) > 0:
data_df = sql_run_func(sql)
value.df = data_df
value.api_result = json.loads(
data_df.to_json(
orient="records",
date_format="iso",
date_unit="s",
)
)
value.status = Status.COMPLETE.value
else:
value.status = Status.FAILED.value
value.err_msg = "No executable sql!"
except Exception as e:
logger.error(f"data prepare exception!{str(e)}")
value.status = Status.FAILED.value
value.err_msg = str(e)
value.end_time = datetime.now().timestamp() * 1000
except Exception as e:
logger.error("Api parsing exception", e)
raise ValueError("Api parsing exception," + str(e))
return self.api_view_context(llm_text, True)
可以看到在代码中
self.plugin_status_map.items() =
{"<api-call><name>response_table</name><args><sql>SELECT 故障原因 FROM excel_data WHERE 设备 LIKE '%断路器%'</sql></args></api-call>": PluginStatus(name='response_table', location=[57], args={'args': None, 'sql': "SELECT 故障原因 FROM excel_data WHERE 设备 LIKE '%断路器%'"}, status='todo', logo_url=None, api_result=None, err_msg=None, start_time=1727155331909.641, end_time=None, df=None)}
"""
这块用于提取sql语句