1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author: Peter J. Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * Storage Target Handling functions
24 * Lustre Object Server Module (OST)
26 * This server is single threaded at present (but can easily be multi
27 * threaded). For testing and management it is treated as an
28 * obd_device, although it does not export a full OBD method table
29 * (the requests are coming in over the wire, so object target
30 * modules do not have a full method table.)
34 #define DEBUG_SUBSYSTEM S_OST
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40 #include <linux/lustre_export.h>
41 #include <linux/init.h>
42 #include <linux/lprocfs_status.h>
/* Reset *oti to all zeroes, then copy the reply message's transno into
 * oti->oti_transno when the request carries a reply message (and a non-zero
 * rq_reqmsg). */
44 inline void oti_init(struct obd_trans_info *oti,
45 struct ptlrpc_request *req)
49 memset(oti, 0, sizeof *oti);
/* NOTE(review): the value read below comes from rq_repmsg, yet the second
 * operand tests rq_reqmsg (against 0 rather than NULL) - confirm intended. */
52 if (req->rq_repmsg && req->rq_reqmsg != 0)
53 oti->oti_transno = req->rq_repmsg->transno;
/* Copy transaction results from *oti into the request: the transno goes into
 * the reply message, and each ack-lock slot (lock handle and mode) is copied
 * into req->rq_ack_locks[]. */
58 inline void oti_to_request(struct obd_trans_info *oti,
59 struct ptlrpc_request *req)
62 struct oti_req_ack_lock *ack_lock;
68 req->rq_repmsg->transno = oti->oti_transno;
70 /* XXX 4 == entries in oti_ack_locks??? */
/* ack_lock walks oti->oti_ack_locks in step with index i into rq_ack_locks;
 * the bound of 4 is hard-coded (see the XXX above). */
71 for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
74 memcpy(&req->rq_ack_locks[i].lock, &ack_lock->lock,
75 sizeof(req->rq_ack_locks[i].lock));
76 req->rq_ack_locks[i].mode = ack_lock->mode;
/* Handle a destroy request: fetch the ost_body from request buffer 0, pack a
 * one-buffer reply, and record the result of obd_destroy() in rq_status. */
81 static int ost_destroy(struct ptlrpc_request *req, struct obd_trans_info *oti)
83 struct lustre_handle *conn = &req->rq_reqmsg->handle;
84 struct ost_body *body;
85 int rc, size = sizeof(*body);
/* lustre_swab_ost_body is handed over as the swabber for the body buffer. */
88 body = lustre_swab_reqbuf (req, 0, sizeof (*body),
89 lustre_swab_ost_body);
/* The reply is sized like an ost_body even though nothing visible here
 * fills it in. */
93 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
97 req->rq_status = obd_destroy(conn, &body->oa, NULL, oti);
/* Handle a getattr request: copy the requested obdo into the reply body and
 * let obd_getattr() fill it in; the result is stored in rq_status. */
101 static int ost_getattr(struct ptlrpc_request *req)
/* NOTE(review): the handle is obtained by casting rq_reqmsg itself, whereas
 * ost_destroy and ost_setattr use &req->rq_reqmsg->handle; the two agree only
 * if handle is the first member of the message - confirm. */
103 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
104 struct ost_body *body, *repbody;
105 int rc, size = sizeof(*body);
108 body = lustre_swab_reqbuf (req, 0, sizeof (*body),
109 lustre_swab_ost_body);
113 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
/* Seed the reply obdo from the request so obd_getattr() operates on the
 * reply copy. */
117 repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*repbody));
118 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
119 req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
/* Handle a statfs request: pack a reply holding one struct obd_statfs, zero
 * it, and have obd_statfs() fill it for the request's export. */
123 static int ost_statfs(struct ptlrpc_request *req)
125 struct obd_statfs *osfs;
126 int rc, size = sizeof(*osfs);
129 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
133 osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*osfs));
134 memset(osfs, 0, size);
/* The obd_statfs() result becomes the request status; failures are logged. */
136 req->rq_status = obd_statfs(req->rq_export, osfs);
137 if (req->rq_status != 0)
138 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
/* Handle a sync request: pack a reply and call obd_syncfs() on the request's
 * export, logging a failure. */
143 static int ost_syncfs(struct ptlrpc_request *req)
145 struct obd_statfs *osfs;
146 int rc, size = sizeof(*osfs);
/* NOTE(review): the reply is packed with a buffer count of 0 although size is
 * sizeof(struct obd_statfs) - confirm the size is meant to be ignored. */
149 rc = lustre_pack_msg(0, &size, NULL, &req->rq_replen, &req->rq_repmsg);
153 rc = obd_syncfs(req->rq_export);
155 CERROR("ost: syncfs failed: rc %d\n", rc);
/* Handle an open request: copy the requested obdo into the reply body and
 * pass that copy to obd_open(); the result is stored in rq_status. */
163 static int ost_open(struct ptlrpc_request *req, struct obd_trans_info *oti)
/* The connection handle is taken to be the start of the request message. */
165 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
166 struct ost_body *body, *repbody;
167 int rc, size = sizeof(*repbody);
170 body = lustre_swab_reqbuf (req, 0, sizeof (*body),
171 lustre_swab_ost_body);
175 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
179 repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*repbody));
180 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
181 req->rq_status = obd_open(conn, &repbody->oa, NULL, oti, NULL);
/* Handle a close request: copy the requested obdo into the reply body and
 * pass that copy to obd_close(); the result is stored in rq_status. */
185 static int ost_close(struct ptlrpc_request *req, struct obd_trans_info *oti)
/* The connection handle is taken to be the start of the request message. */
187 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
188 struct ost_body *body, *repbody;
189 int rc, size = sizeof(*repbody);
192 body = lustre_swab_reqbuf (req, 0, sizeof (*body),
193 lustre_swab_ost_body);
197 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
201 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
202 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
203 req->rq_status = obd_close(conn, &repbody->oa, NULL, oti);
/* Handle a create request: copy the requested obdo into the reply body and
 * pass that copy to obd_create(); the result is stored in rq_status. */
207 static int ost_create(struct ptlrpc_request *req, struct obd_trans_info *oti)
/* The connection handle is taken to be the start of the request message. */
209 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
210 struct ost_body *body, *repbody;
211 int rc, size = sizeof(*repbody);
214 body = lustre_swab_reqbuf (req, 0, sizeof (*body),
215 lustre_swab_ost_body);
219 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
223 repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*repbody));
224 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
225 req->rq_status = obd_create(conn, &repbody->oa, NULL, oti);
/* Handle a punch request: check that the obdo carries both size and blocks,
 * copy it into the reply body and call obd_punch() with those two values;
 * the result is stored in rq_status. */
229 static int ost_punch(struct ptlrpc_request *req, struct obd_trans_info *oti)
/* The connection handle is taken to be the start of the request message. */
231 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
232 struct ost_body *body, *repbody;
233 int rc, size = sizeof(*repbody);
236 body = lustre_swab_reqbuf (req, 0, sizeof (*body),
237 lustre_swab_ost_body);
/* Both OBD_MD_FLSIZE and OBD_MD_FLBLOCKS must be set in o_valid. */
241 if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
242 (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
245 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
249 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
250 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* o_size and o_blocks are handed over as two separate arguments - presumably
 * the start and end of the punched range; confirm against obd_punch(). */
251 req->rq_status = obd_punch(conn, &repbody->oa, NULL, repbody->oa.o_size,
252 repbody->oa.o_blocks, oti);
/* Handle a setattr request: copy the requested obdo into the reply body and
 * pass that copy to obd_setattr(); the result is stored in rq_status. */
256 static int ost_setattr(struct ptlrpc_request *req, struct obd_trans_info *oti)
258 struct lustre_handle *conn = &req->rq_reqmsg->handle;
259 struct ost_body *body, *repbody;
260 int rc, size = sizeof(*repbody);
263 body = lustre_swab_reqbuf (req, 0, sizeof (*body),
264 lustre_swab_ost_body);
268 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
272 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
273 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
275 req->rq_status = obd_setattr(conn, &repbody->oa, NULL, oti);
/* Timeout callback handed to LWI_TIMEOUT() by the bulk read and write paths
 * in this file while they wait for a bulk transfer to complete. */
279 static int ost_bulk_timeout(void *data)
282 /* We don't fail the connection here, because having the export
283 * killed makes the (vital) call to commitrw very sad.
288 static int get_per_page_niobufs (struct obd_ioobj *ioo, int nioo,
289 struct niobuf_remote *rnb, int nrnb,
290 struct niobuf_remote **pp_rnbp)
292 /* Copy a remote niobuf, splitting it into page-sized chunks
293 * and setting ioo[i].ioo_bufcnt accordingly */
/* Two passes over the nrnb remote niobufs described by the nioo ioobjs:
 * the first counts the pages they span and rejects zero-length or
 * out-of-order buffers; the second writes one niobuf_remote per page into a
 * newly allocated array. free_per_page_niobufs() treats pp_rnb == rnb as
 * "nothing was allocated", which matches the all-single-pages case below. */
294 struct niobuf_remote *pp_rnb;
301 /* first count and check the number of pages required */
302 for (i = 0; i < nioo; i++)
/* NOTE(review): this bound reads ioo->ioo_bufcnt (object 0) for every i,
 * while the split pass below uses ioo[i].ioo_bufcnt - looks wrong when
 * nioo > 1; confirm. */
303 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
/* NOTE(review): rnb[rnbidx] is read by these initialisers before the
 * LASSERT that bounds rnbidx. */
304 obd_off offset = rnb[rnbidx].offset;
305 obd_off p0 = offset >> PAGE_SHIFT;
306 obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
308 LASSERT (rnbidx < nrnb);
/* p0..pn are the first and last page indices touched by this buffer. */
310 npages += (pn + 1 - p0);
312 if (rnb[rnbidx].len == 0) {
313 CERROR("zero len BRW: obj %d objid "LPX64
314 " buf %u\n", i, ioo[i].ioo_id, j);
318 rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
319 CERROR("unordered BRW: obj %d objid "LPX64
320 " buf %u offset "LPX64" <= "LPX64"\n",
321 i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
327 LASSERT (rnbidx == nrnb);
329 if (npages == nrnb) { /* all niobufs are for single pages */
334 OBD_ALLOC (pp_rnb, sizeof (*pp_rnb) * npages);
338 /* now do the actual split */
340 for (i = 0; i < nioo; i++) {
343 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
344 obd_off off = rnb[rnbidx].offset;
345 int nob = rnb[rnbidx].len;
347 LASSERT (rnbidx < nrnb);
/* poff is the offset inside the page; pnob is the number of bytes of this
 * buffer that fit in that page. */
349 obd_off poff = off & (PAGE_SIZE - 1);
350 int pnob = (poff + nob > PAGE_SIZE) ?
351 PAGE_SIZE - poff : nob;
353 LASSERT (page < npages);
354 pp_rnb[page].len = pnob;
355 pp_rnb[page].offset = off;
/* NOTE(review): rnb->flags is the first niobuf's flags, not
 * rnb[rnbidx].flags - confirm whether per-buffer flags were intended. */
356 pp_rnb[page].flags = rnb->flags;
358 CDEBUG (D_PAGE, " obj %d id "LPX64
359 "page %d(%d) "LPX64" for %d\n",
360 i, ioo[i].ioo_id, obj_pages, page,
361 pp_rnb[page].offset, pp_rnb[page].len);
/* Callers see the per-object page count in place of the original buffer
 * count from here on. */
370 ioo[i].ioo_bufcnt = obj_pages;
372 LASSERT (page == npages);
/* Release a per-page niobuf array of npages entries, unless pp_rnb is the
 * caller's original rnb array (in which case nothing was allocated). */
378 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
379 struct niobuf_remote *rnb)
381 if (pp_rnb == rnb) /* didn't allocate above */
384 OBD_FREE (pp_rnb, sizeof (*pp_rnb) * npages);
/* Fold every page on desc->bd_page_list into a running checksum by calling
 * ost_checksum() on each page's data. */
388 __u64 ost_checksum_bulk (struct ptlrpc_bulk_desc *desc)
391 struct list_head *tmp;
394 list_for_each (tmp, &desc->bd_page_list) {
395 struct ptlrpc_bulk_page *bp;
/* Map the page, checksum bp_buflen bytes starting at bp_pageoffset, then
 * unmap it again. */
397 bp = list_entry (tmp, struct ptlrpc_bulk_page, bp_link);
398 ptr = kmap (bp->bp_page);
399 ost_checksum (&cksum, ptr + bp->bp_pageoffset, bp->bp_buflen);
400 kunmap (bp->bp_page);
/* Handle a bulk read: decode the body, the single ioobj and its remote
 * niobufs, prepare the pages with obd_preprw(), send them to the client with
 * ptlrpc_bulk_put(), then call obd_commitrw(). On a bulk communication error
 * the reply message is freed and the client's export is failed. */
405 static int ost_brw_read(struct ptlrpc_request *req)
407 struct ptlrpc_bulk_desc *desc;
408 struct niobuf_remote *remote_nb;
409 struct niobuf_remote *pp_rnb;
410 struct niobuf_local *local_nb;
411 struct obd_ioobj *ioo;
412 struct ost_body *body;
413 struct l_wait_info lwi;
414 void *desc_priv = NULL;
415 int size[1] = { sizeof(*body) };
/* Fault-injection hook: fail the read with -EIO when this check fires. */
424 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
425 GOTO(out, rc = -EIO);
427 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
429 CERROR ("Missing/short ost_body\n");
430 GOTO (out, rc = -EFAULT);
/* Exactly one ioobj is decoded for a read. */
433 ioo = lustre_swab_reqbuf (req, 1, sizeof (*ioo),
434 lustre_swab_obd_ioobj);
436 CERROR ("Missing/short ioobj\n");
437 GOTO (out, rc = -EFAULT);
/* The niobuf count comes from the client-supplied ioobj. */
440 niocount = ioo->ioo_bufcnt;
441 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof (*remote_nb),
442 lustre_swab_niobuf_remote);
443 if (remote_nb == NULL) {
444 CERROR ("Missing/short niobuf\n");
445 GOTO (out, rc = -EFAULT);
447 if (lustre_msg_swabbed (req->rq_reqmsg)) { /* swab remaining niobufs */
448 for (i = 1; i < niocount; i++)
449 lustre_swab_niobuf_remote (&remote_nb[i]);
452 rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
456 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
457 npages = get_per_page_niobufs (ioo, 1, remote_nb, niocount, &pp_rnb);
459 GOTO(out, rc = npages);
/* One niobuf_local per page; released at the end of the function. */
461 OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
462 if (local_nb == NULL)
463 GOTO(out_pp_rnb, rc = -ENOMEM);
465 desc = ptlrpc_prep_bulk_exp (req, BULK_PUT_SOURCE, OST_BULK_PORTAL);
467 GOTO(out_local, rc = -ENOMEM);
469 rc = obd_preprw(OBD_BRW_READ, req->rq_export, NULL, 1, ioo, npages,
470 pp_rnb, local_nb, &desc_priv, NULL);
/* Inspect the per-page result of the prep: negative is an error, non-zero
 * means data to attach to the bulk descriptor, and a short page must be
 * followed only by pages whose rc is 0. */
475 for (i = 0; i < npages; i++) {
476 int page_rc = local_nb[i].rc;
478 if (page_rc < 0) { /* error */
483 LASSERT (page_rc <= pp_rnb[i].len);
485 if (page_rc != 0) { /* some data! */
486 LASSERT (local_nb[i].page != NULL);
/* The mask is presumably the in-page offset; the write path spells the same
 * operand as & (PAGE_SIZE - 1). */
487 rc = ptlrpc_prep_bulk_page(desc, local_nb[i].page,
488 pp_rnb[i].offset& ~PAGE_MASK,
494 if (page_rc != pp_rnb[i].len) { /* short read */
495 /* All subsequent pages should be 0 */
497 LASSERT (local_nb[i].rc == 0);
/* Start the transfer and wait for completion, bounded by obd_timeout
 * seconds. */
503 rc = ptlrpc_bulk_put(desc);
505 lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout,
507 rc = l_wait_event(desc->bd_waitq,
508 ptlrpc_bulk_complete(desc), &lwi);
510 LASSERT(rc == -ETIMEDOUT);
511 CERROR ("timeout waiting for bulk PUT\n");
512 ptlrpc_abort_bulk (desc);
515 CERROR("ptlrpc_bulk_put failed RC: %d\n", rc);
/* Remember whether the transfer itself failed; this selects the eviction
 * path at the end. */
517 comms_error = rc != 0;
520 /* Must commit after prep above in all cases */
521 rc = obd_commitrw(OBD_BRW_READ, req->rq_export, 1, ioo, npages,
522 local_nb, desc_priv, NULL);
/* The bulk checksum is returned in the reply's oa.o_rdev, flagged with
 * OBD_MD_FLCKSUM. */
526 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
527 body->oa.o_rdev = ost_checksum_bulk (desc);
528 body->oa.o_valid |= OBD_MD_FLCKSUM;
533 ptlrpc_free_bulk (desc);
535 OBD_FREE(local_nb, sizeof(*local_nb) * npages);
537 free_per_page_niobufs (npages, pp_rnb, remote_nb);
541 req->rq_status = nob;
543 } else if (!comms_error) {
544 /* only reply if comms OK */
548 if (req->rq_repmsg != NULL) {
549 /* reply out callback would free */
550 OBD_FREE (req->rq_repmsg, req->rq_replen);
552 CERROR("bulk IO comms error: evicting %s@%s nid "LPU64"\n",
553 req->rq_export->exp_client_uuid.uuid,
554 req->rq_connection->c_remote_uuid.uuid,
555 req->rq_connection->c_peer.peer_nid);
556 ptlrpc_fail_export(req->rq_export);
/* Handle a bulk write: decode the body, all ioobjs and their remote niobufs,
 * prepare pages with obd_preprw(), fetch the data from the client with
 * ptlrpc_bulk_get(), optionally verify the client's checksum, call
 * obd_commitrw(), and fill in one return code per requested niobuf. On a bulk
 * communication error the reply message is freed and the export is failed. */
562 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
564 struct ptlrpc_bulk_desc *desc;
565 struct niobuf_remote *remote_nb;
566 struct niobuf_remote *pp_rnb;
567 struct niobuf_local *local_nb;
568 struct obd_ioobj *ioo;
569 struct ost_body *body;
570 struct l_wait_info lwi;
571 void *desc_priv = NULL;
/* size[1] is filled in later, once niocount is known. */
573 int size[2] = { sizeof (*body) };
574 int objcount, niocount, npages;
576 int rc, rc2, swab, i, j;
/* Fault-injection hook: fail the write with -EIO when this check fires. */
579 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
580 GOTO(out, rc = -EIO);
582 /* pause before transaction has been started */
583 OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
586 swab = lustre_msg_swabbed (req->rq_reqmsg);
587 body = lustre_swab_reqbuf (req, 0, sizeof (*body),
588 lustre_swab_ost_body);
590 CERROR ("Missing/short ost_body\n");
591 GOTO(out, rc = -EFAULT);
/* The object count is derived from the length of request buffer 1. */
594 LASSERT_REQSWAB (req, 1);
595 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
597 CERROR ("Missing/short ioobj\n");
598 GOTO (out, rc = -EFAULT);
600 ioo = lustre_msg_buf (req->rq_reqmsg, 1, objcount * sizeof (*ioo));
601 LASSERT (ioo != NULL);
/* Sum the buffer counts of all objects, rejecting any object with none. */
602 for (niocount = i = 0; i < objcount; i++) {
604 lustre_swab_obd_ioobj (&ioo[i]);
605 if (ioo[i].ioo_bufcnt == 0) {
606 CERROR ("ioo[%d] has zero bufcnt\n", i);
607 GOTO (out, rc = -EFAULT);
609 niocount += ioo[i].ioo_bufcnt;
612 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof (*remote_nb),
613 lustre_swab_niobuf_remote);
614 if (remote_nb == NULL) {
615 CERROR ("Missing/short niobuf\n");
616 GOTO(out, rc = -EFAULT);
618 if (swab) { /* swab the remaining niobufs */
619 for (i = 1; i < niocount; i++)
620 lustre_swab_niobuf_remote (&remote_nb[i]);
/* Reply buffer 1 carries one return code per requested niobuf. */
623 size[1] = niocount * sizeof (*rcs);
624 rc = lustre_pack_msg(2, size, NULL, &req->rq_replen,
628 rcs = lustre_msg_buf (req->rq_repmsg, 1, niocount * sizeof (*rcs));
630 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
631 npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
633 GOTO (out, rc = npages);
/* One niobuf_local per page; released at the end of the function. */
635 OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
636 if (local_nb == NULL)
637 GOTO(out_pp_rnb, rc = -ENOMEM);
639 desc = ptlrpc_prep_bulk_exp (req, BULK_GET_SINK, OST_BULK_PORTAL);
641 GOTO(out_local, rc = -ENOMEM);
643 rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, NULL, objcount, ioo,
644 npages, pp_rnb, local_nb, &desc_priv, oti);
648 /* NB Having prepped, we must commit... */
/* Attach every prepared page to the bulk descriptor at its in-page offset. */
650 for (i = 0; i < npages; i++) {
651 rc = ptlrpc_prep_bulk_page(desc, local_nb[i].page,
652 pp_rnb[i].offset & (PAGE_SIZE - 1),
/* Start the transfer and wait for completion, bounded by obd_timeout
 * seconds. */
659 rc = ptlrpc_bulk_get(desc);
661 lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout,
663 rc = l_wait_event(desc->bd_waitq,
664 ptlrpc_bulk_complete(desc), &lwi);
666 LASSERT(rc == -ETIMEDOUT);
667 CERROR ("timeout waiting for bulk GET\n");
668 ptlrpc_abort_bulk (desc);
671 CERROR("ptlrpc_bulk_get failed RC: %d\n", rc);
/* Remember whether the transfer itself failed; this selects the eviction
 * path at the end. */
673 comms_error = rc != 0;
/* When the client sent a checksum (in oa.o_rdev, flagged by OBD_MD_FLCKSUM),
 * recompute it over the received pages and compare. */
677 if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
678 static int cksum_counter;
679 __u64 client_cksum = body->oa.o_rdev;
680 __u64 cksum = ost_checksum_bulk (desc);
682 if (client_cksum != cksum) {
683 CERROR("Bad checksum: client "LPX64", server "LPX64
684 ", client NID "LPX64"\n", client_cksum, cksum,
685 req->rq_connection->c_peer.peer_nid);
/* x & -x isolates the lowest set bit, so this holds only when the counter
 * has at most one bit set: successes are logged at power-of-two counts. */
689 if ((cksum_counter & (-cksum_counter)) == cksum_counter)
690 CERROR("Checksum %d from "LPX64": "LPX64" OK\n",
692 req->rq_connection->c_peer.peer_nid,
697 /* Must commit after prep above in all cases */
698 rc2 = obd_commitrw(OBD_BRW_WRITE, req->rq_export, objcount, ioo,
699 npages, local_nb, desc_priv, oti);
702 /* set per-requested niobuf return codes */
/* i indexes the original niobufs and j the per-page ones; nob counts down
 * the bytes of niobuf i as its pages are consumed, and a negative page rc is
 * recorded in rcs[i]. */
703 for (i = j = 0; i < niocount; i++) {
704 int nob = remote_nb[i].len;
708 LASSERT (j < npages);
709 if (local_nb[j].rc < 0)
710 rcs[i] = local_nb[j].rc;
711 nob -= pp_rnb[j].len;
716 LASSERT (j == npages);
722 ptlrpc_free_bulk (desc);
724 OBD_FREE(local_nb, sizeof(*local_nb) * npages);
726 free_per_page_niobufs (npages, pp_rnb, remote_nb);
/* Transfer transno and ack locks from oti into the request before replying. */
729 oti_to_request(oti, req);
730 rc = ptlrpc_reply(req);
731 } else if (!comms_error) {
732 /* Only reply if there was no comms problem with bulk */
736 if (req->rq_repmsg != NULL) {
737 /* reply out callback would free */
738 OBD_FREE (req->rq_repmsg, req->rq_replen);
740 CERROR("bulk IO comms error: evicting %s@%s nid "LPU64"\n",
741 req->rq_export->exp_client_uuid.uuid,
742 req->rq_connection->c_remote_uuid.uuid,
743 req->rq_connection->c_peer.peer_nid);
744 ptlrpc_fail_export(req->rq_export);
/* Handle a SAN read or write (cmd selects which): decode the body, ioobjs and
 * remote niobufs, validate the niobufs, call obd_san_preprw(), and copy the
 * niobuf array back to the client in reply buffer 1. */
749 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
751 struct lustre_handle *conn = &req->rq_reqmsg->handle;
752 struct niobuf_remote *remote_nb, *res_nb;
753 struct obd_ioobj *ioo;
754 struct ost_body *body;
755 int rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
760 /* XXX not set to use latest protocol */
762 swab = lustre_msg_swabbed (req->rq_reqmsg);
763 body = lustre_swab_reqbuf (req, 0, sizeof (*body),
764 lustre_swab_ost_body);
766 CERROR ("Missing/short ost_body\n");
767 GOTO (out, rc = -EFAULT);
/* Only the first ioobj goes through lustre_swab_reqbuf(); the rest are
 * handled in the loop below. */
770 ioo = lustre_swab_reqbuf(req, 1, sizeof (*ioo),
771 lustre_swab_obd_ioobj);
773 CERROR ("Missing/short ioobj\n");
774 GOTO (out, rc = -EFAULT);
/* The object count is derived from the length of request buffer 1; niocount
 * is the sum of all objects' buffer counts. */
776 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
777 niocount = ioo[0].ioo_bufcnt;
778 for (i = 1; i < objcount; i++) {
780 lustre_swab_obd_ioobj (&ioo[i]);
781 niocount += ioo[i].ioo_bufcnt;
784 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof (*remote_nb),
785 lustre_swab_niobuf_remote);
786 if (remote_nb == NULL) {
787 CERROR ("Missing/short niobuf\n");
788 GOTO (out, rc = -EFAULT);
790 if (swab) { /* swab the remaining niobufs */
791 for (i = 1; i < niocount; i++)
792 lustre_swab_niobuf_remote (&remote_nb[i]);
/* Reject zero-length buffers and, within one object, offsets that do not
 * strictly increase; n is the running index across all objects. */
795 for (i = n = 0; i < objcount; i++) {
796 for (j = 0; j < ioo[i].ioo_bufcnt; j++, n++) {
797 if (remote_nb[n].len == 0) {
798 CERROR("zero len BRW: objid "LPX64" buf %u\n",
800 GOTO(out, rc = -EINVAL);
802 if (j && remote_nb[n].offset <= remote_nb[n-1].offset) {
803 CERROR("unordered BRW: objid "LPX64
804 " buf %u offset "LPX64" <= "LPX64"\n",
805 ioo[i].ioo_id, j, remote_nb[n].offset,
806 remote_nb[n-1].offset);
807 GOTO(out, rc = -EINVAL);
/* Reply buffer 1 is sized to hold the whole niobuf array. */
812 size[1] = niocount * sizeof(*remote_nb);
813 rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
817 req->rq_status = obd_san_preprw(cmd, conn, objcount, ioo,
818 niocount, remote_nb);
/* Return the niobuf array, as left by obd_san_preprw(), to the client. */
823 res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
824 memcpy (res_nb, remote_nb, size[1]);
/* Drop the reply message and clear the pointer so it is not reused. */
828 OBD_FREE(req->rq_repmsg, req->rq_replen);
829 req->rq_repmsg = NULL;
/* Decide, by opcode, what to do with a request that arrives while obd is
 * recovering: either hand it to target_queue_recovery_request() (storing the
 * result in *process) or refuse it with -EAGAIN. */
838 static int filter_recovery_request(struct ptlrpc_request *req,
839 struct obd_device *obd, int *process)
841 switch (req->rq_reqmsg->opc) {
842 case OST_CONNECT: /* This will never get here, but for completeness. */
857 *process = target_queue_recovery_request(req, obd);
861 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
863 /* XXX what should we set rq_status to here? */
864 req->rq_status = -EAGAIN;
865 RETURN(ptlrpc_error(req));
/* Service entry point for OST requests (registered with ptlrpc_init_svc() in
 * ost_setup). It gates non-connect requests on export and recovery state,
 * dispatches on the opcode, then fills in last_committed and sends the
 * reply. */
871 static int ost_handle(struct ptlrpc_request *req)
873 struct obd_trans_info trans_info = { 0, };
874 struct obd_trans_info *oti = &trans_info;
/* fail is the fault-injection id later passed to target_send_reply(); the
 * enqueue branch overrides it. */
875 int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
878 /* XXX identical to MDS */
879 if (req->rq_reqmsg->opc != OST_CONNECT) {
880 struct obd_device *obd;
881 int abort_recovery, recovering;
/* Anything other than a connect needs an export. */
883 if (req->rq_export == NULL) {
884 CERROR("lustre_ost: operation %d on unconnected OST\n",
885 req->rq_reqmsg->opc);
886 req->rq_status = -ENOTCONN;
887 GOTO(out, rc = -ENOTCONN);
890 obd = req->rq_export->exp_obd;
892 /* Check for aborted recovery. */
/* Both flags are sampled under obd_processing_task_lock and acted on after
 * the lock is dropped. */
893 spin_lock_bh(&obd->obd_processing_task_lock);
894 abort_recovery = obd->obd_abort_recovery;
895 recovering = obd->obd_recovering;
896 spin_unlock_bh(&obd->obd_processing_task_lock);
897 if (abort_recovery) {
898 target_abort_recovery(obd);
899 } else if (recovering) {
900 rc = filter_recovery_request(req, obd, &should_process);
901 if (rc || !should_process)
/* Refuse requests addressed to a device whose type name is not "ost". */
906 if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
907 GOTO(out, rc = -EINVAL);
/* Opcode dispatch. Each branch logs the operation, runs its fault-injection
 * hook, and calls the matching handler. */
911 switch (req->rq_reqmsg->opc) {
913 CDEBUG(D_INODE, "connect\n");
914 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
915 rc = target_handle_connect(req, ost_handle);
918 CDEBUG(D_INODE, "disconnect\n");
919 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
920 rc = target_handle_disconnect(req);
923 CDEBUG(D_INODE, "create\n");
924 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
925 rc = ost_create(req, oti);
928 CDEBUG(D_INODE, "destroy\n");
929 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
930 rc = ost_destroy(req, oti);
933 CDEBUG(D_INODE, "getattr\n");
934 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
935 rc = ost_getattr(req);
938 CDEBUG(D_INODE, "setattr\n");
939 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
940 rc = ost_setattr(req, oti);
943 CDEBUG(D_INODE, "open\n");
944 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
945 rc = ost_open(req, oti);
948 CDEBUG(D_INODE, "close\n");
949 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
950 rc = ost_close(req, oti);
953 CDEBUG(D_INODE, "write\n");
954 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
955 rc = ost_brw_write(req, oti);
956 /* ost_brw sends its own replies */
959 CDEBUG(D_INODE, "read\n");
960 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
961 rc = ost_brw_read(req);
962 /* ost_brw sends its own replies */
965 CDEBUG(D_INODE, "san read\n");
966 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
967 rc = ost_san_brw(req, OBD_BRW_READ);
968 /* ost_san_brw sends its own replies */
971 CDEBUG(D_INODE, "san write\n");
972 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
973 rc = ost_san_brw(req, OBD_BRW_WRITE);
974 /* ost_san_brw sends its own replies */
977 CDEBUG(D_INODE, "punch\n");
978 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
979 rc = ost_punch(req, oti);
982 CDEBUG(D_INODE, "statfs\n");
983 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
984 rc = ost_statfs(req);
987 CDEBUG(D_INODE, "sync\n");
988 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNCFS_NET, 0);
989 rc = ost_syncfs(req);
992 DEBUG_REQ(D_INODE, req, "ping");
993 rc = target_handle_ping(req);
996 CDEBUG(D_INODE, "enqueue\n");
997 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
998 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
999 ldlm_server_blocking_ast);
1000 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1003 CDEBUG(D_INODE, "convert\n");
1004 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1005 rc = ldlm_handle_convert(req);
1008 CDEBUG(D_INODE, "cancel\n");
1009 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1010 rc = ldlm_handle_cancel(req);
1012 case LDLM_BL_CALLBACK:
1013 case LDLM_CP_CALLBACK:
1014 CDEBUG(D_INODE, "callback\n");
1015 CERROR("callbacks should not happen on OST\n");
1018 CERROR("Unexpected opcode %d\n", req->rq_reqmsg->opc);
1019 req->rq_status = -ENOTSUPP;
1020 rc = ptlrpc_error(req);
1025 /* If we're DISCONNECTing, the export_data is already freed */
/* On success, publish the device's last committed transno in the reply,
 * unless the device has obd_no_transno set. */
1026 if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
1027 struct obd_device *obd = req->rq_export->exp_obd;
1028 if (!obd->obd_no_transno) {
1029 req->rq_repmsg->last_committed =
1030 obd->obd_last_committed;
1032 DEBUG_REQ(D_IOCTL, req,
1033 "not sending last_committed update");
1035 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
1036 obd->obd_last_committed, req->rq_xid);
/* The final replayed request has its reply queued by
 * target_queue_final_reply() while the device is still recovering;
 * otherwise the request is failed with -ENOTCONN. */
1040 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1041 struct obd_device *obd = req->rq_export->exp_obd;
1043 if (obd && obd->obd_recovering) {
1044 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1045 return target_queue_final_reply(req, rc);
1047 /* Lost a race with recovery; let the error path DTRT. */
1048 rc = req->rq_status = -ENOTCONN;
/* Transfer transno and ack locks from oti into the request, then send. */
1052 oti_to_request(oti, req);
1054 target_send_reply(req, rc, fail);
/* Device setup: create the OST ptlrpc service with ost_handle as its request
 * handler, then start OST_NUM_THREADS service threads named "ll_ost_NN". */
1058 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
1060 struct ost_obd *ost = &obddev->u.ost;
1065 ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
1066 OST_BUFSIZE, OST_MAXREQSIZE,
1067 OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
1068 ost_handle, "ost", obddev);
1069 if (!ost->ost_service) {
1070 CERROR("failed to start service\n");
1071 GOTO(error_disc, err = -ENOMEM);
1074 for (i = 0; i < OST_NUM_THREADS; i++) {
/* NOTE(review): sprintf is unbounded and the declaration of name is not
 * visible here - confirm the buffer is large enough. */
1076 sprintf(name, "ll_ost_%02d", i);
1077 err = ptlrpc_start_thread(obddev, ost->ost_service, name);
/* The thread-start error is logged, but -EINVAL is what gets propagated. */
1079 CERROR("error starting thread #%d: rc %d\n", i, err);
1080 GOTO(error_disc, err = -EINVAL);
/* Device cleanup: cancel the recovery timer if the device is still
 * recovering, then stop all service threads and unregister the service. */
1090 static int ost_cleanup(struct obd_device *obddev, int force, int failover)
1092 struct ost_obd *ost = &obddev->u.ost;
1096 if (obddev->obd_recovering)
1097 target_cancel_recovery_timer(obddev);
1099 ptlrpc_stop_all_threads(ost->ost_service);
1100 ptlrpc_unregister_service(ost->ost_service);
/* Attach hook: initialise the lprocfs variable tables and register the
 * per-device entries (lvars.obd_vars) for dev. Returns the result of
 * lprocfs_obd_attach(). */
1105 int ost_attach(struct obd_device *dev, obd_count len, void *data)
1107 struct lprocfs_static_vars lvars;
1109 lprocfs_init_vars(&lvars);
1110 return lprocfs_obd_attach(dev, lvars.obd_vars);
/* Detach hook: undo ost_attach by calling lprocfs_obd_detach() and returning
 * its result. */
1113 int ost_detach(struct obd_device *dev)
1115 return lprocfs_obd_detach(dev);
/* ost_connect: connect hook installed as o_connect in ost_obd_ops. It
 * requires non-NULL conn, obd and cluuid, calls class_connect(), then looks
 * up the export for the new connection and drops that reference again. */
1118 /* I don't think this function is ever used, since nothing
1119 * connects directly to this module.
1121 static int ost_connect(struct lustre_handle *conn,
1122 struct obd_device *obd, struct obd_uuid *cluuid)
1124 struct obd_export *exp;
1128 if (!conn || !obd || !cluuid)
1131 rc = class_connect(conn, obd, cluuid);
1134 exp = class_conn2export(conn);
1136 class_export_put(exp);
1141 /* use obd ops to offer management infrastructure */
/* Method table handed to class_register_type() in ost_init. It is written
 * with the old GCC "field: value" initialiser syntax. */
1142 static struct obd_ops ost_obd_ops = {
1143 o_owner: THIS_MODULE,
1144 o_attach: ost_attach,
1145 o_detach: ost_detach,
1147 o_cleanup: ost_cleanup,
1148 o_connect: ost_connect,
/* Module entry point: initialise the lprocfs variable tables and register
 * the OST device type with its method table and module-level variables.
 * Returns the result of class_register_type(). */
1151 static int __init ost_init(void)
1153 struct lprocfs_static_vars lvars;
1156 lprocfs_init_vars(&lvars);
1157 RETURN(class_register_type(&ost_obd_ops, lvars.module_vars,
/* Module exit point: unregister the device type named LUSTRE_OST_NAME. */
1161 static void __exit ost_exit(void)
1163 class_unregister_type(LUSTRE_OST_NAME);
/* Module metadata, followed by registration of ost_init and ost_exit as the
 * module's init and exit routines. */
1166 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1167 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
1168 MODULE_LICENSE("GPL");
1170 module_init(ost_init);
1171 module_exit(ost_exit);