Belle II Software  release-08-01-10
StreamHelper.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/zmq/utils/StreamHelper.h>
10 #include <framework/core/Environment.h>
11 #include <framework/core/RandomNumbers.h>
12 #include <framework/logging/Logger.h>
13 
14 #include <TSystem.h>
15 
16 using namespace Belle2;
17 
18 void StreamHelper::initialize(int compressionLevel, bool handleMergeable)
19 {
20  gSystem->Load("libdataobjects");
21  m_streamer = std::make_unique<DataStoreStreamer>(compressionLevel, handleMergeable);
22 
23  if ((Environment::Instance().getStreamingObjects()).size() > 0) {
24  m_streamer->setStreamingObjects(Environment::Instance().getStreamingObjects());
25  B2INFO("Tx: Streaming objects limited : " << (Environment::Instance().getStreamingObjects()).size() << " objects");
26  }
27 }
28 
29 std::unique_ptr<EvtMessage> StreamHelper::stream(bool addPersistentDurability, bool streamTransientObjects)
30 {
31  if (m_randomGenerator.isOptional()) {
32  if (not m_randomGenerator.isValid()) {
34  } else {
36  }
37  }
38  return std::unique_ptr<EvtMessage>(m_streamer->streamDataStore(addPersistentDurability, streamTransientObjects));
39 }
40 
41 void StreamHelper::read(std::unique_ptr<ZMQNoIdMessage> message)
42 {
43  EvtMessage eventMessage(message->getMessagePartAsCharArray<ZMQNoIdMessage::c_data>());
44  m_streamer->restoreDataStore(&eventMessage);
45 
46  if (m_randomGenerator.isValid()) {
48  }
49 }
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Class to manage streamed object.
Definition: EvtMessage.h:59
static RandomGenerator & getEventRandomGenerator()
return reference to the event dependent random generator
Definition: RandomNumbers.h:73
std::unique_ptr< DataStoreStreamer > m_streamer
The data store streamer to use.
Definition: StreamHelper.h:34
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuild the data store from it.
Definition: StreamHelper.cc:41
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:29
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
StoreObjPtr< RandomGenerator > m_randomGenerator
The random generator object in the data store that we need to transport also.
Definition: StreamHelper.h:36
static constexpr const unsigned int c_data
Where the data is stored.
Abstract base class for different kinds of events.