内容简介:说明:
说明:
-
C#的elastic client通过ConnectionSettings来指定es server的地址
-
C#的Request有多种类型封装,比如Search相关的封装SearchRequest, GetAlias相关的GetAliasRequest, Count相关的CountRequest, ping相关的PingRequest等。 即,es默认支持的关键字都有对应的request
-
C#的Query有matchQuery、DateRangeQuery等, 相互之间用条件运算符可以组合查询, 最终生成的restful 请求头转换为bool query了
-
C#的Aggregation也有TermsAggregation、DateHistogramAggregation等, 用来普通分类和时间分类等, 不像js,需要自己用字符串指定。 返回值也要在Aggregations里查看。
public async Task<IReadOnlyCollection<DateHistogramBucket>> GetScanPatientStatistics( string systemID, string startDate, string endDate, DateInterval dateInterval, string indices = "*scanstatistics*" )
{
var client = GetClient();
var filterPath = new string [] { "aggregations.*.buckets" };
var request = new SearchRequest(indices)
{
Query = new MatchQuery() { Field = "systemID" , Query = systemID }
&& new DateRangeQuery() { Field = "timeStamp" , GreaterThanOrEqualTo = startDate, LessThanOrEqualTo = endDate, Format = "yyyyMMdd" },
Aggregations = new DateHistogramAggregation( "agg_date_histogram_timeStamp" )
{
Field = "timeStamp" ,
Interval = dateInterval,
Format = "yyyy-MM-dd HH:mm:ss" ,
Aggregations = new ValueCountAggregation( "agg_count_systemID" , "systemID.keyword" )
},
FilterPath = filterPath
};
var response = await client.SearchAsync<CTDMS<CTDMSItem>>(request);
var dateHistogram = response.Aggregations.DateHistogram( "agg_date_histogram_timeStamp" );
return dateHistogram.Buckets;
}
-
Buckets可以转为Dictionary
public async Task<Dictionary<DateTime, double ?>> GetScanPatientStatistics( string systemID, DateRangeEnum dateRange, DateInterval dateInterval, string indices = "*scanstatistics*" )
{
var buckets = await GetScanPatientStatistics(systemID, DateRangeEnumExtensions.ToString(dateRange), "now" , dateInterval, indices);
return buckets.ToDictionary(k => k.Date, v => v.Values.OfType<ValueAggregate>().First().Value);
}
-
C#对于各种查询都有数据类型校验, 这点和js api不相同, 因此必须指定类型
代码如下:
namespace My.ElasticSearch
{
/// <summary>
/// sample code 如下:
/// var search = new SearchDomainService();
//获取当月的每天温度
/*
var result = search.GetDetectorTemp("987654321098",
DetectorEnum.DetectorTempL,
DateRangeEnum.CurrentMonth,
DateInterval.Day).GetAwaiter().GetResult();
*/
//获取当前es中的所有index
/*
var indices = search.GetIndices().GetAwaiter().GetResult();
*/
//当前服务是否ok
/*
var isAvailable = search.IsAvailable().GetAwaiter().GetResult();
*/
//获取扫描量
/*
var count = search.GetScanCount("987654321098", "now/y").GetAwaiter().GetResult();
*/
//获取每一类部位的扫描量
/*
var stat = search.GetScanBodypartStatistics("987654321098", DateRangeEnum.CurrentYear).GetAwaiter().GetResult();
*/
//获取最后一次上传日志的时间
/*
var date = search.GetLatestUploadTime("987654321098").GetAwaiter().GetResult();
*/
/*
var date = search.GetLatestUploadTime<CTScanStatistics<CTScanStatisticsItem>, CTScanStatisticsItem>("987654321098").GetAwaiter().GetResult();
*/
/// </summary>
public class ElasticSearchDomainService
{
private ConnectionSettings _connectionSettings;
private ElasticClient _client = null;
public ElasticSearchDomainService()
{
_connectionSettings = new ConnectionSettings(new Uri(" http://localhost:9200 ")).DisableDirectStreaming();
}
public void SetConnectionSettingUrl(string url)
{
_connectionSettings = new ConnectionSettings(new Uri(url)).DisableDirectStreaming();
}
public ElasticClient GetClient()
{
if (_client == null)
{
_client = new ElasticClient(_connectionSettings);
}
return _client;
}
/// <summary>
/// -> POST http://localhost:9200/*dms*/_search?filter_path=aggregations.agg_date_histogram_timeStamp.buckets
/// {
/// "query": {
/// "bool": {
/// "must": [
/// {
/// "match": {
/// "systemID": "987654321098"
/// }
/// },
/// {
/// "match": {
/// "items.name.keyword": "DetectorTempL"
/// }
/// },
/// {
/// "range": {
/// "timeStamp": {
/// "gte": "now/M"
/// }
/// }
/// }
/// ]
/// }
/// },
/// "aggs": {
/// "agg_date_histogram_timeStamp": {
/// "date_histogram": {
/// "field": "timeStamp",
/// "interval": "1d",
/// "format": "yyyy-MM-dd HH:mm:ss"
/// },
/// "aggs": {
/// "agg_avg_items.value": {
/// "avg": {
/// "field": "items.value"
/// }
/// }
/// }
/// }
/// }
/// }
/// <- 200
/// {
/// "aggregations": {
/// "agg_date_histogram_timeStamp": {
/// "buckets": [
/// {
/// "key_as_string": "2018-05-30",
/// "key": 1527638400000,
/// "doc_count": 23,
/// "agg_avg_items.value": {
/// "value": 38.84304356229478
/// }
/// }
/// ]
/// }
/// }
/// }
/// 此方法提供特定设备的特定探测器在某个时间范围的每个interval的温度平均值
/// </summary>
/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>
/// <param name="detector">查询的设备的探测器,有左、中、右。Index的items.name字段</param>
/// <param name="startDate">查询的时间范围的起始时间, 格式yyyyMMdd,参考es官方的Date Math</param>
/// <param name="endDate">查询的时间范围的终止时间, 格式yyyyMMdd,参考es官方的Date Math</param>
/// <param name="dateInterval">统计的时间间隔</param>
/// <param name="indices">查询的index name</param>
/// <returns>返回一个es自带的DateHistogramBucket的数组。DateHistogramBucket对象中包括分类的时间点及对应的值</returns>
public async Task<IReadOnlyCollection<DateHistogramBucket>> GetDetectorTemp(string systemID, DetectorEnum detector, string startDate, string endDate, DateInterval dateInterval, string indices = "*dms*")
{
var client = GetClient();
var filterPath = new string[] { "aggregations.*.buckets" };
var request = new SearchRequest(indices)
{
Query = new MatchQuery() { Field = "systemID", Query = systemID }
&& new MatchQuery() { Field = "items.name.keyword", Query = detector.ToString() }
&& new DateRangeQuery() { Field = "timeStamp", GreaterThanOrEqualTo = startDate, LessThanOrEqualTo = endDate, Format = "yyyyMMdd" },
Aggregations = new DateHistogramAggregation("agg_date_histogram_timeStamp")
{
Field = "timeStamp",
Interval = dateInterval,
Format = "yyyy-MM-dd HH:mm:ss",
Aggregations = new AverageAggregation("agg_avg_items.value", "items.value")
},
FilterPath = filterPath,
};
var response = await client.SearchAsync<CTDMS<CTDMSItem>>(request);
var dateHistogram = response.Aggregations.DateHistogram("agg_date_histogram_timeStamp");
return dateHistogram.Buckets;
}
/// <summary>
/// 此方法提供特定设备的特定探测器在某个时间范围的每个interval的温度平均值
/// </summary>
/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>
/// <param name="detector">查询的设备的探测器,有左、中、右。Index的items.name字段</param>
/// <param name="dateRange">查询的时间范围,自定义的enum有当天、当月、当年、昨天等预设范围。如果需要一个自由的时间范围,请用另一个dateRange为string类型的同名函数</param>
/// <param name="dateInterval">统计的时间间隔</param>
/// <param name="indices">查询的index name</param>
/// <returns>返回一个字典,key为日期,value为该日期内的温度平均值。key主要根据interval来阶段递增变化</returns>
public async Task<Dictionary<DateTime, double?>> GetDetectorTemp(string systemID, DetectorEnum detector, DateRangeEnum dateRange, DateInterval dateInterval, string indices = "*dms*")
{
var buckets = await GetDetectorTemp(systemID, detector, DateRangeEnumExtensions.ToString(dateRange), "now", dateInterval, indices);
return buckets.ToDictionary(k => k.Date, v => v.Values.OfType<ValueAggregate>().First().Value);
}
/// <summary>
///-> GET http://localhost:9200/_alias?filter_path=-*kibana *,-*es*,-*monitoring*,metric*
///<- 200
///{
/// "metric_2018_05_30_ctdms`1": {
/// "aliases": {}
/// },
/// "metric_2018_05_30_ctaws`1": {
/// "aliases": {}
/// },
/// "metric_2018_05_30_ctscanstatistics`1": {
/// "aliases": {}
/// },
/// "metric_2018_05_30_ctdailystatistics`1": {
/// "aliases": {}
/// }
///}
///此方法用于获取当前es中的所有index name
/// </summary>
/// <param name="filterPath">指定需要过滤的或者显示的index,可以排除系统自带的一些index</param>
/// <returns>当前和业务相关的所有设备名称的数组</returns>
public async Task<List<string>> GetIndices(string[] filterPath = null)
{
var client = GetClient();
if (filterPath == null)
{
filterPath = new string[] { "-*kibana*", "-*es*", "-*monitoring*", "metric*" };
}
var request = new GetAliasRequest()
{
FilterPath = filterPath
};
var response = await client.GetAliasAsync(request);
return response.Indices.Keys.Select(e => e.Name).ToList();
}
/// <summary>
/// -> POST http://localhost:9200/*scanstatistics*/_search?filter_path=hits.hits._source.timeStamp
/// {
/// "size": 1,
/// "sort": [
/// {
/// "timeStamp": {
/// "order": "asc"
/// }
/// }
/// ],
/// "query": {
/// "match": {
/// "systemID": "987654321098"
/// }
/// }
/// }
/// <- 200
/// {
/// "hits": {
/// "hits": [
/// {
/// "_source": {
/// "timeStamp": "2018-05-30T09:48:26.705659+08:00"
/// }
/// }
/// ]
/// }
/// }
/// 返回特定数据类型的某个设备的最后一条日志的上传时间。用于显示设备的最后一次更新状态
/// </summary>
/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>
/// <param name="indices">查询的index name</param>
/// <returns>最后一条日志的上传时间</returns>
public async Task<DateTime> GetLatestUploadTime<T1, T2>(string systemID, string indices = "*scanstatistics*")
where T1 : BaseMetrics<T2>
where T2 : BaseItem
{
var client = GetClient();
var filterPath = new string[] { "hits.hits._source.timeStamp" };
var request = new SearchRequest(indices)
{
Query = new MatchQuery() { Field = "systemID", Query = systemID },
Size = 1,
Sort = new List<ISort>
{
new SortField {Field = "timeStamp", Order = SortOrder.Descending}
},
FilterPath = filterPath
};
var response = await client.SearchAsync<T1>(request);
return response.Hits.First().Source.TimeStamp;
}
/// <summary>
/// 获取特定设备在某个时间范围内的扫描部位量的统计。
/// </summary>
/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>
/// <param name="dateRange">查询的时间范围,自定义的enum有当天、当月、当年、昨天等预设范围。如果需要一个自由的时间范围,请用另一个dateRange为string类型的同名函数</param>
/// <param name="indices">查询的index name</param>
/// <returns>一个字典,key为扫描部位的名称, value为该扫描部位在查询的时间范围内的总数量之和。</returns>
public async Task<Dictionary<string, double?>> GetScanBodypartStatistics(string systemID, DateRangeEnum dateRange, string indices = "*scanstatistics*")
{
var buckets = await GetScanBodypartStatistics(systemID, DateRangeEnumExtensions.ToString(dateRange), "now", indices);
return buckets.ToDictionary(k => k.Key, v => v.Values.OfType<ValueAggregate>().First().Value);
}
/// <summary>
/// -> POST http://localhost:9200/*scanstatistics*/_search?filter_path=aggregations.agg_terms_items \.name\.keyword.buckets
/// {
/// "query": {
/// "bool": {
/// "must": [
/// {
/// "match": {
/// "systemID": "987654321098"
/// }
/// },
/// {
/// "range": {
/// "timeStamp": {
/// "gte": "now/M"
/// }
/// }
/// }
/// ]
/// }
/// },
/// "aggs": {
/// "agg_terms_items.name.keyword": {
/// "terms": {
/// "field": "items.name.keyword"
/// },
/// "aggs": {
/// "agg_sum_items.value": {
/// "sum": {
/// "field": "items.value"
/// }
/// }
/// }
/// }
/// }
/// }
/// <- 200
/// {
/// "aggregations": {
/// "agg_terms_items.name.keyword": {
/// "buckets": [
/// {
/// "key": "Upper Extremity",
/// "doc_count": 15,
/// "agg_sum_items.value": {
/// "value": 1247
/// }
/// },
/// {
/// "key": "Pelvis",
/// "doc_count": 13,
/// "agg_sum_items.value": {
/// "value": 888
/// }
/// },
/// {
/// "key": "Neck",
/// "doc_count": 11,
/// "agg_sum_items.value": {
/// "value": 939
/// }
/// },
/// {
/// "key": "Spine",
/// "doc_count": 11,
/// "agg_sum_items.value": {
/// "value": 797
/// }
/// },
/// {
/// "key": "Thorax",
/// "doc_count": 11,
/// "agg_sum_items.value": {
/// "value": 1016
/// }
/// },
/// {
/// "key": "Abdomen",
/// "doc_count": 9,
/// "agg_sum_items.value": {
/// "value": 732
/// }
/// },
/// {
/// "key": "Head",
/// "doc_count": 8,
/// "agg_sum_items.value": {
/// "value": 531
/// }
/// }
/// ]
/// }
/// }
/// }
/// 获取特定设备在某个时间范围内的扫描部位量的统计。
/// </summary>
/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>
/// <param name="startDate">查询的时间范围的起始时间, 格式yyyyMMdd,参考es官方的Date Math</param>
/// <param name="endDate">查询的时间范围的终止时间, 格式yyyyMMdd,参考es官方的Date Math</param>
/// <param name="indices">查询的index name</param>
/// <returns>一个es自带的KeyedBucket的数组。KeyedBucket对象包括分类的name和value</returns>
public async Task<IReadOnlyCollection<KeyedBucket<string>>> GetScanBodypartStatistics(string systemID, string startDate, string endDate, string indices = "*scanstatistics*")
{
var client = GetClient();
var filterPath = new string[] { "aggregations.*.buckets" };
var request = new SearchRequest(indices)
{
Query = new MatchQuery() { Field = "systemID", Query = systemID }
&& new DateRangeQuery() { Field = "timeStamp", GreaterThanOrEqualTo = startDate, LessThanOrEqualTo = endDate, Format = "yyyyMMdd" },
Aggregations = new TermsAggregation("terms")
{
Field = "items.name.keyword",
Aggregations = new SumAggregation("sum", "items.value")
},
FilterPath = filterPath
};
var response = await client.SearchAsync<CTScanStatistics<CTScanStatisticsItem>>(request);
var agg = response.Aggregations.Terms("terms");
return agg.Buckets;
}
/// <summary>
/// 查询某个时间范围内的每个interval的患者扫描量
/// </summary>
/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>
/// <param name="dateRange">查询的时间范围,自定义的enum有当天、当月、当年、昨天等预设范围。如果需要一个自由的时间范围,请用另一个dateRange为string类型的同名函数</param>
/// <param name="dateInterval">统计的时间间隔</param>
/// <param name="indices">查询的index name</param>
/// <returns>返回一个字典,key为日期,value为该日期内的患者扫描量总和。key主要根据interval来阶段递增变化</returns>
public async Task<Dictionary<DateTime, double?>> GetScanPatientStatistics(string systemID, DateRangeEnum dateRange, DateInterval dateInterval, string indices = "*scanstatistics*")
{
var buckets = await GetScanPatientStatistics(systemID, DateRangeEnumExtensions.ToString(dateRange), "now", dateInterval, indices);
return buckets.ToDictionary(k => k.Date, v => v.Values.OfType<ValueAggregate>().First().Value);
}
/// <summary>
/// -> POST /*scanstatistics*/_search?typed_keys=true&filter_path=aggregations.*.buckets
///{
/// "query": {
/// "bool": {
/// "must": [
/// {
/// "match": {
/// "systemID": {
/// "query": "987654321098"
/// }
/// }
/// },
/// {
/// "range": {
/// "timeStamp": {
/// "gte": "now/y"
/// }
/// }
/// }
/// ]
/// }
/// },
/// "aggs": {
/// "agg_date_histogram_timeStamp": {
/// "date_histogram": {
/// "field": "timeStamp",
/// "interval": "month",
/// "format": "yyyy-MM-dd HH:mm:ss"
/// },
/// "aggs": {
/// "agg_count_systemID": {
/// "value_count": {
/// "field": "systemID.keyword"
/// }
/// }
/// }
/// }
/// }
///}
/// <- 200
/// {
/// "aggregations": {
/// "date_histogram#agg_date_histogram_timeStamp": {
/// "buckets": [
/// {
/// "key_as_string": "2018-05-01 00:00:00",
/// "key": 1525132800000,
/// "doc_count": 30,
/// "value_count#agg_count_systemID": {
/// "value": 30
/// }
/// },
/// {
/// "key_as_string": "2018-06-01 00:00:00",
/// "key": 1527811200000,
/// "doc_count": 30,
/// "value_count#agg_count_systemID": {
/// "value": 30
/// }
/// }
/// ]
/// }
/// }
///}
/// 查询某个时间范围内的每个interval的患者扫描量
/// </summary>
/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>
/// <param name="startDate">查询的时间范围的起始时间, 格式yyyyMMdd,参考es官方的Date Math</param>
/// <param name="endDate">查询的时间范围的终止时间, 格式yyyyMMdd,参考es官方的Date Math</param>
/// <param name="dateInterval">统计的时间间隔</param>
/// <param name="indices">查询的index name</param>
/// <returns>返回一个es自带的DateHistogramBucket的数组。DateHistogramBucket对象中包括分类的时间点及对应的值</returns>
public async Task<IReadOnlyCollection<DateHistogramBucket>> GetScanPatientStatistics(string systemID, string startDate, string endDate, DateInterval dateInterval, string indices = "*scanstatistics*")
{
var client = GetClient();
var filterPath = new string[] { "aggregations.*.buckets" };
var request = new SearchRequest(indices)
{
Query = new MatchQuery() { Field = "systemID", Query = systemID }
&& new DateRangeQuery() { Field = "timeStamp", GreaterThanOrEqualTo = startDate, LessThanOrEqualTo = endDate, Format = "yyyyMMdd" },
Aggregations = new DateHistogramAggregation("agg_date_histogram_timeStamp")
{
Field = "timeStamp",
Interval = dateInterval,
Format = "yyyy-MM-dd HH:mm:ss",
Aggregations = new ValueCountAggregation("agg_count_systemID", "systemID.keyword")
},
FilterPath = filterPath
};
var response = await client.SearchAsync<CTDMS<CTDMSItem>>(request);
var dateHistogram = response.Aggregations.DateHistogram("agg_date_histogram_timeStamp");
return dateHistogram.Buckets;
}
/// <summary>
/// 获取某个设备在特定时间范围内的患者扫描量。
/// 每条日志就是该患者在当时扫描时记录的log,因此统计扫描log的数量即可得到扫描患者的数量
/// </summary>
/// <param name="systemID">查询的设备唯一标识,index中的systemID</param>
/// <param name="dateRange">查询的时间范围,自定义的enum有当天、当月、当年、昨天等预设范围。如果需要一个自由的时间范围,请用另一个dateRange为string类型的同名函数</param>
/// <param name="indices">查询的index name</param>
/// <returns>患者扫描的总和</returns>
public async Task<long> GetTotalScanPatientCount(string systemID, DateRangeEnum dateRange, string indices = "*scanstatistics*")
{
return await GetTotalScanPatientCount(systemID, DateRangeEnumExtensions.ToString(dateRange), "now", indices);
}
/// <summary>
/// -> POST http://localhost:9200/*scanstatistics*/_count
/// {
/// "query": {
/// "bool": {
/// "must": [
/// {
/// "match": {
/// "systemID": "987654321098"
/// }
/// },
/// {
/// "range": {
/// "timeStamp": {
/// "gte": "now/d"
/// }
/// }
/// }
/// ]
/// }
/// }
/// }
/// <- 200
/// {
/// "count": 30,
/// "_shards": {
/// "total": 15,
/// "successful": 15,
/// "skipped": 0,
/// "failed": 0
/// }
/// }
/// 获取某个设备在特定时间范围内的患者扫描量。
/// 每条日志就是该患者在当时扫描时记录的log,因此统计扫描log的数量即可得到扫描患者的数量
/// </summary>
/// <param name="systemID">查询的设备唯一标识,index中的systemID</param>
/// <param name="startDate">查询的时间范围的起始时间, 格式yyyyMMdd,参考es官方的Date Math</param>
/// <param name="endDate">查询的时间范围的终止时间, 格式yyyyMMdd,参考es官方的Date Math</param>
/// <param name="indices">查询的index name</param>
/// <returns>患者扫描的总和</returns>
public async Task<long> GetTotalScanPatientCount(string systemID, string startDate, string endDate, string indices = "*scanstatistics*")
{
var client = GetClient();
var request = new CountRequest(indices)
{
Query = new MatchQuery() { Field = "systemID", Query = systemID }
&& new DateRangeQuery() { Field = "timeStamp", GreaterThanOrEqualTo = startDate, LessThanOrEqualTo = endDate, Format = "yyyyMMdd" }
};
var response = await client.CountAsync<CTScanStatistics<CTScanStatisticsItem>>(request);
return response.Count;
}
/// <summary>
/// 判断当前ES的服务是否有效。建议在执行每一个ES的操作前都应该先调用此方法。
/// </summary>
/// <returns>true,服务运行正常; false,服务运行异常</returns>
public async Task<bool> IsAvailable()
{
var client = GetClient();
var request = new PingRequest()
{
ErrorTrace = true
};
var response = await client.PingAsync(request);
return response.IsValid;
}
}
}
以上所述就是小编给大家介绍的《es的C# api 的封装和使用说明》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。