XSC003 Token Contract
Overview
Example Xian token smart contract implementing the XSC003 standard. In addition to basic transfers and approvals, it features a cryptographic permit system for approvals as well as streaming payments to support subscription-style functionality.
This example demonstrates:
- How to use the `crypto` module to verify signatures.
- How to use the `hashlib` module to generate hashes.
- How to use the `datetime` module to handle dates and times.
Source Code
- The source code, including tests for this project, can be found here: Xian GitHub
Running Tests
- For running the tests in this example, we recommend using the Contract Dev Environment.
Contract Code
python
# Contract state.
# Token balances keyed by address. Two-part keys (owner, spender) hold the
# allowance that owner granted to spender. Missing keys read as 0.
balances = Hash(default_value=0)
# Descriptive token metadata plus the "operator" allowed to change it.
metadata = Hash()
# Hashes of permit messages that have already been consumed (set to True on use).
permits = Hash()
# Stream records, keyed by (stream_id, field name).
streams = Hash()
# Event declarations.
# NOTE(review): "idx": True presumably marks a field as indexed for lookups;
# confirm against the LogEvent documentation.

# Emitted by transfer and transfer_from.
TransferEvent = LogEvent(
event="Transfer",
params={
"from": {"type": str, "idx": True},
"to": {"type": str, "idx": True},
"amount": {"type": (int, float, decimal)},
},
)
# Emitted by approve and permit.
ApproveEvent = LogEvent(
event="Approve",
params={
"from": {"type": str, "idx": True},
"to": {"type": str, "idx": True},
"amount": {"type": (int, float, decimal)},
},
)
# Emitted by perform_create_stream; begins/closes are passed as strings.
StreamCreatedEvent = LogEvent(
event="StreamCreated",
params={
"sender": {"type": str, "idx": True},
"receiver": {"type": str, "idx": True},
"stream_id": {"type": str, "idx": True},
"rate": {"type": (int, float, decimal)},
"begins": {"type": str},
"closes": {"type": str},
},
)
# Emitted by balance_stream; "balancer" is the account that triggered the balancing.
StreamBalanceEvent = LogEvent(
event="StreamBalance",
params={
"receiver": {"type": str, "idx": True},
"sender": {"type": str, "idx": True},
"stream_id": {"type": str, "idx": True},
"amount": {"type": (int, float, decimal)},
"balancer": {"type": str},
},
)
# Emitted by change_close_time.
StreamCloseChangeEvent = LogEvent(
event="StreamCloseChange",
params={
"receiver": {"type": str, "idx": True},
"sender": {"type": str, "idx": True},
"stream_id": {"type": str, "idx": True},
"time": {"type": str},
},
)
# Emitted by forfeit_stream.
StreamForfeitEvent = LogEvent(
event="StreamForfeit",
params={
"receiver": {"type": str, "idx": True},
"sender": {"type": str, "idx": True},
"stream_id": {"type": str, "idx": True},
"time": {"type": str},
},
)
# Emitted by finalize_stream.
StreamFinalizedEvent = LogEvent(
event="StreamFinalized",
params={
"receiver": {"type": str, "idx": True},
"sender": {"type": str, "idx": True},
"stream_id": {"type": str, "idx": True},
"time": {"type": str},
},
)
# XSC001
# Constructor (marked with @construct): credits 1,000,000 tokens to
# ctx.caller, stores the token's descriptive metadata, and records ctx.caller
# as the operator that change_metadata later checks against.
@construct
def seed():
    deployer = ctx.caller
    balances[deployer] = 1_000_000
    for field, value in (
        ("token_name", "TEST TOKEN"),
        ("token_symbol", "TST"),
        ("token_logo_url", "https://some.token.url/test-token.png"),
        ("token_website", "https://some.token.url"),
        ("operator", deployer),
    ):
        metadata[field] = value
# Sets metadata[key] to value. Only the account stored under
# metadata["operator"] may call this; any key (including "operator" itself)
# can be written.
@export
def change_metadata(key: str, value: Any):
    operator = metadata["operator"]
    assert ctx.caller == operator, "Only operator can set metadata."
    metadata[key] = value
# Moves `amount` tokens from ctx.caller to `to` and emits a Transfer event.
# Fails if amount is not strictly positive or exceeds the caller's balance.
@export
def transfer(amount: float, to: str):
    payer = ctx.caller
    assert amount > 0, "Cannot send negative balances."
    assert balances[payer] >= amount, "Not enough coins to send."
    # Debit first, then credit, so a self-transfer nets out to no change.
    balances[payer] -= amount
    balances[to] += amount
    TransferEvent({"from": payer, "to": to, "amount": amount})
# Sets (overwrites) the allowance that ctx.caller grants to `to` and emits an
# Approve event. The allowance is stored under the two-part key (owner, spender).
@export
def approve(amount: float, to: str):
    owner = ctx.caller
    assert amount >= 0, "Cannot approve negative balances."
    balances[owner, to] = amount
    ApproveEvent({"from": owner, "to": to, "amount": amount})
# Spends ctx.caller's allowance on `main_account`: moves `amount` tokens from
# main_account to `to`, reduces the allowance by the same amount and emits a
# Transfer event. Fails if amount is not strictly positive, exceeds the
# allowance, or exceeds main_account's balance.
@export
def transfer_from(amount: float, to: str, main_account: str):
    spender = ctx.caller
    assert amount > 0, "Cannot send negative balances."
    assert (
        balances[main_account, spender] >= amount
    ), f"Not enough coins approved to send. You have {balances[main_account, ctx.caller]} and are trying to spend {amount}"
    assert balances[main_account] >= amount, "Not enough coins to send."
    balances[main_account, spender] -= amount
    balances[main_account] -= amount
    balances[to] += amount
    TransferEvent({"from": main_account, "to": to, "amount": amount})
# Returns the stored balance for `address` (0 when nothing has been stored,
# because `balances` is declared with default_value=0).
@export
def balance_of(address: str):
    current_balance = balances[address]
    return current_balance
# XSC002 / Permit
# Sets the allowance of `spender` over `owner`'s tokens from a signed message
# instead of a direct call by the owner.
#
# The signed message is built by construct_permit_msg from owner, spender,
# value and the parsed deadline. Its sha3 hash is recorded in `permits` so the
# same permit cannot be used twice.
# Fails if the permit was already used, the deadline is not in the future,
# value is negative, or the signature does not verify against `owner`.
# Returns the permit hash.
@export
def permit(owner: str, spender: str, value: float, deadline: str, signature: str) -> str:
    expires_at = strptime_ymdhms(deadline)
    message = construct_permit_msg(owner, spender, value, str(expires_at))
    digest = hashlib.sha3(message)
    assert permits[digest] is None, "Permit can only be used once."
    assert now < expires_at, "Permit has expired."
    assert value >= 0, "Cannot approve negative balances!"
    assert crypto.verify(owner, message, signature), "Invalid signature."
    balances[owner, spender] = value
    permits[digest] = True
    ApproveEvent({"from": owner, "to": spender, "amount": value})
    return digest
# Builds the colon-separated message an owner signs for `permit`. The
# contract name (ctx.this) and chain_id are part of the message.
def construct_permit_msg(owner: str, spender: str, value: float, deadline: str):
    message = f"{owner}:{spender}:{value}:{deadline}:{ctx.this}:{chain_id}"
    return message
# XSC003 / Streaming Payments
# Field names used as the second key component in `streams[stream_id, <field>]`.
SENDER_KEY = "sender"
RECEIVER_KEY = "receiver"
STATUS_KEY = "status"
BEGIN_KEY = "begins"
CLOSE_KEY = "closes"
RATE_KEY = "rate"
CLAIMED_KEY = "claimed"
# Values stored under STATUS_KEY.
STREAM_ACTIVE = "active"
STREAM_FINALIZED = "finalized"
STREAM_FORFEIT = "forfeit"
# Creates a new stream from ctx.caller to `receiver`.
# `begins` and `closes` are "YYYY-MM-DD HH:MM:SS" strings; the stream may
# begin at any point in the past, present or future.
# Thin wrapper around perform_create_stream; returns the new stream id.
@export
def create_stream(receiver: str, rate: float, begins: str, closes: str):
    return perform_create_stream(
        ctx.caller,
        receiver,
        rate,
        strptime_ymdhms(begins),
        strptime_ymdhms(closes),
    )
# Internal helper shared by create_stream and create_stream_from_permit.
# The stream id is the sha3 hash of sender, receiver, begins, closes and rate,
# so an identical stream cannot be created twice.
# Fails if the stream already exists, begins is not before closes, or rate is
# not strictly positive. Stores the stream as active with nothing claimed,
# emits StreamCreated and returns the stream id.
def perform_create_stream(
    sender: str, receiver: str, rate: float, begins: datetime.datetime, closes: datetime.datetime
):
    stream_id = hashlib.sha3(f"{sender}:{receiver}:{begins}:{closes}:{rate}")
    assert streams[stream_id, STATUS_KEY] is None, "Stream already exists."
    assert begins < closes, "Stream cannot begin after the close date."
    assert rate > 0, "Rate must be greater than 0."
    for field, value in (
        (STATUS_KEY, STREAM_ACTIVE),
        (BEGIN_KEY, begins),
        (CLOSE_KEY, closes),
        (RECEIVER_KEY, receiver),
        (SENDER_KEY, sender),
        (RATE_KEY, rate),
        (CLAIMED_KEY, 0),
    ):
        streams[stream_id, field] = value
    StreamCreatedEvent(
        {
            "sender": sender,
            "receiver": receiver,
            "stream_id": stream_id,
            "rate": rate,
            "begins": str(begins),
            "closes": str(closes),
        }
    )
    return stream_id
# Creates a payment stream on behalf of `sender` from a signed permit message.
# The message is built by construct_stream_permit_msg from the parsed
# begins/closes/deadline values. Its sha3 hash is recorded in `permits` so the
# permit can only be used once.
# Fails if the deadline is not in the future, the permit was already used, or
# the signature does not verify against `sender`.
# Thin wrapper around perform_create_stream; returns the new stream id.
@export
def create_stream_from_permit(
    sender: str,
    receiver: str,
    rate: float,
    begins: str,
    closes: str,
    deadline: str,
    signature: str,
):
    starts_at = strptime_ymdhms(begins)
    ends_at = strptime_ymdhms(closes)
    expires_at = strptime_ymdhms(deadline)
    assert now < expires_at, "Permit has expired."
    message = construct_stream_permit_msg(
        sender, receiver, rate, starts_at, ends_at, expires_at
    )
    digest = hashlib.sha3(message)
    assert permits[digest] is None, "Permit can only be used once."
    assert crypto.verify(sender, message, signature), "Invalid signature."
    permits[digest] = True
    return perform_create_stream(sender, receiver, rate, starts_at, ends_at)
# Moves the balance currently due on a stream from sender to receiver.
# Callable by the stream's sender or receiver, only on an active stream that
# has already started and has a positive amount due.
# The transferred amount is capped at the sender's balance; the amount
# actually moved is added to the stream's claimed total.
@export
def balance_stream(stream_id: str):
    status = streams[stream_id, STATUS_KEY]
    assert status, "Stream does not exist."
    assert (
        status == STREAM_ACTIVE
    ), "You can only balance active streams."
    begins = streams[stream_id, BEGIN_KEY]
    assert now > begins, "Stream has not started yet."
    sender = streams[stream_id, SENDER_KEY]
    receiver = streams[stream_id, RECEIVER_KEY]
    assert ctx.caller in [
        sender,
        receiver,
    ], "Only sender or receiver can balance a stream."
    # Amount accrued so far that has not been claimed yet.
    amount_due = calc_outstanding_balance(
        begins,
        streams[stream_id, CLOSE_KEY],
        streams[stream_id, RATE_KEY],
        streams[stream_id, CLAIMED_KEY],
    )
    assert amount_due > 0, "No amount due on this stream."
    payout = calc_claimable_amount(amount_due, sender)
    balances[sender] -= payout
    balances[receiver] += payout
    streams[stream_id, CLAIMED_KEY] += payout
    StreamBalanceEvent(
        {
            "receiver": receiver,
            "sender": sender,
            "stream_id": stream_id,
            "amount": payout,
            "balancer": ctx.caller,
        }
    )
# Changes the close time of an active stream. Callable by the sender only.
# The stored close time is chosen as follows:
#   - requested time and current time are both before the stream begins:
#     the stream closes at its begin time (it never pays out);
#   - otherwise, requested time is not after the current time:
#     the stream closes now;
#   - otherwise: the stream closes at the requested time.
# The emitted event carries the requested time, which can differ from the
# stored close time in the first two cases.
@export
def change_close_time(stream_id: str, new_close_time: str):
    requested_close = strptime_ymdhms(new_close_time)
    status = streams[stream_id, STATUS_KEY]
    assert status, "Stream does not exist."
    assert status == STREAM_ACTIVE, "Stream is not active."
    sender = streams[stream_id, SENDER_KEY]
    receiver = streams[stream_id, RECEIVER_KEY]
    assert ctx.caller == sender, "Only sender can change the close time of a stream."
    begins = streams[stream_id, BEGIN_KEY]
    if requested_close < begins and now < begins:
        effective_close = begins
    elif requested_close <= now:
        effective_close = now
    else:
        effective_close = requested_close
    streams[stream_id, CLOSE_KEY] = effective_close
    StreamCloseChangeEvent(
        {
            "receiver": receiver,
            "sender": sender,
            "stream_id": stream_id,
            "time": str(requested_close),
        }
    )
# Marks an active stream as finalized and emits StreamFinalized.
# Requirements:
#   - the stream exists and is active;
#   - the caller is the stream's sender or receiver;
#   - the stream has closed, i.e. closes <= now;
#   - the stream is fully balanced (no outstanding amount).
# This function never sets the status back to active.
@export
def finalize_stream(stream_id: str):
    status = streams[stream_id, STATUS_KEY]
    assert status, "Stream does not exist."
    assert status == STREAM_ACTIVE, "Stream is not active."
    # Use the shared key constants, like the other stream functions do.
    sender = streams[stream_id, SENDER_KEY]
    receiver = streams[stream_id, RECEIVER_KEY]
    assert ctx.caller in [
        sender,
        receiver,
    ], "Only sender or receiver can finalize a stream."
    begins = streams[stream_id, BEGIN_KEY]
    closes = streams[stream_id, CLOSE_KEY]
    rate = streams[stream_id, RATE_KEY]
    claimed = streams[stream_id, CLAIMED_KEY]
    # The close time must already have been reached. The previous check
    # (now <= closes) was inverted: it allowed finalizing a stream that was
    # still running and rejected one that had already closed.
    assert closes <= now, "Stream has not closed yet."
    outstanding_balance = calc_outstanding_balance(begins, closes, rate, claimed)
    assert outstanding_balance == 0, "Stream has outstanding balance."
    streams[stream_id, STATUS_KEY] = STREAM_FINALIZED
    StreamFinalizedEvent(
        {
            "receiver": receiver,
            "sender": sender,
            "stream_id": stream_id,
            "time": str(now),
        }
    )
# Convenience method for the sender: requests the current time as the
# stream's close time, then balances and finalizes the stream.
# change_close_time enforces that the caller is the sender.
@export
def close_balance_finalize(stream_id: str):
    closing_time = str(now)
    change_close_time(stream_id=stream_id, new_close_time=closing_time)
    balance_finalize(stream_id=stream_id)
# Convenience method for the sender or receiver: balances the stream and then
# finalizes it. Each step applies its own checks.
@export
def balance_finalize(stream_id: str):
    for step in (balance_stream, finalize_stream):
        step(stream_id=stream_id)
# Forfeits an active stream. Callable by the receiver only.
# Sets the status to forfeit, sets the close time to the current time and
# emits StreamForfeit. No tokens are moved here.
# NOTE(review): annotated as returning str, but no value is returned.
@export
def forfeit_stream(stream_id: str) -> str:
    status = streams[stream_id, STATUS_KEY]
    assert status, "Stream does not exist."
    assert status == STREAM_ACTIVE, "Stream is not active."
    receiver = streams[stream_id, RECEIVER_KEY]
    sender = streams[stream_id, SENDER_KEY]
    assert ctx.caller == receiver, "Only receiver can forfeit a stream."
    streams[stream_id, STATUS_KEY] = STREAM_FORFEIT
    streams[stream_id, CLOSE_KEY] = now
    forfeit_details = {
        "receiver": receiver,
        "sender": sender,
        "stream_id": stream_id,
        "time": str(now),
    }
    StreamForfeitEvent(forfeit_details)
# Returns the amount accrued on a stream that has not been claimed yet:
# rate * elapsed seconds - claimed, where the elapsed period runs from
# `begins` to the earlier of the current time and `closes`.
# NOTE(review): on a standard-library timedelta, `.seconds` excludes whole
# days. Confirm that the runtime's timedelta reports the full elapsed seconds
# here; otherwise streams longer than a day would under-accrue.
def calc_outstanding_balance(
    begins: datetime.datetime, closes: datetime.datetime, rate: float, claimed: float
) -> float:
    if now < closes:
        accrual_end = now
    else:
        accrual_end = closes
    elapsed_seconds = (accrual_end - begins).seconds
    return (rate * elapsed_seconds) - claimed
# Caps the amount due at what the sender can currently pay: returns
# amount_due when it is below the sender's balance, otherwise the balance.
def calc_claimable_amount(amount_due: float, sender: str) -> float:
    if amount_due < balances[sender]:
        return amount_due
    return balances[sender]
# Builds the colon-separated message a sender signs for
# create_stream_from_permit. The contract name (ctx.this) and chain_id are
# part of the message.
def construct_stream_permit_msg(
    sender: str, receiver: str, rate: float, begins: str, closes: str, deadline: str
) -> str:
    message = f"{sender}:{receiver}:{rate}:{begins}:{closes}:{deadline}:{ctx.this}:{chain_id}"
    return message
# Parses a "YYYY-MM-DD HH:MM:SS" string into a datetime.
def strptime_ymdhms(date_string: str) -> datetime.datetime:
    timestamp_format = "%Y-%m-%d %H:%M:%S"
    parsed = datetime.datetime.strptime(date_string, timestamp_format)
    return parsed