S3

import boto3

class S3wrapper(object):

    """Docstring for S3wrapper. """

    def __init__(self):
        # Create an S3 client
        session = boto3.Session(profile_name='some-profile')
        self.s3 = session.client('s3')


    def upload(self, file_name = 'tags', bucket_name = 'some-bucket',
            file_path = "some-path"):

        self.file_name = file_name
        self.bucket_name = bucket_name
        self.file_path = file_path
        self.key = file_path + file_name

        # Uploads the given file using a managed uploader, which will split up large
        # files automatically and pload parts in parallel.
        self.s3.upload_file(self.file_name, self.bucket_name, self.key)

Kinesis

Simply send some data

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys

import boto3


STREAM_NAME="kinesis_stream_name"
STREAM_REGION="us-east-1"
PROFILE_NAME="pruebas-kinesis"

if len(sys.argv) > 1:
    print "File to be read is: {}".format(sys.argv[1])
    filename = sys.argv[1]
else:
    print "Usage:"
    print "======"
    print "     {} file_to_send".format(sys.argv[0])
    sys.exit(0)

# Kinesis initialization and information
session = boto3.Session(profile_name=PROFILE_NAME)
kinesis = session.client(service_name='kinesis')

data = open(filename).read()
print "********************"
print "Will try to send this data: "
print data
print "********************"

response = kinesis.put_record(
    StreamName=STREAM_NAME,
    Data=data,
    PartitionKey='something'
)

print "********************"
print "Data sent"
print "********************"

print "HTTPStatusCode: {}".format(response.values()[1]['HTTPStatusCode'])

Read write

import json
import time
from boto import kinesis

STREAM_NAME="stream_name"
STREAM_REGION="stream_region"

# Kinesis initialization and information
kinesis = kinesis.connect_to_region(STREAM_REGION)
kinesis.describe_stream(STREAM_NAME)

# Put data
c = open('sample_data.json').read()
print "********************"
print "Will try to send data: "
print c
kinesis.put_record(STREAM_NAME, c, "partitonkey")
print "Data sent"
print "********************"

# Read data
print "********************"
print "Will try to read data"
shard_id = 'shardId-00000000001'
shard_it = kinesis.get_shard_iterator(STREAM_NAME, shard_id, "LATEST")["ShardIterator"]
while True:
    out = kinesis.get_records(shard_it, limit=2)
    shard_it = out['NextShardIterator']
    print out
    time.sleep(0.2)
print "********************"

Read

import sys
from time import strftime, gmtime
import logging
import json

# Boto stuff
from boto import kinesis
import boto3
import botocore

max_records = 20000

logging.basicConfig(level=logging.ERROR)

STREAM_NAME="STREAM_NAME"
STREAM_REGION="us-east-1"

session = boto3.Session(profile_name='some-profile')
client = session.client(service_name='kinesis')

if len(sys.argv) > 1:
    timestamp = sys.argv[1]
else:
    timestamp = "2017-07-17T17:00:00.000-00:00"

# Get shard iterator dict
shardId="shardId-000000000001"
d = client.get_shard_iterator(ShardIteratorType="AT_TIMESTAMP", ShardId=shardId, Timestamp=timestamp, StreamName='tvmetrix')
iterator = d.get('ShardIterator')
nextIterator = iterator

# Debug stuff files
file_name_debug = "debug_file_{}.debug".format(strftime("%Y-%m-%d_%H-%M-%S", gmtime()))
debug_file = open(file_name_debug, 'w')
file_name_data = "data_file_{}.log".format(strftime("%Y-%m-%d_%H-%M-%S", gmtime()))
data_file = open(file_name_data, 'w')
print "Debug file will be: {}".format(file_name_debug)
print "Data file will be: {}".format(file_name_data)

counter = 0
while counter < max_records:
    try:
        response = client.get_records(ShardIterator=nextIterator, Limit=20)
        logging.debug(response)
        print response
        debug_file.write(str(response) + '\n')
        records = response.get('Records')
        nextIterator = response.get("NextShardIterator")
        print "There are {} records".format(len(records))
        counter += len(records)
        for record in records:
            print "*" * 10
            print "Record"
            print "-" * 10
            data = record.get('Data')
            tokens = data.split('\n')
            for token in tokens:
                data_dict = json.loads(token)
                ### read stuff
    except ValueError as valueError:
        print valueError
    except botocore.exceptions.ClientError as ce:
        print "*" * 6, "WARNING", "*" * 6
        print ce
        print "Limit exceeded"
        print "*" * 6, "WARNING", "*" * 6
        break
debug_file.close()
data_file.close()