Setting up S3 for logs in Airflow

UPDATE: Airflow 1.10 makes logging a lot easier.

For S3 logging, set up the connection hook as per the above answer (a minimal sketch of creating it programmatically follows the config below),

and then simply add the following to airflow.cfg:

    [core]
    # Airflow can store logs remotely in AWS S3. Users must supply a remote
    # location URL (starting with 's3://') and an Airflow connection
    # id that provides access to the storage location.
    remote_logging = True
    remote_base_log_folder = s3://my-bucket/path/to/logs
    remote_log_conn_id = MyS3Conn
    # Use server-side encryption for logs stored in S3
    encrypt_s3_logs = False
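
If you don't already have MyS3Conn from the earlier answer, here is a minimal sketch of creating it programmatically with the Airflow 1.x API (the connection id and credential values are placeholders; the same connection can also be created through Admin -> Connections in the UI):

    # Sketch: create the S3 connection used by remote logging.
    # Credentials live in the JSON "extra" field, which is what the S3 hook reads.
    import json

    from airflow import settings
    from airflow.models import Connection

    conn = Connection(
        conn_id='MyS3Conn',
        conn_type='s3',
        extra=json.dumps({
            'aws_access_key_id': 'YOUR_ACCESS_KEY_ID',          # placeholder
            'aws_secret_access_key': 'YOUR_SECRET_ACCESS_KEY',  # placeholder
        }),
    )

    session = settings.Session()
    session.add(conn)
    session.commit()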

For GCS logging,

  1. Install the gcp_api package first, like so: pip install apache-airflow[gcp_api].

  2. Set up the connection hook as per the above answer

  3. Add the following to airflow.cfg

    [core]
    # Airflow can store logs remotely in Google Cloud Storage. Users must supply
    # a remote location URL (starting with 'gs://') and an Airflow connection
    # id that provides access to the storage location.
    remote_logging = True
    remote_base_log_folder = gs://my-bucket/path/to/logs
    remote_log_conn_id = MyGCSConn
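
The GCS connection referenced in step 2 can likewise be created programmatically; a sketch assuming a service-account key file (project name and key path are placeholders):

    # Sketch: create the GCS connection used by remote logging.
    import json

    from airflow import settings
    from airflow.models import Connection

    conn = Connection(
        conn_id='MyGCSConn',
        conn_type='google_cloud_platform',
        extra=json.dumps({
            'extra__google_cloud_platform__project': 'my-gcp-project',          # placeholder
            'extra__google_cloud_platform__key_path': '/path/to/keyfile.json',  # placeholder
        }),
    )

    session = settings.Session()
    session.add(conn)
    session.commit()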
    

NOTE: As of Airflow 1.9, remote logging has been significantly altered. If you are using 1.9, read on.

Reference here

Complete Instructions:

  1. Create a directory to store configs and place it somewhere on PYTHONPATH. One example is $AIRFLOW_HOME/config.

  2. Create empty files called $AIRFLOW_HOME/config/log_config.py and
    $AIRFLOW_HOME/config/__init__.py

  3. Copy the contents of airflow/config_templates/airflow_local_settings.py into the log_config.py file that was just created in the step above.

  4. Customize the following portions of the template (a consolidated sketch of the result follows this step):

    # Add this variable to the top of the file. Note the trailing slash.
    S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'
    
    # Rename DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG
    LOGGING_CONFIG = ...
    
    # Add an S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
    's3.task': {
        'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
        'formatter': 'airflow.task',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        's3_log_folder': S3_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    },
    
    # Update the airflow.task and airflow.task_runner blocks to use 's3.task' instead of 'file.task'.
    'loggers': {
        'airflow.task': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow': {
            'handlers': ['console'],
            ...
        },
    }
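
To make the fragments above easier to place, here is a rough, partial sketch of $AIRFLOW_HOME/config/log_config.py after the step-4 edits. Only the customized pieces are spelled out; the formatters, the 'console' and 'file.processor' handlers, and any remaining logger keys come straight from the copied template (names such as BASE_LOG_FOLDER and FILENAME_TEMPLATE are the ones the template already defines):

    import os

    from airflow import configuration as conf

    # Already defined in the copied template; repeated here only so the sketch
    # reads standalone.
    LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
    BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
    FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'

    # New variable from step 4 -- note the trailing slash.
    S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'

    # Renamed from DEFAULT_LOGGING_CONFIG.
    LOGGING_CONFIG = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            # ... keep the template's 'airflow.task' and 'airflow.processor' formatters ...
        },
        'handlers': {
            # ... keep the template's 'console' and 'file.processor' handlers ...
            's3.task': {
                'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
                'formatter': 'airflow.task',
                'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
                's3_log_folder': S3_LOG_FOLDER,
                'filename_template': FILENAME_TEMPLATE,
            },
        },
        'loggers': {
            'airflow.task': {
                'handlers': ['s3.task'],
                'level': LOG_LEVEL,
                # keep the template's remaining keys (e.g. 'propagate') as-is
            },
            'airflow.task_runner': {
                'handlers': ['s3.task'],
                'level': LOG_LEVEL,
            },
            'airflow': {
                'handlers': ['console'],
                'level': LOG_LEVEL,
            },
        },
    }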
    
  5. Make sure an S3 connection hook has been defined in Airflow, as per the above answer. The hook should have read and write access to the S3 bucket defined above in S3_LOG_FOLDER.

  6. Update $AIRFLOW_HOME/airflow.cfg to contain:

    [core]
    task_log_reader = s3.task
    logging_config_class = log_config.LOGGING_CONFIG
    remote_log_conn_id = <name of the S3 connection hook>
    
  7. Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution.

  8. Verify that logs are showing up for newly executed tasks in the bucket you’ve defined (a quick programmatic check is sketched after this list).

  9. Verify that the s3 storage viewer is working in the UI. Pull up a newly executed task, and verify that you see something like:

    *** Reading remote log from s3://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
    [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
    [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
    [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
    [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
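
For step 8, an optional spot-check from outside Airflow; a sketch assuming the boto3 package and the same AWS credentials as MyS3Conn (bucket and prefix are placeholders matching the <dag_id>/<task_id>/ layout of the filename template):

    # Sketch: list the task-log objects under the remote log prefix to confirm
    # that logs are landing in the bucket.
    import boto3

    s3 = boto3.client('s3')
    response = s3.list_objects_v2(
        Bucket='<bucket where logs should be persisted>',
        Prefix='example_bash_operator/run_this_last/',  # <dag_id>/<task_id>/ from the filename template
    )
    for obj in response.get('Contents', []):
        print(obj['Key'], obj['Size'], obj['LastModified'])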
    
