Skip to content

Commit b29eb37

Browse files
committed
📌 Nodepp | worker.h update | V1.4.2 📌
1 parent 89982e2 commit b29eb37

6 files changed

Lines changed: 180 additions & 295 deletions

File tree

include/nodepp/evloop.h

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,24 @@
1616

1717
namespace nodepp { namespace process {
1818

19-
kernel_t& NODEPP_EV_LOOP(){ thread_local static kernel_t evloop; return evloop; }
19+
kernel_t& NODEPP_EV_LOOP(){ thread_local static kernel_t out; return out; }
20+
invoke_t& NODEPP_INVOKE (){ thread_local static invoke_t out; return out; }
2021

2122
/*─······································································─*/
2223

2324
template< class... T >
24-
void await( const T&... args ){ while(NODEPP_EV_LOOP().await( args... )==1){/*unused*/} }
25+
void revoke( const T&... args ){ NODEPP_INVOKE().off( args... ); }
26+
27+
template< class... T >
28+
int call( const T&... args ){ return NODEPP_INVOKE().emit( args... ); }
29+
30+
template< class... T >
31+
string_t invoke( const T&... args ){ return NODEPP_INVOKE().add( args... ); }
32+
33+
/*─······································································─*/
2534

2635
template< class... T >
27-
ptr_t<task_t> foop( const T&... args ){ return NODEPP_EV_LOOP().loop_add( args... ); }
36+
void await( const T&... args ){ while(NODEPP_EV_LOOP().await( args... )==1){/*unused*/} }
2837

2938
template< class... T >
3039
ptr_t<task_t> loop( const T&... args ){ return NODEPP_EV_LOOP().loop_add( args... ); }

include/nodepp/import.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
#include "os.h"
6565
#include "query.h"
6666
#include "kernel.h"
67+
#include "invoke.h"
6768

6869
/*────────────────────────────────────────────────────────────────────────────*/
6970

include/nodepp/invoke.h

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2023 The Nodepp Project Authors. All Rights Reserved.
3+
*
4+
* Licensed under the MIT (the "License"). You may not use
5+
* this file except in compliance with the License. You can obtain a copy
6+
* in the file LICENSE in the source distribution or at
7+
* https://github.com/NodeppOfficial/nodepp/blob/main/LICENSE
8+
*/
9+
10+
/*────────────────────────────────────────────────────────────────────────────*/
11+
12+
#ifndef NODEPP_DMA_INVOKE
13+
#define NODEPP_DMA_INVOKE
14+
15+
/*────────────────────────────────────────────────────────────────────────────*/
16+
17+
namespace nodepp { class invoke_t {
18+
protected:
19+
20+
using NODE_CLB = function_t<int,any_t>;
21+
struct NODE {
22+
queue_t<NODE_CLB> queue;
23+
}; ptr_t<NODE> obj;
24+
25+
public:
26+
27+
~invoke_t() { if( obj.count() > 1 ){ return; } free(); }
28+
invoke_t() : obj( new NODE() ){}
29+
30+
/*─······································································─*/
31+
32+
ulong size () const noexcept { return obj->queue.size (); }
33+
void clear() const noexcept { /*--*/ obj->queue.clear(); }
34+
void free () const noexcept { /*--*/ obj->queue.clear(); }
35+
36+
/*─······································································─*/
37+
38+
int emit( string_t address, any_t value ) const noexcept {
39+
if( address.empty() ) /*-------------*/ { return -1; }
40+
auto mem = obj->queue.as( string::to_addr( address ) );
41+
if( mem == nullptr ) /*--------------*/ { return -1; }
42+
int c = mem->data.emit( value );
43+
if( c==-1 ){ off( address ); }
44+
return c; }
45+
46+
int off( string_t address ) const noexcept {
47+
if( address.empty() ) /*-------------*/ { return -1; }
48+
auto mem = obj->queue.as( string::to_addr( address ) );
49+
if( mem == nullptr ) /*--------------*/ { return -1; }
50+
memset( address.get(), 0, address.size() );
51+
obj->queue.erase(mem); return 1;
52+
}
53+
54+
string_t add( NODE_CLB callback ) const noexcept {
55+
obj->queue.push ( callback );
56+
void* ID = obj->queue.last();
57+
return string::to_string( ID ); }
58+
59+
}; }
60+
61+
/*────────────────────────────────────────────────────────────────────────────*/
62+
63+
#endif
64+
65+
/*────────────────────────────────────────────────────────────────────────────*/

include/nodepp/posix/worker.h

Lines changed: 34 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,12 @@
1414

1515
/*────────────────────────────────────────────────────────────────────────────*/
1616

17-
#ifndef NODEPP_POLL_NPOLL
18-
19-
/*────────────────────────────────────────────────────────────────────────────*/
20-
2117
namespace nodepp { class worker_t {
2218
private:
2319

20+
mutex_t& get_mutex () const noexcept { static mutex_t out; return out; }
21+
invoke_t& get_invoker() const noexcept { static invoke_t out; return out; }
22+
2423
enum STATE {
2524
WK_STATE_UNKNOWN = 0b00000000,
2625
WK_STATE_OPEN = 0b00000001,
@@ -31,28 +30,25 @@ namespace nodepp { class worker_t {
3130
protected:
3231

3332
struct NODE {
34-
pthread_t id;
33+
pthread_t id;
34+
string_t addr;
3535
atomic_t<char> state;
36-
ptr_t<kernel_t> krn;
3736
function_t<int> cb;
3837
}; ptr_t<NODE> obj;
3938

4039
static void* callback( void* arg ){
40+
4141
auto self = type::cast<worker_t>(arg);
4242
self->obj->state=STATE::WK_STATE_OPEN;
4343

4444
while( !self->is_closed() && self->obj->cb()>=0 ){
4545
auto info = coroutine::getno();
4646

4747
if( info.delay>0 ){
48-
worker::delay( info.delay );
49-
} else {
50-
worker::yield();
51-
}}
48+
worker::delay( info.delay );
49+
} else { worker::yield(); }}
5250

53-
self->obj->state = STATE::WK_STATE_CLOSE;
54-
self->obj->krn->emit(); /**/ delete self;
55-
worker::exit(); return nullptr; }
51+
self->free(); worker::exit(); return nullptr; }
5652

5753
public:
5854

@@ -64,13 +60,18 @@ namespace nodepp { class worker_t {
6460

6561
/*─······································································─*/
6662

63+
~worker_t() noexcept { if( obj.count()>1 ){ return; } free(); }
64+
6765
worker_t() noexcept : obj( new NODE ) {}
6866

69-
~worker_t() noexcept { if( obj.count()>1 ){ return; } free(); }
67+
void free() const noexcept {
68+
if( obj->state.get() == 0x00 ) /*----------*/ { return; }
69+
if( obj->state.get() & STATE::WK_STATE_CLOSE ){ return; }
70+
get_invoker().emit( obj->addr, nullptr );
71+
}
7072

7173
/*─······································································─*/
7274

73-
void free() const noexcept { obj->state = STATE::WK_STATE_AWAIT; }
7475
void off() const noexcept { obj->state = STATE::WK_STATE_AWAIT; }
7576
void close() const noexcept { obj->state = STATE::WK_STATE_AWAIT; }
7677

@@ -83,12 +84,17 @@ namespace nodepp { class worker_t {
8384

8485
/*─······································································─*/
8586

86-
int emit() const noexcept {
87-
if( obj->state != 0x00 ){ return 0; }
87+
int emit() const noexcept { if( obj->state != 0x00 ){ return 0; }
8888

89-
obj->krn = type::bind( process::NODEPP_EV_LOOP() );
89+
auto krn = type::bind( process::NODEPP_EV_LOOP() );
90+
auto self= type::bind( this );
9091

91-
auto pth = pthread_create( &obj->id, NULL, &callback, (void*) new worker_t(*this) );
92+
obj->addr= get_invoker().add([=]( any_t ){ self->get_mutex().lock([=](){
93+
self->obj->state = STATE::WK_STATE_CLOSE;
94+
self->get_invoker().off( self->obj->addr );
95+
krn->emit(); }); return -1; });
96+
97+
auto pth = pthread_create( &obj->id, NULL, &callback, (void*) &self );
9298
if ( pth!= 0 ){ return -1; } pthread_detach( obj->id );
9399

94100
while( obj->state == 0x00 ) { /*unused*/ }
@@ -101,133 +107,28 @@ namespace nodepp { class worker_t {
101107

102108
/*─······································································─*/
103109

104-
int await() const noexcept { if( obj->state != 0x00 ){ return 0; }
105-
106-
obj->krn = type::bind( process::NODEPP_EV_LOOP() );
107-
108-
auto pth = pthread_create( &obj->id, NULL, &callback, (void*) new worker_t(*this) );
109-
if ( pth!= 0 ){ return -1; } pthread_detach( obj->id );
110-
111-
while( obj->state.get() ==0x00 ) /*----------*/ { /*-- unused --*/ }
112-
while( obj->state.get() & STATE::WK_STATE_OPEN ){ process::next(); }
110+
int await() const noexcept { if( obj->state != 0x00 ){ return 0; }
113111

114-
return 1; }
115-
116-
};}
117-
118-
/*────────────────────────────────────────────────────────────────────────────*/
119-
120-
#else
121-
122-
/*────────────────────────────────────────────────────────────────────────────*/
123-
124-
namespace nodepp { class worker_t {
125-
private:
126-
127-
enum STATE {
128-
WK_STATE_UNKNOWN = 0b00000000,
129-
WK_STATE_OPEN = 0b00000001,
130-
WK_STATE_CLOSE = 0b00000010,
131-
WK_STATE_AWAIT = 0b00000111,
132-
};
133-
134-
protected:
135-
136-
struct NODE {
137-
pthread_t id;
138-
atomic_t<char> state;
139-
ptr_t<kernel_t> krn;
140-
function_t<int> cb;
141-
}; ptr_t<NODE> obj;
112+
auto krn = type::bind( process::NODEPP_EV_LOOP() );
113+
auto self= type::bind( this );
142114

143-
static void* callback( void* arg ){
144-
auto self = type::cast<worker_t>(arg);
145-
self->obj->state=STATE::WK_STATE_OPEN;
115+
obj->addr= get_invoker().add([=]( any_t ){ self->get_mutex().lock([=](){
116+
self->obj->state = STATE::WK_STATE_CLOSE;
117+
self->get_invoker().off( self->obj->addr );
118+
krn->emit(); }); return -1; });
146119

147-
while( !self->is_closed() && self->obj->cb()>=0 ){
148-
auto info = coroutine::getno();
149-
150-
if( info.delay>0 ){
151-
worker::delay( info.delay );
152-
} else {
153-
worker::yield();
154-
}}
155-
156-
self->obj->state = STATE::WK_STATE_CLOSE;
157-
worker::exit(); return nullptr; }
158-
159-
public:
160-
161-
template< class T, class... V >
162-
worker_t( T cb, const V&... arg ) noexcept : obj( new NODE() ){
163-
auto clb = type::bind(cb);
164-
obj->cb = function_t<int>([=](){ return (*clb)(arg...); });
165-
}
166-
167-
/*─······································································─*/
168-
169-
worker_t() noexcept : obj( new NODE ) {}
170-
171-
~worker_t() noexcept { if( obj.count()>1 ){ return; } free(); }
172-
173-
/*─······································································─*/
174-
175-
void free() const noexcept { obj->state = STATE::WK_STATE_AWAIT; }
176-
void off() const noexcept { obj->state = STATE::WK_STATE_AWAIT; }
177-
void close() const noexcept { obj->state = STATE::WK_STATE_AWAIT; }
178-
179-
/*─······································································─*/
180-
181-
bool is_closed() const noexcept {
182-
char x = obj->state.get();
183-
return ( x & STATE::WK_STATE_CLOSE ) || x==0x00 ;
184-
}
185-
186-
/*─······································································─*/
187-
188-
int emit() const noexcept {
189-
190-
if( obj->state != 0x00 ) { return 0; } auto self = type::bind( this );
191120
auto pth = pthread_create( &obj->id, NULL, &callback, (void*) &self );
192121
if ( pth!= 0 ){ return -1; } pthread_detach( obj->id );
193-
194-
process::add( coroutine::add( COROUTINE(){
195-
coBegin
196-
while( self->obj->state.get() ==0x00 )/*-*/{ coNext; }
197-
while( self->obj->state.get() & STATE::WK_STATE_OPEN )
198-
{ coDelay( 1000 ); }
199-
coFinish
200-
}));
201-
202-
return 1; }
203-
204-
/*─······································································─*/
205122

206-
int add() const noexcept { return emit(); }
207-
208-
/*─······································································─*/
209-
210-
int await() const noexcept {
211-
212-
if( obj->state != 0x00 ) { return 0; } auto self = type::bind( this );
213-
auto pth = pthread_create( &obj->id, NULL, &callback, (void*) &self );
214-
if ( pth!= 0 ){ return -1; } pthread_detach( obj->id );
215-
216-
process::await( coroutine::add( COROUTINE(){
217-
coBegin
218-
while( self->obj->state.get() ==0x00 )/*-*/{ coNext; }
219-
while( self->obj->state.get() & STATE::WK_STATE_OPEN )
220-
{ coDelay( 1000 ); }
221-
coFinish
222-
}));
123+
while( obj->state.get() ==0x00 ) /*----------*/ { /*-- unused --*/ }
124+
while( obj->state.get() & STATE::WK_STATE_OPEN ){ process::next(); }
223125

224126
return 1; }
225127

226128
};}
227129

228130
/*────────────────────────────────────────────────────────────────────────────*/
229131

230-
#endif
231132
#endif
232133

233134
/*────────────────────────────────────────────────────────────────────────────*/

0 commit comments

Comments
 (0)