1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author: Peter J. Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * Storage Target Handling functions
24 * Lustre Object Server Module (OST)
26 * This server is single-threaded at present (but can easily be made
27 * multi-threaded). For testing and management it is treated as an
28 * obd_device, although it does not export a full OBD method table
29 * (the requests come in over the wire, so object target modules do
30 * not have a full method table). */
34 # define EXPORT_SYMTAB
36 #define DEBUG_SUBSYSTEM S_OST
38 #include <linux/module.h>
39 #include <linux/obd_ost.h>
40 #include <linux/lustre_net.h>
41 #include <linux/lustre_dlm.h>
42 #include <linux/lustre_export.h>
43 #include <linux/init.h>
44 #include <linux/lprocfs_status.h>
45 #include <linux/lustre_commit_confd.h>
46 #include <libcfs/list.h>
47 #include <linux/lustre_sec.h>
48 #include <linux/lustre_audit.h>
/*
 * Initialise a transaction-info structure for an incoming request.
 *
 * Visible effects: *oti is zeroed, oti_nid takes the NID of the request's
 * peer, and oti_transno is copied from the reply message when the test on
 * rq_repmsg / rq_reqmsg below passes.
 */
50 void oti_init(struct obd_trans_info *oti, struct ptlrpc_request *req)
54 memset(oti, 0, sizeof *oti);
55 oti->oti_nid = req->rq_peer.peer_id.nid;
/* NOTE(review): "rq_reqmsg != 0" only tests the request-message pointer for
 * NULL; confirm that a check of a transno field was not what was intended. */
56 if (req->rq_repmsg && req->rq_reqmsg != 0)
57 oti->oti_transno = req->rq_repmsg->transno;
/*
 * Transfer transaction info from oti into the reply being built for req.
 *
 * The reply's transno is set from oti_transno, then the oti_ack_locks array
 * is walked (at most four entries) and entries are handed to
 * ptlrpc_save_lock() with their lock handle and mode.  Any early-exit test
 * inside the loop is not visible in this listing.
 */
60 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
62 struct oti_req_ack_lock *ack_lock;
69 req->rq_repmsg->transno = oti->oti_transno;
71 /* XXX 4 == entries in oti_ack_locks??? */
72 for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
75 /* XXX not even calling target_send_reply in some cases... */
76 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
/*
 * Handle an object-destroy request.
 *
 * The ost_body is unpacked (and byte-swapped if needed) from request buffer
 * 0 and a one-buffer reply of the same size is packed.  When the client set
 * OBD_MD_FLCOOKIE, oti_logcookies is pointed at the log cookie carried in
 * the request obdo.  The request obdo is echoed into the reply and the
 * result of obd_destroy() is stored in req->rq_status.
 */
80 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
81 struct obd_trans_info *oti)
83 struct ost_body *body, *repbody;
84 int rc, size = sizeof(*body);
87 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
91 rc = lustre_pack_reply(req, 1, &size, NULL);
95 if (body->oa.o_valid & OBD_MD_FLCOOKIE)
96 oti->oti_logcookies = obdo_logcookie(&body->oa);
97 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
98 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* obd_destroy() is given the request's obdo, not the reply copy. */
99 req->rq_status = obd_destroy(exp, &body->oa, NULL, oti);
/*
 * Handle a getattr request.
 *
 * The ost_body is unpacked from request buffer 0, a one-buffer reply is
 * packed, the request obdo is copied into the reply, and obd_getattr() is
 * called on the reply copy; its result is stored in req->rq_status.
 */
103 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
105 struct ost_body *body, *repbody;
106 int rc, size = sizeof(*body);
109 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
113 rc = lustre_pack_reply(req, 1, &size, NULL);
117 repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
118 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
119 req->rq_status = obd_getattr(exp, &repbody->oa, NULL);
/*
 * Handle a statfs request.
 *
 * A one-buffer reply sized for struct obd_statfs is packed and obd_statfs()
 * fills it in for the export's obd device.  The result goes to
 * req->rq_status and a failure is logged.
 */
123 static int ost_statfs(struct ptlrpc_request *req)
125 struct obd_statfs *osfs;
126 int rc, size = sizeof(*osfs);
129 rc = lustre_pack_reply(req, 1, &size, NULL);
133 osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*osfs));
/* NOTE(review): the third argument (jiffies-HZ) is presumably a "max age"
 * cutoff of one second for cached statistics -- confirm against obd_statfs. */
135 req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs, jiffies-HZ);
136 if (req->rq_status != 0)
137 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
/*
 * Handle an object-create request.
 *
 * The ost_body is unpacked from request buffer 0, a one-buffer reply is
 * packed and the request obdo is copied into it.  oti_logcookies is pointed
 * at the log cookie inside the reply obdo before obd_create() is called on
 * that reply copy; the result is stored in req->rq_status.
 */
142 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
143 struct obd_trans_info *oti)
145 struct ost_body *body, *repbody;
146 int rc, size = sizeof(*repbody);
149 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
153 rc = lustre_pack_reply(req, 1, &size, NULL);
157 repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
158 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
159 oti->oti_logcookies = obdo_logcookie(&repbody->oa);
160 req->rq_status = obd_create(exp, &repbody->oa, NULL, 0, NULL, oti);
161 //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
/*
 * Handle a punch (truncate) request.
 *
 * The ost_body is unpacked from request buffer 0.  The request is checked
 * for both OBD_MD_FLSIZE and OBD_MD_FLBLOCKS being set in o_valid (the
 * action taken when they are not is not visible in this listing).  When
 * OBD_MD_CAPA is set, a capability is unpacked from request buffer 1.  The
 * obdo is echoed into a one-buffer reply and obd_punch() is called with the
 * reply obdo's o_size and o_blocks; the result goes to req->rq_status.
 */
165 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
166 struct obd_trans_info *oti)
168 struct ost_body *body, *repbody;
169 struct lustre_capa *capa = NULL;
170 int rc, size = sizeof(*repbody);
173 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
177 if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
178 (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
181 if (body->oa.o_valid & OBD_MD_CAPA) {
182 capa = lustre_swab_reqbuf(req, 1, sizeof(*capa),
183 lustre_swab_lustre_capa);
185 CERROR("Missing/short capa\n");
190 rc = lustre_pack_reply(req, 1, &size, NULL);
194 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
195 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* NOTE(review): o_size / o_blocks presumably carry the start and end of the
 * range to punch -- confirm against the obd_punch() contract. */
196 req->rq_status = obd_punch(exp, &repbody->oa, NULL, repbody->oa.o_size,
197 repbody->oa.o_blocks, oti, capa);
/*
 * Handle a sync request.
 *
 * The ost_body is unpacked from request buffer 0, echoed into a one-buffer
 * reply, and obd_sync() is called with the reply obdo's o_size and o_blocks;
 * the result is stored in req->rq_status.
 */
201 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
203 struct ost_body *body, *repbody;
204 int rc, size = sizeof(*repbody);
207 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
211 rc = lustre_pack_reply(req, 1, &size, NULL);
215 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
216 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
217 req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
218 repbody->oa.o_blocks);
/*
 * Handle a setattr request.
 *
 * The ost_body is unpacked from request buffer 0, echoed into a one-buffer
 * reply, and obd_setattr() is called on the reply obdo with the caller's
 * transaction info; the result is stored in req->rq_status.
 */
222 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
223 struct obd_trans_info *oti)
225 struct ost_body *body, *repbody;
226 int rc, size = sizeof(*repbody);
229 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
233 rc = lustre_pack_reply(req, 1, &size, NULL);
237 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
238 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
240 req->rq_status = obd_setattr(exp, &repbody->oa, NULL, oti, NULL);
/*
 * Timeout callback for bulk transfers.  It is the function passed to
 * LWI_TIMEOUT() by the bulk read and bulk write paths in this file, with
 * the bulk descriptor as its data argument.
 */
244 static int ost_bulk_timeout(void *data)
247 /* We don't fail the connection here, because having the export
248 * killed makes the (vital) call to commitrw very sad. */
/*
 * Produce a per-page view of a remote niobuf array.
 *
 * ioo/nioo   - the I/O objects; ioo[i].ioo_bufcnt is rewritten to the
 *              number of pages for that object.
 * rnb/nrnb   - the remote niobufs as received from the client.
 * pp_rnbp    - receives the per-page array.
 *
 * Pass 1 counts the pages each niobuf spans and logs zero-length and
 * out-of-order buffers.  If every niobuf already covers exactly one page
 * (npages == nrnb) no split is needed; free_per_page_niobufs() treats
 * "pp_rnb == rnb" as "nothing was allocated", so presumably the input array
 * itself is handed back in that case -- confirm.  Otherwise an array of
 * npages entries is allocated and pass 2 cuts each niobuf at page
 * boundaries, copying its flags to every piece.
 *
 * Callers use the return value as the page count and treat it as an error
 * code on their failure path.
 */
253 static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
254 struct niobuf_remote *rnb, int nrnb,
255 struct niobuf_remote **pp_rnbp)
257 /* Copy a remote niobuf, splitting it into page-sized chunks
258 * and setting ioo[i].ioo_bufcnt accordingly */
259 struct niobuf_remote *pp_rnb;
266 /* first count and check the number of pages required */
267 for (i = 0; i < nioo; i++)
/* NOTE(review): this bound reads ioo->ioo_bufcnt (i.e. ioo[0]) for every i,
 * while the split loop below uses ioo[i].ioo_bufcnt.  The two disagree when
 * nioo > 1 and the objects have different buffer counts -- confirm. */
268 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
269 obd_off offset = rnb[rnbidx].offset;
/* p0 / pn: index of the first and last page touched by this niobuf. */
270 obd_off p0 = offset >> PAGE_SHIFT;
271 obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
273 LASSERT(rnbidx < nrnb);
275 npages += (pn + 1 - p0);
277 if (rnb[rnbidx].len == 0) {
278 CERROR("zero len BRW: obj %d objid "LPX64
279 " buf %u\n", i, ioo[i].ioo_id, j);
283 rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
284 CERROR("unordered BRW: obj %d objid "LPX64
285 " buf %u offset "LPX64" <= "LPX64"\n",
286 i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
292 LASSERT(rnbidx == nrnb);
294 if (npages == nrnb) { /* all niobufs are for single pages */
299 OBD_ALLOC(pp_rnb, sizeof(*pp_rnb) * npages);
303 /* now do the actual split */
305 for (i = 0; i < nioo; i++) {
308 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
309 obd_off off = rnb[rnbidx].offset;
310 int nob = rnb[rnbidx].len;
312 LASSERT(rnbidx < nrnb);
/* poff: offset within the page; pnob: bytes of this niobuf that fit in it. */
314 obd_off poff = off & (PAGE_SIZE - 1);
315 int pnob = (poff + nob > PAGE_SIZE) ?
316 PAGE_SIZE - poff : nob;
318 LASSERT(page < npages);
319 pp_rnb[page].len = pnob;
320 pp_rnb[page].offset = off;
321 pp_rnb[page].flags = rnb[rnbidx].flags;
323 CDEBUG(0, " obj %d id "LPX64
324 "page %d(%d) "LPX64" for %d, flg %x\n",
325 i, ioo[i].ioo_id, obj_pages, page,
326 pp_rnb[page].offset, pp_rnb[page].len,
/* From here on the object's buffer count means "pages", not client niobufs. */
336 ioo[i].ioo_bufcnt = obj_pages;
338 LASSERT(page == npages);
/*
 * Release a per-page niobuf array obtained from get_per_page_niobufs().
 * When pp_rnb is the same pointer as the original remote array rnb, nothing
 * was allocated; otherwise npages entries are freed.
 */
344 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
345 struct niobuf_remote *rnb)
347 if (pp_rnb == rnb) /* didn't allocate above */
350 OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages);
/*
 * Compute a checksum over the pages attached to a bulk descriptor.
 * Every entry on desc->bd_page_list is mapped with kmap() and fed to
 * ost_checksum() starting at the page's bp_pageoffset.
 */
354 obd_count ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
357 struct ptlrpc_bulk_page *bp;
359 list_for_each_entry(bp, &desc->bd_page_list, bp_link) {
/* NOTE(review): no matching kunmap() is visible in this listing -- confirm
 * the mapping is released after the checksum step. */
360 ost_checksum(&cksum, kmap(bp->bp_page) + bp->bp_pageoffset,
/*
 * Record the time spent in one phase of a bulk read/write.
 *
 * req    - request being served; its service's srv_obddev selects the OST.
 * start  - in: time the phase began; out: overwritten with the stop time so
 *          the next call measures the following phase.
 * rw     - callers in this file pass 0 for the read path, 1 for write.
 * phase  - callers pass 0, 1, 2 after preprw, the bulk transfer and
 *          commitrw respectively.
 *
 * Nothing is recorded unless the device's type name equals
 * OBD_OST_DEVICENAME.
 */
369 static void ost_stime_record(struct ptlrpc_request *req, struct timeval *start,
370 unsigned rw, unsigned phase)
372 struct obd_device *obd = req->rq_svc->srv_obddev;
/* Slot index: three phases per direction. */
374 int ind = rw *3 + phase;
376 if (obd && obd->obd_type && obd->obd_type->typ_name) {
377 if (!strcmp(obd->obd_type->typ_name, OBD_OST_DEVICENAME)) {
/* The assignment that gives ost a real value is not visible in this listing. */
378 struct ost_obd *ost = NULL;
/* Guard against an index beyond the ost_stimes array. */
381 if (ind >= (sizeof(ost->ost_stimes) /
382 sizeof(ost->ost_stimes[0])))
384 do_gettimeofday(&stop);
386 spin_lock(&ost->ost_lock);
387 lprocfs_stime_record(&ost->ost_stimes[ind],&stop,start);
388 spin_unlock(&ost->ost_lock);
389 memcpy(start, &stop, sizeof(*start));
/*
 * Serve a bulk read: data flows from the OST to the client (bulk PUT).
 *
 * Steps visible in this listing:
 *  1. optional fault injection (OBD_FAIL_OST_BRW_READ_BULK / PAUSE_BULK);
 *  2. unpack, from consecutive request buffers, the ost_body, one
 *     obd_ioobj, an optional capability (when OBD_MD_CAPA is set) and the
 *     remote niobuf array;
 *  3. pack a one-buffer reply and split the niobufs per page;
 *  4. obd_preprw(OBD_BRW_READ), then attach every page that holds data to
 *     a bulk descriptor;
 *  5. start the transfer and wait for it, with a timeout of
 *     obd_timeout * HZ / 3;
 *  6. obd_commitrw() -- required after a successful prep in all cases;
 *  7. copy the obdo into the reply, release resources, and either reply or,
 *     on a communications error, drop the reply state and evict the client
 *     unless it has already reconnected.
 *
 * Phase timings are recorded via ost_stime_record() with rw == 0.
 */
394 static int ost_brw_read(struct ptlrpc_request *req)
396 struct ptlrpc_bulk_desc *desc;
397 struct niobuf_remote *remote_nb;
398 struct niobuf_remote *pp_rnb;
399 struct niobuf_local *local_nb;
400 struct obd_ioobj *ioo;
401 struct ost_body *body, *repbody;
402 struct lustre_capa *capa = NULL;
403 struct l_wait_info lwi;
404 struct obd_trans_info oti = { 0 };
405 int size[1] = { sizeof(*body) };
412 struct timeval start;
415 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
416 GOTO(out, rc = -EIO);
418 OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
419 (obd_timeout + 1) / 4);
/* bufcnt is post-incremented on each unpack so that the optional capability
 * buffer shifts the index of the niobuf buffer that follows it. */
421 body = lustre_swab_reqbuf(req, bufcnt++, sizeof(*body),
422 lustre_swab_ost_body);
424 CERROR("Missing/short ost_body\n");
425 GOTO(out, rc = -EFAULT);
428 ioo = lustre_swab_reqbuf(req, bufcnt++, sizeof(*ioo),
429 lustre_swab_obd_ioobj);
431 CERROR("Missing/short ioobj\n");
432 GOTO(out, rc = -EFAULT);
435 if (body->oa.o_valid & OBD_MD_CAPA) {
436 capa = lustre_swab_reqbuf(req, bufcnt++, sizeof(*capa),
437 lustre_swab_lustre_capa);
439 CERROR("Missing/short capa\n");
440 GOTO(out, rc = -EFAULT);
444 niocount = ioo->ioo_bufcnt;
445 remote_nb = lustre_swab_reqbuf(req, bufcnt++,
446 niocount * sizeof(*remote_nb),
447 lustre_swab_niobuf_remote);
448 if (remote_nb == NULL) {
449 CERROR("Missing/short niobuf\n");
450 GOTO(out, rc = -EFAULT);
/* The swab loop starts at index 1: entry 0 is handled by the
 * lustre_swab_reqbuf() call above, as the "remaining" comment indicates. */
452 if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
453 for (i = 1; i < niocount; i++)
454 lustre_swab_niobuf_remote (&remote_nb[i]);
457 rc = lustre_pack_reply(req, 1, size, NULL);
461 /* FIXME all niobuf splitting should be done in obdfilter if needed */
462 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
463 npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
465 GOTO(out, rc = npages);
467 OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
468 if (local_nb == NULL)
469 GOTO(out_pp_rnb, rc = -ENOMEM);
471 desc = ptlrpc_prep_bulk_exp (req, npages,
472 BULK_PUT_SOURCE, OST_BULK_PORTAL);
474 GOTO(out_local, rc = -ENOMEM);
476 do_gettimeofday(&start);
477 rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
478 ioo, npages, pp_rnb, local_nb, &oti, capa);
479 ost_stime_record(req, &start, 0, 0);
483 /* We're finishing using body->oa as an input variable */
484 body->oa.o_valid = 0;
/* Walk the per-page results: a negative rc is an error, a positive rc is
 * the number of bytes available in that page. */
487 for (i = 0; i < npages; i++) {
488 int page_rc = local_nb[i].rc;
490 if (page_rc < 0) { /* error */
495 LASSERT(page_rc <= pp_rnb[i].len);
497 if (page_rc != 0) { /* some data! */
498 LASSERT (local_nb[i].page != NULL);
499 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
500 pp_rnb[i].offset & (PAGE_SIZE-1),
504 if (page_rc != pp_rnb[i].len) { /* short read */
505 /* All subsequent pages should be 0 */
507 LASSERT(local_nb[i].rc == 0);
513 rc = ptlrpc_start_bulk_transfer(desc);
515 struct timeval tstart, now;
516 do_gettimeofday(&tstart);
517 lwi = LWI_TIMEOUT(obd_timeout * HZ / 3,
518 ost_bulk_timeout, desc);
519 rc = l_wait_event(desc->bd_waitq,
520 !ptlrpc_bulk_active(desc), &lwi);
521 LASSERT(rc == 0 || rc == -ETIMEDOUT);
522 do_gettimeofday(&now);
523 if (rc == -ETIMEDOUT) {
524 char cln_str[PTL_NALFMT_SIZE];
525 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT"
526 ", exp_conn_cnt = %u, real wait %us"
527 ", arrived %u.%u, served %u.%u",
528 req->rq_export->exp_conn_cnt,
529 (unsigned) (now.tv_sec - tstart.tv_sec),
530 (unsigned) req->rq_arrival_time.tv_sec,
531 (unsigned) req->rq_arrival_time.tv_usec,
532 (unsigned) req->rq_rpcd_start.tv_sec,
533 (unsigned) req->rq_rpcd_start.tv_usec);
534 CDEBUG(D_ERROR, "bulk PUT timeout: client %s\n",
535 ptlrpc_peernid2str(&req->rq_peer, cln_str));
536 ptlrpc_abort_bulk(desc);
/* The wait completed: the transfer still failed if it was not successful or
 * moved fewer bytes than were queued. */
537 } else if (!desc->bd_success ||
538 desc->bd_nob_transferred != desc->bd_nob) {
539 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
541 "truncated" : "network error on",
542 desc->bd_nob_transferred,
544 /* XXX should this be a different errno? */
548 DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d\n", rc);
/* Remember that the failure was on the wire; it decides below whether a
 * reply is attempted at all. */
550 comms_error = rc != 0;
553 ost_stime_record(req, &start, 0, 1);
554 /* Must commit after prep above in all cases */
555 rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
556 ioo, npages, local_nb, &oti, rc);
557 ost_stime_record(req, &start, 0, 2);
560 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
561 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
/* Any condition guarding the checksum lines is not visible in this listing. */
564 repbody->oa.o_cksum = ost_checksum_bulk(desc);
565 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
570 ptlrpc_free_bulk(desc);
572 OBD_FREE(local_nb, sizeof(*local_nb) * npages);
574 free_per_page_niobufs(npages, pp_rnb, remote_nb);
578 req->rq_status = nob;
580 } else if (!comms_error) {
581 /* only reply if comms OK */
585 if (req->rq_reply_state != NULL) {
586 /* reply out callback would free */
587 lustre_free_reply_state (req->rq_reply_state);
/* Equal connection counts mean the client has not reconnected since it sent
 * this request, so the export is failed; otherwise the error is only logged. */
589 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
590 CERROR("bulk IO comms error: "
591 "evicting %s@%s id %s\n",
592 req->rq_export->exp_client_uuid.uuid,
593 req->rq_export->exp_connection->c_remote_uuid.uuid,
595 ptlrpc_fail_export(req->rq_export);
597 CERROR("ignoring bulk IO comms error: "
598 "client reconnected %s@%s id %s\n",
599 req->rq_export->exp_client_uuid.uuid,
600 req->rq_export->exp_connection->c_remote_uuid.uuid,
/*
 * Serve a bulk write: data flows from the client to the OST (bulk GET).
 *
 * Steps visible in this listing:
 *  1. optional fault injection (OBD_FAIL_OST_BRW_WRITE_BULK / PAUSE_BULK);
 *  2. unpack the ost_body, an array of obd_ioobj (objcount is derived from
 *     the length of request buffer 1), an optional capability and the
 *     remote niobuf array; an ioobj with zero buffers is rejected;
 *  3. pack a two-buffer reply: the ost_body plus one return code per
 *     client niobuf;
 *  4. obd_do_cow(), per-page split, obd_preprw(OBD_BRW_WRITE);
 *  5. attach all pages to a bulk descriptor, start the transfer and wait
 *     with a timeout of obd_timeout * HZ / 3;
 *  6. compare the client checksum with the server's when OBD_MD_FLCKSUM
 *     is set;
 *  7. obd_commitrw() -- required after a successful prep in all cases;
 *  8. fold per-page return codes back into per-niobuf return codes,
 *     obd_write_extents(), release resources, then reply or evict exactly
 *     as the read path does.
 *
 * Phase timings are recorded via ost_stime_record() with rw == 1.
 */
608 int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
610 struct ptlrpc_bulk_desc *desc;
611 struct niobuf_remote *remote_nb;
612 struct niobuf_remote *pp_rnb;
613 struct niobuf_local *local_nb;
614 struct obd_ioobj *ioo;
615 struct lustre_capa *capa = NULL;
616 struct ost_body *body, *repbody;
617 struct l_wait_info lwi;
619 int size[2] = { sizeof(*body) };
620 int objcount, niocount, npages;
622 int rc, swab, i, j, bufcnt = 0;
623 struct timeval start;
626 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
627 GOTO(out, rc = -EIO);
629 /* pause before transaction has been started */
630 OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
631 (obd_timeout + 1) / 4);
633 swab = lustre_msg_swabbed(req->rq_reqmsg);
634 body = lustre_swab_reqbuf(req, bufcnt++, sizeof(*body),
635 lustre_swab_ost_body);
637 CERROR("Missing/short ost_body\n");
638 GOTO(out, rc = -EFAULT);
/* The ioobj array is fetched with lustre_msg_buf() and swabbed element by
 * element below; the object count comes from the buffer length. */
641 LASSERT_REQSWAB(req, 1);
642 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
644 CERROR("Missing/short ioobj\n");
645 GOTO(out, rc = -EFAULT);
647 ioo = lustre_msg_buf(req->rq_reqmsg, bufcnt++,
648 objcount * sizeof(*ioo));
649 LASSERT (ioo != NULL);
650 for (niocount = i = 0; i < objcount; i++) {
652 lustre_swab_obd_ioobj (&ioo[i]);
653 if (ioo[i].ioo_bufcnt == 0) {
654 CERROR("ioo[%d] has zero bufcnt\n", i);
655 GOTO(out, rc = -EFAULT);
657 niocount += ioo[i].ioo_bufcnt;
660 if (body->oa.o_valid & OBD_MD_CAPA) {
661 capa = lustre_swab_reqbuf(req, bufcnt++, sizeof(*capa),
662 lustre_swab_lustre_capa);
664 CERROR("Missing/short capa\n");
665 GOTO(out, rc = -EFAULT);
669 remote_nb = lustre_swab_reqbuf(req, bufcnt++,
670 niocount * sizeof(*remote_nb),
671 lustre_swab_niobuf_remote);
672 if (remote_nb == NULL) {
673 CERROR("Missing/short niobuf\n");
674 GOTO(out, rc = -EFAULT);
676 if (swab) { /* swab the remaining niobufs */
677 for (i = 1; i < niocount; i++)
678 lustre_swab_niobuf_remote (&remote_nb[i]);
/* Second reply buffer: one return code per niobuf as sent by the client. */
681 size[1] = niocount * sizeof(*rcs);
682 rc = lustre_pack_reply(req, 2, size, NULL);
685 rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
688 /* Do snap options here*/
689 rc = obd_do_cow(req->rq_export, ioo, objcount, remote_nb);
694 /* FIXME all niobuf splitting should be done in obdfilter if needed */
695 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
696 npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
698 GOTO(out, rc = npages);
700 OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
701 if (local_nb == NULL)
702 GOTO(out_pp_rnb, rc = -ENOMEM);
704 desc = ptlrpc_prep_bulk_exp (req, npages,
705 BULK_GET_SINK, OST_BULK_PORTAL);
707 GOTO(out_local, rc = -ENOMEM);
709 do_gettimeofday(&start);
710 rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
711 ioo, npages, pp_rnb, local_nb, oti, capa);
712 ost_stime_record(req, &start, 1, 0);
716 /* NB Having prepped, we must commit... */
718 for (i = 0; i < npages; i++)
719 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
720 pp_rnb[i].offset & (PAGE_SIZE - 1),
723 rc = ptlrpc_start_bulk_transfer (desc);
725 struct timeval tstart, now;
726 do_gettimeofday(&tstart);
727 lwi = LWI_TIMEOUT(obd_timeout * HZ / 3,
728 ost_bulk_timeout, desc);
729 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc),
731 LASSERT(rc == 0 || rc == -ETIMEDOUT);
732 do_gettimeofday(&now);
733 if (rc == -ETIMEDOUT) {
734 char cln_str[PTL_NALFMT_SIZE];
735 DEBUG_REQ(D_ERROR, req, "timeout on bulk GET, "
736 "exp_conn_cnt = %u, real wait %us\n",
737 req->rq_export->exp_conn_cnt,
738 (unsigned) (now.tv_sec - tstart.tv_sec));
739 CDEBUG(D_ERROR, "bulk GET timeout: client %s\n",
740 ptlrpc_peernid2str(&req->rq_peer, cln_str));
741 ptlrpc_abort_bulk(desc);
742 } else if (!desc->bd_success ||
743 desc->bd_nob_transferred != desc->bd_nob) {
744 DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
746 "truncated" : "network error on",
747 desc->bd_nob_transferred, desc->bd_nob);
748 /* XXX should this be a different errno? */
752 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
754 comms_error = rc != 0;
756 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
757 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
/* Verify the client's checksum only when the transfer succeeded and the
 * client actually supplied one. */
760 if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
761 static int cksum_counter;
762 obd_count client_cksum = body->oa.o_cksum;
763 obd_count cksum = ost_checksum_bulk(desc);
765 if (client_cksum != cksum) {
766 CERROR("Bad checksum: client %x, server %x id %s\n",
770 repbody->oa.o_cksum = cksum;
/* x & -x isolates the lowest set bit, so this holds only when the counter
 * is zero or a power of two: successes are logged at exponentially growing
 * intervals rather than every time. */
773 if ((cksum_counter & (-cksum_counter)) == cksum_counter)
774 CWARN("Checksum %u from NID %s: %x OK\n",
775 cksum_counter, req->rq_peerstr, cksum);
779 ost_stime_record(req, &start, 1, 1);
780 /* Must commit after prep above in all cases */
781 rc = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
782 objcount, ioo, npages, local_nb, oti, rc);
784 ost_stime_record(req, &start, 1, 2);
/* Any condition guarding these checksum lines is not visible in this listing. */
787 repbody->oa.o_cksum = ost_checksum_bulk(desc);
788 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
790 /* set per-requested niobuf return codes */
/* i indexes the client's niobufs, j the per-page pieces; nob counts down the
 * bytes of niobuf i still to be matched against pages. */
791 for (i = j = 0; i < niocount; i++) {
792 int nob = remote_nb[i].len;
797 if (local_nb[j].rc < 0)
798 rcs[i] = local_nb[j].rc;
799 nob -= pp_rnb[j].len;
804 LASSERT(j == npages);
806 /*XXX This write extents only for write-back cache extents*/
807 rc = obd_write_extents(req->rq_export, ioo, objcount, niocount,
810 ptlrpc_free_bulk(desc);
812 OBD_FREE(local_nb, sizeof(*local_nb) * npages);
814 free_per_page_niobufs(npages, pp_rnb, remote_nb);
817 oti_to_request(oti, req);
818 rc = ptlrpc_reply(req);
819 } else if (!comms_error) {
820 /* Only reply if there was no comms problem with bulk */
824 if (req->rq_reply_state != NULL) {
825 /* reply out callback would free */
826 lustre_free_reply_state (req->rq_reply_state);
/* Same policy as the read path: evict only if the client has not
 * reconnected since it sent this request. */
828 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
829 CERROR("%s: bulk IO comm error evicting %s@%s id %s\n",
830 req->rq_export->exp_obd->obd_name,
831 req->rq_export->exp_client_uuid.uuid,
832 req->rq_export->exp_connection->c_remote_uuid.uuid,
834 ptlrpc_fail_export(req->rq_export);
836 CERROR("ignoring bulk IO comms error: "
837 "client reconnected %s@%s id %s\n",
838 req->rq_export->exp_client_uuid.uuid,
839 req->rq_export->exp_connection->c_remote_uuid.uuid,
845 EXPORT_SYMBOL(ost_brw_write);
/*
 * Serve a SAN read or write request (cmd is OBD_BRW_READ or OBD_BRW_WRITE
 * as passed by ost_handle()).
 *
 * The ost_body, the obd_ioobj array and the remote niobuf array are
 * unpacked from request buffers 0, 1 and 2.  The niobufs are split per
 * page, a two-buffer reply is packed (ost_body plus npages niobufs),
 * obd_san_preprw() is called and its result stored in req->rq_status, and
 * the obdo and niobufs are copied into the reply.
 */
847 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
849 struct niobuf_remote *remote_nb, *res_nb, *pp_rnb;
850 struct obd_ioobj *ioo;
851 struct ost_body *body, *repbody;
852 int rc, i, objcount, niocount, size[2] = {sizeof(*body)}, npages;
856 /* XXX not set to use latest protocol */
858 swab = lustre_msg_swabbed(req->rq_reqmsg);
859 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
861 CERROR("Missing/short ost_body\n");
862 GOTO(out, rc = -EFAULT);
865 ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
867 CERROR("Missing/short ioobj\n");
868 GOTO(out, rc = -EFAULT);
/* ioo[0] was handled by lustre_swab_reqbuf() above, so both the swab and
 * the buffer-count accumulation for the rest start at index 1. */
870 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
871 niocount = ioo[0].ioo_bufcnt;
872 for (i = 1; i < objcount; i++) {
874 lustre_swab_obd_ioobj (&ioo[i]);
875 niocount += ioo[i].ioo_bufcnt;
878 remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
879 lustre_swab_niobuf_remote);
880 if (remote_nb == NULL) {
881 CERROR("Missing/short niobuf\n");
882 GOTO(out, rc = -EFAULT);
884 if (swab) { /* swab the remaining niobufs */
885 for (i = 1; i < niocount; i++)
886 lustre_swab_niobuf_remote (&remote_nb[i]);
889 /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
890 npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
892 GOTO (out, rc = npages);
894 size[1] = npages * sizeof(*pp_rnb);
895 rc = lustre_pack_reply(req, 2, size, NULL);
897 GOTO(out_pp_rnb, rc);
899 req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
900 objcount, ioo, npages, pp_rnb);
903 GOTO(out_pp_rnb, rc = 0);
905 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
906 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
908 res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
/* NOTE(review): size[1] is npages entries, but the copy source is remote_nb,
 * which was unpacked with niocount entries, while obd_san_preprw() was given
 * pp_rnb.  When npages > niocount this reads past remote_nb -- confirm
 * whether pp_rnb is the intended source. */
909 memcpy(res_nb, remote_nb, size[1]);
912 free_per_page_niobufs(npages, pp_rnb, remote_nb);
/*
 * Handle a set_info request.
 *
 * Request buffer 0 holds the key (its length is the buffer length) and
 * request buffer 1 holds the value.  An empty reply is packed.  For the
 * keys "auditlog", "audit", "audit_obj" and "capa_key" the value buffer is
 * byte-swapped as the matching structure before the key/value pair is
 * passed to obd_set_info().  The reply status is set to 0.
 */
923 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
929 key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
931 DEBUG_REQ(D_HA, req, "no set_info key");
934 keylen = req->rq_reqmsg->buflens[0];
936 rc = lustre_pack_reply(req, 0, NULL, NULL);
940 val = lustre_msg_buf(req->rq_reqmsg, 1, 0);
942 CERROR("val for setinfo can't be NULL\n");
946 if (keylen == 8 && memcmp(key, "auditlog", 8) == 0) {
947 lustre_swab_reqbuf(req, 1, sizeof(struct audit_msg),
948 lustre_swab_audit_msg);
/* NOTE(review): this branch and the next use strcmp() on a key whose length
 * is given by keylen, so they match only if a NUL byte follows the key in
 * the buffer; the other branches use memcmp() with an explicit length.
 * Confirm the key is guaranteed to be NUL-terminated or padded. */
949 } else if (keylen == 5 && strcmp(key, "audit") == 0) {
950 lustre_swab_reqbuf(req, 1, sizeof(struct audit_attr_msg),
951 lustre_swab_audit_attr);
952 } else if (keylen == 9 && strcmp(key, "audit_obj") == 0) {
953 lustre_swab_reqbuf(req, 1, sizeof(struct obdo),
955 } else if (keylen == 8 && memcmp(key, "capa_key", 8) == 0) {
956 lustre_swab_reqbuf(req, 1, sizeof(struct lustre_capa_key),
957 lustre_swab_lustre_capa_key);
960 rc = obd_set_info(exp, keylen, key, req->rq_reqmsg->buflens[1], val);
961 req->rq_repmsg->status = 0;
/*
 * Handle a get_info request.
 *
 * Request buffer 0 holds the key.  The key is tested for being at least as
 * long as "last_id" and starting with those seven bytes (the action taken
 * when the test fails is not visible in this listing).  A one-buffer reply
 * sized for an obd_id is packed and filled in by obd_get_info(); the reply
 * status is set to 0.
 */
965 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
968 int keylen, rc = 0, size = sizeof(obd_id);
972 key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
974 DEBUG_REQ(D_HA, req, "no get_info key");
977 keylen = req->rq_reqmsg->buflens[0];
979 if (keylen < strlen("last_id") || memcmp(key, "last_id", 7) != 0)
982 rc = lustre_pack_reply(req, 1, &size, NULL);
986 reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
987 rc = obd_get_info(exp, keylen, key, (__u32 *)&size, reply);
988 req->rq_repmsg->status = 0;
/*
 * Handle a llog connect request: fetch the llogd_conn_body from request
 * buffer 0 and pass it to obd_llog_connect() for this export.
 */
992 static int ost_llog_handle_connect(struct obd_export *exp,
993 struct ptlrpc_request *req)
995 struct llogd_conn_body *body;
999 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
1000 rc = obd_llog_connect(exp, body);
/*
 * Decide what to do with a request that arrives while the device is in
 * recovery, based on its opcode.
 *
 * In the visible lines, a group of opcodes ending with OBD_LOG_CANCEL is
 * queued through target_queue_recovery_request(), whose result is written
 * to *process.  Requests that reach the "not permitted during recovery"
 * branch are answered with an error reply carrying status -EAGAIN.  Several
 * case labels are not visible in this listing.
 */
1004 static int ost_filter_recovery_request(struct ptlrpc_request *req,
1005 struct obd_device *obd, int *process)
1007 switch (req->rq_reqmsg->opc) {
1008 case OST_CONNECT: /* This will never get here, but for completeness. */
1009 case OST_DISCONNECT:
1020 case OBD_LOG_CANCEL:
1022 *process = target_queue_recovery_request(req, obd);
1026 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1028 /* XXX what should we set rq_status to here? */
1029 req->rq_status = -EAGAIN;
1030 RETURN(ptlrpc_error(req));
/*
 * Check that a message's protocol version matches what its opcode requires.
 *
 * Opcodes are grouped by subsystem and checked with
 * lustre_msg_check_version() against LUSTRE_OBD_VERSION,
 * LUSTRE_DLM_VERSION or LUSTRE_LOG_VERSION; a mismatch is logged with the
 * opcode, the version found and the version expected.  An opcode that
 * matches no group is logged as unexpected.  Several case labels are not
 * visible in this listing.
 */
1034 int ost_msg_check_version(struct lustre_msg *msg)
1040 case OST_DISCONNECT:
1055 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1057 CERROR("bad opc %u version %08x, expecting %08x\n",
1058 msg->opc, msg->version, LUSTRE_OBD_VERSION);
1063 case LDLM_BL_CALLBACK:
1064 case LDLM_CP_CALLBACK:
1065 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1067 CERROR("bad opc %u version %08x, expecting %08x\n",
1068 msg->opc, msg->version, LUSTRE_DLM_VERSION);
1070 case OBD_LOG_CANCEL:
1071 case LLOG_ORIGIN_CONNECT:
1072 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1074 CERROR("bad opc %u version %08x, expecting %08x\n",
1075 msg->opc, msg->version, LUSTRE_LOG_VERSION);
1078 case SEC_INIT_CONTINUE:
1083 CERROR("OST unexpected opcode %d\n", msg->opc);
/*
 * Main request handler registered with the OST services (it is the handler
 * passed to ptlrpc_init_svc() for the create and destroy services in
 * ost_setup()).
 *
 * Flow visible in this listing:
 *  - validate the message version for its opcode;
 *  - let security opcodes bypass recovery handling;
 *  - for everything except OST_CONNECT, require an export and, while the
 *    device is recovering, filter the request through
 *    ost_filter_recovery_request();
 *  - allocate a transaction-info structure and dispatch on the opcode to
 *    the per-operation handler, with OBD_FAIL_* fault-injection hooks ahead
 *    of most handlers;
 *  - for handlers that do not send their own reply, fill in
 *    last_committed, copy transaction info into the reply and send it via
 *    target_send_reply();
 *  - free the transaction-info structure.
 *
 * Many case labels and the goto labels are not visible in this listing; the
 * branches are identifiable by their debug strings.
 */
1090 int ost_handle(struct ptlrpc_request *req)
1092 int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
1093 struct obd_trans_info *oti = NULL;
1094 struct obd_device *obd = NULL;
1097 LASSERT(current->journal_info == NULL);
1099 rc = ost_msg_check_version(req->rq_reqmsg);
1101 CERROR("OST drop mal-formed request\n");
1105 /* Security opc should NOT trigger any recovery events */
1106 if (req->rq_reqmsg->opc == SEC_INIT ||
1107 req->rq_reqmsg->opc == SEC_INIT_CONTINUE ||
1108 req->rq_reqmsg->opc == SEC_FINI) {
1109 GOTO(out_check_req, rc = 0);
1112 /* XXX identical to MDS */
1113 if (req->rq_reqmsg->opc != OST_CONNECT) {
1116 if (req->rq_export == NULL) {
1117 CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
1118 req->rq_reqmsg->opc,
1120 req->rq_status = -ENOTCONN;
1121 GOTO(out_check_req, rc = -ENOTCONN);
1124 obd = req->rq_export->exp_obd;
1126 /* Check for aborted recovery. */
/* The recovering flag is sampled under obd_processing_task_lock. */
1127 spin_lock_bh(&obd->obd_processing_task_lock);
1128 recovering = obd->obd_recovering;
1129 spin_unlock_bh(&obd->obd_processing_task_lock);
1131 rc = ost_filter_recovery_request(req, obd,
1133 if (rc || !should_process)
1135 if (should_process < 0) {
1136 req->rq_status = should_process;
1137 rc = ptlrpc_error(req);
1143 OBD_ALLOC(oti, sizeof(*oti));
1149 switch (req->rq_reqmsg->opc) {
1151 CDEBUG(D_INODE, "connect\n");
1152 OBD_FAIL_GOTO(OBD_FAIL_OST_CONNECT_NET, out_free_oti, rc = 0);
1153 rc = target_handle_connect(req);
/* obd is taken from the export again after the connect handler has run. */
1155 obd = req->rq_export->exp_obd;
1158 case OST_DISCONNECT:
1159 CDEBUG(D_INODE, "disconnect\n");
1160 OBD_FAIL_GOTO(OBD_FAIL_OST_DISCONNECT_NET, out_free_oti, rc = 0);
1161 rc = target_handle_disconnect(req);
1164 CDEBUG(D_INODE, "create\n");
1165 OBD_FAIL_GOTO(OBD_FAIL_OST_ENOSPC, out_check_req, rc = -ENOSPC);
1166 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1167 OBD_FAIL_GOTO(OBD_FAIL_OST_CREATE_NET, out_free_oti, rc = 0);
1168 rc = ost_create(req->rq_export, req, oti);
1171 CDEBUG(D_INODE, "destroy\n");
1172 OBD_FAIL_GOTO(OBD_FAIL_OST_DESTROY_NET, out_free_oti, rc = 0);
1173 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1174 rc = ost_destroy(req->rq_export, req, oti);
1177 CDEBUG(D_INODE, "getattr\n");
1178 OBD_FAIL_GOTO(OBD_FAIL_OST_GETATTR_NET, out_free_oti, rc = 0);
1179 rc = ost_getattr(req->rq_export, req);
1182 CDEBUG(D_INODE, "setattr\n");
1183 OBD_FAIL_GOTO(OBD_FAIL_OST_SETATTR_NET, out_free_oti, rc = 0);
1184 rc = ost_setattr(req->rq_export, req, oti);
1187 CDEBUG(D_INODE, "write\n");
1188 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1189 OBD_FAIL_GOTO(OBD_FAIL_OST_ENOSPC, out_check_req, rc = -ENOSPC);
1190 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1191 rc = ost_brw_write(req, oti);
1192 LASSERT(current->journal_info == NULL);
1193 /* ost_brw sends its own replies */
1194 GOTO(out_free_oti, rc);
1196 CDEBUG(D_INODE, "read\n");
1197 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1198 rc = ost_brw_read(req);
1199 LASSERT(current->journal_info == NULL);
1200 /* ost_brw sends its own replies */
1201 GOTO(out_free_oti, rc);
1203 CDEBUG(D_INODE, "san read\n");
1204 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1205 rc = ost_san_brw(req, OBD_BRW_READ);
1206 /* ost_san_brw sends its own replies */
1207 GOTO(out_free_oti, rc);
1209 CDEBUG(D_INODE, "san write\n");
1210 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1211 rc = ost_san_brw(req, OBD_BRW_WRITE);
1212 /* ost_san_brw sends its own replies */
1213 GOTO(out_free_oti, rc);
1215 CDEBUG(D_INODE, "punch\n");
1216 OBD_FAIL_GOTO(OBD_FAIL_OST_PUNCH_NET, out_free_oti, rc = 0);
1217 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1218 rc = ost_punch(req->rq_export, req, oti);
1221 CDEBUG(D_INODE, "statfs\n");
1222 OBD_FAIL_GOTO(OBD_FAIL_OST_STATFS_NET, out_free_oti, rc = 0);
1223 rc = ost_statfs(req);
1226 CDEBUG(D_INODE, "sync\n");
1227 OBD_FAIL_GOTO(OBD_FAIL_OST_SYNC_NET, out_free_oti, rc = 0);
1228 rc = ost_sync(req->rq_export, req);
1231 DEBUG_REQ(D_INODE, req, "set_info");
1232 rc = ost_set_info(req->rq_export, req);
1235 DEBUG_REQ(D_INODE, req, "get_info");
1236 rc = ost_get_info(req->rq_export, req);
1239 DEBUG_REQ(D_INODE, req, "ping");
1240 rc = target_handle_ping(req);
1242 /* FIXME - just reply status */
/* The two llog branches below put the handler result in rq_status, pack an
 * empty reply and send it themselves with ptlrpc_reply(). */
1243 case LLOG_ORIGIN_CONNECT:
1244 DEBUG_REQ(D_INODE, req, "log connect\n");
1245 rc = ost_llog_handle_connect(req->rq_export, req);
1246 req->rq_status = rc;
1247 rc = lustre_pack_reply(req, 0, NULL, NULL);
1249 GOTO(out_free_oti, rc);
1250 GOTO(out_free_oti, rc = ptlrpc_reply(req));
1251 case OBD_LOG_CANCEL:
1252 CDEBUG(D_INODE, "log cancel\n");
1253 OBD_FAIL_GOTO(OBD_FAIL_OBD_LOG_CANCEL_NET, out_free_oti, rc = 0);
1254 rc = llog_origin_handle_cancel(req);
1255 req->rq_status = rc;
1256 rc = lustre_pack_reply(req, 0, NULL, NULL);
1258 GOTO(out_free_oti, rc);
1259 GOTO(out_free_oti, rc = ptlrpc_reply(req));
1261 CDEBUG(D_INODE, "enqueue\n");
1262 OBD_FAIL_GOTO(OBD_FAIL_LDLM_ENQUEUE, out_free_oti, rc = 0);
1263 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1264 ldlm_server_blocking_ast,
1265 ldlm_server_glimpse_ast);
/* Lock enqueue replies use their own fault-injection id for the reply. */
1266 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1269 CDEBUG(D_INODE, "convert\n");
1270 OBD_FAIL_GOTO(OBD_FAIL_LDLM_CONVERT, out_free_oti, rc = 0);
1271 rc = ldlm_handle_convert(req);
1274 CDEBUG(D_INODE, "cancel\n");
1275 OBD_FAIL_GOTO(OBD_FAIL_LDLM_CANCEL, out_free_oti, rc = 0);
1276 rc = ldlm_handle_cancel(req);
1278 case LDLM_BL_CALLBACK:
1279 case LDLM_CP_CALLBACK:
1280 CDEBUG(D_INODE, "callback\n");
1281 CERROR("callbacks should not happen on OST\n");
1284 CERROR("Unexpected opcode %d\n", req->rq_reqmsg->opc);
1285 req->rq_status = -ENOTSUPP;
1286 rc = ptlrpc_error(req);
1287 GOTO(out_free_oti, rc);
1290 LASSERT(current->journal_info == NULL);
1293 /* If we're DISCONNECTing, the export_data is already freed */
1294 if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
1295 if (!obd->obd_no_transno) {
1296 req->rq_repmsg->last_committed =
1297 obd->obd_last_committed;
1299 DEBUG_REQ(D_IOCTL, req,
1300 "not sending last_committed update");
1302 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
1303 obd->obd_last_committed, req->rq_xid);
1309 oti_to_request(oti, req);
1310 target_send_reply(req, rc, fail);
1315 OBD_FREE(oti, sizeof(*oti));
1318 EXPORT_SYMBOL(ost_handle);
/*
 * Attach callback for the OST obd type (installed as .o_attach): initialise
 * the lprocfs variable tables for "ost" and register the per-device entries
 * through lprocfs_obd_attach().  The len and data arguments are not used in
 * the visible lines.
 */
1320 int ost_attach(struct obd_device *dev, obd_count len, void *data)
1322 struct lprocfs_static_vars lvars;
1324 lprocfs_init_vars(ost,&lvars);
1325 return lprocfs_obd_attach(dev, lvars.obd_vars);
/*
 * Detach callback for the OST obd type (installed as .o_detach): undo
 * ost_attach() by calling lprocfs_obd_detach() and return its result.
 */
1328 int ost_detach(struct obd_device *dev)
1330 return lprocfs_obd_detach(dev);
1333 extern struct file_operations ost_stimes_fops;
/*
 * Setup callback for the OST obd type (installed as .o_setup).
 *
 * Visible steps: cleanup_group_info(), start the llog commit thread, create
 * the "service_times" proc entry, then bring up three ptlrpc services --
 * the main OST service, a create service on OST_CREATE_PORTAL and a destroy
 * service on OST_DESTROY_PORTAL -- each followed by starting its threads.
 * The create service runs a single thread.  The trailing
 * ptlrpc_unregister_service() calls form the error-unwind path, releasing
 * the services in reverse order of creation.
 */
1335 static int ost_setup(struct obd_device *obd, obd_count len, void *buf)
1337 struct ost_obd *ost = &obd->u.ost;
1341 rc = cleanup_group_info();
1345 rc = llog_start_commit_thread();
1349 lprocfs_obd_seq_create(obd, "service_times", 0444, &ost_stimes_fops,
1353 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1354 OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, 30000,
1356 obd->obd_proc_entry);
1357 if (ost->ost_service == NULL) {
1358 CERROR("failed to start service\n");
1362 rc = ptlrpc_start_n_threads(obd, ost->ost_service, OST_NUM_THREADS,
1365 GOTO(out_service, rc = -EINVAL);
1367 ost->ost_create_service =
1368 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1369 OST_CREATE_PORTAL, OSC_REPLY_PORTAL, 30000,
1370 ost_handle, "ost_create",
1371 obd->obd_proc_entry);
1372 if (ost->ost_create_service == NULL) {
1373 CERROR("failed to start OST create service\n");
1374 GOTO(out_service, rc = -ENOMEM);
/* ost_stime_record() relies on both of these: it reaches the device through
 * the service's srv_obddev and takes ost_lock around each update.  In the
 * visible lines srv_obddev is set on the main service only. */
1378 spin_lock_init(&ost->ost_lock);
1379 ost->ost_service->srv_obddev = obd;
1381 rc = ptlrpc_start_n_threads(obd, ost->ost_create_service, 1,
1384 GOTO(out_create, rc = -EINVAL);
1386 ost->ost_destroy_service =
1387 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1388 OST_DESTROY_PORTAL, OSC_REPLY_PORTAL, 30000,
1389 ost_handle, "ost_destroy",
1390 obd->obd_proc_entry);
1391 if (ost->ost_destroy_service == NULL) {
1392 CERROR("failed to start service\n");
1393 GOTO(out_create, rc = -ENOMEM);
1396 rc = ptlrpc_start_n_threads(obd, ost->ost_destroy_service,
1397 OST_NUM_THREADS, "ll_dstr_ost");
1399 GOTO(out_destroy, rc = -EINVAL);
/* NOTE(review): no ptlrpc_stop_all_threads() call is visible on the unwind
 * path below, unlike in ost_cleanup() -- confirm that threads already
 * started are stopped before their service is unregistered. */
1404 ptlrpc_unregister_service(ost->ost_destroy_service);
1406 ptlrpc_unregister_service(ost->ost_create_service);
1408 ptlrpc_unregister_service(ost->ost_service);
1412 extern void lgss_svc_cache_purge_all(void);
/*
 * Cleanup callback for the OST obd type (installed as .o_cleanup).
 *
 * If the device is still recovering, the recovery timer is cancelled and
 * the flag cleared under obd_processing_task_lock.  Each of the three
 * services then has its threads stopped before it is unregistered, and
 * lgss_svc_cache_purge_all() is called last.  The flags argument is not
 * used in the visible lines.
 */
1413 static int ost_cleanup(struct obd_device *obd, int flags)
1415 struct ost_obd *ost = &obd->u.ost;
1419 spin_lock_bh(&obd->obd_processing_task_lock);
1420 if (obd->obd_recovering) {
1421 target_cancel_recovery_timer(obd);
1422 obd->obd_recovering = 0;
1424 spin_unlock_bh(&obd->obd_processing_task_lock);
1426 ptlrpc_stop_all_threads(ost->ost_service);
1427 ptlrpc_unregister_service(ost->ost_service);
1429 ptlrpc_stop_all_threads(ost->ost_create_service);
1430 ptlrpc_unregister_service(ost->ost_create_service);
1432 ptlrpc_stop_all_threads(ost->ost_destroy_service);
1433 ptlrpc_unregister_service(ost->ost_destroy_service);
1437 lgss_svc_cache_purge_all();
1442 /* use obd ops to offer management infrastructure */
/*
 * OBD method table for the OST type.  Only the management entry points
 * (attach, detach, setup, cleanup) are provided; this is the table passed
 * to class_register_type() by ost_init().
 */
1443 static struct obd_ops ost_obd_ops = {
1444 .o_owner = THIS_MODULE,
1445 .o_attach = ost_attach,
1446 .o_detach = ost_detach,
1447 .o_setup = ost_setup,
1448 .o_cleanup = ost_cleanup,
/*
 * Module entry point: initialise the lprocfs variable tables for "ost" and
 * register the OST obd type under OBD_OST_DEVICENAME with its method table
 * and module-level proc variables.  Returns the result of
 * class_register_type().
 */
1451 static int __init ost_init(void)
1453 struct lprocfs_static_vars lvars;
1456 lprocfs_init_vars(ost,&lvars);
1457 RETURN(class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
1458 OBD_OST_DEVICENAME));
/*
 * Module exit point: unregister the OST obd type that ost_init()
 * registered under OBD_OST_DEVICENAME.
 */
1461 static void /*__exit*/ ost_exit(void)
1463 class_unregister_type(OBD_OST_DEVICENAME);
1466 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1467 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
1468 MODULE_LICENSE("GPL");
1470 module_init(ost_init);
1471 module_exit(ost_exit);