之前看wcf服务的时候看到wcf有支持管道通信协议,之前不知道,最近刚好有用到这个,这里写个简单实例
.net有已经封装好的pip通信的对象NamedPipeServerStream 和NamedPipeClientStream对象,底层应该还是调用C++实现的api实现的
对服务端和客户端做个简单的封装方便调用:
server:
public class PipServer:Log
{
public Action<string> ReceiveEvent;
NamedPipeServerStream m_pipServer;
AutoResetEvent monitor = new AutoResetEvent(false);
Thread m_thread;
bool run = true;
string servname; public PipServer(string name)
{
m_pipServer = new NamedPipeServerStream(name,PipeDirection.InOut, , PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
servname = name;
}
public void Listen()
{
try
{
m_thread = new Thread(() =>
{
WaitConnect();
});
m_thread.Start();
}
catch (Exception ex)
{
P(ex, "[PipServer.WaitForConnect]");
}
}
void WaitConnect()
{ AsyncCallback callback = null;
callback = new AsyncCallback(ar =>
{
var pipeServer = (NamedPipeServerStream)ar.AsyncState;
pipeServer.EndWaitForConnection(ar);
Accept();
pipeServer.Disconnect();
m_pipServer.BeginWaitForConnection(callback, m_pipServer);
});
m_pipServer.BeginWaitForConnection(callback, m_pipServer);
} void Accept()
{
try
{ var res = Read();
if(!string.IsNullOrEmpty(res))
ReceiveEvent?.Invoke(res);
}
catch(Exception ex)
{
P(ex, "[PipServer.Accept]");
}
}
public bool Send(string msg)
{
try
{
var buf = Encoding.UTF8.GetBytes(msg);
if (m_pipServer.CanWrite)
{
m_pipServer.Write(buf, , buf.Length);
m_pipServer.Flush();
return true;
}
return false;
}
catch (Exception ex)
{
P(ex, "[PipServer.Send]");
return false;
} } public string Read()
{
try
{
if (m_pipServer.CanRead)
{
int count = ;
List<byte> data = new List<byte>();
byte[] buf = new byte[];
do
{
count=m_pipServer.Read(buf, , buf.Length);
if (count == buf.Length)
{
data.AddRange(buf);
}
else
{
var dst = new byte[count];
Buffer.BlockCopy(buf, , dst, , count);
data.AddRange(dst);
}
} while (count > &&m_pipServer.CanRead);
var res = Encoding.UTF8.GetString(data.ToArray());
return res;
}
return null; }
catch (Exception ex)
{
P(ex, "[PipServer.Read]");
return null;
}
} public void Close()
{
run = false;
m_thread.Join();
if (m_pipServer.IsConnected)
{
m_pipServer.Close();
} }
}
client:
public class PipClient:Log
{ string serv;
public PipClient(string server)
{
serv = server;
}
public bool Send(string msg)
{
try
{
var buf = Encoding.UTF8.GetBytes(msg);
NamedPipeClientStream pipclient = new NamedPipeClientStream(serv);
pipclient.Connect();
if (pipclient.CanWrite)
{
pipclient.Write(buf, , buf.Length);
pipclient.Flush();
pipclient.Close();
return true;
}
return false;
}
catch (Exception ex)
{
P(ex, "[PipClient.Send]");
return false;
}
}
}
log类写了一个简单日志打印类,集成下方便打印日志,可以直接去掉继承,吧日志打印去掉:
public class Log
{
public void L(string msg)
{
Console.WriteLine(msg);
}
public void L(string format, params string[] data)
{
Console.WriteLine(string.Format(format,data));
}
public void P(Exception ex, string format, params string[] data)
{
var msg = string.Format(format, data);
Console.WriteLine(string.Format("{0}:{1},{1}", msg, ex.Message, ex.StackTrace));
}
}
调用实例:
static void PipTest()
{
Thread thread = new Thread(() =>
{
PipServer pip = new PipServer("TEST_PIP");
pip.ReceiveEvent += s =>
{
w(string.Format("receive:{0}",s));
};
pip.Listen();
});
thread.Start(); bool send = true;
int count = ;
AutoResetEvent monitor = new AutoResetEvent(false);
Thread client = new Thread(() =>
{
PipClient ct = new PipClient("TEST_PIP");
while (send)
{
string msg = string.Format("这是第{0}条数据", count);
w(msg);
ct.Send(msg);
count++;
if (monitor.WaitOne())
{
break;
}
}
});
client.Start();
while (true)
{
var input = Console.ReadLine();
if (input == "q" || input == "Q")
{
send = false;
monitor.Set();
break;
}
}
}
运行时,是客户端向服务端每隔一秒发送一次数据
有几个要注意的点:
1 要注意编码方式,怎么编码就怎么解码,最好是要有固定编码,不要直接写string,因为如果是不同的语言和不同平台实现的类,可能default对应的编码方式是不一样的,这样会造成读取乱码
2 这里可以用streamreader来读取,但是不要用readend这种写法,如果发送方不及时调用close方法,这样写会一直卡住,调用flush也没用
3 这里初始化只传入了servername,实际底层的地址是\\\\.\\pipe\\TEST_PIP,调试的时候下个断点可以看到的,如果用C++写的话,直接调用API传入的地址就是全名,到C#这边会自动被解析
4 可以再传入的信息上做一些文章,加上ID,发送方和接收方,这样可以实现类似回调的功能,这个是支持双向通信的,这里只有单向
5 类库是支持同步和异步的,这里是异步的等待连接,同步的读取,但是貌似没有直接支持4.5await写法的方法,只有AsyncCallback的写法