Whamcloud - gitweb
LU-11444 ptlrpc: resend may corrupt the data
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
index bcb4ff9..b36908b 100644 (file)
@@ -1145,8 +1145,30 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
 }
 EXPORT_SYMBOL(tgt_client_del);
 
+static void tgt_clean_by_tag(struct obd_export *exp, __u64 xid, __u16 tag)
+{
+       struct tg_export_data   *ted = &exp->exp_target_data;
+       struct lu_target        *lut = class_exp2tgt(exp);
+       struct tg_reply_data    *trd, *tmp;
+
+       if (tag == 0)
+               return;
+
+       list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
+               if (trd->trd_tag != tag)
+                       continue;
+
+               LASSERT(ergo(tgt_is_increasing_xid_client(exp),
+                            trd->trd_reply.lrd_xid <= xid));
+
+               ted->ted_release_tag++;
+               tgt_release_reply_data(lut, ted, trd);
+       }
+}
+
 int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
                       struct tg_export_data *ted, struct tg_reply_data *trd,
+                      struct ptlrpc_request *req,
                       struct thandle *th, bool update_lrd_file)
 {
        struct lsd_reply_data   *lrd;
@@ -1183,6 +1205,19 @@ int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
        }
        /* add reply data to target export's reply list */
        mutex_lock(&ted->ted_lcd_lock);
+       if (req != NULL) {
+               int exclude = tgt_is_increasing_xid_client(req->rq_export) ?
+                             MSG_REPLAY : MSG_REPLAY|MSG_RESENT;
+
+               if (req->rq_obsolete) {
+                       mutex_unlock(&ted->ted_lcd_lock);
+                       RETURN(-EALREADY);
+               }
+
+               if (!(lustre_msg_get_flags(req->rq_reqmsg) & exclude))
+                       tgt_clean_by_tag(req->rq_export, req->rq_xid,
+                                        trd->trd_tag);
+       }
        list_add(&trd->trd_list, &ted->ted_reply_list);
        ted->ted_reply_cnt++;
        if (ted->ted_reply_cnt > ted->ted_reply_max)
@@ -1192,7 +1227,8 @@ int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
        CDEBUG(D_TRACE, "add reply %p: xid %llu, transno %llu, "
               "tag %hu, client gen %u, slot idx %d\n",
               trd, lrd->lrd_xid, lrd->lrd_transno,
-              trd->trd_tag, lrd->lrd_client_gen, i);
+              trd->trd_tag, lrd->lrd_client_gen, trd->trd_index);
+
        RETURN(0);
 }
 EXPORT_SYMBOL(tgt_add_reply_data);
@@ -1323,7 +1359,8 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
                        trd->trd_pre_versions[3] = pre_versions[3];
                }
 
-               rc = tgt_add_reply_data(env, tgt, ted, trd, th, write_update);
+               rc = tgt_add_reply_data(env, tgt, ted, trd, req,
+                                       th, write_update);
                if (rc < 0)
                        OBD_FREE_PTR(trd);
                return rc;
@@ -2044,43 +2081,70 @@ out:
        return rc;
 }
 
-struct tg_reply_data *tgt_lookup_reply_by_xid(struct tg_export_data *ted,
-                                             __u64 xid)
+static int tgt_check_lookup_req(struct ptlrpc_request *req, int lookup,
+                               struct tg_reply_data *trd)
 {
-       struct tg_reply_data    *found = NULL;
-       struct tg_reply_data    *reply;
+       struct tg_export_data *ted = &req->rq_export->exp_target_data;
+       struct lu_target *lut = class_exp2tgt(req->rq_export);
+       __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+       int rc = 0;
+       struct tg_reply_data *reply;
+       bool check_increasing;
+
+       if (tag == 0)
+               return 0;
+
+       check_increasing = tgt_is_increasing_xid_client(req->rq_export) &&
+                          !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
+       if (!lookup && !check_increasing)
+               return 0;
 
-       mutex_lock(&ted->ted_lcd_lock);
        list_for_each_entry(reply, &ted->ted_reply_list, trd_list) {
-               if (reply->trd_reply.lrd_xid == xid) {
-                       found = reply;
+               if (lookup && reply->trd_reply.lrd_xid == req->rq_xid) {
+                       rc = 1;
+                       if (trd != NULL)
+                               *trd = *reply;
+                       break;
+               } else if (check_increasing && reply->trd_tag == tag &&
+                          reply->trd_reply.lrd_xid > req->rq_xid) {
+                       rc = -EPROTO;
+                       CERROR("%s: busy tag=%u req_xid=%llu, trd=%p: xid=%llu transno=%llu client_gen=%u slot_idx=%d: rc = %d\n",
+                              tgt_name(lut), tag, req->rq_xid, trd,
+                              reply->trd_reply.lrd_xid,
+                              reply->trd_reply.lrd_transno,
+                              reply->trd_reply.lrd_client_gen,
+                              reply->trd_index, rc);
                        break;
                }
        }
-       mutex_unlock(&ted->ted_lcd_lock);
-       return found;
+
+       return rc;
 }
-EXPORT_SYMBOL(tgt_lookup_reply_by_xid);
 
 /* Look for a reply data matching specified request @req
  * A copy is returned in @trd if the pointer is not NULL
  */
-bool tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
+int tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
 {
-       struct tg_export_data   *ted = &req->rq_export->exp_target_data;
-       struct tg_reply_data    *reply;
-       bool                     found = false;
-
-       reply = tgt_lookup_reply_by_xid(ted, req->rq_xid);
-       if (reply != NULL) {
-               found = true;
-               if (trd != NULL)
-                       *trd = *reply;
+       struct tg_export_data *ted = &req->rq_export->exp_target_data;
+       int found = 0;
+       bool not_replay = !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
+
+       mutex_lock(&ted->ted_lcd_lock);
+       if (not_replay && req->rq_xid <= req->rq_export->exp_last_xid) {
+               /* A check for the last_xid is needed here in case there is
+                * no reply data is left in the list. It may happen if another
+                * RPC on another slot increased the last_xid between our
+                * process_req_last_xid & tgt_lookup_reply calls */
+               found = -EPROTO;
+       } else {
+               found = tgt_check_lookup_req(req, 1, trd);
        }
+       mutex_unlock(&ted->ted_lcd_lock);
 
-       CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d\n",
-              tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid,
-              found ? 1 : 0);
+       CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d last_xid %llu\n",
+              tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid, found,
+              req->rq_export->exp_last_xid);
 
        return found;
 }
@@ -2092,37 +2156,19 @@ int tgt_handle_received_xid(struct obd_export *exp, __u64 rcvd_xid)
        struct lu_target        *lut = class_exp2tgt(exp);
        struct tg_reply_data    *trd, *tmp;
 
-       mutex_lock(&ted->ted_lcd_lock);
+
        list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
                if (trd->trd_reply.lrd_xid > rcvd_xid)
                        continue;
                ted->ted_release_xid++;
                tgt_release_reply_data(lut, ted, trd);
        }
-       mutex_unlock(&ted->ted_lcd_lock);
 
        return 0;
 }
 
-int tgt_handle_tag(struct obd_export *exp, __u16 tag)
+int tgt_handle_tag(struct ptlrpc_request *req)
 {
-       struct tg_export_data   *ted = &exp->exp_target_data;
-       struct lu_target        *lut = class_exp2tgt(exp);
-       struct tg_reply_data    *trd, *tmp;
-
-       if (tag == 0)
-               return 0;
-
-       mutex_lock(&ted->ted_lcd_lock);
-       list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
-               if (trd->trd_tag != tag)
-                       continue;
-               ted->ted_release_tag++;
-               tgt_release_reply_data(lut, ted, trd);
-               break;
-       }
-       mutex_unlock(&ted->ted_lcd_lock);
-
-       return 0;
+       return tgt_check_lookup_req(req, 0, NULL);
 }