es的C# api 的封装和使用说明

栏目: ASP.NET · 发布时间: 5年前

内容简介:说明:

说明:

  1. C#的elastic client通过ConnectionSettings来指定es server的地址

  2. C#的Request有多种类型封装,比如Search相关的封装SearchRequest, GetAlias相关的GetAliasRequest, Count相关的CountRequest, ping相关的PingRequest等。  即,es默认支持的关键字都有对应的request

  3. C#的Query有matchQuery、DateRangeQuery等, 相互之间用条件运算符可以组合查询, 最终生成的restful 请求头转换为bool query了

  4. C#的Aggregation也有TermsAggregation、DateHistogramAggregation等, 用来普通分类和时间分类等, 不像js,需要自己用字符串指定。 返回值也要在Aggregations里查看。

public async Task<IReadOnlyCollection<DateHistogramBucket>> GetScanPatientStatistics( string systemID, string startDate, string endDate, DateInterval dateInterval, string indices = "*scanstatistics*" )

{

var client = GetClient();

var filterPath = new string [] { "aggregations.*.buckets" };

var request = new SearchRequest(indices)

{

Query = new MatchQuery() { Field = "systemID" , Query = systemID }

&& new DateRangeQuery() { Field = "timeStamp" , GreaterThanOrEqualTo = startDate, LessThanOrEqualTo = endDate, Format = "yyyyMMdd" },

Aggregations = new DateHistogramAggregation( "agg_date_histogram_timeStamp" )

{

Field = "timeStamp" ,

Interval = dateInterval,

Format = "yyyy-MM-dd HH:mm:ss" ,

Aggregations = new ValueCountAggregation( "agg_count_systemID" , "systemID.keyword" )

},

FilterPath = filterPath

};

var response = await client.SearchAsync<CTDMS<CTDMSItem>>(request);

var dateHistogram = response.Aggregations.DateHistogram( "agg_date_histogram_timeStamp" );

return dateHistogram.Buckets;

}

  1. Buckets可以转为Dictionary

public async Task<Dictionary<DateTime, double ?>> GetScanPatientStatistics( string systemID, DateRangeEnum dateRange, DateInterval dateInterval, string indices = "*scanstatistics*" )

{

var buckets = await GetScanPatientStatistics(systemID, DateRangeEnumExtensions.ToString(dateRange), "now" , dateInterval, indices);

return buckets.ToDictionary(k => k.Date, v => v.Values.OfType<ValueAggregate>().First().Value);

}

  1. C#对于各种查询都有数据类型校验, 这点和js api不相同, 因此必须指定类型

代码如下:

namespace My.ElasticSearch

{

/// <summary>

/// sample code 如下:

///  var search = new SearchDomainService();

//获取当月的每天温度

/*

var result = search.GetDetectorTemp("987654321098",

DetectorEnum.DetectorTempL,

DateRangeEnum.CurrentMonth,

DateInterval.Day).GetAwaiter().GetResult();

*/

//获取当前es中的所有index

/*

var indices = search.GetIndices().GetAwaiter().GetResult();

*/

//当前服务是否ok

/*

var isAvailable = search.IsAvailable().GetAwaiter().GetResult();

*/

//获取扫描量

/*

var count = search.GetScanCount("987654321098", "now/y").GetAwaiter().GetResult();

*/

//获取每一类部位的扫描量

/*

var stat = search.GetScanBodypartStatistics("987654321098", DateRangeEnum.CurrentYear).GetAwaiter().GetResult();

*/

//获取最后一次上传日志的时间

/*

var date = search.GetLatestUploadTime("987654321098").GetAwaiter().GetResult();

*/

/*

var date = search.GetLatestUploadTime<CTScanStatistics<CTScanStatisticsItem>, CTScanStatisticsItem>("987654321098").GetAwaiter().GetResult();

*/

/// </summary>

public class ElasticSearchDomainService

{

private ConnectionSettings _connectionSettings;

private ElasticClient _client = null;

public ElasticSearchDomainService()

{

_connectionSettings = new ConnectionSettings(new Uri(" http://localhost:9200 ")).DisableDirectStreaming();

}

public void SetConnectionSettingUrl(string url)

{

_connectionSettings = new ConnectionSettings(new Uri(url)).DisableDirectStreaming();

}

public ElasticClient GetClient()

{

if (_client == null)

{

_client = new ElasticClient(_connectionSettings);

}

return _client;

}

/// <summary>

/// -> POST http://localhost:9200/*dms*/_search?filter_path=aggregations.agg_date_histogram_timeStamp.buckets

/// {

///   "query": {

///     "bool": {

///       "must": [

///         {

///           "match": {

///             "systemID": "987654321098"

///           }

///         },

///         {

///           "match": {

///             "items.name.keyword": "DetectorTempL"

///           }

///         },

///         {

///           "range": {

///             "timeStamp": {

///               "gte": "now/M"

///             }

///           }

///         }

///       ]

///     }

///   },

///   "aggs": {

///     "agg_date_histogram_timeStamp": {

///       "date_histogram": {

///         "field": "timeStamp",

///         "interval": "1d",

///         "format": "yyyy-MM-dd HH:mm:ss"

///       },

///       "aggs": {

///         "agg_avg_items.value": {

///           "avg": {

///             "field": "items.value"

///           }

///         }

///       }

///     }

///   }

/// }

/// <- 200

/// {

///   "aggregations": {

///     "agg_date_histogram_timeStamp": {

///       "buckets": [

///         {

///           "key_as_string": "2018-05-30",

///           "key": 1527638400000,

///           "doc_count": 23,

///           "agg_avg_items.value": {

///             "value": 38.84304356229478

///           }

///         }

///       ]

///     }

///   }

/// }

/// 此方法提供特定设备的特定探测器在某个时间范围的每个interval的温度平均值

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>

/// <param name="detector">查询的设备的探测器,有左、中、右。Index的items.name字段</param>

/// <param name="startDate">查询的时间范围的起始时间, 格式yyyyMMdd,参考es官方的Date Math</param>

/// <param name="endDate">查询的时间范围的终止时间, 格式yyyyMMdd,参考es官方的Date Math</param>

/// <param name="dateInterval">统计的时间间隔</param>

/// <param name="indices">查询的index name</param>

/// <returns>返回一个es自带的DateHistogramBucket的数组。DateHistogramBucket对象中包括分类的时间点及对应的值</returns>

public async Task<IReadOnlyCollection<DateHistogramBucket>> GetDetectorTemp(string systemID, DetectorEnum detector, string startDate, string endDate, DateInterval dateInterval, string indices = "*dms*")

{

var client = GetClient();

var filterPath = new string[] { "aggregations.*.buckets" };

var request = new SearchRequest(indices)

{

Query = new MatchQuery() { Field = "systemID", Query = systemID }

&& new MatchQuery() { Field = "items.name.keyword", Query = detector.ToString() }

&& new DateRangeQuery() { Field = "timeStamp", GreaterThanOrEqualTo = startDate, LessThanOrEqualTo = endDate, Format = "yyyyMMdd" },

Aggregations = new DateHistogramAggregation("agg_date_histogram_timeStamp")

{

Field = "timeStamp",

Interval = dateInterval,

Format = "yyyy-MM-dd HH:mm:ss",

Aggregations = new AverageAggregation("agg_avg_items.value", "items.value")

},

FilterPath = filterPath,

};

var response = await client.SearchAsync<CTDMS<CTDMSItem>>(request);

var dateHistogram = response.Aggregations.DateHistogram("agg_date_histogram_timeStamp");

return dateHistogram.Buckets;

}

/// <summary>

/// 此方法提供特定设备的特定探测器在某个时间范围的每个interval的温度平均值

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>

/// <param name="detector">查询的设备的探测器,有左、中、右。Index的items.name字段</param>

/// <param name="dateRange">查询的时间范围,自定义的enum有当天、当月、当年、昨天等预设范围。如果需要一个自由的时间范围,请用另一个dateRange为string类型的同名函数</param>

/// <param name="dateInterval">统计的时间间隔</param>

/// <param name="indices">查询的index name</param>

/// <returns>返回一个字典,key为日期,value为该日期内的温度平均值。key主要根据interval来阶段递增变化</returns>

public async Task<Dictionary<DateTime, double?>> GetDetectorTemp(string systemID, DetectorEnum detector, DateRangeEnum dateRange, DateInterval dateInterval, string indices = "*dms*")

{

var buckets = await GetDetectorTemp(systemID, detector, DateRangeEnumExtensions.ToString(dateRange), "now", dateInterval, indices);

return buckets.ToDictionary(k => k.Date, v => v.Values.OfType<ValueAggregate>().First().Value);

}

/// <summary>

///-> GET http://localhost:9200/_alias?filter_path=-*kibana *,-*es*,-*monitoring*,metric*

///<- 200

///{

/// "metric_2018_05_30_ctdms`1": {

///   "aliases": {}

/// },

/// "metric_2018_05_30_ctaws`1": {

///   "aliases": {}

/// },

/// "metric_2018_05_30_ctscanstatistics`1": {

///   "aliases": {}

/// },

/// "metric_2018_05_30_ctdailystatistics`1": {

///   "aliases": {}

/// }

///}

///此方法用于获取当前es中的所有index name

/// </summary>

/// <param name="filterPath">指定需要过滤的或者显示的index,可以排除系统自带的一些index</param>

/// <returns>当前和业务相关的所有设备名称的数组</returns>

public async Task<List<string>> GetIndices(string[] filterPath = null)

{

var client = GetClient();

if (filterPath == null)

{

filterPath = new string[] { "-*kibana*", "-*es*", "-*monitoring*", "metric*" };

}

var request = new GetAliasRequest()

{

FilterPath = filterPath

};

var response = await client.GetAliasAsync(request);

return response.Indices.Keys.Select(e => e.Name).ToList();

}

/// <summary>

///  -> POST http://localhost:9200/*scanstatistics*/_search?filter_path=hits.hits._source.timeStamp

/// {

///   "size": 1,

///   "sort": [

///     {

///       "timeStamp": {

///         "order": "asc"

///       }

/// }

///   ],

///   "query": {

///     "match": {

///       "systemID": "987654321098"

///     }

///   }

/// }

/// <- 200

/// {

///   "hits": {

///     "hits": [

///       {

///         "_source": {

///           "timeStamp": "2018-05-30T09:48:26.705659+08:00"

///         }

///       }

///     ]

///   }

/// }

/// 返回特定数据类型的某个设备的最后一条日志的上传时间。用于显示设备的最后一次更新状态

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>

/// <param name="indices">查询的index name</param>

/// <returns>最后一条日志的上传时间</returns>

public async Task<DateTime> GetLatestUploadTime<T1, T2>(string systemID, string indices = "*scanstatistics*")

where T1 : BaseMetrics<T2>

where T2 : BaseItem

{

var client = GetClient();

var filterPath = new string[] { "hits.hits._source.timeStamp" };

var request = new SearchRequest(indices)

{

Query = new MatchQuery() { Field = "systemID", Query = systemID },

Size = 1,

Sort = new List<ISort>

{

new SortField {Field = "timeStamp", Order = SortOrder.Descending}

},

FilterPath = filterPath

};

var response = await client.SearchAsync<T1>(request);

return response.Hits.First().Source.TimeStamp;

}

/// <summary>

/// 获取特定设备在某个时间范围内的扫描部位量的统计。

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>

/// <param name="dateRange">查询的时间范围,自定义的enum有当天、当月、当年、昨天等预设范围。如果需要一个自由的时间范围,请用另一个dateRange为string类型的同名函数</param>

/// <param name="indices">查询的index name</param>

/// <returns>一个字典,key为扫描部位的名称, value为该扫描部位在查询的时间范围内的总数量之和。</returns>

public async Task<Dictionary<string, double?>> GetScanBodypartStatistics(string systemID, DateRangeEnum dateRange, string indices = "*scanstatistics*")

{

var buckets = await GetScanBodypartStatistics(systemID, DateRangeEnumExtensions.ToString(dateRange), "now", indices);

return buckets.ToDictionary(k => k.Key, v => v.Values.OfType<ValueAggregate>().First().Value);

}

/// <summary>

/// -> POST http://localhost:9200/*scanstatistics*/_search?filter_path=aggregations.agg_terms_items \.name\.keyword.buckets

/// {

///   "query": {

///     "bool": {

///       "must": [

///         {

///           "match": {

///             "systemID": "987654321098"

///           }

///         },

///         {

///           "range": {

///             "timeStamp": {

///               "gte": "now/M"

///             }

///           }

///         }

///       ]

///     }

///   },

///   "aggs": {

///     "agg_terms_items.name.keyword": {

///       "terms": {

///         "field": "items.name.keyword"

///       },

///       "aggs": {

///         "agg_sum_items.value": {

///           "sum": {

///             "field": "items.value"

///           }

///         }

///       }

///     }

///   }

/// }

/// <- 200

/// {

///   "aggregations": {

///     "agg_terms_items.name.keyword": {

///       "buckets": [

///         {

///           "key": "Upper Extremity",

///           "doc_count": 15,

///           "agg_sum_items.value": {

///             "value": 1247

///           }

///         },

///         {

///           "key": "Pelvis",

///           "doc_count": 13,

///           "agg_sum_items.value": {

///             "value": 888

///           }

///         },

///         {

///           "key": "Neck",

///           "doc_count": 11,

///           "agg_sum_items.value": {

///             "value": 939

///           }

///         },

///         {

///           "key": "Spine",

///           "doc_count": 11,

///           "agg_sum_items.value": {

///             "value": 797

///           }

///         },

///         {

///           "key": "Thorax",

///           "doc_count": 11,

///           "agg_sum_items.value": {

///             "value": 1016

///           }

///         },

///         {

///           "key": "Abdomen",

///           "doc_count": 9,

///           "agg_sum_items.value": {

///             "value": 732

///           }

///         },

///         {

///           "key": "Head",

///           "doc_count": 8,

///           "agg_sum_items.value": {

///             "value": 531

///           }

///         }

///       ]

///     }

///   }

/// }

/// 获取特定设备在某个时间范围内的扫描部位量的统计。

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>

/// <param name="startDate">查询的时间范围的起始时间, 格式yyyyMMdd,参考es官方的Date Math</param>

/// <param name="endDate">查询的时间范围的终止时间, 格式yyyyMMdd,参考es官方的Date Math</param>

/// <param name="indices">查询的index name</param>

/// <returns>一个es自带的KeyedBucket的数组。KeyedBucket对象包括分类的name和value</returns>

public async Task<IReadOnlyCollection<KeyedBucket<string>>> GetScanBodypartStatistics(string systemID, string startDate, string endDate, string indices = "*scanstatistics*")

{

var client = GetClient();

var filterPath = new string[] { "aggregations.*.buckets" };

var request = new SearchRequest(indices)

{

Query = new MatchQuery() { Field = "systemID", Query = systemID }

&& new DateRangeQuery() { Field = "timeStamp", GreaterThanOrEqualTo = startDate, LessThanOrEqualTo = endDate, Format = "yyyyMMdd" },

Aggregations = new TermsAggregation("terms")

{

Field = "items.name.keyword",

Aggregations = new SumAggregation("sum", "items.value")

},

FilterPath = filterPath

};

var response = await client.SearchAsync<CTScanStatistics<CTScanStatisticsItem>>(request);

var agg = response.Aggregations.Terms("terms");

return agg.Buckets;

}

/// <summary>

/// 查询某个时间范围内的每个interval的患者扫描量

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>

/// <param name="dateRange">查询的时间范围,自定义的enum有当天、当月、当年、昨天等预设范围。如果需要一个自由的时间范围,请用另一个dateRange为string类型的同名函数</param>

/// <param name="dateInterval">统计的时间间隔</param>

/// <param name="indices">查询的index name</param>

/// <returns>返回一个字典,key为日期,value为该日期内的患者扫描量总和。key主要根据interval来阶段递增变化</returns>

public async Task<Dictionary<DateTime, double?>> GetScanPatientStatistics(string systemID, DateRangeEnum dateRange, DateInterval dateInterval, string indices = "*scanstatistics*")

{

var buckets = await GetScanPatientStatistics(systemID, DateRangeEnumExtensions.ToString(dateRange), "now", dateInterval, indices);

return buckets.ToDictionary(k => k.Date, v => v.Values.OfType<ValueAggregate>().First().Value);

}

/// <summary>

/// -> POST /*scanstatistics*/_search?typed_keys=true&filter_path=aggregations.*.buckets

///{

/// "query": {

///   "bool": {

///     "must": [

///       {

///         "match": {

///           "systemID": {

///             "query": "987654321098"

///           }

///         }

///       },

///       {

///         "range": {

///           "timeStamp": {

///             "gte": "now/y"

///           }

///         }

///       }

///     ]

///   }

/// },

/// "aggs": {

///   "agg_date_histogram_timeStamp": {

///     "date_histogram": {

///       "field": "timeStamp",

///       "interval": "month",

///       "format": "yyyy-MM-dd HH:mm:ss"

///     },

///     "aggs": {

///       "agg_count_systemID": {

///         "value_count": {

///           "field": "systemID.keyword"

///         }

///       }

///     }

///   }

/// }

///}

/// <- 200

/// {

///  "aggregations": {

///   "date_histogram#agg_date_histogram_timeStamp": {

///     "buckets": [

///       {

///         "key_as_string": "2018-05-01 00:00:00",

///         "key": 1525132800000,

///         "doc_count": 30,

///         "value_count#agg_count_systemID": {

///           "value": 30

///         }

///      },

///       {

///         "key_as_string": "2018-06-01 00:00:00",

///         "key": 1527811200000,

///         "doc_count": 30,

///         "value_count#agg_count_systemID": {

///           "value": 30

///         }

///       }

///     ]

///   }

/// }

///}

/// 查询某个时间范围内的每个interval的患者扫描量

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>

/// <param name="startDate">查询的时间范围的起始时间, 格式yyyyMMdd,参考es官方的Date Math</param>

/// <param name="endDate">查询的时间范围的终止时间, 格式yyyyMMdd,参考es官方的Date Math</param>

/// <param name="dateInterval">统计的时间间隔</param>

/// <param name="indices">查询的index name</param>

/// <returns>返回一个es自带的DateHistogramBucket的数组。DateHistogramBucket对象中包括分类的时间点及对应的值</returns>

public async Task<IReadOnlyCollection<DateHistogramBucket>> GetScanPatientStatistics(string systemID, string startDate, string endDate, DateInterval dateInterval, string indices = "*scanstatistics*")

{

var client = GetClient();

var filterPath = new string[] { "aggregations.*.buckets" };

var request = new SearchRequest(indices)

{

Query = new MatchQuery() { Field = "systemID", Query = systemID }

&& new DateRangeQuery() { Field = "timeStamp", GreaterThanOrEqualTo = startDate, LessThanOrEqualTo = endDate, Format = "yyyyMMdd" },

Aggregations = new DateHistogramAggregation("agg_date_histogram_timeStamp")

{

Field = "timeStamp",

Interval = dateInterval,

Format = "yyyy-MM-dd HH:mm:ss",

Aggregations = new ValueCountAggregation("agg_count_systemID", "systemID.keyword")

},

FilterPath = filterPath

};

var response = await client.SearchAsync<CTDMS<CTDMSItem>>(request);

var dateHistogram = response.Aggregations.DateHistogram("agg_date_histogram_timeStamp");

return dateHistogram.Buckets;

}

/// <summary>

/// 获取某个设备在特定时间范围内的患者扫描量。

/// 每条日志就是该患者在当时扫描时记录的log,因此统计扫描log的数量即可得到扫描患者的数量

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID</param>

/// <param name="dateRange">查询的时间范围,自定义的enum有当天、当月、当年、昨天等预设范围。如果需要一个自由的时间范围,请用另一个dateRange为string类型的同名函数</param>

/// <param name="indices">查询的index name</param>

/// <returns>患者扫描的总和</returns>

public async Task<long> GetTotalScanPatientCount(string systemID, DateRangeEnum dateRange, string indices = "*scanstatistics*")

{

return await GetTotalScanPatientCount(systemID, DateRangeEnumExtensions.ToString(dateRange), "now", indices);

}

/// <summary>

///  -> POST http://localhost:9200/*scanstatistics*/_count

///  {

///   "query": {

///     "bool": {

///       "must": [

///         {

///           "match": {

///             "systemID": "987654321098"

///           }

///         },

///         {

///           "range": {

///             "timeStamp": {

///               "gte": "now/d"

///             }

///           }

///         }

///       ]

///     }

///   }

/// }

/// <- 200

/// {

///   "count": 30,

///   "_shards": {

///     "total": 15,

///     "successful": 15,

///     "skipped": 0,

///     "failed": 0

///   }

/// }

/// 获取某个设备在特定时间范围内的患者扫描量。

/// 每条日志就是该患者在当时扫描时记录的log,因此统计扫描log的数量即可得到扫描患者的数量

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID</param>

/// <param name="startDate">查询的时间范围的起始时间, 格式yyyyMMdd,参考es官方的Date Math</param>

/// <param name="endDate">查询的时间范围的终止时间, 格式yyyyMMdd,参考es官方的Date Math</param>

/// <param name="indices">查询的index name</param>

/// <returns>患者扫描的总和</returns>

public async Task<long> GetTotalScanPatientCount(string systemID, string startDate, string endDate, string indices = "*scanstatistics*")

{

var client = GetClient();

var request = new CountRequest(indices)

{

Query = new MatchQuery() { Field = "systemID", Query = systemID }

&& new DateRangeQuery() { Field = "timeStamp", GreaterThanOrEqualTo = startDate, LessThanOrEqualTo = endDate, Format = "yyyyMMdd" }

};

var response = await client.CountAsync<CTScanStatistics<CTScanStatisticsItem>>(request);

return response.Count;

}

/// <summary>

/// 判断当前ES的服务是否有效。建议在执行每一个ES的操作前都应该先调用此方法。

/// </summary>

/// <returns>true,服务运行正常; false,服务运行异常</returns>

public async Task<bool> IsAvailable()

{

var client = GetClient();

var request = new PingRequest()

{

ErrorTrace = true

};

var response = await client.PingAsync(request);

return response.IsValid;

}

}

}


以上所述就是小编给大家介绍的《es的C# api 的封装和使用说明》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

趋势红利

趋势红利

刘润 / 文化发展出版社(原印刷工业出版社) / 2016-6-1 / 45.00

【编辑推荐】 1、国内顶尖的互联网转型专家,海尔、百度等知名企业战略顾问刘润送给传统企业的转型、创新“导航仪”,这个时代企业家的必修课 站在近200年商业全景图角度,刘润发现三种企业类型(产品型、渠道型、营销型),针对不同企业类型定制转型战略(找到自己的未来红利),方便 传统企业对号入座:不走错路就是节省时间,适合自己的最有效率。 本书内容还源自芬尼克兹、红领集团、名创优品、必要......一起来看看 《趋势红利》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

随机密码生成器
随机密码生成器

多种字符组合密码

MD5 加密
MD5 加密

MD5 加密工具