Aleph-w  1.9
General library for algorithms and data structures
ringfilecache.H
1 
2 /* Aleph-w
3 
4  / \ | | ___ _ __ | |__ __ __
5  / _ \ | |/ _ \ '_ \| '_ \ ____\ \ /\ / / Data structures & Algorithms
6  / ___ \| | __/ |_) | | | |_____\ V V / version 1.9b
7  /_/ \_\_|\___| .__/|_| |_| \_/\_/ https://github.com/lrleon/Aleph-w
8  |_|
9 
10  This file is part of Aleph-w library
11 
12  Copyright (c) 2002-2018 Leandro Rabindranath Leon & Alejandro Mujica
13 
14  This program is free software: you can redistribute it and/or modify
15  it under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  This program is distributed in the hope that it will be useful, but
20  WITHOUT ANY WARRANTY; without even the implied warranty of
21  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
22  General Public License for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with this program. If not, see <https://www.gnu.org/licenses/>.
26 */
27 # ifndef RINGFILECACHE_H
28 # define RINGFILECACHE_H
29 
30 # include <sys/time.h>
31 # include <stdio.h>
32 # include <cassert>
33 # include <string.h>
34 # include <stdexcept>
35 # include <memory>
36 # include <fstream>
37 # include <iostream>
38 # include <string>
39 # include <sstream>
40 # include <tpl_array.H>
41 
42 using namespace std;
43 
44 extern const size_t Ring_Max_Name_Size;
45 
50 template <typename T>
52 {
53 public:
54 
55  // The cache parameters are stored in a binary file with this structure
// Fixed-size header kept at the beginning of the binary parameters file.
struct Pars
{
  size_t dim;                 // capacity of the cache, in entries
  size_t n;                   // number of entries currently stored
  size_t head;                // next integral position ready for reading
  size_t tail;                // next integral position ready for writing
  size_t size_cache_file = 0; // chars in the file name (including '\0')
  char cache_file_name[0];    // be careful with this field. DON'T USE IT!

  // Renders the numeric fields separated by single blanks. Note the
  // order: dim, n, tail, head, size_cache_file (tail precedes head).
  std::string to_string() const
  {
    std::ostringstream text;
    text << dim << " " << n;
    text << " " << tail << " " << head;
    text << " " << size_cache_file;
    return text.str();
  }

  // Stream insertion; emits exactly what to_string() returns.
  friend std::ostream & operator << (std::ostream & out, const Pars & pars)
  {
    out << pars.to_string();
    return out;
  }
};
78 
79 private:
80 
81  bool initialized = false;
82  string pars_file_name;
83  string cache_file_name;
84 
85  fstream pars_stream; // the stream for the pars (read/write mode)
86  fstream cache_stream; // the stream for the cache (read/write mode)
87 
88  size_t dim = 0; // the maximum number of entries
89  size_t n = 0; // the current number of stored entries
90  size_t head = 0; // next integral position ready for reading
91  size_t tail = 0; // next integral position ready for writing
92 
93  void test_and_set_tail_pointer()
94  {
95  cache_stream.seekp(tail*sizeof(T));
96  }
97 
98  bool is_valid_offset(const size_t offset) const noexcept
99  {
100  return offset < n;
101  }
102 
103  bool is_valid_position(const size_t pos) const noexcept
104  {
105  return pos >= head and pos < tail;
106  }
107 
108  void test_and_set_head_pointer(const size_t num_entries = 0)
109  {
110  const size_t pos = (head + num_entries) % dim;
111  cache_stream.seekg(pos*sizeof(T));
112  }
113 
114  // load an entry from stream. WARNING: the stream pointer is not a priori set
115  T read_entry()
116  {
117  T ret;
118  cache_stream.read((char*) &ret, sizeof ret);
119 
120  return ret;
121  }
122 
123  // read the pos entry respect to the head (oldest entry)
124  T read_entry(const size_t pos)
125  {
126  if (pos >= n)
127  {
128  ostringstream s;
129  s << "read_entry(" << pos << "): cache has " << n << " items";
130  throw range_error(s.str());
131  }
132 
133  const size_t real_pos = (head + pos) % dim;
134 
135  T ret;
136  cache_stream.seekg(real_pos*sizeof(ret));
137  cache_stream.read((char*) &ret, sizeof ret);
138  return ret;
139  }
140 
141  void write_entry(const T & item)
142  {
143  cache_stream.write((char*) &item, sizeof item);
144  }
145 
146  void validate_absolute_position(const size_t pos) const
147  {
148  if (pos < dim)
149  return;
150 
151  ostringstream s;
152  s << "position " << pos << " is greater than dim " << dim;
153  throw out_of_range(s.str());
154  }
155 
156  T read_absolute(const size_t pos)
157  {
158  validate_absolute_position(pos);
159  T ret;
160  cache_stream.seekg(pos*sizeof(T));
161  cache_stream.read((char*) &ret, sizeof ret);
162  return ret;
163  }
164 
165  void write_absolute(const size_t pos, const T & item)
166  {
167  validate_absolute_position(pos);
168  cache_stream.seekp(pos*sizeof(T));
169  cache_stream.write((char*) &item, sizeof item);
170  }
171 
172 public:
173 
174  string to_string() const
175  {
176  const size_t tg = const_cast<fstream&>(cache_stream).tellg()/sizeof(T);
177  const size_t tp = const_cast<fstream&>(cache_stream).tellp()/sizeof(T);
178  ostringstream s;
179  s << "Cache pars" << endl
180  << "capacity = " << capacity() << endl
181  << "size = " << size() << endl
182  << "sizeof T = " << sizeof(T) << endl
183  << "head pos = " << head_pos() << endl
184  << "tail pos = " << tail_pos() << endl
185  << "tellg/T = " << tg << endl
186  << "tellp/T = " << tp << endl;
187  return s.str();
188  }
189 
190  friend ostream & operator << (ostream & out, const RingFileCache & cache)
191  {
192  return out << cache.to_string();
193  }
194 
219  static void create(const string & pars_file_name,
220  const string & cache_file_name,
221  const size_t num_entries)
222  {
223  static_assert(std::is_default_constructible<T>::value,
224  "T type has not default constructor");
225 
226  ostringstream s;
227 
228  fstream pars_stream(pars_file_name, ios::binary | ios::out);
229  if ((not pars_stream.is_open()) or (not pars_stream.good()))
230  {
231  s << "cannot open " << pars_file_name;
232  throw std::domain_error(s.str());
233  }
234 
235  fstream cache_stream(cache_file_name, ios::binary | ios::out);
236  if ((not cache_stream.is_open()) or (not cache_stream.good()))
237  {
238  s << "cannot open " << cache_file_name;
239  throw domain_error(s.str());
240  }
241 
242  const size_t pars_size = sizeof(Pars) + cache_file_name.size() + 1;
243  shared_ptr<Pars> pars_ptr((Pars*) malloc(pars_size), free);
244  if (pars_ptr == nullptr)
245  throw bad_alloc();
246 
247  pars_ptr->dim = num_entries;
248  pars_ptr->n = pars_ptr->head = pars_ptr->tail = 0;
249  pars_ptr->size_cache_file = cache_file_name.size() + 1;
250  strcpy(pars_ptr->cache_file_name, cache_file_name.c_str());
251  pars_stream.write((char*) pars_ptr.get(), pars_size);
252 
253  T init;
254  memset(&init, 0, sizeof(T));
255  for (size_t i = 0; i < num_entries; ++i)
256  cache_stream.write((char*) &init, sizeof(init));
257  }
258 
259 private:
260 
// Returns a four-line textual dump of the state flags of `ss`, one line
// per flag, in the order goodbit, badbit, failbit, eofbit.
//
// `ios::goodbit` is the zero value (absence of every error flag), so it
// cannot be tested with a bit mask; it is reported as true only when no
// other flag is set.
static std::string state(std::fstream & ss)
{
  const auto flags = ss.rdstate();

  const bool is_good = flags == std::ios::goodbit;
  const bool is_bad = (flags & std::ios::badbit) != 0;
  const bool is_fail = (flags & std::ios::failbit) != 0;
  const bool is_eof = (flags & std::ios::eofbit) != 0;

  std::ostringstream s;
  s << " goodbit = " << (is_good ? "true" : "false") << " \n";
  s << " badbit = " << (is_bad ? "true" : "false") << " \n";
  s << " failbit = " << (is_fail ? "true" : "false") << " \n";
  s << " eofbit = " << (is_eof ? "true" : "false") << " \n";
  return s.str();
}
283 
284  // Helper method for initializing the cache from the pars file
285  void read_pars(const string & pars_file_name)
286  {
287  ostringstream s;
288  pars_stream.open(pars_file_name, ios::binary | ios::out | ios::in);
289  if ((not pars_stream.is_open()) or (not pars_stream.good()))
290  {
291  s << "cannot open " << pars_file_name;
292  throw domain_error(s.str());
293  }
294  // set the cache internal state from the saved state in pars file
295  Pars pars;
296  pars_stream.read((char*) &pars, sizeof pars);
297  dim = pars.dim;
298  n = pars.n;
299  head = pars.head;
300  tail = pars.tail;
301 
302  // read the cache file name, which shound be saved at the end of
303  // pars file
304  const size_t name_size = min(Ring_Max_Name_Size, pars.size_cache_file);
305  shared_ptr<char> name((char*) malloc(name_size), free);
306  pars_stream.read(name.get(), name_size);
307  cache_file_name = name.get();
308 
309  cache_stream.open(cache_file_name, ios::binary | ios::out | ios::in);
310  if ((not cache_stream.is_open()) or (not cache_stream.good()))
311  {
312  s << "cannot open " << cache_file_name;
313  throw domain_error(s.str());
314  }
315 
316  pars_stream.seekp(0);
317 
318  cache_stream.seekg(head*sizeof(T)); // set read position
319  cache_stream.seekp(tail*sizeof(T)); // set write position
320  initialized = true;
321 
322  assert(pars_stream.good());
323  assert(cache_stream.good());
324  }
325 
326 public:
327 
332  class Pointer
333  {
334  friend class RingFileCache;
335 
336  RingFileCache * cache_ptr = nullptr;
337  size_t pos = 0; // this an absolute position in the file
338  size_t end_pos; // bound according to dimension
339 
340  void validate_position(const size_t pos) const
341  {
342  if (pos < cache_ptr->dim)
343  return;
344 
345  ostringstream s;
346  s << "position " << pos << " is outside of maximum " << cache_ptr->dim;
347  throw std::out_of_range(s.str());
348  }
349 
350  void increase_pos(const long n = 1) noexcept
351  {
352  if (n < 0)
353  {
354  decrease_pos(-n);
355  return;
356  }
357 
358  pos += n;
359  if (pos > end_pos)
360  pos = pos % cache_ptr->dim;
361  }
362 
363  void decrease_pos(const long n = 1) noexcept
364  {
365  if (n < 0)
366  {
367  increase_pos(-n);
368  return;
369  }
370 
371  long val = pos - n;
372  if (val < 0)
373  {
374  size_t m = n % cache_ptr->dim; // real positional offset
375  if (pos < m)
376  pos = cache_ptr->dim - (m - pos);
377  else
378  pos -= m;
379  }
380  else
381  pos = val;
382  }
383 
384  public:
385 
386  Pointer() {}
387 
393  Pointer(const RingFileCache & cache, const size_t __pos = 0)
394  : cache_ptr(&const_cast<RingFileCache&>(cache)),
395  pos(cache.head_pos() + __pos), end_pos(cache.dim) {}
396 
397  Pointer operator ++ () noexcept // prefix increment
398  {
399  increase_pos(1);
400  return *this;
401  }
402 
403  Pointer operator ++ (int) noexcept // postfix increment
404  {
405  Pointer ret = *this;
406  increase_pos(1);
407  return ret;
408  }
409 
410  Pointer operator -- () noexcept // prefix decrement
411  {
412  decrease_pos(1);
413  return *this;
414  }
415 
416  Pointer operator -- (int) noexcept // postfix decrement
417  {
418  Pointer ret = *this;
419  decrease_pos(1);
420  return ret;
421  }
422 
423  Pointer & operator += (const long val) noexcept
424  {
425  increase_pos(val);
426  return *this;
427  }
428 
429  Pointer & operator -= (const long val) noexcept
430  {
431  decrease_pos(val);
432  return *this;
433  }
434 
435  Pointer operator + (const long val) const noexcept
436  {
437  Pointer ret = *this;
438  ret += val;
439  return ret;
440  }
441 
442  Pointer operator - (const long val) const noexcept
443  {
444  Pointer ret = *this;
445  ret -= val;
446  return ret;
447  }
448 
449  size_t get_pos_respect_to_head() const noexcept
450  {
451  auto head = cache_ptr->head;
452  if (head < pos)
453  return pos - head;
454  else
455  return cache_ptr->dim - head + pos;
456  }
457 
458  size_t get_pos() const noexcept
459  {
460  return get_pos_respect_to_head();
461  }
462  };
463 
464  T read(const Pointer & ptr)
465  {
466  if (ptr.cache_ptr != this)
467  throw domain_error("RingFileCache::read(const Pointer&): invalid ptr");
468 
469  return read_absolute(ptr.pos);
470  }
471 
472  void write(const Pointer & ptr, const T & item)
473  {
474  if (ptr.cache_ptr != this)
475  throw domain_error("RingFileCache::write(const Pointer&): invalid ptr");
476 
477  return write_absolute(ptr.pos, item);
478  }
479 
480  bool is_initialized() const { return initialized; }
481 
483  size_t size() const noexcept { return n; }
484 
486  size_t capacity() const noexcept { return dim; }
487 
489  size_t avail() const noexcept { return dim - n; }
490 
492  size_t head_pos() const noexcept { return head; }
493 
495  size_t tail_pos() const noexcept { return tail; }
496 
498  bool is_empty() const noexcept { return n == 0; }
499 
514  RingFileCache(const string & pars_fname) : pars_file_name(pars_fname)
515  {
516  read_pars(pars_file_name);
517  }
518 
528 
535  static bool test(const string & pars_fname)
536  {
537  ifstream pars_stream(pars_fname, ios::binary);
538  if (not pars_stream.good())
539  return false;
540 
541  Pars pars;
542  pars_stream.read((char*) &pars, sizeof pars);
543 
544  // read the cache file name, which should be saved at the end of
545  // pars file
546  const size_t name_size = min(Ring_Max_Name_Size, pars.size_cache_file);
547  shared_ptr<char> name((char*) malloc(name_size), free);
548  pars_stream.read(name.get(), name_size);
549  const char * cache_file_name = name.get();
550 
551  ifstream cache_stream(cache_file_name, ios::binary);
552  if (not cache_stream.good())
553  return false;
554 
555  return true;
556  }
557 
567  void init(const string & pars_fname)
568  {
569  if (pars_stream.is_open())
570  throw std::domain_error("this cache has already an opened pars file");
571 
572  new (this) RingFileCache(pars_fname);
573  }
574 
586  bool put(const T & item)
587  {
588  if (n == dim) // cache full?
589  return false;
590 
591  // assert(cache_stream.good());
592 
593  test_and_set_tail_pointer();
594 
595  assert(cache_stream.tellp() == tail*sizeof(T));
596 
597  write_entry(item);
598  if (++tail == dim) // eof border reached?
599  {
600  tail = 0;
601  cache_stream.seekp(0);
602  assert(cache_stream.good());
603  }
604  ++n;
605 
606  assert(cache_stream.good());
607  assert(cache_stream.tellp() == tail*sizeof(T));
608 
609  return true;
610  }
611 
635  bool read(T entries[], const size_t m = 1)
636  {
637  if (m > n)
638  return false;
639 
640  test_and_set_head_pointer();
641 
642  assert(cache_stream.good());
643  // assert(cache_stream.tellg() == head*sizeof(T));
644 
645  const size_t n_avail_until_eof = dim - head;
646  if (m <= n_avail_until_eof)
647  cache_stream.read((char*) entries, m*sizeof(T));
648  else
649  {
650  cache_stream.read((char*) entries, n_avail_until_eof*sizeof(T));
651  cache_stream.seekg(0);
652  cache_stream.read((char*) &entries[n_avail_until_eof],
653  (m - n_avail_until_eof)*sizeof(T));
654  assert(cache_stream.gcount() == (m - n_avail_until_eof)*sizeof(T));
655  }
656 
657  assert(cache_stream.good());
658 
659  return true;
660  }
661 
664  {
665  if (n == 0)
666  throw underflow_error("read_first(): cache is empty");
667 
668  test_and_set_head_pointer();
669  T ret_val;
670  read(&ret_val);
671  return ret_val;
672  }
673 
676  {
677  if (n == 0)
678  throw underflow_error("read_last(): cache is empty");
679 
680  if (tail == 0)
681  cache_stream.seekg((dim - 1)*sizeof(T));
682  else
683  cache_stream.seekg((tail - 1)*sizeof(T));
684 
685  return read_entry();
686  }
687 
688  T youngest() { return read_last(); }
689 
690  T oldest() { return read_first(); }
691 
692  T oldest(size_t i)
693  {
694  if (i > n)
695  {
696  ostringstream s;
697  s << "youngest(" << i << ") but the cache has " << n << " entries";
698  throw overflow_error(s.str());
699  }
700 
701  T ret_val;
702  read_entry(i);
703  return ret_val;
704  }
705 
711  {
712  Array<T> ret(n);
713  ret.putn(n);
714  assert(ret.capacity() >= n);
715  read(&ret.base(), n);
716  return ret;
717  }
718 
725  Array<T> read_from(const size_t pos, const size_t m)
726  {
727  Array<T> ret;
728  Iterator it(*this, pos);
729  for (size_t i = 0; i < m and it.has_curr(); ++i, it.next_ne())
730  ret.append(it.get_curr_ne());
731 
732  return ret;
733  }
734 
741  Array<T> read_from(const Pointer & ptr, const size_t m)
742  {
743  Pointer p = ptr;
744  Array<T> ret;
745  for (size_t i = 0; i < m; ++i)
746  ret.append(read(p++));
747 
748  return ret;
749  }
750 
756  bool get(const size_t m = 1) noexcept
757  {
758  if (m > n)
759  return false;
760 
761  head = (head + m) % dim;
762  n -= m;
763 
764  return true;
765  }
766 
768  void empty() { get(n); }
769 
776  void flush()
777  {
778  assert(pars_stream.tellp() == 0);
779  Pars pars;
780  pars.dim = dim;
781  pars.n = n;
782  pars.head = head;
783  pars.tail = tail;
784  pars.size_cache_file = cache_file_name.size() + 1;
785  pars_stream.write((char*) &pars, sizeof(pars));
786  pars_stream.flush();
787  pars_stream.seekp(0);
788 
789  cache_stream.flush();
790  }
791 
792  void close()
793  {
794  if (not initialized)
795  return;
796  flush();
797  cache_stream.close();
798  pars_stream.close();
799  initialized = false;
800  }
801 
802  ~RingFileCache() { close(); }
803 
812  void resize(const size_t sz)
813  {
814  if (sz < dim)
815  throw domain_error("RingFileCache::resize(): file truncation is not "
816  "implemented (yet?)");
817 
818  T init;
819  cache_stream.seekp(dim*sizeof(T));
820  for (size_t i = 0, n = sz - dim; i < n; ++i)
821  write_entry(init);
822 
823  dim = sz;
824  flush();
825  }
826 
827  // Only for reading and *NOT REENTRANT* ==> while using you MUST not
828  // to execute any others operations on cache object
829  //
830  // Iterate from oldest until youngest
831  class Iterator
832  {
833  RingFileCache<T> * cache_ptr = nullptr;
834  T curr;
835  size_t pos;
836  size_t curr_pos;
837 
838  void set_curr_pointer()
839  {
840  cache_ptr->cache_stream.seekg(curr_pos*sizeof(T));
841  }
842 
843  void increase_pos()
844  {
845  ++pos;
846  if (++curr_pos == cache_ptr->dim)
847  curr_pos = 0;
848  }
849 
850  void load_curr()
851  {
852  set_curr_pointer();
853  curr = cache_ptr->read_entry();
854  }
855 
856  public:
857 
858  size_t get_pos() const noexcept { return pos; }
859 
860  bool has_curr() const { return pos < cache_ptr->n; }
861 
868  Iterator(const RingFileCache<T> & cache, const size_t offset = 0)
869  : cache_ptr(&const_cast<RingFileCache<T>&>(cache)),
870  pos(offset), curr_pos((cache_ptr->head + offset) % cache.dim)
871  {
872  if (cache_ptr->is_empty())
873  return;
874 
875  if (not cache_ptr->is_valid_offset(pos))
876  return;
877 
878  load_curr();
879  }
880 
881  T get_curr_ne() const noexcept { return curr; }
882 
883  T get_curr() const
884  {
885  if (not has_curr())
886  throw overflow_error("RingFileCache::Iterator::get_curr()");
887  return get_curr_ne();
888  }
889 
890  void next_ne() noexcept
891  {
892  increase_pos();
893  if (has_curr())
894  load_curr();
895  }
896 
897  void next()
898  {
899  if (not has_curr())
900  throw overflow_error("RingFileCache::Iterator::next()");
901  next_ne();
902  }
903  };
904 
905  auto get_it() { return Iterator(*this); }
906 };
907 
908 # endif // RINGFILECACHE_H
size_t capacity() const noexcept
Returns the maximum capacity.
Definition: ringfilecache.H:486
T read_last()
Read the youngest entry in the set.
Definition: ringfilecache.H:675
void putn(const size_t n)
Definition: tpl_array.H:170
bool put(const T &item)
Definition: ringfilecache.H:586
Definition: ringfilecache.H:332
Pointer(const RingFileCache &cache, const size_t __pos=0)
Definition: ringfilecache.H:393
RingFileCache(const string &pars_fname)
Definition: ringfilecache.H:514
Array< T > read_from(const Pointer &ptr, const size_t m)
Definition: ringfilecache.H:741
void flush()
Definition: ringfilecache.H:776
RingFileCache()
Definition: ringfilecache.H:527
T & base() const noexcept
Return a modifiable reference to first element of array.
Definition: tpl_array.H:180
static bool test(const string &pars_fname)
Definition: ringfilecache.H:535
bool is_empty() const noexcept
Returns true if the cache is empty.
Definition: ringfilecache.H:498
T read_first()
Read the oldest entry in the set.
Definition: ringfilecache.H:663
size_t avail() const noexcept
Returns the number of available entries.
Definition: ringfilecache.H:489
void resize(const size_t sz)
Definition: ringfilecache.H:812
Definition: ringfilecache.H:51
T & append(const T &data)
Definition: tpl_array.H:109
static void create(const string &pars_file_name, const string &cache_file_name, const size_t num_entries)
Definition: ringfilecache.H:219
Definition: tpl_array.H:54
bool read(T entries[], const size_t m=1)
Definition: ringfilecache.H:635
void init(const string &pars_fname)
Definition: ringfilecache.H:567
size_t tail_pos() const noexcept
return the current tail position
Definition: ringfilecache.H:495
void empty()
empties the cache; all the entries are deleted
Definition: ringfilecache.H:768
size_t capacity() const noexcept
Return the internal capacity.
Definition: tpl_array.H:192
size_t head_pos() const noexcept
Returns the current head position.
Definition: ringfilecache.H:492
Array< T > read_all()
Definition: ringfilecache.H:710
size_t size() const noexcept
Returns the number of entries stored in the cache.
Definition: ringfilecache.H:483
Array< T > read_from(const size_t pos, const size_t m)
Definition: ringfilecache.H:725

Leandro Rabindranath León