Erlang聊天室

Erlang 聊天室

因为之前有使用过PyQt做过聊天室的功能,现在使用Erlang去重写聊天室,去了解这门语言,有兴趣的朋友可以写基于Qt客户端和Erlang的服务端去交互,本Demo统一使用Erlang去实现客户端。

所需知识

  • 网络编程中接受请求的套接字创建过程如下(四步记忆)
    • 第一步:调用socket函数
    • 第二步:调用bind函数分配IP地址和端口
    • 第三步:调用listen函数转为可接收请求状态
    • 第四步:调用accept函数受理了解请求

服务端监听Socket

  • 第一部分初始化ets表,可选参数可*选择
start_server() ->
	ets:new(id, [ordered_set, public, named_table, {write_concurrency, true}, {read_concurrency, true}]),
	case gen_tcp:listen(1234, [binary, {packet, 0}, {active, true}]) of
		{ok, ListenSocket} ->
			spawn(fun() -> client_connect(ListenSocket) end);
		{error, Reason} ->
			io:format("~p~n", [Reason])
	end.
  • 第二部分处理请求
    • 这部分要注意gen_tcp:accept()函数,这部分会处理本次请求才会去处理新的请求,即顺序处理,为了解决这个问题,使用了并发处理,每次连接都会开一个新进程去处理,这样就可以并发处理了。
client_connect(ListenSocket) ->
	case gen_tcp:accept(ListenSocket) of
		{ok, Socket} ->
			%% 进行验证,看是否是注册还是登录
			spawn(fun() -> client_connect(ListenSocket) end),
			loop(Socket);
		{error, Reason} ->
			io:format("~p~n", [Reason])
	end.
  • 第三部分While True循环
    • 这部分分别去处理客户端的发送信息,如注册,登录,退出,私聊,群聊。
loop(Socket) ->
	receive
		{tcp, Socket, Bin} ->
			[Id, Sign, PassWord, SendId, MessageInfos] = binary_to_term(Bin),
			if
				Sign =:= register_user ->
					Info = register_user(Id, PassWord, Socket),
					gen_tcp:send(Socket, term_to_binary(Info)),
					loop(Socket);
				Sign =:= login_user ->
					Info = login_user(Id, PassWord, Socket),
					gen_tcp:send(Socket, term_to_binary(Info)),
					loop(Socket);
				Sign =:= login_out ->
					Info = login_out(Id, Socket),
					gen_tcp:send(Socket, term_to_binary(Info)),
					loop(Socket);
				Sign =:= private_msg ->
					private_chat(SendId, Socket, MessageInfos),
					loop(Socket);
				Sign =:= group_msg ->
					group_chat(Socket, MessageInfos),
					loop(Socket);
				true ->
					io:format("error sign ~n"),
					loop(Socket)
			end;
		{tcp_closed, Socket} ->
			io:format("Server socket closed ~n")
	end.
  • 功能部分

    • 用户注册
    • 使用ets库的lookup函数去检查用户是否已经注册
    %% 用户注册
    register_user(Id, PassWord, Socket) ->
    	case ets:lookup(id, Id) of
    		[_Ok] ->
    			io:format("Account is fail ~n"),
    			"Account is exist ~n";
    		_ ->
    			ets:insert(id, {Id, PassWord, 0, Socket}),
    			"register successed ~n"
    	end.
    
    • 用户登录
    • 首先使用ets库的match_object函数的去检查用户登录情况,这里其实写详细点,就是有三种可能,用户账号不存在,密码错误,用户已经在线。同时如果登录成功,应该设置用户为在线状态和给用户分配socket。
    %% 用户登录
    login_user(Id, PassWord, Socket) ->
    	case ets:match_object(id, {Id, PassWord, 0, Socket}) of
    		[_Ok] ->
    			ets:update_element(id, Id, [{3, 1}, {4, Socket}]),
    			"login successed";
    		Reson ->
    			io:format("login is fail ~n ~p", [Reson]),
    			"Password error or Account is not exist ~n"
    	end.
    
    • 用户退出
    • 退出用户,首先判断用户已经在线,才能下线,同时把在线状态和socket设置为0。
    %% 退出用户
    login_out(Id, Socket) ->
    	%% 因为id对应唯一socket,所以不需要PassWord
    	case ets:match_object(id, {Id, '_', 1, Socket}) of
    		[_Ok] ->
    			ets:update_element(id, Id, [{3, 0}, {4, 0}]),
    			"login successed";
    		_ ->
    			io:format("out is fail ~n"),
    			"login is fail"
    	end.
    
    • 群聊
    • 首先判断发送用户是否在线,然后递归给在线用户发送信息。
    %% 群聊
    group_chat(Socket, MessageInfos) ->
    	case ets:match_object(id, {'_', '_', 1, Socket}) of
    		[{Id, _, _, _}] ->
    			Res = ets:match_object(id, {'_', '_', 1, '_'}),
    			case Res =:= [] of
    				true ->
    					io:format("no person online ~p ~n", [Res]);
    				_ ->
    					group_send_msg(Res, Id, MessageInfos)
    			end;
    		_ ->
    			io:format("group chat is fail ~n")
    	end.
    
    
    %% 群聊发送
    group_send_msg([], _Id, _MessageInfos) ->
    	next;
    group_send_msg([Info | Infos], Id, MessageInfos) ->
    	{_, _, _, Socket} = Info,
    	gen_tcp:send(Socket, term_to_binary("from: " ++ integer_to_list(Id) ++ "say: " ++ MessageInfos)),
    	group_send_msg(Infos, Id, MessageInfos).
    
    • 在线私聊
    • 由于没有设置字段去存取聊天记录,所以使用只有在线状态下的聊天。
    %% 在线私聊
    private_chat(SendId, Socket, MessageInfos) ->
    	case ets:match_object(id, {'_', '_', 1, Socket}) of
    		[{Id, _, _, _}] ->
    			Res = ets:match_object(id, {SendId, '_', 1, '_'}),
    			case Res =:= [] of
    				true ->
    					io:format("send person not online ~p ~n", [Res]);
    				_ ->
    					private_send_msg(Res, Id, MessageInfos)
    			end;
    		_ ->
    			io:format("private chat is fail ~n")
    	end.
    
    %% 私聊发送
    private_send_msg([Info], Id, MessageInfos) ->
    	{_, _, _, Socket} = Info,
    	gen_tcp:send(Socket, term_to_binary("from: " ++ integer_to_list(Id) ++ "say: " ++ MessageInfos)).
    

客户端实现

  • 客户端进程
  • 并把socket控制进程设置为loop的进程号
%客户端
start_client() ->
	{ok, Socket} = gen_tcp:connect("localhost", 1234, [binary, {packet, 0}]),  %连接服务器
	%新建一个进程负责接收消息
	Pid = spawn(fun() -> loop() end),
	gen_tcp:controlling_process(Socket, Pid),
	sendMsg(Socket).

loop() ->
	receive
		{tcp, _Socket, Bin} ->
			Res = binary_to_term(Bin),
			io:format("Message Info! ~p ~n", [Res]),
			loop();
		{tcp_closed, _Socket} ->
			io:format("Socket is closed! ~p ~n")
	end.
  • 客户端操作
  • 对应服务端的功能部分
sendMsg(Socket) ->
	S = io:get_line("select operation: "),
	{Sign, _Info} = string:to_integer(S),
	SendMsg = operation_message(Sign),
	gen_tcp:send(Socket, term_to_binary(SendMsg)),
	sendMsg(Socket).

%% 用户注册
operation_message(1) ->
	I = io:get_line("id: "),
	{Id, _Info} = string:to_integer(I),
	Password = io:get_line("register password: "),
	[Id, register_user, Password, 0, 0];
%% 用户登录
operation_message(2) ->
	I = io:get_line("id:"),
	Password = io:get_line("login password: "),
	{Id, _Info} = string:to_integer(I),
	[Id, login_user, Password, 0, 0];
%% 用户退出
operation_message(3) ->
	I = io:get_line("id: "),
	{Id, _Info} = string:to_integer(I),
	[Id, login_out, 0, 0, 0];
%% 私聊
operation_message(4) ->
	Sd = io:get_line("send_id: "),
	Msg = io:get_line("MsgInfo: "),
	{SendId, _Info} = string:to_integer(Sd),
	[0, private_msg, 0, SendId, Msg];
%% 群聊
operation_message(5) ->
	Msg = io:get_line("MsgInfo: "),
	[0, group_msg, 0, 0, Msg];
%% 无效操作
operation_message(_) ->
	Msg = io:format("invalid_operation ~n"),
	[0, invalid_operation, 0, 0, Msg].

完整源代码

  • 服务端
%%%-------------------------------------------------------------------
%%% @author Curry
%%% @copyright (C) 2021, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 08. 8月 2021 13:33
%%%-------------------------------------------------------------------
-module(chatserv).
-author("Curry").

%% API
-compile(export_all).
-import(ets, [insert_new/2]).

start_server() ->
	ets:new(id, [ordered_set, public, named_table, {write_concurrency, true}, {read_concurrency, true}]),
	case gen_tcp:listen(1234, [binary, {packet, 0}, {active, true}]) of
		{ok, ListenSocket} ->
			spawn(fun() -> client_connect(ListenSocket) end);
		{error, Reason} ->
			io:format("~p~n", [Reason])
	end.


client_connect(ListenSocket) ->
	case gen_tcp:accept(ListenSocket) of
		{ok, Socket} ->
			%% 进行验证,看是否是注册还是登录
			spawn(fun() -> client_connect(ListenSocket) end),
			loop(Socket);
		{error, Reason} ->
			io:format("~p~n", [Reason])
	end.

loop(Socket) ->
	receive
		{tcp, Socket, Bin} ->
			[Id, Sign, PassWord, SendId, MessageInfos] = binary_to_term(Bin),
			if
				Sign =:= register_user ->
					Info = register_user(Id, PassWord, Socket),
					gen_tcp:send(Socket, term_to_binary(Info)),
					loop(Socket);
				Sign =:= login_user ->
					Info = login_user(Id, PassWord, Socket),
					gen_tcp:send(Socket, term_to_binary(Info)),
					loop(Socket);
				Sign =:= login_out ->
					Info = login_out(Id, Socket),
					gen_tcp:send(Socket, term_to_binary(Info)),
					loop(Socket);
				Sign =:= private_msg ->
					private_chat(SendId, Socket, MessageInfos),
					loop(Socket);
				Sign =:= group_msg ->
					group_chat(Socket, MessageInfos),
					loop(Socket);
				true ->
					io:format("error sign ~n"),
					loop(Socket)
			end;
		{tcp_closed, Socket} ->
			io:format("Server socket closed ~n")
	end.

%% 用户注册
register_user(Id, PassWord, Socket) ->
	case ets:lookup(id, Id) of
		[_Ok] ->
			io:format("Account is fail ~n"),
			"Account is exist ~n";
		_ ->
			ets:insert(id, {Id, PassWord, 0, Socket}),
			"register successed ~n"
	end.

%% 用户登录
login_user(Id, PassWord, Socket) ->
	case ets:match_object(id, {Id, PassWord, 0, Socket}) of
		[_Ok] ->
			ets:update_element(id, Id, [{3, 1}, {4, Socket}]),
			"login successed";
		Reson ->
			io:format("login is fail ~n ~p", [Reson]),
			"Password error or Account is not exist ~n"
	end.

%% 退出用户
login_out(Id, Socket) ->
	%% 因为id对应唯一socket,所以不需要PassWord
	case ets:match_object(id, {Id, '_', 1, Socket}) of
		[_Ok] ->
			ets:update_element(id, Id, [{3, 0}, {4, 0}]),
			"login successed";
		_ ->
			io:format("out is fail ~n"),
			"login is fail"
	end.

%% 群聊
group_chat(Socket, MessageInfos) ->
	case ets:match_object(id, {'_', '_', 1, Socket}) of
		[{Id, _, _, _}] ->
			Res = ets:match_object(id, {'_', '_', 1, '_'}),
			case Res =:= [] of
				true ->
					io:format("no person online ~p ~n", [Res]);
				_ ->
					group_send_msg(Res, Id, MessageInfos)
			end;
		_ ->
			io:format("group chat is fail ~n")
	end.


%% 群聊发送
group_send_msg([], _Id, _MessageInfos) ->
	next;
group_send_msg([Info | Infos], Id, MessageInfos) ->
	{_, _, _, Socket} = Info,
	gen_tcp:send(Socket, term_to_binary("from: " ++ integer_to_list(Id) ++ "say: " ++ MessageInfos)),
	group_send_msg(Infos, Id, MessageInfos).

%% 在线私聊
private_chat(SendId, Socket, MessageInfos) ->
	XX = ets:match_object(id, {'_', '_', 1, Socket}),
	case ets:match_object(id, {'_', '_', 1, Socket}) of
		[{Id, _, _, _}] ->
			Res = ets:match_object(id, {SendId, '_', 1, '_'}),
			case Res =:= [] of
				true ->
					io:format("send person not online ~p ~n", [Res]);
				_ ->
					private_send_msg(Res, Id, MessageInfos)
			end;
		_ ->
			io:format("private chat is fail ~n")
	end.

%% 私聊发送
private_send_msg([Info], Id, MessageInfos) ->
	{_, _, _, Socket} = Info,
	gen_tcp:send(Socket, term_to_binary("from: " ++ integer_to_list(Id) ++ "say: " ++ MessageInfos)).
  • 客户端
%%%-------------------------------------------------------------------
%%% @author Curry
%%% @copyright (C) 2021, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 08. 8月 2021 14:03
%%%-------------------------------------------------------------------
-module(chatclient).
-author("Curry").

%% API
-compile(export_all).

%客户端
start_client() ->
	{ok, Socket} = gen_tcp:connect("localhost", 1234, [binary, {packet, 0}]),  %连接服务器
	%新建一个进程负责接收消息
	Pid = spawn(fun() -> loop() end),
	gen_tcp:controlling_process(Socket, Pid),
	sendMsg(Socket).

loop() ->
	receive
		{tcp, _Socket, Bin} ->
			Res = binary_to_term(Bin),
			io:format("Message Info! ~p ~n", [Res]),
			loop();
		{tcp_closed, _Socket} ->
			io:format("Socket is closed! ~p ~n")
	end.

sendMsg(Socket) ->
	S = io:get_line("select operation: "),
	{Sign, _Info} = string:to_integer(S),
	SendMsg = operation_message(Sign),
	gen_tcp:send(Socket, term_to_binary(SendMsg)),
	sendMsg(Socket).

%% 用户注册
operation_message(1) ->
	I = io:get_line("id: "),
	{Id, _Info} = string:to_integer(I),
	Password = io:get_line("register password: "),
	[Id, register_user, Password, 0, 0];
%% 用户登录
operation_message(2) ->
	I = io:get_line("id:"),
	Password = io:get_line("login password: "),
	{Id, _Info} = string:to_integer(I),
	[Id, login_user, Password, 0, 0];
%% 用户退出
operation_message(3) ->
	I = io:get_line("id: "),
	{Id, _Info} = string:to_integer(I),
	[Id, login_out, 0, 0, 0];
%% 私聊
operation_message(4) ->
	Sd = io:get_line("send_id: "),
	Msg = io:get_line("MsgInfo: "),
	{SendId, _Info} = string:to_integer(Sd),
	[0, private_msg, 0, SendId, Msg];
%% 群聊
operation_message(5) ->
	Msg = io:get_line("MsgInfo: "),
	[0, group_msg, 0, 0, Msg];
%% 无效操作
operation_message(_) ->
	Msg = io:format("invalid_operation ~n"),
	[0, invalid_operation, 0, 0, Msg].
  • Github源码链接
上一篇:运维技术-中间件-消息队列-RabbitMQ01


下一篇:win10安装rabbitmq流程