9#include <botan/filters.h>
11#if defined(BOTAN_HAS_THREAD_UTILS)
13#include <botan/internal/semaphore.h>
14#include <botan/internal/barrier.h>
/*
* State shared between a Threaded_Fork and the worker threads it starts.
* thread_delegate_work() publishes a buffer here and thread_entry() reads it.
*
* NOTE(review): this extract shows no braces and the leading numbers look
* like fused original line numbers - confirm against the pristine file.
*/
19struct Threaded_Fork_Data
// Released by thread_delegate_work() and by the destructor; acquired by
// thread_entry() before it inspects m_input.
25 Semaphore m_input_ready_semaphore;
// thread_delegate_work() calls wait(total_ports() + 1) and then sync() on
// this; thread_entry() calls sync() after writing to its filter.
30 Barrier m_input_complete_barrier;
// Buffer currently handed to the workers. Null by default, set by
// thread_delegate_work(), and reset to null once that call has synced.
37 const uint8_t* m_input =
nullptr;
// Length passed alongside m_input to filter->write(); zero by default.
42 size_t m_input_length = 0;
/*
* Construct a Threaded_Fork from up to four filter pointers.
*
* The Fork base is initialised with nullptr and a zero size, and a fresh
* Threaded_Fork_Data is allocated for m_thread_data.
*/
48Threaded_Fork::Threaded_Fork(Filter* f1, Filter* f2, Filter* f3, Filter* f4) :
49 Fork(nullptr, static_cast<size_t>(0)),
50 m_thread_data(new Threaded_Fork_Data)
// Gather the four arguments into an array.
// NOTE(review): the statement consuming `filters` is not visible in this
// extract - presumably it is handed to set_next(); confirm.
52 Filter* filters[4] = { f1, f2, f3, f4 };
/*
* Construct a Threaded_Fork from an array of filter pointers.
*
* @param filters array of filters, forwarded to set_next()
* @param count   number of entries in filters, forwarded to set_next()
*
* The Fork base is initialised with nullptr and a zero size, and a fresh
* Threaded_Fork_Data is allocated for m_thread_data.
*/
59Threaded_Fork::Threaded_Fork(Filter* filters[],
size_t count) :
60 Fork(nullptr, static_cast<size_t>(0)),
61 m_thread_data(new Threaded_Fork_Data)
// Attach the filters; set_next() also creates the worker threads.
63 set_next(filters, count);
/*
* Destructor: wakes every worker thread with no input pending.
*/
66Threaded_Fork::~Threaded_Fork()
// Clear the shared input. thread_entry() tests for a null m_input right
// after acquiring the semaphore.
68 m_thread_data->m_input =
nullptr;
69 m_thread_data->m_input_length = 0;
// One release per entry in m_threads so each worker can acquire once.
71 m_thread_data->m_input_ready_semaphore.release(m_threads.size());
// Visit every worker thread.
// NOTE(review): the loop body is not visible in this extract - presumably
// each thread is joined here; confirm.
73 for(
auto& thread : m_threads)
/*
* @return the fixed string "Threaded Fork"
*/
77std::string Threaded_Fork::name()
const
79 return "Threaded Fork";
/*
* Attach filters and bring the set of worker threads in line with n.
*
* @param f array of filter pointers
* @param n number of entries
*
* NOTE(review): this extract is missing statements (nothing between the
* signature and the size comparison, no branch body, and the closing
* parentheses below outnumber the visible opening ones). Confirm against
* the pristine file before relying on these notes.
*/
82void Threaded_Fork::set_next(Filter* f[],
size_t n)
// Case where fewer threads are wanted than currently exist; the action
// taken is not visible here.
87 if(n < m_threads.size())
// Otherwise add entries for indices m_threads.size() .. n-1.
92 for(
size_t i = m_threads.size(); i != n; ++i)
// Each new thread is held through a std::shared_ptr<std::thread>.
95 std::shared_ptr<std::thread>(
// The bound callable runs thread_entry() on this object with the
// filter stored at m_next[i].
97 std::bind(&Threaded_Fork::thread_entry,
this, m_next[i]))));
/*
* Hand input to the worker threads and maintain m_write_queue.
*
* @param input  buffer to process
* @param length number of bytes in input
*/
102void Threaded_Fork::send(
const uint8_t input[],
size_t length)
// If earlier data is still queued, delegate it before the new input.
104 if(m_write_queue.size())
105 thread_delegate_work(m_write_queue.data(), m_write_queue.size());
106 thread_delegate_work(input, length);
// Assume nothing is attached until the scan over the ports says otherwise.
108 bool nothing_attached =
true;
109 for(
size_t j = 0; j != total_ports(); ++j)
// NOTE(review): the condition guarding this assignment is not visible in
// this extract - presumably a non-null m_next[j]; confirm.
111 nothing_attached =
false;
// Append the input to the write queue ...
// NOTE(review): the if/else selecting between this line and the clear()
// below is not visible - presumably keyed on nothing_attached; confirm.
114 m_write_queue += std::make_pair(input, length);
// ... or drop whatever was queued.
116 m_write_queue.clear();
/*
* Publish one buffer to the worker threads and wait at the completion
* barrier before withdrawing it again.
*
* @param input  buffer the workers pass to filter->write()
* @param length number of bytes in input
*/
119void Threaded_Fork::thread_delegate_work(
const uint8_t input[],
size_t length)
// Publish the work before any worker is released.
122 m_thread_data->m_input = input;
123 m_thread_data->m_input_length = length;
// Barrier count is total_ports() + 1.
// NOTE(review): presumably one slot per worker plus one for this calling
// thread - confirm against Barrier's documentation.
126 m_thread_data->m_input_complete_barrier.wait(total_ports() + 1);
// One semaphore release per port; thread_entry() acquires it once per pass.
127 m_thread_data->m_input_ready_semaphore.release(total_ports());
// Meet the workers at the barrier; each calls sync() after its write().
130 m_thread_data->m_input_complete_barrier.sync();
// Withdraw the buffer so that a later wake-up with no work (as done by the
// destructor) sees a null m_input.
133 m_thread_data->m_input =
nullptr;
134 m_thread_data->m_input_length = 0;
/*
* Worker-thread body; set_next() binds it to one filter per thread.
*
* @param filter the filter this worker writes the shared input to
*
* NOTE(review): no enclosing loop is visible in this extract - presumably
* these steps repeat until the null-input case is hit; confirm.
*/
137void Threaded_Fork::thread_entry(Filter* filter)
// Block until thread_delegate_work() or the destructor releases the
// semaphore.
141 m_thread_data->m_input_ready_semaphore.acquire();
// A null m_input means there is nothing to process (the destructor clears
// it before releasing).
// NOTE(review): the branch body is not visible - presumably the worker
// exits here; confirm.
143 if(!m_thread_data->m_input)
// Feed the published buffer to this worker's filter.
146 filter->write(m_thread_data->m_input, m_thread_data->m_input_length);
// Meet the other workers and thread_delegate_work() at the barrier.
147 m_thread_data->m_input_complete_barrier.sync();