Amazon S3 Data Receiver

The s3 data receiver connects to an Amazon S3 bucket and reads the files stored in it.

Example:

If we wanted to get CSV files with “station1” in the file name from the /data/test-data/multiple-files directory of our S3 bucket, our configuration would look something like this:

{
    "data_receiver": [
        {
            "data_receiver_name": "s3_receiver0",
            "protocol": "s3",
            "poll_interval": 5,
            "connections": [
                {
                    "endpoint_url": "https://s1-s2-s3",
                    "access_key_id": "akey",
                    "secret_access_key": "asecretkey",
                    "bucket": "mybucket1",
                    "data_directory": "data/test-data/multiple-files",
                    "regex": ".*station1.*"
                }
            ],
            "streams": [
                {
                    "asset": "S3_Station_1",
                    "stream_type": "cycle",
                    "file_filter": ["*.csv"]
                }
            ]
        }
    ]
}

Configuration:

The s3 data receiver is built on the base file receiver and connects to a remote S3 bucket.

All file receivers support these required and optional properties:

  • data_receiver_name: Unique name of the data receiver. This name will be used to track the progress state of the data stream.

  • protocol: Protocol to be used. For this receiver it is s3.

  • streams: List of input data streams to read from.

    • asset: Asset identifier

    • stream_type: Type of data stream

    • file_filter: List of files to filter on. Items can be regular expressions. You must select either file_filter or path_filter but not both.

    • path_filter: List of paths to filter on. Items can be regular expressions. You must select either file_filter or path_filter but not both.

    • parser: Name of the parser used to convert the file. This has to match one of the parser_name values in the parsers section.
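For example, a stream that selects files by directory rather than by file name could use path_filter instead of file_filter; the sketch below assumes the same asset as the main example, and the pattern is illustrative:

```json
{
    "streams": [
        {
            "asset": "S3_Station_1",
            "stream_type": "cycle",
            "path_filter": [".*multiple-files.*"]
        }
    ]
}
```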

  • connections: A list of connection settings used to connect to the data source. Please refer to the connection settings below.

  • process_files_alphabetically: true to process all of the files retrieved by the connection(s) in alphabetical order. By default, this option is false, so files are processed by last modified (or created) time with oldest files first.

  • process_ordered_connections: If there are multiple connections and you want to process (and transmit) the files received from one connection before another, set this option to true to process files based on the order of the connection(s). By default, this option is set to false. This option is usually relevant to the Kafka transmit, because events within a Kafka topic need to be in order. For example, if one connection receives historical files and another connection receives “realtime” files, you’ll want to enable this setting and order the connections with the historical connection first, so that events in the Kafka topic are in chronological order. Please note that the order of the files per connection is maintained: if process_files_alphabetically is enabled, files will be parsed in alphabetical order; if disabled, files will be parsed in chronological order based on the last modified time.
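As a sketch of that historical-then-realtime scenario, the receiver below lists the historical connection first and enables ordered processing (connection and bucket names are illustrative):

```json
{
    "data_receiver_name": "s3_receiver_ordered",
    "protocol": "s3",
    "process_ordered_connections": true,
    "connections": [
        {
            "connection_name": "historical",
            "endpoint_url": "https://s1-s2-s3",
            "access_key_id": "akey",
            "secret_access_key": "asecretkey",
            "bucket": "mybucket-historical"
        },
        {
            "connection_name": "realtime",
            "endpoint_url": "https://s1-s2-s3",
            "access_key_id": "akey",
            "secret_access_key": "asecretkey",
            "bucket": "mybucket-realtime"
        }
    ]
}
```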

  • parsers: A list of parsers that can be used for the input streams. Each parser has the following properties:

    • parser_name: A customizable name used by streams to identify the parser.

    • parser_type: Type of parser to apply (e.g. csv)

    • parser_version: Version of the parser type to apply.

    FactoryTX has a few built-in parsers available for you to use. Please refer to the Parsers Configurations section of the manual for more details about them.
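As a sketch, a parsers section paired with a stream that references it via parser might look like this (the parser_name and parser_version values are illustrative; see the Parsers Configurations section for supported parser types):

```json
{
    "parsers": [
        {
            "parser_name": "station_csv",
            "parser_type": "csv",
            "parser_version": 1
        }
    ],
    "streams": [
        {
            "asset": "S3_Station_1",
            "stream_type": "cycle",
            "parser": "station_csv",
            "file_filter": ["*.csv"]
        }
    ]
}
```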

  • skip_parser_errors: Whether the receiver will tolerate parser errors when processing files. If true, parsing errors are skipped and logged in a quarantine log file while the receiver continues processing. Otherwise, the receiver will halt on a parsing error until the file is repaired. (Only works for CSV files.)

  • archive_completed: true to keep a local copy of each file received from the remote server, or false if local copies should not be kept. If enabled, files will be archived until their total size increases above the amount specified in the max_archive_size_mb parameter.

  • max_archive_size_mb: If archive_completed is true, delete the archived files once their total size (in MB) is greater than this value. A negative value means never delete any files. Defaults to -1.

  • delete_completed: true if files should be deleted from the data directory after they have been received by FactoryTX, or false if files should never be deleted. Defaults to false to avoid accidentally losing data.

    • archive_completed and delete_completed are independent of each other. Archiving will create a copy of the file in a new directory, while deleting will remove the original file.
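For instance, a receiver that keeps archived copies up to roughly 500 MB while leaving the original files in place might use the following settings (the size is illustrative):

```json
{
    "archive_completed": true,
    "max_archive_size_mb": 500,
    "delete_completed": false
}
```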

  • read_in_progress_files: Whether to read files as soon as they are created, or wait for the upstream service to stop writing to the file before reading it.

  • emit_file_metadata: Whether to inject additional columns into each record containing metadata about the file it came from. If this setting is enabled, every record will contain fields named file_name, file_path, and file_timestamp.
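With this setting enabled, a parsed record would carry the metadata fields alongside its data columns, roughly along these lines (the data column and all values are illustrative):

```json
{
    "temperature": 21.5,
    "file_name": "station1_2021-01-01.csv",
    "file_path": "data/test-data/multiple-files/station1_2021-01-01.csv",
    "file_timestamp": "2021-01-01T00:00:00Z"
}
```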

  • poll_interval: Number of seconds to wait before fetching new data from the server.

  • temporary_file_directory: Specify the directory to temporarily store files that have been downloaded from the connection(s). By default, the directory used is /tmp.

Connection Settings:

Required and optional properties that can be configured for an S3 connection:

  • access_key_id: The Access Key ID for the S3 connection

  • secret_access_key: The Secret Access Key for the S3 connection

  • bucket: The name of the bucket within S3 to access

  • assume_role: Credentials for an IAM user or IAM role that FactoryTX uses to request temporary security credentials to access AWS resources.

    • role_arn: The Amazon Resource Name (ARN) for the role to assume. (e.g. arn:aws:iam::012345678901:role/my-role-name).

    • role_session_name: An identifier for the assumed role session, used to uniquely identify a session when the same role is assumed for different reasons. This parameter must consist of upper- and lower-case alphanumeric characters with no spaces; underscores and the characters =,.@- are also allowed. By default, the session name is AssumeRoleSession.

    • external_id: A unique identifier that might be required when you assume a role in another account (e.g. MyArbitraryId).
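A connection that assumes an IAM role might look like the following sketch (the ARN, session name, and external ID are placeholders):

```json
{
    "endpoint_url": "https://s1-s2-s3",
    "access_key_id": "akey",
    "secret_access_key": "asecretkey",
    "bucket": "mybucket1",
    "assume_role": {
        "role_arn": "arn:aws:iam::012345678901:role/my-role-name",
        "role_session_name": "factorytx_ingest",
        "external_id": "MyArbitraryId"
    }
}
```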

  • endpoint_url: The complete URL to use for the constructed client

  • connection_name: Unique name for the connection.

  • regex: A regular expression to filter the contents of the S3 bucket.

  • data_directory: Path to a subdirectory containing data files to ingest. If this option is not set then all files in all subdirectories will be scanned.

  • retrieve_ordered_files: When enabled, this option speeds up the operation of retrieving new files if the following conditions apply:

    • Files are added in a lexicographical (alphabetical) order based on their file name.

    • Files are not modified after being added to the directory.

    Violating either of these conditions will result in undefined behavior.

    The default value of this setting is false. When set to true, the receiver will consume File_0100 followed by File_0200 as long as the last completed file has a name lexicographically smaller than the one currently being processed.

    Please note that if this option is enabled, you will need to explicitly set a connection_name.
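Assuming files are uploaded with lexicographically increasing names and are never modified afterwards, a connection enabling this optimization could be sketched as follows (note the explicitly set connection_name):

```json
{
    "connection_name": "ordered_uploads",
    "endpoint_url": "https://s1-s2-s3",
    "access_key_id": "akey",
    "secret_access_key": "asecretkey",
    "bucket": "mybucket1",
    "retrieve_ordered_files": true
}
```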

Please refer to Notes on ordered file processing for more details about file receiver options.