RETURN(0);
}
-static void mdc_commit_open(struct ptlrpc_request *req)
-{
- struct mdc_open_data *mod = req->rq_cb_data;
- if (mod == NULL)
- return;
-
- if (mod->mod_close_req != NULL)
- mod->mod_close_req->rq_cb_data = NULL;
-
- if (mod->mod_och != NULL)
- mod->mod_och->och_mod = NULL;
-
- OBD_FREE(mod, sizeof(*mod));
- req->rq_cb_data = NULL;
-}
-
static void mdc_replay_open(struct ptlrpc_request *req)
{
- struct mdc_open_data *mod = req->rq_cb_data;
- struct ptlrpc_request *close_req;
+ struct md_open_data *mod = req->rq_cb_data;
+ struct ptlrpc_request *cur, *tmp;
struct obd_client_handle *och;
struct lustre_handle old;
struct mdt_body *body;
body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
lustre_swab_mdt_body);
+ LASSERT(body != NULL);
och = mod->mod_och;
if (och != NULL) {
struct lustre_handle *file_fh;
LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
- LASSERT(body != NULL);
file_fh = &och->och_fh;
CDEBUG(D_HA, "updating handle from "LPX64" to "LPX64"\n",
file_fh->cookie, body->handle.cookie);
- memcpy(&old, file_fh, sizeof(old));
- memcpy(file_fh, &body->handle, sizeof(*file_fh));
+ old = *file_fh;
+ *file_fh = body->handle;
}
- close_req = mod->mod_close_req;
- if (close_req != NULL) {
- struct mdt_epoch *epoch;
- LASSERT(lustre_msg_get_opc(close_req->rq_reqmsg) == MDS_CLOSE);
- LASSERT(body != NULL);
-
- epoch = lustre_msg_buf(close_req->rq_reqmsg, REQ_REC_OFF,
- sizeof(*epoch));
- LASSERT(epoch);
- if (och != NULL)
- LASSERT(!memcmp(&old, &epoch->handle, sizeof(old)));
- DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
- memcpy(&epoch->handle, &body->handle, sizeof(epoch->handle));
+ list_for_each_entry_safe(cur, tmp, &mod->mod_replay_list, rq_mod_list) {
+ int opc = lustre_msg_get_opc(cur->rq_reqmsg);
+ struct mdt_epoch *epoch = NULL;
+
+ if (opc == MDS_CLOSE || opc == MDS_DONE_WRITING) {
+ epoch = lustre_msg_buf(cur->rq_reqmsg,
+ REQ_REC_OFF, sizeof(*epoch));
+ LASSERT(epoch);
+ DEBUG_REQ(D_HA, cur, "updating %s body with new fh",
+ opc == MDS_CLOSE ? "CLOSE" : "DONE_WRITING");
+ } else if (opc == MDS_REINT) {
+ struct mdt_rec_setattr *rec;
+
+ /* Check this is REINT_SETATTR. */
+ rec = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
+ sizeof (*rec));
+ LASSERT(rec && rec->sa_opcode == REINT_SETATTR);
+
+ epoch = lustre_msg_buf(cur->rq_reqmsg,
+ REQ_REC_OFF + 2, sizeof(*epoch));
+ LASSERT(epoch);
+ DEBUG_REQ(D_HA, cur, "updating REINT_SETATTR body "
+ "with new fh");
+ }
+ if (epoch) {
+ if (och != NULL)
+ LASSERT(!memcmp(&old, &epoch->handle,
+ sizeof(old)));
+ epoch->handle = body->handle;
+ }
}
EXIT;
}
+void mdc_commit_delayed(struct ptlrpc_request *req)
+{
+ struct md_open_data *mod = req->rq_cb_data;
+ struct ptlrpc_request *cur, *tmp;
+
+ DEBUG_REQ(D_HA, req, "req committed");
+
+ if (mod == NULL)
+ return;
+
+ req->rq_cb_data = NULL;
+ req->rq_commit_cb = NULL;
+ list_del_init(&req->rq_mod_list);
+ if (req->rq_sequence) {
+ list_for_each_entry_safe(cur, tmp, &mod->mod_replay_list,
+ rq_mod_list)
+ {
+ LASSERT(cur != LP_POISON);
+ LASSERT(cur->rq_type != LI_POISON);
+ DEBUG_REQ(D_HA, cur, "req balanced");
+ LASSERT(cur->rq_transno != 0);
+ LASSERT(cur->rq_import == req->rq_import);
+
+ list_del_init(&cur->rq_mod_list);
+ /* We no longer want to preserve this for transno-
+ * unconditional replay. */
+ spin_lock(&cur->rq_lock);
+ cur->rq_replay = 0;
+ spin_unlock(&cur->rq_lock);
+ }
+ }
+
+ if (list_empty(&mod->mod_replay_list)) {
+ if (mod->mod_och != NULL)
+ mod->mod_och->och_mod = NULL;
+
+ OBD_FREE_PTR(mod);
+ }
+}
+
int mdc_set_open_replay_data(struct obd_export *exp,
struct obd_client_handle *och,
struct ptlrpc_request *open_req)
{
- struct mdc_open_data *mod;
+ struct md_open_data *mod;
struct mdt_rec_create *rec = lustre_msg_buf(open_req->rq_reqmsg,
DLM_INTENT_REC_OFF,
sizeof(*rec));
OBD_ALLOC(mod, sizeof(*mod));
if (mod == NULL) {
DEBUG_REQ(D_ERROR, open_req,
- "Can't allocate mdc_open_data");
+ "Can't allocate md_open_data");
RETURN(0);
}
+ CFS_INIT_LIST_HEAD(&mod->mod_replay_list);
spin_lock(&open_req->rq_lock);
if (!open_req->rq_replay) {
och->och_mod = mod;
mod->mod_och = och;
open_req->rq_cb_data = mod;
- mod->mod_open_req = open_req;
- open_req->rq_commit_cb = mdc_commit_open;
+ list_add_tail(&open_req->rq_mod_list, &mod->mod_replay_list);
+ open_req->rq_commit_cb = mdc_commit_delayed;
spin_unlock(&open_req->rq_lock);
}
int mdc_clear_open_replay_data(struct obd_export *exp,
struct obd_client_handle *och)
{
- struct mdc_open_data *mod = och->och_mod;
+ struct md_open_data *mod = och->och_mod;
ENTRY;
/*
- * Don't free the structure now (it happens in mdc_commit_open(), after
- * we're sure we won't need to fix up the close request in the future),
+ * Don't free the structure now (it happens in mdc_commit_delayed(),
+ * after the last request is removed from its replay list),
* but make sure that replay doesn't poke at the och, which is about to
* be freed.
*/
RETURN(0);
}
-static void mdc_commit_close(struct ptlrpc_request *req)
-{
- struct mdc_open_data *mod = req->rq_cb_data;
- struct ptlrpc_request *open_req;
- struct obd_import *imp = req->rq_import;
-
- DEBUG_REQ(D_HA, req, "close req committed");
- if (mod == NULL)
- return;
-
- mod->mod_close_req = NULL;
- req->rq_cb_data = NULL;
- req->rq_commit_cb = NULL;
-
- open_req = mod->mod_open_req;
- LASSERT(open_req != NULL);
- LASSERT(open_req != LP_POISON);
- LASSERT(open_req->rq_type != LI_POISON);
-
- DEBUG_REQ(D_HA, open_req, "open req balanced");
- LASSERT(open_req->rq_transno != 0);
- LASSERT(open_req->rq_import == imp);
-
- /*
- * We no longer want to preserve this for transno-unconditional
- * replay. Decref open req here as well.
- */
- spin_lock(&open_req->rq_lock);
- open_req->rq_replay = 0;
- spin_unlock(&open_req->rq_lock);
-}
-
int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
- struct obd_client_handle *och, struct ptlrpc_request **request)
+ struct md_open_data *mod, struct ptlrpc_request **request)
{
struct obd_device *obd = class_exp2obd(exp);
int reqsize[4] = { sizeof(struct ptlrpc_body),
obd->u.cli.cl_max_mds_easize,
obd->u.cli.cl_max_mds_cookiesize };
struct ptlrpc_request *req;
- struct mdc_open_data *mod;
int rc;
ENTRY;
req->rq_request_portal = MDS_READPAGE_PORTAL;
/* Ensure that this close's handle is fixed up during replay. */
- LASSERT(och != NULL);
- LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
- mod = och->och_mod;
- if (likely(mod != NULL)) {
- if (mod->mod_open_req->rq_type == LI_POISON) {
- CERROR("LBUG POISONED open %p!\n", mod->mod_open_req);
- LBUG();
- ptlrpc_req_finished(req);
- req = NULL;
- GOTO(out, rc = -EIO);
- }
- mod->mod_close_req = req;
- DEBUG_REQ(D_HA, mod->mod_open_req, "matched open");
- } else {
+ if (likely(mod != NULL))
+ list_add_tail(&req->rq_mod_list, &mod->mod_replay_list);
+ else
CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
- }
mdc_close_pack(req, REQ_REC_OFF, op_data);
ptlrpc_req_set_repsize(req, 4, repsize);
- req->rq_commit_cb = mdc_commit_close;
+ req->rq_commit_cb = mdc_commit_delayed;
+ req->rq_replay = 1;
LASSERT(req->rq_cb_data == NULL);
req->rq_cb_data = mod;
rc = -rc;
} else if (mod == NULL) {
if (req->rq_import->imp_replayable)
- CERROR("Unexpected: can't find mdc_open_data,"
+ CERROR("Unexpected: can't find md_open_data,"
"but close succeeded with replayable imp"
"Please tell CFS.\n");
}
}
int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
- struct obd_client_handle *och)
+ struct md_open_data *mod)
{
struct obd_device *obd = class_exp2obd(exp);
struct ptlrpc_request *req;
if (req == NULL)
RETURN(-ENOMEM);
- /* XXX: add DONE_WRITING request to och -- when Size-on-MDS
- * recovery will be ready. */
mdc_close_pack(req, REQ_REC_OFF, op_data);
+
+ req->rq_replay = 1;
+ req->rq_cb_data = mod;
+ req->rq_commit_cb = mdc_commit_delayed;
+ if (likely(mod != NULL))
+ list_add_tail(&req->rq_mod_list, &mod->mod_replay_list);
+ else
+ CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
ptlrpc_req_set_repsize(req, 2, repsize);
mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
rc = ptlrpc_queue_wait(req);
mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
+
+ /* Close the open replay sequence if an error occured or no SOM
+ * attribute update is needed. */
+ if (rc != -EAGAIN)
+ ptlrpc_close_replay_seq(req);
+
+ if (rc && rc != -EAGAIN && req->rq_commit_cb)
+ req->rq_commit_cb(req);
+
ptlrpc_req_finished(req);
RETURN(rc);
}
OBD_ALLOC(handle->och_mod, sizeof(*handle->och_mod));
if (handle->och_mod == NULL) {
- DEBUG_REQ(D_ERROR, req, "can't allocate mdc_open_data");
+ DEBUG_REQ(D_ERROR, req, "can't allocate md_open_data");
RETURN(-ENOMEM);
}
- handle->och_mod->mod_open_req = req; /* will be dropped by unpin */
+
+ /* will be dropped by unpin */
+ CFS_INIT_LIST_HEAD(&handle->och_mod->mod_replay_list);
+ list_add_tail(&req->rq_mod_list, &handle->och_mod->mod_replay_list);
RETURN(rc);
}
CERROR("unpin failed: %d\n", rc);
ptlrpc_req_finished(req);
- ptlrpc_req_finished(handle->och_mod->mod_open_req);
+
+ LASSERT(!list_empty(&handle->och_mod->mod_replay_list));
+ req = list_entry(handle->och_mod->mod_replay_list.next,
+ typeof(*req), rq_mod_list);
+ list_del_init(&req->rq_mod_list);
+ ptlrpc_req_finished(req);
+ LASSERT(list_empty(&handle->och_mod->mod_replay_list));
+
OBD_FREE(handle->och_mod, sizeof(*handle->och_mod));
RETURN(rc);
}