diff --git a/include/NacosExceptions.h b/include/NacosExceptions.h index 56f9561..be451fd 100644 --- a/include/NacosExceptions.h +++ b/include/NacosExceptions.h @@ -81,6 +81,7 @@ class NacosException : public std::exception { static const int UNABLE_TO_GET_HOST_IP = 1008; static const int UNABLE_TO_CREATE_SOCKET = 1009; static const int INVALID_CONFIG_PARAM = 1010; + static const int UNABLE_TO_GET_HOST_NAME = 1011; }; diff --git a/include/constant/PropertyKeyConst.h b/include/constant/PropertyKeyConst.h index 83e98be..647ff9f 100644 --- a/include/constant/PropertyKeyConst.h +++ b/include/constant/PropertyKeyConst.h @@ -58,6 +58,10 @@ class PropertyKeyConst { static const int NACOS_DEFAULT_PORT = 8848; + static const NacosString INSTANCE_ID_SEQ_FILE; + + static const NacosString INSTANCE_ID_PREFIX; + /*public static class SystemEnv { static const NacosString ALIBABA_ALIWARE_ENDPOINT_PORT = "ALIBABA_ALIWARE_ENDPOINT_PORT"; diff --git a/include/listen/Listener.h b/include/listen/Listener.h index eb00f7e..acd596e 100644 --- a/include/listen/Listener.h +++ b/include/listen/Listener.h @@ -8,7 +8,7 @@ namespace nacos{ class Listener { private: NacosString listenerName; - AtomicInt refCount; + AtomicInt refCount; public: Listener() { this->listenerName = "theListener"; diff --git a/include/naming/subscribe/EventListener.h b/include/naming/subscribe/EventListener.h index f7a6c05..1341b62 100644 --- a/include/naming/subscribe/EventListener.h +++ b/include/naming/subscribe/EventListener.h @@ -14,7 +14,7 @@ namespace nacos{ class EventListener { private: NacosString listenerName; - AtomicInt refCount; + AtomicInt refCount; public: EventListener() { this->listenerName = "theListener"; diff --git a/include/thread/AtomicInt.h b/include/thread/AtomicInt.h index dc50576..0786f94 100644 --- a/include/thread/AtomicInt.h +++ b/include/thread/AtomicInt.h @@ -2,22 +2,30 @@ #define __ATOMIC_INT_H_ namespace nacos{ +template class AtomicInt { private: - volatile int _intval; + volatile T _curval; public: - AtomicInt(int initval = 0) : _intval(initval) {}; + AtomicInt(T curval = 0) : _curval(curval) {}; - int inc(int incval = 1) { - int oldValue = __sync_fetch_and_add(&_intval, incval); + void set(T val) { _curval = val; }; + + T inc(T incval = 1) { + T oldValue = getAndInc(incval); return incval + oldValue; }; - int dec(int decval = 1) { + T getAndInc(T incval = 1) { + T oldValue = __sync_fetch_and_add(&_curval, incval); + return oldValue; + } + + T dec(int decval = 1) { return inc(-decval); }; - int get() const { return _intval; }; + T get() const { return _curval; }; }; }//namespace nacos diff --git a/src/config/AppConfigManager.cpp b/src/config/AppConfigManager.cpp index ff76a64..935c5c0 100644 --- a/src/config/AppConfigManager.cpp +++ b/src/config/AppConfigManager.cpp @@ -102,6 +102,8 @@ void AppConfigManager::initDefaults() { NacosString homedir = DirUtils::getHome(); + set(PropertyKeyConst::INSTANCE_ID_PREFIX, NetUtils::getHostName()); + set(PropertyKeyConst::INSTANCE_ID_SEQ_FILE, homedir + ConfigConstant::FILE_SEPARATOR + "nacos" + ConfigConstant::FILE_SEPARATOR + "instance_seq.dat"); set(PropertyKeyConst::NACOS_SNAPSHOT_PATH, homedir + ConfigConstant::FILE_SEPARATOR + "nacos" + ConfigConstant::FILE_SEPARATOR + "snapshot"); log_info("[AppConfigManager]-initDefaults:DEFAULT_SNAPSHOT_PATH:%s\n", appConfig[PropertyKeyConst::NACOS_SNAPSHOT_PATH].c_str()); } diff --git a/src/constant/PropertyKeyConst.cpp b/src/constant/PropertyKeyConst.cpp index 0db1d0b..36aa952 100644 --- a/src/constant/PropertyKeyConst.cpp +++ b/src/constant/PropertyKeyConst.cpp @@ -50,4 +50,6 @@ const NacosString PropertyKeyConst::CLIENT_NAME = "nacos.client.name"; const NacosString PropertyKeyConst::AUTH_USERNAME = "nacos.auth.username"; const NacosString PropertyKeyConst::AUTH_PASSWORD = "nacos.auth.password"; const NacosString PropertyKeyConst::LOCAL_IP = "nacos.client.ip"; +const NacosString PropertyKeyConst::INSTANCE_ID_SEQ_FILE = "nacos.instId.seq.file"; +const NacosString PropertyKeyConst::INSTANCE_ID_PREFIX = "nacos.instId.prefix"; }//namespace nacos diff --git a/src/factory/NacosServiceFactory.cpp b/src/factory/NacosServiceFactory.cpp index 86914a1..d8ce633 100644 --- a/src/factory/NacosServiceFactory.cpp +++ b/src/factory/NacosServiceFactory.cpp @@ -19,6 +19,7 @@ #include "src/naming/subscribe/HostReactor.h" #include "src/security/SecurityManager.h" #include "src/utils/ConfigParserUtils.h" +#include "src/utils/SequenceProvider.h" #include "utils/DirUtils.h" //Unlike Java, in cpp, there's no container, no spring to do the ORM job, so I have to handle it myself @@ -114,6 +115,10 @@ NamingService *NacosServiceFactory::CreateNamingService() NACOS_THROW(NacosExcep HostReactor *hostReactor = new HostReactor(objectConfigData); objectConfigData->_hostReactor = hostReactor; + const NacosString &seqConfile = appConfigManager->get(PropertyKeyConst::INSTANCE_ID_SEQ_FILE); + SequenceProvider *sequenceProvider = new SequenceProvider(seqConfile, 1, 10); + objectConfigData->_sequenceProvider = sequenceProvider; + objectConfigData->checkAssembledObject(); NamingService *instance = new NacosNamingService(objectConfigData); diff --git a/src/factory/ObjectConfigData.cpp b/src/factory/ObjectConfigData.cpp index 68be291..42be791 100644 --- a/src/factory/ObjectConfigData.cpp +++ b/src/factory/ObjectConfigData.cpp @@ -9,6 +9,7 @@ #include "src/listen/ClientWorker.h" #include "src/security/SecurityManager.h" #include "utils/UuidUtils.h" +#include "src/utils/SequenceProvider.h" namespace nacos{ @@ -44,6 +45,7 @@ void ObjectConfigData::checkNamingService() NACOS_THROW(NacosException) { NACOS_ASSERT(_serverListManager != NULL); NACOS_ASSERT(_udpNamingServiceListener != NULL); NACOS_ASSERT(_udpNamingServiceListener != NULL); + NACOS_ASSERT(_sequenceProvider != NULL); } void ObjectConfigData::checkConfigService() NACOS_THROW(NacosException) { @@ -206,6 +208,12 @@ void ObjectConfigData::destroyNamingService() { delete _appConfigManager; _appConfigManager = NULL; } + + if (_sequenceProvider != NULL) + { + delete _sequenceProvider; + _sequenceProvider = NULL; + } } void ObjectConfigData::destroyMaintainService() { diff --git a/src/factory/ObjectConfigData.h b/src/factory/ObjectConfigData.h index 0a3258c..c708a7d 100644 --- a/src/factory/ObjectConfigData.h +++ b/src/factory/ObjectConfigData.h @@ -20,6 +20,7 @@ class LocalSnapshotManager; class SecurityManager; class UdpNamingServiceListener; class HostReactor; +template class SequenceProvider; enum FactoryType { CONFIG = 0, @@ -58,6 +59,7 @@ class ObjectConfigData { SecurityManager *_securityManager; UdpNamingServiceListener *_udpNamingServiceListener; HostReactor *_hostReactor; + SequenceProvider *_sequenceProvider; }; }//namespace nacos diff --git a/src/json/JSON.cpp b/src/json/JSON.cpp index a76b43f..c305717 100644 --- a/src/json/JSON.cpp +++ b/src/json/JSON.cpp @@ -110,9 +110,10 @@ long JSON::getLong(const NacosString &jsonString, const NacosString &fieldname) Instance JSON::Json2Instance(const Value &host) NACOS_THROW(NacosException) { Instance theinstance; - markRequired(host, "instanceId"); - const Value &instanceId = host["instanceId"]; - theinstance.instanceId = instanceId.GetString(); + if (host.HasMember("instanceId")) { + const Value &instanceId = host["instanceId"]; + theinstance.instanceId = instanceId.GetString(); + } markRequired(host, "port"); const Value &port = host["port"]; diff --git a/src/naming/NacosNamingService.cpp b/src/naming/NacosNamingService.cpp index b1e3f5b..5a5b1f4 100644 --- a/src/naming/NacosNamingService.cpp +++ b/src/naming/NacosNamingService.cpp @@ -2,6 +2,7 @@ #include "src/naming/subscribe/SubscriptionPoller.h" #include "src/naming/subscribe/UdpNamingServiceListener.h" #include "src/naming/beat/BeatReactor.h" +#include "src/utils/SequenceProvider.h" #include "utils/NamingUtils.h" #include "constant/UtilAndComs.h" #include "utils/ParamUtils.h" @@ -88,7 +89,8 @@ void NacosNamingService::registerInstance const NacosString &groupName, Instance &instance ) NACOS_THROW(NacosException) { - + const NacosString &instanceIdPrefix = _objectConfigData->_appConfigManager->get(PropertyKeyConst::INSTANCE_ID_PREFIX); + instance.instanceId = instanceIdPrefix + NacosStringOps::valueOf(_objectConfigData->_sequenceProvider->next()); if (instance.ephemeral) { BeatInfo beatInfo; beatInfo.serviceName = NamingUtils::getGroupedName(serviceName, groupName); diff --git a/src/utils/NetUtils.cpp b/src/utils/NetUtils.cpp index 44935a7..21aee69 100644 --- a/src/utils/NetUtils.cpp +++ b/src/utils/NetUtils.cpp @@ -2,9 +2,13 @@ #include #include #include +#include #include #include #include +#include + +#define HOST_AND_LEN 250 namespace nacos{ @@ -44,4 +48,17 @@ NacosString NetUtils::getHostIp() NACOS_THROW(NacosException){ //Usually the program will not run to here throw NacosException(NacosException::UNABLE_TO_GET_HOST_IP, "Failed to get IF address"); } + +NacosString NetUtils::getHostName() NACOS_THROW(NacosException) +{ + char hostname[HOST_AND_LEN]; + + int res = gethostname(hostname, HOST_AND_LEN); + if (res == 0) { + return NacosString(hostname); + } + + throw NacosException(NacosException::UNABLE_TO_GET_HOST_NAME, "Failed to get hostname, errno = " + NacosStringOps::valueOf(errno)); +} + }//namespace nacos diff --git a/src/utils/NetUtils.h b/src/utils/NetUtils.h index 71de3b6..bb7b23e 100644 --- a/src/utils/NetUtils.h +++ b/src/utils/NetUtils.h @@ -10,6 +10,9 @@ class NetUtils { public: //Get IP address (best guess) static NacosString getHostIp() NACOS_THROW(NacosException); + + //Get hostname + static NacosString getHostName() NACOS_THROW(NacosException); }; }//namespace nacos diff --git a/src/utils/SequenceProvider.h b/src/utils/SequenceProvider.h new file mode 100644 index 0000000..49421d4 --- /dev/null +++ b/src/utils/SequenceProvider.h @@ -0,0 +1,82 @@ +#include +#include +#include +#include +#include "NacosString.h" +#include "NacosExceptions.h" +#include "thread/AtomicInt.h" +#include "src/thread/Mutex.h" +#include "src/config/IOUtils.h" + +namespace nacos +{ + +template +class SequenceProvider { +private: + NacosString _fileName; + AtomicInt _current; + Mutex _acquireMutex; + T _nr_to_preserve; + T _initSequence; + volatile T _hwm;//high water mark + + void ensureWrite(int fd, T data) { + size_t bytes_written = 0; + while (bytes_written < sizeof(T)) { + bytes_written += write(fd, (char*)&data + bytes_written, sizeof(T) - bytes_written); + } + } + + T preserve() { + T current; + int fd; + bool newFile = false; + if (IOUtils::checkNotExistOrNotFile(_fileName)) { + newFile = true; + } + mode_t mode = S_IRUSR | S_IWUSR | S_IRWXG | S_IWGRP; + fd = open(_fileName.c_str(), O_RDWR | O_CREAT, mode); + if (fd <= 0) { + throw new NacosException(NacosException::UNABLE_TO_OPEN_FILE, _fileName); + } + + if (newFile) { + ensureWrite(fd, _initSequence); + lseek(fd, 0, SEEK_SET);//read from the beginning + } + + size_t bytes_read = 0; + while (bytes_read < sizeof(T)) + { + bytes_read += read(fd, (char*)¤t + bytes_read, sizeof(T) - bytes_read); + } + lseek(fd, 0, SEEK_SET);//write from the beginning + + ensureWrite(fd, current + _nr_to_preserve); + close(fd); + _hwm = current + _nr_to_preserve; + return current; + }; +public: + SequenceProvider(const NacosString &fileName, T initSequence, T nr_to_preserve) { + _fileName = fileName; + _initSequence = initSequence; + _nr_to_preserve = nr_to_preserve; + _current.set(preserve()); + }; + + T next(int step = 1) { + T res = _current.getAndInc(step); + while (res >= _hwm) { + _acquireMutex.lock(); + if (res >= _hwm) { + preserve(); + } + _acquireMutex.unlock(); + } + return res; + }; +}; + +} // namespace nacos diff --git a/test/allinone.cpp b/test/allinone.cpp index be76a67..3f111ea 100644 --- a/test/allinone.cpp +++ b/test/allinone.cpp @@ -130,6 +130,8 @@ bool testNamingServiceAndDeRegisterActively(); bool testThreadPoolConcurrentWithAtomicCounter(); +bool testSequenceProvider(); + TestData disabledTestList[] = TEST_ITEM_START TEST_ITEM_END @@ -137,7 +139,6 @@ TEST_ITEM_END TestData testList[] = TEST_ITEM_START - TEST_ITEM("Normal http test", testNormalHttpRequest) TEST_ITEM("No server request, should fail", testNoServerRequest) TEST_ITEM("Publish config to server", testPublishConfig) @@ -152,6 +153,7 @@ TEST_ITEM_START TEST_ITEM("Test for string characteristics", testStringEqual) TEST_ITEM("Read&Write file test", testReadWriteFile) TEST_ITEM("GetFileSize, should work well", testGetFileSize) + TEST_ITEM("Test get instances with predicate(testRandomByWeightSelector)", testRandomByWeightSelector) TEST_ITEM("Check whether file exists or not", testFileExists) TEST_ITEM("Create&Remove file", testCreateAndRemove) TEST_ITEM("Create a directory with subdirectories, and clean it", testCleanDirectory) @@ -170,6 +172,7 @@ TEST_ITEM_START TEST_ITEM("Smoke test for ThreadPool", testThreadPoolSmoke) TEST_ITEM("Test basic function of NacosNamingService's registerService", testNamingServiceRegister) TEST_ITEM("Test serialization/deserialization of Business Object", testString2ServiceInfo) + TEST_ITEM("Test get instances with predicate(Randomly)", testInstanceSelectors) TEST_ITEM("Test serialization/deserialization of malformed Business Object", testMalformedJson2ServiceInfo) TEST_ITEM("Test serialization/deserialization of malformed Business Object (Double)", testMalformedDouble2ServiceInfo) TEST_ITEM("Test serialization/deserialization of malformed Business Object (no cacheMillis)", testLackcacheMillisServiceInfo) @@ -184,8 +187,6 @@ TEST_ITEM_START TEST_ITEM("Register many services and get one", testGetAllInstances) TEST_ITEM("Subscribe & unsubscribe services", testListenService) TEST_ITEM("Test get all service names", testGetServiceNames) - TEST_ITEM("Test get instances with predicate(Randomly)", testInstanceSelectors) - TEST_ITEM("Test get instances with predicate(testRandomByWeightSelector)", testRandomByWeightSelector) TEST_ITEM("Smoking test of ThreadLocal", testThreadLocal) TEST_ITEM("Smoking test of ThreadLocal(pointer)", testThreadLocalPtr) TEST_ITEM("Smoking test of ThreadLocal(pointer with initializer)", testThreadLocalPtrWithInitializer) @@ -200,6 +201,7 @@ TEST_ITEM_START TEST_ITEM("Test delayed task pool - multiple tasks triggered at the same time", testDelayedThread2) TEST_ITEM("Register a service instance and remove it actively", testNamingServiceAndDeRegisterActively) TEST_ITEM("thread pool with concurrent add & atomic operation", testThreadPoolConcurrentWithAtomicCounter) + TEST_ITEM("Test sequence provider", testSequenceProvider) TEST_ITEM_END int main() { diff --git a/test/testcase/testDelayedThreadPool.cpp b/test/testcase/testDelayedThreadPool.cpp index 289322d..e4e0e13 100644 --- a/test/testcase/testDelayedThreadPool.cpp +++ b/test/testcase/testDelayedThreadPool.cpp @@ -29,7 +29,7 @@ class DelayedTask : public Task { if (executor == NULL) { throw NacosException(NacosException::INVALID_CONFIG_PARAM, "no executor"); } - printf(">>>>>>>>>>>>>>>>>>Task %s triggered, time =%lu (%lu), interval = %lu\n", getTaskName().c_str(), now_ms/1000, now_ms, interval_calc); + printf(">>>>>>>>>>>>>>>>>>Task %s triggered, time =%llu (%llu), interval = %llu\n", getTaskName().c_str(), now_ms/1000, now_ms, interval_calc); sleep(1); } diff --git a/test/testcase/testNamingService.cpp b/test/testcase/testNamingService.cpp index e9806c3..952156e 100644 --- a/test/testcase/testNamingService.cpp +++ b/test/testcase/testNamingService.cpp @@ -71,8 +71,10 @@ bool testNamingProxySmokeTest() { for (int i = 0; i < 10; i++) { NacosString serviceName = "TestServiceName" + NacosStringOps::valueOf(i); NacosString serverlist = namingProxy->queryList(serviceName, ConfigConstant::DEFAULT_GROUP, "TestCluster", 0, false); - - if (serverlist.find("\"serviceName\":\"" + serviceName + "\"") == string::npos) { + cout << serverlist << endl; + if (serverlist.find("\"serviceName\":\"" + serviceName + "\"") == string::npos && + //nacos 2.x compatibility + serverlist.find("\"serviceName\":\"DEFAULT_GROUP@@" + serviceName + "\"") == string::npos) { cout << "Failed to get data for:" << serviceName << endl; return false; } diff --git a/test/testcase/testSequenceProvider.cpp b/test/testcase/testSequenceProvider.cpp new file mode 100644 index 0000000..ef5948e --- /dev/null +++ b/test/testcase/testSequenceProvider.cpp @@ -0,0 +1,58 @@ +#include +#include "src/thread/Thread.h" +#include "src/utils/SequenceProvider.h" +#include "utils/DirUtils.h" + +using namespace std; +using namespace nacos; + +#define NR_THREADS 200 +#define GENERATION_PER_THREAD 1000 + +int64_t sequences[GENERATION_PER_THREAD * NR_THREADS]; +int tid[NR_THREADS]; + +SequenceProvider *sequenceProvider; + +void *SeqThreadFunc(void *param) { + int *thread_no = (int*)param; + for (int i = 0; i < GENERATION_PER_THREAD; i++) { + int64_t res = sequenceProvider->next(); + sequences[(*thread_no) * GENERATION_PER_THREAD + i] = res; + } + + return NULL; +} + +bool testSequenceProvider() { + cout << "in function testSequenceProvider" << endl; + + cout << "Generating SEQ..." << endl; + + sequenceProvider = new SequenceProvider (DirUtils::getCwd() + "/test_seq.dat", 20000, 100); + + Thread *threads[NR_THREADS] = {NULL}; + for (int i = 0; i < NR_THREADS; i++) { + NacosString threadName = "SEQThread-" + NacosStringOps::valueOf(i); + tid[i] = i; + threads[i] = new Thread(threadName, SeqThreadFunc, (void *) &tid[i]); + threads[i]->start(); + } + + for (int i = 0; i < NR_THREADS; i++) { + threads[i]->join(); + delete threads[i]; + } + + cout << "Generated." << endl; + + for (int i = 0; i < NR_THREADS; i++) { + for (int j = 0; j < GENERATION_PER_THREAD; j++) { + cout << "Thread " << i << ": sequence =\t" << sequences[i * GENERATION_PER_THREAD + j] << endl; + } + } + + cout << "test end..." << endl; + + return true; +} diff --git a/test/testcase/testThreadSmoke.cpp b/test/testcase/testThreadSmoke.cpp index 0d52aac..8021eba 100644 --- a/test/testcase/testThreadSmoke.cpp +++ b/test/testcase/testThreadSmoke.cpp @@ -82,13 +82,13 @@ bool testThreadPoolSmoke() { return true; } -AtomicInt totalFinishedThreads; +AtomicInt totalFinishedThreads; class SmokingTestThreadTask : public Task { private: - AtomicInt &_counter; + AtomicInt &_counter; public: - SmokingTestThreadTask(const NacosString &taskName, AtomicInt &counter) : _counter(counter) { + SmokingTestThreadTask(const NacosString &taskName, AtomicInt &counter) : _counter(counter) { setTaskName(taskName); }; @@ -108,7 +108,7 @@ bool testThreadPoolConcurrentWithAtomicCounter() { ThreadPool tp(10); tp.start(); cout << "ok, size = 10" << endl; - AtomicInt totalCounter; + AtomicInt totalCounter; Task *tasks[1000]; for (size_t i = 0; i < 40; i++) {