1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author: Peter J. Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * Storage Target Handling functions
24 * Lustre Object Server Module (OST)
26 * This server is single threaded at present (but can easily be multi
27 * threaded). For testing and management it is treated as an
28 * obd_device, although it does not export a full OBD method table
29 * (the requests are coming in over the wire, so object target
30 * modules do not have a full method table.)
34 # define EXPORT_SYMTAB
36 #define DEBUG_SUBSYSTEM S_OST
38 #include <linux/module.h>
39 #include <linux/obd_ost.h>
40 #include <linux/lustre_net.h>
41 #include <linux/lustre_dlm.h>
42 #include <linux/lustre_export.h>
43 #include <linux/init.h>
44 #include <linux/lprocfs_status.h>
45 #include <linux/lustre_commit_confd.h>
46 #include <portals/list.h>
// oti_init: reset *oti to all zeroes, then seed its transaction number
// from the reply message of req when one is attached.
// NOTE(review): the embedded line numbers jump from 48 to 52 and from 52
// to 54, so any guards on oti or req that sit on the skipped lines are
// not visible in this listing.
48 void oti_init(struct obd_trans_info *oti, struct ptlrpc_request *req)
52 memset(oti, 0, sizeof *oti);
// rq_reqmsg is a pointer (it is dereferenced elsewhere in this file) that
// is compared against a literal 0 here.
54 if (req->rq_repmsg && req->rq_reqmsg != 0)
55 oti->oti_transno = req->rq_repmsg->transno;
// oti_to_request: publish the results held in *oti on the reply of req.
// Writes oti->oti_transno into the reply message, then walks the
// oti_ack_locks array (at most 4 entries) and hands each lock and mode to
// ptlrpc_save_lock.
// NOTE(review): the lines between the loop header and the save call are
// missing from this listing, so an early loop exit may exist there.
58 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
60 struct oti_req_ack_lock *ack_lock;
67 req->rq_repmsg->transno = oti->oti_transno;
69 /* XXX 4 == entries in oti_ack_locks??? */
70 for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
73 /* XXX not even calling target_send_reply in some cases... */
74 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
/* ost_destroy: handle an object destroy request.
 *
 * Reads the ost_body from request buffer 0 (byte-swapping it through
 * lustre_swab_ost_body), packs a one-buffer reply sized for an ost_body,
 * copies the request obdo into the reply and stores the obd_destroy result
 * in req->rq_status.
 *
 * NOTE(review): the checks that follow the swab and pack calls are on lines
 * missing from this listing; presumably they bail out on a NULL body or a
 * non-zero rc - confirm against the full file. */
78 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
79 struct obd_trans_info *oti)
81 struct ost_body *body, *repbody;
82 int rc, size = sizeof(*body);
85 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
89 rc = lustre_pack_reply(req, 1, &size, NULL);
/* Hand the log cookie embedded in the obdo to the transaction info only
 * when the client flagged it as valid. */
93 if (body->oa.o_valid & OBD_MD_FLCOOKIE)
94 oti->oti_logcookies = obdo_logcookie(&body->oa);
95 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
96 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* The destroy runs on the request obdo, so the reply keeps the copy made
 * just above rather than anything obd_destroy writes back. */
97 req->rq_status = obd_destroy(exp, &body->oa, NULL, oti);
/* ost_getattr: handle a get-attributes request.
 *
 * Reads the ost_body from request buffer 0, packs a one-buffer reply sized
 * for an ost_body, copies the request obdo into the reply and lets
 * obd_getattr work directly on the reply copy. The obd_getattr result is
 * stored in req->rq_status.
 *
 * NOTE(review): error checks after the swab and pack calls are on lines
 * missing from this listing. */
101 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
103 struct ost_body *body, *repbody;
104 int rc, size = sizeof(*body);
107 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
111 rc = lustre_pack_reply(req, 1, &size, NULL);
115 repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
116 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
117 req->rq_status = obd_getattr(exp, &repbody->oa, NULL);
/* ost_statfs: handle a statfs request.
 *
 * Packs a one-buffer reply sized for an obd_statfs and asks the device
 * behind the request export to fill it in. The obd_statfs result goes to
 * req->rq_status and a failure is logged.
 *
 * NOTE(review): the check on the lustre_pack_reply result is on a line
 * missing from this listing. */
121 static int ost_statfs(struct ptlrpc_request *req)
123 struct obd_statfs *osfs;
124 int rc, size = sizeof(*osfs);
127 rc = lustre_pack_reply(req, 1, &size, NULL);
131 osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*osfs));
/* The third argument is one second (HZ jiffies) in the past; presumably
 * the oldest cached statfs data that is still acceptable - TODO confirm
 * against obd_statfs. */
133 req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs, jiffies-HZ);
134 if (req->rq_status != 0)
135 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
/* ost_create: handle an object create request.
 *
 * Reads the ost_body from request buffer 0, packs a one-buffer reply sized
 * for an ost_body, copies the request obdo into the reply, points
 * oti->oti_logcookies at the cookie inside the reply obdo and calls
 * obd_create on the reply copy. The obd_create result is stored in
 * req->rq_status.
 *
 * NOTE(review): error checks after the swab and pack calls are on lines
 * missing from this listing. */
140 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
141 struct obd_trans_info *oti)
143 struct ost_body *body, *repbody;
144 int rc, size = sizeof(*repbody);
147 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
151 rc = lustre_pack_reply(req, 1, &size, NULL);
155 repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
156 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* Unlike ost_destroy, the cookie pointer is set unconditionally here and
 * refers to the reply obdo. */
157 oti->oti_logcookies = obdo_logcookie(&repbody->oa);
158 req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
159 //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
/* ost_punch: handle a punch (truncate) request.
 *
 * Reads the ost_body from request buffer 0, requires both OBD_MD_FLSIZE and
 * OBD_MD_FLBLOCKS to be set in o_valid, packs a one-buffer reply, copies
 * the request obdo into the reply and calls obd_punch on the reply copy.
 * The obd_punch result is stored in req->rq_status.
 *
 * NOTE(review): the statement executed when the o_valid test fails, and the
 * checks after the swab and pack calls, are on lines missing from this
 * listing. */
163 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
164 struct obd_trans_info *oti)
166 struct ost_body *body, *repbody;
167 int rc, size = sizeof(*repbody);
170 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
174 if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
175 (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
178 rc = lustre_pack_reply(req, 1, &size, NULL);
182 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
183 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* o_size and o_blocks are passed as two separate arguments; presumably
 * they carry the start and end of the range to punch - TODO confirm
 * against obd_punch. */
184 req->rq_status = obd_punch(exp, &repbody->oa, NULL, repbody->oa.o_size,
185 repbody->oa.o_blocks, oti);
/* ost_sync: handle a sync request.
 *
 * Reads the ost_body from request buffer 0, packs a one-buffer reply sized
 * for an ost_body, copies the request obdo into the reply and calls
 * obd_sync on the reply copy, passing o_size and o_blocks as two extra
 * arguments. The obd_sync result is stored in req->rq_status.
 *
 * NOTE(review): error checks after the swab and pack calls are on lines
 * missing from this listing. The meaning of the o_size and o_blocks
 * arguments (presumably a byte range) should be confirmed against
 * obd_sync. */
189 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
191 struct ost_body *body, *repbody;
192 int rc, size = sizeof(*repbody);
195 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
199 rc = lustre_pack_reply(req, 1, &size, NULL);
203 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
204 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
205 req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
206 repbody->oa.o_blocks);
/* ost_setattr: handle a set-attributes request.
 *
 * Reads the ost_body from request buffer 0, packs a one-buffer reply sized
 * for an ost_body, copies the request obdo into the reply and calls
 * obd_setattr on the reply copy. The obd_setattr result is stored in
 * req->rq_status.
 *
 * NOTE(review): error checks after the swab and pack calls are on lines
 * missing from this listing. */
210 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
211 struct obd_trans_info *oti)
213 struct ost_body *body, *repbody;
214 int rc, size = sizeof(*repbody);
217 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
221 rc = lustre_pack_reply(req, 1, &size, NULL);
225 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
226 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
228 req->rq_status = obd_setattr(exp, &repbody->oa, NULL, oti);
/* ost_bulk_timeout: timeout callback handed to LWI_TIMEOUT by the bulk read
 * and bulk write handlers in this file, which pass the bulk descriptor as
 * the data argument.
 * NOTE(review): the statements of this function and the end of the comment
 * below are on lines missing from this listing. */
232 static int ost_bulk_timeout(void *data)
235 /* We don't fail the connection here, because having the export
236 * killed makes the (vital) call to commitrw very sad.
// get_per_page_niobufs: expand the remote niobufs of nioo objects into
// per-page niobufs.
//   ioo     - array of nioo object descriptors; ioo[i].ioo_bufcnt is
//             overwritten with the per-object page count
//   rnb     - the nrnb remote niobufs, laid out object after object
//   pp_rnbp - out parameter; its assignment is not on any visible line
// NOTE(review): return statements are on lines missing from this listing.
// Callers store the result as a page count and take their error path with
// rc = npages after a test that is likewise not visible.
241 static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
242 struct niobuf_remote *rnb, int nrnb,
243 struct niobuf_remote **pp_rnbp)
245 /* Copy a remote niobuf, splitting it into page-sized chunks
246 * and setting ioo[i].ioo_bufcnt accordingly */
247 struct niobuf_remote *pp_rnb;
254 /* first count and check the number of pages required */
255 for (i = 0; i < nioo; i++)
/* NOTE(review): this bound reads ioo->ioo_bufcnt, i.e. the first object,
 * while the split loop further down uses ioo[i].ioo_bufcnt. The two agree
 * only when nioo is 1 or every object carries the same count - confirm
 * which is intended. */
256 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
257 obd_off offset = rnb[rnbidx].offset;
258 obd_off p0 = offset >> PAGE_SHIFT;
/* NOTE(review): pn is derived from len - 1 before the zero-length test
 * below, and rnb[rnbidx] is read before the LASSERT that bounds rnbidx. */
259 obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
261 LASSERT(rnbidx < nrnb);
/* Pages touched by this niobuf: first page p0 through last page pn. */
263 npages += (pn + 1 - p0);
265 if (rnb[rnbidx].len == 0) {
266 CERROR("zero len BRW: obj %d objid "LPX64
267 " buf %u\n", i, ioo[i].ioo_id, j);
/* Offsets must be strictly increasing; the first half of this condition
 * is on a line missing from this listing. */
271 rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
272 CERROR("unordered BRW: obj %d objid "LPX64
273 " buf %u offset "LPX64" <= "LPX64"\n",
274 i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
280 LASSERT(rnbidx == nrnb);
/* When the page count equals the niobuf count no split is needed.
 * free_per_page_niobufs treats pp_rnb == rnb as nothing allocated, so
 * presumably rnb itself is handed back here - the statements are not
 * visible. */
282 if (npages == nrnb) { /* all niobufs are for single pages */
287 OBD_ALLOC(pp_rnb, sizeof(*pp_rnb) * npages);
291 /* now do the actual split */
293 for (i = 0; i < nioo; i++) {
296 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
297 obd_off off = rnb[rnbidx].offset;
298 int nob = rnb[rnbidx].len;
300 LASSERT(rnbidx < nrnb);
/* poff is the offset inside the current page; pnob is the part of the
 * remaining nob bytes that fits before the page boundary. */
302 obd_off poff = off & (PAGE_SIZE - 1);
303 int pnob = (poff + nob > PAGE_SIZE) ?
304 PAGE_SIZE - poff : nob;
306 LASSERT(page < npages);
307 pp_rnb[page].len = pnob;
308 pp_rnb[page].offset = off;
309 pp_rnb[page].flags = rnb[rnbidx].flags;
311 CDEBUG(0, " obj %d id "LPX64
312 "page %d(%d) "LPX64" for %d, flg %x\n",
313 i, ioo[i].ioo_id, obj_pages, page,
314 pp_rnb[page].offset, pp_rnb[page].len,
/* From here on the object descriptor counts pages, not client niobufs. */
324 ioo[i].ioo_bufcnt = obj_pages;
326 LASSERT(page == npages);
/* free_per_page_niobufs: release the per-page niobuf array produced by
 * get_per_page_niobufs.
 *   npages - element count used to compute the size passed to OBD_FREE
 *   pp_rnb - the per-page array
 *   rnb    - the original remote niobufs; when pp_rnb is the same pointer
 *            nothing was allocated
 * NOTE(review): the statement guarded by the pointer comparison is on a
 * line missing from this listing; presumably it returns early. */
332 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
333 struct niobuf_remote *rnb)
335 if (pp_rnb == rnb) /* didn't allocate above */
338 OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages);
/* ost_checksum_bulk: fold every page on the bulk descriptor page list into
 * one checksum by calling ost_checksum on each mapped page, starting at the
 * page offset recorded for that page.
 * NOTE(review): the remaining ost_checksum arguments, any kunmap that pairs
 * with the kmap below, and the return statement are on lines missing from
 * this listing. */
342 obd_count ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
345 struct ptlrpc_bulk_page *bp;
347 list_for_each_entry(bp, &desc->bd_page_list, bp_link) {
348 ost_checksum(&cksum, kmap(bp->bp_page) + bp->bp_pageoffset,
/* ost_stime_record: record how long one phase of a bulk request took.
 *   req   - request whose service device is looked up
 *   start - time the phase began; overwritten with the current time so the
 *           next call measures the following phase
 *   rw    - callers in this file pass 0 for reads and 1 for writes
 *   phase - callers in this file pass 0, 1 and 2 in that order
 * Nothing is recorded unless the service device has a type name equal to
 * LUSTRE_OST_NAME.
 * NOTE(review): the assignment that makes ost non-NULL and the statement
 * taken when ind is out of range are on lines missing from this listing. */
357 static void ost_stime_record(struct ptlrpc_request *req, struct timeval *start,
358 unsigned rw, unsigned phase)
360 struct obd_device *obd = req->rq_svc->srv_obddev;
/* Three slots per direction: reads use 0..2, writes use 3..5. */
362 int ind = rw *3 + phase;
364 if (obd && obd->obd_type && obd->obd_type->typ_name) {
365 if (!strcmp(obd->obd_type->typ_name, LUSTRE_OST_NAME)) {
366 struct ost_obd *ost = NULL;
/* sizeof only inspects the type, so this bound check does not dereference
 * ost. */
369 if (ind >= (sizeof(ost->ost_stimes) /
370 sizeof(ost->ost_stimes[0])))
372 do_gettimeofday(&stop);
374 spin_lock(&ost->ost_lock);
375 lprocfs_stime_record(&ost->ost_stimes[ind],&stop,start);
376 spin_unlock(&ost->ost_lock);
377 memcpy(start, &stop, sizeof(*start));
/* Scratch buffer that the handlers in this file pass to
 * ptlrpc_peernid2str when logging a peer NID.
 * NOTE(review): it is a single file-scope buffer and ost_setup starts
 * several service threads, so concurrent log calls presumably can
 * overwrite each other - confirm whether that matters. */
382 static char str[PTL_NALFMT_SIZE];
/* ost_brw_read: serve a bulk read request.
 *
 * Visible flow: unpack the ost_body (buffer 0), a single obd_ioobj
 * (buffer 1) and its niobufs (buffer 2); pack the reply; split the niobufs
 * per page; obd_preprw; attach every page that returned data to a
 * BULK_PUT_SOURCE descriptor; start the transfer and wait for it with a
 * timeout of obd_timeout * HZ / 4; obd_commitrw; then reply or, on a
 * communications error, drop the reply state and possibly fail the export.
 *
 * ost_stime_record is called after each of the three phases with read
 * indices 0, 1 and 2.
 *
 * NOTE(review): labels, several conditions and most integer declarations
 * are on lines missing from this listing, so the exact branch structure
 * must be confirmed against the full file. */
385 static int ost_brw_read(struct ptlrpc_request *req)
387 struct ptlrpc_bulk_desc *desc;
388 struct niobuf_remote *remote_nb;
389 struct niobuf_remote *pp_rnb;
390 struct niobuf_local *local_nb;
391 struct obd_ioobj *ioo;
392 struct ost_body *body, *repbody;
393 struct l_wait_info lwi;
394 struct obd_trans_info oti = { 0 };
395 int size[1] = { sizeof(*body) };
402 struct timeval start;
/* These look like fault-injection hooks: one forces -EIO, the other
 * inserts a delay of a quarter of the timeout. */
405 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
406 GOTO(out, rc = -EIO);
408 OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
409 (obd_timeout + 1) / 4);
411 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
413 CERROR("Missing/short ost_body\n");
414 GOTO(out, rc = -EFAULT);
417 ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
419 CERROR("Missing/short ioobj\n");
420 GOTO(out, rc = -EFAULT);
/* The niobuf count comes straight from the client-supplied ioobj and sizes
 * the request for buffer 2. */
423 niocount = ioo->ioo_bufcnt;
424 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
425 lustre_swab_niobuf_remote);
426 if (remote_nb == NULL) {
427 CERROR("Missing/short niobuf\n");
428 GOTO(out, rc = -EFAULT);
430 if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
431 for (i = 1; i < niocount; i++)
432 lustre_swab_niobuf_remote (&remote_nb[i]);
435 rc = lustre_pack_reply(req, 1, size, NULL);
439 /* FIXME all niobuf splitting should be done in obdfilter if needed */
440 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
441 npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
443 GOTO(out, rc = npages);
/* One niobuf_local per page; released by the matching OBD_FREE near the
 * end of the function. */
445 OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
446 if (local_nb == NULL)
447 GOTO(out_pp_rnb, rc = -ENOMEM);
449 desc = ptlrpc_prep_bulk_exp (req, npages,
450 BULK_PUT_SOURCE, OST_BULK_PORTAL);
452 GOTO(out_local, rc = -ENOMEM);
454 do_gettimeofday(&start);
455 rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
456 ioo, npages, pp_rnb, local_nb, &oti);
457 ost_stime_record(req, &start, 0, 0);
/* Walk the per-page results of preprw and queue each page that holds
 * data on the bulk descriptor. */
462 for (i = 0; i < npages; i++) {
463 int page_rc = local_nb[i].rc;
465 if (page_rc < 0) { /* error */
470 LASSERT(page_rc <= pp_rnb[i].len);
472 if (page_rc != 0) { /* some data! */
473 LASSERT (local_nb[i].page != NULL);
474 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
475 pp_rnb[i].offset & (PAGE_SIZE-1),
479 if (page_rc != pp_rnb[i].len) { /* short read */
480 /* All subsequent pages should be 0 */
482 LASSERT(local_nb[i].rc == 0);
488 rc = ptlrpc_start_bulk_transfer(desc);
/* Wait until the bulk is no longer active, giving up after a quarter of
 * obd_timeout. */
490 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
491 ost_bulk_timeout, desc);
492 rc = l_wait_event(desc->bd_waitq,
493 !ptlrpc_bulk_active(desc), &lwi);
494 LASSERT(rc == 0 || rc == -ETIMEDOUT);
495 if (rc == -ETIMEDOUT) {
496 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT");
497 ptlrpc_abort_bulk(desc);
498 } else if (!desc->bd_success ||
499 desc->bd_nob_transferred != desc->bd_nob) {
500 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
502 "truncated" : "network error on",
503 desc->bd_nob_transferred,
505 /* XXX should this be a different errno? */
509 DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d\n", rc);
/* Remember whether the transfer itself failed; further down a reply is
 * only attempted when it did not. */
511 comms_error = rc != 0;
514 ost_stime_record(req, &start, 0, 1);
515 /* Must commit after prep above in all cases */
516 rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
517 ioo, npages, local_nb, &oti, rc);
518 ost_stime_record(req, &start, 0, 2);
521 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
522 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
525 repbody->oa.o_cksum = ost_checksum_bulk(desc);
526 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
531 ptlrpc_free_bulk(desc);
533 OBD_FREE(local_nb, sizeof(*local_nb) * npages);
535 free_per_page_niobufs(npages, pp_rnb, remote_nb);
539 req->rq_status = nob;
541 } else if (!comms_error) {
542 /* only reply if comms OK */
546 if (req->rq_reply_state != NULL) {
547 /* reply out callback would free */
548 lustre_free_reply_state (req->rq_reply_state);
/* Matching connection counts mean the client has not reconnected since it
 * sent this request, so the export is failed; the other branch only logs
 * that the client reconnected. */
550 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
551 CERROR("bulk IO comms error: "
552 "evicting %s@%s nid %s\n",
553 req->rq_export->exp_client_uuid.uuid,
554 req->rq_export->exp_connection->c_remote_uuid.uuid,
555 ptlrpc_peernid2str(&req->rq_peer, str));
556 ptlrpc_fail_export(req->rq_export);
558 CERROR("ignoring bulk IO comms error: "
559 "client reconnected %s@%s nid %s\n",
560 req->rq_export->exp_client_uuid.uuid,
561 req->rq_export->exp_connection->c_remote_uuid.uuid,
562 ptlrpc_peernid2str(&req->rq_peer, str));
/* ost_brw_write: serve a bulk write request; exported for other modules.
 *
 * Visible flow: unpack the ost_body (buffer 0), an array of obd_ioobj
 * (buffer 1) and all their niobufs (buffer 2); pack a two-buffer reply
 * whose second buffer holds one return code per client niobuf; run
 * obd_do_cow; split the niobufs per page; obd_preprw; attach every page to
 * a BULK_GET_SINK descriptor; start the transfer and wait for it with a
 * timeout of obd_timeout * HZ / 4; optionally verify the client checksum;
 * obd_commitrw; map page results back to niobuf return codes;
 * obd_write_extents; then reply or, on a communications error, drop the
 * reply state and possibly fail the export.
 *
 * ost_stime_record is called after each of the three phases with write
 * indices 0, 1 and 2.
 *
 * NOTE(review): labels, several conditions, preprocessor lines and most
 * integer declarations are on lines missing from this listing, so the
 * exact branch structure must be confirmed against the full file. */
569 int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
571 struct ptlrpc_bulk_desc *desc;
572 struct niobuf_remote *remote_nb;
573 struct niobuf_remote *pp_rnb;
574 struct niobuf_local *local_nb;
575 struct obd_ioobj *ioo;
576 struct ost_body *body, *repbody;
577 struct l_wait_info lwi;
579 int size[2] = { sizeof(*body) };
580 int objcount, niocount, npages;
583 struct timeval start;
586 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
587 GOTO(out, rc = -EIO);
589 /* pause before transaction has been started */
590 OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
591 (obd_timeout + 1) / 4);
593 swab = lustre_msg_swabbed(req->rq_reqmsg);
594 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
596 CERROR("Missing/short ost_body\n");
597 GOTO(out, rc = -EFAULT);
600 LASSERT_REQSWAB(req, 1);
/* The number of ioobjs is implied by the length of request buffer 1. */
601 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
603 CERROR("Missing/short ioobj\n");
604 GOTO(out, rc = -EFAULT);
606 ioo = lustre_msg_buf (req->rq_reqmsg, 1, objcount * sizeof(*ioo));
607 LASSERT (ioo != NULL);
/* Total up the niobufs across all objects; an object with a zero bufcnt
 * is rejected with -EFAULT. */
608 for (niocount = i = 0; i < objcount; i++) {
610 lustre_swab_obd_ioobj (&ioo[i]);
611 if (ioo[i].ioo_bufcnt == 0) {
612 CERROR("ioo[%d] has zero bufcnt\n", i);
613 GOTO(out, rc = -EFAULT);
615 niocount += ioo[i].ioo_bufcnt;
618 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
619 lustre_swab_niobuf_remote);
620 if (remote_nb == NULL) {
621 CERROR("Missing/short niobuf\n");
622 GOTO(out, rc = -EFAULT);
624 if (swab) { /* swab the remaining niobufs */
625 for (i = 1; i < niocount; i++)
626 lustre_swab_niobuf_remote (&remote_nb[i]);
/* Reply buffer 1 carries one return code per niobuf the client sent. */
629 size[1] = niocount * sizeof(*rcs);
630 rc = lustre_pack_reply(req, 2, size, NULL);
633 rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
636 /* Do snap options here*/
637 rc = obd_do_cow(req->rq_export, ioo, objcount, remote_nb);
642 /* FIXME all niobuf splitting should be done in obdfilter if needed */
643 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
644 npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
646 GOTO(out, rc = npages);
648 OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
649 if (local_nb == NULL)
650 GOTO(out_pp_rnb, rc = -ENOMEM);
652 desc = ptlrpc_prep_bulk_exp (req, npages,
653 BULK_GET_SINK, OST_BULK_PORTAL);
655 GOTO(out_local, rc = -ENOMEM);
657 do_gettimeofday(&start);
658 rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
659 ioo, npages, pp_rnb, local_nb, oti);
660 ost_stime_record(req, &start, 1, 0);
664 /* NB Having prepped, we must commit... */
666 for (i = 0; i < npages; i++)
667 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
668 pp_rnb[i].offset & (PAGE_SIZE - 1),
671 rc = ptlrpc_start_bulk_transfer (desc);
/* Wait until the bulk is no longer active, giving up after a quarter of
 * obd_timeout. */
673 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
674 ost_bulk_timeout, desc);
675 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc),
677 LASSERT(rc == 0 || rc == -ETIMEDOUT);
678 if (rc == -ETIMEDOUT) {
679 DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
680 ptlrpc_abort_bulk(desc);
681 } else if (!desc->bd_success ||
682 desc->bd_nob_transferred != desc->bd_nob) {
683 DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
685 "truncated" : "network error on",
686 desc->bd_nob_transferred, desc->bd_nob);
687 /* XXX should this be a different errno? */
691 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
/* Remember whether the transfer itself failed; further down a reply is
 * only attempted when it did not. */
693 comms_error = rc != 0;
695 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
696 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
/* When the client sent a checksum, recompute it over the received pages
 * and compare. */
699 if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
700 static int cksum_counter;
701 obd_count client_cksum = body->oa.o_cksum;
702 obd_count cksum = ost_checksum_bulk(desc);
704 if (client_cksum != cksum) {
705 CERROR("Bad checksum: client %x, server %x NID %s\n",
707 ptlrpc_peernid2str(&req->rq_peer, str));
709 repbody->oa.o_cksum = cksum;
/* x & -x isolates the lowest set bit, so this test holds only when the
 * counter is zero or a power of two; the OK message is therefore logged
 * less and less often as the counter grows. */
712 if ((cksum_counter & (-cksum_counter)) == cksum_counter)
713 CWARN("Checksum %u from NID %s: %x OK\n",
715 ptlrpc_peernid2str(&req->rq_peer, str),
720 ost_stime_record(req, &start, 1, 1);
721 /* Must commit after prep above in all cases */
722 rc = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
723 objcount, ioo, npages, local_nb, oti, rc);
725 ost_stime_record(req, &start, 1, 2);
/* NOTE(review): the reply obdo is copied from the request a second time
 * here; the conditions around this copy and the checksum stores are on
 * lines missing from this listing. */
727 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
728 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
731 repbody->oa.o_cksum = ost_checksum_bulk(desc);
732 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
734 /* set per-requested niobuf return codes */
/* i indexes client niobufs and j indexes pages; a negative page result is
 * recorded against the niobuf that the page belongs to. */
735 for (i = j = 0; i < niocount; i++) {
736 int nob = remote_nb[i].len;
741 if (local_nb[j].rc < 0)
742 rcs[i] = local_nb[j].rc;
743 nob -= pp_rnb[j].len;
748 LASSERT(j == npages);
750 rc = obd_write_extents(req->rq_export, ioo, objcount, niocount,
753 CERROR("write extents error of id "LPU64" rc=%d\n",
757 ptlrpc_free_bulk(desc);
759 OBD_FREE(local_nb, sizeof(*local_nb) * npages);
761 free_per_page_niobufs(npages, pp_rnb, remote_nb);
764 oti_to_request(oti, req);
765 rc = ptlrpc_reply(req);
766 } else if (!comms_error) {
767 /* Only reply if there was no comms problem with bulk */
771 if (req->rq_reply_state != NULL) {
772 /* reply out callback would free */
773 lustre_free_reply_state (req->rq_reply_state);
/* Matching connection counts mean the client has not reconnected since it
 * sent this request, so the export is failed; the other branch only logs
 * that the client reconnected. */
775 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
776 CERROR("%s: bulk IO comm error evicting %s@%s NID %s\n",
777 req->rq_export->exp_obd->obd_name,
778 req->rq_export->exp_client_uuid.uuid,
779 req->rq_export->exp_connection->c_remote_uuid.uuid,
780 ptlrpc_peernid2str(&req->rq_peer, str));
781 ptlrpc_fail_export(req->rq_export);
783 CERROR("ignoring bulk IO comms error: "
784 "client reconnected %s@%s nid %s\n",
785 req->rq_export->exp_client_uuid.uuid,
786 req->rq_export->exp_connection->c_remote_uuid.uuid,
787 ptlrpc_peernid2str(&req->rq_peer, str));
792 EXPORT_SYMBOL(ost_brw_write);
/* ost_san_brw: handle a SAN read or write request (cmd selects which).
 *
 * Visible flow: unpack the ost_body, the obd_ioobj array and the niobufs;
 * split the niobufs per page; pack a two-buffer reply whose second buffer
 * is sized for npages niobufs; call obd_san_preprw and store its result in
 * req->rq_status; copy the obdo and niobufs into the reply; release the
 * per-page array.
 *
 * NOTE(review): labels, several conditions and the final reply or return
 * statements are on lines missing from this listing. */
794 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
796 struct niobuf_remote *remote_nb, *res_nb, *pp_rnb;
797 struct obd_ioobj *ioo;
798 struct ost_body *body, *repbody;
799 int rc, i, objcount, niocount, size[2] = {sizeof(*body)}, npages;
803 /* XXX not set to use latest protocol */
805 swab = lustre_msg_swabbed(req->rq_reqmsg);
806 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
808 CERROR("Missing/short ost_body\n");
809 GOTO(out, rc = -EFAULT);
812 ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
814 CERROR("Missing/short ioobj\n");
815 GOTO(out, rc = -EFAULT);
/* The number of ioobjs is implied by the length of request buffer 1; the
 * loop starts at 1 because the first ioobj was already handled by the
 * lustre_swab_reqbuf call above. */
817 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
818 niocount = ioo[0].ioo_bufcnt;
819 for (i = 1; i < objcount; i++) {
821 lustre_swab_obd_ioobj (&ioo[i]);
822 niocount += ioo[i].ioo_bufcnt;
825 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
826 lustre_swab_niobuf_remote);
827 if (remote_nb == NULL) {
828 CERROR("Missing/short niobuf\n");
829 GOTO(out, rc = -EFAULT);
831 if (swab) { /* swab the remaining niobufs */
832 for (i = 1; i < niocount; i++)
833 lustre_swab_niobuf_remote (&remote_nb[i]);
836 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
837 npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
839 GOTO (out, rc = npages);
841 size[1] = npages * sizeof(*pp_rnb);
842 rc = lustre_pack_reply(req, 2, size, NULL);
844 GOTO(out_pp_rnb, rc);
846 req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
847 objcount, ioo, npages, pp_rnb);
850 GOTO(out_pp_rnb, rc = 0);
852 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
853 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* NOTE(review): size[1] was computed from npages, yet the copy source is
 * remote_nb, which holds niocount entries. If npages exceeds niocount this
 * reads past the request buffer; pp_rnb looks like the intended source -
 * confirm. */
855 res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
856 memcpy(res_nb, remote_nb, size[1]);
859 free_per_page_niobufs(npages, pp_rnb, remote_nb);
/* ost_set_info: handle a set_info request.
 *
 * The key is request buffer 0 (at least 1 byte) and its length is the
 * length of that buffer; the value is request buffer 1 with its buffer
 * length. An empty reply is packed, obd_set_info is called and the reply
 * message status is set to 0.
 *
 * NOTE(review): the checks on key and on the pack result, and the return
 * statement, are on lines missing from this listing. */
870 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
876 key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
878 DEBUG_REQ(D_HA, req, "no set_info key");
881 keylen = req->rq_reqmsg->buflens[0];
883 rc = lustre_pack_reply(req, 0, NULL, NULL);
/* A minimum length of 0 is requested, so any value buffer is accepted. */
887 val = lustre_msg_buf(req->rq_reqmsg, 1, 0);
889 rc = obd_set_info(exp, keylen, key, req->rq_reqmsg->buflens[1], val);
890 req->rq_repmsg->status = 0;
/* ost_get_info: handle a get_info request.
 *
 * The key is request buffer 0 and its length is the length of that buffer.
 * A one-buffer reply sized for an obd_id is packed, obd_get_info fills it
 * and the reply message status is set to 0.
 *
 * NOTE(review): the statements taken when the key is missing or rejected,
 * the check on the pack result and the return statement are on lines
 * missing from this listing. */
894 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
897 int keylen, rc = 0, size = sizeof(obd_id);
901 key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
903 DEBUG_REQ(D_HA, req, "no get_info key");
906 keylen = req->rq_reqmsg->buflens[0];
/* Only keys whose first 7 bytes spell last_id pass this test. */
908 if (keylen < strlen("last_id") || memcmp(key, "last_id", 7) != 0)
911 rc = lustre_pack_reply(req, 1, &size, NULL);
915 reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
916 rc = obd_get_info(exp, keylen, key, &size, reply);
917 req->rq_repmsg->status = 0;
/* ost_llog_handle_connect: pass the llogd_conn_body found in request
 * buffer 0 to obd_llog_connect for the given export.
 * NOTE(review): the return statement is on a line missing from this
 * listing, and no NULL check on body is visible. */
921 static int ost_llog_handle_connect(struct obd_export *exp,
922 struct ptlrpc_request *req)
924 struct llogd_conn_body *body;
928 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
929 rc = obd_llog_connect(exp, body);
/* ost_filter_recovery_request: decide what to do with a request that
 * arrives while the device is recovering.
 *   process - out parameter; on the visible path it receives the result of
 *             target_queue_recovery_request
 * Requests that are not allowed are logged, given status -EAGAIN and
 * answered with ptlrpc_error.
 * NOTE(review): most case labels and the statements that select between
 * the two visible paths are on lines missing from this listing. */
933 static int ost_filter_recovery_request(struct ptlrpc_request *req,
934 struct obd_device *obd, int *process)
936 switch (req->rq_reqmsg->opc) {
937 case OST_CONNECT: /* This will never get here, but for completeness. */
951 *process = target_queue_recovery_request(req, obd);
955 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
957 /* XXX what should we set rq_status to here? */
958 req->rq_status = -EAGAIN;
959 RETURN(ptlrpc_error(req));
/* ost_msg_check_version: check that the version carried by msg matches the
 * protocol family of its opcode, logging a mismatch.
 * The visible groups compare against LUSTRE_OBD_VERSION (twice),
 * LUSTRE_DLM_VERSION and LUSTRE_LOG_VERSION; an unknown opcode is logged.
 * NOTE(review): the switch statement, most case labels, the conditions
 * around the CERROR calls and the return statement are on lines missing
 * from this listing. */
963 int ost_msg_check_version(struct lustre_msg *msg)
971 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
973 CERROR("bad opc %u version %08x, expecting %08x\n",
974 msg->opc, msg->version, LUSTRE_OBD_VERSION);
/* NOTE(review): this second group checks LUSTRE_OBD_VERSION but its error
 * message reports LUSTRE_OST_VERSION as the expected value. One of the two
 * is presumably wrong - confirm which version these opcodes require. */
989 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
991 CERROR("bad opc %u version %08x, expecting %08x\n",
992 msg->opc, msg->version, LUSTRE_OST_VERSION);
997 case LDLM_BL_CALLBACK:
998 case LDLM_CP_CALLBACK:
999 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1001 CERROR("bad opc %u version %08x, expecting %08x\n",
1002 msg->opc, msg->version, LUSTRE_DLM_VERSION);
1004 case OBD_LOG_CANCEL:
1005 case LLOG_ORIGIN_CONNECT:
1006 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1008 CERROR("bad opc %u version %08x, expecting %08x\n",
1009 msg->opc, msg->version, LUSTRE_LOG_VERSION);
1012 CERROR("OST unexpected opcode %d\n", msg->opc);
/* ost_handle: request entry point registered with both OST services;
 * exported for other modules.
 *
 * Visible flow: check the message version; for every opcode except
 * OST_CONNECT require an export and, while the device is recovering, let
 * ost_filter_recovery_request decide whether to continue; dispatch on the
 * opcode to the matching handler; afterwards copy the last committed
 * transaction number into the reply, handle a MSG_LAST_REPLAY request,
 * publish the transaction info and send the reply through
 * target_send_reply.
 *
 * Each case first runs an OBD_FAIL_RETURN or OBD_FAIL_CHECK_ONCE hook,
 * which looks like fault injection for testing.
 *
 * NOTE(review): most case labels, break statements, several conditions and
 * the out label are on lines missing from this listing, so the exact
 * branch structure must be confirmed against the full file. */
1019 int ost_handle(struct ptlrpc_request *req)
1021 struct obd_trans_info trans_info = { 0, };
1022 struct obd_trans_info *oti = &trans_info;
1023 int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
1024 struct obd_export *exp = NULL;
1027 LASSERT(current->journal_info == NULL);
1029 rc = ost_msg_check_version(req->rq_reqmsg);
1031 CERROR("OST drop mal-formed request\n");
1035 /* XXX identical to MDS */
1036 if (req->rq_reqmsg->opc != OST_CONNECT) {
1037 struct obd_device *obd;
1040 exp = req->rq_export;
1043 CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
1044 req->rq_reqmsg->opc,
1045 ptlrpc_peernid2str(&req->rq_peer, str));
1046 req->rq_status = -ENOTCONN;
1047 GOTO(out, rc = -ENOTCONN);
1052 /* Check for aborted recovery. */
/* The recovering flag is sampled under the processing-task lock. */
1053 spin_lock_bh(&obd->obd_processing_task_lock);
1054 recovering = obd->obd_recovering;
1055 spin_unlock_bh(&obd->obd_processing_task_lock);
1057 rc = ost_filter_recovery_request(req, obd,
1059 if (rc || !should_process)
1061 if (should_process < 0) {
1062 req->rq_status = should_process;
1063 rc = ptlrpc_error(req);
/* Dispatch on the opcode. */
1071 switch (req->rq_reqmsg->opc) {
1073 CDEBUG(D_INODE, "connect\n");
1074 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
1075 rc = target_handle_connect(req);
1078 case OST_DISCONNECT:
1079 CDEBUG(D_INODE, "disconnect\n");
1080 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
1081 rc = target_handle_disconnect(req);
1084 CDEBUG(D_INODE, "create\n");
1085 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
1086 GOTO(out, rc = -ENOSPC);
1087 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1088 GOTO(out, rc = -EROFS);
1089 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
1090 rc = ost_create(exp, req, oti);
1093 CDEBUG(D_INODE, "destroy\n");
1094 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
1095 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1096 GOTO(out, rc = -EROFS);
1097 rc = ost_destroy(exp, req, oti);
1100 CDEBUG(D_INODE, "getattr\n");
1101 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
1102 rc = ost_getattr(exp, req);
1105 CDEBUG(D_INODE, "setattr\n");
1106 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
1107 rc = ost_setattr(exp, req, oti);
1110 CDEBUG(D_INODE, "write\n");
1111 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1112 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
1113 GOTO(out, rc = -ENOSPC);
1114 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1115 GOTO(out, rc = -EROFS);
1116 rc = ost_brw_write(req, oti);
1117 LASSERT(current->journal_info == NULL);
1118 /* ost_brw sends its own replies */
1121 CDEBUG(D_INODE, "read\n");
1122 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1123 rc = ost_brw_read(req);
1124 LASSERT(current->journal_info == NULL);
1125 /* ost_brw sends its own replies */
1128 CDEBUG(D_INODE, "san read\n");
1129 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1130 rc = ost_san_brw(req, OBD_BRW_READ);
1131 /* ost_san_brw sends its own replies */
1134 CDEBUG(D_INODE, "san write\n");
1135 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1136 rc = ost_san_brw(req, OBD_BRW_WRITE);
1137 /* ost_san_brw sends its own replies */
1140 CDEBUG(D_INODE, "punch\n");
1141 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
1142 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1143 GOTO(out, rc = -EROFS);
1144 rc = ost_punch(exp, req, oti);
1147 CDEBUG(D_INODE, "statfs\n");
1148 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
1149 rc = ost_statfs(req);
1152 CDEBUG(D_INODE, "sync\n");
1153 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNC_NET, 0);
1154 rc = ost_sync(exp, req);
1157 DEBUG_REQ(D_INODE, req, "set_info");
1158 rc = ost_set_info(exp, req);
1161 DEBUG_REQ(D_INODE, req, "get_info");
1162 rc = ost_get_info(exp, req);
1165 DEBUG_REQ(D_INODE, req, "ping");
1166 rc = target_handle_ping(req);
1168 /* FIXME - just reply status */
/* The two llog opcodes store the handler result as the request status,
 * pack an empty reply and send it themselves. */
1169 case LLOG_ORIGIN_CONNECT:
1170 DEBUG_REQ(D_INODE, req, "log connect\n");
1171 rc = ost_llog_handle_connect(exp, req);
1172 req->rq_status = rc;
1173 rc = lustre_pack_reply(req, 0, NULL, NULL);
1176 RETURN(ptlrpc_reply(req));
1177 case OBD_LOG_CANCEL:
1178 CDEBUG(D_INODE, "log cancel\n");
1179 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1180 rc = llog_origin_handle_cancel(req);
1181 req->rq_status = rc;
1182 rc = lustre_pack_reply(req, 0, NULL, NULL);
1185 RETURN(ptlrpc_reply(req));
1187 CDEBUG(D_INODE, "enqueue\n");
1188 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1189 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1190 ldlm_server_blocking_ast,
1191 ldlm_server_glimpse_ast);
/* Enqueue replies use their own failure code for target_send_reply. */
1192 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1195 CDEBUG(D_INODE, "convert\n");
1196 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1197 rc = ldlm_handle_convert(req);
1200 CDEBUG(D_INODE, "cancel\n");
1201 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1202 rc = ldlm_handle_cancel(req);
1204 case LDLM_BL_CALLBACK:
1205 case LDLM_CP_CALLBACK:
1206 CDEBUG(D_INODE, "callback\n");
1207 CERROR("callbacks should not happen on OST\n");
1210 CERROR("Unexpected opcode %d\n", req->rq_reqmsg->opc);
1211 req->rq_status = -ENOTSUPP;
1212 rc = ptlrpc_error(req);
1216 LASSERT(current->journal_info == NULL);
1219 /* If we're DISCONNECTing, the export_data is already freed */
1220 if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
1221 struct obd_device *obd = req->rq_export->exp_obd;
1222 if (!obd->obd_no_transno) {
1223 req->rq_repmsg->last_committed =
1224 obd->obd_last_committed;
1226 DEBUG_REQ(D_IOCTL, req,
1227 "not sending last_committed update");
1229 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
1230 obd->obd_last_committed, req->rq_xid);
/* A request flagged MSG_LAST_REPLAY has its reply queued while the device
 * is still recovering; otherwise it is failed with -ENOTCONN. */
1234 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1235 struct obd_device *obd = req->rq_export->exp_obd;
1237 if (obd && obd->obd_recovering) {
1238 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1239 return target_queue_final_reply(req, rc);
1241 /* Lost a race with recovery; let the error path DTRT. */
1242 rc = req->rq_status = -ENOTCONN;
1246 oti_to_request(oti, req);
1248 target_send_reply(req, rc, fail);
1251 EXPORT_SYMBOL(ost_handle);
/* ost_attach: attach handler; initialises the lprocfs variable tables for
 * the ost module and registers the per-device table through
 * lprocfs_obd_attach, whose result is returned. The len and data arguments
 * are not used on any visible line. */
1253 int ost_attach(struct obd_device *dev, obd_count len, void *data)
1255 struct lprocfs_static_vars lvars;
1257 lprocfs_init_vars(ost,&lvars);
1258 return lprocfs_obd_attach(dev, lvars.obd_vars);
/* ost_detach: detach handler; returns the result of lprocfs_obd_detach for
 * the device. */
1261 int ost_detach(struct obd_device *dev)
1263 return lprocfs_obd_detach(dev);
/* File operations for the service_times proc entry created in ost_setup;
 * defined in another translation unit. */
1266 extern struct file_operations ost_stimes_fops;
/* ost_setup: setup handler for the OST device.
 *
 * Visible flow: cleanup_group_info; start the llog commit thread; create
 * the service_times proc entry; create the main service on
 * OST_REQUEST_PORTAL and start OST_NUM_THREADS threads for it; create the
 * create service on OST_CREATE_PORTAL with ost_handle as its handler and
 * start one thread for it. The trailing unregister calls undo the two
 * services in reverse order on failure.
 *
 * NOTE(review): the checks on the rc values, the assignment of the first
 * ptlrpc_init_svc result, the labels and the return statements are on
 * lines missing from this listing. */
1268 static int ost_setup(struct obd_device *obd, obd_count len, void *buf)
1270 struct ost_obd *ost = &obd->u.ost;
1274 rc = cleanup_group_info();
1278 rc = llog_start_commit_thread();
1282 lprocfs_obd_seq_create(obd, "service_times", 0444, &ost_stimes_fops,
1286 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1287 OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
1289 obd->obd_proc_entry);
1290 if (ost->ost_service == NULL) {
1291 CERROR("failed to start service\n");
1295 rc = ptlrpc_start_n_threads(obd, ost->ost_service, OST_NUM_THREADS,
1298 GOTO(out_service, rc = -EINVAL);
1300 ost->ost_create_service =
1301 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1302 OST_CREATE_PORTAL, OSC_REPLY_PORTAL,
1303 ost_handle, "ost_create",
1304 obd->obd_proc_entry);
1305 if (ost->ost_create_service == NULL) {
1306 CERROR("failed to start OST create service\n");
1307 GOTO(out_service, rc = -ENOMEM);
/* The timing lock and the back pointer that ost_stime_record reads are
 * set here, after the threads of the main service were already started.
 * Only the main service gets the back pointer on the visible lines;
 * ost_stime_record tolerates a NULL device. */
1311 spin_lock_init(&ost->ost_lock);
1312 ost->ost_service->srv_obddev = obd;
1314 rc = ptlrpc_start_n_threads(obd, ost->ost_create_service, 1,
1317 GOTO(out_create, rc = -EINVAL);
1322 ptlrpc_unregister_service(ost->ost_create_service);
1324 ptlrpc_unregister_service(ost->ost_service);
/* ost_cleanup: cleanup handler for the OST device.
 *
 * Under the processing-task lock, cancels the recovery timer and clears
 * the recovering flag if recovery is in progress. Then stops the threads
 * of the main service and of the create service and unregisters both.
 * The flags argument is not used on any visible line.
 *
 * NOTE(review): the return statement is on a line missing from this
 * listing. */
1328 static int ost_cleanup(struct obd_device *obd, int flags)
1330 struct ost_obd *ost = &obd->u.ost;
1334 spin_lock_bh(&obd->obd_processing_task_lock);
1335 if (obd->obd_recovering) {
1336 target_cancel_recovery_timer(obd);
1337 obd->obd_recovering = 0;
1339 spin_unlock_bh(&obd->obd_processing_task_lock);
1341 ptlrpc_stop_all_threads(ost->ost_service);
1342 ptlrpc_unregister_service(ost->ost_service);
1344 ptlrpc_stop_all_threads(ost->ost_create_service);
1345 ptlrpc_unregister_service(ost->ost_create_service);
1350 /* use obd ops to offer management infrastructure */
/* Only the attach, detach, setup and cleanup methods are provided; the
 * table is handed to class_register_type in ost_init. */
1351 static struct obd_ops ost_obd_ops = {
1352 .o_owner = THIS_MODULE,
1353 .o_attach = ost_attach,
1354 .o_detach = ost_detach,
1355 .o_setup = ost_setup,
1356 .o_cleanup = ost_cleanup,
/* ost_init: module entry point; initialises the lprocfs variable tables
 * for the ost module and registers the OST device type with its method
 * table and module-level proc variables. Returns the result of
 * class_register_type.
 * NOTE(review): the last argument of class_register_type is on a line
 * missing from this listing. */
1359 static int __init ost_init(void)
1361 struct lprocfs_static_vars lvars;
1364 lprocfs_init_vars(ost,&lvars);
1365 RETURN(class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
/* ost_exit: module exit point; unregisters the device type named
 * LUSTRE_OST_NAME. The __exit annotation is commented out in the
 * signature. */
1369 static void /*__exit*/ ost_exit(void)
1371 class_unregister_type(LUSTRE_OST_NAME);
/* Kernel module metadata and registration of the init and exit hooks. */
1374 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1375 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
1376 MODULE_LICENSE("GPL");
1378 module_init(ost_init);
1379 module_exit(ost_exit);