Skip to content

Commit 4e3241f

Browse files
committed
Merge branch 'main_loop with single-threaded websocket'
* Branch commit log: ase/websocket.cc: use single-threaded POSIX socket + main_loop for WebSockets * ase/jsonapi.cc: adjust message() handling for single-threaded WebSockets ase/server.cc: fix websocket tear-down crash in TelemetryPlan, keep it alive via shared_ptr
2 parents 6acb293 + 88b9e45 commit 4e3241f

File tree

3 files changed

+265
-104
lines changed

3 files changed

+265
-104
lines changed

ase/jsonapi.cc

Lines changed: 9 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -81,34 +81,16 @@ class JsonapiConnection : public WebSocketConnection, public CustomDataContainer
8181
{
8282
JsonapiConnectionP conp = std::dynamic_pointer_cast<JsonapiConnection> (shared_from_this());
8383
assert_return (conp);
84-
struct MessageData { // keep arguments alive, even after __func__ returns
85-
ScopedSemaphore sem;
86-
String message, reply;
87-
JsonapiConnectionP conp;
88-
MessageData (JsonapiConnectionP c, const String &m) : message (m), conp (c) {}
89-
};
90-
auto data = std::make_shared<MessageData> (conp, message);
91-
main_jobs += [data, this] () {
92-
if (main_loop->has_quit())
93-
return;
94-
current_message_conection = data->conp;
95-
data->reply = this->handle_jsonipc (data->message);
84+
String reply;
85+
if (!main_loop->has_quit()) {
86+
current_message_conection = conp;
87+
reply = this->handle_jsonipc (message);
9688
current_message_conection = nullptr;
97-
data->sem.post();
98-
};
99-
nickname(); // cache socket nickname for use during errors
100-
// wait with timeout, checking if main loop has quit to avoid deadlock on shutdown
101-
const uint64_t timeout_us = 50 * 1000;
102-
do {
103-
const int ret = data->sem.wait_for (timeout_us);
104-
if (ret == 0)
105-
break; // semaphore was posted, job completed successfully
106-
if (data->reply.empty() && main_loop->has_quit())
107-
data->reply = "{id:0,error:{code:-32601,message:\"Method not found: endpoint shutting down\"}}\n";
108-
} while (data->reply.empty());
109-
// when queueing asynchronously, we have to use WebSocketConnectionP
110-
if (!data->reply.empty())
111-
send_text (data->reply);
89+
}
90+
else
91+
reply = "{id:0,error:{code:-32601,message:\"Method not found: endpoint shutting down\"}}\n";
92+
if (!reply.empty())
93+
send_text (reply);
11294
}
11395
String handle_jsonipc (const std::string &message);
11496
std::vector<JsTrigger> triggers_; // HINT: use unordered_map if this becomes slow

ase/server.cc

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ validate_telemetry_segments (const TelemetrySegmentS &segments, size_t *payloadl
456456
}
457457

458458
ASE_CLASS_DECLS (TelemetryPlan);
459-
class TelemetryPlan {
459+
class TelemetryPlan : public std::enable_shared_from_this<TelemetryPlan> {
460460
public:
461461
int32 interval_ms_ = -1;
462462
LoopID timerid_ = LoopID::INVALID;
@@ -503,7 +503,8 @@ TelemetryPlan::setup (const char *start, size_t payloadlength, const TelemetrySe
503503
{
504504
if (timerid_ != LoopID::INVALID)
505505
main_loop->cancel (timerid_);
506-
auto send_telemetry = [this] () { this->send_telemetry(); return true; };
506+
auto tplan = shared_from_this();
507+
auto send_telemetry = [tplan] () { tplan->send_telemetry(); return true; };
507508
interval_ms_ = interval_ms;
508509
timerid_ = interval_ms <= 0 || segments.empty() ? LoopID::INVALID : main_loop->add (send_telemetry, std::chrono::milliseconds (interval_ms));
509510
}
@@ -524,21 +525,27 @@ TelemetryPlan::setup (const char *start, size_t payloadlength, const TelemetrySe
524525
void
525526
TelemetryPlan::send_telemetry ()
526527
{
528+
// Safety check: don't send if telemetry memory or segments are invalid
529+
if (!telemem_ || segments_.empty() || payload_.empty())
530+
return;
527531
char *data = &payload_[0];
528532
size_t datapos = 0;
529533
for (const auto &seg : segments_) // offsets and lengths were validated earlier
530534
{
531535
memcpy (data + datapos, telemem_ + seg.offset, seg.length);
532536
datapos += seg.length;
533537
}
534-
send_blob_ (payload_);
538+
// send_blob_ handles the case where the connection is closed (returns false)
539+
(void) send_blob_ (payload_);
535540
}
536541

537542
TelemetryPlan::~TelemetryPlan()
538543
{
539544
if (timerid_ != LoopID::INVALID)
540545
{
541-
main_loop->cancel (timerid_);
546+
// Only cancel if the loop is still running
547+
if (!main_loop->has_quit())
548+
main_loop->cancel (timerid_);
542549
timerid_ = LoopID::INVALID;
543550
}
544551
}

0 commit comments

Comments
 (0)