Contract Name:
CurveRateProvider
Contract Source Code:
File 1 of 1 : CurveRateProvider
# pragma version 0.3.10
# pragma evm-version paris
"""
@title CurveRateProvider
@custom:version 1.0.1
@author Curve.Fi
@license Copyright (c) Curve.Fi, 2020-2024 - all rights reserved
@notice Provides quotes for coin pairs, iff coin pair is in a Curve AMM that the Metaregistry recognises.
"""
# Contract version string; `public` exposes it through an auto-generated `version()` getter.
version: public(constant(String[8])) = "1.0.1"
from vyper.interfaces import ERC20Detailed
# Length of the fixed-size balances array returned by `Metaregistry.get_underlying_balances`.
MAX_COINS: constant(uint256) = 8
# Capacity of the `DynArray[Quote, ...]` returned by the quote functions.
MAX_QUOTES: constant(uint256) = 100
# A single quote for swapping the source token into the destination token through one pool.
struct Quote:
    source_token_index: uint256  # index of the source token, as reported by Metaregistry.get_coin_indices
    dest_token_index: uint256  # index of the destination token, as reported by Metaregistry.get_coin_indices
    is_underlying: bool  # third value returned by Metaregistry.get_coin_indices for this pair
    amount_out: uint256  # value returned by the pool's get_dy-style call for the requested amount_in
    pool: address  # pool that produced this quote
    source_token_pool_balance: uint256  # Metaregistry.get_underlying_balances(pool)[source_token_index]
    dest_token_pool_balance: uint256  # Metaregistry.get_underlying_balances(pool)[dest_token_index]
    pool_type: uint8 # 0 for stableswap, 1 for cryptoswap, 2 for LLAMMA.
# Interfaces

# Address registry keyed by numeric id; this contract queries it for the Metaregistry address.
interface AddressProvider:
    def get_address(id: uint256) -> address: view

# Subset of the Metaregistry interface that this contract calls.
interface Metaregistry:
    # Pools to be quoted for the given coin pair (at most 1000 addresses).
    def find_pools_for_coins(source_coin: address, destination_coin: address) -> DynArray[address, 1000]: view
    # Returns (index of _from, index of _to, is_underlying flag) for the pair in _pool.
    def get_coin_indices(_pool: address, _from: address, _to: address) -> (int128, int128, bool): view
    # Per-coin balances of _pool; this contract indexes the array with the values from get_coin_indices.
    def get_underlying_balances(_pool: address) -> uint256[MAX_COINS]: view
# Address provider set once in the constructor; used to resolve the Metaregistry on every query.
ADDRESS_PROVIDER: public(immutable(AddressProvider))
# Id passed to `ADDRESS_PROVIDER.get_address` to obtain the Metaregistry address.
METAREGISTRY_ID: constant(uint256) = 7
# Function signatures hashed with `method_id` to build the quote call for each pool flavour.
# Used for pool_type 0 when the coin pair is flagged as underlying.
STABLESWAP_META_ABI: constant(String[64]) = "get_dy_underlying(int128,int128,uint256)"
# Used for pool_type 0 when the coin pair is not flagged as underlying.
STABLESWAP_ABI: constant(String[64]) = "get_dy(int128,int128,uint256)"
# Used for every non-zero pool_type.
CRYPTOSWAP_ABI: constant(String[64]) = "get_dy(uint256,uint256,uint256)"
@external
def __init__(address_provider: address):
    """
    @notice Store the address provider that is later queried for the Metaregistry address.
    @param address_provider Address of a contract exposing `get_address(uint256)`.
    """
    # Immutable: can only be assigned here, at deployment.
    ADDRESS_PROVIDER = AddressProvider(address_provider)
@external
@view
def get_quotes(source_token: address, destination_token: address, amount_in: uint256) -> DynArray[Quote, MAX_QUOTES]:
    """
    @notice Return per-pool quotes for swapping `amount_in` of `source_token` into `destination_token`.
    @param source_token Token being sold.
    @param destination_token Token being bought.
    @param amount_in Amount of `source_token` passed to each pool's quote function.
    @return Up to MAX_QUOTES quotes; empty when no pool produces a non-zero quote.
    """
    # External entry point; all the work happens in the internal helper.
    return self._get_quotes(source_token, destination_token, amount_in)
@external
@view
def get_aggregated_rate(source_token: address, destination_token: address) -> uint256:
    """
    @notice Return a balance-weighted average of the amounts quoted by each pool
            for swapping one whole unit of `source_token` into `destination_token`.
    @dev One whole unit is 10**decimals of the source token. The source token's
         decimals are read once and reused for both the probe amount and the
         balance normalisation.
    @param source_token Token being sold.
    @param destination_token Token being bought.
    @return Weighted average of the quoted output amounts; 0 when no pool yields a quote.
    """
    # Single external call for the source decimals (previously queried twice).
    source_decimals: uint256 = convert(ERC20Detailed(source_token).decimals(), uint256)

    # Quote exactly one whole source token in every pool.
    amount_in: uint256 = 10**source_decimals
    quotes: DynArray[Quote, MAX_QUOTES] = self._get_quotes(source_token, destination_token, amount_in)

    dest_decimals: uint256 = convert(ERC20Detailed(destination_token).decimals(), uint256)
    return self.weighted_average_quote(source_decimals, dest_decimals, quotes)
@internal
@pure
def weighted_average_quote(
    source_token_decimals: uint256,
    dest_token_decimals: uint256,
    quotes: DynArray[Quote, MAX_QUOTES]
) -> uint256:
    """
    @notice Average the quoted output amounts, weighting each quote by the combined
            source and destination balances of the pool that produced it.
    @dev Balances are scaled to 18 decimals before being added together. With at
         least one quote supplied, the call reverts if either token has more than
         18 decimals (underflow in `18 - decimals`) or if the scaled balances sum
         to zero (division by zero). Each weight is truncated to 18 decimal places.
    @param source_token_decimals Decimals of the source token.
    @param dest_token_decimals Decimals of the destination token.
    @param quotes Quotes to aggregate.
    @return Weighted average of `amount_out` across `quotes`; 0 for an empty list.
    """
    num_quotes: uint256 = len(quotes)
    # Nothing to average. Returning before the scale factors are computed keeps
    # an empty list from reverting on tokens with more than 18 decimals.
    if num_quotes == 0:
        return 0

    # Loop-invariant scale factors, computed once instead of on every iteration.
    source_scale: uint256 = 10**(18 - source_token_decimals)
    dest_scale: uint256 = 10**(18 - dest_token_decimals)

    # First pass: total normalised liquidity across all quoted pools.
    total_balance: uint256 = 0
    for i in range(num_quotes, bound=MAX_QUOTES):
        source_balance_normalized: uint256 = quotes[i].source_token_pool_balance * source_scale
        dest_balance_normalized: uint256 = quotes[i].dest_token_pool_balance * dest_scale
        total_balance += source_balance_normalized + dest_balance_normalized

    # Second pass: accumulate each quote scaled by its pool's share of the total.
    weighted_avg: uint256 = 0
    for i in range(num_quotes, bound=MAX_QUOTES):
        source_balance_normalized: uint256 = quotes[i].source_token_pool_balance * source_scale
        dest_balance_normalized: uint256 = quotes[i].dest_token_pool_balance * dest_scale
        pool_balance_normalized: uint256 = source_balance_normalized + dest_balance_normalized
        weight: uint256 = (pool_balance_normalized * 10**18) / total_balance  # 18-decimal fixed point
        weighted_avg += weight * quotes[i].amount_out / 10**18

    return weighted_avg
@internal
@view
def _get_quotes(source_token: address, destination_token: address, amount_in: uint256) -> DynArray[Quote, MAX_QUOTES]:
    """
    @notice Collect one quote per pool returned by the Metaregistry for the coin pair.
    @dev A pool is skipped when either of the two coin balances is zero, or when its
         quote call fails or returns zero. Collection stops as soon as MAX_QUOTES
         quotes have been gathered.
    @param source_token Token being sold.
    @param destination_token Token being bought.
    @param amount_in Amount of `source_token` to quote.
    @return Up to MAX_QUOTES quotes, in the order the Metaregistry lists the pools.
    """
    quotes: DynArray[Quote, MAX_QUOTES] = []

    # A zero input can never yield a quote, so skip every external call.
    if amount_in == 0:
        return quotes

    metaregistry: Metaregistry = Metaregistry(ADDRESS_PROVIDER.get_address(METAREGISTRY_ID))
    pools: DynArray[address, 1000] = metaregistry.find_pools_for_coins(source_token, destination_token)

    for pool in pools:

        # The result array is full; querying further pools would only burn gas.
        if len(quotes) == MAX_QUOTES:
            break

        # get coin indices
        i: int128 = 0
        j: int128 = 0
        is_underlying: bool = False
        (i, j, is_underlying) = metaregistry.get_coin_indices(pool, source_token, destination_token)

        # get balances
        balances: uint256[MAX_COINS] = metaregistry.get_underlying_balances(pool)
        balances_i: uint256 = balances[i]
        balances_j: uint256 = balances[j]

        # skip pools holding none of either coin
        if balances_i == 0 or balances_j == 0:
            continue

        # Classify the pool only once it passed the balance filter: the probe
        # costs up to two extra static calls.
        pool_type: uint8 = self._get_pool_type(pool, metaregistry)

        # do a get_dy call with the matching abi (int128 vs uint256 indices);
        # a failed call is reported as 0
        quote: uint256 = self._get_pool_quote(i, j, amount_in, pool, pool_type, is_underlying)

        # only keep quotes from pools whose get_dy call returned something
        if quote > 0:
            quotes.append(
                Quote(
                    {
                        source_token_index: convert(i, uint256),
                        dest_token_index: convert(j, uint256),
                        is_underlying: is_underlying,
                        amount_out: quote,
                        pool: pool,
                        source_token_pool_balance: balances_i,
                        dest_token_pool_balance: balances_j,
                        pool_type: pool_type
                    }
                )
            )

    return quotes
@internal
@view
def _get_pool_type(pool: address, metaregistry: Metaregistry) -> uint8:
    """
    @notice Classify a pool by probing which view methods it answers.
    @dev The probes are static calls that do not revert on failure. The
         `metaregistry` argument is not read by this function.
    @param pool Pool to classify.
    @param metaregistry Metaregistry handle (unused).
    @return 1 if `allowed_extra_profit()` succeeds (cryptoswap), otherwise 2 if
            `get_rate_mul()` succeeds (LLAMMA), otherwise 0 (stableswap).
    """
    answered: bool = False
    returndata: Bytes[32] = b""

    # Probe 1: cryptoswap pools.
    answered, returndata = raw_call(
        pool,
        method_id("allowed_extra_profit()"),
        max_outsize=32,
        revert_on_failure=False,
        is_static_call=True
    )
    if answered:
        return 1

    # Probe 2: LLAMMA.
    answered, returndata = raw_call(
        pool,
        method_id("get_rate_mul()"),
        max_outsize=32,
        revert_on_failure=False,
        is_static_call=True
    )
    if not answered:
        # Neither probe succeeded: treat the pool as stableswap.
        return 0

    return 2
@internal
@view
def _get_pool_quote(
    i: int128,
    j: int128,
    amount_in: uint256,
    pool: address,
    pool_type: uint8,
    is_underlying: bool
) -> uint256:
    """
    @notice Ask a pool how much of coin `j` it returns for `amount_in` of coin `i`.
    @dev The call is a static call that does not revert on failure. Pool type 0
         uses the int128-indexed signatures (`get_dy_underlying` when
         `is_underlying` is set, `get_dy` otherwise); every other pool type uses
         the uint256-indexed `get_dy`.
    @param i Index of the coin being sold.
    @param j Index of the coin being bought.
    @param amount_in Amount of coin `i` to quote.
    @param pool Pool to query.
    @param pool_type 0 for stableswap, 1 for cryptoswap, 2 for LLAMMA.
    @param is_underlying Whether the pair was flagged as underlying by the Metaregistry.
    @return The amount returned by the pool, or 0 if the call failed.
    """
    # Default to the uint256-indexed signature; only pool type 0 overrides it.
    selector: Bytes[4] = method_id(CRYPTOSWAP_ABI)
    if pool_type == 0:
        if is_underlying:
            selector = method_id(STABLESWAP_META_ABI)
        else:
            selector = method_id(STABLESWAP_ABI)

    # Calldata is the 4-byte selector followed by three 32-byte words.
    payload: Bytes[100] = concat(
        selector,
        convert(i, bytes32),
        convert(j, bytes32),
        convert(amount_in, bytes32),
    )

    call_ok: bool = False
    returndata: Bytes[32] = b""
    call_ok, returndata = raw_call(
        pool,
        payload,
        max_outsize=32,
        revert_on_failure=False,
        is_static_call=True
    )

    # A failed call is reported as a zero quote rather than a revert.
    if not call_ok:
        return 0

    return convert(returndata, uint256)