1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author: Peter J. Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * Storage Target Handling functions
24 * Lustre Object Server Module (OST)
26 * This server is single threaded at present (but can easily be multi
27 * threaded). For testing and management it is treated as an
28 * obd_device, although it does not export a full OBD method table
29 * (the requests are coming in over the wire, so object target
30 * modules do not have a full method table.)
34 # define EXPORT_SYMTAB
36 #define DEBUG_SUBSYSTEM S_OST
38 #include <linux/module.h>
39 #include <linux/obd_ost.h>
40 #include <linux/lustre_net.h>
41 #include <linux/lustre_dlm.h>
42 #include <linux/lustre_export.h>
43 #include <linux/init.h>
44 #include <linux/lprocfs_status.h>
45 #include <linux/lustre_commit_confd.h>
46 #include <portals/list.h>
/*
 * oti_init - reset a transaction-info structure for a new request.
 *
 * Zeroes *oti, then copies the transaction number from the request's
 * reply message into oti->oti_transno when both the reply message and the
 * request message pointers are non-zero.
 *
 * NOTE(review): no NULL guard on oti or req is visible in this listing
 * before they are dereferenced; confirm against the full source.
 */
48 void oti_init(struct obd_trans_info *oti, struct ptlrpc_request *req)
52 memset(oti, 0, sizeof *oti);
/* rq_reqmsg is a pointer; "!= 0" is a non-NULL test */
54 if (req->rq_repmsg && req->rq_reqmsg != 0)
55 oti->oti_transno = req->rq_repmsg->transno;
/*
 * oti_to_request - copy transaction results from oti into the reply.
 *
 * Stores oti->oti_transno in the reply message, then walks the
 * oti->oti_ack_locks array and hands each entry's lock and mode to
 * ldlm_put_lock_into_req().
 *
 * NOTE(review): the loop bound is the literal 4, which the XXX comment
 * below already questions; any early exit inside the loop body is not
 * visible in this listing.
 */
58 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
60 struct oti_req_ack_lock *ack_lock;
67 req->rq_repmsg->transno = oti->oti_transno;
69 /* XXX 4 == entries in oti_ack_locks??? */
70 for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
73 ldlm_put_lock_into_req(req, &ack_lock->lock, ack_lock->mode);
/*
 * ost_destroy - handle an object destroy request.
 *
 * Fetches the ost_body from request buffer 0 through lustre_swab_reqbuf()
 * with lustre_swab_ost_body as the swab callback, packs a one-buffer reply
 * of sizeof(*body), copies the request obdo into the reply body and stores
 * the result of obd_destroy() in req->rq_status.
 *
 * The error checks after the unpack/pack calls and the return statements
 * are not visible in this listing.
 */
77 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
78 struct obd_trans_info *oti)
80 struct ost_body *body, *repbody;
81 int rc, size = sizeof(*body);
84 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
88 rc = lustre_pack_reply(req, 1, &size, NULL);
/* when the client flagged a cookie, point oti at the one inside the obdo */
92 if (body->oa.o_valid & OBD_MD_FLCOOKIE)
93 oti->oti_logcookies = obdo_logcookie(&body->oa);
94 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
95 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* the destroy itself is issued on the request obdo, not the reply copy */
96 req->rq_status = obd_destroy(exp, &body->oa, NULL, oti);
/*
 * ost_getattr - handle a get-attributes request.
 *
 * Fetches the ost_body from request buffer 0, packs a one-buffer reply,
 * copies the request obdo into the reply body and calls obd_getattr() on
 * that reply copy; the result is stored in req->rq_status.
 *
 * Error checks and return statements are not visible in this listing.
 */
100 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
102 struct ost_body *body, *repbody;
103 int rc, size = sizeof(*body);
106 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
110 rc = lustre_pack_reply(req, 1, &size, NULL);
114 repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
115 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* obd_getattr works on the reply obdo, so its output goes back to the client */
116 req->rq_status = obd_getattr(exp, &repbody->oa, NULL);
/*
 * ost_statfs - handle a statfs request.
 *
 * Packs a reply holding one struct obd_statfs and asks the export's
 * device to fill it through obd_statfs(); the result is stored in
 * req->rq_status and a failure is logged with CERROR.
 *
 * Error checks after lustre_pack_reply() and the return statements are
 * not visible in this listing.
 */
120 static int ost_statfs(struct ptlrpc_request *req)
122 struct obd_statfs *osfs;
123 int rc, size = sizeof(*osfs);
126 rc = lustre_pack_reply(req, 1, &size, NULL);
130 osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*osfs));
/* third argument is "now minus one second" in jiffies; presumably a
 * maximum-age cutoff for cached statfs data -- TODO confirm */
132 req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs, jiffies-HZ);
133 if (req->rq_status != 0)
134 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
/*
 * ost_create - handle an object create request.
 *
 * Fetches the ost_body from request buffer 0, packs a one-buffer reply,
 * copies the request obdo into the reply body, points oti->oti_logcookies
 * at the cookie inside the reply obdo and calls obd_create() on that reply
 * copy. The result is stored in req->rq_status.
 *
 * Error checks and return statements are not visible in this listing.
 */
139 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
140 struct obd_trans_info *oti)
142 struct ost_body *body, *repbody;
143 int rc, size = sizeof(*repbody);
146 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
150 rc = lustre_pack_reply(req, 1, &size, NULL);
154 repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
155 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* unlike ost_destroy, the cookie pointer is set unconditionally here */
156 oti->oti_logcookies = obdo_logcookie(&repbody->oa);
157 req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
158 //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
/*
 * ost_punch - handle a punch request.
 *
 * Fetches the ost_body, tests that both OBD_MD_FLSIZE and OBD_MD_FLBLOCKS
 * are set in o_valid, packs a one-buffer reply, copies the request obdo
 * into it and calls obd_punch() with the obdo's o_size and o_blocks as the
 * fourth and fifth arguments (presumably the start and end of the range --
 * TODO confirm). The result is stored in req->rq_status.
 *
 * What happens when the o_valid test fails, the other error checks and the
 * return statements are not visible in this listing.
 */
162 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
163 struct obd_trans_info *oti)
165 struct ost_body *body, *repbody;
166 int rc, size = sizeof(*repbody);
169 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
/* true when at least one of the two required flags is missing */
173 if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
174 (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
177 rc = lustre_pack_reply(req, 1, &size, NULL);
181 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
182 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
183 req->rq_status = obd_punch(exp, &repbody->oa, NULL, repbody->oa.o_size,
184 repbody->oa.o_blocks, oti);
/*
 * ost_sync - handle a sync request.
 *
 * Fetches the ost_body, packs a one-buffer reply, copies the request obdo
 * into it and calls obd_sync() with the obdo's o_size and o_blocks as the
 * fourth and fifth arguments (presumably a byte range, as in ost_punch --
 * TODO confirm). The result is stored in req->rq_status.
 *
 * NOTE(review): no o_valid check on o_size/o_blocks is visible here,
 * unlike ost_punch. Error checks and return statements are not visible in
 * this listing.
 */
188 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
190 struct ost_body *body, *repbody;
191 int rc, size = sizeof(*repbody);
194 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
198 rc = lustre_pack_reply(req, 1, &size, NULL);
202 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
203 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
204 req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
205 repbody->oa.o_blocks);
/*
 * ost_setattr - handle a set-attributes request.
 *
 * Fetches the ost_body, packs a one-buffer reply, copies the request obdo
 * into the reply body and calls obd_setattr() on that reply copy. The
 * result is stored in req->rq_status.
 *
 * Error checks and return statements are not visible in this listing.
 */
209 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
210 struct obd_trans_info *oti)
212 struct ost_body *body, *repbody;
213 int rc, size = sizeof(*repbody);
216 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
220 rc = lustre_pack_reply(req, 1, &size, NULL);
224 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
225 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
227 req->rq_status = obd_setattr(exp, &repbody->oa, NULL, oti);
/*
 * ost_bulk_timeout - timeout callback for bulk transfers.
 *
 * Passed to LWI_TIMEOUT() by ost_brw_read() and ost_brw_write(), which
 * supply the bulk descriptor as the data argument. The function body and
 * return value are not visible in this listing; only the explanatory
 * comment below is.
 */
231 static int ost_bulk_timeout(void *data)
234 /* We don't fail the connection here, because having the export
235 * killed makes the (vital) call to commitrw very sad.
/*
 * get_per_page_niobufs - split remote niobufs into page-sized pieces.
 *
 * @ioo:     array of @nioo object descriptors; ioo[i].ioo_bufcnt is
 *           rewritten to the number of per-page entries for object i
 * @rnb:     array of @nrnb remote niobufs as received
 * @pp_rnbp: out parameter for the per-page array (the assignment itself is
 *           not visible in this listing)
 *
 * The first pass counts the pages each niobuf touches and logs zero-length
 * and out-of-order buffers. The second pass fills a newly allocated array
 * with one entry per page. Callers store the return value in npages, and
 * free_per_page_niobufs() treats pp_rnb == rnb as "nothing was allocated",
 * so the all-single-pages case presumably hands rnb itself back -- TODO
 * confirm against the full source. Return statements and error codes are
 * not visible here.
 */
240 static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
241 struct niobuf_remote *rnb, int nrnb,
242 struct niobuf_remote **pp_rnbp)
244 /* Copy a remote niobuf, splitting it into page-sized chunks
245 * and setting ioo[i].ioo_bufcnt accordingly */
246 struct niobuf_remote *pp_rnb;
253 /* first count and check the number of pages required */
/* NOTE(review): the inner bound below is ioo->ioo_bufcnt, i.e. ioo[0], for
 * every i, while the split pass further down uses ioo[i].ioo_bufcnt. That
 * looks wrong whenever nioo > 1 -- confirm. */
254 for (i = 0; i < nioo; i++)
255 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
/* p0 / pn are the first and last page indices touched by this niobuf */
256 obd_off offset = rnb[rnbidx].offset;
257 obd_off p0 = offset >> PAGE_SHIFT;
258 obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
/* NOTE(review): rnb[rnbidx] has already been read by the initialisers
 * above by the time this bound is asserted */
260 LASSERT(rnbidx < nrnb);
/* NOTE(review): accumulated before the zero-length check below, so for
 * len == 0 pn was derived from offset - 1 */
262 npages += (pn + 1 - p0);
264 if (rnb[rnbidx].len == 0) {
265 CERROR("zero len BRW: obj %d objid "LPX64
266 " buf %u\n", i, ioo[i].ioo_id, j);
/* second half of an ordering check whose first line is not visible in
 * this listing: offsets must be strictly increasing */
270 rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
271 CERROR("unordered BRW: obj %d objid "LPX64
272 " buf %u offset "LPX64" <= "LPX64"\n",
273 i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
279 LASSERT(rnbidx == nrnb);
281 if (npages == nrnb) { /* all niobufs are for single pages */
286 OBD_ALLOC(pp_rnb, sizeof(*pp_rnb) * npages);
290 /* now do the actual split */
292 for (i = 0; i < nioo; i++) {
295 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
296 obd_off off = rnb[rnbidx].offset;
297 int nob = rnb[rnbidx].len;
299 LASSERT(rnbidx < nrnb);
/* poff: offset inside the page; pnob: bytes up to the end of this page,
 * or all that is left when the remainder fits */
301 obd_off poff = off & (PAGE_SIZE - 1);
302 int pnob = (poff + nob > PAGE_SIZE) ?
303 PAGE_SIZE - poff : nob;
305 LASSERT(page < npages);
306 pp_rnb[page].len = pnob;
307 pp_rnb[page].offset = off;
308 pp_rnb[page].flags = rnb[rnbidx].flags;
310 CDEBUG(0, " obj %d id "LPX64
311 "page %d(%d) "LPX64" for %d, flg %x\n",
312 i, ioo[i].ioo_id, obj_pages, page,
313 pp_rnb[page].offset, pp_rnb[page].len,
/* this is the "CAVEAT EMPTOR" side effect the callers warn about */
323 ioo[i].ioo_bufcnt = obj_pages;
325 LASSERT(page == npages);
/*
 * free_per_page_niobufs - release the array built by get_per_page_niobufs.
 *
 * @npages: number of entries in @pp_rnb, used to size the OBD_FREE
 * @pp_rnb: per-page array to release
 * @rnb:    original remote niobuf array; when @pp_rnb is the same pointer
 *          nothing was allocated (the statement taken in that case is not
 *          visible in this listing)
 */
331 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
332 struct niobuf_remote *rnb)
334 if (pp_rnb == rnb) /* didn't allocate above */
337 OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages);
/*
 * ost_checksum_bulk - checksum the pages attached to a bulk descriptor.
 *
 * Walks desc->bd_page_list and feeds each page's mapped data, starting at
 * bp_pageoffset, to ost_checksum(), accumulating into cksum. The
 * declaration of cksum, the rest of the ost_checksum() argument list and
 * the return statement are not visible in this listing.
 *
 * NOTE(review): kmap() is called on every page and no matching kunmap()
 * is visible here; confirm the full source unmaps each page.
 */
341 obd_count ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
344 struct ptlrpc_bulk_page *bp;
346 list_for_each_entry(bp, &desc->bd_page_list, bp_link) {
347 ost_checksum(&cksum, kmap(bp->bp_page) + bp->bp_pageoffset,
/*
 * ost_brw_read - handle a bulk read request and send the reply itself.
 *
 * Visible sequence:
 *   1. unpack the ost_body, a single obd_ioobj and its remote niobufs;
 *   2. pack a one-buffer reply;
 *   3. split the niobufs per page and allocate matching local niobufs;
 *   4. prepare a BULK_PUT_SOURCE descriptor and call obd_preprw();
 *   5. attach every page that returned data and start the bulk put, then
 *      wait for completion with a timeout of a quarter of obd_timeout;
 *   6. always call obd_commitrw() after a successful prep;
 *   7. copy the obdo into the reply, optionally adding a checksum;
 *   8. free the descriptor and both niobuf arrays;
 *   9. on a comms error, free the reply and either evict the export or,
 *      when the connection counts differ, only log.
 *
 * Several declarations (rc, i, nob, npages, niocount, comms_error), most
 * conditionals, the goto labels and the return statements are not visible
 * in this listing.
 *
 * NOTE(review): niocount is taken straight from ioo->ioo_bufcnt with no
 * visible zero check, whereas ost_brw_write rejects a zero bufcnt.
 */
356 static int ost_brw_read(struct ptlrpc_request *req)
358 struct ptlrpc_bulk_desc *desc;
359 struct niobuf_remote *remote_nb;
360 struct niobuf_remote *pp_rnb;
361 struct niobuf_local *local_nb;
362 struct obd_ioobj *ioo;
363 struct ost_body *body, *repbody;
364 struct l_wait_info lwi;
365 struct obd_trans_info oti = { 0 };
366 char str[PTL_NALFMT_SIZE];
367 int size[1] = { sizeof(*body) };
/* fault-injection hooks */
376 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
377 GOTO(out, rc = -EIO);
379 OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
380 (obd_timeout + 1) / 4);
382 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
384 CERROR("Missing/short ost_body\n");
385 GOTO(out, rc = -EFAULT);
388 ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
390 CERROR("Missing/short ioobj\n");
391 GOTO(out, rc = -EFAULT);
394 niocount = ioo->ioo_bufcnt;
395 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
396 lustre_swab_niobuf_remote);
397 if (remote_nb == NULL) {
398 CERROR("Missing/short niobuf\n");
399 GOTO(out, rc = -EFAULT);
/* entries 1..niocount-1 are swabbed by hand; presumably lustre_swab_reqbuf
 * only applied the callback to entry 0 -- TODO confirm */
401 if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
402 for (i = 1; i < niocount; i++)
403 lustre_swab_niobuf_remote (&remote_nb[i]);
406 rc = lustre_pack_reply(req, 1, size, NULL);
410 /* FIXME all niobuf splitting should be done in obdfilter if needed */
411 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
412 npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
414 GOTO(out, rc = npages);
416 OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
417 if (local_nb == NULL)
418 GOTO(out_pp_rnb, rc = -ENOMEM);
420 desc = ptlrpc_prep_bulk_exp(req, BULK_PUT_SOURCE, OST_BULK_PORTAL);
422 GOTO(out_local, rc = -ENOMEM);
424 rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
425 ioo, npages, pp_rnb, local_nb, &oti);
/* per page: negative rc is an error, non-zero rc means data to send, and
 * a short page may only be followed by pages with rc == 0 */
430 for (i = 0; i < npages; i++) {
431 int page_rc = local_nb[i].rc;
433 if (page_rc < 0) { /* error */
438 LASSERT(page_rc <= pp_rnb[i].len);
440 if (page_rc != 0) { /* some data! */
441 LASSERT (local_nb[i].page != NULL);
/* "& ~PAGE_MASK" yields the offset inside the page; ost_brw_write spells
 * the same thing as "& (PAGE_SIZE - 1)" */
442 rc = ptlrpc_prep_bulk_page(desc, local_nb[i].page,
443 pp_rnb[i].offset& ~PAGE_MASK,
449 if (page_rc != pp_rnb[i].len) { /* short read */
450 /* All subsequent pages should be 0 */
452 LASSERT(local_nb[i].rc == 0);
458 rc = ptlrpc_bulk_put(desc);
/* wait for the transfer for a quarter of obd_timeout; ost_bulk_timeout is
 * the timeout callback and desc its argument */
460 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
461 ost_bulk_timeout, desc);
462 rc = l_wait_event(desc->bd_waitq,
463 ptlrpc_bulk_complete(desc), &lwi);
465 LASSERT(rc == -ETIMEDOUT);
466 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT");
467 ptlrpc_abort_bulk(desc);
470 DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d\n", rc);
/* remembered so that the reply is suppressed further down */
472 comms_error = rc != 0;
475 /* Must commit after prep above in all cases */
476 rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
477 ioo, npages, local_nb, &oti);
480 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
481 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
/* the condition guarding the checksum is not visible in this listing */
484 repbody->oa.o_cksum = ost_checksum_bulk(desc);
485 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
490 ptlrpc_free_bulk(desc);
492 OBD_FREE(local_nb, sizeof(*local_nb) * npages);
494 free_per_page_niobufs(npages, pp_rnb, remote_nb);
/* nob is presumably the number of bytes read; its accumulation is not
 * visible in this listing */
498 req->rq_status = nob;
500 } else if (!comms_error) {
501 /* only reply if comms OK */
505 if (req->rq_repmsg != NULL) {
506 /* reply out callback would free */
507 OBD_FREE(req->rq_repmsg, req->rq_replen);
/* evict only when the request belongs to the export's current connection;
 * a differing conn_cnt is reported below as "client reconnected" */
509 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
510 CERROR("bulk IO comms error: "
511 "evicting %s@%s nid "LPX64" (%s)\n",
512 req->rq_export->exp_client_uuid.uuid,
513 req->rq_export->exp_connection->c_remote_uuid.uuid,
514 req->rq_peer.peer_nid,
515 portals_nid2str(req->rq_peer.peer_ni->pni_number,
516 req->rq_peer.peer_nid,
518 ptlrpc_fail_export(req->rq_export);
520 CERROR("ignoring bulk IO comms error: "
521 "client reconnected %s@%s nid "LPX64" (%s)\n",
522 req->rq_export->exp_client_uuid.uuid,
523 req->rq_export->exp_connection->c_remote_uuid.uuid,
524 req->rq_peer.peer_nid,
525 portals_nid2str(req->rq_peer.peer_ni->pni_number,
526 req->rq_peer.peer_nid,
/*
 * ost_brw_write - handle a bulk write request and send the reply itself.
 *
 * Visible sequence:
 *   1. unpack the ost_body, an array of obd_ioobj (count derived from the
 *      length of request buffer 1) and the remote niobufs for all objects;
 *   2. pack a two-buffer reply: the ost_body plus one return code per
 *      requested niobuf;
 *   3. split the niobufs per page and allocate matching local niobufs;
 *   4. prepare a BULK_GET_SINK descriptor, call obd_preprw(), attach every
 *      page and start the bulk get, waiting a quarter of obd_timeout;
 *   5. optionally compare the client's checksum with one computed here;
 *   6. always call obd_commitrw() after a successful prep;
 *   7. fold per-page results back into per-niobuf return codes;
 *   8. free resources, then reply through oti_to_request() and
 *      ptlrpc_reply(), or on a comms error free the reply and either evict
 *      the export or only log.
 *
 * Some declarations (rcs, comms_error), most conditionals, the goto labels
 * and the return statements are not visible in this listing.
 */
534 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
536 struct ptlrpc_bulk_desc *desc;
537 struct niobuf_remote *remote_nb;
538 struct niobuf_remote *pp_rnb;
539 struct niobuf_local *local_nb;
540 struct obd_ioobj *ioo;
541 struct ost_body *body, *repbody;
542 struct l_wait_info lwi;
/* size[1] is filled in once niocount is known */
544 int size[2] = { sizeof(*body) };
545 int objcount, niocount, npages;
547 int rc, rc2, swab, i, j;
548 char str[PTL_NALFMT_SIZE];
551 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
552 GOTO(out, rc = -EIO);
554 /* pause before transaction has been started */
555 OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
556 (obd_timeout + 1) / 4);
558 swab = lustre_msg_swabbed(req->rq_reqmsg);
559 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
561 CERROR("Missing/short ost_body\n");
562 GOTO(out, rc = -EFAULT);
565 LASSERT_REQSWAB(req, 1);
/* the number of objects is implied by the length of buffer 1 */
566 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
568 CERROR("Missing/short ioobj\n");
569 GOTO(out, rc = -EFAULT);
571 ioo = lustre_msg_buf (req->rq_reqmsg, 1, objcount * sizeof(*ioo));
572 LASSERT (ioo != NULL);
/* swab each ioobj (the guarding condition is not visible in this listing),
 * reject empty objects, and total the niobuf count.
 * NOTE(review): niocount is an int summed from values supplied by the
 * client with no visible overflow check. */
573 for (niocount = i = 0; i < objcount; i++) {
575 lustre_swab_obd_ioobj (&ioo[i]);
576 if (ioo[i].ioo_bufcnt == 0) {
577 CERROR("ioo[%d] has zero bufcnt\n", i);
578 GOTO(out, rc = -EFAULT);
580 niocount += ioo[i].ioo_bufcnt;
583 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
584 lustre_swab_niobuf_remote);
585 if (remote_nb == NULL) {
586 CERROR("Missing/short niobuf\n");
587 GOTO(out, rc = -EFAULT);
589 if (swab) { /* swab the remaining niobufs */
590 for (i = 1; i < niocount; i++)
591 lustre_swab_niobuf_remote (&remote_nb[i]);
594 size[1] = niocount * sizeof(*rcs);
595 rc = lustre_pack_reply(req, 2, size, NULL);
598 rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
600 /* FIXME all niobuf splitting should be done in obdfilter if needed */
601 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
602 npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
604 GOTO(out, rc = npages);
606 OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
607 if (local_nb == NULL)
608 GOTO(out_pp_rnb, rc = -ENOMEM);
610 desc = ptlrpc_prep_bulk_exp(req, BULK_GET_SINK, OST_BULK_PORTAL);
612 GOTO(out_local, rc = -ENOMEM);
614 rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
615 ioo, npages, pp_rnb, local_nb, oti);
619 /* NB Having prepped, we must commit... */
621 for (i = 0; i < npages; i++) {
622 rc = ptlrpc_prep_bulk_page(desc, local_nb[i].page,
623 pp_rnb[i].offset & (PAGE_SIZE - 1),
630 rc = ptlrpc_bulk_get(desc);
/* wait for the transfer for a quarter of obd_timeout; ost_bulk_timeout is
 * the timeout callback and desc its argument */
632 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
633 ost_bulk_timeout, desc);
634 rc = l_wait_event(desc->bd_waitq,
635 ptlrpc_bulk_complete(desc), &lwi);
637 LASSERT(rc == -ETIMEDOUT);
638 DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
639 ptlrpc_abort_bulk(desc);
642 DEBUG_REQ(D_ERROR, req, "bulk GET failed: rc %d\n", rc);
644 comms_error = rc != 0;
647 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
648 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
/* verify the data only when the transfer succeeded and the client sent a
 * checksum */
651 if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
652 static int cksum_counter;
653 obd_count client_cksum = body->oa.o_cksum;
654 obd_count cksum = ost_checksum_bulk(desc);
656 portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
657 req->rq_connection->c_peer.peer_nid, str);
658 if (client_cksum != cksum) {
659 CERROR("Bad checksum: client %x, server %x, client NID "
660 LPX64" (%s)\n", client_cksum, cksum,
661 req->rq_connection->c_peer.peer_nid, str);
663 repbody->oa.o_cksum = cksum;
/* x & -x isolates the lowest set bit, so this holds only when the counter
 * is zero or a power of two: the "OK" message gets rarer as the counter
 * grows (the increment is not visible in this listing) */
666 if ((cksum_counter & (-cksum_counter)) == cksum_counter)
667 CWARN("Checksum %u from "LPX64": %x OK\n",
669 req->rq_connection->c_peer.peer_nid,
674 /* Must commit after prep above in all cases */
675 rc2 = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
676 objcount, ioo, npages, local_nb, oti);
679 /* set per-requested niobuf return codes */
/* i indexes the requested niobufs and j the per-page entries; each niobuf
 * consumes pages until its byte count is used up, and a negative page rc
 * becomes the niobuf's rc (the inner loop header is not visible here) */
680 for (i = j = 0; i < niocount; i++) {
681 int nob = remote_nb[i].len;
686 if (local_nb[j].rc < 0)
687 rcs[i] = local_nb[j].rc;
688 nob -= pp_rnb[j].len;
693 LASSERT(j == npages);
699 ptlrpc_free_bulk(desc);
701 OBD_FREE(local_nb, sizeof(*local_nb) * npages);
703 free_per_page_niobufs(npages, pp_rnb, remote_nb);
706 oti_to_request(oti, req);
707 rc = ptlrpc_reply(req);
708 } else if (!comms_error) {
709 /* Only reply if there was no comms problem with bulk */
713 if (req->rq_repmsg != NULL) {
714 /* reply out callback would free */
715 OBD_FREE (req->rq_repmsg, req->rq_replen);
/* evict only when the request belongs to the export's current connection;
 * a differing conn_cnt is reported below as "client reconnected" */
717 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
718 CERROR("bulk IO comms error: "
719 "evicting %s@%s nid "LPX64" (%s)\n",
720 req->rq_export->exp_client_uuid.uuid,
721 req->rq_export->exp_connection->c_remote_uuid.uuid,
722 req->rq_peer.peer_nid,
723 portals_nid2str(req->rq_peer.peer_ni->pni_number,
724 req->rq_peer.peer_nid,
726 ptlrpc_fail_export(req->rq_export);
728 CERROR("ignoring bulk IO comms error: "
729 "client reconnected %s@%s nid "LPX64" (%s)\n",
730 req->rq_export->exp_client_uuid.uuid,
731 req->rq_export->exp_connection->c_remote_uuid.uuid,
732 req->rq_peer.peer_nid,
733 portals_nid2str(req->rq_peer.peer_ni->pni_number,
734 req->rq_peer.peer_nid,
/*
 * ost_san_brw - handle a SAN read or write request.
 *
 * @cmd is passed through to obd_san_preprw(); ost_handle() supplies
 * OBD_BRW_READ or OBD_BRW_WRITE. No bulk descriptor is used here:
 * the handler unpacks the body, ioobjs and niobufs, splits the niobufs
 * per page, calls obd_san_preprw() and returns a niobuf array in reply
 * buffer 1.
 *
 * The declaration of swab, most conditionals, the goto labels, the reply
 * call and the return statements are not visible in this listing.
 */
741 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
743 struct niobuf_remote *remote_nb, *res_nb, *pp_rnb;
744 struct obd_ioobj *ioo;
745 struct ost_body *body, *repbody;
746 int rc, i, objcount, niocount, size[2] = {sizeof(*body)}, npages;
750 /* XXX not set to use latest protocol */
752 swab = lustre_msg_swabbed(req->rq_reqmsg);
753 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
755 CERROR("Missing/short ost_body\n");
756 GOTO(out, rc = -EFAULT);
759 ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
761 CERROR("Missing/short ioobj\n");
762 GOTO(out, rc = -EFAULT);
/* the number of objects is implied by the length of buffer 1; entries from
 * index 1 on are swabbed here (guarding condition not visible).
 * NOTE(review): unlike ost_brw_write, no zero-bufcnt check is visible. */
764 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
765 niocount = ioo[0].ioo_bufcnt;
766 for (i = 1; i < objcount; i++) {
768 lustre_swab_obd_ioobj (&ioo[i]);
769 niocount += ioo[i].ioo_bufcnt;
772 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
773 lustre_swab_niobuf_remote);
774 if (remote_nb == NULL) {
775 CERROR("Missing/short niobuf\n");
776 GOTO(out, rc = -EFAULT);
778 if (swab) { /* swab the remaining niobufs */
779 for (i = 1; i < niocount; i++)
780 lustre_swab_niobuf_remote (&remote_nb[i]);
783 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
784 npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
786 GOTO (out, rc = npages);
/* reply buffer 1 is sized for the per-page array */
788 size[1] = npages * sizeof(*pp_rnb);
789 rc = lustre_pack_reply(req, 2, size, NULL);
791 GOTO(out_pp_rnb, rc);
793 req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
794 objcount, ioo, npages, pp_rnb);
797 GOTO(out_pp_rnb, rc = 0);
799 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
800 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* NOTE(review): size[1] covers npages entries, but the copy source is
 * remote_nb, which holds niocount entries, not pp_rnb, which was handed to
 * obd_san_preprw(). When the niobufs were split (npages > niocount) this
 * looks like an over-read that also returns the wrong array -- confirm. */
802 res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
803 memcpy(res_nb, remote_nb, size[1]);
806 free_per_page_niobufs(npages, pp_rnb, remote_nb);
/* the condition under which the reply is discarded is not visible here */
809 OBD_FREE(req->rq_repmsg, req->rq_replen);
810 req->rq_repmsg = NULL;
/*
 * ost_set_info - handle a set_info request.
 *
 * Takes the key from request buffer 0 (requiring at least one byte) and
 * its length from that buffer's recorded length, packs an empty reply and
 * calls obd_set_info() with no value (length 0, NULL pointer).
 *
 * NOTE(review): the reply status is set to 0 regardless of what
 * obd_set_info() returned; whether rc is propagated through the return
 * value is not visible in this listing.
 */
820 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
826 key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
828 DEBUG_REQ(D_HA, req, "no set_info key");
831 keylen = req->rq_reqmsg->buflens[0];
833 rc = lustre_pack_reply(req, 0, NULL, NULL);
837 rc = obd_set_info(exp, keylen, key, 0, NULL);
838 req->rq_repmsg->status = 0;
/*
 * ost_get_info - handle a get_info request.
 *
 * Takes the key from request buffer 0 and tests that it starts with
 * "last_id"; the statement taken when it does not is not visible in this
 * listing. It then packs a reply with one buffer of sizeof(obd_id) and
 * lets obd_get_info() fill it.
 *
 * NOTE(review): the reply status is set to 0 regardless of what
 * obd_get_info() returned; whether rc is propagated through the return
 * value is not visible in this listing.
 */
842 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
845 int keylen, rc = 0, size = sizeof(obd_id);
849 key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
851 DEBUG_REQ(D_HA, req, "no get_info key");
854 keylen = req->rq_reqmsg->buflens[0];
/* the length test comes first so memcmp never reads beyond the key; 7 is
 * strlen("last_id") */
856 if (keylen < strlen("last_id") || memcmp(key, "last_id", 7) != 0)
859 rc = lustre_pack_reply(req, 1, &size, NULL);
863 reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
864 rc = obd_get_info(exp, keylen, key, &size, reply);
865 req->rq_repmsg->status = 0;
/*
 * ost_filter_recovery_request - decide what to do with a request that
 * arrives while the device is recovering.
 *
 * Switches on the request opcode. One visible branch sets *process from
 * target_queue_recovery_request(); another logs "not permitted during
 * recovery", sets rq_status to -EAGAIN and returns ptlrpc_error(req).
 * Apart from OST_CONNECT, the case labels selecting each branch are not
 * visible in this listing.
 *
 * ost_handle() bails out when this returns non-zero or leaves *process
 * at zero.
 */
869 static int ost_filter_recovery_request(struct ptlrpc_request *req,
870 struct obd_device *obd, int *process)
872 switch (req->rq_reqmsg->opc) {
873 case OST_CONNECT: /* This will never get here, but for completeness. */
887 *process = target_queue_recovery_request(req, obd);
891 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
893 /* XXX what should we set rq_status to here? */
894 req->rq_status = -EAGAIN;
895 RETURN(ptlrpc_error(req));
/*
 * ost_handle - request handler for the OST services.
 *
 * Passed as the handler to ptlrpc_init_svc() by ost_setup(). For every
 * opcode other than OST_CONNECT it first looks at the request's export and
 * the device's recovery state, then dispatches on req->rq_reqmsg->opc to
 * the matching ost_*(), target_*(), llog_*() or ldlm_*() handler. After
 * the switch it records last_committed in the reply (unless the device has
 * obd_no_transno set), deals with MSG_LAST_REPLAY requests and sends the
 * reply through target_send_reply().
 *
 * Most case labels, the break statements, the out: label and the return
 * statements are not visible in this listing; the CDEBUG/DEBUG_REQ strings
 * identify each branch.
 */
901 static int ost_handle(struct ptlrpc_request *req)
903 struct obd_trans_info trans_info = { 0, };
904 struct obd_trans_info *oti = &trans_info;
/* fail is the fault-injection id later handed to target_send_reply() */
905 int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
906 struct obd_export *exp = NULL;
909 LASSERT(current->journal_info == NULL);
910 /* XXX identical to MDS */
911 if (req->rq_reqmsg->opc != OST_CONNECT) {
912 struct obd_device *obd;
913 int abort_recovery, recovering;
915 exp = req->rq_export;
/* taken when there is no usable export (condition not visible here) */
918 CDEBUG(D_HA, "operation %d on unconnected OST\n",
919 req->rq_reqmsg->opc);
920 req->rq_status = -ENOTCONN;
921 GOTO(out, rc = -ENOTCONN);
926 /* Check for aborted recovery. */
/* both flags are sampled under the lock and acted on after it is dropped */
927 spin_lock_bh(&obd->obd_processing_task_lock);
928 abort_recovery = obd->obd_abort_recovery;
929 recovering = obd->obd_recovering;
930 spin_unlock_bh(&obd->obd_processing_task_lock);
931 if (abort_recovery) {
932 target_abort_recovery(obd);
933 } else if (recovering) {
934 rc = ost_filter_recovery_request(req, obd,
936 if (rc || !should_process)
943 switch (req->rq_reqmsg->opc) {
945 CDEBUG(D_INODE, "connect\n");
946 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
947 rc = target_handle_connect(req, ost_handle);
951 CDEBUG(D_INODE, "disconnect\n");
952 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
953 rc = target_handle_disconnect(req);
956 CDEBUG(D_INODE, "create\n");
957 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
958 rc = ost_create(exp, req, oti);
961 CDEBUG(D_INODE, "destroy\n");
962 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
963 rc = ost_destroy(exp, req, oti);
966 CDEBUG(D_INODE, "getattr\n");
967 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
968 rc = ost_getattr(exp, req);
971 CDEBUG(D_INODE, "setattr\n");
972 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
973 rc = ost_setattr(exp, req, oti);
976 CDEBUG(D_INODE, "write\n");
977 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
978 rc = ost_brw_write(req, oti);
979 LASSERT(current->journal_info == NULL);
980 /* ost_brw sends its own replies */
983 CDEBUG(D_INODE, "read\n");
984 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
985 rc = ost_brw_read(req);
986 LASSERT(current->journal_info == NULL);
987 /* ost_brw sends its own replies */
990 CDEBUG(D_INODE, "san read\n");
991 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
992 rc = ost_san_brw(req, OBD_BRW_READ);
993 /* ost_san_brw sends its own replies */
996 CDEBUG(D_INODE, "san write\n");
997 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
998 rc = ost_san_brw(req, OBD_BRW_WRITE);
999 /* ost_san_brw sends its own replies */
1002 CDEBUG(D_INODE, "punch\n");
1003 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
1004 rc = ost_punch(exp, req, oti);
1007 CDEBUG(D_INODE, "statfs\n");
1008 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
1009 rc = ost_statfs(req);
1012 CDEBUG(D_INODE, "sync\n");
1013 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNC_NET, 0);
1014 rc = ost_sync(exp, req);
1017 DEBUG_REQ(D_INODE, req, "set_info");
1018 rc = ost_set_info(exp, req);
1021 DEBUG_REQ(D_INODE, req, "get_info");
1022 rc = ost_get_info(exp, req);
1025 DEBUG_REQ(D_INODE, req, "ping");
1026 rc = target_handle_ping(req);
1028 /* FIXME - just reply status */
/* the two llog branches store the handler result in rq_status, pack an
 * empty reply and reply directly via ptlrpc_reply() instead of falling
 * through to the code after the switch */
1029 case LLOG_ORIGIN_CONNECT:
1030 DEBUG_REQ(D_INODE, req, "log connect\n");
1031 rc = llog_handle_connect(req);
1032 req->rq_status = rc;
1033 rc = lustre_pack_reply(req, 0, NULL, NULL);
1036 RETURN(ptlrpc_reply(req));
1037 case OBD_LOG_CANCEL:
1038 CDEBUG(D_INODE, "log cancel\n");
1039 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1040 rc = llog_origin_handle_cancel(req);
1041 req->rq_status = rc;
1042 rc = lustre_pack_reply(req, 0, NULL, NULL);
1045 RETURN(ptlrpc_reply(req));
1047 CDEBUG(D_INODE, "enqueue\n");
1048 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1049 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1050 ldlm_server_blocking_ast);
/* enqueue replies use their own fault-injection id */
1051 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1054 CDEBUG(D_INODE, "convert\n");
1055 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1056 rc = ldlm_handle_convert(req);
1059 CDEBUG(D_INODE, "cancel\n");
1060 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1061 rc = ldlm_handle_cancel(req);
1063 case LDLM_BL_CALLBACK:
1064 case LDLM_CP_CALLBACK:
1065 CDEBUG(D_INODE, "callback\n");
1066 CERROR("callbacks should not happen on OST\n");
1069 CERROR("Unexpected opcode %d\n", req->rq_reqmsg->opc);
1070 req->rq_status = -ENOTSUPP;
1071 rc = ptlrpc_error(req);
1075 LASSERT(current->journal_info == NULL);
1078 /* If we're DISCONNECTing, the export_data is already freed */
1079 if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
1080 struct obd_device *obd = req->rq_export->exp_obd;
1081 if (!obd->obd_no_transno) {
1082 req->rq_repmsg->last_committed =
1083 obd->obd_last_committed;
1085 DEBUG_REQ(D_IOCTL, req,
1086 "not sending last_committed update");
1088 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
1089 obd->obd_last_committed, req->rq_xid);
/* the last replayed request has its reply queued while the device is
 * still recovering; otherwise it is turned into -ENOTCONN */
1093 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1094 struct obd_device *obd = req->rq_export->exp_obd;
1096 if (obd && obd->obd_recovering) {
1097 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1098 return target_queue_final_reply(req, rc);
1100 /* Lost a race with recovery; let the error path DTRT. */
1101 rc = req->rq_status = -ENOTCONN;
1105 oti_to_request(oti, req);
1107 target_send_reply(req, rc, fail);
/*
 * ost_setup - start the OST services for a device.
 *
 * Clears the calling task's supplementary groups, starts the llog commit
 * thread, then creates and starts two ptlrpc services that both use
 * ost_handle: the main request service on OST_REQUEST_PORTAL with
 * OST_NUM_THREADS threads, and a create service on OST_CREATE_PORTAL with
 * a single thread.
 *
 * The declaration of rc, some argument lines, the success return and the
 * labels in front of the two unregister calls at the end are not visible
 * in this listing; given the GOTO targets used above, the calls presumably
 * sit under out_create: and out:.
 *
 * NOTE(review): the rc returned by ptlrpc_start_n_threads() is replaced
 * with -EINVAL. The visible error paths only unregister services; no
 * ptlrpc_stop_all_threads() call (as used in ost_cleanup) and no stop of
 * the llog commit thread is visible -- confirm whether that is needed.
 */
1111 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
1113 struct ost_obd *ost = &obddev->u.ost;
1117 /* Get rid of unneeded supplementary groups */
1118 current->ngroups = 0;
1119 memset(current->groups, 0, sizeof(current->groups));
1121 rc = llog_start_commit_thread();
1125 ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
1126 OST_BUFSIZE, OST_MAXREQSIZE,
1127 OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
1129 obddev->obd_proc_entry);
1130 if (ost->ost_service == NULL) {
1131 CERROR("failed to start service\n");
1135 rc = ptlrpc_start_n_threads(obddev, ost->ost_service, OST_NUM_THREADS,
1138 GOTO(out, rc = -EINVAL);
1140 ost->ost_create_service =
1141 ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS, OST_BUFSIZE,
1142 OST_MAXREQSIZE, OST_CREATE_PORTAL,
1143 OSC_REPLY_PORTAL, ost_handle, "ost_create",
1144 obddev->obd_proc_entry);
1145 if (ost->ost_create_service == NULL) {
1146 CERROR("failed to start OST create service\n");
1147 GOTO(out, rc = -ENOMEM);
1150 rc = ptlrpc_start_n_threads(obddev, ost->ost_create_service, 1,
1153 GOTO(out_create, rc = -EINVAL);
/* error unwind: release in reverse order of creation */
1158 ptlrpc_unregister_service(ost->ost_create_service);
1160 ptlrpc_unregister_service(ost->ost_service);
/*
 * ost_cleanup - stop the OST services for a device.
 *
 * Under obd_processing_task_lock, cancels the recovery timer and clears
 * obd_recovering if recovery was in progress. It then stops the threads
 * of the main service and the create service and unregisters both. The
 * flags argument is not used in any line visible here, and the return
 * statement is not visible in this listing.
 */
1164 static int ost_cleanup(struct obd_device *obddev, int flags)
1166 struct ost_obd *ost = &obddev->u.ost;
1170 spin_lock_bh(&obddev->obd_processing_task_lock);
1171 if (obddev->obd_recovering) {
1172 target_cancel_recovery_timer(obddev);
1173 obddev->obd_recovering = 0;
1175 spin_unlock_bh(&obddev->obd_processing_task_lock);
/* threads are stopped before each service is unregistered */
1177 ptlrpc_stop_all_threads(ost->ost_service);
1178 ptlrpc_unregister_service(ost->ost_service);
1180 ptlrpc_stop_all_threads(ost->ost_create_service);
1181 ptlrpc_unregister_service(ost->ost_create_service);
/*
 * ost_attach - attach method: register the device's lprocfs entries.
 *
 * Initialises the "ost" lprocfs variable tables and returns the result of
 * lprocfs_obd_attach() with the per-device table. The len and data
 * arguments are not used.
 */
1186 int ost_attach(struct obd_device *dev, obd_count len, void *data)
1188 struct lprocfs_static_vars lvars;
1190 lprocfs_init_vars(ost,&lvars);
1191 return lprocfs_obd_attach(dev, lvars.obd_vars);
/*
 * ost_detach - detach method: returns the result of lprocfs_obd_detach()
 * for the device.
 */
1194 int ost_detach(struct obd_device *dev)
1196 return lprocfs_obd_detach(dev);
1199 /* use obd ops to offer management infrastructure */
/*
 * Method table registered for the OST device type by ost_init(). It is
 * written with the GNU "field: value" initializer syntax. Only the owner,
 * attach, detach and cleanup entries are visible in this listing.
 */
1200 static struct obd_ops ost_obd_ops = {
1201 o_owner: THIS_MODULE,
1202 o_attach: ost_attach,
1203 o_detach: ost_detach,
1205 o_cleanup: ost_cleanup,
/*
 * ost_init - module entry point.
 *
 * Initialises the "ost" lprocfs variable tables and returns the result of
 * class_register_type() for ost_obd_ops with the module-level table. The
 * remaining arguments of that call are not visible in this listing.
 */
1208 static int __init ost_init(void)
1210 struct lprocfs_static_vars lvars;
1213 lprocfs_init_vars(ost,&lvars);
1214 RETURN(class_register_type(&ost_obd_ops, lvars.module_vars,
/*
 * ost_exit - module exit point: unregisters the LUSTRE_OST_NAME device
 * type. The __exit annotation is commented out in the signature.
 */
1218 static void /*__exit*/ ost_exit(void)
1220 class_unregister_type(LUSTRE_OST_NAME);
/* Module metadata, followed by the hooks that make ost_init() and
 * ost_exit() the module's load and unload entry points. */
1223 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1224 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
1225 MODULE_LICENSE("GPL");
1227 module_init(ost_init);
1228 module_exit(ost_exit);