Commit 9c7e87d: first commit
adelinaenache committed Jan 10, 2018 (0 parents)
Showing 7 changed files with 333 additions and 0 deletions.
104 changes: 104 additions & 0 deletions .gitignore
@@ -0,0 +1,104 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/

# Translations
*.mo
*.pot

# Django stuff:
*.log
.static_storage/
.media/
local_settings.py

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# pyenv
.python-version

# celery beat schedule file
celerybeat-schedule

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
23 changes: 23 additions & 0 deletions README.md
@@ -0,0 +1,23 @@
# Plugin - Stripe to S3

This plugin moves data from the [Stripe](https://stripe.com/docs/api) API to S3, based on the specified Stripe object.

## Hooks
### StripeHook
This hook handles authentication and requests to Stripe. It is based on the [stripe-python](https://github.com/stripe/stripe-python) module.
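
The hook reads the API key from the Airflow connection's `Extra` field. A minimal sketch of what that Extra JSON might look like (the key value here is a made-up placeholder):

```json
{"api_key": "sk_test_your_key_here"}
```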

### S3Hook
[Core Airflow S3Hook](https://pythonhosted.org/airflow/_modules/S3_hook.html) with the standard boto dependency.

## Operators
### StripeToS3Operator
This operator composes the logic for this plugin. It fetches the specified Stripe object and saves the results to an S3 bucket, under a specified key, in ndjson (newline-delimited JSON) format. It accepts the following parameters; a short usage sketch follows the list.

- `stripe_conn_id`: The Stripe connection id from Airflow
- `stripe_object`: Stripe object to query. Tested for `BalanceTransaction`, `Charge`, `Coupon`, `Customer`, `Event`, `InvoiceItem`, `Invoice`, `Plan`, `Subscription`, `Transfer`
- `stripe_args`: *(optional)* dictionary with any extra arguments accepted by the stripe-python module.
- `s3_conn_id`: S3 connection id from Airflow.
- `s3_bucket`: The output s3 bucket.
- `s3_key`: The destination S3 key under which the results are written.
- `fields`: *(optional)* list of fields that you want to get from the object. If *None*, then this will get all fields for the object.
- `replication_key_value`: *(optional)* value of the replication key, if needed. The operator will import only the objects created after the object with this id.
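
As mentioned above, here is a minimal usage sketch. The DAG id, connection ids, bucket, key, and field list are all placeholder assumptions; the import path assumes Airflow 1.x plugin loading, which exposes plugin operators under `airflow.operators.<plugin_name>`.

```python
# Hypothetical DAG: ids, connections, bucket and key are assumptions.
from datetime import datetime

from airflow import DAG
# Airflow 1.x exposes plugin operators under airflow.operators.<plugin_name>
from airflow.operators.stripe_plugin import StripeToS3Operator

dag = DAG(
    dag_id='stripe_charges_to_s3',
    start_date=datetime(2018, 1, 1),
    schedule_interval='@daily',
)

fetch_charges = StripeToS3Operator(
    task_id='fetch_charges',
    stripe_conn_id='stripe_default',
    stripe_object='Charge',
    s3_conn_id='s3_default',
    s3_bucket='my-data-bucket',
    s3_key='stripe/charges/{{ ds }}.ndjson',  # s3_key is templated
    fields=['id', 'amount', 'currency', 'created'],
    dag=dag,
)
```
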
15 changes: 15 additions & 0 deletions __init__.py
@@ -0,0 +1,15 @@
from airflow.plugins_manager import AirflowPlugin
from stripe_plugin.operators.stripe_to_s3_operator import StripeToS3Operator
from stripe_plugin.hooks.stripe_hook import StripeHook


class stripe_plugin(AirflowPlugin):
name = "stripe_plugin"
operators = [StripeToS3Operator]
hooks = [StripeHook]
# Leave in for explicitness
executors = []
macros = []
admin_views = []
flask_blueprints = []
menu_links = []
Empty file added hooks/__init__.py
63 changes: 63 additions & 0 deletions hooks/stripe_hook.py
@@ -0,0 +1,63 @@
from airflow.hooks.base_hook import BaseHook
import json
import stripe


class StripeHook(BaseHook):
def __init__(
self,
conn_id,
*args,
**kwargs):
self.conn_id = conn_id
self._args = args
self._kwargs = kwargs

self.connection = None
self.extras = None
self.stripe = None

def get_conn(self):
"""
Initialize a stripe instance.
"""
if self.stripe:
return self.stripe

self.connection = self.get_connection(self.conn_id)
self.extras = self.connection.extra_dejson

stripe.api_key = self.extras['api_key']
self.stripe = stripe

return stripe

def run_query(self, model, replication_key_value=None, **kwargs):
"""
        Run a query against Stripe.
        :param model: name of the Stripe model
        :param replication_key_value: Stripe replication key value
"""
stripe_instance = self.get_conn()
stripe_model = getattr(stripe_instance, model)

method_to_call = 'list'
        if model == 'BalanceHistory':  # "is" compared identity, not equality
method_to_call = 'all'
if replication_key_value:
stripe_response = getattr(stripe_model, method_to_call)(
ending_before=replication_key_value, **kwargs)
else:
stripe_response = getattr(stripe_model, method_to_call)(**kwargs)

for res in stripe_response.auto_paging_iter():
yield res

def get_records(self, sql):
pass

def get_pandas_df(self, sql):
pass

def run(self, sql):
pass
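
A short, hedged sketch of using this hook on its own; the connection id `stripe_default` is an assumption, and its Extra JSON must contain the `api_key` the hook reads:

```python
# Hypothetical standalone usage of StripeHook; "stripe_default" is an
# assumed Airflow connection whose Extra JSON holds {"api_key": "..."}.
hook = StripeHook(conn_id='stripe_default')

# run_query is a generator that auto-paginates the Stripe list API;
# "limit" is one of stripe-python's standard list arguments.
for charge in hook.run_query('Charge', limit=100):
    print(charge['id'], charge['amount'])
```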
Empty file added operators/__init__.py
128 changes: 128 additions & 0 deletions operators/stripe_to_s3_operator.py
@@ -0,0 +1,128 @@
import logging
import json
import collections
from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from stripe_plugin.hooks.stripe_hook import StripeHook
from tempfile import NamedTemporaryFile


class StripeToS3Operator(BaseOperator):
"""
Make a query against Stripe and write the resulting data to s3
"""
    template_fields = ('s3_key', )  # BaseOperator expects "template_fields" (plural)

@apply_defaults
def __init__(self,
stripe_conn_id,
stripe_object,
                 stripe_args=None,
s3_conn_id=None,
s3_key=None,
s3_bucket=None,
fields=None,
replication_key_value=0,
*args,
**kwargs
):
"""
Initialize the operator
        :param stripe_conn_id: name of the Airflow connection whose
            Extra JSON holds your Stripe api_key
        :param stripe_object: name of the Stripe object we are
            fetching data from
        :param stripe_args: *(optional)* dictionary with extra stripe
            arguments
        :param s3_conn_id: name of the Airflow connection that has
            your Amazon S3 connection params
        :param s3_bucket: name of the destination S3 bucket
        :param s3_key: name of the destination key within the bucket
        :param fields: *(optional)* list of fields that you want
            to get from the object.
            If *None*, then this will get all fields
            for the object
        :param replication_key_value: *(optional)* value of the replication key,
            if needed. The operator will import only
            results with an id greater than the value of
            this param.
"""

super().__init__(*args, **kwargs)

self.stripe_conn_id = stripe_conn_id
self.stripe_object = stripe_object
        self.stripe_args = stripe_args or {}  # avoid a mutable default argument

self.s3_conn_id = s3_conn_id
self.s3_bucket = s3_bucket
self.s3_key = s3_key

self.fields = fields
self.replication_key_value = replication_key_value
self._kwargs = kwargs

def filter_fields(self, result):
"""
        Filter the fields of a result object.
        Returns an object with only the fields given
        as a parameter in the constructor.
        All fields are returned when the "fields" param is None.
"""
if not self.fields:
return result
obj = {}
for field in self.fields:
obj[field] = result[field]
return obj

def execute(self, context):
"""
Execute the operator.
This will get all the data for a particular Stripe model
and write it to a file.
"""
logging.info("Prepping to gather data from Stripe")
hook = StripeHook(
conn_id=self.stripe_conn_id
)

        # attempt to connect to Stripe;
        # if this fails, it will raise an error and the task dies right here
        # (we could wrap it in a try/except for more graceful handling)
hook.get_conn()

logging.info(
"Making request for"
" {0} object".format(self.stripe_object)
)

results = hook.run_query(
self.stripe_object,
self.replication_key_value,
            **self.stripe_args)

# write the results to a temporary file and save that file to s3
with NamedTemporaryFile("w") as tmp:
for result in results:
filtered_result = self.filter_fields(result)
tmp.write(json.dumps(filtered_result) + '\n')

tmp.flush()

            dest_s3 = S3Hook(s3_conn_id=self.s3_conn_id)
            dest_s3.load_file(
                filename=tmp.name,
                key=self.s3_key,
                bucket_name=self.s3_bucket,
                replace=True
            )
            dest_s3.connection.close()
            # the "with" block closes and deletes the temporary file on exit

logging.info("Query finished!")
