Skip to content

Commit 3005269

Browse files
committed
11.10.4.7
1 parent 549ac3a commit 3005269

3 files changed

Lines changed: 208 additions & 2 deletions

File tree

src/issue_delivery_worker.rs

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
use std::time::{self, Duration};
2+
use crate::domain::SubscriberEmail;
3+
use crate::email_client::{self, EmailClient};
4+
use crate::{configuration::Settings, startup::get_connection_pool};
5+
6+
use sqlx::{Executor, PgPool, Postgres, Transaction};
7+
use tracing::{field::display, Span};
8+
use uuid::Uuid;
9+
10+
enum ExecutionOutcome {
11+
TaskCompleted,
12+
EmptyQueue,
13+
}
14+
15+
#[tracing::instrument(
16+
skip_all,
17+
fields(
18+
newsletter_issue_id=tracing::field::Empty,
19+
subscriber_email=tracing::field::Empty,
20+
),
21+
err
22+
)]
23+
async fn try_execute_task(pool: &PgPool, email_client: &EmailClient) -> Result<ExecutionOutcome, anyhow::Error> {
24+
let task = dequeue_task(pool).await?;
25+
if task.is_none() {
26+
return Ok(ExecutionOutcome::EmptyQueue)
27+
}
28+
let (transaction, issue_id, email) = task.unwrap();
29+
Span::current()
30+
.record("newsletter_issue_id", &display(issue_id))
31+
.record("subscriber_email", &display(&email));
32+
match SubscriberEmail::parse(email.clone()) {
33+
Ok(email) => {
34+
let issue = get_issue(pool, issue_id).await?;
35+
if let Err(e) = email_client
36+
.send_email(
37+
&email,
38+
&issue.title,
39+
&issue.html_content,
40+
&issue.text_content,
41+
)
42+
.await
43+
{
44+
tracing::error!(
45+
error.cause_chain = ?e,
46+
error.message = %e,
47+
"Failed to deliver issue to a confirmed subscriber. \
48+
Skipping.",
49+
);
50+
}
51+
}
52+
Err(e) => {
53+
tracing::error!(
54+
error.cause_chain = ?e,
55+
error.message = %e,
56+
"Skipping a confirmed subscriber. \
57+
Their stored contact details are invalid.",
58+
);
59+
}
60+
}
61+
delete_task(transaction, issue_id, &email).await?;
62+
Ok(ExecutionOutcome::TaskCompleted)
63+
}
64+
65+
type PgTransaction = Transaction<'static, Postgres>;
66+
67+
#[tracing::instrument(skip_all)]
68+
async fn dequeue_task(
69+
pool: &PgPool,
70+
) -> Result<Option<(PgTransaction, Uuid, String)>, anyhow::Error> {
71+
let mut transaction = pool.begin().await?;
72+
let r = sqlx::query!(
73+
r#"
74+
SELECT newsletter_issue_id, subscriber_email
75+
FROM issue_delivery_queue
76+
FOR UPDATE
77+
SKIP LOCKED
78+
LIMIT 1
79+
"#,
80+
)
81+
.fetch_optional(&mut *transaction)
82+
.await?;
83+
if let Some(r) = r {
84+
Ok(Some((
85+
transaction,
86+
r.newsletter_issue_id,
87+
r.subscriber_email,
88+
)))
89+
} else {
90+
Ok(None)
91+
}
92+
}
93+
94+
async fn delete_task(
95+
mut transaction: PgTransaction,
96+
issue_id: Uuid,
97+
email: &str,
98+
) -> Result<(), anyhow::Error> {
99+
let query = sqlx::query!(
100+
r#"
101+
DELETE FROM issue_delivery_queue
102+
WHERE
103+
newsletter_issue_id = $1 AND
104+
subscriber_email = $2
105+
"#,
106+
issue_id,
107+
email
108+
);
109+
transaction.execute(query).await?;
110+
transaction.commit().await?;
111+
Ok(())
112+
}
113+
114+
struct NewsletterIssue {
115+
title: String,
116+
text_content: String,
117+
html_content: String,
118+
}
119+
120+
#[tracing::instrument(skip_all)]
121+
async fn get_issue(pool: &PgPool, issue_id: Uuid) -> Result<NewsletterIssue, anyhow::Error> {
122+
let issue = sqlx::query_as!(
123+
NewsletterIssue,
124+
r#"
125+
SELECT title, text_content, html_content
126+
FROM newsletter_issues
127+
WHERE
128+
newsletter_issue_id = $1
129+
"#,
130+
issue_id
131+
)
132+
.fetch_one(pool)
133+
.await?;
134+
Ok(issue)
135+
}
136+
137+
async fn worker_loop(
138+
pool: PgPool,
139+
email_client: EmailClient
140+
) -> Result<(), anyhow::Error> {
141+
loop {
142+
match try_execute_task(&pool, &email_client).await {
143+
Ok(ExecutionOutcome::EmptyQueue) => {
144+
tokio::time::sleep(Duration::from_secs(10)).await;
145+
}
146+
Err(_) => {
147+
tokio::time::sleep(Duration::from_secs(1)).await;
148+
}
149+
Ok(ExecutionOutcome::TaskCompleted) => {}
150+
}
151+
}
152+
}
153+
154+
pub async fn run_worker_until_stopped(
155+
configuration: Settings
156+
) -> Result<(), anyhow::Error> {
157+
let connection_pool = get_connection_pool(&configuration.database);
158+
159+
let sender_email = configuration
160+
.email_client
161+
.sender()
162+
.expect("Invalid sender email address.");
163+
let timeout = configuration.email_client.timeout();
164+
let email_client = EmailClient::new(
165+
configuration.email_client.base_url,
166+
sender_email,
167+
configuration.email_client.authorization_token,
168+
timeout
169+
);
170+
worker_loop(connection_pool, email_client).await
171+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ pub mod configuration;
33
pub mod domain;
44
pub mod email_client;
55
pub mod idempotency;
6+
pub mod issue_delivery_worker;
67
pub mod routes;
78
pub mod session_state;
89
pub mod startup;

src/main.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
use std::fmt::{Debug, Display};
2+
use tokio::task::JoinError;
13
use zero2prod::configuration::get_configuration;
4+
use zero2prod::issue_delivery_worker::run_worker_until_stopped;
25
use zero2prod::startup::Application;
36
use zero2prod::telemetry::{get_subscriber, init_subscriber};
47

@@ -8,7 +11,38 @@ async fn main() -> anyhow::Result<()> {
811
init_subscriber(subscriber);
912

1013
let configuration = get_configuration().expect("Failed to read configuration.");
11-
let application = Application::build(configuration).await?;
12-
application.run_until_stopped().await?;
14+
let application = Application::build(configuration.clone()).await?;
15+
let application_task = tokio::spawn(application.run_until_stopped());
16+
let worker_task = tokio::spawn(run_worker_until_stopped(configuration));
17+
18+
tokio::select! {
19+
o = application_task => report_exit("API", o),
20+
o = worker_task => report_exit("Background worker", o),
21+
};
22+
1323
Ok(())
1424
}
25+
26+
fn report_exit(task_name: &str, outcome: Result<Result<(), impl Debug + Display>, JoinError>) {
27+
match outcome {
28+
Ok(Ok(())) => {
29+
tracing::info!("{} has exited", task_name)
30+
}
31+
Ok(Err(e)) => {
32+
tracing::error!(
33+
error.cause_chain = ?e,
34+
error.message = %e,
35+
"{} failed",
36+
task_name
37+
)
38+
}
39+
Err(e) => {
40+
tracing::error!(
41+
error.cause_chain = ?e,
42+
error.message = %e,
43+
"{}' task failed to complete",
44+
task_name
45+
)
46+
}
47+
}
48+
}

0 commit comments

Comments
 (0)