Skip to content

Commit 54b2cce

Browse files
add some kind of notification to clean up after error in subscriber thread
1 parent 6f2eb88 commit 54b2cce

4 files changed

Lines changed: 41 additions & 18 deletions

File tree

plugins/helix/include/helix/helix.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ namespace hal
4848
class Helix
4949
{
5050
public:
51-
static std::string channel;
51+
static const std::string channel;
5252

5353
Helix();
5454

plugins/helix/include/helix/plugin_helix.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ namespace hal
7373

7474
void on_unload() override;
7575

76-
helix::Helix *get_helix();
76+
helix::Helix *get_helix() const;
7777

7878
private:
7979
GuiExtensionHelix *m_gui_extension;

plugins/helix/src/helix.cc

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <errno.h>
1010
#include <event2/event.h>
1111
#include <event2/thread.h>
12+
#include <future>
1213
#include <hiredis/adapters/libevent.h>
1314
#include <hiredis/async.h>
1415
#include <hiredis/hiredis.h>
@@ -31,7 +32,7 @@ namespace hal
3132
log_error( "helix", "could not connect to redis instance (async): {}", ctx->errstr );
3233
return;
3334
}
34-
log_debug( "helix", "connected to redis instance (async)" );
35+
log_info( "helix", "connected to redis instance (async)" );
3536
}
3637

3738
void disconnect_callback( const redisAsyncContext *ctx, int status )
@@ -42,7 +43,7 @@ namespace hal
4243
}
4344
else
4445
{
45-
log_debug( "helix", "disconnected from redis instance (async)" );
46+
log_info( "helix", "disconnected from redis instance (async)" );
4647
}
4748
}
4849

@@ -116,23 +117,27 @@ namespace hal
116117
void subscriber( const Netlist *netlist,
117118
redisAsyncContext *ctx,
118119
const std::vector<std::string> &channels,
119-
struct event_base *base )
120+
struct event_base *base,
121+
std::promise<bool> subscribe_promise )
120122
{
121123
if( netlist == nullptr )
122124
{
123125
log_error( "helix", "no netlist provided" );
126+
subscribe_promise.set_value( false );
124127
return;
125128
}
126129

127130
if( ctx == nullptr )
128131
{
129132
log_error( "helix", "no redis context provided" );
133+
subscribe_promise.set_value( false );
130134
return;
131135
}
132136

133137
if( base == nullptr )
134138
{
135139
log_error( "helix", "no event base provided" );
140+
subscribe_promise.set_value( false );
136141
return;
137142
}
138143

@@ -142,10 +147,13 @@ namespace hal
142147
!= REDIS_OK )
143148
{
144149
log_error( "helix", "async subscribe to channels {} failed", subscribe_command.substr( 10 ) );
150+
subscribe_promise.set_value( false );
145151
return;
146152
}
147153

148-
// TODO: signal main thread to cleanup resources if errors occurr
154+
// I just assume that the dispath won't fail
155+
subscribe_promise.set_value( true );
156+
149157
if( event_base_dispatch( base ) != 0 )
150158
{
151159
char *errstr = strerror( errno );
@@ -156,7 +164,7 @@ namespace hal
156164
} // namespace
157165

158166
Helix *Helix::inst = nullptr;
159-
std::string Helix::channel = "hal";
167+
const std::string Helix::channel = "hal";
160168

161169
Helix *Helix::instance()
162170
{
@@ -223,7 +231,9 @@ namespace hal
223231
{
224232
log_error(
225233
"helix", "redisAsyncConnect: {}", ( m_sctx ? m_sctx->errstr : "context allocation failed" ) );
226-
goto error;
234+
redisAsyncDisconnect( m_sctx );
235+
event_base_free( m_base );
236+
return;
227237
}
228238

229239
redisAsyncSetConnectCallback( m_sctx, connect_callback );
@@ -233,27 +243,40 @@ namespace hal
233243
if( m_pctx == nullptr || m_pctx->err )
234244
{
235245
log_error( "helix", "redisConnect: {}", ( m_pctx ? m_pctx->errstr : "context allocation failed" ) );
236-
goto error;
246+
redisAsyncDisconnect( m_sctx );
247+
event_base_free( m_base );
248+
redisFree( m_pctx );
249+
return;
237250
}
238251

239252
log_info( "helix", "connected to redis instance at {}:{}", host, port );
240253

241254
if( redisLibeventAttach( m_sctx, m_base ) != REDIS_OK )
242255
{
243256
log_error( "helix", "redisLibeventAttach: {}", m_sctx->errstr );
244-
goto error;
257+
redisAsyncDisconnect( m_sctx );
258+
event_base_free( m_base );
259+
redisFree( m_pctx );
260+
return;
261+
}
262+
263+
std::promise<bool> subscribe_promise;
264+
std::future<bool> subscribe_future = subscribe_promise.get_future();
265+
266+
m_subscriber = std::thread( subscriber, netlist, m_sctx, channels, m_base, std::move( subscribe_promise ) );
267+
268+
const bool success = subscribe_future.get();
269+
if( !success )
270+
{
271+
redisAsyncDisconnect( m_sctx );
272+
event_base_free( m_base );
273+
redisFree( m_pctx );
274+
return;
245275
}
246276

247-
m_subscriber = std::thread( subscriber, netlist, m_sctx, channels, m_base );
248277
m_is_running = true;
249278

250279
log_info( "helix", "started subscriber thread" );
251-
return;
252-
253-
error:
254-
redisAsyncDisconnect( m_sctx );
255-
event_base_free( m_base );
256-
redisFree( m_pctx );
257280
}
258281

259282
void Helix::stop()

plugins/helix/src/plugin_helix.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ namespace hal
6666
return retval;
6767
}
6868

69-
helix::Helix *HelixPlugin::get_helix()
69+
helix::Helix *HelixPlugin::get_helix() const
7070
{
7171
return m_helix;
7272
}

0 commit comments

Comments
 (0)