add a refcount to md_open_data to control its usage in open-close and setattr-done-writing
i=green
i=tappro
rq_packed_final:1, /* packed final reply */
rq_hp:1, /* high priority RPC */
rq_at_linked:1, /* link into service's srv_at_array */
rq_packed_final:1, /* packed final reply */
rq_hp:1, /* high priority RPC */
rq_at_linked:1, /* link into service's srv_at_array */
+ rq_reply_truncate:1,
+ rq_committed:1;
enum rq_phase rq_phase; /* one of RQ_PHASE_* */
enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
enum rq_phase rq_phase; /* one of RQ_PHASE_* */
enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
struct obd_client_handle *mod_och;
struct ptlrpc_request *mod_open_req;
struct ptlrpc_request *mod_close_req;
struct obd_client_handle *mod_och;
struct ptlrpc_request *mod_open_req;
struct ptlrpc_request *mod_close_req;
+static inline struct md_open_data *obd_mod_alloc(void)
+{
+ struct md_open_data *mod;
+ OBD_ALLOC_PTR(mod);
+ if (mod == NULL)
+ return NULL;
+ atomic_set(&mod->mod_refcount, 1);
+ return mod;
+}
+
+#define obd_mod_get(mod) atomic_inc(&(mod)->mod_refcount)
+#define obd_mod_put(mod) \
+({ \
+ if (atomic_dec_and_test(&(mod)->mod_refcount)) { \
+ if ((mod)->mod_open_req) \
+ ptlrpc_req_finished((mod)->mod_open_req); \
+ OBD_FREE_PTR(mod); \
+ } \
+})
+
- /*
- * here we check if this is forced umount. If so this is called on
- * canceling "open lock" and we do not call md_close() in this case, as
- * it will not be successful, as import is already deactivated.
- */
- if (obd->obd_force)
- GOTO(out, rc = 0);
-
OBD_ALLOC_PTR(op_data);
if (op_data == NULL)
GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
OBD_ALLOC_PTR(op_data);
if (op_data == NULL)
GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
+ *mod = obd_mod_alloc();
if (*mod == NULL) {
DEBUG_REQ(D_ERROR, req, "Can't allocate "
"md_open_data");
if (*mod == NULL) {
DEBUG_REQ(D_ERROR, req, "Can't allocate "
"md_open_data");
req->rq_cb_data = *mod;
(*mod)->mod_open_req = req;
req->rq_commit_cb = mdc_commit_open;
req->rq_cb_data = *mod;
(*mod)->mod_open_req = req;
req->rq_commit_cb = mdc_commit_open;
+ /**
+ * Take an extra reference on \var mod, it protects \var
+ * mod from being freed on eviction (commit callback is
+ * called despite rq_replay flag).
+ * Will be put on mdc_done_writing().
+ */
+ obd_mod_get(*mod);
rc = 0;
}
*request = req;
rc = 0;
}
*request = req;
- if (rc && req->rq_commit_cb)
+ if (rc && req->rq_commit_cb) {
+ /* Put an extra reference on \var mod on error case. */
+ obd_mod_put(*mod);
- if (mod->mod_och != NULL)
- mod->mod_och->och_mod = NULL;
+ /**
+ * No need to touch md_open_data::mod_och, it holds a reference on
+ * \var mod and will zero references to each other, \var mod will be
+ * freed after that when md_open_data::mod_och will put the reference.
+ */
- OBD_FREE(mod, sizeof(*mod));
+ /**
+ * Do not let open request to disappear as it still may be needed
+ * for close rpc to happen (it may happen on evict only, otherwise
+ * ptlrpc_request::rq_replay does not let mdc_commit_open() to be
+ * called), just mark this rpc as committed to distinguish these 2
+ * cases, see mdc_close() for details. The open request reference will
+ * be put along with freeing \var mod.
+ */
+ ptlrpc_request_addref(req);
+ spin_lock(&req->rq_lock);
+ req->rq_committed = 1;
+ spin_unlock(&req->rq_lock);
}
int mdc_set_open_replay_data(struct obd_export *exp,
}
int mdc_set_open_replay_data(struct obd_export *exp,
/* Only if the import is replayable, we set replay_open data */
if (och && imp->imp_replayable) {
/* Only if the import is replayable, we set replay_open data */
if (och && imp->imp_replayable) {
if (mod == NULL) {
DEBUG_REQ(D_ERROR, open_req,
"Can't allocate md_open_data");
RETURN(0);
}
if (mod == NULL) {
DEBUG_REQ(D_ERROR, open_req,
"Can't allocate md_open_data");
RETURN(0);
}
+ /**
+ * Take a reference on \var mod, to be freed on mdc_close().
+ * It protects \var mod from being freed on eviction (commit
+ * callback is called despite rq_replay flag).
+ * Another reference for \var och.
+ */
+ obd_mod_get(mod);
+ obd_mod_get(mod);
+
spin_lock(&open_req->rq_lock);
och->och_mod = mod;
mod->mod_och = och;
spin_lock(&open_req->rq_lock);
och->och_mod = mod;
mod->mod_och = och;
struct md_open_data *mod = och->och_mod;
ENTRY;
struct md_open_data *mod = och->och_mod;
ENTRY;
- /*
- * Don't free the structure now (it happens in mdc_commit_open(), after
- * we're sure we won't need to fix up the close request in the future),
- * but make sure that replay doesn't poke at the och, which is about to
- * be freed.
- */
- LASSERT(mod != LP_POISON);
- if (mod != NULL)
- mod->mod_och = NULL;
+ LASSERT(mod != LP_POISON && mod != NULL);
/* Ensure that this close's handle is fixed up during replay. */
if (likely(mod != NULL)) {
/* Ensure that this close's handle is fixed up during replay. */
if (likely(mod != NULL)) {
- LASSERTF(mod->mod_open_req->rq_type != LI_POISON,
+ LASSERTF(mod->mod_open_req != NULL &&
+ mod->mod_open_req->rq_type != LI_POISON,
"POISONED open %p!\n", mod->mod_open_req);
mod->mod_close_req = req;
"POISONED open %p!\n", mod->mod_open_req);
mod->mod_close_req = req;
DEBUG_REQ(D_HA, mod->mod_open_req, "matched open");
/* We no longer want to preserve this open for replay even
* though the open was committed. b=3632, b=3633 */
DEBUG_REQ(D_HA, mod->mod_open_req, "matched open");
/* We no longer want to preserve this open for replay even
* though the open was committed. b=3632, b=3633 */
* server failed before close was sent. Let's check if mod
* exists and return no error in that case
*/
* server failed before close was sent. Let's check if mod
* exists and return no error in that case
*/
- if (mod && (mod->mod_open_req == NULL))
- rc = 0;
+ if (mod) {
+ LASSERT(mod->mod_open_req != NULL);
+ if (mod->mod_open_req->rq_committed)
+ rc = 0;
+ }
- if (rc != 0 && mod)
- mod->mod_close_req = NULL;
-
+ if (mod) {
+ if (rc != 0)
+ mod->mod_close_req = NULL;
+ /* Since now, mod is accessed through open_req only,
+ * thus close req does not keep a reference on mod anymore. */
+ obd_mod_put(mod);
+ }
*request = req;
RETURN(rc);
}
*request = req;
RETURN(rc);
}
- LASSERTF(mod->mod_open_req->rq_type != LI_POISON,
+ LASSERTF(mod->mod_open_req != NULL &&
+ mod->mod_open_req->rq_type != LI_POISON,
"POISONED setattr %p!\n", mod->mod_open_req);
mod->mod_close_req = req;
"POISONED setattr %p!\n", mod->mod_open_req);
mod->mod_close_req = req;
* committed and server failed before close was sent.
* Let's check if mod exists and return no error in that case
*/
* committed and server failed before close was sent.
* Let's check if mod exists and return no error in that case
*/
- if (mod && (mod->mod_open_req == NULL))
- rc = 0;
+ if (mod) {
+ LASSERT(mod->mod_open_req != NULL);
+ if (mod->mod_open_req->rq_committed)
+ rc = 0;
+ }
+ if (mod) {
+ if (rc != 0)
+ mod->mod_close_req = NULL;
+ /* Since now, mod is accessed through setattr req only,
+ * thus DW req does not keep a reference on mod anymore. */
+ obd_mod_put(mod);
+ }
ptlrpc_req_finished(req);
RETURN(rc);
}
ptlrpc_req_finished(req);
RETURN(rc);
}
handle->och_fh = body->handle;
handle->och_magic = OBD_CLIENT_HANDLE_MAGIC;
handle->och_fh = body->handle;
handle->och_magic = OBD_CLIENT_HANDLE_MAGIC;
- OBD_ALLOC_PTR(handle->och_mod);
+ handle->och_mod = obd_mod_alloc();
if (handle->och_mod == NULL) {
DEBUG_REQ(D_ERROR, req, "can't allocate md_open_data");
GOTO(err_out, rc = -ENOMEM);
if (handle->och_mod == NULL) {
DEBUG_REQ(D_ERROR, req, "can't allocate md_open_data");
GOTO(err_out, rc = -ENOMEM);
ptlrpc_req_finished(req);
ptlrpc_req_finished(handle->och_mod->mod_open_req);
ptlrpc_req_finished(req);
ptlrpc_req_finished(handle->och_mod->mod_open_req);
- OBD_FREE(handle->och_mod, sizeof(*handle->och_mod));
+ obd_mod_put(handle->och_mod);