XRootD
XrdEcRedundancyProvider.cc
Go to the documentation of this file.
1 /************************************************************************
2  * KineticIo - a file io interface library to kinetic devices. *
3  * *
4  * This Source Code Form is subject to the terms of the Mozilla *
5  * Public License, v. 2.0. If a copy of the MPL was not *
6  * distributed with this file, You can obtain one at *
7  * https://mozilla.org/MPL/2.0/. *
8  * *
9  * This program is distributed in the hope that it will be useful, *
10  * but is provided AS-IS, WITHOUT ANY WARRANTY; including without *
11  * the implied warranty of MERCHANTABILITY, NON-INFRINGEMENT or *
12  * FITNESS FOR A PARTICULAR PURPOSE. See the Mozilla Public *
13  * License for more details. *
14  ************************************************************************/
15 
17 
#include <algorithm>
#include <cstring>
#include <sstream>
#include <utility>
#include <vector>

#include <isa-l.h>
22 
23 namespace XrdEc
24 {
25 
//--------------------------------------------------------------------------
//! Helper for turning a non-empty list of streamable values into a single
//! string.
//--------------------------------------------------------------------------
class Convert
{
  public:
    //------------------------------------------------------------------------
    //! Stream all arguments, in order and without separators, into a string.
    //!
    //! @param args : one or more values supporting operator<<
    //!
    //! @return     : the concatenated textual representation
    //------------------------------------------------------------------------
    template<typename... Args>
    static std::string toString( Args&&... args )
    {
      std::stringstream buffer;
      append( buffer, std::forward<Args>( args )... );
      return buffer.str();
    }

  private:
    //------------------------------------------------------------------------
    //! Recursion terminator: write the final value.
    //------------------------------------------------------------------------
    template<typename Tail>
    static void append( std::stringstream &buffer, Tail &&tail )
    {
      buffer << tail;
    }

    //------------------------------------------------------------------------
    //! Write the leading value, then recurse on the remaining ones.
    //------------------------------------------------------------------------
    template<typename Head, typename... Rest>
    static void append( std::stringstream &buffer, Head &&head, Rest&&... rest )
    {
      buffer << head;
      append( buffer, std::forward<Rest>( rest )... );
    }
};
68 
69 
70 
71 /* This function is (almost) completely ripped from the erasure_code_test.cc file
72  distributed with the isa-l library. */
74  unsigned char* encode_matrix, // in: encode matrix
75  unsigned char* decode_matrix, // in: buffer, out: generated decode matrix
76  unsigned int* decode_index, // out: order of healthy blocks used for decoding [data#1, data#3, ..., parity#1... ]
77  unsigned char* src_err_list, // in: array of #nerrs size [index error #1, index error #2, ... ]
78  unsigned char* src_in_err, // in: array of #data size > [1,0,0,0,1,0...] -> 0 == no error, 1 == error
79  unsigned int nerrs, // #total errors
80  unsigned int nsrcerrs, // #data errors
81  unsigned int k, // #data
82  unsigned int m // #data+parity
83 )
84 {
85  unsigned i, j, p;
86  unsigned int r;
87  unsigned char* invert_matrix, * backup, * b, s;
88  int incr = 0;
89 
90  std::vector<unsigned char> memory((size_t) (m * k * 3));
91  b = &memory[0];
92  backup = &memory[m * k];
93  invert_matrix = &memory[2 * m * k];
94 
95  // Construct matrix b by removing error rows
96  for (i = 0, r = 0; i < k; i++, r++) {
97  while (src_in_err[r]) {
98  r++;
99  }
100  for (j = 0; j < k; j++) {
101  b[k * i + j] = encode_matrix[k * r + j];
102  backup[k * i + j] = encode_matrix[k * r + j];
103  }
104  decode_index[i] = r;
105  }
106  incr = 0;
107  while (gf_invert_matrix(b, invert_matrix, k) < 0) {
108  if (nerrs == (m - k)) {
109  return -1;
110  }
111  incr++;
112  memcpy(b, backup, (size_t) (m * k));
113  for (i = nsrcerrs; i < nerrs - nsrcerrs; i++) {
114  if (src_err_list[i] == (decode_index[k - 1] + incr)) {
115  // skip the erased parity line
116  incr++;
117  continue;
118  }
119  }
120  if (decode_index[k - 1] + incr >= m) {
121  return -1;
122  }
123  decode_index[k - 1] += incr;
124  for (j = 0; j < k; j++) {
125  b[k * (k - 1) + j] = encode_matrix[k * decode_index[k - 1] + j];
126  }
127 
128  };
129 
130  for (i = 0; i < nsrcerrs; i++) {
131  for (j = 0; j < k; j++) {
132  decode_matrix[k * i + j] = invert_matrix[k * src_err_list[i] + j];
133  }
134  }
135  /* src_err_list from encode_matrix * invert of b for parity decoding */
136  for (p = nsrcerrs; p < nerrs; p++) {
137  for (i = 0; i < k; i++) {
138  s = 0;
139  for (j = 0; j < k; j++) {
140  s ^= gf_mul(invert_matrix[j * k + i],
141  encode_matrix[k * src_err_list[p] + j]);
142  }
143 
144  decode_matrix[k * p + i] = s;
145  }
146  }
147  return 0;
148 }
149 
151  objcfg( objcfg ),
152  encode_matrix( objcfg.nbchunks * objcfg.nbdata )
153 {
154  // k = data
155  // m = data + parity
156  gf_gen_cauchy1_matrix( encode_matrix.data(), static_cast<int>( objcfg.nbchunks ), static_cast<int>( objcfg.nbdata ) );
157 }
158 
159 
160 std::string RedundancyProvider::getErrorPattern( stripes_t &stripes ) const
161 {
162  std::string pattern( objcfg.nbchunks, 0 );
163  for( uint8_t i = 0; i < objcfg.nbchunks; ++i )
164  if( !stripes[i].valid ) pattern[i] = '\1';
165 
166  return pattern;
167 }
168 
169 
170 RedundancyProvider::CodingTable& RedundancyProvider::getCodingTable( const std::string& pattern )
171 {
172  std::lock_guard<std::mutex> lock(mutex);
173 
174  /* If decode matrix is not already cached we have to construct it. */
175  if( !cache.count(pattern) )
176  {
177  /* Expand pattern */
178  int nerrs = 0, nsrcerrs = 0;
179  unsigned char err_indx_list[objcfg.nbparity];
180  for (std::uint8_t i = 0; i < pattern.size(); i++) {
181  if (pattern[i]) {
182  err_indx_list[nerrs++] = i;
183  if (i < objcfg.nbdata) { nsrcerrs++; }
184  }
185  }
186 
187  /* Allocate Decode Object. */
188  CodingTable dd;
189  dd.nErrors = nerrs;
190  dd.blockIndices.resize( objcfg.nbdata );
191  dd.table.resize( objcfg.nbdata * objcfg.nbparity * 32);
192 
193  /* Compute decode matrix. */
194  std::vector<unsigned char> decode_matrix(objcfg.nbchunks * objcfg.nbdata);
195 
196  if (gf_gen_decode_matrix( encode_matrix.data(), decode_matrix.data(), dd.blockIndices.data(),
197  err_indx_list, (unsigned char*) pattern.c_str(), nerrs, nsrcerrs,
198  static_cast<int>( objcfg.nbdata ), static_cast<int>( objcfg.nbchunks ) ) )
199  throw IOError( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, errno, "Failed computing decode matrix" ) );
200 
201  /* Compute Tables. */
202  ec_init_tables( static_cast<int>( objcfg.nbdata ), nerrs, decode_matrix.data(), dd.table.data() );
203  cache.insert( std::make_pair(pattern, dd) );
204  }
205  return cache.at(pattern);
206 }
207 
208 void RedundancyProvider::replication( stripes_t &stripes )
209 {
210  // get index of a valid block
211  void *healthy = nullptr;
212  for( auto itr = stripes.begin(); itr != stripes.end(); ++itr )
213  {
214  if( itr->valid )
215  healthy = itr->buffer;
216  }
217 
218  if( !healthy ) throw IOError( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError ) );
219 
220  // now replicate, by now 'buffers' should contain all chunks
221  for( uint8_t i = 0; i < objcfg.nbchunks; ++i )
222  {
223  if( !stripes[i].valid )
224  memcpy( stripes[i].buffer, healthy, objcfg.chunksize );
225  }
226 }
227 
229 {
230  /* throws if stripe is not recoverable */
231  std::string pattern = getErrorPattern( stripes );
232 
233  /* nothing to do if there are no parity blocks. */
234  if ( !objcfg.nbparity ) return;
235 
236  /* in case of a single data block use replication */
237  if ( objcfg.nbdata == 1 )
238  return replication( stripes );
239 
240  /* normal operation: erasure coding */
241  CodingTable& dd = getCodingTable(pattern);
242 
243  unsigned char* inbuf[objcfg.nbdata];
244  for( uint8_t i = 0; i < objcfg.nbdata; i++ )
245  inbuf[i] = reinterpret_cast<unsigned char*>( stripes[dd.blockIndices[i]].buffer );
246 
247  std::vector<unsigned char> memory( dd.nErrors * objcfg.chunksize );
248 
249  unsigned char* outbuf[dd.nErrors];
250  for (int i = 0; i < dd.nErrors; i++)
251  {
252  outbuf[i] = &memory[i * objcfg.chunksize];
253  }
254 
255  ec_encode_data(
256  static_cast<int>( objcfg.chunksize ), // Length of each block of data (vector) of source or destination data.
257  static_cast<int>( objcfg.nbdata ), // The number of vector sources in the generator matrix for coding.
258  dd.nErrors, // The number of output vectors to concurrently encode/decode.
259  dd.table.data(), // Pointer to array of input tables
260  inbuf, // Array of pointers to source input buffers
261  outbuf // Array of pointers to coded output buffers
262  );
263 
264  int e = 0;
265  for (size_t i = 0; i < objcfg.nbchunks; i++)
266  {
267  if( pattern[i] )
268  {
269  memcpy( stripes[i].buffer, outbuf[e], objcfg.chunksize );
270  e++;
271  }
272  }
273 }
274 
275 
276 };
Class for computing parities and recovering data.
static std::string toString(Args &&...args)
RedundancyProvider(const ObjCfg &objcfg)
void compute(stripes_t &stripes)
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
std::vector< stripe_t > stripes_t
All stripes in a block.
static int gf_gen_decode_matrix(unsigned char *encode_matrix, unsigned char *decode_matrix, unsigned int *decode_index, unsigned char *src_err_list, unsigned char *src_in_err, unsigned int nerrs, unsigned int nsrcerrs, unsigned int k, unsigned int m)
const uint8_t nbdata
Definition: XrdEcObjCfg.hh:87
const uint8_t nbchunks
Definition: XrdEcObjCfg.hh:85
const uint8_t nbparity
Definition: XrdEcObjCfg.hh:86
const uint64_t chunksize
Definition: XrdEcObjCfg.hh:89