X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fost%2Fost_handler.c;h=18a1b85120498115f9ddf8268cff8ff031447fa6;hb=040033cef24c5aca2967daf2da7a862abcd074cf;hp=d595757f6b2d0cfcccc728537c8ea02937923b69;hpb=5c0b2855f617d17e3c0749244edeca1b706f8a7e;p=fs%2Flustre-release.git diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index d595757..18a1b85 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -41,6 +41,12 @@ #include #include +inline void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req) +{ + if (oti && req->rq_repmsg) + req->rq_repmsg->transno = HTON__u64(oti->oti_transno); + EXIT; +} static int ost_destroy(struct ptlrpc_request *req, struct obd_trans_info *oti) { @@ -104,6 +110,27 @@ static int ost_statfs(struct ptlrpc_request *req) RETURN(0); } +static int ost_syncfs(struct ptlrpc_request *req) +{ + struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg; + struct obd_statfs *osfs; + int rc, size = sizeof(*osfs); + ENTRY; + + rc = lustre_pack_msg(0, &size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) + RETURN(rc); + + rc = obd_syncfs(conn); + if (rc) { + CERROR("ost: syncfs failed: rc %d\n", rc); + req->rq_status = rc; + RETURN(rc); + } + + RETURN(0); +} + static int ost_open(struct ptlrpc_request *req, struct obd_trans_info *oti) { struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg; @@ -232,6 +259,9 @@ static int ost_brw_read(struct ptlrpc_request *req) void *desc_priv = NULL; int cmd, i, j, objcount, niocount, size = sizeof(*body); int rc = 0; +#if CHECKSUM_BULK + __u64 cksum = 0; +#endif ENTRY; body = lustre_msg_buf(req->rq_reqmsg, 0); @@ -245,6 +275,13 @@ static int ost_brw_read(struct ptlrpc_request *req) if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK)) GOTO(out, req->rq_status = -EIO); + /* Hmm, we don't return anything in this reply buffer? + * We should be returning per-page status codes and also + * per-object size, blocks count, mtime, ctime. (bug 593) */ + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) + GOTO(out, req->rq_status = rc); + for (i = 0; i < objcount; i++) { ost_unpack_ioo(&tmp1, &ioo); if (tmp2 + ioo->ioo_bufcnt > end2) { @@ -284,6 +321,8 @@ static int ost_brw_read(struct ptlrpc_request *req) bulk->bp_xid = remote_nb[i].xid; bulk->bp_buf = local_nb[i].addr; bulk->bp_buflen = remote_nb[i].len; + if (body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM)) + ost_checksum(&cksum, bulk->bp_buf, bulk->bp_buflen); } rc = ptlrpc_bulk_put(desc); @@ -306,16 +345,17 @@ out_bulk: out_local: OBD_FREE(local_nb, sizeof(*local_nb) * niocount); out: - if (!rc) - /* Hmm, we don't return anything in this reply buffer? - * We should be returning per-page status codes and also - * per-object size, blocks count, mtime, ctime. (bug 593) */ - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, - &req->rq_repmsg); if (rc) ptlrpc_error(req->rq_svc, req); - else + else { +#if CHECKSUM_BULK + body = lustre_msg_buf(req->rq_repmsg, 0); + body->oa.o_rdev = HTON__u64(cksum); + body->oa.o_valid |= HTON__u32(OBD_MD_FLCKSUM); +#endif ptlrpc_reply(req->rq_svc, req); + } + RETURN(rc); } @@ -358,7 +398,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) } } - OBD_ALLOC(local_nb, sizeof(*local_nb)* niocount); + OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount); if (local_nb == NULL) GOTO(out, rc = -ENOMEM); @@ -369,7 +409,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) remote_nb, local_nb, &desc_priv, oti); if (req->rq_status) - GOTO(out, rc = 0); + GOTO(out_local, rc = 0); desc = ptlrpc_prep_bulk(req->rq_connection); if (desc == NULL) @@ -403,6 +443,38 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) GOTO(out_bulk, rc); } +#if CHECKSUM_BULK + if ((body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM))) { + static int cksum_counter; + __u64 client_cksum = NTOH__u64(body->oa.o_rdev); + __u64 cksum = 0; + + for (i = 0; i < niocount; i++) { + char *ptr = kmap(local_nb[i].page); + int off = local_nb[i].offset & (PAGE_SIZE - 1); + int len = local_nb[i].len; + + LASSERT(off + len <= PAGE_SIZE); + ost_checksum(&cksum, ptr + off, len); + kunmap(local_nb[i].page); + } + + if (client_cksum != cksum) { + CERROR("Bad checksum: client "LPX64", server "LPX64 + ", client NID "LPX64"\n", client_cksum, cksum, + req->rq_connection->c_peer.peer_nid); + cksum_counter = 1; + } else { + cksum_counter++; + if ((cksum_counter & (-cksum_counter)) == cksum_counter) + CERROR("Checksum %d from "LPX64": "LPX64" OK\n", + cksum_counter, + req->rq_connection->c_peer.peer_nid, + cksum); + } + } +#endif + req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount, local_nb, desc_priv, oti); @@ -419,22 +491,117 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) &req->rq_repmsg); if (rc) ptlrpc_error(req->rq_svc, req); - else + else { + oti_to_request(oti, req); rc = ptlrpc_reply(req->rq_svc, req); + } RETURN(rc); } -inline void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req) +static int ost_san_brw(struct ptlrpc_request *req, int alloc) { - if (oti && req->rq_repmsg) - req->rq_repmsg->transno = HTON__u64(oti->oti_transno); - EXIT; + struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg; + struct niobuf_remote *remote_nb, *res_nb; + struct obd_ioobj *ioo; + struct ost_body *body; + int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)}; + void *tmp1, *tmp2, *end2; + ENTRY; + + body = lustre_msg_buf(req->rq_reqmsg, 0); + tmp1 = lustre_msg_buf(req->rq_reqmsg, 1); + tmp2 = lustre_msg_buf(req->rq_reqmsg, 2); + end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2]; + objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo); + niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb); + + cmd = alloc ? OBD_BRW_WRITE : OBD_BRW_READ; + + for (i = 0; i < objcount; i++) { + ost_unpack_ioo((void *)&tmp1, &ioo); + if (tmp2 + ioo->ioo_bufcnt > end2) { + rc = -EFAULT; + break; + } + for (j = 0; j < ioo->ioo_bufcnt; j++) + ost_unpack_niobuf((void *)&tmp2, &remote_nb); + } + + size[1] = niocount * sizeof(*remote_nb); + rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) + GOTO(out, rc); + + /* The unpackers move tmp1 and tmp2, so reset them before using */ + tmp1 = lustre_msg_buf(req->rq_reqmsg, 1); + tmp2 = lustre_msg_buf(req->rq_reqmsg, 2); + + req->rq_status = obd_san_preprw(cmd, conn, objcount, tmp1, + niocount, tmp2); + + if (req->rq_status) { + rc = 0; + goto out; + } + + remote_nb = lustre_msg_buf(req->rq_repmsg, 1); + res_nb = lustre_msg_buf(req->rq_reqmsg, 2); + for (i = 0; i < niocount; i++) { + /* this advances remote_nb */ + ost_pack_niobuf((void **)&remote_nb, + res_nb[i].offset, + res_nb[i].len, /* 0 */ + res_nb[i].flags, /* 0 */ + res_nb[i].xid + ); + } + + rc = 0; + +out: + if (rc) { + OBD_FREE(req->rq_repmsg, req->rq_replen); + req->rq_repmsg = NULL; + ptlrpc_error(req->rq_svc, req); + } else + ptlrpc_reply(req->rq_svc, req); + + return rc; +} + +static int filter_recovery_request(struct ptlrpc_request *req, + struct obd_device *obd, int *process) +{ + switch (req->rq_reqmsg->opc) { + case OST_CONNECT: /* This will never get here, but for completeness. */ + case OST_DISCONNECT: + *process = 1; + RETURN(0); + + case OST_CLOSE: + case OST_CREATE: + case OST_DESTROY: + case OST_OPEN: + case OST_PUNCH: + case OST_SETATTR: + case OST_SYNCFS: + case OST_WRITE: + case LDLM_ENQUEUE: + *process = target_queue_recovery_request(req, obd); + RETURN(0); + + default: + DEBUG_REQ(D_ERROR, req, "not permitted during recovery"); + *process = 0; + /* XXX what should we set rq_status to here? */ + RETURN(ptlrpc_error(req->rq_svc, req)); + } } static int ost_handle(struct ptlrpc_request *req) { struct obd_trans_info trans_info = { 0, }, *oti = &trans_info; - int rc; + int should_process, rc; ENTRY; rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen); @@ -443,12 +610,44 @@ static int ost_handle(struct ptlrpc_request *req) GOTO(out, rc); } - if (req->rq_reqmsg->opc != OST_CONNECT && req->rq_export == NULL) { - CERROR("lustre_ost: operation %d on unconnected OST\n", - req->rq_reqmsg->opc); - req->rq_status = -ENOTCONN; - GOTO(out, rc = -ENOTCONN); - } + if (req->rq_reqmsg->opc != OST_CONNECT) { + struct obd_device *obd; + + if (req->rq_export == NULL) { + CERROR("lustre_ost: operation %d on unconnected OST\n", + req->rq_reqmsg->opc); + req->rq_status = -ENOTCONN; + GOTO(out, rc = -ENOTCONN); + } + + obd = req->rq_export->exp_obd; + + spin_lock_bh(&obd->obd_processing_task_lock); + if (obd->obd_flags & OBD_ABORT_RECOVERY) + target_abort_recovery(obd); + spin_unlock_bh(&obd->obd_processing_task_lock); + + if (obd->obd_flags & OBD_RECOVERING) { + rc = filter_recovery_request(req, obd, &should_process); + if (rc || !should_process) + RETURN(rc); + } else if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) { +#if 0 +/* need to store this reply somewhere... */ + if (req->rq_xid == med->med_last_xid) { + DEBUG_REQ(D_HA, req, "resending reply"); + OBD_ALLOC(req->rq_repmsg, med->med_last_replen); + req->rq_replen = med->med_last_replen; + memcpy(req->rq_repmsg, med->med_last_reply, + req->rq_replen); + ptlrpc_reply(req->rq_svc, req); + return 0; + } + DEBUG_REQ(D_HA, req, "no reply for resend, continuing"); +#endif + } + + } if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0) GOTO(out, rc = -EINVAL); @@ -457,7 +656,7 @@ static int ost_handle(struct ptlrpc_request *req) case OST_CONNECT: CDEBUG(D_INODE, "connect\n"); OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0); - rc = target_handle_connect(req); + rc = target_handle_connect(req, ost_handle); break; case OST_DISCONNECT: CDEBUG(D_INODE, "disconnect\n"); @@ -506,6 +705,18 @@ static int ost_handle(struct ptlrpc_request *req) rc = ost_brw_read(req); /* ost_brw sends its own replies */ RETURN(rc); + case OST_SAN_READ: + CDEBUG(D_INODE, "san read\n"); + OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0); + rc = ost_san_brw(req, 0); + /* ost_san_brw sends its own replies */ + RETURN(rc); + case OST_SAN_WRITE: + CDEBUG(D_INODE, "san write\n"); + OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0); + rc = ost_san_brw(req, 1); + /* ost_san_brw sends its own replies */ + RETURN(rc); case OST_PUNCH: CDEBUG(D_INODE, "punch\n"); OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0); @@ -516,6 +727,11 @@ static int ost_handle(struct ptlrpc_request *req) OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0); rc = ost_statfs(req); break; + case OST_SYNCFS: + CDEBUG(D_INODE, "sync\n"); + OBD_FAIL_RETURN(OBD_FAIL_OST_SYNCFS_NET, 0); + rc = ost_syncfs(req); + break; case LDLM_ENQUEUE: CDEBUG(D_INODE, "enqueue\n"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0); @@ -561,7 +777,17 @@ static int ost_handle(struct ptlrpc_request *req) } out: - //req->rq_status = rc; + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) { + struct obd_device *obd = req->rq_export->exp_obd; + + if (obd && (obd->obd_flags & OBD_RECOVERING)) { + DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply"); + return target_queue_final_reply(req, rc); + } + /* Lost a race with recovery; let the error path DTRT. */ + rc = req->rq_status = -ENOTCONN; + } + if (rc) { CERROR("ost: processing error (opcode=%d): %d\n", req->rq_reqmsg->opc, rc); @@ -583,7 +809,6 @@ out: static int ost_setup(struct obd_device *obddev, obd_count len, void *buf) { struct ost_obd *ost = &obddev->u.ost; - struct obd_uuid self = { "self" }; int err; int i; ENTRY; @@ -591,7 +816,7 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf) ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, - &self, ost_handle, "ost"); + ost_handle, "ost"); if (!ost->ost_service) { CERROR("failed to start service\n"); GOTO(error_disc, err = -ENOMEM); @@ -639,9 +864,8 @@ int ost_detach(struct obd_device *dev) return lprocfs_obd_detach(dev); } -/* This is so similar to mds_connect that it makes my heart weep: we should - * shuffle the UUID into obd_export proper and make this all happen in - * target_handle_connect. +/* I don't think this function is ever used, since nothing + * connects directly to this module. */ static int ost_connect(struct lustre_handle *conn, struct obd_device *obd, struct obd_uuid *cluuid, @@ -649,41 +873,18 @@ static int ost_connect(struct lustre_handle *conn, ptlrpc_recovery_cb_t recover) { struct obd_export *exp; - struct ost_export_data *oed; - struct list_head *p; int rc; ENTRY; if (!conn || !obd || !cluuid) RETURN(-EINVAL); - /* lctl gets a backstage, all-access pass. */ - if (!strcmp(cluuid->uuid, "OBD_CLASS_UUID")) - goto dont_check_exports; - - spin_lock(&obd->obd_dev_lock); - list_for_each(p, &obd->obd_exports) { - exp = list_entry(p, struct obd_export, exp_obd_chain); - oed = &exp->exp_ost_data; - if (!memcmp(cluuid->uuid, oed->oed_uuid.uuid, - sizeof(oed->oed_uuid.uuid))) { - spin_unlock(&obd->obd_dev_lock); - LASSERT(exp->exp_obd == obd); - - RETURN(target_handle_reconnect(conn, exp, cluuid)); - } - } - - dont_check_exports: rc = class_connect(conn, obd, cluuid); if (rc) RETURN(rc); exp = class_conn2export(conn); LASSERT(exp); - oed = &exp->exp_ost_data; - memcpy(oed->oed_uuid.uuid, cluuid->uuid, sizeof(oed->oed_uuid.uuid)); - RETURN(0); }