Elasticsearch Java API 索引的增删改查(二)


声明:本文转载自https://my.oschina.net/quanke/blog/1574188,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

本节介绍以下 CRUD API:

单文档 APIs

多文档 APIs

Multi Get API Bulk API

注意:所有的单文档的CRUD API,index参数只能接受单一的索引库名称,或者是一个指向单一索引库的alias。

Index API

Index API 允许我们存储一个JSON格式的文档,使数据可以被搜索。文档通过index、type、id唯一确定。我们可以自己提供一个id,或者也使用Index API 为我们自动生成一个。

这里有几种不同的方式来产生JSON格式的文档(document):

  • 手动方式,使用原生的byte[]或者String
  • 使用Map方式,会自动转换成与之等价的JSON
  • 使用第三方库来序列化beans,如Jackson
  • 使用内置的帮助类 XContentFactory.jsonBuilder()

手动方式

数据格式

String json = "{" +         "\"user\":\"kimchy\"," +         "\"postDate\":\"2013-01-30\"," +         "\"message\":\"trying out Elasticsearch\"" +     "}"; 
实例
/**    * 手动生成JSON    */   @Test   public void CreateJSON(){              String json = "{" +               "\"user\":\"fendo\"," +               "\"postDate\":\"2013-01-30\"," +               "\"message\":\"Hell word\"" +           "}";              IndexResponse response = client.prepareIndex("fendo", "fendodate")               .setSource(json)               .get();       System.out.println(response.getResult());          }   

Map方式

Map是key:value数据类型,可以代表json结构.

Map<String, Object> json = new HashMap<String, Object>(); json.put("user","kimchy"); json.put("postDate",new Date()); json.put("message","trying out Elasticsearch"); 
实例
 /**    * 使用集合    */   @Test   public void CreateList(){              Map<String, Object> json = new HashMap<String, Object>();       json.put("user","kimchy");       json.put("postDate","2013-01-30");       json.put("message","trying out Elasticsearch");              IndexResponse response = client.prepareIndex("fendo", "fendodate")               .setSource(json)               .get();       System.out.println(response.getResult());          }   

序列化方式

ElasticSearch已经使用了jackson,可以直接使用它把javabean转为json.

import com.fasterxml.jackson.databind.*;  // instance a json mapper ObjectMapper mapper = new ObjectMapper(); // create once, reuse  // generate json byte[] json = mapper.writeValueAsBytes(yourbeaninstance); 
实例
/**    * 使用JACKSON序列化    * @throws Exception    */   @Test   public void CreateJACKSON() throws Exception{              CsdnBlog csdn=new CsdnBlog();       csdn.setAuthor("fendo");       csdn.setContent("这是JAVA书籍");       csdn.setTag("C");       csdn.setView("100");       csdn.setTitile("编程");       csdn.setDate(new Date().toString());              // instance a json mapper       ObjectMapper mapper = new ObjectMapper(); // create once, reuse        // generate json       byte[] json = mapper.writeValueAsBytes(csdn);              IndexResponse response = client.prepareIndex("fendo", "fendodate")               .setSource(json)               .get();       System.out.println(response.getResult());   }   

XContentBuilder帮助类方式

ElasticSearch提供了一个内置的帮助类XContentBuilder来产生JSON文档

// Index name String _index = response.getIndex(); // Type name String _type = response.getType(); // Document ID (generated or not) String _id = response.getId(); // Version (if it's the first time you index this document, you will get: 1) long _version = response.getVersion(); // status has stored current instance statement. RestStatus status = response.status(); 
实例
/**    * 使用ElasticSearch 帮助类    * @throws IOException     */   @Test   public void CreateXContentBuilder() throws IOException{              XContentBuilder builder = XContentFactory.jsonBuilder()               .startObject()                   .field("user", "ccse")                   .field("postDate", new Date())                   .field("message", "this is Elasticsearch")               .endObject();              IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();       System.out.println("创建成功!");                 }   

综合实例

  import java.io.IOException;   import java.net.InetAddress;   import java.net.UnknownHostException;   import java.util.Date;   import java.util.HashMap;   import java.util.Map;      import org.elasticsearch.action.index.IndexResponse;   import org.elasticsearch.client.transport.TransportClient;   import org.elasticsearch.common.settings.Settings;   import org.elasticsearch.common.transport.InetSocketTransportAddress;   import org.elasticsearch.common.xcontent.XContentBuilder;   import org.elasticsearch.common.xcontent.XContentFactory;   import org.elasticsearch.transport.client.PreBuiltTransportClient;   import org.junit.Before;   import org.junit.Test;      import com.fasterxml.jackson.core.JsonProcessingException;   import com.fasterxml.jackson.databind.ObjectMapper;      public class CreateIndex {          private TransportClient client;              @Before       public void getClient() throws Exception{           //设置集群名称           Settings settings = Settings.builder().put("cluster.name", "my-application").build();// 集群名           //创建client           client  = new PreBuiltTransportClient(settings)                   .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));       }              /**        * 手动生成JSON        */       @Test       public void CreateJSON(){                      String json = "{" +                   "\"user\":\"fendo\"," +                   "\"postDate\":\"2013-01-30\"," +                   "\"message\":\"Hell word\"" +               "}";                      IndexResponse response = client.prepareIndex("fendo", "fendodate")                   .setSource(json)                   .get();           System.out.println(response.getResult());                  }                     /**        * 使用集合        */       @Test       public void CreateList(){                      Map<String, Object> json = new HashMap<String, Object>();           json.put("user","kimchy");           json.put("postDate","2013-01-30");           json.put("message","trying out Elasticsearch");                      IndexResponse response = client.prepareIndex("fendo", "fendodate")                   .setSource(json)                   .get();           System.out.println(response.getResult());                  }              /**        * 使用JACKSON序列化        * @throws Exception        */       @Test       public void CreateJACKSON() throws Exception{                      CsdnBlog csdn=new CsdnBlog();           csdn.setAuthor("fendo");           csdn.setContent("这是JAVA书籍");           csdn.setTag("C");           csdn.setView("100");           csdn.setTitile("编程");           csdn.setDate(new Date().toString());                      // instance a json mapper           ObjectMapper mapper = new ObjectMapper(); // create once, reuse              // generate json           byte[] json = mapper.writeValueAsBytes(csdn);                      IndexResponse response = client.prepareIndex("fendo", "fendodate")                   .setSource(json)                   .get();           System.out.println(response.getResult());       }              /**        * 使用ElasticSearch 帮助类        * @throws IOException         */       @Test       public void CreateXContentBuilder() throws IOException{                      XContentBuilder builder = XContentFactory.jsonBuilder()                   .startObject()                       .field("user", "ccse")                       .field("postDate", new Date())                       .field("message", "this is Elasticsearch")                   .endObject();                      IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();           System.out.println("创建成功!");                             }          }   

你还可以通过startArray(string)和endArray()方法添加数组。.field()方法可以接受多种对象类型。你可以给它传递数字、日期、甚至其他XContentBuilder对象。

Get API

根据id查看文档:

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();  

更多请查看 rest get API 文档

配置线程

operationThreaded 设置为 true 是在不同的线程里执行此次操作

下面的例子是operationThreaded 设置为 false

GetResponse response = client.prepareGet("twitter", "tweet", "1")         .setOperationThreaded(false)         .get(); 

Delete API

根据ID删除:

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();  

更多请查看 delete API 文档

配置线程

operationThreaded 设置为 true 是在不同的线程里执行此次操作

下面的例子是operationThreaded 设置为 false

GetResponse response = client.prepareGet("twitter", "tweet", "1")         .setOperationThreaded(false)         .get(); 
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")         .setOperationThreaded(false)         .get(); 

Delete By Query API

通过查询条件删除

BulkByScrollResponse response =     DeleteByQueryAction.INSTANCE.newRequestBuilder(client)         .filter(QueryBuilders.matchQuery("gender", "male")) //查询条件         .source("persons") //index(索引名)         .get();  //执行  long deleted = response.getDeleted(); //删除文档的数量 

如果需要执行的时间比较长,可以使用异步的方式处理,结果在回调里面获取

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)     .filter(QueryBuilders.matchQuery("gender", "male"))      //查询                 .source("persons")                //index(索引名)                                         .execute(new ActionListener<BulkByScrollResponse>() {     //回调监听              @Override         public void onResponse(BulkByScrollResponse response) {             long deleted = response.getDeleted();   //删除文档的数量                          }         @Override         public void onFailure(Exception e) {             // Handle the exception         }     }); 

Update API

有两种方式更新索引:

  • 创建 UpdateRequest,通过client发送;
  • 使用 prepareUpdate() 方法;

使用UpdateRequest

UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index("index"); updateRequest.type("type"); updateRequest.id("1"); updateRequest.doc(jsonBuilder()         .startObject()             .field("gender", "male")         .endObject()); client.update(updateRequest).get(); 

使用 prepareUpdate() 方法

这里官方的示例有问题,new Script()参数错误,所以一下代码是我自己写的(2017/11/10)

client.prepareUpdate("ttl", "doc", "1")         .setScript(new Script("ctx._source.gender = \"male\""  ,ScriptService.ScriptType.INLINE, null, null))//脚本可以是本地文件存储的,如果使用文件存储的脚本,需要设置 ScriptService.ScriptType.FILE          .get();  client.prepareUpdate("ttl", "doc", "1")         .setDoc(jsonBuilder()   //合并到现有文档             .startObject()                 .field("gender", "male")             .endObject())         .get(); 

Update by script

使用脚本更新文档

UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")         .script(new Script("ctx._source.gender = \"male\"")); client.update(updateRequest).get();  

Update by merging documents

合并文档

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")         .doc(jsonBuilder()             .startObject()                 .field("gender", "male")             .endObject()); client.update(updateRequest).get(); 

Upsert

更新插入,如果存在文档就更新,如果不存在就插入

IndexRequest indexRequest = new IndexRequest("index", "type", "1")         .source(jsonBuilder()             .startObject()                 .field("name", "Joe Smith")                 .field("gender", "male")             .endObject()); UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")         .doc(jsonBuilder()             .startObject()                 .field("gender", "male")             .endObject())         .upsert(indexRequest); //如果不存在此文档 ,就增加 `indexRequest` client.update(updateRequest).get(); 

如果 index/type/1 存在,类似下面的文档:

{     "name"  : "Joe Dalton",     "gender": "male"         } 

如果不存在,会插入新的文档:

{     "name" : "Joe Smith",     "gender": "male" } 

Multi Get API

一次获取多个文档

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()     .add("twitter", "tweet", "1") //一个id的方式     .add("twitter", "tweet", "2", "3", "4") //多个id的方式     .add("another", "type", "foo")  //可以从另外一个索引获取     .get();  for (MultiGetItemResponse itemResponse : multiGetItemResponses) { //迭代返回值     GetResponse response = itemResponse.getResponse();     if (response.isExists()) {      //判断是否存在                         String json = response.getSourceAsString(); //_source 字段     } } 

更多请浏览REST multi get 文档

Bulk API

Bulk API,批量插入:

import static org.elasticsearch.common.xcontent.XContentFactory.*; 
BulkRequestBuilder bulkRequest = client.prepareBulk();  // either use client#prepare, or use Requests# to directly build index/delete requests bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")         .setSource(jsonBuilder()                     .startObject()                         .field("user", "kimchy")                         .field("postDate", new Date())                         .field("message", "trying out Elasticsearch")                     .endObject()                   )         );  bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")         .setSource(jsonBuilder()                     .startObject()                         .field("user", "kimchy")                         .field("postDate", new Date())                         .field("message", "another post")                     .endObject()                   )         );  BulkResponse bulkResponse = bulkRequest.get(); if (bulkResponse.hasFailures()) {     // process failures by iterating through each bulk response item     //处理失败 } 

使用 Bulk Processor

BulkProcessor 提供了一个简单的接口,在给定的大小数量上定时批量自动请求

创建BulkProcessor实例

首先创建BulkProcessor实例

import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; 
BulkProcessor bulkProcessor = BulkProcessor.builder(         client,  //增加elasticsearch客户端         new BulkProcessor.Listener() {             @Override             public void beforeBulk(long executionId,                                    BulkRequest request) { ... } //调用bulk之前执行 ,例如你可以通过request.numberOfActions()方法知道numberOfActions              @Override             public void afterBulk(long executionId,                                   BulkRequest request,                                   BulkResponse response) { ... } //调用bulk之后执行 ,例如你可以通过request.hasFailures()方法知道是否执行失败              @Override             public void afterBulk(long executionId,                                   BulkRequest request,                                   Throwable failure) { ... } //调用失败抛 Throwable         })         .setBulkActions(10000) //每次10000请求         .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) //拆成5mb一块         .setFlushInterval(TimeValue.timeValueSeconds(5)) //无论请求数量多少,每5秒钟请求一次。         .setConcurrentRequests(1) //设置并发请求的数量。值为0意味着只允许执行一个请求。值为1意味着允许1并发请求。         .setBackoffPolicy(             BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//设置自定义重复请求机制,最开始等待100毫秒,之后成倍更加,重试3次,当一次或多次重复请求失败后因为计算资源不够抛出 EsRejectedExecutionException 异常,可以通过BackoffPolicy.noBackoff()方法关闭重试机制         .build(); 

BulkProcessor 默认设置

  • bulkActions 1000
  • bulkSize 5mb
  • 不设置flushInterval
  • concurrentRequests 为 1 ,异步执行
  • backoffPolicy 重试 8次,等待50毫秒

增加requests

然后增加requestsBulkProcessor

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */)); bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2")); 

关闭 Bulk Processor

当所有文档都处理完成,使用awaitCloseclose 方法关闭BulkProcessor:

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);  

bulkProcessor.close();  

在测试中使用Bulk Processor

如果你在测试种使用Bulk Processor可以执行同步方法

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })         .setBulkActions(10000)         .setConcurrentRequests(0)         .build();  // Add your requests bulkProcessor.add(/* Your requests */);  // Flush any remaining requests bulkProcessor.flush();  // Or close the bulkProcessor if you don't need it anymore bulkProcessor.close();  // Refresh your indices client.admin().indices().prepareRefresh().get();  // Now you can start searching! client.prepareSearch().get(); 

所有实例 已经上传到Git

更多请浏览 spring-boot-starter-es 开源项目

如何有任何问题请关注微信公众号给我留言

全科的公众号

本文发表于2017年11月16日 00:33
(c)注:本文转载自https://my.oschina.net/quanke/blog/1574188,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1879 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1