Elasticsearch数据更新全方位解析

栏目: 后端 · 发布时间: 6年前

内容简介:“需求是人类进步的阶梯“,这是著名的我们当前项目产品经理在提出最后一次需求时说出的话(为什么是最后一句话, 因为被祭天了)。前段时间在项目的crm存储部分,为了满足大量自定义的搜索功能,选择了使用了ES作为后端存储介质。期间随着需求的变更对ES存储数据更新方式也多方面进行了了解,本着好记性不如烂笔头,记录在此。需求:每次完全替换crm中某document的所有的数据

导言

“需求是人类进步的阶梯“,这是著名的我们当前项目产品经理在提出最后一次需求时说出的话(为什么是最后一句话, 因为被祭天了)。

前段时间在项目的crm存储部分,为了满足大量自定义的搜索功能,选择了使用了ES作为后端存储介质。期间随着需求的变更对ES存储数据更新方式也多方面进行了了解,本着好记性不如烂笔头,记录在此。

完整更新

需求:每次完全替换crm中某document的所有的数据

方案:通过插入更新

ES API:index

在ES中,通过提供一个document ID,index API可以实现插入一条新的document,或者更新一条已有的document到ES中。至于document存在与否的判断,则是根据给到的document ID来判定。因此,通过index API来对已有的文档实现更新,其实是进行了一次reindex操作从而实现。

下面的操作,是向ES的index crm中插入一条数据document ID为1的数据:

PUT crm/1
{
    "crm_id": 12001,
    "user_name":"王五",
    "age": 30,
    "mobile":"13600000001",
}

我们的开发语言为golang,在 go 语言中,我们采用了https://github.com/olivere/elastic中的Elastic库来实现与ES的交互,对应的代码如下:

type CrmData struct {
	CrmId     int           `json:"crm_id"`
	UserName  string        `json:"user_name"`
	Age       int           `json:"age"`
	Mobile    string        `json:"mobile"`
}

crmData1 := CrmData{CrmId: 12001, UserName: "王五", Age: 30, Mobile:"13600000001"}

// 添加一份document
indexResult, err := esClient.Index().
		Index("crm").
		Id("1").
		BodyJson(&crmData1).
		Do(context.TODO())
	if err != nil {
		//error code
}

需要注意的是,如上所提到,这种方式的更新存在两个局限性或者说特性:

1. 只能实现对一个document ID的整个内容进行全部的更新,如果需要更新document中的部分field,无法实现;

2. 必须事先知道document的ID*。

*请注意,在大多数ES操作中,若无业务需求,建议不指定document ID,因为在ES中插入数据时,通过指定ID进行index的操作比起使用随机ID的性能要稍微差些。

部分更新

开发完上述需求没两天,将被祭天的产品经理又来了。这次是客户的后端数据更新,希望可以往每一个会员数据中插入一些新的字段。也就是需要我们支持到,通过document ID实现对某文档的部分field更新。不过,兵来将挡水来土掩,问题不大。

需求:更新ES中某document的部分数据

方案:通过update更新

ES API:update

ES中的update API支持到根据用户提供的脚本去实现更新一份document的功能,而脚本语言的支持极大地提升了我们对数据进行更新的灵活性。

举个例子,对于上述插入到ES中的第一条用户crm数据,现在,客户的crm数据更新了会员等级制度,也就是需要对ES中每一条docuemnt数据增加一份会员等级信息(类似的需求诸如:随着新的一年的到来,需要对用户的年龄增加一岁),对此,在ES中可以操作如下:

POST crm/1/_update
{
    "script" : {
        "source": "ctx._source.level=params.level;ctx._source.age+= params.count",
        "lang": "painless",
        "params" : {
            "level": 2
            "count" : 1
        }
    }
}

上述的操作中,首先,我们通过document ID=1来实现对应的document的索引,其次,通过script字段中的内容,来实现对具体field的更新。在update的script字段中:

  • source是将要执行的脚本内容;
  • lang表示的是当前脚本的语言*;
  • param则是脚本执行的参数;

*注:lang支持多种语言,关于本文使用的painless脚本的语言等,不是本文介绍的内容,需要了解的朋友,可以去这儿参考详情:https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-scripting-painless.html。

在update API实现的逻辑中,其实可以理解为三步操作:

  • index:根据document ID去索引中获取到对应的document快照信息;
  • update:根据script脚本来更新document;
  • reindex:将更新后的document重新写回到索引;

在上述的第一步和第三步执行时,update操作都会应用到ES内部的version以实现版本控制,从而保证document在更新的过程中没有发生改变。因此,需要注意的是,ES的update API依然是需要对文档做一次完全的reindex操作,而不是直接去修改原始document。但是,update API所能做的是减少了网络交互次数,当然这比起我们自己通过index获取数据并在业务代码中更新再写回到ES来实现,大大的减少了版本冲突的概率。

上面提到,在update 中第一步和第三步执行时,update操作都会应用到ES内部的version以实现版本控制。那么,冲突了我们如何解决呢?

在遇到版本冲突问题时,ES将会返回409 Conflict HTTP错误码。因此,当遇到409后,为了保证数据的最终插入,我们就必须要考虑到retry机制。为了实现冲突后的retry,有两种方案来实现:

1. 业务代码自定义

通过识别409错误,在业务代码中,跟据自己的需求来进行retry。因为是自定义的逻辑,所以我们可以任意的操作retry的回退策略,以及retry的内容等;

2. retry_on_conflict

通过在参数中指定来实现retry_on_conflict来实现,如下所示:

POST crm/_doc/1/_update?retry_on_conflict=3
{
    "script" : {
        "source": "ctx._source.level=params.level;ctx._source.age+= params.count",
        "lang": "painless",
        "params" : {
            "level": 2
            "count" : 1
        }
    }
}

上述的操作,结合retry_on_conflict,当ES的update过程中检测到version发生了变化,则会在之后尝试更新三次,在golang中对应的代码:

update := esClient.Update().
		Index("crm").
		Id("1").
		Script(elastic.NewScript("ctx._source.level=level;ctx._source.age+= params.count").
		Params(map[string]interface{}{"level": 2, "count":1}).
		Lang("painless")).
		RetryOnConflict(3)

根据上面的描述,我们可以发现,通过update API来进行更新的方式,对于那些不依赖执行顺序的更新行为显然更为合适,比如counter累加这类,再依赖于强大脚本语言的支持,使得更新的功能更加丰富。但是,这一切的前提依然依赖于对document ID的已知。

通过搜索更新

当我还沉浸于不费吹灰之力又解决了一个需求的时候,产品经理端了端镜框,站了过来,并慢慢踱步而来。你没猜错,新的需求又来了!这次是来自于产品经理自己的产品升级需求,需要对用户的crm数据做出画像功能,同时对所有crm会员数据进行标签分组。

关于标签功能的设计,不是本文的重点内容,我们不在此花更多的篇幅介绍,简化下来,可以用一句话来说明需求:将年纪介于30-40岁的会员添加一个【高购买力人群】的标签。虽然这句话很短,但是根据我们目前的方案来看,这可就为难了。在没办法知道符合这个年龄段所有人群在crm中的document ID的前提下,我们通过上述两份方案均无法实现,或者换句话说,如果需要知道这些ID,那就需要额外的存储(如MySQL)来记录这些所有的ID信息,而这最后,就会演变成将数据完全的存储到了 MySQL 中,显然是不可取的。也就是说无论是通过index或者update方案,一开始就是行不通的。这时候,我们就需要第三种更新方式了。

需求:需要将年纪介于30-40岁的会员添加一个【高购买力人群】的标签

方案:通过搜索更新来实现

ES API:update_by_query

update_by_query,顾名思义,这种更新方式,即通过查询再更新。对应上述的需求,为了实现对年龄在30-40之间的会员添加标签,在ES中,我们通过update_by_query中的query和script来实现先查询再更新的机制:

POST crm/_update_by_query?conflicts=proceed
{
  "script": {
    "source": "ctx._source.tag.add(param.tag)",
    "lang": "painless",
    "param":{
      "tag" : 10
    }
  },
  "query": {
    "term": {
       "range" : {
            "age" : {
                "gte" : 30,
                "lte" : 40
            }
       }
    }
  }
}

其中,query字段,表示我们query的条件*,根据该条件,ES将找到对应的document。(*注:ES中query的语法,不是本文讨论的主要内容,将不赘述。)

通过上述命令,ES将首先执行query语句找到对应的document,再根据script中的语法去更新相应的字段(上面示例中假设tag=10表示的是【高购买力人群】)。

细心的朋友会发现,在上述请求中我们还指定的conflicts=proceed参数。当update_by_query执行的时候,也应用了ES内部的version以支持到版本控制 ,也就是说,我们在执行过程可能会出现版本冲突的问题。默认情况下,update_by_query在遇到版本冲突问题时,同样返回409错误码,如果需求场景是不介意版本冲突的,那么可以按照上文那样,通过指定conflicts=proceed,从而当出现版本冲突时,ES将会继续执行更新的操作。至于Retry机制,则可以采用一样的方案来实现。上述介绍的部份,对应在golang中的代码如下:

res, err := esClient.UpdateByQuery("crm").
		Query(elastic.NewRangeQuery("created").Gte(30).Lte(40))).
		Script(elastic.NewScriptInline("ctx._source.tag.add(param.tag)")).
		Param(map[string]interface{}{"tag": 10}).
		ProceedOnVersionConflict().
		Do(context.TODO())
	if err != nil {
		t.Fatal(err)
	}

批量操作

在实际代码运行过程中,我们很有可能面对的场景是批量更新。如果通过for循环来一条条操作自然是可以实现的,但是网络来回导致的性能问题惨不忍睹。这时候我们便需要使用ES的buck AP来实现批量操作。在golang代码中,一个bulk update的操作大概如下所示:

func addTagToEs(es *elastic.Client, esIndex string, tagId int, esIds []string) (err error) {
	var res *elastic.BulkResponse
	bulkReq := es.Bulk()
	for _, esId := range esIds {
	  // 创建bulk的request
		scriptStr := "if (ctx._source.user_tag.contains(params.tagId)==false) { ctx._source.user_tag.add(params.tagId) } "
		updateReq := elastic.NewBulkUpdateRequest().
			Index(esIndex).
			Type(TYPE_POPULATION).
			Id(esId).
			RetryOnConflict(3).
			Script(elastic.NewScriptInline(scriptStr).Param("tagId", tagId))
		bulkReq = bulkReq.Add(updateReq)

		// 执行批量添加
		if bulkReq.NumberOfActions() >= inited.EsBulksize {
			res, err = bulkReq.Do(context.TODO())
			if err != nil {
				log.LoggerWrapperWithCaller().Errorf(err.Error())
				return
			}
			if res.Errors {
				rep := res.Failed()
				for _, v := range rep {
					err = fmt.Errorf("Failed: %v, error msg: %v", *v, *(v.Error))
					log.LoggerWrapperWithCaller().Errorf(err.Error())
				}
				err = fmt.Errorf("bulk insert commit failed")
				log.LoggerWrapperWithCaller().Errorf(err.Error())
				return
			}
		}
	}

	// 将剩余的继续添加
	if bulkReq.NumberOfActions() > 0 {
		res, err = bulkReq.Do(context.TODO())
		if err != nil {
			log.LoggerWrapperWithCaller().Errorf(err.Error())
			return
		}
		if res.Errors {
			rep := res.Failed()
			for _, v := range rep {
				err = fmt.Errorf("Failed: %v, error msg: %v", *v, *(v.Error))
				log.LoggerWrapperWithCaller().Errorf(err.Error())
			}
			err = fmt.Errorf("bulk insert commit failed")
			log.LoggerWrapperWithCaller().Errorf(err.Error())
			return
		}
	}

	return
}

疑难杂症

在开发过程中,尤其是批量操作update过程中,很多人可能会遇到了一个script compliation 限制的错误:

Too many dynamic script compilations within one minute

通过搜索不难发现,当我们使用script功能的时候,在ES中需要对该脚本进行编译,但是ES对脚本编译有个限制的配置,可以通过下面操作来修改该解析上限的配置:

PUT /_cluster/settings
{
  "transient": {
    "script.max_compilations_per_minute": 40
  }
}

然而,对于大批量的数据更新通过这样的配置显然是治标不治本(比如一次更新上万条数据,这在ES这中存储介质中已经是很小很小的量了)。经过进一步分析,原来每当遇到一条不同的脚本时,ES都需要单独的编译解析,因此,当进行bulk update时,每一个脚本都实时编译的话,很快就会达到了上述的上限。

因此第一个需要注意的问题,bulk update操作时的脚本一定要尽量保持变化较少。

其实,对于上述问题,在大多数的更新场景中都是可以满足的。在大多数的更新场景中,我们更多都是去更新相同的字段,也因此批量更新的脚本基本都是同一个(或者几个),但仅仅是这样却并不能完全解决问题,在批量更新脚本编写的过程中,依然会有个坑等着你踩。

第二个需要注意的问题,就是脚本的编写了。这需要我们了解一个知识背景:ES如何认定一个脚本是否需要实时编译的。以一个简单例子举例,当更新用户会员年龄信息时,可能会是【1-100】岁之间的任意整数,如果时20岁,对应的脚本应该如下:

"script" : {
        "source": "ctx._source.age=20",
        "lang": "painless"
    }

按照这种方式就意味着,每一个年龄都会需要一条脚本对应着,那么最多会有100条脚本,从而在批量更新年龄的时候就很大概率会超过上面说到的编译脚本的限制。

那么有没有解决方法呢?当然是有的,这时候我们就需要结合script中的param功能,同样的操作,下面这条脚本,无论年龄在【0-100】中如何变化,对于ES中都只会在第一次进行解析:ctx._source.age=params.age这个脚本,之后便无需再次解析:

"script" : {
        "source": "ctx._source.age=params.age",
        "lang": "painless",
        "params" : {
            "age": 20
        }
    }

通过观察上述两种写法,不难发现,当脚本中有常数变量时,ES会实时编译脚本,这时候可以通过结合script中param的功能,设法将脚本中的变量通过param传递进去,从而从根本上解决脚本编译解析限制的问题。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Head First Rails

Head First Rails

David Griffiths / O'Reilly Media / 2008-12-30 / USD 49.99

Figure its about time that you hop on the Ruby on Rails bandwagon? You've heard that it'll increase your productivity exponentially, and allow you to created full fledged web applications with minimal......一起来看看 《Head First Rails》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

MD5 加密
MD5 加密

MD5 加密工具