UPDATE: Airflow 1.10 makes logging a lot easier.

For S3 logging, set up the connection hook as per the above answer and then simply add the following to airflow.cfg:
```
[core]
# Airflow can store logs remotely in AWS S3. Users must supply a remote
# location URL (starting with 's3://...') and an Airflow connection
# id that provides access to the storage location.
remote_logging = True
remote_base_log_folder = s3://my-bucket/path/to/logs
remote_log_conn_id = MyS3Conn
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
```
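If you would rather create that connection from code than through the UI, here is a minimal sketch (assumes Airflow 1.10; the conn_id matches remote_log_conn_id above, and the credential values are placeholders):

```python
# Illustrative sketch: register the S3 connection programmatically.
# conn_id must match remote_log_conn_id in airflow.cfg; the credential
# values below are placeholders, not real keys.
from airflow import settings
from airflow.models import Connection

s3_conn = Connection(
    conn_id='MyS3Conn',
    conn_type='s3',
    extra='{"aws_access_key_id": "YOUR_KEY_ID", "aws_secret_access_key": "YOUR_SECRET_KEY"}',
)

session = settings.Session()
session.add(s3_conn)
session.commit()
```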
For GCS logging:

- Install the gcp_api package first, like so: `pip install apache-airflow[gcp_api]`.
- Set up the connection hook as per the above answer.
- Add the following to airflow.cfg:
```
[core]
# Airflow can store logs remotely in Google Cloud Storage. Users must supply
# a remote location URL (starting with 'gs://...') and an Airflow connection
# id that provides access to the storage location.
remote_logging = True
remote_base_log_folder = gs://my-bucket/path/to/logs
remote_log_conn_id = MyGCSConn
```
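The GCS connection can be created from code the same way; a sketch (the `extra__google_cloud_platform__*` keys are what the GCP base hook reads, and the project id and key file path are placeholders):

```python
# Illustrative sketch: register the GCS connection programmatically.
# conn_id must match remote_log_conn_id; project and key path are placeholders.
import json

from airflow import settings
from airflow.models import Connection

gcs_conn = Connection(
    conn_id='MyGCSConn',
    conn_type='google_cloud_platform',
    extra=json.dumps({
        'extra__google_cloud_platform__project': 'my-gcp-project',
        'extra__google_cloud_platform__key_path': '/path/to/keyfile.json',
    }),
)

session = settings.Session()
session.add(gcs_conn)
session.commit()
```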
NOTE: As of Airflow 1.9, remote logging has been significantly altered. If you are using 1.9, read on.
Reference here
Complete Instructions:
- Create a directory to store configs and place it so that it can be found in PYTHONPATH. One example is $AIRFLOW_HOME/config.
- Create empty files called $AIRFLOW_HOME/config/log_config.py and $AIRFLOW_HOME/config/__init__.py.
- Copy the contents of airflow/config_templates/airflow_local_settings.py into the log_config.py file that was just created in the step above.
- Customize the following portions of the template:

  Add this variable to the top of the file. Note the trailing slash.

  ```
  S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'
  ```

  Rename DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG:

  ```
  LOGGING_CONFIG = ...
  ```

  Add an S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable:

  ```
  's3.task': {
      'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
      'formatter': 'airflow.task',
      'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
      's3_log_folder': S3_LOG_FOLDER,
      'filename_template': FILENAME_TEMPLATE,
  },
  ```

  Update the airflow.task and airflow.task_runner blocks to use 's3.task' instead of 'file.task':

  ```
  'loggers': {
      'airflow.task': {
          'handlers': ['s3.task'],
          ...
      },
      'airflow.task_runner': {
          'handlers': ['s3.task'],
          ...
      },
      'airflow': {
          'handlers': ['console'],
          ...
      },
  }
  ```
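  To confirm the customized module is importable and wired up before restarting anything, a quick sanity check (a sketch; it assumes $AIRFLOW_HOME/config is already on PYTHONPATH):

  ```python
  # Illustrative sanity check: an ImportError means PYTHONPATH is wrong;
  # an AssertionError means the template edits above were not applied.
  import log_config

  assert 's3.task' in log_config.LOGGING_CONFIG['handlers']
  assert 's3.task' in log_config.LOGGING_CONFIG['loggers']['airflow.task']['handlers']
  print('log_config looks good')
  ```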
- Make sure an S3 connection hook has been defined in Airflow, as per the above answer. The hook should have read and write access to the S3 bucket defined above in S3_LOG_FOLDER.
- Update $AIRFLOW_HOME/airflow.cfg to contain:

  ```
  task_log_reader = s3.task
  logging_config_class = log_config.LOGGING_CONFIG
  remote_log_conn_id = <name of the s3 platform hook>
  ```
- Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution.
- Verify that logs are showing up for newly executed tasks in the bucket you’ve defined, for example by listing the log keys as sketched below.
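  If you would rather check from code than from the AWS console, a sketch using the same S3 connection (the connection id and bucket name are the placeholders used above):

  ```python
  # Illustrative check: list the log keys the example DAG should have written.
  # Both the connection id and bucket name are placeholders from the steps above.
  from airflow.hooks.S3_hook import S3Hook

  hook = S3Hook(aws_conn_id='<name of the s3 platform hook>')
  print(hook.list_keys(bucket_name='<bucket where logs should be persisted>',
                       prefix='example_bash_operator/'))
  ```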
- Verify that the S3 storage viewer is working in the UI. Pull up a newly executed task, and verify that you see something like:

  ```
  *** Reading remote log from s3://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
  [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
  [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
  [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
  [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
  ```