C#实现请求唯一性校验支持高并发

栏目: IT技术 · 发布时间: 4年前

内容简介:您看此文用·秒,转发只需1秒呦~

C#实现请求唯一性校验支持高并发

转自:龙腾-hyl

cnblogs.com/huangyoulong/p/11611145.html

使用场景描述

网络请求中经常会遇到发送的请求,服务端响应是成功的,但是返回的时候出现网络故障,导致客户端无法接收到请求结果,那么客户端程序可能判断为网络故障,而重复发送同一个请求。

当然如果接口中定义了请求结果查询接口,那么这种重复会相对少一些。特别是交易类的数据,这种操作更是需要避免重复发送请求。

另外一种情况是用户过于快速的点击界面按钮,产生连续的相同内容请求,那么后端也需要进行过滤,这种一般出现在系统对接上,无法去控制第三方系统的业务逻辑,需要从自身业务逻辑里面去限定。

其他需求描述

这类请求一般存在时间范围和高并发的特点,就是短时间内会出现重复的请求,因此对模块需要支持高并发性。

技术实现

对请求的业务内容进行MD5摘要,并且将MD5摘要存储到缓存中,每个请求数据都通过这个一个公共的调用的方法进行判断。

代码实现

公共调用代码 UniqueCheck 采用单例模式创建唯一对象,便于在多线程调用的时候,只访问一个统一的缓存库

/*
 * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。
 * 它是被设计用来修饰被不同线程访问和修改的变量。
 * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
 */
private static readonly object lockHelper = new object();
private volatile static UniqueCheck _instance;
/// <summary>
/// 获取单一实例
/// </summary>
/// <returns></returns>
public static UniqueCheck GetInstance()
{
    if (_instance == null)
    {
        lock (lockHelper)
        {
            if (_instance == null)
                _instance = new UniqueCheck();
        }
    }
    return _instance;
}

这里需要注意volatile的修饰符,在实际测试过程中,如果没有此修饰符,在高并发的情况下会出现报错。

自定义一个可以进行并发处理队列,代码如下:ConcurrentLinkedQueue

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace PackgeUniqueCheck
{
    /// <summary>
    /// 非加锁并发队列,处理100个并发数以内
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class ConcurrentLinkedQueue<T>
    {
        private class Node<K>
        {
            internal K Item;
            internal Node<K> Next;
            public Node(K item, Node<K> next)
            {
                this.Item = item;
                this.Next = next;
            }
        }
        private Node<T> _head;
        private Node<T> _tail;
        public ConcurrentLinkedQueue()
        {
            _head = new Node<T>(default(T), null);
            _tail = _head;
        }
        public bool IsEmpty
        {
            get { return (_head.Next == null); }
        }
        /// <summary>
        /// 进入队列
        /// </summary>
        /// <param name="item"></param>
        public void Enqueue(T item)
        {
            Node<T> newNode = new Node<T>(item, null);
            while (true)
            {
                Node<T> curTail = _tail;
                Node<T> residue = curTail.Next;
                //判断_tail是否被其他process改变
                if (curTail == _tail)
                {
                    //A 有其他process执行C成功,_tail应该指向新的节点
                    if (residue == null)
                    {
                        //C 其他process改变了tail节点,需要重新取tail节点
                        if (Interlocked.CompareExchange<Node<T>>(
                          ref curTail.Next, newNode, residue) == residue)
                        {
                            //D 尝试修改tail
                           Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail);

                           return;
                        }
                    }
                    else
                    {
                        //B 帮助其他线程完成D操作
                        Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail);
                    }
                }
            }
        }
        /// <summary>
        /// 队列取数据
        /// </summary>
        /// <param name="result"></param>
        /// <returns></returns>
        public bool TryDequeue(out T result)
        {
            Node<T> curHead;
            Node<T> curTail;
            Node<T> next;
            while (true)
            {
                curHead = _head;
                curTail = _tail;
                next = curHead.Next;
                if (curHead == _head)
                {
                    if (next == null) //Queue为空
                    {
                        result = default(T);
                        return false;
                    }
                    if (curHead == curTail) //Queue处于Enqueue第一个node的过程中
                    {
                        //尝试帮助其他Process完成操作
                        Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail);
                    }
                    else
                    {
                        //取next.Item必须放到CAS之前
                        result = next.Item;
                        //如果_head没有发生改变,则将_head指向next并退出
                        if (Interlocked.CompareExchange<Node<T>>(ref _head,
                          next, curHead) == curHead)
                            break;
                    }
                }
            }
            return true;
        }
        /// <summary>
        /// 尝试获取最后一个对象
        /// </summary>
        /// <param name="result"></param>
        /// <returns></returns>
        public bool TryGetTail(out T result)
        {
            result = default(T);
            if (_tail == null)
            {
                return false;
            }
            result = _tail.Item;
            return true;
        }
    }
}

虽然是一个非常简单的唯一性校验逻辑,但是要做到高效率,高并发支持,高可靠性,以及低内存占用,需要实现这样的需求,需要做细致的模拟测试。

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Collections;
namespace PackgeUniqueCheck
{
    public class UniqueCheck
    {
        /*
         * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。
         * 它是被设计用来修饰被不同线程访问和修改的变量。
         * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
         */
        private static readonly object lockHelper = new object();
        private volatile static UniqueCheck _instance;        
        /// <summary>
        /// 获取单一实例
        /// </summary>
        /// <returns></returns>
        public static UniqueCheck GetInstance()
        {
            if (_instance == null)
            {
                lock (lockHelper)
                {
                    if (_instance == null)
                        _instance = new UniqueCheck();
                }
            }
            return _instance;
        }
        private UniqueCheck()
        {
            //创建一个线程安全的哈希表,作为字典缓存
            _DataKey = Hashtable.Synchronized(new Hashtable());
            Queue myqueue = new Queue();
            _DataQueue = Queue.Synchronized(myqueue);
            _Myqueue = new ConcurrentLinkedQueue<string>();
            _Timer = new Thread(DoTicket);
            _Timer.Start();
        }
        #region 公共属性设置
        /// <summary>
        /// 设定定时线程的休眠时间长度:默认为1分钟
        /// 时间范围:1-7200000,值为1毫秒到2小时
        /// </summary>
        /// <param name="value"></param>
        public void SetTimeSpan(int value)
        {
            if (value > 0&& value <=7200000)
            {
                _TimeSpan = value;
            }
        }
        /// <summary>
        /// 设定缓存Cache中的最大记录条数
        /// 值范围:1-5000000,1到500万
        /// </summary>
        /// <param name="value"></param>
        public void SetCacheMaxNum(int value)
        {
            if (value > 0 && value <= 5000000)
            {
                _CacheMaxNum = value;
            }
        }
        /// <summary>
        /// 设置是否在控制台中显示日志
        /// </summary>
        /// <param name="value"></param>
        public void SetIsShowMsg(bool value)
        {
            Helper.IsShowMsg = value;
        }
        /// <summary>
        /// 线程请求阻塞增量
        /// 值范围:1-CacheMaxNum,建议设置为缓存最大值的10%-20%
        /// </summary>
        /// <param name="value"></param>
        public void SetBlockNumExt(int value)
        {
            if (value > 0 && value <= _CacheMaxNum)
            {
                _BlockNumExt = value;
            }
        }
        /// <summary>
        /// 请求阻塞时间
        /// 值范围:1-max,根据阻塞增量设置请求阻塞时间
        /// 阻塞时间越长,阻塞增量可以设置越大,但是请求实时响应就越差
        /// </summary>
        /// <param name="value"></param>
        public void SetBlockSpanTime(int value)
        {
            if (value > 0)
            {
                _BlockSpanTime = value;
            }
        }
        #endregion
        #region 私有变量
        /// <summary>
        /// 内部运行线程
        /// </summary>
        private Thread _runner = null;
        /// <summary>
        /// 可处理高并发的队列
        /// </summary>
        private ConcurrentLinkedQueue<string> _Myqueue = null;
        /// <summary>
        /// 唯一内容的时间健值对
        /// </summary>
        private Hashtable _DataKey = null;
        /// <summary>
        /// 内容时间队列
        /// </summary>
        private Queue _DataQueue = null;
        /// <summary>
        /// 定时线程的休眠时间长度:默认为1分钟
        /// </summary>
        private int _TimeSpan = 3000;
        /// <summary>
        /// 定时计时器线程
        /// </summary>
        private Thread _Timer = null;
        /// <summary>
        /// 缓存Cache中的最大记录条数
        /// </summary>
        private int _CacheMaxNum = 500000;
        /// <summary>
        /// 线程请求阻塞增量
        /// </summary>
        private int _BlockNumExt = 10000;
        /// <summary>
        /// 请求阻塞时间
        /// </summary>
        private int _BlockSpanTime = 100;
        #endregion

        #region 私有方法
        private void StartRun()
        {
            _runner = new Thread(DoAction);
            _runner.Start();
            Helper.ShowMsg("内部线程启动成功!");
        }
        private string GetItem()
        {
            string tp = string.Empty;
            bool result = _Myqueue.TryDequeue(out tp);
            return tp;
        }
        /// <summary>
        /// 执行循环操作
        /// </summary>
        private void DoAction()
        {
            while (true)
            {
                while (!_Myqueue.IsEmpty)
                {
                    string item = GetItem();
                    _DataQueue.Enqueue(item);
                    if (!_DataKey.ContainsKey(item))
                    {
                        _DataKey.Add(item, DateTime.Now);
                    }
                }
                //Helper.ShowMsg("当前数组已经为空,处理线程进入休眠状态...");
               Thread.Sleep(2);
            }
        }
        /// <summary>
        /// 执行定时器的动作
        /// </summary>
        private void DoTicket()
        {
            while (true)
            {
                Helper.ShowMsg("当前数据队列个数:" + _DataQueue.Count.ToString());
                if (_DataQueue.Count > _CacheMaxNum)
                {
                    while (true)
                    {
                        Helper.ShowMsg(string.Format("当前队列数:{0},已经超出最大长度:{1},开始进行清理操作...", _DataQueue.Count, _CacheMaxNum.ToString()));
                        string item = _DataQueue.Dequeue().ToString();
                        if (!string.IsNullOrEmpty(item))
                        {
                            if (_DataKey.ContainsKey(item))
                            {
                                _DataKey.Remove(item);
                            }
                            if (_DataQueue.Count <= _CacheMaxNum)
                            {
                                Helper.ShowMsg("清理完成,开始休眠清理线程...");
                                break;
                            }
                        }
                    }
                }
                Thread.Sleep(_TimeSpan);
            }
        }

        /// <summary>
        /// 线程进行睡眠等待
        /// 如果当前负载压力大大超出了线程的处理能力
        /// 那么需要进行延时调用
        /// </summary>
        private void BlockThread()
        {
            if (_DataQueue.Count > _CacheMaxNum + _BlockNumExt)
            {
                Thread.Sleep(_BlockSpanTime);
            }
        }
        #endregion

        #region 公共方法
        /// <summary>
        /// 开启服务线程
        /// </summary>
        public void Start()
        {
            if (_runner == null)
            {
                StartRun();
            }
            else
            {
                if (_runner.IsAlive == false)
                {
                    StartRun();
                }
            }
        }
        /// <summary>
        /// 关闭服务线程
        /// </summary>
        public void Stop()
        {
            if (_runner != null)
            {
                _runner.Abort();
                _runner = null;
            }
        }
        /// <summary>
        /// 添加内容信息
        /// </summary>
        /// <param name="item">内容信息</param>
        /// <returns>true:缓存中不包含此值,队列添加成功,false:缓存中包含此值,队列添加失败</returns>
        public bool AddItem(string item)
        {
            BlockThread();
            item = Helper.MakeMd5(item);
            if (_DataKey.ContainsKey(item))
            {
                return false;
            }
            else
            {
                _Myqueue.Enqueue(item);
                return true;
            }
        }
        /// <summary>
        /// 判断内容信息是否已经存在
        /// </summary>
        /// <param name="item">内容信息</param>
        /// <returns>true:信息已经存在于缓存中,false:信息不存在于缓存中</returns>
        public bool CheckItem(string item)
        {
            item = Helper.MakeMd5(item);
            return _DataKey.ContainsKey(item);
        }
        #endregion    
    }
}

模拟测试代码:

private static string _example = Guid.NewGuid().ToString();
private static UniqueCheck _uck = null;
static void Main(string[] args)
{
    _uck = UniqueCheck.GetInstance();
    _uck.Start();
    _uck.SetIsShowMsg(false);
    _uck.SetCacheMaxNum(20000000);
    _uck.SetBlockNumExt(1000000);
    _uck.SetTimeSpan(6000);
    _uck.AddItem(_example);
    Thread[] threads = new Thread[20];
    for (int i = 0; i < 20; i++)
    {
        threads[i] = new Thread(AddInfo);
        threads[i].Start();
    }
    Thread checkthread = new Thread(CheckInfo);
    checkthread.Start();
    string value = Console.ReadLine();
    checkthread.Abort();
    for (int i = 0; i < 50; i++)
    {
        threads[i].Abort();
    }
    _uck.Stop();
}

static void AddInfo()
{
    while (true)
    {
       _uck.AddItem(Guid.NewGuid().ToString());
    }
}
static void CheckInfo()
{
    while (true)
    {
        Console.WriteLine("开始时间:{0}...", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
        Console.WriteLine("插入结果:{0}", _uck.AddItem(_example));
        Console.WriteLine("结束时间:{0}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
     //调整进程休眠时间,可以测试高并发的情况
        //Thread.Sleep(1000);
    }
}

测试截图:

C#实现请求唯一性校验支持高并发

往期 精彩 回顾

【推荐】.NET Core开发实战视频课程   ★★★

.NET Core实战项目之CMS 第一章 入门篇-开篇及总体规划

【.NET Core微服务实战-统一身份认证】开篇及目录索引

Redis基本使用及百亿数据量中的使用技巧分享(附视频地址及观看指南)

.NET Core中的一个接口多种实现的依赖注入与动态选择看这篇就够了

10个小技巧助您写出高性能的ASP.NET Core代码

用abp vNext快速开发Quartz.NET定时任务管理界面

现身说法:实际业务出发分析百亿数据量下的多表查询优化

关于C#异步编程你应该了解的几点建议

C#异步编程看这篇就够了

给我好看

C#实现请求唯一性校验支持高并发

您看此文用

·

秒,转发只需1秒呦~

C#实现请求唯一性校验支持高并发

好看你就

点点


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Flow-Based Programming (2nd Edition)

Flow-Based Programming (2nd Edition)

CreateSpace / 2010-5-14 / $69.95

Written by a pioneer in the field, this is a thorough guide to the cost- and time-saving advantages of Flow-Based Programming. It explains the theoretical underpinnings and application of this program......一起来看看 《Flow-Based Programming (2nd Edition)》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

随机密码生成器
随机密码生成器

多种字符组合密码

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试