/* Data stored per client in the last_rcvd file. In le32 order. */
struct mds_client_data;
+struct mdt_client_data;
struct mds_export_data {
struct list_head med_open_head;
int med_lr_idx;
};
+struct mdt_export_data {
+ struct list_head med_open_head;
+ spinlock_t med_open_lock; /* lock med_open_head, mfd_list*/
+ struct mdt_client_data *med_mcd;
+ __u64 med_ibits_known;
+ loff_t med_lr_off;
+ int med_lr_idx;
+};
struct osc_creator {
spinlock_t oscc_lock;
struct list_head oscc_list;
exp_libclient:1; /* liblustre client? */
union {
struct mds_export_data eu_mds_data;
+ struct mdt_export_data eu_mdt_data;
struct filter_export_data eu_filter_data;
struct ec_export_data eu_ec_data;
} u;
};
#define exp_mds_data u.eu_mds_data
+#define exp_mdt_data u.eu_mdt_data
#define exp_lov_data u.eu_lov_data
#define exp_filter_data u.eu_filter_data
#define exp_ec_data u.eu_ec_data
#include <obd.h>
/* lu2dt_dev() */
#include <dt_object.h>
-/* struct mds_client_data */
-#include "../mds/mds_internal.h"
#include "mdt_internal.h"
/*
struct mdt_lock_handle *lhp;
struct lu_fid child_fid;
struct ldlm_namespace *ns;
-
ENTRY;
- lhp = &info->mti_lh[MDT_LH_PARENT];
- lhp->mlh_mode = LCK_CR;
- lhc->mlh_mode = LCK_CR;
-
LASSERT(info->mti_object != NULL);
name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
ns = info->mti_mdt->mdt_namespace;
/*step 1: lock parent */
+ lhp = &info->mti_lh[MDT_LH_PARENT];
+ lhp->mlh_mode = LCK_CR;
result = mdt_object_lock(ns, parent, lhp, MDS_INODELOCK_UPDATE);
if (result != 0)
RETURN(result);
/*step 4: lock child: this lock is returned back to caller
* if successfully get attr.
*/
+ lhc->mlh_mode = LCK_CR;
result = mdt_object_lock(ns, child, lhc, child_bits);
if (result != 0)
GOTO(out_child, result);
}
if (result != 0)
mdt_object_unlock(ns, child, lhc);
+ EXIT;
out_child:
mdt_object_put(info->mti_ctxt, child);
out_parent:
mdt_object_unlock(ns, parent, lhp);
- RETURN(result);
+ return result;
}
/* normal handler: should release the child lock */
/* TODO these two methods not available now. */
/* this should sync the whole device */
-static int mdt_device_sync(struct mdt_device *mdt)
+static int mdt_device_sync(struct mdt_thread_info *info)
{
return 0;
}
/* this should sync this object */
-static int mdt_object_sync(struct mdt_object *m)
+static int mdt_object_sync(struct mdt_thread_info *info)
{
return 0;
}
/* sync the whole device */
rc = req_capsule_pack(pill);
if (rc == 0)
- rc = mdt_device_sync(info->mti_mdt);
+ rc = mdt_device_sync(info);
} else {
/* sync an object */
rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS | HABEO_REFERO);
if (rc != 0)
RETURN(rc);
- rc = mdt_object_sync(info->mti_object);
+ rc = mdt_object_sync(info);
if (rc != 0)
RETURN(rc);
static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
{
- return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
+ return req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid;
}
static int mdt_lock_resname_compat(struct mdt_device *m,
if (h->mh_fmt != NULL) {
req_capsule_set(&info->mti_pill, h->mh_fmt);
- if (result == 0)
- result = mdt_unpack_req_pack_rep(info, flags);
+ result = mdt_unpack_req_pack_rep(info, flags);
}
if (result == 0 && flags & HABEO_CLAVIS) {
result = mdt_lock_reply_compat(info->mti_mdt, dlm_rep);
}
- /* If we're DISCONNECTing, the mds_export_data is already freed */
+ /* If we're DISCONNECTing, the mdt_export_data is already freed */
if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
+#ifdef MDT_CODE
+ /* FIXME: fake untill journal callback & open handling is OK.*/
+ __u64 last_transno;
+ __u64 last_committed;
+ struct mdt_device *mdt = info->mti_mdt;
+
+ LASSERT(mdt != NULL);
+ spin_lock(&mdt->mdt_transno_lock);
+ last_transno = ++ (mdt->mdt_last_transno);
+ last_committed = ++ (mdt->mdt_last_committed);
+ spin_unlock(&mdt->mdt_transno_lock);
+
+ req->rq_repmsg->transno = req->rq_transno = last_transno;
+ req->rq_repmsg->last_xid = req->rq_xid;
+ req->rq_repmsg->last_committed = last_committed;
+ req->rq_export->exp_obd->obd_last_committed = last_committed;
+#else
req->rq_repmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
target_committed_to_req(req);
+#endif
}
req_capsule_fini(&info->mti_pill);
RETURN(result);
}
+
void mdt_lock_handle_init(struct mdt_lock_handle *lh)
{
lh->mlh_lh.cookie = 0ull;
new_lock = ldlm_handle2lock(&lhc.mlh_lh);
if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
RETURN(0);
+
+ LASSERTF(new_lock != NULL, "op %d lockh "LPX64"\n",
+ opcode, lhc.mlh_lh.cookie);
*lockp = new_lock;
+ /* FIXME:This only happen when I can handle RESENT */
+ if (new_lock->l_export == req->rq_export) {
+ /* Already gave this to the client, which means that we
+ * reconstructed a reply. */
+ LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
+ MSG_RESENT);
+ RETURN(ELDLM_LOCK_REPLACED);
+ }
+
/* Fixup the lock to be given to the client */
l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
new_lock->l_readers = 0;
static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
struct ldlm_lock **lockp, int flags)
{
- int opc;
- int rc;
-
struct req_capsule *pill;
struct mdt_it_flavor *flv;
-
+ int opc;
+ int rc;
ENTRY;
opc = mdt_intent_code(itopc);
.psc_max_reply_size = MDS_MAXREPSIZE,
.psc_req_portal = MDS_REQUEST_PORTAL,
.psc_rep_portal = MDC_REPLY_PORTAL,
- .psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT,
+ .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
/*
* We'd like to have a mechanism to set this on a per-device
* basis, but alas...
obd = class_name2obd(dev);
m->mdt_md_dev.md_lu_dev.ld_obd = obd;
+ spin_lock_init(&m->mdt_transno_lock);
+ /* FIXME: We need to load them from disk. But now fake it */
+ m->mdt_last_transno = 0;
+ m->mdt_last_committed = 0;
+ m->mdt_max_mdsize = MAX_MD_SIZE;
+ m->mdt_max_cookiesize = sizeof(struct llog_cookie);
+
OBD_ALLOC_PTR(s);
if (s == NULL)
RETURN(-ENOMEM);
exp->exp_connect_flags = data->ocd_connect_flags;
data->ocd_version = LUSTRE_VERSION_CODE;
- exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known;
+ exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
}
if (mdt->mdt_opts.mo_acl &&
struct obd_export *exp;
int rc;
struct mdt_device *mdt;
- struct mds_export_data *med;
- struct mds_client_data *mcd;
+ struct mdt_export_data *med;
+ struct mdt_client_data *mcd;
ENTRY;
if (!conn || !obd || !cluuid)
exp = class_conn2export(conn);
LASSERT(exp != NULL);
- med = &exp->exp_mds_data;
+ med = &exp->exp_mdt_data;
rc = mdt_connect0(mdt, exp, data);
if (rc == 0) {
static int mdt_obd_disconnect(struct obd_export *exp)
{
- struct mds_export_data *med = &exp->exp_mds_data;
+ struct mdt_export_data *med = &exp->exp_mdt_data;
unsigned long irqflags;
int rc;
ENTRY;
l = &m->mdt_md_dev.md_lu_dev;
result = mdt_init0(ctx, m, t, cfg);
if (result != 0) {
- mdt_device_free(ctx, l);
- return ERR_PTR(result);
+ OBD_FREE_PTR(m);
+ l = ERR_PTR(result);
}
} else
#include <lustre_fid.h>
#include <lustre_fld.h>
#include <lustre_req_layout.h>
+/* LR_CLIENT_SIZE, etc. */
+#include <lustre_disk.h>
struct mdt_device {
/* super-class */
signed int mo_user_xattr :1;
signed int mo_acl :1;
} mdt_opts;
+ /* Transaction related stuff here */
+ spinlock_t mdt_transno_lock;
+ __u64 mdt_last_transno;
+ __u64 mdt_last_committed;
+ /* these values should be updated from lov if necessary.
+ * or should be placed somewhere else. */
+ int mdt_max_mdsize;
+ int mdt_max_cookiesize;
+
+};
+/* Data stored per client in the last_rcvd file. In le32 order. */
+struct mdt_client_data {
+ __u8 mcd_uuid[40]; /* client UUID */
+ __u64 mcd_last_transno; /* last completed transaction ID */
+ __u64 mcd_last_xid; /* xid for the last transaction */
+ __u32 mcd_last_result; /* result from last RPC */
+ __u32 mcd_last_data; /* per-op data (disposition for open &c.) */
+ __u8 mcd_padding[LR_CLIENT_SIZE - 64];
};
+#define MDT_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
static inline struct md_device_operations *mdt_child_ops(struct mdt_device * m)
{
rec = req_capsule_client_get(pill, &RMF_REC_SETATTR);
if (rec == NULL)
- RETURN (-EFAULT);
+ RETURN(-EFAULT);
rr->rr_fid1 = &rec->sa_fid;
attr->la_valid = rec->sa_valid;
attr->la_flags = rec->cr_flags;
attr->la_ctime = rec->cr_time;
if (req_capsule_field_present(pill, &RMF_SYMTGT))
- rr->rr_tgt = req_capsule_client_get(pill, &RMF_SYMTGT);
+ rr->rr_tgt = req_capsule_client_get(pill, &RMF_SYMTGT);
#endif
} else
result = -EFAULT;
rec = req_capsule_client_get(pill, &RMF_REC_LINK);
if (rec == NULL)
- RETURN (-EFAULT);
+ RETURN(-EFAULT);
attr->la_uid = rec->lk_fsuid;
attr->la_gid = rec->lk_fsgid;
rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
if (rr->rr_name == NULL)
- RETURN (-EFAULT);
+ RETURN(-EFAULT);
RETURN(0);
#endif
ENTRY;
rec = req_capsule_client_get(pill, &RMF_REC_UNLINK);
if (rec == NULL)
- RETURN (-EFAULT);
+ RETURN(-EFAULT);
attr->la_uid = rec->ul_fsuid;
attr->la_gid = rec->ul_fsgid;
rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
if (rr->rr_name == NULL)
- RETURN (-EFAULT);
+ RETURN(-EFAULT);
RETURN(0);
#endif
ENTRY;
rec = req_capsule_client_get(pill, &RMF_REC_RENAME);
if (rec == NULL)
- RETURN (-EFAULT);
+ RETURN(-EFAULT);
attr->la_uid = rec->rn_fsuid;
attr->la_gid = rec->rn_fsgid;
rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
if (rr->rr_name == NULL)
- RETURN (-EFAULT);
+ RETURN(-EFAULT);
rr->rr_tgt = req_capsule_client_get(pill, &RMF_SYMTGT);
if (rr->rr_tgt == NULL)
- RETURN (-EFAULT);
+ RETURN(-EFAULT);
RETURN(0);
#endif
struct mdt_object *child;
struct mdt_lock_handle *lh;
struct mdt_body *repbody = info->mti_reint_rep.mrr_body;
+ struct lu_attr *attr = &info->mti_attr;
+ struct mdt_reint_record *rr = &info->mti_rr;
int rc;
ENTRY;
lh = &info->mti_lh[MDT_LH_PARENT];
lh->mlh_mode = LCK_PW;
- parent = mdt_object_find_lock(info->mti_ctxt, mdt, info->mti_rr.rr_fid1,
+ parent = mdt_object_find_lock(info->mti_ctxt, mdt, rr->rr_fid1,
lh, MDS_INODELOCK_UPDATE);
if (IS_ERR(parent))
- return PTR_ERR(parent);
+ RETURN(PTR_ERR(parent));
- child = mdt_object_find(info->mti_ctxt, mdt, info->mti_rr.rr_fid2);
+ child = mdt_object_find(info->mti_ctxt, mdt, rr->rr_fid2);
if (!IS_ERR(child)) {
struct md_object *next = mdt_object_child(parent);
- rc = mdo_create(info->mti_ctxt, next, info->mti_rr.rr_name,
- mdt_object_child(child), &info->mti_attr);
+ rc = mdo_create(info->mti_ctxt, next, rr->rr_name,
+ mdt_object_child(child), attr);
if (rc == 0) {
- /* return fid to client. */
+ /* return fid to client. attr is over-written!!*/
rc = mo_attr_get(info->mti_ctxt,
mdt_object_child(child),
- &info->mti_attr);
+ attr);
if (rc == 0) {
- mdt_pack_attr2body(repbody, &info->mti_attr);
+ mdt_pack_attr2body(repbody, attr);
repbody->fid1 = *mdt_object_fid(child);
repbody->valid |= OBD_MD_FLID;
}
RETURN(rc);
}
+#if 0
static int mdt_md_mkdir(struct mdt_thread_info *info)
{
struct mdt_device *mdt = info->mti_mdt;
mdt_object_put(info->mti_ctxt, parent);
return result;
}
+#endif
/* partial request to create object only */
static int mdt_md_mkobj(struct mdt_thread_info *info)
#ifdef MDT_CODE
struct lu_attr *attr = &info->mti_attr;
struct mdt_reint_record *rr = &info->mti_rr;
- struct req_capsule *pill = &info->mti_pill;
struct ptlrpc_request *req = mdt_info_req(info);
struct mdt_object *mo;
struct mdt_lock_handle *lh;
/* MDS_CHECK_RESENT */
- rc = req_capsule_pack(pill);
- if (rc)
- RETURN(rc);
-
- if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
- RETURN(-EROFS);
-
if (attr->la_valid & ATTR_FROM_OPEN) {
mo = mdt_object_find(info->mti_ctxt, info->mti_mdt, rr->rr_fid1);
if (IS_ERR(mo))
if (lu_object_assert_not_exists(info->mti_ctxt, &mo->mot_obj.mo_lu))
GOTO(out_unlock, rc = -ENOENT);
+ if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+ GOTO(out_unlock, rc = -EROFS);
+
rc = mo_attr_set(info->mti_ctxt, mdt_object_child(mo), attr);
if (rc != 0)
GOTO(out_unlock, rc);
repbody->valid |= OBD_MD_FLMTIME;
if (attr->la_valid & (ATTR_ATIME | ATTR_ATIME_SET))
repbody->valid |= OBD_MD_FLATIME;
-
+
+ /* FIXME: I have to combine the attr_set & xattr_set into one single
+ transaction. How can I?
+ */
rc = mo_xattr_set(info->mti_ctxt, mdt_object_child(mo),
rr->rr_eadata, rr->rr_eadatalen, "lov");
if (rc)
#ifdef MDT_CODE
struct lu_attr *attr = &info->mti_attr;
struct mdt_reint_record *rr = &info->mti_rr;
- struct req_capsule *pill = &info->mti_pill;
struct ptlrpc_request *req = mdt_info_req(info);
struct mdt_object *mp;
struct mdt_object *mc;
/* MDS_CHECK_RESENT here */
- rc = req_capsule_pack(pill);
- if (rc)
- RETURN(rc);
-
- if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
- RETURN(-EROFS);
-
/* step 1: lock the parent */
lhp = &info->mti_lh[MDT_LH_PARENT];
lhp->mlh_mode = LCK_EX;
RETURN(PTR_ERR(mp));
if (strlen(rr->rr_name) == 0) {
+ if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+ GOTO(out_unlock_parent, rc = -EROFS);
+
/* remote partial operation */
rc = mo_ref_del(info->mti_ctxt, mdt_object_child(mp));
GOTO(out_unlock_parent, rc);
if (rc != 0)
GOTO(out_unlock_child, rc);
- if ((S_ISREG(attr->la_mode) && attr->la_nlink == 1))
+ if ((S_ISREG(attr->la_mode) && attr->la_nlink == 1)) {
/* && if opencount == 0*/
/* see mds_reint */
void * lmm = req_capsule_server_get(&info->mti_pill,
rc = 0;
}
+ if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+ GOTO(out_unlock_child, rc = -EROFS);
+
/* step 4: delete it */
rc = mdo_unlink(info->mti_ctxt, mdt_object_child(mc),
mdt_object_child(mc), rr->rr_name);
static int mdt_reint_link(struct mdt_thread_info *info)
{
#ifdef MDT_CODE
- struct lu_attr *attr = &info->mti_attr;
struct mdt_reint_record *rr = &info->mti_rr;
- struct req_capsule *pill = &info->mti_pill;
struct ptlrpc_request *req = mdt_info_req(info);
struct mdt_object *ms;
struct mdt_object *mt;
struct mdt_lock_handle *lhs;
struct mdt_lock_handle *lht;
- struct mdt_body *repbody;
int rc;
ENTRY;
/* MDS_CHECK_RESENT here */
- rc = req_capsule_pack(pill);
- if (rc)
- RETURN(rc);
-
- if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
- RETURN(-EROFS);
-
/* step 1: lock the source */
lhs = &info->mti_lh[MDT_LH_PARENT];
lhs->mlh_mode = LCK_EX;
RETURN(PTR_ERR(ms));
if (strlen(rr->rr_name) == 0) {
+ if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+ GOTO(out_unlock_source, rc = -EROFS);
+
/* remote partial operation */
rc = mo_ref_add(info->mti_ctxt, mdt_object_child(ms));
GOTO(out_unlock_source, rc);
* return -ENOENT
*/
+ if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+ GOTO(out_unlock_target, rc = -EROFS);
+
/* step 4: link it */
rc = mdo_link(info->mti_ctxt, mdt_object_child(mt),
mdt_object_child(ms), rr->rr_name);
/* partial operation for rename */
static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
{
- struct lu_attr *attr = &info->mti_attr;
struct mdt_reint_record *rr = &info->mti_rr;
- struct req_capsule *pill = &info->mti_pill;
struct ptlrpc_request *req = mdt_info_req(info);
struct mdt_object *mtgtdir;
struct mdt_object *mtgt = NULL;
struct mdt_lock_handle *lh_tgtdir;
struct mdt_lock_handle *lh_tgt;
- struct mdt_body *repbody;
struct lu_fid tgt_fid;
struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
int rc;
mtgt = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
&tgt_fid, lh_tgt,
MDS_INODELOCK_LOOKUP);
- if (IS_ERR(mold))
+ if (IS_ERR(mtgt))
GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
}
+ if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+ GOTO(out_unlock_tgt, rc = -EROFS);
+
/* step 3: rename_tgt or name_insert */
if (mtgt)
rc = mdo_rename_tgt(info->mti_ctxt, mdt_object_child(mtgtdir),
mdt_object_child(mtgt),
- &rr->rr_fid2, rr->rr_tgt);
+ rr->rr_fid2, rr->rr_tgt);
else
rc = mdo_name_insert(info->mti_ctxt, mdt_object_child(mtgtdir),
- rr->rr_tgt, &rr->rr_fid2);
+ rr->rr_tgt, rr->rr_fid2);
GOTO(out_unlock_tgt, rc);
out_unlock_tgt:
mdt_object_put(info->mti_ctxt, mtgt);
}
out_unlock_tgtdir:
- mdt_object_unlock(ns, msrcdir, lh_tgtdir);
+ mdt_object_unlock(ns, mtgtdir, lh_tgtdir);
mdt_object_put(info->mti_ctxt, mtgtdir);
out:
return rc;
static int mdt_reint_rename(struct mdt_thread_info *info)
{
#ifdef MDT_CODE
- struct lu_attr *attr = &info->mti_attr;
struct mdt_reint_record *rr = &info->mti_rr;
struct req_capsule *pill = &info->mti_pill;
struct ptlrpc_request *req = mdt_info_req(info);
struct mdt_lock_handle lh_new;
struct lu_fid old_fid = {0};
struct lu_fid new_fid = {0};
- struct mdt_body *repbody;
struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
int rc;
/* MDS_CHECK_RESENT here */
- rc = req_capsule_pack(pill);
- if (rc)
- RETURN(rc);
-
- if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
- RETURN(-EROFS);
-
rc = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
if (rc == 1) {
/* if (strlen(rr->rr_name) == 0) {*/
/* step 5:TODO orphan handling on new object*/
/* if isorphan(mnew) {...}
*/
+ if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+ GOTO(out_unlock_new, rc = -EROFS);
/* step 6: rename it */
rc = mdo_rename(info->mti_ctxt, mdt_object_child(msrcdir),
EXPORT_SYMBOL(RMF_EADATA);
const struct req_msg_field RMF_LOGCOOKIES =
- DEFINE_MSGF("logcookies", 0, 16, NULL);
+ DEFINE_MSGF("logcookies", 0, sizeof(struct llog_cookie), NULL);
EXPORT_SYMBOL(RMF_LOGCOOKIES);
const struct req_msg_field RMF_REINT_OPC =