Skip to content

Commit acf245e

Browse files
committed
[update] llm_sys add sys_push and sys_pull code
1 parent db8d051 commit acf245e

2 files changed

Lines changed: 159 additions & 44 deletions

File tree

projects/llm_framework/main_sys/src/event_loop.cpp

Lines changed: 158 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919
#include <iostream>
2020
#include <fstream>
2121
#include <thread>
22+
#include <fstream>
2223

2324
#include <dirent.h>
2425
#include <stdio.h>
2526
#include <stdlib.h>
27+
#include <sys/stat.h>
28+
#include <sys/types.h>
2629

2730
#include <list>
2831
#include <map>
@@ -37,6 +40,8 @@
3740
#include <simdjson.h>
3841
#include "hv/ifconfig.h"
3942

43+
#include "StackFlowUtil.h"
44+
4045
void usr_print_error(const std::string &request_id, const std::string &work_id, const std::string &error_msg,
4146
int zmq_out)
4247
{
@@ -270,64 +275,173 @@ int sys_lstask(int com_id, const nlohmann::json &json_obj)
270275
return out;
271276
}
272277

273-
int sys_push(int com_id, const nlohmann::json &json_obj)
278+
bool read_file(const std::string &path, std::vector<char> &data)
274279
{
275-
int out;
276-
nlohmann::json out_body;
277-
out_body["request_id"] = json_obj["request_id"];
278-
out_body["work_id"] = json_obj["work_id"];
279-
out_body["created"] = time(NULL);
280-
out_body["object"] = json_obj["object"];
281-
out_body["error"] = nlohmann::json::parse("{\"code\":-10, \"message\":\"Not available at the moment.\"}");
282-
283-
int stream = false;
284-
int stream_size = 512;
285-
std::string object = json_obj["object"];
286-
std::vector<std::string> object_fragment;
287-
std::string fragment;
288-
for (auto i = object.begin(); i != object.end(); i++) {
289-
if (*i == '.') {
290-
if (fragment.length()) object_fragment.push_back(fragment);
291-
fragment.clear();
292-
} else {
293-
fragment.push_back(*i);
294-
}
280+
std::fstream fs(path, std::ios::in | std::ios::binary);
281+
if (!fs.is_open()) {
282+
return false;
295283
}
296-
if (fragment.length()) object_fragment.push_back(fragment);
284+
fs.seekg(std::ios::end);
285+
auto fs_end = fs.tellg();
286+
fs.seekg(std::ios::beg);
287+
auto fs_beg = fs.tellg();
288+
auto file_size = static_cast<size_t>(fs_end - fs_beg);
289+
auto vector_size = data.size();
290+
data.reserve(vector_size + file_size);
291+
data.insert(data.end(), std::istreambuf_iterator<char>(fs), std::istreambuf_iterator<char>());
292+
fs.close();
293+
return true;
294+
}
297295

298-
if (object_fragment.size() >= 3) stream = object_fragment[2] == "stream" ? true : false;
299-
if (object_fragment.size() >= 4) stream_size = std::stoi(object_fragment[3]);
296+
bool dump_file(const std::string &path, char *data, int size)
297+
{
298+
std::fstream fs(path, std::ios::out | std::ios::binary);
299+
if (!fs.is_open() || fs.fail()) {
300+
fprintf(stderr, "[ERR] cannot open file %s \n", path.c_str());
301+
}
302+
fs.write(data, size);
303+
return true;
304+
}
300305

301-
static std::string bash_str;
306+
// sys.file./opt/data/test.txt
307+
// sys.stream.file./opt/data/test.txt
308+
// sys.base64.file./opt/data/test.bin
309+
// sys.base64.stream.file./opt/data.bin
310+
int sys_push(int com_id, const nlohmann::json &json_obj)
311+
{
312+
int out = 0;
313+
std::string object = json_obj["object"];
314+
bool stream = (object.find("stream") != std::string::npos);
315+
bool base64 = (object.find("base64") != std::string::npos);
316+
std::string file_path = object.substr(object.find("file") + 5);
317+
if (file_path[0] != '/') {
318+
usr_print_error(json_obj["request_id"], json_obj["work_id"], "{\"code\":-17, \"message\":\"file path error\"}",
319+
com_id);
320+
return out;
321+
}
322+
std::string file_dir = file_path.substr(0, file_path.rfind("/") + 1);
323+
file_dir += ".tmp_file";
324+
if (access(file_dir.c_str(), F_OK) != 0) {
325+
mkdir(file_dir.c_str(), 0777);
326+
}
327+
std::string file_panduan;
302328
if (stream) {
303-
static std::map<int, std::string> bash_delta;
304-
bash_delta[json_obj["data"]["index"]] = json_obj["data"]["delta"];
305-
if (!json_obj["data"]["finish"]) {
306-
out_body["data"]["index"] = json_obj["data"]["index"];
307-
zmq_com_send(com_id, out_body.dump());
329+
int index = json_obj["data"]["index"];
330+
bool finish = json_obj["data"]["finish"];
331+
std::string delta = json_obj["data"]["delta"];
332+
file_panduan = file_dir + std::string("/") + std::to_string(index) + std::string(".bin");
333+
dump_file(file_panduan, (char *)delta.c_str(), delta.length());
334+
if (!finish) {
308335
return out;
309-
} else {
310-
for (size_t i = 0; i < bash_delta.size(); i++) {
311-
bash_str += bash_delta[i];
312-
}
313-
bash_delta.clear();
314336
}
315337
} else {
316-
bash_str = json_obj["data"];
338+
file_panduan = file_dir + std::string("/") + std::string("0.bin");
339+
std::string data = json_obj["data"];
340+
dump_file(file_panduan, (char *)data.c_str(), data.length());
317341
}
318-
319-
sys_push_err_1:
320-
zmq_com_send(com_id, out_body.dump());
342+
std::string merah_cmd;
343+
if (base64) {
344+
merah_cmd = R"(find )";
345+
merah_cmd += file_dir;
346+
merah_cmd += R"( -maxdepth 1 -name "*.bin" | sort -n | xargs -i cat {} >> )";
347+
merah_cmd += file_dir + "/src.base64";
348+
// std::cout << "merah_cmd: " << merah_cmd << "\n" ;
349+
system(merah_cmd.c_str());
350+
merah_cmd = R"(base64 -d )";
351+
merah_cmd += file_dir + "/src.base64";
352+
merah_cmd += R"( > )";
353+
merah_cmd += file_path;
354+
// std::cout << "merah_cmd: " << merah_cmd << "\n" ;
355+
system(merah_cmd.c_str());
356+
} else {
357+
merah_cmd = R"(rm -f )";
358+
merah_cmd += file_path;
359+
system(merah_cmd.c_str());
360+
merah_cmd = R"(find )";
361+
merah_cmd += file_dir;
362+
merah_cmd += R"( -maxdepth 1 -name "*.bin" | sort -n | xargs -i cat {} >> )";
363+
merah_cmd += file_path;
364+
system(merah_cmd.c_str());
365+
}
366+
merah_cmd = R"(rm -rf )";
367+
merah_cmd += file_dir;
368+
system(merah_cmd.c_str());
369+
merah_cmd = R"(sha256sum )";
370+
merah_cmd += file_path;
371+
merah_cmd += "> /tmp";
372+
merah_cmd += file_path.substr(file_path.rfind("/") + 1);
373+
merah_cmd += ".base64_info";
374+
system(merah_cmd.c_str());
375+
merah_cmd = "/tmp";
376+
merah_cmd += file_path.substr(file_path.rfind("/") + 1);
377+
merah_cmd += ".base64_info";
378+
std::vector<char> base64_info;
379+
read_file(merah_cmd, base64_info);
380+
merah_cmd = std::string("rm -rf ") + merah_cmd;
381+
system(merah_cmd.c_str());
382+
std::string base64_info_str = std::string(base64_info.data(), 64);
383+
std::string error_str = "{\"code\":0, \"message\":\"sha256:";
384+
error_str += base64_info_str;
385+
error_str += "\"}";
386+
usr_print_error(json_obj["request_id"], json_obj["work_id"], error_str, com_id);
321387
return out;
322388
}
323389

324390
int sys_pull(int com_id, const nlohmann::json &json_obj)
325391
{
326-
int out;
327-
328-
sys_pull_err_1:
329-
usr_print_error(json_obj["request_id"], json_obj["work_id"],
330-
"{\"code\":-10, \"message\":\"Not available at the moment.\"}", com_id);
392+
int out = 0;
393+
std::string object = json_obj["object"];
394+
bool stream = (object.find("stream") != std::string::npos);
395+
std::string file_path = object.substr(object.find("file") + 5);
396+
if (access(file_path.c_str(), F_OK) != 0) {
397+
usr_print_error(json_obj["request_id"], json_obj["work_id"],
398+
"{\"code\":-17, \"message\":\"file does not exist.\"}", com_id);
399+
return -1;
400+
}
401+
std::vector<char> file_data;
402+
read_file(file_path, file_data);
403+
std::string base64_data;
404+
StackFlows::encode_base64(std::string(file_data.data(), file_data.size()), base64_data);
405+
if (stream) {
406+
int config_sys_stream_length;
407+
SAFE_READING(config_sys_stream_length, int, "config_sys_stream_length");
408+
int send_pos = 0;
409+
nlohmann::json out_body;
410+
out_body["request_id"] = json_obj["request_id"];
411+
out_body["work_id"] = "sys";
412+
out_body["created"] = time(NULL);
413+
out_body["object"] = std::string("sys.base64.stream");
414+
out_body["error"] = nlohmann::json::parse("{\"code\":0, \"message\":\"\"}");
415+
size_t i = 0;
416+
for (;;) {
417+
out_body["data"]["index"] = i;
418+
out_body["data"]["finish"] = false;
419+
if ((send_pos + config_sys_stream_length) < base64_data.size()) {
420+
out_body["data"]["delta"] = base64_data.substr(send_pos, config_sys_stream_length);
421+
send_pos += config_sys_stream_length;
422+
zmq_com_send(com_id, out_body.dump());
423+
} else {
424+
out_body["data"]["delta"] = base64_data.substr(send_pos);
425+
zmq_com_send(com_id, out_body.dump());
426+
i++;
427+
break;
428+
}
429+
i++;
430+
};
431+
out_body["data"]["index"] = i;
432+
out_body["data"]["finish"] = true;
433+
out_body["data"]["delta"] = std::string("");
434+
zmq_com_send(com_id, out_body.dump());
435+
} else {
436+
nlohmann::json out_body;
437+
out_body["request_id"] = json_obj["request_id"];
438+
out_body["work_id"] = "sys";
439+
out_body["created"] = time(NULL);
440+
out_body["object"] = std::string("sys.base64.stream");
441+
out_body["error"] = nlohmann::json::parse("{\"code\":0, \"message\":\"\"}");
442+
out_body["data"] = base64_data;
443+
zmq_com_send(com_id, out_body.dump());
444+
}
331445
return out;
332446
}
333447

projects/llm_framework/main_sys/src/main.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ void get_run_config()
6666
key_sql["config_sys_pcm_rate"] = std::string("16000");
6767
key_sql["config_sys_pcm_bit"] = std::string("S16_LE");
6868
key_sql["config_sys_pcm_cap_channel"] = std::string("ipc:///tmp/llm/pcm.cap.socket");
69+
key_sql["config_sys_stream_length"] = 1024;
6970
load_default_config();
7071
}
7172

0 commit comments

Comments
 (0)