more gevent abstractions

This commit is contained in:
Piper Merriam 2017-01-10 08:59:58 -07:00
parent 9d2b1dd2de
commit 5bafda2393
13 changed files with 52 additions and 31 deletions

View File

@ -26,7 +26,7 @@ For testing you can use the `TestRPCProvider`. This depends on
following installation command.
```sh
pip install web3[Tester]
pip install web3[tester]
```
Then in your code:

View File

@ -18,7 +18,7 @@ Or to install with support for the ``TestRPCProvider`` and
.. code-block:: shell
$ pip install web3[Tester]
$ pip install web3[tester]
Installation from source can be done from the root of the project with the
following command.

View File

@ -2,10 +2,9 @@ pytest>=2.8.2
pytest-pythonpath>=0.3
tox>=1.8.0
eth-testrpc>=0.9.3
py-geth>=1.4.0
ethereum>=1.5.2
secp256k1>=0.13.1
rlp>=0.4.6
rlp>=0.4.6,<0.4.7
hypothesis>=3.4.2
flaky>=3.3.0
flake8==3.0.4

View File

@ -1,9 +1,9 @@
import pytest
from gevent import socket
from flaky import flaky
from web3.utils.async import socket
def get_open_port():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

View File

@ -1,11 +1,11 @@
import pytest
from gevent import socket
from web3.providers.manager import RequestManager
from web3.providers.tester import (
TestRPCProvider,
is_testrpc_available,
)
from web3.utils.async import socket
def get_open_port():

View File

@ -1,20 +1,21 @@
import gevent
from web3.utils.async import sleep
def test_shh_filter(web3, skip_if_testrpc):
skip_if_testrpc(web3)
recieved_messages = []
shh_filter = web3.shh.filter({"topics":[web3.fromAscii("test")]})
shh_filter.watch(recieved_messages.append)
payloads = []
payloads.append(str.encode("payload1"))
web3.shh.post({"topics":[web3.fromAscii("test")], "payload":web3.fromAscii(payloads[len(payloads)-1])})
gevent.sleep(1)
sleep(1)
payloads.append(str.encode("payload2"))
web3.shh.post({"topics":[web3.fromAscii("test")], "payload":web3.fromAscii(payloads[len(payloads)-1])})
gevent.sleep(1)
sleep(1)
assert len(recieved_messages) > 1
for message in recieved_messages:
assert web3.toAscii(message["payload"]) in payloads

View File

@ -1,5 +1,6 @@
import random
import gevent
from web3.utils import async
def test_txpool_content(web3_empty):
@ -7,9 +8,10 @@ def test_txpool_content(web3_empty):
web3.miner.stop()
with gevent.Timeout(60):
with async.Timeout(60) as timeout:
while web3.miner.hashrate or web3.eth.mining:
gevent.sleep(random.random())
async.sleep(random.random())
timeout.check()
txn_1_hash = web3.eth.sendTransaction({
'from': web3.eth.coinbase,

View File

@ -1,5 +1,6 @@
import random
import gevent
from web3.utils import async
def test_txpool_inspect(web3_empty):
@ -7,9 +8,10 @@ def test_txpool_inspect(web3_empty):
web3.miner.stop()
with gevent.Timeout(60):
with async.Timeout(60) as timeout:
while web3.miner.hashrate or web3.eth.mining:
gevent.sleep(random.random())
async.sleep(random.random())
timeout.check()
txn_1_hash = web3.eth.sendTransaction({
'from': web3.eth.coinbase,

View File

@ -8,6 +8,7 @@ if 'WEB3_ASYNC_GEVENT' in os.environ:
socket,
threading,
make_server,
GreenletThread,
)
else:
from .stdlib_async import ( # noqa
@ -16,4 +17,5 @@ else:
socket,
threading,
make_server,
GreenletThread,
)

View File

@ -11,6 +11,7 @@ from gevent import ( # noqa: F401
sleep = gevent.sleep
spawn = gevent.spawn
GreenletThread = gevent.Greenlet
class Timeout(gevent.Timeout):

View File

@ -101,3 +101,14 @@ def spawn(target, *args, **kwargs):
thread.daemon = True
thread.start()
return thread
class GreenletThread(threading.Thread):
def __init__(self, target=None, args=None, kwargs=None):
if target is None:
target = self._run
super(GreenletThread, self).__init__(target=target, args=args, kwargs=kwargs)
self.daemon = True
def _run(self):
pass

View File

@ -1,6 +1,5 @@
import re
import random
import gevent
from .types import (
is_string,
@ -10,6 +9,10 @@ from .events import (
construct_event_topic_set,
construct_event_data_set,
)
from .async import (
sleep,
GreenletThread,
)
def construct_event_filter_params(event_abi,
@ -56,7 +59,7 @@ def construct_event_filter_params(event_abi,
return data_filters_set, filter_params
class Filter(gevent.Greenlet):
class Filter(GreenletThread):
callbacks = None
running = None
stopped = False
@ -66,7 +69,7 @@ class Filter(gevent.Greenlet):
self.web3 = web3
self.filter_id = filter_id
self.callbacks = []
gevent.Greenlet.__init__(self)
super(Filter, self).__init__()
def __str__(self):
return "Filter for {0}".format(self.filter_id)
@ -84,9 +87,9 @@ class Filter(gevent.Greenlet):
if self.is_valid_entry(entry):
callback_fn(self.format_entry(entry))
if self.poll_interval is None:
gevent.sleep(random.random())
sleep(random.random())
else:
gevent.sleep(self.poll_interval)
sleep(self.poll_interval)
def format_entry(self, entry):
"""
@ -108,7 +111,7 @@ class Filter(gevent.Greenlet):
if not self.running:
self.start()
gevent.sleep(0)
sleep(0)
def stop_watching(self, timeout=0):
self.running = False
@ -224,9 +227,9 @@ class ShhFilter(Filter):
if self.is_valid_entry(entry):
callback_fn(self.format_entry(entry))
if self.poll_interval is None:
gevent.sleep(random.random())
sleep(random.random())
else:
gevent.sleep(self.poll_interval)
sleep(self.poll_interval)
def stop_watching(self, timeout=0):
self.running = False

View File

@ -1,7 +1,5 @@
import random
import gevent
import rlp
from rlp.sedes import big_endian_int, binary, Binary
from rlp.utils import int_to_big_endian
@ -27,15 +25,17 @@ from .address import (
from .formatting import (
pad_left,
)
from . import async
def wait_for_transaction_receipt(web3, txn_hash, timeout=120):
with gevent.Timeout(timeout):
with async.Timeout(timeout) as _timeout:
while True:
txn_receipt = web3.eth.getTransactionReceipt(txn_hash)
if txn_receipt is not None:
break
gevent.sleep(random.random())
async.sleep(random.random())
_timeout.check()
return txn_receipt