Skip to content

Commit acc1a15

Browse files
committed
Call process function with the trait instance
1 parent d1a23a1 commit acc1a15

2 files changed

Lines changed: 17 additions & 11 deletions

File tree

examples/repeat.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@ impl ParticipantTrait for Repeat {
1818

1919
}
2020

21-
fn process(input: Vec<u8>) -> Result<Vec<u8>, Vec<u8>> {
21+
unsafe impl Sync for Repeat {} // FIXME: figure out how to avoid
22+
23+
fn process(r: &ParticipantTrait, input: Vec<u8>) -> Result<Vec<u8>, Vec<u8>> {
2224
println!("repeat process():");
2325
return Ok(input);
2426
}
2527

2628
fn main() {
27-
let r = Repeat { state: None };
29+
static r: Repeat = Repeat { state: None };
2830
msgflo::participant::main(&r, process);
2931
}

src/participant.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,18 +64,16 @@ impl InfoBuilder {
6464
}
6565
}
6666

67-
68-
type SendFunction = fn(String, Vec<u8>);
69-
pub type ProcessFunction = fn(Vec<u8>) -> Result<Vec<u8>, Vec<u8>>;
70-
71-
pub trait ParticipantTrait {
67+
pub trait ParticipantTrait : Sync {
7268
fn info(&self) -> Info;
7369
// fn process(&self, Vec<u8>) -> Result<Vec<u8>, Vec<u8>>;
7470
}
71+
pub type ProcessFunction = fn(&ParticipantTrait, Vec<u8>) -> Result<Vec<u8>, Vec<u8>>;
7572

7673
struct Participant {
7774
info: Info,
78-
process: ProcessFunction
75+
process: ProcessFunction,
76+
_trait: & 'static ParticipantTrait,
7977
}
8078

8179
struct Connection {
@@ -105,10 +103,15 @@ fn send_discovery(channel: &mut Channel, info: &Info) {
105103

106104
struct PortConsumer {
107105
process: ProcessFunction,
106+
participant: & 'static ParticipantTrait,
108107
portname: String,
109108
outqueue: String, // FIXME: allow sending on any port, also multiple times
110109
}
111110

111+
//unsafe impl Sync for PortConsumer {}
112+
//unsafe impl Send for ParticipantTrait {}
113+
//unsafe impl Sync for ParticipantTrait {}
114+
112115
fn send_out(channel: &mut Channel, exchange: String, data: Vec<u8>) {
113116

114117
let routing_key = "".to_string();
@@ -128,7 +131,7 @@ impl Consumer for PortConsumer {
128131

129132
debug!("calling process()");
130133
let f = self.process;
131-
let res = f(body);
134+
let res = f(self.participant, body);
132135
debug!("process() returned");
133136

134137
if res.is_ok() {
@@ -149,6 +152,7 @@ fn setup_inport(participant: &Participant, port: &Port, connection: &mut Connect
149152

150153
let consumer = PortConsumer {
151154
process: participant.process,
155+
participant: participant._trait,
152156
portname: port.id.to_string(),
153157
outqueue: participant.info.outports[0].queue.to_string(),
154158
};
@@ -271,15 +275,15 @@ fn parse(options: &mut Options) {
271275
// XXX: seems rust-amqp makes program hangs forever if error occurs / channel is borked?
272276
// TODO: pass port info in/out of process()
273277
// TODO: nicer way to declare ports? ideally they are enums not stringly typed?
274-
pub fn main(orig: &ParticipantTrait, func: ProcessFunction) {
278+
pub fn main(orig: & 'static ParticipantTrait, func: ProcessFunction) {
275279

276280
let mut options = Options { .. Default::default() };
277281
parse(&mut options);
278282

279283
let i : Info = orig.info();
280284
let info = normalize_info(&i, &options);
281285

282-
let p = Participant { info: info, process: func }; // XXX: hack
286+
let p = Participant { info: info, process: func, _trait: orig }; // XXX: hack
283287

284288
let mut c = start_participant(&p, &options);
285289
println!("{}({}) started", &p.info.role, &p.info.component);

0 commit comments

Comments
 (0)