1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "schema_negotiate.h"
16
17 #include "log_print.h"
18 #include "schema_utils.h"
19
20 namespace DistributedDB {
21 // Some principle in current version describe below. (Relative-type will be introduced in future but not involved now)
22 // 1. PermitSync: Be false may because schemaType-unrecognized, schemaType-different, schema-unparsable,
23 // schemaVersion-unrecognized, schema-incompatible, and so on.
24 // 2. RequirePeerConvert: Be true normally when permitSync false, for future possible sync and convert(by remote).
25 // 3. checkOnReceive: Be false when local is KV-DB, or when local is not KV-DB only if schema type equal as well as
26 // define equal or remote is the upgradation of local.
MakeLocalSyncOpinion(const SchemaObject & localSchema,const std::string & remoteSchema,uint8_t remoteSchemaType)27 SyncOpinion SchemaNegotiate::MakeLocalSyncOpinion(const SchemaObject &localSchema, const std::string &remoteSchema,
28 uint8_t remoteSchemaType)
29 {
30 SchemaType localType = localSchema.GetSchemaType(); // An invalid schemaObject will return SchemaType::NONE
31 SchemaType remoteType = ReadSchemaType(remoteSchemaType);
32 // Logic below only be correct in current version, should be redesigned if new type added in the future
33 // 1. If remote-type unrecognized(Include Relative-type), Do not permit sync.
34 if (remoteType == SchemaType::UNRECOGNIZED) {
35 LOGE("[Schema][Opinion] Remote-type=%" PRIu8 " unrecognized.", remoteSchemaType);
36 return SyncOpinion{false, true, true};
37 }
38 // 2. If local-type is KV(Here remote-type is within recognized), Always permit sync.
39 if (localType == SchemaType::NONE) {
40 LOGI("[Schema][Opinion] Local-type KV.");
41 return SyncOpinion{true, false, false};
42 }
43 // 3. If remote-type is KV(Here local-type can only be JSON or FLATBUFFER), Always permit sync but need check.
44 if (remoteType == SchemaType::NONE) {
45 LOGI("[Schema][Opinion] Remote-type KV.");
46 return SyncOpinion{true, false, true};
47 }
48 // 4. If local-type differ with remote-type(Here both type can only be JSON or FLATBUFFER), Do not permit sync.
49 if (localType != remoteType) {
50 LOGE("[Schema][Opinion] Local-type=%s differ remote-type=%s.", SchemaUtils::SchemaTypeString(localType).c_str(),
51 SchemaUtils::SchemaTypeString(remoteType).c_str());
52 return SyncOpinion{false, true, true};
53 }
54 // 5. If schema parse fail, Do not permit sync.
55 SchemaObject remoteSchemaObj;
56 int errCode = remoteSchemaObj.ParseFromSchemaString(remoteSchema);
57 if (errCode != E_OK) {
58 LOGE("[Schema][Opinion] Parse remote-schema fail, errCode=%d, remote-type=%s.", errCode,
59 SchemaUtils::SchemaTypeString(remoteType).c_str());
60 return SyncOpinion{false, true, true};
61 }
62 // 6. If remote-schema is not incompatible based on local-schema(SchemaDefine Equal), Permit sync and don't check.
63 errCode = localSchema.CompareAgainstSchemaObject(remoteSchemaObj);
64 if (errCode != -E_SCHEMA_UNEQUAL_INCOMPATIBLE) {
65 return SyncOpinion{true, false, false};
66 }
67 // 7. If local-schema is not incompatible based on remote-schema(Can only be COMPATIBLE_UPGRADE), Sync and check.
68 errCode = remoteSchemaObj.CompareAgainstSchemaObject(localSchema);
69 if (errCode != -E_SCHEMA_UNEQUAL_INCOMPATIBLE) {
70 return SyncOpinion{true, false, true};
71 }
72 // 8. Local-schema incompatible with remote-schema mutually.
73 LOGE("[Schema][Opinion] Local-schema incompatible with remote-schema mutually.");
74 return SyncOpinion{false, true, true};
75 }
76
ConcludeSyncStrategy(const SyncOpinion & localOpinion,const SyncOpinion & remoteOpinion)77 SyncStrategy SchemaNegotiate::ConcludeSyncStrategy(const SyncOpinion &localOpinion, const SyncOpinion &remoteOpinion)
78 {
79 SyncStrategy outStrategy;
80 // Any side permit sync, the final conclusion is permit sync.
81 outStrategy.permitSync = (localOpinion.permitSync || remoteOpinion.permitSync);
82 bool convertConflict = (localOpinion.requirePeerConvert && remoteOpinion.requirePeerConvert);
83 if (convertConflict) {
84 outStrategy.permitSync = false;
85 }
86 // Responsible for conversion on send now that local do not require remote to do conversion
87 outStrategy.convertOnSend = (!localOpinion.requirePeerConvert);
88 // Responsible for conversion on receive since remote will not do conversion on send and require local to convert
89 outStrategy.convertOnReceive = remoteOpinion.requirePeerConvert;
90 // Only depend on local opinion
91 outStrategy.checkOnReceive = localOpinion.checkOnReceive;
92 LOGI("[Schema][Strategy] PermitSync=%d, SendConvert=%d, ReceiveConvert=%d, ReceiveCheck=%d.",
93 outStrategy.permitSync, outStrategy.convertOnSend, outStrategy.convertOnReceive, outStrategy.checkOnReceive);
94 return outStrategy;
95 }
96
MakeOpinionEachTable(const RelationalSchemaObject & localSchema,const RelationalSchemaObject & remoteSchema)97 RelationalSyncOpinion SchemaNegotiate::MakeOpinionEachTable(const RelationalSchemaObject &localSchema,
98 const RelationalSchemaObject &remoteSchema)
99 {
100 RelationalSyncOpinion opinion;
101 for (const auto &it : localSchema.GetTables()) {
102 if (remoteSchema.GetTable(it.first).GetTableName() != it.first) {
103 LOGW("[RelationalSchema][opinion] Table was missing in remote schema");
104 continue;
105 }
106 // remote table is compatible(equal or upgrade) based on local table, permit sync and don't need check
107 int errCode = it.second.CompareWithTable(remoteSchema.GetTable(it.first), localSchema.GetSchemaVersion());
108 if (errCode != -E_RELATIONAL_TABLE_INCOMPATIBLE) {
109 opinion[it.first] = {true, false, false};
110 continue;
111 }
112 // local table is compatible upgrade based on remote table, permit sync and need check
113 errCode = remoteSchema.GetTable(it.first).CompareWithTable(it.second, remoteSchema.GetSchemaVersion());
114 if (errCode != -E_RELATIONAL_TABLE_INCOMPATIBLE) {
115 opinion[it.first] = {true, false, true};
116 continue;
117 }
118 // local table is incompatible with remote table mutually, don't permit sync and need check
119 LOGW("[RelationalSchema][opinion] Local table is incompatible with remote table mutually.");
120 opinion[it.first] = {false, true, true};
121 }
122 return opinion;
123 }
124
MakeLocalSyncOpinion(const RelationalSchemaObject & localSchema,const std::string & remoteSchema,uint8_t remoteSchemaType)125 RelationalSyncOpinion SchemaNegotiate::MakeLocalSyncOpinion(const RelationalSchemaObject &localSchema,
126 const std::string &remoteSchema, uint8_t remoteSchemaType)
127 {
128 SchemaType localType = localSchema.GetSchemaType();
129 SchemaType remoteType = ReadSchemaType(remoteSchemaType);
130 if (remoteType == SchemaType::UNRECOGNIZED) {
131 LOGW("[RelationalSchema][opinion] Remote schema type %" PRIu8 " is unrecognized.", remoteSchemaType);
132 return {};
133 }
134
135 if (remoteType != SchemaType::RELATIVE) {
136 LOGW("[RelationalSchema][opinion] Not support sync with schema type: local-type=[%s] remote-type=[%s]",
137 SchemaUtils::SchemaTypeString(localType).c_str(), SchemaUtils::SchemaTypeString(remoteType).c_str());
138 return {};
139 }
140
141 if (!localSchema.IsSchemaValid()) {
142 LOGW("[RelationalSchema][opinion] Local schema is not valid");
143 return {};
144 }
145
146 RelationalSchemaObject remoteSchemaObj;
147 int errCode = remoteSchemaObj.ParseFromSchemaString(remoteSchema);
148 if (errCode != E_OK) {
149 LOGW("[RelationalSchema][opinion] Parse remote schema failed %d, remote schema type %s", errCode,
150 SchemaUtils::SchemaTypeString(remoteType).c_str());
151 return {};
152 }
153
154 if (localSchema.GetSchemaVersion() != remoteSchemaObj.GetSchemaVersion()) {
155 LOGW("[RelationalSchema][opinion] Schema version mismatch, local %s, remote %s",
156 localSchema.GetSchemaVersion().c_str(), remoteSchemaObj.GetSchemaVersion().c_str());
157 return {};
158 }
159
160 if (localSchema.GetSchemaVersion() == SchemaConstant::SCHEMA_SUPPORT_VERSION_V2_1 &&
161 localSchema.GetTableMode() != remoteSchemaObj.GetTableMode()) {
162 LOGW("[RelationalSchema][opinion] Schema table mode mismatch, local %d, remote %d",
163 localSchema.GetTableMode(), remoteSchemaObj.GetTableMode());
164 return {};
165 }
166
167 return MakeOpinionEachTable(localSchema, remoteSchemaObj);
168 }
169
ConcludeSyncStrategy(const RelationalSyncOpinion & localOpinion,const RelationalSyncOpinion & remoteOpinion)170 RelationalSyncStrategy SchemaNegotiate::ConcludeSyncStrategy(const RelationalSyncOpinion &localOpinion,
171 const RelationalSyncOpinion &remoteOpinion)
172 {
173 RelationalSyncStrategy syncStrategy;
174 for (const auto &itLocal : localOpinion) {
175 if (remoteOpinion.find(itLocal.first) == remoteOpinion.end()) {
176 LOGW("[RelationalSchema][Strategy] Table opinion is not found from remote.");
177 continue;
178 }
179 SyncOpinion localTableOpinion = itLocal.second;
180 SyncOpinion remoteTableOpinion = remoteOpinion.at(itLocal.first);
181 syncStrategy[itLocal.first] = ConcludeSyncStrategy(localTableOpinion, remoteTableOpinion);
182 }
183
184 return syncStrategy;
185 }
186
187 namespace {
188 const std::string MAGIC = "relational_opinion";
189 const uint32_t SYNC_OPINION_VERSION = 1;
190 } // namespace
191
192
CalculateParcelLen(const RelationalSyncOpinion & opinions)193 uint32_t SchemaNegotiate::CalculateParcelLen(const RelationalSyncOpinion &opinions)
194 {
195 uint64_t len = Parcel::GetStringLen(MAGIC);
196 len += Parcel::GetUInt32Len();
197 len += Parcel::GetUInt32Len();
198 len = Parcel::GetEightByteAlign(len);
199 for (const auto &it : opinions) {
200 len += Parcel::GetStringLen(it.first);
201 len += Parcel::GetUInt32Len();
202 len += Parcel::GetUInt32Len();
203 len = Parcel::GetEightByteAlign(len);
204 }
205 if (len > UINT32_MAX) {
206 return 0;
207 }
208 return static_cast<uint32_t>(len);
209 }
210
SerializeData(const RelationalSyncOpinion & opinions,Parcel & parcel)211 int SchemaNegotiate::SerializeData(const RelationalSyncOpinion &opinions, Parcel &parcel)
212 {
213 (void)parcel.WriteString(MAGIC);
214 (void)parcel.WriteUInt32(SYNC_OPINION_VERSION);
215 (void)parcel.WriteUInt32(static_cast<uint32_t>(opinions.size()));
216 (void)parcel.EightByteAlign();
217 for (const auto &it : opinions) {
218 (void)parcel.WriteString(it.first);
219 (void)parcel.WriteUInt32(it.second.permitSync);
220 (void)parcel.WriteUInt32(it.second.requirePeerConvert);
221 (void)parcel.EightByteAlign();
222 }
223 return parcel.IsError() ? -E_INVALID_ARGS : E_OK;
224 }
225
DeserializeData(Parcel & parcel,RelationalSyncOpinion & opinion)226 int SchemaNegotiate::DeserializeData(Parcel &parcel, RelationalSyncOpinion &opinion)
227 {
228 if (!parcel.IsContinueRead()) {
229 return E_OK;
230 }
231 std::string magicStr;
232 (void)parcel.ReadString(magicStr);
233 if (magicStr != MAGIC) {
234 LOGE("Deserialize sync opinion failed while read MAGIC string [%s]", magicStr.c_str());
235 return -E_INVALID_ARGS;
236 }
237 uint32_t version;
238 (void)parcel.ReadUInt32(version);
239 if (version != SYNC_OPINION_VERSION) {
240 LOGE("Not support sync opinion version: %u", version);
241 return -E_NOT_SUPPORT;
242 }
243 uint32_t opinionSize;
244 (void)parcel.ReadUInt32(opinionSize);
245 (void)parcel.EightByteAlign();
246 static const uint32_t MAX_OPINION_SIZE = 1024; // max 1024 opinions
247 if (parcel.IsError() || opinionSize > MAX_OPINION_SIZE) {
248 return -E_INVALID_ARGS;
249 }
250 for (uint32_t i = 0; i < opinionSize; i++) {
251 std::string tableName;
252 SyncOpinion tableOpinion;
253 (void)parcel.ReadString(tableName);
254 uint32_t permitSync;
255 (void)parcel.ReadUInt32(permitSync);
256 tableOpinion.permitSync = static_cast<bool>(permitSync);
257 uint32_t requirePeerConvert;
258 (void)parcel.ReadUInt32(requirePeerConvert);
259 tableOpinion.requirePeerConvert = static_cast<bool>(requirePeerConvert);
260 (void)parcel.EightByteAlign();
261 opinion[tableName] = tableOpinion;
262 }
263 return parcel.IsError() ? -E_INVALID_ARGS : E_OK;
264 }
265 }