#include #include namespace c10d { PrefixStore::PrefixStore(std::string prefix, c10::intrusive_ptr store) : prefix_(std::move(prefix)), store_(std::move(store)) {} std::string PrefixStore::joinKey(const std::string& key) { return prefix_ + "/" + key; } std::vector PrefixStore::joinKeys( const std::vector& keys) { std::vector joinedKeys; joinedKeys.reserve(keys.size()); for (const auto& key : keys) { joinedKeys.emplace_back(joinKey(key)); } return joinedKeys; } void PrefixStore::set( const std::string& key, const std::vector& value) { store_->set(joinKey(key), value); } std::vector PrefixStore::compareSet( const std::string& key, const std::vector& expectedValue, const std::vector& desiredValue) { return store_->compareSet(joinKey(key), expectedValue, desiredValue); } std::vector PrefixStore::get(const std::string& key) { return store_->get(joinKey(key)); } int64_t PrefixStore::add(const std::string& key, int64_t value) { return store_->add(joinKey(key), value); } bool PrefixStore::deleteKey(const std::string& key) { return store_->deleteKey(joinKey(key)); } int64_t PrefixStore::getNumKeys() { return store_->getNumKeys(); } bool PrefixStore::check(const std::vector& keys) { auto joinedKeys = joinKeys(keys); return store_->check(joinedKeys); } void PrefixStore::wait(const std::vector& keys) { auto joinedKeys = joinKeys(keys); store_->wait(joinedKeys); } void PrefixStore::wait( const std::vector& keys, const std::chrono::milliseconds& timeout) { auto joinedKeys = joinKeys(keys); store_->wait(joinedKeys, timeout); } const std::chrono::milliseconds& PrefixStore::getTimeout() const noexcept { return store_->getTimeout(); } void PrefixStore::setTimeout(const std::chrono::milliseconds& timeout) { store_->setTimeout(timeout); } void PrefixStore::append( const std::string& key, const std::vector& value) { store_->append(joinKey(key), value); } std::vector> PrefixStore::multiGet( const std::vector& keys) { std::vector prefixed_keys; prefixed_keys.reserve(keys.size()); for (auto& key : keys) { prefixed_keys.push_back(joinKey(key)); } return store_->multiGet(prefixed_keys); } void PrefixStore::multiSet( const std::vector& keys, const std::vector>& values) { std::vector prefixed_keys; prefixed_keys.reserve(keys.size()); for (auto& key : keys) { prefixed_keys.push_back(joinKey(key)); } store_->multiSet(prefixed_keys, values); } // Returns true if this store support append, multiGet and multiSet bool PrefixStore::hasExtendedApi() const { return store_->hasExtendedApi(); } c10::intrusive_ptr PrefixStore::getUnderlyingStore() { return store_; } c10::intrusive_ptr PrefixStore::getUnderlyingNonPrefixStore() { c10::intrusive_ptr store = store_; while (store) { // Attempt to dynamically cast to PrefixStore PrefixStore* asPrefixStore = dynamic_cast(store.get()); if (asPrefixStore) { store = asPrefixStore->getUnderlyingStore(); } else { break; // We've reached a non-PrefixStore } } TORCH_CHECK( store != nullptr, "Underlying Non-PrefixStore shouldn't be null."); return store; } } // namespace c10d