Whamcloud - gitweb
LU-2430 mdd: add lfs mv to migrate inode.
[fs/lustre-release.git] / lustre / target / out_handler.c
index 11be1cd..74e8f47 100644 (file)
@@ -58,96 +58,6 @@ struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
        return &ta->ta_args[i];
 }
 
-static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
-                       struct thandle_exec_args *ta, struct obd_export *exp)
-{
-       memset(ta, 0, sizeof(*ta));
-       ta->ta_handle = dt_trans_create(env, dt);
-       if (IS_ERR(ta->ta_handle)) {
-               int rc;
-
-               CERROR("%s: start handle error: rc = %ld\n",
-                      dt_obd_name(dt), PTR_ERR(ta->ta_handle));
-               rc = PTR_ERR(ta->ta_handle);
-               ta->ta_handle = NULL;
-               return rc;
-       }
-       ta->ta_dev = dt;
-       if (exp->exp_need_sync)
-               ta->ta_handle->th_sync = 1;
-
-       return 0;
-}
-
-static int out_trans_start(const struct lu_env *env,
-                          struct thandle_exec_args *ta)
-{
-       return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
-}
-
-static int out_trans_stop(const struct lu_env *env,
-                         struct thandle_exec_args *ta, int err)
-{
-       int i;
-       int rc;
-
-       ta->ta_handle->th_result = err;
-       rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
-       for (i = 0; i < ta->ta_argno; i++) {
-               if (ta->ta_args[i].object != NULL) {
-                       lu_object_put(env, &ta->ta_args[i].object->do_lu);
-                       ta->ta_args[i].object = NULL;
-               }
-       }
-
-       return rc;
-}
-
-int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
-{
-       struct tgt_session_info *tsi = tgt_ses_info(env);
-       int i = 0, rc;
-
-       LASSERT(ta->ta_dev);
-       LASSERT(ta->ta_handle);
-
-       if (ta->ta_err != 0 || ta->ta_argno == 0)
-               GOTO(stop, rc = ta->ta_err);
-
-       rc = out_trans_start(env, ta);
-       if (unlikely(rc))
-               GOTO(stop, rc);
-
-       for (i = 0; i < ta->ta_argno; i++) {
-               rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
-                                           &ta->ta_args[i]);
-               if (unlikely(rc)) {
-                       CDEBUG(D_INFO, "error during execution of #%u from"
-                              " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
-                              ta->ta_args[i].line, rc);
-                       while (--i >= 0) {
-                               LASSERTF(ta->ta_args[i].undo_fn != NULL,
-                                   "can't undo changes, hope for failover!\n");
-                               ta->ta_args[i].undo_fn(env, ta->ta_handle,
-                                                      &ta->ta_args[i]);
-                       }
-                       break;
-               }
-       }
-
-       /* Only fail for real update */
-       tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
-stop:
-       CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
-              dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
-       out_trans_stop(env, ta, rc);
-       ta->ta_handle = NULL;
-       ta->ta_argno = 0;
-       ta->ta_err = 0;
-
-       RETURN(rc);
-}
-
 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
                            struct dt_object *obj,
                            struct object_update_reply *reply,
@@ -436,8 +346,15 @@ static int out_attr_get(struct tgt_session_info *tsi)
 
        ENTRY;
 
-       if (!lu_object_exists(&obj->do_lu))
+       if (!lu_object_exists(&obj->do_lu)) {
+               /* Usually, this will be called when the master MDT try
+                * to init a remote object(see osp_object_init), so if
+                * the object does not exist on slave, we need set BANSHEE flag,
+                * so the object can be removed from the cache immediately */
+               set_bit(LU_OBJECT_HEARD_BANSHEE,
+                       &obj->do_lu.lo_header->loh_flags);
                RETURN(-ENOENT);
+       }
 
        dt_read_lock(env, obj, MOR_TGT_CHILD);
        rc = dt_attr_get(env, obj, la, NULL);
@@ -617,6 +534,9 @@ static int out_tx_xattr_set_exec(const struct lu_env *env,
               dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
               arg->u.xattr_set.name, arg->u.xattr_set.flags);
 
+       if (!lu_object_exists(&dt_obj->do_lu))
+               GOTO(out, rc = -ENOENT);
+
        dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
        rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
                          arg->u.xattr_set.name, arg->u.xattr_set.flags,
@@ -627,7 +547,7 @@ static int out_tx_xattr_set_exec(const struct lu_env *env,
         **/
        if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
                rc = 0;
-
+out:
        CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
               dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
 
@@ -940,17 +860,16 @@ static int __out_tx_index_insert(const struct lu_env *env,
 
        LASSERT(ta->ta_handle != NULL);
 
-       if (lu_object_exists(&dt_obj->do_lu)) {
-               if (dt_try_as_dir(env, dt_obj) == 0) {
-                       ta->ta_err = -ENOTDIR;
-                       return ta->ta_err;
-               }
-               ta->ta_err = dt_declare_insert(env, dt_obj,
-                                              (struct dt_rec *)fid,
-                                              (struct dt_key *)name,
-                                              ta->ta_handle);
+       if (dt_try_as_dir(env, dt_obj) == 0) {
+               ta->ta_err = -ENOTDIR;
+               return ta->ta_err;
        }
 
+       ta->ta_err = dt_declare_insert(env, dt_obj,
+                                      (struct dt_rec *)fid,
+                                      (struct dt_key *)name,
+                                      ta->ta_handle);
+
        if (ta->ta_err != 0)
                return ta->ta_err;
 
@@ -1163,6 +1082,91 @@ static int out_destroy(struct tgt_session_info *tsi)
        RETURN(rc);
 }
 
+static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
+                            struct tx_arg *arg)
+{
+       struct dt_object *dt_obj = arg->object;
+       int rc;
+
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
+                            &arg->u.write.pos, th);
+       dt_write_unlock(env, dt_obj);
+
+       if (rc == 0)
+               rc = arg->u.write.buf.lb_len;
+
+       object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
+
+       return rc > 0 ? 0 : rc;
+}
+
+static int __out_tx_write(const struct lu_env *env,
+                         struct dt_object *dt_obj,
+                         const struct lu_buf *buf,
+                         loff_t pos, struct thandle_exec_args *ta,
+                         struct object_update_reply *reply,
+                         int index, char *file, int line)
+{
+       struct tx_arg   *arg;
+
+       LASSERT(ta->ta_handle != NULL);
+       ta->ta_err = dt_declare_record_write(env, dt_obj, buf, pos,
+                                            ta->ta_handle);
+       if (ta->ta_err != 0)
+               return ta->ta_err;
+
+       arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line);
+       LASSERT(arg);
+       lu_object_get(&dt_obj->do_lu);
+       arg->object = dt_obj;
+       arg->u.write.buf = *buf;
+       arg->u.write.pos = pos;
+       arg->reply = reply;
+       arg->index = index;
+       return 0;
+}
+
+static int out_write(struct tgt_session_info *tsi)
+{
+       struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
+       struct object_update    *update = tti->tti_u.update.tti_update;
+       struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
+       struct lu_buf           *lbuf = &tti->tti_buf;
+       char                    *buf;
+       char                    *tmp;
+       int                     buf_len = 0;
+       loff_t                  pos;
+       int                      rc;
+       ENTRY;
+
+       buf = object_update_param_get(update, 0, &buf_len);
+       if (buf == NULL || buf_len == 0) {
+               CERROR("%s: empty buf for xattr set: rc = %d\n",
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+       lbuf->lb_buf = buf;
+       lbuf->lb_len = buf_len;
+
+       tmp = (char *)object_update_param_get(update, 1, NULL);
+       if (tmp == NULL) {
+               CERROR("%s: empty flag for xattr set: rc = %d\n",
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
+               __swab64s((__u64 *)tmp);
+       pos = *(loff_t *)tmp;
+
+       rc = out_tx_write(tsi->tsi_env, obj, lbuf, pos,
+                         &tti->tti_tea,
+                         tti->tti_u.update.tti_update_reply,
+                         tti->tti_u.update.tti_update_reply_index);
+       RETURN(rc);
+}
+
 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
 [opc - OUT_CREATE] = {                                 \
        .th_name    = name,                             \
@@ -1198,6 +1202,7 @@ static struct tgt_handler out_update_ops[] = {
                     MUTABOR | HABEO_REFERO, out_index_insert),
        DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
                     MUTABOR | HABEO_REFERO, out_index_delete),
+       DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
 };
 
 struct tgt_handler *out_handler_find(__u32 opc)
@@ -1215,6 +1220,111 @@ struct tgt_handler *out_handler_find(__u32 opc)
        return h;
 }
 
+static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
+                       struct thandle_exec_args *ta, struct obd_export *exp)
+{
+       memset(ta, 0, sizeof(*ta));
+       ta->ta_handle = dt_trans_create(env, dt);
+       if (IS_ERR(ta->ta_handle)) {
+               int rc;
+
+               rc = PTR_ERR(ta->ta_handle);
+               ta->ta_handle = NULL;
+               CERROR("%s: start handle error: rc = %d\n",
+                      dt_obd_name(dt), rc);
+               return rc;
+       }
+       ta->ta_dev = dt;
+       if (exp->exp_need_sync)
+               ta->ta_handle->th_sync = 1;
+
+       return 0;
+}
+
+static int out_trans_start(const struct lu_env *env,
+                          struct thandle_exec_args *ta)
+{
+       return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
+}
+
+static int out_trans_stop(const struct lu_env *env,
+                         struct thandle_exec_args *ta, int err)
+{
+       int i;
+       int rc;
+
+       ta->ta_handle->th_result = err;
+       rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
+       for (i = 0; i < ta->ta_argno; i++) {
+               if (ta->ta_args[i].object != NULL) {
+                       struct dt_object *obj = ta->ta_args[i].object;
+
+                       /* If the object is being created during this
+                        * transaction, we need to remove them from the
+                        * cache immediately, because a few layers are
+                        * missing in OUT handler, i.e. the object might
+                        * not be initialized in all layers */
+                       if (ta->ta_args[i].exec_fn == out_tx_create_exec)
+                               set_bit(LU_OBJECT_HEARD_BANSHEE,
+                                       &obj->do_lu.lo_header->loh_flags);
+                       lu_object_put(env, &ta->ta_args[i].object->do_lu);
+                       ta->ta_args[i].object = NULL;
+               }
+       }
+
+       return rc;
+}
+
+int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
+{
+       struct tgt_session_info *tsi = tgt_ses_info(env);
+       int i = 0, rc;
+
+       LASSERT(ta->ta_dev);
+       LASSERT(ta->ta_handle);
+
+       if (ta->ta_err != 0 || ta->ta_argno == 0)
+               GOTO(stop, rc = ta->ta_err);
+
+       rc = out_trans_start(env, ta);
+       if (unlikely(rc))
+               GOTO(stop, rc);
+
+       for (i = 0; i < ta->ta_argno; i++) {
+               rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
+                                           &ta->ta_args[i]);
+               if (unlikely(rc != 0)) {
+                       CDEBUG(D_INFO, "error during execution of #%u from"
+                              " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
+                              ta->ta_args[i].line, rc);
+                       while (--i >= 0) {
+                               if (ta->ta_args[i].undo_fn != NULL)
+                                       ta->ta_args[i].undo_fn(env,
+                                                              ta->ta_handle,
+                                                             &ta->ta_args[i]);
+                               else
+                                       CERROR("%s: undo for %s:%d: rc = %d\n",
+                                              dt_obd_name(ta->ta_dev),
+                                              ta->ta_args[i].file,
+                                              ta->ta_args[i].line, -ENOTSUPP);
+                       }
+                       break;
+               }
+       }
+
+       /* Only fail for real update */
+       tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
+stop:
+       CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
+              dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
+       out_trans_stop(env, ta, rc);
+       ta->ta_handle = NULL;
+       ta->ta_argno = 0;
+       ta->ta_err = 0;
+
+       RETURN(rc);
+}
+
 /**
  * Object updates between Targets. Because all the updates has been
  * dis-assemblied into object updates at sender side, so OUT will
@@ -1314,7 +1424,7 @@ int out_handle(struct tgt_session_info *tsi)
                        /* Stop the current update transaction,
                         * create a new one */
                        rc = out_tx_end(env, ta);
-                       if (rc != 0)
+                       if (rc < 0)
                                RETURN(rc);
 
                        rc = out_tx_start(env, dt, ta, tsi->tsi_exp);