1 # ifndef TPL_AGENT_GRAPH_H
2 # define TPL_AGENT_GRAPH_H
8 # include <sys/types.h>
10 # include <tpl_concurrent_graph.H>
11 # include <tpl_dynArray.H>
12 # include <tpl_arrayQueue.H>
14 using namespace Aleph;
22 template <
typename Agent_Info>
class Agent;
73 template <
typename Agent_Info>
83 Deleting_From_Suspended,
88 enum Transition_Action
134 typedef long (*Process_From_Node) (
void * agent,
164 typedef long (*Process_From_Arc) (
void * agent,
void * graph,
void * arc);
183 typedef long (*Process_Out_Location) (
void * agent,
188 Agent_State agent_state;
195 Dlink agent_link_in_graph;
198 Dlink schedule_link_in_graph;
212 Process_From_Node process_from_node;
214 Process_From_Arc process_from_arc;
216 Process_Out_Location process_out_location;
222 void set_agent_state()
224 if (process_from_node == NULL or process_from_arc)
225 agent_state = Invalid;
227 agent_state = Without_Queue;
234 process_from_node = __process_from_node;
242 process_from_arc = __process_from_arc;
246 bool is_a_valid_agent()
const
248 return ((process_from_node != NULL and process_from_arc != NULL) or
249 (agent_state == Invalid));
254 : location(NULL), src(NULL), cookie(NULL),
255 process_from_node(NULL), process_from_arc(NULL)
257 agent_state = Invalid;
263 Process_From_Node __process_from_node = NULL,
264 Process_From_Arc __process_from_arc = NULL,
265 Process_Out_Location __process_out_location = NULL)
266 : info(agent_info), location(NULL), src(NULL), cookie(NULL),
267 process_from_node(__process_from_node),
268 process_from_arc(__process_from_arc),
269 process_out_location(__process_out_location)
276 Process_From_Arc __process_from_arc,
277 Process_Out_Location __process_out_location)
278 : agent_state(Without_Queue), location(NULL), src(NULL), cookie(NULL),
279 process_from_node(__process_from_node),
280 process_from_arc(__process_from_arc),
281 process_out_location(__process_out_location)
286 typedef Agent_Info Agent_Type;
295 Agent_Info & get_info() {
return info; }
312 template <
typename Agents_Node_Info>
321 typedef Agents_Node_Info Node_Type;
334 template <
typename Agents_Arc_Info>
343 typedef Agents_Arc_Info Arc_Type;
418 template <
template <
class,
class>
class __GT,
434 typedef __Agent
Agent;
452 pthread_mutex_t graph_mutex;
468 pthread_cond_t graph_cond_var;
483 friend class Agent_Iterator;
493 size_t num_agents_ready;
495 Dlink suspended_queue;
496 size_t num_agents_suspended;
498 size_t num_executing_agents;
501 pthread_cond_t block_starter_cond_var;
504 void * (*time_callback)(
void * graph);
506 struct timespec time_callback_period;
508 pthread_t time_callback_thread;
510 pthread_cond_t time_callback_cond_var;
512 typedef void * (Time_Callback)(
void *);
516 void insert_agent_in_ready_queue(
Agent * agent)
518 ready_queue.append(&agent->schedule_link_in_graph);
520 pthread_cond_signal(&graph_cond_var);
525 Agent * get_next_ready_agent()
527 if (not ready_queue.is_empty())
532 Agent::schedule_link_in_graph_to_Walking_Agent(link);
534 Agent * result =
static_cast<Agent*
>(walking_agent);
536 ready_queue.remove_next();
548 void remove_agent_from_ready_queue(
Agent * agent)
551 agent->schedule_link_in_graph.del();
556 void insert_agent_in_suspended_queue(
Agent * agent)
558 suspended_queue.append(&agent->schedule_link_in_graph);
559 num_agents_suspended++;
565 void remove_agent_from_suspended_queue(
Agent * agent)
567 agent->schedule_link_in_graph.del();
568 num_agents_suspended--;
576 CRITICAL_SECTION(graph_mutex);
583 CRITICAL_SECTION(graph_mutex);
590 template <
typename Location>
inline static
591 void __insert_agent(
Agent * agent, Location * location)
593 location->agent_list.append(&agent->location_link);
596 void __insert_agent_in_node(
Agent * agent, Node * p)
599 agent->in_node =
true;
600 __insert_agent(agent, p);
603 void __insert_agent_in_arc(
Agent * agent, Arc * a)
606 agent->in_node =
false;
607 __insert_agent(agent, a);
610 void __remove_agent_from_location(
Agent * agent)
612 agent->location = NULL;
613 agent->in_node =
false;
614 agent->location_link.del();
617 void __remove_agent_from_node(
Agent * agent)
619 __remove_agent_from_location(agent);
622 void __remove_agent_from_arc(
Agent * agent)
624 __remove_agent_from_location(agent);
629 Agent * get_first_agent()
631 CRITICAL_SECTION(graph_mutex);
633 if (get_num_agents() == 0)
634 throw std::range_error(
"Graph has not agents ");
636 return static_cast<Agent*
>
637 (Agent::agent_link_in_graph_to_Walking_Agent (&*agent_list.get_next()));
644 void __remove_agent_from_graph(
Agent * agent)
646 if (not agent->location_link.is_empty())
647 agent->location_link.del();
649 agent->agent_link_in_graph.del();
652 agent->schedule_link_in_graph.del();
654 switch (agent->agent_state)
656 case Agent::Deleting_From_Ready:
657 num_agents_ready--;
break;
659 case Agent::Deleting_From_Suspended:
660 num_agents_suspended--;
break;
662 case Agent::Without_Queue:
663 case Agent::Invalid:
break;
665 default: EXIT(
"Invalid call (state %ld)", agent->agent_state);
671 void remove_agent(
Agent * agent)
673 CRITICAL_SECTION(graph_mutex);
675 switch (agent->agent_state)
678 agent->agent_state = Agent::Deleting_From_Ready;
681 case Agent::Suspended:
682 agent->agent_state = Agent::Deleting_From_Suspended;
685 case Agent::Without_Queue:
686 case Agent::Invalid:
break;
688 case Agent::Executing:
689 case Agent::Suspending:
692 agent->agent_state = Agent::Deleting_From_Ready;
695 case Agent::Deleting_From_Ready:
696 case Agent::Deleting_From_Suspended:
697 throw std::domain_error(
"Agent is already deleting");
699 default: EXIT(
"Invalid agent state %ld", agent->agent_state);
702 __remove_agent_from_graph(agent);
709 void create_thread(pthread_t & thread,
void *( *start_routine)(
void*))
711 int result = pthread_create(&thread, NULL, start_routine,
this);
715 if (result == EAGAIN)
716 throw std::bad_alloc();
718 throw std::exception();
723 int search_thread(
const pthread_t & thread)
728 void cancel_threads()
733 for (
int i = 0; i < num_threads; i++)
735 int st = pthread_cancel(threads[i]);
738 ERROR(
"Error %d in pthread_join", st);
742 void wait_for_threads_termination()
747 for (
int i = 0; i < num_threads; i++)
749 int st = pthread_join(threads[i] , NULL);
752 ERROR(
"Error %d in pthread_join", st);
758 void start_graph(
const bool block_caller =
true)
760 CRITICAL_SECTION(graph_mutex);
762 if (status == Running)
763 throw std::domain_error(
"Graph is already running");
767 threads =
new pthread_t[num_threads];
770 for (
int i = 0; i < num_threads; i++)
771 create_thread(threads[i], agent_handler);
773 if (time_callback != NULL)
774 create_thread(time_callback_thread, time_callback_handler);
777 pthread_cond_wait(&block_starter_cond_var, &graph_mutex);
787 if (status == Stopped)
788 throw std::domain_error(
"Graph is already stopped");
790 if (status == Suspended)
791 throw std::domain_error(
"Graph is suspended");
795 pthread_cond_broadcast(&graph_cond_var);
796 pthread_cond_signal(&time_callback_cond_var);
804 CRITICAL_SECTION(graph_mutex);
808 wait_for_threads_termination();
813 CRITICAL_SECTION(graph_mutex);
820 CRITICAL_SECTION(graph_mutex);
822 if (status != Suspended)
823 throw std::domain_error(
"Graph has not been previously suspended");
828 pthread_cond_broadcast(&graph_cond_var);
831 pthread_cond_signal(&time_callback_cond_var);
834 void clear_agent_list()
836 if (status == Running)
842 Agent::agent_link_in_graph_to_Walking_Agent(it.get_current());
844 Agent * agent =
static_cast<Agent*
>(walking_agent);
855 CRITICAL_SECTION(graph_mutex);
857 return num_agents_ready;
860 const long & get_num_agents_suspended()
const
862 CRITICAL_SECTION(graph_mutex);
864 return num_agents_suspended;
869 void __suspend_agent_in_graph(
Agent * agent)
871 remove_agent_from_ready_queue(agent);
872 insert_agent_in_suspended_queue(agent);
877 void suspend_agent(
Agent * agent)
879 CRITICAL_SECTION(graph_mutex);
882 switch (agent->agent_state)
884 case Agent::Suspended:
885 throw std::domain_error(
"Agent is already suspended");
887 case Agent::Suspending:
888 throw std::domain_error(
"Agent is already suspending");
890 case Agent::Executing:
892 agent->agent_state = Agent::Suspending;
896 agent->agent_state = Agent::Suspended;
900 EXIT(
"Invalid agent state %ld", agent->agent_state);
902 __suspend_agent_in_graph(agent);
907 void __resume_agent_in_graph(
Agent * agent)
909 remove_agent_from_suspended_queue(agent);
910 insert_agent_in_ready_queue(agent);
915 void resume_agent(
Agent * agent)
917 CRITICAL_SECTION(graph_mutex);
919 switch (agent->agent_state)
922 case Agent::Executing:
923 throw throw std::domain_error(
"Agent is not suspended");
925 case Agent::Suspending:
926 case Agent::Suspended:
927 agent->agent_state = Agent::Ready;
931 Exit(
"Invalid agent state %ld", agent->agent_state);
934 __resume_agent_in_graph(agent);
941 init_mutex(graph_mutex);
942 pthread_cond_init(&graph_cond_var, NULL);
943 pthread_cond_init(&block_starter_cond_var, NULL);
948 Agent_Graph(
const size_t & __num_threads = 3,
const size_t & num_mutexes = 4)
949 : Base_Graph(num_mutexes),
950 num_threads(__num_threads), threads(NULL), status(Init),
951 num_agents(0), num_agents_ready(0), num_agents_suspended(0),
952 num_executing_agents(0), time_callback(NULL)
957 void set_num_threads(
const size_t & __num_threads)
960 throw std::domain_error(
"The graph has already threads");
962 num_threads = __num_threads;
968 CRITICAL_SECTION(graph_mutex);
971 if (status == Running or status == Suspended or num_agents_ready == 0)
974 pthread_cond_broadcast(&graph_cond_var);
978 cancel_time_callback();
980 wait_for_threads_termination();
983 destroy_mutex(graph_mutex);
984 pthread_cond_destroy(&graph_cond_var);
985 pthread_cond_destroy(&block_starter_cond_var);
992 num_threads(g.num_threads), threads(NULL), status(Init),
993 num_agents(0), num_agents_ready(), num_agents_suspended(0),
994 num_executing_agents(0), time_callback(g.time_callback)
1001 template <
typename Location>
Agent *
1002 insert_agent_in_location(
Agent * agent,
1003 Location * location,
1004 const bool suspended =
false)
1006 if (not (agent->agent_state == Agent::Invalid))
1007 agent->agent_state = suspended ? Agent::Suspended : Agent::Ready;
1009 CRITICAL_SECTION(graph_mutex);
1011 if (not agent->location_link.is_empty())
1012 throw std::domain_error(
"agent is already inside the graph");
1014 { CRITICAL_SECTION(location->mutex);
1015 __insert_agent_in_node(agent, location);
1018 agent_list.append(&agent->agent_link_in_graph);
1022 insert_agent_in_ready_queue(agent);
1024 insert_agent_in_suspended_queue(agent);
1031 Agent * insert_agent_in_node(
Agent * agent, Node * p,
1032 const bool suspended =
false)
1034 return insert_agent_in_location(agent, p, suspended);
1037 Agent * insert_agent_in_arc(
Agent * agent, Arc * a,
1038 const bool suspended =
false)
1040 return insert_agent_in_location(agent, a, suspended);
1045 template <
typename Location>
Agent *
1046 create_agent_in_location(
const Agent_Type & agent_data,
1047 Location * location,
1048 long (*process_node) (
void * agent,
1051 void *& arc) = NULL,
1052 long (*process_arc) (
void * agent,
1055 const bool suspended =
false)
1059 Agent * agent =
new Agent(process_node, process_arc);
1061 agent->info = agent_data;
1063 if (agent->is_a_valid_agent())
1064 return insert_agent_in_location(agent, location, suspended);
1066 return insert_agent_in_location(agent, location,
true);
1074 create_agent_in_node(
const Agent_Type & agent_data,
1076 long (*process_node) (
void * agent,
1079 void *& arc) = NULL,
1080 long (*process_arc) (
void * agent,
1083 const bool suspended =
false)
1085 return create_agent_in_location(agent_data, node,
1086 process_node, process_arc, suspended);
1090 create_agent_in_arc(
const Agent_Type & agent_data,
1092 long (*process_node) (
void * agent,
1095 void *& arc) = NULL,
1096 long (*process_arc) (
void * agent,
1099 const bool & suspended =
false)
1101 return create_agent_in_location(agent_data, arc,
1102 process_node, process_arc, suspended);
1105 template <
class Equal>
1106 Agent * search_agent(
const Agent_Type & agent_data)
1108 CRITICAL_SECTION(graph_mutex);
1113 (Agent::agent_link_in_graph_to_Walking_Agent(it.get_current()));
1115 if (Equal () (agent->get_info(), agent_data))
1122 Agent * search_agent(
const Agent_Type & agent)
1124 return search_agent<Aleph::equal_to<Agent_Type> >(agent);
1129 CRITICAL_SECTION(graph_mutex);
1135 static void * time_callback_handler(
void * cookie)
1142 time_t sec = graph->time_callback_period.tv_sec;
1143 long nsec = graph->time_callback_period.tv_nsec;
1147 CRITICAL_SECTION(graph->graph_mutex);
1149 switch (graph->status)
1151 case Stopped:
return NULL;
1154 pthread_cond_wait(&graph->time_callback_cond_var,
1155 &graph->graph_mutex);
1159 gettimeofday(&now, NULL);
1160 timeout.tv_sec = now.tv_sec + sec;
1161 timeout.tv_nsec = now.tv_usec * 1000 + nsec;
1162 if (timeout.tv_nsec >= 1000000000)
1165 timeout.tv_nsec %= 1000000000;
1168 if (pthread_cond_timedwait(&graph->time_callback_cond_var,
1169 &graph->graph_mutex,
1170 &timeout) == ETIMEDOUT)
1171 if (graph->status == Running)
1172 (*graph->time_callback)(graph);
1176 default: EXIT(
"Invalid execution state %ld", graph->status);
1184 void set_time_callback(Time_Callback __time_callback,
1185 const int & sec = 10,
1186 const long & nanosec = 0)
1188 CRITICAL_SECTION(graph_mutex);
1190 I(__time_callback != NULL);
1192 time_callback = __time_callback;
1193 time_callback_period.tv_sec = sec;
1194 time_callback_period.tv_nsec = nanosec;
1196 pthread_cond_init(&time_callback_cond_var, NULL);
1199 void cancel_time_callback()
1201 if (time_callback == NULL)
1204 CRITICAL_SECTION(graph_mutex);
1205 pthread_cancel(time_callback_thread);
1206 time_callback = NULL;
1207 pthread_cond_destroy(&time_callback_cond_var);
1212 long execute_from_node(
Agent * agent, Node * src)
1214 if (agent->process_from_node == NULL)
1215 throw std::domain_error(
"There is no function for arc processing");
1217 I(agent->is_in_node());
1222 { CRITICAL_SECTION(src->mutex);
1223 ret = agent->process_from_node(agent,
this, src, a);
1228 case Agent::Remain_In_Node:
1229 case Agent::Suspend:
1233 case Agent::Enter_In_Arc:
1236 ERROR(
"there is no arc to enter");
1239 { CRITICAL_SECTION(src->mutex);
1240 __remove_agent_from_node(agent);
1243 Arc * arc = (Arc*) a;
1245 { CRITICAL_SECTION(arc->mutex);
1246 __insert_agent_in_arc(agent, arc);
1250 case Agent::Enter_In_Node:
1251 ERROR(
"Invalid action: Enter_In_Node transition from a node");
1253 case Agent::Remain_In_Arc:
1254 ERROR(
"Invalid action: Remain_In_Node transition from a node");
1256 default: ERROR(
"Panic: invalid transtion action");
1262 long execute_from_arc(
Agent * agent, Arc * a)
1264 if (agent->process_from_arc == NULL)
1265 throw std::domain_error(
"There is no function for arc processing");
1268 { CRITICAL_SECTION(a->mutex);
1269 ret = agent->process_from_arc(agent,
this, a);
1274 case Agent::Remain_In_Arc:
1275 case Agent::Suspend:
1279 case Agent::Enter_In_Node:
1282 Node * src = (Node *) agent->src;
1284 { CRITICAL_SECTION(a->mutex);
1285 tgt = get_connected_node(a, src);
1286 __remove_agent_from_arc(agent);
1289 { CRITICAL_SECTION(tgt->mutex);
1290 __insert_agent_in_node(agent, tgt);
1297 case Agent::Enter_In_Arc:
1298 ERROR(
"Invalid action: Enter_In_Arc transition from a arc");
1300 case Agent::Remain_In_Node:
1301 ERROR(
"Invalid action: Remain_In_Arc transition from a arc");
1303 default: ERROR(
"Panic: invalid transtion action");
1309 long execute_out_location(
Agent * agent)
1311 if (agent->process_out_location == NULL)
1312 throw std::domain_error(
"There is no function for arc processing");
1314 I(agent->location_link.is_empty());
1319 long execute_agent(
Agent * agent)
1323 if (agent->is_in_node())
1325 Node * p = (Node*)agent->location;
1326 return execute_from_node(agent, p);
1329 Arc * a = (Arc*)agent->location;
1330 return execute_from_arc(agent, a);
1333 static void * agent_handler(
void * cookie)
1339 CRITICAL_SECTION(graph->graph_mutex);
1341 switch (graph->status)
1343 case Stopped:
return NULL;
1346 pthread_cond_wait(&graph->graph_cond_var, &graph->graph_mutex);
1351 Agent * ready_agent = graph->get_next_ready_agent();
1353 if (ready_agent == NULL)
1355 if (graph->num_executing_agents == 0 and
1356 graph->num_agents_ready == 0)
1358 pthread_cond_signal(&graph->block_starter_cond_var);
1362 pthread_cond_wait(&graph->graph_cond_var, &graph->graph_mutex);
1365 { graph->num_executing_agents++;
1368 critical_section.unlock();
1369 long action = graph->execute_agent(ready_agent);
1370 critical_section.lock();
1372 graph->num_executing_agents--;
1376 case Agent::Remain_In_Node:
1377 case Agent::Remain_In_Arc:
1378 case Agent::Enter_In_Node:
1379 case Agent::Enter_In_Arc:
1380 graph->insert_agent_in_ready_queue(ready_agent);
1383 case Agent::Suspend:
1384 graph->__suspend_agent_in_graph(ready_agent);
1388 graph->__remove_agent_from_graph(ready_agent);
1395 default: EXIT(
"Invalid execution state %ld", graph->status);
1400 void verify_graph_conditions_of_modification()
1402 if (status == Running)
1403 throw std::domain_error(
"graph is running");
1406 throw std::domain_error(
"graph is Init state. Use without mutex");
1422 Agent * get_current()
1424 Agent * curr_agent =
static_cast<Agent*
>
Definition: tpl_agent_graph.H:1411
const long & get_num_agents_ready() const
Definition: tpl_agent_graph.H:853
Definition: tpl_concurrent_graph.H:22
Definition: tpl_agent_graph.H:335
Definition: tpl_graph.H:21
int sequential_search(T *a, const T &x, const int l, const int r, Equal &eq)
Definition: tpl_sort_utils.H:260
void set_process_arc(Process_From_Arc __process_from_arc)
Definition: tpl_agent_graph.H:240
#define LINKNAME_TO_TYPE(type_name, link_name)
Definition: dlink.H:741
Iterador sobre enlaces.
Definition: dlink.H:437
GT::Node Node
El tipo de nodo.
Definition: tpl_agent_graph.H:429
Walking_Agent(Process_From_Node __process_from_node, Process_From_Arc __process_from_arc, Process_Out_Location __process_out_location)
Instancia un agente con funciones de transito.
Definition: tpl_agent_graph.H:275
Dlink *& get_next()
Retorna enlace después de this.
Definition: dlink.H:179
size_t get_num_threads() const
Retorna el número de threads que ejecutan a los agentes.
Definition: tpl_agent_graph.H:581
bool is_in_arc() const
Retorna true si el agente está en un arco; false si está en un nodo.
Definition: tpl_agent_graph.H:304
Arc::Arc_Type Arc_Type
El tipo de información que contiene el arco.
Definition: tpl_agent_graph.H:478
Node::Node_Type Node_Type
El tipo de información que contiene el nodo.
Definition: tpl_agent_graph.H:475
bool has_current() const
Retorna true si iterador aún tiene elemento actual.
Definition: dlink.H:554
bool is_in_node() const
Retorna true si el agente está en un nodo; false si está en un arco.
Definition: tpl_agent_graph.H:301
GT::Arc Arc
El tipo de arco.
Definition: tpl_agent_graph.H:432
void * get_src_node()
Definition: tpl_agent_graph.H:220
Definition: tpl_agent_graph.H:313
Definition: tpl_agent_graph.H:22
Definition: tpl_graph.H:19
Walking_Agent()
Instancia un agente vacío.
Definition: tpl_agent_graph.H:253
Walking_Agent(const Agent_Info &agent_info, Process_From_Node __process_from_node=NULL, Process_From_Arc __process_from_arc=NULL, Process_Out_Location __process_out_location=NULL)
Definition: tpl_agent_graph.H:262
void set_process_node(Process_From_Node __process_from_node)
Definition: tpl_agent_graph.H:232
void *& get_cookie()
Retorna una referencia al cookie del agente.
Definition: tpl_agent_graph.H:298
Dlink * get_current() const
Retorna dirección de nodo actual.
Definition: dlink.H:564
Definition: tpl_agent_graph.H:422
Agent::Agent_Type Agent_Type
El tipo de información que contiene el agente.
Definition: tpl_agent_graph.H:481
Definition: tpl_agent_graph.H:74
size_t get_num_agents() const
Retorna el número total de agentes del grafo.
Definition: tpl_agent_graph.H:574