1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author: Peter J. Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * Storage Target Handling functions
24 * Lustre Object Server Module (OST)
26 * This server is single threaded at present (but can easily be multi
27 * threaded). For testing and management it is treated as an
28 * obd_device, although it does not export a full OBD method table
29 * (the requests are coming in over the wire, so object target
30 * modules do not have a full method table.)
 */
34 #define DEBUG_SUBSYSTEM S_OST
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40 #include <linux/lustre_export.h>
41 #include <linux/init.h>
42 #include <linux/lprocfs_status.h>
/*
 * Copy the transaction number carried in @oti into the reply message of
 * @req, converting it with HTON__u64().
 *
 * Nothing is written when @oti is NULL or when @req has no reply message
 * (rq_repmsg is NULL).
 */
44 inline void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
46 if (oti && req->rq_repmsg)
47 req->rq_repmsg->transno = HTON__u64(oti->oti_transno);
/*
 * Handle an object destroy request.
 *
 * The ost_body in request buffer 0 names the object; a reply with a single
 * ost_body-sized buffer is packed, and the result of obd_destroy() is stored
 * in req->rq_status.
 *
 * NOTE(review): the start of the request message is reinterpreted as the
 * connection handle -- presumably struct lustre_msg begins with a
 * struct lustre_handle; confirm against the message definition.
 * NOTE(review): the handling of rc after lustre_pack_msg() and the return
 * statement are not visible in this excerpt.
 */
51 static int ost_destroy(struct ptlrpc_request *req, struct obd_trans_info *oti)
53 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
54 struct ost_body *body;
55 int rc, size = sizeof(*body);
58 body = lustre_msg_buf(req->rq_reqmsg, 0);
60 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
        /* The request's obdo is handed to the backend as received. */
64 req->rq_status = obd_destroy(conn, &body->oa, NULL, oti);
/*
 * Handle a getattr request.
 *
 * Packs a reply with one ost_body-sized buffer, copies the obdo from request
 * buffer 0 into it, and lets obd_getattr() fill in the reply copy.  The
 * result of obd_getattr() is stored in req->rq_status.
 *
 * NOTE(review): the handling of rc after lustre_pack_msg() and the return
 * statement are not visible in this excerpt.
 */
68 static int ost_getattr(struct ptlrpc_request *req)
70 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
71 struct ost_body *body, *repbody;
72 int rc, size = sizeof(*body);
75 body = lustre_msg_buf(req->rq_reqmsg, 0);
77 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
81 repbody = lustre_msg_buf(req->rq_repmsg, 0);
82 /* FIXME: unpack only valid fields instead of memcpy, endianness */
83 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
84 req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
/*
 * Handle a statfs request.
 *
 * Packs a reply with one struct obd_statfs-sized buffer, zeroes it, asks the
 * backend to fill it via obd_statfs(), and then runs obd_statfs_pack() on
 * the buffer in place (source and destination are the same pointer).
 *
 * NOTE(review): obd_statfs_pack() presumably converts the structure to wire
 * byte order -- confirm.  The conditions guarding the CERROR() and the
 * return statements are not visible in this excerpt.
 */
88 static int ost_statfs(struct ptlrpc_request *req)
90 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
91 struct obd_statfs *osfs;
92 int rc, size = sizeof(*osfs);
95 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
99 osfs = lustre_msg_buf(req->rq_repmsg, 0);
100 memset(osfs, 0, size);
102 rc = obd_statfs(conn, osfs);
104 CERROR("ost: statfs failed: rc %d\n", rc);
108 obd_statfs_pack(osfs, osfs);
/*
 * Handle a sync request.
 *
 * Packs a reply with zero buffers and calls obd_syncfs() on the connection;
 * a failure is logged with CERROR().
 *
 * NOTE(review): osfs is never dereferenced in the visible lines and size is
 * passed to lustre_pack_msg() together with a buffer count of 0, so both
 * look like leftovers from ost_statfs() -- confirm and consider removing.
 * The rc checks and return statements are not visible in this excerpt.
 */
113 static int ost_syncfs(struct ptlrpc_request *req)
115 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
116 struct obd_statfs *osfs;
117 int rc, size = sizeof(*osfs);
120 rc = lustre_pack_msg(0, &size, NULL, &req->rq_replen, &req->rq_repmsg);
124 rc = obd_syncfs(conn);
126 CERROR("ost: syncfs failed: rc %d\n", rc);
/*
 * Handle an object open request.
 *
 * Packs a reply with one ost_body-sized buffer, copies the request obdo into
 * it, and calls obd_open() on the reply copy.  The result of obd_open() is
 * stored in req->rq_status; @oti is passed through to the backend.
 *
 * NOTE(review): the handling of rc after lustre_pack_msg() and the return
 * statement are not visible in this excerpt.
 */
134 static int ost_open(struct ptlrpc_request *req, struct obd_trans_info *oti)
136 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
137 struct ost_body *body, *repbody;
138 int rc, size = sizeof(*body);
141 body = lustre_msg_buf(req->rq_reqmsg, 0);
143 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
147 repbody = lustre_msg_buf(req->rq_repmsg, 0);
148 /* FIXME: unpack only valid fields instead of memcpy, endianness */
149 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
150 req->rq_status = obd_open(conn, &repbody->oa, NULL, oti);
/*
 * Handle an object close request.
 *
 * Packs a reply with one ost_body-sized buffer, copies the request obdo into
 * it, and calls obd_close() on the reply copy.  The result of obd_close() is
 * stored in req->rq_status; @oti is passed through to the backend.
 *
 * NOTE(review): the handling of rc after lustre_pack_msg() and the return
 * statement are not visible in this excerpt.
 */
154 static int ost_close(struct ptlrpc_request *req, struct obd_trans_info *oti)
156 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
157 struct ost_body *body, *repbody;
158 int rc, size = sizeof(*body);
161 body = lustre_msg_buf(req->rq_reqmsg, 0);
163 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
167 repbody = lustre_msg_buf(req->rq_repmsg, 0);
168 /* FIXME: unpack only valid fields instead of memcpy, endianness */
169 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
170 req->rq_status = obd_close(conn, &repbody->oa, NULL, oti);
/*
 * Handle an object create request.
 *
 * Packs a reply with one ost_body-sized buffer, copies the request obdo into
 * it, and calls obd_create() on the reply copy, so whatever the backend
 * writes into the obdo is what the reply carries.  The result of
 * obd_create() is stored in req->rq_status.
 *
 * NOTE(review): the handling of rc after lustre_pack_msg() and the return
 * statement are not visible in this excerpt.
 */
174 static int ost_create(struct ptlrpc_request *req, struct obd_trans_info *oti)
176 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
177 struct ost_body *body, *repbody;
178 int rc, size = sizeof(*body);
181 body = lustre_msg_buf(req->rq_reqmsg, 0);
183 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
187 repbody = lustre_msg_buf(req->rq_repmsg, 0);
188 /* FIXME: unpack only valid fields instead of memcpy, endianness */
189 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
190 req->rq_status = obd_create(conn, &repbody->oa, NULL, oti);
/*
 * Handle a punch request.
 *
 * The request obdo must have both OBD_MD_FLSIZE and OBD_MD_FLBLOCKS set in
 * o_valid; the action taken when they are missing is not visible in this
 * excerpt.  Otherwise a one-buffer reply is packed, the obdo is copied into
 * it, and obd_punch() is called with o_size and o_blocks from that copy.
 * The result of obd_punch() is stored in req->rq_status.
 *
 * NOTE(review): o_size/o_blocks presumably carry the start and end of the
 * range to punch -- confirm against obd_punch().
 */
194 static int ost_punch(struct ptlrpc_request *req, struct obd_trans_info *oti)
196 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
197 struct ost_body *body, *repbody;
198 int rc, size = sizeof(*body);
201 body = lustre_msg_buf(req->rq_reqmsg, 0);
        /* Both flags are required: compare against the full mask, not != 0. */
203 if ((NTOH__u32(body->oa.o_valid) & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))!=
204 (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
207 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
211 repbody = lustre_msg_buf(req->rq_repmsg, 0);
212 /* FIXME: unpack only valid fields instead of memcpy, endianness */
213 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
        /* NOTE(review): o_valid was byte-swapped for the check above, but
         * o_size and o_blocks are used exactly as copied from the wire. */
214 req->rq_status = obd_punch(conn, &repbody->oa, NULL,
215 repbody->oa.o_size, repbody->oa.o_blocks, oti);
/*
 * Handle a setattr request.
 *
 * Packs a reply with one ost_body-sized buffer, copies the request obdo into
 * it, and calls obd_setattr() on the reply copy.  The result of
 * obd_setattr() is stored in req->rq_status; @oti is passed through.
 *
 * NOTE(review): the handling of rc after lustre_pack_msg() and the return
 * statement are not visible in this excerpt.
 */
219 static int ost_setattr(struct ptlrpc_request *req, struct obd_trans_info *oti)
221 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
222 struct ost_body *body, *repbody;
223 int rc, size = sizeof(*body);
226 body = lustre_msg_buf(req->rq_reqmsg, 0);
228 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
232 repbody = lustre_msg_buf(req->rq_repmsg, 0);
233 /* FIXME: unpack only valid fields instead of memcpy, endianness */
234 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
235 req->rq_status = obd_setattr(conn, &repbody->oa, NULL, oti);
/*
 * Timeout callback handed to LWI_TIMEOUT() by ost_brw_read() and
 * ost_brw_write(); both pass the bulk descriptor as @data.
 *
 * NOTE(review): the statements of this function and its return value are
 * not visible in this excerpt; only the explanatory comment is.
 */
239 static int ost_bulk_timeout(void *data)
242 /* We don't fail the connection here, because having the export
243 * killed makes the (vital) call to commitrw very sad.
 */
/*
 * Handle a bulk read: prepare the pages through the backend, push them to
 * the client with a bulk PUT, commit, and send the reply itself.
 *
 * Request layout as used here:
 *   buffer 0 - struct ost_body
 *   buffer 1 - array of struct obd_ioobj      (objcount entries)
 *   buffer 2 - array of struct niobuf_remote  (niocount entries)
 * Both counts are derived from the buffer lengths in the request header.
 *
 * When the request carries OBD_MD_FLCKSUM, a checksum is accumulated over
 * the outgoing buffers and returned in the reply obdo's o_rdev field.
 *
 * NOTE(review): the declarations of tmp2, end2, cksum and rc, the
 * assignment of cmd, the cleanup labels and several condition lines are not
 * visible in this excerpt.
 */
248 static int ost_brw_read(struct ptlrpc_request *req)
250 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
251 struct ptlrpc_bulk_desc *desc;
252 struct obd_ioobj *tmp1;
254 struct niobuf_remote *remote_nb;
255 struct niobuf_local *local_nb = NULL;
256 struct obd_ioobj *ioo;
257 struct ost_body *body;
258 struct l_wait_info lwi;
259 void *desc_priv = NULL;
260 int cmd, i, j, objcount, niocount, size = sizeof(*body);
267 body = lustre_msg_buf(req->rq_reqmsg, 0);
268 tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
269 tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
        /* end2 marks one byte past the niobuf array; used for bounds checks. */
270 end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
271 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
272 niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
        /* Fault-injection hook: fail the read before any work is done. */
275 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
276 GOTO(out, req->rq_status = -EIO);
278 /* Hmm, we don't return anything in this reply buffer?
279 * We should be returning per-page status codes and also
280 * per-object size, blocks count, mtime, ctime. (bug 593) */
281 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
283 GOTO(out, req->rq_status = rc);
        /* Validation pass: walk every ioobj and its niobufs once so the
         * unpackers visit each entry before the arrays are used. */
285 for (i = 0; i < objcount; i++) {
286 ost_unpack_ioo(&tmp1, &ioo);
        /* NOTE(review): the declaration of tmp2 is not visible here.  If it
         * is a void or char pointer (ost_san_brw() declares it void *), this
         * comparison counts bytes rather than niobuf_remote entries and
         * under-checks the bound -- confirm. */
287 if (tmp2 + ioo->ioo_bufcnt > end2) {
289 GOTO(out, rc = -EFAULT);
291 for (j = 0; j < ioo->ioo_bufcnt; j++) {
292 /* XXX verify niobuf[j].offset > niobuf[j-1].offset */
293 ost_unpack_niobuf(&tmp2, &remote_nb);
297 OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
298 if (local_nb == NULL)
299 GOTO(out, rc = -ENOMEM);
301 /* The unpackers move tmp1 and tmp2, so reset them before using */
302 ioo = lustre_msg_buf(req->rq_reqmsg, 1);
303 remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
304 req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
305 remote_nb, local_nb, &desc_priv, NULL);
308 GOTO(out, req->rq_status);
310 desc = ptlrpc_prep_bulk(req->rq_connection);
312 GOTO(out_local, rc = -ENOMEM);
313 desc->bd_ptl_ev_hdlr = NULL;
314 desc->bd_portal = OST_BULK_PORTAL;
        /* One bulk page per niobuf: the client-supplied xid and length are
         * paired with the local buffer address returned by obd_preprw(). */
316 for (i = 0; i < niocount; i++) {
317 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
320 GOTO(out_bulk, rc = -ENOMEM);
321 bulk->bp_xid = remote_nb[i].xid;
322 bulk->bp_buf = local_nb[i].addr;
323 bulk->bp_buflen = remote_nb[i].len;
        /* The flag constant is byte-swapped here instead of the wire field. */
324 if (body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM))
325 ost_checksum(&cksum, bulk->bp_buf, bulk->bp_buflen);
328 rc = ptlrpc_bulk_put(desc);
        /* Wait for PTL_BULK_FL_SENT for at most obd_timeout seconds. */
332 lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
333 rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT,
336 LASSERT(rc == -ETIMEDOUT);
340 req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
341 local_nb, desc_priv, NULL);
344 ptlrpc_bulk_decref(desc);
346 OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
349 ptlrpc_error(req->rq_svc, req);
        /* body is re-pointed at the reply buffer to return the checksum. */
352 body = lustre_msg_buf(req->rq_repmsg, 0);
353 body->oa.o_rdev = HTON__u64(cksum);
354 body->oa.o_valid |= HTON__u32(OBD_MD_FLCKSUM);
356 ptlrpc_reply(req->rq_svc, req);
/*
 * Handle a bulk write: prepare the target pages through the backend, pull
 * the data from the client with a bulk GET, optionally verify a checksum,
 * commit, and send the reply itself.
 *
 * Request layout as used here:
 *   buffer 0 - struct ost_body
 *   buffer 1 - array of struct obd_ioobj      (objcount entries)
 *   buffer 2 - array of struct niobuf_remote  (niocount entries)
 *
 * Unlike ost_brw_read(), the reply message is packed only at the end, after
 * the transfer and commit.  @oti is passed to obd_preprw()/obd_commitrw()
 * and copied into the reply through oti_to_request().
 *
 * NOTE(review): the declarations of tmp2, end2, cksum and rc, the
 * assignment of cmd, the cleanup labels and several condition lines are not
 * visible in this excerpt.
 */
362 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
364 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
365 struct ptlrpc_bulk_desc *desc;
366 struct obd_ioobj *tmp1;
368 struct niobuf_remote *remote_nb;
369 struct niobuf_local *local_nb = NULL;
370 struct obd_ioobj *ioo;
371 struct ost_body *body;
372 struct l_wait_info lwi;
373 void *desc_priv = NULL;
374 int cmd, i, j, objcount, niocount, size = sizeof(*body);
378 body = lustre_msg_buf(req->rq_reqmsg, 0);
379 tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
380 tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
        /* end2 marks one byte past the niobuf array; used for bounds checks. */
381 end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
382 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
383 niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
        /* Fault-injection hook: fail the write before any work is done. */
386 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
387 GOTO(out, req->rq_status = -EIO);
        /* Validation pass over every ioobj and its niobufs. */
389 for (i = 0; i < objcount; i++) {
390 ost_unpack_ioo(&tmp1, &ioo);
        /* NOTE(review): the declaration of tmp2 is not visible here.  If it
         * is a void or char pointer (ost_san_brw() declares it void *), this
         * comparison counts bytes rather than niobuf_remote entries and
         * under-checks the bound -- confirm. */
391 if (tmp2 + ioo->ioo_bufcnt > end2) {
393 GOTO(out, rc = -EFAULT);
395 for (j = 0; j < ioo->ioo_bufcnt; j++) {
396 /* XXX verify niobuf[j].offset > niobuf[j-1].offset */
397 ost_unpack_niobuf(&tmp2, &remote_nb);
401 OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
402 if (local_nb == NULL)
403 GOTO(out, rc = -ENOMEM);
405 /* The unpackers move tmp1 and tmp2, so reset them before using */
406 ioo = lustre_msg_buf(req->rq_reqmsg, 1);
407 remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
408 req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
409 remote_nb, local_nb, &desc_priv, oti);
412 GOTO(out_local, rc = 0);
414 desc = ptlrpc_prep_bulk(req->rq_connection);
416 GOTO(out_local, rc = -ENOMEM);
417 desc->bd_ptl_ev_hdlr = NULL;
        /* The write path uses OSC_BULK_PORTAL; the read path uses
         * OST_BULK_PORTAL. */
418 desc->bd_portal = OSC_BULK_PORTAL;
        /* One bulk page per niobuf, pairing client xid/len with local addr. */
420 for (i = 0; i < niocount; i++) {
421 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
424 GOTO(out_bulk, rc = -ENOMEM);
425 bulk->bp_xid = remote_nb[i].xid;
426 bulk->bp_buf = local_nb[i].addr;
427 bulk->bp_buflen = remote_nb[i].len;
430 rc = ptlrpc_bulk_get(desc);
        /* Wait for PTL_BULK_FL_RCVD for at most obd_timeout seconds. */
434 lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
435 rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_RCVD,
        /* Failure path: the only failure expected from the wait is a
         * timeout.  The bulk is aborted, the connection is reported as
         * failed, and obd_commitrw() is still called for the prepared
         * buffers. */
438 LASSERT(rc == -ETIMEDOUT);
439 ptlrpc_abort_bulk(desc);
440 recovd_conn_fail(desc->bd_connection);
441 obd_commitrw(cmd, conn, objcount, ioo, niocount, local_nb,
        /* Optional integrity check: recompute the checksum over the pages
         * just received and compare with the value sent in o_rdev. */
447 if ((body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM))) {
        /* NOTE(review): function-static, so shared by every caller;
         * ost_setup() starts several service threads and no locking is
         * visible here.  It appears to be used only to throttle the "OK"
         * log message below. */
448 static int cksum_counter;
449 __u64 client_cksum = NTOH__u64(body->oa.o_rdev);
452 for (i = 0; i < niocount; i++) {
453 char *ptr = kmap(local_nb[i].page);
454 int off = local_nb[i].offset & (PAGE_SIZE - 1);
455 int len = local_nb[i].len;
        /* A single niobuf must not cross a page boundary. */
457 LASSERT(off + len <= PAGE_SIZE);
458 ost_checksum(&cksum, ptr + off, len);
459 kunmap(local_nb[i].page);
462 if (client_cksum != cksum) {
463 CERROR("Bad checksum: client "LPX64", server "LPX64
464 ", client NID "LPX64"\n", client_cksum, cksum,
465 req->rq_connection->c_peer.peer_nid);
        /* (x & -x) == x holds only when x is zero or a power of two, so
         * successful checksums are logged at exponentially growing
         * intervals. */
469 if ((cksum_counter & (-cksum_counter)) == cksum_counter)
470 CERROR("Checksum %d from "LPX64": "LPX64" OK\n",
472 req->rq_connection->c_peer.peer_nid,
478 req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
479 local_nb, desc_priv, oti);
482 ptlrpc_bulk_decref(desc);
484 OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
487 /* Hmm, we don't return anything in this reply buffer?
488 * We should be returning per-page status codes and also
489 * per-object size, blocks count, mtime, ctime. (bug 593) */
490 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
493 ptlrpc_error(req->rq_svc, req);
495 oti_to_request(oti, req);
496 rc = ptlrpc_reply(req->rq_svc, req);
/*
 * Handle a SAN read or write request.
 *
 * @alloc selects the command: non-zero means OBD_BRW_WRITE, zero means
 * OBD_BRW_READ.  No bulk descriptor is set up here; after
 * obd_san_preprw() a niobuf array is packed into the reply.
 *
 * Reply layout: buffer 0 is an ost_body, buffer 1 holds niocount
 * struct niobuf_remote entries.  The function sends its own reply or error.
 *
 * NOTE(review): the remaining arguments of obd_san_preprw() and
 * ost_pack_niobuf(), the error branches and the cleanup labels are not
 * visible in this excerpt.
 */
501 static int ost_san_brw(struct ptlrpc_request *req, int alloc)
503 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
504 struct niobuf_remote *remote_nb, *res_nb;
505 struct obd_ioobj *ioo;
506 struct ost_body *body;
        /* size[1] starts at zero and is filled in once niocount is known. */
507 int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
508 void *tmp1, *tmp2, *end2;
511 body = lustre_msg_buf(req->rq_reqmsg, 0);
512 tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
513 tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
514 end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
515 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
516 niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
518 cmd = alloc ? OBD_BRW_WRITE : OBD_BRW_READ;
        /* Validation pass over every ioobj and its niobufs. */
520 for (i = 0; i < objcount; i++) {
521 ost_unpack_ioo((void *)&tmp1, &ioo);
        /* NOTE(review): tmp2 is declared void * above, so this addition
         * relies on the GNU void-pointer arithmetic extension and advances
         * by ioo_bufcnt BYTES.  ost_unpack_niobuf() presumably consumes one
         * struct niobuf_remote per call, in which case this bound is too
         * weak by a factor of sizeof(struct niobuf_remote) -- confirm. */
522 if (tmp2 + ioo->ioo_bufcnt > end2) {
526 for (j = 0; j < ioo->ioo_bufcnt; j++)
527 ost_unpack_niobuf((void *)&tmp2, &remote_nb);
530 size[1] = niocount * sizeof(*remote_nb);
531 rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
535 /* The unpackers move tmp1 and tmp2, so reset them before using */
536 tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
537 tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
539 req->rq_status = obd_san_preprw(cmd, conn, objcount, tmp1,
542 if (req->rq_status) {
        /* remote_nb now walks the REPLY array; res_nb indexes the REQUEST
         * array (buffer 2 of rq_reqmsg), which is the source of the values
         * packed below. */
547 remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
548 res_nb = lustre_msg_buf(req->rq_reqmsg, 2);
549 for (i = 0; i < niocount; i++) {
550 /* this advances remote_nb */
551 ost_pack_niobuf((void **)&remote_nb,
553 res_nb[i].len, /* 0 */
554 res_nb[i].flags, /* 0 */
        /* The reply buffer is released and cleared before ptlrpc_error();
         * the condition selecting this path is not visible here. */
563 OBD_FREE(req->rq_repmsg, req->rq_replen);
564 req->rq_repmsg = NULL;
565 ptlrpc_error(req->rq_svc, req);
567 ptlrpc_reply(req->rq_svc, req);
/*
 * Decide what to do with a request that arrives while @obd is recovering.
 *
 * The opcode selects one of two visible outcomes:
 *  - the request is passed to target_queue_recovery_request() and its
 *    result is stored in *process;
 *  - the request is rejected: "not permitted during recovery" is logged and
 *    the result of ptlrpc_error() is returned.
 *
 * NOTE(review): apart from OST_CONNECT the case labels are not visible in
 * this excerpt, so which opcodes take which branch cannot be stated here.
 */
572 static int filter_recovery_request(struct ptlrpc_request *req,
573 struct obd_device *obd, int *process)
575 switch (req->rq_reqmsg->opc) {
576 case OST_CONNECT: /* This will never get here, but for completeness. */
590 *process = target_queue_recovery_request(req, obd);
594 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
596 /* XXX what should we set rq_status to here? */
597 RETURN(ptlrpc_error(req->rq_svc, req));
/*
 * Service entry point for an incoming OST request.
 *
 * Visible flow:
 *  1. Unpack and validate the request message.
 *  2. For every opcode except OST_CONNECT: require an export, run a pending
 *     recovery abort, divert requests that arrive during recovery, and
 *     replay the saved reply for a resent request whose xid matches.
 *  3. Check that the target device is of type "ost".
 *  4. Dispatch on the opcode.  Each case has a fault-injection hook
 *     (OBD_FAIL_RETURN) ahead of the real handler.
 *  5. On success for anything but OST_DISCONNECT, stamp last_committed into
 *     the reply; handle MSG_LAST_REPLAY; then send an error or the reply.
 *
 * A zero-initialised obd_trans_info on the stack is passed to the handlers
 * that take one, and copied into the reply with oti_to_request().
 *
 * NOTE(review): most case labels, the "out" label, the declaration of med
 * and several return statements are not visible in this excerpt.
 */
601 static int ost_handle(struct ptlrpc_request *req)
603 struct obd_trans_info trans_info = { 0, }, *oti = &trans_info;
604 int should_process, rc;
607 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
608 if (rc || OBD_FAIL_CHECK(OBD_FAIL_OST_HANDLE_UNPACK)) {
609 CERROR("lustre_ost: Invalid request\n");
        /* Everything except CONNECT needs an established export. */
613 if (req->rq_reqmsg->opc != OST_CONNECT) {
614 struct obd_device *obd;
616 if (req->rq_export == NULL) {
617 CERROR("lustre_ost: operation %d on unconnected OST\n",
618 req->rq_reqmsg->opc);
619 req->rq_status = -ENOTCONN;
620 GOTO(out, rc = -ENOTCONN);
623 obd = req->rq_export->exp_obd;
        /* The abort flag is tested, and the abort run, under the
         * processing-task lock. */
625 spin_lock_bh(&obd->obd_processing_task_lock);
626 if (obd->obd_flags & OBD_ABORT_RECOVERY)
627 target_abort_recovery(obd);
628 spin_unlock_bh(&obd->obd_processing_task_lock);
630 if (obd->obd_flags & OBD_RECOVERING) {
631 rc = filter_recovery_request(req, obd, &should_process);
632 if (rc || !should_process)
634 } else if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
636 /* need to store this reply somewhere... */
637 if (req->rq_xid == med->med_last_xid) {
638 DEBUG_REQ(D_HA, req, "resending reply");
        /* NOTE(review): no NULL check on the OBD_ALLOC() result is visible
         * before the memcpy() below -- confirm allocation failure is
         * handled. */
639 OBD_ALLOC(req->rq_repmsg, med->med_last_replen);
640 req->rq_replen = med->med_last_replen;
641 memcpy(req->rq_repmsg, med->med_last_reply,
643 ptlrpc_reply(req->rq_svc, req);
646 DEBUG_REQ(D_HA, req, "no reply for resend, continuing");
        /* Reject requests routed to a device that is not an OST. */
652 if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
653 GOTO(out, rc = -EINVAL);
655 switch (req->rq_reqmsg->opc) {
657 CDEBUG(D_INODE, "connect\n");
658 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
        /* The handler passes itself as the callback argument. */
659 rc = target_handle_connect(req, ost_handle);
662 CDEBUG(D_INODE, "disconnect\n");
663 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
664 rc = target_handle_disconnect(req);
667 CDEBUG(D_INODE, "create\n");
668 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
669 rc = ost_create(req, oti);
672 CDEBUG(D_INODE, "destroy\n");
673 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
674 rc = ost_destroy(req, oti);
677 CDEBUG(D_INODE, "getattr\n");
678 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
679 rc = ost_getattr(req);
682 CDEBUG(D_INODE, "setattr\n");
683 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
684 rc = ost_setattr(req, oti);
687 CDEBUG(D_INODE, "open\n");
688 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
689 rc = ost_open(req, oti);
692 CDEBUG(D_INODE, "close\n");
693 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
694 rc = ost_close(req, oti);
697 CDEBUG(D_INODE, "write\n");
698 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
699 rc = ost_brw_write(req, oti);
700 /* ost_brw sends its own replies */
703 CDEBUG(D_INODE, "read\n");
704 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
705 rc = ost_brw_read(req);
706 /* ost_brw sends its own replies */
709 CDEBUG(D_INODE, "san read\n");
710 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
711 rc = ost_san_brw(req, 0);
712 /* ost_san_brw sends its own replies */
715 CDEBUG(D_INODE, "san write\n");
716 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
717 rc = ost_san_brw(req, 1);
718 /* ost_san_brw sends its own replies */
721 CDEBUG(D_INODE, "punch\n");
722 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
723 rc = ost_punch(req, oti);
726 CDEBUG(D_INODE, "statfs\n");
727 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
728 rc = ost_statfs(req);
731 CDEBUG(D_INODE, "sync\n");
732 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNCFS_NET, 0);
733 rc = ost_syncfs(req);
        /* Lock-manager opcodes are forwarded to the ldlm handlers. */
736 CDEBUG(D_INODE, "enqueue\n");
737 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
738 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
739 ldlm_server_blocking_ast);
742 CDEBUG(D_INODE, "convert\n");
743 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
744 rc = ldlm_handle_convert(req);
747 CDEBUG(D_INODE, "cancel\n");
748 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
749 rc = ldlm_handle_cancel(req);
751 case LDLM_BL_CALLBACK:
752 case LDLM_CP_CALLBACK:
753 CDEBUG(D_INODE, "callback\n");
754 CERROR("callbacks should not happen on OST\n");
756 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
        /* Unsupported request: reply with -ENOTSUPP. */
759 req->rq_status = -ENOTSUPP;
760 rc = ptlrpc_error(req->rq_svc, req);
765 /* If we're DISCONNECTing, the export_data is already freed */
766 if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
767 struct obd_device *obd = req->rq_export->exp_obd;
768 if ((obd->obd_flags & OBD_NO_TRANSNO) == 0) {
769 req->rq_repmsg->last_committed =
770 HTON__u64(obd->obd_last_committed);
772 DEBUG_REQ(D_IOCTL, req,
773 "not sending last_committed update");
775 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
776 obd->obd_last_committed, HTON__u64(req->rq_xid));
780 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
        /* NOTE(review): rq_export is dereferenced here without a NULL check,
         * although obd itself is tested on the next line -- confirm a
         * LAST_REPLAY request always has an export. */
781 struct obd_device *obd = req->rq_export->exp_obd;
783 if (obd && (obd->obd_flags & OBD_RECOVERING)) {
784 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
785 return target_queue_final_reply(req, rc);
787 /* Lost a race with recovery; let the error path DTRT. */
788 rc = req->rq_status = -ENOTCONN;
792 CERROR("ost: processing error (opcode=%d): %d\n",
793 req->rq_reqmsg->opc, rc);
794 ptlrpc_error(req->rq_svc, req);
796 CDEBUG(D_INODE, "sending reply\n");
        /* A handler that returns 0 must have built a reply message. */
797 if (req->rq_repmsg == NULL)
798 CERROR("handler for opcode %d returned rc=0 without "
799 "creating rq_repmsg; needs to return rc != 0!\n",
800 req->rq_reqmsg->opc);
802 oti_to_request(oti, req);
803 ptlrpc_reply(req->rq_svc, req);
/*
 * Set up the OST device: create the ptlrpc service and start
 * OST_NUM_THREADS service threads named "ll_ost_NN".
 *
 * Failure to create the service yields -ENOMEM; failure to start a thread
 * yields -EINVAL.  Both jump to the error_disc label.
 *
 * NOTE(review): the declarations of i, err and name, the last argument(s)
 * of ptlrpc_init_svc(), the error_disc label and the success return are not
 * visible in this excerpt.  @len and @buf are not used in the visible lines.
 */
809 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
811 struct ost_obd *ost = &obddev->u.ost;
816 ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
817 OST_BUFSIZE, OST_MAXREQSIZE,
818 OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
820 if (!ost->ost_service) {
821 CERROR("failed to start service\n");
822 GOTO(error_disc, err = -ENOMEM);
825 for (i = 0; i < OST_NUM_THREADS; i++) {
        /* NOTE(review): unbounded sprintf(); the size of name is not visible
         * here.  The formatted string needs at least 10 bytes including the
         * terminator -- confirm the buffer is large enough. */
827 sprintf(name, "ll_ost_%02d", i);
828 err = ptlrpc_start_thread(obddev, ost->ost_service, name);
830 CERROR("error starting thread #%d: rc %d\n", i, err);
        /* The error from ptlrpc_start_thread() is logged above and then
         * replaced by -EINVAL. */
831 GOTO(error_disc, err = -EINVAL);
/*
 * Tear down the OST device: stop every service thread, then unregister the
 * ptlrpc service created by ost_setup().
 *
 * NOTE(review): the return statement is not visible in this excerpt.
 */
841 static int ost_cleanup(struct obd_device * obddev)
843 struct ost_obd *ost = &obddev->u.ost;
848 ptlrpc_stop_all_threads(ost->ost_service);
849 ptlrpc_unregister_service(ost->ost_service);
/*
 * Attach hook: initialise the static lprocfs variable tables and register
 * the per-device entries for @dev.  Returns whatever lprocfs_obd_attach()
 * returns.  @len and @data are unused.
 */
854 int ost_attach(struct obd_device *dev, obd_count len, void *data)
856 struct lprocfs_static_vars lvars;
858 lprocfs_init_vars(&lvars);
859 return lprocfs_obd_attach(dev, lvars.obd_vars);
/*
 * Detach hook: counterpart of ost_attach().  Returns whatever
 * lprocfs_obd_detach() returns for @dev.
 */
862 int ost_detach(struct obd_device *dev)
864 return lprocfs_obd_detach(dev);
867 /* I don't think this function is ever used, since nothing
868 * connects directly to this module.
 */
/*
 * o_connect method: validate the arguments, establish the connection with
 * class_connect(), and look up the resulting export with
 * class_conn2export().
 *
 * NOTE(review): the value returned for invalid arguments, the handling of
 * rc, any use of exp, and the return statement are not visible in this
 * excerpt.  @recovd and @recover are not used in the visible lines.
 */
870 static int ost_connect(struct lustre_handle *conn,
871 struct obd_device *obd, struct obd_uuid *cluuid,
872 struct recovd_obd *recovd,
873 ptlrpc_recovery_cb_t recover)
875 struct obd_export *exp;
        /* All three of conn, obd and cluuid are mandatory. */
879 if (!conn || !obd || !cluuid)
882 rc = class_connect(conn, obd, cluuid);
885 exp = class_conn2export(conn);
891 /* use obd ops to offer management infrastructure */
/*
 * Method table registered for the OST device type by ost_init().  Only
 * lifecycle hooks are listed in the visible lines; data requests reach
 * ost_handle() through the ptlrpc service created in ost_setup().
 *
 * The "field: value" form is the old GNU designated-initializer syntax.
 *
 * NOTE(review): the original line between o_detach and o_cleanup and the
 * closing of the initializer are not visible in this excerpt.
 */
892 static struct obd_ops ost_obd_ops = {
893 o_owner: THIS_MODULE,
894 o_attach: ost_attach,
895 o_detach: ost_detach,
897 o_cleanup: ost_cleanup,
898 o_connect: ost_connect,
/*
 * Module entry point: initialise the static lprocfs variable tables and
 * register the OST device type with its method table (ost_obd_ops) and
 * module-level proc variables.  Returns the result of class_register_type().
 *
 * NOTE(review): the final argument of class_register_type() is on a line
 * not visible in this excerpt.
 */
901 static int __init ost_init(void)
903 struct lprocfs_static_vars lvars;
906 lprocfs_init_vars(&lvars);
907 RETURN(class_register_type(&ost_obd_ops, lvars.module_vars,
/*
 * Module exit point: unregister the device type named LUSTRE_OST_NAME.
 */
911 static void __exit ost_exit(void)
913 class_unregister_type(LUSTRE_OST_NAME);
/* Kernel module metadata. */
916 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
917 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
918 MODULE_LICENSE("GPL");
/* Hook ost_init()/ost_exit() into module load and unload. */
920 module_init(ost_init);
921 module_exit(ost_exit);