+/* Fetch layout from MDT with getxattr request, if it's not ready yet */
+static int ll_layout_fetch(struct inode *inode, struct ldlm_lock *lock)
+
+{
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct obd_capa *oc;
+ struct ptlrpc_request *req;
+ struct mdt_body *body;
+ void *lvbdata;
+ void *lmm;
+ int lmmsize;
+ int rc;
+ ENTRY;
+
+ if (lock->l_lvb_data != NULL)
+ RETURN(0);
+
+ /* if layout lock was granted right away, the layout is returned
+ * within DLM_LVB of dlm reply; otherwise if the lock was ever
+ * blocked and then granted via completion ast, we have to fetch
+ * layout here. Please note that we can't use the LVB buffer in
+ * completion AST because it doesn't have a large enough buffer */
+ oc = ll_mdscapa_get(inode);
+ rc = ll_get_max_mdsize(sbi, &lmmsize);
+ if (rc == 0)
+ rc = md_getxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
+ OBD_MD_FLXATTR, XATTR_NAME_LOV, NULL, 0,
+ lmmsize, 0, &req);
+ capa_put(oc);
+ if (rc < 0)
+ RETURN(rc);
+
+ body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+ if (body == NULL || body->eadatasize > lmmsize)
+ GOTO(out, rc = -EPROTO);
+
+ lmmsize = body->eadatasize;
+ if (lmmsize == 0) /* empty layout */
+ GOTO(out, rc = 0);
+
+ lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_EADATA, lmmsize);
+ if (lmm == NULL)
+ GOTO(out, rc = -EFAULT);
+
+ OBD_ALLOC_LARGE(lvbdata, lmmsize);
+ if (lvbdata == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ memcpy(lvbdata, lmm, lmmsize);
+ lock_res_and_lock(lock);
+ if (lock->l_lvb_data == NULL) {
+ lock->l_lvb_data = lvbdata;
+ lock->l_lvb_len = lmmsize;
+ lvbdata = NULL;
+ }
+ unlock_res_and_lock(lock);
+
+ if (lvbdata != NULL)
+ OBD_FREE_LARGE(lvbdata, lmmsize);
+ EXIT;
+
+out:
+ ptlrpc_req_finished(req);
+ return rc;
+}
+
+/**
+ * Apply the layout to the inode. Layout lock is held and will be released
+ * in this function.
+ */
+static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
+ struct inode *inode, __u32 *gen, bool reconf)
+{
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ldlm_lock *lock;
+ struct lustre_md md = { NULL };
+ struct cl_object_conf conf;
+ int rc = 0;
+ bool lvb_ready;
+ bool wait_layout = false;
+ ENTRY;
+
+ LASSERT(lustre_handle_is_used(lockh));
+
+ lock = ldlm_handle2lock(lockh);
+ LASSERT(lock != NULL);
+ LASSERT(ldlm_has_layout(lock));
+
+ LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n",
+ inode, PFID(&lli->lli_fid), reconf);
+
+ /* in case this is a caching lock and reinstate with new inode */
+ md_set_lock_data(sbi->ll_md_exp, &lockh->cookie, inode, NULL);
+
+ lock_res_and_lock(lock);
+ lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY);
+ unlock_res_and_lock(lock);
+ /* checking lvb_ready is racy but this is okay. The worst case is
+ * that multi processes may configure the file on the same time. */
+ if (lvb_ready || !reconf) {
+ rc = -ENODATA;
+ if (lvb_ready) {
+ /* layout_gen must be valid if layout lock is not
+ * cancelled and stripe has already set */
+ *gen = lli->lli_layout_gen;
+ rc = 0;
+ }
+ GOTO(out, rc);
+ }
+
+ rc = ll_layout_fetch(inode, lock);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ /* for layout lock, lmm is returned in lock's lvb.
+ * lvb_data is immutable if the lock is held so it's safe to access it
+ * without res lock. See the description in ldlm_lock_decref_internal()
+ * for the condition to free lvb_data of layout lock */
+ if (lock->l_lvb_data != NULL) {
+ rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
+ lock->l_lvb_data, lock->l_lvb_len);
+ if (rc >= 0) {
+ *gen = LL_LAYOUT_GEN_EMPTY;
+ if (md.lsm != NULL)
+ *gen = md.lsm->lsm_layout_gen;
+ rc = 0;
+ } else {
+ CERROR("%s: file "DFID" unpackmd error: %d\n",
+ ll_get_fsname(inode->i_sb, NULL, 0),
+ PFID(&lli->lli_fid), rc);
+ }
+ }
+ if (rc < 0)
+ GOTO(out, rc);
+
+ /* set layout to file. Unlikely this will fail as old layout was
+ * surely eliminated */
+ memset(&conf, 0, sizeof conf);
+ conf.coc_opc = OBJECT_CONF_SET;
+ conf.coc_inode = inode;
+ conf.coc_lock = lock;
+ conf.u.coc_md = &md;
+ rc = ll_layout_conf(inode, &conf);
+
+ if (md.lsm != NULL)
+ obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
+
+ /* refresh layout failed, need to wait */
+ wait_layout = rc == -EBUSY;
+ EXIT;
+
+out:
+ LDLM_LOCK_PUT(lock);
+ ldlm_lock_decref(lockh, mode);
+
+ /* wait for IO to complete if it's still being used. */
+ if (wait_layout) {
+ CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n",
+ ll_get_fsname(inode->i_sb, NULL, 0),
+ inode, PFID(&lli->lli_fid));
+
+ memset(&conf, 0, sizeof conf);
+ conf.coc_opc = OBJECT_CONF_WAIT;
+ conf.coc_inode = inode;
+ rc = ll_layout_conf(inode, &conf);
+ if (rc == 0)
+ rc = -EAGAIN;
+
+ CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n",
+ PFID(&lli->lli_fid), rc);
+ }
+ RETURN(rc);
+}
+