嗨,我想使用气流配置单元运算符执行配置单元查询,并将结果输出到文件.我不想在这里使用INSERT OVERWRITE.
hive_ex = HiveOperator(
task_id='hive-ex',
hql='/sql/hive-ex.sql',
hiveconfs={
'DAY': '{{ ds }}',
'YESTERDAY': '{{ yesterday_ds }}',
'OUTPUT': '{{ file_path }}'+'csv',
},
dag=dag
)
做这个的最好方式是什么?
我知道如何使用bash运算符执行此操作,但想知道是否可以使用hive运算符
hive_ex = BashOperator(
task_id='hive-ex',
bash_command='hive -f hive.sql -DAY={{ ds }} >> {{ file_path }}
/file_{{ds}}.json',
dag=dag
)
解决方法:
由于这是一个非常自定义的用例,因此最好的方法是扩展Hive运算符(或创建自己的Hive2CSVOperator).具体实施取决于您是否可以通过CLI或HiveServer2访问hive.
Hive CLI
我将首先尝试按照Hive CLI hook code配置Hive CLI连接并添加hive_cli_params,如果这不起作用,请扩展Hook(这将使您可以访问所有内容).
HiveServer2
这种情况下有一个单独的挂钩(link).由于它具有get_results方法(source)或to_csv方法(source),因此更加方便.
然后,运算符代码中的execute可能类似于以下内容:
def execute():
...
self.hook = HiveServer2Hook(...)
self.conn = self.hook.get_conn()
self.conn.to_csv(hql=self.hql, csv_filepath=self.output_filepath, ...)