大家好
去年换工作时系统复习了一下.NET Core多线程相关专题,学习了一线码农老哥的《.NET 5多线程编程实战》课程,我将复习的知识进行了总结形成本专题。
理解异步的本质
时间片切换成本高!
CPU密集型操作:编码解码、图形计算、正则表达式等
IO密集型操作:和硬件打交道,和DB打交道等
线程太多的烦恼/代价:
新开Thread是有开销的(时间、空间)
GC回收会冻结所有线程寻找引用根(gcroot)
Thread会和网络驱动程序打交道(外网络地址)
ThreadPool中的WorkQueue任务(4000+)得不到处理
异步:async/await
程序有可能会卡死!
(3)C#如何使用异步?
workThread:
适用于CPU密集型,在WinDbg中标签为 ThreadPool Worker
IOThread:
适用于IO密集型,在WinDbg中标签为 ThreadPool Completion Port
namespace ConsoleApp3
{
class Program
{
static void Main(string[] args)
{
GetContentLengthAsync("http://cnblogs.com");
Console.WriteLine($"主线程:{Environment.CurrentManagedThreadId}, 准备退出!");
Console.ReadLine();
}
static async Task<int> GetContentLengthAsync(string url)
{
using (HttpClient client = new HttpClient())
{
var content = await client.GetStringAsync(url);
Console.WriteLine($"当前线程:{Environment.CurrentManagedThreadId}, content={content.Length}");
return content.Length;
}
}
}
}
初始化时将SafeHandle、ThreadPool与IO完成端口进行绑定(比如:FileStream在Init时)
(主线程)创建IO完成端口:CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads)
(主线程)将消息塞到IO完成端口的Queue队列:PostQueuedCompletionStatus
(子线程)从IO完成端口的Queue队列中获取消息:GetQueuedCompletionStatus
public class IOCP
{
[DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
public static extern SafeFileHandle CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads);
[DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
public static extern bool GetQueuedCompletionStatus(SafeFileHandle CompletionPort,
out uint lpNumberOfBytesTransferred, out IntPtr lpCompletionKey,
out IntPtr lpOverlapped, uint dwMilliseconds);
[DllImport("Kernel32", CharSet = CharSet.Auto)]
public static extern bool PostQueuedCompletionStatus(SafeFileHandle CompletionPort, uint dwNumberOfBytesTransferred, IntPtr dwCompletionKey, IntPtr lpOverlapped);
}
// 1. 创建IO完成端口
var safehandle = IOCP.CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, IntPtr.Zero, 1);
var thread = new Thread(() =>
{
Console.WriteLine($"工作线程: {Environment.CurrentManagedThreadId} 开始获取数据...");
while (true)
{
// 3. get数据
IOCP.GetQueuedCompletionStatus(safehandle, out var ipn, out var ipc, out var lop, int.MaxValue);
var receiveData = Convert.ToString(GCHandle.FromIntPtr(lop).Target);
Console.WriteLine($"工作线程: {Environment.CurrentManagedThreadId} 获取数据成功!{receiveData}");
Thread.Sleep(1000);
}
});
thread.Start();
// 2. post 数据
var data = (IntPtr)GCHandle.Alloc("hello world");
IOCP.PostQueuedCompletionStatus(safehandle, 4096, IntPtr.Zero, data);
Console.WriteLine($"主线程: {Environment.CurrentManagedThreadId} 塞入数据成功!");
Console.ReadLine();
/// <summary>
/// continutewith 的版本
/// </summary>
/// <returns></returns>
static Task<List<string>> GetContentListContinute()
{
var list = new List<string>();
SqlConnection connection = new SqlConnection("Server=LocalHost; Persist Security Info=False;Integrated Security=SSPI;Database= PostDB;");
var task = connection.OpenAsync().ContinueWith(t =>
{
SqlCommand command = new SqlCommand("select PostContent from Post", connection);
return command.ExecuteReaderAsync().ContinueWith(t2 =>
{
var reader = t2.Result;
return GetContent(reader, list).ContinueWith(t3 =>
{
return list;
});
}).Unwrap();
}).Unwrap();
return task;
}
static Task<bool> GetContent(SqlDataReader reader, List<string> list)
{
return reader.ReadAsync().ContinueWith(t =>
{
var hasRow = t.Result;
if (hasRow)
{
list.Add(reader.GetString(0)); //读取reader的值
GetContent(reader, list);
}
return false;
});
}
/// <summary>
/// await+async 的异步写法
/// </summary>
/// <returns></returns>
static async Task<List<string>> GetContentListAsync()
{
List<string> list = new List<string>();
SqlConnection connection = new SqlConnection("Server=LocalHost; Persist Security Info=False;Integrated Security=SSPI;Database= PostDB;");
await connection.OpenAsync();
SqlCommand command = new SqlCommand("select PostContent from Post", connection);
var reader = command.ExecuteReader();
while (await reader.ReadAsync())
{
list.Add(reader.GetString(0));
}
return list;
}
public interface IAsyncStateMachine{ void MoveNext(); void SetStateMachine(IAsyncStateMachine stateMachine);}
step1.初始化一个异步状态机machine
step2.初始化一个AsyncTaskMethodBuilder的实例,赋予machine.builder
step3.设置异步状态机的状态为-1,将类传入到状态机内部
step4.调用machine.builder的start方法
step5.返回machine.builder.Task
(3).NET提供异步方式的总结:
.NET 4.5开始提供的async/await,本质是.NET 4.0的Task + 状态机
.NET 4.0开始提供的Task,本质是.NET 3.5提供的Thread+ThreadPool+等待/取消等API操作
推荐阅读