extend server-side locking used for punch only to other operations, getattr here
i=green
i=zam
/*0x00000100 : "OBD_FL_NO_USRQUOTA",*/
/*0x00000200 : "OBD_FL_NO_GRPQUOTA",*/
/*0x00000400 : "OBD_FL_CREATE_CROW",*/
- /*0x00000800 : "OBD_FL_TRUNCLOCK",*/
+ /*0x00000800 : "OBD_FL_SRVLOCK",*/
//}
return offset;
}
OBD_FL_NO_USRQUOTA = 0x00000100, /* the object's owner is over quota */
OBD_FL_NO_GRPQUOTA = 0x00000200, /* the object's group is over quota */
OBD_FL_CREATE_CROW = 0x00000400, /* object should be create on write */
- OBD_FL_TRUNCLOCK = 0x00000800, /* delegate DLM locking during punch*/
+ OBD_FL_SRVLOCK = 0x00000800, /* delegate DLM locking to server */
OBD_FL_CKSUM_CRC32 = 0x00001000, /* CRC32 checksum type */
OBD_FL_CKSUM_ADLER = 0x00002000, /* ADLER checksum type */
OBD_FL_CKSUM_RSVD1 = 0x00004000, /* for future cksum types */
#define OBD_MD_FLOSSCAPA (0x0000040000000000ULL) /* OSS capability */
#define OBD_MD_FLCKSPLIT (0x0000080000000000ULL) /* Check split on server */
#define OBD_MD_FLCROSSREF (0x0000100000000000ULL) /* Cross-ref case */
-
+#define OBD_MD_FLGETATTRLOCK (0x0000200000000000ULL) /* Get IOEpoch attributes
+ * under lock */
#define OBD_FL_TRUNC (0x0000200000000000ULL) /* for filter_truncate */
#define OBD_MD_FLRMTLSETFACL (0x0001000000000000ULL) /* lfs lsetfacl case */
MF_MDC_CANCEL_FID2 = (1 << 4),
MF_MDC_CANCEL_FID3 = (1 << 5),
MF_MDC_CANCEL_FID4 = (1 << 6),
+ /* There is a pending attribute update. */
+ MF_SOM_AU = (1 << 7),
+ /* Cancel OST locks while getattr OST attributes. */
+ MF_GETATTR_LOCK = (1 << 8),
};
#define MF_SOM_LOCAL_FLAGS (MF_SOM_CHANGE | MF_EPOCH_OPEN | MF_EPOCH_CLOSE)
struct llu_inode_info *lli = llu_i2info(inode);
struct llu_sb_info *sbi = llu_i2sbi(inode);
struct obdo oa = { 0 };
+ __u32 old_flags;
int rc;
ENTRY;
LASSERT(!(lli->lli_flags & LLIF_MDS_SIZE_LOCK));
LASSERT(sbi->ll_lco.lco_flags & OBD_CONNECT_SOM);
+ old_flags = op_data->op_flags;
op_data->op_flags = MF_SOM_CHANGE;
/* If inode is already in another epoch, skip getattr from OSTs. */
if (lli->lli_ioepoch == op_data->op_ioepoch) {
- rc = llu_inode_getattr(inode, &oa, op_data->op_ioepoch);
+ rc = llu_inode_getattr(inode, &oa, op_data->op_ioepoch,
+ old_flags & MF_GETATTR_LOCK);
if (rc) {
oa.o_valid = 0;
if (rc == -ENOENT)
void obdo_from_inode(struct obdo *dst, struct inode *src, obd_flag valid);
int ll_it_open_error(int phase, struct lookup_intent *it);
struct inode *llu_iget(struct filesys *fs, struct lustre_md *md);
-int llu_inode_getattr(struct inode *inode, struct obdo *obdo, __u64 ioepoch);
+int llu_inode_getattr(struct inode *inode, struct obdo *obdo,
+ __u64 ioepoch, int sync);
int llu_md_setattr(struct inode *inode, struct md_op_data *op_data,
struct md_open_data **mod);
int llu_setattr_raw(struct inode *inode, struct iattr *attr);
dst->o_valid |= newvalid;
}
-/*
- * really does the getattr on the inode and updates its fields
+/**
+ * Performs the getattr on the inode and updates its fields.
+ * If @sync != 0, perform the getattr under the server-side lock.
*/
-int llu_inode_getattr(struct inode *inode, struct obdo *obdo, __u64 ioepoch)
+int llu_inode_getattr(struct inode *inode, struct obdo *obdo,
+ __u64 ioepoch, int sync)
{
struct llu_inode_info *lli = llu_i2info(inode);
struct ptlrpc_request_set *set;
OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
OBD_MD_FLCTIME | OBD_MD_FLGROUP |
OBD_MD_FLATIME | OBD_MD_FLEPOCH;
+ if (sync) {
+ oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
+ oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
+ }
set = ptlrpc_prep_set();
if (set == NULL) {
/* Fills the obdo with the attributes for the lsm */
static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
struct obd_capa *capa, struct obdo *obdo,
- __u64 ioepoch)
+ __u64 ioepoch, int sync)
{
struct ptlrpc_request_set *set;
struct obd_info oinfo = { { { 0 } } };
OBD_MD_FLMTIME | OBD_MD_FLCTIME |
OBD_MD_FLGROUP | OBD_MD_FLEPOCH;
oinfo.oi_capa = capa;
+ if (sync) {
+ oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
+ oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
+ }
set = ptlrpc_prep_set();
if (set == NULL) {
RETURN(rc);
}
-/** Performs the getattr for @ioepoch on the inode and updates its fields. */
-int ll_inode_getattr(struct inode *inode, struct obdo *obdo, __u64 ioepoch)
+/**
+ * Performs the getattr on the inode and updates its fields.
+ * If @sync != 0, perform the getattr under the server-side lock.
+ */
+int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
+ __u64 ioepoch, int sync)
{
struct ll_inode_info *lli = ll_i2info(inode);
struct obd_capa *capa = ll_mdscapa_get(inode);
ENTRY;
rc = ll_lsm_getattr(lli->lli_smd, ll_i2dtexp(inode),
- capa, obdo, ioepoch);
+ capa, obdo, ioepoch, sync);
capa_put(capa);
if (rc == 0) {
obdo_refresh_inode(inode, obdo, obdo->o_valid);
struct obdo obdo = { 0 };
int rc;
- rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0);
+ rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
if (rc == 0) {
st->st_size = obdo.o_size;
st->st_blocks = obdo.o_blocks;
{
struct ll_inode_info *lli = ll_i2info(inode);
struct ptlrpc_request *request = NULL;
+ __u32 old_flags;
struct obdo *oa;
int rc;
ENTRY;
RETURN(-ENOMEM);
}
+ old_flags = op_data->op_flags;
op_data->op_flags = MF_SOM_CHANGE;
/* If inode is already in another epoch, skip getattr from OSTs. */
if (lli->lli_ioepoch == op_data->op_ioepoch) {
- rc = ll_inode_getattr(inode, oa, op_data->op_ioepoch);
+ rc = ll_inode_getattr(inode, oa, op_data->op_ioepoch,
+ old_flags & MF_GETATTR_LOCK);
if (rc) {
oa->o_valid = 0;
if (rc == -ENOENT)
struct obd_client_handle **och, unsigned long flags);
void ll_done_writing_attr(struct inode *inode, struct md_op_data *op_data);
int ll_som_update(struct inode *inode, struct md_op_data *op_data);
-int ll_inode_getattr(struct inode *inode, struct obdo *obdo, __u64 ioepoch);
+int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
+ __u64 ioepoch, int sync);
int ll_md_setattr(struct inode *inode, struct md_op_data *op_data,
struct md_open_data **mod);
void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
RETURN(0);
}
+/* Prepares the request for the replay by the given reply */
+static void mdc_close_handle_reply(struct ptlrpc_request *req,
+ struct md_op_data *op_data, int rc) {
+ struct mdt_body *repbody;
+ struct mdt_ioepoch *epoch;
+
+ if (req && rc == -EAGAIN) {
+ repbody = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+ epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
+
+ epoch->flags |= MF_SOM_AU;
+ if (repbody->valid & OBD_MD_FLGETATTRLOCK)
+ op_data->op_flags |= MF_GETATTR_LOCK;
+ }
+}
+
int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
struct md_open_data *mod, struct ptlrpc_request **request)
{
obd_mod_put(mod);
}
*request = req;
+ mdc_close_handle_reply(req, op_data, rc);
RETURN(rc);
}
* thus DW req does not keep a reference on mod anymore. */
obd_mod_put(mod);
}
+
+ mdc_close_handle_reply(req, op_data, rc);
ptlrpc_req_finished(req);
RETURN(rc);
}
static inline int mdt_ioepoch_close_on_replay(struct mdt_thread_info *info,
struct mdt_object *o)
{
+ int rc = MDT_IOEPOCH_CLOSED;
ENTRY;
down(&o->mot_ioepoch_sem);
o->mot_ioepoch, PFID(mdt_object_fid(o)), o->mot_ioepoch_count);
o->mot_ioepoch_count--;
+ /* Get an info from the replayed request if client is supposed
+ * to send an Attibute Update, reconstruct @rc if so */
+ if (info->mti_ioepoch->flags & MF_SOM_AU)
+ rc = MDT_IOEPOCH_GETATTR;
+
if (!mdt_ioepoch_opened(o))
mdt_object_som_enable(o, info->mti_ioepoch->ioepoch);
up(&o->mot_ioepoch_sem);
- RETURN(0);
+ RETURN(rc);
}
/**
* Regular file IOepoch close.
* Closes the ioepoch, checks the object state, apply obtained attributes and
- * re-enable SOM on the object, if possible.
+ * re-enable SOM on the object, if possible. Also checks if the recovery is
+ * needed and packs OBD_MD_FLGETATTRLOCK flag into the reply to force the client
+ * to obtain SOM attributes under the server-side OST locks.
*
* Return value:
* MDT_IOEPOCH_CLOSED if ioepoch is closed.
mdt_object_som_enable(o, o->mot_ioepoch);
}
- EXIT;
+ up(&o->mot_ioepoch_sem);
+ /* If recovery is needed, tell the client to perform GETATTR under
+ * the lock. */
+ if (ret == MDT_IOEPOCH_GETATTR && recovery) {
+ struct mdt_body *rep;
+ rep = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+ rep->valid |= OBD_MD_FLGETATTRLOCK;
+ }
+
+ RETURN(rc ? : ret);
+
error_up:
up(&o->mot_ioepoch_sem);
- return rc ? : ret;
+ return rc;
}
/**
/* If this is a replay, reconstruct the transno. */
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
mdt_empty_transno(info);
- RETURN(0);
+ RETURN(info->mti_ioepoch->flags & MF_SOM_AU ?
+ -EAGAIN : 0);
}
RETURN(-ESTALE);
}
oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME |
OBD_MD_FLCTIME | OBD_MD_FLMTIME;
if (oio->oi_lockless) {
- oa->o_flags = OBD_FL_TRUNCLOCK;
+ oa->o_flags = OBD_FL_SRVLOCK;
oa->o_valid |= OBD_MD_FLFLAGS;
}
oa->o_size = size;
RETURN(0);
}
+/**
+ * Helper function for getting server side [start, start+count] DLM lock
+ * if asked by client.
+ */
+static int ost_lock_get(struct obd_export *exp, struct obdo *oa,
+ __u64 start, __u64 count, struct lustre_handle *lh,
+ int mode, int flags)
+{
+ struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0} };
+ ldlm_policy_data_t policy;
+ __u64 end = start + count;
+
+ ENTRY;
+
+ LASSERT(!lustre_handle_is_used(lh));
+ LASSERT((oa->o_valid & (OBD_MD_FLID | OBD_MD_FLGROUP)) ==
+ (OBD_MD_FLID | OBD_MD_FLGROUP));
+
+ if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
+ !(oa->o_flags & OBD_FL_SRVLOCK))
+ RETURN(0);
+
+ CDEBUG(D_INODE, "OST-side extent lock.\n");
+
+ policy.l_extent.start = start & CFS_PAGE_MASK;
+
+ /* If ->o_blocks is EOF it means "lock till the end of the
+ * file". Otherwise, it's size of a hole being punched (in bytes) */
+ if (count == OBD_OBJECT_EOF || end < start)
+ policy.l_extent.end = OBD_OBJECT_EOF;
+ else
+ policy.l_extent.end = end | ~CFS_PAGE_MASK;
+
+ RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
+ LDLM_EXTENT, &policy, mode, &flags,
+ ldlm_blocking_ast, ldlm_completion_ast,
+ ldlm_glimpse_ast, NULL, 0, NULL, lh));
+}
+
+/* Helper function: release lock, if any. */
+static void ost_lock_put(struct obd_export *exp,
+ struct lustre_handle *lh, int mode)
+{
+ ENTRY;
+ if (lustre_handle_is_used(lh))
+ ldlm_lock_decref(lh, mode);
+ EXIT;
+}
+
static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
{
struct ost_body *body, *repbody;
struct obd_info oinfo = { { { 0 } } };
+ struct lustre_handle lh = { 0 };
int rc;
ENTRY;
repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
repbody->oa = body->oa;
+ rc = ost_lock_get(exp, &body->oa, 0, OBD_OBJECT_EOF, &lh, LCK_PR, 0);
+ if (rc)
+ RETURN(rc);
+
oinfo.oi_oa = &repbody->oa;
if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) {
oinfo.oi_capa = req_capsule_client_get(&req->rq_pill,
}
req->rq_status = obd_getattr(exp, &oinfo);
+ ost_lock_put(exp, &lh, LCK_PR);
+
ost_drop_id(exp, &repbody->oa);
RETURN(0);
}
RETURN(0);
}
-/**
- * Helper function for ost_punch(): if asked by client, acquire [size, EOF]
- * lock on the file being truncated.
- */
-static int ost_punch_lock_get(struct obd_export *exp, struct obdo *oa,
- struct lustre_handle *lh)
-{
- int flags;
- struct ldlm_res_id res_id;
- ldlm_policy_data_t policy;
- __u64 start;
- __u64 finis;
-
- ENTRY;
-
- osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
- LASSERT(!lustre_handle_is_used(lh));
-
- if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
- !(oa->o_flags & OBD_FL_TRUNCLOCK))
- RETURN(0);
-
- CDEBUG(D_INODE, "OST-side truncate lock.\n");
-
- start = oa->o_size;
- finis = start + oa->o_blocks;
-
- /*
- * standard truncate optimization: if file body is completely
- * destroyed, don't send data back to the server.
- */
- flags = (start == 0) ? LDLM_AST_DISCARD_DATA : 0;
-
- policy.l_extent.start = start & CFS_PAGE_MASK;
-
- /*
- * If ->o_blocks is EOF it means "lock till the end of the
- * file". Otherwise, it's size of a hole being punched (in bytes)
- */
- if (oa->o_blocks == OBD_OBJECT_EOF || finis < start)
- policy.l_extent.end = OBD_OBJECT_EOF;
- else
- policy.l_extent.end = finis | ~CFS_PAGE_MASK;
-
- RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
- LDLM_EXTENT, &policy, LCK_PW, &flags,
- ldlm_blocking_ast, ldlm_completion_ast,
- ldlm_glimpse_ast, NULL, 0, NULL, lh));
-}
-
-/**
- * Helper function for ost_punch(): release lock acquired by
- * ost_punch_lock_get(), if any.
- */
-static void ost_punch_lock_put(struct obd_export *exp, struct obdo *oa,
- struct lustre_handle *lh)
-{
- ENTRY;
- if (lustre_handle_is_used(lh))
- ldlm_lock_decref(lh, LCK_PW);
- EXIT;
-}
-
static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
struct obd_trans_info *oti)
{
struct obd_info oinfo = { { { 0 } } };
struct ost_body *body, *repbody;
- int rc;
+ int rc, flags = 0;
struct lustre_handle lh = {0,};
ENTRY;
if (rc)
RETURN(rc);
+ /* standard truncate optimization: if file body is completely
+ * destroyed, don't send data back to the server. */
+ if (oinfo.oi_oa->o_size == 0)
+ flags |= LDLM_AST_DISCARD_DATA;
+
repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- rc = ost_punch_lock_get(exp, oinfo.oi_oa, &lh);
+ rc = ost_lock_get(exp, oinfo.oi_oa, oinfo.oi_oa->o_size,
+ oinfo.oi_oa->o_blocks, &lh, LCK_PW, flags);
if (rc == 0) {
if (oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS &&
- oinfo.oi_oa->o_flags == OBD_FL_TRUNCLOCK)
+ oinfo.oi_oa->o_flags == OBD_FL_SRVLOCK)
/*
- * If OBD_FL_TRUNCLOCK is the only bit set in
+ * If OBD_FL_SRVLOCK is the only bit set in
* ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
* through filter_setattr() to filter_iocontrol().
*/
}
}
req->rq_status = obd_punch(exp, &oinfo, oti, NULL);
- ost_punch_lock_put(exp, oinfo.oi_oa, &lh);
+ ost_lock_put(exp, &lh, LCK_PW);
}
repbody->oa = *oinfo.oi_oa;
ost_drop_id(exp, &repbody->oa);
RETURN(-EFAULT);
LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
- !(body->oa.o_flags & OBD_FL_TRUNCLOCK));
+ !(body->oa.o_flags & OBD_FL_SRVLOCK));
RETURN(ost_punch_prolong_locks(req, &body->oa));
}
}
if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
- !(body->oa.o_flags & OBD_FL_TRUNCLOCK))
+ !(body->oa.o_flags & OBD_FL_SRVLOCK))
req->rq_ops = &ost_hpreq_punch;
}
}
CLASSERT(OBD_FL_DEBUG_CHECK == 64);
CLASSERT(OBD_FL_NO_USRQUOTA == 256);
CLASSERT(OBD_FL_NO_GRPQUOTA == 512);
- CLASSERT(OBD_FL_TRUNCLOCK == 2048);
+ CLASSERT(OBD_FL_SRVLOCK == 2048);
CLASSERT(OBD_FL_CKSUM_CRC32 == 4096);
CLASSERT(OBD_FL_CKSUM_ADLER == 8192);
CLASSERT(OBD_FL_SHRINK_GRANT == 131072);
CHECK_CVALUE(OBD_FL_DEBUG_CHECK);
CHECK_CVALUE(OBD_FL_NO_USRQUOTA);
CHECK_CVALUE(OBD_FL_NO_GRPQUOTA);
- CHECK_CVALUE(OBD_FL_TRUNCLOCK);
+ CHECK_CVALUE(OBD_FL_SRVLOCK);
CHECK_CVALUE(OBD_FL_CKSUM_CRC32);
CHECK_CVALUE(OBD_FL_CKSUM_ADLER);
CHECK_CVALUE(OBD_FL_SHRINK_GRANT);
CLASSERT(OBD_FL_DEBUG_CHECK == 64);
CLASSERT(OBD_FL_NO_USRQUOTA == 256);
CLASSERT(OBD_FL_NO_GRPQUOTA == 512);
- CLASSERT(OBD_FL_TRUNCLOCK == 2048);
+ CLASSERT(OBD_FL_SRVLOCK == 2048);
CLASSERT(OBD_FL_CKSUM_CRC32 == 4096);
CLASSERT(OBD_FL_CKSUM_ADLER == 8192);
CLASSERT(OBD_FL_SHRINK_GRANT == 131072);