+ struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
+ struct niobuf_remote *remote_nb, *res_nb;
+ struct obd_ioobj *ioo;
+ struct ost_body *body;
+ int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
+ void *tmp1, *tmp2, *end2;
+ ENTRY;
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+ tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+ end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
+ objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
+ niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
+
+ cmd = alloc ? OBD_BRW_WRITE : OBD_BRW_READ;
+
+ for (i = 0; i < objcount; i++) {
+ ost_unpack_ioo((void *)&tmp1, &ioo);
+ if (tmp2 + ioo->ioo_bufcnt > end2) {
+ rc = -EFAULT;
+ break;
+ }
+ for (j = 0; j < ioo->ioo_bufcnt; j++)
+ ost_unpack_niobuf((void *)&tmp2, &remote_nb);
+ }
+
+ size[1] = niocount * sizeof(*remote_nb);
+ rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ GOTO(out, rc);
+
+ /* The unpackers move tmp1 and tmp2, so reset them before using */
+ tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+ tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+
+ req->rq_status = obd_san_preprw(cmd, conn, objcount, tmp1,
+ niocount, tmp2);
+
+ if (req->rq_status) {
+ rc = 0;
+ goto out;
+ }
+
+ remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
+ res_nb = lustre_msg_buf(req->rq_reqmsg, 2);
+ for (i = 0; i < niocount; i++) {
+ /* this advances remote_nb */
+ ost_pack_niobuf((void **)&remote_nb,
+ res_nb[i].offset,
+ res_nb[i].len, /* 0 */
+ res_nb[i].flags, /* 0 */
+ res_nb[i].xid
+ );
+ }
+
+ rc = 0;
+
+out:
+ if (rc) {
+ OBD_FREE(req->rq_repmsg, req->rq_replen);
+ req->rq_repmsg = NULL;
+ ptlrpc_error(req->rq_svc, req);
+ } else
+ ptlrpc_reply(req->rq_svc, req);
+
+ return rc;
+}
+
+static int filter_recovery_request(struct ptlrpc_request *req,
+ struct obd_device *obd, int *process)
+{
+ switch (req->rq_reqmsg->opc) {
+ case OST_CONNECT: /* This will never get here, but for completeness. */
+ case OST_DISCONNECT:
+ *process = 1;
+ RETURN(0);
+
+ case OST_CLOSE:
+ case OST_CREATE:
+ case OST_DESTROY:
+ case OST_OPEN:
+ case OST_PUNCH:
+ case OST_SETATTR:
+ case OST_SYNCFS:
+ case OST_WRITE:
+ case LDLM_ENQUEUE:
+ *process = target_queue_recovery_request(req, obd);
+ RETURN(0);
+
+ default:
+ DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
+ *process = 0;
+ /* XXX what should we set rq_status to here? */
+ RETURN(ptlrpc_error(req->rq_svc, req));
+ }