*/
struct inode *coc_inode;
/**
- * Invalidate the current stripe configuration due to losing
- * layout lock.
+ * Layout lock handle.
*/
- bool coc_invalidate;
+ struct ldlm_lock *coc_lock;
+ /**
+ * Operation to handle layout, OBJECT_CONF_XYZ.
+ */
+ int coc_opc;
+};
+
+/**
+ * Layout configuration operations, stored in cl_object_conf::coc_opc
+ * and dispatched by the cl_object layers (see lov_conf_set()).
+ */
+enum {
+ /** configure layout, set up a new stripe, must be called while
+ * holding layout lock. */
+ OBJECT_CONF_SET = 0,
+ /** invalidate the current stripe configuration due to losing
+ * layout lock. */
+ OBJECT_CONF_INVALIDATE = 1,
+ /** wait for old layout to go away so that new layout can be
+ * set up. */
+ OBJECT_CONF_WAIT = 2
};
/**
OBD_CONNECT_64BITHASH | OBD_CONNECT_JOBSTATS | \
OBD_CONNECT_EINPROGRESS | \
OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_UMASK | \
- OBD_CONNECT_LVB_TYPE)
+ OBD_CONNECT_LVB_TYPE | OBD_CONNECT_LAYOUTLOCK)
#define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \
OBD_CONNECT_MAX_EASIZE | \
OBD_CONNECT_EINPROGRESS | \
OBD_CONNECT_JOBSTATS | \
- OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_LVB_TYPE)
+ OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_LVB_TYPE|\
+ OBD_CONNECT_LAYOUTLOCK)
#define ECHO_CONNECT_SUPPORTED (0)
#define MGS_CONNECT_SUPPORTED (OBD_CONNECT_VERSION | OBD_CONNECT_AT | \
OBD_CONNECT_FULL20 | OBD_CONNECT_IMP_RECOV | \
#define MDS_INODELOCK_OPEN 0x000004 /* For opened files */
#define MDS_INODELOCK_LAYOUT 0x000008 /* for layout */
-/* Do not forget to increase MDS_INODELOCK_MAXSHIFT when adding new bits
- * XXX: MDS_INODELOCK_MAXSHIFT should be increased to 3 once the layout lock is
- * supported */
-#define MDS_INODELOCK_MAXSHIFT 2
+#define MDS_INODELOCK_MAXSHIFT 3
/* This FULL lock is useful to take on unlink sort of operations */
#define MDS_INODELOCK_FULL ((1<<(MDS_INODELOCK_MAXSHIFT+1))-1)
void lustre_swab_fid2path (struct getinfo_fid2path *gf);
+/**
+ * Intent opcodes for layout lock enqueue (layout_intent::li_opc).
+ * LAYOUT_INTENT_ACCESS is the generic opcode the client packs today;
+ * the more specific opcodes are reserved (per mdc_intent_layout_pack(),
+ * a specific operation will be set for replication).
+ */
+enum {
+ LAYOUT_INTENT_ACCESS = 0,
+ LAYOUT_INTENT_READ = 1,
+ LAYOUT_INTENT_WRITE = 2,
+ LAYOUT_INTENT_GLIMPSE = 3,
+ LAYOUT_INTENT_TRUNC = 4,
+ LAYOUT_INTENT_RELEASE = 5,
+ LAYOUT_INTENT_RESTORE = 6
+};
+
+/* On-the-wire request body used to enqueue a layout lock with an intent;
+ * byte-swapped by lustre_swab_layout_intent(). */
+struct layout_intent {
+ __u32 li_opc; /* intent operation for enqueue: LAYOUT_INTENT_* */
+ __u32 li_flags; /* not set by mdc_intent_layout_pack() — TODO confirm
+ * intended server-side semantics */
+ __u64 li_start; /* presumably start of the affected extent — confirm */
+ __u64 li_end; /* presumably end of the affected extent — confirm */
+};
+
+void lustre_swab_layout_intent(struct layout_intent *li);
#endif
/** @} lustreidl */
ELDLM_LOCK_ABORTED = 301,
ELDLM_LOCK_REPLACED = 302,
ELDLM_NO_LOCK_DATA = 303,
+ ELDLM_LOCK_WOULDBLOCK = 304,
ELDLM_NAMESPACE_EXISTS = 400,
ELDLM_BAD_NAMESPACE = 401
struct inode *lr_lvb_inode;
};
+/* Return true iff \a lock is an inodebits (LDLM_IBITS) lock whose policy
+ * bits include MDS_INODELOCK_LAYOUT, i.e. it protects a file layout. */
+static inline bool ldlm_has_layout(struct ldlm_lock *lock)
+{
+ return lock->l_resource->lr_type == LDLM_IBITS &&
+ lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_LAYOUT;
+}
+
static inline char *
ldlm_ns_name(struct ldlm_namespace *ns)
{
return !!(ocd->ocd_connect_flags & OBD_CONNECT_LRU_RESIZE);
}
+/* Non-zero iff the export negotiated OBD_CONNECT_LAYOUTLOCK, i.e. the
+ * peer supports layout locks. */
+static inline int exp_connect_layout(struct obd_export *exp)
+{
+ return !!(exp->exp_connect_flags & OBD_CONNECT_LAYOUTLOCK);
+}
+
static inline bool exp_connect_lvb_type(struct obd_export *exp)
{
LASSERT(exp != NULL);
name->name[LUSTRE_RES_ID_VER_OID_OFF] == fid_ver_oid(f);
}
+/* Reverse function of fid_build_reg_res_name(): reconstruct the FID
+ * packed into a regular DLM resource name. The oid occupies the low 32
+ * bits and the version the high 32 bits of the VER_OID word; the final
+ * LASSERT verifies the round-trip against fid_res_name_eq(). */
+static inline void fid_build_from_res_name(struct lu_fid *f,
+ const struct ldlm_res_id *name)
+{
+ fid_zero(f);
+ f->f_seq = name->name[LUSTRE_RES_ID_SEQ_OFF];
+ f->f_oid = name->name[LUSTRE_RES_ID_VER_OID_OFF] & 0xffffffff;
+ f->f_ver = name->name[LUSTRE_RES_ID_VER_OID_OFF] >> 32;
+ LASSERT(fid_res_name_eq(f, name));
+}
static inline struct ldlm_res_id *
fid_build_pdo_res_name(const struct lu_fid *f,
extern struct req_format RQF_LDLM_CONVERT;
extern struct req_format RQF_LDLM_INTENT;
extern struct req_format RQF_LDLM_INTENT_BASIC;
+extern struct req_format RQF_LDLM_INTENT_LAYOUT;
extern struct req_format RQF_LDLM_INTENT_GETATTR;
extern struct req_format RQF_LDLM_INTENT_OPEN;
extern struct req_format RQF_LDLM_INTENT_CREATE;
extern struct req_msg_field RMF_DLM_LVB;
extern struct req_msg_field RMF_DLM_GL_DESC;
extern struct req_msg_field RMF_LDLM_INTENT;
+extern struct req_msg_field RMF_LAYOUT_INTENT;
extern struct req_msg_field RMF_MDT_MD;
extern struct req_msg_field RMF_REC_REINT;
extern struct req_msg_field RMF_EADATA;
#define OBD_FAIL_MDS_OSC_CREATE_FAIL 0x147
#define OBD_FAIL_MDS_NEGATIVE_POSITIVE 0x148
+/* layout lock */
+#define OBD_FAIL_MDS_NO_LL_GETATTR 0x170
+#define OBD_FAIL_MDS_NO_LL_OPEN 0x171
+#define OBD_FAIL_MDS_LL_BLOCK 0x172
+
/* CMD */
#define OBD_FAIL_MDS_IS_SUBDIR_NET 0x180
#define OBD_FAIL_MDS_IS_SUBDIR_PACK 0x181
#define OBD_FAIL_LLITE_FAULT_TRUNC_RACE 0x1401
#define OBD_FAIL_LOCK_STATE_WAIT_INTR 0x1402
#define OBD_FAIL_LOV_INIT 0x1403
+#define OBD_FAIL_GLIMPSE_DELAY 0x1404
/* Assign references to moved code to reduce code changes */
#define OBD_FAIL_PRECHECK(id) CFS_FAIL_PRECHECK(id)
else if (result == 0)
result = cl_glimpse_lock(env, io, inode, io->ci_obj,
agl);
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_GLIMPSE_DELAY, 2);
cl_io_fini(env, io);
if (unlikely(io->ci_need_restart))
goto again;
/*
* No locking is necessary, as new inode is
* locked by I_NEW bit.
- *
- * XXX not true for call from ll_update_inode().
*/
lli->lli_clob = clob;
+ lli->lli_has_smd = md->lsm != NULL;
lu_object_ref_add(&clob->co_lu, "inode", inode);
} else
result = PTR_ERR(clob);
- } else
- result = cl_conf_set(env, lli->lli_clob, &conf);
+ }
cl_env_put(env, &refcheck);
if (result != 0)
LASSERT(cfs_list_empty(&res->lr_converting));
check_res_locked(res);
- if (!first_enq) {
- LASSERT(work_list != NULL);
+ /* (*flags & LDLM_FL_BLOCK_NOWAIT) is for layout lock right now. */
+ if (!first_enq || (*flags & LDLM_FL_BLOCK_NOWAIT)) {
+ *err = ELDLM_LOCK_ABORTED;
+ if (*flags & LDLM_FL_BLOCK_NOWAIT)
+ *err = ELDLM_LOCK_WOULDBLOCK;
+
rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, NULL);
if (!rc)
RETURN(LDLM_ITER_STOP);
ldlm_resource_unlink_lock(lock);
ldlm_grant_lock(lock, work_list);
+
+ *err = ELDLM_OK;
RETURN(LDLM_ITER_CONTINUE);
}
ldlm_lock_decref_internal_nolock(lock, mode);
+ /* release lvb data for layout lock */
+ if (ns_is_client(ns) && !lock->l_readers && !lock->l_writers &&
+ ldlm_has_layout(lock) && lock->l_flags & LDLM_FL_LVB_READY) {
+ /* this is the last user of a layout lock and stripe has
+ * been set up, lvb is no longer used.
+ * This may be a large amount of memory, so we should free it
+ * when possible. */
+ if (lock->l_lvb_data != NULL) {
+ OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);
+ lock->l_lvb_data = NULL;
+ lock->l_lvb_len = 0;
+ }
+ }
+
if (lock->l_flags & LDLM_FL_LOCAL &&
!lock->l_readers && !lock->l_writers) {
/* If this is a local lock on a server namespace and this was
memcpy(data, lvb, size);
break;
default:
- LDLM_ERROR(lock, "Unexpected LVB type");
+ LDLM_ERROR(lock, "Unknown LVB type: %d\n", lock->l_lvb_type);
+ libcfs_debug_dumpstack(NULL);
RETURN(-EINVAL);
}
"(err=%d, rc=%d)", err, rc);
if (rc == 0) {
- int lvb_len = ldlm_lvbo_size(lock);
-
- if (lvb_len > 0) {
+ if (req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB,
+ RCL_SERVER) &&
+ ldlm_lvbo_size(lock) > 0) {
void *buf;
int buflen;
lock->l_lvb_len, lvb_len);
GOTO(out, rc = -EINVAL);
}
- } else { /* for layout lock, lvb has variable length */
+ } else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
+ * variable length */
void *lvb_data;
OBD_ALLOC(lvb_data, lvb_len);
if (lvb_data == NULL) {
- LDLM_ERROR(lock, "No memory.\n");
+ LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
GOTO(out, rc = -ENOMEM);
}
lock_res_and_lock(lock);
lock->l_flags |= LDLM_FL_FAILED;
unlock_res_and_lock(lock);
+ cfs_waitq_signal(&lock->l_waitq);
}
LDLM_LOCK_RELEASE(lock);
}
res->lr_name.name[0], res->lr_name.name[1],
res->lr_name.name[2], res->lr_name.name[3],
cfs_atomic_read(&res->lr_refcount) - 1);
+
+ ldlm_resource_dump(D_ERROR, res);
return 0;
}
* was opened several times without close, we track an
* open_count here */
struct ll_file_data *lli_file_data;
+ /* checking lli_has_smd is reliable only inside an IO
+ * i.e, lov stripe has been held. */
bool lli_has_smd;
int lli_open_flags;
int lli_open_count;
void ll_intent_drop_lock(struct lookup_intent *it)
{
- struct lustre_handle *handle;
-
if (it->it_op && it->d.lustre.it_lock_mode) {
- struct ldlm_lock *lock;
-
- handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
- lock = ldlm_handle2lock(handle);
- if (lock != NULL) {
- /* it can only be allowed to match after layout is
- * applied to inode otherwise false layout would be
- * seen. Applying layout shoud happen before dropping
- * the intent lock. */
- if (it->d.lustre.it_lock_bits & MDS_INODELOCK_LAYOUT)
- ldlm_lock_allow_match(lock);
- LDLM_LOCK_PUT(lock);
- }
+ struct lustre_handle handle;
+
+ handle.cookie = it->d.lustre.it_lock_handle;
CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
- " from it %p\n", handle->cookie, it);
- ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
+ " from it %p\n", handle.cookie, it);
+ ldlm_lock_decref(&handle, it->d.lustre.it_lock_mode);
/* bug 494: intent_release may be called multiple times, from
* this thread and we don't want to double-decref this lock */
if (it_disposition(it, DISP_LOOKUP_NEG))
RETURN(-ENOENT);
- rc = ll_prep_inode(&de->d_inode, request, NULL);
+ rc = ll_prep_inode(&de->d_inode, request, NULL, it);
RETURN(rc);
}
GOTO(out, rc);
}
- rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
+ rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL, itp);
if (!rc && itp->d.lustre.it_lock_mode)
ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
itp, NULL);
rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
if (rc == 0) {
struct lov_stripe_md *lsm;
+ __u32 gen;
+
put_user(0, &lumv1p->lmm_stripe_count);
+
+ ll_layout_refresh(inode, &gen);
lsm = ccc_inode_lsm_get(inode);
rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
0, lsm, (void *)arg);
RETURN(rc);
}
- rc = ll_prep_inode(&inode, req, NULL);
+ rc = ll_prep_inode(&inode, req, NULL, NULL);
}
out:
ptlrpc_req_finished(req);
result = cl_conf_set(env, lli->lli_clob, conf);
cl_env_nested_put(&nest, env);
+
+ if (conf->coc_opc == OBJECT_CONF_SET) {
+ struct ldlm_lock *lock = conf->coc_lock;
+
+ LASSERT(lock != NULL);
+ LASSERT(ldlm_has_layout(lock));
+ if (result == 0) {
+ /* it can only be allowed to match after layout is
+ * applied to inode otherwise false layout would be
+ * seen. Applying layout should happen before dropping
+ * the intent lock. */
+ ldlm_lock_allow_match(lock);
+ }
+ }
RETURN(result);
}
/**
+ * Apply the layout to the inode. Layout lock is held and will be released
+ * in this function.
+ *
+ * \param lockh  handle of the granted layout lock; one reference of
+ *               \a mode is dropped before returning
+ * \param mode   lock mode of \a lockh
+ * \param inode  inode whose layout is being (re)configured
+ * \param gen    out: layout generation that was applied
+ * \param reconf false: only report the cached layout, failing with
+ *               -ENODATA when the lock's LVB is not ready; true: unpack
+ *               the LVB and apply the layout to the inode
+ *
+ * \retval 0        success, \a *gen is valid
+ * \retval -ENODATA lock LVB not ready and \a reconf is false
+ * \retval -EAGAIN  layout was busy and has now been waited out; caller
+ *                  should retry the enqueue
+ * \retval -ve      other error (e.g. obd_unpackmd() failure)
+ */
+static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
+ struct inode *inode, __u32 *gen, bool reconf)
+{
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ldlm_lock *lock;
+ struct lustre_md md = { NULL };
+ struct cl_object_conf conf;
+ int rc = 0;
+ bool lvb_ready;
+ ENTRY;
+
+ LASSERT(lustre_handle_is_used(lockh));
+
+ lock = ldlm_handle2lock(lockh);
+ LASSERT(lock != NULL);
+ LASSERT(ldlm_has_layout(lock));
+
+ LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n",
+ inode, PFID(&lli->lli_fid), reconf);
+
+ lock_res_and_lock(lock);
+ lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY);
+ unlock_res_and_lock(lock);
+ /* checking lvb_ready is racy but this is okay. The worst case is
+ * that multiple processes may configure the file at the same time. */
+ if (lvb_ready || !reconf) {
+ LDLM_LOCK_PUT(lock);
+
+ rc = -ENODATA;
+ if (lvb_ready) {
+ /* layout_gen must be valid if layout lock is not
+ * cancelled and stripe has already been set */
+ *gen = lli->lli_layout_gen;
+ rc = 0;
+ }
+ ldlm_lock_decref(lockh, mode);
+ RETURN(rc);
+ }
+
+ /* for layout lock, lmm is returned in lock's lvb.
+ * lvb_data is immutable if the lock is held so it's safe to access it
+ * without res lock. See the description in ldlm_lock_decref_internal()
+ * for the condition to free lvb_data of layout lock */
+ if (lock->l_lvb_data != NULL) {
+ rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
+ lock->l_lvb_data, lock->l_lvb_len);
+ if (rc >= 0) {
+ if (md.lsm != NULL)
+ *gen = md.lsm->lsm_layout_gen;
+ rc = 0;
+ } else {
+ CERROR("%s: file "DFID" unpackmd error: %d\n",
+ ll_get_fsname(inode->i_sb, NULL, 0),
+ PFID(&lli->lli_fid), rc);
+ }
+ }
+ if (rc < 0) {
+ LDLM_LOCK_PUT(lock);
+ ldlm_lock_decref(lockh, mode);
+ RETURN(rc);
+ }
+
+ /* set layout to file. Unlikely this will fail as old layout was
+ * surely eliminated */
+ memset(&conf, 0, sizeof conf);
+ conf.coc_opc = OBJECT_CONF_SET;
+ conf.coc_inode = inode;
+ conf.coc_lock = lock;
+ conf.u.coc_md = &md;
+ rc = ll_layout_conf(inode, &conf);
+ LDLM_LOCK_PUT(lock);
+
+ ldlm_lock_decref(lockh, mode);
+
+ if (md.lsm != NULL)
+ obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
+
+ /* wait for IO to complete if it's still being used. */
+ if (rc == -EBUSY) {
+ CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n",
+ ll_get_fsname(inode->i_sb, NULL, 0),
+ inode, PFID(&lli->lli_fid));
+
+ memset(&conf, 0, sizeof conf);
+ conf.coc_opc = OBJECT_CONF_WAIT;
+ conf.coc_inode = inode;
+ rc = ll_layout_conf(inode, &conf);
+ if (rc == 0)
+ rc = -EAGAIN;
+
+ CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n",
+ PFID(&lli->lli_fid), rc);
+ }
+
+ RETURN(rc);
+}
+
+/**
* This function checks if there exists a LAYOUT lock on the client side,
* or enqueues it if it doesn't have one in cache.
*
{
struct ll_inode_info *lli = ll_i2info(inode);
struct ll_sb_info *sbi = ll_i2sbi(inode);
- struct md_op_data *op_data = NULL;
- struct lookup_intent it = { .it_op = IT_LAYOUT };
- struct lustre_handle lockh = { 0 };
+ struct md_op_data *op_data;
+ struct lookup_intent it;
+ struct lustre_handle lockh;
ldlm_mode_t mode;
struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS,
.ei_mode = LCK_CR,
int rc;
ENTRY;
- *gen = 0;
+ *gen = LL_LAYOUT_GEN_ZERO;
if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
RETURN(0);
/* mostly layout lock is caching on the local side, so try to match
* it before grabbing layout lock mutex. */
- mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
- LDLM_FL_LVB_READY);
+ mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
if (mode != 0) { /* hit cached lock */
- /* lsm_layout_gen is started from 0, plus 1 here to distinguish
- * the cases of no layout and first layout. */
- *gen = lli->lli_layout_gen + 1;
+ rc = ll_layout_lock_set(&lockh, mode, inode, gen, false);
+ if (rc == 0)
+ RETURN(0);
- ldlm_lock_decref(&lockh, mode);
- RETURN(0);
+ /* better hold lli_layout_mutex to try again otherwise
+ * it will have a starvation problem. */
}
- op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
- 0, 0, LUSTRE_OPC_ANY, NULL);
- if (IS_ERR(op_data))
- RETURN(PTR_ERR(op_data));
-
/* take layout lock mutex to enqueue layout lock exclusively. */
mutex_lock(&lli->lli_layout_mutex);
- /* try again inside layout mutex */
- mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
- LDLM_FL_LVB_READY);
+again:
+ /* try again. Maybe somebody else has done this. */
+ mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
if (mode != 0) { /* hit cached lock */
- *gen = lli->lli_layout_gen + 1;
+ rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
+ if (rc == -EAGAIN)
+ goto again;
- ldlm_lock_decref(&lockh, mode);
mutex_unlock(&lli->lli_layout_mutex);
- ll_finish_md_op_data(op_data);
- RETURN(0);
+ RETURN(rc);
+ }
+
+ op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
+ 0, 0, LUSTRE_OPC_ANY, NULL);
+ if (IS_ERR(op_data)) {
+ mutex_unlock(&lli->lli_layout_mutex);
+ RETURN(PTR_ERR(op_data));
}
/* have to enqueue one */
+ memset(&it, 0, sizeof(it));
+ it.it_op = IT_LAYOUT;
+ lockh.cookie = 0ULL;
+
+ LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file %p/"DFID".\n",
+ ll_get_fsname(inode->i_sb, NULL, 0), inode,
+ PFID(&lli->lli_fid));
+
rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
NULL, 0, NULL, 0);
if (it.d.lustre.it_data != NULL)
ptlrpc_req_finished(it.d.lustre.it_data);
it.d.lustre.it_data = NULL;
- if (rc == 0) {
- struct ldlm_lock *lock;
- struct cl_object_conf conf;
- struct lustre_md md = { NULL };
- void *lmm;
- int lmmsize;
+ ll_finish_md_op_data(op_data);
- LASSERT(lustre_handle_is_used(&lockh));
+ mode = it.d.lustre.it_lock_mode;
+ it.d.lustre.it_lock_mode = 0;
+ ll_intent_drop_lock(&it);
+ if (rc == 0) {
/* set lock data in case this is a new lock */
ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
-
- lock = ldlm_handle2lock(&lockh);
- LASSERT(lock != NULL);
-
- /* for IT_LAYOUT lock, lmm is returned in lock's lvb
- * data via completion callback */
- lmm = lock->l_lvb_data;
- lmmsize = lock->l_lvb_len;
- if (lmm != NULL) {
- rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
- lmm, lmmsize);
- if (rc >= 0) {
- if (md.lsm != NULL)
- *gen = md.lsm->lsm_layout_gen + 1;
- rc = 0;
- } else {
- CERROR("file: "DFID" unpackmd error: %d\n",
- PFID(&lli->lli_fid), rc);
- }
- }
- LDLM_LOCK_PUT(lock);
-
- /* set layout to file. This may cause lock expiration as we
- * set layout inside layout ibits lock. */
- memset(&conf, 0, sizeof conf);
- conf.coc_inode = inode;
- conf.u.coc_md = &md;
- ll_layout_conf(inode, &conf);
- /* is this racy? */
- lli->lli_has_smd = md.lsm != NULL;
- if (md.lsm != NULL)
- obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
+ rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
+ if (rc == -EAGAIN)
+ goto again;
}
- ll_intent_drop_lock(&it);
-
mutex_unlock(&lli->lli_layout_mutex);
- ll_finish_md_op_data(op_data);
RETURN(rc);
}
#endif
void ll_dirty_page_discard_warn(cfs_page_t *page, int ioret);
int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req,
- struct super_block *);
+ struct super_block *, struct lookup_intent *);
void lustre_dump_dentry(struct dentry *, int recur);
void lustre_dump_inode(struct inode *);
int ll_obd_statfs(struct inode *inode, void *arg);
#warning "remove old LL_IOC_QUOTACTL_18 compatibility code"
#endif /* LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 50, 0) */
+#define LL_LAYOUT_GEN_ZERO ((__u32)-1)
int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf);
int ll_layout_refresh(struct inode *inode, __u32 *gen);
OBD_CONNECT_RMT_CLIENT | OBD_CONNECT_VBR |
OBD_CONNECT_FULL20 | OBD_CONNECT_64BITHASH|
OBD_CONNECT_EINPROGRESS |
- OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE;
+ OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE |
+ OBD_CONNECT_LAYOUTLOCK;
if (sbi->ll_flags & LL_SBI_SOM_PREVIEW)
data->ocd_connect_flags |= OBD_CONNECT_SOM;
OBD_CONNECT_FULL20 | OBD_CONNECT_64BITHASH |
OBD_CONNECT_MAXBYTES |
OBD_CONNECT_EINPROGRESS |
- OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE;
+ OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE |
+ OBD_CONNECT_LAYOUTLOCK;
if (sbi->ll_flags & LL_SBI_SOM_PREVIEW)
data->ocd_connect_flags |= OBD_CONNECT_SOM;
mutex_init(&lli->lli_och_mutex);
spin_lock_init(&lli->lli_agl_lock);
lli->lli_has_smd = false;
+ lli->lli_layout_gen = LL_LAYOUT_GEN_ZERO;
lli->lli_clob = NULL;
LASSERT(lli->lli_vfs_inode.i_mode != 0);
LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0));
if (lsm != NULL) {
- LASSERT(S_ISREG(inode->i_mode));
- CDEBUG(D_INODE, "adding lsm %p to inode %lu/%u(%p)\n",
- lsm, inode->i_ino, inode->i_generation, inode);
- /* cl_file_inode_init must go before lli_has_smd or a race
- * is possible where client thinks the file has stripes,
- * but lov raid0 is not setup yet and parallel e.g.
- * glimpse would try to use uninitialized lov */
- if (cl_file_inode_init(inode, md) == 0)
- lli->lli_has_smd = true;
-
lli->lli_maxbytes = lsm->lsm_maxbytes;
if (lli->lli_maxbytes > MAX_LFS_FILESIZE)
lli->lli_maxbytes = MAX_LFS_FILESIZE;
- if (md->lsm != NULL)
- obd_free_memmd(ll_i2dtexp(inode), &md->lsm);
}
if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
return 0;
}
-int ll_prep_inode(struct inode **inode,
- struct ptlrpc_request *req,
- struct super_block *sb)
+int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req,
+ struct super_block *sb, struct lookup_intent *it)
{
struct ll_sb_info *sbi = NULL;
struct lustre_md md;
- __u64 ibits;
int rc;
ENTRY;
*inode = ll_iget(sb, cl_fid_build_ino(&md.body->fid1, 0), &md);
if (*inode == NULL || IS_ERR(*inode)) {
- if (md.lsm)
- obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
#ifdef CONFIG_FS_POSIX_ACL
if (md.posix_acl) {
posix_acl_release(md.posix_acl);
}
}
- /* sanity check for LAYOUT lock. */
- ibits = MDS_INODELOCK_LAYOUT;
- if (S_ISREG(md.body->mode) && sbi->ll_flags & LL_SBI_LAYOUT_LOCK &&
- md.lsm != NULL && !ll_have_md_lock(*inode, &ibits, LCK_MINMODE)) {
- CERROR("%s: inode "DFID" (%p) layout lock not granted.\n",
- ll_get_fsname(sb, NULL, 0),
- PFID(ll_inode2fid(*inode)), *inode);
+ /* Handling piggyback layout lock.
+ * Layout lock can be piggybacked by getattr and open request.
+ * The lsm can be applied to inode only if it comes with a layout lock
+ * otherwise correct layout may be overwritten, for example:
+ * 1. proc1: mdt returns a lsm but not granting layout
+ * 2. layout was changed by another client
+ * 3. proc2: refresh layout and layout lock granted
+ * 4. proc1: to apply a stale layout */
+ if (it != NULL && it->d.lustre.it_lock_mode != 0) {
+ struct lustre_handle lockh;
+ struct ldlm_lock *lock;
+
+ lockh.cookie = it->d.lustre.it_lock_handle;
+ lock = ldlm_handle2lock(&lockh);
+ LASSERT(lock != NULL);
+ if (ldlm_has_layout(lock)) {
+ struct cl_object_conf conf;
+
+ memset(&conf, 0, sizeof(conf));
+ conf.coc_opc = OBJECT_CONF_SET;
+ conf.coc_inode = *inode;
+ conf.coc_lock = lock;
+ conf.u.coc_md = &md;
+ (void)ll_layout_conf(*inode, &conf);
+ }
+ LDLM_LOCK_PUT(lock);
}
out:
+ if (md.lsm != NULL)
+ obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
md_free_lustre_md(sbi->ll_md_exp, &md);
RETURN(rc);
}
PFID(fid), rc);
RETURN(ERR_PTR(rc));
}
- rc = ll_prep_inode(&inode, req, sb);
+ rc = ll_prep_inode(&inode, req, sb, NULL);
ptlrpc_req_finished(req);
if (rc)
RETURN(ERR_PTR(rc));
ll_read_inode2(inode, md);
if (S_ISREG(inode->i_mode) &&
- ll_i2info(inode)->lli_clob == NULL)
+ ll_i2info(inode)->lli_clob == NULL) {
+ CDEBUG(D_INODE,
+ "%s: apply lsm %p to inode "DFID".\n",
+ ll_get_fsname(sb, NULL, 0), md->lsm,
+ PFID(ll_inode2fid(inode)));
rc = cl_file_inode_init(inode, md);
+ }
if (rc != 0) {
- md->lsm = NULL;
make_bad_inode(inode);
unlock_new_inode(inode);
iput(inode);
lli = ll_i2info(inode);
if (bits & MDS_INODELOCK_LAYOUT) {
- struct cl_object_conf conf = { .coc_inode = inode,
- .coc_invalidate = true };
+ struct cl_object_conf conf = { { 0 } };
+
+ conf.coc_opc = OBJECT_CONF_INVALIDATE;
+ conf.coc_inode = inode;
rc = ll_layout_conf(inode, &conf);
if (rc)
CDEBUG(D_INODE, "invaliding layout %d.\n", rc);
CDEBUG(D_DENTRY, "it %p it_disposition %x\n", it,
it->d.lustre.it_disposition);
if (!it_disposition(it, DISP_LOOKUP_NEG)) {
- rc = ll_prep_inode(&inode, request, (*de)->d_sb);
+ rc = ll_prep_inode(&inode, request, (*de)->d_sb, it);
if (rc)
RETURN(rc);
LASSERT(it_disposition(it, DISP_ENQ_CREATE_REF));
request = it->d.lustre.it_data;
it_clear_disposition(it, DISP_ENQ_CREATE_REF);
- rc = ll_prep_inode(&inode, request, dir->i_sb);
+ rc = ll_prep_inode(&inode, request, dir->i_sb, it);
if (rc)
GOTO(out, inode = ERR_PTR(rc));
ll_update_times(request, dir);
if (dchild) {
- err = ll_prep_inode(&inode, request, dchild->d_sb);
+ err = ll_prep_inode(&inode, request, dchild->d_sb, NULL);
if (err)
GOTO(err_exit, err);
if (rc != 1)
GOTO(out, rc = -EAGAIN);
- rc = ll_prep_inode(&child, req, dir->i_sb);
+ rc = ll_prep_inode(&child, req, dir->i_sb, it);
if (rc)
GOTO(out, rc);
io->ci_lockreq = CILR_MANDATORY;
}
+ /* ignore layout change for generic CIT_MISC but not for glimpse.
+ * io context for glimpse must set ci_verify_layout to true,
+ * see cl_glimpse_size0() for details. */
+ if (io->ci_type == CIT_MISC && !io->ci_verify_layout)
+ io->ci_ignore_layout = 1;
+
/* Enqueue layout lock and get layout version. We need to do this
* even for operations requiring to open file, such as read and write,
* because it might not grant layout lock in IT_OPEN. */
{
struct ll_inode_info *lli = ll_i2info(conf->coc_inode);
- if (conf->u.coc_md != NULL && conf->u.coc_md->lsm != NULL)
+ if (conf->coc_opc != OBJECT_CONF_SET)
+ return 0;
+
+ if (conf->u.coc_md != NULL && conf->u.coc_md->lsm != NULL) {
+ CDEBUG(D_VFSTRACE, "layout lock change: %u -> %u\n",
+ lli->lli_layout_gen,
+ conf->u.coc_md->lsm->lsm_layout_gen);
+
+ lli->lli_has_smd = true;
lli->lli_layout_gen = conf->u.coc_md->lsm->lsm_layout_gen;
+ } else {
+ CDEBUG(D_VFSTRACE, "layout lock destroyed: %u.\n",
+ lli->lli_layout_gen);
+ lli->lli_has_smd = false;
+ lli->lli_layout_gen = LL_LAYOUT_GEN_ZERO;
+ }
return 0;
}
return 0;
}
+/* XXX: more methods will be added later. */
static const struct cl_lock_operations lov_empty_lock_ops = {
.clo_fini = lov_empty_lock_fini,
.clo_print = lov_empty_lock_print
int result;
if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
+ /* For sanity:test_206.
+ * Do not leave the object in cache to avoid accessing
+ * freed memory. This is because osc_object is referring to
+ * lov_oinfo of lsm_stripe_data which will be freed due to
+ * this failure. */
+ cl_object_kill(env, stripe);
cl_object_put(env, stripe);
return -EIO;
}
RETURN(0);
LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 0);
- while (cfs_atomic_read(&lsm->lsm_refc) > 1) {
+ while (cfs_atomic_read(&lsm->lsm_refc) > 1 && lov->lo_lsm_invalid) {
lov_conf_unlock(lov);
+
+ CDEBUG(D_INODE, "file:"DFID" wait for active IO, now: %d.\n",
+ PFID(lu_object_fid(lov2lu(lov))),
+ cfs_atomic_read(&lsm->lsm_refc));
+
l_wait_event(lov->lo_waitq,
cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi);
lov_conf_lock(lov);
ENTRY;
lov_conf_lock(lov);
- if (conf->coc_invalidate) {
+ if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
lov->lo_lsm_invalid = 1;
GOTO(out, result = 0);
}
+ if (conf->coc_opc == OBJECT_CONF_WAIT) {
+ result = lov_layout_wait(env, lov);
+ GOTO(out, result);
+ }
+
+ LASSERT(conf->coc_opc == OBJECT_CONF_SET);
+
if (conf->u.coc_md != NULL)
lsm = conf->u.coc_md->lsm;
-
if ((lsm == NULL && lov->lo_lsm == NULL) ||
(lsm != NULL && lov->lo_lsm != NULL &&
lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen)) {
+ /* same version of layout */
lov->lo_lsm_invalid = 0;
GOTO(out, result = 0);
}
- /* will change layout */
- lov_layout_wait(env, lov);
+ /* will change layout - check if there still exists active IO. */
+ if (lov->lo_lsm != NULL &&
+ cfs_atomic_read(&lov->lo_lsm->lsm_refc) > 1) {
+ lov->lo_lsm_invalid = 1;
+ GOTO(out, result = -EBUSY);
+ }
/*
* Only LLT_EMPTY <-> LLT_RAID0 transitions are supported.
RETURN(req);
}
+/* Allocate and pack an LDLM enqueue request carrying a layout intent
+ * (RQF_LDLM_INTENT_LAYOUT). The server returns the layout in the reply's
+ * DLM LVB, so the reply buffer is sized for the largest MDS EA
+ * (cl_max_mds_easize). Returns the prepared request or ERR_PTR() on
+ * failure. \a unused is not read; it is presumably kept so the signature
+ * matches the other mdc intent-pack helpers — TODO confirm. */
+static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
+ struct lookup_intent *it,
+ struct md_op_data *unused)
+{
+ struct obd_device *obd = class_exp2obd(exp);
+ struct ptlrpc_request *req;
+ struct ldlm_intent *lit;
+ struct layout_intent *layout;
+ int rc;
+ ENTRY;
+
+ req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+ &RQF_LDLM_INTENT_LAYOUT);
+ if (req == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
+
+ /* no EA data is sent with the request */
+ req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
+ rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
+ if (rc) {
+ ptlrpc_request_free(req);
+ RETURN(ERR_PTR(rc));
+ }
+
+ /* pack the intent */
+ lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+ lit->opc = (__u64)it->it_op;
+
+ /* pack the layout intent request */
+ layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
+ /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
+ * set for replication */
+ layout->li_opc = LAYOUT_INTENT_ACCESS;
+
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+ obd->u.cli.cl_max_mds_easize);
+ ptlrpc_request_set_replen(req);
+ RETURN(req);
+}
+
static struct ptlrpc_request *
mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
{
struct ldlm_request *lockreq;
struct ldlm_reply *lockrep;
struct lustre_intent_data *intent = &it->d.lustre;
+ struct ldlm_lock *lock;
+ void *lvb_data = NULL;
+ int lvb_len = 0;
ENTRY;
LASSERT(rc >= 0);
memset(lockh, 0, sizeof(*lockh));
rc = 0;
} else { /* rc = 0 */
- struct ldlm_lock *lock = ldlm_handle2lock(lockh);
- LASSERT(lock);
+ lock = ldlm_handle2lock(lockh);
+ LASSERT(lock != NULL);
/* If the server gave us back a different lock mode, we should
* fix up our variables. */
mdc_set_open_replay_data(NULL, NULL, req);
}
- /* TODO: make sure LAYOUT lock must be granted along with EA */
-
if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
void *eadata;
- mdc_update_max_ea_from_body(exp, body);
+ mdc_update_max_ea_from_body(exp, body);
/*
* The eadata is opaque; just check that it is there.
if (eadata == NULL)
RETURN(-EPROTO);
+ /* save lvb data and length in case this is for layout
+ * lock */
+ lvb_data = eadata;
+ lvb_len = body->eadatasize;
+
/*
* We save the reply LOV EA in case we have to replay a
* create for recovery. If we didn't allocate a large
RETURN(-EPROTO);
}
} else if (it->it_op & IT_LAYOUT) {
- struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+ /* maybe the lock was granted right away and layout
+ * is packed into RMF_DLM_LVB of req */
+ lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
+ if (lvb_len > 0) {
+ lvb_data = req_capsule_server_sized_get(pill,
+ &RMF_DLM_LVB, lvb_len);
+ if (lvb_data == NULL)
+ RETURN(-EPROTO);
+ }
+ }
- if (lock != NULL && lock->l_lvb_data == NULL) {
- int lvb_len;
+ /* fill in stripe data for layout lock */
+ lock = ldlm_handle2lock(lockh);
+ if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
+ void *lmm;
- /* maybe the lock was granted right away and layout
- * is packed into RMF_DLM_LVB of req */
- lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB,
- RCL_SERVER);
- if (lvb_len > 0) {
- void *lvb;
- void *lmm;
+ LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
+ ldlm_it2str(it->it_op), lvb_len);
- lvb = req_capsule_server_sized_get(pill,
- &RMF_DLM_LVB, lvb_len);
- if (lvb == NULL) {
- LDLM_LOCK_PUT(lock);
- RETURN(-EPROTO);
- }
-
- OBD_ALLOC_LARGE(lmm, lvb_len);
- if (lmm == NULL) {
- LDLM_LOCK_PUT(lock);
- RETURN(-ENOMEM);
- }
- memcpy(lmm, lvb, lvb_len);
-
- /* install lvb_data */
- lock_res_and_lock(lock);
- LASSERT(lock->l_lvb_data == NULL);
- lock->l_lvb_data = lmm;
- lock->l_lvb_len = lvb_len;
- unlock_res_and_lock(lock);
- }
- }
- if (lock != NULL)
+ OBD_ALLOC_LARGE(lmm, lvb_len);
+ if (lmm == NULL) {
LDLM_LOCK_PUT(lock);
+ RETURN(-ENOMEM);
+ }
+ memcpy(lmm, lvb_data, lvb_len);
+
+ /* install lvb_data */
+ lock_res_and_lock(lock);
+ if (lock->l_lvb_data == NULL) {
+ lock->l_lvb_data = lmm;
+ lock->l_lvb_len = lvb_len;
+ lmm = NULL;
+ }
+ unlock_res_and_lock(lock);
+ if (lmm != NULL)
+ OBD_FREE_LARGE(lmm, lvb_len);
}
+ if (lock != NULL)
+ LDLM_LOCK_PUT(lock);
RETURN(rc);
}
if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
RETURN(-EOPNOTSUPP);
- req = mdc_enqueue_pack(exp, obddev->u.cli.cl_max_mds_easize);
+ req = mdc_intent_layout_pack(exp, it, op_data);
lvb_type = LVB_T_LAYOUT;
} else {
LBUG();
}
}
- rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
-
- RETURN(rc);
+ rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
+ if (rc < 0) {
+ if (lustre_handle_is_used(lockh)) {
+ ldlm_lock_decref(lockh, einfo->ei_mode);
+ memset(lockh, 0, sizeof(*lockh));
+ }
+ ptlrpc_req_finished(req);
+ }
+ RETURN(rc);
}
static int mdc_finish_intent_lock(struct obd_export *exp,
lockh.cookie = 0;
if (fid_is_sane(&op_data->op_fid2) &&
- (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) {
+ (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
/* We could just return 1 immediately, but since we should only
* be called in revalidate_it if we already have a lock, let's
* verify that. */
LDLM_LOCK_PUT(lock);
rc = 0;
} else {
+ bool try_layout = false;
+
relock:
OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
mdt_lock_handle_init(lhc);
- if (child_bits == MDS_INODELOCK_LAYOUT)
- mdt_lock_reg_init(lhc, LCK_CR);
- else
- mdt_lock_reg_init(lhc, LCK_PR);
+ mdt_lock_reg_init(lhc, LCK_PR);
if (mdt_object_exists(child) == 0) {
LU_OBJECT_DEBUG(D_INODE, info->mti_env,
if (unlikely(rc != 0))
GOTO(out_child, rc);
- /* layout lock is used only on regular files */
- if ((ma->ma_valid & MA_INODE) &&
- (ma->ma_attr.la_valid & LA_MODE) &&
- !S_ISREG(ma->ma_attr.la_mode))
- child_bits &= ~MDS_INODELOCK_LAYOUT;
-
/* If the file has not been changed for some time, we
* return not only a LOOKUP lock, but also an UPDATE
* lock and this might save us RPC on later STAT. For
child_bits |= MDS_INODELOCK_UPDATE;
}
- rc = mdt_object_lock(info, child, lhc, child_bits,
- MDT_CROSS_LOCK);
+ /* layout lock must be granted in a best-effort way
+ * for IT operations */
+ LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
+ if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
+ exp_connect_layout(info->mti_exp) &&
+ S_ISREG(lu_object_attr(&child->mot_obj.mo_lu)) &&
+ ldlm_rep != NULL) {
+ /* try to grant layout lock for regular file. */
+ try_layout = true;
+ }
+ rc = 0;
+ if (try_layout) {
+ child_bits |= MDS_INODELOCK_LAYOUT;
+ /* try layout lock, it may fail to be granted due to
+ * contention at LOOKUP or UPDATE */
+ if (!mdt_object_lock_try(info, child, lhc, child_bits,
+ MDT_CROSS_LOCK)) {
+ child_bits &= ~MDS_INODELOCK_LAYOUT;
+ LASSERT(child_bits != 0);
+ rc = mdt_object_lock(info, child, lhc,
+ child_bits, MDT_CROSS_LOCK);
+ } else {
+ ma_need |= MA_LOV;
+ }
+ } else {
+ rc = mdt_object_lock(info, child, lhc, child_bits,
+ MDT_CROSS_LOCK);
+ }
if (unlikely(rc != 0))
GOTO(out_child, rc);
}
if (lock &&
lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_UPDATE &&
S_ISREG(lu_object_attr(&mdt_object_child(child)->mo_lu)))
- ma_need = MA_SOM;
+ ma_need |= MA_SOM;
/* finally, we can get attr for child. */
mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
/*
* DLM handlers.
*/
+
static struct ldlm_callback_suite cbs = {
.lcs_completion = ldlm_server_completion_ast,
.lcs_blocking = ldlm_server_blocking_ast,
RETURN(rc);
}
-int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 ibits, int locality)
+static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o,
+ struct mdt_lock_handle *lh, __u64 ibits,
+ bool nonblock, int locality)
{
struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
ldlm_policy_data_t *policy = &info->mti_policy;
struct ldlm_res_id *res_id = &info->mti_res_id;
+ __u64 dlmflags;
int rc;
ENTRY;
memset(policy, 0, sizeof(*policy));
fid_build_reg_res_name(mdt_object_fid(o), res_id);
+ dlmflags = LDLM_FL_ATOMIC_CB;
+ if (nonblock)
+ dlmflags |= LDLM_FL_BLOCK_NOWAIT;
+
/*
* Take PDO lock on whole directory and build correct @res_id for lock
* on part of directory.
*/
policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
- policy, res_id, LDLM_FL_ATOMIC_CB,
+ policy, res_id, dlmflags,
&info->mti_exp->exp_handle.h_cookie);
if (unlikely(rc))
RETURN(rc);
* fix it up and turn FL_LOCAL flag off.
*/
rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
- res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
+ res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
&info->mti_exp->exp_handle.h_cookie);
if (rc)
mdt_object_unlock(info, o, lh, 1);
RETURN(rc);
}
+/* Blocking wrapper: take @ibits inodelock(s) on @o, waiting if contended. */
+int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
+ struct mdt_lock_handle *lh, __u64 ibits, int locality)
+{
+ return mdt_object_lock0(info, o, lh, ibits, false, locality);
+}
+
+/* Non-blocking attempt (LDLM_FL_BLOCK_NOWAIT): returns 1 on success,
+ * 0 if the lock could not be granted immediately. Works on a scratch
+ * copy of @lh so a failed attempt leaves the caller's handle intact. */
+int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
+ struct mdt_lock_handle *lh, __u64 ibits, int locality)
+{
+ struct mdt_lock_handle tmp = *lh;
+ int rc;
+
+ rc = mdt_object_lock0(info, o, &tmp, ibits, true, locality);
+ if (rc == 0)
+ *lh = tmp;
+
+ return rc == 0;
+}
+
/**
* Save a lock within request object.
*
struct mdt_thread_info *info,
struct ldlm_lock **,
__u64);
+static int mdt_intent_layout(enum mdt_it_code opcode,
+ struct mdt_thread_info *info,
+ struct ldlm_lock **,
+ __u64);
static int mdt_intent_reint(enum mdt_it_code opcode,
struct mdt_thread_info *info,
struct ldlm_lock **,
.it_flags = 0,
.it_act = NULL
},
- [MDT_IT_LAYOUT] = {
- .it_fmt = &RQF_LDLM_INTENT_GETATTR,
- .it_flags = HABEO_REFERO,
- .it_act = mdt_intent_getattr
- }
+ [MDT_IT_LAYOUT] = {
+ .it_fmt = &RQF_LDLM_INTENT_LAYOUT,
+ .it_flags = 0,
+ .it_act = mdt_intent_layout
+ }
};
int mdt_intent_lock_replace(struct mdt_thread_info *info,
case MDT_IT_GETATTR:
child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
break;
- case MDT_IT_LAYOUT: {
- static int printed = 0;
-
- if (!printed) {
- CERROR("layout lock not supported by this version\n");
- printed = 1;
- }
- GOTO(out_shrink, rc = -EINVAL);
- break;
- }
default:
CERROR("Unsupported intent (%d)\n", opcode);
GOTO(out_shrink, rc = -EINVAL);
return rc;
}
+/* Intent handler for IT_LAYOUT enqueues: reserve reply space for the
+ * layout LVB and hand the lock back to generic ldlm processing.
+ * Only LAYOUT_INTENT_ACCESS (read the current layout) is supported;
+ * layout-modifying intents are rejected with -EINVAL. */
+static int mdt_intent_layout(enum mdt_it_code opcode,
+ struct mdt_thread_info *info,
+ struct ldlm_lock **lockp,
+ __u64 flags)
+{
+ struct layout_intent *layout;
+ int rc;
+ ENTRY;
+
+ if (opcode != MDT_IT_LAYOUT) {
+ CERROR("%s: Unknown intent (%d)\n",
+ info->mti_exp->exp_obd->obd_name, opcode);
+ RETURN(-EINVAL);
+ }
+
+ /* the reply must have room for the largest possible layout */
+ (*lockp)->l_lvb_type = LVB_T_LAYOUT;
+ req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
+ ldlm_lvbo_size(*lockp));
+ rc = req_capsule_server_pack(info->mti_pill);
+ if (rc != 0)
+ RETURN(-EINVAL);
+
+ layout = req_capsule_client_get(info->mti_pill, &RMF_LAYOUT_INTENT);
+ LASSERT(layout != NULL);
+ if (layout->li_opc == LAYOUT_INTENT_ACCESS)
+ /* return to normal ldlm handling */
+ RETURN(0);
+
+ CERROR("%s: Unsupported layout intent (%d)\n",
+ info->mti_exp->exp_obd->obd_name, layout->li_opc);
+ RETURN(-EINVAL);
+}
+
static int mdt_intent_reint(enum mdt_it_code opcode,
struct mdt_thread_info *info,
struct ldlm_lock **lockp,
*/
if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
LASSERTF(rc == 0, "Error occurred but lock handle "
- "is still in use\n");
+ "is still in use, rc = %d\n", rc);
rep->lock_policy_res2 = 0;
rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
RETURN(rc);
RETURN(rc);
}
- if (opc == MDT_IT_LAYOUT) {
- (*lockp)->l_lvb_type = LVB_T_LAYOUT;
- /* XXX: set replay RMF_DLM_LVB as the real EA size when LAYOUT
- * lock enabled. */
- } else if (opc == MDT_IT_READDIR) {
- req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0);
- }
-
flv = &mdt_it_flavor[opc];
if (flv->it_fmt != NULL)
req_capsule_extend(pill, flv->it_fmt);
MDT_LH_PARENT, /* parent lockh */
MDT_LH_CHILD, /* child lockh */
MDT_LH_OLD, /* old lockh for rename */
+ MDT_LH_LAYOUT = MDT_LH_OLD, /* layout lock */
MDT_LH_NEW, /* new lockh for rename */
MDT_LH_RMT, /* used for return lh to caller */
MDT_LH_NR
struct mdt_lock_handle *,
__u64, int);
+int mdt_object_lock_try(struct mdt_thread_info *,
+ struct mdt_object *,
+ struct mdt_lock_handle *,
+ __u64, int);
+
void mdt_object_unlock(struct mdt_thread_info *,
struct mdt_object *,
struct mdt_lock_handle *,
static int mdt_lvbo_size(struct ldlm_lock *lock)
{
- if (IS_LQUOTA_RES(lock->l_resource)) {
- struct mdt_device *mdt;
+ struct mdt_device *mdt;
+
+ /* resource on server side never changes. */
+ mdt = ldlm_res_to_ns(lock->l_resource)->ns_lvbp;
+ LASSERT(mdt != NULL);
- mdt = ldlm_res_to_ns(lock->l_resource)->ns_lvbp;
+ if (IS_LQUOTA_RES(lock->l_resource)) {
 if (mdt->mdt_qmt_dev == NULL)
 return 0;
 return qmt_hdls.qmth_lvbo_size(mdt->mdt_qmt_dev, lock);
 }
+ /* layout lock LVB carries the file layout; reserve the maximum
+ * metadata size since the actual striping is not known here */
+ if (ldlm_has_layout(lock))
+ return mdt->mdt_max_mdsize;
+
 return 0;
}
+/* Fill @lvb for @lock before it is granted. Quota locks are delegated
+ * to the quota master; for layout locks the file's LOV EA is copied
+ * into the LVB so the client receives the layout with the lock.
+ * Returns the number of LVB bytes used, or 0 on error (the grant
+ * proceeds with an empty LVB rather than failing the enqueue). */
static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
{
+ struct lu_env env;
+ struct mdt_thread_info *info;
+ struct mdt_device *mdt;
+ struct lu_fid *fid;
+ struct mdt_object *obj = NULL;
+ struct md_object *child = NULL;
+ int rc;
+ ENTRY;
+
+ mdt = ldlm_lock_to_ns(lock)->ns_lvbp;
 if (IS_LQUOTA_RES(lock->l_resource)) {
- struct mdt_device *mdt;
-
- mdt = ldlm_res_to_ns(lock->l_resource)->ns_lvbp;
 if (mdt->mdt_qmt_dev == NULL)
- return 0;
+ RETURN(0);
 /* call lvbo fill function of quota master */
- return qmt_hdls.qmth_lvbo_fill(mdt->mdt_qmt_dev, lock, lvb,
- lvblen);
+ rc = qmt_hdls.qmth_lvbo_fill(mdt->mdt_qmt_dev, lock, lvb,
+ lvblen);
+ RETURN(rc);
 }
- return 0;
+ /* only IBITS locks carrying the LAYOUT bit have a layout LVB */
+ if (lock->l_resource->lr_type != LDLM_IBITS ||
+ !(lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_LAYOUT))
+ RETURN(0);
+
+ /* layout lock will be granted to client, fill in lvb with layout */
+
+ /* XXX create an env to talk to mdt stack. We should get this env from
+ * ptlrpc_thread->t_env. */
+ rc = lu_env_init(&env, LCT_MD_THREAD);
+ LASSERT(rc == 0);
+
+ /* NOTE(review): the whole thread info is zeroed and only the three
+ * fields below repopulated — confirm no mdt stack callee relies on
+ * other mti_* state in this path */
+ info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
+ LASSERT(info != NULL);
+ memset(info, 0, sizeof *info);
+ info->mti_env = &env;
+ info->mti_exp = lock->l_export;
+ info->mti_mdt = mdt;
+
+ /* XXX get fid by resource id. why don't include fid in ldlm_resource */
+ fid = &info->mti_tmp_fid2;
+ fid_build_from_res_name(fid, &lock->l_resource->lr_name);
+
+ obj = mdt_object_find(&env, info->mti_mdt, fid);
+ if (IS_ERR(obj))
+ GOTO(out, rc = PTR_ERR(obj));
+
+ if (mdt_object_exists(obj) <= 0)
+ GOTO(out, rc = -ENOENT);
+
+ child = mdt_object_child(obj);
+
+ /* get the length of lsm */
+ rc = mo_xattr_get(&env, child, &LU_BUF_NULL, XATTR_NAME_LOV);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ if (rc > 0) {
+ struct lu_buf *lmm = NULL;
+
+ if (lvblen < rc) {
+ CERROR("%s: expected %d actual %d.\n",
+ info->mti_exp->exp_obd->obd_name, rc, lvblen);
+ GOTO(out, rc = -ERANGE);
+ }
+
+ lmm = &info->mti_buf;
+ lmm->lb_buf = lvb;
+ lmm->lb_len = rc;
+
+ rc = mo_xattr_get(&env, child, lmm, XATTR_NAME_LOV);
+ if (rc < 0)
+ GOTO(out, rc);
+ }
+
+out:
+ if (obj != NULL && !IS_ERR(obj))
+ mdt_object_put(&env, obj);
+ lu_env_fini(&env);
+ /* errors are swallowed: grant with an empty LVB instead of failing */
+ RETURN(rc < 0 ? 0 : rc);
}
static int mdt_lvbo_free(struct ldlm_resource *res)
RETURN(rc);
}
+/* Acquire the DLM lock(s) needed to open @obj.
+ *
+ * *ibits is always set (0 when nothing was locked) so callers may pass
+ * it to mdt_object_open_unlock() unconditionally. When a new layout
+ * must be created, an EX layout lock is taken locally in MDT_LH_LAYOUT
+ * to revoke client layout locks; it is never returned to the client
+ * and is dropped by mdt_object_open_unlock(). */
+static int mdt_object_open_lock(struct mdt_thread_info *info,
+ struct mdt_object *obj,
+ struct mdt_lock_handle *lhc,
+ __u64 *ibits)
+{
+ struct md_attr *ma = &info->mti_attr;
+ __u64 open_flags = info->mti_spec.sp_cr_flags;
+ ldlm_mode_t lm = LCK_CR;
+ bool try_layout = false;
+ bool create_layout = false;
+ int rc = 0;
+ ENTRY;
+
+ *ibits = 0;
+ if (open_flags & MDS_OPEN_LOCK) {
+ if (open_flags & FMODE_WRITE)
+ lm = LCK_CW;
+ else if (open_flags & MDS_FMODE_EXEC)
+ lm = LCK_PR;
+ else
+ lm = LCK_CR;
+
+ *ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN;
+ }
+
+ if (S_ISREG(lu_object_attr(&obj->mot_obj.mo_lu))) {
+ /* a regular file without a layout that this open will
+ * instantiate takes the EX path below, not the try path */
+ if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV) &&
+ md_should_create(open_flags))
+ create_layout = true;
+ if (exp_connect_layout(info->mti_exp) && !create_layout &&
+ ma->ma_need & MA_LOV)
+ try_layout = true;
+ }
+
+ mdt_lock_handle_init(lhc);
+ mdt_lock_reg_init(lhc, lm);
+
+ /* one problem to return layout lock on open is that it may result
+ * in too many layout locks cached on the client side. */
+ if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_OPEN) && try_layout) {
+ /* return lookup lock to validate inode at the client side,
+ * this is pretty important otherwise mdt will return layout
+ * lock for each open.
+ * However this is a double-edged sword because changing
+ * permission will revoke huge # of LOOKUP locks. */
+ *ibits |= MDS_INODELOCK_LAYOUT | MDS_INODELOCK_LOOKUP;
+ if (!mdt_object_lock_try(info, obj, lhc, *ibits,
+ MDT_CROSS_LOCK)) {
+ *ibits &= ~(MDS_INODELOCK_LAYOUT|MDS_INODELOCK_LOOKUP);
+ if (*ibits != 0)
+ rc = mdt_object_lock(info, obj, lhc, *ibits,
+ MDT_CROSS_LOCK);
+ }
+ } else if (*ibits != 0) {
+ rc = mdt_object_lock(info, obj, lhc, *ibits, MDT_CROSS_LOCK);
+ }
+
+ CDEBUG(D_INODE, "Requested bits lock:"DFID ", ibits = "LPX64
+ ", open_flags = "LPO64", try_layout = %d, rc = %d\n",
+ PFID(mdt_object_fid(obj)), *ibits, open_flags, try_layout, rc);
+
+ /* will change layout, revoke layout locks by enqueuing EX lock. */
+ if (rc == 0 && create_layout) {
+ struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LAYOUT];
+
+ CDEBUG(D_INODE, "Will create layout, get EX layout lock:"DFID
+ ", open_flags = "LPO64"\n",
+ PFID(mdt_object_fid(obj)), open_flags);
+
+ LASSERT(!try_layout);
+ mdt_lock_handle_init(ll);
+ mdt_lock_reg_init(ll, LCK_EX);
+ rc = mdt_object_lock(info, obj, ll, MDS_INODELOCK_LAYOUT,
+ MDT_LOCAL_LOCK);
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LL_BLOCK, 2);
+ }
+
+ RETURN(rc);
+}
+
+/* Undo mdt_object_open_lock(): always drop the local EX layout lock in
+ * MDT_LH_LAYOUT; release @lhc unless it must be returned to the client,
+ * i.e. the open succeeded (rc == 0) and either an open lock was
+ * requested or a layout bit was granted. */
+static void mdt_object_open_unlock(struct mdt_thread_info *info,
+ struct mdt_object *obj,
+ struct mdt_lock_handle *lhc,
+ __u64 ibits, int rc)
+{
+ __u64 open_flags = info->mti_spec.sp_cr_flags;
+ struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LAYOUT];
+
+ /* Release local layout lock - the layout lock put in MDT_LH_LAYOUT
+ * will never return to client side. */
+ if (lustre_handle_is_used(&ll->mlh_reg_lh)) {
+ LASSERT(!(ibits & MDS_INODELOCK_LAYOUT));
+ mdt_object_unlock(info, obj, ll, 1);
+ }
+
+ /* nothing was taken on lhc */
+ if (ibits == 0)
+ return;
+
+ if (!(open_flags & MDS_OPEN_LOCK) && !(ibits & MDS_INODELOCK_LAYOUT)) {
+ /* for the open request, the lock will only return to client
+ * if open or layout lock is granted. */
+ rc = 1;
+ }
+ if (rc != 0)
+ mdt_object_unlock(info, obj, lhc, 1);
+}
+
int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
 struct mdt_lock_handle *lhc)
{
 struct mdt_object *parent= NULL;
 struct mdt_object *o;
 int rc;
- ldlm_mode_t lm;
+ /* must be initialized: the GOTO(out, ...) error paths below reach
+ * mdt_object_open_unlock() before mdt_object_open_lock() has set
+ * ibits — an uninitialized read there is undefined behavior and may
+ * drop a lock that was never taken */
+ __u64 ibits = 0;
 ENTRY;
 if (md_should_create(flags) && !(flags & MDS_OPEN_HAS_EA)) {
 mdt_set_disposition(info, rep, (DISP_IT_EXECD |
 DISP_LOOKUP_EXECD));
- if (flags & FMODE_WRITE)
- lm = LCK_CW;
- else if (flags & MDS_FMODE_EXEC)
- lm = LCK_PR;
- else
- lm = LCK_CR;
-
- mdt_lock_handle_init(lhc);
- mdt_lock_reg_init(lhc, lm);
- rc = mdt_object_lock(info, o, lhc,
- MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN,
- MDT_CROSS_LOCK);
+ rc = mdt_attr_get_complex(info, o, ma);
 if (rc)
 GOTO(out, rc);
- rc = mdt_attr_get_complex(info, o, ma);
+ rc = mdt_object_open_lock(info, o, lhc, &ibits);
 if (rc)
 GOTO(out, rc);
 }
 rc = mdt_finish_open(info, parent, o, flags, 0, rep);
-
- if (!(flags & MDS_OPEN_LOCK) || rc)
- mdt_object_unlock(info, o, lhc, 1);
-
 if (!rc) {
 mdt_set_disposition(info, rep, DISP_LOOKUP_POS);
 if (flags & MDS_OPEN_LOCK)
 mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
 }
-
 GOTO(out, rc);
+
out:
+ mdt_object_open_unlock(info, o, lhc, ibits, rc);
 mdt_object_put(env, o);
 if (parent != NULL)
 mdt_object_put(env, parent);
struct lu_fid *child_fid = &info->mti_tmp_fid1;
struct md_attr *ma = &info->mti_attr;
__u64 create_flags = info->mti_spec.sp_cr_flags;
+ __u64 ibits;
struct mdt_reint_record *rr = &info->mti_rr;
struct lu_name *lname;
int result, rc;
LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh));
- /* get openlock if this is not replay and if a client requested it */
- if (!req_is_replay(req) && create_flags & MDS_OPEN_LOCK) {
- ldlm_mode_t lm;
-
- if (create_flags & FMODE_WRITE)
- lm = LCK_CW;
- else if (create_flags & MDS_FMODE_EXEC)
- lm = LCK_PR;
- else
- lm = LCK_CR;
- mdt_lock_handle_init(lhc);
- mdt_lock_reg_init(lhc, lm);
- rc = mdt_object_lock(info, child, lhc,
- MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN,
- MDT_CROSS_LOCK);
- if (rc) {
- result = rc;
- GOTO(out_child, result);
- } else {
- result = -EREMOTE;
- mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
- }
- }
+ /* get openlock if this is not replay and if a client requested it */
+ if (!req_is_replay(req)) {
+ rc = mdt_object_open_lock(info, child, lhc, &ibits);
+ if (rc != 0) {
+ GOTO(out_child, result = rc);
+ } else if (create_flags & MDS_OPEN_LOCK) {
+ result = -EREMOTE;
+ mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
+ }
+ }
/* Try to open it now. */
rc = mdt_finish_open(info, parent, child, create_flags,
created, ldlm_rep);
if (rc) {
result = rc;
- if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
- /* openlock was acquired and mdt_finish_open failed -
- drop the openlock */
- mdt_object_unlock(info, child, lhc, 1);
- mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
- }
+ /* openlock will be released if mdt_finish_open failed */
+ mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
if (created) {
ma->ma_need = 0;
ma->ma_valid = 0;
}
EXIT;
out_child:
+ mdt_object_open_unlock(info, child, lhc, ibits,
+ result == -EREMOTE ? 0 : result);
mdt_object_put(info->mti_env, child);
out_parent:
mdt_object_unlock_put(info, parent, lh, result || !created);
&RMF_ACL
};
+/* Client-side message layout for an LDLM layout-intent enqueue. */
+static const struct req_msg_field *ldlm_intent_layout_client[] = {
+ &RMF_PTLRPC_BODY,
+ &RMF_DLM_REQ,
+ &RMF_LDLM_INTENT,
+ &RMF_LAYOUT_INTENT,
+ &RMF_EADATA /* for new layout to be set up */
+};
static const struct req_msg_field *ldlm_intent_open_server[] = {
&RMF_PTLRPC_BODY,
&RMF_DLM_REP,
&RQF_LDLM_GL_DESC_CALLBACK,
&RQF_LDLM_INTENT,
&RQF_LDLM_INTENT_BASIC,
+ &RQF_LDLM_INTENT_LAYOUT,
&RQF_LDLM_INTENT_GETATTR,
&RQF_LDLM_INTENT_OPEN,
&RQF_LDLM_INTENT_CREATE,
lustre_swab_lustre_capa, NULL);
EXPORT_SYMBOL(RMF_CAPA2);
+/* Wire field carrying struct layout_intent; fixed size, swabbed by
+ * lustre_swab_layout_intent(). */
+struct req_msg_field RMF_LAYOUT_INTENT =
+ DEFINE_MSGF("layout_intent", 0,
+ sizeof(struct layout_intent), lustre_swab_layout_intent,
+ NULL);
+EXPORT_SYMBOL(RMF_LAYOUT_INTENT);
+
/*
* OST request field.
*/
ldlm_intent_client, ldlm_intent_server);
EXPORT_SYMBOL(RQF_LDLM_INTENT);
+/* Request format for layout-intent enqueue: intent request in,
+ * LVB-carrying enqueue reply out. Name string fixed: dropped the stray
+ * trailing space, matching sibling names like "LDLM_INTENT_GETATTR". */
+struct req_format RQF_LDLM_INTENT_LAYOUT =
+ DEFINE_REQ_FMT0("LDLM_INTENT_LAYOUT",
+ ldlm_intent_layout_client, ldlm_enqueue_lvb_server);
+EXPORT_SYMBOL(RQF_LDLM_INTENT_LAYOUT);
+
struct req_format RQF_LDLM_INTENT_GETATTR =
DEFINE_REQ_FMT0("LDLM_INTENT_GETATTR",
ldlm_intent_getattr_client, ldlm_intent_getattr_server);
ost_get_fiemap_server);
EXPORT_SYMBOL(RQF_OST_GET_INFO_FIEMAP);
-
#if !defined(__REQ_LAYOUT_USER__)
/* Convenience macro */
}
EXPORT_SYMBOL(lustre_swab_hsm_progress);
-
+/* Byte-swap a layout_intent between host and wire order (all four
+ * fields are fixed-width integers; see the wirecheck assertions). */
+void lustre_swab_layout_intent(struct layout_intent *li)
+{
+ __swab32s(&li->li_opc);
+ __swab32s(&li->li_flags);
+ __swab64s(&li->li_start);
+ __swab64s(&li->li_end);
+}
+EXPORT_SYMBOL(lustre_swab_layout_intent);
(long long)(int)offsetof(struct hsm_user_state, hus_in_progress_location));
LASSERTF((int)sizeof(((struct hsm_user_state *)0)->hus_in_progress_location) == 16, "found %lld\n",
(long long)(int)sizeof(((struct hsm_user_state *)0)->hus_in_progress_location));
+
+ /* Checks for struct layout_intent */
+ LASSERTF((int)sizeof(struct layout_intent) == 24, "found %lld\n",
+ (long long)(int)sizeof(struct layout_intent));
+ LASSERTF((int)offsetof(struct layout_intent, li_opc) == 0, "found %lld\n",
+ (long long)(int)offsetof(struct layout_intent, li_opc));
+ LASSERTF((int)sizeof(((struct layout_intent *)0)->li_opc) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct layout_intent *)0)->li_opc));
+ LASSERTF((int)offsetof(struct layout_intent, li_flags) == 4, "found %lld\n",
+ (long long)(int)offsetof(struct layout_intent, li_flags));
+ LASSERTF((int)sizeof(((struct layout_intent *)0)->li_flags) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct layout_intent *)0)->li_flags));
+ LASSERTF((int)offsetof(struct layout_intent, li_start) == 8, "found %lld\n",
+ (long long)(int)offsetof(struct layout_intent, li_start));
+ LASSERTF((int)sizeof(((struct layout_intent *)0)->li_start) == 8, "found %lld\n",
+ (long long)(int)sizeof(((struct layout_intent *)0)->li_start));
+ LASSERTF((int)offsetof(struct layout_intent, li_end) == 16, "found %lld\n",
+ (long long)(int)offsetof(struct layout_intent, li_end));
+ LASSERTF((int)sizeof(((struct layout_intent *)0)->li_end) == 8, "found %lld\n",
+ (long long)(int)sizeof(((struct layout_intent *)0)->li_end));
+ LASSERTF(LAYOUT_INTENT_ACCESS == 0, "found %lld\n",
+ (long long)LAYOUT_INTENT_ACCESS);
+ LASSERTF(LAYOUT_INTENT_READ == 1, "found %lld\n",
+ (long long)LAYOUT_INTENT_READ);
+ LASSERTF(LAYOUT_INTENT_WRITE == 2, "found %lld\n",
+ (long long)LAYOUT_INTENT_WRITE);
+ LASSERTF(LAYOUT_INTENT_GLIMPSE == 3, "found %lld\n",
+ (long long)LAYOUT_INTENT_GLIMPSE);
+ LASSERTF(LAYOUT_INTENT_TRUNC == 4, "found %lld\n",
+ (long long)LAYOUT_INTENT_TRUNC);
+ LASSERTF(LAYOUT_INTENT_RELEASE == 5, "found %lld\n",
+ (long long)LAYOUT_INTENT_RELEASE);
+ LASSERTF(LAYOUT_INTENT_RESTORE == 6, "found %lld\n",
+ (long long)LAYOUT_INTENT_RESTORE);
}
#include <stdlib.h>
#include <unistd.h>
#include <semaphore.h>
+#include <time.h>
#include <lustre/lustreapi.h>
int save_errno;
int verbose = 0;
int gid = 0;
+ struct timespec ts;
if (argc < 3) {
fprintf(stderr, usage, argv[0]);
printf("PAUSING\n");
fflush(stdout);
}
- while (sem_wait(&sem) == -1 && errno == EINTR);
+ len = atoi(commands+1);
+ if (len <= 0)
+ len = 3600; /* 1 hour */
+ ts.tv_sec = time(NULL) + len;
+ ts.tv_nsec = 0;
+ while (sem_timedwait(&sem, &ts) < 0 && errno == EINTR);
break;
case 'c':
if (close(fd) == -1) {
}
run_test 206 "fail lov_init_raid0() doesn't lbug"
+# Verify a glimpse (stat) still reports the correct size when the MDS
+# is forced not to return the layout in the getattr intent, i.e. the
+# client must refresh the layout itself.
+test_207a() {
+ dd if=/dev/zero of=$DIR/$tfile bs=4k count=$((RANDOM%10+1))
+ local fsz=`stat -c %s $DIR/$tfile`
+ cancel_lru_locks mdc
+
+ # do not return layout in getattr intent
+#define OBD_FAIL_MDS_NO_LL_GETATTR 0x170
+ $LCTL set_param fail_loc=0x170
+ local sz=`stat -c %s $DIR/$tfile`
+
+ [ $fsz -eq $sz ] || error "file size expected $fsz, actual $sz"
+
+ rm -rf $DIR/$tfile
+}
+run_test 207a "can refresh layout at glimpse"
+
+# Verify data is still readable when the MDS is forced not to return
+# the layout on open: the client must refresh the layout between open
+# and the first read.
+test_207b() {
+ dd if=/dev/zero of=$DIR/$tfile bs=4k count=$((RANDOM%10+1))
+ local cksum=`md5sum $DIR/$tfile`
+ local fsz=`stat -c %s $DIR/$tfile`
+ cancel_lru_locks mdc
+ cancel_lru_locks osc
+
+ # do not return layout in getattr intent
+#define OBD_FAIL_MDS_NO_LL_OPEN 0x171
+ $LCTL set_param fail_loc=0x171
+
+ # it will refresh layout after the file is opened but before read issues
+ echo checksum is "$cksum"
+ echo "$cksum" |md5sum -c --quiet || error "file differs"
+
+ rm -rf $DIR/$tfile
+}
+run_test 207b "can refresh layout at open"
+
test_212() {
size=`date +%s`
size=$((size % 8192 + 1))
}
run_test 50 "osc lvb attrs: enqueue vs. CP AST =============="
+# Two-mount test: a reader holding a cached layout lock on an empty
+# file must pick up the layout created through the other mount point
+# and read the correct data.
+test_51a() {
+ local filesize
+ local origfile=/etc/hosts
+
+ filesize=`stat -c %s $origfile`
+
+ # create an empty file
+ $MCREATE $DIR1/$tfile
+ # cache layout lock on both mount point
+ stat $DIR1/$tfile > /dev/null
+ stat $DIR2/$tfile > /dev/null
+
+ # open and sleep 2 seconds then read
+ $MULTIOP $DIR2/$tfile o_2r${filesize}c &
+ local pid=$!
+ sleep 0.1
+
+ # create the layout of testing file
+ dd if=$origfile of=$DIR1/$tfile conv=notrunc > /dev/null
+
+ # MULTIOP proc should be able to read enough bytes and exit
+ sleep 2
+ kill -0 $pid && error "multiop is still there"
+ cmp $origfile $DIR2/$tfile || error "$MCREATE and $DIR2/$tfile differs"
+
+ rm -f $DIR1/$tfile
+}
+run_test 51a "layout lock: refresh layout should work"
+
+# A glimpse that is delayed until after the layout has changed must
+# restart and report the size under the new layout.
+test_51b() {
+ local tmpfile=`mktemp`
+
+ # create an empty file
+ $MCREATE $DIR1/$tfile
+
+ # delay glimpse so that layout has changed when glimpse finish
+#define OBD_FAIL_GLIMPSE_DELAY 0x1404
+ $LCTL set_param fail_loc=0x1404
+ stat -c %s $DIR2/$tfile |tee $tmpfile &
+ local pid=$!
+ sleep 0.1
+
+ # create layout of testing file
+ dd if=/dev/zero of=$DIR1/$tfile bs=1k count=1 conv=notrunc > /dev/null
+
+ wait $pid
+ local fsize=`cat $tmpfile`
+
+ [ x$fsize = x1024 ] || error "file size is $fsize, should be 1024"
+
+ rm -f $DIR1/$tfile $tmpfile
+}
+run_test 51b "layout lock: glimpse should be able to restart if layout changed"
+
+# An IT_LAYOUT intent issued while the MDS is blocked changing the
+# layout must wait and then return the new, correct layout.
+test_51c() {
+ # create an empty file
+ $MCREATE $DIR1/$tfile
+
+#define OBD_FAIL_MDS_LL_BLOCK 0x172
+ $LCTL set_param fail_loc=0x172
+
+ # change the layout of testing file
+ echo "Setting layout ..."
+ $LFS setstripe -c $OSTCOUNT $DIR1/$tfile &
+ # NOTE(review): pid is not declared local — confirm this is intended
+ pid=$!
+ sleep 0.1
+
+ # getting the layout should block until setstripe completes
+ local stripecnt=`$LFS getstripe -c $DIR2/$tfile`
+ [ $stripecnt -eq $OSTCOUNT ] || error "layout wrong"
+
+ rm -f $DIR1/$tfile
+}
+run_test 51c "layout lock: IT_LAYOUT blocked and correct layout can be returned"
+
test_60() {
[[ $(lustre_version_code $SINGLEMDS) -ge $(version_code 2.3.0) ]] ||
{ skip "Need MDS version at least 2.3.0"; return; }
#define lustre_swab_ldlm_request NULL
#define lustre_swab_ldlm_reply NULL
#define lustre_swab_ldlm_intent NULL
+#define lustre_swab_layout_intent NULL
/* #define lustre_swab_lov_mds_md NULL */
#define lustre_swab_mdt_rec_reint NULL
#define lustre_swab_lustre_capa NULL
CHECK_MEMBER(hsm_user_state, hus_in_progress_location);
}
+/* Emit compile-time size/offset/value assertions for the on-wire
+ * struct layout_intent (consumed by the generated wirecheck tests). */
+static void check_layout_intent(void)
+{
+ BLANK_LINE();
+ CHECK_STRUCT(layout_intent);
+ CHECK_MEMBER(layout_intent, li_opc);
+ CHECK_MEMBER(layout_intent, li_flags);
+ CHECK_MEMBER(layout_intent, li_start);
+ CHECK_MEMBER(layout_intent, li_end);
+
+ CHECK_VALUE(LAYOUT_INTENT_ACCESS);
+ CHECK_VALUE(LAYOUT_INTENT_READ);
+ CHECK_VALUE(LAYOUT_INTENT_WRITE);
+ CHECK_VALUE(LAYOUT_INTENT_GLIMPSE);
+ CHECK_VALUE(LAYOUT_INTENT_TRUNC);
+ CHECK_VALUE(LAYOUT_INTENT_RELEASE);
+ CHECK_VALUE(LAYOUT_INTENT_RESTORE);
+}
+
static void
system_string (char *cmdline, char *str, int len)
{
check_hsm_user_item();
check_hsm_user_request();
check_hsm_user_state();
+ check_layout_intent();
printf("}\n\n");
(long long)(int)offsetof(struct hsm_user_state, hus_in_progress_location));
LASSERTF((int)sizeof(((struct hsm_user_state *)0)->hus_in_progress_location) == 16, "found %lld\n",
(long long)(int)sizeof(((struct hsm_user_state *)0)->hus_in_progress_location));
+
+ /* Checks for struct layout_intent */
+ LASSERTF((int)sizeof(struct layout_intent) == 24, "found %lld\n",
+ (long long)(int)sizeof(struct layout_intent));
+ LASSERTF((int)offsetof(struct layout_intent, li_opc) == 0, "found %lld\n",
+ (long long)(int)offsetof(struct layout_intent, li_opc));
+ LASSERTF((int)sizeof(((struct layout_intent *)0)->li_opc) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct layout_intent *)0)->li_opc));
+ LASSERTF((int)offsetof(struct layout_intent, li_flags) == 4, "found %lld\n",
+ (long long)(int)offsetof(struct layout_intent, li_flags));
+ LASSERTF((int)sizeof(((struct layout_intent *)0)->li_flags) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct layout_intent *)0)->li_flags));
+ LASSERTF((int)offsetof(struct layout_intent, li_start) == 8, "found %lld\n",
+ (long long)(int)offsetof(struct layout_intent, li_start));
+ LASSERTF((int)sizeof(((struct layout_intent *)0)->li_start) == 8, "found %lld\n",
+ (long long)(int)sizeof(((struct layout_intent *)0)->li_start));
+ LASSERTF((int)offsetof(struct layout_intent, li_end) == 16, "found %lld\n",
+ (long long)(int)offsetof(struct layout_intent, li_end));
+ LASSERTF((int)sizeof(((struct layout_intent *)0)->li_end) == 8, "found %lld\n",
+ (long long)(int)sizeof(((struct layout_intent *)0)->li_end));
+ LASSERTF(LAYOUT_INTENT_ACCESS == 0, "found %lld\n",
+ (long long)LAYOUT_INTENT_ACCESS);
+ LASSERTF(LAYOUT_INTENT_READ == 1, "found %lld\n",
+ (long long)LAYOUT_INTENT_READ);
+ LASSERTF(LAYOUT_INTENT_WRITE == 2, "found %lld\n",
+ (long long)LAYOUT_INTENT_WRITE);
+ LASSERTF(LAYOUT_INTENT_GLIMPSE == 3, "found %lld\n",
+ (long long)LAYOUT_INTENT_GLIMPSE);
+ LASSERTF(LAYOUT_INTENT_TRUNC == 4, "found %lld\n",
+ (long long)LAYOUT_INTENT_TRUNC);
+ LASSERTF(LAYOUT_INTENT_RELEASE == 5, "found %lld\n",
+ (long long)LAYOUT_INTENT_RELEASE);
+ LASSERTF(LAYOUT_INTENT_RESTORE == 6, "found %lld\n",
+ (long long)LAYOUT_INTENT_RESTORE);
}