unsigned long l_callback_timeout;
__u32 l_pid; /* pid which created this lock */
+
+ struct list_head l_tmp;
};
#define LDLM_PLAIN 10
struct llog_cookie oti_onecookie;
struct llog_cookie *oti_logcookies;
int oti_numcookies;
+ int oti_async;
};
static inline void oti_alloc_cookies(struct obd_trans_info *oti,int num_cookies)
INIT_LIST_HEAD(&lock->l_lru);
INIT_LIST_HEAD(&lock->l_export_chain);
INIT_LIST_HEAD(&lock->l_pending_chain);
+ INIT_LIST_HEAD(&lock->l_tmp);
init_waitqueue_head(&lock->l_waitq);
spin_lock(&resource->lr_namespace->ns_counter_lock);
* callback will be performed in this function. */
int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
{
- struct list_head *tmp, *next;
- struct ldlm_lock *lock;
+ struct ldlm_lock *lock, *next;
int count, rc = 0;
LIST_HEAD(cblist);
ENTRY;
RETURN(0);
}
- list_for_each_safe(tmp, next, &ns->ns_unused_list) {
-
- lock = list_entry(tmp, struct ldlm_lock, l_lru);
-
+ list_for_each_entry_safe(lock, next, &ns->ns_unused_list, l_lru) {
LASSERT(!lock->l_readers && !lock->l_writers);
/* Setting the CBPENDING flag is a little misleading, but
LDLM_LOCK_GET(lock); /* dropped by bl thread */
ldlm_lock_remove_from_lru(lock);
+
+ /* We can't re-add to l_lru as it confuses the refcounting in
+ * ldlm_lock_remove_from_lru() if an AST arrives after we drop
+ * ns_lock below. We use l_tmp instead; l_pending_chain can't be
+ * used because it is in use on both server and client, even
+ * though bug 5666 says it is used only on the server. --umka */
if (sync != LDLM_ASYNC || ldlm_bl_to_thread(ns, NULL, lock))
- list_add(&lock->l_lru, &cblist);
+ list_add(&lock->l_tmp, &cblist);
if (--count == 0)
break;
}
l_unlock(&ns->ns_lock);
- list_for_each_safe(tmp, next, &cblist) {
- lock = list_entry(tmp, struct ldlm_lock, l_lru);
- list_del_init(&lock->l_lru);
+ list_for_each_entry_safe(lock, next, &cblist, l_tmp) {
+ list_del_init(&lock->l_tmp);
ldlm_handle_bl_callback(ns, NULL, lock);
}
+
RETURN(rc);
}
ns->ns_max_unused = 0;
ldlm_cancel_lru(ns, LDLM_SYNC);
ns->ns_max_unused = tmp;
- return count;
+ return count;
}
tmp = simple_strtoul(dummy, &end, 0);
CDEBUG(D_DLMTRACE, "changing namespace %s max_unused from %u to %u\n",
ns->ns_name, ns->ns_max_unused, (unsigned int)tmp);
- ns->ns_max_unused = (unsigned int)tmp;
+ ns->ns_max_unused = (unsigned int)tmp;
ldlm_cancel_lru(ns, LDLM_ASYNC);
-
return count;
}
(unsigned long)inode->i_ino, rc);
}
- /* objects are destroed on OST only if metadata close was
- * successful.*/
- if (rc == 0) {
- rc = ll_objects_destroy(req, inode, 1);
- if (rc)
- CERROR("inode %lu ll_objects destroy: rc = %d\n",
- inode->i_ino, rc);
- }
-
ptlrpc_req_finished(req);
EXIT;
out:
RETURN(rc);
}
-int ll_objects_destroy(struct ptlrpc_request *request,
- struct inode *dir, int offset)
-{
- struct mds_body *body;
- struct lov_mds_md *eadata;
- struct lov_stripe_md *lsm = NULL;
- struct obd_trans_info oti = { 0 };
- struct obdo *oa;
- int rc;
- ENTRY;
-
- /* req is swabbed so this is safe */
- body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
-
- if (!(body->valid & OBD_MD_FLEASIZE))
- RETURN(0);
-
- if (body->eadatasize == 0) {
- CERROR("OBD_MD_FLEASIZE set but eadatasize zero\n");
- GOTO(out, rc = -EPROTO);
- }
-
- /*
- * the MDS sent back the EA because we unlinked the last reference to
- * this file. Use this EA to unlink the objects on the OST. It's opaque
- * so we don't swab here; we leave it to obd_unpackmd() to check it is
- * complete and sensible.
- */
- eadata = lustre_swab_repbuf(request, 1, body->eadatasize, NULL);
- LASSERT(eadata != NULL);
- if (eadata == NULL) {
- CERROR("Can't unpack MDS EA data\n");
- GOTO(out, rc = -EPROTO);
- }
-
- rc = obd_unpackmd(ll_i2dtexp(dir), &lsm, eadata, body->eadatasize);
- if (rc < 0) {
- CERROR("obd_unpackmd: %d\n", rc);
- GOTO(out, rc);
- }
- LASSERT(rc >= sizeof(*lsm));
-
- oa = obdo_alloc();
- if (oa == NULL)
- GOTO(out_free_memmd, rc = -ENOMEM);
-
- oa->o_id = lsm->lsm_object_id;
- oa->o_gr = lsm->lsm_object_gr;
- oa->o_mode = body->mode & S_IFMT;
- oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
-
- if (body->valid & OBD_MD_FLCOOKIE) {
- int length = sizeof(struct llog_cookie) *
- lsm->lsm_stripe_count;
- oa->o_valid |= OBD_MD_FLCOOKIE;
- oti.oti_logcookies =
- lustre_msg_buf(request->rq_repmsg, 2, length);
- if (oti.oti_logcookies == NULL) {
- oa->o_valid &= ~OBD_MD_FLCOOKIE;
- body->valid &= ~OBD_MD_FLCOOKIE;
- } else {
- /* copy llog cookies to request to replay unlink
- * so that the same llog file and records as those created
- * during fail can be re-created while doing replay
- */
- if (offset >= 0)
- memcpy(lustre_msg_buf(request->rq_reqmsg, offset, 0),
- oti.oti_logcookies, length);
- }
- }
-
- rc = obd_destroy(ll_i2dtexp(dir), oa, lsm, &oti);
- obdo_free(oa);
- if (rc)
- CERROR("obd destroy objid "LPX64" error %d\n",
- lsm->lsm_object_id, rc);
- EXIT;
- out_free_memmd:
- obd_free_memmd(ll_i2dtexp(dir), &lsm);
- out:
- return rc;
-}
-
static int ll_unlink_raw(struct nameidata *nd)
{
struct inode *dir = nd->dentry->d_inode;
if (rc)
GOTO(out, rc);
ll_update_times(request, 0, dir);
-
- rc = ll_objects_destroy(request, dir, 2);
EXIT;
out:
ptlrpc_req_finished(request);
if (!err) {
ll_update_times(request, 0, src);
ll_update_times(request, 0, tgt);
- err = ll_objects_destroy(request, src, 3);
}
ptlrpc_req_finished(request);
LUSTRE_IT(it)->it_lock_mode);
}
-int lmv_handle_remote_inode(struct obd_export *exp, void *lmm,
- int lmmsize, struct lookup_intent *it,
- int flags, struct ptlrpc_request **reqp,
- ldlm_blocking_callback cb_blocking)
+int lmv_intent_remote(struct obd_export *exp, void *lmm,
+ int lmmsize, struct lookup_intent *it,
+ int flags, struct ptlrpc_request **reqp,
+ ldlm_blocking_callback cb_blocking)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
+ struct ptlrpc_request *req = NULL;
struct mds_body *body = NULL;
- int rc = 0;
+ struct lustre_handle plock;
+ struct lustre_id nid;
+ int pmode, rc = 0;
ENTRY;
body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
LASSERT(body != NULL);
- if (body->valid & OBD_MD_MDS) {
+ if (!(body->valid & OBD_MD_MDS))
+ RETURN(0);
+
+ /*
+ * oh, MDS reports that this is remote inode case i.e. we have to ask
+ * for real attrs on another MDS.
+ */
+ if (it->it_op == IT_LOOKUP || it->it_op == IT_CHDIR) {
/*
- * oh, MDS reports that this is remote inode case i.e. we have
- * to ask for real attrs on another MDS.
+ * unfortunately, we have to lie to MDC/MDS to retrieve
+ * attributes llite needs.
*/
- struct ptlrpc_request *req = NULL;
- struct lustre_handle plock;
- struct lustre_id nid;
- int pmode;
-
- if (it->it_op == IT_LOOKUP || it->it_op == IT_CHDIR) {
- /*
- * unfortunately, we have to lie to MDC/MDS to retrieve
- * attributes llite needs.
- */
- it->it_op = IT_GETATTR;
- }
+ it->it_op = IT_GETATTR;
+ }
- /* we got LOOKUP lock, but we really need attrs */
- pmode = LUSTRE_IT(it)->it_lock_mode;
- if (pmode) {
- memcpy(&plock, &LUSTRE_IT(it)->it_lock_handle,
- sizeof(plock));
- LUSTRE_IT(it)->it_lock_mode = 0;
- }
+ /* we got LOOKUP lock, but we really need attrs */
+ pmode = LUSTRE_IT(it)->it_lock_mode;
+ if (pmode) {
+ memcpy(&plock, &LUSTRE_IT(it)->it_lock_handle,
+ sizeof(plock));
+ LUSTRE_IT(it)->it_lock_mode = 0;
+ LUSTRE_IT(it)->it_data = 0;
+ }
- LASSERT((body->valid & OBD_MD_FID) != 0);
+ LASSERT((body->valid & OBD_MD_FID) != 0);
- nid = body->id1;
- LUSTRE_IT(it)->it_disposition &= ~DISP_ENQ_COMPLETE;
- rc = md_intent_lock(lmv->tgts[id_group(&nid)].ltd_exp, &nid, NULL,
- 0, lmm, lmmsize, NULL, it, flags, &req, cb_blocking);
+ nid = body->id1;
+ LUSTRE_IT(it)->it_disposition &= ~DISP_ENQ_COMPLETE;
+ rc = md_intent_lock(lmv->tgts[id_group(&nid)].ltd_exp, &nid,
+ NULL, 0, lmm, lmmsize, NULL, it, flags,
+ &req, cb_blocking);
- /*
- * llite needs LOOKUP lock to track dentry revocation in order
- * to maintain dcache consistency. Thus drop UPDATE lock here
- * and put LOOKUP in request.
- */
- if (rc == 0) {
- lmv_drop_intent_lock(it);
- memcpy(&LUSTRE_IT(it)->it_lock_handle, &plock,
- sizeof(plock));
- LUSTRE_IT(it)->it_lock_mode = pmode;
- } else if (pmode)
- ldlm_lock_decref(&plock, pmode);
-
- ptlrpc_req_finished(*reqp);
- *reqp = req;
+ /*
+ * llite needs LOOKUP lock to track dentry revocation in order to
+ * maintain dcache consistency. Thus drop UPDATE lock here and put
+ * LOOKUP in request.
+ */
+ if (rc == 0) {
+ lmv_drop_intent_lock(it);
+ memcpy(&LUSTRE_IT(it)->it_lock_handle, &plock,
+ sizeof(plock));
+ LUSTRE_IT(it)->it_lock_mode = pmode;
+ } else if (pmode) {
+ ldlm_lock_decref(&plock, pmode);
}
+
+ ptlrpc_req_finished(*reqp);
+ *reqp = req;
RETURN(rc);
}
/* okay, MDS has returned success. Probably name has been resolved in
* remote inode */
- rc = lmv_handle_remote_inode(exp, lmm, lmmsize, it,
- flags, reqp, cb_blocking);
+ rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp, cb_blocking);
if (rc != 0) {
LASSERT(rc < 0);
/*
* this is possible, that some userspace application will try to
- * open file as directory and we will have error -20 here. As
+ * open file as directory and we will have -ENOTDIR here. As
* this is "usual" situation, we should not print error here,
* only debug info.
*/
* nothing is found, do not access body->id1 as it is zero and thus
* pointless.
*/
- if (LUSTRE_IT(it)->it_disposition & DISP_LOOKUP_NEG)
+ if ((LUSTRE_IT(it)->it_disposition & DISP_LOOKUP_NEG) &&
+ !(LUSTRE_IT(it)->it_disposition & DISP_OPEN_CREATE) &&
+ !(LUSTRE_IT(it)->it_disposition & DISP_OPEN_OPEN))
RETURN(0);
/* caller may use attrs MDS returns on IT_OPEN lock request so, we have
* to update them for splitted dir */
body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
LASSERT(body != NULL);
- LASSERT((body->valid & OBD_MD_FID) != 0);
+
+ /* could not find object, FID is not present in response. */
+ if (!(body->valid & OBD_MD_FID))
+ RETURN(0);
cid = &body->id1;
obj = lmv_grab_obj(obd, cid);
/* okay, MDS has returned success. probably name has been
* resolved in remote inode */
- rc = lmv_handle_remote_inode(exp, lmm, lmmsize, it,
- flags, reqp, cb_blocking);
+ rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags,
+ reqp, cb_blocking);
if (rc < 0)
RETURN(rc);
body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
LASSERT(body != NULL);
- LASSERT((body->valid & OBD_MD_FID) != 0);
+
+ /* could not find object, FID is not present in response. */
+ if (!(body->valid & OBD_MD_FID))
+ RETURN(0);
cid = &body->id1;
obj2 = lmv_grab_obj(obd, cid);
/* okay, MDS has returned success. Probably name has been resolved in
* remote inode. */
- rc = lmv_handle_remote_inode(exp, lmm, lmmsize, it,
- flags, reqp, cb_blocking);
+ rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp, cb_blocking);
if (rc == 0 && (mea = lmv_splitted_dir_body(*reqp, 1))) {
/* wow! this is splitted dir, we'd like to handle it */
/* mds/mds_unlink_open.c */
int mds_cleanup_orphans(struct obd_device *obd);
+int mds_unlink_object(struct mds_obd *mds, struct inode *inode,
+ struct lov_mds_md *lmm, int lmm_size,
+ struct llog_cookie *logcookies,
+ int log_unlink, int async);
+
/* mds/mds_log.c */
int mds_log_op_unlink(struct obd_device *obd, struct inode *inode,
struct lustre_msg *msg, int offset);
/* mds/mds_open.c */
+int mds_destroy_objects(struct obd_device *obd,
+ struct inode *inode, int async);
int mds_query_write_access(struct inode *inode);
int mds_open(struct mds_update_record *rec, int offset,
struct ptlrpc_request *req, struct lustre_handle *);
RETURN(rc);
}
+int
+mds_destroy_objects(struct obd_device *obd,
+ struct inode *inode, int async)
+{
+ struct mds_obd *mds = &obd->u.mds;
+ struct lov_mds_md *lmm = NULL;
+ int rc, lmm_size;
+ ENTRY;
+ /* Destroy the OSS objects of @inode: read its stripe metadata with
+ * mds_get_md() and hand it to mds_unlink_object(), forwarding @async.
+ * Returns 0 if the inode still has links or mds_get_md() returns 0,
+ * -ENOMEM if the metadata buffer cannot be allocated, otherwise the
+ * negative result of mds_get_md() or the result of
+ * mds_unlink_object(). */
+ LASSERT(inode != NULL);
+ /* nothing is destroyed while the inode is still linked */
+ if (inode->i_nlink != 0) {
+ CWARN("attempt to destroy OSS object when "
+ "i_nlink == %d\n", (int)inode->i_nlink);
+ RETURN(0);
+ }
+ /* stripe metadata buffer of mds_max_mdsize bytes, freed at out_free_lmm */
+ OBD_ALLOC(lmm, mds->mds_max_mdsize);
+ if (lmm == NULL)
+ RETURN(-ENOMEM);
+ /* lmm_size is passed by pointer, so mds_get_md() may update it */
+ lmm_size = mds->mds_max_mdsize;
+ rc = mds_get_md(obd, inode, lmm, &lmm_size, 1, 0);
+ if (rc < 0) {
+ CERROR("no stripe info for %lu/%lu inode\n",
+ (unsigned long)inode->i_ino,
+ (unsigned long)inode->i_generation);
+ GOTO(out_free_lmm, rc);
+ }
+ /* rc > 0: stripe metadata was read; rc == 0: nothing to destroy */
+ if (rc > 0) {
+ /* unlink the object on OSS; no llog cookies are passed and
+ * @async is forwarded to mds_unlink_object() */
+ rc = mds_unlink_object(mds, inode, lmm, lmm_size,
+ NULL, 0, async);
+ if (rc) {
+ CERROR("error unlinking object on OSS, "
+ "err %d\n", rc);
+ GOTO(out_free_lmm, rc);
+ }
+ } else {
+ CDEBUG(D_INODE, "no stripping info found for inode "
+ "%lu/%lu\n", (unsigned long)inode->i_ino,
+ (unsigned long)inode->i_generation);
+ }
+ EXIT;
+out_free_lmm:
+ OBD_FREE(lmm, mds->mds_max_mdsize);
+ return rc;
+}
+
static void reconstruct_open(struct mds_update_record *rec, int offset,
struct ptlrpc_request *req,
struct lustre_handle *child_lockh)
req->rq_repmsg->buflens[2], &lcl) > 0) {
reply_body->valid |= OBD_MD_FLCOOKIE;
}
+
+ rc = mds_destroy_objects(obd, inode, 1);
+ if (rc) {
+ CERROR("cannot destroy OSS object on close, err %d\n",
+ rc);
+ rc = 0;
+ }
goto out; /* Don't bother updating attrs on unlinked inode */
}
if (req->rq_export->exp_failed) {
CERROR("committing transaction for disconnected client\n");
if (handle)
- GOTO(out_commit, rc);
+ GOTO(out_commit, rc = -EIO);
RETURN(rc);
}
mds_req_from_mcd(req, med->med_mcd);
}
-/* If we are unlinking an open file/dir (i.e. creating an orphan) then
- * we instead link the inode into the PENDING directory until it is
- * finally released. We can't simply call mds_reint_rename() or some
- * part thereof, because we don't have the inode to check for link
- * count/open status until after it is locked.
+/* If we are unlinking an open file/dir (i.e. creating an orphan) then we
+ * instead link the inode into the PENDING directory until it is finally
+ * released. We can't simply call mds_reint_rename() or some part thereof,
+ * because we don't have the inode to check for link count/open status until
+ * after it is locked.
*
* For lock ordering, caller must get child->i_sem first, then pending->i_sem
* before starting journal transaction.
GOTO(out_dput, rc = 0);
}
- /* link() is semanticaly-wrong for S_IFDIR, so we set S_IFREG
- * for linking and return real mode back then -bzzz */
+ /*
+ * link() is semantically wrong for S_IFDIR, so we set S_IFREG for
+ * linking and restore the real mode afterwards -bzzz
+ */
mode = inode->i_mode;
inode->i_mode = S_IFREG;
rc = vfs_link(dentry, pending_dir, pending_child);
mark_inode_dirty(pending_dir);
}
- EXIT;
+ GOTO(out_dput, rc = 1);
out_dput:
l_dput(pending_child);
return rc;
body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
OBD_MD_FLATIME | OBD_MD_FLMTIME);
} else if (mds_log_op_unlink(obd, child_inode,
- lustre_msg_buf(req->rq_repmsg, offset + 1, 0),
- req->rq_repmsg->buflens[offset + 1],
- lustre_msg_buf(req->rq_repmsg, offset + 2, 0),
- req->rq_repmsg->buflens[offset+2],
- &lcl) > 0){
+ lustre_msg_buf(req->rq_repmsg, offset + 1, 0),
+ req->rq_repmsg->buflens[offset + 1],
+ lustre_msg_buf(req->rq_repmsg, offset + 2, 0),
+ req->rq_repmsg->buflens[offset + 2],
+ &lcl) > 0){
body->valid |= OBD_MD_FLCOOKIE;
}
+
+ rc = mds_destroy_objects(obd, child_inode, 1);
+ if (rc) {
+ CERROR("can't remove OST object, err %d\n",
+ rc);
+ }
}
GOTO(cleanup, rc);
&lcl) > 0) {
body->valid |= OBD_MD_FLCOOKIE;
}
+
+ rc = mds_destroy_objects(obd, old_inode, 1);
+ if (rc) {
+ CERROR("can't remove OST object, err %d\n",
+ rc);
+ }
}
EXIT;
#include "mds_internal.h"
-static int mds_osc_destroy_orphan(struct mds_obd *mds,
- struct inode *inode,
- struct lov_mds_md *lmm,
- int lmm_size,
- struct llog_cookie *logcookies,
- int log_unlink)
+/*
+ * used when destroying orphans and from mds_reint_unlink() when MDS wants to
+ * destroy objects on OSS.
+ */
+int
+mds_unlink_object(struct mds_obd *mds, struct inode *inode,
+ struct lov_mds_md *lmm, int lmm_size,
+ struct llog_cookie *logcookies,
+ int log_unlink, int async)
{
struct lov_stripe_md *lsm = NULL;
struct obd_trans_info oti = { 0 };
oti.oti_logcookies = logcookies;
}
+ CDEBUG(D_INODE, "destroy OSS object %d/%d\n",
+ (int)oa->o_id, (int)oa->o_gr);
+
+ oti.oti_async = async;
rc = obd_destroy(mds->mds_dt_exp, oa, lsm, &oti);
obdo_free(oa);
- if (rc)
- CDEBUG(D_INODE, "destroy orphan objid 0x"LPX64" on ost error "
- "%d\n", lsm->lsm_object_id, rc);
out_free_memmd:
obd_free_memmd(mds->mds_dt_exp, &lsm);
RETURN(rc);
if (!rc)
rc = err;
} else if (!rc) {
- rc = mds_osc_destroy_orphan(mds, inode, lmm, lmm_size,
- logcookies, log_unlink);
+ rc = mds_unlink_object(mds, inode, lmm, lmm_size,
+ logcookies, log_unlink, 0);
}
if (logcookies != NULL)
memcpy(&body->oa, oa, sizeof(*oa));
request->rq_replen = lustre_msg_size(1, &size);
- rc = ptlrpc_queue_wait(request);
-
- if (rc == -ENOENT)
+ if (oti != NULL && oti->oti_async) {
+ /* asynchronous destroy */
+ ptlrpcd_add_req(request);
rc = 0;
- if (rc)
- GOTO(out, rc);
+ } else {
+ rc = ptlrpc_queue_wait(request);
+
+ if (rc == -ENOENT)
+ rc = 0;
- body = lustre_swab_repbuf(request, 0, sizeof(*body),
- lustre_swab_ost_body);
- if (body == NULL) {
- CERROR ("Can't unpack body\n");
- GOTO (out, rc = -EPROTO);
- }
+ if (rc) {
+ ptlrpc_req_finished(request);
+ RETURN(rc);
+ }
- memcpy(oa, &body->oa, sizeof(*oa));
+ body = lustre_swab_repbuf(request, 0, sizeof(*body),
+ lustre_swab_ost_body);
+ if (body == NULL) {
+ CERROR ("Can't unpack body\n");
+ ptlrpc_req_finished(request);
+ RETURN(-EPROTO);
+ }
- EXIT;
- out:
- ptlrpc_req_finished(request);
- return rc;
+ memcpy(oa, &body->oa, sizeof(*oa));
+ ptlrpc_req_finished(request);
+ }
+ RETURN(rc);
}
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
/* Store transno in reqmsg for replay. */
req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;
-
if (req->rq_import->imp_replayable) {
spin_lock_irqsave(&imp->imp_lock, flags);
if (req->rq_transno != 0)
pid1=$!
multiop $DIR/$tfile O_c &
pid2=$!
- # give multiop a chance to open
- sleep 1
+ # give multiop a chance to open.
+ # 1 second is not enough, so I increased it to 5; in an ideal world
+ # I would wait for the open to finish in a smarter manner. --umka
+ sleep 5
mds_evict_client
df $MOUNT || sleep 1 && df $MOUNT || return 1
kill -USR1 $pid1
kill -USR1 $pid2
- sleep 1
+ sleep 5
return 0
}
run_test 32 "close() notices client eviction; close() after client eviction"