#include <dt_object.h>
#include <lustre_acl.h>
#include <lustre_export.h>
-#include <lustre_ioctl.h>
+#include <uapi/linux/lustre_ioctl.h>
#include <lustre_lfsck.h>
#include <lustre_log.h>
#include <lustre_net.h>
#include <lustre_swab.h>
#include <obd.h>
#include <obd_support.h>
+#include <lustre_barrier.h>
#include <llog_swab.h>
lh->mlh_type = MDT_PDO_LOCK;
if (lu_name_is_valid(lname)) {
- lh->mlh_pdo_hash = full_name_hash(lname->ln_name,
- lname->ln_namelen);
+ lh->mlh_pdo_hash = ll_full_name_hash(NULL, lname->ln_name,
+ lname->ln_namelen);
/* XXX Workaround for LU-2856
*
* Zero is a valid return value of full_name_hash, but
const struct lu_env *env = info->mti_env;
struct md_object *next = mdt_object_child(o);
struct lu_buf *buf = &info->mti_buf;
+ struct mdt_device *mdt = info->mti_mdt;
int rc;
+ ENTRY;
+
buf->lb_buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
buf->lb_len = req_capsule_get_size(info->mti_pill, &RMF_ACL,
RCL_SERVER);
if (buf->lb_len == 0)
- return 0;
+ RETURN(0);
+again:
rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS);
if (rc < 0) {
if (rc == -ENODATA) {
} else if (rc == -EOPNOTSUPP) {
rc = 0;
} else {
+ if (rc == -ERANGE &&
+ exp_connect_large_acl(info->mti_exp) &&
+ buf->lb_buf != info->mti_big_acl) {
+ if (info->mti_big_acl == NULL) {
+ OBD_ALLOC_LARGE(info->mti_big_acl,
+ mdt->mdt_max_ea_size);
+ if (info->mti_big_acl == NULL) {
+ CERROR("%s: unable to grow "
+ DFID" ACL buffer\n",
+ mdt_obd_name(mdt),
+ PFID(mdt_object_fid(o)));
+ RETURN(-ENOMEM);
+ }
+
+ info->mti_big_aclsize =
+ mdt->mdt_max_ea_size;
+ }
+
+ CDEBUG(D_INODE, "%s: grow the "DFID
+ " ACL buffer to size %d\n",
+ mdt_obd_name(mdt),
+ PFID(mdt_object_fid(o)),
+ mdt->mdt_max_ea_size);
+
+ buf->lb_buf = info->mti_big_acl;
+ buf->lb_len = info->mti_big_aclsize;
+
+ goto again;
+ }
+
CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
- mdt_obd_name(info->mti_mdt),
- PFID(mdt_object_fid(o)), rc);
+ mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
}
} else {
+ if (buf->lb_buf == info->mti_big_acl)
+ info->mti_big_acl_used = 1;
+
rc = nodemap_map_acl(nodemap, buf->lb_buf,
rc, NODEMAP_FS_TO_CLIENT);
/* if all ACLs mapped out, rc is still >= 0 */
if (rc < 0) {
CERROR("%s: nodemap_map_acl unable to parse "DFID
- " ACL: rc = %d\n", mdt_obd_name(info->mti_mdt),
+ " ACL: rc = %d\n", mdt_obd_name(mdt),
PFID(mdt_object_fid(o)), rc);
} else {
repbody->mbo_aclsize = rc;
rc = 0;
}
}
- return rc;
+
+ RETURN(rc);
}
#endif
+/* XXX Look into layout in MDT layer. */
+/* Check whether a file's on-disk LOV layout marks it as HSM-released.
+ * For a composite layout (LOV_MAGIC_COMP_V1) every component entry must
+ * carry LOV_PATTERN_F_RELEASED -- partial release is not supported --
+ * otherwise the single lmm_pattern flag of the plain layout decides.
+ * NOTE(review): lcm_entry_count/lcme_offset are read without byte
+ * swapping -- presumably @lmm is already in CPU byte order here;
+ * confirm at the callers. */
+static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm)
+{
+ struct lov_comp_md_v1 *comp_v1;
+ struct lov_mds_md *v1;
+ int i;
+
+ if (lmm->lmm_magic == LOV_MAGIC_COMP_V1) {
+ comp_v1 = (struct lov_comp_md_v1 *)lmm;
+
+ for (i = 0; i < comp_v1->lcm_entry_count; i++) {
+ v1 = (struct lov_mds_md *)((char *)comp_v1 +
+ comp_v1->lcm_entries[i].lcme_offset);
+ /* We don't support partial release for now */
+ if (!(v1->lmm_pattern & LOV_PATTERN_F_RELEASED))
+ return false;
+ }
+ return true;
+ } else {
+ return (lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) ?
+ true : false;
+ }
+}
+
void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
const struct lu_attr *attr, const struct lu_fid *fid)
{
b->mbo_valid |= OBD_MD_FLGID;
}
+ if (attr->la_valid & LA_PROJID) {
+ /* TODO, nodemap for project id */
+ b->mbo_projid = attr->la_projid;
+ b->mbo_valid |= OBD_MD_FLPROJID;
+ }
+
b->mbo_mode = attr->la_mode;
if (attr->la_valid & LA_MODE)
b->mbo_valid |= OBD_MD_FLMODE;
* b=22272 */
b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
} else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL &&
- ma->ma_lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) {
+ mdt_hsm_is_released(ma->ma_lmm)) {
/* A released file stores its size on MDS. */
/* But return 1 block for released file, unless tools like tar
* will consider it fully sparse. (LU-3864)
struct lu_buf *buffer = &info->mti_buf;
struct obd_export *exp = info->mti_exp;
int rc;
- int is_root;
ENTRY;
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
repbody->mbo_t_state = MS_RESTORE;
}
- is_root = lu_fid_eq(mdt_object_fid(o), &info->mti_mdt->mdt_md_root_fid);
-
- /* the Lustre protocol supposes to return default striping
- * on the user-visible root if explicitly requested */
- if ((ma->ma_valid & MA_LOV) == 0 && S_ISDIR(la->la_mode) &&
- (ma->ma_need & MA_LOV_DEF && is_root) && ma->ma_need & MA_LOV) {
- struct lu_fid rootfid;
- struct mdt_object *root;
- struct mdt_device *mdt = info->mti_mdt;
-
- rc = dt_root_get(env, mdt->mdt_bottom, &rootfid);
- if (rc)
- RETURN(rc);
- root = mdt_object_find(env, mdt, &rootfid);
- if (IS_ERR(root))
- RETURN(PTR_ERR(root));
- rc = mdt_stripe_get(info, root, ma, XATTR_NAME_LOV);
- mdt_object_put(info->mti_env, root);
- if (unlikely(rc)) {
- CERROR("%s: getattr error for "DFID": rc = %d\n",
- mdt_obd_name(info->mti_mdt),
- PFID(mdt_object_fid(o)), rc);
- RETURN(rc);
- }
- }
-
if (likely(ma->ma_valid & MA_INODE))
mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
else
req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
+ /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
+ * by default. If the target object has more ACL entries, then
+ * enlarge the buffer when necessary. */
+ req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
+ LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+
rc = req_capsule_server_pack(pill);
if (unlikely(rc != 0))
GOTO(out, rc = err_serious(rc));
}
/**
+ * Handler of layout intent RPC requiring the layout modification
+ *
+ * \param[in] info thread environment
+ * \param[in] obj object
+ * \param[in] layout layout intent
+ * \param[in] buf buffer containing client's lovea, could be empty
+ *
+ * \retval 0 on success
+ * \retval < 0 error code
+ */
+static int mdt_layout_change(struct mdt_thread_info *info,
+ struct mdt_object *obj,
+ struct layout_intent *layout,
+ const struct lu_buf *buf)
+{
+ struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
+ int rc;
+ ENTRY;
+
+ CDEBUG(D_INFO, "got layout change request from client: "
+ "opc:%u flags:%#x extent[%#llx,%#llx)\n",
+ layout->li_opc, layout->li_flags,
+ layout->li_start, layout->li_end);
+ /* reject a degenerate or inverted extent before taking any lock */
+ if (layout->li_start >= layout->li_end) {
+ CERROR("Received an invalid layout change range [%llu, %llu) "
+ "for "DFID"\n", layout->li_start, layout->li_end,
+ PFID(mdt_object_fid(obj)));
+ RETURN(-EINVAL);
+ }
+
+ /* layout modification only applies to regular files */
+ if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
+ GOTO(out, rc = -EINVAL);
+
+ rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
+ MAY_WRITE);
+ if (rc)
+ GOTO(out, rc);
+
+ /* take layout lock to prepare layout change */
+ mdt_lock_reg_init(lh, LCK_EX);
+ rc = mdt_object_lock(info, obj, lh,
+ MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout,
+ buf);
+
+ mdt_object_unlock(info, obj, lh, 1);
+out:
+ RETURN(rc);
+}
+
+/**
* Exchange MOF_LOV_CREATED flags between two objects after a
* layout swap. No assumption is made on whether o1 or o2 have
* created objects or not.
if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
+ /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
+ * by default. If the target object has more ACL entries, then
+ * enlarge the buffer when necessary. */
+ if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
+ req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
+ LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+
rc = req_capsule_server_pack(pill);
if (rc != 0) {
CERROR("Can't pack response, rc %d\n", rc);
}
id = oqctl->qc_id;
- if (oqctl->qc_type == USRQUOTA)
+ switch (oqctl->qc_type) {
+ case USRQUOTA:
id = nodemap_map_id(nodemap, NODEMAP_UID,
NODEMAP_CLIENT_TO_FS, id);
- else if (oqctl->qc_type == GRPQUOTA)
- id = nodemap_map_id(nodemap, NODEMAP_UID,
+ break;
+ case GRPQUOTA:
+ id = nodemap_map_id(nodemap, NODEMAP_GID,
NODEMAP_CLIENT_TO_FS, id);
-
+ break;
+ case PRJQUOTA:
+ /* todo: check/map project id */
+ id = oqctl->qc_id;
+ break;
+ default:
+ GOTO(out_nodemap, rc = -EOPNOTSUPP);
+ }
repoqc = req_capsule_server_get(pill, &RMF_OBD_QUOTACTL);
if (repoqc == NULL)
GOTO(out_nodemap, rc = err_serious(-EFAULT));
+ if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA)
+ barrier_exit(tsi->tsi_tgt->lut_bottom);
+
if (oqctl->qc_id != id)
swap(oqctl->qc_id, id);
+ if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA) {
+ if (unlikely(!barrier_entry(tsi->tsi_tgt->lut_bottom)))
+ RETURN(-EINPROGRESS);
+ }
+
switch (oqctl->qc_cmd) {
case Q_GETINFO:
* \see ldlm_blocking_ast_nocheck
*/
int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
- void *data, int flag)
+ void *data, int flag)
{
- struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
- struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
- int rc;
- ENTRY;
+ struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
+ struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+ /* when true, start an asynchronous commit after
+ * ldlm_blocking_ast_nocheck() below */
+ bool commit_async = false;
+ int rc;
+ ENTRY;
- if (flag == LDLM_CB_CANCELING)
- RETURN(0);
+ if (flag == LDLM_CB_CANCELING)
+ RETURN(0);
- lock_res_and_lock(lock);
- if (lock->l_blocking_ast != mdt_blocking_ast) {
- unlock_res_and_lock(lock);
- RETURN(0);
- }
+ lock_res_and_lock(lock);
+ if (lock->l_blocking_ast != mdt_blocking_ast) {
+ unlock_res_and_lock(lock);
+ RETURN(0);
+ }
+ /* There is no lock conflict if l_blocking_lock == NULL,
+ * it indicates a blocking ast sent from ldlm_lock_decref_internal
+ * when the last reference to a local lock was released */
if (lock->l_req_mode & (LCK_PW | LCK_EX) &&
lock->l_blocking_lock != NULL) {
- if (mdt_cos_is_enabled(mdt) &&
- lock->l_client_cookie !=
- lock->l_blocking_lock->l_client_cookie)
- mdt_set_lock_sync(lock);
- else if (mdt_slc_is_enabled(mdt) &&
- ldlm_is_cos_incompat(lock->l_blocking_lock))
+ if (mdt_cos_is_enabled(mdt)) {
+ if (lock->l_client_cookie !=
+ lock->l_blocking_lock->l_client_cookie)
+ mdt_set_lock_sync(lock);
+ } else if (mdt_slc_is_enabled(mdt) &&
+ ldlm_is_cos_incompat(lock->l_blocking_lock)) {
mdt_set_lock_sync(lock);
+ /*
+ * we may do extra commit here, but there is a small
+ * window to miss a commit: lock was unlocked (saved),
+ * then a conflict lock queued and we come here, but
+ * REP-ACK not received, so lock was not converted to
+ * COS mode yet.
+ * Fortunately this window is quite small, so the
+ * extra commit should be rare (not to say distributed
+ * operation is rare too).
+ */
+ commit_async = true;
+ }
+ } else if (lock->l_req_mode == LCK_COS &&
+ lock->l_blocking_lock != NULL) {
+ /* conflict against a COS lock held here; presumably the
+ * async commit lets it be released sooner -- see
+ * mdt_device_commit_async() below */
+ commit_async = true;
}
- rc = ldlm_blocking_ast_nocheck(lock);
- /* There is no lock conflict if l_blocking_lock == NULL,
- * it indicates a blocking ast sent from ldlm_lock_decref_internal
- * when the last reference to a local lock was released */
- if (lock->l_req_mode == LCK_COS && lock->l_blocking_lock != NULL) {
- struct lu_env env;
+ rc = ldlm_blocking_ast_nocheck(lock);
+
+ if (commit_async) {
+ struct lu_env env;
rc = lu_env_init(&env, LCT_LOCAL);
if (unlikely(rc != 0))
CWARN("%s: lu_env initialization failed, cannot "
"start asynchronous commit: rc = %d\n",
obd->obd_name, rc);
- else
- mdt_device_commit_async(&env, mdt);
- lu_env_fini(&env);
- }
- RETURN(rc);
+ else
+ mdt_device_commit_async(&env, mdt);
+ lu_env_fini(&env);
+ }
+ RETURN(rc);
}
/*
struct mdt_device *mdt = info->mti_mdt;
struct ldlm_lock *lock = ldlm_handle2lock(h);
struct ptlrpc_request *req = mdt_info_req(info);
- int cos;
-
- cos = (mdt_cos_is_enabled(mdt) ||
- mdt_slc_is_enabled(mdt));
+ bool cos = mdt_cos_is_enabled(mdt);
+ bool convert_lock = !cos && mdt_slc_is_enabled(mdt);
LASSERTF(lock != NULL, "no lock for cookie %#llx\n",
h->cookie);
/* there is no request if mdt_object_unlock() is called
* from mdt_export_cleanup()->mdt_add_dirty_flag() */
if (likely(req != NULL)) {
- CDEBUG(D_HA, "request = %p reply state = %p"
- " transno = %lld\n", req,
- req->rq_reply_state, req->rq_transno);
+ LDLM_DEBUG(lock, "save lock request %p reply "
+ "state %p transno %lld\n", req,
+ req->rq_reply_state, req->rq_transno);
if (cos) {
ldlm_lock_downgrade(lock, LCK_COS);
mode = LCK_COS;
}
- ptlrpc_save_lock(req, h, mode, cos);
+ ptlrpc_save_lock(req, h, mode, cos,
+ convert_lock);
} else {
mdt_fid_unlock(h, mode);
}
req_capsule_set_size(pill, &RMF_LOGCOOKIES,
RCL_SERVER, 0);
+ /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
+ * by default. If the target object has more ACL entries, then
+ * enlarge the buffer when necessary. */
+ if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
+ req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
+ LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+
rc = req_capsule_server_pack(pill);
}
RETURN(rc);
info->mti_cross_ref = 0;
info->mti_opdata = 0;
info->mti_big_lmm_used = 0;
+ info->mti_big_acl_used = 0;
info->mti_spec.no_create = 0;
info->mti_spec.sp_rm_entry = 0;
struct layout_intent *layout;
struct lu_fid *fid;
struct mdt_object *obj = NULL;
+ bool layout_change = false;
int layout_size = 0;
int rc = 0;
ENTRY;
if (layout == NULL)
RETURN(-EPROTO);
- if (layout->li_opc != LAYOUT_INTENT_ACCESS) {
+ switch (layout->li_opc) {
+ case LAYOUT_INTENT_TRUNC:
+ case LAYOUT_INTENT_WRITE:
+ layout_change = true;
+ break;
+ case LAYOUT_INTENT_ACCESS:
+ break;
+ case LAYOUT_INTENT_READ:
+ case LAYOUT_INTENT_GLIMPSE:
+ case LAYOUT_INTENT_RELEASE:
+ case LAYOUT_INTENT_RESTORE:
CERROR("%s: Unsupported layout intent opc %d\n",
mdt_obd_name(info->mti_mdt), layout->li_opc);
- RETURN(-EINVAL);
+ rc = -ENOTSUPP;
+ break;
+ default:
+ CERROR("%s: Unknown layout intent opc %d\n",
+ mdt_obd_name(info->mti_mdt), layout->li_opc);
+ rc = -EINVAL;
+ break;
}
+ if (rc < 0)
+ RETURN(rc);
fid = &info->mti_tmp_fid2;
fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name);
info->mti_mdt->mdt_max_mdsize = layout_size;
}
+ /*
+ * set reply buffer size, so that ldlm_handle_enqueue0()->
+ * ldlm_lvbo_fill() will fill the reply buffer with lovea.
+ */
(*lockp)->l_lvb_type = LVB_T_LAYOUT;
req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
layout_size);
rc = req_capsule_server_pack(info->mti_pill);
- GOTO(out_obj, rc);
+ if (rc)
+ GOTO(out_obj, rc);
+
+
+ if (layout_change) {
+ struct lu_buf *buf = &info->mti_buf;
+
+ /**
+ * mdt_layout_change is a reint operation, when the request
+ * is resent, layout write shouldn't reprocess it again.
+ */
+ rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc);
+ if (rc)
+ GOTO(out_obj, rc = rc < 0 ? rc : 0);
+ /**
+ * There is another resent case: the client's job has been
+ * done by another client, referring lod_declare_layout_change
+ * -EALREADY case, and it became a operation w/o transaction,
+ * so we should not do the layout change, otherwise
+ * mdt_layout_change() will try to cancel the granted server
+ * CR lock whose remote counterpart is still in hold on the
+ * client, and a deadlock ensues.
+ */
+ rc = mdt_check_resent_lock(info, obj, lhc);
+ if (rc <= 0)
+ GOTO(out_obj, rc);
+
+ buf->lb_buf = NULL;
+ buf->lb_len = 0;
+ if (unlikely(req_is_replay(mdt_info_req(info)))) {
+ buf->lb_buf = req_capsule_client_get(info->mti_pill,
+ &RMF_EADATA);
+ buf->lb_len = req_capsule_get_size(info->mti_pill,
+ &RMF_EADATA, RCL_CLIENT);
+ /*
+ * If it's a replay of layout write intent RPC, the
+ * client has saved the extended lovea when
+ * it get reply then.
+ */
+ if (buf->lb_len > 0)
+ mdt_fix_lov_magic(info, buf->lb_buf);
+ }
+
+ /*
+ * Instantiate some layout components, if @buf contains
+ * lovea, then it's a replay of the layout intent write
+ * RPC.
+ */
+ rc = mdt_layout_change(info, obj, layout, buf);
+ if (rc)
+ GOTO(out_obj, rc);
+ }
out_obj:
mdt_object_put(info->mti_env, obj);
if (qmt == NULL)
RETURN(-EOPNOTSUPP);
+ if (mdt_rdonly(req->rq_export))
+ RETURN(-EROFS);
+
(*lockp)->l_lvb_type = LVB_T_LQUOTA;
/* pass the request to quota master */
rc = qmt_hdls.qmth_intent_policy(info->mti_env, qmt,
if (rc < 0)
RETURN(rc);
- if (flv->it_flags & MUTABOR &&
- exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
+ if (flv->it_flags & MUTABOR && mdt_rdonly(req->rq_export))
RETURN(-EROFS);
if (flv->it_act != NULL) {
if (rc)
GOTO(err_tgt, rc);
- tgt_adapt_sptlrpc_conf(&m->mdt_lut, 1);
+ tgt_adapt_sptlrpc_conf(&m->mdt_lut);
next = m->mdt_child;
rc = next->md_ops->mdo_iocontrol(env, next, OBD_IOC_GET_MNTOPT, 0,
spin_lock_init(&mo->mot_write_lock);
mutex_init(&mo->mot_lov_mutex);
init_rwsem(&mo->mot_open_sem);
+ atomic_set(&mo->mot_open_count, 0);
RETURN(o);
}
RETURN(NULL);
ENTRY;
if (KEY_IS(KEY_SPTLRPC_CONF)) {
- rc = tgt_adapt_sptlrpc_conf(class_exp2tgt(exp), 0);
+ rc = tgt_adapt_sptlrpc_conf(class_exp2tgt(exp));
RETURN(rc);
}
data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
+ if (mdt->mdt_bottom->dd_rdonly &&
+ !(data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
+ !(data->ocd_connect_flags & OBD_CONNECT_RDONLY))
+ RETURN(-EACCES);
+
if (data->ocd_connect_flags & OBD_CONNECT_FLAGS2)
data->ocd_connect_flags2 &= MDT_CONNECT_SUPPORTED2;
if (!mdt->mdt_opts.mo_user_xattr)
data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
- if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
- data->ocd_brw_size = min(data->ocd_brw_size,
- (__u32)MD_MAX_BRW_SIZE);
+ if (OCD_HAS_FLAG(data, BRW_SIZE)) {
+ data->ocd_brw_size = min(data->ocd_brw_size, MD_MAX_BRW_SIZE);
if (data->ocd_brw_size == 0) {
- CERROR("%s: cli %s/%p ocd_connect_flags: %#llx"
- " ocd_version: %x ocd_grant: %d "
- "ocd_index: %u ocd_brw_size is "
- "unexpectedly zero, network data "
- "corruption? Refusing connection of this"
- " client\n",
+ CERROR("%s: cli %s/%p ocd_connect_flags: %#llx "
+ "ocd_version: %x ocd_grant: %d ocd_index: %u "
+ "ocd_brw_size unexpectedly zero, network data "
+ "corruption? Refusing to connect this client\n",
mdt_obd_name(mdt),
exp->exp_client_uuid.uuid,
exp, data->ocd_connect_flags, data->ocd_version,
rc = mdt_ctxt_add_dirty_flag(&env, info, mfd);
/* Don't unlink orphan on failover umount, LU-184 */
- if (exp->exp_flags & OBD_OPT_FAILOVER) {
+ if (exp->exp_obd->obd_fail) {
ma->ma_valid = MA_FLAGS;
ma->ma_attr_flags |= MDS_KEEP_ORPHAN;
}
}
info->mti_mdt = NULL;
/* cleanup client slot early */
- /* Do not erase record for recoverable client. */
- if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed)
- tgt_client_del(&env, exp);
+ tgt_client_del(&env, exp);
lu_env_fini(&env);
RETURN(rc);
RETURN(rc);
}
-/* Pass the ioc down */
-static int mdt_ioc_child(struct lu_env *env, struct mdt_device *mdt,
- unsigned int cmd, int len, void *data)
-{
- struct lu_context ioctl_session;
- struct md_device *next = mdt->mdt_child;
- int rc;
- ENTRY;
-
- rc = lu_context_init(&ioctl_session, LCT_SERVER_SESSION);
- if (rc)
- RETURN(rc);
- ioctl_session.lc_thread = (struct ptlrpc_thread *)current;
- lu_context_enter(&ioctl_session);
- env->le_ses = &ioctl_session;
-
- LASSERT(next->md_ops->mdo_iocontrol);
- rc = next->md_ops->mdo_iocontrol(env, next, cmd, len, data);
-
- lu_context_exit(&ioctl_session);
- lu_context_fini(&ioctl_session);
- RETURN(rc);
-}
-
static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
{
struct obd_ioctl_data *data = karg;
case OBD_IOC_CHANGELOG_REG:
case OBD_IOC_CHANGELOG_DEREG:
case OBD_IOC_CHANGELOG_CLEAR:
- rc = mdt_ioc_child(&env, mdt, cmd, len, karg);
+ rc = mdt->mdt_child->md_ops->mdo_iocontrol(&env,
+ mdt->mdt_child,
+ cmd, len, karg);
break;
case OBD_IOC_START_LFSCK: {
struct md_device *next = mdt->mdt_child;
int rc;
ENTRY;
- if (!mdt->mdt_skip_lfsck) {
+ if (!mdt->mdt_skip_lfsck && !mdt->mdt_bottom->dd_rdonly) {
struct lfsck_start_param lsp;
lsp.lsp_start = NULL;
info->mti_big_lmm = NULL;
info->mti_big_lmmsize = 0;
}
+
+ if (info->mti_big_acl) {
+ OBD_FREE_LARGE(info->mti_big_acl, info->mti_big_aclsize);
+ info->mti_big_acl = NULL;
+ info->mti_big_aclsize = 0;
+ }
+
OBD_FREE_PTR(info);
}