* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2013, Intel Corporation.
+ * Copyright (c) 2011, 2014, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#define DEBUG_SUBSYSTEM S_MDS
+#include <lprocfs_status.h>
#include "mdt_internal.h"
#include <lustre_lmv.h>
* This checks version of 'name'. Many reint functions uses 'name' for child not
* FID, therefore we need to get object by name and check its version.
*/
-int mdt_lookup_version_check(struct mdt_thread_info *info,
- struct mdt_object *p, const struct lu_name *lname,
- struct lu_fid *fid, int idx)
+static int mdt_lookup_version_check(struct mdt_thread_info *info,
+ struct mdt_object *p,
+ const struct lu_name *lname,
+ struct lu_fid *fid, int idx)
{
int rc, vbrc;
return -ENOTSUPP;
if (S_ISDIR(attr->la_mode) && spec->u.sp_ea.eadata != NULL &&
- spec->u.sp_ea.eadatalen != 0 && !mdt_is_striped_client(exp))
- return -ENOTSUPP;
+ spec->u.sp_ea.eadatalen != 0) {
+ const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
+
+ if (le32_to_cpu(lum->lum_stripe_count) > 1 &&
+ !mdt_is_striped_client(exp))
+ return -ENOTSUPP;
+ }
return 0;
}
RETURN(rc);
}
-int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
- struct md_attr *ma, int flags)
+static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
+ struct md_attr *ma)
{
struct mdt_lock_handle *lh;
int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS);
int rc;
ENTRY;
- /* attr shouldn't be set on remote object */
- LASSERT(!mdt_object_remote(mo));
-
lh = &info->mti_lh[MDT_LH_PARENT];
mdt_lock_reg_init(lh, LCK_PW);
RETURN(rc);
s0_lh = &info->mti_lh[MDT_LH_LOCAL];
- mdt_lock_reg_init(s0_lh, LCK_EX);
+ mdt_lock_reg_init(s0_lh, LCK_PW);
rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_lh, &s0_obj, einfo);
if (rc != 0)
GOTO(out_unlock, rc);
- if (mdt_object_exists(mo) == 0)
- GOTO(out_unlock, rc = -ENOENT);
-
/* all attrs are packed into mti_attr in unpack_setattr */
mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
OBD_FAIL_MDS_REINT_SETATTR_WRITE);
struct md_attr *ma = &info->mti_attr;
struct mdt_reint_record *rr = &info->mti_rr;
struct ptlrpc_request *req = mdt_info_req(info);
- struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
- struct mdt_file_data *mfd;
struct mdt_object *mo;
struct mdt_body *repbody;
- int som_au, rc, rc2;
+ int rc, rc2;
ENTRY;
DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
(unsigned int)ma->ma_attr.la_valid);
- if (info->mti_dlm_req)
- ldlm_request_cancel(req, info->mti_dlm_req, 0);
+ if (info->mti_dlm_req)
+ ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
if (IS_ERR(mo))
GOTO(out, rc = PTR_ERR(mo));
- /* start a log jounal handle if needed */
- if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) {
- if ((ma->ma_attr.la_valid & LA_SIZE) ||
- (rr->rr_flags & MRF_OPEN_TRUNC)) {
- /* Check write access for the O_TRUNC case */
- if (mdt_write_read(mo) < 0)
- GOTO(out_put, rc = -ETXTBSY);
- }
- } else if (info->mti_ioepoch &&
- (info->mti_ioepoch->flags & MF_EPOCH_OPEN)) {
- /* Truncate case. IOEpoch is opened. */
- rc = mdt_write_get(mo);
- if (rc)
- GOTO(out_put, rc);
-
- mfd = mdt_mfd_new(med);
- if (mfd == NULL) {
- mdt_write_put(mo);
- GOTO(out_put, rc = -ENOMEM);
- }
-
- mdt_ioepoch_open(info, mo, 0);
- repbody->mbo_ioepoch = mo->mot_ioepoch;
-
- mdt_object_get(info->mti_env, mo);
- mdt_mfd_set_mode(mfd, MDS_FMODE_TRUNC);
- mfd->mfd_object = mo;
- mfd->mfd_xid = req->rq_xid;
+ if (!mdt_object_exists(mo))
+ GOTO(out_put, rc = -ENOENT);
- spin_lock(&med->med_open_lock);
- list_add(&mfd->mfd_list, &med->med_open_head);
- spin_unlock(&med->med_open_lock);
- repbody->mbo_handle.cookie = mfd->mfd_handle.h_cookie;
- }
+ if (mdt_object_remote(mo))
+ GOTO(out_put, rc = -EREMOTE);
- som_au = info->mti_ioepoch && info->mti_ioepoch->flags & MF_SOM_CHANGE;
- if (som_au) {
- /* SOM Attribute update case. Find the proper mfd and update
- * SOM attributes on the proper object. */
- LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
- LASSERT(info->mti_ioepoch);
-
- spin_lock(&med->med_open_lock);
- mfd = mdt_handle2mfd(med, &info->mti_ioepoch->handle,
- req_is_replay(req));
- if (mfd == NULL) {
- spin_unlock(&med->med_open_lock);
- CDEBUG(D_INODE, "no handle for file close: "
- "fid = "DFID": cookie = "LPX64"\n",
- PFID(info->mti_rr.rr_fid1),
- info->mti_ioepoch->handle.cookie);
- GOTO(out_put, rc = -ESTALE);
- }
- LASSERT(mfd->mfd_mode == MDS_FMODE_SOM);
- LASSERT(!(info->mti_ioepoch->flags & MF_EPOCH_CLOSE));
+ if ((ma->ma_attr.la_valid & LA_SIZE) ||
+ (rr->rr_flags & MRF_OPEN_TRUNC)) {
+ /* Check write access for the O_TRUNC case */
+ if (mdt_write_read(mo) < 0)
+ GOTO(out_put, rc = -ETXTBSY);
+ }
- class_handle_unhash(&mfd->mfd_handle);
- list_del_init(&mfd->mfd_list);
- spin_unlock(&med->med_open_lock);
+ if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
+ if (ma->ma_valid & MA_LOV)
+ GOTO(out_put, rc = -EPROTO);
- mdt_mfd_close(info, mfd);
- } else if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
- LASSERT((ma->ma_valid & MA_LOV) == 0);
- rc = mdt_attr_set(info, mo, ma, rr->rr_flags);
+ rc = mdt_attr_set(info, mo, ma);
if (rc)
GOTO(out_put, rc);
} else if ((ma->ma_valid & MA_LOV) && (ma->ma_valid & MA_INODE)) {
struct lu_buf *buf = &info->mti_buf;
- LASSERT(ma->ma_attr.la_valid == 0);
+
+ if (ma->ma_attr.la_valid != 0)
+ GOTO(out_put, rc = -EPROTO);
+
buf->lb_buf = ma->ma_lmm;
buf->lb_len = ma->ma_lmm_size;
rc = mo_xattr_set(info->mti_env, mdt_object_child(mo),
GOTO(out_put, rc);
} else if ((ma->ma_valid & MA_LMV) && (ma->ma_valid & MA_INODE)) {
struct lu_buf *buf = &info->mti_buf;
+ struct mdt_lock_handle *lh;
+
+ if (ma->ma_attr.la_valid != 0)
+ GOTO(out_put, rc = -EPROTO);
+
+ lh = &info->mti_lh[MDT_LH_PARENT];
+ mdt_lock_reg_init(lh, LCK_PW);
+
+ rc = mdt_object_lock(info, mo, lh,
+ MDS_INODELOCK_XATTR,
+ MDT_LOCAL_LOCK);
+ if (rc != 0)
+ GOTO(out_put, rc);
- LASSERT(ma->ma_attr.la_valid == 0);
buf->lb_buf = ma->ma_lmv;
buf->lb_len = ma->ma_lmv_size;
rc = mo_xattr_set(info->mti_env, mdt_object_child(mo),
buf, XATTR_NAME_DEFAULT_LMV, 0);
+
+ mdt_object_unlock(info, mo, lh, rc);
if (rc)
GOTO(out_put, rc);
- } else
- LBUG();
+ } else {
+ GOTO(out_put, rc = -EPROTO);
+ }
/* If file data is modified, add the dirty flag */
if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
if (info->mti_mdt->mdt_lut.lut_oss_capa &&
exp_connect_flags(info->mti_exp) & OBD_CONNECT_OSS_CAPA &&
S_ISREG(lu_object_attr(&mo->mot_obj)) &&
- (ma->ma_attr.la_valid & LA_SIZE) && !som_au) {
+ (ma->ma_attr.la_valid & LA_SIZE)) {
struct lustre_capa *capa;
capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
RETURN(err_serious(-ESTALE));
- if (info->mti_dlm_req)
- ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0);
+ if (info->mti_dlm_req)
+ ldlm_request_cancel(mdt_info_req(info),
+ info->mti_dlm_req, 0, LATF_SKIP);
if (!lu_name_is_valid(&info->mti_rr.rr_name))
RETURN(-EPROTO);
DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
PNAME(&rr->rr_name));
- if (info->mti_dlm_req)
- ldlm_request_cancel(req, info->mti_dlm_req, 0);
+ if (info->mti_dlm_req)
+ ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
RETURN(err_serious(-ENOENT));
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
RETURN(err_serious(-ENOENT));
- if (info->mti_dlm_req)
- ldlm_request_cancel(req, info->mti_dlm_req, 0);
+ if (info->mti_dlm_req)
+ ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
/* Invalid case so return error immediately instead of
* processing it */
struct list_head *lock_list)
{
struct lu_buf *buf = &info->mti_big_buf;
- struct linkea_data ldata = { 0 };
+ struct linkea_data ldata = { NULL };
int count;
int rc;
ENTRY;
ENTRY;
if (info->mti_dlm_req)
- ldlm_request_cancel(req, info->mti_dlm_req, 0);
+ ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
if (!fid_is_md_operative(rr->rr_fid1) ||
!fid_is_md_operative(rr->rr_fid2))
return mdt_reint_rename_or_migrate(info, lhc, MRL_MIGRATE);
}
-typedef int (*mdt_reinter)(struct mdt_thread_info *info,
- struct mdt_lock_handle *lhc);
-
-static mdt_reinter reinters[REINT_MAX] = {
- [REINT_SETATTR] = mdt_reint_setattr,
- [REINT_CREATE] = mdt_reint_create,
- [REINT_LINK] = mdt_reint_link,
- [REINT_UNLINK] = mdt_reint_unlink,
- [REINT_RENAME] = mdt_reint_rename,
- [REINT_OPEN] = mdt_reint_open,
- [REINT_SETXATTR] = mdt_reint_setxattr,
- [REINT_RMENTRY] = mdt_reint_unlink,
- [REINT_MIGRATE] = mdt_reint_migrate,
+struct mdt_reinter {
+ int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
+ enum lprocfs_extra_opc mr_extra_opc;
+};
+
+static const struct mdt_reinter mdt_reinters[] = {
+ [REINT_SETATTR] = {
+ .mr_handler = &mdt_reint_setattr,
+ .mr_extra_opc = MDS_REINT_SETATTR,
+ },
+ [REINT_CREATE] = {
+ .mr_handler = &mdt_reint_create,
+ .mr_extra_opc = MDS_REINT_CREATE,
+ },
+ [REINT_LINK] = {
+ .mr_handler = &mdt_reint_link,
+ .mr_extra_opc = MDS_REINT_LINK,
+ },
+ [REINT_UNLINK] = {
+ .mr_handler = &mdt_reint_unlink,
+ .mr_extra_opc = MDS_REINT_UNLINK,
+ },
+ [REINT_RENAME] = {
+ .mr_handler = &mdt_reint_rename,
+ .mr_extra_opc = MDS_REINT_RENAME,
+ },
+ [REINT_OPEN] = {
+ .mr_handler = &mdt_reint_open,
+ .mr_extra_opc = MDS_REINT_OPEN,
+ },
+ [REINT_SETXATTR] = {
+ .mr_handler = &mdt_reint_setxattr,
+ .mr_extra_opc = MDS_REINT_SETXATTR,
+ },
+ [REINT_RMENTRY] = {
+ .mr_handler = &mdt_reint_unlink,
+ .mr_extra_opc = MDS_REINT_UNLINK,
+ },
+ [REINT_MIGRATE] = {
+ .mr_handler = &mdt_reint_migrate,
+ .mr_extra_opc = MDS_REINT_RENAME,
+ },
};
int mdt_reint_rec(struct mdt_thread_info *info,
- struct mdt_lock_handle *lhc)
+ struct mdt_lock_handle *lhc)
{
- int rc;
- ENTRY;
+ const struct mdt_reinter *mr;
+ int rc;
+ ENTRY;
- rc = reinters[info->mti_rr.rr_opcode](info, lhc);
+ if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
+ RETURN(-EPROTO);
- RETURN(rc);
+ mr = &mdt_reinters[info->mti_rr.rr_opcode];
+ if (mr->mr_handler == NULL)
+ RETURN(-EPROTO);
+
+ rc = (*mr->mr_handler)(info, lhc);
+
+ lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
+ PTLRPC_LAST_CNTR + mr->mr_extra_opc);
+
+ RETURN(rc);
}