之前在做并行数据处理时,效果往往不理想,不管是C#还是Java程序都一样;但是在Python中通过multiprocessing实现同样的功能时,却发现确实可以提高程序的运行性能,并提高服务器资源的利用率。Python的具体性能及multiprocessing用法,请参考:《Python:使用pymssql批量插入csv文件到数据库测试》
如有转载请标明原文地址:http://www.cnblogs.com/yy3b2007com/p/7228337.html
很久之前就设想如何在c#中实现多进程的方案,如今终于设计出了C#中动态实现多进程的方案:
具体代码:
using Microsoft.CSharp;
using System;
using System.CodeDom;
using System.CodeDom.Compiler;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;

namespace MutilProcessDemo
{
    /// <summary>
    /// Demo host for "multi-process" execution in C#: at start-up it compiles a small
    /// worker executable (HelloWorld.exe) whose entry point calls back into
    /// <see cref="DoMethod"/>, then uses a timer to keep launching worker processes
    /// up to a fixed limit.
    /// </summary>
    public class Program
    {
        /// <summary>Process name used to count running workers (executable name without extension).</summary>
        private const string WorkerProcessName = "HelloWorld";

        /// <summary>File name of the generated worker executable.</summary>
        private const string WorkerExeName = "HelloWorld.exe";

        // NOTE(review): the numeric literals behind the four constants below were missing
        // from the listing this code was restored from. The values are placeholders chosen
        // only so that the demo compiles and terminates - confirm them against the intended
        // configuration.

        /// <summary>Timer period in milliseconds (placeholder value).</summary>
        private const double TimerIntervalMs = 1000;

        /// <summary>Maximum number of worker processes allowed to run at once (placeholder value).</summary>
        private const int MaxProcessCount = 10;

        /// <summary>Upper bound for the random iteration count handed to a worker (placeholder value).</summary>
        private const int MaxWorkerIterations = 10;

        /// <summary>Sleep per worker iteration in milliseconds (placeholder value).</summary>
        private const int SleepPerIterationMs = 1000;

        static System.Timers.Timer timer = null;

        /// <summary>
        /// Generates the worker executable, starts the scheduling timer and blocks until a key is pressed.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        public static void Main(string[] args)
        {
            GenerateExe();

            timer = new System.Timers.Timer();
            timer.AutoReset = true;
            timer.Interval = TimerIntervalMs;
            // Attach the handler before enabling the timer so no tick can fire without a subscriber.
            timer.Elapsed += Timer_Elapsed;
            timer.Enabled = true;

            Console.WriteLine("Press Enter key to exit...");
            Console.ReadKey();
        }

        /// <summary>
        /// Compiles the source returned by <see cref="GenerateCode"/> into an in-memory assembly,
        /// then instantiates DynamicCodeGenerate.HelloWorld via reflection and prints the result
        /// of its OutPut method. Waits for a line of input before returning.
        /// </summary>
        private static void DynamicCompiler()
        {
            // CodeDomProvider exposes the Compile* methods directly; ICodeCompiler /
            // CreateCompiler() are obsolete.
            using (CSharpCodeProvider provider = new CSharpCodeProvider())
            {
                CompilerParameters parameters = new CompilerParameters();
                parameters.ReferencedAssemblies.Add("System.dll");
                parameters.GenerateExecutable = false;
                parameters.GenerateInMemory = true;

                CompilerResults cr = provider.CompileAssemblyFromSource(parameters, GenerateCode());

                if (cr.Errors.HasErrors)
                {
                    Console.WriteLine("编译错误:");
                    foreach (CompilerError err in cr.Errors)
                    {
                        Console.WriteLine(err.ErrorText);
                    }
                }
                else
                {
                    // Invoke the compiled HelloWorld type through reflection.
                    Assembly compiled = cr.CompiledAssembly;
                    object helloWorld = compiled.CreateInstance("DynamicCodeGenerate.HelloWorld");
                    MethodInfo outPut = helloWorld.GetType().GetMethod("OutPut");

                    Console.WriteLine(outPut.Invoke(helloWorld, null));
                }
            }

            Console.ReadLine();
        }

        /// <summary>
        /// Builds the C# source of the DynamicCodeGenerate.HelloWorld class, echoes it to the
        /// console followed by a blank line, and returns it.
        /// </summary>
        /// <returns>The generated source code.</returns>
        static string GenerateCode()
        {
            StringBuilder sb = new StringBuilder();
            sb.AppendLine("using System;");
            sb.AppendLine("namespace DynamicCodeGenerate");
            sb.AppendLine("{");
            sb.AppendLine(" public class HelloWorld");
            sb.AppendLine(" {");
            sb.AppendLine(" public string OutPut()");
            sb.AppendLine(" {");
            sb.AppendLine(" return \"Hello world!\";");
            sb.AppendLine(" }");
            sb.AppendLine(" }");
            sb.Append("}");

            string code = sb.ToString();
            Console.WriteLine(code);
            Console.WriteLine();

            return code;
        }

        /// <summary>
        /// Timer callback: if fewer than <see cref="MaxProcessCount"/> workers are running,
        /// launches one worker per free slot and waits for all of them to exit.
        /// </summary>
        private static void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
        {
            int running = CountWorkerProcesses();

            // Do not start new workers once the limit has been reached.
            if (running >= MaxProcessCount)
            {
                return;
            }

            int freeSlots = MaxProcessCount - running;
            Random random = new Random();
            System.Threading.Tasks.Task[] taskItems = new System.Threading.Tasks.Task[freeSlots];

            for (int i = 0; i < freeSlots; i++)
            {
                // Each worker receives its iteration count as its single command-line argument.
                int iterations = random.Next(1, MaxWorkerIterations + 1);
                taskItems[i] = System.Threading.Tasks.Task.Factory.StartNew(new Action<object>(RunWorker), iterations);
            }

            System.Threading.Tasks.Task.WaitAll(taskItems);
        }

        /// <summary>Counts running worker processes and releases the handles obtained for counting.</summary>
        private static int CountWorkerProcesses()
        {
            System.Diagnostics.Process[] processes = System.Diagnostics.Process.GetProcessesByName(WorkerProcessName);
            foreach (System.Diagnostics.Process process in processes)
            {
                process.Dispose();
            }

            return processes.Length;
        }

        /// <summary>Starts one worker process with <paramref name="state"/> as its argument and waits for it to exit.</summary>
        private static void RunWorker(object state)
        {
            using (System.Diagnostics.Process process = new System.Diagnostics.Process())
            {
                process.StartInfo = new System.Diagnostics.ProcessStartInfo
                {
                    FileName = WorkerExeName,
                    Arguments = state.ToString(),
                    CreateNoWindow = true,
                    UseShellExecute = false,
                    WindowStyle = System.Diagnostics.ProcessWindowStyle.Hidden
                };

                process.Start();
                process.WaitForExit();
            }
        }

        /// <summary>
        /// Compiles the worker executable (HelloWorld.exe). The worker's Main calls
        /// <see cref="DoMethod"/> in this assembly, so this assembly is added as a reference.
        /// Prints the output location on success, or the compiler messages otherwise.
        /// </summary>
        private static void GenerateExe()
        {
            using (CSharpCodeProvider provider = new CSharpCodeProvider())
            {
                // Compiler settings.
                CompilerParameters options = new CompilerParameters();
                options.ReferencedAssemblies.Add("System.dll");
                // Reference the host by its real location rather than a hard-coded file name,
                // so compilation does not depend on the working directory or the file name.
                options.ReferencedAssemblies.Add(typeof(Program).Assembly.Location);
                options.GenerateExecutable = true;
                options.OutputAssembly = WorkerExeName;

                // Further options can be set here if needed, e.g. extra references
                // (System.Windows.Forms.dll), embedded resources, or CompilerOptions such as
                // " /target:winexe", " /res:Resource1.res", " /win32icon:test.ico".
                //
                // The source could also be built as a CodeCompileUnit with CodeDom
                // (CodeNamespace / CodeTypeDeclaration / CodeEntryPointMethod); here it is
                // supplied directly as a source string.
                string code = @"
using System;
using MutilProcessDemo; namespace Samples
{
public class Class1
{
static void Main(string[] args)
{
Console.WriteLine(""Hello, World!"");
MutilProcessDemo.Program.DoMethod(args);
Console.WriteLine(DateTime.Now.ToString());
}
}
}";
                CodeSnippetCompileUnit codeSnippetCompileUnit = new CodeSnippetCompileUnit(code);

                // Compile.
                CompilerResults compilerResults = provider.CompileAssemblyFromDom(options, codeSnippetCompileUnit);

                // Report the outcome.
                if (compilerResults.Errors.Count == 0)
                {
                    Console.WriteLine("{0} compiled ok!", compilerResults.CompiledAssembly.Location);
                }
                else
                {
                    Console.WriteLine("Complie Error:");
                    foreach (CompilerError error in compilerResults.Errors)
                    {
                        Console.WriteLine(" {0}", error);
                    }
                }
            }
        }

        /// <summary>
        /// Work performed by a worker process: prints a begin marker, sleeps once per
        /// iteration, then prints an end marker.
        /// </summary>
        /// <param name="args">Worker arguments; <c>args[0]</c> is the iteration count as an integer string.</param>
        /// <exception cref="ArgumentException"><paramref name="args"/> is null or empty.</exception>
        /// <exception cref="FormatException"><c>args[0]</c> is not a valid integer.</exception>
        public static void DoMethod(string[] args)
        {
            if (args == null || args.Length == 0)
            {
                throw new ArgumentException("The iteration count must be passed as the first argument.", "args");
            }

            System.Console.WriteLine("begin ..." + args[0]);

            // Parse once instead of on every loop-condition check.
            int iterations = int.Parse(args[0]);
            for (int i = 0; i < iterations; i++)
            {
                System.Threading.Thread.Sleep(SleepPerIterationMs);
            }

            System.Console.WriteLine("end..." + args[0]);
        }
    }
}
上边的程序运行之后会在\MutilProcessDemo\bin\Debug\下出现两个可执行的.exe:HelloWorld.exe、MutilProcessDemo.exe。
实战:
/// <summary>
/// Real-world variant of the multi-process demo: compiles a worker executable
/// (HelloWorld.exe) that runs the grid inner-door analysis for a comma-separated list of
/// building ids, then feeds it batches of ids while keeping the number of concurrently
/// running worker processes below a fixed limit.
/// </summary>
public class Program_ForInsertATU : BaseDao
{
    /// <summary>Process name used to count running workers (executable name without extension).</summary>
    private const string WorkerProcessName = "HelloWorld";

    /// <summary>File name of the generated worker executable.</summary>
    private const string WorkerExeName = "HelloWorld.exe";

    /// <summary>Number of building ids handed to one worker; matches the "*25" in the progress message.</summary>
    private const int TaskBatchSize = 25;

    // NOTE(review): the numeric literals behind the three constants below were missing from
    // the listing this code was restored from. The values are placeholders - confirm them
    // against the intended configuration.

    /// <summary>Maximum number of worker processes allowed to run at once (placeholder value).</summary>
    private const int MaxProcessCount = 10;

    /// <summary>Delay in milliseconds before re-checking when all worker slots are busy (placeholder value).</summary>
    private const int CapacityPollDelayMs = 1000;

    /// <summary>
    /// Delay in milliseconds after submitting batches, giving the new processes time to
    /// start before running workers are counted again (placeholder value).
    /// </summary>
    private const int LaunchSettleDelayMs = 1000;

    static readonly BuildingsBO buildBO = new BuildingsBO();
    static readonly FingerPrintDatabaseBO fingerPrintDatabaseBO = new FingerPrintDatabaseBO();
    static readonly GridMappingBuildingBO gridMappingBuildingBO = new GridMappingBuildingBO();
    static readonly SiteBO siteBO = new SiteBO();
    static NLog.Logger logger = new NLog.LogFactory().GetCurrentClassLogger();

    /// <summary>
    /// Compiles the worker executable (HelloWorld.exe). Prints the output location on
    /// success, or the compiler messages otherwise.
    /// </summary>
    private static void GenerateExe()
    {
        // CodeDomProvider exposes the Compile* methods directly; ICodeCompiler /
        // CreateCompiler() are obsolete.
        using (CSharpCodeProvider provider = new CSharpCodeProvider())
        {
            // Compiler settings.
            CompilerParameters options = new CompilerParameters();
            options.ReferencedAssemblies.Add("System.dll");
            options.ReferencedAssemblies.Add("System.Data.Linq.dll");
            options.ReferencedAssemblies.Add("System.Data.dll");
            options.ReferencedAssemblies.Add("System.Core.dll");

            options.ReferencedAssemblies.Add("Dx.Business.dll");
            options.ReferencedAssemblies.Add("Dx.Model.dll");
            options.ReferencedAssemblies.Add("Dx.Utility.dll");
            options.ReferencedAssemblies.Add("Dx.Configuration.dll");
            options.ReferencedAssemblies.Add("Dx.IDAL.dll");
            options.ReferencedAssemblies.Add("Dx.DataAccess.dll");
            options.ReferencedAssemblies.Add("Dx.Data.dll");
            options.ReferencedAssemblies.Add("Dx.Framework.dll");
            options.ReferencedAssemblies.Add("ICSharpCode.SharpZipLib.dll");
            options.ReferencedAssemblies.Add("Microsoft.CSharp.dll");

            options.EmbeddedResources.Add("HelloWorld.exe.config");
            // Optional CompilerOptions, e.g. " /target:winexe", " /res:Resource1.res",
            // " /win32icon:test.ico".

            options.GenerateExecutable = true;
            options.OutputAssembly = WorkerExeName;

            // Worker source, supplied directly as a string.
            string code = @"
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.Data;
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks; using Dx.Data;
using Dx.Business.Signal;
using Dx.Model.FingerprintDatabase;
using Dx.Business;
using Dx.Model;
using Dx.Utility;
using Dx.Configuration;
using Dx.Business.FingerprintDatabase.BO;
using Dx.Business.ATU.Parse;
using Dx.Framework.Utils;
using System.Text.RegularExpressions;
using ICSharpCode.SharpZipLib.GZip;
using System.Diagnostics;
using Dx.Business.Scheme;
using Dx.Business.ATU;
using Dx.Business.SiChuan;
using Dx.Model.SiChuan;
using System.CodeDom;
using System.CodeDom.Compiler;
using Microsoft.CSharp; namespace Samples
{
public class Class1
{
static void Main(string[] args)
{
ConfigurationManger.Init(ConfigurationType.WindowsService);
EnumsRegister.Register(); Dx.Business.SiChuan.GridInnerDoorAnalysis.GridInnerDoorAnalysis bo=new Dx.Business.SiChuan.GridInnerDoorAnalysis.GridInnerDoorAnalysis();
bo.ComputeGridInnerDoor(args[0]);
}
}
}";
            CodeSnippetCompileUnit codeSnippetCompileUnit = new CodeSnippetCompileUnit(code);

            // Compile.
            CompilerResults compilerResults = provider.CompileAssemblyFromDom(options, codeSnippetCompileUnit);

            // Report the outcome.
            if (compilerResults.Errors.Count == 0)
            {
                Console.WriteLine("{0} compiled ok!", compilerResults.CompiledAssembly.Location);
            }
            else
            {
                Console.WriteLine("Complie Error:");
                foreach (CompilerError error in compilerResults.Errors)
                {
                    Console.WriteLine(" {0}", error);
                }
            }
        }
    }

    /// <summary>
    /// Launches one worker process per entry of the given list, passing the entry as the
    /// worker's command-line argument, and blocks until every worker has exited.
    /// </summary>
    /// <param name="state">A <c>List&lt;string&gt;</c> of worker arguments.</param>
    /// <exception cref="ArgumentException"><paramref name="state"/> is not a <c>List&lt;string&gt;</c>.</exception>
    public static void SubmitTask(object state)
    {
        List<string> ids = state as List<string>;
        if (ids == null)
        {
            throw new ArgumentException("state must be a List<string> of worker arguments.", "state");
        }

        System.Threading.Tasks.Task[] taskItems = new System.Threading.Tasks.Task[ids.Count];

        for (int i = 0; i < ids.Count; i++)
        {
            taskItems[i] = System.Threading.Tasks.Task.Factory.StartNew((object argument) =>
            {
                using (System.Diagnostics.Process process = new System.Diagnostics.Process())
                {
                    process.StartInfo = new System.Diagnostics.ProcessStartInfo
                    {
                        FileName = WorkerExeName,
                        Arguments = argument.ToString(),
                        CreateNoWindow = true,
                        UseShellExecute = false,
                        WindowStyle = System.Diagnostics.ProcessWindowStyle.Hidden
                    };

                    process.Start();
                    process.WaitForExit();
                }
            }, ids[i]);
        }

        System.Threading.Tasks.Task.WaitAll(taskItems);
    }

    /// <summary>Counts running worker processes and releases the handles obtained for counting.</summary>
    private static int CountWorkerProcesses()
    {
        System.Diagnostics.Process[] processes = System.Diagnostics.Process.GetProcessesByName(WorkerProcessName);
        foreach (System.Diagnostics.Process process in processes)
        {
            process.Dispose();
        }

        return processes.Length;
    }

    /// <summary>
    /// Entry point: initialises configuration, generates the worker executable, splits all
    /// building ids into batches of <see cref="TaskBatchSize"/>, and submits the batches to
    /// worker processes without exceeding <see cref="MaxProcessCount"/> running workers.
    /// Prints a timestamp at the start and after all submitted batches have finished.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static void Main(string[] args)
    {
        ConfigurationManger.Init(ConfigurationType.WindowsService);
        EnumsRegister.Register();

        Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));

        GenerateExe();

        List<BuildingVector> allBuildingVectors = buildBO.GetAllBuildingsAsGridInnerDoorAnalysis();

        // Copying the fingerprint database into GridMappingBuildings is disabled:
        // gridMappingBuildingBO.CopyFingerPrinterDatabase();

        // One task = the comma-joined ids of up to TaskBatchSize buildings. Stepping by
        // offset never yields an empty trailing batch; an empty batch would start a worker
        // without an argument.
        List<string> tasks = new List<string>();
        for (int offset = 0; offset < allBuildingVectors.Count; offset += TaskBatchSize)
        {
            tasks.Add(string.Join(",", allBuildingVectors.Skip(offset).Take(TaskBatchSize).Select(m => m.BuildingID).ToArray()));
        }

        // Submitted batches are tracked so completion is only reported once they are done.
        List<System.Threading.Tasks.Task> submitted = new List<System.Threading.Tasks.Task>();
        int skipCount = 0;

        while (skipCount < tasks.Count)
        {
            int running = CountWorkerProcesses();

            // All slots busy: wait and check again.
            if (running >= MaxProcessCount)
            {
                Thread.Sleep(CapacityPollDelayMs);
                continue;
            }

            // Fill the free slots with the next pending tasks.
            int submitTaskCount = MaxProcessCount - running;
            List<string> submitTasks = tasks.Skip(skipCount).Take(submitTaskCount).ToList();

            submitted.Add(System.Threading.Tasks.Task.Factory.StartNew(new Action<object>(SubmitTask), submitTasks));
            Thread.Sleep(LaunchSettleDelayMs);

            // Advance by what was actually submitted so the progress figure stays accurate.
            skipCount += submitTasks.Count;

            Console.WriteLine("complete task count:{0}*25,total task count:{1}*25", skipCount, tasks.Count);
        }

        // Wait for the outstanding workers before reporting completion and the end time.
        System.Threading.Tasks.Task.WaitAll(submitted.ToArray());

        Console.WriteLine("Complete...");
        Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));
        Console.ReadKey();
    }
}
参考文章:
c#动态编译,程序集的动态创建(http://www.360doc.com/content/14/1014/11/5054188_416763069.shtml)