#include <linux/init.h>
#include <linux/lprocfs_status.h>
+inline void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
+{
+ if (oti && req->rq_repmsg)
+ req->rq_repmsg->transno = HTON__u64(oti->oti_transno);
+ EXIT;
+}
static int ost_destroy(struct ptlrpc_request *req, struct obd_trans_info *oti)
{
RETURN(0);
}
+static int ost_syncfs(struct ptlrpc_request *req)
+{
+ struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
+ struct obd_statfs *osfs;
+ int rc, size = sizeof(*osfs);
+ ENTRY;
+
+ rc = lustre_pack_msg(0, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ RETURN(rc);
+
+ rc = obd_syncfs(conn);
+ if (rc) {
+ CERROR("ost: syncfs failed: rc %d\n", rc);
+ req->rq_status = rc;
+ RETURN(rc);
+ }
+
+ RETURN(0);
+}
+
static int ost_open(struct ptlrpc_request *req, struct obd_trans_info *oti)
{
struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
void *desc_priv = NULL;
int cmd, i, j, objcount, niocount, size = sizeof(*body);
int rc = 0;
+#if CHECKSUM_BULK
+ __u64 cksum = 0;
+#endif
ENTRY;
body = lustre_msg_buf(req->rq_reqmsg, 0);
if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
GOTO(out, req->rq_status = -EIO);
+ /* Hmm, we don't return anything in this reply buffer?
+ * We should be returning per-page status codes and also
+ * per-object size, blocks count, mtime, ctime. (bug 593) */
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ GOTO(out, req->rq_status = rc);
+
for (i = 0; i < objcount; i++) {
ost_unpack_ioo(&tmp1, &ioo);
if (tmp2 + ioo->ioo_bufcnt > end2) {
bulk->bp_xid = remote_nb[i].xid;
bulk->bp_buf = local_nb[i].addr;
bulk->bp_buflen = remote_nb[i].len;
+ if (body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM))
+ ost_checksum(&cksum, bulk->bp_buf, bulk->bp_buflen);
}
rc = ptlrpc_bulk_put(desc);
out_local:
OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
out:
- if (!rc)
- /* Hmm, we don't return anything in this reply buffer?
- * We should be returning per-page status codes and also
- * per-object size, blocks count, mtime, ctime. (bug 593) */
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
- &req->rq_repmsg);
if (rc)
ptlrpc_error(req->rq_svc, req);
- else
+ else {
+#if CHECKSUM_BULK
+ body = lustre_msg_buf(req->rq_repmsg, 0);
+ body->oa.o_rdev = HTON__u64(cksum);
+ body->oa.o_valid |= HTON__u32(OBD_MD_FLCKSUM);
+#endif
ptlrpc_reply(req->rq_svc, req);
+ }
+
RETURN(rc);
}
}
}
- OBD_ALLOC(local_nb, sizeof(*local_nb)* niocount);
+ OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
if (local_nb == NULL)
GOTO(out, rc = -ENOMEM);
remote_nb, local_nb, &desc_priv, oti);
if (req->rq_status)
- GOTO(out, rc = 0);
+ GOTO(out_local, rc = 0);
desc = ptlrpc_prep_bulk(req->rq_connection);
if (desc == NULL)
GOTO(out_bulk, rc);
}
+#if CHECKSUM_BULK
+ if ((body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM))) {
+ static int cksum_counter;
+ __u64 client_cksum = NTOH__u64(body->oa.o_rdev);
+ __u64 cksum = 0;
+
+ for (i = 0; i < niocount; i++) {
+ char *ptr = kmap(local_nb[i].page);
+ int off = local_nb[i].offset & (PAGE_SIZE - 1);
+ int len = local_nb[i].len;
+
+ LASSERT(off + len <= PAGE_SIZE);
+ ost_checksum(&cksum, ptr + off, len);
+ kunmap(local_nb[i].page);
+ }
+
+ if (client_cksum != cksum) {
+ CERROR("Bad checksum: client "LPX64", server "LPX64
+ ", client NID "LPX64"\n", client_cksum, cksum,
+ req->rq_connection->c_peer.peer_nid);
+ cksum_counter = 1;
+ } else {
+ cksum_counter++;
+ if ((cksum_counter & (-cksum_counter)) == cksum_counter)
+ CERROR("Checksum %d from "LPX64": "LPX64" OK\n",
+ cksum_counter,
+ req->rq_connection->c_peer.peer_nid,
+ cksum);
+ }
+ }
+#endif
+
req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
local_nb, desc_priv, oti);
&req->rq_repmsg);
if (rc)
ptlrpc_error(req->rq_svc, req);
- else
+ else {
+ oti_to_request(oti, req);
rc = ptlrpc_reply(req->rq_svc, req);
+ }
RETURN(rc);
}
-inline void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
+static int ost_san_brw(struct ptlrpc_request *req, int alloc)
{
- if (oti && req->rq_repmsg)
- req->rq_repmsg->transno = HTON__u64(oti->oti_transno);
- EXIT;
+ struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
+ struct niobuf_remote *remote_nb, *res_nb;
+ struct obd_ioobj *ioo;
+ struct ost_body *body;
+ int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
+ void *tmp1, *tmp2, *end2;
+ ENTRY;
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+ tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+ end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
+ objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
+ niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
+
+ cmd = alloc ? OBD_BRW_WRITE : OBD_BRW_READ;
+
+ for (i = 0; i < objcount; i++) {
+ ost_unpack_ioo((void *)&tmp1, &ioo);
+ if (tmp2 + ioo->ioo_bufcnt > end2) {
+ rc = -EFAULT;
+ break;
+ }
+ for (j = 0; j < ioo->ioo_bufcnt; j++)
+ ost_unpack_niobuf((void *)&tmp2, &remote_nb);
+ }
+
+ size[1] = niocount * sizeof(*remote_nb);
+ rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ GOTO(out, rc);
+
+ /* The unpackers move tmp1 and tmp2, so reset them before using */
+ tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+ tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+
+ req->rq_status = obd_san_preprw(cmd, conn, objcount, tmp1,
+ niocount, tmp2);
+
+ if (req->rq_status) {
+ rc = 0;
+ goto out;
+ }
+
+ remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
+ res_nb = lustre_msg_buf(req->rq_reqmsg, 2);
+ for (i = 0; i < niocount; i++) {
+ /* this advances remote_nb */
+ ost_pack_niobuf((void **)&remote_nb,
+ res_nb[i].offset,
+ res_nb[i].len, /* 0 */
+ res_nb[i].flags, /* 0 */
+ res_nb[i].xid
+ );
+ }
+
+ rc = 0;
+
+out:
+ if (rc) {
+ OBD_FREE(req->rq_repmsg, req->rq_replen);
+ req->rq_repmsg = NULL;
+ ptlrpc_error(req->rq_svc, req);
+ } else
+ ptlrpc_reply(req->rq_svc, req);
+
+ return rc;
+}
+
+static int filter_recovery_request(struct ptlrpc_request *req,
+ struct obd_device *obd, int *process)
+{
+ switch (req->rq_reqmsg->opc) {
+ case OST_CONNECT: /* This will never get here, but for completeness. */
+ case OST_DISCONNECT:
+ *process = 1;
+ RETURN(0);
+
+ case OST_CLOSE:
+ case OST_CREATE:
+ case OST_DESTROY:
+ case OST_OPEN:
+ case OST_PUNCH:
+ case OST_SETATTR:
+ case OST_SYNCFS:
+ case OST_WRITE:
+ case LDLM_ENQUEUE:
+ *process = target_queue_recovery_request(req, obd);
+ RETURN(0);
+
+ default:
+ DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
+ *process = 0;
+ /* XXX what should we set rq_status to here? */
+ RETURN(ptlrpc_error(req->rq_svc, req));
+ }
}
static int ost_handle(struct ptlrpc_request *req)
{
struct obd_trans_info trans_info = { 0, }, *oti = &trans_info;
- int rc;
+ int should_process, rc;
ENTRY;
rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
GOTO(out, rc);
}
- if (req->rq_reqmsg->opc != OST_CONNECT && req->rq_export == NULL) {
- CERROR("lustre_ost: operation %d on unconnected OST\n",
- req->rq_reqmsg->opc);
- req->rq_status = -ENOTCONN;
- GOTO(out, rc = -ENOTCONN);
- }
+ if (req->rq_reqmsg->opc != OST_CONNECT) {
+ struct obd_device *obd;
+
+ if (req->rq_export == NULL) {
+ CERROR("lustre_ost: operation %d on unconnected OST\n",
+ req->rq_reqmsg->opc);
+ req->rq_status = -ENOTCONN;
+ GOTO(out, rc = -ENOTCONN);
+ }
+
+ obd = req->rq_export->exp_obd;
+
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ if (obd->obd_flags & OBD_ABORT_RECOVERY)
+ target_abort_recovery(obd);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+
+ if (obd->obd_flags & OBD_RECOVERING) {
+ rc = filter_recovery_request(req, obd, &should_process);
+ if (rc || !should_process)
+ RETURN(rc);
+ } else if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
+#if 0
+/* need to store this reply somewhere... */
+ if (req->rq_xid == med->med_last_xid) {
+ DEBUG_REQ(D_HA, req, "resending reply");
+ OBD_ALLOC(req->rq_repmsg, med->med_last_replen);
+ req->rq_replen = med->med_last_replen;
+ memcpy(req->rq_repmsg, med->med_last_reply,
+ req->rq_replen);
+ ptlrpc_reply(req->rq_svc, req);
+ return 0;
+ }
+ DEBUG_REQ(D_HA, req, "no reply for resend, continuing");
+#endif
+ }
+
+ }
if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
GOTO(out, rc = -EINVAL);
case OST_CONNECT:
CDEBUG(D_INODE, "connect\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
- rc = target_handle_connect(req);
+ rc = target_handle_connect(req, ost_handle);
break;
case OST_DISCONNECT:
CDEBUG(D_INODE, "disconnect\n");
rc = ost_brw_read(req);
/* ost_brw sends its own replies */
RETURN(rc);
+ case OST_SAN_READ:
+ CDEBUG(D_INODE, "san read\n");
+ OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
+ rc = ost_san_brw(req, 0);
+ /* ost_san_brw sends its own replies */
+ RETURN(rc);
+ case OST_SAN_WRITE:
+ CDEBUG(D_INODE, "san write\n");
+ OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
+ rc = ost_san_brw(req, 1);
+ /* ost_san_brw sends its own replies */
+ RETURN(rc);
case OST_PUNCH:
CDEBUG(D_INODE, "punch\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
rc = ost_statfs(req);
break;
+ case OST_SYNCFS:
+ CDEBUG(D_INODE, "sync\n");
+ OBD_FAIL_RETURN(OBD_FAIL_OST_SYNCFS_NET, 0);
+ rc = ost_syncfs(req);
+ break;
case LDLM_ENQUEUE:
CDEBUG(D_INODE, "enqueue\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
}
out:
- //req->rq_status = rc;
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
+ struct obd_device *obd = req->rq_export->exp_obd;
+
+ if (obd && (obd->obd_flags & OBD_RECOVERING)) {
+ DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
+ return target_queue_final_reply(req, rc);
+ }
+ /* Lost a race with recovery; let the error path DTRT. */
+ rc = req->rq_status = -ENOTCONN;
+ }
+
if (rc) {
CERROR("ost: processing error (opcode=%d): %d\n",
req->rq_reqmsg->opc, rc);
static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
{
struct ost_obd *ost = &obddev->u.ost;
- struct obd_uuid self = { "self" };
int err;
int i;
ENTRY;
ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
OST_BUFSIZE, OST_MAXREQSIZE,
OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
- &self, ost_handle, "ost");
+ ost_handle, "ost");
if (!ost->ost_service) {
CERROR("failed to start service\n");
GOTO(error_disc, err = -ENOMEM);
return lprocfs_obd_detach(dev);
}
-/* This is so similar to mds_connect that it makes my heart weep: we should
- * shuffle the UUID into obd_export proper and make this all happen in
- * target_handle_connect.
+/* I don't think this function is ever used, since nothing
+ * connects directly to this module.
*/
static int ost_connect(struct lustre_handle *conn,
struct obd_device *obd, struct obd_uuid *cluuid,
ptlrpc_recovery_cb_t recover)
{
struct obd_export *exp;
- struct ost_export_data *oed;
- struct list_head *p;
int rc;
ENTRY;
if (!conn || !obd || !cluuid)
RETURN(-EINVAL);
- /* lctl gets a backstage, all-access pass. */
- if (!strcmp(cluuid->uuid, "OBD_CLASS_UUID"))
- goto dont_check_exports;
-
- spin_lock(&obd->obd_dev_lock);
- list_for_each(p, &obd->obd_exports) {
- exp = list_entry(p, struct obd_export, exp_obd_chain);
- oed = &exp->exp_ost_data;
- if (!memcmp(cluuid->uuid, oed->oed_uuid.uuid,
- sizeof(oed->oed_uuid.uuid))) {
- spin_unlock(&obd->obd_dev_lock);
- LASSERT(exp->exp_obd == obd);
-
- RETURN(target_handle_reconnect(conn, exp, cluuid));
- }
- }
-
- dont_check_exports:
rc = class_connect(conn, obd, cluuid);
if (rc)
RETURN(rc);
exp = class_conn2export(conn);
LASSERT(exp);
- oed = &exp->exp_ost_data;
- memcpy(oed->oed_uuid.uuid, cluuid->uuid, sizeof(oed->oed_uuid.uuid));
-
RETURN(0);
}