import threading
import time
from typing import Dict, List, Optional, Union
import web3.main
from eth_typing import BlockNumber
from web3 import Web3
from web3.datastructures import AttributeDict
from arbitrum_py.data_entities.constants import ARB_SYS_ADDRESS, NODE_INTERFACE_ADDRESS
from arbitrum_py.data_entities.errors import ArbSdkError
from arbitrum_py.data_entities.message import ChildToParentMessageStatus
from arbitrum_py.data_entities.networks import get_arbitrum_network
from arbitrum_py.data_entities.signer_or_provider import (
SignerOrProvider,
SignerProviderUtils,
)
from arbitrum_py.utils.event_fetcher import EventFetcher, FetchedEvent
from arbitrum_py.utils.helper import CaseDict, format_contract_output, load_contract
from arbitrum_py.utils.lib import (
get_block_ranges_for_l1_block,
is_arbitrum_chain,
)
# Same constants you had before, now referencing child->parent
ASSERTION_CREATED_PADDING = 50
ASSERTION_CONFIRMED_PADDING = 20
child_block_range_cache = {}
child_block_cache_lock = threading.Lock()
[docs]def get_child_block_range_cache_key(child_chain_id: int, l1_block_number: int) -> str:
"""
Create a unique cache key for the child block range lookup.
Args:
child_chain_id: The chain ID of the child chain
l1_block_number: The L1 block number to create a cache key for
Returns:
A unique string key combining chain ID and block number
"""
return f"{child_chain_id}-{l1_block_number}"
[docs]def set_child_block_range_cache(key: str, value: List[Optional[int]]) -> None:
"""
Cache the child block range under the given key.
Args:
key: The cache key to store the value under
value: The block range values to cache
"""
child_block_range_cache[key] = value
[docs]def get_block_ranges_for_l1_block_with_cache(
parent_provider: Web3, child_provider: Web3, for_l1_block: int
) -> List[Optional[int]]:
"""
Get block ranges for an L1 block using a shared cache to avoid repeated calls.
Args:
parent_provider: Web3 provider instance for the parent chain
child_provider: Web3 provider instance for the child chain
for_l1_block: The L1 block number to get ranges for
Returns:
List of block ranges, where each element can be None
"""
child_chain_id = child_provider.eth.chain_id
key = get_child_block_range_cache_key(child_chain_id, for_l1_block)
# Return cached value if exists
if key in child_block_range_cache:
return child_block_range_cache[key]
# Otherwise, lock so only one fetch is in-flight
with child_block_cache_lock:
# Maybe it got cached while we waited for the lock
if key in child_block_range_cache:
return child_block_range_cache[key]
# Actually fetch
child_block_range = get_block_ranges_for_l1_block(parent_provider, for_l1_block)
set_child_block_range_cache(key, child_block_range)
return child_block_range
[docs]class ChildToParentMessageNitro:
"""
Base functionality for 'nitro' child->parent messages.
This class provides the core functionality for handling messages sent from a child chain
to its parent chain in the Arbitrum Nitro protocol.
"""
[docs] def __init__(self, event: dict) -> None:
"""
Initialize a new ChildToParentMessageNitro instance.
Args:
event: The ChildToParentTx event dictionary from the child chain
"""
self.event = event
[docs] @classmethod
def from_event(
cls, parent_signer_or_provider: Web3, event: dict, parent_provider: Optional[Web3] = None
) -> Union["ChildToParentMessageReaderNitro", "ChildToParentMessageWriterNitro"]:
"""
Create a message reader or writer based on the provided signer/provider.
Args:
parent_signer_or_provider: Signer or provider for the parent chain
event: The event data containing message details
parent_provider: Optional override provider for the parent chain
Returns:
Either a reader or writer instance based on input type
"""
if SignerProviderUtils.is_signer(parent_signer_or_provider):
return ChildToParentMessageWriterNitro(parent_signer_or_provider, event, parent_provider)
else:
return ChildToParentMessageReaderNitro(parent_signer_or_provider, event)
[docs] @staticmethod
def get_child_to_parent_events(
child_provider: Web3,
filter_dict: dict,
position: Optional[int] = None,
destination: Optional[str] = None,
hash: Optional[str] = None,
) -> List[dict]:
"""
Fetch Child->Parent events from the child chain.
Args:
child_provider: Web3 provider for the child chain
filter_dict: Dictionary containing filter parameters (fromBlock, toBlock)
position: Optional position to filter events by
destination: Optional destination address to filter events by
hash: Optional hash to filter events by
Returns:
List of matching event dictionaries
"""
event_fetcher = EventFetcher(child_provider)
argument_filters = {}
if position:
argument_filters["position"] = position
if destination:
argument_filters["destination"] = destination
if hash:
argument_filters["hash"] = hash
# The underlying event name is still "L2ToL1Tx" on-chain, even though we've renamed the TypeScript interface.
events = event_fetcher.get_events(
contract_factory="ArbSys",
event_name="L2ToL1Tx",
argument_filters=argument_filters,
filter={
"fromBlock": filter_dict["fromBlock"],
"toBlock": filter_dict["toBlock"],
"address": ARB_SYS_ADDRESS,
**filter_dict,
},
)
return events
[docs]class ChildToParentMessageReaderNitro(ChildToParentMessageNitro):
"""
Read-only logic for child->parent messages in Nitro. Replaces L2ToL1MessageReaderNitro.
"""
[docs] def __init__(self, parent_provider: web3.main.Web3, event: CaseDict) -> None:
super().__init__(event)
self.parent_provider = parent_provider
self.send_root_hash = None
self.send_root_size = None
self.send_root_confirmed = None
self.outbox_address = None
self.l1_batch_number = None
[docs] def get_outbox_proof(self, child_provider: web3.main.Web3) -> List[bytes]:
"""
Equivalent to getOutboxProof in the TS code. Constructs a proof to execute the message.
Args:
child_provider: Web3 provider for the child chain
Returns:
The outbox proof
"""
send_props = self.get_send_props(child_provider)
send_root_size = send_props.get("sendRootSize", None)
if not send_root_size:
raise ArbSdkError("Assertion not yet created, cannot get proof.")
node_interface_contract = load_contract(
provider=child_provider,
contract_name="NodeInterface",
address=NODE_INTERFACE_ADDRESS,
)
# callStatic.constructOutboxProof(...) in TS
outbox_proof_params = node_interface_contract.functions.constructOutboxProof(
send_root_size, self.event["position"]
).call()
outbox_proof_params = format_contract_output(
node_interface_contract,
"constructOutboxProof",
outbox_proof_params,
)
return outbox_proof_params["proof"]
[docs] def has_executed(self, child_provider: web3.main.Web3) -> bool:
"""
Checks if the message is already executed by calling Outbox.isSpent(position).
Args:
child_provider: Web3 provider for the child chain
Returns:
Whether the message has been executed
"""
child_chain = get_arbitrum_network(child_provider)
outbox_contract = load_contract(
provider=self.parent_provider,
contract_name="Outbox",
address=child_chain.ethBridge.outbox,
)
return outbox_contract.functions.isSpent(self.event["position"]).call()
[docs] def status(self, child_provider: web3.main.Web3) -> ChildToParentMessageStatus:
"""
Returns the status of this message (UNCONFIRMED, CONFIRMED, or EXECUTED).
Args:
child_provider: Web3 provider for the child chain
Returns:
The status of the message
"""
send_props = self.get_send_props(child_provider)
if not send_props.get("sendRootConfirmed"):
return ChildToParentMessageStatus.UNCONFIRMED
# If the send root is confirmed, check if the message was executed
executed = self.has_executed(child_provider)
return ChildToParentMessageStatus.EXECUTED if executed else ChildToParentMessageStatus.CONFIRMED
[docs] def parse_node_created_assertion(self, fetched_event: FetchedEvent) -> Dict[str, Dict[str, bytes]]:
"""
For a classic RollupUserLogic NodeCreated event.
Replaces parseNodeCreatedAssertion from TS code.
Args:
fetched_event: The event data
Returns:
The parsed assertion data
"""
return {
"afterState": {
"blockHash": fetched_event["event"]["assertion"]["afterState"]["globalState"]["bytes32Vals"][0],
"sendRoot": fetched_event["event"]["assertion"]["afterState"]["globalState"]["bytes32Vals"][1],
}
}
[docs] def parse_assertion_created_event(self, fetched_event):
"""
For a BoldRollupUserLogic AssertionCreated event (BoLD).
Replaces parseAssertionCreatedEvent from TS code.
Args:
fetched_event: The event data
Returns:
The parsed assertion data
"""
# Both NodeCreated and AssertionCreated have similar structure. BoLD has an 'assertionHash'.
return {
"afterState": {
"blockHash": fetched_event["event"]["assertion"]["afterState"]["globalState"]["bytes32Vals"][0],
"sendRoot": fetched_event["event"]["assertion"]["afterState"]["globalState"]["bytes32Vals"][1],
}
}
[docs] def is_assertion_created_log(self, fetched_event: dict) -> bool:
"""
Distinguishes between NodeCreated (legacy) vs. AssertionCreated (BoLD).
In TS, we look for 'event.challengeManager != undefined'.
Args:
fetched_event: The event data
Returns:
Whether the event is an AssertionCreated log
"""
# We'll look for 'assertionHash' or 'challengeManager' or do a simpler check here:
return "assertionHash" in fetched_event["event"]
[docs] def get_block_from_assertion_log(
self, child_provider: web3.main.Web3, fetched_event: Optional[FetchedEvent] = None
) -> AttributeDict:
"""
Merges logic from getBlockFromNodeLog + getBlockFromAssertionLog in TS.
Args:
child_provider: Web3 provider for the child chain
fetched_event: The event data
Returns:
The block data
"""
arbitrum_provider = child_provider
if not fetched_event:
# If no logs found, default block 0
return arbitrum_provider.eth.get_block(0)
if self.is_assertion_created_log(fetched_event):
parsed = self.parse_assertion_created_event(fetched_event)
else:
parsed = self.parse_node_created_assertion(fetched_event)
block_hash = parsed["afterState"]["blockHash"]
send_root = parsed["afterState"]["sendRoot"]
child_block = arbitrum_provider.eth.get_block(block_hash)
if not child_block:
raise ArbSdkError(f"Block not found. {block_hash}")
if child_block["sendRoot"] != Web3.to_hex(send_root):
raise ArbSdkError(
f"Child chain block send root doesn't match assertion log. {child_block['sendRoot']} {send_root}"
)
return child_block
[docs] def get_block_from_assertion_id(self, rollup_contract, assertion_id, child_provider):
"""
Merges getBlockFromNodeNum with the BoLD path using getAssertion.
In TS, we do rollup.getNode(...) vs rollup.getAssertion(...).
Args:
rollup_contract: The rollup contract instance
assertion_id: The assertion ID
child_provider: Web3 provider for the child chain
Returns:
The block data
"""
# Distinguish whether this is a 'BoldRollupUserLogic' by trying a call
# But in Python, we might just try to call .extraChallengeTimeBlocks() and catch.
# We'll do a simpler check: see if getAssertion(...) is valid.
# For your use-case, you may implement a more robust detection logic.
is_bold = False
try:
# If this call fails, it's classic
rollup_contract.functions.extraChallengeTimeBlocks().call()
except Exception:
is_bold = True
if is_bold:
# The BoLD rollup contract has a method getAssertion(hash)
# We'll attempt something like: rollup_contract.functions.getAssertion(assertionHash).call()
# But the input might be a string, not a BigNumber
assertion_data = rollup_contract.functions.getAssertion(assertion_id).call()
assertion_data = format_contract_output(rollup_contract, "getAssertion", assertion_data)
created_at_block = assertion_data["createdAtBlock"]
else:
# Classic path
node = rollup_contract.functions.getNode(assertion_id).call()
node = format_contract_output(rollup_contract, "getNode", node)
created_at_block = node["createdAtBlock"]
# Convert to Python int
created_at_block = int(created_at_block)
created_from_block = created_at_block
created_to_block = created_at_block
# If parent is Arbitrum, then child is Orbit. We try to find the child block range
if is_arbitrum_chain(self.parent_provider):
# Some or all of these calls might fail, so we do a try-catch fallback:
success = False
# One approach: call nodeInterface.l2BlockRangeForL1() if available
try:
node_interface = load_contract(
provider=self.parent_provider,
contract_name="NodeInterface",
address=NODE_INTERFACE_ADDRESS,
)
block_range = node_interface.functions.l2BlockRangeForL1(created_at_block).call()
block_range = format_contract_output(node_interface, "l2BlockRangeForL1", block_range)
created_from_block = block_range["firstBlock"]
created_to_block = block_range["lastBlock"]
success = True
except Exception:
pass
if not success:
try:
# fallback: do the binary search approach
child_block_range = get_block_ranges_for_l1_block_with_cache(
self.parent_provider, child_provider, created_at_block
)
start_block, end_block = child_block_range
if not start_block or not end_block:
raise Exception()
created_from_block = start_block
created_to_block = end_block
except Exception:
# fallback all the way to naive approach
created_from_block = created_at_block
created_to_block = created_at_block
# Now let's fetch the actual event from logs
event_fetcher = EventFetcher(rollup_contract.w3)
if is_bold:
# We're searching for AssertionCreated(assertionHash)
logs = event_fetcher.get_events(
contract_factory=rollup_contract,
event_name="AssertionCreated",
argument_filters={"assertionHash": assertion_id},
filter={
"fromBlock": created_from_block,
"toBlock": created_to_block,
"address": rollup_contract.address,
},
)
else:
# Searching for NodeCreated(nodeNum)
logs = event_fetcher.get_events(
contract_factory=rollup_contract,
event_name="NodeCreated",
argument_filters={"nodeNum": assertion_id},
filter={
"fromBlock": created_from_block,
"toBlock": created_to_block,
"address": rollup_contract.address,
},
)
if len(logs) > 1:
raise ArbSdkError(
f"Unexpected number of AssertionCreated/NodeCreated events. Expected 0 or 1, got {len(logs)}."
)
return self.get_block_from_assertion_log(child_provider, logs[0] if logs else None)
[docs] def get_batch_number(self, child_provider) -> Optional[int]:
"""
findBatchContainingBlock parallels TS logic, but might fail if the block doesn't exist yet.
Args:
child_provider: Web3 provider for the child chain
Returns:
The batch number
"""
if self.l1_batch_number is None:
try:
node_interface_contract = load_contract(
provider=child_provider,
contract_name="NodeInterface",
address=NODE_INTERFACE_ADDRESS,
)
res = node_interface_contract.functions.findBatchContainingBlock(self.event["arbBlockNum"]).call()
self.l1_batch_number = int(res)
except Exception:
pass
return self.l1_batch_number
[docs] def get_send_props(self, child_provider: web3.main.Web3) -> Dict[str, Optional[Union[int, str, bool]]]:
"""
Merges logic from getSendProps in TS: checks whether the node is confirmed or not.
If so, we store sendRootSize, sendRootHash, sendRootConfirmed, etc.
Args:
child_provider: Web3 provider for the child chain
Returns:
The send properties
"""
if not self.send_root_confirmed:
child_chain = get_arbitrum_network(child_provider)
rollup_contract = load_contract(
provider=self.parent_provider,
contract_name="RollupUserLogic", # or BoldRollupUserLogic if determined
address=child_chain.ethBridge.rollup,
)
# latestConfirmed is the ID (nodeNum or assertionHash) of the last confirmed node
latest_confirmed = rollup_contract.functions.latestConfirmed().call()
# This merges classic or bold logic in a single path, see get_block_from_assertion_id
confirmed_block = self.get_block_from_assertion_id(rollup_contract, latest_confirmed, child_provider)
send_root_size_confirmed = int(Web3.to_int(hexstr=confirmed_block["sendCount"]))
if send_root_size_confirmed > self.event["position"]:
self.send_root_size = send_root_size_confirmed
self.send_root_hash = confirmed_block["sendRoot"]
self.send_root_confirmed = True
else:
# latestNodeCreated or latestAssertionCreated
# For Bold, this might be an assertionHash; for classic, it's a nodeNum.
if rollup_contract.functions.latestNodeCreated:
latest_created = rollup_contract.functions.latestNodeCreated().call()
else:
# if that fails, it's a bold approach
# you'd adapt to a separate bold method
pass
# If there's a strictly larger node number / assertion ID
if latest_created > latest_confirmed:
unconfirmed_block = self.get_block_from_assertion_id(
rollup_contract, latest_created, child_provider
)
send_root_size_unconfirmed = int(Web3.to_int(hexstr=unconfirmed_block["sendCount"]))
if send_root_size_unconfirmed > self.event["position"]:
self.send_root_size = send_root_size_unconfirmed
self.send_root_hash = unconfirmed_block["sendRoot"]
return {
"sendRootSize": self.send_root_size,
"sendRootHash": self.send_root_hash,
"sendRootConfirmed": self.send_root_confirmed,
}
[docs] def wait_until_ready_to_execute(
self,
child_provider: Web3,
retry_delay: int = 1000,
) -> ChildToParentMessageStatus:
"""
Wait repeatedly until the outbox entry (assertion) is confirmed, so the message can be executed.
Warning:
This operation may take a very long time (1 week+) as outbox entries
are only created when the corresponding node is confirmed.
Args:
child_provider: Web3 provider for the child chain
retry_delay: Milliseconds to wait between status checks
Returns:
Final message status (either EXECUTED or CONFIRMED)
"""
while True:
current_status = self.status(child_provider)
if current_status in [
ChildToParentMessageStatus.EXECUTED,
ChildToParentMessageStatus.CONFIRMED,
]:
return current_status
# Sleep for the specified delay (converting ms to seconds)
time.sleep(retry_delay / 1000.0)
[docs] def get_first_executable_block(self, child_provider) -> Optional[BlockNumber]:
"""
getFirstExecutableBlock from the TS code:
- If message can be or is already executed, return None
- Otherwise, find the earliest block in which it can be executed
Args:
child_provider: Web3 provider for the child chain
Returns:
The first executable block number
"""
child_chain = get_arbitrum_network(child_provider)
rollup_contract = load_contract(
provider=self.parent_provider,
contract_name="RollupUserLogic",
address=child_chain.ethBridge.rollup,
)
current_status = self.status(child_provider)
if current_status in (
ChildToParentMessageStatus.EXECUTED,
ChildToParentMessageStatus.CONFIRMED,
):
return None
if current_status != ChildToParentMessageStatus.UNCONFIRMED:
raise ArbSdkError("ChildToParentMessage expected to be UNCONFIRMED")
latest_block = self.parent_provider.eth.block_number
event_fetcher = EventFetcher(self.parent_provider)
# We check either NodeCreated or AssertionCreated logs
# For simplicity, we assume classic here (NodeCreated).
# If bold, you'd search for AssertionCreated instead.
logs = event_fetcher.get_events(
contract_factory=rollup_contract,
event_name="NodeCreated",
argument_filters={},
filter={
"fromBlock": max(
latest_block - child_chain.confirmPeriodBlocks - ASSERTION_CONFIRMED_PADDING,
0,
),
"toBlock": "latest",
"address": rollup_contract.address,
},
)
# Sort them in ascending nodeNum order
logs.sort(key=lambda x: x["event"]["nodeNum"])
# Get the last block from the last NodeCreated event
last_child_block = self.get_block_from_assertion_log(child_provider, logs[-1] if logs else None)
last_send_count = int(last_child_block["sendCount"]) if last_child_block else 0
# If the last node does not include this position,
# we assume we must wait the max time for a new node + confirmation
if last_send_count <= self.event["position"]:
return (
child_chain.confirmPeriodBlocks + ASSERTION_CREATED_PADDING + ASSERTION_CONFIRMED_PADDING + latest_block
)
# Otherwise, do a binary search in logs to find the first node whose sendCount > position
left, right = 0, len(logs) - 1
found_log = logs[-1] if logs else None
while left <= right:
mid = (left + right) // 2
test_log = logs[mid]
child_block = self.get_block_from_assertion_log(child_provider, test_log)
send_count = int(child_block["sendCount"])
if send_count > self.event["position"]:
found_log = test_log
right = mid - 1
else:
left = mid + 1
if not found_log:
# No logs, fallback
return (
child_chain.confirmPeriodBlocks + ASSERTION_CREATED_PADDING + ASSERTION_CONFIRMED_PADDING + latest_block
)
earliest_node_with_exit = found_log["event"]["nodeNum"]
node = rollup_contract.functions.getNode(earliest_node_with_exit).call()
node = format_contract_output(rollup_contract, "getNode", node)
return node["deadlineBlock"] + ASSERTION_CONFIRMED_PADDING
[docs]class ChildToParentMessageWriterNitro(ChildToParentMessageReaderNitro):
"""
Read+Write access for nitro Child->Parent messages.
Replaces L2ToL1MessageWriterNitro.
"""
[docs] def __init__(self, parent_signer: SignerOrProvider, event: CaseDict, parent_provider: None = None) -> None:
super().__init__(parent_provider if parent_provider else parent_signer.provider, event)
self.parent_signer = parent_signer
[docs] def execute(self, child_provider: web3.main.Web3, overrides: None = None) -> AttributeDict:
"""
Executes the Child->Parent message on the parent chain once the outbox entry is confirmed.
Throws if message is not yet CONFIRMED.
Args:
child_provider: Web3 provider for the child chain
overrides: Optional transaction overrides
Returns:
The transaction receipt
"""
current_status = self.status(child_provider)
if current_status != ChildToParentMessageStatus.CONFIRMED:
raise ArbSdkError(
f"Cannot execute message. Status is: {current_status} but must be {ChildToParentMessageStatus.CONFIRMED}."
)
proof = self.get_outbox_proof(child_provider)
child_chain = get_arbitrum_network(child_provider)
outbox_contract = load_contract(
provider=self.parent_signer.provider,
contract_name="Outbox",
address=child_chain.ethBridge.outbox,
)
if overrides is None:
overrides = {}
if "from" not in overrides:
overrides["from"] = self.parent_signer.account.address
execute_tx = outbox_contract.functions.executeTransaction(
proof,
self.event["position"],
self.event["caller"],
self.event["destination"],
self.event["arbBlockNum"],
self.event["ethBlockNum"],
self.event["timestamp"],
self.event["callvalue"],
self.event["data"],
)
if "nonce" not in overrides:
overrides["nonce"] = self.parent_signer.get_nonce()
if "chainId" not in overrides:
overrides["chainId"] = self.parent_signer.provider.eth.chain_id
tx = execute_tx.build_transaction(overrides)
if "gas" not in tx:
gas_estimate = self.parent_signer.provider.eth.estimate_gas(tx)
tx["gas"] = gas_estimate
if "gasPrice" not in tx:
if "maxPriorityFeePerGas" in tx or "maxFeePerGas" in tx:
pass
else:
tx["gasPrice"] = self.parent_signer.provider.eth.gas_price
signed_tx = self.parent_signer.account.sign_transaction(tx)
tx_hash = self.parent_signer.provider.eth.send_raw_transaction(signed_tx.rawTransaction)
tx_receipt = self.parent_signer.provider.eth.wait_for_transaction_receipt(tx_hash)
return tx_receipt