#include <obd_class.h>
#include <lustre_fid.h>
#include <lustre_mds.h>
+#include <lustre_idl.h>
#include "cmm_internal.h"
#include "mdc_internal.h"
#define CMM_NO_SPLITTABLE 2
enum {
- SPLIT_SIZE = 8*1024
+ SPLIT_SIZE = 12*1024
};
static inline struct lu_fid* cmm2_fid(struct cmm_object *obj)
if (rc)
GOTO(cleanup, rc);
+ rc = CMM_EXPECT_SPLIT;
+
if (lu_fid_eq(fid, cmm2_fid(md2cmm_obj(mo))))
GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
}
#define cmm_md_size(stripes) \
- (sizeof(struct lmv_stripe_md) + stripes * sizeof(struct lu_fid))
+ (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
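/* Illustrative note, not part of the patch: the added parentheses matter once
 * callers pass expressions. For example, a call such as
 *
 *     cmm_md_size(cmm->cmm_tgt_count + 1)
 *
 * would otherwise expand to
 *
 *     sizeof(struct lmv_stripe_md) + cmm->cmm_tgt_count + 1 * sizeof(struct lu_fid)
 *
 * instead of reserving (cmm_tgt_count + 1) lu_fid slots as intended.
 */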
static int cmm_alloc_fid(const struct lu_context *ctx, struct cmm_device *cmm,
struct lu_fid *fid, int count)
mc_linkage) {
LASSERT(cmm->cmm_local_num != mc->mc_num);
- rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i++], NULL);
+ rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i], NULL);
if (rc > 0) {
struct lu_site *ls;
spin_unlock(&cmm->cmm_tgt_guard);
RETURN(rc);
}
+ i++;
}
spin_unlock(&cmm->cmm_tgt_guard);
LASSERT(i == count);
if (!lmv)
RETURN(-ENOMEM);
- lmv->mea_master = -1;
- lmv->mea_magic = MEA_MAGIC_ALL_CHARS;
- lmv->mea_count = cmm->cmm_tgt_count;
+ lmv->mea_master = cmm->cmm_local_num;
+ lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
+ lmv->mea_count = cmm->cmm_tgt_count + 1;
lmv->mea_ids[0] = *lf;
if (rc)
GOTO(cleanup, rc);
- for (i = 1; i < cmm->cmm_tgt_count; i ++) {
+ for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
rc = cmm_creat_remote_obj(ctx, cmm, &lmv->mea_ids[i], ma);
if (rc)
GOTO(cleanup, rc);
}
- rc = mo_xattr_set(ctx, md_object_next(mo), lmv, lmv_size,
- MDS_LMV_MD_NAME, 0);
-
ma->ma_lmv_size = lmv_size;
ma->ma_lmv = lmv;
cleanup:
RETURN(rc);
}
#endif
-#define MAX_HASH_SIZE 0x3fffffff
#define SPLIT_PAGE_COUNT 1
static int cmm_scan_and_split(const struct lu_context *ctx,
struct md_object *mo, struct md_attr *ma)
GOTO(cleanup, rc = -ENOMEM);
}
- hash_segement = MAX_HASH_SIZE / cmm->cmm_tgt_count;
- for (i = 1; i < cmm->cmm_tgt_count; i++) {
+ hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
+ for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
struct lu_fid *lf = &ma->ma_lmv->mea_ids[i];
__u32 hash_end;
if (ma == NULL)
RETURN(-ENOMEM);
- ma->ma_need = MA_INODE;
+ ma->ma_need = MA_INODE|MA_LMV;
rc = mo_attr_get(ctx, mo, ma);
if (rc)
GOTO(cleanup, ma);
/* step3: scan and split the object */
rc = cmm_scan_and_split(ctx, mo, ma);
+ if (rc)
+ GOTO(cleanup, ma);
+
+ /* step4: set mea to the master object */
+ rc = mo_xattr_set(ctx, md_object_next(mo), ma->ma_lmv, ma->ma_lmv_size,
+ MDS_LMV_MD_NAME, 0);
+ if (rc == -ERESTART)
+ CWARN("Dir"DFID" has been split \n",
+ PFID(lu_object_fid(&mo->mo_lu)));
cleanup:
if (ma->ma_lmv_size && ma->ma_lmv)
OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
#include <obd_class.h>
#include <lprocfs_status.h>
#include <lustre_ver.h>
+#include "cmm_internal.h"
#include "mdc_internal.h"
static struct lu_device_operations mdc_lu_ops;
const struct lu_object_header *,
struct lu_device *);
#ifdef HAVE_SPLIT_SUPPORT
-int mdc_send_page(const struct lu_context *ctx, struct md_object *mo,
- struct page *page, __u32 end);
+int mdc_send_page(struct cmm_device *cmm, const struct lu_context *ctx,
+ struct md_object *mo, struct page *page, __u32 end);
#endif
#endif /* __KERNEL__ */
#include <lustre_lib.h>
#include <obd_class.h>
#include <lustre_mdc.h>
+#include "cmm_internal.h"
#include "mdc_internal.h"
static struct md_object_operations mdc_mo_ops;
kmap(page);
dp = page_address(page);
+
+ ent = lu_dirent_start(dp);
+ if (ent->lde_hash > end) {
+ kunmap(page);
+ RETURN(-E2BIG);
+ }
+
for (ent = lu_dirent_start(dp); ent != NULL;
ent = lu_dirent_next(ent)) {
- if (ent->lde_hash < end) {
+ if (ent->lde_hash > end) {
offset = (int)((__u32)ent - (__u32)dp);
rc1 = -E2BIG;
goto send_page;
}
-
+
/* allocate new fid for each obj */
rc = obd_fid_alloc(mc->mc_desc.cl_exp, &ent->lde_fid, NULL);
if (rc > 0) {
fid_seq(&ent->lde_fid),
mc->mc_num, ctx);
}
-
+
if (rc < 0) {
kunmap(page);
RETURN(rc);
#define MEA_MAGIC_LAST_CHAR 0xb2221ca1
#define MEA_MAGIC_ALL_CHARS 0xb222a11c
+#define MEA_MAGIC_HASH_SEGMENT 0xb222a11b
struct lmv_stripe_md {
__u32 mea_magic;
extern const struct req_format RQF_MDS_CONNECT;
extern const struct req_format RQF_MDS_DISCONNECT;
extern const struct req_format RQF_MDS_READPAGE;
+extern const struct req_format RQF_MDS_WRITEPAGE;
extern const struct req_format RQF_MDS_DONE_WRITING;
/*
int (*m_init_ea_size)(struct obd_export *, int, int, int);
int (*m_get_lustre_md)(struct obd_export *, struct ptlrpc_request *,
- int, struct obd_export *, struct lustre_md *);
+ int, struct obd_export *, struct obd_export *,
+ struct lustre_md *);
int (*m_free_lustre_md)(struct obd_export *, struct lustre_md *);
static inline int md_get_lustre_md(struct obd_export *exp,
struct ptlrpc_request *req,
int offset, struct obd_export *dt_exp,
+ struct obd_export *md_exp,
struct lustre_md *md)
{
ENTRY;
EXP_CHECK_MD_OP(exp, get_lustre_md);
MD_COUNTER_INCREMENT(exp->exp_obd, get_lustre_md);
RETURN(MDP(exp->exp_obd, get_lustre_md)(exp, req, offset,
- dt_exp, md));
+ dt_exp, md_exp, md));
}
static inline int md_free_lustre_md(struct obd_export *exp,
RETURN(-ENOENT);
rc = md_get_lustre_md(llu_i2sbi(inode)->ll_md_exp, req,
- offset, llu_i2sbi(inode)->ll_dt_exp, &md);
+ offset, llu_i2sbi(inode)->ll_dt_exp,
+ llu_i2sbi(inode)->ll_md_exp, &md);
if (rc)
RETURN(rc);
ptlrpc_req_finished(request);
rc = md_get_lustre_md(sbi->ll_md_exp, request, offset,
- sbi->ll_dt_exp, &md);
+ sbi->ll_dt_exp, sbi->ll_md_exp, &md);
if (rc)
RETURN(rc);
RETURN(-abs(rc));
}
rc = md_get_lustre_md(sbi->ll_md_exp, req, REPLY_REC_OFF,
- sbi->ll_dt_exp, &md);
+ sbi->ll_dt_exp, sbi->ll_md_exp, &md);
/* XXX Too paranoid? */
if (((md.body->valid ^ valid) & OBD_MD_FLEASIZE) &&
}
rc = md_get_lustre_md(sbi->ll_md_exp, request, REPLY_REC_OFF,
- sbi->ll_dt_exp, &md);
+ sbi->ll_dt_exp, sbi->ll_md_exp, &md);
if (rc) {
ptlrpc_req_finished(request);
RETURN(rc);
}
rc = md_get_lustre_md(sbi->ll_md_exp, req,
- 1, sbi->ll_dt_exp, &md);
+ 1, sbi->ll_dt_exp, sbi->ll_md_exp, &md);
if (rc)
GOTO(out, rc);
}
err = md_get_lustre_md(sbi->ll_md_exp, request, REPLY_REC_OFF,
- sbi->ll_dt_exp, &md);
+ sbi->ll_dt_exp, sbi->ll_md_exp, &md);
if (err) {
CERROR("failed to understand root inode md: rc = %d\n",err);
GOTO(out_request, err);
}
err = md_get_lustre_md(sbi->ll_md_exp, request,
- REPLY_REC_OFF, sbi->ll_dt_exp, &md);
+ REPLY_REC_OFF, sbi->ll_dt_exp, sbi->ll_md_exp,
+ &md);
if (err) {
CERROR("failed to understand root inode md: rc = %d\n", err);
ptlrpc_req_finished (request);
}
rc = md_get_lustre_md(sbi->ll_md_exp, request,
- REPLY_REC_OFF, sbi->ll_dt_exp, &md);
+ REPLY_REC_OFF, sbi->ll_dt_exp,
+ sbi->ll_md_exp, &md);
if (rc) {
ptlrpc_req_finished(request);
RETURN(rc);
prune_deathrow(sbi, 1);
rc = md_get_lustre_md(sbi->ll_md_exp, req, offset,
- sbi->ll_dt_exp, &md);
+ sbi->ll_dt_exp, sbi->ll_md_exp, &md);
if (rc)
RETURN(rc);
#endif
#include <lustre/lustre_idl.h>
+#include <lustre_idl.h>
#include <obd_support.h>
#include <lustre_lib.h>
#include <lustre_net.h>
mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
(char *)op_data->name, op_data->namelen);
- CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n",
- mds, PFID(&rpid));
rpid = obj->lo_inodes[mds].li_fid;
+ rc = lmv_fld_lookup(lmv, &rpid, &mds);
lmv_obj_put(obj);
+ if (rc)
+ GOTO(out_free_sop_data, rc);
+
+ CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n",
+ mds, PFID(&rpid));
}
sop_data->fid1 = rpid;
LASSERT(body);
update:
- obj->lo_inodes[i].li_size = body->size;
+ obj->lo_inodes[i].li_size = (MAX_HASH_SIZE/obj->lo_objcount) *
+ (i + 1);
CDEBUG(D_OTHER, "fresh: %lu\n",
(unsigned long)obj->lo_inodes[i].li_size);
#include <linux/init.h>
#include <linux/slab.h>
#include <linux/pagemap.h>
+#include <linux/mm.h>
#include <asm/div64.h>
#include <linux/seq_file.h>
#include <linux/namei.h>
#include <linux/ext2_fs.h>
#include <lustre/lustre_idl.h>
+#include <lustre_idl.h>
#include <lustre_log.h>
#include <obd_support.h>
#include <lustre_lib.h>
#endif
} else {
- /* default policy is to use parent MDS */
+ struct lmv_obj *obj;
LASSERT(fid_is_sane(hint->ph_pfid));
- rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
+
+ obj = lmv_obj_grab(obd, hint->ph_pfid);
+ if (obj) {
+ /* If the dir has been split, allocate the fid
+ * according to the name hash.
+ */
+ struct lu_fid *rpid;
+
+ *mds = raw_name2idx(obj->lo_hashtype,
+ obj->lo_objcount,
+ hint->ph_cname->name,
+ hint->ph_cname->len);
+ rpid = &obj->lo_inodes[*mds].li_fid;
+ rc = lmv_fld_lookup(lmv, rpid, mds);
+ if (rc) {
+ lmv_obj_put(obj);
+ GOTO(exit, rc);
+ }
+ CDEBUG(D_INODE, "the obj "DFID" has been"
+ "splitted,got MDS at "LPU64" by name %s\n",
+ PFID(hint->ph_pfid), *mds,
+ hint->ph_cname->name);
+
+ rc = 0;
+ } else {
+ /* default policy is to use parent MDS */
+ rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
+ }
+
}
} else {
/* sequences among all tgts are not well balanced, allocate new
*mds = 0;
rc = -EINVAL;
}
-
+exit:
if (rc) {
CERROR("cannot choose MDS, err = %d\n", rc);
} else {
GOTO(cleanup, rc);
}
- rc = md_get_lustre_md(tgt_exp, req, 0, NULL, &md);
+ rc = md_get_lustre_md(tgt_exp, req, 1, NULL, exp, &md);
if (rc) {
CERROR("mdc_get_lustre_md() failed, error %d\n", rc);
GOTO(cleanup, rc);
struct obd_export *tgt_exp;
struct lu_fid rid = *fid;
struct lmv_obj *obj;
- int i, rc;
+ int i = 0, rc;
ENTRY;
rc = lmv_check_connect(obd);
rid = obj->lo_inodes[i].li_fid;
lmv_obj_unlock(obj);
- lmv_obj_put(obj);
CDEBUG(D_OTHER, "forward to "DFID" with offset %lu\n",
PFID(&rid), (unsigned long)offset);
}
-
+
tgt_exp = lmv_get_export(lmv, &rid);
if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
+ GOTO(cleanup, rc = PTR_ERR(tgt_exp));
rc = md_readpage(tgt_exp, &rid, offset, page, request);
-
+ if (rc)
+ GOTO(cleanup, rc);
+#ifdef __KERNEL__
+ if (obj && i < obj->lo_objcount - 1) {
+ struct lu_dirpage *dp;
+ __u32 end;
+ /* This dir object has been split, so check
+ * whether we have reached the end of one hash
+ * segment and, if so, reset ldp->ldp_hash_end.
+ */
+ kmap(page);
+ dp = page_address(page);
+ end = le32_to_cpu(dp->ldp_hash_end);
+ if (end == ~0u) {
+ __u32 hash_segment_end = (i + 1) *
+ (MAX_HASH_SIZE / obj->lo_objcount);
+ dp->ldp_hash_end = cpu_to_le32(hash_segment_end);
+ CDEBUG(D_INFO, "reset hash end %x for split obj "DFID"\n",
+ le32_to_cpu(dp->ldp_hash_end), PFID(&rid));
+ }
+ kunmap(page);
+
+ }
+#endif
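/* Illustrative sketch, not part of the patch: assuming a 3-way split
 * (lo_objcount == 3), the per-stripe hash segments are
 *
 *     stripe 0: [0,                      MAX_HASH_SIZE/3)
 *     stripe 1: [MAX_HASH_SIZE/3,        2 * (MAX_HASH_SIZE/3))
 *     stripe 2: [2 * (MAX_HASH_SIZE/3),  end)
 *
 * so when the last page of stripe 0 comes back with ldp_hash_end == ~0,
 * it is rewritten to 1 * (MAX_HASH_SIZE/3) and the next readpage request
 * naturally continues in stripe 1.
 */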
/*
* Here we could remove "." and ".." from all pages which are not from
* the master. But the MDS has "." and ".." only for the master dir.
*/
+cleanup:
+ if (obj)
+ lmv_obj_put(obj);
RETURN(rc);
}
RETURN(mea_size);
if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
- mea->mea_magic == MEA_MAGIC_ALL_CHARS)
+ mea->mea_magic == MEA_MAGIC_ALL_CHARS ||
+ mea->mea_magic == MEA_MAGIC_HASH_SEGMENT)
{
magic = le32_to_cpu(mea->mea_magic);
} else {
}
int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
- int offset, struct obd_export *dt_exp,
- struct lustre_md *md)
+ int offset, struct obd_export *dt_exp,
+ struct obd_export *md_exp, struct lustre_md *md)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
int rc;
ENTRY;
- rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, offset, dt_exp, md);
+ rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, offset, dt_exp, md_exp,
+ md);
RETURN(rc);
}
#include <lustre_lib.h>
#include <lustre_net.h>
#include <lustre_dlm.h>
+#include <lustre_idl.h>
#include <obd_class.h>
#include <lprocfs_status.h>
#include "lmv_internal.h"
struct lmv_obd *lmv = &obd->u.lmv;
LASSERT(mea->mea_magic == MEA_MAGIC_LAST_CHAR
- || mea->mea_magic == MEA_MAGIC_ALL_CHARS);
+ || mea->mea_magic == MEA_MAGIC_ALL_CHARS
+ || mea->mea_magic == MEA_MAGIC_HASH_SEGMENT);
OBD_SLAB_ALLOC(obj, obj_cache, CFS_ALLOC_STD,
sizeof(*obj));
GOTO(cleanup, obj = ERR_PTR(rc));
}
- rc = md_get_lustre_md(exp, req, 0, NULL, &md);
+ rc = md_get_lustre_md(exp, req, 0, NULL, exp, &md);
if (rc) {
CERROR("mdc_get_lustre_md() failed, error %d\n", rc);
GOTO(cleanup, obj = ERR_PTR(rc));
struct obd_client_handle;
int mdc_get_lustre_md(struct obd_export *md_exp, struct ptlrpc_request *req,
- int offset, struct obd_export *dt_exp, struct lustre_md *md);
+ int offset, struct obd_export *dt_exp,
+ struct obd_export *lmv_exp,
+ struct lustre_md *md);
int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md);
!it_open_error(DISP_OPEN_OPEN, it))
mdc_set_open_replay_data(NULL, NULL, req);
- if ((body->valid & OBD_MD_FLEASIZE) != 0) {
+ if ((body->valid & OBD_MD_FLDIREA) != 0) {
+ if (body->eadatasize) {
+ eadata = lustre_swab_repbuf(req,
+ DLM_REPLY_REC_OFF + 1,
+ body->eadatasize, NULL);
+ if (eadata == NULL) {
+ CERROR ("Missing/short eadata\n");
+ RETURN (-EPROTO);
+ }
+ }
+ }
+ if ((body->valid & OBD_MD_FLEASIZE)) {
/* The eadata is opaque; just check that it is there.
* Eventually, obd_unpackmd() will check the contents */
eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
#endif
int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
- int offset, struct obd_export *dt_exp, struct lustre_md *md)
+ int offset, struct obd_export *dt_exp,
+ struct obd_export *md_exp,
+ struct lustre_md *md)
{
int rc = 0;
ENTRY;
offset++;
} else if (md->body->valid & OBD_MD_FLDIREA) {
- /* TODO: umka, please handle this case */
+ int lmvsize;
+ struct lov_mds_md *lmv;
LASSERT(S_ISDIR(md->body->mode));
- offset++;
+
+ if (md->body->eadatasize == 0) {
+ RETURN(0);
+ }
+ if (md->body->valid & OBD_MD_MEA) {
+ lmvsize = md->body->eadatasize;
+ lmv = lustre_msg_buf(req->rq_repmsg, offset, lmvsize);
+ LASSERT (lmv != NULL);
+ LASSERT_REPSWABBED(req, offset);
+
+ rc = obd_unpackmd(md_exp, (void *)&md->mea, lmv,
+ lmvsize);
+ if (rc < 0)
+ RETURN(rc);
+
+ LASSERT (rc >= sizeof (*md->mea));
+ }
+ rc = 0;
+ offset++;
}
/* for ACL, it's possible that FLACL is set but aclsize is zero. only
RETURN(rc);
}
-#ifdef HAVE_SPLIT_SUPPORT
/* get lmv EA only*/
static int __mdd_lmv_get(const struct lu_context *ctxt,
struct mdd_object *mdd_obj, struct md_attr *ma)
}
RETURN(rc);
}
-#endif
static int mdd_attr_get_internal(const struct lu_context *ctxt,
struct mdd_object *mdd_obj,
S_ISDIR(mdd_object_type(mdd_obj)))
rc = __mdd_lmm_get(ctxt, mdd_obj, ma);
}
-#ifdef HAVE_SPLIT_SUPPORT
if (rc == 0 && ma->ma_need & MA_LMV) {
if (S_ISDIR(mdd_object_type(mdd_obj)))
rc = __mdd_lmv_get(ctxt, mdd_obj, ma);
}
-#endif
CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
rc, ma->ma_valid);
RETURN(rc);
struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
int rc;
ENTRY;
-/* TODO:
+ /* TODO:
rc = mdd_lov_set_nextid(ctx, mdd);
if (rc) {
CERROR("%s: mdd_lov_set_nextid failed %d\n",
if (buf && buf_len > 0) {
rc = next->do_ops->do_xattr_set(ctxt, next, buf, buf_len, name,
0, handle);
+#ifdef HAVE_SPLIT_SUPPORT
+ if (rc == 0) {
+ /* Very ugly hack: if we are setting the LMV EA, the
+ * split has succeeded and we should return -ERESTART
+ * to notify the client. The transno for this split
+ * must be zero according to the replay rules, so
+ * return -ERESTART here to let the mdt trans stop
+ * callback know this.
+ */
+ if (strncmp(name, MDS_LMV_MD_NAME, strlen(name)) == 0)
+ rc = -ERESTART;
+ }
+#endif
}else if (buf == NULL && buf_len == 0) {
rc = next->do_ops->do_xattr_del(ctxt, next, name, handle);
}
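/* Illustrative caller-side view, not part of this hunk: the open/create path
 * later in this patch treats -ERESTART as "the directory was just split,
 * keep the transno at zero", e.g.
 *
 *     if (result == -ERESTART)
 *             GOTO(out_child, result);
 *
 * so the error is propagated back to the client, which retries against the
 * now-split directory.
 */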
case MDS_GETATTR_NAME:
case MDS_STATFS:
case MDS_READPAGE:
+ case MDS_WRITEPAGE:
case MDS_REINT:
case MDS_CLOSE:
case MDS_DONE_WRITING:
repbody->eadatasize = 0;
repbody->aclsize = 0;
- ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
- ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD, RCL_SERVER);
-
- ma->ma_need = MA_INODE | MA_LOV;
+ if(reqbody->valid & OBD_MD_MEA) {
+ /* Assumption: MDT_MD size is enough for lmv size FIXME */
+ ma->ma_lmv = req_capsule_server_get(pill, &RMF_MDT_MD);
+ ma->ma_lmv_size = req_capsule_get_size(pill, &RMF_MDT_MD,
+ RCL_SERVER);
+ ma->ma_need = MA_INODE | MA_LMV;
+ } else {
+ ma->ma_need = MA_INODE | MA_LOV;
+ ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
+ ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
+ RCL_SERVER);
+ }
rc = mo_attr_get(ctxt, next, ma);
if (rc == -EREMOTE) {
/* This object is located on remote node.*/
else
repbody->valid |= OBD_MD_FLEASIZE;
}
+ if (ma->ma_valid & MA_LMV) {
+ LASSERT(S_ISDIR(la->la_mode));
+ repbody->eadatasize = ma->ma_lmv_size;
+ repbody->valid |= OBD_MD_FLDIREA;
+ repbody->valid |= OBD_MD_MEA;
+ }
} else if (S_ISLNK(la->la_mode) &&
reqbody->valid & OBD_MD_LINKNAME) {
rc = mo_readlink(ctxt, next, ma->ma_lmm, ma->ma_lmm_size);
rc = mdo_name_insert(info->mti_ctxt,
md_object_next(&object->mot_obj),
ent->lde_name, lf, 0);
- /* FIXME: add cross_flags */
+ CDEBUG(D_INFO, "insert name %s rc %d \n", ent->lde_name, rc);
if (rc) {
kunmap(page);
RETURN(rc);
ENTRY;
desc = ptlrpc_prep_bulk_exp (req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
- if (desc)
+ if (!desc)
RETURN(-ENOMEM);
/* allocate the page for the desc */
if (result == -ENOENT) {
/* not found and with MDS_OPEN_CREAT: let's create it */
- mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
result = mdo_create(info->mti_ctxt,
mdt_object_child(parent),
rr->rr_name,
mdt_object_child(child),
&info->mti_spec,
&info->mti_attr);
- if (result != 0)
+ if (result == -ERESTART)
GOTO(out_child, result);
+ else {
+ mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
+ if (result != 0)
+ GOTO(out_child, result);
+ }
created = 1;
} else {
/* we have to get attr & lov ea for this object*/
#include <obd.h>
#endif
#include <lprocfs_status.h>
+#include <lustre_idl.h>
static int mea_last_char_hash(int count, char *name, int namelen)
{
return c;
}
+/* This hash calculation method must be the same as the lvar hash method. */
+static int mea_hash_segment(int count, char *name, int namelen)
+{
+ __u32 result = 0;
+ __u32 hash_segment = MAX_HASH_SIZE / count;
+
+ strncpy((void *)&result, name, min(namelen, (int)sizeof result));
+
+ result = (result << 1) & 0x7fffffff;
+
+ return result / hash_segment;
+}
+
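/* Worked example, illustrative only: with count == 4 stripes,
 *
 *     hash_segment = MAX_HASH_SIZE / 4 = 0x3fffffff / 4 = 0x0fffffff
 *
 * and a name whose packed-and-shifted value is, say, 0x2abcdef0 maps to
 *
 *     0x2abcdef0 / 0x0fffffff = 2,
 *
 * i.e. stripe index 2. The lmv client fakes li_size and resets ldp_hash_end
 * using the same MAX_HASH_SIZE / count segment size, so placement and readdir
 * agree on the segment boundaries.
 */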
int raw_name2idx(int hashtype, int count, const char *name, int namelen)
{
unsigned int c = 0;
case MEA_MAGIC_ALL_CHARS:
c = mea_all_chars_hash(count, (char *) name, namelen);
break;
+ case MEA_MAGIC_HASH_SEGMENT:
+ c = mea_hash_segment(count, (char *) name, namelen);
+ break;
default:
CERROR("unknown hash type 0x%x\n", hashtype);
}
&RQF_MDS_CLOSE,
&RQF_MDS_PIN,
&RQF_MDS_READPAGE,
+ &RQF_MDS_WRITEPAGE,
&RQF_MDS_DONE_WRITING
};
mdt_body_only, mdt_body_only);
EXPORT_SYMBOL(RQF_MDS_READPAGE);
+const struct req_format RQF_MDS_WRITEPAGE =
+ DEFINE_REQ_FMT0("MDS_WRITEPAGE",
+ mdt_body_only, mdt_body_only);
+EXPORT_SYMBOL(RQF_MDS_WRITEPAGE);
+
#if !defined(__REQ_LAYOUT_USER__)
int req_layout_init(void)