1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author: Peter J. Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * Storage Target Handling functions
24 * Lustre Object Server Module (OST)
26 * This server is single threaded at present (but can easily be multi
27 * threaded). For testing and management it is treated as an
28 * obd_device, although it does not export a full OBD method table
29 * (the requests are coming in over the wire, so object target
30 * modules do not have a full method table.)
34 # define EXPORT_SYMTAB
36 #define DEBUG_SUBSYSTEM S_OST
38 #include <linux/module.h>
39 #include <linux/obd_ost.h>
40 #include <linux/lustre_net.h>
41 #include <linux/lustre_dlm.h>
42 #include <linux/lustre_export.h>
43 #include <linux/lustre_debug.h>
44 #include <linux/init.h>
45 #include <linux/lprocfs_status.h>
46 #include <linux/lustre_commit_confd.h>
47 #include <libcfs/list.h>
48 #include "ost_internal.h"
// Reset *oti to all zeroes and, when the visible condition holds, seed
// oti->oti_transno from the transno stored in the reply message.
50 void oti_init(struct obd_trans_info *oti, struct ptlrpc_request *req)
54 memset(oti, 0, sizeof *oti);
// NOTE(review): the second operand compares rq_reqmsg with 0, although the
// value copied below is rq_repmsg->transno -- confirm this is intended.
56 if (req->rq_repmsg && req->rq_reqmsg != 0)
57 oti->oti_transno = req->rq_repmsg->transno;
// Copy transaction state from *oti onto the request: the transaction number
// goes into the reply message and the entries of oti->oti_ack_locks are
// handed to ptlrpc_save_lock().
60 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
62 struct oti_req_ack_lock *ack_lock;
// Publish the transaction number in the reply message.
69 req->rq_repmsg->transno = oti->oti_transno;
71 /* XXX 4 == entries in oti_ack_locks??? */
/* The loop bound is the literal 4; ack_lock advances with the index. */
72 for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
75 /* XXX not even calling target_send_reply in some cases... */
76 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
/*
 * Handle a destroy request: fetch the ost_body from request buffer 0 via
 * lustre_swab_reqbuf(), pack a one-buffer reply sized for an ost_body,
 * copy the request obdo into the reply and store the result of
 * obd_destroy() in req->rq_status.
 */
80 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
81 struct obd_trans_info *oti)
83 struct ost_body *body, *repbody;
84 int rc, size = sizeof(*body);
87 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
91 rc = lustre_pack_reply(req, 1, &size, NULL);
/* When OBD_MD_FLCOOKIE is set in o_valid, point the oti at the log cookie
 * returned by obdo_logcookie() for the request obdo. */
95 if (body->oa.o_valid & OBD_MD_FLCOOKIE)
96 oti->oti_logcookies = obdo_logcookie(&body->oa);
97 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
98 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* The destroy is issued against the request obdo, not the reply copy. */
99 req->rq_status = obd_destroy(exp, &body->oa, NULL, oti);
/*
 * Handle a getattr request: fetch the ost_body from request buffer 0,
 * pack a one-buffer reply, copy the request obdo into the reply and let
 * obd_getattr() operate on that reply copy.  The result of obd_getattr()
 * is stored in req->rq_status.
 */
103 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
105 struct ost_body *body, *repbody;
106 int rc, size = sizeof(*body);
109 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
113 rc = lustre_pack_reply(req, 1, &size, NULL);
117 repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
118 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* obd_getattr() is given the reply obdo, so it works on the reply buffer. */
119 req->rq_status = obd_getattr(exp, &repbody->oa, NULL);
/*
 * Handle a statfs request: pack a one-buffer reply sized for a struct
 * obd_statfs, have obd_statfs() fill it for the export's obd device and
 * store the result in req->rq_status.  A non-zero status is logged.
 */
123 static int ost_statfs(struct ptlrpc_request *req)
125 struct obd_statfs *osfs;
126 int rc, size = sizeof(*osfs);
129 rc = lustre_pack_reply(req, 1, &size, NULL);
133 osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*osfs));
/* Third argument is jiffies - HZ; presumably a maximum-age cutoff of one
 * second for cached data -- confirm against obd_statfs(). */
135 req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs, jiffies-HZ);
136 if (req->rq_status != 0)
137 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
/*
 * Handle a create request: fetch the ost_body from request buffer 0,
 * pack a one-buffer reply, copy the request obdo into the reply and call
 * obd_create() on the reply copy.  The result is stored in req->rq_status.
 */
142 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
143 struct obd_trans_info *oti)
145 struct ost_body *body, *repbody;
146 int rc, size = sizeof(*repbody);
149 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
153 rc = lustre_pack_reply(req, 1, &size, NULL);
157 repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
158 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* The log cookie pointer is taken from the reply obdo, unconditionally. */
159 oti->oti_logcookies = obdo_logcookie(&repbody->oa);
160 req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
161 //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
/*
 * Handle a punch request: fetch the ost_body from request buffer 0, check
 * that both OBD_MD_FLSIZE and OBD_MD_FLBLOCKS are set in o_valid, pack a
 * one-buffer reply, copy the request obdo into the reply and call
 * obd_punch() on the reply copy.  The result is stored in req->rq_status.
 */
165 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
166 struct obd_trans_info *oti)
168 struct ost_body *body, *repbody;
169 int rc, size = sizeof(*repbody);
172 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
/* Both flags must be present; o_size and o_blocks are consumed below. */
176 if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
177 (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
180 rc = lustre_pack_reply(req, 1, &size, NULL);
184 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
185 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* o_size and o_blocks are passed as the 4th and 5th arguments; presumably
 * the start and end of the range to punch -- confirm against obd_punch(). */
186 req->rq_status = obd_punch(exp, &repbody->oa, NULL, repbody->oa.o_size,
187 repbody->oa.o_blocks, oti);
/*
 * Handle a sync request: fetch the ost_body from request buffer 0, pack a
 * one-buffer reply, copy the request obdo into the reply and call
 * obd_sync() on the reply copy.  The result is stored in req->rq_status.
 */
191 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
193 struct ost_body *body, *repbody;
194 int rc, size = sizeof(*repbody);
197 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
201 rc = lustre_pack_reply(req, 1, &size, NULL);
205 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
206 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* o_size and o_blocks are passed as the 4th and 5th arguments; presumably
 * the bounds of the range to sync -- confirm against obd_sync(). */
207 req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
208 repbody->oa.o_blocks);
/*
 * Handle a setattr request: fetch the ost_body from request buffer 0,
 * pack a one-buffer reply, copy the request obdo into the reply and call
 * obd_setattr() on the reply copy.  The result is stored in req->rq_status.
 */
212 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
213 struct obd_trans_info *oti)
215 struct ost_body *body, *repbody;
216 int rc, size = sizeof(*repbody);
219 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
223 rc = lustre_pack_reply(req, 1, &size, NULL);
227 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
228 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
230 req->rq_status = obd_setattr(exp, &repbody->oa, NULL, oti);
/* Timeout callback passed to LWI_TIMEOUT() by ost_brw_read() and
 * ost_brw_write() while they wait for a bulk transfer to finish. */
234 static int ost_bulk_timeout(void *data)
237 /* We don't fail the connection here, because having the export
238 * killed makes the (vital) call to commitrw very sad.
// Build a per-page view of the remote niobufs.
//   ioo / nioo  : array of I/O objects and its length; ioo[i].ioo_bufcnt is
//                 overwritten with the number of pages of object i.
//   rnb / nrnb  : remote niobufs as received, and their count.
//   pp_rnbp     : out parameter for the per-page niobuf array.
// Callers treat the return value as the page count (see ost_brw_read).
243 static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
244 struct niobuf_remote *rnb, int nrnb,
245 struct niobuf_remote **pp_rnbp)
247 /* Copy a remote niobuf, splitting it into page-sized chunks
248 * and setting ioo[i].ioo_bufcnt accordingly */
249 struct niobuf_remote *pp_rnb;
256 /* first count and check the number of pages required */
/* NOTE(review): the inner bound reads ioo->ioo_bufcnt (that is, ioo[0])
 * for every i, whereas the split loop further down uses
 * ioo[i].ioo_bufcnt -- confirm this is correct when nioo > 1. */
257 for (i = 0; i < nioo; i++)
258 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
259 obd_off offset = rnb[rnbidx].offset;
/* p0 and pn are the page indices of the first and last byte covered. */
260 obd_off p0 = offset >> PAGE_SHIFT;
261 obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
/* NOTE(review): rnb[rnbidx] has already been read by the initialisers
 * above by the time this bound is asserted. */
263 LASSERT(rnbidx < nrnb);
265 npages += (pn + 1 - p0);
/* Zero-length niobufs are reported as an error. */
267 if (rnb[rnbidx].len == 0) {
268 CERROR("zero len BRW: obj %d objid "LPX64
269 " buf %u\n", i, ioo[i].ioo_id, j);
/* Offsets that do not increase from one niobuf to the next are reported. */
273 rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
274 CERROR("unordered BRW: obj %d objid "LPX64
275 " buf %u offset "LPX64" <= "LPX64"\n",
276 i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
282 LASSERT(rnbidx == nrnb);
/* Presumably the input array is reused as-is in this case; compare the
 * pp_rnb == rnb test in free_per_page_niobufs() -- confirm. */
284 if (npages == nrnb) { /* all niobufs are for single pages */
289 OBD_ALLOC(pp_rnb, sizeof(*pp_rnb) * npages);
293 /* now do the actual split */
295 for (i = 0; i < nioo; i++) {
298 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
299 obd_off off = rnb[rnbidx].offset;
300 int nob = rnb[rnbidx].len;
302 LASSERT(rnbidx < nrnb);
/* poff is the offset within the page; pnob is the byte count that fits
 * in the current page (clipped at the page boundary). */
304 obd_off poff = off & (PAGE_SIZE - 1);
305 int pnob = (poff + nob > PAGE_SIZE) ?
306 PAGE_SIZE - poff : nob;
308 LASSERT(page < npages);
/* Each per-page entry inherits the flags of the niobuf it came from. */
309 pp_rnb[page].len = pnob;
310 pp_rnb[page].offset = off;
311 pp_rnb[page].flags = rnb[rnbidx].flags;
313 CDEBUG(0, " obj %d id "LPX64
314 "page %d(%d) "LPX64" for %d, flg %x\n",
315 i, ioo[i].ioo_id, obj_pages, page,
316 pp_rnb[page].offset, pp_rnb[page].len,
/* The object buffer count now counts pages rather than niobufs. */
326 ioo[i].ioo_bufcnt = obj_pages;
328 LASSERT(page == npages);
/*
 * Release a per-page niobuf array of npages entries.  The pp_rnb == rnb
 * test identifies the case where no separate array was allocated; otherwise
 * the array is freed with the same size expression used to allocate it in
 * get_per_page_niobufs().
 */
334 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
335 struct niobuf_remote *rnb)
337 if (pp_rnb == rnb) /* didn't allocate above */
340 OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages);
/*
 * Compute a checksum over the pages attached to a bulk descriptor.  Every
 * bd_iov entry is mapped with kmap() and fed to ost_checksum() in pieces
 * of at most CHECKSUM_CHUNK bytes.
 */
344 obd_count ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
349 for (i = 0; i < desc->bd_iov_count; i++) {
350 struct page *page = desc->bd_iov[i].kiov_page;
351 char *ptr = kmap(page);
/* off is the in-page offset of the fragment, count its remaining length. */
352 int psum, off = desc->bd_iov[i].kiov_offset & ~PAGE_MASK;
353 int count = desc->bd_iov[i].kiov_len;
/* The length passed to ost_checksum() is capped at CHECKSUM_CHUNK. */
356 ost_checksum(&cksum, &psum, ptr + off,
357 count > CHECKSUM_CHUNK ?
358 CHECKSUM_CHUNK : count);
359 LL_CDEBUG_PAGE(D_PAGE, page, "off %d checksum %x\n",
/* Advance to the next chunk of this fragment. */
361 off += CHECKSUM_CHUNK;
362 count -= CHECKSUM_CHUNK;
/*
 * Serve a bulk read.  Visible steps:
 *   - unpack the ost_body (buffer 0), one obd_ioobj (buffer 1) and its
 *     remote niobufs (buffer 2) from the request;
 *   - pack a one-buffer reply and split the niobufs into per-page entries;
 *   - obd_preprw(OBD_BRW_READ), then attach the resulting pages to a
 *     BULK_PUT_SOURCE descriptor on OST_BULK_PORTAL;
 *   - start the transfer and wait for it with a timeout of
 *     obd_timeout * HZ / 4;
 *   - obd_commitrw(OBD_BRW_READ), fill in the reply body, release the
 *     descriptor and the temporary arrays;
 *   - on a bulk communication error, drop the reply state and either fail
 *     the export or log that the client has reconnected.
 */
371 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
373 struct ptlrpc_bulk_desc *desc;
374 struct niobuf_remote *remote_nb;
375 struct niobuf_remote *pp_rnb;
376 struct niobuf_local *local_nb;
377 struct obd_ioobj *ioo;
378 struct ost_body *body, *repbody;
379 struct l_wait_info lwi;
380 int size[1] = { sizeof(*body) };
/* Fault-injection hooks: forced -EIO and an optional pause. */
389 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
390 GOTO(out, rc = -EIO);
392 OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
393 (obd_timeout + 1) / 4);
395 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
397 CERROR("Missing/short ost_body\n");
398 GOTO(out, rc = -EFAULT);
401 ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
403 CERROR("Missing/short ioobj\n");
404 GOTO(out, rc = -EFAULT);
/* The niobuf count comes from the single ioobj carried by the request. */
407 niocount = ioo->ioo_bufcnt;
408 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
409 lustre_swab_niobuf_remote);
410 if (remote_nb == NULL) {
411 CERROR("Missing/short niobuf\n");
412 GOTO(out, rc = -EFAULT);
/* Entries 1..niocount-1 are swabbed here; presumably the call above
 * handled only remote_nb[0] -- confirm against lustre_swab_reqbuf(). */
414 if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
415 for (i = 1; i < niocount; i++)
416 lustre_swab_niobuf_remote (&remote_nb[i]);
419 rc = lustre_pack_reply(req, 1, size, NULL);
423 /* FIXME all niobuf splitting should be done in obdfilter if needed */
424 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
425 npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
427 GOTO(out, rc = npages);
429 OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
430 if (local_nb == NULL)
431 GOTO(out_pp_rnb, rc = -ENOMEM);
433 desc = ptlrpc_prep_bulk_exp (req, npages,
434 BULK_PUT_SOURCE, OST_BULK_PORTAL);
436 GOTO(out_local, rc = -ENOMEM);
438 rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
439 ioo, npages, pp_rnb, local_nb, oti);
443 /* We're finishing using body->oa as an input variable */
444 body->oa.o_valid = 0;
/* Inspect the per-page result of preprw and queue pages that hold data. */
447 for (i = 0; i < npages; i++) {
448 int page_rc = local_nb[i].rc;
450 if (page_rc < 0) { /* error */
455 LASSERT(page_rc <= pp_rnb[i].len);
457 if (page_rc != 0) { /* some data! */
458 LASSERT (local_nb[i].page != NULL);
459 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
460 pp_rnb[i].offset & (PAGE_SIZE-1),
464 if (page_rc != pp_rnb[i].len) { /* short read */
465 /* All subsequent pages should be 0 */
467 LASSERT(local_nb[i].rc == 0);
473 rc = ptlrpc_start_bulk_transfer(desc);
/* Wait until the bulk is no longer active or the timeout expires. */
475 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
476 ost_bulk_timeout, desc);
477 rc = l_wait_event(desc->bd_waitq,
478 !ptlrpc_bulk_active(desc), &lwi);
479 LASSERT(rc == 0 || rc == -ETIMEDOUT);
480 if (rc == -ETIMEDOUT) {
481 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT");
482 ptlrpc_abort_bulk(desc);
483 } else if (!desc->bd_success ||
484 desc->bd_nob_transferred != desc->bd_nob) {
485 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
487 "truncated" : "network error on",
488 desc->bd_nob_transferred,
490 /* XXX should this be a different errno? */
494 DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d\n", rc);
/* Remember whether the bulk phase failed; this gates the reply below. */
496 comms_error = rc != 0;
499 /* Must commit after prep above in all cases */
500 rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
501 ioo, npages, local_nb, oti, rc);
504 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
505 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
/* Attach a checksum of the transferred pages to the reply obdo. */
508 repbody->oa.o_cksum = ost_checksum_bulk(desc);
509 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
/* Release resources in reverse order of acquisition. */
514 ptlrpc_free_bulk(desc);
516 OBD_FREE(local_nb, sizeof(*local_nb) * npages);
518 free_per_page_niobufs(npages, pp_rnb, remote_nb);
522 req->rq_status = nob;
523 target_committed_to_req(req);
525 } else if (!comms_error) {
526 /* only reply if comms OK */
527 target_committed_to_req(req);
531 if (req->rq_reply_state != NULL) {
532 /* reply out callback would free */
533 ptlrpc_rs_decref(req->rq_reply_state);
534 req->rq_reply_state = NULL;
/* Matching connection counts lead to ptlrpc_fail_export(); otherwise the
 * error is logged as ignored because the client has reconnected. */
536 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
537 CERROR("bulk IO comms error: "
538 "evicting %s@%s id %s\n",
539 req->rq_export->exp_client_uuid.uuid,
540 req->rq_export->exp_connection->c_remote_uuid.uuid,
542 ptlrpc_fail_export(req->rq_export);
544 CERROR("ignoring bulk IO comms error: "
545 "client reconnected %s@%s id %s\n",
546 req->rq_export->exp_client_uuid.uuid,
547 req->rq_export->exp_connection->c_remote_uuid.uuid,
/*
 * Serve a bulk write.  Visible steps:
 *   - unpack the ost_body (buffer 0), an array of obd_ioobj (buffer 1) and
 *     the remote niobufs (buffer 2) from the request;
 *   - pack a two-buffer reply: the ost_body plus one return code per
 *     requested niobuf;
 *   - split the niobufs per page, obd_preprw(OBD_BRW_WRITE), attach the
 *     pages to a BULK_GET_SINK descriptor and wait for the transfer with a
 *     timeout of obd_timeout * HZ / 4;
 *   - optionally verify the client checksum, then obd_commitrw();
 *   - translate per-page results into per-niobuf return codes, release
 *     resources and reply;
 *   - on a bulk communication error, drop the reply state and either fail
 *     the export or log that the client has reconnected.
 */
555 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
557 struct ptlrpc_bulk_desc *desc;
558 struct niobuf_remote *remote_nb;
559 struct niobuf_remote *pp_rnb;
560 struct niobuf_local *local_nb;
561 struct obd_ioobj *ioo;
562 struct ost_body *body, *repbody;
563 struct l_wait_info lwi;
565 int size[2] = { sizeof(*body) };
566 int objcount, niocount, npages;
/* Fault-injection hook: forced -EIO. */
571 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
572 GOTO(out, rc = -EIO);
574 /* pause before transaction has been started */
575 OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
576 (obd_timeout + 1) / 4);
578 swab = lustre_msg_swabbed(req->rq_reqmsg);
579 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
581 CERROR("Missing/short ost_body\n");
582 GOTO(out, rc = -EFAULT);
/* The object count is derived from the length of request buffer 1. */
585 LASSERT_REQSWAB(req, 1);
586 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
588 CERROR("Missing/short ioobj\n");
589 GOTO(out, rc = -EFAULT);
591 ioo = lustre_msg_buf (req->rq_reqmsg, 1, objcount * sizeof(*ioo));
592 LASSERT (ioo != NULL);
/* Sum the niobuf counts of all objects; an empty object is rejected. */
593 for (niocount = i = 0; i < objcount; i++) {
595 lustre_swab_obd_ioobj (&ioo[i]);
596 if (ioo[i].ioo_bufcnt == 0) {
597 CERROR("ioo[%d] has zero bufcnt\n", i);
598 GOTO(out, rc = -EFAULT);
600 niocount += ioo[i].ioo_bufcnt;
603 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
604 lustre_swab_niobuf_remote);
605 if (remote_nb == NULL) {
606 CERROR("Missing/short niobuf\n");
607 GOTO(out, rc = -EFAULT);
609 if (swab) { /* swab the remaining niobufs */
610 for (i = 1; i < niocount; i++)
611 lustre_swab_niobuf_remote (&remote_nb[i]);
/* Reply buffer 1 holds one return code per requested niobuf. */
614 size[1] = niocount * sizeof(*rcs);
615 rc = lustre_pack_reply(req, 2, size, NULL);
618 rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
620 /* FIXME all niobuf splitting should be done in obdfilter if needed */
621 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
622 npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
624 GOTO(out, rc = npages);
626 OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
627 if (local_nb == NULL)
628 GOTO(out_pp_rnb, rc = -ENOMEM);
630 desc = ptlrpc_prep_bulk_exp (req, npages,
631 BULK_GET_SINK, OST_BULK_PORTAL);
633 GOTO(out_local, rc = -ENOMEM);
635 rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
636 ioo, npages, pp_rnb, local_nb, oti);
640 /* NB Having prepped, we must commit... */
/* Every prepared page is attached to the bulk descriptor. */
642 for (i = 0; i < npages; i++)
643 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
644 pp_rnb[i].offset & (PAGE_SIZE - 1),
647 rc = ptlrpc_start_bulk_transfer (desc);
/* Wait until the bulk is no longer active or the timeout expires. */
649 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
650 ost_bulk_timeout, desc);
651 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc),
653 LASSERT(rc == 0 || rc == -ETIMEDOUT);
654 if (rc == -ETIMEDOUT) {
655 DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
656 ptlrpc_abort_bulk(desc);
657 } else if (!desc->bd_success ||
658 desc->bd_nob_transferred != desc->bd_nob) {
659 DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
661 "truncated" : "network error on",
662 desc->bd_nob_transferred, desc->bd_nob);
663 /* XXX should this be a different errno? */
667 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
/* Remember whether the bulk phase failed; this gates the reply below. */
669 comms_error = rc != 0;
671 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
672 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
/* The checksum is verified only when the bulk succeeded and the request
 * obdo has OBD_MD_FLCKSUM set. */
675 if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
676 static int cksum_counter;
677 obd_count client_cksum = body->oa.o_cksum;
678 obd_count cksum = ost_checksum_bulk(desc);
680 if (client_cksum != cksum) {
681 CERROR("Bad checksum: client %x, server %x id %s\n",
685 repbody->oa.o_cksum = cksum;
/* The test below holds when cksum_counter has at most one bit set;
 * presumably this rate-limits the OK message -- confirm. */
688 if ((cksum_counter & (-cksum_counter)) == cksum_counter)
689 CWARN("Checksum %u from %s: %x OK\n",
696 /* Must commit after prep above in all cases */
/* Unlike the read path, the commit is given the reply obdo. */
697 rc = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
698 objcount, ioo, npages, local_nb, oti, rc);
701 /* set per-requested niobuf return codes */
/* i walks the requested niobufs, j walks the per-page entries; a negative
 * page result is copied into the return code of niobuf i. */
702 for (i = j = 0; i < niocount; i++) {
703 int nob = remote_nb[i].len;
708 if (local_nb[j].rc < 0)
709 rcs[i] = local_nb[j].rc;
710 nob -= pp_rnb[j].len;
715 LASSERT(j == npages);
/* Release resources in reverse order of acquisition. */
719 ptlrpc_free_bulk(desc);
721 OBD_FREE(local_nb, sizeof(*local_nb) * npages);
723 free_per_page_niobufs(npages, pp_rnb, remote_nb);
726 oti_to_request(oti, req);
727 target_committed_to_req(req);
728 rc = ptlrpc_reply(req);
729 } else if (!comms_error) {
730 /* Only reply if there was no comms problem with bulk */
731 target_committed_to_req(req);
735 if (req->rq_reply_state != NULL) {
736 /* reply out callback would free */
737 ptlrpc_rs_decref(req->rq_reply_state);
738 req->rq_reply_state = NULL;
/* Matching connection counts lead to ptlrpc_fail_export(); otherwise the
 * error is logged as ignored because the client has reconnected. */
740 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
741 CERROR("%s: bulk IO comm error evicting %s@%s id %s\n",
742 req->rq_export->exp_obd->obd_name,
743 req->rq_export->exp_client_uuid.uuid,
744 req->rq_export->exp_connection->c_remote_uuid.uuid,
746 ptlrpc_fail_export(req->rq_export);
748 CERROR("ignoring bulk IO comms error: "
749 "client reconnected %s@%s id %s\n",
750 req->rq_export->exp_client_uuid.uuid,
751 req->rq_export->exp_connection->c_remote_uuid.uuid,
/*
 * Serve a SAN read or write request (cmd is OBD_BRW_READ or OBD_BRW_WRITE
 * as passed by ost_handle).  The request is unpacked like a bulk write,
 * the niobufs are split per page, a two-buffer reply is packed and
 * obd_san_preprw() is called; its result is stored in req->rq_status.
 * No bulk descriptor is set up in the visible code.
 */
758 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
760 struct niobuf_remote *remote_nb, *res_nb, *pp_rnb;
761 struct obd_ioobj *ioo;
762 struct ost_body *body, *repbody;
763 int rc, i, objcount, niocount, size[2] = {sizeof(*body)}, npages;
767 /* XXX not set to use latest protocol */
769 swab = lustre_msg_swabbed(req->rq_reqmsg);
770 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
772 CERROR("Missing/short ost_body\n");
773 GOTO(out, rc = -EFAULT);
776 ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
778 CERROR("Missing/short ioobj\n");
779 GOTO(out, rc = -EFAULT);
/* The object count is derived from the length of request buffer 1; the
 * niobuf total starts with ioo[0] and adds the remaining objects. */
781 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
782 niocount = ioo[0].ioo_bufcnt;
783 for (i = 1; i < objcount; i++) {
785 lustre_swab_obd_ioobj (&ioo[i]);
786 niocount += ioo[i].ioo_bufcnt;
789 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
790 lustre_swab_niobuf_remote);
791 if (remote_nb == NULL) {
792 CERROR("Missing/short niobuf\n");
793 GOTO(out, rc = -EFAULT);
795 if (swab) { /* swab the remaining niobufs */
796 for (i = 1; i < niocount; i++)
797 lustre_swab_niobuf_remote (&remote_nb[i]);
800 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
801 npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
803 GOTO (out, rc = npages);
/* Reply buffer 1 is sized for one niobuf_remote per page. */
805 size[1] = npages * sizeof(*pp_rnb);
806 rc = lustre_pack_reply(req, 2, size, NULL);
808 GOTO(out_pp_rnb, rc);
810 req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
811 objcount, ioo, npages, pp_rnb);
814 GOTO(out_pp_rnb, rc = 0);
816 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
817 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* NOTE(review): size[1] was computed from npages and sizeof(*pp_rnb), but
 * the copy source is remote_nb (niocount entries), not pp_rnb -- confirm
 * which array is meant and that the source is large enough. */
819 res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
820 memcpy(res_nb, remote_nb, size[1]);
823 free_per_page_niobufs(npages, pp_rnb, remote_nb);
825 target_committed_to_req(req);
/*
 * Handle a set_info request: the key is taken from request buffer 0 and
 * its length from buflens[0]; an empty reply is packed and obd_set_info()
 * is called with a zero-length, NULL value.
 */
837 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
/* Buffer 0 must hold at least one byte for a key to be present. */
843 key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
845 DEBUG_REQ(D_HA, req, "no set_info key");
848 keylen = req->rq_reqmsg->buflens[0];
850 rc = lustre_pack_reply(req, 0, NULL, NULL);
854 rc = obd_set_info(exp, keylen, key, 0, NULL);
/* The reply message status is set to 0 regardless of rc. */
855 req->rq_repmsg->status = 0;
/*
 * Handle a get_info request: the key is taken from request buffer 0 and
 * its length from buflens[0].  The key is compared against "last_id"; a
 * one-buffer reply of sizeof(obd_id) bytes is packed and filled by
 * obd_get_info().
 */
859 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
862 int keylen, rc = 0, size = sizeof(obd_id);
/* Buffer 0 must hold at least one byte for a key to be present. */
866 key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
868 DEBUG_REQ(D_HA, req, "no get_info key");
871 keylen = req->rq_reqmsg->buflens[0];
/* The key must be at least 7 bytes long and start with "last_id". */
873 if (keylen < strlen("last_id") || memcmp(key, "last_id", 7) != 0)
876 rc = lustre_pack_reply(req, 1, &size, NULL);
880 reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
881 rc = obd_get_info(exp, keylen, key, &size, reply);
/* The reply message status is set to 0 regardless of rc. */
882 req->rq_repmsg->status = 0;
/*
 * Decide what to do with a request that arrives while the device is in
 * recovery, based on the request opcode.  In the visible code a request is
 * either handed to target_queue_recovery_request(), whose result is
 * stored in *process, or rejected with status -EAGAIN via ptlrpc_error().
 */
886 static int ost_filter_recovery_request(struct ptlrpc_request *req,
887 struct obd_device *obd, int *process)
889 switch (req->rq_reqmsg->opc) {
890 case OST_CONNECT: /* This will never get here, but for completeness. */
904 *process = target_queue_recovery_request(req, obd);
/* Rejection path: log, set -EAGAIN and send an error reply. */
908 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
910 /* XXX what should we set rq_status to here? */
911 req->rq_status = -EAGAIN;
912 RETURN(ptlrpc_error(req));
/*
 * Request handler registered with ptlrpc_init_svc() in ost_setup().
 * For every opcode other than OST_CONNECT it first requires an export and
 * checks the recovery state of the export's obd device, then dispatches on
 * req->rq_reqmsg->opc to the matching ost_*, target_*, llog_* or ldlm_*
 * handler.  The common tail records the committed state, handles
 * MSG_LAST_REPLAY, copies the transaction info to the request and sends
 * the reply with target_send_reply().
 */
916 static int ost_handle(struct ptlrpc_request *req)
918 struct obd_trans_info trans_info = { 0, };
919 struct obd_trans_info *oti = &trans_info;
/* fail is the fault-injection id later passed to target_send_reply(). */
920 int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
921 struct obd_device *obd = NULL;
924 LASSERT(current->journal_info == NULL);
925 /* XXX identical to MDS */
926 if (req->rq_reqmsg->opc != OST_CONNECT) {
927 int abort_recovery, recovering;
/* A request without an export is rejected with -ENOTCONN. */
929 if (req->rq_export == NULL) {
930 CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
931 req->rq_reqmsg->opc, req->rq_peerstr);
932 req->rq_status = -ENOTCONN;
933 GOTO(out, rc = -ENOTCONN);
936 obd = req->rq_export->exp_obd;
938 /* Check for aborted recovery. */
/* Both flags are sampled under obd_processing_task_lock. */
939 spin_lock_bh(&obd->obd_processing_task_lock);
940 abort_recovery = obd->obd_abort_recovery;
941 recovering = obd->obd_recovering;
942 spin_unlock_bh(&obd->obd_processing_task_lock);
943 if (abort_recovery) {
944 target_abort_recovery(obd);
945 } else if (recovering) {
946 rc = ost_filter_recovery_request(req, obd,
948 if (rc || !should_process)
/* Opcode dispatch.  Most branches start with an OBD_FAIL_RETURN()
 * fault-injection hook before calling the handler. */
955 switch (req->rq_reqmsg->opc) {
957 CDEBUG(D_INODE, "connect\n");
958 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
959 rc = target_handle_connect(req, ost_handle);
961 obd = req->rq_export->exp_obd;
965 CDEBUG(D_INODE, "disconnect\n");
966 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
967 rc = target_handle_disconnect(req);
970 CDEBUG(D_INODE, "create\n");
971 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
972 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
973 GOTO(out, rc = -ENOSPC);
974 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
975 GOTO(out, rc = -EROFS);
976 rc = ost_create(req->rq_export, req, oti);
979 CDEBUG(D_INODE, "destroy\n");
980 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
981 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
982 GOTO(out, rc = -EROFS);
983 rc = ost_destroy(req->rq_export, req, oti);
986 CDEBUG(D_INODE, "getattr\n");
987 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
988 rc = ost_getattr(req->rq_export, req);
991 CDEBUG(D_INODE, "setattr\n");
992 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
993 rc = ost_setattr(req->rq_export, req, oti);
996 CDEBUG(D_INODE, "write\n");
997 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
998 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
999 GOTO(out, rc = -ENOSPC);
1000 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1001 GOTO(out, rc = -EROFS);
1002 rc = ost_brw_write(req, oti);
1003 LASSERT(current->journal_info == NULL);
1004 /* ost_brw_write sends its own replies */
1007 CDEBUG(D_INODE, "read\n");
1008 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1009 rc = ost_brw_read(req, oti);
1010 LASSERT(current->journal_info == NULL);
1011 /* ost_brw_read sends its own replies */
1014 CDEBUG(D_INODE, "san read\n");
1015 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1016 rc = ost_san_brw(req, OBD_BRW_READ);
1017 /* ost_san_brw sends its own replies */
1020 CDEBUG(D_INODE, "san write\n");
1021 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1022 rc = ost_san_brw(req, OBD_BRW_WRITE);
1023 /* ost_san_brw sends its own replies */
1026 CDEBUG(D_INODE, "punch\n");
1027 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
1028 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1029 GOTO(out, rc = -EROFS);
1030 rc = ost_punch(req->rq_export, req, oti);
1033 CDEBUG(D_INODE, "statfs\n");
1034 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
1035 rc = ost_statfs(req);
1038 CDEBUG(D_INODE, "sync\n");
1039 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNC_NET, 0);
1040 rc = ost_sync(req->rq_export, req);
1043 DEBUG_REQ(D_INODE, req, "set_info");
1044 rc = ost_set_info(req->rq_export, req);
1047 DEBUG_REQ(D_INODE, req, "get_info");
1048 rc = ost_get_info(req->rq_export, req);
1051 DEBUG_REQ(D_INODE, req, "ping");
1052 rc = target_handle_ping(req);
1054 /* FIXME - just reply status */
/* The two llog branches pack an empty reply and send it themselves. */
1055 case LLOG_ORIGIN_CONNECT:
1056 DEBUG_REQ(D_INODE, req, "log connect\n");
1057 rc = llog_handle_connect(req);
1058 req->rq_status = rc;
1059 rc = lustre_pack_reply(req, 0, NULL, NULL);
1062 RETURN(ptlrpc_reply(req));
1063 case OBD_LOG_CANCEL:
1064 CDEBUG(D_INODE, "log cancel\n");
1065 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1066 rc = llog_origin_handle_cancel(req);
1067 req->rq_status = rc;
1068 rc = lustre_pack_reply(req, 0, NULL, NULL);
1071 RETURN(ptlrpc_reply(req));
1073 CDEBUG(D_INODE, "enqueue\n");
1074 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1075 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1076 ldlm_server_blocking_ast,
1077 ldlm_server_glimpse_ast);
/* Enqueue replies use a different fault-injection id. */
1078 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1081 CDEBUG(D_INODE, "convert\n");
1082 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1083 rc = ldlm_handle_convert(req);
1086 CDEBUG(D_INODE, "cancel\n");
1087 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1088 rc = ldlm_handle_cancel(req);
1090 case LDLM_BL_CALLBACK:
1091 case LDLM_CP_CALLBACK:
1092 CDEBUG(D_INODE, "callback\n");
1093 CERROR("callbacks should not happen on OST\n");
/* Unknown opcodes are answered with -ENOTSUPP. */
1096 CERROR("Unexpected opcode %d\n", req->rq_reqmsg->opc);
1097 req->rq_status = -ENOTSUPP;
1098 rc = ptlrpc_error(req);
1102 LASSERT(current->journal_info == NULL);
1105 /* If we're DISCONNECTing, the export_data is already freed */
1106 if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT)
1107 target_committed_to_req(req);
/* A request flagged MSG_LAST_REPLAY has its reply queued while the device
 * is still recovering; otherwise it is failed with -ENOTCONN. */
1110 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1111 if (obd && obd->obd_recovering) {
1112 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1113 return target_queue_final_reply(req, rc);
1115 /* Lost a race with recovery; let the error path DTRT. */
1116 rc = req->rq_status = -ENOTCONN;
1120 oti_to_request(oti, req);
1122 target_send_reply(req, rc, fail);
/*
 * Set up an OST device: run cleanup_group_info(), start the llog commit
 * thread, register the lprocfs entries and create two ptlrpc services that
 * both use ost_handle -- "ost" on OST_REQUEST_PORTAL with OST_NUM_THREADS
 * threads and "ost_create" on OST_CREATE_PORTAL with one thread.  The
 * calls at the end unwind the services and lprocfs state.
 */
1126 static int ost_setup(struct obd_device *obd, obd_count len, void *buf)
1128 struct ost_obd *ost = &obd->u.ost;
1129 struct lprocfs_static_vars lvars;
1133 rc = cleanup_group_info();
1137 rc = llog_start_commit_thread();
1141 lprocfs_init_vars(ost, &lvars);
1142 lprocfs_obd_setup(obd, lvars.obd_vars);
/* Main request service. */
1145 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1146 OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
1147 obd_timeout * 1000, ost_handle, "ost",
1148 obd->obd_proc_entry, ost_print_req);
1149 if (ost->ost_service == NULL) {
1150 CERROR("failed to start service\n");
1151 GOTO(out_lprocfs, rc = -ENOMEM);
1154 rc = ptlrpc_start_n_threads(obd, ost->ost_service, OST_NUM_THREADS,
1157 GOTO(out_service, rc = -EINVAL);
/* Separate service for the create portal, served by a single thread. */
1159 ost->ost_create_service =
1160 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1161 OST_CREATE_PORTAL, OSC_REPLY_PORTAL,
1162 obd_timeout * 1000, ost_handle, "ost_create",
1163 obd->obd_proc_entry, ost_print_req);
1164 if (ost->ost_create_service == NULL) {
1165 CERROR("failed to start OST create service\n");
1166 GOTO(out_service, rc = -ENOMEM);
1169 rc = ptlrpc_start_n_threads(obd, ost->ost_create_service, 1,
1172 GOTO(out_create, rc = -EINVAL);
/* Unwinding: create service, then main service, then lprocfs. */
1177 ptlrpc_unregister_service(ost->ost_create_service);
1179 ptlrpc_unregister_service(ost->ost_service);
1181 lprocfs_obd_cleanup(obd);
/*
 * Tear down an OST device: cancel a running recovery under
 * obd_processing_task_lock, unregister both ptlrpc services and remove the
 * lprocfs entries.
 */
1185 static int ost_cleanup(struct obd_device *obd, int flags)
1187 struct ost_obd *ost = &obd->u.ost;
/* The recovery timer is cancelled and the flag cleared under the lock. */
1191 spin_lock_bh(&obd->obd_processing_task_lock);
1192 if (obd->obd_recovering) {
1193 target_cancel_recovery_timer(obd);
1194 obd->obd_recovering = 0;
1196 spin_unlock_bh(&obd->obd_processing_task_lock);
1198 ptlrpc_unregister_service(ost->ost_service);
1199 ptlrpc_unregister_service(ost->ost_create_service);
1201 lprocfs_obd_cleanup(obd);
1206 /* use obd ops to offer management infrastructure */
/* Only the owner, setup and cleanup methods are provided in the visible
 * initialiser; ost_init() registers this table. */
1207 static struct obd_ops ost_obd_ops = {
1208 .o_owner = THIS_MODULE,
1209 .o_setup = ost_setup,
1210 .o_cleanup = ost_cleanup,
/* Module entry point: initialise the lprocfs variables and register the
 * OST device type with ost_obd_ops via class_register_type(). */
1213 static int __init ost_init(void)
1215 struct lprocfs_static_vars lvars;
1218 lprocfs_init_vars(ost,&lvars);
1219 RETURN(class_register_type(&ost_obd_ops, lvars.module_vars,
/* Module exit point: unregister the LUSTRE_OST_NAME device type. */
1223 static void /*__exit*/ ost_exit(void)
1225 class_unregister_type(LUSTRE_OST_NAME);
/* Kernel module metadata and the init/exit hooks for this module. */
1228 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1229 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
1230 MODULE_LICENSE("GPL");
1232 module_init(ost_init);
1233 module_exit(ost_exit);