This section covers the following CRUD APIs:
- Single-document APIs
- Multi-document APIs: Multi Get API and Bulk API
Note: for all single-document CRUD APIs, the index parameter accepts only a single index name, or an alias that points to a single index.
Index API
The Index API allows us to store a JSON document and make the data searchable. A document is uniquely identified by its index, type, and id. We can supply our own id, or let the Index API generate one for us automatically.
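A minimal sketch of both variants, assuming `json` holds a JSON string like the ones built below (the index and type names are illustrative):

```java
// Explicit id: the document is stored as twitter/tweet/1
IndexResponse withId = client.prepareIndex("twitter", "tweet", "1")
        .setSource(json)
        .get();

// No id: Elasticsearch generates one, readable via autoId.getId()
IndexResponse autoId = client.prepareIndex("twitter", "tweet")
        .setSource(json)
        .get();
```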
There are several different ways to generate the JSON document:
- Manually, using a native byte[] or String
- Using a Map, which is automatically converted to its JSON equivalent
- Using a third-party library such as Jackson to serialize your beans
- Using the built-in helper XContentFactory.jsonBuilder()
Manual construction

Data format
```java
String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";
```
Example
```java
/**
 * Build the JSON document by hand
 */
@Test
public void CreateJSON() {
    String json = "{" +
            "\"user\":\"fendo\"," +
            "\"postDate\":\"2013-01-30\"," +
            "\"message\":\"Hello world\"" +
        "}";

    IndexResponse response = client.prepareIndex("fendo", "fendodate")
            .setSource(json)
            .get();
    System.out.println(response.getResult());
}
```
Using a Map
A Map is a key/value data structure, so it maps naturally onto a JSON object.
```java
Map<String, Object> json = new HashMap<String, Object>();
json.put("user", "kimchy");
json.put("postDate", new Date());
json.put("message", "trying out Elasticsearch");
```
Example
```java
/**
 * Build the document from a Map
 */
@Test
public void CreateList() {
    Map<String, Object> json = new HashMap<String, Object>();
    json.put("user", "kimchy");
    json.put("postDate", "2013-01-30");
    json.put("message", "trying out Elasticsearch");

    IndexResponse response = client.prepareIndex("fendo", "fendodate")
            .setSource(json)
            .get();
    System.out.println(response.getResult());
}
```
Serializing beans
Elasticsearch already uses Jackson, so you can use it directly to turn a Java bean into JSON.
```java
import com.fasterxml.jackson.databind.*;

// instantiate a JSON mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate the JSON
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
```
Example
```java
/**
 * Serialize a bean with Jackson
 * @throws Exception
 */
@Test
public void CreateJACKSON() throws Exception {
    CsdnBlog csdn = new CsdnBlog();
    csdn.setAuthor("fendo");
    csdn.setContent("这是JAVA书籍");
    csdn.setTag("C");
    csdn.setView("100");
    csdn.setTitile("编程");
    csdn.setDate(new Date().toString());

    // instantiate a JSON mapper
    ObjectMapper mapper = new ObjectMapper(); // create once, reuse

    // generate the JSON
    byte[] json = mapper.writeValueAsBytes(csdn);

    IndexResponse response = client.prepareIndex("fendo", "fendodate")
            .setSource(json)
            .get();
    System.out.println(response.getResult());
}
```
Using the XContentBuilder helper
Elasticsearch provides a built-in helper class, XContentBuilder, for generating JSON documents.
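For instance, a minimal sketch of building a document with it (mirroring the example further down):

```java
XContentBuilder builder = XContentFactory.jsonBuilder()
    .startObject()
        .field("user", "kimchy")
        .field("postDate", new Date())
        .field("message", "trying out Elasticsearch")
    .endObject();

String json = builder.string();  // the finished JSON, if you need it as a String
```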
Whichever way you build the document, the IndexResponse returned by the index call exposes its metadata:

```java
// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// The status of the operation (e.g. CREATED)
RestStatus status = response.status();
```
Example
```java
/**
 * Use the Elasticsearch XContentBuilder helper
 * @throws IOException
 */
@Test
public void CreateXContentBuilder() throws IOException {
    XContentBuilder builder = XContentFactory.jsonBuilder()
            .startObject()
                .field("user", "ccse")
                .field("postDate", new Date())
                .field("message", "this is Elasticsearch")
            .endObject();

    IndexResponse response = client.prepareIndex("fendo", "fendodata")
            .setSource(builder)
            .get();
    System.out.println("Document created!");
}
```
Complete example
```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CreateIndex {

    private TransportClient client;

    @Before
    public void getClient() throws Exception {
        // Set the cluster name
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();
        // Create the client
        client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
    }

    /**
     * Build the JSON document by hand
     */
    @Test
    public void CreateJSON() {
        String json = "{" +
                "\"user\":\"fendo\"," +
                "\"postDate\":\"2013-01-30\"," +
                "\"message\":\"Hello world\"" +
            "}";

        IndexResponse response = client.prepareIndex("fendo", "fendodate")
                .setSource(json)
                .get();
        System.out.println(response.getResult());
    }

    /**
     * Build the document from a Map
     */
    @Test
    public void CreateList() {
        Map<String, Object> json = new HashMap<String, Object>();
        json.put("user", "kimchy");
        json.put("postDate", "2013-01-30");
        json.put("message", "trying out Elasticsearch");

        IndexResponse response = client.prepareIndex("fendo", "fendodate")
                .setSource(json)
                .get();
        System.out.println(response.getResult());
    }

    /**
     * Serialize a bean with Jackson
     * @throws Exception
     */
    @Test
    public void CreateJACKSON() throws Exception {
        CsdnBlog csdn = new CsdnBlog();
        csdn.setAuthor("fendo");
        csdn.setContent("这是JAVA书籍");
        csdn.setTag("C");
        csdn.setView("100");
        csdn.setTitile("编程");
        csdn.setDate(new Date().toString());

        // instantiate a JSON mapper
        ObjectMapper mapper = new ObjectMapper(); // create once, reuse

        // generate the JSON
        byte[] json = mapper.writeValueAsBytes(csdn);

        IndexResponse response = client.prepareIndex("fendo", "fendodate")
                .setSource(json)
                .get();
        System.out.println(response.getResult());
    }

    /**
     * Use the Elasticsearch XContentBuilder helper
     * @throws IOException
     */
    @Test
    public void CreateXContentBuilder() throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder()
                .startObject()
                    .field("user", "ccse")
                    .field("postDate", new Date())
                    .field("message", "this is Elasticsearch")
                .endObject();

        IndexResponse response = client.prepareIndex("fendo", "fendodata")
                .setSource(builder)
                .get();
        System.out.println("Document created!");
    }
}
```
You can also add arrays with startArray(String) and endArray(). The field() method accepts many object types: you can pass it numbers, dates, and even other XContentBuilder objects.
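A minimal sketch of the array support (the field names here are illustrative):

```java
XContentBuilder builder = XContentFactory.jsonBuilder()
    .startObject()
        .field("user", "kimchy")
        .startArray("tags")          // produces "tags": ["elasticsearch", "search"]
            .value("elasticsearch")
            .value("search")
        .endArray()
    .endObject();
```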
Get API
Retrieve a document by its id:
```java
GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
```
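A minimal sketch of reading the result (the "user" field is illustrative):

```java
if (response.isExists()) {
    String source = response.getSourceAsString();       // the raw _source as a String
    Map<String, Object> fields = response.getSource();  // the _source as a Map
    String user = (String) fields.get("user");          // assumes the document has a "user" field
}
```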
For more details, see the REST get API documentation.
Thread configuration

The operationThreaded option (true by default) controls whether the operation is executed on a different thread. The following example sets operationThreaded to false:
```java
GetResponse response = client.prepareGet("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();
```
Delete API
Delete a document by its id:
```java
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
```
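A minimal sketch of checking the outcome, assuming the ES 5.x DocWriteResponse API:

```java
if (response.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    // no document with that id existed, so nothing was deleted
}
```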
For more details, see the REST delete API documentation.
Thread configuration

The operationThreaded option (true by default) controls whether the operation is executed on a different thread. The following example sets operationThreaded to false:
```java
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();
```
Delete By Query API
Delete every document that matches a query:
```java
BulkByScrollResponse response =
    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
        .filter(QueryBuilders.matchQuery("gender", "male"))  // the query
        .source("persons")                                   // the index name
        .get();                                              // execute synchronously

long deleted = response.getDeleted();                        // number of documents deleted
```
If the operation may take a long time, you can run it asynchronously and pick up the result in a callback:
```java
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male"))    // the query
    .source("persons")                                     // the index name
    .execute(new ActionListener<BulkByScrollResponse>() {  // callback listener
        @Override
        public void onResponse(BulkByScrollResponse response) {
            long deleted = response.getDeleted();          // number of documents deleted
        }

        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });
```
Update API
There are two ways to update a document:

- create an UpdateRequest and send it with the client;
- use the prepareUpdate() method.
Using UpdateRequest
```java
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();
```
Using the prepareUpdate() method
The official example here is broken (the arguments to new Script() are wrong), so the code below is my own (2017/11/10):
```java
client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script("ctx._source.gender = \"male\"",
                ScriptService.ScriptType.INLINE, null, null))
        // The script can also be stored in a file on the node; for a file-based
        // script use ScriptService.ScriptType.FILE instead.
        .get();

client.prepareUpdate("ttl", "doc", "1")
        .setDoc(jsonBuilder()  // merged into the existing document
                .startObject()
                    .field("gender", "male")
                .endObject())
        .get();
```
Update by script
Update a document with a script:
```java
UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();
```
Update by merging documents
Or merge a partial document into the existing one:
```java
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
                .startObject()
                    .field("gender", "male")
                .endObject());
client.update(updateRequest).get();
```
Upsert
Upsert: if the document exists, it is updated; if it does not, it is inserted.
```java
IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
                .startObject()
                    .field("name", "Joe Smith")
                    .field("gender", "male")
                .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
                .startObject()
                    .field("gender", "male")
                .endObject())
        .upsert(indexRequest);  // if the document does not exist, indexRequest is indexed instead
client.update(updateRequest).get();
```
If the document index/type/1 already exists, then after this operation it will look like:
{ "name" : "Joe Dalton", "gender": "male" }
If it does not exist, a new document will be inserted:
{ "name" : "Joe Smith", "gender": "male" }
Multi Get API
Fetch several documents in a single request:
```java
MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
        .add("twitter", "tweet", "1")            // a single id
        .add("twitter", "tweet", "2", "3", "4")  // several ids at once
        .add("another", "type", "foo")           // you can also fetch from another index
        .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) {  // iterate over the results
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {                        // check the document was found
        String json = response.getSourceAsString();   // the _source field
    }
}
```
For more details, see the REST multi get documentation.
Bulk API
The Bulk API lets you index and delete several documents in a single request:
```java
import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                .startObject()
                    .field("user", "kimchy")
                    .field("postDate", new Date())
                    .field("message", "trying out Elasticsearch")
                .endObject()
        )
);
bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                .startObject()
                    .field("user", "kimchy")
                    .field("postDate", new Date())
                    .field("message", "another post")
                .endObject()
        )
);

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}
```
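A minimal sketch of that failure-handling loop (BulkResponse is iterable over its per-item results):

```java
for (BulkItemResponse item : bulkResponse) {
    if (item.isFailed()) {
        // each failed item carries its own id and failure message
        System.err.println(item.getId() + ": " + item.getFailureMessage());
    }
}
```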
Using the Bulk Processor
The BulkProcessor class offers a simple interface that flushes bulk operations automatically, based on the number or size of the queued requests, or after a given flush interval.
Creating the BulkProcessor

First, create a BulkProcessor instance:
```java
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
```
```java
BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  // your Elasticsearch client
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // called before each bulk is executed; e.g. inspect
                // request.numberOfActions()
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,
                                  BulkResponse response) {
                // called after each bulk; e.g. check response.hasFailures()
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,
                                  Throwable failure) {
                // called when the bulk failed and raised a Throwable
            }
        })
        .setBulkActions(10000)                               // flush every 10000 requests
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))  // or every 5mb of data
        .setFlushInterval(TimeValue.timeValueSeconds(5))     // or every 5 seconds, whatever the request count
        .setConcurrentRequests(1)                            // 0 executes flushes synchronously; 1 allows one concurrent flush
        .setBackoffPolicy(
                BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
        // Custom retry policy for bulks that fail with EsRejectedExecutionException
        // (thrown when the node has too few compute resources): wait 100ms at first,
        // double the delay on each retry, and retry up to 3 times. Disable retries
        // with BackoffPolicy.noBackoff().
        .build();
```
By default, BulkProcessor uses the following settings (a defaults-only builder is sketched after this list):

- bulkActions: 1000
- bulkSize: 5mb
- flushInterval: not set
- concurrentRequests: 1 (asynchronous execution)
- backoffPolicy: exponential backoff with 8 retries and a 50ms initial delay
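A minimal sketch relying entirely on those defaults (the no-op listener is illustrative):

```java
BulkProcessor bulkProcessor = BulkProcessor.builder(client,
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {}

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
        })
        .build();  // bulkActions=1000, bulkSize=5mb, no flushInterval, concurrentRequests=1
```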
Adding requests

Then add your requests to the BulkProcessor:
```java
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
```
Closing the Bulk Processor

Once all documents have been processed, close the BulkProcessor with the awaitClose or close method:
```java
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
```
or
```java
bulkProcessor.close();
```
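Both methods flush any remaining requests; awaitClose additionally waits for in-flight bulks to finish and reports whether they completed within the timeout. A minimal sketch of checking that flag:

```java
boolean terminated = bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
if (!terminated) {
    // some bulk requests were still in flight when the timeout elapsed
}
```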
Using the Bulk Processor in tests

If you are using the BulkProcessor in tests, you can make it execute synchronously by setting concurrentRequests to 0:
```java
BulkProcessor bulkProcessor = BulkProcessor.builder(client,
        new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();
```
All of the examples have been uploaded to Git.
For more, see the spring-boot-starter-es open-source project.
If you have any questions, follow my WeChat public account and leave me a message.
