//队列
public class FQueue : IDisposable
{
EventWaitHandle _wh = new AutoResetEvent(false);
Thread _worker;
readonly object _locker = new object(); //这个锁,是用来同步读和写的;集合的读和写的;
Queue<string> _tasks = new Queue<string>(); //一旦队列有了消息就要通知线程去消费;
/// <summary>
/// 运行队列线程
/// </summary>
public FQueue()
{
_worker = new Thread(Work);
_worker.Start();
}
/// <summary>
/// 往线程中添加任务;一旦有了任务,就通知消费者;
/// </summary>
/// <param name="task"></param>
public bool EnqueueTask(string task)
{
bool isOK = false;
lock (_locker)
{
_tasks.Enqueue(task);
isOK = true;
}
_wh.Set();
return isOK;
}
public void Work()
{
NBService nBService = new NBService(); //数据处理类 可以自己自定义存储
string task;
int listIndex = 0;
DateTime dateTime;
while (true)
{
try
{
task = null;
if (_tasks.Count > 0) //队列中有数据,我们就从中取出数据;
{
lock (_locker) //这个锁是用来同步读和写的;
{
task = _tasks.Dequeue();// 取出任务;
}
dateTime = DateTime.Now;
Console.WriteLine(" 出第" + listIndex + "数据,剩余:" + _tasks.Count);
if (task != null)
{
//取出数据后执行处理任务;
nBService.Running(task); //这个方法将数据存入数据库
Console.WriteLine("处理第" + listIndex + "完毕,剩余:" + _tasks.Count + ",用时:" + (DateTime.Now - dateTime).TotalMilliseconds);
}
listIndex++;
}
else
{
_wh.WaitOne(); //队列为空就等待
}
}
catch (Exception ex)
{
Log.Instance.Error(DateTime.Now + " FQueue.Work error:" + ex.Message);
}
}
}
public void Dispose()
{
EnqueueTask(null);
_worker.Join();
_wh.Close();
}
}
//调用
FQueue _nbQueue = new FQueue();
HttpListener listerner = new HttpListener
{
AuthenticationSchemes = AuthenticationSchemes.Anonymous//指定身份验证 Anonymous匿名访问
};
try
{
listerner.Prefixes.Add(ConstValue.Instance.NBHttpURL);
listerner.Start();
}
catch (Exception ex)
{
Console.WriteLine("NBHttpService服务启动失败..." + ex.Message);
}
while (true)
{
try
{
//等待请求连接
//没有请求则GetContext处于阻塞状态
HttpListenerContext ctx = listerner.GetContext();
ctx.Response.StatusCode = 200;//设置返回给客户端http状态代码
//接收POST参数
Stream stream = ctx.Request.InputStream;
StreamReader reader = new StreamReader(stream, Encoding.UTF8);
string data = reader.ReadToEnd();
//Console.WriteLine("收到POST数据:DeviceDataChanged" + (data));
bool addState = _nbQueue.EnqueueTask(data); ; //存入一条任务
//使用Writer输出http响应代码,UTF8格式
using (StreamWriter writer = new StreamWriter(ctx.Response.OutputStream, Encoding.UTF8))
{
writer.Write(addState.ToString().ToLower());
writer.Close();
ctx.Response.Close();
}
}
catch (Exception ex)
{
Log.Instance.Error(" CaiotCA.Service.NBHttpService.GetNBData 异常..." + ex.Message);
}
}