Data Pipeline for Time Series Data

Overview: Building a Scalable Time Series Data Pipeline

This project involved designing and implementing a robust data pipeline that collects monthly time series data from multiple sources, including FRED, US Census, AlphaVantage, and SEC Edgar, through their respective APIs. Airflow, running in a Docker container, orchestrates the pipeline's logic as a single DAG. A Python operator within the DAG extracts data from each API and stores it in CSV format in an S3 bucket that acts as a data lake/landing zone. The pipeline respects each API's rate limit by capping Airflow's maximum parallelism. Once the data lands in S3, a Lambda ETL task reads the CSV files, applies the necessary transformations, and loads the data into AWS Timestream, our time series data warehouse. Users can then query the data with SQL in the AWS Query Editor. Finally, we built a Temporal Fusion Transformer (TFT) model that predicts US inflation (CPI). The model runs in a local Jupyter notebook leveraging CUDA and can also be deployed on a SageMaker instance, enabling continuous data ingestion and prediction.

Image 1

Pipeline Orchestration: Streamlining Data Pipeline Management with Airflow on Docker

For orchestration we use Airflow, a popular open-source platform for managing data pipelines. Its modular and scalable architecture lets users manage and schedule complex workflows with ease. Airflow ships with a rich set of built-in operators and plugins for interfacing with a wide variety of data sources and destinations, making it a versatile tool for ETL and general data processing pipelines. Its web interface also makes it easy to monitor and troubleshoot pipeline execution. On the downside, Airflow can be complex to set up and configure, scaling horizontally may require additional infrastructure, and users may need to develop custom operators or plugins to interface with certain data sources or destinations.

Setting up Airflow on Docker

Steps to set up Airflow in Docker:

  1. Install Docker Community Edition (CE)
  2. Install Docker Compose v1.29.1 or newer
  3. Run the following command in a terminal to fetch the docker-compose.yaml file:
    • curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.0/docker-compose.yaml'
  4. Next, we must extend the official Docker image in order to add our Python dependencies. First, create a requirements.txt file and add the following packages:
      apache-airflow[amazon]
      apache-airflow-providers-http
      apache-airflow-providers-amazon
      matplotlib
      fredapi
      boto3
      setuptools
      awswrangler
      cryptography
    
  5. Now, create a Dockerfile and add the following code to the file (the base image tag should match the Airflow version of the docker-compose.yaml you fetched):
    FROM apache/airflow:2.5.3
    COPY requirements.txt /requirements.txt
    RUN pip install --user --upgrade pip
    RUN pip install --no-cache-dir --user -r /requirements.txt
    
  6. Next, run the following command in a terminal:
    • docker build . --tag extending_airflow:latest
  7. In docker-compose.yaml, point Airflow at the extended image by updating the image line (in the official file it is set once under the x-airflow-common section):
    x-airflow-common:
     image: extending_airflow:latest
    
  8. Now you can start and shut down the Docker containers using the following commands:
    • docker-compose up -d
    • docker-compose down
  9. The default username/password is ‘airflow’, and the Airflow UI can be accessed at: http://localhost:8080

Data Ingestion

We implemented separate Python operators to collect time series data from the various APIs. These operators can be executed sequentially or in parallel to fetch and process the data efficiently. Currently, we run this pipeline locally since the workload is small. To scale it, we could leverage Airflow's modular architecture to run these operators across multiple nodes in a distributed system, improving pipeline throughput and reducing overall execution time.

Data Sources

We are gathering monthly time series data on demographic and economic factors. Our data sources include FRED, US Census, AlphaVantage, and SEC Edgar. The data covers a range of economic indicators such as the inflation rate, industrial production index, consumer price index, housing starts, crude oil prices, and employment figures. In particular, we focus on selecting leading indicators that have the potential to provide valuable information about future economic trends, specifically inflation.
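As a small, self-contained example of what one of these pulls looks like, the snippet below fetches a single monthly FRED series with the fredapi package from requirements.txt (the API key is a placeholder, not a real credential):

        # Minimal sketch: fetch one monthly FRED series with fredapi.
        from fredapi import Fred

        fred = Fred(api_key="YOUR_FRED_API_KEY")  # placeholder key
        cpi = fred.get_series("CPIAUCNS")         # Consumer Price Index, our target series
        print(cpi.tail())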

Airflow DAG Setup

We have implemented a separate python_callable file for each data source; each interacts with its API, converts the JSON response to an intermediate pandas dataframe, and writes it to a CSV file in an S3 bucket. To avoid exceeding API rate limits, these tasks run in an Airflow pool with 10 slots. The python_callable tasks may be executed in parallel, but for visual purposes we run the groups sequentially, as shown in the image below. After each parallel group of Python operators, a task_#_check task is called, which is a DummyOperator. This is necessary because Airflow does not permit [task_list] >> [task_list] notation; instead, we must use [task_list] >> DummyOperator >> [task_list] (a minimal sketch of this pattern follows the image below). Finally, the Lambda ETL job is triggered once all CSV files have been written to the S3 bucket.

[Image: Airflow DAG graph view showing the sequential task groups and check tasks]
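The pattern can be illustrated with the following minimal, hypothetical DAG; the task names and series are illustrative, and the fred_batch_pool pool is assumed to already exist with 10 slots (e.g. created in the Airflow UI or with `airflow pools set fred_batch_pool 10 "FRED rate limit"`):

        # Hypothetical minimal DAG showing pool-based rate limiting and the
        # [task_list] >> DummyOperator >> [task_list] fan-in described above.
        from datetime import datetime
        from airflow import DAG
        from airflow.operators.python_operator import PythonOperator
        from airflow.operators.dummy_operator import DummyOperator

        def fetch(series_id, **context):
            print(f"fetching {series_id}")  # placeholder for a real API call

        with DAG("pool_pattern_example", start_date=datetime(2023, 4, 10),
                 schedule_interval="@once", catchup=False) as dag:
            group_1 = [PythonOperator(task_id=f"fetch_{s}_to_s3", python_callable=fetch,
                                      op_kwargs={"series_id": s}, pool="fred_batch_pool")
                       for s in ["CPIAUCNS", "HOUST"]]
            group_2 = [PythonOperator(task_id=f"fetch_{s}_to_s3", python_callable=fetch,
                                      op_kwargs={"series_id": s}, pool="fred_batch_pool")
                       for s in ["VOO", "XLE"]]
            task_1_check = DummyOperator(task_id="task_1_check")
            group_1 >> task_1_check >> group_2  # list >> list directly is not allowed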

The following code is the python_callable that retrieves data from the third-party AlphaVantage API:

      
        import sys
        import pandas as pd
        from datetime import datetime
        sys.path.append('/opt/airflow/dags/common_package/')
        import config
        from fredapi import Fred
        import awswrangler as wr
        import boto3
        from cryptography.fernet import Fernet
        from airflow.models import Variable
        import requests


        # Define the AWS credentials and S3 bucket name
        #aws_access_key_id = config.get_aws_key_id()
        #aws_secret_access_key = config.get_secret_access_key()
        s3_bucket_name = config.get_s3_bucket()
        fernet_key = config.get_fernet_key()
        fred_api_key = config.get_fred_key()
        vantage_api_key = config.get_alpha_vantage_key()

        # Retrieve the encrypted credentials from Airflow Variables
        encrypted_access_key_id = Variable.get('aws_access_key_id', deserialize_json=False)
        encrypted_secret_access_key = Variable.get('aws_secret_access_key', deserialize_json=False)

        # Decrypt the credentials using the encryption key
        key = fernet_key  # the same Fernet key that was used to encrypt the credentials
        fernet = Fernet(key)
        aws_access_key_id = fernet.decrypt(encrypted_access_key_id.encode()).decode()
        aws_secret_access_key = fernet.decrypt(encrypted_secret_access_key.encode()).decode()


        # Define a function to fetch data for a given AlphaVantage series and store it in CSV files in S3
        def fetch_vantage_series_to_s3(vantage_series_id, **context):
            # Make the API call and convert each returned pd.Series to a dataframe
            vantage_series = get_vantage_series(vantage_series_id, "TIME_SERIES_MONTHLY", vantage_api_key)

            for i in range(2):
                df = vantage_series[i]
                df = df.to_frame().reset_index()
                df = df.rename(columns={0:'value'})
                
                s3_session = boto3.Session(aws_access_key_id=aws_access_key_id,
                                    aws_secret_access_key=aws_secret_access_key)
                
                if i == 0: # for close price
                    # Generate a unique file name for the CSV file
                    file_name = f'{vantage_series_id.replace("/", "-")}-{datetime.now().strftime("%Y%m%d")}.csv'
                    # Write the DataFrame to a CSV file and upload it to S3
                    wr.s3.to_csv(df=df, path=f"s3://{s3_bucket_name}/csv-series-vantage/{file_name}", index=False, boto3_session=s3_session)
                else: # for volume
                    # Generate a unique file name for the CSV file
                    name = f'{vantage_series_id}VOL'
                    file_name = f'{name.replace("/", "-")}-{datetime.now().strftime("%Y%m%d")}.csv'
                    # Write the DataFrame to a CSV file and upload it to S3
                    wr.s3.to_csv(df=df, path=f"s3://{s3_bucket_name}/csv-series-vantage/{file_name}", index=False, boto3_session=s3_session)


        def get_vantage_series(series, function, alpha_vantage_key):
            if function == "TIME_SERIES_MONTHLY":
                url = f"https://www.alphavantage.co/query?function=TIME_SERIES_MONTHLY&symbol={series}&apikey={alpha_vantage_key}"
                r = requests.get(url)
                json_data = r.json()
                monthly_data = json_data["Monthly Time Series"]
                close_series = pd.Series(
                    {datetime.strptime(date, '%Y-%m-%d').replace(day=1): float(data["4. close"]) for date, data in monthly_data.items()},
                    name="Close")
                # Convert the index to a DatetimeIndex
                close_series.index = pd.to_datetime(close_series.index)
                
                vol_series = pd.Series(
                    {datetime.strptime(date, '%Y-%m-%d').replace(day=1): float(data["5. volume"]) for date, data in monthly_data.items()},
                    name="Volume")
                vol_series.index = pd.to_datetime(vol_series.index)
                
                
                close_series.name = series
                vol_series.name = series + "-VOL"
                
                data = [close_series, vol_series]
            elif function == "DIGITAL_CURRENCY_MONTHLY":
                url = f'https://www.alphavantage.co/query?function=DIGITAL_CURRENCY_MONTHLY&symbol={series}&market=USD&apikey={alpha_vantage_key}'
                r = requests.get(url)
                json_data = r.json()
                #data = json_data
                monthly_data = json_data["Time Series (Digital Currency Monthly)"]
                close_series = pd.Series(
                    {datetime.strptime(date, '%Y-%m-%d').replace(day=1): float(data["4a. close (USD)"]) for date, data in monthly_data.items()},
                    name="Close")
                # Convert the index to a DatetimeIndex
                close_series.index = pd.to_datetime(close_series.index)
                
                vol_series = pd.Series(
                    {datetime.strptime(date, '%Y-%m-%d').replace(day=1): float(data['5. volume']) for date, data in monthly_data.items()},
                    name="Volume")
                vol_series.index = pd.to_datetime(vol_series.index)
                data = [close_series, vol_series]
            return data
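
The callable above decrypts AWS credentials that were encrypted with a Fernet key and stored as Airflow Variables; that encryption step is not shown in this post. A minimal sketch of how those Variables could be produced is below, assuming the same key is later exposed through config.get_fernet_key():

        # Hypothetical one-off script: encrypt AWS credentials before storing them as
        # Airflow Variables. The same key must later be returned by config.get_fernet_key().
        from cryptography.fernet import Fernet

        key = Fernet.generate_key()  # keep this key out of version control
        fernet = Fernet(key)

        encrypted_id = fernet.encrypt(b"EXAMPLE_ACCESS_KEY_ID").decode()
        encrypted_secret = fernet.encrypt(b"EXAMPLE_SECRET_ACCESS_KEY").decode()

        # Store the ciphertext as Airflow Variables, e.g. via the CLI:
        #   airflow variables set aws_access_key_id <encrypted_id>
        #   airflow variables set aws_secret_access_key <encrypted_secret>
        print(encrypted_id)
        print(encrypted_secret)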
      
  

The code for our dag is shown below.

      
          import datetime
          import requests
          from airflow import DAG
          from airflow.operators.python_operator import PythonOperator
          from airflow.operators.dummy_operator import DummyOperator
          from airflow.hooks.S3_hook import S3Hook


          import sys
          sys.path.append('/opt/airflow/dags/common_package/')
          import config
          import fetch_fred_series_s3
          import states_met_to_s3
          import lambda_invoke
          import fetch_vantage_series_s3
          import fetch_census_series_s3

          def test_task():
            print("First dag task test: successful")

          # Define the DAG
          # To limit the number of processes calling the FRED API (60 requests/min), we limit execution parallelism to 10 tasks using an Airflow pool
          with DAG(
              'fred_to_s3_batch_dag',
              start_date=datetime.datetime(2023, 4, 10),
              schedule_interval='@once', #Batch upload
              catchup=False,
              default_args={
                  'owner': 'airflow',
                  'depends_on_past': False,
                  'email_on_failure': False,
                  'email_on_retry': False,
                  'retries': 2,
                  'retry_delay': datetime.timedelta(minutes=5),
                  'pool':'fred_batch_pool'
              }
          ) as dag:
            
            test_task1 = PythonOperator(
              task_id='test_to_s3_task1',
              python_callable=test_task,
            )


            states_met_to_s3_task_final = PythonOperator(
              task_id='states_met_to_s3_task_final',
              python_callable=states_met_to_s3.states_met_to_s3,
              op_kwargs={
                  'test': 'Last time this works',
              },
              provide_context=True,
            )

            # Named lambda_invoke_task to avoid shadowing the imported lambda_invoke module
            lambda_invoke_task = PythonOperator(
              task_id='lambda_invoke',
              python_callable=lambda_invoke.lambda_invoke_test
            )

            task_1_check = DummyOperator(task_id='task_1_check')

            task_2_check = DummyOperator(task_id='task_2_check')

            task_3_check = DummyOperator(task_id='task_3_check')


            # Define the list of FRED series IDs to fetch
            fred_series_ids = [
                "CPIAUCNS",  # Target (CPI)
                "M2SL", "INDPRO", "PPIACO", "CPITRNSL", "POPTHM", "CES4300000001", "USEPUINDXM", "DSPIC96",  # convert to growth rate
                "HOUST", "MCOILWTICO", "FEDFUNDS", "UNRATE"  # keep as is
            ]
            # Define a PythonOperator for each FRED series to fetch
            fred_series_tasks = []
            for fred_series_id in fred_series_ids:
                task_id = f'fetch_{fred_series_id}_to_s3'
                op = PythonOperator(
                    task_id=task_id,
                    python_callable=fetch_fred_series_s3.fetch_fred_series_to_s3,
                    op_kwargs={
                        'fred_series_id': fred_series_id,
                    },
                    provide_context=True,
                )
                fred_series_tasks.append(op)

            # voo = market tracking, xlp = consumer staples, xle = energy, xlb = basic materials, xag = gold
            vantage_series_ids = ["VOO", "XLP", "XLE", "XLB", "XAG"]
            vantage_series_tasks = []
            for vantage_series_id in vantage_series_ids:
                task_id_vantage = f'fetch_{vantage_series_id}_to_s3'
                op = PythonOperator(
                    task_id=task_id_vantage,
                    python_callable=fetch_vantage_series_s3.fetch_vantage_series_to_s3,
                    op_kwargs={
                        'vantage_series_id': vantage_series_id,
                    },
                    provide_context=True,
                )
                vantage_series_tasks.append(op)

            census_series_ids = ["firms", "emp", "fsize1", "fsize2", "taxes", "payroll", "sup_val"]
            census_series_tasks = []
            for census_series_id in census_series_ids:
                task_id_census = f'fetch_{census_series_id}_to_s3'
                op = PythonOperator(
                    task_id=task_id_census,
                    python_callable=fetch_census_series_s3.fetch_census_series_to_s3,
                    op_kwargs={
                        'series_id': census_series_id,
                    },
                    provide_context=True,
                )
                census_series_tasks.append(op)

            # Set the task dependencies: each group of series is fetched in parallel,
            # with a DummyOperator checkpoint between groups
            test_task1 >> fred_series_tasks >> task_1_check >> vantage_series_tasks >> task_2_check >> census_series_tasks >> task_3_check >> states_met_to_s3_task_final >> lambda_invoke_task
      
  

AWS S3: Landing zone for CSV files

AWS S3 serves as our landing zone for all the CSV files generated by the data ingestion pipeline. We created an S3 bucket in the same region (us-east-2) as our Lambda function for low-latency access; this region choice matters because it affects network latency and data transfer rates. In addition, we enabled versioning on the S3 bucket so that we can access previous versions of the CSV files if necessary.
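As a sketch of this setup (not the exact commands we ran), the bucket and its versioning could be created with boto3 as follows; the bucket name and region match those used elsewhere in this project:

        # Hypothetical sketch: create the landing-zone bucket and enable versioning.
        import boto3

        s3 = boto3.client("s3", region_name="us-east-2")
        s3.create_bucket(
            Bucket="dk-airflow-test-bucket",
            CreateBucketConfiguration={"LocationConstraint": "us-east-2"},
        )
        s3.put_bucket_versioning(
            Bucket="dk-airflow-test-bucket",
            VersioningConfiguration={"Status": "Enabled"},
        )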

Having a landing zone for our CSV files is an important component of a data pipeline as it allows us to store, access, and process large amounts of data efficiently. This is often referred to as a “data lake” approach, where data is stored in its raw form and processed as needed. An alternative approach is the “ELT” (extract, load, transform) method, where data is extracted from its source, loaded into a centralized database, and then transformed into a usable format. While both approaches have their pros and cons, the data lake approach allows for greater flexibility and scalability as it enables processing of data in its raw form without the need for complex ETL processes.

Note that the S3 bucket name must be unique globally. Our bucket is called dk-airflow-test-bucket. Below we can see that a separate folder is created for each of the data sources.

Image 1

Next, we can see all the files that have been added to the folder for FRED, after a successful DAG run.

Image 2

AWS Lambda: ETL Job

This ETL (Extract, Transform, Load) job is a Python script that runs as an AWS Lambda function, triggered by Airflow after data has been successfully ingested into S3. The job reads and transforms multiple CSV files from an S3 bucket and writes the data to Amazon Timestream, a fully managed time series database. Notably, if a Timestream table already contains data, only missing or future values are inserted. The setup and primary steps of this job are described below.

AWS Lambda Setup

Lambda Setup:

  • Memory: 128 MB, Ephemeral Storage: 512 MB, Timeout: 2 min (price per 1ms: $0.0000000083, Free Tier Eligible)
  • We add a layer to our Lambda function so we can work with pandas (arn:aws:lambda:us-east-2:336392948345:layer:AWSSDKPandas-Python39:6).
  • We create an IAM role with full read/write access to Timestream and S3 and assign it to the Lambda function.
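For reference, the lambda_invoke.lambda_invoke_test callable that the DAG uses to trigger this Lambda is not listed in this post; a minimal sketch with boto3 might look like the following (the function name is a placeholder):

        # Hypothetical sketch of the python_callable that triggers the ETL Lambda.
        import boto3

        def lambda_invoke_test(**context):
            client = boto3.client("lambda", region_name="us-east-2")
            response = client.invoke(
                FunctionName="fred-etl-to-timestream",  # placeholder function name
                InvocationType="Event",                 # asynchronous invocation
            )
            print(f"Lambda invoke status: {response['StatusCode']}")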

ETL Overview: Batch Writing to Timestream

ETL Steps:

  • Define the S3 bucket, the Timestream client, and the database and table names, using the boto3 package for Python to communicate with AWS services.
  • Create a Timestream table with the create_timestream_table function, as detailed in the AWS Timestream Setup section below.
  • Extract: Read the CSV files from each folder in the S3 bucket and merge them into a single pandas dataframe by joining on the date index (date_index).
  • Transform: Convert the pandas datetime to a Unix timestamp and create Timestream records only for dates on or after 1970-01-01 (i.e., non-negative Unix timestamps); a quick illustration of this conversion follows the list.
  • Load: Batch-write to Timestream in batches of 100 records to maximize throughput.
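A quick illustration of the Transform step's timestamp conversion (the printed value assumes UTC, which is what Lambda uses for naive datetimes):

        import pandas as pd

        ts = pd.Timestamp("2023-04-01")
        unix_ms = int(ts.to_pydatetime().timestamp() * 1000)  # naive datetimes use the local timezone
        print(unix_ms)  # 1680307200000 when the timezone is UTC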

The code for this ETL job is displayed below (also on github):

      
        import json
        import boto3
        import re
        from datetime import datetime, timezone
        import time
        import pandas as pd
        import math


        def lambda_handler(event, context):
            # specify region to be us-east-2 for s3
            s3 = boto3.resource('s3', region_name='us-east-2')
            bucket = s3.Bucket('dk-airflow-test-bucket')
            client = boto3.client('timestream-write', region_name='us-east-2')
            print(client.meta.region_name)
            bucket_name = 'dk-airflow-test-bucket'
            database_name = 'fred-batch-data'
            table_name = 'csv_series_fred_combined'

            
            create_timestream_table(client, table_name, database_name)

            df = read_s3_csv_multiple(bucket_name, 'csv-series-fred')

            print(df.head())
            print(f'df shape: {df.shape}')
            
            create_timestream_records_df(client, table_name, database_name, df)

            return "Success"



        def read_s3_csv(bucket, key):
            s3 = boto3.resource('s3', region_name='us-east-2')
            obj = s3.Object(bucket, key)
            return obj.get()['Body'].read().decode('utf-8').split('\n')

        
        # Read every CSV file in the bucket and combine them into a single dataframe
        # indexed by date. Note: the prefix argument is currently unused, so all
        # folders in the bucket are read.
        def read_s3_csv_multiple(bucket, prefix):
            s3 = boto3.resource('s3', region_name='us-east-2')
            csv_list = []

            for obj in s3.Bucket(bucket).objects.all():
                # Get the filename from the object key
                filename = obj.key.split('/')[-1]
                print(f'filename: {filename}')

                # Check if file is CSV and not empty
                if not filename.endswith('.csv') or obj.size == 0:
                    continue

                # Read the CSV file from S3 into a pandas dataframe
                csv_obj = s3.Object(bucket, obj.key)
                body = csv_obj.get()['Body']
                df = pd.read_csv(body, index_col=0, parse_dates=True)

                # Rename the first column to the filename (without extension)
                col_name = filename.split('.')[0]
                df.rename(columns={df.columns[0]: col_name}, inplace=True)

                # Add the dataframe as a new column in the csv_list
                csv_list.append(df.iloc[:, 0])

            # Combine all the dataframes in the csv_list into a single dataframe using the index as the key
            final_df = pd.concat(csv_list, axis=1)

            # Add a new column called "date_index" with the date index as the values
            final_df['date_index'] = final_df.index

            return final_df



        def create_timestream_table(client, table_name, database_name):
            print('table name: ', table_name)
            try:
                client.create_table(
                    DatabaseName=database_name,
                    TableName=table_name,
                    RetentionProperties={
                        'MemoryStoreRetentionPeriodInHours': 24,
                        'MagneticStoreRetentionPeriodInDays': 73000
                    },
                    Tags=[],
                    MagneticStoreWriteProperties={
                        'EnableMagneticStoreWrites': True,
                        'MagneticStoreRejectedDataLocation': {
                            'S3Configuration': {
                                'BucketName': 'dk-airflow-test-bucket',
                                'ObjectKeyPrefix': 'rejected-data/',
                                'EncryptionOption': 'SSE_S3'
                            }
                        }
                    }
                )
                print(f"Created table {table_name} in database {database_name}")
            except client.exceptions.ValidationException as e:
                print(f"Error creating table {table_name}: {str(e)}")
            except Exception as e:
                print(f"Unknown error creating table {table_name}: {str(e)}")



        def create_timestream_records(client, table_name, database_name, rows):
            records_details = {
                'DatabaseName': database_name,
                'TableName': table_name,
            }

            records = []
            record_counter = 0

            table_name_split = table_name.split('_')

            try:
                series_name = table_name_split[3]
            except IndexError:
                print(table_name_split)
                series_name = table_name
                


            count = len(rows[1:])
            print(f"row count: {count}")
            for row in rows[1:]:
                values = row.split(',')
                if len(values) != 2:
                    print(f"Error: {row} does not have 2 values (lmd)")
                    print(f"values: {values}")
                    continue
                    
                # Convert timestamp to unix
                try:
                    values[0] = re.sub(r"[-]", "/", values[0])
                    if validate_y_m_d(values[0]) == True:
                        date_obj = datetime.strptime(values[0], '%Y/%m/%d')
                    else:
                        date_obj = datetime.strptime(values[0], '%m/%d/%Y')
                    if date_obj < datetime(1970, 1, 1):
                        continue
                    
                    timestamp = int(date_obj.timestamp() * 1000)
                    #print('entered this')
                except ValueError:
                    #print(f"Invalid date string: {values[0]}")
                    continue

                

                #print(f'exited after')
                # Create a new record with the current time as the timestamp
                current_time = str(int(round(time.time() * 1000)) - (count))
                #print(f'current time: {current_time}')

          

                multi_record = {
                    'Dimensions': [
                        {'Name': 'S3 Region', 'Value': 'us-east-2'}
                    ],
                    'MeasureValueType': 'MULTI',
                    'MeasureName': series_name,
                    'Time': current_time,
                    'MeasureValues': [
                    {'Name': 'Date', 'Type': 'TIMESTAMP', 'Value': str(timestamp)},
                    {'Name': 'Value', 'Type': 'DOUBLE', 'Value': str(values[1]) }
                    ]
                }

                records.append(multi_record)
                record_counter += 1
                count += 10

                if len(records) == 100:
                    batch_to_timestream(client, table_name, database_name, records, record_counter)
                    records= []

            if len(records) != 0:
                batch_to_timestream(client, table_name, database_name, records, record_counter) 

            #print(f"table {table_name}, with {len(records['Records'])} actual records")
            return True


        def create_timestream_records_df(client, table_name, database_name, df):
            records = []
            record_counter = 0
            count = 0

            for index, row in df.iterrows():

                # Check if the date in 'date_index' column is before the Unix time
                if row['date_index'] < datetime(1970, 1, 1):
                  # print(f"Invalid date string: {row['date_index']}")
                    continue

                current_time = str(int(round(time.time() * 1000)) - (record_counter*100))

                # create measure values for each column in the row
                measure_values = []
                for col in df.columns:
                    if col == 'date_index':
                        # Convert the pandas Timestamp to Unix epoch milliseconds
                        ts_date = row[col].to_pydatetime()
                        timestamp = int(ts_date.timestamp() * 1000)
                        measure_values.append({'Name': 'date_index', 'Type': 'TIMESTAMP', 'Value': str(timestamp)})
                    else:
                        # Check for NaN values and replace them with 0.0
                        value = row[col]
                        if pd.isna(value):
                            value = str(float(0.0))
                        else:
                            value = str(value)
                        col_name = col.split('-')[0]
                        measure_values.append({'Name': col_name, 'Type': 'DOUBLE', 'Value': value})

                # Create a new record with the current time as the timestamp
                multi_record = {
                    'Dimensions': [
                        {'Name': 'S3 Region', 'Value': 'us-east-2'}
                    ],
                    'MeasureValueType': 'MULTI',
                    'MeasureName': 'fred_pipeline_measure',
                    'Time': current_time,
                    'MeasureValues': measure_values
                }

                records.append(multi_record)
                record_counter += 1

                if len(records) == 100:
                    batch_to_timestream(client, table_name, database_name, records, record_counter)
                    records = []

            if len(records) != 0:
                batch_to_timestream(client, table_name, database_name, records, record_counter)

            return True




        def batch_to_timestream(client, table_name, database_name, records, counter):
            try:
                result = client.write_records(DatabaseName=database_name, TableName=table_name,
                                                    Records=records, CommonAttributes={})
                print("Processed [%d] records. WriteRecords Status: [%s]" % (counter,
                                                                                result['ResponseMetadata']['HTTPStatusCode']))
            except client.exceptions.RejectedRecordsException as err:
                print("RejectedRecords: ", err)
                for rr in err.response["RejectedRecords"]:
                    print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
                print("Other records were written successfully. ")
            except Exception as err:
                print("Error:", err)

        def delete_table(client, table_name, database_name):
            print("Deleting Table")
            try:
                result = client.delete_table(DatabaseName=database_name, TableName=table_name)
                print("Delete table status [%s]" % result['ResponseMetadata']
                ['HTTPStatusCode'])
            except client.exceptions.ResourceNotFoundException:
                print("Table [%s] doesn't exist" % table_name)
            except Exception as err:
                print("Delete table failed:", err)

            
        def validate_y_m_d(date_str):
            pattern = r'^(\d{4})[-/](\d{1,2})[-/](\d{1,2})$'
            match = re.match(pattern, date_str)
            if not match:
                return False
            year, month, day = match.groups()
            year = int(year)
            month = int(month)
            day = int(day)
            try:
                datetime(year, month, day)
                return True
            except ValueError:
                return False


 
      
  

AWS Timestream: Time Series Data Warehouse

AWS Timestream is a fully managed time-series database service offered by Amazon Web Services (AWS) that is designed to handle large-scale time series data with high durability and availability. Unlike traditional SQL databases, Timestream is optimized for handling time series data and provides features like automatic scaling, data retention management, and the ability to query large datasets quickly. One of the main advantages of Timestream over traditional databases is that it allows for efficient and easy storage and retrieval of time series data at scale.

Other popular alternatives to Timestream include InfluxDB, OpenTSDB, and TimescaleDB. These databases provide similar features to Timestream, but they may differ in terms of performance, scalability, and ease of use. InfluxDB, for instance, is a popular open-source time-series database that offers high write and query performance and supports SQL-like query languages. OpenTSDB is another popular open-source database that provides horizontal scaling and advanced features like histograms and percentile aggregations. TimescaleDB is an open-source database that extends PostgreSQL to handle time-series data and provides features like automatic data partitioning and multi-node clustering. However, Timestream is specifically designed and optimized for the AWS ecosystem, which makes it a better choice if you are already using AWS services and need a fully managed time-series database that integrates easily with other AWS services.

AWS Timestream Setup

Our created Timestream table has the following properties:

  • Database retention period: 73,000 days for the magnetic store (the maximum, for historical data) and 24 hours for the memory store
    • Note: The magnetic store is for high-volume, cost-effective long-term storage, while the memory store is for fast access to recently written data
  • Memory store and magnetic store writes are enabled
  • Magnetic store rejected data is stored in an S3 bucket with Server-Side Encryption (SSE-S3)
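For example, the combined table can be queried with standard SQL, either in the AWS Query Editor or programmatically with the timestream-query client. The sketch below assumes a measure column named CPIAUCNS, following how the ETL job names its multi-measure columns:

        # Hypothetical query sketch against the table created by the ETL job.
        import boto3

        query_client = boto3.client("timestream-query", region_name="us-east-2")
        sql = '''
            SELECT time, CPIAUCNS
            FROM "fred-batch-data"."csv_series_fred_combined"
            ORDER BY time DESC
            LIMIT 10
        '''
        for row in query_client.query(QueryString=sql)["Rows"]:
            print(row["Data"])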

Forecasting US Inflation with Temporal Fusion Transformer DNN