
    How to Extract SQL Statements from the Binlog in MySQL ROW Mode

    Category: MySQL database  Date: 2024-11-26 22:17:02

    Author: uploaded by a site member


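    Platform: Linux
    Goal: generate rollback statements for DML (INSERT/UPDATE/DELETE) from a row-format binlog
    Parse the binlog with mysqlbinlog -v to produce a readable SQL file
    Extract the SQL that actually needs processing
        These are the lines that begin with "### ". If the given start-position falls in the middle of an event group, the run fails with a "cannot recognize event" error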


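    Invert each INSERT/UPDATE/DELETE statement; every complete statement must occupy exactly one line
        INSERT: INSERT INTO => DELETE FROM, SET => WHERE
        UPDATE: WHERE => SET, SET => WHERE
        DELETE: DELETE FROM => INSERT INTO, WHERE => SET
    Replace the positional placeholders @{1,2,3} with column names
        Use desc table to get the column order and the matching column names
        Apply special handling to values of special column types
    Reverse the order of the statements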
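The inversion and reordering steps above can be sketched in a few lines of Perl. This is a minimal illustration, not the script's actual code path: the column map `%cols`, the helper name `invert_delete`, and the two sample events are assumptions made up for the example, and only the DELETE => INSERT case is shown.

```perl
use strict;
use warnings;

# Hypothetical column map for the example table: position => column name.
my %cols = (1 => '`actor_id`', 2 => '`first_name`');

# Turn one DELETE event (already stripped of "### ") into a one-line INSERT.
sub invert_delete {
    my ($cols, @lines) = @_;
    my $head = shift @lines;                      # "DELETE FROM `db`.`tb`"
    my ($table) = $head =~ /^DELETE FROM (\S+)/ or die "not a DELETE: $head";
    my @pairs;
    for my $line (@lines) {
        next unless $line =~ /^\@(\d+)=(.*)$/;    # skips the WHERE keyword line
        push @pairs, "$cols->{$1}=$2";            # @n => column name
    }
    # WHERE becomes SET, so the pairs are joined with commas.
    return "INSERT INTO $table SET " . join(', ', @pairs) . ';';
}

# Two sample events in binlog order (made-up data).
my @events = (
    [ 'DELETE FROM `yoon`.`yoon`', 'WHERE', '@1=1', q{@2='HANK'} ],
    [ 'DELETE FROM `yoon`.`yoon`', 'WHERE', '@1=2', q{@2='HANK'} ],
);

# Invert every event, then reverse so the last change is undone first.
my @rollback = reverse map { invert_delete(\%cols, @$_) } @events;
print "$_\n" for @rollback;
```

Reversing matters once a binlog range contains dependent changes to the same row (for example an INSERT followed by an UPDATE): the later change has to be undone before the earlier one.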


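    Notes:
    The table structure at the time the binlog was written must be identical to the current table structure [important]
    Because row-format events are idempotent and the recovery is a one-off operation, only the SQL is extracted; BEGIN/COMMIT are not
    Only INSERT/UPDATE/DELETE can be processed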
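Since only INSERT/UPDATE/DELETE can be rolled back this way, it can be worth checking the decoded binlog text for DDL before trusting the result. The sketch below is a pure-Perl take on that idea, using the same keyword list (CREATE|ALTER|DROP|RENAME); the helper name `find_ddl` and the sample lines are assumptions for illustration, and the `\b` word boundary is an addition of this sketch.

```perl
use strict;
use warnings;

# Return the lines that look like DDL statements.
sub find_ddl {
    my (@lines) = @_;
    return grep { /^(?:CREATE|ALTER|DROP|RENAME)\b/i } @lines;
}

# Made-up lines shaped like decoded binlog text.
my @sample = (
    'BEGIN',
    'alter table yoon add column c int',
    '### DELETE FROM `yoon`.`yoon`',
    'DROP TABLE t1',
);

my @ddl = find_ddl(@sample);
print scalar(@ddl), " DDL statement(s) found\n";
print "<DDL> $_\n" for @ddl;
```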

    mysql> select * from yoon;
    +----------+------------+-----------+---------------------+
    | actor_id | first_name | last_name | last_update         |
    +----------+------------+-----------+---------------------+
    |        1 | HANK       | YOON      | 2006-02-15 04:34:33 |
    |        2 | HANK       | YOON      | 2006-02-15 04:34:33 |
    |        3 | HANK       | YOON      | 2006-02-15 04:34:33 |
    |        4 | HANK       | YOON      | 2006-02-15 04:34:33 |
    |        5 | HANK       | YOON      | 2006-02-15 04:34:33 |
    |        6 | HANK       | YOON      | 2006-02-15 04:34:33 |
    |        7 | HANK       | YOON      | 2006-02-15 04:34:33 |
    |        8 | HANK       | YOON      | 2006-02-15 04:34:33 |
    |        9 | HANK       | YOON      | 2006-02-15 04:34:33 |
    |       10 | HANK       | YOON      | 2006-02-15 04:34:33 |
    |       11 | HANK       | YOON      | 2006-02-15 04:34:33 |
    +----------+------------+-----------+---------------------+
    11 rows in set (0.00 sec)


    mysql> delete from yoon;
    Query OK, 11 rows affected (1.03 sec)


    mysql> select * from yoon;
    Empty set (0.00 sec)
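In row format, a DELETE like the one above is recorded as row events, which mysqlbinlog -v renders as commented pseudo-SQL lines prefixed with "### ". The sketch below shows the extraction step in pure Perl: keep only those lines, then strip the prefix and any trailing whitespace. The helper name `extract_pseudo_sql` and the sample text are assumptions for illustration; the sample is hand-written in the shape of decoded output, not captured from a real binlog.

```perl
use strict;
use warnings;

# Keep only the pseudo-SQL lines ("### ...") and strip the prefix
# and trailing whitespace from each of them.
sub extract_pseudo_sql {
    my ($text) = @_;
    my @out;
    for my $line (split /\n/, $text) {
        next unless $line =~ /^### /;
        $line =~ s/^###\s*//;
        $line =~ s/\s+$//;
        push @out, $line;
    }
    return @out;
}

# Hand-written sample shaped like mysqlbinlog -v output.
my $sample = join "\n",
    '# at 200',
    'BEGIN',
    '### DELETE FROM `yoon`.`yoon`',
    '### WHERE',
    '###   @1=11',
    q{###   @2='HANK'  },
    'COMMIT';

print "$_\n" for extract_pseudo_sql($sample);
```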

    Pay close attention to the spaces between the command-line arguments; otherwise the SQL statements cannot be extracted:
    [root@hank-yoon data]# perl binlog-rollback.pl -f 'mysql-bin.000001' -o '/export/data/mysql/data/yoon.sql' -u 'root' -p 'yoon'
    Warning: Using a password on the command line interface can be insecure.
    [root@hank-yoon data]# ls
    auto.cnf hank ibdata2 ib_logfile1 modify.pl mysql-bin.000001 performance_schema test yoon.sql
    binlog-rollback.pl ibdata1 ib_logfile0 ib_logfile2 mysql mysql-bin.index sakila yoon
    [root@hank-yoon data]# cat yoon.sql
    INSERT INTO `yoon`.`yoon` SET `actor_id`=11, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
    INSERT INTO `yoon`.`yoon` SET `actor_id`=10, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
    INSERT INTO `yoon`.`yoon` SET `actor_id`=9, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
    INSERT INTO `yoon`.`yoon` SET `actor_id`=8, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
    INSERT INTO `yoon`.`yoon` SET `actor_id`=7, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
    INSERT INTO `yoon`.`yoon` SET `actor_id`=6, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
    INSERT INTO `yoon`.`yoon` SET `actor_id`=5, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
    INSERT INTO `yoon`.`yoon` SET `actor_id`=4, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
    INSERT INTO `yoon`.`yoon` SET `actor_id`=3, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
    INSERT INTO `yoon`.`yoon` SET `actor_id`=2, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
    INSERT INTO `yoon`.`yoon` SET `actor_id`=1, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);

    mysql> INSERT INTO `yoon`.`yoon` SET `actor_id`=11, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
    Query OK, 1 row affected (0.01 sec)


    mysql> select * from yoon;
    +----------+------------+-----------+---------------------+
    | actor_id | first_name | last_name | last_update         |
    +----------+------------+-----------+---------------------+
    |       11 | HANK       | YOON      | 2006-02-15 04:34:33 |
    +----------+------------+-----------+---------------------+
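The generated statements store the timestamp column as from_unixtime(1139949273), while the table shows 2006-02-15 04:34:33. FROM_UNIXTIME renders the value in the session time zone, so the restored value looks the same only when the session that replays the SQL uses the same zone as before. The small check below renders the same epoch in UTC and at a fixed +08:00 offset; the offset is an inference from the two values above, not something the source states.

```perl
use strict;
use warnings;
use POSIX qw(strftime);

my $epoch = 1139949273;   # value written into the rollback SQL for last_update

# Same instant rendered in UTC and at a fixed UTC+8 offset (assumed zone).
my $utc   = strftime('%Y-%m-%d %H:%M:%S', gmtime($epoch));
my $plus8 = strftime('%Y-%m-%d %H:%M:%S', gmtime($epoch + 8 * 3600));

print "UTC   : $utc\n";     # 2006-02-14 20:34:33
print "UTC+8 : $plus8\n";   # 2006-02-15 04:34:33
```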



      #!/usr/bin/perl -w
      use strict;
      use warnings;
      use Class::Struct;
      use Getopt::Long qw(:config no_ignore_case); # GetOptions

      # register handler for system signals
      use sigtrap 'handler', \&sig_int, 'normal-signals';

      # catch signal
      sub sig_int {
          my ($signals) = @_;
          print STDERR "# Caught SIG$signals.\n";
          exit 1;
      }

      my %opt;
      my $srcfile;
      my $host = '127.0.0.1';
      my $port = 3306;
      my ($user,$pwd);
      my ($MYSQL, $MYSQLBINLOG, $ROLLBACK_DML);
      my $outfile = '/dev/null';
      my (%do_dbs,%do_tbs);

      # tbname=>tbcol, tbcol: @n=>colname,type
      my %tbcol_pos;

      my $SPLITER_COL = ',';
      my $SQLTYPE_IST = 'INSERT';
      my $SQLTYPE_UPD = 'UPDATE';
      my $SQLTYPE_DEL = 'DELETE';
      my $SQLAREA_WHERE = 'WHERE';
      my $SQLAREA_SET = 'SET';

      my $PRE_FUNCT = '========================== ';

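      # =========================================================
      # Generate rollback statements for DML (insert/update/delete) from a row-format binlog
      # Parse the binlog with mysqlbinlog -v to produce a readable sql file
      # Extract the sql that needs processing
      #   lines beginning with "### ". If the given start-position falls in the middle
      #   of an event group, the run fails with a "cannot recognize event" error
      #
      # Invert each INSERT/UPDATE/DELETE sql; one complete sql must occupy exactly one line
      #   INSERT: INSERT INTO => DELETE FROM, SET => WHERE
      #   UPDATE: WHERE => SET, SET => WHERE
      #   DELETE: DELETE FROM => INSERT INTO, WHERE => SET
      # Replace the positional placeholders @{1,2,3} with column names
      #   use desc table to get the column order and the matching column names
      #   apply special handling to values of special column types
      # Reverse the order of the statements
      #
      # Notes:
      #   the table structure must be identical to the current table structure [important]
      #   row-format events are idempotent and recovery is a one-off operation,
      #   so only the sql is extracted; BEGIN/COMMIT are not
      #   only INSERT/UPDATE/DELETE can be processed
      # ========================================================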

      sub main{
          # get input options
          &get_options();
          # load the column order of every table involved
          &init_tbcol();
          # generate the rollback sql
          &do_binlog_rollback();
      }

      &main();

      # ----------------------------------------------------------------------------------------
      # Func : get options and set option flag
      # ----------------------------------------------------------------------------------------
      sub get_options{
          # Get options info
          GetOptions(\%opt,
              'help',                # OUT : print help info
              'f|srcfile=s',         # IN  : binlog file
              'o|outfile=s',         # OUT : output sql file
              'h|host=s',            # IN  : host
              'u|user=s',            # IN  : user
              'p|password=s',        # IN  : password
              'P|port=i',            # IN  : port
              'start-datetime=s',    # IN  : start datetime
              'stop-datetime=s',     # IN  : stop datetime
              'start-position=i',    # IN  : start position
              'stop-position=i',     # IN  : stop position
              'd|database=s',        # IN  : database, split comma
              'T|table=s',           # IN  : table, split comma
              'i|ignore',            # IN  : ignore binlog check ddl and so on
              'debug',               # IN  : print debug information
          ) or print_usage();

          if (!scalar(%opt)) {
              &print_usage();
          }

          # Handle for options
          if ($opt{'f'}){
              $srcfile = $opt{'f'};
          }else{
              &merror("please input binlog file");
          }

          $opt{'h'} and $host = $opt{'h'};
          $opt{'u'} and $user = $opt{'u'};
          $opt{'p'} and $pwd = $opt{'p'};
          $opt{'P'} and $port = $opt{'P'};

          if ($opt{'o'}) {
              $outfile = $opt{'o'};
              # empty the outfile
              `echo '' > $outfile`;
          }

          # mysql client command line
          $MYSQL = qq{mysql -h$host -u$user -p'$pwd' -P$port};
          &mdebug("get_options::MYSQL\n\t$MYSQL");

          # decode the binlog; column definitions are not needed, so use -v rather than -vv
          $MYSQLBINLOG = qq{mysqlbinlog -v};
          $MYSQLBINLOG .= " --start-position=".$opt{'start-position'} if $opt{'start-position'};
          # fixed: the original tested the misspelled key 'stop-postion', so --stop-position was never passed
          $MYSQLBINLOG .= " --stop-position=".$opt{'stop-position'} if $opt{'stop-position'};
          $MYSQLBINLOG .= " --start-datetime='".$opt{'start-datetime'}."'" if $opt{'start-datetime'};
          $MYSQLBINLOG .= " --stop-datetime='$opt{'stop-datetime'}'" if $opt{'stop-datetime'};
          $MYSQLBINLOG .= " $srcfile";
          &mdebug("get_options::MYSQLBINLOG\n\t$MYSQLBINLOG");

          # check whether the binlog contains ddl sql: CREATE|ALTER|DROP|RENAME
          &check_binlog() unless ($opt{'i'});

          # do not filter by database in mysqlbinlog: its "USE dbname;" based
          # filtering may miss some sql, so filtering is done in this script instead

          # databases to process
          if ($opt{'d'}){
              my @dbs = split(/,/,$opt{'d'});
              foreach my $db (@dbs){
                  $do_dbs{$db}=1;
              }
          }

          # tables to process
          if ($opt{'T'}){
              my @tbs = split(/,/,$opt{'T'});
              foreach my $tb (@tbs){
                  $do_tbs{$tb}=1;
              }
          }

          # extract the effective DML sql
          $ROLLBACK_DML = $MYSQLBINLOG." | grep '^### '";
          # strip the comment marker: '### ' -> ''
          # strip leading and trailing whitespace
          $ROLLBACK_DML .= " | sed 's/###\\s*//g;s/\\s*\$//g'";
          &mdebug("rollback dml\n\t$ROLLBACK_DML");

          # check that the result is not empty
          my $cmd = "$ROLLBACK_DML | wc -l";
          &mdebug("check contain dml sql\n\t$cmd");
          my $size = `$cmd`;
          chomp($size);
          unless ($size >0){
              &merror("binlog DML is empty:$ROLLBACK_DML");
          }
      }

      # ----------------------------------------------------------------------------------------
      # Func : check binlog contain DDL
      # ----------------------------------------------------------------------------------------
      sub check_binlog{
          &mdebug("$PRE_FUNCT check_binlog");
          my $cmd = "$MYSQLBINLOG ";
          $cmd .= " | grep -E -i '^(CREATE|ALTER|DROP|RENAME)' ";
          &mdebug("check binlog has DDL cmd\n\t$cmd");
          my $ddlcnt = `$cmd`;
          chomp($ddlcnt);

          my $ddlnum = `$cmd | wc -l`;
          chomp($ddlnum);

          my $res = 0;
          if ($ddlnum>0){
              # prefix each ddl sql with <DDL>
              $ddlcnt = `echo '$ddlcnt' | sed 's/^/<DDL>/g'`;
              &merror("binlog contain $ddlnum DDL:$MYSQLBINLOG. ddl sql:\n$ddlcnt");
          }

          return $res;
      }

      # ----------------------------------------------------------------------------------------
      # Func : init all table column order
      #        if --database/--table params are given, only load the column order of those tables
      # ----------------------------------------------------------------------------------------
      sub init_tbcol{
          &mdebug("$PRE_FUNCT init_tbcol");
          # extract the DML statements
          my $cmd = "$ROLLBACK_DML | grep -E '^(INSERT|UPDATE|DELETE)'";
          # extract the table names and de-duplicate them
          #$cmd .= " | awk '{if (\$1 ~ \"^UPDATE\") {print \$2}else {print \$3}}' | uniq ";
          $cmd .= " | awk '{if (\$1 ~ \"^UPDATE\") {print \$2}else {print \$3}}' | sort | uniq ";
          &mdebug("get table name cmd\n\t$cmd");
          open ALLTABLE, "$cmd | " or die "can't open file:$cmd\n";

          while (my $tbname = <ALLTABLE>){
              chomp($tbname);
              #if (exists $tbcol_pos{$tbname}){
              #next;
              #}
              &init_one_tbcol($tbname) unless (&ignore_tb($tbname));
          }
          close ALLTABLE or die "can't close file:$cmd\n";

          # init tb col
          foreach my $tb (keys %tbcol_pos){
              &mdebug("tbname->$tb");
              my %colpos = %{$tbcol_pos{$tb}};
              foreach my $pos (keys %colpos){
                  my $col = $colpos{$pos};
                  my ($cname,$ctype) = split(/$SPLITER_COL/, $col);
                  &mdebug("\tpos->$pos,cname->$cname,ctype->$ctype");
              }
          }
      }

      # ----------------------------------------------------------------------------------------
      # Func : init one table column order
      # ----------------------------------------------------------------------------------------
      sub init_one_tbcol{
          my $tbname = shift;
          &mdebug("$PRE_FUNCT init_one_tbcol");
          # get the table structure and column order
          my $cmd = $MYSQL." --skip-column-names --silent -e 'desc $tbname'";
          # extract the column names and join them as pos,`name`,type
          $cmd .= " | awk -F\'\\t\' \'{print NR\"$SPLITER_COL`\"\$1\"`$SPLITER_COL\"\$2}'";
          &mdebug("get table column infor cmd\n\t$cmd");
          open TBCOL,"$cmd | " or die "can't open desc $tbname;";

          my %colpos;
          while (my $line = <TBCOL>){
              chomp($line);
              my ($pos,$col,$coltype) = split(/$SPLITER_COL/,$line);
              &mdebug("linesss=$line\n\t\tpos=$pos\n\t\tcol=$col\n\t\ttype=$coltype");
              $colpos{$pos} = $col.$SPLITER_COL.$coltype;
          }
          close TBCOL or die "can't close desc $tbname";

          $tbcol_pos{$tbname} = \%colpos;
      }

      # ----------------------------------------------------------------------------------------
      # Func : rollback sql:INSERT/UPDATE/DELETE
      # ----------------------------------------------------------------------------------------
      sub do_binlog_rollback{
          my $binlogfile = "$ROLLBACK_DML ";
          &mdebug("$PRE_FUNCT do_binlog_rollback");

          # INSERT|UPDATE|DELETE
          my $sqltype;
          # WHERE|SET
          my $sqlarea;

          my ($tbname, $sqlstr) = ('', '');
          my ($notignore, $isareabegin) = (0,0);

          # output sql file
          open SQLFILE, ">> $outfile" or die "Can't open sql file:$outfile";

          # binlog file
          open BINLOG, "$binlogfile |" or die "Can't open file: $binlogfile";
          while (my $line = <BINLOG>){
              chomp($line);
              if ($line =~ /^(INSERT|UPDATE|DELETE)/){
                  # export sql
                  if ($sqlstr ne ''){
                      $sqlstr .= ";\n";
                      print SQLFILE $sqlstr;
                      &mdebug("export sql\n\t".$sqlstr);
                      $sqlstr = '';
                  }

                  if ($line =~ /^INSERT/){
                      $sqltype = $SQLTYPE_IST;
                      $tbname = `echo '$line' | awk '{print \$3}'`;
                      chomp($tbname);
                      $sqlstr = qq{DELETE FROM $tbname};
                  }elsif ($line =~ /^UPDATE/){
                      $sqltype = $SQLTYPE_UPD;
                      $tbname = `echo '$line' | awk '{print \$2}'`;
                      chomp($tbname);
                      $sqlstr = qq{UPDATE $tbname};
                  }elsif ($line =~ /^DELETE/){
                      $sqltype = $SQLTYPE_DEL;
                      $tbname = `echo '$line' | awk '{print \$3}'`;
                      chomp($tbname);
                      $sqlstr = qq{INSERT INTO $tbname};
                  }

                  # check ignore table
                  if(&ignore_tb($tbname)){
                      $notignore = 0;
                      &mdebug("#IGNORE#:line:".$line);
                      $sqlstr = '';
                  }else{
                      $notignore = 1;
                      &mdebug("#DO#:line:".$line);
                  }
              }else {
                  if($notignore){
                      &merror("can't get tbname") unless (defined($tbname));
                      if ($line =~ /^WHERE/){
                          $sqlarea = $SQLAREA_WHERE;
                          $sqlstr .= qq{ SET};
                          $isareabegin = 1;
                      }elsif ($line =~ /^SET/){
                          $sqlarea = $SQLAREA_SET;
                          $sqlstr .= qq{ WHERE};
                          $isareabegin = 1;
                      }elsif ($line =~ /^\@/){
                          $sqlstr .= &deal_col_value($tbname, $sqltype, $sqlarea, $isareabegin, $line);
                          $isareabegin = 0;
                      }else{
                          &mdebug("::unknown sql:".$line);
                      }
                  }
              }
          }

          # export last sql
          if ($sqlstr ne ''){
              $sqlstr .= ";\n";
              print SQLFILE $sqlstr;
              &mdebug("export sql\n\t".$sqlstr);
          }

          close BINLOG or die "Can't close binlog file: $binlogfile";
          close SQLFILE or die "Can't close out sql file: $outfile";

          # reverse the order of the lines in the output file
          # 1!G: for every line except the first, append the hold space to the pattern space
          # h:   copy the pattern space to the hold space
          # $!d: delete every line except the last
          my $invert = "sed -i '1!G;h;\$!d' $outfile";
          my $res = `$invert`;
          &mdebug("inverter order sqlfile :$invert");
      }

      # ----------------------------------------------------------------------------------------
      # Func : transfer column pos to name
      #        deal column value
      #
      # &deal_col_value($tbname, $sqltype, $sqlarea, $isareabegin, $line);
      # ----------------------------------------------------------------------------------------
      sub deal_col_value($$$$$){
          my ($tbname, $sqltype, $sqlarea, $isareabegin, $line) = @_;
          &mdebug("$PRE_FUNCT deal_col_value");
          &mdebug("input:tbname->$tbname,type->$sqltype,area->$sqlarea,areabegin->$isareabegin,line->$line");

          # line format: @n=value
          my @vals = split(/=/, $line);
          my $pos = substr($vals[0],1);
          my $valstartpos = length($pos)+2;
          my $val = substr($line,$valstartpos);

          my %tbcol = %{$tbcol_pos{$tbname}};
          my ($cname,$ctype) = split(/$SPLITER_COL/,$tbcol{$pos});
          # fixed: fail when either the name or the type is missing (the original used ||)
          &merror("can't get $tbname column $cname type") unless (defined($cname) && defined($ctype));
          &mdebug("column infor:cname->$cname,type->$ctype");

          # join str
          my $joinstr;
          if ($isareabegin){
              $joinstr = ' ';
          }else{
              # WHERE is replaced by SET, so join with ','
              if ($sqlarea eq $SQLAREA_WHERE){
                  $joinstr = ', ';
              # SET is replaced by WHERE, so join with AND
              }elsif ($sqlarea eq $SQLAREA_SET){
                  $joinstr = ' AND ';
              }else{
                  &merror("!!!!!!The scripts error");
              }
          }

          my $newline = $joinstr;

          # NULL value
          if (($val eq 'NULL') && ($sqlarea eq $SQLAREA_SET)){
              $newline .= qq{ $cname IS NULL};
          }else{
              # timestamp: record seconds
              if ($ctype eq 'timestamp'){
                  $newline .= qq{$cname=from_unixtime($val)};
              # datetime: @n=yyyy-mm-dd hh::ii::ss
              }elsif ($ctype eq 'datetime'){
                  $newline .= qq{$cname='$val'};
              }else{
                  $newline .= qq{$cname=$val};
              }
          }

          &mdebug("\told>$line\n\tnew>$newline");
          return $newline;
      }

      # ----------------------------------------------------------------------------------------
      # Func : check is ignore table
      # params: IN table full name # format:`dbname`.`tbname`
      # RETURN:
      #        0 not ignore
      #        1 ignore
      # ----------------------------------------------------------------------------------------
      sub ignore_tb($){
          my $fullname = shift;
          # strip the backticks
          $fullname =~ s/`//g;
          my ($dbname,$tbname) = split(/\./,$fullname);
          my $res = 0;

          # databases were specified
          if ($opt{'d'}){
              # this is one of the specified databases
              if ($do_dbs{$dbname}){
                  # tables were specified
                  if ($opt{'T'}){
                      # this is not one of the specified tables
                      unless ($do_tbs{$tbname}){
                          $res = 1;
                      }
                  }
              # this is not one of the specified databases
              }else{
                  $res = 1;
              }
          }
          #&mdebug("Table check ignore:$fullname->$res");
          return $res;
      }

      # ----------------------------------------------------------------------------------------
      # Func : print debug msg
      # ----------------------------------------------------------------------------------------
      sub mdebug{
          my (@msg) = @_;
          print "@msg\n" if ($opt{'debug'});
      }

      # ----------------------------------------------------------------------------------------
      # Func : print error msg and exit
      # ----------------------------------------------------------------------------------------
      sub merror{
          my (@msg) = @_;
          print ":@msg\n";
          &print_usage();
          exit(1);
      }

      # ----------------------------------------------------------------------------------------
      # Func : print usage
      # ----------------------------------------------------------------------------------------
      sub print_usage{
          print <<EOF;
      ==========================================================================================
      Command line options :
          --help              # OUT : print help info
          -f, --srcfile       # IN  : binlog file. [required]
          -o, --outfile       # OUT : output sql file. [required]
          -h, --host          # IN  : host. default '127.0.0.1'
          -u, --user          # IN  : user. [required]
          -p, --password      # IN  : password. [required]
          -P, --port          # IN  : port. default '3306'
          --start-datetime    # IN  : start datetime
          --stop-datetime     # IN  : stop datetime
          --start-position    # IN  : start position
          --stop-position     # IN  : stop position
          -d, --database      # IN  : database, split comma
          -T, --table         # IN  : table, split comma. [required] set -d
          -i, --ignore        # IN  : ignore binlog check contain DDL(CREATE|ALTER|DROP|RENAME)
          --debug             # IN  : print debug information

      Sample :
          shell> perl binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -u 'user' -p 'pwd'
          shell> perl binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -u 'user' -p 'pwd' -i
          shell> perl binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -u 'user' -p 'pwd' --debug
          shell> perl binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -h '192.168.1.2' -u 'user' -p 'pwd' -P 3307
          shell> perl binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -u 'user' -p 'pwd' --start-position=107
          shell> perl binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -u 'user' -p 'pwd' --start-position=107 --stop-position=10000
          shell> perl binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -u 'user' -p 'pwd' -d 'db1,db2'
          shell> perl binlog-rollback.pl -f 'mysql-bin.0000*' -o '/tmp/t.sql' -u 'user' -p 'pwd' -d 'db1,db2' -T 'tb1,tb2'
      ==========================================================================================
      EOF
          exit;
      }

      1;
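The script shells out to echo and awk to read the table name from each statement header, and later strips the backticks to compare database and table names. As a possible alternative, the same two steps can be done with regular expressions inside Perl, which avoids a subshell per statement and any shell-quoting surprises. This is a sketch only: the helper names `table_of` and `split_fullname` are hypothetical and are not part of the script above.

```perl
use strict;
use warnings;

# Extract the table name from a pseudo-SQL header line without a subshell.
# Returns undef for lines that are not INSERT/UPDATE/DELETE headers.
sub table_of {
    my ($line) = @_;
    return $1 if $line =~ /^(?:INSERT INTO|DELETE FROM|UPDATE)\s+(\S+)/;
    return;
}

# Split `db`.`tb` into database and table, dropping the backticks.
sub split_fullname {
    my ($fullname) = @_;
    (my $plain = $fullname) =~ s/`//g;
    return split /\./, $plain, 2;
}

for my $line ('DELETE FROM `yoon`.`yoon`', 'UPDATE `sakila`.`actor`', 'WHERE') {
    my $tb = table_of($line);
    if (defined $tb) {
        my ($db, $name) = split_fullname($tb);
        print "$line => db=$db table=$name\n";
    } else {
        print "$line => not a statement header\n";
    }
}
```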
