19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
70 template <
typename SubchannelListType,
typename SubchannelDataType>
76 template <
typename SubchannelListType,
typename SubchannelDataType>
81 return static_cast<SubchannelListType*
>(subchannel_list_);
86 return static_cast<size_t>(
static_cast<const SubchannelDataType*
>(
this) -
87 subchannel_list_->subchannel(0));
99 connectivity_state_ = subchannel_->CheckConnectivityState();
100 return connectivity_state_;
141 : subchannel_data_(subchannel_data),
144 ~Watcher() { subchannel_list_.reset(
DEBUG_LOCATION,
"Watcher dtor"); }
149 return subchannel_list_->policy()->interested_parties();
158 void UnrefSubchannelLocked(
const char* reason);
172 template <
typename SubchannelListType,
typename SubchannelDataType>
181 SubchannelDataType*
subchannel(
size_t index) {
return &subchannels_[index]; }
210 friend class SubchannelData<SubchannelListType, SubchannelDataType>;
212 void ShutdownLocked();
225 bool shutting_down_ =
false;
236 template <
typename SubchannelListType,
typename SubchannelDataType>
241 "[%s %p] subchannel list %p index %" PRIuPTR
" of %" PRIuPTR
242 " (subchannel %p): connectivity changed: state=%s, "
243 "shutting_down=%d, pending_watcher=%p",
244 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
245 subchannel_list_.get(), subchannel_data_->Index(),
246 subchannel_list_->num_subchannels(),
247 subchannel_data_->subchannel_.get(),
249 subchannel_data_->pending_watcher_);
251 if (!subchannel_list_->shutting_down() &&
252 subchannel_data_->pending_watcher_ !=
nullptr) {
253 subchannel_data_->connectivity_state_ = new_state;
255 subchannel_data_->ProcessConnectivityChangeLocked(new_state);
263 template <
typename SubchannelListType,
typename SubchannelDataType>
268 : subchannel_list_(subchannel_list),
269 subchannel_(std::move(subchannel)),
274 template <
typename SubchannelListType,
typename SubchannelDataType>
279 template <
typename SubchannelListType,
typename SubchannelDataType>
282 if (subchannel_ !=
nullptr) {
285 "[%s %p] subchannel list %p index %" PRIuPTR
" of %" PRIuPTR
286 " (subchannel %p): unreffing subchannel (%s)",
287 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
288 subchannel_list_, Index(), subchannel_list_->num_subchannels(),
289 subchannel_.get(), reason);
295 template <
typename SubchannelListType,
typename SubchannelDataType>
296 void SubchannelData<SubchannelListType,
297 SubchannelDataType>::ResetBackoffLocked() {
298 if (subchannel_ !=
nullptr) {
303 template <
typename SubchannelListType,
typename SubchannelDataType>
305 SubchannelDataType>::StartConnectivityWatchLocked() {
308 "[%s %p] subchannel list %p index %" PRIuPTR
" of %" PRIuPTR
309 " (subchannel %p): starting watch (from %s)",
310 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
311 subchannel_list_, Index(), subchannel_list_->num_subchannels(),
316 new Watcher(
this, subchannel_list()->Ref(
DEBUG_LOCATION,
"Watcher"));
319 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>(
323 template <
typename SubchannelListType,
typename SubchannelDataType>
328 "[%s %p] subchannel list %p index %" PRIuPTR
" of %" PRIuPTR
329 " (subchannel %p): canceling connectivity watch (%s)",
330 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
331 subchannel_list_, Index(), subchannel_list_->num_subchannels(),
332 subchannel_.get(), reason);
334 if (pending_watcher_ !=
nullptr) {
336 pending_watcher_ =
nullptr;
340 template <
typename SubchannelListType,
typename SubchannelDataType>
342 if (pending_watcher_ !=
nullptr) CancelConnectivityWatchLocked(
"shutdown");
343 UnrefSubchannelLocked(
"shutdown");
350 template <
typename SubchannelListType,
typename SubchannelDataType>
361 "[%s %p] Creating subchannel list %p for %" PRIuPTR
" subchannels",
372 for (
size_t i = 0; i < addresses.
size(); i++) {
377 if (addresses[i].IsBalancer()) {
381 const size_t subchannel_address_arg_index = args_to_add.
size();
384 if (addresses[i].args() !=
nullptr) {
385 for (
size_t j = 0; j < addresses[i].args()->num_args; ++j) {
391 args_to_add.
data(), args_to_add.
size());
392 gpr_free(args_to_add[subchannel_address_arg_index].value.string);
401 "[%s %p] could not create subchannel for address uri %s, "
403 tracer_->
name(), policy_, address_uri);
411 "[%s %p] subchannel list %p index %" PRIuPTR
412 ": Created subchannel %p for address uri %s",
413 tracer_->
name(), policy_,
this, subchannels_.
size(),
421 template <
typename SubchannelListType,
typename SubchannelDataType>
424 gpr_log(
GPR_INFO,
"[%s %p] Destroying subchannel_list %p", tracer_->name(),
429 template <
typename SubchannelListType,
typename SubchannelDataType>
433 tracer_->name(), policy_,
this);
436 shutting_down_ =
true;
437 for (
size_t i = 0; i < subchannels_.size(); i++) {
438 SubchannelDataType* sd = &subchannels_[i];
439 sd->ShutdownLocked();
443 template <
typename SubchannelListType,
typename SubchannelDataType>
444 void SubchannelList<SubchannelListType,
445 SubchannelDataType>::ResetBackoffLocked() {
446 for (
size_t i = 0; i < subchannels_.size(); i++) {
447 SubchannelDataType* sd = &subchannels_[i];
448 sd->ResetBackoffLocked();
void grpc_channel_args_destroy(grpc_channel_args *a)
Destroy arguments created by grpc_channel_args_copy.
Definition: channel_args.cc:197
grpc_channel_args * grpc_channel_args_copy_and_add_and_remove(const grpc_channel_args *src, const char **to_remove, size_t num_to_remove, const grpc_arg *to_add, size_t num_to_add)
Copies the arguments from src except for those whose keys are in to_remove and appends the arguments in to_add.
Definition: channel_args.cc:77
T * data()
Definition: inlined_vector.h:93
void reserve(size_t capacity)
Definition: inlined_vector.h:125
void emplace_back(Args &&... args)
Definition: inlined_vector.h:145
size_t size() const
Definition: inlined_vector.h:165
Definition: orphanable.h:77
void Unref()
Definition: orphanable.h:107
A proxy object implemented by the client channel and used by the LB policy to communicate with the channel.
Definition: lb_policy.h:260
virtual RefCountedPtr< SubchannelInterface > CreateSubchannel(const grpc_channel_args &args)=0
Creates a new subchannel with the specified channel args.
Interface for load balancing policies.
Definition: lb_policy.h:81
Definition: ref_counted_ptr.h:35
Definition: server_address.h:44
Definition: subchannel_list.h:77
SubchannelInterface * subchannel() const
Definition: subchannel_list.h:91
virtual ~SubchannelData()
Definition: subchannel_list.h:275
virtual void ProcessConnectivityChangeLocked(grpc_connectivity_state connectivity_state)=0
void ShutdownLocked()
Definition: subchannel_list.h:341
SubchannelListType * subchannel_list() const
Definition: subchannel_list.h:80
size_t Index() const
Definition: subchannel_list.h:85
SubchannelData(SubchannelList< SubchannelListType, SubchannelDataType > *subchannel_list, const ServerAddress &address, RefCountedPtr< SubchannelInterface > subchannel)
Definition: subchannel_list.h:264
grpc_connectivity_state CheckConnectivityStateLocked()
Definition: subchannel_list.h:97
void ResetBackoffLocked()
Definition: subchannel_list.h:297
void CancelConnectivityWatchLocked(const char *reason)
Definition: subchannel_list.h:325
void StartConnectivityWatchLocked()
Definition: subchannel_list.h:305
void CancelConnectivityStateWatch(const char *health_check_service_name, ConnectivityStateWatcherInterface *watcher)
Definition: subchannel.cc:809
static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address *addr)
Definition: subchannel.cc:841
void ResetBackoff()
Definition: subchannel.cc:829
void WatchConnectivityState(grpc_connectivity_state initial_state, grpc_core::UniquePtr< char > health_check_service_name, OrphanablePtr< ConnectivityStateWatcherInterface > watcher)
Definition: subchannel.cc:788
Definition: subchannel_interface.h:32
Definition: subchannel_interface.h:30
Definition: subchannel_list.h:173
bool shutting_down() const
Definition: subchannel_list.h:184
void ResetBackoffLocked()
Definition: subchannel_list.h:445
SubchannelDataType * subchannel(size_t index)
Definition: subchannel_list.h:181
TraceFlag * tracer() const
Definition: subchannel_list.h:188
LoadBalancingPolicy * policy() const
Definition: subchannel_list.h:187
virtual ~SubchannelList()
Definition: subchannel_list.h:422
SubchannelList(LoadBalancingPolicy *policy, TraceFlag *tracer, const ServerAddressList &addresses, LoadBalancingPolicy::ChannelControlHelper *helper, const grpc_channel_args &args)
Definition: subchannel_list.h:351
size_t num_subchannels() const
Definition: subchannel_list.h:178
void Orphan() override
Definition: subchannel_list.h:195
InlinedVector< SubchannelDataType, 10 > SubchannelVector
Definition: subchannel_list.h:175
const char * name() const
Definition: trace.h:68
#define DEBUG_LOCATION
Definition: debug_location.h:41
#define GRPC_ARG_SERVICE_CONFIG
Service config data in JSON form.
Definition: grpc_types.h:291
#define GPR_ASSERT(x)
abort() the process if x is zero, having written a line to the log.
Definition: log.h:94
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4, 5)
Log a message.
#define GPR_INFO
Definition: log.h:56
grpc_connectivity_state
Connectivity state of a channel.
Definition: connectivity_state.h:27
@ GRPC_CHANNEL_IDLE
channel is idle
Definition: connectivity_state.h:29
GPRAPI void gpr_free(void *ptr)
free
Definition: alloc.cc:50
Round Robin Policy.
Definition: backend_metric.cc:24
const char * ConnectivityStateName(grpc_connectivity_state state)
Definition: connectivity_state.cc:36
struct grpc_pollset_set grpc_pollset_set
Definition: pollset_set.h:31
char * grpc_sockaddr_to_uri(const grpc_resolved_address *resolved_addr)
Definition: sockaddr_utils.cc:219
An array of arguments that can be passed around.
Definition: grpc_types.h:132
#define GRPC_ARG_SUBCHANNEL_ADDRESS
Definition: subchannel.h:41
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: trace.h:112
#define GPR_ARRAY_SIZE(array)
Definition: useful.h:31