*/
/*
* Copyright (c) 2013, Intel Corporation.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* lustre/mdt/out_handler.c
*
static const char dot[] = ".";
static const char dotdot[] = "..";
+/* Current out and mdt shared the same thread info, but in the future,
+ * this should be decoupled with MDT XXX*/
+#define out_thread_info mdt_thread_info
+#define out_thread_key mdt_thread_key
+
+struct out_thread_info *out_env_info(const struct lu_env *env)
+{
+ struct out_thread_info *info;
+
+ info = lu_context_key_get(&env->le_ctx, &out_thread_key);
+ LASSERT(info != NULL);
+ return info;
+}
+
+static inline char *dt_obd_name(struct dt_device *dt)
+{
+ return dt->dd_lu_dev.ld_obd->obd_name;
+}
+
struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
tx_exec_func_t undo, char *file, int line)
{
return &ta->ta_args[i];
}
-static int out_tx_start(const struct lu_env *env, struct mdt_device *mdt,
+static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
struct thandle_exec_args *th)
{
- struct dt_device *dt = mdt->mdt_bottom;
-
memset(th, 0, sizeof(*th));
th->ta_handle = dt_trans_create(env, dt);
if (IS_ERR(th->ta_handle)) {
CERROR("%s: start handle error: rc = %ld\n",
- mdt2obd_dev(mdt)->obd_name, PTR_ERR(th->ta_handle));
+ dt_obd_name(dt), PTR_ERR(th->ta_handle));
return PTR_ERR(th->ta_handle);
}
th->ta_dev = dt;
return 0;
}
-static int out_trans_start(const struct lu_env *env, struct dt_device *dt,
- struct thandle *th)
+static int out_trans_start(const struct lu_env *env,
+ struct thandle_exec_args *th)
{
/* Always do sync commit for Phase I */
- LASSERT(th->th_sync != 0);
- return dt_trans_start(env, dt, th);
+ LASSERT(th->ta_handle->th_sync != 0);
+ return dt_trans_start(env, th->ta_dev, th->ta_handle);
}
static int out_trans_stop(const struct lu_env *env,
return rc;
}
-int out_tx_end(struct mdt_thread_info *info, struct thandle_exec_args *th)
+int out_tx_end(const struct lu_env *env, struct thandle_exec_args *th)
{
- struct thandle_exec_args *_th = &info->mti_handle;
+ struct out_thread_info *info = out_env_info(env);
int i = 0, rc;
- LASSERT(th == _th);
LASSERT(th->ta_dev);
LASSERT(th->ta_handle);
if (th->ta_err != 0 || th->ta_argno == 0)
GOTO(stop, rc = th->ta_err);
- rc = out_trans_start(info->mti_env, th->ta_dev, th->ta_handle);
+ rc = out_trans_start(env, th);
if (unlikely(rc))
GOTO(stop, rc);
for (i = 0; i < th->ta_argno; i++) {
- rc = th->ta_args[i].exec_fn(info, th->ta_handle,
+ rc = th->ta_args[i].exec_fn(env, th->ta_handle,
&th->ta_args[i]);
if (unlikely(rc)) {
CDEBUG(D_INFO, "error during execution of #%u from"
while (--i >= 0) {
LASSERTF(th->ta_args[i].undo_fn != NULL,
"can't undo changes, hope for failover!\n");
- th->ta_args[i].undo_fn(info, th->ta_handle,
+ th->ta_args[i].undo_fn(env, th->ta_handle,
&th->ta_args[i]);
}
break;
}
}
+
+ /* Only fail for real update */
+ info->mti_fail_id = OBD_FAIL_UPDATE_OBJ_NET_REP;
stop:
CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
- mdt_obd_name(info->mti_mdt), i, th->ta_argno, rc);
- out_trans_stop(info->mti_env, th, rc);
+ dt_obd_name(th->ta_dev), i, th->ta_argno, rc);
+ out_trans_stop(env, th, rc);
th->ta_handle = NULL;
th->ta_argno = 0;
th->ta_err = 0;
RETURN(rc);
}
-static struct dt_object *out_get_dt_obj(struct lu_object *obj)
+static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
+ struct dt_object *obj, struct update_reply *reply,
+ int index)
{
- struct mdt_device *mdt;
- struct dt_device *dt;
- struct lu_object *bottom_obj;
+ CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
+ dt_obd_name(dt), reply, index, 0);
- mdt = lu2mdt_dev(obj->lo_dev);
- dt = mdt->mdt_bottom;
+ update_insert_reply(reply, NULL, 0, index, 0);
+ return;
+}
- bottom_obj = lu_object_locate(obj->lo_header, dt->dd_lu_dev.ld_type);
- if (bottom_obj == NULL)
- return ERR_PTR(-ENOENT);
+typedef void (*out_reconstruct_t)(const struct lu_env *env,
+ struct dt_device *dt,
+ struct dt_object *obj,
+ struct update_reply *reply,
+ int index);
+
+static inline int out_check_resent(const struct lu_env *env,
+ struct dt_device *dt,
+ struct dt_object *obj,
+ struct ptlrpc_request *req,
+ out_reconstruct_t reconstruct,
+ struct update_reply *reply,
+ int index)
+{
+ if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
+ return 0;
- return lu2dt_obj(bottom_obj);
+ if (req_xid_is_last(req)) {
+ reconstruct(env, dt, obj, reply, index);
+ return 1;
+ }
+ DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
+ req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
+ return 0;
}
-static struct dt_object *out_object_find(struct mdt_thread_info *info,
- struct lu_fid *fid)
+static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
+ struct thandle *th)
{
- struct lu_object *obj;
- struct dt_object *dt_obj;
-
- obj = lu_object_find(info->mti_env,
- &info->mti_mdt->mdt_md_dev.md_lu_dev,
- fid, NULL);
- if (IS_ERR(obj))
- return (struct dt_object *)obj;
-
- dt_obj = out_get_dt_obj(obj);
- if (IS_ERR(dt_obj)) {
- lu_object_put(info->mti_env, obj);
- return ERR_PTR(-ENOENT);
- }
+ int rc;
- return dt_obj;
+ CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
+ PFID(lu_object_fid(&dt_obj->do_lu)));
+
+ dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+ rc = dt_destroy(env, dt_obj, th);
+ dt_write_unlock(env, dt_obj);
+
+ return rc;
}
/**
* declare phase, i.e. if declare succeed, it should make sure
* the following executing phase succeed in anyway, so these undo
* should be useless for most of the time in Phase I
- **/
-int out_tx_create_undo(struct mdt_thread_info *info, struct thandle *th,
+ */
+int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
struct tx_arg *arg)
{
- struct dt_object *dt_obj = arg->object;
int rc;
- LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
-
- dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
- rc = dt_destroy(info->mti_env, dt_obj, th);
- dt_write_unlock(info->mti_env, dt_obj);
-
- /* we don't like double failures */
+ rc = out_obj_destroy(env, arg->object, th);
if (rc != 0)
CERROR("%s: undo failure, we are doomed!: rc = %d\n",
- mdt_obd_name(info->mti_mdt), rc);
+ dt_obd_name(th->th_dev), rc);
return rc;
}
-int out_tx_create_exec(struct mdt_thread_info *info, struct thandle *th,
+int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
struct tx_arg *arg)
{
struct dt_object *dt_obj = arg->object;
int rc;
- LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
-
- CDEBUG(D_OTHER, "create "DFID": dof %u, mode %o\n",
+ CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
+ dt_obd_name(th->th_dev),
PFID(lu_object_fid(&arg->object->do_lu)),
arg->u.create.dof.dof_type,
arg->u.create.attr.la_mode & S_IFMT);
- dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
- rc = dt_create(info->mti_env, dt_obj, &arg->u.create.attr,
+ dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+ rc = dt_create(env, dt_obj, &arg->u.create.attr,
&arg->u.create.hint, &arg->u.create.dof, th);
- dt_write_unlock(info->mti_env, dt_obj);
- CDEBUG(D_INFO, "insert create reply mode %o index %d\n",
- arg->u.create.attr.la_mode, arg->index);
+ dt_write_unlock(env, dt_obj);
+
+ CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
+ dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
return rc;
}
-static int __out_tx_create(struct mdt_thread_info *info, struct dt_object *obj,
+static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
struct lu_attr *attr, struct lu_fid *parent_fid,
struct dt_object_format *dof,
struct thandle_exec_args *th,
struct tx_arg *arg;
LASSERT(th->ta_handle != NULL);
- th->ta_err = dt_declare_create(info->mti_env, obj, attr, NULL, dof,
+ th->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
th->ta_handle);
if (th->ta_err != 0)
return th->ta_err;
return 0;
}
-static int out_create(struct mdt_thread_info *info)
+static int out_create(struct out_thread_info *info)
{
struct update *update = info->mti_u.update.mti_update;
struct dt_object *obj = info->mti_u.update.mti_dt_object;
}
}
- rc = out_tx_create(info, obj, attr, fid, dof, &info->mti_handle,
+ if (lu_object_exists(&obj->do_lu))
+ RETURN(-EEXIST);
+
+ rc = out_tx_create(info->mti_env, obj, attr, fid, dof,
+ &info->mti_handle,
info->mti_u.update.mti_update_reply,
info->mti_u.update.mti_update_reply_index);
RETURN(rc);
}
-static int out_tx_attr_set_undo(struct mdt_thread_info *info,
+static int out_tx_attr_set_undo(const struct lu_env *env,
struct thandle *th, struct tx_arg *arg)
{
CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
- mdt_obd_name(info->mti_mdt),
+ dt_obd_name(th->th_dev),
PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
return -ENOTSUPP;
}
-static int out_tx_attr_set_exec(struct mdt_thread_info *info,
- struct thandle *th, struct tx_arg *arg)
+static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
+ struct tx_arg *arg)
{
struct dt_object *dt_obj = arg->object;
int rc;
- LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
-
- CDEBUG(D_OTHER, "attr set "DFID"\n",
- PFID(lu_object_fid(&arg->object->do_lu)));
+ CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
+ PFID(lu_object_fid(&dt_obj->do_lu)));
- dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
- rc = dt_attr_set(info->mti_env, dt_obj, &arg->u.attr_set.attr,
+ dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+ rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr,
th, NULL);
- dt_write_unlock(info->mti_env, dt_obj);
+ dt_write_unlock(env, dt_obj);
+
+ CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
+ dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
return rc;
}
-static int __out_tx_attr_set(struct mdt_thread_info *info,
+static int __out_tx_attr_set(const struct lu_env *env,
struct dt_object *dt_obj,
const struct lu_attr *attr,
struct thandle_exec_args *th,
struct tx_arg *arg;
LASSERT(th->ta_handle != NULL);
- th->ta_err = dt_declare_attr_set(info->mti_env, dt_obj, attr,
- th->ta_handle);
+ th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
if (th->ta_err != 0)
return th->ta_err;
return 0;
}
-static int out_attr_set(struct mdt_thread_info *info)
+static int out_attr_set(struct out_thread_info *info)
{
struct update *update = info->mti_u.update.mti_update;
struct lu_attr *attr = &info->mti_attr.ma_attr;
lustre_get_wire_obdo(lobdo, wobdo);
la_from_obdo(attr, lobdo, lobdo->o_valid);
- rc = out_tx_attr_set(info, obj, attr, &info->mti_handle,
+ rc = out_tx_attr_set(info->mti_env, obj, attr, &info->mti_handle,
info->mti_u.update.mti_update_reply,
info->mti_u.update.mti_update_reply_index);
RETURN(rc);
}
-static int out_attr_get(struct mdt_thread_info *info)
+static int out_attr_get(struct out_thread_info *info)
{
struct obdo *obdo = &info->mti_u.update.mti_obdo;
const struct lu_env *env = info->mti_env;
out_unlock:
dt_read_unlock(env, obj);
+
+ CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
+ mdt_obd_name(info->mti_mdt),
+ info->mti_u.update.mti_update_reply, 0, rc);
+
update_insert_reply(info->mti_u.update.mti_update_reply, obdo,
sizeof(*obdo), 0, rc);
RETURN(rc);
}
-static int out_xattr_get(struct mdt_thread_info *info)
+static int out_xattr_get(struct out_thread_info *info)
{
struct update *update = info->mti_u.update.mti_update;
const struct lu_env *env = info->mti_env;
RETURN(rc);
}
-static int out_index_lookup(struct mdt_thread_info *info)
+static int out_index_lookup(struct out_thread_info *info)
{
struct update *update = info->mti_u.update.mti_update;
const struct lu_env *env = info->mti_env;
out_unlock:
dt_read_unlock(env, obj);
+
+ CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
+ mdt_obd_name(info->mti_mdt),
+ info->mti_u.update.mti_update_reply, 0, rc);
+
update_insert_reply(info->mti_u.update.mti_update_reply,
&info->mti_tmp_fid1, sizeof(info->mti_tmp_fid1),
0, rc);
RETURN(rc);
}
-static int out_tx_xattr_set_exec(struct mdt_thread_info *info,
+static int out_tx_xattr_set_exec(const struct lu_env *env,
struct thandle *th,
struct tx_arg *arg)
{
struct dt_object *dt_obj = arg->object;
int rc;
- LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+ CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
+ dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
+ arg->u.xattr_set.name, arg->u.xattr_set.flags);
- CDEBUG(D_OTHER, "attr set "DFID"\n",
- PFID(lu_object_fid(&arg->object->do_lu)));
-
- dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
- rc = dt_xattr_set(info->mti_env, dt_obj, &arg->u.xattr_set.buf,
+ dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+ rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
arg->u.xattr_set.name, arg->u.xattr_set.flags,
th, NULL);
- dt_write_unlock(info->mti_env, dt_obj);
+ dt_write_unlock(env, dt_obj);
/**
* Ignore errors if this is LINK EA
**/
strlen(XATTR_NAME_LINK))))
rc = 0;
- update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+ CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
+ dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
- CDEBUG(D_INFO, "set xattr buf %p name %s flag %d\n",
- arg->u.xattr_set.buf.lb_buf, arg->u.xattr_set.name,
- arg->u.xattr_set.flags);
+ update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
return rc;
}
-static int __out_tx_xattr_set(struct mdt_thread_info *info,
+static int __out_tx_xattr_set(const struct lu_env *env,
struct dt_object *dt_obj,
const struct lu_buf *buf,
const char *name, int flags,
struct tx_arg *arg;
LASSERT(th->ta_handle != NULL);
- th->ta_err = dt_declare_xattr_set(info->mti_env, dt_obj, buf, name,
+ th->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
flags, th->ta_handle);
if (th->ta_err != 0)
return th->ta_err;
return 0;
}
-static int out_xattr_set(struct mdt_thread_info *info)
+static int out_xattr_set(struct out_thread_info *info)
{
struct update *update = info->mti_u.update.mti_update;
struct dt_object *obj = info->mti_u.update.mti_dt_object;
flag = le32_to_cpu(*(int *)tmp);
- rc = out_tx_xattr_set(info, obj, lbuf, name, flag, &info->mti_handle,
- info->mti_u.update.mti_update_reply,
- info->mti_u.update.mti_update_reply_index);
-
+ rc = out_tx_xattr_set(info->mti_env, obj, lbuf, name, flag,
+ &info->mti_handle,
+ info->mti_u.update.mti_update_reply,
+ info->mti_u.update.mti_update_reply_index);
RETURN(rc);
}
-static int out_tx_ref_add_exec(struct mdt_thread_info *info, struct thandle *th,
- struct tx_arg *arg)
+static int out_obj_ref_add(const struct lu_env *env,
+ struct dt_object *dt_obj,
+ struct thandle *th)
{
- struct dt_object *dt_obj = arg->object;
+ int rc;
- LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+ dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+ rc = dt_ref_add(env, dt_obj, th);
+ dt_write_unlock(env, dt_obj);
- CDEBUG(D_OTHER, "ref add "DFID"\n",
- PFID(lu_object_fid(&arg->object->do_lu)));
+ return rc;
+}
- dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
- dt_ref_add(info->mti_env, dt_obj, th);
- dt_write_unlock(info->mti_env, dt_obj);
+static int out_obj_ref_del(const struct lu_env *env,
+ struct dt_object *dt_obj,
+ struct thandle *th)
+{
+ int rc;
- update_insert_reply(arg->reply, NULL, 0, arg->index, 0);
- return 0;
+ dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+ rc = dt_ref_del(env, dt_obj, th);
+ dt_write_unlock(env, dt_obj);
+
+ return rc;
}
-static int out_tx_ref_add_undo(struct mdt_thread_info *info, struct thandle *th,
+static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
struct tx_arg *arg)
{
struct dt_object *dt_obj = arg->object;
+ int rc;
- LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+ rc = out_obj_ref_add(env, dt_obj, th);
- CDEBUG(D_OTHER, "ref del "DFID"\n",
- PFID(lu_object_fid(&arg->object->do_lu)));
+ CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
+ dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
- dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
- dt_ref_del(info->mti_env, dt_obj, th);
- dt_write_unlock(info->mti_env, dt_obj);
+ update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+ return rc;
+}
- return 0;
+static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
+ struct tx_arg *arg)
+{
+ return out_obj_ref_del(env, arg->object, th);
}
-static int __out_tx_ref_add(struct mdt_thread_info *info,
+static int __out_tx_ref_add(const struct lu_env *env,
struct dt_object *dt_obj,
struct thandle_exec_args *th,
struct update_reply *reply,
struct tx_arg *arg;
LASSERT(th->ta_handle != NULL);
- th->ta_err = dt_declare_ref_add(info->mti_env, dt_obj,
- th->ta_handle);
+ th->ta_err = dt_declare_ref_add(env, dt_obj, th->ta_handle);
if (th->ta_err != 0)
return th->ta_err;
/**
* increase ref of the object
**/
-static int out_ref_add(struct mdt_thread_info *info)
+static int out_ref_add(struct out_thread_info *info)
{
struct dt_object *obj = info->mti_u.update.mti_dt_object;
int rc;
ENTRY;
- rc = out_tx_ref_add(info, obj, &info->mti_handle,
+ rc = out_tx_ref_add(info->mti_env, obj, &info->mti_handle,
info->mti_u.update.mti_update_reply,
info->mti_u.update.mti_update_reply_index);
RETURN(rc);
}
-static int out_tx_ref_del_exec(struct mdt_thread_info *info, struct thandle *th,
+static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
struct tx_arg *arg)
{
- out_tx_ref_add_undo(info, th, arg);
- update_insert_reply(arg->reply, NULL, 0, arg->index, 0);
- return 0;
+ struct dt_object *dt_obj = arg->object;
+ int rc;
+
+ rc = out_obj_ref_del(env, dt_obj, th);
+
+ CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
+ dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
+
+ update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+
+ return rc;
}
-static int out_tx_ref_del_undo(struct mdt_thread_info *info, struct thandle *th,
+static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
struct tx_arg *arg)
{
- return out_tx_ref_add_exec(info, th, arg);
+ return out_obj_ref_add(env, arg->object, th);
}
-static int __out_tx_ref_del(struct mdt_thread_info *info,
+static int __out_tx_ref_del(const struct lu_env *env,
struct dt_object *dt_obj,
struct thandle_exec_args *th,
struct update_reply *reply,
struct tx_arg *arg;
LASSERT(th->ta_handle != NULL);
- th->ta_err = dt_declare_ref_del(info->mti_env, dt_obj, th->ta_handle);
+ th->ta_err = dt_declare_ref_del(env, dt_obj, th->ta_handle);
if (th->ta_err != 0)
return th->ta_err;
return 0;
}
-static int out_ref_del(struct mdt_thread_info *info)
+static int out_ref_del(struct out_thread_info *info)
{
struct dt_object *obj = info->mti_u.update.mti_dt_object;
int rc;
if (!lu_object_exists(&obj->do_lu))
RETURN(-ENOENT);
- rc = out_tx_ref_del(info, obj, &info->mti_handle,
+ rc = out_tx_ref_del(info->mti_env, obj, &info->mti_handle,
info->mti_u.update.mti_update_reply,
info->mti_u.update.mti_update_reply_index);
RETURN(rc);
}
-static int out_tx_index_insert_exec(struct mdt_thread_info *info,
- struct thandle *th, struct tx_arg *arg)
+static int out_obj_index_insert(const struct lu_env *env,
+ struct dt_object *dt_obj,
+ const struct dt_rec *rec,
+ const struct dt_key *key,
+ struct thandle *th)
{
- struct dt_object *dt_obj = arg->object;
int rc;
- LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
-
- CDEBUG(D_OTHER, "index insert "DFID" name: %s fid "DFID"\n",
- PFID(lu_object_fid(&arg->object->do_lu)),
- (char *)arg->u.insert.key,
- PFID((struct lu_fid *)arg->u.insert.rec));
+ CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
+ dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
+ (char *)key, PFID((struct lu_fid *)rec));
- if (dt_try_as_dir(info->mti_env, dt_obj) == 0)
+ if (dt_try_as_dir(env, dt_obj) == 0)
return -ENOTDIR;
- dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
- rc = dt_insert(info->mti_env, dt_obj, arg->u.insert.rec,
- arg->u.insert.key, th, NULL, 0);
- dt_write_unlock(info->mti_env, dt_obj);
+ dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+ rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
+ dt_write_unlock(env, dt_obj);
- update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+ return rc;
+}
+
+static int out_obj_index_delete(const struct lu_env *env,
+ struct dt_object *dt_obj,
+ const struct dt_key *key,
+ struct thandle *th)
+{
+ int rc;
+
+ CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
+ dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
+ (char *)key);
+
+ if (dt_try_as_dir(env, dt_obj) == 0)
+ return -ENOTDIR;
+
+ dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+ rc = dt_delete(env, dt_obj, key, th, NULL);
+ dt_write_unlock(env, dt_obj);
return rc;
}
-static int out_tx_index_insert_undo(struct mdt_thread_info *info,
+static int out_tx_index_insert_exec(const struct lu_env *env,
struct thandle *th, struct tx_arg *arg)
{
struct dt_object *dt_obj = arg->object;
int rc;
- LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+ rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
+ arg->u.insert.key, th);
- CDEBUG(D_OTHER, "index delete "DFID" name: %s\n",
- PFID(lu_object_fid(&arg->object->do_lu)),
- (char *)arg->u.insert.key);
-
- if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
- CERROR("%s: "DFID" is not directory: rc = %d\n",
- mdt_obd_name(info->mti_mdt),
- PFID(lu_object_fid(&dt_obj->do_lu)), -ENOTDIR);
- return -ENOTDIR;
- }
-
- dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
- rc = dt_delete(info->mti_env, dt_obj, arg->u.insert.key, th, NULL);
- dt_write_unlock(info->mti_env, dt_obj);
+ CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
+ dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
return rc;
}
-static int __out_tx_index_insert(struct mdt_thread_info *info,
+static int out_tx_index_insert_undo(const struct lu_env *env,
+ struct thandle *th, struct tx_arg *arg)
+{
+ return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
+}
+
+static int __out_tx_index_insert(const struct lu_env *env,
struct dt_object *dt_obj,
char *name, struct lu_fid *fid,
struct thandle_exec_args *th,
LASSERT(th->ta_handle != NULL);
if (lu_object_exists(&dt_obj->do_lu)) {
- if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
+ if (dt_try_as_dir(env, dt_obj) == 0) {
th->ta_err = -ENOTDIR;
return th->ta_err;
}
- th->ta_err = dt_declare_insert(info->mti_env, dt_obj,
+ th->ta_err = dt_declare_insert(env, dt_obj,
(struct dt_rec *)fid,
(struct dt_key *)name,
th->ta_handle);
return 0;
}
-static int out_index_insert(struct mdt_thread_info *info)
+static int out_index_insert(struct out_thread_info *info)
{
struct update *update = info->mti_u.update.mti_update;
struct dt_object *obj = info->mti_u.update.mti_dt_object;
RETURN(err_serious(-EPROTO));
}
- rc = out_tx_index_insert(info, obj, name, fid, &info->mti_handle,
+ rc = out_tx_index_insert(info->mti_env, obj, name, fid,
+ &info->mti_handle,
info->mti_u.update.mti_update_reply,
info->mti_u.update.mti_update_reply_index);
RETURN(rc);
}
-static int out_tx_index_delete_exec(struct mdt_thread_info *info,
+static int out_tx_index_delete_exec(const struct lu_env *env,
struct thandle *th,
struct tx_arg *arg)
{
- return out_tx_index_insert_undo(info, th, arg);
+ int rc;
+
+ rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
+
+ CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
+ dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
+
+ update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+
+ return rc;
}
-static int out_tx_index_delete_undo(struct mdt_thread_info *info,
+static int out_tx_index_delete_undo(const struct lu_env *env,
struct thandle *th,
struct tx_arg *arg)
{
- return out_tx_index_insert_exec(info, th, arg);
+ CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
+ dt_obd_name(th->th_dev), -ENOTSUPP);
+ return -ENOTSUPP;
}
-static int __out_tx_index_delete(struct mdt_thread_info *info,
+static int __out_tx_index_delete(const struct lu_env *env,
struct dt_object *dt_obj, char *name,
struct thandle_exec_args *th,
struct update_reply *reply,
{
struct tx_arg *arg;
- if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
+ if (dt_try_as_dir(env, dt_obj) == 0) {
th->ta_err = -ENOTDIR;
return th->ta_err;
}
LASSERT(th->ta_handle != NULL);
- th->ta_err = dt_declare_delete(info->mti_env, dt_obj,
+ th->ta_err = dt_declare_delete(env, dt_obj,
(struct dt_key *)name,
th->ta_handle);
if (th->ta_err != 0)
return 0;
}
-static int out_index_delete(struct mdt_thread_info *info)
+static int out_index_delete(struct out_thread_info *info)
{
struct update *update = info->mti_u.update.mti_update;
struct dt_object *obj = info->mti_u.update.mti_dt_object;
RETURN(err_serious(-EPROTO));
}
- rc = out_tx_index_delete(info, obj, name, &info->mti_handle,
+ rc = out_tx_index_delete(info->mti_env, obj, name, &info->mti_handle,
info->mti_u.update.mti_update_reply,
info->mti_u.update.mti_update_reply_index);
RETURN(rc);
}
+static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
+ struct tx_arg *arg)
+{
+ struct dt_object *dt_obj = arg->object;
+ int rc;
+
+ rc = out_obj_destroy(env, dt_obj, th);
+
+ CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
+ dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
+
+ update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+
+ RETURN(rc);
+}
+
+static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
+ struct tx_arg *arg)
+{
+ CERROR("%s: not support destroy undo yet!: rc = %d\n",
+ dt_obd_name(th->th_dev), -ENOTSUPP);
+ return -ENOTSUPP;
+}
+
+static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
+ struct thandle_exec_args *th,
+ struct update_reply *reply,
+ int index, char *file, int line)
+{
+ struct tx_arg *arg;
+
+ LASSERT(th->ta_handle != NULL);
+ th->ta_err = dt_declare_destroy(env, dt_obj, th->ta_handle);
+ if (th->ta_err)
+ return th->ta_err;
+
+ arg = tx_add_exec(th, out_tx_destroy_exec, out_tx_destroy_undo,
+ file, line);
+ LASSERT(arg);
+ lu_object_get(&dt_obj->do_lu);
+ arg->object = dt_obj;
+ arg->reply = reply;
+ arg->index = index;
+ return 0;
+}
+
+static int out_destroy(struct out_thread_info *info)
+{
+ struct update *update = info->mti_u.update.mti_update;
+ struct dt_object *obj = info->mti_u.update.mti_dt_object;
+ struct lu_fid *fid;
+ int rc;
+ ENTRY;
+
+ fid = &update->u_fid;
+ fid_le_to_cpu(fid, fid);
+ if (!fid_is_sane(fid)) {
+ CERROR("%s: invalid FID "DFID": rc = %d\n",
+ mdt_obd_name(info->mti_mdt), PFID(fid), -EPROTO);
+ RETURN(err_serious(-EPROTO));
+ }
+
+ if (!lu_object_exists(&obj->do_lu))
+ RETURN(-ENOENT);
+
+ rc = out_tx_destroy(info->mti_env, obj, &info->mti_handle,
+ info->mti_u.update.mti_update_reply,
+ info->mti_u.update.mti_update_reply_index);
+
+ RETURN(rc);
+}
+
#define DEF_OUT_HNDL(opc, name, fail_id, flags, fn) \
[opc - OBJ_CREATE] = { \
.mh_name = name, \
static struct out_handler out_update_ops[] = {
DEF_OUT_HNDL(OBJ_CREATE, "obj_create", 0, MUTABOR | HABEO_REFERO,
out_create),
+ DEF_OUT_HNDL(OBJ_DESTROY, "obj_create", 0, MUTABOR | HABEO_REFERO,
+ out_destroy),
DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", 0, MUTABOR | HABEO_REFERO,
out_ref_add),
DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", 0, MUTABOR | HABEO_REFERO,
out_xattr_set),
DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", 0, HABEO_REFERO,
out_xattr_get),
- DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", 0,
- HABEO_REFERO, out_index_lookup),
+ DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", 0, HABEO_REFERO,
+ out_index_lookup),
DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert", 0,
MUTABOR | HABEO_REFERO, out_index_insert),
DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete", 0,
* Please refer to lustre/include/lustre/lustre_idl.h for req/reply
* format.
*/
-int out_handle(struct mdt_thread_info *info)
+int out_handle(struct out_thread_info *info)
{
- struct req_capsule *pill = info->mti_pill;
- struct update_buf *ubuf;
- struct update *update;
- struct thandle_exec_args *th = &info->mti_handle;
- int bufsize;
- int count;
- unsigned off;
- int i;
- int rc = 0;
- int rc1 = 0;
+ struct thandle_exec_args *th = &info->mti_handle;
+ struct req_capsule *pill = info->mti_pill;
+ struct mdt_device *mdt = info->mti_mdt;
+ struct dt_device *dt = mdt->mdt_bottom;
+ const struct lu_env *env = info->mti_env;
+ struct update_buf *ubuf;
+ struct update *update;
+ struct update_reply *update_reply;
+ int bufsize;
+ int count;
+ int old_batchid = -1;
+ unsigned off;
+ int i;
+ int rc = 0;
+ int rc1 = 0;
ENTRY;
req_capsule_set(pill, &RQF_UPDATE_OBJ);
bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT);
if (bufsize != UPDATE_BUFFER_SIZE) {
CERROR("%s: invalid bufsize %d: rc = %d\n",
- mdt_obd_name(info->mti_mdt), bufsize, -EPROTO);
+ mdt_obd_name(mdt), bufsize, -EPROTO);
RETURN(err_serious(-EPROTO));
}
ubuf = req_capsule_client_get(pill, &RMF_UPDATE);
if (ubuf == NULL) {
- CERROR("%s: No buf!: rc = %d\n", mdt_obd_name(info->mti_mdt),
+ CERROR("%s: No buf!: rc = %d\n", mdt_obd_name(mdt),
-EPROTO);
RETURN(err_serious(-EPROTO));
}
if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) {
CERROR("%s: invalid magic %x expect %x: rc = %d\n",
- mdt_obd_name(info->mti_mdt), le32_to_cpu(ubuf->ub_magic),
+ mdt_obd_name(mdt), le32_to_cpu(ubuf->ub_magic),
UPDATE_BUFFER_MAGIC, -EPROTO);
RETURN(err_serious(-EPROTO));
}
count = le32_to_cpu(ubuf->ub_count);
if (count <= 0) {
CERROR("%s: No update!: rc = %d\n",
- mdt_obd_name(info->mti_mdt), -EPROTO);
+ mdt_obd_name(mdt), -EPROTO);
RETURN(err_serious(-EPROTO));
}
rc = req_capsule_server_pack(pill);
if (rc != 0) {
CERROR("%s: Can't pack response: rc = %d\n",
- mdt_obd_name(info->mti_mdt), rc);
+ mdt_obd_name(mdt), rc);
RETURN(rc);
}
/* Prepare the update reply buffer */
- info->mti_u.update.mti_update_reply =
- req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
- update_init_reply_buf(info->mti_u.update.mti_update_reply, count);
+ update_reply = req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
+ update_init_reply_buf(update_reply, count);
+ info->mti_u.update.mti_update_reply = update_reply;
- rc = out_tx_start(info->mti_env, info->mti_mdt, th);
+ rc = out_tx_start(env, dt, th);
if (rc != 0)
RETURN(rc);
struct dt_object *dt_obj;
update = (struct update *)((char *)ubuf + off);
+ if (old_batchid == -1) {
+ old_batchid = update->u_batchid;
+ } else if (old_batchid != update->u_batchid) {
+ /* Stop the current update transaction,
+ * create a new one */
+ rc = out_tx_end(env, th);
+ if (rc != 0)
+ RETURN(rc);
+
+ rc = out_tx_start(env, dt, th);
+ if (rc != 0)
+ RETURN(rc);
+ old_batchid = update->u_batchid;
+ }
fid_le_to_cpu(&update->u_fid, &update->u_fid);
if (!fid_is_sane(&update->u_fid)) {
CERROR("%s: invalid FID "DFID": rc = %d\n",
- mdt_obd_name(info->mti_mdt),
- PFID(&update->u_fid), -EPROTO);
+ mdt_obd_name(mdt), PFID(&update->u_fid),
+ -EPROTO);
GOTO(out, rc = err_serious(-EPROTO));
}
- dt_obj = out_object_find(info, &update->u_fid);
+
+ dt_obj = dt_locate(env, dt, &update->u_fid);
if (IS_ERR(dt_obj))
GOTO(out, rc = PTR_ERR(dt_obj));
h = mdt_handler_find(update->u_type, out_handlers);
if (likely(h != NULL)) {
+ /* For real modification RPC, check if the update
+ * has been executed */
+ if (h->mh_flags & MUTABOR) {
+ struct ptlrpc_request *req = mdt_info_req(info);
+
+ if (out_check_resent(env, dt, dt_obj, req,
+ out_reconstruct,
+ update_reply, i))
+ GOTO(next, rc);
+ }
+
rc = h->mh_act(info);
} else {
CERROR("%s: The unsupported opc: 0x%x\n",
- mdt_obd_name(info->mti_mdt), update->u_type);
- lu_object_put(info->mti_env, &dt_obj->do_lu);
+ mdt_obd_name(mdt), update->u_type);
+ lu_object_put(env, &dt_obj->do_lu);
GOTO(out, rc = -ENOTSUPP);
}
- lu_object_put(info->mti_env, &dt_obj->do_lu);
+next:
+ lu_object_put(env, &dt_obj->do_lu);
if (rc < 0)
GOTO(out, rc);
off += cfs_size_round(update_size(update));
}
-
out:
- rc1 = out_tx_end(info, th);
+ rc1 = out_tx_end(env, th);
rc = rc == 0 ? rc1 : rc;
- info->mti_fail_id = OBD_FAIL_UPDATE_OBJ_NET;
RETURN(rc);
}