博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
canal源码分析——DirectLogFetcher源码分析
阅读量:6563 次
发布时间:2019-06-24

本文共 6682 字,大约阅读时间需要 22 分钟。

hot3.png

类结构

DirectLogFetcher的类结构图如下。

LogBuffer

|

LogFetcher

|

DirectLogFetcher

LogBuffer是一个数据库复制日志的缓存区,可将日志缓冲存储起来。

LogFetcher是一个日志提取器的抽象类,它定义了一些提取日志的抽象方法,供子类实现。

DirectLogFetcher是一个既有socket的日志提取器实现类,它实现了LogFetcher类。我们重点研究这个类中代码的实现。

DirectLogFetcher源码分析

package com.alibaba.otter.canal.parse.inbound.mysql.dbsync;import java.io.IOException;import java.io.InterruptedIOException;import java.net.SocketTimeoutException;import java.nio.ByteBuffer;import java.nio.channels.ClosedByInterruptException;import java.nio.channels.SocketChannel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.taobao.tddl.dbsync.binlog.LogFetcher;/** * 基于socket的logEvent实现 *  * @author jianghang 2013-1-14 下午07:39:30 * @version 1.0.0 */public class DirectLogFetcher extends LogFetcher {    protected static final Logger logger            = LoggerFactory.getLogger(DirectLogFetcher.class);    /** Command to dump binlog */    public static final byte      COM_BINLOG_DUMP   = 18;    /** Packet header sizes */    public static final int       NET_HEADER_SIZE   = 4;    public static final int       SQLSTATE_LENGTH   = 5;    /** Packet offsets */    public static final int       PACKET_LEN_OFFSET = 0;    public static final int       PACKET_SEQ_OFFSET = 3;    /** Maximum packet length */    public static final int       MAX_PACKET_LENGTH = (256 * 256 * 256 - 1);    private SocketChannel         channel;    // private BufferedInputStream input;    public DirectLogFetcher(){        super(DEFAULT_INITIAL_CAPACITY, DEFAULT_GROWTH_FACTOR);    }    public DirectLogFetcher(final int initialCapacity){        super(initialCapacity, DEFAULT_GROWTH_FACTOR);    }    public DirectLogFetcher(final int initialCapacity, final float growthFactor){        super(initialCapacity, growthFactor);    }    public void start(SocketChannel channel) throws IOException {        this.channel = channel;        // 和mysql driver一样,提供buffer机制,提升读取binlog速度        // this.input = new        // BufferedInputStream(channel.socket().getInputStream(), 16384);    }    /**     * {@inheritDoc}     *      * @see com.taobao.tddl.dbsync.binlog.LogFetcher#fetch()     */    public boolean fetch() throws IOException {        try {            // Fetching packet header from input.            if (!fetch0(0, NET_HEADER_SIZE)) {                logger.warn("Reached end of input stream while fetching header");                return false;            }            // Fetching the first packet(may a multi-packet).            int netlen = getUint24(PACKET_LEN_OFFSET);            int netnum = getUint8(PACKET_SEQ_OFFSET);            if (!fetch0(NET_HEADER_SIZE, netlen)) {                logger.warn("Reached end of input stream: packet #" + netnum + ", len = " + netlen);                return false;            }            // Detecting error code.            final int mark = getUint8(NET_HEADER_SIZE);            if (mark != 0) {                if (mark == 255) // error from master                {                    // Indicates an error, for example trying to fetch from                    // wrong                    // binlog position.                    position = NET_HEADER_SIZE + 1;                    final int errno = getInt16();                    String sqlstate = forward(1).getFixString(SQLSTATE_LENGTH);                    String errmsg = getFixString(limit - position);                    throw new IOException("Received error packet:" + " errno = " + errno + ", sqlstate = " + sqlstate                                          + " errmsg = " + errmsg);                } else if (mark == 254) {                    // Indicates end of stream. It's not clear when this would                    // be sent.                    logger.warn("Received EOF packet from server, apparent"                                + " master disconnected. It's may be duplicate slaveId , check instance config");                    return false;                } else {                    // Should not happen.                    throw new IOException("Unexpected response " + mark + " while fetching binlog: packet #" + netnum                                          + ", len = " + netlen);                }            }            // The first packet is a multi-packet, concatenate the packets.            while (netlen == MAX_PACKET_LENGTH) {                if (!fetch0(0, NET_HEADER_SIZE)) {                    logger.warn("Reached end of input stream while fetching header");                    return false;                }                netlen = getUint24(PACKET_LEN_OFFSET);                netnum = getUint8(PACKET_SEQ_OFFSET);                if (!fetch0(limit, netlen)) {                    logger.warn("Reached end of input stream: packet #" + netnum + ", len = " + netlen);                    return false;                }            }            // Preparing buffer variables to decoding.            origin = NET_HEADER_SIZE + 1;            position = origin;            limit -= origin;            return true;        } catch (SocketTimeoutException e) {            close(); /* Do cleanup */            logger.error("Socket timeout expired, closing connection", e);            throw e;        } catch (InterruptedIOException e) {            close(); /* Do cleanup */            logger.info("I/O interrupted while reading from client socket", e);            throw e;        } catch (ClosedByInterruptException e) {            close(); /* Do cleanup */            logger.info("I/O interrupted while reading from client socket", e);            throw e;        } catch (IOException e) {            close(); /* Do cleanup */            logger.error("I/O error while reading from client socket", e);            throw e;        }    }    private final boolean fetch0(final int off, final int len) throws IOException {        ensureCapacity(off + len);        ByteBuffer buffer = ByteBuffer.wrap(this.buffer, off, len);        while (buffer.hasRemaining()) {            int readNum = channel.read(buffer);            if (readNum == -1) {                throw new IOException("Unexpected End Stream");            }        }        // for (int count, n = 0; n < len; n += count) {        // if (0 > (count = input.read(buffer, off + n, len - n))) {        // // Reached end of input stream        // return false;        // }        // }        if (limit < off + len) limit = off + len;        return true;    }    /**     * {@inheritDoc}     *      * @see com.taobao.tddl.dbsync.binlog.LogFetcher#close()     */    public void close() throws IOException {        // do nothing    }}

上线是该类的源码,我们逐个方法来分析一下。

start()方法特别简单,几乎什么都没有干,直接给内部的channel赋值而已,这个没有什么看的。

重点是fetch()方法的实现特别复杂,

转载于:https://my.oschina.net/ywbrj042/blog/647063

你可能感兴趣的文章
疑似checkpoint堵塞数据库连接
查看>>
Node.js中针对中文的查找和替换无效的解决方法
查看>>
理解指针的关键
查看>>
如何查看Ubuntu下已安装包版本号
查看>>
我的那些年(2)~我毕业了
查看>>
VS2017 配置ImageMagick
查看>>
Hive任务优化--控制hive任务中的map数和reduce数
查看>>
[摄影]上海往事
查看>>
『原创』c#实现文件加密、解密及文件拖拽至程序图标直接打开
查看>>
【Leetcode】Search in Rotated Sorted Array
查看>>
redis3.0.0 集群安装详细步骤
查看>>
如何在Linux命令行中创建以及展示演示稿
查看>>
FutureTask——另一种闭锁的实现
查看>>
Android和MVC
查看>>
Linux 用户和用户组管理
查看>>
tomcat架构分析(valve源码导读)
查看>>
spring中InitializingBean接口使用理解(转)
查看>>
基于php5.5使用PHPMailer-5.2发送邮件
查看>>
InstallShield 2012 Spring新功能试用(16): Suite/Advanced UI 或 Advanced UI安装程序能在安装时进行输入合法性校验与反馈...
查看>>
C#面试宝典
查看>>