一、简介
ORACLE11g R2版本的新特性之一就是引进了DBMS_PARALLEL_EXECUTE包,使用DBMS_PARALLEL_EXECUTE包批量并行递增式的更新表。
更多ORACLE11g新特性请参考:http://www.cnblogs.com/oracle-dba/articles/3632223.html
基本原理:
(1)
把数据集分割成小的块(chunk),可基于rowid进行分块,也可根据指定范围的行数(rows)进行分块。
(2)
在每一个块上以并行的方式应用update语句,在每个块执行完成后,立即提交,原理是通过调用JOB进行并发操作。
好处在于:
(1)
在执行update操作时,仅仅锁住一个chunk而非锁住整个表;
(2) 因为对每个chunk
执行完毕就提交,所以当update操作失败后,之前变更的并不会回滚;
(3) 减小回滚空间(undo)的使用;
(4)
提高性能,可根据服务器的性能设置chunk_size与parallel_level的大小。
备注:parallel_level取决于CPU的个数,以及parallel_threads_per_cpu。
DBMS_PARALLEL_EXECUTE 使用三种将一个表的数据分割成多个chunk的方法:
(1)
CREATE_CHUNKS_BY_NUMBER_COL : 通过指定的字段来切割表
(2) CREATE_CHUNKS_BY_ROWID :
通过ROWID来切割表,本文只介绍通过BY ROWID
进行分割
(4) CREATE_CHUNKS_BY_SQL : 通过用户提供的sql语句来切割表
二、实践操作
DBMS_PARALLEL_EXECUTE这个包操作起来比较简单,大体步骤为:
(1)创建任务task,即调用create_task()过程;
(2)创建分块规则,即调用create_chunk_by_rowid()或者create_chunk_by_number_col()等过程;
(3)写动态update的sql语句,复杂和简单的均可,可以通过dbms_output.put_line()来验证动态sql的正确性;
(4)运行task任务,即调用run_task()过程;
(5)监控task任务,可通过视图user_parallel_execute_tasks和user_parallel_execute_chunks来监控task状态;
(6)task成功运行完毕后,将任务删除,即调用drop_task()过程。
2.1 准备工作
给用户授权
调用dbms_parallel_execute包时,需要有执行create
job的权限,因此需要给用于赋予create job权限。
grant create job to
username;
也可把grant execute on dbms_scheduler to username;赋予某用户。
2.2 写代码
实践操作代码,有改动,仅做参考。
1
2
3
4
5
6
7
8
9
10
11
12
|
CREATE OR REPLACE
PROCEDURE Pro_Parallel_Exec_Update(i_Taskid IN
VARCHAR2,
--任务ID 传入是序列 para_task_seq.nextval
i_Inputdate IN
DATE ,
--日期
i_Tabname IN
VARCHAR2,
--更新表名
i_Column IN
VARCHAR2,
--更新列名
i_Tasktp IN
VARCHAR2 DEFAULT
‘BY ROWID‘
-- 任务类型 未启用的,可设置不能类型的 update
-- 如: 如不同类型的分块方法 by rowid , by number_col ,by sql
) IS
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
/******************************************************************* PROC_NAME : 并行更新过程,本例采用 BY
ROWID 方法
EDIT_NAME : zzd @2014-03-26
REQUIREMENT :
1) ORACLE 11g R2 版本
2) DBMS_PARALLEL_EXECUTE
3) OWN CREATE
JOB PRIVILEGE eg: GRANT
EXECUTE ON DBMS_SCHEDULER TO
USER
;
4) SET
JOB_QUEUE_PROCESSES
REFERENCE:
(1) USER_PARALLEL_EXECUTE_CHUNKS
(2) USER_PARALLEL_EXECUTE_TASKS
(3) DBMS_PARALLEL_EXECUTE.DROP_TASK(TASK_NAME);
ATTACH :
(1) 分块范围 Chunk_Size 和并行度 Paralle_Level 可以根据服务器性能自行设置
(2) DBMS_PARALLEL_EXECUTE 包三种 ( BY
ROWID, BY
NUMBER_COL , BY
SQL ) 分块方法,原理差不多
(3) 更多说明参考PLSQL Packages and
Types Reference_11R2.pdf
********************************************************************/
--- 任务名常量
v_Taskname VARCHAR2(20) := ‘UPDATE_TASK‘ ;
--- schema常量
v_Schema CONSTANT VARCHAR2(20) := ‘USER‘ ;
--- rowid范围常量
v_Cksize CONSTANT INT
:= 8000;
--- 并行度大小
v_Plevel CONSTANT INT
:= 10;
--- 日期变量
v_Inputdate VARCHAR2(64);
--- 临时动态sql 变量
v_Sql_Stmt VARCHAR2(4000);
--- 临时动态insert_sql 变量
v_Sql_Insert VARCHAR2(4000);
--- 控制尝试运行次数
v_Trynum NUMBER;
--- 返回运行状态
v_Status NUMBER;
BEGIN v_Trynum := 0;
--- 日期转换
v_Inputdate := ‘TO_DATE(‘ ‘‘
|| To_Char(i_Inputdate, ‘YYYY-MM-DD‘ ) ||
‘‘ ‘,‘ ‘‘
|| ‘YYYY-MM-DD‘
|| ‘‘ ‘)‘ ;
--- 写日志开始更新
Dbms_Output.Put_Line( ‘开始运行 ‘
|| ‘ USER = ‘
|| v_Schema ||
‘ Table_Name= ‘
|| i_Tabname ||
‘ Update_Column = ‘
|| i_Column || ‘ 任务类型为: ‘
||
i_Tasktp);
--- <span style="color: rgb(255, 0, 0);">注意这里只能用实体表作为临时表,此处称“中间表”否则在创建task 这步会把临时表的数据情况,事务性和会话性临时表均不可以</span>
--- 清空 中间表数据
EXECUTE
IMMEDIATE ‘ Truncate TABLE User_Table_Temp ‘ ; -- 注这个是个实体中间表
--- 动态 insert sql 语句
v_Sql_Insert := ‘ INSERT INTO User_Table_Temp(Table_Name, Status)
SELECT a.Table_Name, a.Status
FROM User_Tables a
WHERE a.Table_Name LIKE ‘
|| ‘‘ ‘‘
||
‘ USER_NAME% ‘
|| ‘‘ ‘‘ ;
--- 执行动态sql
EXECUTE
IMMEDIATE v_Sql_Insert;
--- 测试动态sql
Dbms_Output.Put_Line(v_Sql_Insert);
--- 创建任务
v_Taskname := v_Taskname || ‘ _ ‘
|| i_Taskid;
Dbms_Parallel_Execute.Create_Task(v_Taskname);
--- 使用 ROWID 进行范围分组
Dbms_Parallel_Execute.Create_Chunks_By_Rowid(Task_Name => v_Taskname,
Table_Owner => Upper (v_Schema),
Table_Name => Upper (i_Tabname),
By_Row => TRUE ,
Chunk_Size => v_Cksize);
--- 动态update的sql 脚本
v_Sql_Stmt := ‘ UPDATE /*+ ROWID (dda) */ ‘
|| Upper (i_Tabname) || ‘ b
SET b.‘
|| i_Column;
v_Sql_Stmt := v_Sql_Stmt || ‘ = (SELECT a.TABLE_NAME|| A.STATUS
FROM User_Table_Temp a
WHERE b. Object_Name‘
||
‘ = a.TABLE_NAME
AND b.Created = ‘
|| v_Inputdate || ‘) ‘ ;
v_Sql_Stmt := v_Sql_Stmt || ‘
WHERE EXISTS (SELECT 1
FROM User_Table_Temp a
WHERE b.Object_Name ‘
||
‘ = a.TABLE_NAME
AND b.Created = ‘
|| v_Inputdate || ‘)‘ ;
v_Sql_Stmt := v_Sql_Stmt || Chr(10) ||
‘AND ROWID BETWEEN :Start_Id AND :End_Id ‘ ;
--- 测试动态SQL
Dbms_Output.Put_Line(v_Sql_Stmt);
--- 执行并行更新
Dbms_Parallel_Execute.Run_Task(Task_Name => v_Taskname,
Sql_Stmt => v_Sql_Stmt,
Language_Flag => Dbms_Sql.Native,
Parallel_Level => v_Plevel);
--- 更新结束
Dbms_Output.Put_Line( ‘完成运行 ‘
|| ‘ USER = ‘
|| v_Schema ||
‘ Table_Name= ‘
|| i_Tabname ||
‘ Update_Column = ‘
|| i_Column || ‘ 任务类型为: ‘
||
i_Tasktp);
--- 如果 task任务出差,则恢复它,并重新执行,如此进行尝试两次
v_Status := Dbms_Parallel_Execute.Task_Status(v_Taskname);
WHILE (v_Trynum < 2 AND
v_Status != Dbms_Parallel_Execute.Finished) LOOP
v_Trynum := v_Trynum + 1;
--- 恢复任务,尝试继续执行
Dbms_Parallel_Execute.Resume_Task(v_Taskname);
--- 获取当前任务状态
v_Status := Dbms_Parallel_Execute.Task_Status(v_Taskname);
END
LOOP;
-- 更新完毕后,删除此任务
Dbms_Parallel_Execute.Drop_Task(v_Taskname);
END Pro_Parallel_Exec_Update;
|
三、相关视图
(1)USER_PARALLEL_EXECUTE_CHUNKS
(2)USER_PARALLEL_EXECUTE_TASKS