Belle II Software light-2406-ragdoll
StreamHelper.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/zmq/utils/StreamHelper.h>
10#include <framework/core/Environment.h>
11#include <framework/core/RandomNumbers.h>
12#include <framework/logging/Logger.h>
13
14#include <TSystem.h>
15
16using namespace Belle2;
17
18void StreamHelper::initialize(int compressionLevel, bool handleMergeable)
19{
20 gSystem->Load("libdataobjects");
21 m_streamer = std::make_unique<DataStoreStreamer>(compressionLevel, handleMergeable);
22
23 if ((Environment::Instance().getStreamingObjects()).size() > 0) {
24 m_streamer->setStreamingObjects(Environment::Instance().getStreamingObjects());
25 B2INFO("Tx: Streaming objects limited : " << (Environment::Instance().getStreamingObjects()).size() << " objects");
26 }
27}
28
29std::unique_ptr<EvtMessage> StreamHelper::stream(bool addPersistentDurability, bool streamTransientObjects)
30{
31 if (m_randomGenerator.isOptional()) {
32 if (not m_randomGenerator.isValid()) {
34 } else {
36 }
37 }
38 return std::unique_ptr<EvtMessage>(m_streamer->streamDataStore(addPersistentDurability, streamTransientObjects));
39}
40
41void StreamHelper::read(std::unique_ptr<ZMQNoIdMessage> message)
42{
43 EvtMessage eventMessage(message->getMessagePartAsCharArray<ZMQNoIdMessage::c_data>());
44 m_streamer->restoreDataStore(&eventMessage);
45
46 if (m_randomGenerator.isValid()) {
48 }
49}
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Class to manage streamed object.
Definition: EvtMessage.h:59
static RandomGenerator & getEventRandomGenerator()
return reference to the event dependent random generator
Definition: RandomNumbers.h:73
std::unique_ptr< DataStoreStreamer > m_streamer
The data store streamer to use.
Definition: StreamHelper.h:34
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuilt the data store from it.
Definition: StreamHelper.cc:41
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:29
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
StoreObjPtr< RandomGenerator > m_randomGenerator
The random generator object in the data store that we need to transport also.
Definition: StreamHelper.h:36
static constexpr const unsigned int c_data
Where the data is stored.
Abstract base class for different kinds of events.
Definition: ClusterUtils.h:24