Belle II Software  release-05-02-19
Downloader.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2016-2019 Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Martin Ritter *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 
11 #include <framework/database/Downloader.h>
12 #include <framework/logging/Logger.h>
13 #include <framework/gearbox/Unit.h>
14 #include <framework/utilities/Utils.h>
15 #include <framework/utilities/EnvironmentVariables.h>
16 
17 #include <curl/curl.h>
18 #include <TMD5.h>
19 #include <TRandom.h>
20 
21 #include <boost/algorithm/string.hpp>
22 
23 #include <chrono>
24 #include <memory>
25 #include <thread>
26 
27 namespace Belle2::Conditions {
29  struct CurlSession {
31  CURL* curl{nullptr};
33  curl_slist* headers{nullptr};
35  char errbuf[CURL_ERROR_SIZE];
37  double lasttime{0};
38  };
39 
40  namespace {
51  size_t write_function(void* buffer, size_t size, size_t nmemb, void* userp)
52  {
53  // size in bytes is size*nmemb so copy the correct amount and return it to curl
54  try {
55  std::ostream& stream = *static_cast<std::ostream*>(userp);
56  stream.write(static_cast<const char*>(buffer), size * nmemb);
57  } catch (std::ios_base::failure& e) {
58  B2ERROR("Writing error while downloading: " << e.code().message() << '(' << e.code().value() << ')');
59  return 0;
60  }
61  return size * nmemb;
62  }
63 
74  int progress_callback(void* clientp, curl_off_t dltotal, curl_off_t dlnow,
75  __attribute((unused)) curl_off_t ultotal, __attribute((unused)) curl_off_t ulnow)
76  {
77  // nothing to show ...
78  if (dlnow == 0) return 0;
79  // otherwise print number of transferred bytes
80  CurlSession& status = *static_cast<CurlSession*>(clientp);
81  double time = Utils::getClock();
82  // make sure we don't print the status too often
83  if (status.lasttime != 0 && (time - status.lasttime) / Unit::ms < 200) {
84  return 0;
85  }
86  status.lasttime = time;
87  if (dltotal > 0) {
88  B2DEBUG(39, "curl:= " << dlnow << " / " << dltotal << " bytes transferred");
89  } else {
90  B2DEBUG(39, "curl:= " << dlnow << " bytes transferred");
91  }
92  return 0;
93  }
94 
105  int debug_callback([[maybe_unused]] CURL* handle, curl_infotype type, char* data, size_t size,
106  [[maybe_unused]] void* userptr)
107  {
108  std::string prefix = "curl:";
109  // Choose loglevel: if type is CURLINFO_TEXT the messages are general
110  // informations about what curl is doing. The more detailed information
111  // about incoming/outgoing headers is a bit less important so give it a
112  // higher log level.
113  int level = 39;
114  if (type == CURLINFO_TEXT) { prefix += "*"; level = 38; }
115  else if (type == CURLINFO_HEADER_OUT) prefix += ">";
116  else if (type == CURLINFO_HEADER_IN) prefix += "<";
117  else return 0;
118  // Convert char* data to a string and strip whitespace ...
119  std::string message(data, size);
120  boost::trim(message);
121  // And log if there's something left
122  if (!message.empty()) B2DEBUG(level, prefix << " " << message);
123  return 0;
124  }
125 
127  std::string getUserAgent()
128  {
129  return "BASF2/" + ::Belle2::EnvironmentVariables::get("BELLE2_RELEASE", "unknown");
130  }
131  }
132  /* We only want to initialize curl once */
133  bool Downloader::s_globalInit{false};
134 
135  Downloader& Downloader::getDefaultInstance()
136  {
137  static Downloader instance;
138  return instance;
139  }
140 
142 
143  std::string Downloader::escapeString(const std::string& text)
144  {
145  //make sure we have an active curl session ...
146  auto session = ensureSession();
147  char* escaped = curl_easy_escape(m_session->curl, text.c_str(), text.size());
148  if (!escaped) {
149  throw std::runtime_error("Could not escape string");
150  }
151  std::string escapedStr{escaped};
152  curl_free(escaped);
153  return escapedStr;
154  }
155 
157  std::string Downloader::joinWithSlash(const std::string& base, const std::string& rest)
158  {
159  return boost::trim_right_copy_if(base, boost::is_any_of("/")) + "/" +
160  boost::trim_left_copy_if(rest, boost::is_any_of("/"));
161  }
162 
164  {
165  // start a curl session but if there is already one return false
166  if (m_session) return false;
167  // make sure curl is initialized correctly
168  if (!s_globalInit) {
169  curl_global_init(CURL_GLOBAL_ALL);
170  s_globalInit = true;
171  }
172  // create the curl session
173  m_session = std::make_unique<CurlSession>();
174  m_session->curl = curl_easy_init();
175  if (!m_session->curl) {
176  B2FATAL("Cannot initialize libcurl");
177  }
178  m_session->headers = curl_slist_append(nullptr, "Accept: application/json");
179  curl_easy_setopt(m_session->curl, CURLOPT_HTTPHEADER, m_session->headers);
180  curl_easy_setopt(m_session->curl, CURLOPT_TCP_KEEPALIVE, 1L);
181  curl_easy_setopt(m_session->curl, CURLOPT_CONNECTTIMEOUT, m_connectionTimeout);
182  curl_easy_setopt(m_session->curl, CURLOPT_LOW_SPEED_LIMIT, 10 * 1024); //10 kB/s
183  curl_easy_setopt(m_session->curl, CURLOPT_LOW_SPEED_TIME, m_stalledTimeout);
184  curl_easy_setopt(m_session->curl, CURLOPT_WRITEFUNCTION, write_function);
185  curl_easy_setopt(m_session->curl, CURLOPT_VERBOSE, 1);
186  curl_easy_setopt(m_session->curl, CURLOPT_NOPROGRESS, 0);
187  curl_easy_setopt(m_session->curl, CURLOPT_DEBUGFUNCTION, debug_callback);
188  curl_easy_setopt(m_session->curl, CURLOPT_XFERINFOFUNCTION, progress_callback);
189  curl_easy_setopt(m_session->curl, CURLOPT_XFERINFODATA, m_session.get());
190  curl_easy_setopt(m_session->curl, CURLOPT_FAILONERROR, true);
191  curl_easy_setopt(m_session->curl, CURLOPT_ERRORBUFFER, m_session->errbuf);
192  // enable transparent compression support
193  curl_easy_setopt(m_session->curl, CURLOPT_ACCEPT_ENCODING, "");
194  // Set proxy if defined
195  if (EnvironmentVariables::isSet("BELLE2_CONDB_PROXY")) {
196  const std::string proxy = EnvironmentVariables::get("BELLE2_CONDB_PROXY");
197  curl_easy_setopt(m_session->curl, CURLOPT_PROXY, proxy.c_str());
198  }
199  curl_easy_setopt(m_session->curl, CURLOPT_AUTOREFERER, 1L);
200  curl_easy_setopt(m_session->curl, CURLOPT_FOLLOWLOCATION, 1L);
201  curl_easy_setopt(m_session->curl, CURLOPT_MAXREDIRS, 10L);
202  curl_easy_setopt(m_session->curl, CURLOPT_TCP_FASTOPEN, 0L);
203  curl_easy_setopt(m_session->curl, CURLOPT_SSL_VERIFYPEER, 0L);
204  curl_easy_setopt(m_session->curl, CURLOPT_SSL_VERIFYHOST, 0L);
205  curl_easy_setopt(m_session->curl, CURLOPT_SSL_VERIFYSTATUS, 0L);
206  // Don't cache DNS entries, ask the system every time we need to connect ...
207  curl_easy_setopt(m_session->curl, CURLOPT_DNS_CACHE_TIMEOUT, 0L);
208  // and shuffle the addresses so we try a different node, otherwise we might
209  // always get the same address due to system caching and RFC 3484
210  curl_easy_setopt(m_session->curl, CURLOPT_DNS_SHUFFLE_ADDRESSES, 1L);
211  auto version = getUserAgent();
212  curl_easy_setopt(m_session->curl, CURLOPT_USERAGENT, version.c_str());
213  return true;
214  }
215 
217  {
218  // if there's a session clean it ...
219  if (m_session) {
220  curl_easy_cleanup(m_session->curl);
221  curl_slist_free_all(m_session->headers);
222  m_session.reset();
223  }
224  }
225 
226  std::string Downloader::calculateChecksum(std::istream& input)
227  {
228  // rewind stream
229  input.clear();
230  input.seekg(0, std::ios::beg);
231  // and calculate md5 checksum by feeding it blockwise to the TMD5 update
232  TMD5 md5;
233  char buffer[4096];
234  while (input.good()) {
235  input.read(buffer, 4096);
236  if (input.gcount() == 0) break;
237  md5.Update((unsigned char*)buffer, input.gcount());
238  }
239  // finalize and return output
240  md5.Final();
241  return md5.AsString();
242  }
243 
244  void Downloader::setConnectionTimeout(unsigned int timeout)
245  {
246  m_connectionTimeout = timeout;
247  if (m_session) {
248  curl_easy_setopt(m_session->curl, CURLOPT_CONNECTTIMEOUT, m_connectionTimeout);
249  }
250  }
251 
252  void Downloader::setStalledTimeout(unsigned int timeout)
253  {
254  m_stalledTimeout = timeout;
255  if (m_session) {
256  curl_easy_setopt(m_session->curl, CURLOPT_LOW_SPEED_TIME, m_stalledTimeout);
257  }
258  }
259 
260  bool Downloader::download(const std::string& url, std::ostream& buffer, bool silentOnMissing)
261  {
262  //make sure we have an active curl session ...
263  auto session = ensureSession();
264  B2DEBUG(37, "Download started ..." << LogVar("url", url));
265  // we might need to try a few times in case of HTTP>=500
266  for (unsigned int retry{1};; ++retry) {
267  //rewind the stream to the beginning
268  buffer.clear();
269  buffer.seekp(0, std::ios::beg);
270  if (!buffer.good()) {
271  throw std::runtime_error("cannot write to stream");
272  }
273  // Set the exception flags to notify us of any problem during writing
274  auto oldExceptionMask = buffer.exceptions();
275  buffer.exceptions(std::ios::failbit | std::ios::badbit);
276  // build the request ...
277  CURLcode res{CURLE_FAILED_INIT};
278  // and set all the curl options
279  curl_easy_setopt(m_session->curl, CURLOPT_URL, url.c_str());
280  curl_easy_setopt(m_session->curl, CURLOPT_WRITEDATA, &buffer);
281  // perform the request ...
282  res = curl_easy_perform(m_session->curl);
283  // flush output
284  buffer.exceptions(oldExceptionMask);
285  buffer.flush();
286  // and check for errors which occurred during download ...
287  if (res != CURLE_OK) {
288  size_t len = strlen(m_session->errbuf);
289  const std::string error = len ? m_session->errbuf : curl_easy_strerror(res);
290  if (m_maxRetries > 0 && res == CURLE_HTTP_RETURNED_ERROR) {
291  if (retry <= m_maxRetries) {
292  // we treat everything below 500 as permanent error with the request,
293  // only retry on 500.
294  long responseCode{0};
295  curl_easy_getinfo(m_session->curl, CURLINFO_RESPONSE_CODE, &responseCode);
296  if (responseCode >= 500) {
297  // use exponential backoff but don't restrict to exact slots like
298  // Ethernet, just use a random wait time between 1s and maxDelay =
299  // 2^(retry)-1 * backoffFactor
300  double maxDelay = (std::pow(2, retry) - 1) * m_backoffFactor;
301  double seconds = gRandom->Uniform(1., maxDelay);
302  B2WARNING("Could not download url, retrying ..."
303  << LogVar("url", url) << LogVar("error", error)
304  << LogVar("try", retry) << LogVar("waiting time", seconds));
305  std::this_thread::sleep_for(std::chrono::milliseconds((int)(seconds * 1e3)));
306  continue;
307  }
308  if (responseCode == 404 and silentOnMissing) return false;
309  }
310  }
311  throw std::runtime_error(error);
312  }
313  break;
314  }
315  // all fine
316  B2DEBUG(37, "Download finished successfully." << LogVar("url", url));
317  return true;
318  }
319 } // namespace Belle2::Conditions
Belle2::Unit::ms
static const double ms
[millisecond]
Definition: Unit.h:106
Belle2::Conditions::Downloader::finishSession
void finishSession()
Finish an existing curl session if any is active at the moment.
Definition: Downloader.cc:224
Belle2::Conditions::Downloader::download
bool download(const std::string &url, std::ostream &stream, bool silentOnMissing=false)
Get a URL and save the content to the stream. This function raises exceptions when there are any problems.
Definition: Downloader.cc:268
Belle2::Conditions::Downloader::calculateChecksum
static std::string calculateChecksum(std::istream &input)
calculate the digest/checksum on a given string.
Definition: Downloader.cc:234
Belle2::Conditions::Downloader::~Downloader
~Downloader()
Destructor.
Definition: Downloader.cc:149
Belle2::Conditions::Downloader::setStalledTimeout
void setStalledTimeout(unsigned int timeout)
Set the timeout to wait for stalled connections (<10KB/s), 0 disables timeout.
Definition: Downloader.cc:260
Belle2::Conditions::Downloader::ensureSession
ScopeGuard ensureSession()
Make sure there's an active session and return a ScopeGuard object that closes the session on destruction.
Definition: Downloader.h:53
Belle2::Conditions::Downloader::m_stalledTimeout
unsigned int m_stalledTimeout
Timeout to wait for stalled connections (<10KB/s)
Definition: Downloader.h:112
Belle2::Conditions::Downloader::escapeString
std::string escapeString(const std::string &text)
Escape a string to make it safe to be used in web requests.
Definition: Downloader.cc:151
Belle2::EnvironmentVariables::get
static std::string get(const std::string &name, const std::string &fallback="")
Get the value of an environment variable or the given fallback value if the variable is not set.
Definition: EnvironmentVariables.cc:35
Belle2::Conditions::Downloader::joinWithSlash
std::string joinWithSlash(const std::string &base, const std::string &second)
Join two strings and make sure that there is exactly one '/' between them.
Definition: Downloader.cc:165
Belle2::Conditions::Downloader::Downloader
Downloader()=default
Create a new payload downloader.
Belle2::Conditions::CurlSession::errbuf
char errbuf[CURL_ERROR_SIZE]
error buffer in case some error happens during downloading
Definition: Downloader.cc:51
Belle2::Conditions::Downloader::m_maxRetries
unsigned int m_maxRetries
Number of retries to perform when downloading fails with HTTP response code >=500.
Definition: Downloader.h:114
Belle2::Conditions::Downloader::m_connectionTimeout
unsigned int m_connectionTimeout
Timeout to wait for connections in seconds.
Definition: Downloader.h:110
Belle2::Conditions::Downloader::s_globalInit
static bool s_globalInit
flag to indicate whether curl has been initialized already
Definition: Downloader.h:108
Belle2::Conditions::Downloader::startSession
bool startSession()
Start a new curl session if none is active at the moment.
Definition: Downloader.cc:171
Belle2::Conditions::Downloader::getDefaultInstance
static Downloader & getDefaultInstance()
Return the default instance.
Definition: Downloader.cc:143
LogVar
Class to store variables with their name which were sent to the logging service.
Definition: LogVariableStream.h:24
Belle2::EnvironmentVariables::isSet
static bool isSet(const std::string &name)
Check if a value is set in the database.
Definition: EnvironmentVariables.cc:29
Belle2::Conditions::CurlSession::headers
curl_slist * headers
headers to send with every request
Definition: Downloader.cc:49
Belle2::Conditions::CurlSession::curl
CURL * curl
curl session information
Definition: Downloader.cc:47
Belle2::Utils::getClock
double getClock()
Return current value of the real-time clock.
Definition: Utils.cc:58
Belle2::Conditions::CurlSession::lasttime
double lasttime
last time we printed the status (in ns)
Definition: Downloader.cc:53
Belle2::Conditions::Downloader::setConnectionTimeout
void setConnectionTimeout(unsigned int timeout)
Set the timeout to wait for connections in seconds, 0 means built in curl default.
Definition: Downloader.cc:252
Belle2::Conditions::Downloader::m_session
std::unique_ptr< CurlSession > m_session
curl session handle
Definition: Downloader.h:106
Belle2::Conditions::Downloader::m_backoffFactor
unsigned int m_backoffFactor
Backoff factor for retries in seconds.
Definition: Downloader.h:116