* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2010, 2013, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include <lustre_mdt.h>
#include <lustre_log.h>
#include "mdt_internal.h"
-#ifdef HAVE_QUOTA_SUPPORT
-# include <lustre_quota.h>
-#endif
#include <lustre_acl.h>
#include <lustre_param.h>
+#include <lustre_quota.h>
mdl_mode_t mdt_mdl_lock_modes[] = {
[LCK_MINMODE] = MDL_MINMODE,
[MDL_GROUP] = LCK_GROUP
};
-/*
- * Initialized in mdt_mod_init().
- */
-static unsigned long mdt_num_threads;
-CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
- "number of MDS service threads to start "
- "(deprecated in favor of mds_num_threads)");
-
-static unsigned long mds_num_threads;
-CFS_MODULE_PARM(mds_num_threads, "ul", ulong, 0444,
- "number of MDS service threads to start");
-
-static char *mds_num_cpts;
-CFS_MODULE_PARM(mds_num_cpts, "c", charp, 0444,
- "CPU partitions MDS threads should run on");
-
-static unsigned long mds_rdpg_num_threads;
-CFS_MODULE_PARM(mds_rdpg_num_threads, "ul", ulong, 0444,
- "number of MDS readpage service threads to start");
-
-static char *mds_rdpg_num_cpts;
-CFS_MODULE_PARM(mds_rdpg_num_cpts, "c", charp, 0444,
- "CPU partitions MDS readpage threads should run on");
-
-/* NB: these two should be removed along with setattr service in the future */
-static unsigned long mds_attr_num_threads;
-CFS_MODULE_PARM(mds_attr_num_threads, "ul", ulong, 0444,
- "number of MDS setattr service threads to start");
-
-static char *mds_attr_num_cpts;
-CFS_MODULE_PARM(mds_attr_num_cpts, "c", charp, 0444,
- "CPU partitions MDS setattr threads should run on");
-
-/* ptlrpc request handler for MDT. All handlers are
- * grouped into several slices - struct mdt_opc_slice,
- * and stored in an array - mdt_handlers[].
- */
-struct mdt_handler {
- /* The name of this handler. */
- const char *mh_name;
- /* Fail id for this handler, checked at the beginning of this handler*/
- int mh_fail_id;
- /* Operation code for this handler */
- __u32 mh_opc;
- /* flags are listed in enum mdt_handler_flags below. */
- __u32 mh_flags;
- /* The actual handler function to execute. */
- int (*mh_act)(struct mdt_thread_info *info);
- /* Request format for this request. */
- const struct req_format *mh_fmt;
-};
-
-enum mdt_handler_flags {
- /*
- * struct mdt_body is passed in the incoming message, and object
- * identified by this fid exists on disk.
- *
- * "habeo corpus" == "I have a body"
- */
- HABEO_CORPUS = (1 << 0),
- /*
- * struct ldlm_request is passed in the incoming message.
- *
- * "habeo clavis" == "I have a key"
- */
- HABEO_CLAVIS = (1 << 1),
- /*
- * this request has fixed reply format, so that reply message can be
- * packed by generic code.
- *
- * "habeo refero" == "I have a reply"
- */
- HABEO_REFERO = (1 << 2),
- /*
- * this request will modify something, so check whether the filesystem
- * is readonly or not, then return -EROFS to client asap if necessary.
- *
- * "mutabor" == "I shall modify"
- */
- MUTABOR = (1 << 3)
-};
-
-struct mdt_opc_slice {
- __u32 mos_opc_start;
- int mos_opc_end;
- struct mdt_handler *mos_hs;
-};
-
-static struct mdt_opc_slice mdt_regular_handlers[];
-static struct mdt_opc_slice mdt_readpage_handlers[];
-static struct mdt_opc_slice mdt_xmds_handlers[];
-static struct mdt_opc_slice mdt_seq_handlers[];
-static struct mdt_opc_slice mdt_fld_handlers[];
-
static struct mdt_device *mdt_dev(struct lu_device *d);
-static int mdt_regular_handle(struct ptlrpc_request *req);
static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
struct getinfo_fid2path *fp);
{
lh->mlh_pdo_hash = 0;
lh->mlh_reg_mode = lm;
+ lh->mlh_rreg_mode = lm;
lh->mlh_type = MDT_REG_LOCK;
}
const char *name, int namelen)
{
lh->mlh_reg_mode = lm;
+ lh->mlh_rreg_mode = lm;
lh->mlh_type = MDT_PDO_LOCK;
if (name != NULL && (name[0] != '\0')) {
EXIT;
}
-static int mdt_getstatus(struct mdt_thread_info *info)
+int mdt_getstatus(struct mdt_thread_info *info)
{
struct mdt_device *mdt = info->mti_mdt;
struct md_device *next = mdt->mdt_child;
repbody->valid |= OBD_MD_FLID;
if (mdt->mdt_opts.mo_mds_capa &&
- info->mti_exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) {
+ exp_connect_flags(info->mti_exp) & OBD_CONNECT_MDS_CAPA) {
struct mdt_object *root;
struct lustre_capa *capa;
RETURN(rc);
}
-static int mdt_statfs(struct mdt_thread_info *info)
+int mdt_statfs(struct mdt_thread_info *info)
{
struct ptlrpc_request *req = mdt_info_req(info);
struct md_device *next = info->mti_mdt->mdt_child;
rc = next->md_ops->mdo_statfs(info->mti_env, next, osfs);
if (rc)
RETURN(rc);
- cfs_spin_lock(&info->mti_mdt->mdt_osfs_lock);
+ spin_lock(&info->mti_mdt->mdt_osfs_lock);
info->mti_mdt->mdt_osfs = *osfs;
info->mti_mdt->mdt_osfs_age = cfs_time_current_64();
- cfs_spin_unlock(&info->mti_mdt->mdt_osfs_lock);
+ spin_unlock(&info->mti_mdt->mdt_osfs_lock);
} else {
/** use cached statfs data */
- cfs_spin_lock(&info->mti_mdt->mdt_osfs_lock);
+ spin_lock(&info->mti_mdt->mdt_osfs_lock);
*osfs = info->mti_mdt->mdt_osfs;
- cfs_spin_unlock(&info->mti_mdt->mdt_osfs_lock);
+ spin_unlock(&info->mti_mdt->mdt_osfs_lock);
}
- if (rc == 0)
+ if (rc == 0)
mdt_counter_incr(req, LPROC_MDT_STATFS);
- RETURN(rc);
+ RETURN(rc);
}
/**
if (fid) {
b->fid1 = *fid;
b->valid |= OBD_MD_FLID;
-
- /* FIXME: these should be fixed when new igif ready.*/
- b->ino = fid_oid(fid); /* 1.6 compatibility */
- b->generation = fid_ver(fid); /* 1.6 compatibility */
- b->valid |= OBD_MD_FLGENER; /* 1.6 compatibility */
-
CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, size="LPU64"\n",
PFID(fid), b->nlink, b->mode, b->size);
}
struct lu_attr *la = &ma->ma_attr;
ENTRY;
- if (exp->exp_connect_flags & OBD_CONNECT_LAYOUTLOCK)
- /* the client can deal with 16-bit lmm_stripe_count */
- RETURN_EXIT;
+ if (exp_connect_layout(exp))
+ /* the client can deal with 16-bit lmm_stripe_count */
+ RETURN_EXIT;
body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
EXIT;
}
-static int mdt_big_lmm_get(const struct lu_env *env, struct mdt_object *o,
- struct md_attr *ma)
+static int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
+ char *name)
{
- struct mdt_thread_info *info;
+ const struct lu_env *env = info->mti_env;
int rc;
ENTRY;
- info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
- LASSERT(info != NULL);
- LASSERT(ma->ma_lmm_size > 0);
LASSERT(info->mti_big_lmm_used == 0);
- rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
- XATTR_NAME_LOV);
+ rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, name);
if (rc < 0)
RETURN(rc);
info->mti_buf.lb_buf = info->mti_big_lmm;
info->mti_buf.lb_len = info->mti_big_lmmsize;
- rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf,
- XATTR_NAME_LOV);
- if (rc < 0)
- RETURN(rc);
-
- info->mti_big_lmm_used = 1;
- ma->ma_valid |= MA_LOV;
- ma->ma_lmm = info->mti_big_lmm;
- ma->ma_lmm_size = rc;
-
- /* update mdt_max_mdsize so all clients will be aware about that */
- if (info->mti_mdt->mdt_max_mdsize < rc)
- info->mti_mdt->mdt_max_mdsize = rc;
+ rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf, name);
- RETURN(0);
+ RETURN(rc);
}
int mdt_attr_get_lov(struct mdt_thread_info *info,
/* no LOV EA */
rc = 0;
} else if (rc == -ERANGE) {
- rc = mdt_big_lmm_get(info->mti_env, o, ma);
+ rc = mdt_big_xattr_get(info, o, XATTR_NAME_LOV);
+ if (rc > 0) {
+ info->mti_big_lmm_used = 1;
+ ma->ma_valid |= MA_LOV;
+ ma->ma_lmm = info->mti_big_lmm;
+ ma->ma_lmm_size = rc;
+ /* update mdt_max_mdsize so all clients
+ * will be aware about that */
+ if (info->mti_mdt->mdt_max_mdsize < rc)
+ info->mti_mdt->mdt_max_mdsize = rc;
+ rc = 0;
+ }
}
return rc;
}
+/**
+ * Read the parent FID of \a o out of its link xattr (XATTR_NAME_LINK).
+ *
+ * The xattr is read into info->mti_big_lmm through info->mti_buf; when the
+ * buffer is empty or too small the read is retried via mdt_big_xattr_get().
+ * The parent FID of the entry stored right after the link EA header is
+ * copied to \a pfid and converted with fid_be_to_cpu().
+ *
+ * \param[in]  info	per-request MDT thread info (owns the buffers used)
+ * \param[in]  o	object whose link xattr is read
+ * \param[out] pfid	receives the parent FID on success
+ *
+ * \retval 0		\a pfid is valid
+ * \retval -ENODATA	xattr shorter than a link EA header, or no records
+ * \retval -EINVAL	link EA magic not recognized
+ * \retval negative	error returned by the xattr read
+ */
+int mdt_attr_get_pfid(struct mdt_thread_info *info,
+		      struct mdt_object *o, struct lu_fid *pfid)
+{
+	struct lu_buf		*linkbuf = &info->mti_buf;
+	struct link_ea_header	*hdr;
+	struct link_ea_entry	*first;
+	int			 rc;
+	ENTRY;
+
+	linkbuf->lb_buf = info->mti_big_lmm;
+	linkbuf->lb_len = info->mti_big_lmmsize;
+	rc = mo_xattr_get(info->mti_env, mdt_object_child(o), linkbuf,
+			  XATTR_NAME_LINK);
+	if (rc == -ERANGE || linkbuf->lb_len == 0) {
+		/* retry through the big xattr path, then describe the big
+		 * buffer again (presumably it may have been replaced by
+		 * mdt_big_xattr_get() - confirm) */
+		rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
+		linkbuf->lb_buf = info->mti_big_lmm;
+		linkbuf->lb_len = info->mti_big_lmmsize;
+	}
+
+	/* errors are only handed back: MA_PFID won't be set and it is
+	 * up to the caller to treat this as an error */
+	if (rc < 0)
+		RETURN(rc);
+
+	if (rc < sizeof(*hdr)) {
+		CERROR("short LinkEA on "DFID": rc = %d\n",
+		       PFID(mdt_object_fid(o)), rc);
+		RETURN(-ENODATA);
+	}
+
+	hdr = (struct link_ea_header *)linkbuf->lb_buf;
+
+	/* header was written with the opposite byte order: fix it in place */
+	if (hdr->leh_magic == __swab32(LINK_EA_MAGIC)) {
+		hdr->leh_magic = LINK_EA_MAGIC;
+		hdr->leh_reccount = __swab32(hdr->leh_reccount);
+		hdr->leh_len = __swab64(hdr->leh_len);
+	}
+
+	if (hdr->leh_magic != LINK_EA_MAGIC)
+		RETURN(-EINVAL);
+
+	if (hdr->leh_reccount == 0)
+		RETURN(-ENODATA);
+
+	/* the first link entry starts immediately after the header */
+	first = (struct link_ea_entry *)(hdr + 1);
+	memcpy(pfid, &first->lee_parent_fid, sizeof(*pfid));
+	fid_be_to_cpu(pfid, pfid);
+
+	RETURN(0);
+}
+
int mdt_attr_get_complex(struct mdt_thread_info *info,
struct mdt_object *o, struct md_attr *ma)
{
int rc = 0, rc2;
ENTRY;
- /* do we really need PFID */
- LASSERT((ma->ma_need & MA_PFID) == 0);
-
ma->ma_valid = 0;
if (need & MA_INODE) {
ma->ma_valid |= MA_INODE;
}
+ if (need & MA_PFID) {
+ rc = mdt_attr_get_pfid(info, o, &ma->ma_pfid);
+ if (rc == 0)
+ ma->ma_valid |= MA_PFID;
+ /* ignore this error, parent fid is not mandatory */
+ rc = 0;
+ }
+
if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
rc = mdt_attr_get_lov(info, o, ma);
if (rc)
GOTO(out, rc = rc2);
}
+ if (need & MA_SOM && S_ISREG(mode)) {
+ buf->lb_buf = info->mti_xattr_buf;
+ buf->lb_len = sizeof(info->mti_xattr_buf);
+ CLASSERT(sizeof(struct som_attrs) <=
+ sizeof(info->mti_xattr_buf));
+ rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_SOM);
+ rc2 = lustre_buf2som(info->mti_xattr_buf, rc2, ma->ma_som);
+ if (rc2 == 0)
+ ma->ma_valid |= MA_SOM;
+ else if (rc2 < 0 && rc2 != -ENODATA)
+ GOTO(out, rc = rc2);
+ }
- if (rc == 0 && S_ISREG(mode) && (need & (MA_HSM | MA_SOM))) {
- struct lustre_mdt_attrs *lma;
-
- lma = (struct lustre_mdt_attrs *)info->mti_xattr_buf;
- CLASSERT(sizeof(*lma) <= sizeof(info->mti_xattr_buf));
-
- buf->lb_buf = lma;
+ if (need & MA_HSM && S_ISREG(mode)) {
+ buf->lb_buf = info->mti_xattr_buf;
buf->lb_len = sizeof(info->mti_xattr_buf);
- rc = mo_xattr_get(env, next, buf, XATTR_NAME_LMA);
- if (rc > 0) {
- lustre_lma_swab(lma);
- /* Swab and copy LMA */
- if (need & MA_HSM) {
- if (lma->lma_compat & LMAC_HSM)
- ma->ma_hsm.mh_flags =
- lma->lma_flags & HSM_FLAGS_MASK;
- else
- ma->ma_hsm.mh_flags = 0;
- ma->ma_valid |= MA_HSM;
- }
- /* Copy SOM */
- if (need & MA_SOM && lma->lma_compat & LMAC_SOM) {
- LASSERT(ma->ma_som != NULL);
- ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
- ma->ma_som->msd_size = lma->lma_som_size;
- ma->ma_som->msd_blocks = lma->lma_som_blocks;
- ma->ma_som->msd_mountid = lma->lma_som_mountid;
- ma->ma_valid |= MA_SOM;
- }
- rc = 0;
- } else if (rc == -ENODATA) {
- rc = 0;
- }
+ CLASSERT(sizeof(struct hsm_attrs) <=
+ sizeof(info->mti_xattr_buf));
+ rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_HSM);
+ rc2 = lustre_buf2hsm(info->mti_xattr_buf, rc2, &ma->ma_hsm);
+ if (rc2 == 0)
+ ma->ma_valid |= MA_HSM;
+ else if (rc2 < 0 && rc2 != -ENODATA)
+ GOTO(out, rc = rc2);
}
#ifdef CONFIG_FS_POSIX_ACL
ma->ma_valid = 0;
- rc = mdt_object_exists(o);
- if (rc < 0) {
- /* This object is located on remote node.*/
- repbody->fid1 = *mdt_object_fid(o);
- repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
- GOTO(out, rc = 0);
- }
+ if (mdt_object_remote(o)) {
+ /* This object is located on remote node.*/
+ /* Return -EIO for old client */
+ if (!mdt_is_dne_client(req->rq_export))
+ GOTO(out, rc = -EIO);
+
+ repbody->fid1 = *mdt_object_fid(o);
+ repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
+ GOTO(out, rc = 0);
+ }
buffer->lb_len = reqbody->eadatasize;
if (buffer->lb_len > 0)
} else {
ma->ma_lmm = buffer->lb_buf;
ma->ma_lmm_size = buffer->lb_len;
- ma->ma_need = MA_LOV | MA_INODE;
+ ma->ma_need = MA_LOV | MA_INODE | MA_HSM;
}
if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
}
if (reqbody->valid & OBD_MD_FLMODEASIZE) {
- repbody->max_cookiesize = info->mti_mdt->mdt_max_cookiesize;
+ repbody->max_cookiesize = 0;
repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
repbody->valid |= OBD_MD_FLMODEASIZE;
CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & "
}
}
#ifdef CONFIG_FS_POSIX_ACL
- else if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
- (reqbody->valid & OBD_MD_FLACL)) {
+ else if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
+ (reqbody->valid & OBD_MD_FLACL)) {
buffer->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
buffer->lb_len = req_capsule_get_size(pill,
&RMF_ACL, RCL_SERVER);
}
#endif
- if (reqbody->valid & OBD_MD_FLMDSCAPA &&
- info->mti_mdt->mdt_opts.mo_mds_capa &&
- info->mti_exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) {
+ if (reqbody->valid & OBD_MD_FLMDSCAPA &&
+ info->mti_mdt->mdt_opts.mo_mds_capa &&
+ exp_connect_flags(info->mti_exp) & OBD_CONNECT_MDS_CAPA) {
struct lustre_capa *capa;
capa = req_capsule_server_get(pill, &RMF_CAPA1);
* return directly, client will find body->valid OBD_MD_FLOSSCAPA
* flag not set.
*/
- if (!obj || !info->mti_mdt->mdt_opts.mo_oss_capa ||
- !(info->mti_exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA))
- RETURN(0);
+ if (!obj || !info->mti_mdt->mdt_opts.mo_oss_capa ||
+ !(exp_connect_flags(info->mti_exp) & OBD_CONNECT_OSS_CAPA))
+ RETURN(0);
body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
LASSERT(body != NULL);
RETURN(rc);
}
-static int mdt_getattr(struct mdt_thread_info *info)
+int mdt_getattr(struct mdt_thread_info *info)
{
struct mdt_object *obj = info->mti_object;
struct req_capsule *pill = info->mti_pill;
if (unlikely(rc))
GOTO(out_shrink, rc);
- info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT);
info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
/*
return rc;
}
-static int mdt_is_subdir(struct mdt_thread_info *info)
+int mdt_is_subdir(struct mdt_thread_info *info)
{
struct mdt_object *o = info->mti_object;
struct req_capsule *pill = info->mti_pill;
repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
- /*
- * We save last checked parent fid to @repbody->fid1 for remote
- * directory case.
- */
- LASSERT(fid_is_sane(&body->fid2));
- LASSERT(mdt_object_exists(o) > 0);
- rc = mdo_is_subdir(info->mti_env, mdt_object_child(o),
- &body->fid2, &repbody->fid1);
- if (rc == 0 || rc == -EREMOTE)
- repbody->valid |= OBD_MD_FLID;
+ /*
+ * We save last checked parent fid to @repbody->fid1 for remote
+ * directory case.
+ */
+ LASSERT(fid_is_sane(&body->fid2));
+ LASSERT(mdt_object_exists(o) && !mdt_object_remote(o));
+ rc = mdo_is_subdir(info->mti_env, mdt_object_child(o),
+ &body->fid2, &repbody->fid1);
+ if (rc == 0 || rc == -EREMOTE)
+ repbody->valid |= OBD_MD_FLID;
- RETURN(rc);
+ RETURN(rc);
+}
+
+/**
+ * Swap the layouts of two files on behalf of a client.
+ *
+ * The first file is info->mti_object, the second one is looked up from
+ * info->mti_body->fid2. The caller must have write permission on both
+ * files, and both are locked with LCK_EX on MDS_INODELOCK_LAYOUT around
+ * the swap.
+ *
+ * \param[in] info	per-request MDT thread info
+ *
+ * \retval 0		layouts swapped, or both FIDs name the same file
+ * \retval -EOPNOTSUPP	client did not connect with layout lock support
+ * \retval -ENOENT	second file is remote or does not exist
+ * \retval -EPROTO	request carries no swap layouts record
+ * \retval negative	other error from lookup, permission check, locking
+ *			or the swap itself
+ */
+int mdt_swap_layouts(struct mdt_thread_info *info)
+{
+	struct ptlrpc_request	*req = mdt_info_req(info);
+	struct obd_export	*exp = req->rq_export;
+	struct mdt_object	*o1, *o2, *o;
+	struct mdt_lock_handle	*lh1, *lh2;
+	struct mdc_swap_layouts	*msl;
+	int			 rc;
+	ENTRY;
+
+	/* client does not support layout lock, so layout swapping
+	 * is disabled.
+	 * FIXME: there is a problem for old clients which don't support
+	 * layout lock yet. If those clients have already opened the file
+	 * they won't be notified at all so that old layout may still be
+	 * used to do IO. This can be fixed after file release is landed by
+	 * doing exclusive open and taking full EX ibits lock. - Jinshan */
+	if (!exp_connect_layout(exp))
+		RETURN(-EOPNOTSUPP);
+
+	if (req_capsule_get_size(info->mti_pill, &RMF_CAPA1, RCL_CLIENT))
+		mdt_set_capainfo(info, 0, &info->mti_body->fid1,
+				 req_capsule_client_get(info->mti_pill,
+							&RMF_CAPA1));
+
+	if (req_capsule_get_size(info->mti_pill, &RMF_CAPA2, RCL_CLIENT))
+		mdt_set_capainfo(info, 1, &info->mti_body->fid2,
+				 req_capsule_client_get(info->mti_pill,
+							&RMF_CAPA2));
+
+	o1 = info->mti_object;
+	/* 'o' keeps the looked-up object so the right reference is dropped
+	 * at 'put' even after o1 and o2 have been exchanged below */
+	o = o2 = mdt_object_find(info->mti_env, info->mti_mdt,
+				 &info->mti_body->fid2);
+	if (IS_ERR(o))
+		GOTO(out, rc = PTR_ERR(o));
+
+	/* mdt_object_exists() is a boolean predicate, so remoteness has to
+	 * be tested with mdt_object_remote(); layouts can only be swapped
+	 * with an existing, local object */
+	if (mdt_object_remote(o) || !mdt_object_exists(o))
+		GOTO(put, rc = -ENOENT);
+
+	rc = lu_fid_cmp(&info->mti_body->fid1, &info->mti_body->fid2);
+	if (unlikely(rc == 0)) /* same file, you kidding me? no-op. */
+		GOTO(put, rc);
+
+	/* order the two objects by FID so that the locks below are always
+	 * taken in the same order (presumably to avoid deadlocking against
+	 * a concurrent swap of the same pair) */
+	if (rc < 0)
+		swap(o1, o2);
+
+	/* permission check. Make sure the calling process has permission
+	 * to write both files. */
+	rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
+			   MAY_WRITE);
+	if (rc < 0)
+		GOTO(put, rc);
+
+	rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
+			   MAY_WRITE);
+	if (rc < 0)
+		GOTO(put, rc);
+
+	/* the record comes from the client: a request without it is a
+	 * protocol error, not a reason to assert on the server */
+	msl = req_capsule_client_get(info->mti_pill, &RMF_SWAP_LAYOUTS);
+	if (msl == NULL)
+		GOTO(put, rc = -EPROTO);
+
+	lh1 = &info->mti_lh[MDT_LH_NEW];
+	mdt_lock_reg_init(lh1, LCK_EX);
+	lh2 = &info->mti_lh[MDT_LH_OLD];
+	mdt_lock_reg_init(lh2, LCK_EX);
+
+	rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT,
+			     MDT_LOCAL_LOCK);
+	if (rc < 0)
+		GOTO(put, rc);
+
+	rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT,
+			     MDT_LOCAL_LOCK);
+	if (rc < 0)
+		GOTO(unlock1, rc);
+
+	rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
+			     mdt_object_child(o2), msl->msl_flags);
+	GOTO(unlock2, rc);
+unlock2:
+	mdt_object_unlock(info, o2, lh2, rc);
+unlock1:
+	mdt_object_unlock(info, o1, lh1, rc);
+put:
+	mdt_object_put(info->mti_env, o);
+out:
+	RETURN(rc);
}
static int mdt_raw_lookup(struct mdt_thread_info *info,
}
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
- rc = mdt_object_exists(parent);
- if (unlikely(rc == 0)) {
+ if (unlikely(!mdt_object_exists(parent))) {
LU_OBJECT_DEBUG(D_INODE, info->mti_env,
&parent->mot_obj.mo_lu,
"Parent doesn't exist!\n");
RETURN(-ESTALE);
} else if (!info->mti_cross_ref) {
- LASSERTF(rc > 0, "Parent "DFID" is on remote server\n",
+ LASSERTF(!mdt_object_remote(parent),
+ "Parent "DFID" is on remote server\n",
PFID(mdt_object_fid(parent)));
}
if (lname) {
* needed here but update is.
*/
child_bits &= ~MDS_INODELOCK_LOOKUP;
- child_bits |= MDS_INODELOCK_UPDATE;
+ child_bits |= MDS_INODELOCK_PERM | MDS_INODELOCK_UPDATE;
- rc = mdt_object_lock(info, child, lhc, child_bits,
+ rc = mdt_object_lock(info, child, lhc, child_bits,
MDT_LOCAL_LOCK);
}
if (rc == 0) {
LDLM_LOCK_PUT(lock);
rc = 0;
} else {
+ bool try_layout = false;
+
relock:
OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
mdt_lock_handle_init(lhc);
- if (child_bits == MDS_INODELOCK_LAYOUT)
- mdt_lock_reg_init(lhc, LCK_CR);
- else
- mdt_lock_reg_init(lhc, LCK_PR);
+ mdt_lock_reg_init(lhc, LCK_PR);
- if (mdt_object_exists(child) == 0) {
- LU_OBJECT_DEBUG(D_INODE, info->mti_env,
- &child->mot_obj.mo_lu,
- "Object doesn't exist!\n");
- GOTO(out_child, rc = -ENOENT);
- }
+ if (!mdt_object_exists(child)) {
+ LU_OBJECT_DEBUG(D_INODE, info->mti_env,
+ &child->mot_obj.mo_lu,
+ "Object doesn't exist!\n");
+ GOTO(out_child, rc = -ENOENT);
+ }
- if (!(child_bits & MDS_INODELOCK_UPDATE)) {
+ if (!(child_bits & MDS_INODELOCK_UPDATE) &&
+ mdt_object_exists(child) && !mdt_object_remote(child)) {
struct md_attr *ma = &info->mti_attr;
ma->ma_valid = 0;
if (unlikely(rc != 0))
GOTO(out_child, rc);
- /* layout lock is used only on regular files */
- if ((ma->ma_valid & MA_INODE) &&
- (ma->ma_attr.la_valid & LA_MODE) &&
- !S_ISREG(ma->ma_attr.la_mode))
- child_bits &= ~MDS_INODELOCK_LAYOUT;
-
/* If the file has not been changed for some time, we
* return not only a LOOKUP lock, but also an UPDATE
* lock and this might save us RPC on later STAT. For
child_bits |= MDS_INODELOCK_UPDATE;
}
- rc = mdt_object_lock(info, child, lhc, child_bits,
- MDT_CROSS_LOCK);
+ /* layout lock must be granted in a best-effort way
+ * for IT operations */
+ LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
+ if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
+ exp_connect_layout(info->mti_exp) &&
+ S_ISREG(lu_object_attr(&child->mot_obj.mo_lu)) &&
+ ldlm_rep != NULL) {
+ /* try to grant layout lock for regular file. */
+ try_layout = true;
+ }
+ rc = 0;
+ if (try_layout) {
+ child_bits |= MDS_INODELOCK_LAYOUT;
+ /* try layout lock, it may fail to be granted due to
+ * contention at LOOKUP or UPDATE */
+ if (!mdt_object_lock_try(info, child, lhc, child_bits,
+ MDT_CROSS_LOCK)) {
+ child_bits &= ~MDS_INODELOCK_LAYOUT;
+ LASSERT(child_bits != 0);
+ rc = mdt_object_lock(info, child, lhc,
+ child_bits, MDT_CROSS_LOCK);
+ } else {
+ ma_need |= MA_LOV;
+ }
+ } else {
+ rc = mdt_object_lock(info, child, lhc, child_bits,
+ MDT_CROSS_LOCK);
+ }
if (unlikely(rc != 0))
GOTO(out_child, rc);
}
if (lock &&
lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_UPDATE &&
S_ISREG(lu_object_attr(&mdt_object_child(child)->mo_lu)))
- ma_need = MA_SOM;
+ ma_need |= MA_SOM;
/* finally, we can get attr for child. */
mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
(unsigned long)res_id->name[1],
(unsigned long)res_id->name[2],
PFID(mdt_object_fid(child)));
- mdt_pack_size2body(info, child);
+ if (mdt_object_exists(child) && !mdt_object_remote(child))
+ mdt_pack_size2body(info, child);
}
if (lock)
LDLM_LOCK_PUT(lock);
}
/* normal handler: should release the child lock */
-static int mdt_getattr_name(struct mdt_thread_info *info)
+int mdt_getattr_name(struct mdt_thread_info *info)
{
struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
struct mdt_body *reqbody;
repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
LASSERT(repbody != NULL);
- info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT);
info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
repbody->eadatasize = 0;
repbody->aclsize = 0;
return rc;
}
-static const struct lu_device_operations mdt_lu_ops;
-
-static int lu_device_is_mdt(struct lu_device *d)
-{
- return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
-}
-
static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
void *karg, void *uarg);
-static int mdt_set_info(struct mdt_thread_info *info)
+int mdt_set_info(struct mdt_thread_info *info)
{
struct ptlrpc_request *req = mdt_info_req(info);
char *key;
req->rq_status = 0;
lustre_msg_set_status(req->rq_repmsg, 0);
- cfs_spin_lock(&req->rq_export->exp_lock);
- if (*(__u32 *)val)
- req->rq_export->exp_connect_flags |= OBD_CONNECT_RDONLY;
- else
- req->rq_export->exp_connect_flags &=~OBD_CONNECT_RDONLY;
- cfs_spin_unlock(&req->rq_export->exp_lock);
+ spin_lock(&req->rq_export->exp_lock);
+ if (*(__u32 *)val)
+ *exp_connect_flags_ptr(req->rq_export) |=
+ OBD_CONNECT_RDONLY;
+ else
+ *exp_connect_flags_ptr(req->rq_export) &=
+ ~OBD_CONNECT_RDONLY;
+ spin_unlock(&req->rq_export->exp_lock);
} else if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
struct changelog_setinfo *cs =
RETURN(0);
}
-static int mdt_connect(struct mdt_thread_info *info)
+/**
+ * Top-level handler for MDT connection requests.
+ *
+ * Runs the generic target connect handling, then the MDT security level
+ * and idmap initialization, publishing the negotiated connect flags to the
+ * export in between.
+ *
+ * \param[in] info per-request MDT thread info; info->mti_mdt is set here
+ *
+ * \retval 0 connection fully set up
+ * \retval negative err_serious()-wrapped error from target_handle_connect(),
+ * or the error from mdt_init_sec_level() / mdt_init_idmap()
+ * (obd_disconnect() is called on the export in those two cases)
+ */
+int mdt_connect(struct mdt_thread_info *info)
{
- int rc;
- struct ptlrpc_request *req;
+ int rc;
+ struct obd_connect_data *reply;
+ struct obd_export *exp;
+ struct ptlrpc_request *req = mdt_info_req(info);
+
+ rc = target_handle_connect(req);
+ if (rc != 0)
+ return err_serious(rc);
+
+ LASSERT(req->rq_export != NULL);
+ /* bind this request to the MDT device behind the new export */
+ info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
+ rc = mdt_init_sec_level(info);
+ if (rc != 0) {
+ /* NOTE(review): presumably obd_disconnect() drops the reference
+ * taken by class_export_get() - confirm */
+ obd_disconnect(class_export_get(req->rq_export));
+ return rc;
+ }
- req = mdt_info_req(info);
- rc = target_handle_connect(req);
- if (rc == 0) {
- LASSERT(req->rq_export != NULL);
- info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
- rc = mdt_init_sec_level(info);
- if (rc == 0)
- rc = mdt_init_idmap(info);
- if (rc != 0)
- obd_disconnect(class_export_get(req->rq_export));
- } else {
- rc = err_serious(rc);
- }
- return rc;
+ /* To avoid exposing partially initialized connection flags, changes up
+ * to this point have been staged in reply->ocd_connect_flags. Now that
+ * connection handling has completed successfully, atomically update
+ * the connect flags in the shared export data structure. LU-1623 */
+ /* NOTE(review): reply is dereferenced without a NULL check; presumably
+ * the connect data reply field is always present once
+ * target_handle_connect() has succeeded - confirm */
+ reply = req_capsule_server_get(info->mti_pill, &RMF_CONNECT_DATA);
+ exp = req->rq_export;
+ spin_lock(&exp->exp_lock);
+ *exp_connect_flags_ptr(exp) = reply->ocd_connect_flags;
+ spin_unlock(&exp->exp_lock);
+
+ /* idmap setup runs only after the connect flags have been published */
+ rc = mdt_init_idmap(info);
+ if (rc != 0)
+ obd_disconnect(class_export_get(req->rq_export));
+
+ return rc;
}
-static int mdt_disconnect(struct mdt_thread_info *info)
+int mdt_disconnect(struct mdt_thread_info *info)
{
int rc;
ENTRY;
int rc;
ENTRY;
- desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
- MDS_BULK_PORTAL);
- if (desc == NULL)
- RETURN(-ENOMEM);
+ desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, 1, BULK_PUT_SOURCE,
+ MDS_BULK_PORTAL);
+ if (desc == NULL)
+ RETURN(-ENOMEM);
- if (!(exp->exp_connect_flags & OBD_CONNECT_BRW_SIZE))
+ if (!(exp_connect_flags(exp) & OBD_CONNECT_BRW_SIZE))
/* old client requires reply size in it's PAGE_SIZE,
- * which is rdpg->rp_count */
+ * which is rdpg->rp_count */
nob = rdpg->rp_count;
for (i = 0, tmpcount = nob; i < rdpg->rp_npages && tmpcount > 0;
i++, tmpcount -= tmpsize) {
tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
- ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
+ ptlrpc_prep_bulk_page_pin(desc, rdpg->rp_pages[i], 0, tmpsize);
}
LASSERT(desc->bd_nob == nob);
rc = target_bulk_io(exp, desc, lwi);
- ptlrpc_free_bulk(desc);
- RETURN(rc);
-}
-
-#ifdef HAVE_SPLIT_SUPPORT
-/*
- * Retrieve dir entry from the page and insert it to the slave object, actually,
- * this should be in osd layer, but since it will not in the final product, so
- * just do it here and do not define more moo api anymore for this.
- */
-static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page,
- int size)
-{
- struct mdt_object *object = info->mti_object;
- struct lu_fid *lf = &info->mti_tmp_fid2;
- struct md_attr *ma = &info->mti_attr;
- struct lu_dirpage *dp;
- struct lu_dirent *ent;
- int rc = 0, offset = 0;
- ENTRY;
-
- /* Make sure we have at least one entry. */
- if (size == 0)
- RETURN(-EINVAL);
-
- /*
- * Disable trans for this name insert, since it will include many trans
- * for this.
- */
- info->mti_no_need_trans = 1;
- /*
- * When write_dir_page, no need update parent's ctime,
- * and no permission check for name_insert.
- */
- ma->ma_attr.la_ctime = 0;
- ma->ma_attr.la_valid = LA_MODE;
- ma->ma_valid = MA_INODE;
-
- cfs_kmap(page);
- dp = page_address(page);
- offset = (int)((__u32)lu_dirent_start(dp) - (__u32)dp);
-
- for (ent = lu_dirent_start(dp); ent != NULL;
- ent = lu_dirent_next(ent)) {
- struct lu_name *lname;
- char *name;
-
- if (le16_to_cpu(ent->lde_namelen) == 0)
- continue;
-
- fid_le_to_cpu(lf, &ent->lde_fid);
- if (le64_to_cpu(ent->lde_hash) & MAX_HASH_HIGHEST_BIT)
- ma->ma_attr.la_mode = S_IFDIR;
- else
- ma->ma_attr.la_mode = 0;
- OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
- if (name == NULL)
- GOTO(out, rc = -ENOMEM);
-
- memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
- lname = mdt_name(info->mti_env, name,
- le16_to_cpu(ent->lde_namelen));
- ma->ma_attr_flags |= (MDS_PERM_BYPASS | MDS_QUOTA_IGNORE);
- rc = mdo_name_insert(info->mti_env,
- md_object_next(&object->mot_obj),
- lname, lf, ma);
- OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
- if (rc) {
- CERROR("Can't insert %*.*s, rc %d\n",
- le16_to_cpu(ent->lde_namelen),
- le16_to_cpu(ent->lde_namelen),
- ent->lde_name, rc);
- GOTO(out, rc);
- }
-
- offset += lu_dirent_size(ent);
- if (offset >= size)
- break;
- }
- EXIT;
-out:
- cfs_kunmap(page);
- return rc;
-}
-
-static int mdt_bulk_timeout(void *data)
-{
- ENTRY;
-
- CERROR("mdt bulk transfer timeout \n");
-
- RETURN(1);
-}
-
-static int mdt_writepage(struct mdt_thread_info *info)
-{
- struct ptlrpc_request *req = mdt_info_req(info);
- struct mdt_body *reqbody;
- struct l_wait_info *lwi;
- struct ptlrpc_bulk_desc *desc;
- struct page *page;
- int rc;
- ENTRY;
-
-
- reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
- if (reqbody == NULL)
- RETURN(err_serious(-EFAULT));
-
- desc = ptlrpc_prep_bulk_exp(req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
- if (desc == NULL)
- RETURN(err_serious(-ENOMEM));
-
- /* allocate the page for the desc */
- page = cfs_alloc_page(CFS_ALLOC_STD);
- if (page == NULL)
- GOTO(desc_cleanup, rc = -ENOMEM);
-
- CDEBUG(D_INFO, "Received page offset %d size %d \n",
- (int)reqbody->size, (int)reqbody->nlink);
-
- ptlrpc_prep_bulk_page(desc, page, (int)reqbody->size,
- (int)reqbody->nlink);
-
- rc = sptlrpc_svc_prep_bulk(req, desc);
- if (rc != 0)
- GOTO(cleanup_page, rc);
- /*
- * Check if client was evicted while we were doing i/o before touching
- * network.
- */
- OBD_ALLOC_PTR(lwi);
- if (!lwi)
- GOTO(cleanup_page, rc = -ENOMEM);
-
- if (desc->bd_export->exp_failed)
- rc = -ENOTCONN;
- else
- rc = ptlrpc_start_bulk_transfer (desc);
- if (rc == 0) {
- *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * CFS_HZ / 4, CFS_HZ,
- mdt_bulk_timeout, desc);
- rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
- desc->bd_export->exp_failed, lwi);
- LASSERT(rc == 0 || rc == -ETIMEDOUT);
- if (rc == -ETIMEDOUT) {
- DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
- ptlrpc_abort_bulk(desc);
- } else if (desc->bd_export->exp_failed) {
- DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
- rc = -ENOTCONN;
- ptlrpc_abort_bulk(desc);
- } else if (!desc->bd_success ||
- desc->bd_nob_transferred != desc->bd_nob) {
- DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
- desc->bd_success ?
- "truncated" : "network error on",
- desc->bd_nob_transferred, desc->bd_nob);
- /* XXX should this be a different errno? */
- rc = -ETIMEDOUT;
- }
- } else {
- DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc);
- }
- if (rc)
- GOTO(cleanup_lwi, rc);
- rc = mdt_write_dir_page(info, page, reqbody->nlink);
-
-cleanup_lwi:
- OBD_FREE_PTR(lwi);
-cleanup_page:
- cfs_free_page(page);
-desc_cleanup:
- ptlrpc_free_bulk(desc);
+ ptlrpc_free_bulk_pin(desc);
RETURN(rc);
}
-#endif
-static int mdt_readpage(struct mdt_thread_info *info)
+int mdt_readpage(struct mdt_thread_info *info)
{
struct mdt_object *object = info->mti_object;
struct lu_rdpg *rdpg = &info->mti_u.rdpg.mti_rdpg;
}
rdpg->rp_attrs = reqbody->mode;
- if (info->mti_exp->exp_connect_flags & OBD_CONNECT_64BITHASH)
- rdpg->rp_attrs |= LUDA_64BITHASH;
- rdpg->rp_count = min_t(unsigned int, reqbody->nlink,
- PTLRPC_MAX_BRW_SIZE);
+ if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_64BITHASH)
+ rdpg->rp_attrs |= LUDA_64BITHASH;
+ rdpg->rp_count = min_t(unsigned int, reqbody->nlink,
+ exp_max_brw_size(info->mti_exp));
rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1) >>
CFS_PAGE_SHIFT;
OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
__u32 op)
{
struct req_capsule *pill = info->mti_pill;
- struct md_quota *mq = md_quota(info->mti_env);
struct mdt_body *repbody;
int rc = 0, rc2;
ENTRY;
req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
info->mti_rr.rr_eadatalen);
+ /* llog cookies are always 0, the field is kept for compatibility */
if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
- req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
- info->mti_mdt->mdt_max_cookiesize);
+ req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
rc = req_capsule_server_pack(pill);
if (rc != 0) {
rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
GOTO(out_ucred, rc);
}
- mq->mq_exp = info->mti_exp;
rc = mdt_reint_rec(info, lhc);
EXIT;
out_ucred:
return opc;
}
-static int mdt_reint(struct mdt_thread_info *info)
+int mdt_reint(struct mdt_thread_info *info)
{
- long opc;
- int rc;
-
- static const struct req_format *reint_fmts[REINT_MAX] = {
- [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
- [REINT_CREATE] = &RQF_MDS_REINT_CREATE,
- [REINT_LINK] = &RQF_MDS_REINT_LINK,
- [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK,
- [REINT_RENAME] = &RQF_MDS_REINT_RENAME,
- [REINT_OPEN] = &RQF_MDS_REINT_OPEN,
- [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR
- };
+ long opc;
+ int rc;
+
+ static const struct req_format *reint_fmts[REINT_MAX] = {
+ [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
+ [REINT_CREATE] = &RQF_MDS_REINT_CREATE,
+ [REINT_LINK] = &RQF_MDS_REINT_LINK,
+ [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK,
+ [REINT_RENAME] = &RQF_MDS_REINT_RENAME,
+ [REINT_OPEN] = &RQF_MDS_REINT_OPEN,
+ [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR,
+ [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK
+ };
ENTRY;
RETURN(rc);
}
-static int mdt_sync(struct mdt_thread_info *info)
+int mdt_sync(struct mdt_thread_info *info)
{
struct ptlrpc_request *req = mdt_info_req(info);
struct req_capsule *pill = info->mti_pill;
RETURN(rc);
}
-#ifdef HAVE_QUOTA_SUPPORT
-static int mdt_quotacheck_handle(struct mdt_thread_info *info)
+/*
+ * Quotacheck handler.
+ * In-kernel quotacheck isn't supported any more. The handler still fetches
+ * the obd_quotactl request buffer and packs the reply, and then
+ * unconditionally fails.
+ *
+ * Returns err_serious(-EPROTO) when the request buffer is missing,
+ * err_serious(rc) when packing the reply fails, and -EOPNOTSUPP otherwise.
+ */
+int mdt_quotacheck(struct mdt_thread_info *info)
{
- struct obd_quotactl *oqctl;
- struct req_capsule *pill = info->mti_pill;
- struct obd_export *exp = info->mti_exp;
- struct md_quota *mq = md_quota(info->mti_env);
- struct md_device *next = info->mti_mdt->mdt_child;
- int rc;
- ENTRY;
-
- oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
- if (oqctl == NULL)
- RETURN(-EPROTO);
+ struct obd_quotactl *oqctl;
+ int rc;
+ ENTRY;
- /* remote client has no permission for quotacheck */
- if (unlikely(exp_connect_rmtclient(exp)))
- RETURN(-EPERM);
+ oqctl = req_capsule_client_get(info->mti_pill, &RMF_OBD_QUOTACTL);
+ if (oqctl == NULL)
+ RETURN(err_serious(-EPROTO));
- rc = req_capsule_server_pack(pill);
- if (rc)
- RETURN(rc);
+ rc = req_capsule_server_pack(info->mti_pill);
+ if (rc)
+ RETURN(err_serious(rc));
- mq->mq_exp = exp;
- rc = next->md_ops->mdo_quota.mqo_check(info->mti_env, next,
- oqctl->qc_type);
- RETURN(rc);
+ /* deprecated, not used any more */
+ RETURN(-EOPNOTSUPP);
}
-static int mdt_quotactl_handle(struct mdt_thread_info *info)
+/*
+ * Handle quota control requests, which are used both to consult current
+ * usage/limits and to configure quota enforcement.
+ *
+ * Deprecated commands (quotacheck, quotaon/off, invalidate, initquota) are
+ * rejected with -EOPNOTSUPP. Q_GETINFO, Q_SETINFO, Q_SETQUOTA and Q_GETQUOTA
+ * are forwarded to the QMT device and fail with -EOPNOTSUPP when mdt_qmt_dev
+ * is NULL. Q_GETOINFO and Q_GETOQUOTA are passed to lquotactl_slv() on
+ * mdt_bottom. Any other command fails with -EFAULT.
+ *
+ * For remote clients only Q_GETQUOTA and Q_GETINFO are allowed (-EPERM
+ * otherwise) and the id is looked up in the export's idmap; the mapped id
+ * replaces qc_id only for the duration of the call and is swapped back
+ * before the obd_quotactl is copied into the reply.
+ */
+int mdt_quotactl(struct mdt_thread_info *info)
{
- struct obd_quotactl *oqctl, *repoqc;
- struct req_capsule *pill = info->mti_pill;
- struct obd_export *exp = info->mti_exp;
- struct md_quota *mq = md_quota(info->mti_env);
- struct md_device *next = info->mti_mdt->mdt_child;
- const struct md_quota_operations *mqo = &next->md_ops->mdo_quota;
- int id, rc;
- ENTRY;
+ struct obd_export *exp = info->mti_exp;
+ struct req_capsule *pill = info->mti_pill;
+ struct obd_quotactl *oqctl, *repoqc;
+ int id, rc;
+ struct lu_device *qmt = info->mti_mdt->mdt_qmt_dev;
+ ENTRY;
- oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
- if (oqctl == NULL)
- RETURN(-EPROTO);
+ oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
+ if (oqctl == NULL)
+ RETURN(err_serious(-EPROTO));
- id = oqctl->qc_id;
- if (exp_connect_rmtclient(exp)) {
- struct ptlrpc_request *req = mdt_info_req(info);
- struct mdt_export_data *med = mdt_req2med(req);
- struct lustre_idmap_table *idmap = med->med_idmap;
+ rc = req_capsule_server_pack(pill);
+ if (rc)
+ RETURN(err_serious(rc));
- if (unlikely(oqctl->qc_cmd != Q_GETQUOTA &&
- oqctl->qc_cmd != Q_GETINFO))
- RETURN(-EPERM);
+ switch (oqctl->qc_cmd) {
+ case Q_QUOTACHECK:
+ case LUSTRE_Q_INVALIDATE:
+ case LUSTRE_Q_FINVALIDATE:
+ case Q_QUOTAON:
+ case Q_QUOTAOFF:
+ case Q_INITQUOTA:
+ /* deprecated, not used any more */
+ RETURN(-EOPNOTSUPP);
+ /* master quotactl */
+ case Q_GETINFO:
+ case Q_SETINFO:
+ case Q_SETQUOTA:
+ case Q_GETQUOTA:
+ if (qmt == NULL)
+ RETURN(-EOPNOTSUPP);
+ /* no break: master commands fall through to the break below;
+ * slave quotactl */
+ case Q_GETOINFO:
+ case Q_GETOQUOTA:
+ break;
+ default:
+ CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
+ RETURN(-EFAULT);
+ }
+ /* map uid/gid for remote client */
+ id = oqctl->qc_id;
+ if (exp_connect_rmtclient(exp)) {
+ struct lustre_idmap_table *idmap;
+
+ idmap = mdt_req2med(mdt_info_req(info))->med_idmap;
+
+ if (unlikely(oqctl->qc_cmd != Q_GETQUOTA &&
+ oqctl->qc_cmd != Q_GETINFO))
+ RETURN(-EPERM);
+
+ if (oqctl->qc_type == USRQUOTA)
+ id = lustre_idmap_lookup_uid(NULL, idmap, 0,
+ oqctl->qc_id);
+ else if (oqctl->qc_type == GRPQUOTA)
+ id = lustre_idmap_lookup_gid(NULL, idmap, 0,
+ oqctl->qc_id);
+ else
+ RETURN(-EINVAL);
+
+ if (id == CFS_IDMAP_NOTFOUND) {
+ CDEBUG(D_QUOTA, "no mapping for id %u\n", oqctl->qc_id);
+ RETURN(-EACCES);
+ }
+ }
- if (oqctl->qc_type == USRQUOTA)
- id = lustre_idmap_lookup_uid(NULL, idmap, 0,
- oqctl->qc_id);
- else if (oqctl->qc_type == GRPQUOTA)
- id = lustre_idmap_lookup_gid(NULL, idmap, 0,
- oqctl->qc_id);
- else
- RETURN(-EINVAL);
+ repoqc = req_capsule_server_get(pill, &RMF_OBD_QUOTACTL);
+ if (repoqc == NULL)
+ RETURN(err_serious(-EFAULT));
- if (id == CFS_IDMAP_NOTFOUND) {
- CDEBUG(D_QUOTA, "no mapping for id %u\n",
- oqctl->qc_id);
- RETURN(-EACCES);
- }
- }
+ if (oqctl->qc_id != id) /* put the mapped id in place for the call */
+ swap(oqctl->qc_id, id);
- rc = req_capsule_server_pack(pill);
- if (rc)
- RETURN(rc);
+ switch (oqctl->qc_cmd) {
- repoqc = req_capsule_server_get(pill, &RMF_OBD_QUOTACTL);
- LASSERT(repoqc != NULL);
+ case Q_GETINFO:
+ case Q_SETINFO:
+ case Q_SETQUOTA:
+ case Q_GETQUOTA:
+ /* forward quotactl request to QMT */
+ rc = qmt_hdls.qmth_quotactl(info->mti_env, qmt, oqctl);
+ break;
- mq->mq_exp = exp;
- switch (oqctl->qc_cmd) {
- case Q_QUOTAON:
- rc = mqo->mqo_on(info->mti_env, next, oqctl->qc_type);
- break;
- case Q_QUOTAOFF:
- rc = mqo->mqo_off(info->mti_env, next, oqctl->qc_type);
- break;
- case Q_SETINFO:
- rc = mqo->mqo_setinfo(info->mti_env, next, oqctl->qc_type, id,
- &oqctl->qc_dqinfo);
- break;
- case Q_GETINFO:
- rc = mqo->mqo_getinfo(info->mti_env, next, oqctl->qc_type, id,
- &oqctl->qc_dqinfo);
- break;
- case Q_SETQUOTA:
- rc = mqo->mqo_setquota(info->mti_env, next, oqctl->qc_type, id,
- &oqctl->qc_dqblk);
- break;
- case Q_GETQUOTA:
- rc = mqo->mqo_getquota(info->mti_env, next, oqctl->qc_type, id,
- &oqctl->qc_dqblk);
- break;
- case Q_GETOINFO:
- rc = mqo->mqo_getoinfo(info->mti_env, next, oqctl->qc_type, id,
- &oqctl->qc_dqinfo);
- break;
- case Q_GETOQUOTA:
- rc = mqo->mqo_getoquota(info->mti_env, next, oqctl->qc_type, id,
- &oqctl->qc_dqblk);
- break;
- case LUSTRE_Q_INVALIDATE:
- rc = mqo->mqo_invalidate(info->mti_env, next, oqctl->qc_type);
- break;
- case LUSTRE_Q_FINVALIDATE:
- rc = mqo->mqo_finvalidate(info->mti_env, next, oqctl->qc_type);
- break;
- default:
- CERROR("unsupported mdt_quotactl command: %d\n",
- oqctl->qc_cmd);
- RETURN(-EFAULT);
- }
+ case Q_GETOINFO:
+ case Q_GETOQUOTA:
+ /* slave quotactl */
+ rc = lquotactl_slv(info->mti_env, info->mti_mdt->mdt_bottom,
+ oqctl);
+ break;
- *repoqc = *oqctl;
- RETURN(rc);
-}
-#endif
+ default:
+ CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
+ RETURN(-EFAULT);
+ }
+ if (oqctl->qc_id != id) /* restore the id the client sent */
+ swap(oqctl->qc_id, id);
+
+ *repoqc = *oqctl;
+ RETURN(rc);
+}
/*
* OBD PING and other handlers.
*/
-static int mdt_obd_ping(struct mdt_thread_info *info)
+int mdt_obd_ping(struct mdt_thread_info *info)
{
int rc;
ENTRY;
/*
* OBD_IDX_READ handler
*/
-static int mdt_obd_idx_read(struct mdt_thread_info *info)
+int mdt_obd_idx_read(struct mdt_thread_info *info)
{
struct mdt_device *mdt = info->mti_mdt;
struct lu_rdpg *rdpg = &info->mti_u.rdpg.mti_rdpg;
if (req_ii->ii_count <= 0)
GOTO(out, rc = -EFAULT);
rdpg->rp_count = min_t(unsigned int, req_ii->ii_count << LU_PAGE_SHIFT,
- PTLRPC_MAX_BRW_SIZE);
+ exp_max_brw_size(info->mti_exp));
rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE -1) >> CFS_PAGE_SHIFT;
/* allocate pages to store the containers */
return rc;
}
-static int mdt_obd_log_cancel(struct mdt_thread_info *info)
+int mdt_obd_log_cancel(struct mdt_thread_info *info)
{
return err_serious(-EOPNOTSUPP);
}
-static int mdt_obd_qc_callback(struct mdt_thread_info *info)
+int mdt_obd_qc_callback(struct mdt_thread_info *info)
{
return err_serious(-EOPNOTSUPP);
}
-
/*
* LLOG handlers.
*/
rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt);
if (rc || ctxt == NULL) {
- CERROR("Can't get mdd ctxt %d\n", rc);
- return rc;
+ return 0;
}
rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx);
return 0;
}
-static int mdt_llog_create(struct mdt_thread_info *info)
+int mdt_llog_create(struct mdt_thread_info *info)
{
int rc;
return (rc < 0 ? err_serious(rc) : rc);
}
-static int mdt_llog_destroy(struct mdt_thread_info *info)
+int mdt_llog_destroy(struct mdt_thread_info *info)
{
int rc;
return (rc < 0 ? err_serious(rc) : rc);
}
-static int mdt_llog_read_header(struct mdt_thread_info *info)
+int mdt_llog_read_header(struct mdt_thread_info *info)
{
int rc;
return (rc < 0 ? err_serious(rc) : rc);
}
-static int mdt_llog_next_block(struct mdt_thread_info *info)
+int mdt_llog_next_block(struct mdt_thread_info *info)
{
int rc;
return (rc < 0 ? err_serious(rc) : rc);
}
-static int mdt_llog_prev_block(struct mdt_thread_info *info)
+int mdt_llog_prev_block(struct mdt_thread_info *info)
{
int rc;
/*
* DLM handlers.
*/
+
static struct ldlm_callback_suite cbs = {
- .lcs_completion = ldlm_server_completion_ast,
- .lcs_blocking = ldlm_server_blocking_ast,
- .lcs_glimpse = NULL
+ .lcs_completion = ldlm_server_completion_ast,
+ .lcs_blocking = ldlm_server_blocking_ast,
+ .lcs_glimpse = ldlm_server_glimpse_ast
};
-static int mdt_enqueue(struct mdt_thread_info *info)
+int mdt_enqueue(struct mdt_thread_info *info)
{
struct ptlrpc_request *req;
int rc;
return rc ? err_serious(rc) : req->rq_status;
}
-static int mdt_convert(struct mdt_thread_info *info)
+int mdt_convert(struct mdt_thread_info *info)
{
int rc;
struct ptlrpc_request *req;
return rc ? err_serious(rc) : req->rq_status;
}
-static int mdt_bl_callback(struct mdt_thread_info *info)
+int mdt_bl_callback(struct mdt_thread_info *info)
{
CERROR("bl callbacks should not happen on MDS\n");
LBUG();
return err_serious(-EOPNOTSUPP);
}
-static int mdt_cp_callback(struct mdt_thread_info *info)
+int mdt_cp_callback(struct mdt_thread_info *info)
{
CERROR("cp callbacks should not happen on MDS\n");
LBUG();
/*
* sec context handlers
*/
-static int mdt_sec_ctx_handle(struct mdt_thread_info *info)
+int mdt_sec_ctx_handle(struct mdt_thread_info *info)
{
int rc;
return rc;
}
+/*
+ * quota request handlers
+ */
+
+/**
+ * Forward a quota DQACQ request to the QMT device of this MDT.
+ *
+ * \param[in] info	per-request MDT thread context
+ *
+ * \retval err_serious(-EOPNOTSUPP)	if no QMT device is set (mdt_qmt_dev
+ *					is NULL)
+ * \retval otherwise			the value of qmt_hdls.qmth_dqacq()
+ */
+int mdt_quota_dqacq(struct mdt_thread_info *info)
+{
+	struct lu_device	*qmt_dev;
+	ENTRY;
+
+	qmt_dev = info->mti_mdt->mdt_qmt_dev;
+	if (qmt_dev != NULL)
+		RETURN(qmt_hdls.qmth_dqacq(info->mti_env, qmt_dev,
+					   mdt_info_req(info)));
+
+	/* no quota master on this target */
+	RETURN(err_serious(-EOPNOTSUPP));
+}
+
static struct mdt_object *mdt_obj(struct lu_object *o)
{
LASSERT(lu_device_is_mdt(o->lo_dev));
RETURN(rc);
}
-int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 ibits, int locality)
+/**
+ * Blocking AST used for locks taken on remote objects.
+ *
+ * On LDLM_CB_BLOCKING the lock is cancelled through its handle; on
+ * LDLM_CB_CANCELING the revocation is only logged. Any other flag is
+ * treated as a bug.
+ *
+ * \param[in] lock	lock the callback is invoked for
+ * \param[in] desc	not used by this callback
+ * \param[in] data	not used by this callback
+ * \param[in] flag	LDLM_CB_BLOCKING or LDLM_CB_CANCELING
+ *
+ * \retval 0		on success
+ * \retval negative	error returned by ldlm_cli_cancel()
+ */
+int mdt_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+			void *data, int flag)
+{
+	struct lustre_handle	lockh;
+	int			rc;
+	/* pair the RETURN()s below with an ENTRY, as the sibling handlers do */
+	ENTRY;
+
+	switch (flag) {
+	case LDLM_CB_BLOCKING:
+		ldlm_lock2handle(lock, &lockh);
+		rc = ldlm_cli_cancel(&lockh);
+		if (rc < 0) {
+			CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
+			RETURN(rc);
+		}
+		break;
+	case LDLM_CB_CANCELING:
+		LDLM_DEBUG(lock, "Revoke remote lock\n");
+		break;
+	default:
+		LBUG();
+	}
+	RETURN(0);
+}
+
+/**
+ * Enqueue an inodebits lock on a remote object through the child device.
+ *
+ * The enqueue info and policy scratch areas of \a mti are reset and filled
+ * in, then the request is handed to mo_object_lock().
+ *
+ * \param[in]  mti	thread context providing mti_einfo/mti_policy/mti_env
+ * \param[in]  o	object to lock; asserted to be remote
+ * \param[out] lh	lock handle passed through to mo_object_lock()
+ * \param[in]  mode	requested lock mode
+ * \param[in]  ibits	inode bits; asserted to include MDS_INODELOCK_UPDATE
+ *
+ * \retval the value returned by mo_object_lock()
+ */
+int mdt_remote_object_lock(struct mdt_thread_info *mti,
+			   struct mdt_object *o, struct lustre_handle *lh,
+			   ldlm_mode_t mode, __u64 ibits)
+{
+	struct ldlm_enqueue_info	*enq = &mti->mti_einfo;
+	ldlm_policy_data_t		*ibits_policy = &mti->mti_policy;
+	ENTRY;
+
+	LASSERT(mdt_object_remote(o));
+
+	LASSERT((ibits & MDS_INODELOCK_UPDATE));
+
+	/* describe the enqueue: an IBITS lock with our blocking AST */
+	memset(enq, 0, sizeof(*enq));
+	enq->ei_cb_cp = ldlm_completion_ast;
+	enq->ei_cb_bl = mdt_md_blocking_ast;
+	enq->ei_mode = mode;
+	enq->ei_type = LDLM_IBITS;
+
+	memset(ibits_policy, 0, sizeof(*ibits_policy));
+	ibits_policy->l_inodebits.bits = ibits;
+
+	RETURN(mo_object_lock(mti->mti_env, mdt_object_child(o), lh, enq,
+			      ibits_policy));
+}
+
+static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o,
+ struct mdt_lock_handle *lh, __u64 ibits,
+ bool nonblock, int locality)
{
struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
ldlm_policy_data_t *policy = &info->mti_policy;
struct ldlm_res_id *res_id = &info->mti_res_id;
+ __u64 dlmflags;
int rc;
ENTRY;
LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
LASSERT(lh->mlh_type != MDT_NUL_LOCK);
- if (mdt_object_exists(o) < 0) {
+ if (mdt_object_remote(o)) {
if (locality == MDT_CROSS_LOCK) {
- /* cross-ref object fix */
- ibits &= ~MDS_INODELOCK_UPDATE;
+ ibits &= ~(MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM);
ibits |= MDS_INODELOCK_LOOKUP;
} else {
- LASSERT(!(ibits & MDS_INODELOCK_UPDATE));
+ LASSERTF(!(ibits &
+ (MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM)),
+ "%s: wrong bit "LPX64" for remote obj "DFID"\n",
+ mdt_obd_name(info->mti_mdt), ibits,
+ PFID(mdt_object_fid(o)));
LASSERT(ibits & MDS_INODELOCK_LOOKUP);
}
/* No PDO lock on remote object */
LASSERT(lh->mlh_type != MDT_PDO_LOCK);
}
- if (lh->mlh_type == MDT_PDO_LOCK) {
+ if (lh->mlh_type == MDT_PDO_LOCK) {
/* check for exists after object is locked */
if (mdt_object_exists(o) == 0) {
/* Non-existent object shouldn't have PDO lock */
memset(policy, 0, sizeof(*policy));
fid_build_reg_res_name(mdt_object_fid(o), res_id);
+ dlmflags = LDLM_FL_ATOMIC_CB;
+ if (nonblock)
+ dlmflags |= LDLM_FL_BLOCK_NOWAIT;
+
/*
* Take PDO lock on whole directory and build correct @res_id for lock
* on part of directory.
*/
policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
- policy, res_id, LDLM_FL_ATOMIC_CB,
+ policy, res_id, dlmflags,
&info->mti_exp->exp_handle.h_cookie);
if (unlikely(rc))
RETURN(rc);
* fix it up and turn FL_LOCAL flag off.
*/
rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
- res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
+ res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
&info->mti_exp->exp_handle.h_cookie);
if (rc)
mdt_object_unlock(info, o, lh, 1);
RETURN(rc);
}
+/**
+ * Lock \a o with the given inode bits, passing nonblock == false to
+ * mdt_object_lock0().
+ *
+ * \retval the value returned by mdt_object_lock0()
+ */
+int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
+		    struct mdt_lock_handle *lh, __u64 ibits, int locality)
+{
+	const bool nonblock = false;
+
+	return mdt_object_lock0(info, o, lh, ibits, nonblock, locality);
+}
+
+/**
+ * Try to lock \a o, passing nonblock == true to mdt_object_lock0().
+ *
+ * The attempt is made on a private copy of \a lh, so \a lh is only
+ * modified when the lock has actually been obtained.
+ *
+ * Note the return convention differs from mdt_object_lock():
+ * \retval 1 if the lock was obtained (and \a lh updated)
+ * \retval 0 if mdt_object_lock0() failed (\a lh left untouched)
+ */
+int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
+			struct mdt_lock_handle *lh, __u64 ibits, int locality)
+{
+	struct mdt_lock_handle attempt = *lh;
+
+	if (mdt_object_lock0(info, o, &attempt, ibits, true, locality) != 0)
+		return 0;
+
+	*lh = attempt;
+	return 1;
+}
+
/**
* Save a lock within request object.
*
mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
+ if (lustre_handle_is_used(&lh->mlh_rreg_lh))
+ ldlm_lock_decref(&lh->mlh_rreg_lh, lh->mlh_rreg_mode);
+
EXIT;
}
mdt_object_put(info->mti_env, o);
}
-static struct mdt_handler *mdt_handler_find(__u32 opc,
- struct mdt_opc_slice *supported)
+struct mdt_handler *mdt_handler_find(__u32 opc, struct mdt_opc_slice *supported)
{
struct mdt_opc_slice *s;
struct mdt_handler *h;
req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
info->mti_body->eadatasize);
if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
- req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
- info->mti_mdt->mdt_max_cookiesize);
+ req_capsule_set_size(pill, &RMF_LOGCOOKIES,
+ RCL_SERVER, 0);
rc = req_capsule_server_pack(pill);
}
rc = mdt_unpack_req_pack_rep(info, flags);
}
- if (rc == 0 && flags & MUTABOR &&
- req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
- /* should it be rq_status? */
- rc = -EROFS;
+ if (rc == 0 && flags & MUTABOR &&
+ exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
+ /* should it be rq_status? */
+ rc = -EROFS;
if (rc == 0 && flags & HABEO_CLAVIS) {
struct ldlm_request *dlm_req;
lh->mlh_reg_mode = LCK_MINMODE;
lh->mlh_pdo_lh.cookie = 0ull;
lh->mlh_pdo_mode = LCK_MINMODE;
+ lh->mlh_rreg_lh.cookie = 0ull;
+ lh->mlh_rreg_mode = LCK_MINMODE;
}
void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
struct mdt_thread_info *info)
{
int i;
- struct md_capainfo *ci;
req_capsule_init(&req->rq_pill, req, RCL_SERVER);
info->mti_pill = &req->rq_pill;
} else
info->mti_mdt = NULL;
info->mti_env = req->rq_svc_thread->t_env;
- ci = md_capainfo(info->mti_env);
- memset(ci, 0, sizeof *ci);
- if (req->rq_export) {
- if (exp_connect_rmtclient(req->rq_export))
- ci->mc_auth = LC_ID_CONVERT;
- else if (req->rq_export->exp_connect_flags &
- OBD_CONNECT_MDS_CAPA)
- ci->mc_auth = LC_ID_PLAIN;
- else
- ci->mc_auth = LC_ID_NONE;
- }
-
info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
info->mti_mos = NULL;
info->mti_object = NULL;
info->mti_dlm_req = NULL;
info->mti_has_trans = 0;
- info->mti_no_need_trans = 0;
info->mti_cross_ref = 0;
info->mti_opdata = 0;
info->mti_big_lmm_used = 0;
/* To not check for split by default. */
- info->mti_spec.sp_ck_split = 0;
info->mti_spec.no_create = 0;
}
req_capsule_fini(info->mti_pill);
if (info->mti_object != NULL) {
- /*
- * freeing an object may lead to OSD level transaction, do not
- * let it mess with MDT. bz19385.
- */
- info->mti_no_need_trans = 1;
mdt_object_put(info->mti_env, info->mti_object);
info->mti_object = NULL;
}
case OST_CONNECT: /* This will never get here, but for completeness. */
case MDS_DISCONNECT:
case OST_DISCONNECT:
+ case OBD_IDX_READ:
*process = 1;
RETURN(0);
}
if (unlikely(!class_connected_export(req->rq_export))) {
- CERROR("operation %d on unconnected MDS from %s\n",
- lustre_msg_get_opc(req->rq_reqmsg),
- libcfs_id2str(req->rq_peer));
+ CDEBUG(D_HA, "operation %d on unconnected MDS from %s\n",
+ lustre_msg_get_opc(req->rq_reqmsg),
+ libcfs_id2str(req->rq_peer));
/* FIXME: For CMD cleanup, when mds_B stop, the req from
* mds_A will get -ENOTCONN(especially for ping req),
* which will cause that mds_A deactive timeout, then when
case MDS_SETXATTR:
case MDS_SET_INFO:
case MDS_GET_INFO:
+ case MDS_HSM_PROGRESS:
+ case MDS_HSM_REQUEST:
+ case MDS_HSM_CT_REGISTER:
+ case MDS_HSM_CT_UNREGISTER:
+ case MDS_HSM_STATE_GET:
+ case MDS_HSM_STATE_SET:
+ case MDS_HSM_ACTION:
case MDS_QUOTACHECK:
case MDS_QUOTACTL:
+ case UPDATE_OBJ:
+ case MDS_SWAP_LAYOUTS:
case QUOTA_DQACQ:
case QUOTA_DQREL:
case SEQ_QUERY:
* XXX common "target" functionality should be factored into separate module
* shared by mdt, ost and stand-alone services like fld.
*/
-static int mdt_handle_common(struct ptlrpc_request *req,
- struct mdt_opc_slice *supported)
+int mdt_handle_common(struct ptlrpc_request *req,
+ struct mdt_opc_slice *supported)
{
struct lu_env *env;
struct mdt_thread_info *info;
ENTRY;
env = req->rq_svc_thread->t_env;
+ /* Refill (initialize) the context (mdt_thread_info) in case it is
+ * not initialized yet. This usually happens during start-up: after
+ * the MDS ptlrpc threads have started, the first CONNECT request can
+ * arrive before mdt_thread_info is initialized. */
+ lu_env_refill(env);
LASSERT(env != NULL);
LASSERT(env->le_ses != NULL);
LASSERT(env->le_ctx.lc_thread == req->rq_svc_thread);
RETURN(rc);
}
-static int mdt_regular_handle(struct ptlrpc_request *req)
-{
- return mdt_handle_common(req, mdt_regular_handlers);
-}
-
-static int mdt_readpage_handle(struct ptlrpc_request *req)
-{
- return mdt_handle_common(req, mdt_readpage_handlers);
-}
-
-static int mdt_xmds_handle(struct ptlrpc_request *req)
-{
- return mdt_handle_common(req, mdt_xmds_handlers);
-}
-
-static int mdt_mdsc_handle(struct ptlrpc_request *req)
-{
- return mdt_handle_common(req, mdt_seq_handlers);
-}
-
-static int mdt_mdss_handle(struct ptlrpc_request *req)
-{
- return mdt_handle_common(req, mdt_seq_handlers);
-}
-
-static int mdt_dtss_handle(struct ptlrpc_request *req)
-{
- return mdt_handle_common(req, mdt_seq_handlers);
-}
-
-static int mdt_fld_handle(struct ptlrpc_request *req)
-{
- return mdt_handle_common(req, mdt_fld_handlers);
-}
-
enum mdt_it_code {
MDT_IT_OPEN,
MDT_IT_OCREAT,
MDT_IT_TRUNC,
MDT_IT_GETXATTR,
MDT_IT_LAYOUT,
+ MDT_IT_QUOTA,
MDT_IT_NR
};
static int mdt_intent_getattr(enum mdt_it_code opcode,
struct mdt_thread_info *info,
struct ldlm_lock **,
- int);
+ __u64);
+static int mdt_intent_layout(enum mdt_it_code opcode,
+ struct mdt_thread_info *info,
+ struct ldlm_lock **,
+ __u64);
static int mdt_intent_reint(enum mdt_it_code opcode,
struct mdt_thread_info *info,
struct ldlm_lock **,
- int);
+ __u64);
static struct mdt_it_flavor {
const struct req_format *it_fmt;
int (*it_act)(enum mdt_it_code ,
struct mdt_thread_info *,
struct ldlm_lock **,
- int);
+ __u64);
long it_reint;
} mdt_it_flavor[] = {
[MDT_IT_OPEN] = {
.it_flags = 0,
.it_act = NULL
},
- [MDT_IT_LAYOUT] = {
- .it_fmt = &RQF_LDLM_INTENT_GETATTR,
- .it_flags = HABEO_REFERO,
- .it_act = mdt_intent_getattr
- }
+ [MDT_IT_LAYOUT] = {
+ .it_fmt = &RQF_LDLM_INTENT_LAYOUT,
+ .it_flags = 0,
+ .it_act = mdt_intent_layout
+ }
};
int mdt_intent_lock_replace(struct mdt_thread_info *info,
struct ldlm_lock **lockp,
struct ldlm_lock *new_lock,
struct mdt_lock_handle *lh,
- int flags)
+ __u64 flags)
{
struct ptlrpc_request *req = mdt_info_req(info);
struct ldlm_lock *lock = *lockp;
static int mdt_intent_getattr(enum mdt_it_code opcode,
struct mdt_thread_info *info,
struct ldlm_lock **lockp,
- int flags)
+ __u64 flags)
{
struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
struct ldlm_lock *new_lock = NULL;
repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
LASSERT(repbody);
- info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT);
info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
repbody->eadatasize = 0;
repbody->aclsize = 0;
switch (opcode) {
case MDT_IT_LOOKUP:
- child_bits = MDS_INODELOCK_LOOKUP;
+ child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
break;
case MDT_IT_GETATTR:
- child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
- break;
- case MDT_IT_LAYOUT: {
- static int printed = 0;
-
- if (!printed) {
- CERROR("layout lock not supported by this version\n");
- printed = 1;
- }
- GOTO(out_shrink, rc = -EINVAL);
+ child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
+ MDS_INODELOCK_PERM;
break;
- }
default:
CERROR("Unsupported intent (%d)\n", opcode);
GOTO(out_shrink, rc = -EINVAL);
return rc;
}
+/**
+ * Intent handler for layout lock requests (MDT_IT_LAYOUT).
+ *
+ * Marks the lock as carrying a layout LVB, sizes and packs the reply, and
+ * then inspects the layout intent sent by the client. Only
+ * LAYOUT_INTENT_ACCESS is accepted, in which case 0 is returned so that the
+ * caller continues with normal ldlm handling.
+ *
+ * \param[in]     opcode	intent code; must be MDT_IT_LAYOUT
+ * \param[in]     info		per-request MDT thread context
+ * \param[in,out] lockp	lock being enqueued; its l_lvb_type is set
+ * \param[in]     flags	not used by this handler
+ *
+ * \retval 0		for a LAYOUT_INTENT_ACCESS intent
+ * \retval -EPROTO	if the request carries no layout intent buffer
+ * \retval -EINVAL	for a wrong opcode, a reply packing failure or an
+ *			unsupported layout intent
+ */
+static int mdt_intent_layout(enum mdt_it_code opcode,
+			     struct mdt_thread_info *info,
+			     struct ldlm_lock **lockp,
+			     __u64 flags)
+{
+	struct layout_intent	*layout;
+	int			 rc;
+	ENTRY;
+
+	if (opcode != MDT_IT_LAYOUT) {
+		CERROR("%s: Unknown intent (%d)\n",
+		       info->mti_exp->exp_obd->obd_name, opcode);
+		RETURN(-EINVAL);
+	}
+
+	(*lockp)->l_lvb_type = LVB_T_LAYOUT;
+	req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
+			     ldlm_lvbo_size(*lockp));
+	rc = req_capsule_server_pack(info->mti_pill);
+	if (rc != 0)
+		RETURN(-EINVAL);
+
+	/*
+	 * The intent buffer comes from the client: a missing or short buffer
+	 * is a protocol error to report, not a reason to assert on the server.
+	 */
+	layout = req_capsule_client_get(info->mti_pill, &RMF_LAYOUT_INTENT);
+	if (layout == NULL)
+		RETURN(-EPROTO);
+
+	if (layout->li_opc == LAYOUT_INTENT_ACCESS)
+		/* return to normal ldlm handling */
+		RETURN(0);
+
+	CERROR("%s: Unsupported layout intent (%d)\n",
+	       info->mti_exp->exp_obd->obd_name, layout->li_opc);
+	RETURN(-EINVAL);
+}
+
static int mdt_intent_reint(enum mdt_it_code opcode,
struct mdt_thread_info *info,
struct ldlm_lock **lockp,
- int flags)
+ __u64 flags)
{
struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
struct ldlm_reply *rep = NULL;
*/
if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
LASSERTF(rc == 0, "Error occurred but lock handle "
- "is still in use\n");
+ "is still in use, rc = %d\n", rc);
rep->lock_policy_res2 = 0;
rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
RETURN(rc);
case IT_LAYOUT:
rc = MDT_IT_LAYOUT;
break;
+ case IT_QUOTA_DQACQ:
+ case IT_QUOTA_CONN:
+ rc = MDT_IT_QUOTA;
+ break;
default:
CERROR("Unknown intent opcode: %ld\n", itcode);
rc = -EINVAL;
}
static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
- struct ldlm_lock **lockp, int flags)
+ struct ldlm_lock **lockp, __u64 flags)
{
struct req_capsule *pill;
struct mdt_it_flavor *flv;
RETURN(-EINVAL);
pill = info->mti_pill;
- flv = &mdt_it_flavor[opc];
+ if (opc == MDT_IT_QUOTA) {
+ struct lu_device *qmt = info->mti_mdt->mdt_qmt_dev;
+
+ if (qmt == NULL)
+ RETURN(-EOPNOTSUPP);
+
+ (*lockp)->l_lvb_type = LVB_T_LQUOTA;
+ /* pass the request to quota master */
+ rc = qmt_hdls.qmth_intent_policy(info->mti_env, qmt,
+ mdt_info_req(info), lockp,
+ flags);
+ RETURN(rc);
+ }
+
+ flv = &mdt_it_flavor[opc];
if (flv->it_fmt != NULL)
req_capsule_extend(pill, flv->it_fmt);
rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
if (rc == 0) {
struct ptlrpc_request *req = mdt_info_req(info);
- if (flv->it_flags & MUTABOR &&
- req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
- RETURN(-EROFS);
+ if (flv->it_flags & MUTABOR &&
+ exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
+ RETURN(-EROFS);
}
if (rc == 0 && flv->it_act != NULL) {
/* execute policy */
static int mdt_intent_policy(struct ldlm_namespace *ns,
struct ldlm_lock **lockp, void *req_cookie,
- ldlm_mode_t mode, int flags, void *data)
+ ldlm_mode_t mode, __u64 flags, void *data)
{
struct mdt_thread_info *info;
struct ptlrpc_request *req = req_cookie;
LASSERT(pill->rc_req == req);
if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
- req_capsule_extend(pill, &RQF_LDLM_INTENT);
+ req_capsule_extend(pill, &RQF_LDLM_INTENT_BASIC);
it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
if (it != NULL) {
rc = mdt_intent_opc(it->opc, info, lockp, flags);
} else {
/* No intent was provided */
LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
+ req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0);
rc = req_capsule_server_pack(pill);
if (rc)
rc = err_serious(rc);
static int mdt_seq_fini(const struct lu_env *env,
struct mdt_device *m)
{
- struct md_site *ms = mdt_md_site(m);
- ENTRY;
-
- if (ms != NULL) {
- if (ms->ms_server_seq) {
- seq_server_fini(ms->ms_server_seq, env);
- OBD_FREE_PTR(ms->ms_server_seq);
- ms->ms_server_seq = NULL;
- }
-
- if (ms->ms_control_seq) {
- seq_server_fini(ms->ms_control_seq, env);
- OBD_FREE_PTR(ms->ms_control_seq);
- ms->ms_control_seq = NULL;
- }
-
- if (ms->ms_client_seq) {
- seq_client_fini(ms->ms_client_seq);
- OBD_FREE_PTR(ms->ms_client_seq);
- ms->ms_client_seq = NULL;
- }
- }
-
- RETURN(0);
+ return seq_site_fini(env, mdt_seq_site(m)); /* teardown is delegated to seq_site_fini() on this MDT's seq site; its result is returned as is */
}
+/*
+ * Set up the sequence services of this MDT.
+ *
+ * On node 0 (the sequence-controller node) a controller server and a
+ * controller client talking directly to it are created first. Every node
+ * then gets a local sequence server, and on node 0 the controller client is
+ * attached to it. On any failure mdt_seq_fini() is called to undo what was
+ * set up so far.
+ *
+ * Returns 0 on success or a negative errno on failure.
+ */
static int mdt_seq_init(const struct lu_env *env,
const char *uuid,
struct mdt_device *m)
{
- struct md_site *ms;
- char *prefix;
- int rc;
- ENTRY;
+ struct seq_server_site *ss;
+ char *prefix;
+ int rc;
+ ENTRY;
- ms = mdt_md_site(m);
+ ss = mdt_seq_site(m);
- /*
- * This is sequence-controller node. Init seq-controller server on local
- * MDT.
- */
- if (ms->ms_node_id == 0) {
- LASSERT(ms->ms_control_seq == NULL);
+ /*
+ * This is sequence-controller node. Init seq-controller server on local
+ * MDT.
+ */
+ if (ss->ss_node_id == 0) {
+ LASSERT(ss->ss_control_seq == NULL);
- OBD_ALLOC_PTR(ms->ms_control_seq);
- if (ms->ms_control_seq == NULL)
- RETURN(-ENOMEM);
+ OBD_ALLOC_PTR(ss->ss_control_seq);
+ if (ss->ss_control_seq == NULL)
+ RETURN(-ENOMEM);
- rc = seq_server_init(ms->ms_control_seq,
- m->mdt_bottom, uuid,
- LUSTRE_SEQ_CONTROLLER,
- ms,
- env);
+ rc = seq_server_init(ss->ss_control_seq,
+ m->mdt_bottom, uuid,
+ LUSTRE_SEQ_CONTROLLER,
+ ss,
+ env);
- if (rc)
- GOTO(out_seq_fini, rc);
+ if (rc)
+ GOTO(out_seq_fini, rc);
- OBD_ALLOC_PTR(ms->ms_client_seq);
- if (ms->ms_client_seq == NULL)
- GOTO(out_seq_fini, rc = -ENOMEM);
+ OBD_ALLOC_PTR(ss->ss_client_seq);
+ if (ss->ss_client_seq == NULL)
+ GOTO(out_seq_fini, rc = -ENOMEM);
- OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
- if (prefix == NULL) {
- OBD_FREE_PTR(ms->ms_client_seq);
- GOTO(out_seq_fini, rc = -ENOMEM);
- }
+ OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
+ if (prefix == NULL) {
+ OBD_FREE_PTR(ss->ss_client_seq);
+ /* do not leave a dangling pointer behind for the
+ * mdt_seq_fini() cleanup at out_seq_fini */
+ ss->ss_client_seq = NULL;
+ GOTO(out_seq_fini, rc = -ENOMEM);
+ }
- snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
- uuid);
+ snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
+ uuid);
- /*
- * Init seq-controller client after seq-controller server is
- * ready. Pass ms->ms_control_seq to it for direct talking.
- */
- rc = seq_client_init(ms->ms_client_seq, NULL,
- LUSTRE_SEQ_METADATA, prefix,
- ms->ms_control_seq);
- OBD_FREE(prefix, MAX_OBD_NAME + 5);
+ /*
+ * Init seq-controller client after seq-controller server is
+ * ready. Pass ss->ss_control_seq to it for direct talking.
+ */
+ rc = seq_client_init(ss->ss_client_seq, NULL,
+ LUSTRE_SEQ_METADATA, prefix,
+ ss->ss_control_seq);
+ OBD_FREE(prefix, MAX_OBD_NAME + 5);
- if (rc)
- GOTO(out_seq_fini, rc);
- }
+ if (rc)
+ GOTO(out_seq_fini, rc);
+ }
- /* Init seq-server on local MDT */
- LASSERT(ms->ms_server_seq == NULL);
+ /* Init seq-server on local MDT */
+ LASSERT(ss->ss_server_seq == NULL);
- OBD_ALLOC_PTR(ms->ms_server_seq);
- if (ms->ms_server_seq == NULL)
- GOTO(out_seq_fini, rc = -ENOMEM);
+ OBD_ALLOC_PTR(ss->ss_server_seq);
+ if (ss->ss_server_seq == NULL)
+ GOTO(out_seq_fini, rc = -ENOMEM);
- rc = seq_server_init(ms->ms_server_seq,
- m->mdt_bottom, uuid,
- LUSTRE_SEQ_SERVER,
- ms,
- env);
- if (rc)
- GOTO(out_seq_fini, rc = -ENOMEM);
+ rc = seq_server_init(ss->ss_server_seq,
+ m->mdt_bottom, uuid,
+ LUSTRE_SEQ_SERVER,
+ ss,
+ env);
+ /* report the real failure instead of overwriting it with -ENOMEM,
+ * as is already done for the controller server above */
+ if (rc)
+ GOTO(out_seq_fini, rc);
- /* Assign seq-controller client to local seq-server. */
- if (ms->ms_node_id == 0) {
- LASSERT(ms->ms_client_seq != NULL);
+ /* Assign seq-controller client to local seq-server. */
+ if (ss->ss_node_id == 0) {
+ LASSERT(ss->ss_client_seq != NULL);
- rc = seq_server_set_cli(ms->ms_server_seq,
- ms->ms_client_seq,
- env);
- }
+ rc = seq_server_set_cli(ss->ss_server_seq,
+ ss->ss_client_seq,
+ env);
+ }
- EXIT;
+ EXIT;
out_seq_fini:
- if (rc)
- mdt_seq_fini(env, m);
-
- return rc;
-}
-/*
- * Init client sequence manager which is used by local MDS to talk to sequence
- * controller on remote node.
- */
-static int mdt_seq_init_cli(const struct lu_env *env,
- struct mdt_device *m,
- struct lustre_cfg *cfg)
-{
- struct md_site *ms = mdt_md_site(m);
- struct obd_device *mdc;
- struct obd_uuid *uuidp, *mdcuuidp;
- char *uuid_str, *mdc_uuid_str;
- int rc;
- int index;
- struct mdt_thread_info *info;
- char *p, *index_string = lustre_cfg_string(cfg, 2);
- ENTRY;
-
- info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
- uuidp = &info->mti_u.uuid[0];
- mdcuuidp = &info->mti_u.uuid[1];
-
- LASSERT(index_string);
-
- index = simple_strtol(index_string, &p, 10);
- if (*p) {
- CERROR("Invalid index in lustre_cgf, offset 2\n");
- RETURN(-EINVAL);
- }
-
- /* check if this is adding the first MDC and controller is not yet
- * initialized. */
- if (index != 0 || ms->ms_client_seq)
- RETURN(0);
-
- uuid_str = lustre_cfg_string(cfg, 1);
- mdc_uuid_str = lustre_cfg_string(cfg, 4);
- obd_str2uuid(uuidp, uuid_str);
- obd_str2uuid(mdcuuidp, mdc_uuid_str);
-
- mdc = class_find_client_obd(uuidp, LUSTRE_MDC_NAME, mdcuuidp);
- if (!mdc) {
- CERROR("can't find controller MDC by uuid %s\n",
- uuid_str);
- rc = -ENOENT;
- } else if (!mdc->obd_set_up) {
- CERROR("target %s not set up\n", mdc->obd_name);
- rc = -EINVAL;
- } else {
- LASSERT(ms->ms_control_exp);
- OBD_ALLOC_PTR(ms->ms_client_seq);
- if (ms->ms_client_seq != NULL) {
- char *prefix;
-
- OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
- if (!prefix)
- RETURN(-ENOMEM);
-
- snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
- mdc->obd_name);
-
- rc = seq_client_init(ms->ms_client_seq,
- ms->ms_control_exp,
- LUSTRE_SEQ_METADATA,
- prefix, NULL);
- OBD_FREE(prefix, MAX_OBD_NAME + 5);
- } else
- rc = -ENOMEM;
-
- if (rc)
- RETURN(rc);
-
- LASSERT(ms->ms_server_seq != NULL);
- rc = seq_server_set_cli(ms->ms_server_seq, ms->ms_client_seq,
- env);
- }
-
- RETURN(rc);
-}
-
-static void mdt_seq_fini_cli(struct mdt_device *m)
-{
- struct md_site *ms;
-
- ENTRY;
-
- ms = mdt_md_site(m);
-
- if (ms != NULL) {
- if (ms->ms_server_seq)
- seq_server_set_cli(ms->ms_server_seq,
- NULL, NULL);
+ if (rc)
+ mdt_seq_fini(env, m);
- if (ms->ms_control_exp) {
- class_export_put(ms->ms_control_exp);
- ms->ms_control_exp = NULL;
- }
- }
- EXIT;
+ return rc;
}
/*
static int mdt_fld_fini(const struct lu_env *env,
struct mdt_device *m)
{
- struct md_site *ms = mdt_md_site(m);
- ENTRY;
+ struct seq_server_site *ss = mdt_seq_site(m);
+ ENTRY;
- if (ms && ms->ms_server_fld) {
- fld_server_fini(ms->ms_server_fld, env);
- OBD_FREE_PTR(ms->ms_server_fld);
- ms->ms_server_fld = NULL;
- }
+ if (ss && ss->ss_server_fld) { /* nothing to do when no FLD server is set */
+ fld_server_fini(env, ss->ss_server_fld);
+ OBD_FREE_PTR(ss->ss_server_fld);
+ ss->ss_server_fld = NULL; /* so a repeated call takes the no-op path */
+ }
- RETURN(0);
+ RETURN(0); /* always reports success */
}
static int mdt_fld_init(const struct lu_env *env,
const char *uuid,
struct mdt_device *m)
{
- struct md_site *ms;
- int rc;
- ENTRY;
-
- ms = mdt_md_site(m);
+ struct seq_server_site *ss;
+ int rc;
+ ENTRY;
- OBD_ALLOC_PTR(ms->ms_server_fld);
- if (ms->ms_server_fld == NULL)
- RETURN(rc = -ENOMEM);
+ ss = mdt_seq_site(m);
- rc = fld_server_init(ms->ms_server_fld,
- m->mdt_bottom, uuid,
- env, ms->ms_node_id);
- if (rc) {
- OBD_FREE_PTR(ms->ms_server_fld);
- ms->ms_server_fld = NULL;
- RETURN(rc);
- }
+ OBD_ALLOC_PTR(ss->ss_server_fld); /* allocate the FLD server kept on the seq site */
+ if (ss->ss_server_fld == NULL)
+ RETURN(rc = -ENOMEM);
- RETURN(0);
-}
+ rc = fld_server_init(env, ss->ss_server_fld, m->mdt_bottom, uuid,
+ ss->ss_node_id, LU_SEQ_RANGE_MDT);
+ if (rc) { /* on failure free the server and clear the pointer before returning rc */
+ OBD_FREE_PTR(ss->ss_server_fld);
+ ss->ss_server_fld = NULL;
+ RETURN(rc);
+ }
-/* device init/fini methods */
-static void mdt_stop_ptlrpc_service(struct mdt_device *m)
-{
- ENTRY;
- if (m->mdt_regular_service != NULL) {
- ptlrpc_unregister_service(m->mdt_regular_service);
- m->mdt_regular_service = NULL;
- }
- if (m->mdt_readpage_service != NULL) {
- ptlrpc_unregister_service(m->mdt_readpage_service);
- m->mdt_readpage_service = NULL;
- }
- if (m->mdt_xmds_service != NULL) {
- ptlrpc_unregister_service(m->mdt_xmds_service);
- m->mdt_xmds_service = NULL;
- }
- if (m->mdt_setattr_service != NULL) {
- ptlrpc_unregister_service(m->mdt_setattr_service);
- m->mdt_setattr_service = NULL;
- }
- if (m->mdt_mdsc_service != NULL) {
- ptlrpc_unregister_service(m->mdt_mdsc_service);
- m->mdt_mdsc_service = NULL;
- }
- if (m->mdt_mdss_service != NULL) {
- ptlrpc_unregister_service(m->mdt_mdss_service);
- m->mdt_mdss_service = NULL;
- }
- if (m->mdt_dtss_service != NULL) {
- ptlrpc_unregister_service(m->mdt_dtss_service);
- m->mdt_dtss_service = NULL;
- }
- if (m->mdt_fld_service != NULL) {
- ptlrpc_unregister_service(m->mdt_fld_service);
- m->mdt_fld_service = NULL;
- }
- EXIT;
+ RETURN(0);
}
-static int mdt_start_ptlrpc_service(struct mdt_device *m)
+/*
+ * Send an LCFG_PRE_CLEANUP config command to @top, built with the name of
+ * the obd behind m->mdt_child_exp.  If the config record cannot be
+ * allocated the error is logged and nothing is sent.
+ */
+static void mdt_stack_pre_fini(const struct lu_env *env,
+ struct mdt_device *m, struct lu_device *top)
{
- static struct ptlrpc_service_conf conf;
- cfs_proc_dir_entry_t *procfs_entry;
- int rc = 0;
+ struct obd_device *obd;
+ struct lustre_cfg_bufs *bufs;
+ struct lustre_cfg *lcfg;
+ struct mdt_thread_info *info;
ENTRY;
- m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
- ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
- "mdt_ldlm_client", m->mdt_ldlm_client);
+ LASSERT(top);
- procfs_entry = m->mdt_md_dev.md_lu_dev.ld_obd->obd_proc_entry;
-
- conf = (typeof(conf)) {
- .psc_name = LUSTRE_MDT_NAME,
- .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
- .psc_buf = {
- .bc_nbufs = MDS_NBUFS,
- .bc_buf_size = MDS_BUFSIZE,
- .bc_req_max_size = MDS_MAXREQSIZE,
- .bc_rep_max_size = MDS_MAXREPSIZE,
- .bc_req_portal = MDS_REQUEST_PORTAL,
- .bc_rep_portal = MDC_REPLY_PORTAL,
- },
- /*
- * We'd like to have a mechanism to set this on a per-device
- * basis, but alas...
- */
- .psc_thr = {
- .tc_thr_name = LUSTRE_MDT_NAME,
- .tc_thr_factor = MDT_THR_FACTOR,
- .tc_nthrs_init = MDT_NTHRS_INIT,
- .tc_nthrs_base = MDT_NTHRS_BASE,
- .tc_nthrs_max = MDT_NTHRS_MAX,
- .tc_nthrs_user = mds_num_threads,
- .tc_cpu_affinity = 1,
- .tc_ctx_tags = LCT_MD_THREAD,
- },
- .psc_cpt = {
- .cc_pattern = mds_num_cpts,
- },
- .psc_ops = {
- .so_req_handler = mdt_regular_handle,
- .so_req_printer = target_print_req,
- .so_hpreq_handler = ptlrpc_hpreq_handler,
- },
- };
- m->mdt_regular_service = ptlrpc_register_service(&conf, procfs_entry);
- if (IS_ERR(m->mdt_regular_service)) {
- rc = PTR_ERR(m->mdt_regular_service);
- CERROR("failed to start regular mdt service: %d\n", rc);
- m->mdt_regular_service = NULL;
+ info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+ LASSERT(info != NULL);
- RETURN(rc);
+ bufs = &info->mti_u.bufs;
+
+ LASSERT(m->mdt_child_exp);
+ LASSERT(m->mdt_child_exp->exp_obd);
+ obd = m->mdt_child_exp->exp_obd;
+
+ /* process pre-cleanup; the config record carries the name of the
+ * child obd (the one behind mdt_child_exp) */
+ /* XXX: this is needed because all layers are referenced by
+ * objects (some of them are pinned by osd, for example);
+ * the proper solution should be a model where an object used
+ * by osd only doesn't have mdt/mdd slices -bzzz */
+ lustre_cfg_bufs_reset(bufs, obd->obd_name);
+ lustre_cfg_bufs_set_string(bufs, 1, NULL);
+ lcfg = lustre_cfg_new(LCFG_PRE_CLEANUP, bufs);
+ if (!lcfg) {
+ CERROR("%s:Cannot alloc lcfg!\n", mdt_obd_name(m));
+ /* balance ENTRY on the error path as well */
+ RETURN_EXIT;
}
+ top->ld_ops->ldo_process_config(env, top, lcfg);
+ lustre_cfg_free(lcfg);
+ EXIT;
+}
- /*
- * readpage service configuration. Parameters have to be adjusted,
- * ideally.
- */
- memset(&conf, 0, sizeof(conf));
- conf = (typeof(conf)) {
- .psc_name = LUSTRE_MDT_NAME "_readpage",
- .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
- .psc_buf = {
- .bc_nbufs = MDS_NBUFS,
- .bc_buf_size = MDS_BUFSIZE,
- .bc_req_max_size = MDS_MAXREQSIZE,
- .bc_rep_max_size = MDS_MAXREPSIZE,
- .bc_req_portal = MDS_READPAGE_PORTAL,
- .bc_rep_portal = MDC_REPLY_PORTAL,
- },
- .psc_thr = {
- .tc_thr_name = "mdt_rdpg",
- .tc_thr_factor = MDT_RDPG_THR_FACTOR,
- .tc_nthrs_init = MDT_RDPG_NTHRS_INIT,
- .tc_nthrs_base = MDT_RDPG_NTHRS_BASE,
- .tc_nthrs_max = MDT_RDPG_NTHRS_MAX,
- .tc_nthrs_user = mds_rdpg_num_threads,
- .tc_cpu_affinity = 1,
- .tc_ctx_tags = LCT_MD_THREAD,
- },
- .psc_cpt = {
- .cc_pattern = mds_rdpg_num_cpts,
- },
- .psc_ops = {
- .so_req_handler = mdt_readpage_handle,
- .so_req_printer = target_print_req,
- .so_hpreq_handler = NULL,
- },
- };
- m->mdt_readpage_service = ptlrpc_register_service(&conf, procfs_entry);
- if (IS_ERR(m->mdt_readpage_service)) {
- rc = PTR_ERR(m->mdt_readpage_service);
- CERROR("failed to start readpage service: %d\n", rc);
- m->mdt_readpage_service = NULL;
+static void mdt_stack_fini(const struct lu_env *env,
+ struct mdt_device *m, struct lu_device *top)
+{
+ struct obd_device *obd = mdt2obd_dev(m);
+ struct lustre_cfg_bufs *bufs;
+ struct lustre_cfg *lcfg;
+ struct mdt_thread_info *info;
+ char flags[3]="";
+ ENTRY;
- GOTO(err_mdt_svc, rc);
- }
+ info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+ LASSERT(info != NULL);
- /*
- * setattr service configuration.
- *
- * XXX To keep the compatibility with old client(< 2.2), we need to
- * preserve this portal for a certain time, it should be removed
- * eventually. LU-617.
- */
- memset(&conf, 0, sizeof(conf));
- conf = (typeof(conf)) {
- .psc_name = LUSTRE_MDT_NAME "_setattr",
- .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
- .psc_buf = {
- .bc_nbufs = MDS_NBUFS,
- .bc_buf_size = MDS_BUFSIZE,
- .bc_req_max_size = MDS_MAXREQSIZE,
- .bc_rep_max_size = MDS_MAXREPSIZE,
- .bc_req_portal = MDS_SETATTR_PORTAL,
- .bc_rep_portal = MDC_REPLY_PORTAL,
- },
- .psc_thr = {
- .tc_thr_name = "mdt_attr",
- .tc_thr_factor = MDT_SETA_THR_FACTOR,
- .tc_nthrs_init = MDT_SETA_NTHRS_INIT,
- .tc_nthrs_base = MDT_SETA_NTHRS_BASE,
- .tc_nthrs_max = MDT_SETA_NTHRS_MAX,
- .tc_nthrs_user = mds_attr_num_threads,
- .tc_cpu_affinity = 1,
- .tc_ctx_tags = LCT_MD_THREAD,
- },
- .psc_cpt = {
- .cc_pattern = mds_attr_num_cpts,
- },
- .psc_ops = {
- .so_req_handler = mdt_regular_handle,
- .so_req_printer = target_print_req,
- .so_hpreq_handler = NULL,
- },
- };
- m->mdt_setattr_service = ptlrpc_register_service(&conf, procfs_entry);
- if (IS_ERR(m->mdt_setattr_service)) {
- rc = PTR_ERR(m->mdt_setattr_service);
- CERROR("failed to start setattr service: %d\n", rc);
- m->mdt_setattr_service = NULL;
+ lu_dev_del_linkage(top->ld_site, top);
- GOTO(err_mdt_svc, rc);
- }
-
- /*
- * sequence controller service configuration
- */
- memset(&conf, 0, sizeof(conf));
- conf = (typeof(conf)) {
- .psc_name = LUSTRE_MDT_NAME "_mdsc",
- .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
- .psc_buf = {
- .bc_nbufs = MDS_NBUFS,
- .bc_buf_size = MDS_BUFSIZE,
- .bc_req_max_size = SEQ_MAXREQSIZE,
- .bc_rep_max_size = SEQ_MAXREPSIZE,
- .bc_req_portal = SEQ_CONTROLLER_PORTAL,
- .bc_rep_portal = MDC_REPLY_PORTAL,
- },
- .psc_thr = {
- .tc_thr_name = "mdt_mdsc",
- .tc_nthrs_init = MDT_OTHR_NTHRS_INIT,
- .tc_nthrs_max = MDT_OTHR_NTHRS_MAX,
- .tc_ctx_tags = LCT_MD_THREAD,
- },
- .psc_ops = {
- .so_req_handler = mdt_mdsc_handle,
- .so_req_printer = target_print_req,
- .so_hpreq_handler = NULL,
- },
- };
- m->mdt_mdsc_service = ptlrpc_register_service(&conf, procfs_entry);
- if (IS_ERR(m->mdt_mdsc_service)) {
- rc = PTR_ERR(m->mdt_mdsc_service);
- CERROR("failed to start seq controller service: %d\n", rc);
- m->mdt_mdsc_service = NULL;
-
- GOTO(err_mdt_svc, rc);
- }
-
- /*
- * metadata sequence server service configuration
- */
- memset(&conf, 0, sizeof(conf));
- conf = (typeof(conf)) {
- .psc_name = LUSTRE_MDT_NAME "_mdss",
- .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
- .psc_buf = {
- .bc_nbufs = MDS_NBUFS,
- .bc_buf_size = MDS_BUFSIZE,
- .bc_req_max_size = SEQ_MAXREQSIZE,
- .bc_rep_max_size = SEQ_MAXREPSIZE,
- .bc_req_portal = SEQ_METADATA_PORTAL,
- .bc_rep_portal = MDC_REPLY_PORTAL,
- },
- .psc_thr = {
- .tc_thr_name = "mdt_mdss",
- .tc_nthrs_init = MDT_OTHR_NTHRS_INIT,
- .tc_nthrs_max = MDT_OTHR_NTHRS_MAX,
- .tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD
- },
- .psc_ops = {
- .so_req_handler = mdt_mdss_handle,
- .so_req_printer = target_print_req,
- .so_hpreq_handler = NULL,
- },
- };
- m->mdt_mdss_service = ptlrpc_register_service(&conf, procfs_entry);
- if (IS_ERR(m->mdt_mdss_service)) {
- rc = PTR_ERR(m->mdt_mdss_service);
- CERROR("failed to start metadata seq server service: %d\n", rc);
- m->mdt_mdss_service = NULL;
-
- GOTO(err_mdt_svc, rc);
- }
-
- /*
- * Data sequence server service configuration. We want to have really
- * cluster-wide sequences space. This is why we start only one sequence
- * controller which manages space.
- */
- memset(&conf, 0, sizeof(conf));
- conf = (typeof(conf)) {
- .psc_name = LUSTRE_MDT_NAME "_dtss",
- .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
- .psc_buf = {
- .bc_nbufs = MDS_NBUFS,
- .bc_buf_size = MDS_BUFSIZE,
- .bc_req_max_size = SEQ_MAXREQSIZE,
- .bc_rep_max_size = SEQ_MAXREPSIZE,
- .bc_req_portal = SEQ_DATA_PORTAL,
- .bc_rep_portal = OSC_REPLY_PORTAL,
- },
- .psc_thr = {
- .tc_thr_name = "mdt_dtss",
- .tc_nthrs_init = MDT_OTHR_NTHRS_INIT,
- .tc_nthrs_max = MDT_OTHR_NTHRS_MAX,
- .tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD
- },
- .psc_ops = {
- .so_req_handler = mdt_dtss_handle,
- .so_req_printer = target_print_req,
- .so_hpreq_handler = NULL,
- },
- };
- m->mdt_dtss_service = ptlrpc_register_service(&conf, procfs_entry);
- if (IS_ERR(m->mdt_dtss_service)) {
- rc = PTR_ERR(m->mdt_dtss_service);
- CERROR("failed to start data seq server service: %d\n", rc);
- m->mdt_dtss_service = NULL;
-
- GOTO(err_mdt_svc, rc);
- }
-
- /* FLD service start */
- memset(&conf, 0, sizeof(conf));
- conf = (typeof(conf)) {
- .psc_name = LUSTRE_MDT_NAME "_fld",
- .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
- .psc_buf = {
- .bc_nbufs = MDS_NBUFS,
- .bc_buf_size = MDS_BUFSIZE,
- .bc_req_max_size = FLD_MAXREQSIZE,
- .bc_rep_max_size = FLD_MAXREPSIZE,
- .bc_req_portal = FLD_REQUEST_PORTAL,
- .bc_rep_portal = MDC_REPLY_PORTAL,
- },
- .psc_thr = {
- .tc_thr_name = "mdt_fld",
- .tc_nthrs_init = MDT_OTHR_NTHRS_INIT,
- .tc_nthrs_max = MDT_OTHR_NTHRS_MAX,
- .tc_ctx_tags = LCT_DT_THREAD | LCT_MD_THREAD
- },
- .psc_ops = {
- .so_req_handler = mdt_fld_handle,
- .so_req_printer = target_print_req,
- .so_hpreq_handler = NULL,
- },
- };
- m->mdt_fld_service = ptlrpc_register_service(&conf, procfs_entry);
- if (IS_ERR(m->mdt_fld_service)) {
- rc = PTR_ERR(m->mdt_fld_service);
- CERROR("failed to start fld service: %d\n", rc);
- m->mdt_fld_service = NULL;
-
- GOTO(err_mdt_svc, rc);
- }
-
- /*
- * mds-mds service configuration. Separate portal is used to allow
- * mds-mds requests be not blocked during recovery.
- */
- memset(&conf, 0, sizeof(conf));
- conf = (typeof(conf)) {
- .psc_name = LUSTRE_MDT_NAME "_mds",
- .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
- .psc_buf = {
- .bc_nbufs = MDS_NBUFS,
- .bc_buf_size = MDS_BUFSIZE,
- .bc_req_max_size = MDS_MAXREQSIZE,
- .bc_rep_max_size = MDS_MAXREPSIZE,
- .bc_req_portal = MDS_MDS_PORTAL,
- .bc_rep_portal = MDC_REPLY_PORTAL,
- },
- .psc_thr = {
- .tc_thr_name = "mdt_mds",
- .tc_nthrs_init = MDT_OTHR_NTHRS_INIT,
- .tc_nthrs_max = MDT_OTHR_NTHRS_MAX,
- .tc_ctx_tags = LCT_MD_THREAD,
- },
- .psc_ops = {
- .so_req_handler = mdt_xmds_handle,
- .so_req_printer = target_print_req,
- .so_hpreq_handler = ptlrpc_hpreq_handler,
- },
- };
- m->mdt_xmds_service = ptlrpc_register_service(&conf, procfs_entry);
- if (IS_ERR(m->mdt_xmds_service)) {
- rc = PTR_ERR(m->mdt_xmds_service);
- CERROR("failed to start xmds service: %d\n", rc);
- m->mdt_xmds_service = NULL;
-
- GOTO(err_mdt_svc, rc);
- }
-
- EXIT;
-err_mdt_svc:
- if (rc)
- mdt_stop_ptlrpc_service(m);
-
- return rc;
-}
-
-static void mdt_stack_fini(const struct lu_env *env,
- struct mdt_device *m, struct lu_device *top)
-{
- struct obd_device *obd = mdt2obd_dev(m);
- struct lustre_cfg_bufs *bufs;
- struct lustre_cfg *lcfg;
- struct mdt_thread_info *info;
- char flags[3]="";
- ENTRY;
-
- info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
- LASSERT(info != NULL);
-
- lu_dev_del_linkage(top->ld_site, top);
+ lu_site_purge(env, top->ld_site, -1);
bufs = &info->mti_u.bufs;
/* process cleanup, pass mdt obd name to get obd umount flags */
+ /* another purpose is to let all layers to release their objects */
lustre_cfg_bufs_reset(bufs, obd->obd_name);
if (obd->obd_force)
strcat(flags, "F");
top->ld_ops->ldo_process_config(env, top, lcfg);
lustre_cfg_free(lcfg);
- lu_stack_fini(env, top);
+ lu_site_purge(env, top->ld_site, -1);
+
m->mdt_child = NULL;
m->mdt_bottom = NULL;
- obd_disconnect(m->mdt_bottom_exp);
-}
-
-static struct lu_device *mdt_layer_setup(struct lu_env *env,
- const char *typename,
- struct lu_device *child,
- struct lustre_cfg *cfg)
-{
- const char *dev = lustre_cfg_string(cfg, 0);
- struct obd_type *type;
- struct lu_device_type *ldt;
- struct lu_device *d;
- int rc;
- ENTRY;
-
- /* find the type */
- type = class_get_type(typename);
- if (!type) {
- CERROR("Unknown type: '%s'\n", typename);
- GOTO(out, rc = -ENODEV);
- }
-
- rc = lu_env_refill((struct lu_env *)env);
- if (rc != 0) {
- CERROR("Failure to refill session: '%d'\n", rc);
- GOTO(out_type, rc);
- }
-
- ldt = type->typ_lu;
- if (ldt == NULL) {
- CERROR("type: '%s'\n", typename);
- GOTO(out_type, rc = -EINVAL);
- }
-
- ldt->ldt_obd_type = type;
- d = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
- if (IS_ERR(d)) {
- CERROR("Cannot allocate device: '%s'\n", typename);
- GOTO(out_type, rc = -ENODEV);
- }
+ obd_disconnect(m->mdt_child_exp);
+ m->mdt_child_exp = NULL;
- LASSERT(child->ld_site);
- d->ld_site = child->ld_site;
-
- type->typ_refcnt++;
- rc = ldt->ldt_ops->ldto_device_init(env, d, dev, child);
- if (rc) {
- CERROR("can't init device '%s', rc %d\n", typename, rc);
- GOTO(out_alloc, rc);
- }
- lu_device_get(d);
- lu_ref_add(&d->ld_reference, "lu-stack", &lu_site_init);
-
- lu_dev_add_linkage(d->ld_site, d);
- RETURN(d);
-out_alloc:
- ldt->ldt_ops->ldto_device_free(env, d);
- type->typ_refcnt--;
-out_type:
- class_put_type(type);
-out:
- return ERR_PTR(rc);
+ obd_disconnect(m->mdt_bottom_exp);
+ /* clear the export that was just disconnected; mdt_child_exp was
+ * already reset after its own disconnect above */
+ m->mdt_bottom_exp = NULL;
}
static int mdt_connect_to_next(const struct lu_env *env, struct mdt_device *m,
RETURN(rc);
}
-static int mdt_stack_init(struct lu_env *env,
- struct mdt_device *m,
- struct lustre_cfg *cfg,
- struct lustre_mount_info *lmi)
+/*
+ * Build the device stack below the MDT.
+ *
+ * Attaches and sets up an MDD obd whose name is derived from the MDT name,
+ * connects the MDT to it and to the "<dev>-osd" obd, and links the MDT
+ * lu_device into the lu_site of the MDD.
+ *
+ * Returns 0 on success or a negative errno.  The temporary name/uuid/bufs
+ * buffers and the config record are released on every return path.
+ */
+static int mdt_stack_init(const struct lu_env *env, struct mdt_device *mdt,
+ struct lustre_cfg *cfg)
{
- struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
- struct lu_device *tmp;
- struct md_device *md;
- struct lu_device *child_lu_dev;
- char *osdname;
- int rc;
+ char *dev = lustre_cfg_string(cfg, 0);
+ int rc, name_size, uuid_size;
+ char *name, *uuid, *p;
+ struct lustre_cfg_bufs *bufs;
+ struct lustre_cfg *lcfg;
+ struct lustre_cfg *setup_lcfg;
+ struct obd_device *obd;
+ struct lustre_profile *lprof;
+ struct lu_site *site;
ENTRY;
- /* find bottom osd */
- OBD_ALLOC(osdname, MTI_NAME_MAXLEN);
- if (osdname == NULL)
- RETURN(-ENOMEM);
+ /* in 1.8 we had the only device in the stack - MDS.
+ * 2.0 introduces MDT, MDD, OSD; MDT starts others internally.
+ * in 2.3 OSD is instantiated by obd_mount.c, so we need
+ * to generate names and setup MDT, MDD. MDT will be using
+ * generated name to connect to MDD. for MDD the next device
+ * will be LOD with name taken from so called "profile" which
+ * is generated by mount_option line
+ *
+ * 1.8 MGS generates config. commands like this:
+ * #06 (104)mount_option 0: 1:lustre-MDT0000 2:lustre-mdtlov
+ * #08 (120)setup 0:lustre-MDT0000 1:dev 2:type 3:lustre-MDT0000
+ * 2.0 MGS generates config. commands like this:
+ * #07 (112)mount_option 0: 1:lustre-MDT0000 2:lustre-MDT0000-mdtlov
+ * #08 (160)setup 0:lustre-MDT0000 1:lustre-MDT0000_UUID 2:0
+ * 3:lustre-MDT0000-mdtlov 4:f
+ *
+ * we generate MDD name from MDT one, just replacing T with D
+ *
+ * after all the preparations, the logical equivalent will be
+ * #01 (160)setup 0:lustre-MDD0000 1:lustre-MDD0000_UUID 2:0
+ * 3:lustre-MDT0000-mdtlov 4:f
+ * #02 (160)setup 0:lustre-MDT0000 1:lustre-MDT0000_UUID 2:0
+ * 3:lustre-MDD0000 4:f
+ *
+ * notice we build the stack from down to top: MDD first, then MDT */
+
+ name_size = MAX_OBD_NAME;
+ uuid_size = MAX_OBD_NAME;
+
+ OBD_ALLOC(name, name_size);
+ OBD_ALLOC(uuid, uuid_size);
+ if (name == NULL || uuid == NULL)
+ GOTO(cleanup_mem, rc = -ENOMEM);
+
+ OBD_ALLOC_PTR(bufs);
+ if (!bufs)
+ GOTO(cleanup_mem, rc = -ENOMEM);
+
+ /* bounded copy: never write past the name_size buffer */
+ snprintf(name, name_size, "%s", dev);
+ p = strstr(name, "-MDT");
+ if (p == NULL) {
+ /* malformed device name, not an allocation failure; bufs is
+ * already allocated here, so it must be released too */
+ GOTO(free_bufs, rc = -EINVAL);
+ }
+ p[3] = 'D';
+
+ snprintf(uuid, uuid_size, "%s_UUID", name);
+
+ lprof = class_get_profile(lustre_cfg_string(cfg, 0));
+ if (lprof == NULL || lprof->lp_dt == NULL) {
+ CERROR("can't find the profile: %s\n",
+ lustre_cfg_string(cfg, 0));
+ GOTO(free_bufs, rc = -EINVAL);
+ }
+
+ lustre_cfg_bufs_reset(bufs, name);
+ lustre_cfg_bufs_set_string(bufs, 1, LUSTRE_MDD_NAME);
+ lustre_cfg_bufs_set_string(bufs, 2, uuid);
+ lustre_cfg_bufs_set_string(bufs, 3, lprof->lp_dt);
- snprintf(osdname, MTI_NAME_MAXLEN, "%s-osd", lustre_cfg_string(cfg, 0));
- rc = mdt_connect_to_next(env, m, osdname, &m->mdt_bottom_exp);
- OBD_FREE(osdname, MTI_NAME_MAXLEN);
+ lcfg = lustre_cfg_new(LCFG_ATTACH, bufs);
+ if (!lcfg)
+ GOTO(free_bufs, rc = -ENOMEM);
+
+ rc = class_attach(lcfg);
if (rc)
- RETURN(rc);
+ GOTO(lcfg_cleanup, rc);
- tmp = m->mdt_bottom_exp->exp_obd->obd_lu_dev;
- LASSERT(tmp);
- m->mdt_bottom = lu2dt_dev(tmp);
-
- /* initialize site's pointers: md_site, top device */
- d->ld_site = tmp->ld_site;
- d->ld_site->ls_top_dev = d;
- m->mdt_mite.ms_lu = tmp->ld_site;
- tmp->ld_site->ld_md_site = &m->mdt_mite;
- LASSERT(d->ld_site);
- d = tmp;
-
- tmp = mdt_layer_setup(env, LUSTRE_MDD_NAME, d, cfg);
- if (IS_ERR(tmp)) {
- GOTO(out, rc = PTR_ERR(tmp));
- }
- d = tmp;
- md = lu2md_dev(d);
+ obd = class_name2obd(name);
+ if (!obd) {
+ CERROR("Can not find obd %s (%s in config)\n",
+ name, lustre_cfg_string(cfg, 0));
+ /* there is no obd to hand to class_detach() */
+ GOTO(lcfg_cleanup, rc = -EINVAL);
+ }
- tmp = mdt_layer_setup(env, LUSTRE_CMM_NAME, d, cfg);
- if (IS_ERR(tmp)) {
- GOTO(out, rc = PTR_ERR(tmp));
- }
- d = tmp;
- /*set mdd upcall device*/
- md_upcall_dev_set(md, lu2md_dev(d));
- md = lu2md_dev(d);
- /*set cmm upcall device*/
- md_upcall_dev_set(md, &m->mdt_md_dev);
+ lustre_cfg_bufs_reset(bufs, name);
+ lustre_cfg_bufs_set_string(bufs, 1, uuid);
+ lustre_cfg_bufs_set_string(bufs, 2, dev);
+ lustre_cfg_bufs_set_string(bufs, 3, lprof->lp_dt);
- m->mdt_child = lu2md_dev(d);
+ /* keep the ATTACH record until the SETUP record exists, so the
+ * error path below always has a valid lcfg to detach with and free */
+ setup_lcfg = lustre_cfg_new(LCFG_SETUP, bufs);
+ if (setup_lcfg == NULL)
+ GOTO(class_detach, rc = -ENOMEM);
+ lustre_cfg_free(lcfg);
+ lcfg = setup_lcfg;
- /* process setup config */
- tmp = &m->mdt_md_dev.md_lu_dev;
- rc = tmp->ld_ops->ldo_process_config(env, tmp, cfg);
- if (rc)
- GOTO(out, rc);
+ rc = class_setup(obd, lcfg);
+ if (rc)
+ GOTO(class_detach, rc);
+
+ /* connect to MDD we just setup */
+ rc = mdt_connect_to_next(env, mdt, name, &mdt->mdt_child_exp);
+ if (rc) {
+ /* release the temporary buffers; the MDD obd is left as is */
+ GOTO(lcfg_cleanup, rc);
+ }
+
+ site = mdt->mdt_child_exp->exp_obd->obd_lu_dev->ld_site;
+ LASSERT(site);
+ LASSERT(mdt->mdt_md_dev.md_lu_dev.ld_site == NULL);
+ mdt->mdt_md_dev.md_lu_dev.ld_site = site;
+ site->ls_top_dev = &mdt->mdt_md_dev.md_lu_dev;
+ mdt->mdt_child = lu2md_dev(mdt->mdt_child_exp->exp_obd->obd_lu_dev);
- /* initialize local objects */
- child_lu_dev = &m->mdt_child->md_lu_dev;
- rc = child_lu_dev->ld_ops->ldo_prepare(env,
- &m->mdt_md_dev.md_lu_dev,
- child_lu_dev);
+ /* now connect to bottom OSD */
+ snprintf(name, name_size, "%s-osd", dev);
+ rc = mdt_connect_to_next(env, mdt, name, &mdt->mdt_bottom_exp);
if (rc)
- GOTO(out, rc);
+ GOTO(lcfg_cleanup, rc);
+ mdt->mdt_bottom =
+ lu2dt_dev(mdt->mdt_bottom_exp->exp_obd->obd_lu_dev);
- rc = m->mdt_child->md_ops->mdo_root_get(env, m->mdt_child,
- &m->mdt_md_root_fid);
-out:
- /* fini from last known good lu_device */
- if (rc)
- mdt_stack_fini(env, m, d);
+ rc = lu_env_refill((struct lu_env *)env);
+ if (rc != 0)
+ CERROR("Failure to refill session: '%d'\n", rc);
- return rc;
+ lu_dev_add_linkage(site, &mdt->mdt_md_dev.md_lu_dev);
+
+ EXIT;
+class_detach:
+ if (rc)
+ class_detach(obd, lcfg);
+lcfg_cleanup:
+ lustre_cfg_free(lcfg);
+free_bufs:
+ OBD_FREE_PTR(bufs);
+cleanup_mem:
+ if (name)
+ OBD_FREE(name, name_size);
+ if (uuid)
+ OBD_FREE(uuid, uuid_size);
+ RETURN(rc);
}
-/**
- * setup CONFIG_ORIG context, used to access local config log.
- * this may need to be rewrite as part of llog rewrite for lu-api.
- */
-static int mdt_obd_llog_setup(struct obd_device *obd,
- struct lustre_sb_info *lsi)
+/*
+ * Set up the quota master target (QMT) on MDT0.
+ *
+ * Returns 0 without doing anything when this MDT's node id is not 0.
+ * Otherwise attaches and sets up a QMT obd whose name is derived from the
+ * MDT name, runs its ldo_prepare method and connects to it, storing the
+ * results in mdt->mdt_qmt_dev and mdt->mdt_qmt_exp.
+ *
+ * Returns 0 on success or a negative errno.
+ */
+static int mdt_quota_init(const struct lu_env *env, struct mdt_device *mdt,
+ struct lustre_cfg *cfg)
{
- int rc;
+ struct obd_device *obd;
+ char *dev = lustre_cfg_string(cfg, 0);
+ char *qmtname, *uuid, *p;
+ struct lustre_cfg_bufs *bufs;
+ struct lustre_cfg *lcfg;
+ struct lustre_cfg *setup_lcfg;
+ struct lustre_profile *lprof;
+ struct obd_connect_data *data;
+ int rc;
+ ENTRY;
- LASSERT(obd->obd_fsops == NULL);
+ LASSERT(mdt->mdt_qmt_exp == NULL);
+ LASSERT(mdt->mdt_qmt_dev == NULL);
+
+ /* quota master is on MDT0 only for now */
+ if (mdt->mdt_seq_site.ss_node_id != 0)
+ RETURN(0);
+
+ /* MGS generates config commands which look as follows:
+ * #01 (160)setup 0:lustre-MDT0000 1:lustre-MDT0000_UUID 2:0
+ * 3:lustre-MDT0000-mdtlov 4:f
+ *
+ * We generate the QMT name from the MDT one, just replacing MD with QM
+ * after all the preparations, the logical equivalent will be:
+ * #01 (160)setup 0:lustre-QMT0000 1:lustre-QMT0000_UUID 2:0
+ * 3:lustre-MDT0000-osd 4:f */
+ OBD_ALLOC(qmtname, MAX_OBD_NAME);
+ OBD_ALLOC(uuid, UUID_MAX);
+ OBD_ALLOC_PTR(bufs);
+ OBD_ALLOC_PTR(data);
+ if (qmtname == NULL || uuid == NULL || bufs == NULL || data == NULL)
+ GOTO(cleanup_mem, rc = -ENOMEM);
+
+ /* bounded copy: never write past the MAX_OBD_NAME buffer */
+ snprintf(qmtname, MAX_OBD_NAME, "%s", dev);
+ p = strstr(qmtname, "-MDT");
+ if (p == NULL) {
+ /* malformed device name, not an allocation failure */
+ GOTO(cleanup_mem, rc = -EINVAL);
+ }
+ /* replace MD with QM */
+ p[1] = 'Q';
+ p[2] = 'M';
+
+ snprintf(uuid, UUID_MAX, "%s_UUID", qmtname);
+
+ lprof = class_get_profile(lustre_cfg_string(cfg, 0));
+ if (lprof == NULL || lprof->lp_dt == NULL) {
+ CERROR("can't find profile for %s\n",
+ lustre_cfg_string(cfg, 0));
+ GOTO(cleanup_mem, rc = -EINVAL);
+ }
- obd->obd_fsops = fsfilt_get_ops(lsi->lsi_fstype);
- if (IS_ERR(obd->obd_fsops))
- return PTR_ERR(obd->obd_fsops);
+ lustre_cfg_bufs_reset(bufs, qmtname);
+ lustre_cfg_bufs_set_string(bufs, 1, LUSTRE_QMT_NAME);
+ lustre_cfg_bufs_set_string(bufs, 2, uuid);
+ lustre_cfg_bufs_set_string(bufs, 3, lprof->lp_dt);
- rc = fsfilt_setup(obd, lsi->lsi_srv_mnt->mnt_sb);
- if (rc) {
- fsfilt_put_ops(obd->obd_fsops);
- return rc;
- }
+ lcfg = lustre_cfg_new(LCFG_ATTACH, bufs);
+ if (!lcfg)
+ GOTO(cleanup_mem, rc = -ENOMEM);
- OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
- obd->obd_lvfs_ctxt.pwdmnt = lsi->lsi_srv_mnt;
- obd->obd_lvfs_ctxt.pwd = lsi->lsi_srv_mnt->mnt_root;
- obd->obd_lvfs_ctxt.fs = get_ds();
+ rc = class_attach(lcfg);
+ if (rc)
+ GOTO(lcfg_cleanup, rc);
- rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, obd,
- 0, NULL, &llog_lvfs_ops);
- if (rc) {
- CERROR("llog_setup() failed: %d\n", rc);
- fsfilt_put_ops(obd->obd_fsops);
- }
+ obd = class_name2obd(qmtname);
+ if (!obd) {
+ CERROR("Can not find obd %s (%s in config)\n", qmtname,
+ lustre_cfg_string(cfg, 0));
+ /* there is no obd to hand to class_detach() */
+ GOTO(lcfg_cleanup, rc = -EINVAL);
+ }
- return rc;
+ lustre_cfg_bufs_reset(bufs, qmtname);
+ lustre_cfg_bufs_set_string(bufs, 1, uuid);
+ lustre_cfg_bufs_set_string(bufs, 2, dev);
+
+ /* for quota, the next device should be the OSD device */
+ lustre_cfg_bufs_set_string(bufs, 3,
+ mdt->mdt_bottom->dd_lu_dev.ld_obd->obd_name);
+
+ /* keep the ATTACH record until the SETUP record exists, so the
+ * error path below always has a valid lcfg to detach with and free */
+ setup_lcfg = lustre_cfg_new(LCFG_SETUP, bufs);
+ if (setup_lcfg == NULL)
+ GOTO(class_detach, rc = -ENOMEM);
+ lustre_cfg_free(lcfg);
+ lcfg = setup_lcfg;
+
+ rc = class_setup(obd, lcfg);
+ if (rc)
+ GOTO(class_detach, rc);
+
+ mdt->mdt_qmt_dev = obd->obd_lu_dev;
+
+ /* configure local quota objects */
+ rc = mdt->mdt_qmt_dev->ld_ops->ldo_prepare(env,
+ &mdt->mdt_md_dev.md_lu_dev,
+ mdt->mdt_qmt_dev);
+ if (rc)
+ GOTO(class_cleanup, rc);
+
+ /* connect to quota master target */
+ data->ocd_connect_flags = OBD_CONNECT_VERSION;
+ data->ocd_version = LUSTRE_VERSION_CODE;
+ rc = obd_connect(NULL, &mdt->mdt_qmt_exp, obd, &obd->obd_uuid,
+ data, NULL);
+ if (rc) {
+ CERROR("cannot connect to quota master device %s (%d)\n",
+ qmtname, rc);
+ GOTO(class_cleanup, rc);
+ }
+
+ EXIT;
+class_cleanup:
+ if (rc) {
+ class_manual_cleanup(obd);
+ mdt->mdt_qmt_dev = NULL;
+ }
+class_detach:
+ if (rc)
+ class_detach(obd, lcfg);
+lcfg_cleanup:
+ lustre_cfg_free(lcfg);
+cleanup_mem:
+ if (bufs)
+ OBD_FREE_PTR(bufs);
+ if (qmtname)
+ OBD_FREE(qmtname, MAX_OBD_NAME);
+ if (uuid)
+ OBD_FREE(uuid, UUID_MAX);
+ if (data)
+ OBD_FREE_PTR(data);
+ return rc;
}
-static void mdt_obd_llog_cleanup(struct obd_device *obd)
+/* Disconnect from the quota master target, if this MDT is connected to one,
+ * and clear the cached export and device pointers. */
+static void mdt_quota_fini(const struct lu_env *env, struct mdt_device *mdt)
{
- struct llog_ctxt *ctxt;
+ ENTRY;
- ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
- if (ctxt)
- llog_cleanup(ctxt);
+ if (mdt->mdt_qmt_exp != NULL) {
+ LASSERT(mdt->mdt_qmt_dev != NULL);
- if (obd->obd_fsops) {
- fsfilt_put_ops(obd->obd_fsops);
- obd->obd_fsops = NULL;
- }
+ /* the qmt automatically shuts down when the mdt disconnects */
+ obd_disconnect(mdt->mdt_qmt_exp);
+ mdt->mdt_qmt_exp = NULL;
+ mdt->mdt_qmt_dev = NULL;
+ }
+ EXIT;
}
static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
ping_evictor_stop();
- mdt_stop_ptlrpc_service(m);
+
+ mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child));
mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
- mdt_obd_llog_cleanup(obd);
obd_exports_barrier(obd);
obd_zombie_barrier();
mdt_procfs_fini(m);
-#ifdef HAVE_QUOTA_SUPPORT
- next->md_ops->mdo_quota.mqo_cleanup(env, next);
-#endif
- lut_fini(env, &m->mdt_lut);
+ tgt_fini(env, &m->mdt_lut);
mdt_fs_cleanup(env, m);
upcall_cache_cleanup(m->mdt_identity_cache);
m->mdt_identity_cache = NULL;
d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
}
+ mdt_quota_fini(env, m);
+
cfs_free_nidlist(&m->mdt_nosquash_nids);
if (m->mdt_nosquash_str) {
OBD_FREE(m->mdt_nosquash_str, m->mdt_nosquash_strlen);
m->mdt_nosquash_strlen = 0;
}
+ next->md_ops->mdo_iocontrol(env, next, OBD_IOC_PAUSE_LFSCK,
+ 0, NULL);
mdt_seq_fini(env, m);
- mdt_seq_fini_cli(m);
mdt_fld_fini(env, m);
sptlrpc_rule_set_free(&m->mdt_sptlrpc_rset);
sptlrpc_target_update_exp_flavor(obd, &tmp_rset);
- cfs_write_lock(&m->mdt_sptlrpc_lock);
+ write_lock(&m->mdt_sptlrpc_lock);
sptlrpc_rule_set_free(&m->mdt_sptlrpc_rset);
m->mdt_sptlrpc_rset = tmp_rset;
- cfs_write_unlock(&m->mdt_sptlrpc_lock);
+ write_unlock(&m->mdt_sptlrpc_lock);
return 0;
}
struct lustre_mount_info *lmi = NULL;
struct lustre_sb_info *lsi;
struct lu_site *s;
- struct md_site *mite;
+ struct seq_server_site *ss_site;
const char *identity_upcall = "NONE";
struct md_device *next;
int rc;
LASSERT(obd != NULL);
m->mdt_max_mdsize = MAX_MD_SIZE; /* 4 stripes */
- m->mdt_max_cookiesize = sizeof(struct llog_cookie);
m->mdt_som_conf = 0;
/* CMD is supported only in IAM mode */
LASSERT(num);
node_id = simple_strtol(num, NULL, 10);
- if (!(lsi->lsi_flags & LDD_F_IAM_DIR) && node_id) {
- CERROR("CMD Operation not allowed in IOP mode\n");
- GOTO(err_lmi, rc = -EINVAL);
- }
-
- obd->u.obt.obt_magic = OBT_MAGIC;
+ obd->u.obt.obt_magic = OBT_MAGIC;
}
- cfs_rwlock_init(&m->mdt_sptlrpc_lock);
+ rwlock_init(&m->mdt_sptlrpc_lock);
sptlrpc_rule_set_init(&m->mdt_sptlrpc_rset);
- cfs_spin_lock_init(&m->mdt_ioepoch_lock);
+ spin_lock_init(&m->mdt_ioepoch_lock);
m->mdt_opts.mo_compat_resname = 0;
m->mdt_opts.mo_mds_capa = 1;
m->mdt_opts.mo_oss_capa = 1;
CFS_INIT_LIST_HEAD(&m->mdt_nosquash_nids);
m->mdt_nosquash_str = NULL;
m->mdt_nosquash_strlen = 0;
- cfs_init_rwsem(&m->mdt_squash_sem);
- cfs_spin_lock_init(&m->mdt_osfs_lock);
+ init_rwsem(&m->mdt_squash_sem);
+ spin_lock_init(&m->mdt_osfs_lock);
m->mdt_osfs_age = cfs_time_shift_64(-1000);
+ m->mdt_enable_remote_dir = 0;
m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
m->mdt_md_dev.md_lu_dev.ld_obd = obd;
obd->obd_lu_dev = &m->mdt_md_dev.md_lu_dev;
/* init the stack */
- rc = mdt_stack_init((struct lu_env *)env, m, cfg, lmi);
+ rc = mdt_stack_init((struct lu_env *)env, m, cfg);
if (rc) {
CERROR("Can't init device stack, rc %d\n", rc);
- RETURN(rc);
+ GOTO(err_lmi, rc);
}
s = m->mdt_md_dev.md_lu_dev.ld_site;
- mite = &m->mdt_mite;
+ ss_site = &m->mdt_seq_site;
+ s->ld_seq_site = ss_site;
+ ss_site->ss_lu = s;
/* set server index */
- mite->ms_node_id = node_id;
+ ss_site->ss_node_id = node_id;
- /* failover is the default
- * FIXME: we do not failout mds0/mgs, which may cause some problems.
- * assumed whose ms_node_id == 0 XXX
- * */
+ /* failover is the default
+ * FIXME: we do not failout mds0/mgs, which may cause some problems.
+ * assumed whose ss_node_id == 0 XXX
+ * */
obd->obd_replayable = 1;
/* No connection accepted until configurations will finish */
obd->obd_no_conn = 1;
}
}
- rc = lut_init(env, &m->mdt_lut, obd, m->mdt_bottom);
+ rc = tgt_init(env, &m->mdt_lut, obd, m->mdt_bottom);
if (rc)
GOTO(err_fini_stack, rc);
- rc = mdt_fld_init(env, obd->obd_name, m);
- if (rc)
- GOTO(err_lut, rc);
+ rc = mdt_fld_init(env, obd->obd_name, m);
+ if (rc)
+ GOTO(err_lut, rc);
- rc = mdt_seq_init(env, obd->obd_name, m);
- if (rc)
- GOTO(err_fini_fld, rc);
+ rc = mdt_seq_init(env, obd->obd_name, m);
+ if (rc)
+ GOTO(err_fini_fld, rc);
snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
LUSTRE_MDT_NAME"-%p", m);
if (rc)
GOTO(err_capa, rc);
- rc = mdt_obd_llog_setup(obd, lsi);
- if (rc)
- GOTO(err_fs_cleanup, rc);
-
- rc = mdt_llog_ctxt_clone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
- if (rc)
- GOTO(err_llog_cleanup, rc);
-
mdt_adapt_sptlrpc_conf(obd, 1);
next = m->mdt_child;
-#ifdef HAVE_QUOTA_SUPPORT
- rc = next->md_ops->mdo_quota.mqo_setup(env, next, lmi->lmi_mnt);
- if (rc)
- GOTO(err_llog_cleanup, rc);
-#endif
-
rc = next->md_ops->mdo_iocontrol(env, next, OBD_IOC_GET_MNTOPT, 0,
&mntopts);
if (rc)
- GOTO(err_quota, rc);
+ GOTO(err_llog_cleanup, rc);
if (mntopts & MNTOPT_USERXATTR)
m->mdt_opts.mo_user_xattr = 1;
if (IS_ERR(m->mdt_identity_cache)) {
rc = PTR_ERR(m->mdt_identity_cache);
m->mdt_identity_cache = NULL;
- GOTO(err_quota, rc);
+ GOTO(err_llog_cleanup, rc);
}
- target_recovery_init(&m->mdt_lut, mdt_recovery_handle);
-
rc = mdt_procfs_init(m, dev);
if (rc) {
CERROR("Can't init MDT lprocfs, rc %d\n", rc);
GOTO(err_recovery, rc);
}
- rc = mdt_start_ptlrpc_service(m);
- if (rc)
- GOTO(err_procfs, rc);
+ rc = mdt_quota_init(env, m, cfg);
+ if (rc)
+ GOTO(err_procfs, rc);
+
+ m->mdt_ldlm_client = &mdt2obd_dev(m)->obd_ldlm_client;
+ ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
+ "mdt_ldlm_client", m->mdt_ldlm_client);
ping_evictor_start();
- if (obd->obd_recovering == 0)
- mdt_postrecov(env, m);
+ /* recovery will be started upon mdt_prepare()
+ * when the whole stack is complete and ready
+ * to serve the requests */
mdt_init_capa_ctxt(env, m);
RETURN(0);
- ping_evictor_stop();
- mdt_stop_ptlrpc_service(m);
err_procfs:
mdt_procfs_fini(m);
err_recovery:
target_recovery_fini(obd);
upcall_cache_cleanup(m->mdt_identity_cache);
m->mdt_identity_cache = NULL;
-err_quota:
-#ifdef HAVE_QUOTA_SUPPORT
- next->md_ops->mdo_quota.mqo_cleanup(env, next);
-#endif
err_llog_cleanup:
mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
- mdt_obd_llog_cleanup(obd);
-err_fs_cleanup:
mdt_fs_cleanup(env, m);
err_capa:
cfs_timer_disarm(&m->mdt_ck_timer);
ldlm_namespace_free(m->mdt_namespace, NULL, 0);
obd->obd_namespace = m->mdt_namespace = NULL;
err_fini_seq:
- mdt_seq_fini(env, m);
+ mdt_seq_fini(env, m);
err_fini_fld:
- mdt_fld_fini(env, m);
+ mdt_fld_fini(env, m);
err_lut:
- lut_fini(env, &m->mdt_lut);
+ tgt_fini(env, &m->mdt_lut);
err_fini_stack:
mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
err_lmi:
return (rc);
}
-/* For interoperability between 1.8 and 2.0. */
+/* For interoperability: the left element is the old parameter name and the
+ * right one is its replacement.  If a parameter is deprecated, the
+ * replacement is set to NULL. */
static struct cfg_interop_param mdt_interop_param[] = {
{ "mdt.group_upcall", NULL },
- { "mdt.quota_type", "mdd.quota_type" },
+ { "mdt.quota_type", NULL },
+ { "mdd.quota_type", NULL },
{ "mdt.rootsquash", "mdt.root_squash" },
{ "mdt.nosquash_nid", "mdt.nosquash_nids" },
{ NULL }
struct mdt_device *m = mdt_dev(d);
struct md_device *md_next = m->mdt_child;
struct lu_device *next = md2lu_dev(md_next);
- int rc = 0;
+ int rc;
ENTRY;
switch (cfg->lcfg_command) {
struct lprocfs_static_vars lvars;
struct obd_device *obd = d->ld_obd;
- /* For interoperability between 1.8 and 2.0. */
+ /* For interoperability */
struct cfg_interop_param *ptr = NULL;
struct lustre_cfg *old_cfg = NULL;
char *param = NULL;
ptr = class_find_old_param(param, mdt_interop_param);
if (ptr != NULL) {
if (ptr->new_param == NULL) {
- CWARN("For 1.8 interoperability, skip this %s."
+ rc = 0;
+ CWARN("For interoperability, skip this %s."
" It is obsolete.\n", ptr->old_param);
- break;
+ break;
}
CWARN("Found old param %s, changed it to %s.\n",
break;
}
- case LCFG_ADD_MDC:
- /*
- * Add mdc hook to get first MDT uuid and connect it to
- * ls->controller to use for seq manager.
- */
- rc = next->ld_ops->ldo_process_config(env, next, cfg);
- if (rc)
- CERROR("Can't add mdc, rc %d\n", rc);
- else
- rc = mdt_seq_init_cli(env, mdt_dev(d), cfg);
- break;
default:
/* others are passed further */
rc = next->ld_ops->ldo_process_config(env, next, cfg);
lu_object_init(o, h, d);
lu_object_add_top(h, o);
o->lo_ops = &mdt_obj_ops;
- cfs_mutex_init(&mo->mot_ioepoch_mutex);
- cfs_mutex_init(&mo->mot_lov_mutex);
+ mutex_init(&mo->mot_ioepoch_mutex);
+ mutex_init(&mo->mot_lov_mutex);
RETURN(o);
} else
RETURN(NULL);
mdto->mot_ioepoch_count, mdto->mot_writecount);
}
-static const struct lu_device_operations mdt_lu_ops = {
+/**
+ * Finish MDT setup once the device stack below it can be prepared.
+ *
+ * Prepares the child device, clones the changelog llog context, tries to
+ * resume a paused LFSCK (best effort), fetches the root FID, initializes
+ * recovery and finally clears obd_no_conn so that the target accepts
+ * connections. mdt_postrecov() is called right away when the obd is not
+ * recovering.
+ *
+ * \param env   execution environment
+ * \param pdev  parent device (not used by this function)
+ * \param cdev  the lu_device of this MDT
+ *
+ * \retval 0      success
+ * \retval other  error returned by the child ldo_prepare(),
+ *                mdt_llog_ctxt_clone() or mdo_root_get()
+ */
+static int mdt_prepare(const struct lu_env *env,
+		       struct lu_device *pdev,
+		       struct lu_device *cdev)
+{
+	struct mdt_device *mdt = mdt_dev(cdev);
+	struct lu_device *next = &mdt->mdt_child->md_lu_dev;
+	struct obd_device *obd = cdev->ld_obd;
+	int rc;
+
+	ENTRY;
+
+	LASSERT(obd);
+
+	/* the lower layer has to be prepared before anything else */
+	rc = next->ld_ops->ldo_prepare(env, cdev, next);
+	if (rc)
+		RETURN(rc);
+
+	rc = mdt_llog_ctxt_clone(env, mdt, LLOG_CHANGELOG_ORIG_CTXT);
+	if (rc)
+		RETURN(rc);
+
+	/* Resuming a paused LFSCK is best effort: a failure is logged
+	 * (with the device name and error code) but does not fail the
+	 * prepare; rc is overwritten by the next call. */
+	rc = mdt->mdt_child->md_ops->mdo_iocontrol(env, mdt->mdt_child,
+						   OBD_IOC_START_LFSCK,
+						   0, NULL);
+	if (rc != 0)
+		CWARN("%s: auto trigger paused LFSCK failed: rc = %d\n",
+		      obd->obd_name, rc);
+
+	rc = mdt->mdt_child->md_ops->mdo_root_get(env, mdt->mdt_child,
+						  &mdt->mdt_md_root_fid);
+	if (rc)
+		RETURN(rc);
+
+	/* MDT_FL_CFGLOG is set just below, so this asserts that the
+	 * device is prepared only once */
+	LASSERT(!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state));
+	target_recovery_init(&mdt->mdt_lut, mdt_recovery_handle);
+	set_bit(MDT_FL_CFGLOG, &mdt->mdt_state);
+
+	/* open the target for connections */
+	LASSERT(obd->obd_no_conn);
+	spin_lock(&obd->obd_dev_lock);
+	obd->obd_no_conn = 0;
+	spin_unlock(&obd->obd_dev_lock);
+
+	if (obd->obd_recovering == 0)
+		mdt_postrecov(env, mdt);
+
+	RETURN(rc);
+}
+
+const struct lu_device_operations mdt_lu_ops = { /* lu_device method table of the MDT device */
.ldo_object_alloc = mdt_object_alloc,
.ldo_process_config = mdt_process_config,
+ .ldo_prepare = mdt_prepare, /* late setup step, see mdt_prepare() */
};
static const struct lu_object_operations mdt_obj_ops = {
RETURN(0);
}
-/* mds_connect_internal */
+/**
+ * Match client and server connection feature flags.
+ *
+ * Compute the compatibility flags for a connection request based on
+ * features mutually supported by client and server.
+ *
+ * The obd_export::exp_connect_data.ocd_connect_flags field in \a exp
+ * must not be updated here, otherwise a partially initialized value may
+ * be exposed. After the connection request is successfully processed,
+ * the top-level MDT connect request handler atomically updates the export
+ * connect flags from the obd_connect_data::ocd_connect_flags field of the
+ * reply. \see mdt_connect().
+ *
+ * \param exp the obd_export associated with this client/target pair
+ * \param mdt the target device for the connection
+ * \param data stores data for this connect request
+ *
+ * \retval 0 success
+ * \retval -EPROTO \a data unexpectedly has zero obd_connect_data::ocd_brw_size
+ * \retval -EBADE client and server feature requirements are incompatible
+ */
static int mdt_connect_internal(struct obd_export *exp,
- struct mdt_device *mdt,
- struct obd_connect_data *data)
-{
- if (data != NULL) {
- data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
- data->ocd_ibits_known &= MDS_INODELOCK_FULL;
-
- /* If no known bits (which should not happen, probably,
- as everybody should support LOOKUP and UPDATE bits at least)
- revert to compat mode with plain locks. */
- if (!data->ocd_ibits_known &&
- data->ocd_connect_flags & OBD_CONNECT_IBITS)
- data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
-
- if (!mdt->mdt_opts.mo_acl)
- data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
-
- if (!mdt->mdt_opts.mo_user_xattr)
- data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
-
- if (!mdt->mdt_som_conf)
- data->ocd_connect_flags &= ~OBD_CONNECT_SOM;
-
- if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
- data->ocd_brw_size = min(data->ocd_brw_size,
- (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT));
- if (data->ocd_brw_size == 0) {
- CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64
- " ocd_version: %x ocd_grant: %d "
- "ocd_index: %u ocd_brw_size is "
- "unexpectedly zero, network data "
- "corruption? Refusing connection of this"
- " client\n",
- exp->exp_obd->obd_name,
- exp->exp_client_uuid.uuid,
- exp, data->ocd_connect_flags, data->ocd_version,
- data->ocd_grant, data->ocd_index);
- return -EPROTO;
- }
- }
+ struct mdt_device *mdt,
+ struct obd_connect_data *data)
+{
+ LASSERT(data != NULL); /* connect data is mandatory */
+
+ data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED; /* drop every flag outside the MDT_CONNECT_SUPPORTED mask */
+ data->ocd_ibits_known &= MDS_INODELOCK_FULL; /* likewise for the inodebits, masked by MDS_INODELOCK_FULL */
+
+ /* If no known bits (which should not happen, probably,
+ as everybody should support LOOKUP and UPDATE bits at least)
+ revert to compat mode with plain locks. */
+ if (!data->ocd_ibits_known &&
+ data->ocd_connect_flags & OBD_CONNECT_IBITS)
+ data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
+
+ if (!mdt->mdt_opts.mo_acl) /* features switched off on this MDT are not offered */
+ data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
+
+ if (!mdt->mdt_opts.mo_user_xattr)
+ data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
+
+ if (!mdt->mdt_som_conf)
+ data->ocd_connect_flags &= ~OBD_CONNECT_SOM;
+
+ if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
+ data->ocd_brw_size = min(data->ocd_brw_size,
+ (__u32)MD_MAX_BRW_SIZE); /* cap the requested size at MD_MAX_BRW_SIZE */
+ if (data->ocd_brw_size == 0) { /* a zero size is refused with -EPROTO */
+ CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64
+ " ocd_version: %x ocd_grant: %d "
+ "ocd_index: %u ocd_brw_size is "
+ "unexpectedly zero, network data "
+ "corruption? Refusing connection of this"
+ " client\n",
+ exp->exp_obd->obd_name,
+ exp->exp_client_uuid.uuid,
+ exp, data->ocd_connect_flags, data->ocd_version,
+ data->ocd_grant, data->ocd_index);
+ return -EPROTO;
+ }
+ }
- cfs_spin_lock(&exp->exp_lock);
- exp->exp_connect_flags = data->ocd_connect_flags;
- cfs_spin_unlock(&exp->exp_lock);
- data->ocd_version = LUSTRE_VERSION_CODE;
- exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
- }
+ /* NB: Disregard the rule against updating
+ * exp_connect_data.ocd_connect_flags in this case, since
+ * tgt_client_new() needs to know if this is a lightweight
+ * connection, and it is safe to expose this flag before
+ * connection processing completes. */
+ if (data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT) {
+ spin_lock(&exp->exp_lock);
+ *exp_connect_flags_ptr(exp) |= OBD_CONNECT_LIGHTWEIGHT;
+ spin_unlock(&exp->exp_lock);
+ }
-#if 0
- if (mdt->mdt_opts.mo_acl &&
- ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
- CWARN("%s: MDS requires ACL support but client does not\n",
- mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
- return -EBADE;
- }
-#endif
+ data->ocd_version = LUSTRE_VERSION_CODE;
+ exp->exp_connect_data = *data; /* NOTE(review): this copies the whole connect
+ * data, ocd_connect_flags included, into the export without taking
+ * exp_lock and before the -EBADE checks and the PINGLESS adjustment
+ * below have run. That looks at odds with the rule stated in the
+ * header comment of this function - confirm it is intended. */
+ exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
- if ((exp->exp_connect_flags & OBD_CONNECT_FID) == 0) {
- CWARN("%s: MDS requires FID support, but client not\n",
- mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
- return -EBADE;
- }
+ if ((data->ocd_connect_flags & OBD_CONNECT_FID) == 0) { /* checked on the negotiated flags in data, not on the export */
+ CWARN("%s: MDS requires FID support, but client not\n",
+ mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
+ return -EBADE;
+ }
- if (mdt->mdt_som_conf && !exp_connect_som(exp) &&
- !(exp->exp_connect_flags & OBD_CONNECT_MDS_MDS)) {
- CWARN("%s: MDS has SOM enabled, but client does not support "
- "it\n", mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
- return -EBADE;
- }
+ if (mdt->mdt_som_conf &&
+ !(data->ocd_connect_flags & (OBD_CONNECT_LIGHTWEIGHT |
+ OBD_CONNECT_MDS_MDS |
+ OBD_CONNECT_SOM))) { /* refused only when none of the three flags is set */
+ CWARN("%s: MDS has SOM enabled, but client does not support "
+ "it\n", mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
+ return -EBADE;
+ }
- return 0;
+ if (data->ocd_connect_flags & OBD_CONNECT_PINGLESS) {
+ if (suppress_pings) { /* pingless accepted: unlink the export from the
+ * exp_obd_chain_timed list (presumably so that it is not
+ * evicted for missing pings - TODO confirm) */
+ spin_lock(&exp->exp_obd->obd_dev_lock);
+ list_del_init(&exp->exp_obd_chain_timed);
+ spin_unlock(&exp->exp_obd->obd_dev_lock);
+ } else {
+ data->ocd_connect_flags &= ~OBD_CONNECT_PINGLESS; /* pingless not enabled here: clear the flag in data */
+ }
+ }
+
+ return 0;
}
static int mdt_connect_check_sptlrpc(struct mdt_device *mdt,
- struct obd_export *exp,
- struct ptlrpc_request *req)
+ struct obd_export *exp,
+ struct ptlrpc_request *req)
{
- struct sptlrpc_flavor flvr;
- int rc = 0;
+ struct sptlrpc_flavor flvr;
+ int rc = 0;
- if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
- cfs_read_lock(&mdt->mdt_sptlrpc_lock);
- sptlrpc_target_choose_flavor(&mdt->mdt_sptlrpc_rset,
- req->rq_sp_from,
- req->rq_peer.nid,
- &flvr);
- cfs_read_unlock(&mdt->mdt_sptlrpc_lock);
+ if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
+ read_lock(&mdt->mdt_sptlrpc_lock);
+ sptlrpc_target_choose_flavor(&mdt->mdt_sptlrpc_rset,
+ req->rq_sp_from,
+ req->rq_peer.nid,
+ &flvr);
+ read_unlock(&mdt->mdt_sptlrpc_lock);
- cfs_spin_lock(&exp->exp_lock);
+ spin_lock(&exp->exp_lock);
exp->exp_sp_peer = req->rq_sp_from;
exp->exp_flvr = flvr;
rc = -EACCES;
}
- cfs_spin_unlock(&exp->exp_lock);
+ spin_unlock(&exp->exp_lock);
} else {
if (exp->exp_sp_peer != req->rq_sp_from) {
CERROR("RPC source %s doesn't match %s\n",
req = info->mti_pill->rc_req;
mdt = mdt_dev(obd->obd_lu_dev);
+ /*
+ * First, check whether the stack is ready to handle requests.
+ * XXX: the method used here is probably not the most appropriate
+ * one; at some point we should find a better one.
+ */
+ if (!test_bit(MDT_FL_SYNCED, &mdt->mdt_state) &&
+ !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) {
+ rc = obd_health_check(env, mdt->mdt_child_exp->exp_obd);
+ if (rc)
+ RETURN(-EAGAIN);
+ set_bit(MDT_FL_SYNCED, &mdt->mdt_state);
+ }
+
rc = class_connect(&conn, obd, cluuid);
if (rc)
RETURN(rc);
LASSERT(lcd);
info->mti_exp = lexp;
memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid);
- rc = lut_client_new(env, lexp);
+ rc = tgt_client_new(env, lexp);
if (rc == 0)
mdt_export_stats_init(obd, lexp, localdata);
}
int rc = 0;
ENTRY;
- cfs_spin_lock(&med->med_open_lock);
- while (!cfs_list_empty(&med->med_open_head)) {
- cfs_list_t *tmp = med->med_open_head.next;
- mfd = cfs_list_entry(tmp, struct mdt_file_data, mfd_list);
+ spin_lock(&med->med_open_lock);
+ while (!cfs_list_empty(&med->med_open_head)) {
+ cfs_list_t *tmp = med->med_open_head.next;
+ mfd = cfs_list_entry(tmp, struct mdt_file_data, mfd_list);
- /* Remove mfd handle so it can't be found again.
- * We are consuming the mfd_list reference here. */
- class_handle_unhash(&mfd->mfd_handle);
- cfs_list_move_tail(&mfd->mfd_list, &closing_list);
- }
- cfs_spin_unlock(&med->med_open_lock);
+ /* Remove mfd handle so it can't be found again.
+ * We are consuming the mfd_list reference here. */
+ class_handle_unhash(&mfd->mfd_handle);
+ cfs_list_move_tail(&mfd->mfd_list, &closing_list);
+ }
+ spin_unlock(&med->med_open_lock);
mdt = mdt_dev(obd->obd_lu_dev);
LASSERT(mdt != NULL);
if (!cfs_list_empty(&closing_list)) {
struct md_attr *ma = &info->mti_attr;
- int lmm_size;
- int cookie_size;
-
- lmm_size = mdt->mdt_max_mdsize;
- OBD_ALLOC_LARGE(ma->ma_lmm, lmm_size);
- if (ma->ma_lmm == NULL)
- GOTO(out_lmm, rc = -ENOMEM);
-
- cookie_size = mdt->mdt_max_cookiesize;
- OBD_ALLOC_LARGE(ma->ma_cookie, cookie_size);
- if (ma->ma_cookie == NULL)
- GOTO(out_cookie, rc = -ENOMEM);
/* Close any open files (which may also cause orphan unlinking). */
cfs_list_for_each_entry_safe(mfd, n, &closing_list, mfd_list) {
cfs_list_del_init(&mfd->mfd_list);
- memset(&ma->ma_attr, 0, sizeof(ma->ma_attr));
- ma->ma_lmm_size = lmm_size;
- ma->ma_cookie_size = cookie_size;
- ma->ma_need = 0;
- /* It is not for setattr, just tell MDD to send
- * DESTROY RPC to OSS if needed */
- ma->ma_valid = MA_FLAGS;
- ma->ma_attr_flags = MDS_CLOSE_CLEANUP;
- /* Don't unlink orphan on failover umount, LU-184 */
- if (exp->exp_flags & OBD_OPT_FAILOVER)
- ma->ma_attr_flags |= MDS_KEEP_ORPHAN;
+ ma->ma_need = ma->ma_valid = 0;
+ /* Don't unlink orphan on failover umount, LU-184 */
+ if (exp->exp_flags & OBD_OPT_FAILOVER) {
+ ma->ma_valid = MA_FLAGS;
+ ma->ma_attr_flags |= MDS_KEEP_ORPHAN;
+ }
mdt_mfd_close(info, mfd);
}
- OBD_FREE_LARGE(ma->ma_cookie, cookie_size);
- ma->ma_cookie = NULL;
-out_cookie:
- OBD_FREE_LARGE(ma->ma_lmm, lmm_size);
- ma->ma_lmm = NULL;
}
-out_lmm:
info->mti_mdt = NULL;
/* cleanup client slot early */
/* Do not erase record for recoverable client. */
if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed)
- lut_client_del(&env, exp);
+ tgt_client_del(&env, exp);
lu_env_fini(&env);
RETURN(rc);
ENTRY;
CFS_INIT_LIST_HEAD(&med->med_open_head);
- cfs_spin_lock_init(&med->med_open_lock);
- cfs_mutex_init(&med->med_idmap_mutex);
- med->med_idmap = NULL;
- cfs_spin_lock(&exp->exp_lock);
- exp->exp_connecting = 1;
- cfs_spin_unlock(&exp->exp_lock);
+ spin_lock_init(&med->med_open_lock);
+ mutex_init(&med->med_idmap_mutex);
+ med->med_idmap = NULL;
+ spin_lock(&exp->exp_lock);
+ exp->exp_connecting = 1;
+ spin_unlock(&exp->exp_lock);
/* self-export doesn't need client data and ldlm initialization */
if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
&exp->exp_client_uuid)))
RETURN(0);
- rc = lut_client_alloc(exp);
+ rc = tgt_client_alloc(exp);
if (rc)
GOTO(err, rc);
RETURN(rc);
err_free:
- lut_client_free(exp);
+ tgt_client_free(exp);
err:
CERROR("%s: Failed to initialize export: rc = %d\n",
exp->exp_obd->obd_name, rc);
RETURN(0);
ldlm_destroy_export(exp);
- lut_client_free(exp);
+ tgt_client_free(exp);
LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
LASSERT(cfs_list_empty(&exp->exp_mdt_data.med_open_head));
RETURN(0);
}
-static void mdt_allow_cli(struct mdt_device *m, unsigned int flag)
-{
- if (flag & CONFIG_LOG)
- cfs_set_bit(MDT_FL_CFGLOG, &m->mdt_state);
-
- /* also notify active event */
- if (flag & CONFIG_SYNC)
- cfs_set_bit(MDT_FL_SYNCED, &m->mdt_state);
-
- if (cfs_test_bit(MDT_FL_CFGLOG, &m->mdt_state) &&
- cfs_test_bit(MDT_FL_SYNCED, &m->mdt_state)) {
- struct obd_device *obd = m->mdt_md_dev.md_lu_dev.ld_obd;
-
- /* Open for clients */
- if (obd->obd_no_conn) {
- cfs_spin_lock(&obd->obd_dev_lock);
- obd->obd_no_conn = 0;
- cfs_spin_unlock(&obd->obd_dev_lock);
- }
- }
-}
-
-static int mdt_upcall(const struct lu_env *env, struct md_device *md,
- enum md_upcall_event ev, void *data)
-{
- struct mdt_device *m = mdt_dev(&md->md_lu_dev);
- struct md_device *next = m->mdt_child;
- struct mdt_thread_info *mti;
- int rc = 0;
- ENTRY;
-
- switch (ev) {
- case MD_LOV_SYNC:
- rc = next->md_ops->mdo_maxsize_get(env, next,
- &m->mdt_max_mdsize,
- &m->mdt_max_cookiesize);
- CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d\n",
- m->mdt_max_mdsize, m->mdt_max_cookiesize);
- mdt_allow_cli(m, CONFIG_SYNC);
- if (data)
- (*(__u64 *)data) =
- m->mdt_lut.lut_obd->u.obt.obt_mount_count;
- break;
- case MD_NO_TRANS:
- mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
- mti->mti_no_need_trans = 1;
- CDEBUG(D_INFO, "disable mdt trans for this thread\n");
- break;
- case MD_LOV_CONFIG:
- /* Check that MDT is not yet configured */
- LASSERT(!cfs_test_bit(MDT_FL_CFGLOG, &m->mdt_state));
- break;
-#ifdef HAVE_QUOTA_SUPPORT
- case MD_LOV_QUOTA:
- if (md->md_lu_dev.ld_obd->obd_recovering == 0 &&
- likely(md->md_lu_dev.ld_obd->obd_stopping == 0))
- next->md_ops->mdo_quota.mqo_recovery(env, next);
- break;
-#endif
- default:
- CERROR("invalid event\n");
- rc = -EINVAL;
- break;
- }
- RETURN(rc);
-}
-
-static int mdt_obd_notify(struct obd_device *obd,
- struct obd_device *watched,
- enum obd_notify_event ev, void *data)
-{
- struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
-#ifdef HAVE_QUOTA_SUPPORT
- struct md_device *next = mdt->mdt_child;
-#endif
- ENTRY;
-
- switch (ev) {
- case OBD_NOTIFY_CONFIG:
- mdt_allow_cli(mdt, (unsigned long)data);
-
-#ifdef HAVE_QUOTA_SUPPORT
- /* quota_type has been processed, we can now handle
- * incoming quota requests */
- next->md_ops->mdo_quota.mqo_notify(NULL, next);
-#endif
- break;
- default:
- CDEBUG(D_INFO, "Unhandled notification %#x\n", ev);
- }
- RETURN(0);
-}
-
static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key,
void *val, int vallen)
{
}
+/**
+ * Resolve a FID to a path for a fid2path request.
+ *
+ * \param env  execution environment
+ * \param mdt  MDT device the object is looked up on
+ * \param fp   request/reply buffer: gf_fid is the FID to resolve;
+ *             gf_path, gf_pathlen, gf_recno and gf_linkno are handed
+ *             to mo_path()
+ *
+ * \retval -EINVAL   the FID is not sane, is not client-visible, or the
+ *                   object could not be found
+ * \retval -EREMOTE  the object is remote
+ * \retval -ENOENT   the object does not exist
+ * \retval other     result of mo_path()
+ */
static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
- struct getinfo_fid2path *fp)
+			struct getinfo_fid2path *fp)
{
- struct mdt_object *obj;
- int rc;
- ENTRY;
+	struct mdt_object *obj;
+	struct obd_device *obd = mdt2obd_dev(mdt);
+	int rc;
+	ENTRY;
+
- CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n",
- PFID(&fp->gf_fid), fp->gf_recno, fp->gf_linkno);
+	CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n",
+	       PFID(&fp->gf_fid), fp->gf_recno, fp->gf_linkno);
+
- if (!fid_is_sane(&fp->gf_fid))
- RETURN(-EINVAL);
+	if (!fid_is_sane(&fp->gf_fid))
+		RETURN(-EINVAL);
+
- obj = mdt_object_find(env, mdt, &fp->gf_fid);
- if (obj == NULL || IS_ERR(obj)) {
- CDEBUG(D_IOCTL, "no object "DFID": %ld\n", PFID(&fp->gf_fid),
- PTR_ERR(obj));
- RETURN(-EINVAL);
- }
+	if (!fid_is_client_mdt_visible(&fp->gf_fid)) {
+		CWARN("%s: "DFID" is invalid, sequence should be "
+		      ">= "LPX64"\n", obd->obd_name,
+		      PFID(&fp->gf_fid), (__u64)FID_SEQ_NORMAL);
+		RETURN(-EINVAL);
+	}
+
- rc = lu_object_exists(&obj->mot_obj.mo_lu);
- if (rc <= 0) {
- if (rc == -1)
- rc = -EREMOTE;
- else
- rc = -ENOENT;
- mdt_object_put(env, obj);
- CDEBUG(D_IOCTL, "nonlocal object "DFID": %d\n",
- PFID(&fp->gf_fid), rc);
- RETURN(rc);
- }
+	obj = mdt_object_find(env, mdt, &fp->gf_fid);
+	if (obj == NULL || IS_ERR(obj)) {
+		CDEBUG(D_IOCTL, "no object "DFID": %ld\n", PFID(&fp->gf_fid),
+		       PTR_ERR(obj));
+		RETURN(-EINVAL);
+	}
+
- rc = mo_path(env, md_object_next(&obj->mot_obj), fp->gf_path,
- fp->gf_pathlen, &fp->gf_recno, &fp->gf_linkno);
- mdt_object_put(env, obj);
+	/* rc must be assigned on every path: it is tested right below,
+	 * also for a local object that exists */
+	if (mdt_object_remote(obj))
+		rc = -EREMOTE;
+	else if (!mdt_object_exists(obj))
+		rc = -ENOENT;
+	else
+		rc = 0;
+
- RETURN(rc);
+	if (rc < 0) {
+		/* drop the reference taken by mdt_object_find() */
+		mdt_object_put(env, obj);
+		CDEBUG(D_IOCTL, "nonlocal object "DFID": %d\n",
+		       PFID(&fp->gf_fid), rc);
+		RETURN(rc);
+	}
+
+	rc = mo_path(env, md_object_next(&obj->mot_obj), fp->gf_path,
+		     fp->gf_pathlen, &fp->gf_recno, &fp->gf_linkno);
+	mdt_object_put(env, obj);
+
+	RETURN(rc);
}
-static int mdt_get_info(struct mdt_thread_info *info)
+int mdt_get_info(struct mdt_thread_info *info)
{
struct ptlrpc_request *req = mdt_info_req(info);
char *key;
if (IS_ERR(obj))
RETURN(PTR_ERR(obj));
- rc = mdt_object_exists(obj);
- if (rc < 0) {
- rc = -EREMOTE;
- /**
- * before calling version get the correct MDS should be
- * fid, this is error to find remote object here
- */
- CERROR("nonlocal object "DFID"\n", PFID(fid));
- } else if (rc == 0) {
- *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION;
- rc = -ENOENT;
- } else {
- version = dt_version_get(mti->mti_env, mdt_obj2dt(obj));
- *(__u64 *)data->ioc_inlbuf2 = version;
- rc = 0;
- }
- mdt_object_unlock_put(mti, obj, lh, 1);
- RETURN(rc);
+ if (mdt_object_remote(obj)) {
+ rc = -EREMOTE;
+ /**
+ * before calling version get the correct MDS should be
+ * fid, this is error to find remote object here
+ */
+ CERROR("nonlocal object "DFID"\n", PFID(fid));
+ } else if (!mdt_object_exists(obj)) {
+ *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION;
+ rc = -ENOENT;
+ } else {
+ version = dt_version_get(mti->mti_env, mdt_obj2dt(obj));
+ *(__u64 *)data->ioc_inlbuf2 = version;
+ rc = 0;
+ }
+ mdt_object_unlock_put(mti, obj, lh, 1);
+ RETURN(rc);
}
/* ioctls on obd dev */
int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
{
struct lu_device *ld = md2lu_dev(mdt->mdt_child);
-#ifdef HAVE_QUOTA_SUPPORT
- struct obd_device *obd = mdt2obd_dev(mdt);
- struct md_device *next = mdt->mdt_child;
-#endif
int rc;
ENTRY;
rc = ld->ld_ops->ldo_recovery_complete(env, ld);
-#ifdef HAVE_QUOTA_SUPPORT
- if (likely(obd->obd_stopping == 0))
- next->md_ops->mdo_quota.mqo_recovery(env, next);
-#endif
RETURN(rc);
}
return rc;
}
-/**
- * Send a copytool req to a client
- * Note this sends a request RPC from a server (MDT) to a client (MDC),
- * backwards of normal comms.
- */
-int mdt_hsm_copytool_send(struct obd_export *exp)
-{
- struct kuc_hdr *lh;
- struct hsm_action_list *hal;
- struct hsm_action_item *hai;
- int rc, len;
- ENTRY;
-
- CWARN("%s: writing to mdc at %s\n", exp->exp_obd->obd_name,
- libcfs_nid2str(exp->exp_connection->c_peer.nid));
-
- len = sizeof(*lh) + sizeof(*hal) + MTI_NAME_MAXLEN +
- /* for mockup below */ 2 * cfs_size_round(sizeof(*hai));
- OBD_ALLOC(lh, len);
- if (lh == NULL)
- RETURN(-ENOMEM);
-
- lh->kuc_magic = KUC_MAGIC;
- lh->kuc_transport = KUC_TRANSPORT_HSM;
- lh->kuc_msgtype = HMT_ACTION_LIST;
- lh->kuc_msglen = len;
-
- hal = (struct hsm_action_list *)(lh + 1);
- hal->hal_version = HAL_VERSION;
- hal->hal_archive_num = 1;
- obd_uuid2fsname(hal->hal_fsname, exp->exp_obd->obd_name,
- MTI_NAME_MAXLEN);
-
- /* mock up an action list */
- hal->hal_count = 2;
- hai = hai_zero(hal);
- hai->hai_action = HSMA_ARCHIVE;
- hai->hai_fid.f_oid = 0xA00A;
- hai->hai_len = sizeof(*hai);
- hai = hai_next(hai);
- hai->hai_action = HSMA_RESTORE;
- hai->hai_fid.f_oid = 0xB00B;
- hai->hai_len = sizeof(*hai);
-
- /* Uses the ldlm reverse import; this rpc will be seen by
- the ldlm_callback_handler */
- rc = do_set_info_async(exp->exp_imp_reverse,
- LDLM_SET_INFO, LUSTRE_OBD_VERSION,
- sizeof(KEY_HSM_COPYTOOL_SEND),
- KEY_HSM_COPYTOOL_SEND,
- len, lh, NULL);
-
- OBD_FREE(lh, len);
-
- RETURN(rc);
-}
-
static struct obd_ops mdt_obd_device_ops = {
.o_owner = THIS_MODULE,
.o_set_info_async = mdt_obd_set_info_async,
.o_destroy_export = mdt_destroy_export,
.o_iocontrol = mdt_iocontrol,
.o_postrecov = mdt_obd_postrecov,
- .o_notify = mdt_obd_notify
};
static struct lu_device* mdt_device_fini(const struct lu_env *env,
l = ERR_PTR(rc);
return l;
}
- md_upcall_init(&m->mdt_md_dev, mdt_upcall);
} else
l = ERR_PTR(-ENOMEM);
return l;
/* context key: mdt_thread_key */
LU_CONTEXT_KEY_DEFINE(mdt, LCT_MD_THREAD);
-struct md_ucred *mdt_ucred(const struct mdt_thread_info *info)
+struct lu_ucred *mdt_ucred(const struct mdt_thread_info *info)
+{ /* Return whatever lu_ucred() yields for the env of this thread info. */
+ return lu_ucred(info->mti_env);
+}
+
+struct lu_ucred *mdt_ucred_check(const struct mdt_thread_info *info)
{
- return md_ucred(info->mti_env);
+ return lu_ucred_check(info->mti_env); /* thin wrapper around lu_ucred_check(); NOTE(review): presumably the variant that tolerates a missing credential - confirm against lu_ucred_check() */
}
/**
return mdt->mdt_opts.mo_cos != 0;
}
-/* type constructor/destructor: mdt_type_init, mdt_type_fini */
-LU_TYPE_INIT_FINI(mdt, &mdt_thread_key);
-
static struct lu_device_type_operations mdt_device_type_ops = {
- .ldto_init = mdt_type_init,
- .ldto_fini = mdt_type_fini,
-
- .ldto_start = mdt_type_start,
- .ldto_stop = mdt_type_stop,
-
.ldto_device_alloc = mdt_device_alloc,
.ldto_device_free = mdt_device_free,
.ldto_device_fini = mdt_device_fini
+/**
+ * Module entry point: initialize mdt_caches through lu_kmem_init(), run
+ * mds_mod_init() and register the MDT obd type.
+ *
+ * On failure, whatever was already set up is undone in reverse order of
+ * initialization.
+ *
+ * \retval 0      success
+ * \retval other  error returned by lu_kmem_init(), mds_mod_init() or
+ *                class_register_type()
+ */
static int __init mdt_mod_init(void)
{
- struct lprocfs_static_vars lvars;
- int rc;
+	struct lprocfs_static_vars lvars;
+	int rc;
rc = lu_kmem_init(mdt_caches);
if (rc)
return rc;
- if (mdt_num_threads != 0 && mds_num_threads == 0) {
- LCONSOLE_INFO("mdt_num_threads module parameter is deprecated,"
- "use mds_num_threads instead or unset both for"
- "dynamic thread startup\n");
- mds_num_threads = mdt_num_threads;
- }
-
- lprocfs_mdt_init_vars(&lvars);
- rc = class_register_type(&mdt_obd_device_ops, NULL,
- lvars.module_vars, LUSTRE_MDT_NAME,
- &mdt_device_type);
+	rc = mds_mod_init();
+	if (rc)
+		GOTO(lu_fini, rc);
+	lprocfs_mdt_init_vars(&lvars);
+	rc = class_register_type(&mdt_obd_device_ops, NULL,
+				 lvars.module_vars, LUSTRE_MDT_NAME,
+				 &mdt_device_type);
+	if (rc)
+		GOTO(mds_fini, rc);
+
+	return 0;
+
+mds_fini:
+	/* class_register_type() failed: undo mds_mod_init() first ... */
+	mds_mod_exit();
+lu_fini:
+	/* ... then release the caches; rc is always non-zero here */
if (rc)
lu_kmem_fini(mdt_caches);
- return rc;
+	return rc;
}
static void __exit mdt_mod_exit(void)
{
- class_unregister_type(LUSTRE_MDT_NAME);
+ class_unregister_type(LUSTRE_MDT_NAME); /* teardown runs in the reverse order of the setup in mdt_mod_init() */
+ mds_mod_exit(); /* counterpart of the mds_mod_init() call in mdt_mod_init() */
lu_kmem_fini(mdt_caches);
}
-
-#define DEF_HNDL(prefix, base, suffix, flags, opc, fn, fmt) \
-[prefix ## _ ## opc - prefix ## _ ## base] = { \
- .mh_name = #opc, \
- .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix, \
- .mh_opc = prefix ## _ ## opc, \
- .mh_flags = flags, \
- .mh_act = fn, \
- .mh_fmt = fmt \
-}
-
-#define DEF_MDT_HNDL(flags, name, fn, fmt) \
- DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)
-
-#define DEF_SEQ_HNDL(flags, name, fn, fmt) \
- DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, fmt)
-
-#define DEF_FLD_HNDL(flags, name, fn, fmt) \
- DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, fmt)
-/*
- * Request with a format known in advance
- */
-#define DEF_MDT_HNDL_F(flags, name, fn) \
- DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)
-
-#define DEF_SEQ_HNDL_F(flags, name, fn) \
- DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, &RQF_SEQ_ ## name)
-
-#define DEF_FLD_HNDL_F(flags, name, fn) \
- DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, &RQF_FLD_ ## name)
-/*
- * Request with a format we do not yet know
- */
-#define DEF_MDT_HNDL_0(flags, name, fn) \
- DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL)
-
-static struct mdt_handler mdt_mds_ops[] = {
-DEF_MDT_HNDL_F(0, CONNECT, mdt_connect),
-DEF_MDT_HNDL_F(0, DISCONNECT, mdt_disconnect),
-DEF_MDT_HNDL (0, SET_INFO, mdt_set_info,
- &RQF_OBD_SET_INFO),
-DEF_MDT_HNDL_F(0, GET_INFO, mdt_get_info),
-DEF_MDT_HNDL_F(0 |HABEO_REFERO, GETSTATUS, mdt_getstatus),
-DEF_MDT_HNDL_F(HABEO_CORPUS, GETATTR, mdt_getattr),
-DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name),
-DEF_MDT_HNDL_F(HABEO_CORPUS, GETXATTR, mdt_getxattr),
-DEF_MDT_HNDL_F(0 |HABEO_REFERO, STATFS, mdt_statfs),
-DEF_MDT_HNDL_F(0 |MUTABOR, REINT, mdt_reint),
-DEF_MDT_HNDL_F(HABEO_CORPUS, CLOSE, mdt_close),
-DEF_MDT_HNDL_F(HABEO_CORPUS, DONE_WRITING, mdt_done_writing),
-DEF_MDT_HNDL_F(0 |HABEO_REFERO, PIN, mdt_pin),
-DEF_MDT_HNDL_0(0, SYNC, mdt_sync),
-DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, IS_SUBDIR, mdt_is_subdir),
-#ifdef HAVE_QUOTA_SUPPORT
-DEF_MDT_HNDL_F(0, QUOTACHECK, mdt_quotacheck_handle),
-DEF_MDT_HNDL_F(0, QUOTACTL, mdt_quotactl_handle)
-#endif
-};
-
-#define DEF_OBD_HNDL(flags, name, fn) \
- DEF_HNDL(OBD, PING, _NET, flags, name, fn, NULL)
-
-
-static struct mdt_handler mdt_obd_ops[] = {
- DEF_OBD_HNDL(0, PING, mdt_obd_ping),
- DEF_OBD_HNDL(0, LOG_CANCEL, mdt_obd_log_cancel),
- DEF_OBD_HNDL(0, QC_CALLBACK, mdt_obd_qc_callback),
- DEF_OBD_HNDL(0, IDX_READ, mdt_obd_idx_read)
-};
-
-#define DEF_DLM_HNDL_0(flags, name, fn) \
- DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL)
-#define DEF_DLM_HNDL_F(flags, name, fn) \
- DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name)
-
-static struct mdt_handler mdt_dlm_ops[] = {
- DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE, mdt_enqueue),
- DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT, mdt_convert),
- DEF_DLM_HNDL_0(0, BL_CALLBACK, mdt_bl_callback),
- DEF_DLM_HNDL_0(0, CP_CALLBACK, mdt_cp_callback)
-};
-
-#define DEF_LLOG_HNDL(flags, name, fn) \
- DEF_HNDL(LLOG, ORIGIN_HANDLE_CREATE, _NET, flags, name, fn, NULL)
-
-static struct mdt_handler mdt_llog_ops[] = {
- DEF_LLOG_HNDL(0, ORIGIN_HANDLE_CREATE, mdt_llog_create),
- DEF_LLOG_HNDL(0, ORIGIN_HANDLE_NEXT_BLOCK, mdt_llog_next_block),
- DEF_LLOG_HNDL(0, ORIGIN_HANDLE_READ_HEADER, mdt_llog_read_header),
- DEF_LLOG_HNDL(0, ORIGIN_HANDLE_WRITE_REC, NULL),
- DEF_LLOG_HNDL(0, ORIGIN_HANDLE_CLOSE, NULL),
- DEF_LLOG_HNDL(0, ORIGIN_CONNECT, NULL),
- DEF_LLOG_HNDL(0, CATINFO, NULL),
- DEF_LLOG_HNDL(0, ORIGIN_HANDLE_PREV_BLOCK, mdt_llog_prev_block),
- DEF_LLOG_HNDL(0, ORIGIN_HANDLE_DESTROY, mdt_llog_destroy),
-};
-
-#define DEF_SEC_CTX_HNDL(name, fn) \
- DEF_HNDL(SEC_CTX, INIT, _NET, 0, name, fn, NULL)
-
-static struct mdt_handler mdt_sec_ctx_ops[] = {
- DEF_SEC_CTX_HNDL(INIT, mdt_sec_ctx_handle),
- DEF_SEC_CTX_HNDL(INIT_CONT, mdt_sec_ctx_handle),
- DEF_SEC_CTX_HNDL(FINI, mdt_sec_ctx_handle)
-};
-
-static struct mdt_opc_slice mdt_regular_handlers[] = {
- {
- .mos_opc_start = MDS_GETATTR,
- .mos_opc_end = MDS_LAST_OPC,
- .mos_hs = mdt_mds_ops
- },
- {
- .mos_opc_start = OBD_PING,
- .mos_opc_end = OBD_LAST_OPC,
- .mos_hs = mdt_obd_ops
- },
- {
- .mos_opc_start = LDLM_ENQUEUE,
- .mos_opc_end = LDLM_LAST_OPC,
- .mos_hs = mdt_dlm_ops
- },
- {
- .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
- .mos_opc_end = LLOG_LAST_OPC,
- .mos_hs = mdt_llog_ops
- },
- {
- .mos_opc_start = SEC_CTX_INIT,
- .mos_opc_end = SEC_LAST_OPC,
- .mos_hs = mdt_sec_ctx_ops
- },
- {
- .mos_hs = NULL
- }
-};
-
-static struct mdt_handler mdt_readpage_ops[] = {
- DEF_MDT_HNDL_F(0, CONNECT, mdt_connect),
- DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
-#ifdef HAVE_SPLIT_SUPPORT
- DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, WRITEPAGE, mdt_writepage),
-#endif
-
- /*
- * XXX: this is ugly and should be fixed one day, see mdc_close() for
- * detailed comments. --umka
- */
- DEF_MDT_HNDL_F(HABEO_CORPUS, CLOSE, mdt_close),
- DEF_MDT_HNDL_F(HABEO_CORPUS, DONE_WRITING, mdt_done_writing),
-};
-
-static struct mdt_opc_slice mdt_readpage_handlers[] = {
- {
- .mos_opc_start = MDS_GETATTR,
- .mos_opc_end = MDS_LAST_OPC,
- .mos_hs = mdt_readpage_ops
- },
- {
- .mos_opc_start = OBD_FIRST_OPC,
- .mos_opc_end = OBD_LAST_OPC,
- .mos_hs = mdt_obd_ops
- },
- {
- .mos_hs = NULL
- }
-};
-
-static struct mdt_handler mdt_xmds_ops[] = {
- DEF_MDT_HNDL_F(0, CONNECT, mdt_connect),
- DEF_MDT_HNDL_F(HABEO_CORPUS , GETATTR, mdt_getattr),
- DEF_MDT_HNDL_F(0 | MUTABOR , REINT, mdt_reint),
- DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, IS_SUBDIR, mdt_is_subdir),
-};
-
-static struct mdt_opc_slice mdt_xmds_handlers[] = {
- {
- .mos_opc_start = MDS_GETATTR,
- .mos_opc_end = MDS_LAST_OPC,
- .mos_hs = mdt_xmds_ops
- },
- {
- .mos_opc_start = OBD_PING,
- .mos_opc_end = OBD_LAST_OPC,
- .mos_hs = mdt_obd_ops
- },
- {
- .mos_opc_start = SEC_CTX_INIT,
- .mos_opc_end = SEC_LAST_OPC,
- .mos_hs = mdt_sec_ctx_ops
- },
- {
- .mos_hs = NULL
- }
-};
-
-static struct mdt_handler mdt_seq_ops[] = {
- DEF_SEQ_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))seq_query)
-};
-
-static struct mdt_opc_slice mdt_seq_handlers[] = {
- {
- .mos_opc_start = SEQ_QUERY,
- .mos_opc_end = SEQ_LAST_OPC,
- .mos_hs = mdt_seq_ops
- },
- {
- .mos_hs = NULL
- }
-};
-
-static struct mdt_handler mdt_fld_ops[] = {
- DEF_FLD_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))fld_query)
-};
-
-static struct mdt_opc_slice mdt_fld_handlers[] = {
- {
- .mos_opc_start = FLD_QUERY,
- .mos_opc_end = FLD_LAST_OPC,
- .mos_hs = mdt_fld_ops
- },
- {
- .mos_hs = NULL
- }
-};
-
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
-MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT_NAME")");
+MODULE_DESCRIPTION("Lustre Metadata Target ("LUSTRE_MDT_NAME")");
MODULE_LICENSE("GPL");
-cfs_module(mdt, "0.2.0", mdt_mod_init, mdt_mod_exit);
+cfs_module(mdt, LUSTRE_VERSION_STRING, mdt_mod_init, mdt_mod_exit);