GRPC Core  9.0.0
flow_control.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
21 
23 
24 #include <stdint.h>
25 
31 
33 struct grpc_chttp2_stream;
34 
36 
37 namespace grpc {
38 namespace testing {
39 class TrickledCHTTP2; // to make this a friend
40 } // namespace testing
41 } // namespace grpc
42 
43 namespace grpc_core {
44 namespace chttp2 {
45 
46 static constexpr uint32_t kDefaultWindow = 65535;
47 static constexpr int64_t kMaxWindow = static_cast<int64_t>((1u << 31) - 1);
48 // TODO(ncteisen): Tune this
49 static constexpr uint32_t kFrameSize = 1024 * 1024;
50 
51 class TransportFlowControl;
52 class StreamFlowControl;
53 
54 // Encapsulates a collection of actions the transport needs to take with
55 // regard to flow control. Each action comes with urgencies that tell the
56 // transport how quickly the action must take place.
58  public:
59  enum class Urgency : uint8_t {
60  // Nothing to be done.
61  NO_ACTION_NEEDED = 0,
62  // Initiate a write to update the initial window immediately.
64  // Push the flow control update into a send buffer, to be sent
65  // out the next time a write is initiated.
67  };
68 
69  Urgency send_stream_update() const { return send_stream_update_; }
70  Urgency send_transport_update() const { return send_transport_update_; }
72  return send_initial_window_update_;
73  }
75  return send_max_frame_size_update_;
76  }
77  uint32_t initial_window_size() const { return initial_window_size_; }
78  uint32_t max_frame_size() const { return max_frame_size_; }
79 
81  send_stream_update_ = u;
82  return *this;
83  }
85  send_transport_update_ = u;
86  return *this;
87  }
89  uint32_t update) {
90  send_initial_window_update_ = u;
91  initial_window_size_ = update;
92  return *this;
93  }
95  uint32_t update) {
96  send_max_frame_size_update_ = u;
97  max_frame_size_ = update;
98  return *this;
99  }
100 
101  static const char* UrgencyString(Urgency u);
102  void Trace(grpc_chttp2_transport* t) const;
103 
104  private:
105  Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED;
106  Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED;
107  Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED;
108  Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED;
109  uint32_t initial_window_size_ = 0;
110  uint32_t max_frame_size_ = 0;
111 };
112 
114  public:
115  FlowControlTrace(const char* reason, TransportFlowControl* tfc,
116  StreamFlowControl* sfc) {
117  if (enabled_) Init(reason, tfc, sfc);
118  }
119 
121  if (enabled_) Finish();
122  }
123 
124  private:
125  void Init(const char* reason, TransportFlowControl* tfc,
126  StreamFlowControl* sfc);
127  void Finish();
128 
129  const bool enabled_ = GRPC_TRACE_FLAG_ENABLED(grpc_flowctl_trace);
130 
131  TransportFlowControl* tfc_;
132  StreamFlowControl* sfc_;
133  const char* reason_;
134  int64_t remote_window_;
135  int64_t target_window_;
136  int64_t announced_window_;
137  int64_t remote_window_delta_;
138  int64_t local_window_delta_;
139  int64_t announced_window_delta_;
140 };
141 
142 // Fat interface with all methods a flow control implementation needs to
143 // support. gRPC C Core does not support pure virtual functions, so instead
144 // we abort in any methods which require implementation in the base class.
146  public:
149 
150  // Is flow control enabled? This is needed in other codepaths like the checks
151  // in parsing and in writing.
152  virtual bool flow_control_enabled() const { abort(); }
153 
154  // Called to check if the transport needs to send a WINDOW_UPDATE frame
155  virtual uint32_t MaybeSendUpdate(bool /* writing_anyway */) { abort(); }
156 
157  // Using the protected members, returns an Action to be taken by the
158  // transport.
159  virtual FlowControlAction MakeAction() { abort(); }
160 
161  // Using the protected members, returns an Action to be taken by the
162  // transport. Also checks for updates to our BDP estimate and acts
163  // accordingly.
164  virtual FlowControlAction PeriodicUpdate() { abort(); }
165 
166  // Called to do bookkeeping when a stream owned by this transport sends
167  // data on the wire
168  virtual void StreamSentData(int64_t /* size */) { abort(); }
169 
170  // Called to do bookkeeping when a stream owned by this transport receives
171  // data from the wire. Also does error checking for frame size.
172  virtual grpc_error* RecvData(int64_t /* incoming_frame_size */) { abort(); }
173 
174  // Called to do bookkeeping when we receive a WINDOW_UPDATE frame.
175  virtual void RecvUpdate(uint32_t /* size */) { abort(); }
176 
177  // Returns the BdpEstimator held by this object. Caller is responsible for
178  // checking for nullptr. TODO(ncteisen): consider fully encapsulating all
179  // bdp estimator actions inside TransportFlowControl
180  virtual BdpEstimator* bdp_estimator() { return nullptr; }
181 
182  // Getters
183  int64_t remote_window() const { return remote_window_; }
184  virtual int64_t target_window() const { return target_initial_window_size_; }
185  int64_t announced_window() const { return announced_window_; }
186 
187  // Used in certain benchmarks in which we don't want FlowControl to be a
188  // factor
189  virtual void TestOnlyForceHugeWindow() {}
190 
191  protected:
192  friend class ::grpc::testing::TrickledCHTTP2;
193  int64_t remote_window_ = kDefaultWindow;
194  int64_t target_initial_window_size_ = kDefaultWindow;
195  int64_t announced_window_ = kDefaultWindow;
196 };
197 
198 // Implementation of flow control that does NOTHING. Always returns maximum
199 // values, never initiates writes, and assumes that the remote peer is doing
200 // the same. To be used to narrow down on flow control as the cause of negative
201 // performance.
203  public:
204  // Maxes out all values
206 
207  bool flow_control_enabled() const override { return false; }
208 
209  // Never do anything.
210  uint32_t MaybeSendUpdate(bool /* writing_anyway */) override { return 0; }
213  void StreamSentData(int64_t /* size */) override {}
214  grpc_error* RecvData(int64_t /* incoming_frame_size */) override {
215  return GRPC_ERROR_NONE;
216  }
217  void RecvUpdate(uint32_t /* size */) override {}
218 };
219 
220 // Implementation of flow control that abides by the HTTP/2 spec and attempts
221 // to be as performant as possible.
223  public:
224  TransportFlowControl(const grpc_chttp2_transport* t, bool enable_bdp_probe);
226 
227  bool flow_control_enabled() const override { return true; }
228 
229  bool bdp_probe() const { return enable_bdp_probe_; }
230 
231  // returns an announce if we should send a transport update to our peer,
232  // else returns zero; writing_anyway indicates if a write would happen
233  // regardless of the send - if it is false and this function returns non-zero,
234  // this announce will cause a write to occur
235  uint32_t MaybeSendUpdate(bool writing_anyway) override;
236 
237  // Reads the flow control data and returns an actionable struct that will
238  // tell chttp2 exactly what it needs to do
240  return UpdateAction(FlowControlAction());
241  }
242 
243  // Call periodically (at a low-ish rate, 100ms - 10s makes sense)
244  // to perform more complex flow control calculations and return an action
245  // to let chttp2 change its parameters
247 
248  void StreamSentData(int64_t size) override { remote_window_ -= size; }
249 
250  grpc_error* ValidateRecvData(int64_t incoming_frame_size);
251  void CommitRecvData(int64_t incoming_frame_size) {
252  announced_window_ -= incoming_frame_size;
253  }
254 
255  grpc_error* RecvData(int64_t incoming_frame_size) override {
256  FlowControlTrace trace(" data recv", this, nullptr);
257  grpc_error* error = ValidateRecvData(incoming_frame_size);
258  if (error != GRPC_ERROR_NONE) return error;
259  CommitRecvData(incoming_frame_size);
260  return GRPC_ERROR_NONE;
261  }
262 
263  // we have received a WINDOW_UPDATE frame for a transport
264  void RecvUpdate(uint32_t size) override {
265  FlowControlTrace trace("t updt recv", this, nullptr);
266  remote_window_ += size;
267  }
268 
269  // See comment above announced_stream_total_over_incoming_window_ for the
270  // logic behind this decision.
271  int64_t target_window() const override {
272  return static_cast<uint32_t> GPR_MIN(
273  (int64_t)((1u << 31) - 1),
274  announced_stream_total_over_incoming_window_ +
276  }
277 
278  const grpc_chttp2_transport* transport() const { return t_; }
279 
281  if (delta > 0) {
282  announced_stream_total_over_incoming_window_ -= delta;
283  }
284  }
285 
287  if (delta > 0) {
288  announced_stream_total_over_incoming_window_ += delta;
289  }
290  }
291 
292  BdpEstimator* bdp_estimator() override { return &bdp_estimator_; }
293 
294  void TestOnlyForceHugeWindow() override {
295  announced_window_ = 1024 * 1024 * 1024;
296  remote_window_ = 1024 * 1024 * 1024;
297  }
298 
299  private:
300  double TargetLogBdp();
301  double SmoothLogBdp(double value);
302  FlowControlAction::Urgency DeltaUrgency(int64_t value,
303  grpc_chttp2_setting_id setting_id);
304 
305  FlowControlAction UpdateAction(FlowControlAction action) {
306  if (announced_window_ < target_window() / 2) {
309  }
310  return action;
311  }
312 
313  const grpc_chttp2_transport* const t_;
314 
323  int64_t announced_stream_total_over_incoming_window_ = 0;
324 
326  const bool enable_bdp_probe_;
327 
328  /* bdp estimation */
329  grpc_core::BdpEstimator bdp_estimator_;
330 
331  /* pid controller */
332  grpc_core::PidController pid_controller_;
333  grpc_millis last_pid_update_ = 0;
334 };
335 
336 // Fat interface with all methods a stream flow control implementation needs
337 // to support. gRPC C Core does not support pure virtual functions, so instead
338 // we abort in any methods which require implementation in the base class.
340  public:
343 
344  // Updates an action using the protected members.
346  abort();
347  }
348 
349  // Using the protected members, returns an Action for this stream to be
350  // taken by the transport.
351  virtual FlowControlAction MakeAction() { abort(); }
352 
353  // Bookkeeping for when data is sent on this stream.
354  virtual void SentData(int64_t /* outgoing_frame_size */) { abort(); }
355 
356  // Bookkeeping and error checking for when data is received by this stream.
357  virtual grpc_error* RecvData(int64_t /* incoming_frame_size */) { abort(); }
358 
359  // Called to check if this stream needs to send a WINDOW_UPDATE frame.
360  virtual uint32_t MaybeSendUpdate() { abort(); }
361 
362  // Bookkeeping for receiving a WINDOW_UPDATE frame for this stream.
363  virtual void RecvUpdate(uint32_t /* size */) { abort(); }
364 
365  // Bookkeeping for when a call pulls bytes out of the transport. At this
366  // point we consider the data 'used' and can thus let our peer know we are
367  // ready for more data.
368  virtual void IncomingByteStreamUpdate(size_t /* max_size_hint */,
369  size_t /* have_already */) {
370  abort();
371  }
372 
373  // Used in certain benchmarks in which we don't want FlowControl to be a
374  // factor
375  virtual void TestOnlyForceHugeWindow() {}
376 
377  // Getters
381 
382  protected:
383  friend class ::grpc::testing::TrickledCHTTP2;
384  int64_t remote_window_delta_ = 0;
385  int64_t local_window_delta_ = 0;
387 };
388 
389 // Implementation of flow control that does NOTHING. Always returns maximum
390 // values, never initiates writes, and assumes that the remote peer is doing
391 // the same. To be used to narrow down on flow control as the cause of negative
392 // performance.
394  public:
396  return action;
397  }
399  void SentData(int64_t /* outgoing_frame_size */) override {}
400  grpc_error* RecvData(int64_t /* incoming_frame_size */) override {
401  return GRPC_ERROR_NONE;
402  }
403  uint32_t MaybeSendUpdate() override { return 0; }
404  void RecvUpdate(uint32_t /* size */) override {}
405  void IncomingByteStreamUpdate(size_t /* max_size_hint */,
406  size_t /* have_already */) override {}
407 };
408 
409 // Implementation of flow control that abides by the HTTP/2 spec and attempts
410 // to be as performant as possible.
412  public:
416  }
417 
420  return UpdateAction(tfc_->MakeAction());
421  }
422 
423  // we have sent data on the wire, we must track this in our bookkeeping for
424  // the remote peer's flow control.
425  void SentData(int64_t outgoing_frame_size) override {
426  FlowControlTrace tracer(" data sent", tfc_, this);
427  tfc_->StreamSentData(outgoing_frame_size);
428  remote_window_delta_ -= outgoing_frame_size;
429  }
430 
431  // we have received data from the wire
432  grpc_error* RecvData(int64_t incoming_frame_size) override;
433 
434  // returns an announce if we should send a stream update to our peer, else
435  // returns zero
436  uint32_t MaybeSendUpdate() override;
437 
438  // we have received a WINDOW_UPDATE frame for a stream
439  void RecvUpdate(uint32_t size) override {
440  FlowControlTrace trace("s updt recv", tfc_, this);
441  remote_window_delta_ += size;
442  }
443 
444  // the application is asking for a certain amount of bytes
445  void IncomingByteStreamUpdate(size_t max_size_hint,
446  size_t have_already) override;
447 
448  int64_t remote_window_delta() const { return remote_window_delta_; }
449  int64_t local_window_delta() const { return local_window_delta_; }
451 
452  const grpc_chttp2_stream* stream() const { return s_; }
453 
454  void TestOnlyForceHugeWindow() override {
455  announced_window_delta_ = 1024 * 1024 * 1024;
456  local_window_delta_ = 1024 * 1024 * 1024;
457  remote_window_delta_ = 1024 * 1024 * 1024;
458  }
459 
460  private:
461  TransportFlowControl* const tfc_;
462  const grpc_chttp2_stream* const s_;
463 
464  void UpdateAnnouncedWindowDelta(TransportFlowControl* tfc, int64_t change) {
466  announced_window_delta_ += change;
468  }
469 };
470 
471 } // namespace chttp2
472 } // namespace grpc_core
473 
474 #endif
Definition: bdp_estimator.h:38
Definition: pid_controller.h:35
Definition: trace.h:61
Definition: flow_control.h:57
FlowControlAction & set_send_max_frame_size_update(Urgency u, uint32_t update)
Definition: flow_control.h:94
uint32_t max_frame_size() const
Definition: flow_control.h:78
FlowControlAction & set_send_initial_window_update(Urgency u, uint32_t update)
Definition: flow_control.h:88
uint32_t initial_window_size() const
Definition: flow_control.h:77
Urgency send_stream_update() const
Definition: flow_control.h:69
Urgency send_max_frame_size_update() const
Definition: flow_control.h:74
FlowControlAction & set_send_stream_update(Urgency u)
Definition: flow_control.h:80
static const char * UrgencyString(Urgency u)
Definition: flow_control.cc:126
FlowControlAction & set_send_transport_update(Urgency u)
Definition: flow_control.h:84
Urgency
Definition: flow_control.h:59
Urgency send_transport_update() const
Definition: flow_control.h:70
void Trace(grpc_chttp2_transport *t) const
Definition: flow_control.cc:140
Urgency send_initial_window_update() const
Definition: flow_control.h:71
Definition: flow_control.h:113
~FlowControlTrace()
Definition: flow_control.h:120
FlowControlTrace(const char *reason, TransportFlowControl *tfc, StreamFlowControl *sfc)
Definition: flow_control.h:115
Definition: flow_control.h:339
int64_t local_window_delta()
Definition: flow_control.h:379
StreamFlowControlBase()
Definition: flow_control.h:341
virtual grpc_error * RecvData(int64_t)
Definition: flow_control.h:357
virtual FlowControlAction UpdateAction(FlowControlAction)
Definition: flow_control.h:345
virtual FlowControlAction MakeAction()
Definition: flow_control.h:351
virtual void SentData(int64_t)
Definition: flow_control.h:354
virtual void IncomingByteStreamUpdate(size_t, size_t)
Definition: flow_control.h:368
virtual ~StreamFlowControlBase()
Definition: flow_control.h:342
int64_t announced_window_delta()
Definition: flow_control.h:380
int64_t remote_window_delta()
Definition: flow_control.h:378
virtual uint32_t MaybeSendUpdate()
Definition: flow_control.h:360
int64_t announced_window_delta_
Definition: flow_control.h:386
virtual void TestOnlyForceHugeWindow()
Definition: flow_control.h:375
virtual void RecvUpdate(uint32_t)
Definition: flow_control.h:363
int64_t local_window_delta_
Definition: flow_control.h:385
int64_t remote_window_delta_
Definition: flow_control.h:384
Definition: flow_control.h:393
void SentData(int64_t) override
Definition: flow_control.h:399
grpc_error * RecvData(int64_t) override
Definition: flow_control.h:400
FlowControlAction MakeAction() override
Definition: flow_control.h:398
FlowControlAction UpdateAction(FlowControlAction action) override
Definition: flow_control.h:395
void RecvUpdate(uint32_t) override
Definition: flow_control.h:404
void IncomingByteStreamUpdate(size_t, size_t) override
Definition: flow_control.h:405
uint32_t MaybeSendUpdate() override
Definition: flow_control.h:403
Definition: flow_control.h:411
FlowControlAction MakeAction() override
Definition: flow_control.h:419
StreamFlowControl(TransportFlowControl *tfc, const grpc_chttp2_stream *s)
Definition: flow_control.cc:218
int64_t local_window_delta() const
Definition: flow_control.h:449
int64_t remote_window_delta() const
Definition: flow_control.h:448
void SentData(int64_t outgoing_frame_size) override
Definition: flow_control.h:425
void RecvUpdate(uint32_t size) override
Definition: flow_control.h:439
uint32_t MaybeSendUpdate() override
Definition: flow_control.cc:267
~StreamFlowControl()
Definition: flow_control.h:414
void IncomingByteStreamUpdate(size_t max_size_hint, size_t have_already) override
Definition: flow_control.cc:278
int64_t announced_window_delta() const
Definition: flow_control.h:450
FlowControlAction UpdateAction(FlowControlAction action) override
Definition: flow_control.cc:389
const grpc_chttp2_stream * stream() const
Definition: flow_control.h:452
grpc_error * RecvData(int64_t incoming_frame_size) override
Definition: flow_control.cc:222
void TestOnlyForceHugeWindow() override
Definition: flow_control.h:454
Definition: flow_control.h:145
int64_t announced_window() const
Definition: flow_control.h:185
virtual BdpEstimator * bdp_estimator()
Definition: flow_control.h:180
virtual bool flow_control_enabled() const
Definition: flow_control.h:152
virtual int64_t target_window() const
Definition: flow_control.h:184
virtual ~TransportFlowControlBase()
Definition: flow_control.h:148
int64_t remote_window() const
Definition: flow_control.h:183
TransportFlowControlBase()
Definition: flow_control.h:147
virtual void TestOnlyForceHugeWindow()
Definition: flow_control.h:189
int64_t target_initial_window_size_
Definition: flow_control.h:194
int64_t announced_window_
Definition: flow_control.h:195
virtual grpc_error * RecvData(int64_t)
Definition: flow_control.h:172
virtual uint32_t MaybeSendUpdate(bool)
Definition: flow_control.h:155
virtual void RecvUpdate(uint32_t)
Definition: flow_control.h:175
virtual void StreamSentData(int64_t)
Definition: flow_control.h:168
int64_t remote_window_
Definition: flow_control.h:193
virtual FlowControlAction MakeAction()
Definition: flow_control.h:159
virtual FlowControlAction PeriodicUpdate()
Definition: flow_control.h:164
bool flow_control_enabled() const override
Definition: flow_control.h:207
FlowControlAction PeriodicUpdate() override
Definition: flow_control.h:212
void RecvUpdate(uint32_t) override
Definition: flow_control.h:217
void StreamSentData(int64_t) override
Definition: flow_control.h:213
TransportFlowControlDisabled(grpc_chttp2_transport *t)
Definition: flow_control.cc:156
uint32_t MaybeSendUpdate(bool) override
Definition: flow_control.h:210
grpc_error * RecvData(int64_t) override
Definition: flow_control.h:214
FlowControlAction MakeAction() override
Definition: flow_control.h:211
Definition: flow_control.h:222
grpc_error * ValidateRecvData(int64_t incoming_frame_size)
Definition: flow_control.cc:204
void RecvUpdate(uint32_t size) override
Definition: flow_control.h:264
grpc_error * RecvData(int64_t incoming_frame_size) override
Definition: flow_control.h:255
void StreamSentData(int64_t size) override
Definition: flow_control.h:248
BdpEstimator * bdp_estimator() override
Definition: flow_control.h:292
void TestOnlyForceHugeWindow() override
Definition: flow_control.h:294
~TransportFlowControl()
Definition: flow_control.h:225
FlowControlAction MakeAction() override
Definition: flow_control.h:239
void PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta)
Definition: flow_control.h:280
int64_t target_window() const override
Definition: flow_control.h:271
void CommitRecvData(int64_t incoming_frame_size)
Definition: flow_control.h:251
const grpc_chttp2_transport * transport() const
Definition: flow_control.h:278
FlowControlAction PeriodicUpdate() override
Definition: flow_control.cc:356
bool bdp_probe() const
Definition: flow_control.h:229
TransportFlowControl(const grpc_chttp2_transport *t, bool enable_bdp_probe)
Definition: flow_control.cc:175
uint32_t MaybeSendUpdate(bool writing_anyway) override
Definition: flow_control.cc:190
void PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta)
Definition: flow_control.h:286
bool flow_control_enabled() const override
Definition: flow_control.h:227
#define GRPC_ERROR_NONE
The following "special" errors can be propagated without allocating memory.
Definition: error.h:125
int64_t grpc_millis
Definition: exec_ctx.h:35
grpc_core::TraceFlag grpc_flowctl_trace
grpc_chttp2_setting_id
Definition: http2_settings.h:29
Round Robin Policy.
Definition: backend_metric.cc:24
Definition: flow_control.h:37
Definition: internal.h:508
Definition: internal.h:290
Definition: error_internal.h:39
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: trace.h:112
#define GPR_MIN(a, b)
useful macros that don't belong anywhere else
Definition: useful.h:24