1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author: Peter J. Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * Storage Target Handling functions
24 * Lustre Object Server Module (OST)
26 * This server is single threaded at present (but can easily be multi
27 * threaded). For testing and management it is treated as an
28 * obd_device, although it does not export a full OBD method table
29 * (the requests are coming in over the wire, so object target
30 * modules do not have a full method table.)
34 # define EXPORT_SYMTAB
36 #define DEBUG_SUBSYSTEM S_OST
38 #include <linux/module.h>
39 #include <linux/obd_ost.h>
40 #include <linux/lustre_net.h>
41 #include <linux/lustre_dlm.h>
42 #include <linux/lustre_export.h>
43 #include <linux/init.h>
44 #include <linux/lprocfs_status.h>
45 #include <linux/lustre_commit_confd.h>
46 #include <libcfs/list.h>
47 #include <linux/lustre_sec.h>
/*
 * Initialise a transaction-info structure for a request.
 *
 * Zeroes *oti and, when the request has both a reply message and a request
 * message, seeds oti->oti_transno from the reply message's transno.
 *
 * NOTE(review): statements between the signature and the memset() are not
 * visible in this listing -- confirm whether oti is NULL-checked there.
 */
49 void oti_init(struct obd_trans_info *oti, struct ptlrpc_request *req)
53 memset(oti, 0, sizeof *oti);
/* rq_reqmsg is compared against 0, i.e. tested for being non-NULL */
55 if (req->rq_repmsg && req->rq_reqmsg != 0)
56 oti->oti_transno = req->rq_repmsg->transno;
/*
 * Transfer the results recorded in a transaction-info structure to the
 * request: the transaction number goes into the reply message and the
 * entries of oti->oti_ack_locks (at most 4 are examined) are handed to
 * ptlrpc_save_lock() together with their lock mode.
 *
 * NOTE(review): guard conditions (NULL oti / missing reply message) and the
 * loop's early-exit test are not visible in this listing.
 */
59 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
61 struct oti_req_ack_lock *ack_lock;
68 req->rq_repmsg->transno = oti->oti_transno;
70 /* XXX 4 == entries in oti_ack_locks??? */
71 for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
74 /* XXX not even calling target_send_reply in some cases... */
75 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
/*
 * Handle an object destroy request.
 *
 * Unpacks the ost_body from request buffer 0 (via the ost_body swabber),
 * packs a one-buffer reply of the same size, copies the request obdo into
 * the reply and calls obd_destroy() on the request's obdo.  The result of
 * obd_destroy() is stored in req->rq_status.
 *
 * NOTE(review): the handling of a missing body or a failed reply pack, and
 * the function's return value, are not visible in this listing.
 */
79 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
80 struct obd_trans_info *oti)
82 struct ost_body *body, *repbody;
83 int rc, size = sizeof(*body);
86 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
90 rc = lustre_pack_reply(req, 1, &size, NULL);
/* the client supplied a log cookie inside the obdo: pass it on via oti */
94 if (body->oa.o_valid & OBD_MD_FLCOOKIE)
95 oti->oti_logcookies = obdo_logcookie(&body->oa);
96 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
97 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
98 req->rq_status = obd_destroy(exp, &body->oa, NULL, oti);
/*
 * Handle a getattr request.
 *
 * Unpacks the ost_body from request buffer 0, packs a one-buffer reply,
 * copies the request obdo into the reply and lets obd_getattr() operate on
 * the reply's copy.  The obd_getattr() result is stored in req->rq_status.
 */
102 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
104 struct ost_body *body, *repbody;
105 int rc, size = sizeof(*body);
108 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
112 rc = lustre_pack_reply(req, 1, &size, NULL);
116 repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
117 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
118 req->rq_status = obd_getattr(exp, &repbody->oa, NULL);
/*
 * Handle a statfs request.
 *
 * Packs a reply holding one struct obd_statfs and has obd_statfs() fill it
 * in for the device behind the request's export.  The obd_statfs() result
 * is stored in req->rq_status and logged when it is non-zero.
 */
122 static int ost_statfs(struct ptlrpc_request *req)
124 struct obd_statfs *osfs;
125 int rc, size = sizeof(*osfs);
128 rc = lustre_pack_reply(req, 1, &size, NULL);
132 osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*osfs));
/* NOTE(review): jiffies-HZ is presumably the oldest acceptable age of
 * cached statfs data (one second ago) -- confirm against obd_statfs() */
134 req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs, jiffies-HZ);
135 if (req->rq_status != 0)
136 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
/*
 * Handle an object create request.
 *
 * Unpacks the ost_body from request buffer 0, packs a one-buffer reply and
 * copies the request obdo into it.  oti->oti_logcookies is pointed at the
 * log cookie inside the reply's obdo before obd_create() is called on that
 * obdo.  The obd_create() result is stored in req->rq_status.
 */
141 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
142 struct obd_trans_info *oti)
144 struct ost_body *body, *repbody;
145 int rc, size = sizeof(*repbody);
148 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
152 rc = lustre_pack_reply(req, 1, &size, NULL);
156 repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
157 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
158 oti->oti_logcookies = obdo_logcookie(&repbody->oa);
159 req->rq_status = obd_create(exp, &repbody->oa, NULL, 0, NULL, oti);
160 //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
/*
 * Handle a punch request.
 *
 * Unpacks the ost_body from request buffer 0 and tests that both
 * OBD_MD_FLSIZE and OBD_MD_FLBLOCKS are set in o_valid (the action taken
 * when they are not is not visible in this listing).  It then packs a
 * one-buffer reply, copies the obdo into it and calls obd_punch() with the
 * obdo's o_size and o_blocks as the two range arguments.  The obd_punch()
 * result is stored in req->rq_status.
 */
164 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
165 struct obd_trans_info *oti)
167 struct ost_body *body, *repbody;
168 int rc, size = sizeof(*repbody);
171 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
175 if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
176 (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
179 rc = lustre_pack_reply(req, 1, &size, NULL);
183 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
184 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
185 req->rq_status = obd_punch(exp, &repbody->oa, NULL, repbody->oa.o_size,
186 repbody->oa.o_blocks, oti);
/*
 * Handle a sync request.
 *
 * Unpacks the ost_body from request buffer 0, packs a one-buffer reply,
 * copies the obdo into it and calls obd_sync() with the obdo's o_size and
 * o_blocks as the two range arguments.  The obd_sync() result is stored in
 * req->rq_status.
 */
190 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
192 struct ost_body *body, *repbody;
193 int rc, size = sizeof(*repbody);
196 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
200 rc = lustre_pack_reply(req, 1, &size, NULL);
204 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
205 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
206 req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
207 repbody->oa.o_blocks);
/*
 * Handle a setattr request.
 *
 * Unpacks the ost_body from request buffer 0, packs a one-buffer reply,
 * copies the obdo into it and calls obd_setattr() on the reply's copy.
 * The obd_setattr() result is stored in req->rq_status.
 */
211 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
212 struct obd_trans_info *oti)
214 struct ost_body *body, *repbody;
215 int rc, size = sizeof(*repbody);
218 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
222 rc = lustre_pack_reply(req, 1, &size, NULL);
226 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
227 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
229 req->rq_status = obd_setattr(exp, &repbody->oa, NULL, oti);
/*
 * Timeout callback for bulk transfers: ost_brw_read() and ost_brw_write()
 * pass it to LWI_TIMEOUT() with the bulk descriptor as its data argument.
 * NOTE(review): the function body is not visible in this listing.
 */
233 static int ost_bulk_timeout(void *data)
236 /* We don't fail the connection here, because having the export
237 * killed makes the (vital) call to commitrw very sad.
/*
 * Produce a per-page view of an array of remote niobufs.
 *
 * ioo/nioo   - the I/O objects; ioo[i].ioo_bufcnt is rewritten to the
 *              number of pages belonging to object i
 * rnb/nrnb   - the remote niobufs as received, for all objects in order
 * pp_rnbp    - output; NOTE(review): the stores to *pp_rnbp and the return
 *              statements are not visible in this listing.  Callers treat
 *              the return value as a page count (they pass it on as
 *              "npages").
 *
 * A first pass counts the pages spanned by every niobuf and logs zero-length
 * or out-of-order buffers.  When the page count equals nrnb no split is
 * needed; otherwise a new array of npages entries is allocated and filled
 * with one entry per page.  free_per_page_niobufs() releases that array.
 */
242 static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
243 struct niobuf_remote *rnb, int nrnb,
244 struct niobuf_remote **pp_rnbp)
246 /* Copy a remote niobuf, splitting it into page-sized chunks
247 * and setting ioo[i].ioo_bufcnt accordingly */
248 struct niobuf_remote *pp_rnb;
255 /* first count and check the number of pages required */
256 for (i = 0; i < nioo; i++)
/* NOTE(review): this bound reads ioo->ioo_bufcnt (object 0) for every i,
 * whereas the split loop further down uses ioo[i].ioo_bufcnt; with
 * nioo > 1 and differing buffer counts the two passes disagree --
 * confirm whether ioo[i].ioo_bufcnt was intended. */
257 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
/* NOTE(review): rnb[rnbidx] is read here before the bounds assertion
 * on rnbidx below */
258 obd_off offset = rnb[rnbidx].offset;
259 obd_off p0 = offset >> PAGE_SHIFT;
260 obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
262 LASSERT(rnbidx < nrnb);
/* pages p0..pn inclusive are touched by this niobuf */
264 npages += (pn + 1 - p0);
266 if (rnb[rnbidx].len == 0) {
267 CERROR("zero len BRW: obj %d objid "LPX64
268 " buf %u\n", i, ioo[i].ioo_id, j);
272 rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
273 CERROR("unordered BRW: obj %d objid "LPX64
274 " buf %u offset "LPX64" <= "LPX64"\n",
275 i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
281 LASSERT(rnbidx == nrnb);
283 if (npages == nrnb) { /* all niobufs are for single pages */
288 OBD_ALLOC(pp_rnb, sizeof(*pp_rnb) * npages);
292 /* now do the actual split */
294 for (i = 0; i < nioo; i++) {
297 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
298 obd_off off = rnb[rnbidx].offset;
299 int nob = rnb[rnbidx].len;
301 LASSERT(rnbidx < nrnb);
/* poff: offset within the page; pnob: bytes of this niobuf that fit
 * into the current page */
303 obd_off poff = off & (PAGE_SIZE - 1);
304 int pnob = (poff + nob > PAGE_SIZE) ?
305 PAGE_SIZE - poff : nob;
307 LASSERT(page < npages);
308 pp_rnb[page].len = pnob;
309 pp_rnb[page].offset = off;
310 pp_rnb[page].flags = rnb[rnbidx].flags;
312 CDEBUG(0, " obj %d id "LPX64
313 "page %d(%d) "LPX64" for %d, flg %x\n",
314 i, ioo[i].ioo_id, obj_pages, page,
315 pp_rnb[page].offset, pp_rnb[page].len,
/* the object's buffer count now counts pages, not original niobufs */
325 ioo[i].ioo_bufcnt = obj_pages;
327 LASSERT(page == npages);
333 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
334 struct niobuf_remote *rnb)
336 if (pp_rnb == rnb) /* didn't allocate above */
339 OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages);
/*
 * Compute a checksum over the pages attached to a bulk descriptor.
 *
 * Walks desc->bd_page_list and feeds each page, mapped with kmap() and
 * advanced by the page's bp_pageoffset, into ost_checksum(), accumulating
 * into cksum.
 *
 * NOTE(review): the remaining ost_checksum() arguments, the matching unmap
 * and the return statement are not visible in this listing.
 */
343 obd_count ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
346 struct ptlrpc_bulk_page *bp;
348 list_for_each_entry(bp, &desc->bd_page_list, bp_link) {
349 ost_checksum(&cksum, kmap(bp->bp_page) + bp->bp_pageoffset,
/*
 * Record the time spent in one phase of a bulk read or write.
 *
 * req   - request being served; its service's srv_obddev selects the device
 * start - in: start of the phase; out: overwritten with the current time so
 *         the next call measures the following phase
 * rw    - 0 is passed by the read path, 1 by the write path
 * phase - 0, 1 or 2 (the callers use 0 after preprw, 1 after the bulk
 *         transfer and 2 after commitrw)
 *
 * Nothing is recorded unless the device's type name is OBD_OST_DEVICENAME.
 */
358 static void ost_stime_record(struct ptlrpc_request *req, struct timeval *start,
359 unsigned rw, unsigned phase)
361 struct obd_device *obd = req->rq_svc->srv_obddev;
/* three slots per direction: index = direction * 3 + phase */
363 int ind = rw *3 + phase;
365 if (obd && obd->obd_type && obd->obd_type->typ_name) {
366 if (!strcmp(obd->obd_type->typ_name, OBD_OST_DEVICENAME)) {
/* NOTE(review): ost is dereferenced below; the statement that points it
 * at the device's ost data is not visible in this listing */
367 struct ost_obd *ost = NULL;
/* bounds check of the slot index against the ost_stimes array */
370 if (ind >= (sizeof(ost->ost_stimes) /
371 sizeof(ost->ost_stimes[0])))
373 do_gettimeofday(&stop);
375 spin_lock(&ost->ost_lock);
376 lprocfs_stime_record(&ost->ost_stimes[ind],&stop,start);
377 spin_unlock(&ost->ost_lock);
378 memcpy(start, &stop, sizeof(*start));
/*
 * Handle a bulk read request.
 *
 * Sequence visible here: unpack the ost_body, the single ioobj and its
 * remote niobufs; pack a one-buffer reply; split the niobufs per page;
 * obd_preprw() the pages; attach the pages that hold data to a bulk
 * descriptor (BULK_PUT_SOURCE); run the bulk transfer with a timeout;
 * obd_commitrw(); fill in the reply body; release everything.  On a bulk
 * communication error the reply state is freed instead of replying, and the
 * client's export is failed unless the client has reconnected meanwhile.
 *
 * This function sends its own reply (see the caller in ost_handle()).
 *
 * NOTE(review): many short lines (conditions, labels, returns) are not
 * visible in this listing; comments below describe only what is shown.
 */
383 static int ost_brw_read(struct ptlrpc_request *req)
385 struct ptlrpc_bulk_desc *desc;
386 struct niobuf_remote *remote_nb;
387 struct niobuf_remote *pp_rnb;
388 struct niobuf_local *local_nb;
389 struct obd_ioobj *ioo;
390 struct ost_body *body, *repbody;
391 struct l_wait_info lwi;
392 struct obd_trans_info oti = { 0 };
393 int size[1] = { sizeof(*body) };
400 struct timeval start;
/* test hooks: forced -EIO failure and forced pause */
403 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
404 GOTO(out, rc = -EIO);
406 OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
407 (obd_timeout + 1) / 4);
409 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
411 CERROR("Missing/short ost_body\n");
412 GOTO(out, rc = -EFAULT);
415 ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
417 CERROR("Missing/short ioobj\n");
418 GOTO(out, rc = -EFAULT);
/* a read carries exactly one ioobj; its buffer count is the number of
 * remote niobufs expected in request buffer 2 */
421 niocount = ioo->ioo_bufcnt;
422 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
423 lustre_swab_niobuf_remote);
424 if (remote_nb == NULL) {
425 CERROR("Missing/short niobuf\n");
426 GOTO(out, rc = -EFAULT);
428 if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
429 for (i = 1; i < niocount; i++)
430 lustre_swab_niobuf_remote (&remote_nb[i]);
433 rc = lustre_pack_reply(req, 1, size, NULL);
437 /* FIXME all niobuf splitting should be done in obdfilter if needed */
438 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
439 npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
441 GOTO(out, rc = npages);
443 OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
444 if (local_nb == NULL)
445 GOTO(out_pp_rnb, rc = -ENOMEM);
447 desc = ptlrpc_prep_bulk_exp (req, npages,
448 BULK_PUT_SOURCE, OST_BULK_PORTAL);
450 GOTO(out_local, rc = -ENOMEM);
/* phase 0 of the service-time statistics: preprw */
452 do_gettimeofday(&start);
453 rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
454 ioo, npages, pp_rnb, local_nb, &oti);
455 ost_stime_record(req, &start, 0, 0);
459 /* We're finishing using body->oa as an input variable */
460 body->oa.o_valid = 0;
/* local_nb[i].rc is the per-page result of preprw: negative on error,
 * otherwise the number of bytes available for that page */
463 for (i = 0; i < npages; i++) {
464 int page_rc = local_nb[i].rc;
466 if (page_rc < 0) { /* error */
471 LASSERT(page_rc <= pp_rnb[i].len);
473 if (page_rc != 0) { /* some data! */
474 LASSERT (local_nb[i].page != NULL);
475 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
476 pp_rnb[i].offset & (PAGE_SIZE-1),
480 if (page_rc != pp_rnb[i].len) { /* short read */
481 /* All subsequent pages should be 0 */
483 LASSERT(local_nb[i].rc == 0);
489 rc = ptlrpc_start_bulk_transfer(desc);
/* wait for the bulk to finish; the timeout is obd_timeout * HZ / 4 */
491 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
492 ost_bulk_timeout, desc);
493 rc = l_wait_event(desc->bd_waitq,
494 !ptlrpc_bulk_active(desc), &lwi);
495 LASSERT(rc == 0 || rc == -ETIMEDOUT);
496 if (rc == -ETIMEDOUT) {
497 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT");
498 ptlrpc_abort_bulk(desc);
499 } else if (!desc->bd_success ||
500 desc->bd_nob_transferred != desc->bd_nob) {
501 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
503 "truncated" : "network error on",
504 desc->bd_nob_transferred,
506 /* XXX should this be a different errno? */
510 DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d\n", rc);
/* remember whether the failure was in the bulk communication itself */
512 comms_error = rc != 0;
515 ost_stime_record(req, &start, 0, 1);
516 /* Must commit after prep above in all cases */
517 rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
518 ioo, npages, local_nb, &oti, rc);
519 ost_stime_record(req, &start, 0, 2);
522 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
523 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
526 repbody->oa.o_cksum = ost_checksum_bulk(desc);
527 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
532 ptlrpc_free_bulk(desc);
534 OBD_FREE(local_nb, sizeof(*local_nb) * npages);
536 free_per_page_niobufs(npages, pp_rnb, remote_nb);
540 req->rq_status = nob;
542 } else if (!comms_error) {
543 /* only reply if comms OK */
547 if (req->rq_reply_state != NULL) {
548 /* reply out callback would free */
549 lustre_free_reply_state (req->rq_reply_state);
/* same connection count: the client has not reconnected since it sent
 * this request, so its export is failed */
551 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
552 CERROR("bulk IO comms error: "
553 "evicting %s@%s id %s\n",
554 req->rq_export->exp_client_uuid.uuid,
555 req->rq_export->exp_connection->c_remote_uuid.uuid,
557 ptlrpc_fail_export(req->rq_export);
559 CERROR("ignoring bulk IO comms error: "
560 "client reconnected %s@%s id %s\n",
561 req->rq_export->exp_client_uuid.uuid,
562 req->rq_export->exp_connection->c_remote_uuid.uuid,
/*
 * Handle a bulk write request.
 *
 * Sequence visible here: unpack the ost_body, all ioobjs in request buffer 1
 * and the remote niobufs in buffer 2; pack a two-buffer reply (body plus one
 * return code per remote niobuf); obd_do_cow(); split the niobufs per page;
 * obd_preprw(); attach every page to a bulk descriptor (BULK_GET_SINK); run
 * the bulk transfer with a timeout; optionally verify the client checksum;
 * obd_commitrw(); fill in per-niobuf return codes; obd_write_extents();
 * release everything and reply.  On a bulk communication error the reply
 * state is freed instead of replying, and the client's export is failed
 * unless the client has reconnected meanwhile.
 *
 * This function sends its own reply (see the caller in ost_handle()).
 *
 * NOTE(review): many short lines (conditions, labels, returns) are not
 * visible in this listing; comments below describe only what is shown.
 */
570 int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
572 struct ptlrpc_bulk_desc *desc;
573 struct niobuf_remote *remote_nb;
574 struct niobuf_remote *pp_rnb;
575 struct niobuf_local *local_nb;
576 struct obd_ioobj *ioo;
577 struct ost_body *body, *repbody;
578 struct l_wait_info lwi;
580 int size[2] = { sizeof(*body) };
581 int objcount, niocount, npages;
584 struct timeval start;
587 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
588 GOTO(out, rc = -EIO);
590 /* pause before transaction has been started */
591 OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
592 (obd_timeout + 1) / 4);
594 swab = lustre_msg_swabbed(req->rq_reqmsg);
595 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
597 CERROR("Missing/short ost_body\n");
598 GOTO(out, rc = -EFAULT);
/* the number of ioobjs is derived from the length of request buffer 1 */
601 LASSERT_REQSWAB(req, 1);
602 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
604 CERROR("Missing/short ioobj\n");
605 GOTO(out, rc = -EFAULT);
607 ioo = lustre_msg_buf (req->rq_reqmsg, 1, objcount * sizeof(*ioo));
608 LASSERT (ioo != NULL);
/* total up the remote niobufs over all objects; empty objects are rejected */
609 for (niocount = i = 0; i < objcount; i++) {
611 lustre_swab_obd_ioobj (&ioo[i]);
612 if (ioo[i].ioo_bufcnt == 0) {
613 CERROR("ioo[%d] has zero bufcnt\n", i);
614 GOTO(out, rc = -EFAULT);
616 niocount += ioo[i].ioo_bufcnt;
619 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
620 lustre_swab_niobuf_remote);
621 if (remote_nb == NULL) {
622 CERROR("Missing/short niobuf\n");
623 GOTO(out, rc = -EFAULT);
625 if (swab) { /* swab the remaining niobufs */
626 for (i = 1; i < niocount; i++)
627 lustre_swab_niobuf_remote (&remote_nb[i]);
/* reply buffer 1 carries one return code per remote niobuf */
630 size[1] = niocount * sizeof(*rcs);
631 rc = lustre_pack_reply(req, 2, size, NULL);
634 rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
637 /* Do snap options here*/
638 rc = obd_do_cow(req->rq_export, ioo, objcount, remote_nb);
643 /* FIXME all niobuf splitting should be done in obdfilter if needed */
644 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
645 npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
647 GOTO(out, rc = npages);
649 OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
650 if (local_nb == NULL)
651 GOTO(out_pp_rnb, rc = -ENOMEM);
653 desc = ptlrpc_prep_bulk_exp (req, npages,
654 BULK_GET_SINK, OST_BULK_PORTAL);
656 GOTO(out_local, rc = -ENOMEM);
/* phase 0 of the service-time statistics: preprw */
658 do_gettimeofday(&start);
659 rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
660 ioo, npages, pp_rnb, local_nb, oti);
661 ost_stime_record(req, &start, 1, 0);
665 /* NB Having prepped, we must commit... */
667 for (i = 0; i < npages; i++)
668 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
669 pp_rnb[i].offset & (PAGE_SIZE - 1),
672 rc = ptlrpc_start_bulk_transfer (desc);
/* wait for the bulk to finish; the timeout is obd_timeout * HZ / 4 */
674 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
675 ost_bulk_timeout, desc);
676 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc),
678 LASSERT(rc == 0 || rc == -ETIMEDOUT);
679 if (rc == -ETIMEDOUT) {
680 DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
681 ptlrpc_abort_bulk(desc);
682 } else if (!desc->bd_success ||
683 desc->bd_nob_transferred != desc->bd_nob) {
684 DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
686 "truncated" : "network error on",
687 desc->bd_nob_transferred, desc->bd_nob);
688 /* XXX should this be a different errno? */
692 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
/* remember whether the failure was in the bulk communication itself */
694 comms_error = rc != 0;
696 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
697 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
/* the client sent a checksum: recompute it over the received pages */
700 if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
701 static int cksum_counter;
702 obd_count client_cksum = body->oa.o_cksum;
703 obd_count cksum = ost_checksum_bulk(desc);
705 if (client_cksum != cksum) {
706 CERROR("Bad checksum: client %x, server %x id %s\n",
710 repbody->oa.o_cksum = cksum;
/* true only when cksum_counter is zero or a power of two, so the "OK"
 * message becomes progressively rarer */
713 if ((cksum_counter & (-cksum_counter)) == cksum_counter)
714 CWARN("Checksum %u from NID %s: %x OK\n",
715 cksum_counter, req->rq_peerstr, cksum);
719 ost_stime_record(req, &start, 1, 1);
720 /* Must commit after prep above in all cases */
721 rc = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
722 objcount, ioo, npages, local_nb, oti, rc);
724 ost_stime_record(req, &start, 1, 2);
/* NOTE(review): repbody->oa was already filled from body->oa before the
 * checksum step and was the obdo handed to obd_commitrw() above; copying
 * body->oa over it again here discards anything commitrw (or the checksum
 * step) stored in it -- confirm this second copy is intended. */
726 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
727 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
730 repbody->oa.o_cksum = ost_checksum_bulk(desc);
731 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
733 /* set per-requested niobuf return codes */
/* i walks the original niobufs, j the per-page entries derived from them */
734 for (i = j = 0; i < niocount; i++) {
735 int nob = remote_nb[i].len;
740 if (local_nb[j].rc < 0)
741 rcs[i] = local_nb[j].rc;
742 nob -= pp_rnb[j].len;
747 LASSERT(j == npages);
749 /*XXX This write extents only for write-back cache extents*/
750 rc = obd_write_extents(req->rq_export, ioo, objcount, niocount,
753 ptlrpc_free_bulk(desc);
755 OBD_FREE(local_nb, sizeof(*local_nb) * npages);
757 free_per_page_niobufs(npages, pp_rnb, remote_nb);
760 oti_to_request(oti, req);
761 rc = ptlrpc_reply(req);
762 } else if (!comms_error) {
763 /* Only reply if there was no comms problem with bulk */
767 if (req->rq_reply_state != NULL) {
768 /* reply out callback would free */
769 lustre_free_reply_state (req->rq_reply_state);
/* same connection count: the client has not reconnected since it sent
 * this request, so its export is failed */
771 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
772 CERROR("%s: bulk IO comm error evicting %s@%s id %s\n",
773 req->rq_export->exp_obd->obd_name,
774 req->rq_export->exp_client_uuid.uuid,
775 req->rq_export->exp_connection->c_remote_uuid.uuid,
777 ptlrpc_fail_export(req->rq_export);
779 CERROR("ignoring bulk IO comms error: "
780 "client reconnected %s@%s id %s\n",
781 req->rq_export->exp_client_uuid.uuid,
782 req->rq_export->exp_connection->c_remote_uuid.uuid,
788 EXPORT_SYMBOL(ost_brw_write);
/*
 * Handle a SAN read or write request (cmd is OBD_BRW_READ or OBD_BRW_WRITE,
 * as passed by ost_handle()).
 *
 * Unpacks the ost_body, the ioobjs and the remote niobufs, splits the
 * niobufs per page, packs a two-buffer reply (body plus npages niobufs),
 * calls obd_san_preprw() and copies the body and a niobuf array into the
 * reply.  The obd_san_preprw() result is stored in req->rq_status.
 *
 * NOTE(review): many short lines (conditions, labels, returns) are not
 * visible in this listing.
 */
790 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
792 struct niobuf_remote *remote_nb, *res_nb, *pp_rnb;
793 struct obd_ioobj *ioo;
794 struct ost_body *body, *repbody;
795 int rc, i, objcount, niocount, size[2] = {sizeof(*body)}, npages;
799 /* XXX not set to use latest protocol */
801 swab = lustre_msg_swabbed(req->rq_reqmsg);
802 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
804 CERROR("Missing/short ost_body\n");
805 GOTO(out, rc = -EFAULT);
808 ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
810 CERROR("Missing/short ioobj\n");
811 GOTO(out, rc = -EFAULT);
/* the number of ioobjs is derived from the length of request buffer 1;
 * ioo[0] was handled by the unpack above, the loop covers the rest */
813 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
814 niocount = ioo[0].ioo_bufcnt;
815 for (i = 1; i < objcount; i++) {
817 lustre_swab_obd_ioobj (&ioo[i]);
818 niocount += ioo[i].ioo_bufcnt;
821 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
822 lustre_swab_niobuf_remote);
823 if (remote_nb == NULL) {
824 CERROR("Missing/short niobuf\n");
825 GOTO(out, rc = -EFAULT);
827 if (swab) { /* swab the remaining niobufs */
828 for (i = 1; i < niocount; i++)
829 lustre_swab_niobuf_remote (&remote_nb[i]);
832 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
833 npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
835 GOTO (out, rc = npages);
837 size[1] = npages * sizeof(*pp_rnb);
838 rc = lustre_pack_reply(req, 2, size, NULL);
840 GOTO(out_pp_rnb, rc);
842 req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
843 objcount, ioo, npages, pp_rnb);
846 GOTO(out_pp_rnb, rc = 0);
848 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
849 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* NOTE(review): size[1] is npages * sizeof(*pp_rnb), but the copy source
 * is remote_nb, which was unpacked with niocount entries.  If the split
 * produced more pages than niobufs this reads past remote_nb; the
 * per-page results live in pp_rnb -- confirm which array was intended. */
851 res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
852 memcpy(res_nb, remote_nb, size[1]);
855 free_per_page_niobufs(npages, pp_rnb, remote_nb);
/*
 * Handle a set_info request.
 *
 * The key is taken from request buffer 0 (its length is the buffer length)
 * and the value from request buffer 1.  An empty reply is packed, the pair
 * is passed to obd_set_info() and the reply message status is set to 0.
 *
 * NOTE(review): the handling of a missing key and of the obd_set_info()
 * return code is not visible in this listing.
 */
866 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
872 key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
874 DEBUG_REQ(D_HA, req, "no set_info key");
877 keylen = req->rq_reqmsg->buflens[0];
879 rc = lustre_pack_reply(req, 0, NULL, NULL);
883 val = lustre_msg_buf(req->rq_reqmsg, 1, 0);
885 rc = obd_set_info(exp, keylen, key, req->rq_reqmsg->buflens[1], val);
886 req->rq_repmsg->status = 0;
/*
 * Handle a get_info request.
 *
 * The key is taken from request buffer 0 and is tested against "last_id"
 * (length at least 7 and the first 7 bytes equal).  A reply buffer of
 * sizeof(obd_id) is packed and filled by obd_get_info(); the reply message
 * status is then set to 0.
 *
 * NOTE(review): what happens for a missing or non-matching key, and how the
 * obd_get_info() return code is used, is not visible in this listing.
 */
890 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
893 int keylen, rc = 0, size = sizeof(obd_id);
897 key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
899 DEBUG_REQ(D_HA, req, "no get_info key");
902 keylen = req->rq_reqmsg->buflens[0];
904 if (keylen < strlen("last_id") || memcmp(key, "last_id", 7) != 0)
907 rc = lustre_pack_reply(req, 1, &size, NULL);
911 reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
912 rc = obd_get_info(exp, keylen, key, (__u32 *)&size, reply);
913 req->rq_repmsg->status = 0;
/*
 * Handle a log connect request: fetch the llogd_conn_body from request
 * buffer 0 and pass it to obd_llog_connect() for this export.
 * NOTE(review): the return statement is not visible in this listing; the
 * caller stores this function's result in req->rq_status.
 */
917 static int ost_llog_handle_connect(struct obd_export *exp,
918 struct ptlrpc_request *req)
920 struct llogd_conn_body *body;
924 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
925 rc = obd_llog_connect(exp, body);
/*
 * Decide, by request opcode, what to do with a request that arrives while
 * the device is recovering.
 *
 * req     - the incoming request
 * obd     - the device being recovered
 * process - output flag telling the caller whether to go on processing
 *
 * Visible here: one group of opcodes is passed to
 * target_queue_recovery_request(), whose result is stored in *process; the
 * fall-back logs "not permitted during recovery", sets rq_status to -EAGAIN
 * and returns the result of ptlrpc_error().
 *
 * NOTE(review): most case labels are not visible in this listing.
 */
929 static int ost_filter_recovery_request(struct ptlrpc_request *req,
930 struct obd_device *obd, int *process)
932 switch (req->rq_reqmsg->opc) {
933 case OST_CONNECT: /* This will never get here, but for completeness. */
947 *process = target_queue_recovery_request(req, obd);
951 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
953 /* XXX what should we set rq_status to here? */
954 req->rq_status = -EAGAIN;
955 RETURN(ptlrpc_error(req));
/*
 * Check that a request message carries the protocol version expected for
 * its opcode.
 *
 * Three version checks are visible: against LUSTRE_OBD_VERSION, against
 * LUSTRE_DLM_VERSION (the group ending in LDLM_BL_CALLBACK and
 * LDLM_CP_CALLBACK) and against LUSTRE_LOG_VERSION (LLOG_ORIGIN_CONNECT).
 * A mismatch is logged with the opcode, the received version and the
 * expected version; an opcode outside the known groups is logged as
 * unexpected.
 *
 * NOTE(review): most case labels, the handling of SEC_INIT_CONTINUE and the
 * return statements are not visible in this listing.
 */
959 int ost_msg_check_version(struct lustre_msg *msg)
980 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
982 CERROR("bad opc %u version %08x, expecting %08x\n",
983 msg->opc, msg->version, LUSTRE_OBD_VERSION);
988 case LDLM_BL_CALLBACK:
989 case LDLM_CP_CALLBACK:
990 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
992 CERROR("bad opc %u version %08x, expecting %08x\n",
993 msg->opc, msg->version, LUSTRE_DLM_VERSION);
996 case LLOG_ORIGIN_CONNECT:
997 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
999 CERROR("bad opc %u version %08x, expecting %08x\n",
1000 msg->opc, msg->version, LUSTRE_LOG_VERSION);
1003 case SEC_INIT_CONTINUE:
1008 CERROR("OST unexpected opcode %d\n", msg->opc);
/*
 * Main request handler of the OST services (registered with
 * ptlrpc_init_svc() in ost_setup()).
 *
 * Steps visible here:
 *  - check the message version with ost_msg_check_version();
 *  - security opcodes (SEC_INIT, SEC_INIT_CONTINUE, SEC_FINI) skip the
 *    connection and recovery checks;
 *  - every opcode except OST_CONNECT requires an export, otherwise the
 *    request fails with -ENOTCONN;
 *  - while the device is recovering, ost_filter_recovery_request() decides
 *    whether the request is processed now;
 *  - a transaction-info structure is allocated and the request is
 *    dispatched by opcode;
 *  - for successful non-DISCONNECT requests last_committed is copied into
 *    the reply unless the device has obd_no_transno set;
 *  - the reply is sent with target_send_reply() and oti is freed.
 *
 * The bulk and SAN read/write handlers, the log handlers and the
 * unexpected-opcode path send their own replies and jump straight to
 * out_free_oti.
 *
 * NOTE(review): most case labels, break statements and goto labels are not
 * visible in this listing.
 */
1015 int ost_handle(struct ptlrpc_request *req)
1017 int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
1018 struct obd_trans_info *oti = NULL;
1019 struct obd_device *obd = NULL;
1022 LASSERT(current->journal_info == NULL);
1024 rc = ost_msg_check_version(req->rq_reqmsg);
1026 CERROR("OST drop mal-formed request\n");
1030 /* Security opc should NOT trigger any recovery events */
1031 if (req->rq_reqmsg->opc == SEC_INIT ||
1032 req->rq_reqmsg->opc == SEC_INIT_CONTINUE ||
1033 req->rq_reqmsg->opc == SEC_FINI) {
1034 GOTO(out_check_req, rc = 0);
1037 /* XXX identical to MDS */
1038 if (req->rq_reqmsg->opc != OST_CONNECT) {
1041 if (req->rq_export == NULL) {
1042 CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
1043 req->rq_reqmsg->opc,
1045 req->rq_status = -ENOTCONN;
1046 GOTO(out_check_req, rc = -ENOTCONN);
1049 obd = req->rq_export->exp_obd;
1051 /* Check for aborted recovery. */
/* the recovering flag is sampled under obd_processing_task_lock */
1052 spin_lock_bh(&obd->obd_processing_task_lock);
1053 recovering = obd->obd_recovering;
1054 spin_unlock_bh(&obd->obd_processing_task_lock);
1056 rc = ost_filter_recovery_request(req, obd,
1058 if (rc || !should_process)
1060 if (should_process < 0) {
1061 req->rq_status = should_process;
1062 rc = ptlrpc_error(req);
1068 OBD_ALLOC(oti, sizeof(*oti));
/* dispatch by opcode; the OBD_FAIL_GOTO lines are test hooks that jump to
 * the given label with the given rc */
1074 switch (req->rq_reqmsg->opc) {
1076 CDEBUG(D_INODE, "connect\n");
1077 OBD_FAIL_GOTO(OBD_FAIL_OST_CONNECT_NET, out_free_oti, rc = 0);
1078 rc = target_handle_connect(req);
1080 obd = req->rq_export->exp_obd;
1083 case OST_DISCONNECT:
1084 CDEBUG(D_INODE, "disconnect\n");
1085 OBD_FAIL_GOTO(OBD_FAIL_OST_DISCONNECT_NET, out_free_oti, rc = 0);
1086 rc = target_handle_disconnect(req);
1089 CDEBUG(D_INODE, "create\n");
1090 OBD_FAIL_GOTO(OBD_FAIL_OST_ENOSPC, out_check_req, rc = -ENOSPC);
1091 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1092 OBD_FAIL_GOTO(OBD_FAIL_OST_CREATE_NET, out_free_oti, rc = 0);
1093 rc = ost_create(req->rq_export, req, oti);
1096 CDEBUG(D_INODE, "destroy\n");
1097 OBD_FAIL_GOTO(OBD_FAIL_OST_DESTROY_NET, out_free_oti, rc = 0);
1098 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1099 rc = ost_destroy(req->rq_export, req, oti);
1102 CDEBUG(D_INODE, "getattr\n");
1103 OBD_FAIL_GOTO(OBD_FAIL_OST_GETATTR_NET, out_free_oti, rc = 0);
1104 rc = ost_getattr(req->rq_export, req);
1107 CDEBUG(D_INODE, "setattr\n");
1108 OBD_FAIL_GOTO(OBD_FAIL_OST_SETATTR_NET, out_free_oti, rc = 0);
1109 rc = ost_setattr(req->rq_export, req, oti);
1112 CDEBUG(D_INODE, "write\n");
1113 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1114 OBD_FAIL_GOTO(OBD_FAIL_OST_ENOSPC, out_check_req, rc = -ENOSPC);
1115 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1116 rc = ost_brw_write(req, oti);
1117 LASSERT(current->journal_info == NULL);
1118 /* ost_brw sends its own replies */
1119 GOTO(out_free_oti, rc);
1121 CDEBUG(D_INODE, "read\n");
1122 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1123 rc = ost_brw_read(req);
1124 LASSERT(current->journal_info == NULL);
1125 /* ost_brw sends its own replies */
1126 GOTO(out_free_oti, rc);
1128 CDEBUG(D_INODE, "san read\n");
1129 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1130 rc = ost_san_brw(req, OBD_BRW_READ);
1131 /* ost_san_brw sends its own replies */
1132 GOTO(out_free_oti, rc);
1134 CDEBUG(D_INODE, "san write\n");
1135 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1136 rc = ost_san_brw(req, OBD_BRW_WRITE);
1137 /* ost_san_brw sends its own replies */
1138 GOTO(out_free_oti, rc);
1140 CDEBUG(D_INODE, "punch\n");
1141 OBD_FAIL_GOTO(OBD_FAIL_OST_PUNCH_NET, out_free_oti, rc = 0);
1142 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1143 rc = ost_punch(req->rq_export, req, oti);
1146 CDEBUG(D_INODE, "statfs\n");
1147 OBD_FAIL_GOTO(OBD_FAIL_OST_STATFS_NET, out_free_oti, rc = 0);
1148 rc = ost_statfs(req);
1151 CDEBUG(D_INODE, "sync\n");
1152 OBD_FAIL_GOTO(OBD_FAIL_OST_SYNC_NET, out_free_oti, rc = 0);
1153 rc = ost_sync(req->rq_export, req);
1156 DEBUG_REQ(D_INODE, req, "set_info");
1157 rc = ost_set_info(req->rq_export, req);
1160 DEBUG_REQ(D_INODE, req, "get_info");
1161 rc = ost_get_info(req->rq_export, req);
1164 DEBUG_REQ(D_INODE, req, "ping");
1165 rc = target_handle_ping(req);
1167 /* FIXME - just reply status */
1168 case LLOG_ORIGIN_CONNECT:
1169 DEBUG_REQ(D_INODE, req, "log connect\n");
1170 rc = ost_llog_handle_connect(req->rq_export, req);
1171 req->rq_status = rc;
1172 rc = lustre_pack_reply(req, 0, NULL, NULL);
1174 GOTO(out_free_oti, rc);
1175 GOTO(out_free_oti, rc = ptlrpc_reply(req));
1176 case OBD_LOG_CANCEL:
1177 CDEBUG(D_INODE, "log cancel\n");
1178 OBD_FAIL_GOTO(OBD_FAIL_OBD_LOG_CANCEL_NET, out_free_oti, rc = 0);
1179 rc = llog_origin_handle_cancel(req);
1180 req->rq_status = rc;
1181 rc = lustre_pack_reply(req, 0, NULL, NULL);
1183 GOTO(out_free_oti, rc);
1184 GOTO(out_free_oti, rc = ptlrpc_reply(req));
1186 CDEBUG(D_INODE, "enqueue\n");
1187 OBD_FAIL_GOTO(OBD_FAIL_LDLM_ENQUEUE, out_free_oti, rc = 0);
1188 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1189 ldlm_server_blocking_ast,
1190 ldlm_server_glimpse_ast);
/* lock enqueue replies use their own reply fail-injection id */
1191 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1194 CDEBUG(D_INODE, "convert\n");
1195 OBD_FAIL_GOTO(OBD_FAIL_LDLM_CONVERT, out_free_oti, rc = 0);
1196 rc = ldlm_handle_convert(req);
1199 CDEBUG(D_INODE, "cancel\n");
1200 OBD_FAIL_GOTO(OBD_FAIL_LDLM_CANCEL, out_free_oti, rc = 0);
1201 rc = ldlm_handle_cancel(req);
1203 case LDLM_BL_CALLBACK:
1204 case LDLM_CP_CALLBACK:
1205 CDEBUG(D_INODE, "callback\n");
1206 CERROR("callbacks should not happen on OST\n");
1209 CERROR("Unexpected opcode %d\n", req->rq_reqmsg->opc);
1210 req->rq_status = -ENOTSUPP;
1211 rc = ptlrpc_error(req);
1212 GOTO(out_free_oti, rc);
1215 LASSERT(current->journal_info == NULL);
1218 /* If we're DISCONNECTing, the export_data is already freed */
1219 if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
1220 if (!obd->obd_no_transno) {
1221 req->rq_repmsg->last_committed =
1222 obd->obd_last_committed;
1224 DEBUG_REQ(D_IOCTL, req,
1225 "not sending last_committed update");
1227 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
1228 obd->obd_last_committed, req->rq_xid);
1234 oti_to_request(oti, req);
1235 target_send_reply(req, rc, fail);
1240 OBD_FREE(oti, sizeof(*oti));
1243 EXPORT_SYMBOL(ost_handle);
1245 int ost_attach(struct obd_device *dev, obd_count len, void *data)
1247 struct lprocfs_static_vars lvars;
1249 lprocfs_init_vars(ost,&lvars);
1250 return lprocfs_obd_attach(dev, lvars.obd_vars);
/*
 * Detach method of the OST obd type: undo ost_attach() by handing the
 * device to lprocfs_obd_detach().  Returns that function's result.
 */
int ost_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_obd_detach(dev);
        return rc;
}
1258 extern struct file_operations ost_stimes_fops;
/*
 * Setup method of the OST obd type.
 *
 * Steps visible here: cleanup_group_info(); start the llog commit thread;
 * create the "service_times" proc entry (read-only, backed by
 * ost_stimes_fops); create the main service on OST_REQUEST_PORTAL and start
 * OST_NUM_THREADS threads for it; create the "ost_create" service on
 * OST_CREATE_PORTAL, handled by ost_handle(), and start one thread for it.
 * The unwind path unregisters the create service and then the main service.
 *
 * NOTE(review): several short lines (error checks, labels, returns and the
 * first lines of some calls) are not visible in this listing.
 */
1260 static int ost_setup(struct obd_device *obd, obd_count len, void *buf)
1262 struct ost_obd *ost = &obd->u.ost;
1266 rc = cleanup_group_info();
1270 rc = llog_start_commit_thread();
1274 lprocfs_obd_seq_create(obd, "service_times", 0444, &ost_stimes_fops,
1278 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1279 OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, 30000,
1281 obd->obd_proc_entry);
1282 if (ost->ost_service == NULL) {
1283 CERROR("failed to start service\n");
1287 rc = ptlrpc_start_n_threads(obd, ost->ost_service, OST_NUM_THREADS,
1290 GOTO(out_service, rc = -EINVAL);
1292 ost->ost_create_service =
1293 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1294 OST_CREATE_PORTAL, OSC_REPLY_PORTAL, 30000,
1295 ost_handle, "ost_create",
1296 obd->obd_proc_entry);
1297 if (ost->ost_create_service == NULL) {
1298 CERROR("failed to start OST create service\n");
1299 GOTO(out_service, rc = -ENOMEM);
/* NOTE(review): ost_lock and srv_obddev are used by ost_stime_record() on
 * the bulk I/O path, yet they are set only here, after the main service's
 * threads were started above -- confirm no request can be served before
 * this point. */
1303 spin_lock_init(&ost->ost_lock);
1304 ost->ost_service->srv_obddev = obd;
1306 rc = ptlrpc_start_n_threads(obd, ost->ost_create_service, 1,
1309 GOTO(out_create, rc = -EINVAL);
1314 ptlrpc_unregister_service(ost->ost_create_service);
1316 ptlrpc_unregister_service(ost->ost_service);
1320 extern void lgss_svc_cache_purge_all(void);
/*
 * Cleanup method of the OST obd type.
 *
 * Under obd_processing_task_lock, cancels the recovery timer and clears the
 * recovering flag if recovery is still in progress.  Then stops the threads
 * of, and unregisters, the main service and the create service, and finally
 * calls lgss_svc_cache_purge_all().
 *
 * NOTE(review): the use of the flags argument and the return statement are
 * not visible in this listing.
 */
1321 static int ost_cleanup(struct obd_device *obd, int flags)
1323 struct ost_obd *ost = &obd->u.ost;
1327 spin_lock_bh(&obd->obd_processing_task_lock);
1328 if (obd->obd_recovering) {
1329 target_cancel_recovery_timer(obd);
1330 obd->obd_recovering = 0;
1332 spin_unlock_bh(&obd->obd_processing_task_lock);
1334 ptlrpc_stop_all_threads(ost->ost_service);
1335 ptlrpc_unregister_service(ost->ost_service);
1337 ptlrpc_stop_all_threads(ost->ost_create_service);
1338 ptlrpc_unregister_service(ost->ost_create_service);
1342 lgss_svc_cache_purge_all();
1347 /* use obd ops to offer management infrastructure */
/* Only attach/detach and setup/cleanup are provided; this table is what
 * ost_init() registers under OBD_OST_DEVICENAME. */
1348 static struct obd_ops ost_obd_ops = {
1349 .o_owner = THIS_MODULE,
1350 .o_attach = ost_attach,
1351 .o_detach = ost_detach,
1352 .o_setup = ost_setup,
1353 .o_cleanup = ost_cleanup,
/*
 * Module entry point: initialise the lprocfs variable tables for "ost" and
 * register the OST obd type (ost_obd_ops, module-level proc variables)
 * under OBD_OST_DEVICENAME.  Returns the result of class_register_type().
 */
1356 static int __init ost_init(void)
1358 struct lprocfs_static_vars lvars;
1361 lprocfs_init_vars(ost,&lvars);
1362 RETURN(class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
1363 OBD_OST_DEVICENAME));
/* Module exit point: unregister the obd type registered by ost_init(). */
1366 static void /*__exit*/ ost_exit(void)
1368 class_unregister_type(OBD_OST_DEVICENAME);
1371 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1372 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
1373 MODULE_LICENSE("GPL");
1375 module_init(ost_init);
1376 module_exit(ost_exit);