DB_GPT excel研究-源码分析

excel文档上传预处理

  1. 刚进excel对话界面需要上传excel文件
  2. 选好文件之后,点击上传
    在这里插入图片描述
  3. 前端抓包,获得请求路径

\DB-GPT-main\dbgpt\app\openapi\api_v1\api_v1.py(方法具体位置)

@router.post("/v1/chat/mode/params/file/load")
async def params_load(
    conv_uid: str,
    chat_mode: str,
    model_name: str,
    user_name: Optional[str] = None,
    sys_code: Optional[str] = None,
    doc_file: UploadFile = File(...),
):
    print(f"params_load: {conv_uid},{chat_mode},{model_name}")
    try:
        if doc_file:
            # Save the uploaded file
            upload_dir = os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, chat_mode)
            os.makedirs(upload_dir, exist_ok=True)
            upload_path = os.path.join(upload_dir, doc_file.filename)
            async with aiofiles.open(upload_path, "wb") as f:
                await f.write(await doc_file.read())

            # Prepare the chat
            dialogue = ConversationVo(
                conv_uid=conv_uid,
                chat_mode=chat_mode,
                select_param=doc_file.filename,
                model_name=model_name,
                user_name=user_name,
                sys_code=sys_code,
            )
			# 有excel_reader参数,读取了excel的内容
            chat: BaseChat = await get_chat_instance(dialogue)
            
            resp = await chat.prepare()
            print('-------------------------')
            rres = Result.succ(get_hist_messages(conv_uid))
            print(rres)
        # Refresh messages
        return rres
    except Exception as e:
        logger.error("excel load error!", e)
        return Result.failed(code="E000X", msg=f"File Load Error {str(e)}")
  1. debug发现chat: BaseChat = await get_chat_instance(dialogue)读取了excel的内容
    在这里插入图片描述
    在这里插入图片描述
    一路debug找到了读取excel文件的关键代码
    D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\chat_data\chat_excel\excel_reader.py
class ExcelReader:
    def __init__(self, file_path):
        file_name = os.path.basename(file_path)
        self.file_name_without_extension = os.path.splitext(file_name)[0]
        encoding, confidence = detect_encoding(file_path)
        logger.info(f"Detected Encoding: {encoding} (Confidence: {confidence})")
        self.excel_file_name = file_name
        self.extension = os.path.splitext(file_name)[1]
        # read excel file
        if file_path.endswith(".xlsx") or file_path.endswith(".xls"):
            #初步读取的 DataFrame,用于检查和处理列名
            df_tmp = pd.read_excel(file_path, index_col=False)
			#使用 converters 参数格式化每列的数据格式
            self.df = pd.read_excel(
                file_path,
                index_col=False,
                converters={i: csv_colunm_foramt for i in range(df_tmp.shape[1])},
            )
        elif file_path.endswith(".csv"):
            df_tmp = pd.read_csv(file_path, index_col=False, encoding=encoding)
            self.df = pd.read_csv(
                file_path,
                index_col=False,
                encoding=encoding,
                # csv_colunm_foramt 可以修改更多,只是针对美元人民币符号,假如是“你好¥¥¥”则会报错!
                converters={i: csv_colunm_foramt for i in range(df_tmp.shape[1])},
            )
        else:
            raise ValueError("Unsupported file format.")

        self.df.replace("", np.nan, inplace=True)

        #进行清洗和处理

        unnamed_columns_tmp = [
            col
            for col in df_tmp.columns
            if col.startswith("Unnamed") and df_tmp[col].isnull().all()
        ]
        df_tmp.drop(columns=unnamed_columns_tmp, inplace=True)

        self.df = self.df[df_tmp.columns.values]
        #

        self.columns_map = {}
        for column_name in df_tmp.columns:
            self.df[column_name] = self.df[column_name].astype(str)
            self.columns_map.update({column_name: excel_colunm_format(column_name)})
            try:
                self.df[column_name] = pd.to_datetime(self.df[column_name]).dt.strftime(
                    "%Y-%m-%d"
                )
            except ValueError:
                try:
                    self.df[column_name] = pd.to_numeric(self.df[column_name])
                except ValueError:
                    try:
                        self.df[column_name] = self.df[column_name].astype(str)
                    except Exception:
                        print("Can't transform column: " + column_name)

        self.df = self.df.rename(columns=lambda x: x.strip().replace(" ", "_"))

        # 连接 DuckDB 数据库
        self.db = duckdb.connect(database=":memory:", read_only=False)

        self.table_name = "excel_data"
        # write data in duckdb
        self.db.register(self.table_name, self.df)

        # 获取结果并打印表结构信息
        result = self.db.execute(f"DESCRIBE {self.table_name}")
        columns = result.fetchall()
        for column in columns:
            print(column)

    def run(self, sql):
        try:
            if f'"{self.table_name}"' in sql:
                sql = sql.replace(f'"{self.table_name}"', self.table_name)
            sql = add_quotes_to_chinese_columns(sql)
            print(f"excute sql:{sql}")
            results = self.db.execute(sql)
            colunms = []
            for descrip in results.description:
                colunms.append(descrip[0])
            return colunms, results.fetchall()
        except Exception as e:
            logger.error(f"excel sql run error!, {str(e)}")
            raise ValueError(f"Data Query Exception!\\nSQL[{sql}].\\nError:{str(e)}")
	#通过 SQL 查询将结果转换为 DataFrame 返回
    def get_df_by_sql_ex(self, sql):
        colunms, values = self.run(sql)
        return pd.DataFrame(values, columns=colunms)

    def get_sample_data(self):
        return self.run(f"SELECT * FROM {self.table_name} LIMIT 5;")
  1. chat: BaseChat = await get_chat_instance(dialogue)获取了一个对话实例
    在这里插入图片描述
    在上传文档的时候是用的下面的提示词模板

D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\chat_data\chat_excel\excel_analyze\prompt.py

_PROMPT_SCENE_DEFINE_ZH = """你是一个数据分析专家!"""
_DEFAULT_TEMPLATE_ZH = """
请使用历史对话中的数据结构信息,在满足下面约束条件下通过duckdb sql数据分析回答用户的问题。
约束条件:
	1.请充分理解用户的问题,使用duckdb sql的方式进行分析, 分析内容按下面要求的输出格式返回,sql请输出在对应的sql参数中
	2.请从如下给出的展示方式种选择最优的一种用以进行数据渲染,将类型名称放入返回要求格式的name参数值种,如果找不到最合适的则使用'Table'作为展示方式,可用数据展示方式如下: {display_type}
	3.SQL中需要使用的表名是: {table_name},请检查你生成的sql,不要使用没在数据结构中的列名
	4.优先使用数据分析的方式回答,如果用户问题不涉及数据分析内容,你可以按你的理解进行回答
	5.输出内容中sql部分转换为:<api-call><name>[数据显示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api- call> 这样的格式,参考返回格式要求
	
请一步一步思考,给出回答,并确保你的回答内容格式如下:
    对用户说的想法摘要.<api-call><name>[数据展示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api-call>

用户问题:{user_input}
"""

还有一个提示词是,在读取了文件内容之后,用另外一个提示词

里面data_example的内容是
def get_sample_data(self):
return self.run(f"SELECT * FROM {self.table_name} LIMIT 5;")
从duckdb取的样例数据

prompt
_DEFAULT_TEMPLATE_ZH = """
下面是用户文件{file_name}的一部分数据,请学习理解该数据的结构和内容,按要求输出解析结果:
    {data_example}
分析各列数据的含义和作用,并对专业术语进行简单明了的解释, 如果是时间类型请给出时间格式类似:yyyy-MM-dd HH:MM:ss.
将列名作为属性名,分析解释作为属性值,组成json数组,并输出在返回json内容的ColumnAnalysis属性中.
请不要修改或者翻译列名,确保和给出数据列名一致.
针对数据从不同维度提供一些有用的分析思路给用户。

请一步一步思考,确保只以JSON格式回答,具体格式如下:
    {response}
"""

_RESPONSE_FORMAT_SIMPLE_ZH = {
    "DataAnalysis": "数据内容分析总结",
    "ColumnAnalysis": [{"column name": "字段1介绍,专业术语解释(请尽量简单明了)"}],
    "AnalysisProgram": ["1.分析方案1", "2.分析方案2"],
}


[ModelMessage(role='system', content='你是一个数据分析专家. \n下面是用户文件故障记录表2.csv的一部分数据,请学习理解该数据的结构和内容,按要求输出解析结果:\n    [["\\u65f6\\u95f4", "\\u673a\\u7ec4", "\\u8bbe\\u5907", "\\u6545\\u969c\\u539f\\u56e0"], ["2024-09-23", "A\\u7ec4", "\\u53d1\\u7535\\u673a1", "\\u77ed\\u8def  "], ["2024-09-23", "B\\u7ec4", "\\u53d8\\u538b\\u56682", "\\u8fc7\\u70ed  "], ["2024-09-23", "C\\u7ec4", "\\u51b7\\u5374\\u7cfb\\u7edf", "\\u6f0f\\u6c34  "], ["2024-09-23", "A\\u7ec4", "\\u63a7\\u5236\\u677f", "\\u8f6f\\u4ef6\\u6545\\u969c  "], ["2024-09-23", "D\\u7ec4", "\\u7535\\u52a8\\u673a3", "\\u8f74\\u627f\\u635f\\u574f  "]]\n分析各列数据的含义和作用,并对专业术语进行简单明了的解释, 如果是时间类型请给出时间格式类似:yyyy-MM-dd HH:MM:ss.\n将列名作为属性名,分析解释作为属性值,组成json数组,并输出在返回json内容的ColumnAnalysis属性中.\n请不要修改或者翻译列名,确保和给出数据列名一致.\n针对数据从不同维度提供一些有用的分析思路给用户。\n\n请一步一步思考,确保只以JSON格式回答,具体格式如下:\n    "{\\n    \\"DataAnalysis\\": \\"数据内容分析总结\\",\\n    \\"ColumnAnalysis\\": [\\n        {\\n            \\"column name\\": \\"字段1介绍,专业术语解释(请尽量简单明了)\\"\\n        }\\n    ],\\n    \\"AnalysisProgram\\": [\\n        \\"1.分析方案1\\",\\n        \\"2.分析方案2\\"\\n    ]\\n}"\n', round_index=0), ModelMessage(role='human', content='故障记录表2.csv', round_index=0)]

然后交给模型处理

D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\base_chat.py

async def _no_streaming_call_with_retry(self, payload):
        with root_tracer.start_span("BaseChat.invoke_worker_manager.generate"):
            model_output = await self.call_llm_operator(payload)

        ai_response_text = self.prompt_template.output_parser.parse_model_nostream_resp(
            model_output, self.prompt_template.sep
        )
        prompt_define_response = (
            self.prompt_template.output_parser.parse_prompt_response(ai_response_text)
        )
        metadata = {
            "model_output": model_output.to_dict(),
            "ai_response_text": ai_response_text,
            "prompt_define_response": self._parse_prompt_define_response(
                prompt_define_response
            ),
        }
        with root_tracer.start_span("BaseChat.do_action", metadata=metadata):
            result = await blocking_func_to_async(
                self._executor, self.do_action, prompt_define_response
            )

        speak_to_user = self.get_llm_speak(prompt_define_response)

        view_message = await blocking_func_to_async(
            self._executor,
            self.prompt_template.output_parser.parse_view_response,
            speak_to_user,
            result,
            prompt_define_response,
        )
        return ai_response_text, view_message.replace("\n", "\\n")

在view_message中又做了格式处理

D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\chat_data\chat_excel\excel_learning\out_parser.py

def parse_view_response(self, speak, data, prompt_response) -> str:
        if data and not isinstance(data, str):
            ### tool out data to table view
            html_title = f"### **Data Summary**\n{data.desciption} "
            html_colunms = self.__build_colunms_html(data.clounms)
            html_plans = self.__build_plans_html(data.plans)

            html = f"""{html_title}\n{html_colunms}\n{html_plans}"""
            return html
        else:
            return speak
  解析完这个html的结果是

html解析结果

### **Data Summary**
这是一个关于用户设备故障的记录表,包含了故障发生的时间、涉及的机群、具体的设备以及故障的原由。 
### **Data Structure**
- **1.[column name]**   _时间(Time)_
- **1.[description]**   _记录了故障发生的日期,格式为年--日(yyyy-MM-dd)_
- **2.[column name]**   _机群(Cluster)_
- **2.[description]**   _故障发生的设备所属的机群,例如A组、B组等_
- **3.[column name]**   _设备(Equipment)_
- **3.[description]**   _发生故障的具体设备,比如发电机组编号、变压器编号等_
- **4.[column name]**   _故障原因(Cause)_
- **4.[description]**   _导致设备故障的具体原因,例如短路、过热、漏水、软件故障、机械故障等_

### **Analysis plans**
1. 分析故障趋势:通过时间(Time)查看故障发生的季节性、周期性规律,以预测可能的故障高发时段。 
2. 机群故障分析:统计各机群(Cluster)的故障频率,找出故障率较高的机群,进行重点维护。 
3. 设备故障分析:按设备(Equipment)类型分析故障频率,识别出故障频发的设备,评估设备可靠性。 
4. 故障原因分类:对故障原因(Cause)进行统计,识别主要故障类型,优化设备设计或改进维护流程。 
5. 故障预防模型:结合历史数据,建立机器学习模型,预测未来可能出现的故障,提前进行预防措施。 
6. 故障响应时间分析:计算从故障发生到记录的时间,评估故障报告的及时性,优化故障响应流程 

最后呈现在页面上的内容
在这里插入图片描述

对话

使用的提示词模板

_DEFAULT_TEMPLATE_ZH = """
请使用历史对话中的数据结构信息,在满足下面约束条件下通过duckdb sql数据分析回答用户的问题。
约束条件:
	1.请充分理解用户的问题,使用duckdb sql的方式进行分析, 分析内容按下面要求的输出格式返回,sql请输出在对应的sql参数中
	2.请从如下给出的展示方式种选择最优的一种用以进行数据渲染,将类型名称放入返回要求格式的name参数值种,如果找不到最合适的则使用'Table'作为展示方式,可用数据展示方式如下: {display_type}
	3.SQL中需要使用的表名是: {table_name},请检查你生成的sql,不要使用没在数据结构中的列名
	4.优先使用数据分析的方式回答,如果用户问题不涉及数据分析内容,你可以按你的理解进行回答
	5.输出内容中sql部分转换为:<api-call><name>[数据显示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api- call> 这样的格式,参考返回格式要求
	
请一步一步思考,给出回答,并确保你的回答内容格式如下:
    对用户说的想法摘要.<api-call><name>[数据展示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api-call>

用户问题:{user_input}
"""

role='system' content="""你是一个数据分析专家!
请使用历史对话中的数据结构信息,
在满足下面约束条件下通过duckdb sql数据分析回答用户的问题。
约束条件:\n\t1.
请充分理解用户的问题,使用duckdb sql的方式进行分析, 分析内容按下面要求的输出格式返回,
sql请输出在对应的sql参数中
\t2.请从如下给出的展示方式种选择最优的一种用以进行数据渲染,
将类型名称放入返回要求格式的name参数值种,如果找不到最合适的则使用'Table'作为展示方式,
可用数据展示方式如下: response_line_chart:used to display comparative trend analysis data
response_pie_chart:suitable for scenarios such as proportion and distribution tatistics
response_table:
suitable for display with many display columns or non-numeric columns
response_scatter_plot:Suitable for exploring relationships between variables, detecting outliers, etc.
response_bubble_chart:Suitable for relationships between multiple variables, 
highlighting outliers or special situations, etc.
response_donut_chart:Suitable for hierarchical structure representation, category proportion display and highlighting key categories, etc.
response_area_chart:Suitable for visualization of time series data, comparison of multiple groups of data, analysis of data change trends, etc.
response_heatmap:Suitable for visual analysis of time series data, large-scale data sets, distribution of classified data, etc.
\t3.SQL中需要使用的表名是: excel_data,请检查你生成的sql,不要使用没在数据结构中的列名
\t4.优先使用数据分析的方式回答,如果用户问题不涉及数据分析内容,你可以按你的理解进行回答
\t5.输出内容中sql部分转换为:<api-call><name>[数据显示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api- call> 这样的格式,参考返回格式要求

请一步一步思考,给出回答,并确保你的回答内容格式如下:
对用户说的想法摘要.<api-call><name>[数据展示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api-call>\n\n用户问题:帮我查询断路器发生了什么故障\n""" round_index=0

D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\openapi\api_v1\api_v1.py

直接进对话框
@router.post("/v1/chat/completions")
async def chat_completions(
    dialogue: ConversationVo = Body(),
    flow_service: FlowService = Depends(get_chat_flow),
):

    #这里是选择的进行流式输出
return StreamingResponse(
                stream_generator(chat, dialogue.incremental, dialogue.model_name),
                headers=headers,
                media_type="text/plain",
            )    
async def stream_generator(chat, incremental: bool, model_name: str):
    """Generate streaming responses

    Our goal is to generate an openai-compatible streaming responses.
    Currently, the incremental response is compatible, and the full response will be transformed in the future.

    Args:
        chat (BaseChat): Chat instance.
        incremental (bool): Used to control whether the content is returned incrementally or in full each time.
        model_name (str): The model name

    Yields:
        _type_: streaming responses
    """
    span = root_tracer.start_span("stream_generator")
    msg = "[LLM_ERROR]: llm server has no output, maybe your prompt template is wrong."

    previous_response = ""
    async for chunk in chat.stream_call():
        if chunk:
            msg = chunk.replace("\ufffd", "")
            if incremental:
                incremental_output = msg[len(previous_response) :]
                choice_data = ChatCompletionResponseStreamChoice(
                    index=0,
                    delta=DeltaMessage(role="assistant", content=incremental_output),
                )
                chunk = ChatCompletionStreamResponse(
                    id=chat.chat_session_id, choices=[choice_data], model=model_name
                )
                json_chunk = model_to_json(
                    chunk, exclude_unset=True, ensure_ascii=False
                )
                yield f"data: {json_chunk}\n\n"
            else:
                # TODO generate an openai-compatible streaming responses
                msg = msg.replace("\n", "\\n")
                yield f"data:{msg}\n\n"
            previous_response = msg
            await asyncio.sleep(0.02)
    if incremental:
        yield "data: [DONE]\n\n"
    span.end()

进行流式回答

D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\base_chat.py

async for chunk in chat.stream_call():

    async def stream_call(self):
        # TODO Retry when server connection error
        payload = await self._build_model_request()

        logger.info(f"payload request: \n{payload}")
        ai_response_text = ""
        span = root_tracer.start_span(
            "BaseChat.stream_call", metadata=payload.to_dict()
        )
        payload.span_id = span.span_id
        try:
            async for output in self.call_streaming_operator(payload):
                # Plugin research in result generation
                msg = self.prompt_template.output_parser.parse_model_stream_resp_ex(
                    output, 0
                )
#模型的输出为
"""
msg = "为了找出断路器发生了哪些故障,我们需要从数据中筛选出所有涉及“断路器”的故障记录,并查看这些记录中的故障原因。
<api-call><name>response_table</name><args><sql>SELECT 故障原因, COUNT(*) AS 故障次数 FROM excel_data WHERE 设备 LIKE '%断路器%' GROUP BY 故障原因</sql></args></api-call>"
"""

				 # 给出答案
				view_msg = self.stream_plugin_call(msg)
"""
在经过处理之后

view_msg = 
为了找出断路器发生了哪些故障,我们需要从数据中筛选出所有涉及“断路器”的故障记录,并查看这些记录中的故障原因。  <chart-view content="{&quot;type&quot;: &quot;response_table&quot;, &quot;sql&quot;: &quot;SELECT 故障原因, COUNT(*) AS 故障次数 FROM excel_data WHERE 设备 LIKE '%断路器%' GROUP BY 故障原因&quot;, &quot;data&quot;: [{&quot;故障原因&quot;: &quot;跳闸  &quot;, &quot;故障次数&quot;: 1}]}">\n</chart-view>
"""
                view_msg = view_msg.replace("\n", "\\n")
                yield view_msg
            self.current_message.add_ai_message(msg)
            view_msg = self.stream_call_reinforce_fn(view_msg)
            self.current_message.add_view_message(view_msg)
            span.end()
        except Exception as e:
            print(traceback.format_exc())
            logger.error("model response parse failed!" + str(e))
            self.current_message.add_view_message(
                f"""<span style=\"color:red\">ERROR!</span>{str(e)}\n  {ai_response_text} """
            )
            ### store current conversation
            span.end(metadata={"error": str(e)})
        await blocking_func_to_async(
            self._executor, self.current_message.end_current_round
        )

D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\chat_data\chat_excel\excel_analyze\chat.py
    def stream_plugin_call(self, text):

        text = (
            text.replace("\\n", " ")
            .replace("\n", " ")
            .replace("\_", "_")
            .replace("\\", " ")
        )
        with root_tracer.start_span(
            "ChatExcel.stream_plugin_call.run_display_sql", metadata={"text": text}
        ):
            res = self.api_call.display_sql_llmvis(
                text, self.excel_reader.get_df_by_sql_ex
            )
            return res

"""
res = self.api_call.display_sql_llmvis(
                text, self.excel_reader.get_df_by_sql_ex
            )
"""
这里对于display_sql_llmvis方法的一个说明
\Code\Code_jupyter\DB-GPT-main\dbgpt\agent\util\api_call.py
def display_sql_llmvis(self, llm_text, sql_run_func):
        """Render charts using the Antv standard protocol.

        Args:
            llm_text: LLM response text
            sql_run_func: sql run  function

        Returns:
           ChartView protocol text
        """
        try:
            if self._is_need_wait_plugin_call(
                llm_text
            ) and self.check_last_plugin_call_ready(llm_text):
                # wait api call generate complete
                self.update_from_context(llm_text)
                for key, value in self.plugin_status_map.items():
                    if value.status == Status.TODO.value:
                        value.status = Status.RUNNING.value
                        logger.info(f"SQL execution:{value.name},{value.args}")
                        try:
                            sql = value.args["sql"]
                            if sql is not None and len(sql) > 0:
                                data_df = sql_run_func(sql)
                                value.df = data_df
                                value.api_result = json.loads(
                                    data_df.to_json(
                                        orient="records",
                                        date_format="iso",
                                        date_unit="s",
                                    )
                                )
                                value.status = Status.COMPLETE.value
                            else:
                                value.status = Status.FAILED.value
                                value.err_msg = "No executable sql!"

                        except Exception as e:
                            logger.error(f"data prepare exception!{str(e)}")
                            value.status = Status.FAILED.value
                            value.err_msg = str(e)
                        value.end_time = datetime.now().timestamp() * 1000
        except Exception as e:
            logger.error("Api parsing exception", e)
            raise ValueError("Api parsing exception," + str(e))

        return self.api_view_context(llm_text, True)  
可以看到在代码中
self.plugin_status_map.items() = 
    {"<api-call><name>response_table</name><args><sql>SELECT 故障原因 FROM excel_data WHERE 设备 LIKE '%断路器%'</sql></args></api-call>": PluginStatus(name='response_table', location=[57], args={'args': None, 'sql': "SELECT 故障原因 FROM excel_data WHERE 设备 LIKE '%断路器%'"}, status='todo', logo_url=None, api_result=None, err_msg=None, start_time=1727155331909.641, end_time=None, df=None)}
    """
这块用于提取sql语句
上一篇:windows环境下使用socket进行tcp通信


下一篇:【JS】在 Node.js 和 Electron 中获取设备 UUID 的最佳实践