stream 的delete、insert冲突解决方式

3.继续测试:
 

SQL> create table hz.t6 (id number primary key ,vname varchar2(20),vaddr varchar2(20),vqty number);
 
Table created
 
SQL> insert into hz.t6 values (1,'a1','guangzhou',100);
 
1 row inserted
 
SQL> insert into hz.t6 values (2,'b1','zhenzhou',300);
 
1 row inserted

SQL> insert into hz.t6 values (3,'c1','zhenming',200);
 
1 row inserted
 
SQL> commit;
 
Commit complete
 
SQL>  select * from hz.t6;
 
        ID VNAME                VADDR                      VQTY
---------- -------------------- -------------------- ----------
         1 a1                   guangzhou                   100
         2 b1                   zhenzhou                    300
         3 c1                   zhenming                    200
查看目标库,也是3条记录:
SQL> select * from hz.t6;
 
        ID VNAME                VADDR                      VQTY
---------- -------------------- -------------------- ----------
         1 a1                   guangzhou                   100
         2 b1                   zhenzhou                    300
         3 c1                   zhenming                    200
 
目标库做删除操作:
SQL> DELETE HZ.T6 WHERE ID=3;
 
1 row deleted
 
SQL> commit;
 
Commit complete
 
SQL>  select * from hz.t6;
 
 
        ID VNAME                VADDR                      VQTY
---------- -------------------- -------------------- ----------
         1 a1                   guangzhou                   100
         2 b1                   zhenzhou                    300

原库也删除相同的序号3记录,
SQL> delete hz.t6 where id=3;
 
1 row deleted
 
SQL> commit;
 
Commit complete

目标库的应用进程显示abort.
SQL> select status from dba_apply;
 
STATUS
--------
ABORTED
应用进程的错误信息,
SQL> select LOCAL_TRANSACTION_ID,ERROR_MESSAGE    from dba_apply_error;
 
LOCAL_TRANSACTION_ID   ERROR_MESSAGE
---------------------- ----------------------------------------------------------------------
1.16.629               ORA-26787: The row with key ("ID") = (3) does not exist in table HZ.T1
                       ORA-01403: no data found
 

冲突解决
-------------------
冲突有很多类型,stream提供更新操作冲突类型的handler,其他类型的handler需要自己定义:

自定义handler的方法:
You use the SET_DML_HANDLER procedure in the DBMS_APPLY_ADM package to designate one or more custom conflict handlers for a particular table
 
 
CREATE TABLE strmadmin.history_row_lcrs(
timestamp DATE,
source_database_name VARCHAR2(128),
command_type VARCHAR2(30),
object_owner VARCHAR2(32),
object_name VARCHAR2(32),
tag RAW(10),
transaction_id VARCHAR2(10),
scn NUMBER,
commit_scn NUMBER,
old_values SYS.LCR$_ROW_LIST,
new_values SYS.LCR$_ROW_LIST)
NESTED TABLE old_values STORE AS old_values_ntab
NESTED TABLE new_values STORE AS new_values_ntab;
 
 
 

SQL> CREATE TABLE event_log (id NUMBER, timestamp DATE, event VARCHAR2(2000));
 
Table created

SQL> create sequence seq_event increment by 1 start with 1 nomaxvalue nocache;
 
Sequence created
 
 
CREATE TABLE strmadmin.history_row_lcrs(
timestamp DATE,
source_database_name VARCHAR2(128),
command_type VARCHAR2(30),
object_owner VARCHAR2(32),
object_name VARCHAR2(32),
tag RAW(10),
transaction_id VARCHAR2(10),
scn NUMBER,
commit_scn NUMBER,
old_values SYS.LCR$_ROW_LIST,
new_values SYS.LCR$_ROW_LIST)
NESTED TABLE old_values STORE AS old_values_ntab
NESTED TABLE new_values STORE AS new_values_ntab;
 

CREATE OR REPLACE PROCEDURE hz_t5_handler(in_any IN SYS.ANYDATA) IS
  lcr              SYS.LCR$_ROW_RECORD;
  rc               PLS_INTEGER;
  object_owner     VARCHAR2(30);
  object_name      VARCHAR2(40);
  dmlcommand       VARCHAR2(10);
  v_name           varchar2(20) := ' ';
  v_old_id_anydata SYS.ANYDATA;
  v_old_id         number;
  v_dummy          PLS_INTEGER;
  v_sqlcode        varchar2(32);
  v_sqlerrm        varchar2(255);
  --v_typecode       PLS_INTEGER;
  --v_type           SYS.ANYTYPE;
  non_null_anytype_for_NUMBER exception;
  unknown_typename exception;
  v_cnt number;
BEGIN
  -- Access the LCR
  rc           := in_any.GETOBJECT(lcr);
  object_owner := lcr.GET_OBJECT_OWNER();
  object_name  := lcr.GET_OBJECT_NAME();
  dmlcommand   := lcr.GET_COMMAND_TYPE();
  if dmlcommand in ('INSERT') then
 
    v_old_id_anydata := lcr.get_value('NEW', 'NO', 'Y');
  elsif dmlcommand in ('DELETE') then
    v_old_id_anydata := lcr.get_value('OLD', 'NO');
  else
    null;
  end if;
  -- Insert information about the LCR into the history_row_lcrs table
  INSERT INTO strmadmin.history_row_lcrs
  VALUES
    (SYSDATE,
     lcr.GET_SOURCE_DATABASE_NAME(),
     lcr.GET_COMMAND_TYPE(),
     lcr.GET_OBJECT_OWNER(),
     lcr.GET_OBJECT_NAME(),
     lcr.GET_TAG(),
     lcr.GET_TRANSACTION_ID(),
     lcr.GET_SCN(),
     lcr.GET_COMMIT_SCN,
     lcr.GET_VALUES('old'),
     lcr.GET_VALUES('new', 'n'));
  commit;
  
  v_dummy := v_old_id_anydata.GetNUMBER(v_old_id /* OUT */);
  DBMS_OUTPUT.PUT_LINE(TO_CHAR(rc) || To_Char(v_dummy));  --临时加的字段
  select count(*) into v_cnt from hz.t6@lirhz.net  a where a.id = v_old_id;
  if v_cnt = 1 then
    select vname into v_name from hz.T6@lirhz.net a where a.id = v_old_id;
 
    IF object_owner = 'HZ' and object_name = 'T6' and dmlcommand IN ('INSERT' /*, 'DELETE', 'UPDATE'*/
       ) THEN
   
      -- Add Columns
      lcr.add_column('NEW', 'NAME', sys.anydata.convertvarchar2(v_name));
    ELSE
      lcr.add_column('OLD', 'NAME', sys.anydata.convertvarchar2(v_name));
    END IF;
    LCR.EXECUTE(TRUE);
  elsif v_cnt = 0 then
    IF object_owner = 'HZ' and object_name = 'T6' and dmlcommand IN ('INSERT' /*, 'DELETE', 'UPDATE'*/
       ) THEN
   
      -- Add Columns
      lcr.add_column('NEW', 'NAME', sys.anydata.convertvarchar2(' '));
    ELSE
      lcr.add_column('OLD', 'NAME', sys.anydata.convertvarchar2(' '));
    END IF;
    LCR.EXECUTE(TRUE);
  else
    null;
  end if;
  insert into event_log
    (id, timestamp, event)
  values
    (seq_event.nextval,
     sysdate,
     'succeed! v_id=' || v_old_id || 'cnt=' || v_cnt || ',dmlcommand=' ||
     dmlcommand || ',v_name=' || v_name);
  commit;
exception
  when others then
    rollback;
    v_sqlcode := sqlcode;
    v_sqlerrm := sqlerrm;
    insert into event_log
      (id, timestamp, event)
    values
      (seq_event.nextval,
       sysdate,
       v_sqlcode || '|' || v_sqlerrm || ',v_old_id=' || v_old_id || ',cnt=' ||
       --v_cnt || ',v_name=' || v_name || ',dmlcommand=' || dmlcommand);
       v_cnt || ',v_name=' || v_name || ',dmlcommand=' || dmlcommand || TO_CHAR(rc)|| To_Char(v_dummy)  );
    commit;
 
END;
 
 
SQL> BEGIN
  2     DBMS_APPLY_ADM.SET_DML_HANDLER(
  3     object_name => 'hz.t6',
  4     object_type => 'TABLE',
  5     operation_name => 'DELETE',
  6     error_handler => false,
  7     user_procedure => 'strmadmin.hz_t5_handler',
  8     apply_database_link => NULL,
  9     apply_name => NULL);
 10    END;
 11  /
 
PL/SQL procedure successfully completed
SQL> BEGIN
  2     DBMS_APPLY_ADM.SET_DML_HANDLER(
  3     object_name => 'hz.t6',
  4     object_type => 'TABLE',
  5     operation_name => 'INSERT',
  6     error_handler => false,
  7     user_procedure => 'strmadmin.hz_t5_handler',
  8     apply_database_link => NULL,
  9     apply_name => NULL);
 10    END;
 11  /
 
PL/SQL procedure successfully completed

删除之前的应用进程错误号信息,重新时应用日志应用。
SQL> exec dbms_apply_adm.execute_all_errors('APPLY_STANDY');
 
PL/SQL procedure successfully completed

SQL> exec dbms_apply_adm.start_apply('APPLY_STANDY');
 
PL/SQL procedure successfully completed

原库重新插入id=3的记录:
SQL> insert into hz.t6 values (3,'c1','zhenming',200);
 
1 row inserted
 
SQL> commit;
 
Commit complete
 
SQL>  select * from hz.t6;
 
        ID VNAME                VADDR                      VQTY
---------- -------------------- -------------------- ----------
         1 a1                   guangzhou                   100
         2 b1                   zhenzhou                    300
         3 c1                   zhenming                    200
查看目标库,也是3条记录:
SQL> select * from hz.t6;
 
        ID VNAME                VADDR                      VQTY
---------- -------------------- -------------------- ----------
         1 a1                   guangzhou                   100
         2 b1                   zhenzhou                    300
         3 c1                   zhenming                    200
 
目标库做删除操作:
SQL> DELETE HZ.T6 WHERE ID=3;
 
1 row deleted
 
SQL> commit;
 
Commit complete
 
SQL>  select * from hz.t6;
 
 
        ID VNAME                VADDR                      VQTY
---------- -------------------- -------------------- ----------
         1 a1                   guangzhou                   100
         2 b1                   zhenzhou                    300

原库也删除相同的序号3记录,
SQL> delete hz.t6 where id=3;
 
1 row deleted
 
SQL> commit;
 
Commit complete
 

目标库的应用进程还是正常的.
SQL> select APPLY_NAME,status from dba_apply;
 
APPLY_NAME                     STATUS
------------------------------ --------
APPLY_STANDY                   ENABLED
 
SQL> select * from event_log;
 
        ID TIMESTAMP   EVENT
---------- ----------- -----------------------------------------------------------
         2 2010-10-15  -30625|ORA-30625: method dispatch on NULL SELF argument is disallowed,v_old_id=,cnt=,v_name= ,dmlcommand=DELETE
        
目标库插入需要6的记录:
SQL>  insert into hz.t6 values (6,'e1','haikou',600);
 
1 row inserted
 
SQL> commit;
 
Commit complete

原库也插入目标为6的序号:
SQL> insert into hz.t6 values (6,'e1','haikou',600);
 
1 row inserted
 
SQL> commit;
 
Commit complete
 
SQL> select APPLY_NAME,status from dba_apply;
 
APPLY_NAME                     STATUS
------------------------------ --------
APPLY_STANDY                   ENABLED

SQL> select * from event_log;
 
        ID TIMESTAMP   EVENT
---------- ----------- --------------------------------------------------------------------------
         2 2010-10-15  -30625|ORA-30625: method dispatch on NULL SELF argument is disallowed,v_old_id=,cnt=,v_name= ,dmlcommand=DELETE
         3 2010-10-15  -30625|ORA-30625: method dispatch on NULL SELF argument is disallowed,v_old_id=,cnt=,v_name= ,dmlcommand=INSERT
 

至此t6表的插入,删除操作 都已经正常进行,不会因为冲突而中断应用进程的应用了。



本文转自 gjm008 51CTO博客,原文链接:http://blog.51cto.com/gaoshan/406547,如需转载请自行联系原作者
上一篇:c#简单实现二维数组和二维数组列表List<>的转置


下一篇:从数据流动说开去,解读数据主权之争