CDEBUG(D_NET, "mode: %o\n", body->mode);
offset = REPLY_REC_OFF + 1;
- LASSERT_REPSWAB(req, offset);
+ lustre_set_rep_swabbed(req, offset);
if (body->eadatasize != 0) {
/* reply indicates presence of eadata; check it's there... */
eadata = lustre_msg_buf(req->rq_repmsg, offset++,
md->body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*md->body));
LASSERT (md->body != NULL);
- LASSERT_REPSWABBED(req, offset);
+ LASSERT(lustre_rep_swabbed(req, offset));
offset++;
if (md->body->valid & OBD_MD_FLEASIZE) {
int lmmsize;
struct lov_mds_md *lmm;
- LASSERT(S_ISREG(md->body->mode));
+ if (!S_ISREG(md->body->mode)) {
+ CERROR("OBD_MD_FLEASIZE set, should be a regular file, "
+ "but is not\n");
+ GOTO(out, rc = -EPROTO);
+ }
if (md->body->eadatasize == 0) {
CERROR("OBD_MD_FLEASIZE set, but eadatasize 0\n");
- RETURN(-EPROTO);
+ GOTO(out, rc = -EPROTO);
}
lmmsize = md->body->eadatasize;
lmm = lustre_msg_buf(req->rq_repmsg, offset, lmmsize);
- LASSERT (lmm != NULL);
- LASSERT_REPSWABBED(req, offset);
+ if (!lmm) {
+ CERROR ("incorrect message: lmm == 0\n");
+ GOTO(out, rc = -EPROTO);
+ }
+ LASSERT(lustre_rep_swabbed(req, offset));
rc = obd_unpackmd(dt_exp, &md->lsm, lmm, lmmsize);
if (rc < 0)
- RETURN(rc);
+ GOTO(out, rc);
+
+ if (rc < sizeof(*md->lsm)) {
+ CERROR ("lsm size too small: rc < sizeof (*md->lsm) "
+ "(%d < %d)\n", rc, sizeof(*md->lsm));
+ GOTO(out, rc = -EPROTO);
+ }
- LASSERT (rc >= sizeof (*md->lsm));
offset++;
} else if (md->body->valid & OBD_MD_FLDIREA) {
int lmvsize;
struct lov_mds_md *lmv;
-
- LASSERT(S_ISDIR(md->body->mode));
+
+ if(!S_ISDIR(md->body->mode)) {
+ CERROR("OBD_MD_FLDIREA set, should be a directory, but "
+ "is not\n");
+ GOTO(out, rc = -EPROTO);
+ }
if (md->body->eadatasize == 0) {
- CERROR("OBD_MD_FLEASIZE is set, but eadatasize 0\n");
+ CERROR("OBD_MD_FLDIREA is set, but eadatasize 0\n");
RETURN(-EPROTO);
}
if (md->body->valid & OBD_MD_MEA) {
lmvsize = md->body->eadatasize;
lmv = lustre_msg_buf(req->rq_repmsg, offset, lmvsize);
- LASSERT (lmv != NULL);
- LASSERT_REPSWABBED(req, offset);
+ if (!lmv) {
+ CERROR ("incorrect message: lmv == 0\n");
+ GOTO(out, rc = -EPROTO);
+ }
+
+ LASSERT(lustre_rep_swabbed(req, offset));
rc = obd_unpackmd(md_exp, (void *)&md->mea, lmv,
lmvsize);
if (rc < 0)
- RETURN(rc);
+ GOTO(out, rc);
- LASSERT (rc >= sizeof (*md->mea));
+ if (rc < sizeof(*md->mea)) {
+ CERROR ("size too small: rc < sizeof(*md->mea) "
+ "(%d < %d)\n", rc, sizeof(*md->mea));
+ GOTO(out, rc = -EPROTO);
+ }
}
offset++;
}
if (md->body->valid & OBD_MD_FLRMTPERM) {
md->remote_perm = lustre_msg_buf(req->rq_repmsg, offset++,
sizeof(struct mdt_remote_perm));
- LASSERT(md->remote_perm);
+ if (!md->remote_perm) {
+ CERROR ("incorrect message: remote_perm == 0\n");
+ GOTO(out, rc = -EPROTO);
+ }
}
/* for ACL, it's possible that FLACL is set but aclsize is zero. only
RETURN(0);
}
-static void mdc_commit_open(struct ptlrpc_request *req)
-{
- struct mdc_open_data *mod = req->rq_cb_data;
- if (mod == NULL)
- return;
-
- if (mod->mod_close_req != NULL)
- mod->mod_close_req->rq_cb_data = NULL;
-
- if (mod->mod_och != NULL)
- mod->mod_och->och_mod = NULL;
-
- OBD_FREE(mod, sizeof(*mod));
- req->rq_cb_data = NULL;
-}
-
static void mdc_replay_open(struct ptlrpc_request *req)
{
- struct mdc_open_data *mod = req->rq_cb_data;
- struct ptlrpc_request *close_req;
+ struct md_open_data *mod = req->rq_cb_data;
+ struct ptlrpc_request *cur, *tmp;
struct obd_client_handle *och;
struct lustre_handle old;
struct mdt_body *body;
body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
lustre_swab_mdt_body);
+ LASSERT(body != NULL);
och = mod->mod_och;
if (och != NULL) {
struct lustre_handle *file_fh;
LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
- LASSERT(body != NULL);
file_fh = &och->och_fh;
CDEBUG(D_HA, "updating handle from "LPX64" to "LPX64"\n",
file_fh->cookie, body->handle.cookie);
- memcpy(&old, file_fh, sizeof(old));
- memcpy(file_fh, &body->handle, sizeof(*file_fh));
+ old = *file_fh;
+ *file_fh = body->handle;
}
- close_req = mod->mod_close_req;
- if (close_req != NULL) {
- struct mdt_epoch *epoch;
-
- LASSERT(lustre_msg_get_opc(close_req->rq_reqmsg) == MDS_CLOSE);
- LASSERT(body != NULL);
- epoch = lustre_msg_buf(close_req->rq_reqmsg, REQ_REC_OFF,
- sizeof(*epoch));
- LASSERT(epoch);
- if (och != NULL)
- LASSERT(!memcmp(&old, &epoch->handle, sizeof(old)));
- DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
- memcpy(&epoch->handle, &body->handle, sizeof(epoch->handle));
+ list_for_each_entry_safe(cur, tmp, &mod->mod_replay_list, rq_mod_list) {
+ int opc = lustre_msg_get_opc(cur->rq_reqmsg);
+ struct mdt_epoch *epoch = NULL;
+
+ if (opc == MDS_CLOSE || opc == MDS_DONE_WRITING) {
+ epoch = lustre_msg_buf(cur->rq_reqmsg,
+ REQ_REC_OFF, sizeof(*epoch));
+ LASSERT(epoch);
+ DEBUG_REQ(D_HA, cur, "updating %s body with new fh",
+ opc == MDS_CLOSE ? "CLOSE" : "DONE_WRITING");
+ } else if (opc == MDS_REINT) {
+ struct mdt_rec_setattr *rec;
+
+ /* Check this is REINT_SETATTR. */
+ rec = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
+ sizeof (*rec));
+ LASSERT(rec && rec->sa_opcode == REINT_SETATTR);
+
+ epoch = lustre_msg_buf(cur->rq_reqmsg,
+ REQ_REC_OFF + 2, sizeof(*epoch));
+ LASSERT(epoch);
+ DEBUG_REQ(D_HA, cur, "updating REINT_SETATTR body "
+ "with new fh");
+ }
+ if (epoch) {
+ if (och != NULL)
+ LASSERT(!memcmp(&old, &epoch->handle,
+ sizeof(old)));
+ epoch->handle = body->handle;
+ }
}
EXIT;
}
+void mdc_commit_delayed(struct ptlrpc_request *req)
+{
+ struct md_open_data *mod = req->rq_cb_data;
+ struct ptlrpc_request *cur, *tmp;
+
+ DEBUG_REQ(D_HA, req, "req committed");
+
+ if (mod == NULL)
+ return;
+
+ req->rq_cb_data = NULL;
+ req->rq_commit_cb = NULL;
+ list_del_init(&req->rq_mod_list);
+ if (req->rq_sequence) {
+ list_for_each_entry_safe(cur, tmp, &mod->mod_replay_list,
+ rq_mod_list) {
+ LASSERT(cur != LP_POISON);
+ LASSERT(cur->rq_type != LI_POISON);
+ DEBUG_REQ(D_HA, cur, "req balanced");
+ LASSERT(cur->rq_transno != 0);
+ LASSERT(cur->rq_import == req->rq_import);
+
+ /* We no longer want to preserve this for transno-
+ * unconditional replay. */
+ spin_lock(&cur->rq_lock);
+ cur->rq_replay = 0;
+ spin_unlock(&cur->rq_lock);
+ }
+ }
+
+ if (list_empty(&mod->mod_replay_list)) {
+ if (mod->mod_och != NULL)
+ mod->mod_och->och_mod = NULL;
+
+ OBD_FREE_PTR(mod);
+ }
+}
+
int mdc_set_open_replay_data(struct obd_export *exp,
struct obd_client_handle *och,
struct ptlrpc_request *open_req)
{
- struct mdc_open_data *mod;
+ struct md_open_data *mod;
struct mdt_rec_create *rec = lustre_msg_buf(open_req->rq_reqmsg,
DLM_INTENT_REC_OFF,
sizeof(*rec));
LASSERT(rec != NULL);
/* Incoming message in my byte order (it's been swabbed). */
- LASSERT_REPSWABBED(open_req, DLM_REPLY_REC_OFF);
+ LASSERT(lustre_rep_swabbed(open_req, DLM_REPLY_REC_OFF));
/* Outgoing messages always in my byte order. */
LASSERT(body != NULL);
OBD_ALLOC(mod, sizeof(*mod));
if (mod == NULL) {
DEBUG_REQ(D_ERROR, open_req,
- "Can't allocate mdc_open_data");
+ "Can't allocate md_open_data");
RETURN(0);
}
+ CFS_INIT_LIST_HEAD(&mod->mod_replay_list);
spin_lock(&open_req->rq_lock);
if (!open_req->rq_replay) {
och->och_mod = mod;
mod->mod_och = och;
open_req->rq_cb_data = mod;
- mod->mod_open_req = open_req;
- open_req->rq_commit_cb = mdc_commit_open;
+ list_add_tail(&open_req->rq_mod_list, &mod->mod_replay_list);
+ open_req->rq_commit_cb = mdc_commit_delayed;
spin_unlock(&open_req->rq_lock);
}
LBUG();
}
- DEBUG_REQ(D_HA, open_req, "Set up open replay data");
+ DEBUG_REQ(D_RPCTRACE, open_req, "Set up open replay data");
RETURN(0);
}
int mdc_clear_open_replay_data(struct obd_export *exp,
struct obd_client_handle *och)
{
- struct mdc_open_data *mod = och->och_mod;
+ struct md_open_data *mod = och->och_mod;
ENTRY;
/*
- * Don't free the structure now (it happens in mdc_commit_open(), after
- * we're sure we won't need to fix up the close request in the future),
+ * Don't free the structure now (it happens in mdc_commit_delayed(),
+ * after the last request is removed from its replay list),
* but make sure that replay doesn't poke at the och, which is about to
* be freed.
*/
RETURN(0);
}
-static void mdc_commit_close(struct ptlrpc_request *req)
-{
- struct mdc_open_data *mod = req->rq_cb_data;
- struct ptlrpc_request *open_req;
- struct obd_import *imp = req->rq_import;
-
- DEBUG_REQ(D_HA, req, "close req committed");
- if (mod == NULL)
- return;
-
- mod->mod_close_req = NULL;
- req->rq_cb_data = NULL;
- req->rq_commit_cb = NULL;
-
- open_req = mod->mod_open_req;
- LASSERT(open_req != NULL);
- LASSERT(open_req != LP_POISON);
- LASSERT(open_req->rq_type != LI_POISON);
-
- DEBUG_REQ(D_HA, open_req, "open req balanced");
- LASSERT(open_req->rq_transno != 0);
- LASSERT(open_req->rq_import == imp);
-
- /*
- * We no longer want to preserve this for transno-unconditional
- * replay. Decref open req here as well.
- */
- spin_lock(&open_req->rq_lock);
- open_req->rq_replay = 0;
- spin_unlock(&open_req->rq_lock);
-}
-
int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
- struct obd_client_handle *och, struct ptlrpc_request **request)
+ struct md_open_data *mod, struct ptlrpc_request **request)
{
struct obd_device *obd = class_exp2obd(exp);
int reqsize[4] = { sizeof(struct ptlrpc_body),
obd->u.cli.cl_max_mds_easize,
obd->u.cli.cl_max_mds_cookiesize };
struct ptlrpc_request *req;
- struct mdc_open_data *mod;
int rc;
ENTRY;
req->rq_request_portal = MDS_READPAGE_PORTAL;
/* Ensure that this close's handle is fixed up during replay. */
- LASSERT(och != NULL);
- LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
- mod = och->och_mod;
- if (likely(mod != NULL)) {
- if (mod->mod_open_req->rq_type == LI_POISON) {
- CERROR("LBUG POISONED open %p!\n", mod->mod_open_req);
- LBUG();
- ptlrpc_req_finished(req);
- req = NULL;
- GOTO(out, rc = -EIO);
- }
- mod->mod_close_req = req;
- DEBUG_REQ(D_HA, mod->mod_open_req, "matched open");
- } else {
+ if (likely(mod != NULL))
+ list_add_tail(&req->rq_mod_list, &mod->mod_replay_list);
+ else
CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
- }
mdc_close_pack(req, REQ_REC_OFF, op_data);
ptlrpc_req_set_repsize(req, 4, repsize);
- req->rq_commit_cb = mdc_commit_close;
+ req->rq_commit_cb = mdc_commit_delayed;
+ req->rq_replay = 1;
LASSERT(req->rq_cb_data == NULL);
req->rq_cb_data = mod;
mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
if (req->rq_repmsg == NULL) {
- CDEBUG(D_HA, "request failed to send: %p, %d\n", req,
+ CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req,
req->rq_status);
if (rc == 0)
rc = req->rq_status ? req->rq_status : -EIO;
rc = -rc;
} else if (mod == NULL) {
if (req->rq_import->imp_replayable)
- CERROR("Unexpected: can't find mdc_open_data,"
+ CERROR("Unexpected: can't find md_open_data,"
"but close succeeded with replayable imp"
"Please tell CFS.\n");
}
}
int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
- struct obd_client_handle *och)
+ struct md_open_data *mod)
{
struct obd_device *obd = class_exp2obd(exp);
struct ptlrpc_request *req;
if (req == NULL)
RETURN(-ENOMEM);
- /* XXX: add DONE_WRITING request to och -- when Size-on-MDS
- * recovery will be ready. */
mdc_close_pack(req, REQ_REC_OFF, op_data);
+
+ req->rq_replay = 1;
+ req->rq_cb_data = mod;
+ req->rq_commit_cb = mdc_commit_delayed;
+ if (likely(mod != NULL))
+ list_add_tail(&req->rq_mod_list, &mod->mod_replay_list);
+ else
+ CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
ptlrpc_req_set_repsize(req, 2, repsize);
mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
rc = ptlrpc_queue_wait(req);
mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
+
+ /* Close the open replay sequence if an error occured or no SOM
+ * attribute update is needed. */
+ if (rc != -EAGAIN)
+ ptlrpc_close_replay_seq(req);
+
+ if (rc && rc != -EAGAIN && req->rq_commit_cb)
+ req->rq_commit_cb(req);
+
ptlrpc_req_finished(req);
RETURN(rc);
}
int rc;
ENTRY;
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
- MOD_INC_USE_COUNT;
-#else
if (!try_module_get(THIS_MODULE)) {
CERROR("Can't get module. Is it alive?");
return -EINVAL;
}
-#endif
switch (cmd) {
case OBD_IOC_CLIENT_RECOVER:
rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1);
GOTO(out, rc = -ENOTTY);
}
out:
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
- MOD_DEC_USE_COUNT;
-#else
module_put(THIS_MODULE);
-#endif
return rc;
}
OBD_ALLOC(handle->och_mod, sizeof(*handle->och_mod));
if (handle->och_mod == NULL) {
- DEBUG_REQ(D_ERROR, req, "can't allocate mdc_open_data");
+ DEBUG_REQ(D_ERROR, req, "can't allocate md_open_data");
RETURN(-ENOMEM);
}
- handle->och_mod->mod_open_req = req; /* will be dropped by unpin */
+
+ /* will be dropped by unpin */
+ CFS_INIT_LIST_HEAD(&handle->och_mod->mod_replay_list);
+ list_add_tail(&req->rq_mod_list, &handle->och_mod->mod_replay_list);
RETURN(rc);
}
CERROR("unpin failed: %d\n", rc);
ptlrpc_req_finished(req);
- ptlrpc_req_finished(handle->och_mod->mod_open_req);
+
+ LASSERT(!list_empty(&handle->och_mod->mod_replay_list));
+ req = list_entry(handle->och_mod->mod_replay_list.next,
+ typeof(*req), rq_mod_list);
+ list_del_init(&req->rq_mod_list);
+ ptlrpc_req_finished(req);
+ LASSERT(list_empty(&handle->och_mod->mod_replay_list));
+
OBD_FREE(handle->och_mod, sizeof(*handle->och_mod));
RETURN(rc);
}
GOTO(err_close_lock, rc);
lprocfs_init_vars(mdc, &lvars);
lprocfs_obd_setup(obd, lvars.obd_vars);
+ ptlrpc_lprocfs_register_obd(obd);
rc = obd_llog_init(obd, NULL, obd, 0, NULL, NULL);
if (rc) {
OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
+ ptlrpc_lprocfs_unregister_obd(obd);
lprocfs_obd_cleanup(obd);
ptlrpcd_decref();