Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/coverage-twmq.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:

# Run coverage with tarpaulin
- name: Run coverage
run: cargo tarpaulin -p twmq --skip-clean --out Xml --out Html --output-dir coverage --exclude-files "aa-core/*" --exclude-files "core/*" --exclude-files "server/*" --exclude-files "thirdweb-core/*" --exclude-files "executors/*"
run: cargo tarpaulin -p twmq --skip-clean --timeout 300 --out Xml --out Html --output-dir coverage --exclude-files "aa-core/*" --exclude-files "core/*" --exclude-files "server/*" --exclude-files "thirdweb-core/*" --exclude-files "executors/*"

# Upload coverage to Codecov
# TODO: Uncomment once we have open-sourced the repo
Expand Down
14 changes: 2 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,7 @@ config = "0.15.11"
aws-arn = "0.3.1"

# Redis
redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager", "cluster", "cluster-async", "tls-rustls", "tokio-rustls-comp"] }
redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager"] }

# Dev dependencies
criterion = { version = "0.6", features = ["html_reports", "async_tokio"] }

# Rustls
#
# NOTE: rustls 0.23 requires selecting exactly one process-wide crypto provider
# (features: `ring` or `aws_lc_rs` / `aws-lc-rs`). Some dependency graphs (e.g. via
# redis-rs' rustls integration) can end up with *no* provider enabled, which causes a
# runtime panic when building TLS client/server configs.
#
# We explicitly enable the `ring` provider here to make TLS work reliably.
rustls = { version = "0.23.32", default-features = false, features = ["std", "ring"] }
criterion = { version = "0.6", features = ["html_reports", "async_tokio"] }
9 changes: 0 additions & 9 deletions core/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,6 @@ impl RpcCredentials {

Ok(header_map)
}

pub fn client_id_for_logs(&self) -> Option<&str> {
match self {
RpcCredentials::Thirdweb(ThirdwebAuth::ClientIdServiceKey(creds)) => {
Some(&creds.client_id)
}
RpcCredentials::Thirdweb(ThirdwebAuth::SecretKey(_)) => None,
}
}
}

pub trait Chain: Send + Sync {
Expand Down
53 changes: 6 additions & 47 deletions executors/src/eip7702_executor/confirm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ use crate::{
},
};

const EIP7702_CONFIRM_QUEUE_ID: &str = "eip7702_confirm";

// --- Job Payload ---
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -170,7 +168,7 @@ where
type ErrorData = Eip7702ConfirmationError;
type JobData = Eip7702ConfirmationJobData;

#[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, chain_id = job.job.data.chain_id, client_id = ?job.job.data.rpc_credentials.client_id_for_logs(), queue_id = EIP7702_CONFIRM_QUEUE_ID, stage = Self::stage_name(), executor = Self::executor_name()))]
#[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, stage = Self::stage_name(), executor = Self::executor_name()))]
async fn process(
&self,
job: &BorrowedJob<Self::JobData>,
Expand Down Expand Up @@ -204,14 +202,7 @@ where
.await
.map_err(|e| {
tracing::error!(
transaction_id = %job_data.transaction_id,
chain_id = job_data.chain_id,
client_id = job_data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_CONFIRM_QUEUE_ID,
bundler_transaction_id = %job_data.bundler_transaction_id,
bundler_transaction_id = job_data.bundler_transaction_id,
sender_details = ?job_data.sender_details,
error = ?e,
"Failed to get transaction hash from bundler"
Expand Down Expand Up @@ -330,15 +321,7 @@ where
// Send webhook
if let Err(e) = self.queue_success_webhook(job, success_data, tx) {
tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_CONFIRM_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?e,
"Failed to queue success webhook"
);
Expand All @@ -363,15 +346,7 @@ where
if should_queue_webhook {
if let Err(e) = self.queue_nack_webhook(job, nack_data, tx) {
tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_CONFIRM_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?e,
"Failed to queue nack webhook"
);
Expand All @@ -395,30 +370,14 @@ where
.add_remove_command(tx.pipeline(), &job.job.data.transaction_id);

tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_CONFIRM_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?fail_data.error,
"EIP-7702 confirmation job failed"
);

if let Err(e) = self.queue_fail_webhook(job, fail_data, tx) {
tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_CONFIRM_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?e,
"Failed to queue fail webhook"
);
Expand Down
55 changes: 6 additions & 49 deletions executors/src/eip7702_executor/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ use crate::{

use super::confirm::{Eip7702ConfirmationHandler, Eip7702ConfirmationJobData};

const EIP7702_SEND_QUEUE_ID: &str = "eip7702_send";
const EIP7702_CONFIRM_QUEUE_ID: &str = "eip7702_confirm";

// --- Job Payload ---
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -178,7 +175,7 @@ where
type ErrorData = Eip7702SendError;
type JobData = Eip7702SendJobData;

#[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, chain_id = job.job.data.chain_id, client_id = ?job.job.data.rpc_credentials.client_id_for_logs(), queue_id = EIP7702_SEND_QUEUE_ID, stage = Self::stage_name(), executor = Self::executor_name()))]
#[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, stage = Self::stage_name(), executor = Self::executor_name()))]
async fn process(
&self,
job: &BorrowedJob<Self::JobData>,
Expand Down Expand Up @@ -389,15 +386,7 @@ where

if let Err(e) = tx.queue_job(confirmation_job) {
tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_CONFIRM_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?e,
"Failed to enqueue confirmation job"
);
Expand All @@ -406,15 +395,7 @@ where
// Send webhook
if let Err(e) = self.queue_success_webhook(job, success_data, tx) {
tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_SEND_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?e,
"Failed to queue success webhook"
);
Expand All @@ -430,15 +411,7 @@ where
// Don't modify transaction registry on NACK - job will be retried
if let Err(e) = self.queue_nack_webhook(job, nack_data, tx) {
tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_SEND_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?e,
"Failed to queue nack webhook"
);
Expand All @@ -456,30 +429,14 @@ where
.add_remove_command(tx.pipeline(), &job.job.data.transaction_id);

tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_SEND_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?fail_data.error,
"EIP-7702 send job failed"
);

if let Err(e) = self.queue_fail_webhook(job, fail_data, tx) {
tracing::error!(
transaction_id = %job.job.data.transaction_id,
chain_id = job.job.data.chain_id,
client_id = job
.job
.data
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EIP7702_SEND_QUEUE_ID,
transaction_id = job.job.data.transaction_id,
error = ?e,
"Failed to queue fail webhook"
);
Expand Down
28 changes: 4 additions & 24 deletions executors/src/eoa/store/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use alloy::{
consensus::{Signed, TypedTransaction},
primitives::Address,
};
use twmq::redis::{AsyncCommands, Pipeline};
use twmq::redis::cluster_async::ClusterConnection;
use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager};

use crate::{
eoa::{
Expand All @@ -31,7 +30,6 @@ use crate::{

const MAX_RETRIES: u32 = 10;
const RETRY_BASE_DELAY_MS: u64 = 10;
const EOA_QUEUE_ID: &str = "eoa_executor";

pub trait SafeRedisTransaction: Send + Sync {
type ValidationData;
Expand All @@ -45,7 +43,7 @@ pub trait SafeRedisTransaction: Send + Sync {
) -> Self::OperationResult;
fn validation(
&self,
conn: &mut ClusterConnection,
conn: &mut ConnectionManager,
store: &EoaExecutorStore,
) -> impl Future<Output = Result<Self::ValidationData, TransactionStoreError>> + Send;
fn watch_keys(&self) -> Vec<String>;
Expand Down Expand Up @@ -614,18 +612,7 @@ impl AtomicEoaExecutorStore {
&mut tx_context,
webhook_queue.clone(),
) {
tracing::error!(
transaction_id = %pending_transaction.transaction_id,
chain_id = pending_transaction.user_request.chain_id,
client_id = pending_transaction
.user_request
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EOA_QUEUE_ID,
"Failed to queue webhook for fail: {}",
e
);
tracing::error!("Failed to queue webhook for fail: {}", e);
}
}

Expand Down Expand Up @@ -707,13 +694,6 @@ impl AtomicEoaExecutorStore {
) {
tracing::error!(
transaction_id = %pending_transaction.transaction_id,
chain_id = pending_transaction.user_request.chain_id,
client_id = pending_transaction
.user_request
.rpc_credentials
.client_id_for_logs()
.unwrap_or("unknown"),
queue_id = EOA_QUEUE_ID,
error = ?e,
"Failed to queue webhook for batch fail"
);
Expand Down Expand Up @@ -835,7 +815,7 @@ impl SafeRedisTransaction for ResetNoncesTransaction<'_> {

async fn validation(
&self,
_conn: &mut ClusterConnection,
_conn: &mut ConnectionManager,
store: &EoaExecutorStore,
) -> Result<Self::ValidationData, TransactionStoreError> {
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
Expand Down
Loading
Loading