首先是调用接口,需要调用的接口ACCESS_KEY和SECRET_KEY
# coding=utf-8
import os
import qianfan
def gpt(question):
with open("QIANFAN_ACCESS_KEY",'r',encoding='utf-8') as f:
QIANFAN_ACCESS_KEY = f.read()
with open("QIANFAN_SECRET_KEY","r",encoding="utf-8") as f:
QIANFAN_SECRET_KEY = f.read()
os.environ["QIANFAN_ACCESS_KEY"] = QIANFAN_ACCESS_KEY # 公钥
os.environ["QIANFAN_SECRET_KEY"] = QIANFAN_SECRET_KEY # 私钥
chat_robot = qianfan.ChatCompletion()
resp = chat_robot.do(
messages=[{
"role":"user",
"content":question
}]
)
return resp.body['result']
print(gpt('你好'))
然后客户端.py
import wx
from socket import *
import threading
from faker import Faker
class Client(wx.Frame):
# 构造方法
def __init__(self):
# 实例属性
self.name = Faker('zh_CN').name() # 客户端的名字
self.isConnected = False # 客户端是否连接服务器
self.client_socket = None
# 界面布局
wx.Frame.__init__(self,None,title=self.name+"智能问答聊天室客户端",size=(450,660),pos=(100,50))
# 创建面板
self.pl = wx.Panel(self)
# 创建按钮
# 加入聊天室
self.conn_btn = wx.Button(self.pl,label="加入聊天室",pos=(10,10),size=(200,40))
# 离开聊天室
self.dis_conn_btn = wx.Button(self.pl, label="离开聊天室", pos=(220, 10), size=(200, 40))
# 清空按钮
self.clear_btn = wx.Button(self.pl, label="清空", pos=(10, 580), size=(200, 40))
# 发送按钮
self.send_btn = wx.Button(self.pl, label="发送", pos=(220, 580), size=(200, 40))
# 创建聊天内容文本框
self.text = wx.TextCtrl(self.pl,size=(400,400),pos=(10,60),style=wx.TE_READONLY|wx.TE_MULTILINE)
# 创建输入文本框
self.input_text = wx.TextCtrl(self.pl,size=(400,100),pos=(10,470),style=wx.TE_MULTILINE)
# 按钮的事件绑定
self.Bind(wx.EVT_BUTTON,self.clear,self.clear_btn)
self.Bind(wx.EVT_BUTTON, self.conn, self.conn_btn)
self.Bind(wx.EVT_BUTTON, self.dis_conn, self.dis_conn_btn)
self.Bind(wx.EVT_BUTTON, self.send, self.send_btn)
# 点击 加入聊天室 按钮 触发
def conn(self,event):
print('conn方法')
if self.isConnected==False:
self.isConnected=True
self.client_socket=socket()
self.client_socket.connect(('127.0.0.1',8998))
# 发送用户名
self.client_socket.send(self.name.encode('utf8'))
main_thread = threading.Thread(target=self.recv_data)
main_thread.daemon=True
main_thread.start()
def recv_data(self):
print(22222222222)
while self.isConnected:
print(111111111)
text = self.client_socket.recv(1024).decode('utf8')
print(text)
self.text.AppendText(text+'\n')
# 点击 离开聊天室 按钮 触发
def dis_conn(self,event):
print('dis_conn方法')
self.client_socket.send('断开连接'.encode('utf8'))
self.isConnected=False
# 点击 清空 按钮 触发
def clear(self, event):
print('clear方法')
self.input_text.Clear()
# 点击 发送 按钮 触发
def send(self, event):
print('send方法')
if self.isConnected:
text = self.input_text.GetValue()
if text !='':
self.client_socket.send(text.encode('utf8'))
self.input_text.Clear()
# 程序入口
if __name__ =='__main__':
# 创建应用程序对象
app = wx.App()
# 创建客户端窗口
client = Client()
# 显示客户端窗口
client.Show()
# 一直循环显示
app.MainLoop()
最后是服务器.py
import wx
from socket import *
import threading
from concurrent.futures import ThreadPoolExecutor
from 千帆模型 import gpt
class Server(wx.Frame):
def __init__(self):
# 实例属性
self.isOn = False # 服务器的启动状态
# 创建socket对象
self.server_socket = socket()
# 绑定ip和端口号
self.server_socket.bind(('0.0.0.0',8998))
# 监听
self.server_socket.listen(5)
# 保存所有的客户端
self.client_thread_dict={}
# 创建线程池
self.pool = ThreadPoolExecutor(max_workers=10)
# 界面布局
# 调用父类的init方法
wx.Frame.__init__(self,None,title='智能问答聊天室',pos=(0,50),size=(450,600))
# 创建面板
self.pl = wx.Panel(self)
# 创建按钮
# 启动服务器
self.start_server_btn = wx.Button(self.pl,pos=(10,10),size=(200,40),label='启动服务器')
# 保存聊天记录
self.save_text_btn = wx.Button(self.pl, pos=(220, 10), size=(200, 40), label='保存聊天记录')
# 创建聊天内容文本框
self.text = wx.TextCtrl(self.pl, size=(400, 400), pos=(10, 60), style=wx.TE_READONLY | wx.TE_MULTILINE)
# 给按钮绑定事件
self.Bind(wx.EVT_BUTTON,self.start_server,self.start_server_btn)
self.Bind(wx.EVT_BUTTON, self.save_text, self.save_text_btn)
# 启动服务器
def start_server(self,event):
print('start server')
if self.isOn==False:
self.isOn=True
# 创建线程
main_thread = threading.Thread(target=self.main_thread_fun)
# 设置为守护线程
main_thread.daemon = True
# 启动线程
main_thread.start()
def main_thread_fun(self):
while self.isOn:
client_socket,client_addr=self.server_socket.accept()
print(client_addr)
client_name = client_socket.recv(1024).decode('utf8')
print(client_name)
client_thread = ClientThead(client_socket,client_name,self)
# 保存客户端
self.client_thread_dict[client_name] = client_thread
self.pool.submit(client_thread.run)
# self.send("【服务器通知】欢迎%s进入聊天室"%client_name)
def send(self,text):
self.text.AppendText(text+'\n')
for client in self.client_thread_dict.values():
if client.isOn:
print(text)
result = gpt(text)
print(result)
client.client_socket.send(result.encode('utf8'))
self.text.AppendText(result + '\n')
# 保存聊天记录
def save_text(self,event):
print('save text')
record = self.text.GetValue()
with open('record.log',"a+",encoding='GBK') as f:
f.write(record)
class ClientThead(threading.Thread):
def __init__(self,socket,name,server):
threading.Thread.__init__(self)
self.client_socket = socket
self.client_name = name
self.server = server
self.isOn = True
def run(self):
while self.isOn:
text = self.client_socket.recv(1024).decode('utf8')
if text =='断开连接':
self.isOn = False
# self.server.send('【服务器消息】%s离开了聊天室'%self.client_name)
else:
self.server.send('%s'%(text))
self.client_socket.close()
# 程序入口
if __name__=="__main__":
# 创建应用程序对象
app = wx.App()
# 创建服务器窗口
server = Server()
# 显示服务器窗口
server.Show()
# 循环显示
app.MainLoop()