Skip to content

Commit

Permalink
Merge pull request #17 from blockchain-etl/feature/streaming_refactoring
Browse files Browse the repository at this point in the history
Feature/streaming refactoring
  • Loading branch information
medvedev1088 authored Apr 15, 2019
2 parents 6d2b444 + 81e2f49 commit 384dbe5
Show file tree
Hide file tree
Showing 18 changed files with 394 additions and 186 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.*
last_synced_block.txt
pid.txt
output
7 changes: 6 additions & 1 deletion Dockerfile_with_streaming
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,9 @@ WORKDIR /$PROJECT_DIR
COPY . .
RUN pip install --upgrade pip && pip install -e /$PROJECT_DIR/[streaming]

ENTRYPOINT ["python", "bitcoinetl"]
# Add Tini
ENV TINI_VERSION v0.18.0
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini
RUN chmod +x /tini

ENTRYPOINT ["/tini", "--", "python", "bitcoinetl"]
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Stream blockchain data continually to Google Pub/Sub:

```bash
> export GOOGLE_APPLICATION_CREDENTIALS=/path_to_credentials_file.json
> bitcoinetl stream -p http://user:pass@localhost:8332 --start-block 500000 --output projects/your-project/topics/bitcoin_blockchain
> bitcoinetl stream -p http://user:pass@localhost:8332 --start-block 500000 --output projects/your-project/topics/crypto_bitcoin

```

Expand Down Expand Up @@ -291,7 +291,9 @@ You can tune `--export-batch-size`, `--max-workers` for performance.

- This command outputs blocks and transactions to the console by default.
- Use `--output` option to specify the Google Pub/Sub topic where to publish blockchain data,
e.g. `projects/your-project/topics/bitcoin_blockchain`.
e.g. `projects/your-project/topics/crypto_bitcoin`. Blocks and transactions will be pushed to
`projects/your-project/topics/crypto_bitcoin.blocks` and `projects/your-project/topics/crypto_bitcoin.transactions`
topics.
- The command saves its state to `last_synced_block.txt` file where the last synced block number is saved periodically.
- Specify either `--start-block` or `--last-synced-block-file` option. `--last-synced-block-file` should point to the
file where the block number, from which to start streaming the blockchain data, is saved.
Expand Down
2 changes: 1 addition & 1 deletion bitcoinetl/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@


@click.group()
@click.version_option(version='1.2.0')
@click.version_option(version='1.2.1')
@click.pass_context
def cli(ctx):
pass
Expand Down
30 changes: 21 additions & 9 deletions bitcoinetl/cli/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


import click

from bitcoinetl.enumeration.chain import Chain
from bitcoinetl.rpc.bitcoin_rpc import BitcoinRpc

from blockchainetl.logging_utils import logging_basic_config
from blockchainetl.streaming.streaming_utils import configure_logging, configure_signals
from blockchainetl.thread_local_proxy import ThreadLocalProxy

logging_basic_config()
Expand All @@ -41,25 +42,36 @@
'If not specified will print to console')
@click.option('-s', '--start-block', default=None, type=int, help='Start block')
@click.option('-c', '--chain', default=Chain.BITCOIN, type=click.Choice(Chain.ALL), help='The type of chain')
@click.option('-s', '--period-seconds', default=10, type=int, help='How many seconds to sleep between syncs')
@click.option('--period-seconds', default=10, type=int, help='How many seconds to sleep between syncs')
@click.option('-b', '--batch-size', default=2, type=int, help='How many blocks to batch in single request')
@click.option('-B', '--block-batch-size', default=10, type=int, help='How many blocks to batch in single sync round')
@click.option('-w', '--max-workers', default=5, type=int, help='The number of workers')
@click.option('--log-file', default=None, type=str, help='Log file')
@click.option('--pid-file', default=None, type=str, help='pid file')
def stream(last_synced_block_file, lag, provider_uri, output, start_block, chain=Chain.BITCOIN,
period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5):
period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None):
"""Streams all data types to console or Google Pub/Sub."""
configure_logging(log_file)
configure_signals()

from bitcoinetl.streaming.streaming_utils import get_item_exporter
from bitcoinetl.streaming.stream import stream as do_stream
from bitcoinetl.streaming.btc_streamer_adapter import BtcStreamerAdapter
from blockchainetl.streaming.streamer import Streamer

do_stream(
streamer_adapter = BtcStreamerAdapter(
bitcoin_rpc=ThreadLocalProxy(lambda: BitcoinRpc(provider_uri)),
item_exporter=get_item_exporter(output),
chain=chain,
batch_size=batch_size,
max_workers=max_workers
)
streamer = Streamer(
blockchain_streamer_adapter=streamer_adapter,
last_synced_block_file=last_synced_block_file,
lag=lag,
item_exporter=get_item_exporter(output),
start_block=start_block,
chain=chain,
period_seconds=period_seconds,
batch_size=batch_size,
block_batch_size=block_batch_size,
max_workers=max_workers
pid_file=pid_file,
)
streamer.stream()
95 changes: 95 additions & 0 deletions bitcoinetl/streaming/btc_streamer_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


import logging

from bitcoinetl.enumeration.chain import Chain
from bitcoinetl.jobs.enrich_transactions import EnrichTransactionsJob
from bitcoinetl.jobs.export_blocks_job import ExportBlocksJob
from bitcoinetl.service.btc_service import BtcService
from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter
from blockchainetl.jobs.exporters.in_memory_item_exporter import InMemoryItemExporter


class BtcStreamerAdapter:
def __init__(
self,
bitcoin_rpc,
item_exporter=ConsoleItemExporter(),
chain=Chain.BITCOIN,
batch_size=2,
max_workers=5):
self.bitcoin_rpc = bitcoin_rpc
self.chain = chain
self.btc_service = BtcService(bitcoin_rpc, chain)
self.item_exporter = item_exporter
self.batch_size = batch_size
self.max_workers = max_workers

def open(self):
self.item_exporter.open()

def get_current_block_number(self):
return int(self.btc_service.get_latest_block().number)

def export_all(self, start_block, end_block):
# Export blocks and transactions
blocks_and_transactions_item_exporter = InMemoryItemExporter(item_types=['block', 'transaction'])

blocks_and_transactions_job = ExportBlocksJob(
start_block=start_block,
end_block=end_block,
batch_size=self.batch_size,
bitcoin_rpc=self.bitcoin_rpc,
max_workers=self.max_workers,
item_exporter=blocks_and_transactions_item_exporter,
chain=self.chain,
export_blocks=True,
export_transactions=True
)
blocks_and_transactions_job.run()

blocks = blocks_and_transactions_item_exporter.get_items('block')
transactions = blocks_and_transactions_item_exporter.get_items('transaction')

# Enrich transactions
enriched_transactions_item_exporter = InMemoryItemExporter(item_types=['transaction'])

enrich_transactions_job = EnrichTransactionsJob(
transactions_iterable=transactions,
batch_size=self.batch_size,
bitcoin_rpc=self.bitcoin_rpc,
max_workers=self.max_workers,
item_exporter=enriched_transactions_item_exporter,
chain=self.chain
)
enrich_transactions_job.run()
enriched_transactions = enriched_transactions_item_exporter.get_items('transaction')
if len(enriched_transactions) != len(transactions):
raise ValueError('The number of transactions is wrong ' + str(transactions))

logging.info('Exporting with ' + type(self.item_exporter).__name__)
self.item_exporter.export_items(blocks + enriched_transactions)

def close(self):
self.item_exporter.close()
145 changes: 0 additions & 145 deletions bitcoinetl/streaming/stream.py

This file was deleted.

3 changes: 1 addition & 2 deletions blockchainetl/jobs/exporters/console_item_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
# SOFTWARE.

import json
import logging


class ConsoleItemExporter:
Expand All @@ -33,7 +32,7 @@ def export_items(self, items):
self.export_item(item)

def export_item(self, item):
logging.info(json.dumps(item))
print(json.dumps(item))

def close(self):
pass
Loading

0 comments on commit 384dbe5

Please sign in to comment.