1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author: Peter J. Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * Storage Target Handling functions
24 * Lustre Object Server Module (OST)
26 * This server is single threaded at present (but can easily be multi
27 * threaded). For testing and management it is treated as an
28 * obd_device, although it does not export a full OBD method table
29 * (the requests are coming in over the wire, so object target
30 * modules do not have a full method table.)
34 #define DEBUG_SUBSYSTEM S_OST
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40 #include <linux/lustre_export.h>
41 #include <linux/init.h>
42 #include <linux/lprocfs_status.h>
43 #include <linux/lustre_commit_confd.h>
44 #include <portals/list.h>
// oti_init: clear the caller-supplied obd_trans_info and, when the guard below
// passes, seed oti_transno from the transno stored in the reply message.
// NOTE(review): the embedded numbering jumps from 46 to 50, so any statements
// between the signature and the memset are not visible in this listing.
46 void oti_init(struct obd_trans_info *oti, struct ptlrpc_request *req)
50 memset(oti, 0, sizeof *oti);
// NOTE(review): the guard tests rq_reqmsg, but the field read below lives in
// rq_repmsg; confirm that checking rq_reqmsg here is intentional.
52 if (req->rq_repmsg && req->rq_reqmsg != 0)
53 oti->oti_transno = req->rq_repmsg->transno;
// oti_to_request: copy transaction results from oti back onto the request.
// The reply transno is overwritten with oti_transno, then the ack-lock slots
// (lock handle and mode) are copied into req->rq_ack_locks.
56 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
58 struct oti_req_ack_lock *ack_lock;
65 req->rq_repmsg->transno = oti->oti_transno;
67 /* XXX 4 == entries in oti_ack_locks??? */
// The loop bound is the literal 4; the XXX above already questions whether it
// matches the real size of oti_ack_locks.
// NOTE(review): the lines numbered 69-70 are absent from this listing, so any
// early exit from the loop (and the declaration of i) cannot be confirmed.
68 for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
71 memcpy(&req->rq_ack_locks[i].lock, &ack_lock->lock,
72 sizeof(req->rq_ack_locks[i].lock));
73 req->rq_ack_locks[i].mode = ack_lock->mode;
// ost_destroy: handle an object destroy request.
// Visible steps: fetch the ost_body from request buffer 0 through
// lustre_swab_reqbuf, pack a reply with one ost_body-sized buffer, hand the
// log cookie to oti when the client set OBD_MD_FLCOOKIE, and store the result
// of obd_destroy in rq_status.
// NOTE(review): the checks of body and rc sit on lines that are absent from
// this listing.
77 static int ost_destroy(struct ptlrpc_request *req, struct obd_trans_info *oti)
79 struct lustre_handle *conn = &req->rq_reqmsg->handle;
80 struct ost_body *body;
81 int rc, size = sizeof(*body);
84 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
88 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
// Only pass a log cookie down when the request says one is present.
92 if (body->oa.o_valid & OBD_MD_FLCOOKIE)
93 oti->oti_logcookies = obdo_logcookie(&body->oa);
94 req->rq_status = obd_destroy(conn, &body->oa, NULL, oti);
// ost_getattr: handle a getattr request.
// The request obdo is copied into the reply body and obd_getattr then works
// on that reply copy; its result is stored in rq_status.
// NOTE(review): conn is obtained by casting the request message pointer
// itself, whereas ost_destroy and ost_setattr take the address of
// rq_reqmsg->handle. The two agree only if handle is the first member of the
// message structure; confirm against its declaration.
98 static int ost_getattr(struct ptlrpc_request *req)
100 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
101 struct ost_body *body, *repbody;
102 int rc, size = sizeof(*body);
105 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
109 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
113 repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
114 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
115 req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
// ost_statfs: handle a statfs request.
// Packs a reply with one obd_statfs buffer and lets obd_statfs fill it in
// place; the result goes to rq_status and a failure is logged.
119 static int ost_statfs(struct ptlrpc_request *req)
121 struct obd_statfs *osfs;
122 int rc, size = sizeof(*osfs);
125 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
129 osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*osfs));
// NOTE(review): the third argument is one second worth of jiffies in the
// past; presumably the oldest acceptable cached statfs data. Verify against
// the obd_statfs declaration.
131 req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs, jiffies-HZ);
132 if (req->rq_status != 0)
133 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
// ost_syncfs: handle a sync request by calling obd_syncfs on the export.
// The reply is packed with zero buffers.
// NOTE(review): osfs and size are declared, but the pack call asks for zero
// buffers and no visible line uses osfs; they look like leftovers. Confirm
// against the lines missing from this listing.
138 static int ost_syncfs(struct ptlrpc_request *req)
140 struct obd_statfs *osfs;
141 int rc, size = sizeof(*osfs);
144 rc = lustre_pack_msg(0, &size, NULL, &req->rq_replen, &req->rq_repmsg);
148 rc = obd_syncfs(req->rq_export);
// The condition guarding this log call is on a line not shown here.
150 CERROR("ost: syncfs failed: rc %d\n", rc);
// ost_open: handle an open request.
// The request obdo is copied into the reply body and obd_open operates on the
// reply copy; its result is stored in rq_status.
// NOTE(review): conn is the request message pointer cast to a handle pointer;
// this relies on the handle being the first member of the message.
158 static int ost_open(struct ptlrpc_request *req, struct obd_trans_info *oti)
160 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
161 struct ost_body *body, *repbody;
162 int rc, size = sizeof(*repbody);
165 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
169 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
173 repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
174 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
175 req->rq_status = obd_open(conn, &repbody->oa, NULL, oti, NULL);
// ost_close: handle a close request.
// Mirrors ost_open: copy the request obdo into the reply body, call obd_close
// on the reply copy and store the result in rq_status.
// NOTE(review): conn is the request message pointer cast to a handle pointer;
// this relies on the handle being the first member of the message.
179 static int ost_close(struct ptlrpc_request *req, struct obd_trans_info *oti)
181 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
182 struct ost_body *body, *repbody;
183 int rc, size = sizeof(*repbody);
186 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
190 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
194 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
195 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
196 req->rq_status = obd_close(conn, &repbody->oa, NULL, oti);
// ost_create: handle an object create request.
// The request obdo is copied into the reply body, the log cookie slot inside
// that reply obdo is handed to oti, and obd_create then works on the reply
// copy. Its result is stored in rq_status.
// NOTE(review): conn is the request message pointer cast to a handle pointer;
// this relies on the handle being the first member of the message.
200 static int ost_create(struct ptlrpc_request *req, struct obd_trans_info *oti)
202 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
203 struct ost_body *body, *repbody;
204 int rc, size = sizeof(*repbody);
207 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
211 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
215 repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
216 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
// The cookie pointer refers to the reply obdo, not the request obdo.
217 oti->oti_logcookies = obdo_logcookie(&repbody->oa);
218 req->rq_status = obd_create(conn, &repbody->oa, NULL, oti);
219 //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
// ost_punch: handle a punch request.
// The request must carry both OBD_MD_FLSIZE and OBD_MD_FLBLOCKS in o_valid;
// the action taken when it does not is on a line missing from this listing.
// The obdo is then copied into the reply and obd_punch is called with o_size
// and o_blocks from that copy.
// NOTE(review): o_size and o_blocks presumably carry the start and end of the
// range to punch; confirm against the obd_punch declaration.
223 static int ost_punch(struct ptlrpc_request *req, struct obd_trans_info *oti)
225 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
226 struct ost_body *body, *repbody;
227 int rc, size = sizeof(*repbody);
230 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
234 if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
235 (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
238 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
242 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
243 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
244 req->rq_status = obd_punch(conn, &repbody->oa, NULL, repbody->oa.o_size,
245 repbody->oa.o_blocks, oti);
// ost_setattr: handle a setattr request.
// Copies the request obdo into the reply body, calls obd_setattr on the reply
// copy and stores the result in rq_status. Unlike the open, close, create and
// punch handlers, the handle is taken from rq_reqmsg->handle directly.
249 static int ost_setattr(struct ptlrpc_request *req, struct obd_trans_info *oti)
251 struct lustre_handle *conn = &req->rq_reqmsg->handle;
252 struct ost_body *body, *repbody;
253 int rc, size = sizeof(*repbody);
256 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
260 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
264 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
265 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
267 req->rq_status = obd_setattr(conn, &repbody->oa, NULL, oti);
// ost_bulk_timeout: timeout callback that the bulk read and write paths in
// this file hand to LWI_TIMEOUT together with the bulk descriptor.
// NOTE(review): the function body, including its return value and the end of
// the comment below, is absent from this listing.
271 static int ost_bulk_timeout(void *data)
274 /* We don't fail the connection here, because having the export
275 * killed makes the (vital) call to commitrw very sad.
// get_per_page_niobufs: turn the remote niobuf array of a BRW request into
// one niobuf per page.
//   ioo, nioo  - object descriptors; ioo[i].ioo_bufcnt is rewritten to the
//                number of per-page entries produced for object i
//   rnb, nrnb  - remote niobufs as received, for all objects back to back
//   pp_rnbp    - presumably receives the per-page array; the assignment is on
//                a line missing from this listing
// Callers in this file treat the return value as the page count, with a
// negative value meaning an error, and release the array with
// free_per_page_niobufs.
// NOTE(review): the return statements and several declarations are on lines
// that are absent here.
280 static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
281 struct niobuf_remote *rnb, int nrnb,
282 struct niobuf_remote **pp_rnbp)
284 /* Copy a remote niobuf, splitting it into page-sized chunks
285 * and setting ioo[i].ioo_bufcnt accordingly */
286 struct niobuf_remote *pp_rnb;
293 /* first count and check the number of pages required */
294 for (i = 0; i < nioo; i++)
// NOTE(review): this inner bound reads ioo->ioo_bufcnt, i.e. always the first
// object, while the second pass below uses ioo[i].ioo_bufcnt. With more than
// one object the two passes can disagree; looks like it should be ioo[i].
295 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
296 obd_off offset = rnb[rnbidx].offset;
297 obd_off p0 = offset >> PAGE_SHIFT;
// NOTE(review): pn is computed before the zero-length check further down, so
// for len == 0 the subtraction yields offset - 1.
298 obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
300 LASSERT(rnbidx < nrnb);
// Number of pages touched by this niobuf: first page p0 through last page pn.
302 npages += (pn + 1 - p0);
304 if (rnb[rnbidx].len == 0) {
305 CERROR("zero len BRW: obj %d objid "LPX64
306 " buf %u\n", i, ioo[i].ioo_id, j);
// Within one object the niobufs must come in strictly increasing offset order.
310 rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
311 CERROR("unordered BRW: obj %d objid "LPX64
312 " buf %u offset "LPX64" <= "LPX64"\n",
313 i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
319 LASSERT(rnbidx == nrnb);
// One page per niobuf already: no split is needed in this case.
321 if (npages == nrnb) { /* all niobufs are for single pages */
326 OBD_ALLOC(pp_rnb, sizeof(*pp_rnb) * npages);
330 /* now do the actual split */
332 for (i = 0; i < nioo; i++) {
335 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
336 obd_off off = rnb[rnbidx].offset;
337 int nob = rnb[rnbidx].len;
339 LASSERT(rnbidx < nrnb);
// poff is the offset inside the page; pnob is how many of the remaining nob
// bytes fit before the end of that page.
341 obd_off poff = off & (PAGE_SIZE - 1);
342 int pnob = (poff + nob > PAGE_SIZE) ?
343 PAGE_SIZE - poff : nob;
345 LASSERT(page < npages);
346 pp_rnb[page].len = pnob;
347 pp_rnb[page].offset = off;
// NOTE(review): rnb->flags is the flags field of the first remote niobuf, not
// of rnb[rnbidx]; confirm whether per-niobuf flags were intended.
348 pp_rnb[page].flags = rnb->flags;
350 CDEBUG(D_PAGE, " obj %d id "LPX64
351 "page %d(%d) "LPX64" for %d\n",
352 i, ioo[i].ioo_id, obj_pages, page,
353 pp_rnb[page].offset, pp_rnb[page].len);
// From here on the object describes pages rather than the original niobufs.
362 ioo[i].ioo_bufcnt = obj_pages;
364 LASSERT(page == npages);
// free_per_page_niobufs: release a per-page niobuf array of npages entries.
// When pp_rnb is the same pointer as rnb nothing was allocated for it; the
// statement taken in that case is on a line missing from this listing.
370 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
371 struct niobuf_remote *rnb)
373 if (pp_rnb == rnb) /* didn't allocate above */
376 OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages);
// ost_checksum_bulk: fold every page on the bulk descriptor page list into a
// checksum via ost_checksum, starting at each page offset.
// NOTE(review): the declaration and return of cksum, the length argument and
// any kunmap matching the kmap below are on lines absent from this listing;
// confirm that each kmap is balanced.
380 __u64 ost_checksum_bulk (struct ptlrpc_bulk_desc *desc)
383 struct ptlrpc_bulk_page *bp;
385 list_for_each_entry(bp, &desc->bd_page_list, bp_link) {
386 ost_checksum(&cksum, kmap(bp->bp_page) + bp->bp_pageoffset,
// ost_brw_read: serve a bulk read request and push the data to the client
// with a bulk PUT.
// Visible flow:
//   1. fetch the ost_body, one obd_ioobj and its niobuf_remote array from
//      request buffers 0, 1 and 2;
//   2. pack a reply holding one ost_body;
//   3. split the remote niobufs per page with get_per_page_niobufs and
//      allocate a matching niobuf_local array;
//   4. obd_preprw, attach every page that returned data to the bulk
//      descriptor, ptlrpc_bulk_put, then wait with a timeout of
//      obd_timeout * HZ / 4;
//   5. obd_commitrw, which the comment below says must follow the prep in
//      all cases, then copy the obdo into the reply;
//   6. free the descriptor and arrays; on a comms error the packed reply is
//      freed and the export is failed rather than replied to.
// NOTE(review): declarations of rc, nob, i, npages, niocount and comms_error,
// the out labels and several conditions sit on lines absent from this
// listing, so the exact branch structure cannot be confirmed here.
393 static int ost_brw_read(struct ptlrpc_request *req)
395 struct ptlrpc_bulk_desc *desc;
396 struct niobuf_remote *remote_nb;
397 struct niobuf_remote *pp_rnb;
398 struct niobuf_local *local_nb;
399 struct obd_ioobj *ioo;
400 struct ost_body *body, *repbody;
401 struct l_wait_info lwi;
402 struct obd_trans_info oti = { 0 };
403 int size[1] = { sizeof(*body) };
// Fault-injection hooks: fail the read outright, or pause before the bulk.
412 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
413 GOTO(out, rc = -EIO);
415 OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
416 (obd_timeout + 1) / 4);
418 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
420 CERROR("Missing/short ost_body\n");
421 GOTO(out, rc = -EFAULT);
424 /* BUG 974: when we send back cache grants, don't clear this flag */
425 body->oa.o_valid &= ~OBD_MD_FLRDEV;
427 ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
429 CERROR("Missing/short ioobj\n");
430 GOTO(out, rc = -EFAULT);
// NOTE(review): niocount comes straight from the request and is multiplied
// by the niobuf size below; confirm that an upper bound is enforced somewhere.
433 niocount = ioo->ioo_bufcnt;
434 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
435 lustre_swab_niobuf_remote);
436 if (remote_nb == NULL) {
437 CERROR("Missing/short niobuf\n");
438 GOTO(out, rc = -EFAULT);
// The loop starts at index 1; presumably lustre_swab_reqbuf already swabbed
// element 0. Verify against lustre_swab_reqbuf.
440 if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
441 for (i = 1; i < niocount; i++)
442 lustre_swab_niobuf_remote (&remote_nb[i]);
445 size[0] = sizeof(*body);
446 rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
450 /* FIXME all niobuf splitting should be done in obdfilter if needed */
451 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
452 npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
454 GOTO(out, rc = npages);
456 OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
457 if (local_nb == NULL)
458 GOTO(out_pp_rnb, rc = -ENOMEM);
460 desc = ptlrpc_prep_bulk_exp(req, BULK_PUT_SOURCE, OST_BULK_PORTAL);
462 GOTO(out_local, rc = -ENOMEM);
464 rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
465 ioo, npages, pp_rnb, local_nb, &oti);
// Per page: a negative rc is an error, a positive rc is the byte count read
// for that page, and a count below the requested length is a short read.
470 for (i = 0; i < npages; i++) {
471 int page_rc = local_nb[i].rc;
473 if (page_rc < 0) { /* error */
478 LASSERT(page_rc <= pp_rnb[i].len);
480 if (page_rc != 0) { /* some data! */
481 LASSERT (local_nb[i].page != NULL);
482 rc = ptlrpc_prep_bulk_page(desc, local_nb[i].page,
// The write path computes the in-page offset as offset & (PAGE_SIZE - 1);
// the expression here presumably yields the same value. Verify the
// definition of PAGE_MASK.
483 pp_rnb[i].offset& ~PAGE_MASK,
489 if (page_rc != pp_rnb[i].len) { /* short read */
490 /* All subsequent pages should be 0 */
492 LASSERT(local_nb[i].rc == 0);
498 rc = ptlrpc_bulk_put(desc);
// Wait for bulk completion; ost_bulk_timeout is the timeout callback.
500 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
501 ost_bulk_timeout, desc);
502 rc = l_wait_event(desc->bd_waitq,
503 ptlrpc_bulk_complete(desc), &lwi);
505 LASSERT(rc == -ETIMEDOUT);
506 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT");
507 ptlrpc_abort_bulk(desc);
510 DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d\n", rc);
// Remember whether the transfer itself failed; this decides below between
// replying to the client and evicting it.
512 comms_error = rc != 0;
515 /* Must commit after prep above in all cases */
516 rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
517 ioo, npages, local_nb, &oti);
519 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
520 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
// The bulk checksum travels in o_rdev and is flagged with OBD_MD_FLCKSUM; the
// condition that enables these two lines is not visible in this listing.
524 repbody->oa.o_rdev = ost_checksum_bulk(desc);
525 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
530 ptlrpc_free_bulk(desc);
532 OBD_FREE(local_nb, sizeof(*local_nb) * npages);
534 free_per_page_niobufs(npages, pp_rnb, remote_nb);
538 req->rq_status = nob;
540 } else if (!comms_error) {
541 /* only reply if comms OK */
545 if (req->rq_repmsg != NULL) {
546 /* reply out callback would free */
547 OBD_FREE(req->rq_repmsg, req->rq_replen);
549 CERROR("bulk IO comms error: evicting %s@%s nid "LPU64"\n",
550 req->rq_export->exp_client_uuid.uuid,
551 req->rq_connection->c_remote_uuid.uuid,
552 req->rq_connection->c_peer.peer_nid);
553 ptlrpc_fail_export(req->rq_export);
// ost_brw_write: serve a bulk write request by pulling the data from the
// client with a bulk GET.
// Visible flow:
//   1. fetch the ost_body, an array of obd_ioobj (one per object) and the
//      niobuf_remote array from request buffers 0, 1 and 2;
//   2. pack a two-buffer reply: an ost_body plus one return code per
//      requested niobuf (rcs);
//   3. split the niobufs per page and allocate the niobuf_local array;
//   4. obd_preprw, attach every page to the bulk descriptor, ptlrpc_bulk_get
//      and wait with a timeout of obd_timeout * HZ / 4;
//   5. optionally compare the client checksum from o_rdev with a server-side
//      checksum of the received pages;
//   6. obd_commitrw, which must follow the prep in all cases, then map the
//      per-page results back to per-niobuf return codes;
//   7. free resources and reply, or on a comms error free the reply and fail
//      the export.
// NOTE(review): declarations of rcs and comms_error, the out labels and a
// number of conditions are on lines absent from this listing.
559 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
561 struct ptlrpc_bulk_desc *desc;
562 struct niobuf_remote *remote_nb;
563 struct niobuf_remote *pp_rnb;
564 struct niobuf_local *local_nb;
565 struct obd_ioobj *ioo;
566 struct ost_body *body, *repbody;
567 struct l_wait_info lwi;
// Only size[0] is initialised here; size[1] is assigned once niocount is
// known.
569 int size[2] = { sizeof(*body) };
570 int objcount, niocount, npages;
572 int rc, rc2, swab, i, j;
575 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
576 GOTO(out, rc = -EIO);
578 /* pause before transaction has been started */
579 OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
580 (obd_timeout + 1) / 4);
582 swab = lustre_msg_swabbed(req->rq_reqmsg);
583 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
585 CERROR("Missing/short ost_body\n");
586 GOTO(out, rc = -EFAULT);
589 /* BUG 974: when we send back cache grants, don't clear this flag */
590 body->oa.o_valid &= ~OBD_MD_FLRDEV;
// The object count is derived from the length of request buffer 1.
592 LASSERT_REQSWAB(req, 1);
593 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
595 CERROR("Missing/short ioobj\n");
596 GOTO(out, rc = -EFAULT);
598 ioo = lustre_msg_buf (req->rq_reqmsg, 1, objcount * sizeof(*ioo));
599 LASSERT (ioo != NULL);
// Sum the niobuf counts of all objects; an object with no niobufs is
// rejected. The condition guarding the swab call is on a missing line.
// NOTE(review): niocount is a plain int accumulated from request data;
// confirm that an upper bound is enforced somewhere.
600 for (niocount = i = 0; i < objcount; i++) {
602 lustre_swab_obd_ioobj (&ioo[i]);
603 if (ioo[i].ioo_bufcnt == 0) {
604 CERROR("ioo[%d] has zero bufcnt\n", i);
605 GOTO(out, rc = -EFAULT);
607 niocount += ioo[i].ioo_bufcnt;
610 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
611 lustre_swab_niobuf_remote);
612 if (remote_nb == NULL) {
613 CERROR("Missing/short niobuf\n");
614 GOTO(out, rc = -EFAULT);
616 if (swab) { /* swab the remaining niobufs */
617 for (i = 1; i < niocount; i++)
618 lustre_swab_niobuf_remote (&remote_nb[i]);
// Second reply buffer: one return code per niobuf as the client sent them.
621 size[1] = niocount * sizeof(*rcs);
622 rc = lustre_pack_msg(2, size, NULL, &req->rq_replen,
626 rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
628 /* FIXME all niobuf splitting should be done in obdfilter if needed */
629 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
630 npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
632 GOTO(out, rc = npages);
634 OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
635 if (local_nb == NULL)
636 GOTO(out_pp_rnb, rc = -ENOMEM);
638 desc = ptlrpc_prep_bulk_exp(req, BULK_GET_SINK, OST_BULK_PORTAL);
640 GOTO(out_local, rc = -ENOMEM);
642 rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
643 ioo, npages, pp_rnb, local_nb, oti);
647 /* NB Having prepped, we must commit... */
649 for (i = 0; i < npages; i++) {
650 rc = ptlrpc_prep_bulk_page(desc, local_nb[i].page,
651 pp_rnb[i].offset & (PAGE_SIZE - 1),
658 rc = ptlrpc_bulk_get(desc);
660 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
661 ost_bulk_timeout, desc);
662 rc = l_wait_event(desc->bd_waitq,
663 ptlrpc_bulk_complete(desc), &lwi);
665 LASSERT(rc == -ETIMEDOUT);
666 DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
667 ptlrpc_abort_bulk(desc);
670 DEBUG_REQ(D_ERROR, req, "bulk GET failed: rc %d\n", rc);
// Remember whether the transfer itself failed; used below to choose between
// replying and evicting the client.
672 comms_error = rc != 0;
675 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
676 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
// Checksum verification runs only when the transfer succeeded and the client
// flagged a checksum, which arrives in o_rdev.
679 if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
680 static int cksum_counter;
681 __u64 client_cksum = body->oa.o_rdev;
682 __u64 cksum = ost_checksum_bulk(desc);
684 if (client_cksum != cksum) {
685 CERROR("Bad checksum: client "LPX64", server "LPX64
686 ", client NID "LPX64"\n", client_cksum, cksum,
687 req->rq_connection->c_peer.peer_nid);
689 repbody->oa.o_rdev = cksum;
// x & -x isolates the lowest set bit, so this test is true only when the
// counter is zero or a power of two; the message gets rarer as the counter
// grows. The counter update itself is on a line missing from this listing.
692 if ((cksum_counter & (-cksum_counter)) == cksum_counter)
693 CERROR("Checksum %d from "LPX64": "LPX64" OK\n",
695 req->rq_connection->c_peer.peer_nid,
700 /* Must commit after prep above in all cases */
// The commit result goes to rc2 rather than rc. Note that the write path
// passes the reply obdo here while the read path passes the request obdo.
701 rc2 = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
702 objcount, ioo, npages, local_nb, oti);
705 /* set per-requested niobuf return codes */
// i walks the niobufs as the client sent them, j walks the per-page entries;
// each page error is recorded against the niobuf that contains the page.
706 for (i = j = 0; i < niocount; i++) {
707 int nob = remote_nb[i].len;
712 if (local_nb[j].rc < 0)
713 rcs[i] = local_nb[j].rc;
714 nob -= pp_rnb[j].len;
719 LASSERT(j == npages);
725 ptlrpc_free_bulk(desc);
727 OBD_FREE(local_nb, sizeof(*local_nb) * npages);
729 free_per_page_niobufs(npages, pp_rnb, remote_nb);
732 oti_to_request(oti, req);
733 rc = ptlrpc_reply(req);
734 } else if (!comms_error) {
735 /* Only reply if there was no comms problem with bulk */
739 if (req->rq_repmsg != NULL) {
740 /* reply out callback would free */
741 OBD_FREE (req->rq_repmsg, req->rq_replen);
743 CERROR("bulk IO comms error: evicting %s@%s nid "LPU64"\n",
744 req->rq_export->exp_client_uuid.uuid,
745 req->rq_connection->c_remote_uuid.uuid,
746 req->rq_connection->c_peer.peer_nid);
747 ptlrpc_fail_export(req->rq_export);
// ost_san_brw: handle a SAN read or write request; cmd is OBD_BRW_READ or
// OBD_BRW_WRITE as passed by ost_handle.
// Visible flow: fetch the ost_body, the obd_ioobj array and the niobuf_remote
// array; validate every niobuf (non-zero length, strictly increasing offsets
// within an object); pack a two-buffer reply; call obd_san_preprw; then copy
// the obdo and the niobuf array into the reply.
// NOTE(review): declarations of swab and n, the out label and several
// conditions are on lines absent from this listing.
752 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
754 struct niobuf_remote *remote_nb, *res_nb;
755 struct obd_ioobj *ioo;
756 struct ost_body *body, *repbody;
757 int rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
762 /* XXX not set to use latest protocol */
764 swab = lustre_msg_swabbed(req->rq_reqmsg);
765 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
767 CERROR("Missing/short ost_body\n");
768 GOTO(out, rc = -EFAULT);
771 ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
773 CERROR("Missing/short ioobj\n");
774 GOTO(out, rc = -EFAULT);
// The first ioobj was fetched above; the loop handles the remaining objects
// and accumulates the total niobuf count.
// NOTE(review): unlike ost_brw_write, no visible line rejects an object with
// a zero bufcnt or bounds niocount.
776 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
777 niocount = ioo[0].ioo_bufcnt;
778 for (i = 1; i < objcount; i++) {
780 lustre_swab_obd_ioobj (&ioo[i]);
781 niocount += ioo[i].ioo_bufcnt;
784 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
785 lustre_swab_niobuf_remote);
786 if (remote_nb == NULL) {
787 CERROR("Missing/short niobuf\n");
788 GOTO(out, rc = -EFAULT);
790 if (swab) { /* swab the remaining niobufs */
791 for (i = 1; i < niocount; i++)
792 lustre_swab_niobuf_remote (&remote_nb[i]);
// n indexes the flat niobuf array across all objects; j restarts per object,
// so the ordering check never compares across an object boundary.
795 for (i = n = 0; i < objcount; i++) {
796 for (j = 0; j < ioo[i].ioo_bufcnt; j++, n++) {
797 if (remote_nb[n].len == 0) {
798 CERROR("zero len BRW: objid "LPX64" buf %u\n",
800 GOTO(out, rc = -EINVAL);
802 if (j && remote_nb[n].offset <= remote_nb[n-1].offset) {
803 CERROR("unordered BRW: objid "LPX64
804 " buf %u offset "LPX64" <= "LPX64"\n",
805 ioo[i].ioo_id, j, remote_nb[n].offset,
806 remote_nb[n-1].offset);
807 GOTO(out, rc = -EINVAL);
// The second reply buffer has room for the whole niobuf array.
812 size[1] = niocount * sizeof(*remote_nb);
813 rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
817 req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
818 objcount, ioo, niocount, remote_nb);
823 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
824 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
// The niobuf array is sent back after obd_san_preprw has seen it.
826 res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
827 memcpy(res_nb, remote_nb, size[1]);
// Drop the packed reply and clear the pointer; the condition that selects
// this path is on a line missing from this listing.
831 OBD_FREE(req->rq_repmsg, req->rq_replen);
832 req->rq_repmsg = NULL;
// ost_log_cancel: cancel the llog cookies carried in request buffer 0.
// The number of cookies is derived from the buffer length. An empty reply is
// packed first (see the workaround comment below) and obd_log_cancel is then
// called with the whole cookie array.
841 static int ost_log_cancel(struct ptlrpc_request *req)
843 struct lustre_handle *conn;
844 struct llog_cookie *logcookies;
845 int num_cookies, rc = 0;
// Requires at least one whole cookie in buffer 0.
848 logcookies = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*logcookies));
849 if (logcookies == NULL) {
850 DEBUG_REQ(D_HA, req, "no cookies sent");
853 num_cookies = req->rq_reqmsg->buflens[0] / sizeof(*logcookies);
855 /* workaround until we don't need to send replies */
856 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
859 req->rq_repmsg->status = 0;
862 conn = (struct lustre_handle *)&req->rq_reqmsg->handle;
863 rc = obd_log_cancel(conn, NULL, num_cookies, logcookies, 0);
// ost_set_info: forward a set_info request to obd_set_info.
// The key is the whole of request buffer 0 and its length is the buffer
// length. No value is passed: the last two arguments are 0 and NULL.
// NOTE(review): the reply status is set to 0 after obd_set_info regardless of
// rc in the visible lines; confirm how rc reaches the client.
868 static int ost_set_info(struct ptlrpc_request *req)
870 struct lustre_handle *conn;
// A minimum length of 1 is requested, so an empty key buffer yields no key.
875 key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
877 DEBUG_REQ(D_HA, req, "no set_info key");
880 keylen = req->rq_reqmsg->buflens[0];
882 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
886 conn = (struct lustre_handle *)&req->rq_reqmsg->handle;
887 rc = obd_set_info(conn, keylen, key, 0, NULL);
888 req->rq_repmsg->status = 0;
// filter_recovery_request: decide what to do with a request that arrives
// while the device is recovering; ost_handle calls it in that case.
// Depending on the opcode the request is either handed to
// target_queue_recovery_request, whose result is stored in *process, or
// refused: the refusal is logged, rq_status becomes -EAGAIN and an error
// reply is sent.
// NOTE(review): most case labels are on lines absent from this listing, so
// which opcodes take which branch cannot be confirmed here.
892 static int filter_recovery_request(struct ptlrpc_request *req,
893 struct obd_device *obd, int *process)
895 switch (req->rq_reqmsg->opc) {
896 case OST_CONNECT: /* This will never get here, but for completeness. */
912 *process = target_queue_recovery_request(req, obd);
916 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
918 /* XXX what should we set rq_status to here? */
919 req->rq_status = -EAGAIN;
920 RETURN(ptlrpc_error(req));
// ost_handle: request handler registered for the OST service in ost_setup.
// Visible flow:
//   1. for every opcode except OST_CONNECT, require an export, then check the
//      recovery state of the device: abort recovery if requested, or let
//      filter_recovery_request decide while recovery is in progress;
//   2. dispatch on the opcode to the matching ost_* handler, target_* helper
//      or ldlm_* handler; each branch first passes through an OBD_FAIL_RETURN
//      fault-injection hook where one is shown;
//   3. for successful requests other than OST_DISCONNECT, publish the device
//      last_committed value in the reply unless obd_no_transno is set;
//   4. a request flagged MSG_LAST_REPLAY is queued as the final recovery
//      reply while the device is still recovering;
//   5. otherwise copy oti onto the request and send the reply.
// NOTE(review): most case labels, the break statements and the out label are
// on lines absent from this listing; the grouping below is inferred from the
// debug strings only.
926 static int ost_handle(struct ptlrpc_request *req)
928 struct obd_trans_info trans_info = { 0, };
929 struct obd_trans_info *oti = &trans_info;
// fail selects the fault-injection id handed to target_send_reply at the end;
// the enqueue branch overrides it.
930 int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
933 LASSERT(current->journal_info == NULL);
934 /* XXX identical to MDS */
935 if (req->rq_reqmsg->opc != OST_CONNECT) {
936 struct obd_device *obd;
937 int abort_recovery, recovering;
939 if (req->rq_export == NULL) {
940 CDEBUG(D_HA, "operation %d on unconnected OST\n",
941 req->rq_reqmsg->opc);
942 req->rq_status = -ENOTCONN;
943 GOTO(out, rc = -ENOTCONN);
946 obd = req->rq_export->exp_obd;
948 /* Check for aborted recovery. */
// Both flags are sampled under the lock and acted on after it is released.
949 spin_lock_bh(&obd->obd_processing_task_lock);
950 abort_recovery = obd->obd_abort_recovery;
951 recovering = obd->obd_recovering;
952 spin_unlock_bh(&obd->obd_processing_task_lock);
953 if (abort_recovery) {
954 target_abort_recovery(obd);
955 } else if (recovering) {
956 rc = filter_recovery_request(req, obd, &should_process);
957 if (rc || !should_process)
// Refuse requests that were routed to a device that is not of type ost.
962 if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
963 GOTO(out, rc = -EINVAL);
967 switch (req->rq_reqmsg->opc) {
969 CDEBUG(D_INODE, "connect\n");
970 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
971 rc = target_handle_connect(req, ost_handle);
974 CDEBUG(D_INODE, "disconnect\n");
975 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
976 rc = target_handle_disconnect(req);
979 CDEBUG(D_INODE, "create\n");
980 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
981 rc = ost_create(req, oti);
984 CDEBUG(D_INODE, "destroy\n");
985 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
986 rc = ost_destroy(req, oti);
989 CDEBUG(D_INODE, "getattr\n");
990 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
991 rc = ost_getattr(req);
994 CDEBUG(D_INODE, "setattr\n");
995 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
996 rc = ost_setattr(req, oti);
999 CDEBUG(D_INODE, "open\n");
1000 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
1001 rc = ost_open(req, oti);
1004 CDEBUG(D_INODE, "close\n");
1005 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
1006 rc = ost_close(req, oti);
// The bulk and SAN handlers below send their own replies, as the original
// comments state.
1009 CDEBUG(D_INODE, "write\n");
1010 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1011 rc = ost_brw_write(req, oti);
1012 LASSERT(current->journal_info == NULL);
1013 /* ost_brw sends its own replies */
1016 CDEBUG(D_INODE, "read\n");
1017 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1018 rc = ost_brw_read(req);
1019 LASSERT(current->journal_info == NULL);
1020 /* ost_brw sends its own replies */
1023 CDEBUG(D_INODE, "san read\n");
1024 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1025 rc = ost_san_brw(req, OBD_BRW_READ);
1026 /* ost_san_brw sends its own replies */
1029 CDEBUG(D_INODE, "san write\n");
1030 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1031 rc = ost_san_brw(req, OBD_BRW_WRITE);
1032 /* ost_san_brw sends its own replies */
1035 CDEBUG(D_INODE, "punch\n");
1036 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
1037 rc = ost_punch(req, oti);
1040 CDEBUG(D_INODE, "statfs\n");
1041 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
1042 rc = ost_statfs(req);
1045 CDEBUG(D_INODE, "sync\n");
1046 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNCFS_NET, 0);
1047 rc = ost_syncfs(req);
1050 DEBUG_REQ(D_INODE, req, "set_info");
1051 rc = ost_set_info(req);
1053 DEBUG_REQ(D_INODE, req, "ping");
1054 rc = target_handle_ping(req);
1056 case OBD_LOG_CANCEL:
1057 CDEBUG(D_INODE, "log cancel\n");
1058 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1059 rc = ost_log_cancel(req);
// Lock manager requests are passed on to the ldlm handlers.
1062 CDEBUG(D_INODE, "enqueue\n");
1063 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1064 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1065 ldlm_server_blocking_ast);
1066 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1069 CDEBUG(D_INODE, "convert\n");
1070 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1071 rc = ldlm_handle_convert(req);
1074 CDEBUG(D_INODE, "cancel\n");
1075 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1076 rc = ldlm_handle_cancel(req);
1078 case LDLM_BL_CALLBACK:
1079 case LDLM_CP_CALLBACK:
1080 CDEBUG(D_INODE, "callback\n");
1081 CERROR("callbacks should not happen on OST\n");
// Unknown opcodes are answered with an error reply carrying -ENOTSUPP.
1084 CERROR("Unexpected opcode %d\n", req->rq_reqmsg->opc);
1085 req->rq_status = -ENOTSUPP;
1086 rc = ptlrpc_error(req);
1090 LASSERT(current->journal_info == NULL);
1093 /* If we're DISCONNECTing, the export_data is already freed */
1094 if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
1095 struct obd_device *obd = req->rq_export->exp_obd;
1096 if (!obd->obd_no_transno) {
1097 req->rq_repmsg->last_committed =
1098 obd->obd_last_committed;
1100 DEBUG_REQ(D_IOCTL, req,
1101 "not sending last_committed update");
1103 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
1104 obd->obd_last_committed, req->rq_xid);
// NOTE(review): rq_export is dereferenced here without a visible NULL check;
// confirm that no path reaches this point with a NULL export.
1108 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1109 struct obd_device *obd = req->rq_export->exp_obd;
1111 if (obd && obd->obd_recovering) {
1112 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1113 return target_queue_final_reply(req, rc);
1115 /* Lost a race with recovery; let the error path DTRT. */
1116 rc = req->rq_status = -ENOTCONN;
1120 oti_to_request(oti, req);
1122 target_send_reply(req, rc, fail);
// ost_setup: bring up the OST service for obddev.
// Creates the ptlrpc service with ost_handle as its request handler and then
// starts OST_NUM_THREADS service threads named ll_ost_NN. When ENABLE_ORPHANS
// is defined, llog_start_commit_thread is called first.
// NOTE(review): the return statements and the error unwinding are on lines
// absent from this listing.
1126 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
1128 struct ost_obd *ost = &obddev->u.ost;
1132 #ifdef ENABLE_ORPHANS
1133 err = llog_start_commit_thread();
1138 ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
1139 OST_BUFSIZE, OST_MAXREQSIZE,
1140 OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
1141 ost_handle, "ost", obddev);
1142 if (!ost->ost_service) {
1143 CERROR("failed to start service\n");
1147 for (i = 0; i < OST_NUM_THREADS; i++) {
// NOTE(review): sprintf is unbounded and the declaration of name is not
// visible; confirm that the buffer holds the prefix plus the widest thread
// index and the terminator.
1149 sprintf(name, "ll_ost_%02d", i);
1150 err = ptlrpc_start_thread(obddev, ost->ost_service, name);
1152 CERROR("error starting thread #%d: rc %d\n", i, err);
// ost_cleanup: tear down what ost_setup created.
// Cancels the recovery timer if the device is still recovering, stops all
// service threads and unregisters the ptlrpc service.
1160 static int ost_cleanup(struct obd_device *obddev, int flags)
1162 struct ost_obd *ost = &obddev->u.ost;
1166 if (obddev->obd_recovering)
1167 target_cancel_recovery_timer(obddev);
// Threads are stopped before the service they run on is unregistered.
1169 ptlrpc_stop_all_threads(ost->ost_service);
1170 ptlrpc_unregister_service(ost->ost_service);
// ost_attach: attach the device to lprocfs using the OST variable table
// filled in by lprocfs_init_vars. The len and data arguments are not used in
// the visible lines.
1175 int ost_attach(struct obd_device *dev, obd_count len, void *data)
1177 struct lprocfs_static_vars lvars;
1179 lprocfs_init_vars(ost,&lvars);
1180 return lprocfs_obd_attach(dev, lvars.obd_vars);
// ost_detach: counterpart of ost_attach; detaches the device from lprocfs and
// returns the result of lprocfs_obd_detach.
1183 int ost_detach(struct obd_device *dev)
1185 return lprocfs_obd_detach(dev);
// ost_connect: connect method of ost_obd_ops.
// Checks that conn, obd and cluuid are all non-NULL, calls class_connect,
// then looks up the export for the new handle and drops the reference taken
// by that lookup.
// NOTE(review): the return statements and the handling of rc are on lines
// absent from this listing.
1188 /* I don't think this function is ever used, since nothing
1189 * connects directly to this module.
1191 static int ost_connect(struct lustre_handle *conn,
1192 struct obd_device *obd, struct obd_uuid *cluuid)
1194 struct obd_export *exp;
1198 if (!conn || !obd || !cluuid)
1201 rc = class_connect(conn, obd, cluuid);
1204 exp = class_conn2export(conn);
1206 class_export_put(exp);
// Method table registered for the OST type by ost_init. Only the management
// entry points are filled in; the initialisers use the old GNU label syntax.
// NOTE(review): the line numbered 1216 is absent from this listing. It is
// presumably the entry that wires up ost_setup, since no visible line does;
// confirm against the full file.
1211 /* use obd ops to offer management infrastructure */
1212 static struct obd_ops ost_obd_ops = {
1213 o_owner: THIS_MODULE,
1214 o_attach: ost_attach,
1215 o_detach: ost_detach,
1217 o_cleanup: ost_cleanup,
1218 o_connect: ost_connect,
// ost_init: module entry point. Registers the OST device type with its method
// table and the module-level lprocfs variables.
// NOTE(review): the final argument of class_register_type is on a line absent
// from this listing.
1221 static int __init ost_init(void)
1223 struct lprocfs_static_vars lvars;
1226 lprocfs_init_vars(ost,&lvars);
1227 RETURN(class_register_type(&ost_obd_ops, lvars.module_vars,
// ost_exit: module exit point. Unregisters the device type named by
// LUSTRE_OST_NAME. The __exit annotation is commented out in the signature.
1231 static void /*__exit*/ ost_exit(void)
1233 class_unregister_type(LUSTRE_OST_NAME);
// Kernel module metadata, followed by the hooks that make ost_init and
// ost_exit the module load and unload entry points.
1236 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1237 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
1238 MODULE_LICENSE("GPL");
1240 module_init(ost_init);
1241 module_exit(ost_exit);