Whamcloud - gitweb
b=17491
author zhanghc <zhanghc>
Tue, 3 Mar 2009 18:33:37 +0000 (18:33 +0000)
committer zhanghc <zhanghc>
Tue, 3 Mar 2009 18:33:37 +0000 (18:33 +0000)
Speed up the insertion of ptlrpc_request in the
function "ptlrpc_at_add_timed" to fix the
"Slow req_in handling" problem in
"ptlrpc_server_handle_req_in"

i=nathan.rutman
i=johann

lustre/include/lustre_import.h
lustre/include/lustre_net.h
lustre/ptlrpc/service.c

index 4f6e83f..f3fd466 100644 (file)
@@ -69,6 +69,14 @@ enum lustre_imp_state {
         LUSTRE_IMP_EVICTED    = 10,
 };
 
         LUSTRE_IMP_EVICTED    = 10,
 };
 
+struct ptlrpc_at_array {
+        struct list_head *paa_reqs_array; /* array to hold requests */
+        __u32             paa_size;       /* the size of array */
+        __u32             paa_count;      /* the total count of reqs */
+        time_t            paa_deadline;   /* the earliest deadline of reqs */
+        __u32            *paa_reqs_count; /* the count of reqs in each entry */
+};
+
 static inline char * ptlrpc_import_state_name(enum lustre_imp_state state)
 {
         static char* import_state_names[] = {
 static inline char * ptlrpc_import_state_name(enum lustre_imp_state state)
 {
         static char* import_state_names[] = {
index f0babd3..179219a 100644 (file)
@@ -344,6 +344,8 @@ struct ptlrpc_request {
         struct list_head rq_exp_list;           /* server-side per-export list */
         struct ptlrpc_hpreq_ops *rq_ops;        /* server-side hp handlers */
         __u64            rq_history_seq;        /* history sequence # */
         struct list_head rq_exp_list;           /* server-side per-export list */
         struct ptlrpc_hpreq_ops *rq_ops;        /* server-side hp handlers */
         __u64            rq_history_seq;        /* history sequence # */
+        /* the index of service's srv_at_array into which request is linked */
+        time_t rq_at_index;
         int rq_status;
         spinlock_t rq_lock;
         /* client-side flags are serialized by rq_lock */
         int rq_status;
         spinlock_t rq_lock;
         /* client-side flags are serialized by rq_lock */
@@ -367,7 +369,8 @@ struct ptlrpc_request {
                 /* server-side flags */
                 rq_packed_final:1,  /* packed final reply */
                 rq_sent_final:1,    /* stop sending early replies */
                 /* server-side flags */
                 rq_packed_final:1,  /* packed final reply */
                 rq_sent_final:1,    /* stop sending early replies */
-                rq_hp:1;            /* high priority RPC */
+                rq_hp:1,            /* high priority RPC */
+                rq_at_linked:1;     /* link into service's srv_at_array */
 
         enum rq_phase rq_phase; /* one of RQ_PHASE_* */
         enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
 
         enum rq_phase rq_phase; /* one of RQ_PHASE_* */
         enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
@@ -714,7 +717,7 @@ struct ptlrpc_service {
         /* AT stuff */
         struct adaptive_timeout srv_at_estimate;/* estimated rpc service time */
         spinlock_t        srv_at_lock;
         /* AT stuff */
         struct adaptive_timeout srv_at_estimate;/* estimated rpc service time */
         spinlock_t        srv_at_lock;
-        struct list_head  srv_at_list;          /* reqs waiting for replies */
+        struct ptlrpc_at_array  srv_at_array;   /* reqs waiting for replies */
         cfs_timer_t       srv_at_timer;         /* early reply timer */
 
         int               srv_n_queued_reqs;    /* # reqs in either of the queues below */
         cfs_timer_t       srv_at_timer;         /* early reply timer */
 
         int               srv_n_queued_reqs;    /* # reqs in either of the queues below */
index a8d0785..99f79a4 100644 (file)
@@ -466,8 +466,10 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
                 char *threadname, __u32 ctx_tags,
                 svc_hpreq_handler_t hp_handler)
 {
                 char *threadname, __u32 ctx_tags,
                 svc_hpreq_handler_t hp_handler)
 {
-        int                    rc;
-        struct ptlrpc_service *service;
+        int                     rc;
+        struct ptlrpc_at_array *array;
+        struct ptlrpc_service  *service;
+        unsigned int            size, index;
         ENTRY;
 
         LASSERT (nbufs > 0);
         ENTRY;
 
         LASSERT (nbufs > 0);
@@ -523,7 +525,25 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
 
         spin_lock_init(&service->srv_at_lock);
         CFS_INIT_LIST_HEAD(&service->srv_req_in_queue);
 
         spin_lock_init(&service->srv_at_lock);
         CFS_INIT_LIST_HEAD(&service->srv_req_in_queue);
-        CFS_INIT_LIST_HEAD(&service->srv_at_list);
+
+        array = &service->srv_at_array;
+        size = at_est2timeout(at_max);
+        array->paa_size = size;
+        array->paa_count = 0;
+        array->paa_deadline = -1;
+
+        /* allocate memory for srv_at_array (ptlrpc_at_array) */ 
+        OBD_ALLOC(array->paa_reqs_array, sizeof(struct list_head) * size);
+        if (array->paa_reqs_array == NULL)
+                GOTO(failed, NULL);
+
+        for (index = 0; index < size; index++)
+                CFS_INIT_LIST_HEAD(&array->paa_reqs_array[index]);
+        
+        OBD_ALLOC(array->paa_reqs_count, sizeof(__u32) * size);
+        if (array->paa_reqs_count == NULL)
+                GOTO(failed, NULL);
+
         cfs_timer_init(&service->srv_at_timer, ptlrpc_at_timer, service);
         /* At SOW, service time should be quick; 10s seems generous. If client
            timeout is less than this, we'll be sending an early reply. */
         cfs_timer_init(&service->srv_at_timer, ptlrpc_at_timer, service);
         /* At SOW, service time should be quick; 10s seems generous. If client
            timeout is less than this, we'll be sending an early reply. */
@@ -683,6 +703,14 @@ static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
         spin_lock(&svc->srv_at_lock);
         req->rq_sent_final = 1;
         list_del_init(&req->rq_timed_list);
         spin_lock(&svc->srv_at_lock);
         req->rq_sent_final = 1;
         list_del_init(&req->rq_timed_list);
+        if (req->rq_at_linked) {
+                struct ptlrpc_at_array *array = &svc->srv_at_array;
+                __u32 index = req->rq_at_index;
+        
+                req->rq_at_linked = 0;        
+                array->paa_reqs_count[index]--;
+                array->paa_count--;
+        }
         spin_unlock(&svc->srv_at_lock);
 
         ptlrpc_server_drop_request(req);
         spin_unlock(&svc->srv_at_lock);
 
         ptlrpc_server_drop_request(req);
@@ -796,20 +824,18 @@ static int ptlrpc_check_req(struct ptlrpc_request *req)
 
 static void ptlrpc_at_set_timer(struct ptlrpc_service *svc)
 {
 
 static void ptlrpc_at_set_timer(struct ptlrpc_service *svc)
 {
-        struct ptlrpc_request *rq;
+        struct ptlrpc_at_array *array = &svc->srv_at_array;
         __s32 next;
 
         spin_lock(&svc->srv_at_lock);
         __s32 next;
 
         spin_lock(&svc->srv_at_lock);
-        if (list_empty(&svc->srv_at_list)) {
+        if (array->paa_count == 0) {
                 cfs_timer_disarm(&svc->srv_at_timer);
                 spin_unlock(&svc->srv_at_lock);
                 return;
         }
 
         /* Set timer for closest deadline */
                 cfs_timer_disarm(&svc->srv_at_timer);
                 spin_unlock(&svc->srv_at_lock);
                 return;
         }
 
         /* Set timer for closest deadline */
-        rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
-                        rq_timed_list);
-        next = (__s32)(rq->rq_deadline - cfs_time_current_sec() -
+        next = (__s32)(array->paa_deadline - cfs_time_current_sec() -
                        at_early_margin);
         if (next <= 0)
                 ptlrpc_at_timer((unsigned long)svc);
                        at_early_margin);
         if (next <= 0)
                 ptlrpc_at_timer((unsigned long)svc);
@@ -823,7 +849,9 @@ static void ptlrpc_at_set_timer(struct ptlrpc_service *svc)
 static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
 {
         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
 static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
 {
         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
-        struct ptlrpc_request *rq;
+        struct ptlrpc_request *rq = NULL;
+        struct ptlrpc_at_array *array = &svc->srv_at_array;
+        __u32 index, wtimes;
         int found = 0;
 
         if (AT_OFF)
         int found = 0;
 
         if (AT_OFF)
@@ -843,22 +871,40 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
         }
 
         LASSERT(list_empty(&req->rq_timed_list));
         }
 
         LASSERT(list_empty(&req->rq_timed_list));
-        /* Add to sorted list.  Presumably latest rpcs will have the latest
-           deadlines, so search backward. */
-        list_for_each_entry_reverse(rq, &svc->srv_at_list, rq_timed_list) {
-                if (req->rq_deadline >= rq->rq_deadline) {
-                        list_add(&req->rq_timed_list, &rq->rq_timed_list);
-                        found++;
-                        break;
+
+        wtimes = req->rq_deadline / array->paa_size;
+        index = req->rq_deadline % array->paa_size;
+        if (array->paa_reqs_count[index] > 0)
+                rq = list_entry(array->paa_reqs_array[index].next, 
+                                struct ptlrpc_request, rq_timed_list);
+
+        if (rq != NULL && (rq->rq_deadline / array->paa_size) < wtimes) {
+                /* latest rpcs will have the latest deadlines in the list,
+                 * so search backward. */
+                list_for_each_entry_reverse(rq, &array->paa_reqs_array[index], 
+                                            rq_timed_list) {
+                        if (req->rq_deadline >= rq->rq_deadline) {
+                                list_add(&req->rq_timed_list, 
+                                         &rq->rq_timed_list);
+                                break;
+                        }
                 }
                 }
-        }
-        if (!found)
-                /* Add to front if shortest deadline or list empty */
-                list_add(&req->rq_timed_list, &svc->srv_at_list);
 
 
-        /* Check if we're the head of the list */
-        found = (svc->srv_at_list.next == &req->rq_timed_list);
+                /* AT array is corrupted? */
+                LASSERT(!list_empty(&req->rq_timed_list));
+        } else {
+                /* Add the request at the head of the list */
+                list_add(&req->rq_timed_list, &array->paa_reqs_array[index]);
+        }
 
 
+        req->rq_at_linked = 1;
+        req->rq_at_index = index;
+        array->paa_reqs_count[index]++;
+        array->paa_count++;
+        if (array->paa_count == 1 || array->paa_deadline > req->rq_deadline) {
+                array->paa_deadline = req->rq_deadline;
+                found = 1;
+        }
         spin_unlock(&svc->srv_at_lock);
 
         if (found)
         spin_unlock(&svc->srv_at_lock);
 
         if (found)
@@ -1002,6 +1048,9 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
 {
         struct ptlrpc_request *rq, *n;
         struct list_head work_list;
 {
         struct ptlrpc_request *rq, *n;
         struct list_head work_list;
+        struct ptlrpc_at_array *array = &svc->srv_at_array;
+        __u32  index, count;
+        time_t deadline;
         time_t now = cfs_time_current_sec();
         cfs_duration_t delay;
         int first, counter = 0;
         time_t now = cfs_time_current_sec();
         cfs_duration_t delay;
         int first, counter = 0;
@@ -1015,15 +1064,13 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
         delay = cfs_time_sub(cfs_time_current(), svc->srv_at_checktime);
         svc->srv_at_check = 0;
 
         delay = cfs_time_sub(cfs_time_current(), svc->srv_at_checktime);
         svc->srv_at_check = 0;
 
-        if (list_empty(&svc->srv_at_list)) {
+        if (array->paa_count == 0) {
                 spin_unlock(&svc->srv_at_lock);
                 RETURN(0);
         }
 
         /* The timer went off, but maybe the nearest rpc already completed. */
                 spin_unlock(&svc->srv_at_lock);
                 RETURN(0);
         }
 
         /* The timer went off, but maybe the nearest rpc already completed. */
-        rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
-                        rq_timed_list);
-        first = (int)(rq->rq_deadline - now);
+        first = array->paa_deadline - now;
         if (first > at_early_margin) {
                 /* We've still got plenty of time.  Reset the timer. */
                 spin_unlock(&svc->srv_at_lock);
         if (first > at_early_margin) {
                 /* We've still got plenty of time.  Reset the timer. */
                 spin_unlock(&svc->srv_at_lock);
@@ -1034,15 +1081,33 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
         /* We're close to a timeout, and we don't know how much longer the
            server will take. Send early replies to everyone expiring soon. */
         CFS_INIT_LIST_HEAD(&work_list);
         /* We're close to a timeout, and we don't know how much longer the
            server will take. Send early replies to everyone expiring soon. */
         CFS_INIT_LIST_HEAD(&work_list);
-        list_for_each_entry_safe(rq, n, &svc->srv_at_list, rq_timed_list) {
-                if (rq->rq_deadline <= now + at_early_margin) {
-                        list_move_tail(&rq->rq_timed_list, &work_list);
-                        counter++;
-                } else {
+        deadline = -1;
+        index = array->paa_deadline % array->paa_size;
+        count = array->paa_count;
+        while (count > 0) {
+                count -= array->paa_reqs_count[index];
+                list_for_each_entry_safe(rq, n, &array->paa_reqs_array[index], 
+                                         rq_timed_list) {
+                        if (rq->rq_deadline <= now + at_early_margin) {
+                                list_move(&rq->rq_timed_list, &work_list);
+                                counter++;
+                                array->paa_reqs_count[index]--;
+                                array->paa_count--;
+                                rq->rq_at_linked = 0;
+                                continue;
+                        }
+                        
+                        /* update the earliest deadline */
+                        if (deadline == -1 || rq->rq_deadline < deadline)
+                                deadline = rq->rq_deadline;
+
                         break;
                 }
                         break;
                 }
-        }
 
 
+                if (++index >= array->paa_size)
+                        index = 0;
+        }
+        array->paa_deadline = deadline;
         spin_unlock(&svc->srv_at_lock);
 
         /* we have a new earliest deadline, restart the timer */
         spin_unlock(&svc->srv_at_lock);
 
         /* we have a new earliest deadline, restart the timer */
@@ -2297,6 +2362,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         struct l_wait_info    lwi;
         struct list_head     *tmp;
         struct ptlrpc_reply_state *rs, *t;
         struct l_wait_info    lwi;
         struct list_head     *tmp;
         struct ptlrpc_reply_state *rs, *t;
+        struct ptlrpc_at_array *array = &service->srv_at_array;
         ENTRY;
 
         service->srv_is_stopping = 1;
         ENTRY;
 
         service->srv_is_stopping = 1;
@@ -2415,6 +2481,18 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         /* In case somebody rearmed this in the meantime */
         cfs_timer_disarm(&service->srv_at_timer);
 
         /* In case somebody rearmed this in the meantime */
         cfs_timer_disarm(&service->srv_at_timer);
 
+        if (array->paa_reqs_array != NULL) {
+                OBD_FREE(array->paa_reqs_array, 
+                         sizeof(struct list_head) * array->paa_size);
+                array->paa_reqs_array = NULL;
+        }
+        
+        if (array->paa_reqs_count != NULL) {
+                OBD_FREE(array->paa_reqs_count, 
+                         sizeof(__u32) * array->paa_size);
+                array->paa_reqs_count= NULL;
+        }
+       
         OBD_FREE_PTR(service);
         RETURN(0);
 }
         OBD_FREE_PTR(service);
         RETURN(0);
 }