ES写入/查询底层原理
Elasticsearch写入数据流程
客户端随机选择一个ES集群中的节点,发送POST/PUT请求,被选择的节点为协调节点(coordinating node)协调节点查询集群状态信息并计算路由,将请求发送到真正处理请求的节点(primary shard所在的节点)包含primary shard的节点处理写入请求,并将数据同步到包含replica shard的节点coordinating node收到包含primary shard的节点的响应信息,将最终结果返回给Client端Elasticsearch读取数据流程
客户端随机选择一个ES集群中的节点,发送GET请求,被选择的节点为协调节点(coordinating node)协调节点查询集群状态信息并使用round-robin随机轮询算法计算出去此次请求的节点,将请求发送到真正处理请求的节点(主分片节点和副本节点随机分配)处理读请求的节点将数据返回给协调节点协调节点会把文档信息返回给ClientElasticsearch检索数据流程
客户端发送请求到一个协调节点协调节点将搜索请求转发到所有的shard对应的primary shard或replica shard也可以每个shard将自己的搜索结果(其实就是一些doc id),返回给协调节点,由协调节点进行数据的合并、排序、分页等操作,产出最终结果接着由协调节点,根据doc id去各个节点上拉取实际的document数据,最终返回给客户端这里需要注意,分页查询,当from特别大时会造成大量无用数据返回到协调节点,谨慎使用。
数据索引底层原理
先写入buffer,在buffer里的时候数据是搜索不到的;同时将数据写入translog日志文件。如果buffer到达阈值,或者到一定时间,ES会将buffer中的数据refresh到一个新的segment file中,但是此时数据不是直接进入segment file的磁盘文件的,而是先进入os cache的。这个过程就是refresh。每隔1秒钟,es就会将buffer中的数据写入到一个新的segment file,因此每秒钟产生一个新的磁盘文件(segment file),这个segment file中就存储最近1秒内buffer中写入的数据。如果buffer里面此时没有数据,就不会执行refresh操作;如果buffer里面有数据,默认1秒钟执行一次refresh操作,刷入一个新的segment file中。操作系统里面存在操作系统缓存(os cache),数据写入磁盘文件之前会先进入os cache,先进入操作系统级别的一个内存缓存中。只要buffer中的数据被refresh到os cache中,数据就可以被检索到了。可以通过es的restful api或者java api,手动执行一次refresh操作,就是手动将buffer中的数据刷入os cache中,让数据立马就可以被搜索到。只要数据被输入os cache中,buffer就会被清空了,因为不需要保留buffer了,数据在translog里面已经持久化到磁盘去一份了。这里需要注意,分页查询,当from特别大时会造成大量无用数据返回到协调节点,谨慎使用。
性能调优
系统层面
系统层面的调优主要是内存的设定与避免交换内存。ES 安装后默认设置的堆内存是 1GB,这很明显是不够的,那么接下来就会有一个问题出现:我们要设置多少内存给 ES 呢?其实这是要看我们集群节点的内存大小,还取决于我们是否在服务器节点上还是否要部署其他服务。如果内存相对很大,如 64G 及以上,并且不在 ES 集群上部署其他服务,那么建议 ES 内存可以设置为 31G-32G,因为这里有一个 32G 性能瓶颈问题,直白的说就是即使你给了 ES 集群大于 32G 的内存,其性能也不一定会更加优良,甚至会不如设置为 31G-32G 时候的性能。设置 ES 集群内存的时候,还有一点就是确保堆内存最小值(Xms)与最大值(Xmx)的大小是相同的,防止程序在运行时改变堆内存大小,这是一个很耗系统资源的过程。
禁止swap,一旦允许内存与磁盘的交换,会引起致命的性能问题。swap空间是一块磁盘空间,操作系统使用这块空间保存从内存中换出的操作系统不常用page数据,这样可以分配出更多的内存做page cache。这样通常会提升系统的吞吐量和IO性能,但同样会产生很多问题。页面频繁换入换出会产生IO读写、操作系统中断,这些都很影响系统的性能。这个值越大操作系统就会更加积极的使用swap空间。通过:在elasticsearch.yml 中 bootstrap.memory_lock: true, 以保持JVM锁定内存,保证ES的性能。
分片及副本层面
ES 是一个分布式的搜索引擎, 索引通常都会分解成不同部分, 分布在不同节点的部分数据就是分片。ES 自动管理和组织分片, 并在必要的时候对分片数据进行再平衡分配, 所以用户基本上不用担心分片的处理细节。创建索引时默认的分片数为 5 个,并且一旦创建不能更改。
ES 默认创建一份副本,就是说在 5 个主分片的基础上,每个主分片都相应的有一个副本分片。额外的副本有利有弊,有副本可以有更强的故障恢复能力,但也占了相应副本倍数的磁盘空间。
对于副本数,比较好确定,可以根据我们集群节点的多少与我们的存储空间决定,我们的集群服务器多,并且有足够大多存储空间,可以多设置副本数,一般是 1-3 个副本数,如果集群服务器相对较少并且存储空间没有那么宽松,则可以只设定一份副本以保证容灾(副本数可以动态调整)。
对于分片数,是比较难确定的。因为一个索引分片数一旦确定,就不能更改,所以我们在创建索引前,要充分的考虑到,以后我们创建的索引所存储的数据量,否则创建了不合适的分片数,会对我们的性能造成很大的影响。
查询大量小分片使得每个分片处理数据速度更快了,那是不是分片数越多,我们的查询就越快,ES 性能就越好呢?其实也不是,因为在查询过程中,有一个分片合并的过程,如果分片数不断的增加,合并的时间则会增加,而且随着更多的任务需要按顺序排队和处理,更多的小分片不一定要比查询较小数量的更大的分片更快。如果有多个并发查询,则有很多小碎片也会降低查询吞吐量。
如果现在你的场景是分片数不合适了,但是又不知道如何调整,那么有一个好的解决方法就是按照时间创建索引,然后进行通配查询。如果每天的数据量很大,则可以按天创建索引,如果是一个月积累起来导致数据量很大,则可以一个月创建一个索引。如果要对现有索引进行重新分片,则需要重建索引,对于每个index的shard数量,可以根据数据总量、写入压力、节点数量等综合考量后设定,然后根据数据增长状态定期检测下shard数量是否合理。
腾讯云CES技术团队的推荐方案是:对于数据量较小(100GB以下)的index,往往写入压力查询压力相对较低,一般设置35个shard,number_of_replicas设置为1即可(也就是一主一从,共两副本)。对于数据量较大(100GB以上)的index:一般把单个shard的数据量控制在(20GB50GB)让index压力分摊至多个节点:可通过index.routing.allocation.total_shards_per_node参数,强制限定一个节点上该index的shard数量,让shard尽量分配到不同节点上综合考虑整个index的shard数量,如果shard数量(不包括副本)超过50个,就很可能引发拒绝率上升的问题,此时可考虑把该index拆分为多个独立的index,分摊数据量,同时配合routing使用,降低每个查询需要访问的shard数量。
ES系统方面调优
确定集群CPU占用率高的原因,使用GET_nodes/{node}/hot_threads,如果结果为elasticsearch[{node}][search][T#10]则为查询导致,如果结果为elasticsearch[{node}][bulk][T#1],则为写入导致。
index.merge.scheduler.max_thread_count
在实际调优中,cpu使用率很高,使用SSD替代机械硬盘。SSD与机械磁盘相比,具有高效的读写速度和稳定性。如果不是SSD,建议设置index.merge.scheduler.max_thread_count: 1,即索引merge最大线程数设置为1 个,该参数可以有效调节写入的性能。因为在存储介质上并发写,由于寻址的原因,写入性能不会提升,只会降低。当有多个磁盘时可以设置为对应的数量。
index.refresh_interval
这个参数的意思是数据写入后几秒可以被搜索到,默认是 1s。每次索引的 refresh 会产生一个新的 lucene 段, 这会导致频繁的合并行为,如果业务需求对实时性要求没那么高,可以将此参数调大。
indices.memory.index_buffer_size
如果我们要进行非常重的高并发写入操作,那么最好将它调大一些,index buffer的大小是所有的shard公用的,对于每个 shard来说,最多给512MB,因为再大性能就没什么提升了。ES会将这个设置作为每个shard共享的index buffer,那些特别活跃的shard会更多的使用这个 buffer。默认这个参数的值是10%,也就是jvm堆内存的10%。
translog
ES为了保证数据不丢失,每次index、bulk、delete、update完成的时候,一定会触发刷新translog到磁盘上。在提高数据安全性的同时当然也降低了性能。如果你不在意这点可能性,还是希望性能优先,可以设置如下参数:
这样设定的意思是开启异步写入磁盘,并设定写入的时间间隔与大小,有助于写入性能的提升。
replica数目
为了让创建的es index在每台datanode上均匀分布,同一个datanode上同一个index的shard数目不应超过3个。计算公式: (number_of_shard*(1+number_of_replicas)) < 3*number_of_datanodes每台机器上分配的shard数目,index.routing.allocation.total_shards_per_node: 2
merge相关参数
超时参数
Linux层面相关调优
Linux中,每个进程默认打开的最大文件句柄数是1000,对于服务器进程来说,显然太小,通过修改/etc/security/limits.conf来增大打开最大句柄数* - nofile 65535
vm.dirty_background_ratio: 这个参数指定了当文件系统缓存脏页数量达到系统内存百分之多少时(如5%)就会触发pdflush/flush/kdmflush等后台回写进程运行,将一定缓存的脏页异步地刷入外存;
vm.dirty_ratio: 该参数则指定了当文件系统缓存脏页数量达到系统内存百分之多少时(如10%),系统不得不开始处理缓存脏页(因为此时脏页数量已经比较多,为了避免数据丢失需要将一定脏页刷入外存);在此过程中很多应用进程可能会因为系统转而处理文件IO而阻塞。
把该参数适当调小。如果cached的脏数据所占比例(这里是占MemTotal的比例)超过这个设置,系统会停止所有的应用层的IO写操作,等待刷完数据后恢复IO。所以万一触发了系统的这个操作,对于用户来说影响非常大的。
可以修改/etc/sysctl.conf文件进行持久化。
ES使用建议
读数据
避免大结果集和深翻,ES 提供了 Scroll 和 Scroll-Scan 这两种查询模式。
Scroll:是为检索大量的结果而设计的。例如,我们需要查询 1~100 页的数据,每页 100 条数据。
如果使用Search查询:每次都需要在每个分片上查询得分最高的 from+100 条数据,然后协同节点把收集到的 n×(from+100)条数据聚合起来再进行一次排序。
每次返回from+1开始的100条数据,并且要重复执行100次。
如果使用Scroll查询:在各个分片上查询10000条数据,协同节点聚合n×10000条数据进行合并、排序,并将排名前10000的结果快照起来。这样做的好处是减少了查询和排序的次数。
插入索引自动生成 id
当写入端使用特定的 id 将数据写入 ES 时,ES 会检查对应的索引下是否存在相同的 id,这个操作会随着文档数量的增加使消耗越来越大,所以如果业务上没有硬性需求建议使用 ES 自动生成的 id,加快写入速率。
避免稀疏索引
索引稀疏之后,会导致索引文件增大。ES的keyword,数组类型采用doc_values结构,即使字段是空值,每个文档也会占用一定的空间,所以稀疏索引会造成磁盘增大,导致查询和写入效率降低。
参数调优汇总
ES大批量写入提高性能的策略
用bulk批量写入
你如果要往es里面灌入数据的话,那么根据你的业务场景来,如果你的业务场景可以支持让你将一批数据聚合起来,一次性写入es,那么就尽量采用bulk的方式,每次批量写个几百条这样子。
bulk批量写入的性能比你一条一条写入大量的document的性能要好很多。但是如果要知道一个bulk请求最佳的大小,需要对单个es node的单个shard做压测。先bulk写入100个document,然后200个,400个,以此类推,每次都将bulk size加倍一次。如果bulk写入性能开始变平缓的时候,那么这个就是最佳的bulk大小。并不是bulk size越大越好,而是根据你的集群等环境具体要测试出来的,因为越大的bulk size会导致内存压力过大,因此最好一个请求不要发送超过10mb的数据量。
先确定一个是bulk size,此时就尽量是单线程,一个es node,一个shard,进行测试。看看单线程最多一次性写多少条数据,性能是比较好的。
使用多线程将数据写入es
单线程发送bulk请求是无法最大化es集群写入的吞吐量的。如果要利用集群的所有资源,就需要使用多线程并发将数据bulk写入集群中。为了更好的利用集群的资源,这样多线程并发写入,可以减少每次底层磁盘fsync的次数和开销。首先对单个es节点的单个shard做压测,比如说,先是2个线程,然后是4个线程,然后是8个线程,16个,每次线程数量倍增。一旦发现es返回了TOO_MANY_REQUESTS的错误,JavaClient也就是EsRejectedExecutionException。此时那么就说明es是说已经到了一个并发写入的最大瓶颈了,此时我们就知道最多只能支撑这么高的并发写入了。
增加refresh间隔
默认的refresh间隔是1s,用index.refresh_interval参数可以设置,这样会其强迫es每秒中都将内存中的数据写入磁盘中,创建一个新的segment file。正是这个间隔,让我们每次写入数据后,1s以后才能看到。但是如果我们将这个间隔调大,比如30s,可以接受写入的数据30s后才看到,那么我们就可以获取更大的写入吞吐量,因为30s内都是写内存的,每隔30s才会创建一个segment file。
禁止refresh和replia
如果我们要一次性加载大批量的数据进es,可以先禁止refresh和replia复制,将index.refresh_interval设置为-1,将index.number_of_replicas设置为0即可。这可能会导致我们的数据丢失,因为没有refresh和replica机制了。但是不需要创建segment file,也不需要将数据replica复制到其他的replica shasrd上面去。此时写入的速度会非常快,一旦写完之后,可以将refresh和replica修改回正常的状态。
禁止swapping交换内存
如果要将es jvm内存交换到磁盘,再交换回内存,大量磁盘IO,性能很差
给filesystem cache更多的内存
filesystem cache被用来执行更多的IO操作,如果我们能给filesystemcache更多的内存资源,那么es的写入性能会好很多。
使用自动生成的id
如果我们要手动给es document设置一个id,那么es需要每次都去确认一下那个id是否存在,这个过程是比较耗费时间的。如果我们使用自动生成的id,那么es就可以跳过这个步骤,写入性能会更好。对于你的业务中的表id,可以作为es document的一个field。
用性能更好的硬件
我们可以给filesystem cache更多的内存,也可以使用SSD替代机械硬盘,避免使用NAS等网络存储,考虑使用RAID 0来条带化存储提升磁盘并行读写效率,等等。
index buffer
如果我们要进行非常重的高并发写入操作,那么最好将index buffer调大一些,indices.memory.index_buffer_size,这个可以调节大一些,设置的这个index buffer大小,是所有的shard公用的,但是如果除以shard数量以后,算出来平均每个shard可以使用的内存大小,一般建议,但是对于每个shard来说,最多给512mb,因为再大性能就没什么提升了。es会将这个设置作为每个shard共享的index buffer,那些特别活跃的shard会更多的使用这个buffer。默认这个参数的值是10%,也就是jvm heap的10%,如果我们给jvmheap分配10gb内存,那么这个index buffer就有1gb,对于两个shard共享来说,是足够的了。
ES查询提高性能的策略
query_string 或 multi_match的查询字段越多, 查询越慢。可以在mapping阶段,利用copy_to属性将多字段的值索引到一个新字段,multi_match时,用新的字段查询。日期字段的查询, 尤其是用now 的查询实际上是不存在缓存的,因此, 可以从业务的角度来考虑是否一定要用now, 毕竟利用query cache 是能够大大提高查询效率的。查询结果集的大小不能随意设置成大得离谱的值, 如query.setSize不能设置成 Integer.MAX_VALUE, 因为ES内部需要建立一个数据结构来放指定大小的结果集数据。尽量避免使用script,万不得已需要使用的话,选择painless & experssions 引擎。一旦使用script查询,一定要注意控制返回,千万不要有死循环(如下错误的例子),因为ES没有脚本运行的超时控制,只要当前的脚本没执行完,该查询会一直阻塞。
5. 避免层级过深的聚合查询, 层级过深的group by , 会导致内存、CPU消耗,建议在服务层通过程序来组装业务,也可以通过pipeline的方式来优化。
6. 复用预索引数据方式来提高AGG性能:
如通过 terms aggregations 替代 range aggregations, 如要根据年龄来分组,分组目标是: 少年(14岁以下) 青年(14-28) 中年(29-50) 老年(51以上), 可以在索引的时候设置一个age_group字段,预先将数据进行分类。从而不用按age来做range aggregations, 通过age_group字段就可以了。
7. Cache的设置及使用:
a) QueryCache: ES查询的时候,使用filter查询会使用query cache, 如果业务场景中的过滤查询比较多,建议将querycache设置大一些,以提高查询速度。
indices.queries.cache.size:10%(默认),可设置成百分比,也可设置成具体值,如256mb。
当然也可以禁用查询缓存(默认是开启), 通过index.queries.cache.enabled:false设置。
b) FieldDataCache: 在聚类或排序时,field data cache会使用频繁,因此,设置字段数据缓存的大小,在聚类或排序场景较多的情形下很有必要,可通过indices.fielddata.cache.size:30% 或具体值10GB来设置。但是如果场景或数据变更比较频繁,设置cache并不是好的做法,因为缓存加载的开销也是特别大的。
c) ShardRequestCache: 查询请求发起后,每个分片会将结果返回给协调节点(Coordinating Node), 由协调节点将结果整合。
如果有需求,可以设置开启; 通过设置index.requests.cache.enable: true来开启。
不过,shard request cache只缓存hits.total, aggregations, suggestions类型的数据,并不会缓存hits的内容。也可以通过设置indices.requests.cache.size: 1%(默认)来控制缓存空间大小。