使用气流配置单元运算符并输出到文本文件

嗨,我想使用气流配置单元运算符执行配置单元查询,并将结果输出到文件.我不想在这里使用INSERT OVERWRITE.

hive_ex = HiveOperator(
    task_id='hive-ex',
    hql='/sql/hive-ex.sql',
    hiveconfs={
        'DAY': '{{ ds }}',
        'YESTERDAY': '{{ yesterday_ds }}',
        'OUTPUT': '{{ file_path }}'+'csv',
    },
    dag=dag
)

做这个的最好方式是什么?

我知道如何使用bash运算符执行此操作,但想知道是否可以使用hive运算符

hive_ex = BashOperator(
    task_id='hive-ex',
    bash_command='hive -f hive.sql -DAY={{ ds }} >> {{ file_path }} 
    /file_{{ds}}.json',
    dag=dag
)

解决方法:

由于这是一个非常自定义的用例,因此最好的方法是扩展Hive运算符(或创建自己的Hive2CSVOperator).具体实施取决于您是否可以通过CLI或HiveServer2访问hive.

Hive CLI

我将首先尝试按照Hive CLI hook code配置Hive CLI连接并添加hive_cli_params,如果这不起作用,请扩展Hook(这将使您可以访问所有内容).

HiveServer2

这种情况下有一个单独的挂钩(link).由于它具有get_results方法(source)或to_csv方法(source),因此更加方便.

然后,运算符代码中的execute可能类似于以下内容:

def execute():
  ...
  self.hook = HiveServer2Hook(...)
  self.conn = self.hook.get_conn()

  self.conn.to_csv(hql=self.hql, csv_filepath=self.output_filepath, ...)
上一篇:python-气流DAG-如何首先检查BQ(如有必要,请删除),然后运行数据流作业?


下一篇:如何在无需重启气流Web服务器的情况下更新气流中的python函数