LU-14876 out: don't connect to busy MDS-MDS export
diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index 1bd8fae..6a68a19 100644
--- a/lustre/ptlrpc/service.c
+++ b/lustre/ptlrpc/service.c
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 #define DEBUG_SUBSYSTEM S_RPC
@@ -63,6 +62,8 @@ MODULE_PARM_DESC(at_extra, "How much extra time to give with each early reply");
 static int ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt);
 static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req);
 static void ptlrpc_at_remove_timed(struct ptlrpc_request *req);
+static int ptlrpc_start_threads(struct ptlrpc_service *svc);
+static int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait);
 
 /** Holds a list of all PTLRPC services */
 LIST_HEAD(ptlrpc_all_services);
@@ -447,9 +448,9 @@ static int ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt)
                        return posted;
                }
 
-               rqbd = list_entry(svcpt->scp_rqbd_idle.next,
-                                     struct ptlrpc_request_buffer_desc,
-                                     rqbd_list);
+               rqbd = list_first_entry(&svcpt->scp_rqbd_idle,
+                                       struct ptlrpc_request_buffer_desc,
+                                       rqbd_list);
 
                /* assume we will post successfully */
                svcpt->scp_nrqbds_posted++;
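
The hunk above, and the similar ones throughout this patch, replace open-coded list_entry(head->next, ...) with list_first_entry(). For reference, a sketch of how the two helpers relate, paraphrased from the standard <linux/list.h> definitions (quoted from memory, not from this tree):

/* first element of a list that is known to be non-empty */
#define list_first_entry(ptr, type, member) \
        list_entry((ptr)->next, type, member)

/* same, but NULL on an empty list; this is what lets the
 * "while (!list_empty(...))" loops later in the patch collapse
 * into a single expression */
#define list_first_entry_or_null(ptr, type, member) ({ \
        struct list_head *head__ = (ptr); \
        struct list_head *pos__ = READ_ONCE(head__->next); \
        pos__ != head__ ? list_entry(pos__, type, member) : NULL; \
})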
@@ -684,13 +685,12 @@ static int ptlrpc_service_part_init(struct ptlrpc_service *svc,
 
  failed:
        if (array->paa_reqs_count != NULL) {
-               OBD_FREE(array->paa_reqs_count, sizeof(__u32) * size);
+               OBD_FREE_PTR_ARRAY(array->paa_reqs_count, size);
                array->paa_reqs_count = NULL;
        }
 
        if (array->paa_reqs_array != NULL) {
-               OBD_FREE(array->paa_reqs_array,
-                        sizeof(struct list_head) * array->paa_size);
+               OBD_FREE_PTR_ARRAY(array->paa_reqs_array, array->paa_size);
                array->paa_reqs_array = NULL;
        }
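
The OBD_FREE_PTR_ARRAY() conversions here and in the later hunks drop the hand-written size arithmetic. A minimal sketch of the intent, assuming the usual obd_support.h style of definition (paraphrased, not quoted from this tree):

/* element size comes from the pointer's type, so a call site can no
 * longer free with a size that disagrees with the allocation */
#define OBD_FREE_PTR_ARRAY(ptr, n)  OBD_FREE(ptr, (n) * sizeof(*(ptr)))

/* usage matching the paa_reqs_count case above */
OBD_FREE_PTR_ARRAY(array->paa_reqs_count, size);  /* was OBD_FREE(..., sizeof(__u32) * size) */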
 
@@ -744,7 +744,7 @@ struct ptlrpc_service *ptlrpc_register_service(struct ptlrpc_service_conf *conf,
                                                 strlen(cconf->cc_pattern),
                                                 0, ncpts - 1, &el);
                        if (rc != 0) {
-                               CERROR("%s: invalid CPT pattern string: %s",
+                               CERROR("%s: invalid CPT pattern string: %s\n",
                                       conf->psc_name, cconf->cc_pattern);
                                RETURN(ERR_PTR(-EINVAL));
                        }
@@ -755,7 +755,7 @@ struct ptlrpc_service *ptlrpc_register_service(struct ptlrpc_service_conf *conf,
                                CERROR("%s: failed to parse CPT array %s: %d\n",
                                       conf->psc_name, cconf->cc_pattern, rc);
                                if (cpts != NULL)
-                                       OBD_FREE(cpts, sizeof(*cpts) * ncpts);
+                                       OBD_FREE_PTR_ARRAY(cpts, ncpts);
                                RETURN(ERR_PTR(rc < 0 ? rc : -EINVAL));
                        }
                        ncpts = rc;
@@ -765,7 +765,7 @@ struct ptlrpc_service *ptlrpc_register_service(struct ptlrpc_service_conf *conf,
        OBD_ALLOC(service, offsetof(struct ptlrpc_service, srv_parts[ncpts]));
        if (service == NULL) {
                if (cpts != NULL)
-                       OBD_FREE(cpts, sizeof(*cpts) * ncpts);
+                       OBD_FREE_PTR_ARRAY(cpts, ncpts);
                RETURN(ERR_PTR(-ENOMEM));
        }
 
@@ -906,8 +906,6 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
        struct ptlrpc_service_part        *svcpt = rqbd->rqbd_svcpt;
        struct ptlrpc_service             *svc = svcpt->scp_service;
        int                                refcount;
-       struct list_head                          *tmp;
-       struct list_head                          *nxt;
 
        if (!atomic_dec_and_test(&req->rq_refcount))
                return;
@@ -951,9 +949,9 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
                 * I expect only about 1 or 2 rqbds need to be recycled here
                 */
                while (svcpt->scp_hist_nrqbds > svc->srv_hist_nrqbds_cpt_max) {
-                       rqbd = list_entry(svcpt->scp_hist_rqbds.next,
-                                         struct ptlrpc_request_buffer_desc,
-                                         rqbd_list);
+                       rqbd = list_first_entry(&svcpt->scp_hist_rqbds,
+                                               struct ptlrpc_request_buffer_desc,
+                                               rqbd_list);
 
                        list_del(&rqbd->rqbd_list);
                        svcpt->scp_hist_nrqbds--;
@@ -962,9 +960,7 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
                         * remove rqbd's reqs from svc's req history while
                         * I've got the service lock
                         */
-                       list_for_each(tmp, &rqbd->rqbd_reqs) {
-                               req = list_entry(tmp, struct ptlrpc_request,
-                                                rq_list);
+                       list_for_each_entry(req, &rqbd->rqbd_reqs, rq_list) {
                                /* Track the highest culled req seq */
                                if (req->rq_history_seq >
                                    svcpt->scp_hist_seq_culled) {
@@ -976,10 +972,9 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
 
                        spin_unlock(&svcpt->scp_lock);
 
-                       list_for_each_safe(tmp, nxt, &rqbd->rqbd_reqs) {
-                               req = list_entry(rqbd->rqbd_reqs.next,
-                                                struct ptlrpc_request,
-                                                rq_list);
+                       while ((req = list_first_entry_or_null(
+                                       &rqbd->rqbd_reqs,
+                                       struct ptlrpc_request, rq_list))) {
                                list_del(&req->rq_list);
                                ptlrpc_server_free_request(req);
                        }
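
The loop above is the pop-and-free drain idiom this patch standardizes on: because each iteration unlinks the head element, list_first_entry_or_null() serves as both the iteration step and the termination test, replacing list_for_each_safe(). A self-contained sketch of the same idiom against plain <linux/list.h> (hypothetical struct, not taken from the patch):

#include <linux/list.h>
#include <linux/slab.h>

struct item {
        struct list_head link;
        int value;
};

/* free every element; no saved "next" pointer is needed because the
 * head entry is unlinked before it is freed */
static void drain(struct list_head *head)
{
        struct item *it;

        while ((it = list_first_entry_or_null(head, struct item, link)) != NULL) {
                list_del(&it->link);
                kfree(it);
        }
}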
@@ -1357,7 +1352,8 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
 
        ENTRY;
 
-       if (CFS_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_RECONNECT)) {
+       if (CFS_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_RECONNECT) ||
+           CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
                /* don't send early reply */
                RETURN(1);
        }
@@ -1620,9 +1616,9 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
         * we took additional refcount so entries can't be deleted from list, no
         * locking is needed
         */
-       while (!list_empty(&work_list)) {
-               rq = list_entry(work_list.next, struct ptlrpc_request,
-                               rq_timed_list);
+       while ((rq = list_first_entry_or_null(&work_list,
+                                             struct ptlrpc_request,
+                                             rq_timed_list)) != NULL) {
                list_del_init(&rq->rq_timed_list);
 
                if (ptlrpc_at_send_early_reply(rq) == 0)
@@ -1648,13 +1644,6 @@ ptlrpc_server_check_resend_in_progress(struct ptlrpc_request *req)
                return NULL;
 
        /*
-        * bulk request are aborted upon reconnect, don't try to
-        * find a match
-        */
-       if (req->rq_bulk_write || req->rq_bulk_read)
-               return NULL;
-
-       /*
         * This list should not be longer than max_requests in
         * flights on the client, so it is not all that long.
         * Also we only hit this codepath in case of a resent
@@ -1863,6 +1852,7 @@ static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
                                ptlrpc_at_remove_timed(orig);
                        spin_unlock(&orig->rq_rqbd->rqbd_svcpt->scp_at_lock);
                        orig->rq_deadline = req->rq_deadline;
+                       orig->rq_rep_mbits = req->rq_rep_mbits;
                        if (likely(linked))
                                ptlrpc_at_add_timed(orig);
                        ptlrpc_server_drop_request(orig);
@@ -2056,6 +2046,7 @@ static int ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
        struct ptlrpc_service *svc = svcpt->scp_service;
        struct ptlrpc_request *req;
        __u32 deadline;
+       __u32 opc;
        int rc;
 
        ENTRY;
@@ -2066,8 +2057,8 @@ static int ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
                RETURN(0);
        }
 
-       req = list_entry(svcpt->scp_req_incoming.next,
-                            struct ptlrpc_request, rq_list);
+       req = list_first_entry(&svcpt->scp_req_incoming,
+                              struct ptlrpc_request, rq_list);
        list_del_init(&req->rq_list);
        svcpt->scp_nreqs_incoming--;
        /*
@@ -2112,8 +2103,9 @@ static int ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
                goto err_req;
        }
 
+       opc = lustre_msg_get_opc(req->rq_reqmsg);
        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_REQ_OPC) &&
-           lustre_msg_get_opc(req->rq_reqmsg) == cfs_fail_val) {
+           opc == cfs_fail_val) {
                CERROR("drop incoming rpc opc %u, x%llu\n",
                       cfs_fail_val, req->rq_xid);
                goto err_req;
@@ -2127,7 +2119,7 @@ static int ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
                goto err_req;
        }
 
-       switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+       switch (opc) {
        case MDS_WRITEPAGE:
        case OST_WRITE:
        case OUT_UPDATE:
@@ -2198,8 +2190,20 @@ static int ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
                thread->t_env->le_ses = &req->rq_session;
        }
 
+
+       if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND) &&
+                    (opc == LDLM_ENQUEUE) &&
+                    (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
+               OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_ENQ_RESEND, 6);
+
        ptlrpc_at_add_timed(req);
 
+       if (opc != OST_CONNECT && opc != MDS_CONNECT &&
+           opc != MGS_CONNECT && req->rq_export != NULL) {
+               if (exp_connect_flags2(req->rq_export) & OBD_CONNECT2_REP_MBITS)
+                       req->rq_rep_mbits = lustre_msg_get_mbits(req->rq_reqmsg);
+       }
+
        /* Move it over to the request processing queue */
        rc = ptlrpc_server_request_add(svcpt, req);
        if (rc)
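
The block added above records the request's match bits in rq_rep_mbits so the reply can later be sent to the reply buffer the client posted under those bits, but only when the peer negotiated OBD_CONNECT2_REP_MBITS; connect opcodes are skipped, presumably because the export's connect flags are not settled at that point. A hypothetical helper restating the gating condition (illustration only, not part of the patch):

/* ptlrpc_req_wants_rep_mbits() is a made-up name for illustration */
static inline bool ptlrpc_req_wants_rep_mbits(struct ptlrpc_request *req,
                                              __u32 opc)
{
        if (opc == OST_CONNECT || opc == MDS_CONNECT || opc == MGS_CONNECT)
                return false;
        if (req->rq_export == NULL)
                return false;
        return (exp_connect_flags2(req->rq_export) &
                OBD_CONNECT2_REP_MBITS) != 0;
}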
@@ -2661,18 +2665,18 @@ static void ptlrpc_watchdog_fire(struct work_struct *w)
        }
 }
 
-static void ptlrpc_watchdog_init(struct delayed_work *work, timeout_t timeout)
+void ptlrpc_watchdog_init(struct delayed_work *work, timeout_t timeout)
 {
        INIT_DELAYED_WORK(work, ptlrpc_watchdog_fire);
        schedule_delayed_work(work, cfs_time_seconds(timeout));
 }
 
-static void ptlrpc_watchdog_disable(struct delayed_work *work)
+void ptlrpc_watchdog_disable(struct delayed_work *work)
 {
        cancel_delayed_work_sync(work);
 }
 
-static void ptlrpc_watchdog_touch(struct delayed_work *work, timeout_t timeout)
+void ptlrpc_watchdog_touch(struct delayed_work *work, timeout_t timeout)
 {
        struct ptlrpc_thread *thread = container_of(&work->work,
                                                    struct ptlrpc_thread,
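
Dropping static from the three watchdog helpers lets code outside service.c arm a thread watchdog of its own. A usage sketch, assuming the delayed_work is embedded in a struct ptlrpc_thread (ptlrpc_watchdog_touch() container_of()s back to the owning thread, as the hunk shows; the member name t_watchdog, the thread_is_stopping() test, and the 30-second timeout are assumptions here):

static void example_handler_loop(struct ptlrpc_thread *thread)
{
        /* arm: complain if the thread makes no progress for 30s */
        ptlrpc_watchdog_init(&thread->t_watchdog, 30);

        while (!thread_is_stopping(thread)) {
                /* handle one request here, then report progress */
                ptlrpc_watchdog_touch(&thread->t_watchdog, 30);
        }

        /* cancel the delayed work before the thread exits */
        ptlrpc_watchdog_disable(&thread->t_watchdog);
}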
@@ -2747,7 +2751,6 @@ static int ptlrpc_main(void *arg)
 
        thread->t_task = current;
        thread->t_pid = current->pid;
-       unshare_fs_struct();
 
        if (svc->srv_cpt_bind) {
                rc = cfs_cpt_bind(svc->srv_cptable, svcpt->scp_cpt);
@@ -2955,8 +2958,6 @@ static int ptlrpc_hr_main(void *arg)
        if (env == NULL)
                RETURN(-ENOMEM);
 
-       unshare_fs_struct();
-
        rc = cfs_cpt_bind(ptlrpc_hr.hr_cpt_table, hrp->hrp_cpt);
        if (rc != 0) {
                char threadname[20];
@@ -3020,7 +3021,7 @@ static void ptlrpc_stop_hr_threads(void)
                if (hrp->hrp_thrs == NULL)
                        continue; /* uninitialized */
                for (j = 0; j < hrp->hrp_nthrs; j++)
-                       wake_up_all(&hrp->hrp_thrs[j].hrt_waitq);
+                       wake_up(&hrp->hrp_thrs[j].hrt_waitq);
        }
 
        cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
@@ -3089,9 +3090,9 @@ static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
 
        wake_up_all(&svcpt->scp_waitq);
 
-       while (!list_empty(&svcpt->scp_threads)) {
-               thread = list_entry(svcpt->scp_threads.next,
-                                       struct ptlrpc_thread, t_link);
+       while ((thread = list_first_entry_or_null(&svcpt->scp_threads,
+                                                 struct ptlrpc_thread,
+                                                 t_link)) != NULL) {
                if (thread_is_stopped(thread)) {
                        list_move(&thread->t_link, &zombie);
                        continue;
@@ -3108,9 +3109,9 @@ static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
 
        spin_unlock(&svcpt->scp_lock);
 
-       while (!list_empty(&zombie)) {
-               thread = list_entry(zombie.next,
-                                       struct ptlrpc_thread, t_link);
+       while ((thread = list_first_entry_or_null(&zombie,
+                                                 struct ptlrpc_thread,
+                                                 t_link)) != NULL) {
                list_del(&thread->t_link);
                OBD_FREE_PTR(thread);
        }
@@ -3120,7 +3121,7 @@ static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
 /**
  * Stops all threads of a particular service \a svc
  */
-void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
+static void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 {
        struct ptlrpc_service_part *svcpt;
        int i;
@@ -3135,7 +3136,7 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
        EXIT;
 }
 
-int ptlrpc_start_threads(struct ptlrpc_service *svc)
+static int ptlrpc_start_threads(struct ptlrpc_service *svc)
 {
        int rc = 0;
        int i;
@@ -3167,7 +3168,7 @@ int ptlrpc_start_threads(struct ptlrpc_service *svc)
        RETURN(rc);
 }
 
-int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
+static int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
 {
        struct ptlrpc_thread *thread;
        struct ptlrpc_service *svc;
@@ -3348,10 +3349,8 @@ void ptlrpc_hr_fini(void)
        ptlrpc_stop_hr_threads();
 
        cfs_percpt_for_each(hrp, cpt, ptlrpc_hr.hr_partitions) {
-               if (hrp->hrp_thrs != NULL) {
-                       OBD_FREE(hrp->hrp_thrs,
-                                hrp->hrp_nthrs * sizeof(hrp->hrp_thrs[0]));
-               }
+               if (hrp->hrp_thrs)
+                       OBD_FREE_PTR_ARRAY(hrp->hrp_thrs, hrp->hrp_nthrs);
        }
 
        cfs_percpt_free(ptlrpc_hr.hr_partitions);
@@ -3430,7 +3429,7 @@ ptlrpc_service_unlink_rqbd(struct ptlrpc_service *svc)
                 */
                spin_lock(&svcpt->scp_lock);
                while (svcpt->scp_nrqbds_posted != 0) {
-                       int seconds = LONG_UNLINK;
+                       int seconds = PTLRPC_REQ_LONG_UNLINK;
 
                        spin_unlock(&svcpt->scp_lock);
                        /*
@@ -3468,9 +3467,9 @@ ptlrpc_service_purge_all(struct ptlrpc_service *svc)
                        break;
 
                spin_lock(&svcpt->scp_rep_lock);
-               while (!list_empty(&svcpt->scp_rep_active)) {
-                       rs = list_entry(svcpt->scp_rep_active.next,
-                                           struct ptlrpc_reply_state, rs_list);
+               while ((rs = list_first_entry_or_null(&svcpt->scp_rep_active,
+                                                     struct ptlrpc_reply_state,
+                                                     rs_list)) != NULL) {
                        spin_lock(&rs->rs_lock);
                        ptlrpc_schedule_difficult_reply(rs);
                        spin_unlock(&rs->rs_lock);
@@ -3482,10 +3481,9 @@ ptlrpc_service_purge_all(struct ptlrpc_service *svc)
                 * all unlinked) and no service threads, so I'm the only
                 * thread noodling the request queue now
                 */
-               while (!list_empty(&svcpt->scp_req_incoming)) {
-                       req = list_entry(svcpt->scp_req_incoming.next,
-                                            struct ptlrpc_request, rq_list);
-
+               while ((req = list_first_entry_or_null(&svcpt->scp_req_incoming,
+                                                      struct ptlrpc_request,
+                                                      rq_list)) != NULL) {
                        list_del(&req->rq_list);
                        svcpt->scp_nreqs_incoming--;
                        ptlrpc_server_finish_request(svcpt, req);
@@ -3496,7 +3494,23 @@ ptlrpc_service_purge_all(struct ptlrpc_service *svc)
                        ptlrpc_server_finish_active_request(svcpt, req);
                }
 
-               LASSERT(list_empty(&svcpt->scp_rqbd_posted));
+               /*
+                * The portal may be shared by several services (e.g. OUT_PORTAL),
+                * so a request buffer could still be referenced by another target.
+                * We have to wait until ptlrpc_server_drop_request() has released it.
+                *
+                * TODO: make the request buffers global rather than per-service.
+                */
+               spin_lock(&svcpt->scp_lock);
+               while (!list_empty(&svcpt->scp_rqbd_posted)) {
+                       spin_unlock(&svcpt->scp_lock);
+                       wait_event_idle_timeout(svcpt->scp_waitq,
+                               list_empty(&svcpt->scp_rqbd_posted),
+                               cfs_time_seconds(1));
+                       spin_lock(&svcpt->scp_lock);
+               }
+               spin_unlock(&svcpt->scp_lock);
+
                LASSERT(svcpt->scp_nreqs_incoming == 0);
                LASSERT(svcpt->scp_nreqs_active == 0);
                /*
@@ -3509,19 +3523,16 @@ ptlrpc_service_purge_all(struct ptlrpc_service *svc)
                 * Now free all the request buffers since nothing
                 * references them any more...
                 */
-
-               while (!list_empty(&svcpt->scp_rqbd_idle)) {
-                       rqbd = list_entry(svcpt->scp_rqbd_idle.next,
-                                             struct ptlrpc_request_buffer_desc,
-                                             rqbd_list);
+               while ((rqbd = list_first_entry_or_null(&svcpt->scp_rqbd_idle,
+                                                       struct ptlrpc_request_buffer_desc,
+                                                       rqbd_list)) != NULL)
                        ptlrpc_free_rqbd(rqbd);
-               }
+
                ptlrpc_wait_replies(svcpt);
 
-               while (!list_empty(&svcpt->scp_rep_idle)) {
-                       rs = list_entry(svcpt->scp_rep_idle.next,
-                                           struct ptlrpc_reply_state,
-                                           rs_list);
+               while ((rs = list_first_entry_or_null(&svcpt->scp_rep_idle,
+                                                     struct ptlrpc_reply_state,
+                                                     rs_list)) != NULL) {
                        list_del(&rs->rs_list);
                        OBD_FREE_LARGE(rs, svc->srv_max_reply_size);
                }
@@ -3544,14 +3555,14 @@ ptlrpc_service_free(struct ptlrpc_service *svc)
                array = &svcpt->scp_at_array;
 
                if (array->paa_reqs_array != NULL) {
-                       OBD_FREE(array->paa_reqs_array,
-                                sizeof(struct list_head) * array->paa_size);
+                       OBD_FREE_PTR_ARRAY(array->paa_reqs_array,
+                                          array->paa_size);
                        array->paa_reqs_array = NULL;
                }
 
                if (array->paa_reqs_count != NULL) {
-                       OBD_FREE(array->paa_reqs_count,
-                                sizeof(__u32) * array->paa_size);
+                       OBD_FREE_PTR_ARRAY(array->paa_reqs_count,
+                                          array->paa_size);
                        array->paa_reqs_count = NULL;
                }
        }