LU-5951 ptlrpc: track unreplied requests 73/15473/20
author    Niu Yawei <yawei.niu@intel.com>
Thu, 2 Jul 2015 15:46:22 +0000 (11:46 -0400)
committer Oleg Drokin <oleg.drokin@intel.com>
Fri, 2 Oct 2015 04:13:50 +0000 (04:13 +0000)
The request XID was used to make sure that OST object timestamps are
updated properly by out-of-order setattr/punch/write requests. However,
this mechanism was broken by the multiple rcvd slot feature, which
deferred XID assignment from request packing to request sending.

This patch moves XID assignment back to request packing, and changes
how the lowest unreplied XID is found: instead of scanning the sending
& delayed lists, the client scans a dedicated unreplied request list.
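
For illustration only, below is a minimal userspace sketch of this
client-side bookkeeping: each request gets its XID at packing time and
is kept on a list sorted by XID, and the known replied XID is the
lowest unreplied XID minus one. The names (fake_import, fake_req,
assign_xid, known_replied_xid) are hypothetical stand-ins and locking
is omitted; the real code manipulates imp_unreplied_list under
imp_lock, as shown in the diff below.

#include <stdint.h>
#include <stdio.h>

struct fake_req {
	uint64_t	 xid;
	struct fake_req	*next;		/* singly-linked, ascending XID */
};

struct fake_import {
	struct fake_req	*unreplied;	/* head = lowest unreplied XID */
	uint64_t	 known_replied_xid;
	uint64_t	 next_xid;
};

/* Assign an XID at "packing" time and keep the list sorted, mirroring
 * what ptlrpc_assign_next_xid()/ptlrpc_add_unreplied() do in the patch. */
static void assign_xid(struct fake_import *imp, struct fake_req *req)
{
	struct fake_req **pos = &imp->unreplied;

	req->xid = imp->next_xid++;
	while (*pos != NULL && (*pos)->xid < req->xid)
		pos = &(*pos)->next;
	req->next = *pos;
	*pos = req;
}

/* Everything below the lowest unreplied XID has been replied to; this is
 * the value the client packs into the last_xid field of outgoing
 * requests (except CONNECT/DISCONNECT, see below). */
static uint64_t known_replied_xid(struct fake_import *imp)
{
	if (imp->unreplied == NULL)
		return 0;
	if (imp->known_replied_xid < imp->unreplied->xid - 1)
		imp->known_replied_xid = imp->unreplied->xid - 1;
	return imp->unreplied->xid - 1;
}

int main(void)
{
	struct fake_import imp = { .next_xid = 100 };
	struct fake_req a, b, c;

	assign_xid(&imp, &a);	/* XID 100 */
	assign_xid(&imp, &b);	/* XID 101 */
	assign_xid(&imp, &c);	/* XID 102 */

	imp.unreplied = a.next;	/* reply for XID 100 arrived */
	printf("known replied XID: %llu\n",
	       (unsigned long long)known_replied_xid(&imp));	/* prints 100 */
	return 0;
}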

This patch also skips packing the known replied XID into connect and
disconnect requests, so that the known replied XID only ever increases
on both the server and client side.
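
To see why connect and disconnect are exceptions, here is a hedged
sketch of the server-side sanity check introduced by
process_req_last_xid() in this patch; fake_export and
check_req_last_xid are simplified, hypothetical stand-ins for the
export and handler code, not the actual target symbols.

#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-in for the per-client export state kept on the server. */
struct fake_export {
	uint64_t exp_last_xid;	/* highest XID the client is known to have
				 * received a reply for */
};

/* Mirrors the spirit of process_req_last_xid(): the last_xid packed by the
 * client may only move exp_last_xid forward, and every incoming request
 * must carry an XID strictly greater than exp_last_xid. Returns false on
 * a protocol violation (the real code replies with -EPROTO). */
static bool check_req_last_xid(struct fake_export *exp,
			       uint64_t req_xid, uint64_t req_last_xid)
{
	if (req_last_xid > exp->exp_last_xid)
		exp->exp_last_xid = req_last_xid;

	return req_xid != 0 && req_xid > exp->exp_last_xid;
}

If a CONNECT packed the client's current known replied XID, it would
raise exp_last_xid above the XIDs of the replay requests sent right
after it and they would all be rejected; likewise a DISCONNECT may
carry an XID above inflight RPCs the client has already aborted.
Packing zero into those two request types (and resetting exp_last_xid
on each connection, as the tgt_handler.c hunk does) keeps the check
monotone.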

Signed-off-by: Niu Yawei <yawei.niu@intel.com>
Change-Id: Ib079b2029680934a4c448da766bf0e42d580be26
Reviewed-on: http://review.whamcloud.com/15473
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Grégoire Pichon <gregoire.pichon@bull.net>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre_import.h
lustre/include/lustre_net.h
lustre/obdclass/genops.c
lustre/ptlrpc/client.c
lustre/ptlrpc/import.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/recover.c
lustre/target/tgt_handler.c

lustre/include/lustre_import.h
index f5fc224..e064c05 100644 (file)
@@ -196,6 +196,11 @@ struct obd_import {
        struct list_head        *imp_replay_cursor;
        /** @} */
 
+       /** List of unreplied requests */
+       struct list_head        imp_unreplied_list;
+       /** Known maximal replied XID */
+       __u64                   imp_known_replied_xid;
+
        /** obd device for this import */
        struct obd_device       *imp_obd;
 
lustre/include/lustre_net.h
index a7f37c2..26d6f01 100644 (file)
@@ -827,6 +827,8 @@ struct ptlrpc_cli_req {
        union ptlrpc_async_args          cr_async_args;
        /** Opaq data for replay and commit callbacks. */
        void                            *cr_cb_data;
+       /** Link to the imp->imp_unreplied_list */
+       struct list_head                 cr_unreplied_list;
        /**
         * Commit callback, called when request is committed and about to be
         * freed.
@@ -865,6 +867,7 @@ struct ptlrpc_cli_req {
 #define rq_resend_cb           rq_cli.cr_resend_cb
 #define rq_async_args          rq_cli.cr_async_args
 #define rq_cb_data             rq_cli.cr_cb_data
+#define rq_unreplied_list      rq_cli.cr_unreplied_list
 #define rq_commit_cb           rq_cli.cr_commit_cb
 #define rq_replay_cb           rq_cli.cr_replay_cb
 
lustre/obdclass/genops.c
index 4fb3ecc..b0af24a 100644 (file)
@@ -1057,6 +1057,8 @@ struct obd_import *class_new_import(struct obd_device *obd)
        INIT_LIST_HEAD(&imp->imp_sending_list);
        INIT_LIST_HEAD(&imp->imp_delayed_list);
        INIT_LIST_HEAD(&imp->imp_committed_list);
+       INIT_LIST_HEAD(&imp->imp_unreplied_list);
+       imp->imp_known_replied_xid = 0;
        imp->imp_replay_cursor = &imp->imp_committed_list;
        spin_lock_init(&imp->imp_lock);
        imp->imp_last_success_conn = 0;
lustre/ptlrpc/client.c
index a791abf..842d158 100644 (file)
@@ -651,6 +651,42 @@ static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
        spin_unlock(&pool->prp_lock);
 }
 
+void ptlrpc_add_unreplied(struct ptlrpc_request *req)
+{
+       struct obd_import       *imp = req->rq_import;
+       struct list_head        *tmp;
+       struct ptlrpc_request   *iter;
+
+       assert_spin_locked(&imp->imp_lock);
+       LASSERT(list_empty(&req->rq_unreplied_list));
+
+       /* unreplied list is sorted by xid in ascending order */
+       list_for_each_prev(tmp, &imp->imp_unreplied_list) {
+               iter = list_entry(tmp, struct ptlrpc_request,
+                                 rq_unreplied_list);
+
+               LASSERT(req->rq_xid != iter->rq_xid);
+               if (req->rq_xid < iter->rq_xid)
+                       continue;
+               list_add(&req->rq_unreplied_list, &iter->rq_unreplied_list);
+               return;
+       }
+       list_add(&req->rq_unreplied_list, &imp->imp_unreplied_list);
+}
+
+void ptlrpc_assign_next_xid_nolock(struct ptlrpc_request *req)
+{
+       req->rq_xid = ptlrpc_next_xid();
+       ptlrpc_add_unreplied(req);
+}
+
+static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req)
+{
+       spin_lock(&req->rq_import->imp_lock);
+       ptlrpc_assign_next_xid_nolock(req);
+       spin_unlock(&req->rq_import->imp_lock);
+}
+
 static int __ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
                                       __u32 version, int opcode,
                                       int count, __u32 *lengths, char **bufs,
@@ -697,6 +733,7 @@ static int __ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
         ptlrpc_at_set_req_timeout(request);
 
        lustre_msg_set_opc(request->rq_reqmsg, opcode);
+       ptlrpc_assign_next_xid(request);
 
        RETURN(0);
 out_ctx:
@@ -1270,6 +1307,24 @@ static void ptlrpc_save_versions(struct ptlrpc_request *req)
         EXIT;
 }
 
+__u64 ptlrpc_known_replied_xid(struct obd_import *imp)
+{
+       struct ptlrpc_request *req;
+
+       assert_spin_locked(&imp->imp_lock);
+       if (list_empty(&imp->imp_unreplied_list))
+               return 0;
+
+       req = list_entry(imp->imp_unreplied_list.next, struct ptlrpc_request,
+                        rq_unreplied_list);
+       LASSERTF(req->rq_xid >= 1, "XID:"LPU64"\n", req->rq_xid);
+
+       if (imp->imp_known_replied_xid < req->rq_xid - 1)
+               imp->imp_known_replied_xid = req->rq_xid - 1;
+
+       return req->rq_xid - 1;
+}
+
 /**
  * Callback function called when client receives RPC reply for \a req.
  * Returns 0 on success or error code.
@@ -1357,6 +1412,11 @@ static int after_reply(struct ptlrpc_request *req)
                else
                        req->rq_sent = now + req->rq_nr_resend;
 
+               /* Resend for EINPROGRESS will use a new XID */
+               spin_lock(&imp->imp_lock);
+               list_del_init(&req->rq_unreplied_list);
+               spin_unlock(&imp->imp_lock);
+
                RETURN(0);
        }
 
@@ -1472,8 +1532,7 @@ static int after_reply(struct ptlrpc_request *req)
 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
 {
         struct obd_import     *imp = req->rq_import;
-       struct list_head      *tmp;
-       __u64                  min_xid = ~0ULL;
+       __u64                  min_xid = 0;
         int rc;
         ENTRY;
 
@@ -1494,15 +1553,8 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
 
        spin_lock(&imp->imp_lock);
 
-       /* the very first time we assign XID. it's important to assign XID
-        * and put it on the list atomically, so that the lowest assigned
-        * XID is always known. this is vital for multislot last_rcvd */
-       if (req->rq_send_state == LUSTRE_IMP_REPLAY) {
-               LASSERT(req->rq_xid != 0);
-       } else {
-               LASSERT(req->rq_xid == 0);
-               req->rq_xid = ptlrpc_next_xid();
-       }
+       LASSERT(req->rq_xid != 0);
+       LASSERT(!list_empty(&req->rq_unreplied_list));
 
        if (!req->rq_generation_set)
                req->rq_import_generation = imp->imp_generation;
@@ -1534,23 +1586,23 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
        list_add_tail(&req->rq_list, &imp->imp_sending_list);
        atomic_inc(&req->rq_import->imp_inflight);
 
-       /* find the lowest unreplied XID */
-       list_for_each(tmp, &imp->imp_delayed_list) {
-               struct ptlrpc_request *r;
-               r = list_entry(tmp, struct ptlrpc_request, rq_list);
-               if (r->rq_xid < min_xid)
-                       min_xid = r->rq_xid;
-       }
-       list_for_each(tmp, &imp->imp_sending_list) {
-               struct ptlrpc_request *r;
-               r = list_entry(tmp, struct ptlrpc_request, rq_list);
-               if (r->rq_xid < min_xid)
-                       min_xid = r->rq_xid;
-       }
+       /* Find the known replied XID from the unreplied list. CONNECT
+        * and DISCONNECT requests are skipped to keep the sanity check
+        * on the server side happy, see process_req_last_xid().
+        *
+        * For CONNECT: because replay requests have lower XIDs, the
+        * sanity check would break if CONNECT bumped the exp_last_xid
+        * on the server.
+        *
+        * For DISCONNECT: since the client aborts inflight RPCs before
+        * sending DISCONNECT, DISCONNECT may carry an XID higher than
+        * the inflight RPCs.
+        */
+       if (!ptlrpc_req_is_connect(req) && !ptlrpc_req_is_disconnect(req))
+               min_xid = ptlrpc_known_replied_xid(imp);
        spin_unlock(&imp->imp_lock);
 
-       if (likely(min_xid != ~0ULL))
-               lustre_msg_set_last_xid(req->rq_reqmsg, min_xid - 1);
+       lustre_msg_set_last_xid(req->rq_reqmsg, min_xid);
 
        lustre_msg_set_status(req->rq_reqmsg, current_pid());
 
@@ -1978,6 +2030,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                        list_del_init(&req->rq_list);
                        atomic_dec(&imp->imp_inflight);
                }
+               list_del_init(&req->rq_unreplied_list);
                spin_unlock(&imp->imp_lock);
 
                atomic_dec(&set->set_remaining);
@@ -2375,6 +2428,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
                if (!locked)
                        spin_lock(&request->rq_import->imp_lock);
                list_del_init(&request->rq_replay_list);
+               list_del_init(&request->rq_unreplied_list);
                if (!locked)
                        spin_unlock(&request->rq_import->imp_lock);
         }
@@ -3122,7 +3176,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
 
        LASSERT(bd != NULL);
 
-       if (!req->rq_resend || req->rq_nr_resend != 0) {
+       if (!req->rq_resend) {
                /* this request has a new xid, just use it as bulk matchbits */
                req->rq_mbits = req->rq_xid;
 
lustre/ptlrpc/import.c
index b663ca5..6d798ca 100644 (file)
@@ -944,6 +944,37 @@ static int ptlrpc_connect_set_flags(struct obd_import *imp,
 }
 
 /**
+ * Add all replay requests back to the unreplied list before starting
+ * replay, so that we can make sure the known replied XID only ever
+ * increases, even when replaying requests.
+ */
+static void ptlrpc_prepare_replay(struct obd_import *imp)
+{
+       struct ptlrpc_request *req;
+
+       if (imp->imp_state != LUSTRE_IMP_REPLAY ||
+           imp->imp_resend_replay)
+               return;
+
+       /* If the server was restarted during replay, the requests may
+        * have been added to the unreplied list in a former replay. */
+       spin_lock(&imp->imp_lock);
+
+       list_for_each_entry(req, &imp->imp_committed_list, rq_replay_list) {
+               if (list_empty(&req->rq_unreplied_list))
+                       ptlrpc_add_unreplied(req);
+       }
+
+       list_for_each_entry(req, &imp->imp_replay_list, rq_replay_list) {
+               if (list_empty(&req->rq_unreplied_list))
+                       ptlrpc_add_unreplied(req);
+       }
+
+       imp->imp_known_replied_xid = ptlrpc_known_replied_xid(imp);
+       spin_unlock(&imp->imp_lock);
+}
+
+/**
  * interpret_reply callback for connect RPCs.
  * Looks into returned status of connect operation and decides
  * what to do with the import - i.e enter recovery, promote it to
@@ -1218,6 +1249,7 @@ static int ptlrpc_connect_interpret(const struct lu_env *env,
         }
 
 finish:
+       ptlrpc_prepare_replay(imp);
        rc = ptlrpc_import_recovery_state_machine(imp);
        if (rc == -ENOTCONN) {
                CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
lustre/ptlrpc/niobuf.c
index 7a96e2f..4dcbe3c 100644 (file)
@@ -702,19 +702,37 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
        lustre_msghdr_set_flags(request->rq_reqmsg,
                                imp->imp_msghdr_flags);
 
-       if (request->rq_nr_resend != 0) {
+       /* If this is the first resend of the request for EINPROGRESS,
+        * we need to allocate a new XID (see after_reply()); this differs
+        * from a resend due to reply timeout. */
+       if (request->rq_nr_resend != 0 &&
+           list_empty(&request->rq_unreplied_list)) {
+               __u64 min_xid = 0;
                /* resend for EINPROGRESS, allocate new xid to avoid reply
                 * reconstruction */
-               request->rq_xid = ptlrpc_next_xid();
+               spin_lock(&imp->imp_lock);
+               ptlrpc_assign_next_xid_nolock(request);
+               request->rq_mbits = request->rq_xid;
+               min_xid = ptlrpc_known_replied_xid(imp);
+               spin_unlock(&imp->imp_lock);
+
+               lustre_msg_set_last_xid(request->rq_reqmsg, min_xid);
                DEBUG_REQ(D_RPCTRACE, request, "Allocating new xid for "
                          "resend on EINPROGRESS");
-       }
-
-       if (request->rq_bulk != NULL) {
+       } else if (request->rq_bulk != NULL) {
                ptlrpc_set_bulk_mbits(request);
                lustre_msg_set_mbits(request->rq_reqmsg, request->rq_mbits);
        }
 
+       if (list_empty(&request->rq_unreplied_list) ||
+           request->rq_xid <= imp->imp_known_replied_xid) {
+               DEBUG_REQ(D_ERROR, request, "xid: "LPU64", replied: "LPU64", "
+                         "list_empty:%d\n", request->rq_xid,
+                         imp->imp_known_replied_xid,
+                         list_empty(&request->rq_unreplied_list));
+               LBUG();
+       }
+
        /** For enabled AT all request should have AT_SUPPORT in the
         * FULL import state when OBD_CONNECT_AT is set */
        LASSERT(AT_OFF || imp->imp_state != LUSTRE_IMP_FULL ||
lustre/ptlrpc/ptlrpc_internal.h
index 30e72a0..8874ad0 100644 (file)
@@ -89,6 +89,9 @@ int ptlrpc_expired_set(void *data);
 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *);
 void ptlrpc_resend_req(struct ptlrpc_request *request);
 void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req);
+void ptlrpc_assign_next_xid_nolock(struct ptlrpc_request *req);
+__u64 ptlrpc_known_replied_xid(struct obd_import *imp);
+void ptlrpc_add_unreplied(struct ptlrpc_request *req);
 
 /* events.c */
 int ptlrpc_init_portals(void);
@@ -373,6 +376,7 @@ static inline void ptlrpc_cli_req_init(struct ptlrpc_request *req)
 
        INIT_LIST_HEAD(&cr->cr_set_chain);
        INIT_LIST_HEAD(&cr->cr_ctx_chain);
+       INIT_LIST_HEAD(&cr->cr_unreplied_list);
        init_waitqueue_head(&cr->cr_reply_waitq);
        init_waitqueue_head(&cr->cr_set_waitq);
 }
@@ -389,4 +393,24 @@ static inline void ptlrpc_srv_req_init(struct ptlrpc_request *req)
        INIT_LIST_HEAD(&sr->sr_hist_list);
 }
 
+static inline bool ptlrpc_req_is_connect(struct ptlrpc_request *req)
+{
+       if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CONNECT ||
+           lustre_msg_get_opc(req->rq_reqmsg) == OST_CONNECT ||
+           lustre_msg_get_opc(req->rq_reqmsg) == MGS_CONNECT)
+               return true;
+       else
+               return false;
+}
+
+static inline bool ptlrpc_req_is_disconnect(struct ptlrpc_request *req)
+{
+       if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_DISCONNECT ||
+           lustre_msg_get_opc(req->rq_reqmsg) == OST_DISCONNECT ||
+           lustre_msg_get_opc(req->rq_reqmsg) == MGS_DISCONNECT)
+               return true;
+       else
+               return false;
+}
+
 #endif /* PTLRPC_INTERNAL_H */
lustre/ptlrpc/recover.c
index 4a3ba0c..f1a19b4 100644 (file)
@@ -158,25 +158,35 @@ int ptlrpc_replay_next(struct obd_import *imp, int *inflight)
 
        /* If need to resend the last sent transno (because a reconnect
         * has occurred), then stop on the matching req and send it again.
-        * If, however, the last sent transno has been committed then we 
+        * If, however, the last sent transno has been committed then we
         * continue replay from the next request. */
        if (req != NULL && imp->imp_resend_replay)
                lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
 
        spin_lock(&imp->imp_lock);
+       /* The resend replay request may have been removed from the
+        * unreplied list. */
+       if (req != NULL && imp->imp_resend_replay &&
+           list_empty(&req->rq_unreplied_list))
+               ptlrpc_add_unreplied(req);
+
        imp->imp_resend_replay = 0;
        spin_unlock(&imp->imp_lock);
 
-        if (req != NULL) {
-                rc = ptlrpc_replay_req(req);
-                if (rc) {
-                        CERROR("recovery replay error %d for req "
-                               LPU64"\n", rc, req->rq_xid);
-                        RETURN(rc);
-                }
-                *inflight = 1;
-        }
-        RETURN(rc);
+       if (req != NULL) {
+               /* The request should have been added back to the unreplied
+                * list by ptlrpc_prepare_replay(). */
+               LASSERT(!list_empty(&req->rq_unreplied_list));
+
+               rc = ptlrpc_replay_req(req);
+               if (rc) {
+                       CERROR("recovery replay error %d for req "
+                              LPU64"\n", rc, req->rq_xid);
+                       RETURN(rc);
+               }
+               *inflight = 1;
+       }
+       RETURN(rc);
 }
 
 /**
lustre/target/tgt_handler.c
index 1110902..52fc489 100644 (file)
@@ -604,6 +604,53 @@ static struct tgt_handler *tgt_handler_find_check(struct ptlrpc_request *req)
        RETURN(h);
 }
 
+static int process_req_last_xid(struct ptlrpc_request *req)
+{
+       __u64   last_xid;
+       ENTRY;
+
+       /* check request's xid is consistent with export's last_xid */
+       last_xid = lustre_msg_get_last_xid(req->rq_reqmsg);
+       if (last_xid > req->rq_export->exp_last_xid)
+               req->rq_export->exp_last_xid = last_xid;
+
+       if (req->rq_xid == 0 ||
+           (req->rq_xid <= req->rq_export->exp_last_xid)) {
+               DEBUG_REQ(D_ERROR, req, "Unexpected xid %llx vs. "
+                         "last_xid %llx\n", req->rq_xid,
+                         req->rq_export->exp_last_xid);
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 93, 0)
+               /* This LBUG() can be triggered in the following case:
+                *
+                * - Client sends a no_resend RPC, like statfs;
+                * - The RPC times out (or hits some other error) on the
+                *   client, so it's removed from the unreplied list;
+                * - Client sends some other request to bump the
+                *   exp_last_xid on the server;
+                * - The former RPC finally gets a chance to be processed;
+                * - LBUG();
+                *
+                * Let's keep this for debugging purposes for now; it
+                * should be removed before release.
+                */
+               LBUG();
+#endif
+               req->rq_status = -EPROTO;
+               RETURN(ptlrpc_error(req));
+       }
+
+       /* try to release in-memory reply data */
+       if (tgt_is_multimodrpcs_client(req->rq_export)) {
+               tgt_handle_received_xid(req->rq_export,
+                               lustre_msg_get_last_xid(req->rq_reqmsg));
+               if (!(lustre_msg_get_flags(req->rq_reqmsg) &
+                     (MSG_RESENT | MSG_REPLAY)))
+                       tgt_handle_tag(req->rq_export,
+                                      lustre_msg_get_tag(req->rq_reqmsg));
+       }
+       RETURN(0);
+}
+
 int tgt_request_handle(struct ptlrpc_request *req)
 {
        struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
@@ -613,8 +660,9 @@ int tgt_request_handle(struct ptlrpc_request *req)
        struct lu_target        *tgt;
        int                      request_fail_id = 0;
        __u32                    opc = lustre_msg_get_opc(msg);
+       struct obd_device       *obd;
        int                      rc;
-
+       bool                     is_connect = false;
        ENTRY;
 
        /* Refill the context, to make sure all thread keys are allocated */
@@ -628,6 +676,7 @@ int tgt_request_handle(struct ptlrpc_request *req)
         * target, otherwise that should be connect operation */
        if (opc == MDS_CONNECT || opc == OST_CONNECT ||
            opc == MGS_CONNECT) {
+               is_connect = true;
                req_capsule_set(&req->rq_pill, &RQF_CONNECT);
                rc = target_handle_connect(req);
                if (rc != 0) {
@@ -671,38 +720,24 @@ int tgt_request_handle(struct ptlrpc_request *req)
                GOTO(out, rc);
        }
 
-       /* check request's xid is consistent with export's last_xid */
-       if (req->rq_export != NULL) {
-               __u64 last_xid = lustre_msg_get_last_xid(req->rq_reqmsg);
-               if (last_xid != 0)
-                       req->rq_export->exp_last_xid = last_xid;
-               if (req->rq_xid == 0 ||
-                   req->rq_xid <= req->rq_export->exp_last_xid) {
-                       DEBUG_REQ(D_ERROR, req,
-                                 "Unexpected xid %llx vs. last_xid %llx\n",
-                                 req->rq_xid, req->rq_export->exp_last_xid);
-#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 93, 0)
-                       LBUG();
-#endif
-                       req->rq_status = -EPROTO;
-                       rc = ptlrpc_error(req);
+       /* Skip last_xid processing for the recovery thread, otherwise the
+        * last_xid of the same request could be processed twice: first when
+        * processing the incoming request, and again when the request is
+        * being processed by the recovery thread. */
+       obd = class_exp2obd(req->rq_export);
+       if (is_connect) {
+               /* reset the exp_last_xid on each connection. */
+               req->rq_export->exp_last_xid = 0;
+       } else if (obd->obd_recovery_data.trd_processing_task !=
+                  current_pid()) {
+               rc = process_req_last_xid(req);
+               if (rc)
                        GOTO(out, rc);
-               }
        }
 
        request_fail_id = tgt->lut_request_fail_id;
        tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;
 
-       /* try to release in-memory reply data */
-       if (tgt_is_multimodrpcs_client(req->rq_export)) {
-               tgt_handle_received_xid(req->rq_export,
-                               lustre_msg_get_last_xid(req->rq_reqmsg));
-               if (!(lustre_msg_get_flags(req->rq_reqmsg) &
-                     (MSG_RESENT | MSG_REPLAY)))
-                       tgt_handle_tag(req->rq_export,
-                                      lustre_msg_get_tag(req->rq_reqmsg));
-       }
-
        h = tgt_handler_find_check(req);
        if (IS_ERR(h)) {
                req->rq_status = PTR_ERR(h);