Whamcloud - gitweb
LU-3534 out: only start transaction for modified update 83/7983/27
authorwang di <di.wang@intel.com>
Mon, 16 Dec 2013 17:21:16 +0000 (09:21 -0800)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 19 Mar 2014 07:17:40 +0000 (07:17 +0000)
Only start transaction, if the update will modify the
filesystem.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I9c516b5fe6f6bc743e1dbc69ab04fd7062b2311d
Reviewed-on: http://review.whamcloud.com/7983
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/target/out_handler.c

index 55e826d..48bb61c 100644 (file)
@@ -601,9 +601,10 @@ static int out_xattr_set(struct tgt_session_info *tsi)
        struct lu_buf           *lbuf = &tti->tti_buf;
        char                    *name;
        char                    *buf;
-       char                    *tmp;
+       __u32                   *tmp;
        int                      buf_len = 0;
        int                      flag;
+       int                      size = 0;
        int                      rc;
        ENTRY;
 
@@ -624,16 +625,16 @@ static int out_xattr_set(struct tgt_session_info *tsi)
        lbuf->lb_buf = buf;
        lbuf->lb_len = buf_len;
 
-       tmp = (char *)object_update_param_get(update, 2, NULL);
-       if (tmp == NULL) {
-               CERROR("%s: empty flag for xattr set: rc = %d\n",
-                      tgt_name(tsi->tsi_tgt), -EPROTO);
+       tmp = object_update_param_get(update, 2, &size);
+       if (tmp == NULL || size != sizeof(*tmp)) {
+               CERROR("%s: emptry or wrong size %d flag: rc = %d\n",
+                      tgt_name(tsi->tsi_tgt), size, -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
        if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
-               __swab32s((__u32 *)tmp);
-       flag = *(int *)tmp;
+               __swab32s(tmp);
+       flag = *tmp;
 
        rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
                              &tti->tti_tea,
@@ -1142,7 +1143,8 @@ static int out_write(struct tgt_session_info *tsi)
        struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
        struct lu_buf           *lbuf = &tti->tti_buf;
        char                    *buf;
-       char                    *tmp;
+       __u64                   *tmp;
+       int                     size = 0;
        int                     buf_len = 0;
        loff_t                  pos;
        int                      rc;
@@ -1157,16 +1159,16 @@ static int out_write(struct tgt_session_info *tsi)
        lbuf->lb_buf = buf;
        lbuf->lb_len = buf_len;
 
-       tmp = (char *)object_update_param_get(update, 1, NULL);
-       if (tmp == NULL) {
-               CERROR("%s: empty flag for xattr set: rc = %d\n",
-                      tgt_name(tsi->tsi_tgt), -EPROTO);
+       tmp = object_update_param_get(update, 1, &size);
+       if (tmp == NULL || size != sizeof(*tmp)) {
+               CERROR("%s: empty or wrong size %d pos: rc = %d\n",
+                      tgt_name(tsi->tsi_tgt), size, -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
        if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
-               __swab64s((__u64 *)tmp);
-       pos = *(loff_t *)tmp;
+               __swab64s(tmp);
+       pos = *tmp;
 
        rc = out_tx_write(tsi->tsi_env, obj, lbuf, pos,
                          &tti->tti_tea,
@@ -1285,17 +1287,21 @@ static int out_trans_stop(const struct lu_env *env,
 
 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
 {
-       struct tgt_session_info *tsi = tgt_ses_info(env);
-       int i = 0, rc;
+       struct tgt_session_info *tsi = tgt_ses_info(env);
+       int                     i;
+       int                     rc;
+       int                     rc1;
+       ENTRY;
 
-       LASSERT(ta->ta_dev);
-       LASSERT(ta->ta_handle);
+       if (ta->ta_handle == NULL)
+               RETURN(0);
 
        if (ta->ta_err != 0 || ta->ta_argno == 0)
                GOTO(stop, rc = ta->ta_err);
 
+       LASSERT(ta->ta_dev != NULL);
        rc = out_trans_start(env, ta);
-       if (unlikely(rc))
+       if (unlikely(rc != 0))
                GOTO(stop, rc);
 
        for (i = 0; i < ta->ta_argno; i++) {
@@ -1318,14 +1324,17 @@ int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
                        }
                        break;
                }
+               CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
+                      dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
        }
 
        /* Only fail for real update */
        tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
 stop:
-       CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
-              dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
-       out_trans_stop(env, ta, rc);
+       rc1 = out_trans_stop(env, ta, rc);
+       if (rc == 0)
+               rc = rc1;
+
        ta->ta_handle = NULL;
        ta->ta_argno = 0;
        ta->ta_err = 0;
@@ -1356,7 +1365,7 @@ int out_handle(struct tgt_session_info *tsi)
        struct object_update_reply      *reply;
        int                              bufsize;
        int                              count;
-       int                              old_batchid = -1;
+       int                              current_batchid = -1;
        int                              i;
        int                              rc = 0;
        int                              rc1 = 0;
@@ -1407,13 +1416,9 @@ int out_handle(struct tgt_session_info *tsi)
                RETURN(err_serious(-EPROTO));
        object_update_reply_init(reply, count);
        tti->tti_u.update.tti_update_reply = reply;
-
-       rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
-       if (rc != 0)
-               RETURN(rc);
-
        tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
 
+       memset(ta, 0, sizeof(*ta));
        /* Walk through updates in the request to execute them synchronously */
        for (i = 0; i < count; i++) {
                struct tgt_handler      *h;
@@ -1426,21 +1431,6 @@ int out_handle(struct tgt_session_info *tsi)
                if (ptlrpc_req_need_swab(pill->rc_req))
                        lustre_swab_object_update(update);
 
-               if (old_batchid == -1) {
-                       old_batchid = update->ou_batchid;
-               } else if (old_batchid != update->ou_batchid) {
-                       /* Stop the current update transaction,
-                        * create a new one */
-                       rc = out_tx_end(env, ta);
-                       if (rc < 0)
-                               RETURN(rc);
-
-                       rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
-                       if (rc != 0)
-                               RETURN(rc);
-                       old_batchid = update->ou_batchid;
-               }
-
                if (!fid_is_sane(&update->ou_fid)) {
                        CERROR("%s: invalid FID "DFID": rc = %d\n",
                               tgt_name(tsi->tsi_tgt), PFID(&update->ou_fid),
@@ -1463,33 +1453,63 @@ int out_handle(struct tgt_session_info *tsi)
                tti->tti_u.update.tti_update_reply_index = i;
 
                h = out_handler_find(update->ou_type);
-               if (likely(h != NULL)) {
-                       /* For real modification RPC, check if the update
-                        * has been executed */
-                       if (h->th_flags & MUTABOR) {
-                               struct ptlrpc_request *req = tgt_ses_req(tsi);
+               if (unlikely(h == NULL)) {
+                       CERROR("%s: unsupported opc: 0x%x\n",
+                              tgt_name(tsi->tsi_tgt), update->ou_type);
+                       GOTO(next, rc = -ENOTSUPP);
+               }
+
+               /* start transaction for modification RPC only */
+               if (h->th_flags & MUTABOR && current_batchid == -1) {
+                       current_batchid = update->ou_batchid;
+                       rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
+                       if (rc < 0)
+                               GOTO(next, rc);
+               }
 
-                               if (out_check_resent(env, dt, dt_obj, req,
-                                                    out_reconstruct, reply, i))
+               /* Stop the current update transaction,
+                * if it meets a different batchid, or
+                * it is read-only RPC */
+               if (((current_batchid != update->ou_batchid) ||
+                    !(h->th_flags & MUTABOR)) && ta->ta_handle != NULL) {
+                       rc = out_tx_end(env, ta);
+                       current_batchid = -1;
+                       if (rc != 0)
+                               GOTO(next, rc);
+
+                       /* start a new transaction if needed */
+                       if (h->th_flags & MUTABOR) {
+                               rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
+                               if (rc != 0)
                                        GOTO(next, rc);
+
+                               current_batchid = update->ou_batchid;
                        }
+               }
 
-                       rc = h->th_act(tsi);
-               } else {
-                       CERROR("%s: The unsupported opc: 0x%x\n",
-                              tgt_name(tsi->tsi_tgt), update->ou_type);
-                       lu_object_put(env, &dt_obj->do_lu);
-                       GOTO(out, rc = -ENOTSUPP);
+               /* Check resend case only for modifying RPC */
+               if (h->th_flags & MUTABOR) {
+                       struct ptlrpc_request *req = tgt_ses_req(tsi);
+
+                       if (out_check_resent(env, dt, dt_obj, req,
+                                            out_reconstruct, reply,
+                                            i))
+                               GOTO(next, rc);
                }
+
+               rc = h->th_act(tsi);
 next:
                lu_object_put(env, &dt_obj->do_lu);
                if (rc < 0)
                        GOTO(out, rc);
        }
 out:
-       rc1 = out_tx_end(env, ta);
-       if (rc == 0)
-               rc = rc1;
+       if (current_batchid != -1) {
+               rc1 = out_tx_end(env, ta);
+               if (rc == 0)
+                       rc = rc1;
+       }
+
        RETURN(rc);
 }