#include #include #include #include #include #include #define N 2 struct msg { int value; }; struct port { struct msg messages[N]; pthread_mutex_t m; pthread_cond_t empty; pthread_cond_t full; int count; int head; int tail; }; void msg_send(struct port *p, struct msg *m) { pthread_mutex_lock(&p->m); while (p->count == N) { pthread_cond_wait(&p->full, &p->m); } memcpy(&p->messages[p->tail], m, sizeof(struct msg)); p->tail = (p->tail + 1) % N; p->count = p->count + 1; pthread_cond_signal(&p->empty); pthread_mutex_unlock(&p->m); } void msg_receive(struct port *p, struct msg *m) { pthread_mutex_lock(&p->m); while (p->count == 0) { pthread_cond_wait(&p->empty, &p->m); } memcpy(m, &p->messages[p->head], sizeof(struct msg)); p->head = (p->head + 1) % N; p->count = p->count - 1; pthread_cond_signal(&p->full); pthread_mutex_unlock(&p->m); } struct port *port_init(void) { struct port *p; p = malloc(sizeof(struct port)); if (p == NULL) { return NULL; } p->head = 0; p->tail = 0; p->count = 0; pthread_mutex_init(&p->m, NULL); pthread_cond_init(&p->empty, NULL); pthread_cond_init(&p->full, NULL); return p; } typedef struct periodic_task { struct timespec r; int period; } periodic_task_t; #define NSEC_PER_SEC 1000000000ULL static inline void timespec_add_us(struct timespec *t, uint64_t d) { d *= 1000; d += t->tv_nsec; while (d >= NSEC_PER_SEC) { d -= NSEC_PER_SEC; t->tv_sec += 1; } t->tv_nsec = d; } static void wait_next_activation(periodic_task_t *pd) { clock_nanosleep(CLOCK_REALTIME, TIMER_ABSTIME, &pd->r, NULL); timespec_add_us(&pd->r, pd->period); } int start_periodic_timer(uint64_t offs, int t, periodic_task_t *pd) { clock_gettime(CLOCK_REALTIME, &pd->r); timespec_add_us(&pd->r, offs); pd->period = t; return 0; } struct port *p; int th_cnt = 1; void *thread1(void *v) { struct msg m; periodic_task_t p_d; int cnt = th_cnt++; cnt *= 1000; start_periodic_timer(2000000, 50 * cnt, &p_d); while (1) { wait_next_activation(&p_d); m.value = cnt++; printf("Sending %d\n", m.value); msg_send(p, &m); } return NULL; } void *thread2(void *v) { struct msg m; while(1) { msg_receive(p, &m); printf("Received %d\n", m.value); } return NULL; } int main(int argc, char *argv[]) { int err; pthread_t id_producer1, id_producer2, id_consumer; pthread_attr_t attrs; struct sched_param sp; int pmin; p = port_init(); pthread_attr_init(&attrs); pthread_attr_setinheritsched(&attrs, PTHREAD_EXPLICIT_SCHED); pthread_attr_setschedpolicy(&attrs, SCHED_FIFO); pmin = sched_get_priority_min(SCHED_FIFO); sp.sched_priority = pmin + 3; err = pthread_attr_setschedparam(&attrs, &sp); err = pthread_create(&id_consumer, &attrs, thread2, NULL); if (err) { perror("PThread Create"); } sp.sched_priority = pmin + 2; err = pthread_attr_setschedparam(&attrs, &sp); err = pthread_create(&id_producer1, &attrs, thread1, NULL); if (err) { perror("PThread Create"); } sp.sched_priority = pmin + 1; err = pthread_attr_setschedparam(&attrs, &sp); err = pthread_create(&id_producer2, NULL, thread1, NULL); if (err) { perror("PThread Create"); } pthread_join(id_consumer, NULL); pthread_join(id_producer1, NULL); pthread_join(id_producer2, NULL); return 0; }