解决 MySQL 与 Elasticsearch 数据不对称问题


声明:本文转载自https://my.oschina.net/neochen/blog/1518679,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

本文节选自《Netkiller Database 手札》

作者:netkiller 网站:http://www.netkiller.cn

23.10.7. 解决MySQL与Elasticsearch 数据不对称问题

jdbc-input-plugin 只能实现数据库的追加,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。

当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力,可以尝试下面的方法。

这里有一个数据表 article , mtime 字段定义了 ON UPDATE CURRENT_TIMESTAMP 所以每次更新mtime的时间都会变化

mysql> desc article; +-------------+--------------+------+-----+--------------------------------+-------+ | Field       | Type         | Null | Key | Default                        | Extra | +-------------+--------------+------+-----+--------------------------------+-------+ | id          | int(11)      | NO   |     | 0                              |       | | title       | mediumtext   | NO   |     | NULL                           |       | | description | mediumtext   | YES  |     | NULL                           |       | | author      | varchar(100) | YES  |     | NULL                           |       | | source      | varchar(100) | YES  |     | NULL                           |       | | content     | longtext     | YES  |     | NULL                           |       | | status      | enum('Y','N')| NO   |     | 'N'                            |       | | ctime       | timestamp    | NO   |     | CURRENT_TIMESTAMP              |       | | mtime       | timestamp    | YES  |     | ON UPDATE CURRENT_TIMESTAMP    |       | +-------------+--------------+------+-----+--------------------------------+-------+ 7 rows in set (0.00 sec)

logstash 增加 mtime 的查询规则

jdbc {     jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"     jdbc_driver_class => "com.mysql.jdbc.Driver"     jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"     jdbc_user => "cms"     jdbc_password => "password"     schedule => "* * * * *"	#定时cron的表达式,这里是每分钟执行一次     statement => "select * from article where mtime > :sql_last_value"     use_column_value => true     tracking_column => "mtime"     tracking_column_type => "timestamp"      record_last_run => true     last_run_metadata_path => "/var/tmp/article-mtime.last"   }

创建回收站表,这个事用于解决数据库删除,或者禁用 status = 'N' 这种情况的。

CREATE TABLE `elasticsearch_trash` (   `id` int(11) NOT NULL,   `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,   PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8

为 article 表创建触发器

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW BEGIN 	-- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。 	IF NEW.status = 'N' THEN 		insert into elasticsearch_trash(id) values(OLD.id); 	END IF; 	-- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。     IF NEW.status = 'Y' THEN 		delete from elasticsearch_trash where id = OLD.id; 	END IF; END  CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW BEGIN 	-- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。 	insert into elasticsearch_trash(id) values(OLD.id); END

接下来我们需要写一个简单地 Shell 每分钟运行一次,从 elasticsearch_trash 数据表中取出数据,然后使用 curl 命令调用 elasticsearch restful 接口,删除被收回的数据。

你还可以开发相关的程序,这里提供一个 Spring boot 定时任务例子。

实体

package cn.netkiller.api.domain.elasticsearch;  import java.util.Date;  import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.Table;  @Entity @Table public class ElasticsearchTrash { 	@Id 	private int id;  	@Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP") 	private Date ctime;  	public int getId() { 		return id; 	}  	public void setId(int id) { 		this.id = id; 	}  	public Date getCtime() { 		return ctime; 	}  	public void setCtime(Date ctime) { 		this.ctime = ctime; 	}  }

仓库 

package cn.netkiller.api.repository.elasticsearch;  import org.springframework.data.repository.CrudRepository;  import com.example.api.domain.elasticsearch.ElasticsearchTrash;  public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{   } 

定时任务 

package cn.netkiller.api.schedule;  import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.rest.RestStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component;  import com.example.api.domain.elasticsearch.ElasticsearchTrash; import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository;  @Component public class ScheduledTasks { 	private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);  	@Autowired 	private TransportClient client;  	@Autowired 	private ElasticsearchTrashRepository alasticsearchTrashRepository;  	public ScheduledTasks() { 	}  	@Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务 	public void cleanTrash() { 		for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) { 			DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get(); 			RestStatus status = response.status(); 			logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString()); 			if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) { 				alasticsearchTrashRepository.delete(elasticsearchTrash); 			} 		} 	} } 

Spring boot 启动主程序。 

package cn.netkiller.api;  import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling;  @SpringBootApplication @EnableScheduling public class Application {  	public static void main(String[] args) { 		SpringApplication.run(Application.class, args); 	} } 

 

本文发表于2017年08月22日 16:36
(c)注:本文转载自https://my.oschina.net/neochen/blog/1518679,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1748 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1