Skip to content

Commit

Permalink
Support get version by providing timestamp
Browse files Browse the repository at this point in the history
Signed-off-by: Lei Wang <jiede.wl@alibaba-inc.com>
  • Loading branch information
doudoubobo committed Jun 27, 2024
1 parent 273a512 commit 8df9117
Show file tree
Hide file tree
Showing 17 changed files with 223 additions and 4 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ option(ADD_CONVERTER "Option to add converter" ON)
option(ADD_VEGITO "Option to add Vegito" ON)
option(ADD_PGQL "Option to add PGQL" ON)
option(ADD_GAE_ENGINE "Option to add GAE engine" OFF)
option(ENABLE_CHECKPOINT "Option to support checkpoint" OFF)

if (ADD_CONVERTER)
add_subdirectory(converter)
Expand Down
2 changes: 2 additions & 0 deletions charts/gart/templates/converter/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ spec:
- name: converter
image: {{ include "gart.converter.image" . }}
imagePullPolicy: {{ .Values.converter.image.pullPolicy | quote }}
resources: {{ toYaml .Values.converter.resources | nindent 10 }}
command: ["/bin/bash", "-c"]
args:
- |
Expand Down Expand Up @@ -90,6 +91,7 @@ spec:
- name: controller
image: {{ include "gart.controller.image" . }}
imagePullPolicy: {{ .Values.controller.image.pullPolicy | quote }}
resources: {{ toYaml .Values.controller.resources | nindent 10 }}
command: ["/workspace/gart/scripts/controller.py"]
ports:
- containerPort: {{ .Values.controller.containerPort }}
Expand Down
2 changes: 1 addition & 1 deletion charts/gart/templates/converter/svc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ metadata:
labels:
app: converter
spec:
type: ClusterIP
type: LoadBalancer
selector:
app: converter
ports:
Expand Down
1 change: 1 addition & 0 deletions charts/gart/templates/debezium/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ spec:
- name: debezium-connect
image: {{ include "gart.debezium.image" . }}
imagePullPolicy: {{ .Values.debezium.image.pullPolicy | quote }}
resources: {{ toYaml .Values.debezium.resources | nindent 10 }}
env:
- name: BOOTSTRAP_SERVERS
value: {{ $kafka_service }}
Expand Down
1 change: 1 addition & 0 deletions charts/gart/templates/gie_frontend/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ spec:
- name: gie-frontend
image: {{ include "gart.gie_frontend.image" . }}
imagePullPolicy: {{ .Values.gie_frontend.image.pullPolicy | quote }}
resources: {{ toYaml .Values.gie_frontend.resources | nindent 10 }}
command: ["/bin/bash", "-c"]
args:
- |
Expand Down
5 changes: 2 additions & 3 deletions charts/gart/templates/gie_frontend/svc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ metadata:
labels:
app: gie-frontend
spec:
type: ClusterIP
type: LoadBalancer
selector:
app: gie-frontend
ports:
- protocol: TCP
port: {{ .Values.gie_frontend.gremlinPort }}
targetPort: gremlin
nodePort: null
targetPort: gremlin
3 changes: 3 additions & 0 deletions charts/gart/templates/writer/statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ spec:
- name: writer
image: {{ include "gart.writer.image" . }}
imagePullPolicy: {{ .Values.writer.image.pullPolicy | quote }}
resources: {{ toYaml .Values.writer.resources | nindent 10 }}
command: ["/bin/bash", "-c"]
args:
- |
Expand Down Expand Up @@ -64,6 +65,7 @@ spec:
- name: analyzer
image: {{ include "gart.analyzer.image" . }}
imagePullPolicy: {{ .Values.analyzer.image.pullPolicy | quote }}
resources: {{ toYaml .Values.analyzer.resources | nindent 10 }}
command: ["/bin/bash", "-c"]
args:
- |
Expand All @@ -79,6 +81,7 @@ spec:
- name: gie-executor
image: {{ include "gart.gie_executor.image" . }}
imagePullPolicy: {{ .Values.gie_executor.image.pullPolicy | quote }}
resources: {{ toYaml .Values.gie_executor.resources | nindent 10 }}
ports:
- name: gaia-rpc
containerPort: {{ .Values.gie_executor.GAIA_RPC_PORT }}
Expand Down
18 changes: 18 additions & 0 deletions charts/gart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,17 @@ debezium:
tag: latest
pullPolicy: IfNotPresent
containerPort: 8083
resources: {}
# We usually recommend not to specify default resources and to leave this as a conscious
# choice for the user. This also increases chances charts run on environments with little
# resources, such as Minikube. If you do want to specify resources, uncomment the following
# lines, adjust them as necessary, and remove the curly braces after 'resources:'.
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi

# converter config
converter:
Expand All @@ -162,6 +173,7 @@ converter:
repository: gart-converter
tag: latest
pullPolicy: IfNotPresent
resources: {}

# writer config
writer:
Expand All @@ -170,6 +182,7 @@ writer:
repository: gart-writer
tag: latest
pullPolicy: IfNotPresent
resources: {}

# analyzer config
analyzer:
Expand All @@ -178,6 +191,7 @@ analyzer:
repository: gart-analyzer
tag: latest
pullPolicy: IfNotPresent
resources: {}

# controller config
controller:
Expand All @@ -189,6 +203,7 @@ controller:
port: 80
# container port
containerPort: 5000
resources: {}

#gie frontend config
gie_frontend:
Expand All @@ -198,6 +213,7 @@ gie_frontend:
pullPolicy: IfNotPresent
replicaCount: 1
gremlinPort: 8182
resources: {}

# gie executor config
gie_executor:
Expand All @@ -209,6 +225,8 @@ gie_executor:
ENGINE_PORT: 9000
HTTP_SERVICE_PORT: 5000
FLASK_PORT: 5000
resources: {}



dataconfig:
Expand Down
15 changes: 15 additions & 0 deletions converter/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ if (GLOG_FOUND)
set(CMAKE_REQUIRED_LIBRARIES "${CMAKE_REQUIRED_LIBRARIES} ${GLOG_LIBRARIES}")
endif ()

if(ENABLE_CHECKPOINT)
find_package(Boost REQUIRED COMPONENTS serialization)
include_directories(${Boost_INCLUDE_DIRS})
endif()

file(GLOB_RECURSE FILES_NEED_FORMAT "*.cc" "*.h")

add_custom_target(convert_clformat
Expand All @@ -70,10 +75,20 @@ target_include_directories(binlog_convert_debezium PRIVATE ${RDKAFKA_INCLUDE_DIR
target_link_libraries(binlog_convert_debezium ${RDKAFKA_LIBRARIES} ${GFLAGS_LIBRARIES} ${GLOG_LIBRARIES} ${CMAKE_DL_LIBS} ${YAML_CPP_LIBRARIES})
target_compile_definitions(binlog_convert_debezium PUBLIC -DUSE_DEBEZIUM)

if(ENABLE_CHECKPOINT)
target_compile_definitions(binlog_convert_debezium PRIVATE ENABLE_CHECKPOINT)
target_link_libraries(binlog_convert_debezium ${Boost_LIBRARIES})
endif()

add_executable(binlog_convert_maxwell binlog_convert.cc flags.cc parser.cc)

target_include_directories(binlog_convert_maxwell PRIVATE ${RDKAFKA_INCLUDE_DIR})
target_link_libraries(binlog_convert_maxwell ${RDKAFKA_LIBRARIES} ${GFLAGS_LIBRARIES} ${GLOG_LIBRARIES} ${CMAKE_DL_LIBS} ${YAML_CPP_LIBRARIES})

add_executable(json2yaml json2yaml.cc)
target_link_libraries(json2yaml ${YAML_CPP_LIBRARIES})

if(ENABLE_CHECKPOINT)
target_compile_definitions(binlog_convert_maxwell PRIVATE ENABLE_CHECKPOINT)
target_link_libraries(binlog_convert_maxwell ${Boost_LIBRARIES})
endif()
26 changes: 26 additions & 0 deletions converter/binlog_convert.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
#include <fstream>
#include <iostream>

#ifdef ENABLE_CHECKPOINT
#include <thread>
#include <mutex>
#endif // ENABLE_CHECKPOINT

#include "converter/flags.h"
#include "converter/kafka_helper.h"
#include "converter/parser.h"
Expand All @@ -38,6 +43,10 @@ using std::endl;
using std::flush;
using std::make_shared;

#ifdef ENABLE_CHECKPOINT
std::mutex checkpoint_mutex;
#endif // ENABLE_CHECKPOINT

int main(int argc, char** argv) {
google::InitGoogleLogging(argv[0]);
gflags::ParseCommandLineFlags(&argc, &argv, true);
Expand All @@ -48,6 +57,23 @@ int main(int argc, char** argv) {

TxnLogParser parser(FLAGS_etcd_endpoint, FLAGS_etcd_prefix,
FLAGS_subgraph_num);
#ifdef ENABLE_CHECKPOINT
std::thread checkpoint_thread([&]() {
while (1) {
std::this_thread::sleep_for(
std::chrono::minutes(FLAGS_checkpoint_interval));
{
std::lock_guard<std::mutex> guard(checkpoint_mutex);
parser.checkpoint_vertex_maps(FLAGS_checkpoint_dir);
}
}
});
checkpoint_thread.detach();
{
std::lock_guard<std::mutex> guard(checkpoint_mutex);
parser.load_vertex_maps_checkpoint(FLAGS_checkpoint_dir);
}
#endif // ENABLE_CHECKPOINT

int log_count = 0;
bool is_timeout = false;
Expand Down
3 changes: 3 additions & 0 deletions converter/flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,7 @@ DEFINE_string(etcd_prefix, "", "etcd prefix.");

DEFINE_int32(subgraph_num, 1, "Number of subgraphs for GAP workloads.");

DEFINE_int32(checkpoint_interval, 10, "Checkpoint interval in minutes.");
DEFINE_string(checkpoint_dir, "/tmp/checkpoint", "Checkpoint directory.");

DEFINE_bool(enable_bulkload, false, "Enable bulkload from existing data.");
3 changes: 3 additions & 0 deletions converter/flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ DECLARE_string(etcd_prefix);
DECLARE_int32(subgraph_num);
DECLARE_bool(enable_bulkload);

DECLARE_int32(checkpoint_interval); // in minutes
DECLARE_string(checkpoint_dir);

#endif // CONVERTER_FLAGS_H_
48 changes: 48 additions & 0 deletions converter/parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "converter/parser.h"

#include <chrono>
#include <fstream>
#include <string>
#include <utility>
Expand Down Expand Up @@ -549,4 +550,51 @@ void TxnLogParser::fill_prop(LogEntry& out, const json& log) const {
}
}

#ifdef ENABLE_CHECKPOINT
// Writes a checkpoint of the per-label oid -> gid vertex maps into
// `folder_path`.
//
// For every vertex label id in [0, vlabel_num_) one Boost binary archive is
// produced:
//   <folder_path>/vlabel_<id>_string_vertex_map.bin  if string_oid2gid_maps_
//                                                    is non-empty,
//   <folder_path>/vlabel_<id>_int_vertex_map.bin     otherwise.
// These are the file names load_vertex_maps_checkpoint() looks for.
//
// The output streams are opened with std::ios::binary, matching how the
// loader opens them; Boost binary archives must not go through a text-mode
// stream.
//
// NOTE(review): `folder_path` is not created here and the streams are not
// checked after opening - presumably the archive constructor throws on a bad
// stream; confirm how callers want that surfaced.
void TxnLogParser::checkpoint_vertex_maps(const string& folder_path) {
  // The string-vs-int decision does not depend on the label, so take it once.
  const bool use_string_maps = !string_oid2gid_maps_.empty();

  for (auto v_label = 0; v_label < vlabel_num_; v_label++) {
    const std::string file_stem =
        folder_path + "/vlabel_" + std::to_string(v_label);
    if (use_string_maps) {
      std::ofstream ofs(file_stem + "_string_vertex_map.bin",
                        std::ios::binary);
      boost::archive::binary_oarchive oa(ofs);
      oa << string_oid2gid_maps_[v_label];
    } else {
      std::ofstream ofs(file_stem + "_int_vertex_map.bin", std::ios::binary);
      boost::archive::binary_oarchive oa(ofs);
      oa << int64_oid2gid_maps_[v_label];
    }
  }

  // TODO: vertex_nums_ and vertex_nums_per_fragment_ are meant to be stored
  // in this file, but nothing is serialized yet - the file is only created
  // (or truncated) so that it exists next to the map archives.
  std::string file_name = folder_path + "/vertex_nums.bin";
  std::ofstream ofs(file_name, std::ios::binary);
}

void TxnLogParser::load_vertex_maps_checkpoint(const string& folder_path) {
for (auto v_label = 0; v_label < vlabel_num_; v_label++) {
std::string file_name = folder_path + "/vlabel_" + std::to_string(v_label) +
"_string_vertex_map.bin";
// check if file exists
std::ifstream ifs(file_name, std::ios::binary);
if (ifs.good()) {
boost::archive::binary_iarchive ia(ifs);
ia >> string_oid2gid_maps_[v_label];
continue;
}

file_name = folder_path + "/vlabel_" + std::to_string(v_label) +
"_int_vertex_map.bin";
ifs.open(file_name, std::ios::binary);
if (ifs.good()) {
boost::archive::binary_iarchive ia(ifs);
ia >> int64_oid2gid_maps_[v_label];
continue;
}
}
}
#endif // ENABLE_CHECKPOINT

} // namespace converter
12 changes: 12 additions & 0 deletions converter/parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@
#include <string>
#include <utility>
#include <vector>
#ifdef ENABLE_CHECKPOINT
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/serialization/map.hpp>
#include <boost/serialization/string.hpp>
#endif // ENABLE_CHECKPOINT

#include "vineyard/common/util/json.h"

Expand Down Expand Up @@ -100,6 +106,12 @@ class TxnLogParser {

gart::Status parse(LogEntry& out, const std::string& log_str, int epoch);

#ifdef ENABLE_CHECKPOINT
void checkpoint_vertex_maps(const std::string& folder_path);

void load_vertex_maps_checkpoint(const std::string& folder_path);
#endif // ENABLE_CHECKPOINT

~TxnLogParser() = default;

private:
Expand Down
Loading

0 comments on commit 8df9117

Please sign in to comment.