Belle II Software  release-05-02-19
StreamHelper.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2018 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 
11 #include <framework/pcore/zmq/utils/StreamHelper.h>
12 #include <framework/core/Environment.h>
13 #include <framework/core/RandomNumbers.h>
14 #include <framework/logging/Logger.h>
15 
16 #include <TSystem.h>
17 
18 using namespace Belle2;
19 
20 void StreamHelper::initialize(int compressionLevel, bool handleMergeable)
21 {
22  gSystem->Load("libdataobjects");
23  m_streamer = std::make_unique<DataStoreStreamer>(compressionLevel, handleMergeable);
24 
25  if ((Environment::Instance().getStreamingObjects()).size() > 0) {
26  m_streamer->setStreamingObjects(Environment::Instance().getStreamingObjects());
27  B2INFO("Tx: Streaming objects limited : " << (Environment::Instance().getStreamingObjects()).size() << " objects");
28  }
29 }
30 
31 std::unique_ptr<EvtMessage> StreamHelper::stream(bool addPersistentDurability, bool streamTransientObjects)
32 {
33  if (m_randomGenerator.isOptional()) {
34  if (not m_randomGenerator.isValid()) {
36  } else {
38  }
39  }
40  return std::unique_ptr<EvtMessage>(m_streamer->streamDataStore(addPersistentDurability, streamTransientObjects));
41 }
42 
43 void StreamHelper::read(std::unique_ptr<ZMQNoIdMessage> message)
44 {
45  EvtMessage eventMessage(message->getMessagePartAsCharArray<ZMQNoIdMessage::c_data>());
46  m_streamer->restoreDataStore(&eventMessage);
47 
48  if (m_randomGenerator.isValid()) {
50  }
51 }
Belle2::ZMQNoIdMessage::c_data
static constexpr const unsigned int c_data
Where the data is stored.
Definition: ZMQNoIdMessage.h:38
Belle2::StreamHelper::stream
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:31
Belle2::StreamHelper::initialize
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:20
Belle2::StreamHelper::m_streamer
std::unique_ptr< DataStoreStreamer > m_streamer
The data store streamer to use.
Definition: StreamHelper.h:44
Belle2::EvtMessage
Class to manage streamed object.
Definition: EvtMessage.h:60
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::RandomNumbers::getEventRandomGenerator
static RandomGenerator & getEventRandomGenerator()
Return a reference to the event-dependent random generator.
Definition: RandomNumbers.h:83
Belle2::StreamHelper::read
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuild the data store from it.
Definition: StreamHelper.cc:43
Belle2::Environment::Instance
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:31
Belle2::StreamHelper::m_randomGenerator
StoreObjPtr< RandomGenerator > m_randomGenerator
The random generator object in the data store that we need to transport also.
Definition: StreamHelper.h:46