feat: badger generated badge art

This commit is contained in:
nftchance 2022-10-17 22:48:04 -05:00
parent b80b580a64
commit ab2552e764
4 changed files with 597 additions and 1 deletions

View File

@ -1,8 +1,10 @@
from rest_framework import routers
from .views import (
ArtViewSet,
BadgeViewSet,
)
router = routers.DefaultRouter()
router.register(r'art', ArtViewSet, basename='art')
router.register(r'badges', BadgeViewSet)

View File

@ -1,3 +1,6 @@
import base64
import random
from django.contrib.auth import get_user_model
from rest_framework import status, viewsets
@ -16,6 +19,235 @@ from .serializers import BadgeSerializer
User = get_user_model()
class ArtViewSet(viewsets.ViewSet):
def _encode(self, value):
max_smudge = 8
smudge = 0
if value.isdigit():
smudge = int(value)
elif value.islower():
smudge = ord(value) - 87
elif value.isupper():
smudge = ord(value) - 29
return [
1 + smudge % max_smudge,
smudge % 2 == 0,
]
def _handle_fingerprint(self, address, badge_id):
fingerprint = []
for char in f"{address}{badge_id}":
fingerprint.append(self._encode(char))
return fingerprint
def list(self, request):
organization = request.query_params.get('organization', None)
organization_ethereum_address = request.query_params.get(
'organization_ethereum_address', None)
badge_name = request.query_params.get('badge_name', None)
invert = request.query_params.get('inverse', False)
fingerprint = self._handle_fingerprint(
organization_ethereum_address, badge_name)
fill = "#fff"
if invert:
fill = "#000"
size = 500
blob_count = random.randint(2, 4)
blob, useblob = "", ""
colors = [
"#f06",
"#00FF9D",
"#00FFEB",
"#FF00EB",
"#FFBB00",
"#C668B9",
"#B823AF",
"#551CCD",
"#1330dd",
"#0fd3b2",
"#30E64C",
"#5540EA",
"#F19021"
]
# rotate blob position animation
for i in range(blob_count):
x_random = random.randint(0, size / 2)
y_random = random.randint(0, size / 2)
# create random svg blog
x_random_rotation = random.randint(0, 360)
y_random_rotation = random.randint(0, 360)
random_color = random.choice(colors)
animation_from = "f0 {x_random} {y_random}"
animation_to = f"360 {x_random} {y_random}"
# random direction of rotation
if random.randint(0, 1) == 0:
animation_from = f"360 {x_random_rotation} {y_random_rotation}"
animation_to = f"0 {x_random_rotation} {y_random_rotation}"
rotate_blob_animation = f"""
<animateTransform
attributeName="transform"
type="rotate"
from="{animation_from}"
to="{animation_to}"
dur="{random.randint(20,45)}s"
repeatCount="indefinite"
/>
"""
blob += f"""
<path
id="{"blob-%s" % i}"
d="M363.37-8.441c51.956,46.825,91.084,111.61,83.386,168.376S384.537,266.093,332.581,290.147C280.945,313.88,231.555,312.6,176.712,318.37s-115.458,18.6-168.7-5.452-98.781-84.669-89.48-135.984S-8.021,83.284,45.218,36.459C98.137-10.365,140.471-62.321,193.71-73.547S311.734-55.586,363.37-8.441Z"
transform="translate({x_random} {y_random})"
fill="{random_color}"
style="mix-blend-mode: multiply;"
>
{rotate_blob_animation}
</path>
"""
useblob += f"""
<use href="#{"blob-%s" % i}" filter="url(#blur)" />
"""
fingerprint_svg = ""
r = 2
spacer = (size - (r * 2 * len(fingerprint))) / (len(fingerprint) - 1)
width = (r * 2 + spacer) * len(fingerprint)
buffer = (size - width) / 2
for xi, item in enumerate(fingerprint):
x = buffer + (r * 2 + spacer) * xi
height = item[0]
for i in range(height):
y = size - r * (height - i) * 5
circle_fill = "#fff" if fill == "#000" else "#000"
# make the circles pulse with a keyframe animation
solid_animation = f"""
<animate
attributeName="r"
values="{r};{r * 1.5};{r}"
dur="3s"
repeatCount="indefinite"
begin="{i * .5}s"
fill="{circle_fill}"
/>
"""
fingerprint_svg += f"""
<circle
cx="{x}"
cy="{y}"
r="{r if not item[1] else r * 1.5}"
fill="{circle_fill}"
>
{solid_animation}
</circle>
"""
words = badge_name.split(" ")
lines = []
for word in words:
if len(word) > 14:
for i in range(0, len(word), 14):
lines.append(word[i:i + 14])
else:
lines.append(word)
text_color = "#000" if fill == "#fff" else "#fff"
# create a line for each word
badge_text = ""
line_height = 52
organization_line_height=30
if len(lines) > 3:
line_height = 40
organization_line_height=28
line_buffer = 10
line_x = 50
line_y = (size - (line_height) * len(lines) - line_buffer * (len(lines) - 1)) / 2
organization_y = 65
if len(lines) > 3:
line_y -= 40
organization_y += 5
for i, line in enumerate(lines):
badge_text += f"""
<text
x="{line_x}"
y="{line_y + (line_height + line_buffer) * i}"
font-size="{line_height}"
fill="{text_color}"
font-family="sans-serif"
font-weight="bold"
>
{line.upper()}
</text>
"""
organization_text = f"""
<text
x="50px"
y="{organization_y}%"
dominant-baseline="middle"
font-size="{organization_line_height}"
fill="{text_color}"
font-family="sans-serif"
opacity=".45"
>
{organization}
</text>
"""
svg = f"""
<svg xmlns="http://www.w3.org/2000/svg" width="{size}" height="{size}">
<rect width="100%" height="100%" fill="#fff" />
{blob}
<rect width="100%" height="100%" fill="{fill}" />
<filter id="blur">
<feGaussianBlur in="SourceGraphic" stdDeviation="30" />
</filter>
{useblob}
<rect width="100%" height="100%" fill="{fill}" opacity="{0.93 if fill == "#fff" else 0.75}" />
{fingerprint_svg}
{badge_text}
{organization_text}
</svg>
"""
# return base64 encoded svg
image = base64.b64encode(svg.encode('utf-8')).decode('utf-8')
url_ready_base64 = f"data:image/svg+xml;base64,{image}"
return Response({
"image": url_ready_base64,
})
class BadgeViewSet(viewsets.ModelViewSet):
queryset = Badge.objects.all()
serializer_class = BadgeSerializer

View File

@ -0,0 +1,362 @@
import requests
from web3 import Web3
from django.conf import settings
from django.contrib.auth import get_user_model
from badge.models import Badge
from balance.models import Balance, Transaction
from organization.models import Organization
from .abis import ORGANIZATION as ORGANIZATION_ABI
ALCHEMY_PROVIDER_URL = "wss://polygon-mainnet.g.alchemy.com/v2/YOf5rgn_gm9hY1UPxUrw1zcocM-Ksjte"
w3 = Web3(Web3.WebsocketProvider(ALCHEMY_PROVIDER_URL))
User = get_user_model()
class Loader:
def __init__(self):
self.loader_mapping = {
# Factory events
"OrganizationCreated": [
self.handle_organization_created,
],
"BadgeUpdated": [
self.handle_badge_updated
],
"DelegateUpdated": [
self.handle_delegate_updated
],
"OrganizationUpdated": [self.handle_organization_updated],
"OwnershipTransferred": [self.handle_ownership_transferred],
"PaymentTokenDeposited": [self.handle_payment_token_deposited],
"TransferSingle": [
self.handle_transfer_single
],
"TransferBatch": [
self.handle_transfer_batch
],
"URI": [self.handle_uri],
}
self.contracts = {}
def _handle_users(self, ethereum_address):
if not User.objects.filter(ethereum_address=ethereum_address).exists():
return User.objects.create_user(
ethereum_address=ethereum_address)
return User.objects.get(
ethereum_address=ethereum_address)
def _handle_badge_user_balance_changes(self, badge, user, balance):
if balance.amount > 0:
badge.users.add(user)
badge.save()
elif user in badge.users.all():
badge.users.remove(user)
else:
return
badge.save()
def _handle_user_balance(self, i, event, organization, address_field):
# get the from user
user = self._handle_users(event['args'][address_field])
# get the balance
if event['event'] == "TransferSingle":
token_ids = [event['args']['id']]
values = [event['args']['value']]
else:
token_ids = event['args']['ids']
values = event['args']['values']
for i, token_id in enumerate(token_ids):
balance, created = Balance.objects.get_or_create(
user=user,
organization=organization,
token_id=token_id
)
# check if transaction is not already in balance
transaction, created = Transaction.objects.get_or_create(
tx_hash=event['transactionHash'].hex(),
)
if transaction not in balance.transactions.all():
# apply the balance change if not a mint from 0x0
change = values[i]
if address_field == "from":
change *= - 1
if event['args']['from'] == "0x0000000000000000000000000000000000000000":
change = -0
balance.transactions.add(transaction)
balance.amount += change
balance.save()
# Add the user to the badge if not already in it if the balance is > 0
badge = organization.badges.get(token_id=token_id)
if badge is None:
badge = self.handle_badge_created(event, None)
# Add the user to the badge if not already in it if the balance is > 0
# or remove them if the balance is 0
self._handle_badge_user_balance_changes(badge, user, balance)
def get_organization_contract(self, ethereum_address):
if ethereum_address not in self.contracts:
self.contracts[ethereum_address] = w3.eth.contract(
address=ethereum_address,
abi=ORGANIZATION_ABI
)
return self.contracts[ethereum_address]
def handle_organization_created(self, event, chained_response):
created = False
if not Organization.objects.filter(ethereum_address=event["args"]["organization"]).exists():
organization, created = Organization.objects.get_or_create(
ethereum_address=event["args"]["organization"],
name="Loading"
)
response = "Organization created"
else:
organization = Organization.objects.get(
ethereum_address=event["args"]["organization"]
)
response = "Organization already exists"
if created or not organization.owner:
organization.is_active = True
organization.chain = "Polygon"
organization.owner = self._handle_users(event["args"]["owner"])
organization.save()
response = "Organization management setup"
return (response, event['args'])
def handle_organization_updated(self, event, chained_response):
# Use the organization that was created in the OrganizationCreated event
if chained_response is not None:
organization = Organization.objects.get(
ethereum_address=chained_response[1]["organization"]
)
else:
organization = Organization.objects.get(
ethereum_address=event["address"]
)
if organization is None:
return ("Organization does not exist", event['args'])
organization_contract = self.get_organization_contract(
organization.ethereum_address)
changed = False
if not organization.symbol:
organization.symbol = organization_contract.functions.symbol().call()
uri = organization_contract.functions.contractURI().call()
organization.contract_uri_hash = uri.split("/ipfs/")[1]
changed = True
if organization.contract_uri_hash and (
organization.name == "Loading"
or not organization.description
or not organization.image_hash
):
url = f"{settings.PINATA_INDEXER_URL}{organization.contract_uri_hash}"
response = requests.get(url)
if response.status_code == 200:
data = response.json()
organization.name = data["name"]
organization.description = data["description"]
organization.image_hash = data["image"].split("/ipfs/")[1]
changed = True
if changed:
organization.save()
return ("Organization details updated", event['args'])
def handle_ownership_transferred(self, event, chained_response):
# get the address of the organization
organization = Organization.objects.get(
ethereum_address=event["address"]
)
if organization is None:
return ("Organization does not exist", event['args'])
organization.owner = self._handle_users(event["args"]["newOwner"])
organization.save()
return ("Need to update the organization owner", event['args'])
def handle_payment_token_deposited(self, event, chained_response):
pass
def handle_transfer_batch(self, event, chained_response):
# when we detect a new transfer, update the Balance model for the user
# get the address of the organization
organization = Organization.objects.get(
ethereum_address=event["address"]
)
if organization is None:
return ("Organization does not exist", event['args'])
# Update the balance of the `to` and `from` addresses
for i in range(len(event['args']['ids'])):
self._handle_user_balance(i, event, organization, "from")
self._handle_user_balance(i, event, organization, "to")
return ("Balance updated", event['args'])
def handle_transfer_single(self, event, chained_response):
# when we detect a new transfer, update the Balance model for the user
# get the address of the organization
organization = Organization.objects.get(
ethereum_address=event["address"]
)
if organization is None:
return ("Organization does not exist", event['args'])
# Update the balance of the `to` and `from` addresses
self._handle_user_balance(0, event, organization, "from")
self._handle_user_balance(0, event, organization, "to")
return ("Balance updated", event['args'])
def handle_badge_updated(self, event, chained_response):
changed = False
organization = Organization.objects.get(
ethereum_address=event["address"]
)
if organization is None:
return ("Organization does not exist", event['args'])
badge, created = organization.badges.get_or_create(
token_id=event['args']['id']
)
response = "Badge updated"
if created or not badge.token_uri:
badge.is_active = True
organization_contract = self.get_organization_contract(
organization.ethereum_address)
badge.token_uri = organization_contract.functions.uri(
event['args']['id']).call()
badge.account_bound = organization_contract.functions.getAccountBound(
event['args']['id']).call()
badge.signer_ethereum_address = organization_contract.functions.getSigner(
event['args']['id']).call()
changed = True
response = "Badge created"
if not badge.name or not badge.description or not badge.image_hash:
# use the 1155 uri spec to replace id with the token id
url = f"{badge.token_uri}".replace(
"{id}", str(event['args']['id']))
if "http" not in url:
url = f"{settings.PINATA_INDEXER_URL}{url}"
response = requests.get(url)
if response.status_code == 200:
data = response.json()
badge.name = data["name"]
badge.description = data["description"]
badge.image_hash = data["image"].split("/ipfs/")[1]
changed = True
response = "Badge details updated"
if changed:
badge.save()
return (response, event['args'])
def handle_delegate_updated(self, event, chained_response):
# get the address of the organization
organization = Organization.objects.get(
ethereum_address=event["address"]
)
if organization is None:
return ("Organization does not exist", event['args'])
# get the badge that was updated
badge = organization.badges.get(token_id=event['args']['id'])
if badge is None:
return ("Badge does not exist", event['args'])
# get the user that was updated
user = self._handle_users(event["args"]["delegate"])
if user is None:
return ("User does not exist", event['args'])
# add the user to the badge delegates if the args are true
if event['args']['isDelegate']:
badge.delegates.add(user)
else:
badge.delegates.remove(user)
badge.save()
return ("Delegate updated", event['args'])
def handle_uri(self, event, chained_response):
# get the address of the organization
organization = Organization.objects.get(
ethereum_address=event["address"]
)
if organization is None:
return ("Organization does not exist", event['args'])
# get the badge that was updated
badge = organization.badges.get(token_id=event['args']['id'])
if badge is None:
return ("Badge does not exist", event['args'])
badge.token_uri = event['args']['value']
badge.save()
return ("Badge uri updated", event['args'])
def handle_events(self, events):
event_responses = []
response = None
for event in events:
response = None
if 'event' in event:
if event['event'] in self.loader_mapping:
for handler in self.loader_mapping[event['event']]:
response = handler(event, response)
event_responses.append(response)
else:
event_responses.append(
("Event not handled", event['event'], event['args']))
else:
event_responses.append(("Event not decoded", event))
return event_responses

View File

@ -40,7 +40,7 @@ services:
env_file:
- ./.env
volumes:
- .:/api/code
- ./api:/code
depends_on:
- badger_db
links: