Aleph-w  1.5a.2
Biblioteca general de algoritmos y estructuras de datos
 Todo Clases Archivos Funciones Variables 'typedefs' Enumeraciones Amigas Grupos Páginas
tpl_agent_graph.H
1 # ifndef TPL_AGENT_GRAPH_H
2 # define TPL_AGENT_GRAPH_H
3 
4 # include <sys/time.h>
5 
6 # include <iostream>
7 # include <exception>
8 # include <sys/types.h>
9 # include <dlink.H>
10 # include <tpl_concurrent_graph.H>
11 # include <tpl_dynArray.H>
12 # include <tpl_arrayQueue.H>
13 
14 using namespace Aleph;
15 using namespace std;
16 
17 namespace Aleph
18 {
19 
20  // Declaraciones adelantadas de clases
21 
22 template <typename Agent_Info> class Agent; // Agente que circula por grafo
23 
24 
73  template <typename Agent_Info>
75 {
76  enum Agent_State
77  {
78  Ready, // Listo para ejecutarse dentro de la cola ready
79  Executing, // Ejecutándose, sin estar en ninguna cola
80  Suspending, // dentro de la cola suspendidos
81  Suspended, // ejecutándose pero recibe una señal de suspender
82  Deleting_From_Ready, // ejecutándose pero recibe señal de eliminar de grafo
83  Deleting_From_Suspended, // dentro de ready pero recibe señal eliminar de grafo
84  Without_Queue, // No está dentro del grafo
85  Invalid // agente no tiene función de transición
86  } ;
87 
88  enum Transition_Action
89  {
90  Remain_In_Node,
91  Remain_In_Arc,
92  Enter_In_Node,
93  Enter_In_Arc,
94  Out_Of_Location, // no implementado todavía
95  Leave_Node, // no implementado todavía
96  Leave_Arc, // no implementado todavía
97  Suspend,
98  Dead
99  } ;
100 
134  typedef long (*Process_From_Node) (void * agent,
135  void * graph,
136  void * src,
137  void *& arc);
138 
164  typedef long (*Process_From_Arc) (void * agent, void * graph, void * arc);
165 
183  typedef long (*Process_Out_Location) (void * agent,
184  void * graph,
185  void *& node,
186  void *& arc);
187 
188  Agent_State agent_state;
189 
190  pthread_t thread_id;
191 
192  Agent_Info info;
193 
194  // enlace a lista de agentes de todo el grafo
195  Dlink agent_link_in_graph;
196 
197  // enlace a cola de listos o suspendidos
198  Dlink schedule_link_in_graph;
199 
200  // cada nodo o arco contiene una lista de los agentes que se
201  // encuentran en él. Este es el enlace
202  Dlink location_link;
203 
204  bool in_node; // true ==> agente en un nodo; false ==> agente es un arco
205 
206  void * location; // puntero a nodo o arco donde se encuentra el agente
207 
208  void * src; // si location es un arco ==> src es el nodo fuente
209 
210  void * cookie;
211 
212  Process_From_Node process_from_node;
213 
214  Process_From_Arc process_from_arc;
215 
216  Process_Out_Location process_out_location;
217 
220  void * get_src_node() { return src; }
221 
222  void set_agent_state()
223  {
224  if (process_from_node == NULL or process_from_arc)
225  agent_state = Invalid;
226  else
227  agent_state = Without_Queue;
228  }
229 
232  void set_process_node(Process_From_Node __process_from_node)
233  {
234  process_from_node = __process_from_node;
235  set_agent_state();
236  }
237 
240  void set_process_arc(Process_From_Arc __process_from_arc)
241  {
242  process_from_arc = __process_from_arc;
243  set_agent_state();
244  }
245 
246  bool is_a_valid_agent() const
247  {
248  return ((process_from_node != NULL and process_from_arc != NULL) or
249  (agent_state == Invalid));
250  }
251 
254  : location(NULL), src(NULL), cookie(NULL),
255  process_from_node(NULL), process_from_arc(NULL)
256  {
257  agent_state = Invalid;
258  }
259 
262  Walking_Agent(const Agent_Info & agent_info,
263  Process_From_Node __process_from_node = NULL,
264  Process_From_Arc __process_from_arc = NULL,
265  Process_Out_Location __process_out_location = NULL)
266  : info(agent_info), location(NULL), src(NULL), cookie(NULL),
267  process_from_node(__process_from_node),
268  process_from_arc(__process_from_arc),
269  process_out_location(__process_out_location)
270  {
271  set_agent_state();
272  }
273 
275  Walking_Agent(Process_From_Node __process_from_node,
276  Process_From_Arc __process_from_arc,
277  Process_Out_Location __process_out_location)
278  : agent_state(Without_Queue), location(NULL), src(NULL), cookie(NULL),
279  process_from_node(__process_from_node),
280  process_from_arc(__process_from_arc),
281  process_out_location(__process_out_location)
282  {
283  set_agent_state();
284  }
285 
286  typedef Agent_Info Agent_Type;
287 
288  // funciones de conversión a Agent desde un dlink
289  LINKNAME_TO_TYPE (Walking_Agent, agent_link_in_graph);
290  LINKNAME_TO_TYPE (Walking_Agent, schedule_link_in_graph);
291  LINKNAME_TO_TYPE (Walking_Agent, location_link);
292 
293  // funciones de acceso a atributos públicos de un agente
294 
295  Agent_Info & get_info() { return info; }
296 
298  void *& get_cookie() { return cookie; }
299 
301  bool is_in_node() const { return in_node; }
302 
304  bool is_in_arc() const { return not in_node; }
305 };
306 
307 
312  template <typename Agents_Node_Info>
313 class Agent_Node : public Graph_Node <Agents_Node_Info>
314 {
315 public:
316 
317  Dlink agent_list; // lista de agentes que tiene el nodo
318 
320 
321  typedef Agents_Node_Info Node_Type;
322 
323  Agent_Node() : Graph_Node<Agents_Node_Info>() { /* empty */ }
324 
325  Agent_Node(const Node_Type & info)
326  : Graph_Node<Agents_Node_Info>(info) { /* empty */ }
327 };
328 
329 
334  template <typename Agents_Arc_Info>
335 class Agent_Arc : public Graph_Arc<Agents_Arc_Info>
336 {
337 public:
338 
339  Dlink agent_list; // lista de agentes que tiene el arco
340 
342 
343  typedef Agents_Arc_Info Arc_Type;
344 
345  Agent_Arc() { /* empty */ }
346 
347  Agent_Arc(const Arc_Type & info) : Base_Arc (info) { /* empty */ }
348 
349  Agent_Arc(void * src, void * tgt) : Base_Arc (src, tgt)
350  {
351  // empty
352  }
353 };
354 
355 
418 template <template <class, class> class __GT, // El tipo de grafo
419  class __Node, // Definción de nodo basada Agent_Node<T>
420  class __Arc, // Definición de arco basada en Agent_Arc<T>
421  class __Agent> // Definción de agente basada en Walking_Agent<T>
422 class Agent_Graph : public Concurrent_Graph<__GT, __Node, __Arc>
423 {
424  typedef Agent_Graph GT;
425 
426 public:
427 
429  typedef typename GT::Node Node;
430 
432  typedef typename GT::Arc Arc;
433 
434  typedef __Agent Agent;
435 
436 private:
437 
438  // Estados de ejecución del grafo
439  enum Status {
440  Init, // Estado inicial colocado por el constructor de Walking_Agent
441  Running, // Los agentes se están ejecutando
442  Suspended, // La ejecución está suspendida pero puede ser reanudada
443  Stopped // La ejecución ha sido detenida (las threads han sido
444  // destruidas)
445  };
446 
447  size_t num_threads;
448  pthread_t * threads; // Arreglo de threads de dimensión num_threads
449  Status status; // Estado de ejecución del grafo
450 
451  /* protege las colas ready y suspended y en general todo el grafo */
452  pthread_mutex_t graph_mutex;
453 
454  /*
455  Esta variable de condición se utiliza para bloquear las threads en
456  dos situaciones:
457 
458  1-. Tenemos num_threads para procesar una cantidad arbitraria de
459  agentes. Sin embargo, en algunas circunstancias puede haber
460  menos agentes que num_threads. En este caso, debemos suspender
461  num_threads - num_agents threads que no se pueden procesar
462  puesto que no tienen un agente para correr. Esta variable de
463  condición es la que bloquea una thread que no tiene agente para
464  ejecutar.
465 
466  2-. Cuando el grafo se manda a suspender
467  */
468  pthread_cond_t graph_cond_var;
469 
470 public:
471 
472  typedef Concurrent_Graph<__GT, __Node, __Arc> Base_Graph;
473 
475  typedef typename Node::Node_Type Node_Type;
476 
478  typedef typename Arc::Arc_Type Arc_Type;
479 
481  typedef typename Agent::Agent_Type Agent_Type;
482 
483  friend class Agent_Iterator;
484 
485 private:
486 
489  Dlink agent_list; // lista global de agentes
490  size_t num_agents;
491 
492  Dlink ready_queue; // cola de agentes listos para procesarse
493  size_t num_agents_ready;
494 
495  Dlink suspended_queue; // cola de agentes suspendidos
496  size_t num_agents_suspended;
497 
498  size_t num_executing_agents;
499 
500  // bloquea start_graph() si usuario lo solicita
501  pthread_cond_t block_starter_cond_var;
502 
503  // atributos de la callback por timer
504  void * (*time_callback)(void * graph);
505 
506  struct timespec time_callback_period;
507 
508  pthread_t time_callback_thread;
509 
510  pthread_cond_t time_callback_cond_var;
511 
512  typedef void * (Time_Callback)(void *);
513 
514  // inserta agente por el trasero de la cola de listos.
515  // ATENCIÓN: No toma el mutex
516  void insert_agent_in_ready_queue(Agent * agent)
517  {
518  ready_queue.append(&agent->schedule_link_in_graph);
519  num_agents_ready++;
520  pthread_cond_signal(&graph_cond_var);
521  }
522 
523  // saca de cola de listos el agente del frente.
524  // ATENCIÓN: No toma el mutex
525  Agent * get_next_ready_agent()
526  {
527  if (not ready_queue.is_empty())
528  {
529  Dlink * link = ready_queue.get_next();
530 
531  Walking_Agent<Agent_Type> * walking_agent =
532  Agent::schedule_link_in_graph_to_Walking_Agent(link);
533 
534  Agent * result = static_cast<Agent*>(walking_agent);
535 
536  ready_queue.remove_next();
537  num_agents_ready--;
538 
539  return result;
540  }
541 
542  return NULL;
543  }
544 
545  // elimina el agente de la cola de listos; nótese que se trata de
546  // un agente arbitrario, no del que está en el frente
547  // ATENCIÓN: No toma el mutex
548  void remove_agent_from_ready_queue(Agent * agent)
549  {
550  num_agents_ready--;
551  agent->schedule_link_in_graph.del();
552  }
553 
554  // inserta agente por el trasero de la cola de suspendidos
555  // ATENCIÓN: No toma el mutex
556  void insert_agent_in_suspended_queue(Agent * agent)
557  {
558  suspended_queue.append(&agent->schedule_link_in_graph);
559  num_agents_suspended++;
560  }
561 
562  // elimina el agente de la cola de suspendidos; nótese que se trata de
563  // un agente arbitrario, no del que está en el frente
564  // ATENCIÓN: No toma el mutex
565  void remove_agent_from_suspended_queue(Agent * agent)
566  {
567  agent->schedule_link_in_graph.del();
568  num_agents_suspended--;
569  }
570 
571 public:
572 
574  size_t get_num_agents() const
575  {
576  CRITICAL_SECTION(graph_mutex);
577  return num_agents;
578  }
579 
581  size_t get_num_threads() const
582  {
583  CRITICAL_SECTION(graph_mutex);
584  return num_threads;
585  }
586 
587 private:
588 
589  // inserción genérica en nodo o arco
590  template <typename Location> inline static
591  void __insert_agent(Agent * agent, Location * location)
592  {
593  location->agent_list.append(&agent->location_link);
594  }
595 
596  void __insert_agent_in_node(Agent * agent, Node * p)
597  {
598  agent->location = p;
599  agent->in_node = true;
600  __insert_agent(agent, p);
601  }
602 
603  void __insert_agent_in_arc(Agent * agent, Arc * a)
604  {
605  agent->location = a;
606  agent->in_node = false;
607  __insert_agent(agent, a);
608  }
609 
610  void __remove_agent_from_location(Agent * agent)
611  {
612  agent->location = NULL;
613  agent->in_node = false;
614  agent->location_link.del();
615  }
616 
617  void __remove_agent_from_node(Agent * agent)
618  {
619  __remove_agent_from_location(agent);
620  }
621 
622  void __remove_agent_from_arc(Agent * agent)
623  {
624  __remove_agent_from_location(agent);
625  }
626 
627 public:
628 
629  Agent * get_first_agent()
630  {
631  CRITICAL_SECTION(graph_mutex);
632 
633  if (get_num_agents() == 0)
634  throw std::range_error("Graph has not agents ");
635 
636  return static_cast<Agent*>
637  (Agent::agent_link_in_graph_to_Walking_Agent (&*agent_list.get_next()));
638  }
639 
640 private:
641 
642  // asume que mutex del grafo ya está tomados.
643  // ATENCIÓN: INVOCANTE DEBE SOLTAR EL MUTEX Y LUEGO HACER DELETE
644  void __remove_agent_from_graph(Agent * agent)
645  {
646  if (not agent->location_link.is_empty())
647  agent->location_link.del();
648 
649  agent->agent_link_in_graph.del();
650  num_agents--;
651 
652  agent->schedule_link_in_graph.del();
653 
654  switch (agent->agent_state)
655  {
656  case Agent::Deleting_From_Ready:
657  num_agents_ready--; break;
658 
659  case Agent::Deleting_From_Suspended:
660  num_agents_suspended--; break;
661 
662  case Agent::Without_Queue:
663  case Agent::Invalid: break;
664 
665  default: EXIT("Invalid call (state %ld)", agent->agent_state);
666  }
667  }
668 
669 public:
670 
671  void remove_agent(Agent * agent)
672  {
673  CRITICAL_SECTION(graph_mutex);
674 
675  switch (agent->agent_state)
676  {
677  case Agent::Ready:
678  agent->agent_state = Agent::Deleting_From_Ready;
679  break; // eliminará agente después del switch
680 
681  case Agent::Suspended:
682  agent->agent_state = Agent::Deleting_From_Suspended;
683  break; // eliminará agente después del switch
684 
685  case Agent::Without_Queue:
686  case Agent::Invalid: break;
687 
688  case Agent::Executing:
689  case Agent::Suspending:
690  // Eliminación será realizada por thread que está
691  // ejecutando el agente
692  agent->agent_state = Agent::Deleting_From_Ready;
693  return;
694 
695  case Agent::Deleting_From_Ready:
696  case Agent::Deleting_From_Suspended:
697  throw std::domain_error("Agent is already deleting");
698 
699  default: EXIT("Invalid agent state %ld", agent->agent_state);
700  }
701 
702  __remove_agent_from_graph(agent);
703 
704  delete agent;
705  }
706 
707 private:
708 
709  void create_thread(pthread_t & thread, void *( *start_routine)(void*))
710  {
711  int result = pthread_create(&thread, NULL, start_routine, this);
712 
713  if (result != 0)
714  {
715  if (result == EAGAIN)
716  throw std::bad_alloc();
717  else
718  throw std::exception();
719  }
720  }
721 
722  // retorna el índice dentro del arreglo threads[] de thread
723  int search_thread(const pthread_t & thread)
724  {
725  return sequential_search(threads, thread, 0, num_threads - 1);
726  }
727 
728  void cancel_threads()
729  {
730  if (threads == NULL) // ¿fueron las threads creadas?
731  return; // NO
732 
733  for (int i = 0; i < num_threads; i++)
734  {
735  int st = pthread_cancel(threads[i]);
736 
737  if (st != 0)
738  ERROR("Error %d in pthread_join", st);
739  }
740  }
741 
742  void wait_for_threads_termination()
743  {
744  if (threads == NULL)
745  return;
746 
747  for (int i = 0; i < num_threads; i++)
748  {
749  int st = pthread_join(threads[i] , NULL);
750 
751  if (st != 0)
752  ERROR("Error %d in pthread_join", st);
753  }
754  }
755 
756 public:
757 
758  void start_graph(const bool block_caller = true)
759  {
760  CRITICAL_SECTION(graph_mutex);
761 
762  if (status == Running)
763  throw std::domain_error("Graph is already running");
764 
765  status = Running;
766 
767  threads = new pthread_t[num_threads];
768 
769  // crear las threads y colocarlas a ejecutar inmediatamente
770  for (int i = 0; i < num_threads; i++)
771  create_thread(threads[i], agent_handler);
772 
773  if (time_callback != NULL)
774  create_thread(time_callback_thread, time_callback_handler);
775 
776  if (block_caller)
777  pthread_cond_wait(&block_starter_cond_var, &graph_mutex);
778  }
779 
780 private:
781 
782  void __stop_graph()
783  {
784  if (threads == NULL)
785  return;
786 
787  if (status == Stopped)
788  throw std::domain_error("Graph is already stopped");
789 
790  if (status == Suspended)
791  throw std::domain_error("Graph is suspended");
792 
793  status = Stopped;
794 
795  pthread_cond_broadcast(&graph_cond_var);
796  pthread_cond_signal(&time_callback_cond_var);
797  }
798 
799 public:
800 
801  void stop_graph()
802  {
803  {
804  CRITICAL_SECTION(graph_mutex);
805  __stop_graph();
806  }
807 
808  wait_for_threads_termination();
809  }
810 
811  void suspend_graph()
812  {
813  CRITICAL_SECTION(graph_mutex);
814  status = Suspended;
815  }
816 
817  void resume_graph()
818  {
819  {
820  CRITICAL_SECTION(graph_mutex);
821 
822  if (status != Suspended)
823  throw std::domain_error("Graph has not been previously suspended");
824 
825  status = Running;
826  }
827 
828  pthread_cond_broadcast(&graph_cond_var);
829 
830  if (time_callback)
831  pthread_cond_signal(&time_callback_cond_var);
832  }
833 
834  void clear_agent_list()
835  {
836  if (status == Running)
837  suspend_graph();
838 
839  for (Dlink::Iterator it(&agent_list); it.has_current(); /* empty */)
840  {
841  Walking_Agent<Agent_Type> * walking_agent =
842  Agent::agent_link_in_graph_to_Walking_Agent(it.get_current());
843 
844  Agent * agent = static_cast<Agent*>(walking_agent);
845 
846  it.next();
847  remove_agent(agent);
848  }
849  }
850 
853  const long & get_num_agents_ready() const
854  {
855  CRITICAL_SECTION(graph_mutex);
856 
857  return num_agents_ready;
858  }
859 
860  const long & get_num_agents_suspended() const
861  {
862  CRITICAL_SECTION(graph_mutex);
863 
864  return num_agents_suspended;
865  }
866 
867 private:
868 
869  void __suspend_agent_in_graph(Agent * agent)
870  {
871  remove_agent_from_ready_queue(agent);
872  insert_agent_in_suspended_queue(agent);
873  }
874 
875 public:
876 
877  void suspend_agent(Agent * agent)
878  {
879  CRITICAL_SECTION(graph_mutex);
880 
881  // validar que el agente se suspenda en un estado correcto
882  switch (agent->agent_state)
883  {
884  case Agent::Suspended:
885  throw std::domain_error("Agent is already suspended");
886 
887  case Agent::Suspending:
888  throw std::domain_error("Agent is already suspending");
889 
890  case Agent::Executing:
891  // suspención será realizada por run() luego de ejecución
892  agent->agent_state = Agent::Suspending;
893  return;
894 
895  case Agent::Ready:
896  agent->agent_state = Agent::Suspended;
897  break;
898 
899  default:
900  EXIT("Invalid agent state %ld", agent->agent_state);
901  }
902  __suspend_agent_in_graph(agent);
903  }
904 
905 private:
906 
907  void __resume_agent_in_graph(Agent * agent)
908  {
909  remove_agent_from_suspended_queue(agent);
910  insert_agent_in_ready_queue(agent);
911  }
912 
913 public:
914 
915  void resume_agent(Agent * agent)
916  {
917  CRITICAL_SECTION(graph_mutex);
918 
919  switch (agent->agent_state)
920  {
921  case Agent::Ready:
922  case Agent::Executing:
923  throw throw std::domain_error("Agent is not suspended");
924 
925  case Agent::Suspending:
926  case Agent::Suspended:
927  agent->agent_state = Agent::Ready;
928  break;
929 
930  default:
931  Exit("Invalid agent state %ld", agent->agent_state);
932  }
933 
934  __resume_agent_in_graph(agent);
935  }
936 
937 private:
938 
939  void init()
940  {
941  init_mutex(graph_mutex);
942  pthread_cond_init(&graph_cond_var, NULL);
943  pthread_cond_init(&block_starter_cond_var, NULL);
944  }
945 
946 public:
947 
948  Agent_Graph(const size_t & __num_threads = 3, const size_t & num_mutexes = 4)
949  : Base_Graph(num_mutexes),
950  num_threads(__num_threads), threads(NULL), status(Init),
951  num_agents(0), num_agents_ready(0), num_agents_suspended(0),
952  num_executing_agents(0), time_callback(NULL)
953  {
954  init();
955  }
956 
957  void set_num_threads(const size_t & __num_threads)
958  {
959  if (threads != NULL)
960  throw std::domain_error("The graph has already threads");
961 
962  num_threads = __num_threads;
963  }
964 
965  virtual ~Agent_Graph()
966  {
967  {
968  CRITICAL_SECTION(graph_mutex);
969 
970  if (threads != NULL)
971  if (status == Running or status == Suspended or num_agents_ready == 0)
972  {
973  status = Stopped;
974  pthread_cond_broadcast(&graph_cond_var);
975  }
976  }
977 
978  cancel_time_callback();
979 
980  wait_for_threads_termination();
981 
982  clear_agent_list();
983  destroy_mutex(graph_mutex);
984  pthread_cond_destroy(&graph_cond_var);
985  pthread_cond_destroy(&block_starter_cond_var);
986 
987  delete [] threads;
988  }
989 
990  Agent_Graph(const Agent_Graph & g)
991  : Base_Graph(g),
992  num_threads(g.num_threads), threads(NULL), status(Init),
993  num_agents(0), num_agents_ready(), num_agents_suspended(0),
994  num_executing_agents(0), time_callback(g.time_callback)
995  {
996  init();
997  }
998 
999 private:
1000 
1001  template <typename Location> Agent *
1002  insert_agent_in_location(Agent * agent,
1003  Location * location,
1004  const bool suspended = false)
1005  {
1006  if (not (agent->agent_state == Agent::Invalid))
1007  agent->agent_state = suspended ? Agent::Suspended : Agent::Ready;
1008 
1009  CRITICAL_SECTION(graph_mutex);
1010 
1011  if (not agent->location_link.is_empty())
1012  throw std::domain_error("agent is already inside the graph");
1013 
1014  { CRITICAL_SECTION(location->mutex);
1015  __insert_agent_in_node(agent, location);
1016  }
1017 
1018  agent_list.append(&agent->agent_link_in_graph);
1019  num_agents++;
1020 
1021  if (not suspended)
1022  insert_agent_in_ready_queue(agent);
1023  else
1024  insert_agent_in_suspended_queue(agent);
1025 
1026  return agent;
1027  }
1028 
1029 public:
1030 
1031  Agent * insert_agent_in_node(Agent * agent, Node * p,
1032  const bool suspended = false)
1033  {
1034  return insert_agent_in_location(agent, p, suspended);
1035  }
1036 
1037  Agent * insert_agent_in_arc(Agent * agent, Arc * a,
1038  const bool suspended = false)
1039  {
1040  return insert_agent_in_location(agent, a, suspended);
1041  }
1042 
1043 private:
1044 
1045  template <typename Location> Agent *
1046  create_agent_in_location(const Agent_Type & agent_data,
1047  Location * location,
1048  long (*process_node) (void * agent,
1049  void * graph,
1050  void * src,
1051  void *& arc) = NULL,
1052  long (*process_arc) (void * agent,
1053  void * graph,
1054  void * arc) = NULL,
1055  const bool suspended = false)
1056  {
1057  /* puesto que esta función crea el agente y determina su dirección,
1058  no es necesario proteger sus atributos con el mutex */
1059  Agent * agent = new Agent(process_node, process_arc);
1060 
1061  agent->info = agent_data;
1062 
1063  if (agent->is_a_valid_agent())
1064  return insert_agent_in_location(agent, location, suspended);
1065  else
1066  return insert_agent_in_location(agent, location, true); // suspendido
1067 
1068  return agent;
1069  }
1070 
1071 public:
1072 
1073  Agent *
1074  create_agent_in_node(const Agent_Type & agent_data,
1075  Node * node,
1076  long (*process_node) (void * agent,
1077  void * graph,
1078  void * src,
1079  void *& arc) = NULL,
1080  long (*process_arc) (void * agent,
1081  void * graph,
1082  void * arc) = NULL,
1083  const bool suspended = false)
1084  {
1085  return create_agent_in_location(agent_data, node,
1086  process_node, process_arc, suspended);
1087  }
1088 
1089  Agent *
1090  create_agent_in_arc(const Agent_Type & agent_data,
1091  Arc * arc,
1092  long (*process_node) (void * agent,
1093  void * graph,
1094  void * src,
1095  void *& arc) = NULL,
1096  long (*process_arc) (void * agent,
1097  void * graph,
1098  void * arc) = NULL,
1099  const bool & suspended = false)
1100  {
1101  return create_agent_in_location(agent_data, arc,
1102  process_node, process_arc, suspended);
1103  }
1104 
1105  template <class Equal>
1106  Agent * search_agent(const Agent_Type & agent_data)
1107  {
1108  CRITICAL_SECTION(graph_mutex);
1109 
1110  for (Dlink::Iterator it(agent_list); it.has_current(); it.next())
1111  {
1112  Agent * agent = static_cast<Agent*>
1113  (Agent::agent_link_in_graph_to_Walking_Agent(it.get_current()));
1114 
1115  if (Equal () (agent->get_info(), agent_data))
1116  return agent;
1117  }
1118 
1119  return NULL;
1120  }
1121 
1122  Agent * search_agent(const Agent_Type & agent)
1123  {
1124  return search_agent<Aleph::equal_to<Agent_Type> >(agent);
1125  }
1126 
1127  int get_status()
1128  {
1129  CRITICAL_SECTION(graph_mutex);
1130  return status;
1131  }
1132 
1133 private:
1134 
1135  static void * time_callback_handler(void * cookie)
1136  {
1137  Agent_Graph * graph = static_cast<Agent_Graph*>(cookie);
1138 
1139  timeval now;
1140  timespec timeout;
1141 
1142  time_t sec = graph->time_callback_period.tv_sec;
1143  long nsec = graph->time_callback_period.tv_nsec;
1144 
1145  while (true)
1146  {
1147  CRITICAL_SECTION(graph->graph_mutex);
1148 
1149  switch (graph->status)
1150  {
1151  case Stopped: return NULL; // terminar thread
1152 
1153  case Suspended: // suspender la thread con variable condición
1154  pthread_cond_wait(&graph->time_callback_cond_var,
1155  &graph->graph_mutex);
1156  break;
1157 
1158  case Running:
1159  gettimeofday(&now, NULL);
1160  timeout.tv_sec = now.tv_sec + sec;
1161  timeout.tv_nsec = now.tv_usec * 1000 + nsec;
1162  if (timeout.tv_nsec >= 1000000000)
1163  {
1164  timeout.tv_sec++;
1165  timeout.tv_nsec %= 1000000000;
1166  }
1167 
1168  if (pthread_cond_timedwait(&graph->time_callback_cond_var,
1169  &graph->graph_mutex,
1170  &timeout) == ETIMEDOUT)
1171  if (graph->status == Running)
1172  (*graph->time_callback)(graph); // invocar time_callback
1173 
1174  break; // case Running:
1175 
1176  default: EXIT("Invalid execution state %ld", graph->status);
1177  }
1178  }
1179  }
1180 
1181 
1182 public:
1183 
1184  void set_time_callback(Time_Callback __time_callback,
1185  const int & sec = 10,
1186  const long & nanosec = 0)
1187  {
1188  CRITICAL_SECTION(graph_mutex);
1189 
1190  I(__time_callback != NULL);
1191 
1192  time_callback = __time_callback;
1193  time_callback_period.tv_sec = sec;
1194  time_callback_period.tv_nsec = nanosec;
1195 
1196  pthread_cond_init(&time_callback_cond_var, NULL);
1197  }
1198 
1199  void cancel_time_callback()
1200  {
1201  if (time_callback == NULL)
1202  return;
1203 
1204  CRITICAL_SECTION(graph_mutex);
1205  pthread_cancel(time_callback_thread);
1206  time_callback = NULL;
1207  pthread_cond_destroy(&time_callback_cond_var);
1208  }
1209 
1210 private:
1211 
1212  long execute_from_node(Agent * agent, Node * src)
1213  {
1214  if (agent->process_from_node == NULL)
1215  throw std::domain_error("There is no function for arc processing");
1216 
1217  I(agent->is_in_node());
1218 
1219  void * a = NULL;
1220  long ret = -1;
1221 
1222  { CRITICAL_SECTION(src->mutex);
1223  ret = agent->process_from_node(agent, this, src, a);
1224  }
1225 
1226  switch (ret)
1227  {
1228  case Agent::Remain_In_Node:
1229  case Agent::Suspend:
1230  case Agent::Dead:
1231  break;
1232 
1233  case Agent::Enter_In_Arc:
1234  {
1235  if (a == NULL)
1236  ERROR("there is no arc to enter");
1237  agent->src = src;
1238 
1239  { CRITICAL_SECTION(src->mutex);
1240  __remove_agent_from_node(agent);
1241  }
1242 
1243  Arc * arc = (Arc*) a;
1244 
1245  { CRITICAL_SECTION(arc->mutex);
1246  __insert_agent_in_arc(agent, arc);
1247  }
1248  break;
1249  }
1250  case Agent::Enter_In_Node:
1251  ERROR("Invalid action: Enter_In_Node transition from a node");
1252 
1253  case Agent::Remain_In_Arc:
1254  ERROR("Invalid action: Remain_In_Node transition from a node");
1255 
1256  default: ERROR("Panic: invalid transtion action");
1257  }
1258 
1259  return ret;
1260  }
1261 
1262  long execute_from_arc(Agent * agent, Arc * a)
1263  {
1264  if (agent->process_from_arc == NULL)
1265  throw std::domain_error("There is no function for arc processing");
1266 
1267  long ret = -1;
1268  { CRITICAL_SECTION(a->mutex);
1269  ret = agent->process_from_arc(agent, this, a);
1270  }
1271 
1272  switch (ret)
1273  {
1274  case Agent::Remain_In_Arc:
1275  case Agent::Suspend:
1276  case Agent::Dead:
1277  break;
1278 
1279  case Agent::Enter_In_Node:
1280  {
1281  Node * tgt = NULL;
1282  Node * src = (Node *) agent->src;
1283 
1284  { CRITICAL_SECTION(a->mutex);
1285  tgt = get_connected_node(a, src);
1286  __remove_agent_from_arc(agent);
1287  }
1288 
1289  { CRITICAL_SECTION(tgt->mutex);
1290  __insert_agent_in_node(agent, tgt);
1291  }
1292 
1293  agent->src = NULL;
1294 
1295  break;
1296  }
1297  case Agent::Enter_In_Arc:
1298  ERROR("Invalid action: Enter_In_Arc transition from a arc");
1299 
1300  case Agent::Remain_In_Node:
1301  ERROR("Invalid action: Remain_In_Arc transition from a arc");
1302 
1303  default: ERROR("Panic: invalid transtion action");
1304  }
1305 
1306  return ret;
1307  }
1308 
1309  long execute_out_location(Agent * agent)
1310  {
1311  if (agent->process_out_location == NULL)
1312  throw std::domain_error("There is no function for arc processing");
1313 
1314  I(agent->location_link.is_empty());
1315 
1316 
1317  }
1318 
1319  long execute_agent(Agent * agent)
1320  {
1321  // if (agent->location_link.is_empty())
1322  // TODO: una nueva función si el agente no está en localidad
1323  if (agent->is_in_node())
1324  {
1325  Node * p = (Node*)agent->location;
1326  return execute_from_node(agent, p);
1327  }
1328 
1329  Arc * a = (Arc*)agent->location;
1330  return execute_from_arc(agent, a);
1331  }
1332 
1333  static void * agent_handler(void * cookie)
1334  {
1335  Agent_Graph * graph = static_cast<Agent_Graph*>(cookie);
1336 
1337  while (true)
1338  {
1339  CRITICAL_SECTION(graph->graph_mutex);
1340 
1341  switch (graph->status)
1342  {
1343  case Stopped: return NULL; // terminar thread
1344 
1345  case Suspended: // suspender la thread con variable condición
1346  pthread_cond_wait(&graph->graph_cond_var, &graph->graph_mutex);
1347  break;
1348 
1349  case Running:
1350  {
1351  Agent * ready_agent = graph->get_next_ready_agent();
1352 
1353  if (ready_agent == NULL)
1354  {
1355  if (graph->num_executing_agents == 0 and
1356  graph->num_agents_ready == 0)
1357  // despertar a thread start_graph() (si fue suspendida)
1358  pthread_cond_signal(&graph->block_starter_cond_var);
1359 
1360  // si no hay agente se señaliza start_graph() y se
1361  // suspende esta thread
1362  pthread_cond_wait(&graph->graph_cond_var, &graph->graph_mutex);
1363  }
1364  else
1365  { graph->num_executing_agents++;
1366 
1367  // libera mutex grafo antes ejecutar agente
1368  critical_section.unlock();
1369  long action = graph->execute_agent(ready_agent);
1370  critical_section.lock(); // retoma el mutex del grafo
1371 
1372  graph->num_executing_agents--;
1373 
1374  switch (action)
1375  {
1376  case Agent::Remain_In_Node:
1377  case Agent::Remain_In_Arc:
1378  case Agent::Enter_In_Node:
1379  case Agent::Enter_In_Arc:
1380  graph->insert_agent_in_ready_queue(ready_agent);
1381  break;
1382 
1383  case Agent::Suspend:
1384  graph->__suspend_agent_in_graph(ready_agent);
1385  break;
1386 
1387  case Agent::Dead:
1388  graph->__remove_agent_from_graph(ready_agent);
1389  delete ready_agent;
1390  break;
1391  } // end switch (action)
1392  }
1393  break; // case Running:
1394  }
1395  default: EXIT("Invalid execution state %ld", graph->status);
1396  }
1397  } // end while (true)
1398  }
1399 
1400  void verify_graph_conditions_of_modification()
1401  {
1402  if (status == Running)
1403  throw std::domain_error("graph is running");
1404 
1405  if (status == Init)
1406  throw std::domain_error("graph is Init state. Use without mutex");
1407  }
1408 
1409 public:
1410 
1412  {
1413  Agent_Graph * g;
1414 
1415  public:
1416 
1417  Agent_Iterator(Agent_Graph & __g) : Dlink::Iterator(&(__g.agent_list)), g(&__g)
1418  {
1419  // empty
1420  }
1421 
1422  Agent * get_current()
1423  {
1424  Agent * curr_agent = static_cast<Agent*>
1425  (Agent::agent_link_in_graph_to_Walking_Agent(Dlink::Iterator::get_current()));
1426 
1427  return curr_agent;
1428  }
1429  };
1430 
1431 
1432 }; // end Agent_Graph
1433 
1434 
1435 } // namespace Aleph
1436 # endif
Definition: tpl_agent_graph.H:1411
const long & get_num_agents_ready() const
Definition: tpl_agent_graph.H:853
Definition: tpl_concurrent_graph.H:22
Definition: tpl_agent_graph.H:335
Definition: tpl_graph.H:21
int sequential_search(T *a, const T &x, const int l, const int r, Equal &eq)
Definition: tpl_sort_utils.H:260
void set_process_arc(Process_From_Arc __process_from_arc)
Definition: tpl_agent_graph.H:240
#define LINKNAME_TO_TYPE(type_name, link_name)
Definition: dlink.H:741
GT::Node Node
El tipo de nodo.
Definition: tpl_agent_graph.H:429
Walking_Agent(Process_From_Node __process_from_node, Process_From_Arc __process_from_arc, Process_Out_Location __process_out_location)
Instancia un agente con funciones de transito.
Definition: tpl_agent_graph.H:275
size_t get_num_threads() const
Retorna el número de threads que ejecutan a los agentes.
Definition: tpl_agent_graph.H:581
bool is_in_arc() const
Retorna true si el agente está en un arco; false si está en un nodo.
Definition: tpl_agent_graph.H:304
Arc::Arc_Type Arc_Type
El tipo de información que contiene el arco.
Definition: tpl_agent_graph.H:478
Node::Node_Type Node_Type
El tipo de información que contiene el nodo.
Definition: tpl_agent_graph.H:475
bool is_in_node() const
Retorna true si el agente está en un nodo; false si está en un arco.
Definition: tpl_agent_graph.H:301
Definition: tpl_ant.H:93
GT::Arc Arc
El tipo de arco.
Definition: tpl_agent_graph.H:432
void * get_src_node()
Definition: tpl_agent_graph.H:220
Definition: tpl_agent_graph.H:313
Definition: tpl_agent_graph.H:22
Definition: tpl_graph.H:19
Walking_Agent()
Instancia un agente vacío.
Definition: tpl_agent_graph.H:253
Walking_Agent(const Agent_Info &agent_info, Process_From_Node __process_from_node=NULL, Process_From_Arc __process_from_arc=NULL, Process_Out_Location __process_out_location=NULL)
Definition: tpl_agent_graph.H:262
void set_process_node(Process_From_Node __process_from_node)
Definition: tpl_agent_graph.H:232
void *& get_cookie()
Retorna una referencia al cookie del agente.
Definition: tpl_agent_graph.H:298
Definition: tpl_agent_graph.H:422
Definition: ahDefs.H:24
Agent::Agent_Type Agent_Type
El tipo de información que contiene el agente.
Definition: tpl_agent_graph.H:481
Definition: tpl_agent_graph.H:74
size_t get_num_agents() const
Retorna el número total de agentes del grafo.
Definition: tpl_agent_graph.H:574

Leandro Rabindranath León