From 9d2b1dd2de440fb9ef8ca547c3dfcc097e3fb68d Mon Sep 17 00:00:00 2001 From: Piper Merriam Date: Mon, 9 Jan 2017 23:25:48 -0700 Subject: [PATCH] dirty --- .travis.yml | 9 +- conftest.py | 22 +- requirements-gevent.txt | 2 + setup.py | 7 +- tests/admin-module/test_admin_setSolc.py | 2 +- .../test_contract_on_event_filtering.py | 23 +- .../test_contract_past_event_filtering.py | 13 +- .../test_filter_against_latest_blocks.py | 8 +- ...est_filter_against_pending_transactions.py | 8 +- .../test_filter_against_transaction_logs.py | 8 +- tests/mining-module/test_miner_setExtra.py | 8 +- tests/mining-module/test_miner_setGasPrice.py | 8 +- tests/mining-module/test_miner_start.py | 9 +- tests/mining-module/test_miner_stop.py | 9 +- tests/providers/conftest.py | 5 +- tox.ini | 8 +- web3/providers/ipc.py | 20 +- web3/providers/manager.py | 7 +- web3/providers/rpc.py | 197 ++++++------------ web3/providers/tester.py | 32 ++- web3/utils/async/__init__.py | 6 + web3/utils/async/gevent_async.py | 13 ++ web3/utils/async/http/__init__.py | 11 + web3/utils/async/http/gevent_http.py | 47 +++++ web3/utils/async/http/requests_http.py | 8 + web3/utils/async/stdlib_async.py | 6 + web3/utils/compat/__init__.py | 13 ++ web3/utils/compat/compat_py2.py | 4 + web3/utils/compat/compat_py3.py | 4 + web3/utils/functional.py | 1 + web3/utils/http.py | 8 + 31 files changed, 305 insertions(+), 221 deletions(-) create mode 100644 requirements-gevent.txt create mode 100644 web3/utils/async/http/__init__.py create mode 100644 web3/utils/async/http/gevent_http.py create mode 100644 web3/utils/async/http/requests_http.py create mode 100644 web3/utils/compat/__init__.py create mode 100644 web3/utils/compat/compat_py2.py create mode 100644 web3/utils/compat/compat_py3.py create mode 100644 web3/utils/http.py diff --git a/.travis.yml b/.travis.yml index a159201..35f407e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,9 +9,12 @@ before_install: - travis_retry sudo apt-get update env: matrix: - - TOX_ENV=py27-all - - TOX_ENV=py34-all - - TOX_ENV=py35-all + - TOX_ENV=py27-all-stdlib + - TOX_ENV=py34-all-stdlib + - TOX_ENV=py35-all-stdlib + - TOX_ENV=py27-all-gevent + - TOX_ENV=py34-all-gevent + - TOX_ENV=py35-all-gevent - TOX_ENV=flake8 cache: pip: true diff --git a/conftest.py b/conftest.py index cb9aace..429782a 100644 --- a/conftest.py +++ b/conftest.py @@ -1,15 +1,10 @@ import pytest -import gevent - from web3.providers.tester import ( EthereumTesterProvider, TestRPCProvider, ) -from web3.utils.async import ( - Timeout, - sleep, -) +from web3.utils import async from web3.main import Web3 @@ -53,9 +48,10 @@ def skip_if_testrpc(): def wait_for_miner_start(): def _wait_for_miner_start(web3, timeout=60): poll_delay_counter = PollDelayCounter() - with Timeout(timeout): + with async.Timeout(timeout) as timeout: while not web3.eth.mining or not web3.eth.hashrate: - sleep(poll_delay_counter()) + async.sleep(poll_delay_counter()) + timeout.check() return _wait_for_miner_start @@ -63,13 +59,14 @@ def wait_for_miner_start(): def wait_for_block(): def _wait_for_block(web3, block_number=1, timeout=60 * 10): poll_delay_counter = PollDelayCounter() - with Timeout(timeout): + with async.Timeout(timeout) as timeout: while True: if web3.eth.blockNumber >= block_number: break if isinstance(web3.currentProvider, (TestRPCProvider, EthereumTesterProvider)): web3._requestManager.request_blocking("evm_mine", []) - sleep(poll_delay_counter()) + async.async.sleep(poll_delay_counter()) + timeout.check() return _wait_for_block @@ -77,12 +74,13 @@ def wait_for_block(): def wait_for_transaction(): def _wait_for_transaction(web3, txn_hash, timeout=120): poll_delay_counter = PollDelayCounter() - with Timeout(timeout): + with async.Timeout(timeout) as timeout: while True: txn_receipt = web3.eth.getTransactionReceipt(txn_hash) if txn_receipt is not None: break - sleep(poll_delay_counter()) + async.sleep(poll_delay_counter()) + timeout.check() return txn_receipt return _wait_for_transaction diff --git a/requirements-gevent.txt b/requirements-gevent.txt new file mode 100644 index 0000000..4bc3ad5 --- /dev/null +++ b/requirements-gevent.txt @@ -0,0 +1,2 @@ +gevent>=1.1.1,<1.2.0 +geventhttpclient>=1.3.1 diff --git a/setup.py b/setup.py index 0c50c89..bc656c4 100644 --- a/setup.py +++ b/setup.py @@ -26,14 +26,17 @@ setup( install_requires=[ "pysha3>=0.3", "rlp>=0.4.6,<0.4.7", - "gevent>=1.1.1,<1.2.0", - "geventhttpclient>=1.3.1", "ethereum-abi-utils>=0.2.1", "pysha3>=0.3", "pylru>=1.0.9", ], extras_require={ 'Tester': ["eth-testrpc>=0.9.3"], + 'tester': ["eth-testrpc>=0.9.3"], + 'gevent': [ + "gevent>=1.1.1,<1.2.0", + "geventhttpclient>=1.3.1", + ], }, py_modules=['web3'], license="MIT", diff --git a/tests/admin-module/test_admin_setSolc.py b/tests/admin-module/test_admin_setSolc.py index dc20006..7da080f 100644 --- a/tests/admin-module/test_admin_setSolc.py +++ b/tests/admin-module/test_admin_setSolc.py @@ -1,6 +1,6 @@ import pytest -from gevent import subprocess +from web3.utils.async import subprocess from web3.utils.string import force_text diff --git a/tests/filtering/test_contract_on_event_filtering.py b/tests/filtering/test_contract_on_event_filtering.py index 8010675..e7298a7 100644 --- a/tests/filtering/test_contract_on_event_filtering.py +++ b/tests/filtering/test_contract_on_event_filtering.py @@ -1,7 +1,8 @@ import pytest -import gevent from flaky import flaky +from web3.utils import async + @flaky(max_runs=3) @pytest.mark.parametrize('call_as_instance', (True, False)) @@ -21,9 +22,10 @@ def test_on_filter_using_get_interface(web3, txn_hash = emitter.transact().logNoArgs(emitter_event_ids.LogNoArguments) txn_receipt = wait_for_transaction(web3, txn_hash) - with gevent.Timeout(30): + with async.Timeout(30) as timeout: while not filter.get(False): - gevent.sleep(sleep_interval()) + async.sleep(sleep_interval()) + timeout.check() log_entries = filter.get() @@ -51,9 +53,10 @@ def test_on_filter_with_only_event_name(web3, txn_hash = emitter.transact().logNoArgs(emitter_event_ids.LogNoArguments) txn_receipt = wait_for_transaction(web3, txn_hash) - with gevent.Timeout(30): + with async.Timeout(30) as timeout: while not seen_logs: - gevent.sleep(sleep_interval()) + async.sleep(sleep_interval()) + timeout.check() filter.stop_watching(30) @@ -95,9 +98,10 @@ def test_on_filter_with_event_name_and_single_argument(web3, for txn_hash in txn_hashes: wait_for_transaction(web3, txn_hash) - with gevent.Timeout(30): + with async.Timeout(30) as timeout: while len(seen_logs) < 2: - gevent.sleep(sleep_interval()) + async.sleep(sleep_interval()) + timeout.check() filter.stop_watching(30) @@ -139,9 +143,10 @@ def test_on_filter_with_event_name_and_non_indexed_argument(web3, for txn_hash in txn_hashes: wait_for_transaction(web3, txn_hash) - with gevent.Timeout(30): + with async.Timeout(30) as timeout: while not seen_logs: - gevent.sleep(sleep_interval()) + async.sleep(sleep_interval()) + timeout.check() filter.stop_watching(30) diff --git a/tests/filtering/test_contract_past_event_filtering.py b/tests/filtering/test_contract_past_event_filtering.py index 30c9a01..7839ba6 100644 --- a/tests/filtering/test_contract_past_event_filtering.py +++ b/tests/filtering/test_contract_past_event_filtering.py @@ -1,8 +1,9 @@ import pytest import random -import gevent from flaky import flaky +from web3.utils import async + @flaky(max_runs=3) @pytest.mark.parametrize('call_as_instance', (True, False)) @@ -25,9 +26,10 @@ def test_past_events_filter_with_callback(web3, else: filter = Emitter.pastEvents('LogNoArguments', {}, seen_logs.append) - with gevent.Timeout(30): + with async.Timeout(30) as timeout: while not seen_logs: - gevent.sleep(sleep_interval()) + async.sleep(sleep_interval()) + timeout.check() filter.stop_watching(30) @@ -60,9 +62,10 @@ def test_past_events_filter_using_get_api(web3, else: filter = Emitter.pastEvents('LogNoArguments') - with gevent.Timeout(30): + with async.Timeout(30) as timeout: while not filter.get(False): - gevent.sleep(sleep_interval()) + async.sleep(sleep_interval()) + timeout.check() log_entries = filter.get(False) diff --git a/tests/filtering/test_filter_against_latest_blocks.py b/tests/filtering/test_filter_against_latest_blocks.py index 2be110b..efaa988 100644 --- a/tests/filtering/test_filter_against_latest_blocks.py +++ b/tests/filtering/test_filter_against_latest_blocks.py @@ -1,7 +1,8 @@ import random -import gevent from flaky import flaky +from web3.utils import async + @flaky(max_runs=3) def test_filter_against_latest_blocks(web3, sleep_interval, wait_for_block, skip_if_testrpc): @@ -15,9 +16,10 @@ def test_filter_against_latest_blocks(web3, sleep_interval, wait_for_block, skip wait_for_block(web3, current_block + 3) - with gevent.Timeout(5): + with async.Timeout(5) as timeout: while len(seen_blocks) < 2: - gevent.sleep(sleep_interval()) + async.sleep(sleep_interval()) + timeout.check() txn_filter.stop_watching(3) diff --git a/tests/filtering/test_filter_against_pending_transactions.py b/tests/filtering/test_filter_against_pending_transactions.py index be4d432..66f9276 100644 --- a/tests/filtering/test_filter_against_pending_transactions.py +++ b/tests/filtering/test_filter_against_pending_transactions.py @@ -1,7 +1,8 @@ import random -import gevent from flaky import flaky +from web3.utils import async + @flaky(max_runs=3) def test_filter_against_pending_transactions(web3_empty, @@ -28,9 +29,10 @@ def test_filter_against_pending_transactions(web3_empty, wait_for_transaction(web3, txn_1_hash) wait_for_transaction(web3, txn_2_hash) - with gevent.Timeout(5): + with async.Timeout(5) as timeout: while not seen_txns: - gevent.sleep(random.random()) + async.sleep(random.random()) + timeout.check() txn_filter.stop_watching(30) diff --git a/tests/filtering/test_filter_against_transaction_logs.py b/tests/filtering/test_filter_against_transaction_logs.py index 4140770..5ce3966 100644 --- a/tests/filtering/test_filter_against_transaction_logs.py +++ b/tests/filtering/test_filter_against_transaction_logs.py @@ -1,7 +1,8 @@ import random -import gevent from flaky import flaky +from web3.utils import async + @flaky(max_runs=3) def test_filter_against_log_events(web3_empty, @@ -22,9 +23,10 @@ def test_filter_against_log_events(web3_empty, for txn_hash in txn_hashes: wait_for_transaction(web3, txn_hash) - with gevent.Timeout(5): + with async.Timeout(5) as timeout: while not seen_logs: - gevent.sleep(random.random()) + async.sleep(random.random()) + timeout.check() txn_filter.stop_watching(30) diff --git a/tests/mining-module/test_miner_setExtra.py b/tests/mining-module/test_miner_setExtra.py index b608a03..5a25679 100644 --- a/tests/mining-module/test_miner_setExtra.py +++ b/tests/mining-module/test_miner_setExtra.py @@ -2,9 +2,8 @@ import random from flaky import flaky -import gevent - from web3.utils.encoding import decode_hex +from web3.utils import async @flaky(max_runs=3) @@ -20,12 +19,13 @@ def test_miner_setExtra(web3_empty, wait_for_block): web3.miner.setExtra(new_extra_data) - with gevent.Timeout(60): + with async.Timeout(60) as timeout: while True: extra_data = decode_hex(web3.eth.getBlock(web3.eth.blockNumber)['extraData']) if extra_data == new_extra_data: break - gevent.sleep(random.random()) + async.sleep(random.random()) + timeout.check() after_extra = decode_hex(web3.eth.getBlock(web3.eth.blockNumber)['extraData']) diff --git a/tests/mining-module/test_miner_setGasPrice.py b/tests/mining-module/test_miner_setGasPrice.py index a68863e..8346d44 100644 --- a/tests/mining-module/test_miner_setGasPrice.py +++ b/tests/mining-module/test_miner_setGasPrice.py @@ -1,8 +1,9 @@ import random -import gevent from flaky import flaky +from web3.utils import async + @flaky(max_runs=3) def test_miner_setGasPrice(web3_empty, wait_for_block): @@ -15,9 +16,10 @@ def test_miner_setGasPrice(web3_empty, wait_for_block): web3.miner.setGasPrice(initial_gas_price // 2) - with gevent.Timeout(60): + with async.Timeout(60) as timeout: while web3.eth.gasPrice == initial_gas_price: - gevent.sleep(random.random()) + async.sleep(random.random()) + timeout.check() after_gas_price = web3.eth.gasPrice assert after_gas_price < initial_gas_price diff --git a/tests/mining-module/test_miner_start.py b/tests/mining-module/test_miner_start.py index 65f1ace..38cb279 100644 --- a/tests/mining-module/test_miner_start.py +++ b/tests/mining-module/test_miner_start.py @@ -1,9 +1,9 @@ import random -import gevent - from flaky import flaky +from web3.utils import async + @flaky(max_runs=3) def test_miner_start(web3_empty, wait_for_miner_start): @@ -15,9 +15,10 @@ def test_miner_start(web3_empty, wait_for_miner_start): web3.miner.stop() - with gevent.Timeout(60): + with async.Timeout(60) as timeout: while web3.eth.mining or web3.eth.hashrate: - gevent.sleep(random.random()) + async.sleep(random.random()) + timeout.check() assert not web3.eth.mining assert not web3.miner.hashrate diff --git a/tests/mining-module/test_miner_stop.py b/tests/mining-module/test_miner_stop.py index 0735217..3847c6f 100644 --- a/tests/mining-module/test_miner_stop.py +++ b/tests/mining-module/test_miner_stop.py @@ -1,9 +1,9 @@ import random -import gevent - from flaky import flaky +from web3.utils import async + @flaky(max_runs=3) def test_miner_stop(web3_empty): @@ -14,9 +14,10 @@ def test_miner_stop(web3_empty): web3.miner.stop() - with gevent.Timeout(60): + with async.Timeout(60) as timeout: while web3.eth.mining or web3.eth.hashrate: - gevent.sleep(random.random()) + async.sleep(random.random()) + timeout.check() assert not web3.eth.mining assert not web3.miner.hashrate diff --git a/tests/providers/conftest.py b/tests/providers/conftest.py index 9dbf2f3..536fe00 100644 --- a/tests/providers/conftest.py +++ b/tests/providers/conftest.py @@ -1,7 +1,5 @@ import pytest -from gevent import socket - from web3.providers.ipc import IPCProvider from web3.providers.rpc import ( @@ -11,10 +9,11 @@ from web3.providers.rpc import ( from web3.providers.tester import ( TestRPCProvider, ) +from web3.utils import async def get_open_port(): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s = async.socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(("", 0)) s.listen(1) port = s.getsockname()[1] diff --git a/tox.ini b/tox.ini index e22a0de..3c01223 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] envlist= - py{27,34,35}-{all,admin,eth,mining,providers,version,contracts,filtering,net,txpool,db,managers,personal,utilities} + py{27,34,35}-{all,admin,eth,mining,providers,version,contracts,filtering,net,txpool,db,managers,personal,utilities}-{gevent-stdlib} flake8 [flake8] @@ -27,7 +27,11 @@ commands= testing: py.test {posargs:tests/testing-module} utilities: py.test {posargs:tests/utilities} deps = - -r{toxinidir}/requirements-dev.txt + stdlib: -r{toxinidir}/requirements-dev.txt + gevent: -r{toxinidir}/requirements-gevent.txt +setenv = + gevent: WEB3_ASYNC_GEVENT=True + TESTRPC_ASYNC_GEVENT=True basepython = py27: python2.7 py34: python3.4 diff --git a/web3/providers/ipc.py b/web3/providers/ipc.py index b3009a8..139889d 100644 --- a/web3/providers/ipc.py +++ b/web3/providers/ipc.py @@ -10,10 +10,11 @@ try: except ImportError: JSONDecodeError = ValueError -import gevent -from gevent import socket -from gevent.threading import Lock - +from web3.utils import async +from web3.utils.async import ( + threading, + socket, +) from web3.utils.string import ( force_text, ) @@ -74,7 +75,7 @@ class IPCProvider(JSONBaseProvider): else: self.ipc_path = ipc_path - self._lock = Lock() + self._lock = threading.Lock() super(IPCProvider, self).__init__(*args, **kwargs) def make_request(self, method, params): @@ -87,21 +88,22 @@ class IPCProvider(JSONBaseProvider): sock.sendall(request) response_raw = b"" - with gevent.Timeout(10): + with async.Timeout(10) as timeout: while True: + timeout.check() try: response_raw += sock.recv(4096) except socket.timeout: - gevent.sleep(0) + async.sleep(0) continue if response_raw == b"": - gevent.sleep(0) + async.sleep(0) else: try: json.loads(force_text(response_raw)) except JSONDecodeError: - gevent.sleep(0) + async.sleep(0) continue else: break diff --git a/web3/providers/manager.py b/web3/providers/manager.py index 4203c35..769a051 100644 --- a/web3/providers/manager.py +++ b/web3/providers/manager.py @@ -2,8 +2,6 @@ import uuid import json import collections -import gevent - import rlp from web3.utils.crypto import sha3 @@ -24,6 +22,7 @@ from web3.utils.transactions import ( serialize_transaction, add_signature_to_transaction, ) +from web3.utils import async class RequestManager(object): @@ -52,7 +51,7 @@ class RequestManager(object): def request_async(self, method, params): request_id = uuid.uuid4() - self.pending_requests[request_id] = gevent.spawn( + self.pending_requests[request_id] = async.spawn( self.request_blocking, method, params, @@ -65,8 +64,6 @@ class RequestManager(object): except KeyError: raise KeyError("Request for id:{0} not found".format(request_id)) else: - if timeout is not None: - timeout = gevent.Timeout(timeout).start() response_raw = request.get(timeout=timeout) response = json.loads(response_raw) diff --git a/web3/providers/rpc.py b/web3/providers/rpc.py index 3dd9399..0b925d8 100644 --- a/web3/providers/rpc.py +++ b/web3/providers/rpc.py @@ -1,23 +1,54 @@ -import contextlib -from geventhttpclient import HTTPClient import logging -import pylru - from .base import JSONBaseProvider # noqa: E402 +from web3.utils.compat import urlunparse +from web3.utils.http import construct_user_agent +from web3.utils.functional import cast_return_to_dict +from web3.utils.async.http import make_post_request + logger = logging.getLogger(__name__) -class RPCProvider(JSONBaseProvider): - """Create a RPC client. +class HTTPProvider(JSONBaseProvider): + endpoint_uri = None + _request_args = None + _request_kwargs = None - .. note :: + def __init__(self, endpoint_uri, request_kwargs=None): + self.endpoint_uri = endpoint_uri + self._request_kwargs = request_kwargs or {} - You should preferably create only one client per process, - or otherwise underlying HTTP network connections may leak. + def __str__(self): + return "RPC connection {0}".format(self.endpoint_uri) + @cast_return_to_dict + def get_request_kwargs(self): + if 'headers' not in self._request_kwargs: + yield 'headers', self.get_request_headers() + for key, value in self._request_kwargs: + yield key, value + + def get_request_headers(self): + return { + 'Content-Type': 'application/json', + 'User-Agent': construct_user_agent(str(type(self))), + } + + def make_request(self, method, params): + request_data = self.encode_rpc_request(method, params) + response = make_post_request( + self.endpoint, + request_data, + **self.get_request_kwargs() + ) + return response + + +class RPCProvider(HTTPProvider): + """ + Deprecated: Use HTTPProvider instead. """ def __init__(self, host="127.0.0.1", @@ -26,62 +57,28 @@ class RPCProvider(JSONBaseProvider): ssl=False, connection_timeout=10, network_timeout=10, - *args, **kwargs): - """Create a new RPC client. + netloc = "{0}:{1}".format(host, port) + endpoint_uri = urlunparse(( + 'https' if ssl else 'http', + netloc, + path, + '', + '', + '' + )) + request_kwargs = { + 'connection_timeout': connection_timeout, + 'network_timeout': network_timeout, + } + request_kwargs.update(kwargs) - :param connection_timeout: See :class:`geventhttpclient.HTTPClient` - - :param network_timeout: See :class:`geventhttpclient.HTTPClient` - """ - self.host = host - self.port = int(port) - self.path = path - self.ssl = ssl - self.connection_timeout = connection_timeout - self.network_timeout = network_timeout - - super(RPCProvider, self).__init__(*args, **kwargs) - - def __str__(self): - return "RPC connection {}:{}".format(self.host, self.port) - - def __repr__(self): - return self.__str__() - - def make_request(self, method, params): - from web3 import __version__ as web3_version - request_data = self.encode_rpc_request(method, params) - request_user_agent = 'Web3.py/{version}/{class_name}'.format( - version=web3_version, - class_name=type(self), - ) - client = HTTPClient( - host=self.host, - port=self.port, - ssl=self.ssl, - connection_timeout=self.connection_timeout, - network_timeout=self.network_timeout, - headers={ - 'Content-Type': 'application/json', - 'User-Agent': request_user_agent, - }, - ) - with contextlib.closing(client): - response = client.post(self.path, body=request_data) - response_body = response.read() - - return response_body + super(RPCProvider, self).__init__(endpoint_uri, request_kwargs) -_client_cache = pylru.lrucache(128) - - -class KeepAliveRPCProvider(JSONBaseProvider): - """RPC-provider that handles HTTP keep-alive connection correctly. - - HTTP client is recycled across requests. Create only one instance of - KeepAliveProvider per process. +class KeepAliveRPCProvider(RPCProvider): + """ + Deprecated: Use HTTPProvider instead. """ def __init__(self, host="127.0.0.1", @@ -91,72 +88,14 @@ class KeepAliveRPCProvider(JSONBaseProvider): connection_timeout=10, network_timeout=10, concurrency=10, - *args, **kwargs): - """Create a new RPC client with keep-alive connection pool. - - :param concurrency: See :class:`geventhttpclient.HTTPClient` - - :param connection_timeout: See :class:`geventhttpclient.HTTPClient` - - :param network_timeout: See :class:`geventhttpclient.HTTPClient` - """ - self.host = host - self.port = int(port) - self.path = path - self.ssl = ssl - self.connection_timeout = connection_timeout - self.network_timeout = network_timeout - self.concurrency = concurrency - - super(KeepAliveRPCProvider, self).__init__(*args, **kwargs) - - self.client = self.get_or_create_client() - - def get_or_create_client(self): - from web3 import __version__ as web3_version - global _client_cache - - key = "{}:{}".format(self.host, self.port) - - try: - # Get in-process client instance for this host - client = _client_cache[key] - logger.debug("Re-using HTTP client for RPC connection to %s", key) - except KeyError: - request_user_agent = 'Web3.py/{version}/{class_name}'.format( - version=web3_version, - class_name=type(self), - ) - - client = HTTPClient( - host=self.host, - port=self.port, - ssl=self.ssl, - connection_timeout=self.connection_timeout, - network_timeout=self.network_timeout, - concurrency=self.concurrency, - headers={ - 'Content-Type': 'application/json', - 'User-Agent': request_user_agent, - }, - ) - _client_cache[key] = client - logger.debug( - "Created new keep-alive HTTP client for RPC connection to %s", - key, - ) - - return client - - def __str__(self): - return "Keep-alive RPC connection {}:{}".format(self.host, self.port) - - def __repr__(self): - return self.__str__() - - def make_request(self, method, params): - request_data = self.encode_rpc_request(method, params) - response = self.client.post(self.path, body=request_data) - response_body = response.read() - return response_body + super(KeepAliveRPCProvider, self).__init__( + host, + port, + path, + ssl, + connection_timeout, + network_timeout, + concurrency=10, + **kwargs + ) diff --git a/web3/providers/tester.py b/web3/providers/tester.py index c47c866..4837165 100644 --- a/web3/providers/tester.py +++ b/web3/providers/tester.py @@ -1,9 +1,12 @@ import logging -import gevent +from web3.utils.async import ( + make_server, + spawn, +) from .base import BaseProvider # noqa: E402 -from .rpc import RPCProvider # noqa: E402 +from .rpc import HTTPProvider # noqa: E402 logger = logging.getLogger(__name__) @@ -21,13 +24,6 @@ class EthereumTesterProvider(BaseProvider): def __init__(self, *args, **kwargs): - """ - Create a new RPC client. - - :param connection_timeout: See :class:`geventhttpclient.HTTPClient` - - :param network_timeout: See :class:`geventhttpclient.HTTPClient` - """ if not is_testrpc_available(): raise Exception("`TestRPCProvider` requires the `eth-testrpc` package to be installed") from testrpc.rpc import RPCMethods @@ -53,11 +49,10 @@ class EthereumTesterProvider(BaseProvider): return True -class TestRPCProvider(RPCProvider): +class TestRPCProvider(HTTPProvider): def __init__(self, host="127.0.0.1", port=8545, *args, **kwargs): if not is_testrpc_available(): raise Exception("`TestRPCProvider` requires the `eth-testrpc` package to be installed") - from gevent.pywsgi import WSGIServer from testrpc.server import get_application try: @@ -67,13 +62,16 @@ class TestRPCProvider(RPCProvider): application = get_application() - self.server = WSGIServer( - (host, port), + self.server = make_server( + host, + port, application, - log=logger, - error_log=logger, + # TODO: what to do about the loggers + #log=logger, + #error_log=logger, ) - self.thread = gevent.spawn(self.server.serve_forever) + self.thread = spawn(self.server.serve_forever) + endpoint_uri = 'http://{0}:{1}'.format(host, port) - super(TestRPCProvider, self).__init__(host, str(port), *args, **kwargs) + super(TestRPCProvider, self).__init__(endpoint_uri, *args, **kwargs) diff --git a/web3/utils/async/__init__.py b/web3/utils/async/__init__.py index 938f4a6..e2178ad 100644 --- a/web3/utils/async/__init__.py +++ b/web3/utils/async/__init__.py @@ -5,9 +5,15 @@ if 'WEB3_ASYNC_GEVENT' in os.environ: from .gevent_async import ( # noqa Timeout, sleep, + socket, + threading, + make_server, ) else: from .stdlib_async import ( # noqa Timeout, sleep, + socket, + threading, + make_server, ) diff --git a/web3/utils/async/gevent_async.py b/web3/utils/async/gevent_async.py index 446652f..c3d5f72 100644 --- a/web3/utils/async/gevent_async.py +++ b/web3/utils/async/gevent_async.py @@ -1,4 +1,12 @@ import gevent +from gevent.pywsgi import ( # noqa: F401 + WSGIServer, +) +from gevent import ( # noqa: F401 + subprocess, + socket, + threading, +) sleep = gevent.sleep @@ -8,3 +16,8 @@ spawn = gevent.spawn class Timeout(gevent.Timeout): def check(self): pass + + +def make_server(host, port, application, *args, **kwargs): + server = WSGIServer((host, port), application, *args, **kwargs) + return server diff --git a/web3/utils/async/http/__init__.py b/web3/utils/async/http/__init__.py new file mode 100644 index 0000000..6f58bbc --- /dev/null +++ b/web3/utils/async/http/__init__.py @@ -0,0 +1,11 @@ +import os + + +if 'WEB3_ASYNC_GEVENT' in os.environ: + from .requests_http import ( + make_post_request, + ) +else: + from .gevent_http import ( # noqa: F401 + make_post_request, + ) diff --git a/web3/utils/async/http/gevent_http.py b/web3/utils/async/http/gevent_http.py new file mode 100644 index 0000000..6ed1b9b --- /dev/null +++ b/web3/utils/async/http/gevent_http.py @@ -0,0 +1,47 @@ +import collections + +import pylru + +from geventhttpclient import HTTPClient + +from web3.utils.compat import urlparse + + +_client_cache = pylru.lrucache(8) + + +def _get_client(host, port, **kwargs): + ordered_kwargs = collections.OrderedDict(sorted(kwargs.items())) + cache_key = '{0}:{1}:{2}'.format( + host, + port, + ':'.join(( + "{0}={1}".format(str(key), str(value)) + for key, value in ordered_kwargs + )) + ) + if cache_key not in _client_cache: + _client_cache[cache_key] = HTTPClient(host, port, **kwargs) + return _client_cache[cache_key] + + +def make_post_request(endpoint_uri, data, **kwargs): + url_parts = urlparse(endpoint_uri) + + host, _, port = url_parts.netloc.partition(':') + + if not port: + if url_parts.scheme == 'http': + port = 80 + elif url_parts.scheme == 'https': + port = 443 + else: + raise ValueError("Unsupported scheme: '{0}'".format(url_parts.scheme)) + + kwargs.setdefault('ssl', url_parts.scheme == 'https') + + client = _get_client(host, port, **kwargs) + response = client.post(url_parts.path, body=data) + response_body = response.read() + + return response_body diff --git a/web3/utils/async/http/requests_http.py b/web3/utils/async/http/requests_http.py new file mode 100644 index 0000000..97a5654 --- /dev/null +++ b/web3/utils/async/http/requests_http.py @@ -0,0 +1,8 @@ +import requests + + +def make_post_request(endpoint_uri, data, *args, **kwargs): + response = requests.post(endpoint_uri, data=data, *args, **kwargs) + response.raise_for_status() + + return response.content diff --git a/web3/utils/async/stdlib_async.py b/web3/utils/async/stdlib_async.py index 5bd0bb9..36b700a 100644 --- a/web3/utils/async/stdlib_async.py +++ b/web3/utils/async/stdlib_async.py @@ -1,5 +1,11 @@ +""" +A minimal implementation of the various gevent APIs used within this codebase. +""" import time import threading +import subprocess # noqa: F401 +import socket # noqa: F401 +from wsgiref.simple_server import make_server # noqa: F401 sleep = time.sleep diff --git a/web3/utils/compat/__init__.py b/web3/utils/compat/__init__.py new file mode 100644 index 0000000..76cc6ef --- /dev/null +++ b/web3/utils/compat/__init__.py @@ -0,0 +1,13 @@ +import sys + + +if sys.version_info.major == 2: + from .compat_py2 import ( + urlparse, + urlunparse, + ) +else: + from .compat_py3 import ( # noqa: #401 + urlparse, + urlunparse, + ) diff --git a/web3/utils/compat/compat_py2.py b/web3/utils/compat/compat_py2.py new file mode 100644 index 0000000..4560f1d --- /dev/null +++ b/web3/utils/compat/compat_py2.py @@ -0,0 +1,4 @@ +from urlparse import ( # noqa: F401 + urlparse, + urlunparse, +) diff --git a/web3/utils/compat/compat_py3.py b/web3/utils/compat/compat_py3.py new file mode 100644 index 0000000..265d478 --- /dev/null +++ b/web3/utils/compat/compat_py3.py @@ -0,0 +1,4 @@ +from urllib.parse import ( # noqa: F401 + urlparse, + urlunparse, +) diff --git a/web3/utils/functional.py b/web3/utils/functional.py index 6601573..691720f 100644 --- a/web3/utils/functional.py +++ b/web3/utils/functional.py @@ -36,3 +36,4 @@ def cast_return(_type): cast_return_to_tuple = cast_return(tuple) +cast_return_to_dict = cast_return(dict) diff --git a/web3/utils/http.py b/web3/utils/http.py new file mode 100644 index 0000000..62a253f --- /dev/null +++ b/web3/utils/http.py @@ -0,0 +1,8 @@ +def construct_user_agent(class_name): + from web3 import __version__ as web3_version + + user_agent = 'Web3.py/{version}/{class_name}'.format( + version=web3_version, + class_name=class_name, + ) + return user_agent