1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author: Peter J. Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * Storage Target Handling functions
24 * Lustre Object Server Module (OST)
26 * This server is single threaded at present (but can easily be multi
27 * threaded). For testing and management it is treated as an
28 * obd_device, although it does not export a full OBD method table
29 * (the requests are coming in over the wire, so object target
30 * modules do not have a full method table.)
34 #define DEBUG_SUBSYSTEM S_OST
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40 #include <linux/lustre_export.h>
41 #include <linux/init.h>
42 #include <linux/lprocfs_status.h>
44 inline void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
46 if (oti && req->rq_repmsg)
47 req->rq_repmsg->transno = HTON__u64(oti->oti_transno);
51 static int ost_destroy(struct ptlrpc_request *req, struct obd_trans_info *oti)
53 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
54 struct ost_body *body;
55 int rc, size = sizeof(*body);
58 body = lustre_msg_buf(req->rq_reqmsg, 0);
60 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
64 req->rq_status = obd_destroy(conn, &body->oa, NULL, oti);
68 static int ost_getattr(struct ptlrpc_request *req)
70 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
71 struct ost_body *body, *repbody;
72 int rc, size = sizeof(*body);
75 body = lustre_msg_buf(req->rq_reqmsg, 0);
77 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
81 repbody = lustre_msg_buf(req->rq_repmsg, 0);
82 /* FIXME: unpack only valid fields instead of memcpy, endianness */
83 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
84 req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
88 static int ost_statfs(struct ptlrpc_request *req)
90 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
91 struct obd_statfs *osfs;
92 int rc, size = sizeof(*osfs);
95 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
99 osfs = lustre_msg_buf(req->rq_repmsg, 0);
100 memset(osfs, 0, size);
102 rc = obd_statfs(conn, osfs);
104 CERROR("ost: statfs failed: rc %d\n", rc);
108 obd_statfs_pack(osfs, osfs);
113 static int ost_syncfs(struct ptlrpc_request *req)
115 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
116 struct obd_statfs *osfs;
117 int rc, size = sizeof(*osfs);
120 rc = lustre_pack_msg(0, &size, NULL, &req->rq_replen, &req->rq_repmsg);
124 rc = obd_syncfs(conn);
126 CERROR("ost: syncfs failed: rc %d\n", rc);
134 static int ost_open(struct ptlrpc_request *req, struct obd_trans_info *oti)
136 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
137 struct ost_body *body, *repbody;
138 int rc, size = sizeof(*body);
141 body = lustre_msg_buf(req->rq_reqmsg, 0);
143 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
147 repbody = lustre_msg_buf(req->rq_repmsg, 0);
148 /* FIXME: unpack only valid fields instead of memcpy, endianness */
149 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
150 req->rq_status = obd_open(conn, &repbody->oa, NULL, oti);
154 static int ost_close(struct ptlrpc_request *req, struct obd_trans_info *oti)
156 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
157 struct ost_body *body, *repbody;
158 int rc, size = sizeof(*body);
161 body = lustre_msg_buf(req->rq_reqmsg, 0);
163 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
167 repbody = lustre_msg_buf(req->rq_repmsg, 0);
168 /* FIXME: unpack only valid fields instead of memcpy, endianness */
169 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
170 req->rq_status = obd_close(conn, &repbody->oa, NULL, oti);
174 static int ost_create(struct ptlrpc_request *req, struct obd_trans_info *oti)
176 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
177 struct ost_body *body, *repbody;
178 int rc, size = sizeof(*body);
181 body = lustre_msg_buf(req->rq_reqmsg, 0);
183 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
187 repbody = lustre_msg_buf(req->rq_repmsg, 0);
188 /* FIXME: unpack only valid fields instead of memcpy, endianness */
189 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
190 req->rq_status = obd_create(conn, &repbody->oa, NULL, oti);
194 static int ost_punch(struct ptlrpc_request *req, struct obd_trans_info *oti)
196 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
197 struct ost_body *body, *repbody;
198 int rc, size = sizeof(*body);
201 body = lustre_msg_buf(req->rq_reqmsg, 0);
203 if ((NTOH__u32(body->oa.o_valid) & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))!=
204 (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
207 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
211 repbody = lustre_msg_buf(req->rq_repmsg, 0);
212 /* FIXME: unpack only valid fields instead of memcpy, endianness */
213 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
214 req->rq_status = obd_punch(conn, &repbody->oa, NULL,
215 repbody->oa.o_size, repbody->oa.o_blocks, oti);
219 static int ost_setattr(struct ptlrpc_request *req, struct obd_trans_info *oti)
221 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
222 struct ost_body *body, *repbody;
223 int rc, size = sizeof(*body);
226 body = lustre_msg_buf(req->rq_reqmsg, 0);
228 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
232 repbody = lustre_msg_buf(req->rq_repmsg, 0);
233 /* FIXME: unpack only valid fields instead of memcpy, endianness */
234 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
235 req->rq_status = obd_setattr(conn, &repbody->oa, NULL, oti);
239 static int ost_bulk_timeout(void *data)
242 /* We don't fail the connection here, because having the export
243 * killed makes the (vital) call to commitrw very sad.
248 static int ost_brw_read(struct ptlrpc_request *req)
250 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
251 struct ptlrpc_bulk_desc *desc;
252 struct niobuf_remote *remote_nb;
253 struct niobuf_local *local_nb = NULL;
254 struct obd_ioobj *ioo;
255 struct ost_body *body;
256 struct l_wait_info lwi;
257 void *desc_priv = NULL;
259 int cmd, i, j, objcount, niocount, size = sizeof(*body);
266 body = lustre_msg_buf(req->rq_reqmsg, 0);
267 ioo = lustre_msg_buf(req->rq_reqmsg, 1);
268 remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
269 end2 = (char *)remote_nb + req->rq_reqmsg->buflens[2];
270 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
271 niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
274 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
275 GOTO(out, req->rq_status = -EIO);
277 /* Hmm, we don't return anything in this reply buffer?
278 * We should be returning per-page status codes and also
279 * per-object size, blocks count, mtime, ctime. (bug 593) */
280 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
282 GOTO(out, req->rq_status = rc);
284 for (i = 0; i < objcount; i++, ioo++) {
285 ost_unpack_ioo(ioo, ioo);
286 if ((void *)(remote_nb + ioo->ioo_bufcnt) > end2) {
287 CERROR("BRW: objid "LPX64" count %u larger than %u\n",
288 ioo->ioo_id, ioo->ioo_bufcnt,
289 (int)(end2 - (void *)remote_nb));
291 GOTO(out, rc = -EINVAL);
293 for (j = 0; j < ioo->ioo_bufcnt; j++, remote_nb++) {
294 ost_unpack_niobuf(remote_nb, remote_nb);
295 if (remote_nb->len == 0) {
296 CERROR("zero len BRW: objid "LPX64" buf %u\n",
298 GOTO(out, rc = -EINVAL);
300 if (j && remote_nb->offset <= (remote_nb - 1)->offset) {
301 CERROR("unordered BRW: objid "LPX64
302 " buf %u offset "LPX64" <= "LPX64"\n",
303 ioo->ioo_id, j, remote_nb->offset,
304 (remote_nb - 1)->offset);
305 GOTO(out, rc = -EINVAL);
310 OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
311 if (local_nb == NULL)
312 GOTO(out, rc = -ENOMEM);
314 /* The unpackers move ioo and remote_nb, so reset them before using */
315 ioo = lustre_msg_buf(req->rq_reqmsg, 1);
316 remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
317 req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
318 remote_nb, local_nb, &desc_priv, NULL);
321 GOTO(out, req->rq_status);
323 desc = ptlrpc_prep_bulk(req->rq_connection);
325 GOTO(out_local, rc = -ENOMEM);
326 desc->bd_ptl_ev_hdlr = NULL;
327 desc->bd_portal = OST_BULK_PORTAL;
329 for (i = 0; i < niocount; i++) {
330 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
333 GOTO(out_bulk, rc = -ENOMEM);
334 bulk->bp_xid = remote_nb[i].xid;
335 bulk->bp_buf = local_nb[i].addr;
336 bulk->bp_buflen = remote_nb[i].len;
337 if (body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM))
338 ost_checksum(&cksum, bulk->bp_buf, bulk->bp_buflen);
341 rc = ptlrpc_bulk_put(desc);
345 lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
346 rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT,
349 LASSERT(rc == -ETIMEDOUT);
353 req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
354 local_nb, desc_priv, NULL);
357 ptlrpc_bulk_decref(desc);
359 OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
362 ptlrpc_error(req->rq_svc, req);
365 body = lustre_msg_buf(req->rq_repmsg, 0);
366 body->oa.o_rdev = HTON__u64(cksum);
367 body->oa.o_valid |= HTON__u32(OBD_MD_FLCKSUM);
369 ptlrpc_reply(req->rq_svc, req);
375 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
377 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
378 struct ptlrpc_bulk_desc *desc;
379 struct niobuf_remote *remote_nb;
381 struct niobuf_local *local_nb = NULL;
382 struct obd_ioobj *ioo;
383 struct ost_body *body;
384 struct l_wait_info lwi;
385 void *desc_priv = NULL;
386 int cmd, i, j, objcount, niocount, size = sizeof(*body);
390 body = lustre_msg_buf(req->rq_reqmsg, 0);
391 ioo = lustre_msg_buf(req->rq_reqmsg, 1);
392 remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
393 end2 = (void *)remote_nb + req->rq_reqmsg->buflens[2];
394 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
395 niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
398 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
399 GOTO(out, req->rq_status = -EIO);
401 for (i = 0; i < objcount; i++, ioo++) {
402 ost_unpack_ioo(ioo, ioo);
403 if ((void *)(remote_nb + ioo->ioo_bufcnt) > end2) {
404 CERROR("BRW: objid "LPX64" count %u larger than %u\n",
405 ioo->ioo_id, ioo->ioo_bufcnt,
406 (int)(end2 - (void *)remote_nb));
408 GOTO(out, rc = -EINVAL);
410 for (j = 0; j < ioo->ioo_bufcnt; j++, remote_nb++) {
411 ost_unpack_niobuf(remote_nb, remote_nb);
412 if (remote_nb->len == 0) {
413 CERROR("zero len BRW: objid "LPX64" buf %u\n",
415 GOTO(out, rc = -EINVAL);
417 if (j && remote_nb->offset <= (remote_nb - 1)->offset) {
418 CERROR("unordered BRW: objid "LPX64
419 " buf %u offset "LPX64" <= "LPX64"\n",
420 ioo->ioo_id, j, remote_nb->offset,
421 (remote_nb - 1)->offset);
422 GOTO(out, rc = -EINVAL);
427 OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
428 if (local_nb == NULL)
429 GOTO(out, rc = -ENOMEM);
431 /* The unpackers move ioo and remote_nb, so reset them before using */
432 ioo = lustre_msg_buf(req->rq_reqmsg, 1);
433 remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
435 req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
436 remote_nb, local_nb, &desc_priv, oti);
439 GOTO(out_local, rc = 0);
441 desc = ptlrpc_prep_bulk(req->rq_connection);
443 GOTO(out_local, rc = -ENOMEM);
444 desc->bd_ptl_ev_hdlr = NULL;
445 desc->bd_portal = OSC_BULK_PORTAL;
447 for (i = 0; i < niocount; i++) {
448 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
451 GOTO(out_bulk, rc = -ENOMEM);
452 bulk->bp_xid = remote_nb[i].xid;
453 bulk->bp_buf = local_nb[i].addr;
454 bulk->bp_buflen = remote_nb[i].len;
457 rc = ptlrpc_bulk_get(desc);
461 lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
462 rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_RCVD,
465 LASSERT(rc == -ETIMEDOUT);
466 ptlrpc_abort_bulk(desc);
467 recovd_conn_fail(desc->bd_connection);
468 obd_commitrw(cmd, conn, objcount, ioo, niocount, local_nb,
474 if ((body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM))) {
475 static int cksum_counter;
476 __u64 client_cksum = NTOH__u64(body->oa.o_rdev);
479 for (i = 0; i < niocount; i++) {
480 char *ptr = kmap(local_nb[i].page);
481 int off = local_nb[i].offset & (PAGE_SIZE - 1);
482 int len = local_nb[i].len;
484 LASSERT(off + len <= PAGE_SIZE);
485 ost_checksum(&cksum, ptr + off, len);
486 kunmap(local_nb[i].page);
489 if (client_cksum != cksum) {
490 CERROR("Bad checksum: client "LPX64", server "LPX64
491 ", client NID "LPX64"\n", client_cksum, cksum,
492 req->rq_connection->c_peer.peer_nid);
496 if ((cksum_counter & (-cksum_counter)) == cksum_counter)
497 CERROR("Checksum %d from "LPX64": "LPX64" OK\n",
499 req->rq_connection->c_peer.peer_nid,
505 req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
506 local_nb, desc_priv, oti);
509 ptlrpc_bulk_decref(desc);
511 OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
514 /* Hmm, we don't return anything in this reply buffer?
515 * We should be returning per-page status codes and also
516 * per-object size, blocks count, mtime, ctime. (bug 593) */
517 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
520 ptlrpc_error(req->rq_svc, req);
522 oti_to_request(oti, req);
523 rc = ptlrpc_reply(req->rq_svc, req);
528 static int ost_san_brw(struct ptlrpc_request *req, int alloc)
530 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
531 struct niobuf_remote *remote_nb, *res_nb;
532 struct obd_ioobj *ioo;
533 struct ost_body *body;
534 int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
538 body = lustre_msg_buf(req->rq_reqmsg, 0);
539 ioo = lustre_msg_buf(req->rq_reqmsg, 1);
540 remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
541 end2 = (void *)remote_nb + req->rq_reqmsg->buflens[2];
542 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
543 niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
545 cmd = alloc ? OBD_BRW_WRITE : OBD_BRW_READ;
547 for (i = 0; i < objcount; i++, ioo++) {
548 ost_unpack_ioo(ioo, ioo);
549 if ((void *)(remote_nb + ioo->ioo_bufcnt) > end2) {
550 CERROR("BRW: objid "LPX64" count %u larger than %u\n",
551 ioo->ioo_id, ioo->ioo_bufcnt,
552 (int)(end2 - (void *)remote_nb));
553 GOTO(out, rc = -EINVAL);
555 for (j = 0; j < ioo->ioo_bufcnt; j++, remote_nb++)
556 ost_unpack_niobuf(remote_nb, remote_nb);
559 size[1] = niocount * sizeof(*remote_nb);
560 rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
564 /* The unpackers move ioo and remote_nb, so reset them before using */
565 ioo = lustre_msg_buf(req->rq_reqmsg, 1);
566 remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
568 req->rq_status = obd_san_preprw(cmd, conn, objcount, ioo,
569 niocount, remote_nb);
571 if (req->rq_status) {
576 remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
577 res_nb = lustre_msg_buf(req->rq_reqmsg, 2);
578 for (i = 0; i < niocount; i++, remote_nb++, res_nb++)
579 ost_pack_niobuf(remote_nb, res_nb->offset, res_nb->len,
580 res_nb->flags, res_nb->xid);
586 OBD_FREE(req->rq_repmsg, req->rq_replen);
587 req->rq_repmsg = NULL;
588 ptlrpc_error(req->rq_svc, req);
590 ptlrpc_reply(req->rq_svc, req);
595 static int filter_recovery_request(struct ptlrpc_request *req,
596 struct obd_device *obd, int *process)
598 switch (req->rq_reqmsg->opc) {
599 case OST_CONNECT: /* This will never get here, but for completeness. */
613 *process = target_queue_recovery_request(req, obd);
617 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
619 /* XXX what should we set rq_status to here? */
620 RETURN(ptlrpc_error(req->rq_svc, req));
624 static int ost_handle(struct ptlrpc_request *req)
626 struct obd_trans_info trans_info = { 0, }, *oti = &trans_info;
627 int should_process, rc;
630 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
631 if (rc || OBD_FAIL_CHECK(OBD_FAIL_OST_HANDLE_UNPACK)) {
632 CERROR("lustre_ost: Invalid request\n");
636 if (req->rq_reqmsg->opc != OST_CONNECT) {
637 struct obd_device *obd;
639 if (req->rq_export == NULL) {
640 CERROR("lustre_ost: operation %d on unconnected OST\n",
641 req->rq_reqmsg->opc);
642 req->rq_status = -ENOTCONN;
643 GOTO(out, rc = -ENOTCONN);
646 obd = req->rq_export->exp_obd;
648 spin_lock_bh(&obd->obd_processing_task_lock);
649 if (obd->obd_flags & OBD_ABORT_RECOVERY)
650 target_abort_recovery(obd);
651 spin_unlock_bh(&obd->obd_processing_task_lock);
653 if (obd->obd_flags & OBD_RECOVERING) {
654 rc = filter_recovery_request(req, obd, &should_process);
655 if (rc || !should_process)
657 } else if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
659 /* need to store this reply somewhere... */
660 if (req->rq_xid == med->med_last_xid) {
661 DEBUG_REQ(D_HA, req, "resending reply");
662 OBD_ALLOC(req->rq_repmsg, med->med_last_replen);
663 req->rq_replen = med->med_last_replen;
664 memcpy(req->rq_repmsg, med->med_last_reply,
666 ptlrpc_reply(req->rq_svc, req);
669 DEBUG_REQ(D_HA, req, "no reply for resend, continuing");
675 if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
676 GOTO(out, rc = -EINVAL);
678 switch (req->rq_reqmsg->opc) {
680 CDEBUG(D_INODE, "connect\n");
681 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
682 rc = target_handle_connect(req, ost_handle);
685 CDEBUG(D_INODE, "disconnect\n");
686 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
687 rc = target_handle_disconnect(req);
690 CDEBUG(D_INODE, "create\n");
691 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
692 rc = ost_create(req, oti);
695 CDEBUG(D_INODE, "destroy\n");
696 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
697 rc = ost_destroy(req, oti);
700 CDEBUG(D_INODE, "getattr\n");
701 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
702 rc = ost_getattr(req);
705 CDEBUG(D_INODE, "setattr\n");
706 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
707 rc = ost_setattr(req, oti);
710 CDEBUG(D_INODE, "open\n");
711 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
712 rc = ost_open(req, oti);
715 CDEBUG(D_INODE, "close\n");
716 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
717 rc = ost_close(req, oti);
720 CDEBUG(D_INODE, "write\n");
721 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
722 rc = ost_brw_write(req, oti);
723 /* ost_brw sends its own replies */
726 CDEBUG(D_INODE, "read\n");
727 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
728 rc = ost_brw_read(req);
729 /* ost_brw sends its own replies */
732 CDEBUG(D_INODE, "san read\n");
733 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
734 rc = ost_san_brw(req, 0);
735 /* ost_san_brw sends its own replies */
738 CDEBUG(D_INODE, "san write\n");
739 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
740 rc = ost_san_brw(req, 1);
741 /* ost_san_brw sends its own replies */
744 CDEBUG(D_INODE, "punch\n");
745 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
746 rc = ost_punch(req, oti);
749 CDEBUG(D_INODE, "statfs\n");
750 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
751 rc = ost_statfs(req);
754 CDEBUG(D_INODE, "sync\n");
755 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNCFS_NET, 0);
756 rc = ost_syncfs(req);
759 CDEBUG(D_INODE, "enqueue\n");
760 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
761 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
762 ldlm_server_blocking_ast);
765 CDEBUG(D_INODE, "convert\n");
766 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
767 rc = ldlm_handle_convert(req);
770 CDEBUG(D_INODE, "cancel\n");
771 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
772 rc = ldlm_handle_cancel(req);
774 case LDLM_BL_CALLBACK:
775 case LDLM_CP_CALLBACK:
776 CDEBUG(D_INODE, "callback\n");
777 CERROR("callbacks should not happen on OST\n");
779 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
782 req->rq_status = -ENOTSUPP;
783 rc = ptlrpc_error(req->rq_svc, req);
788 /* If we're DISCONNECTing, the export_data is already freed */
789 if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
790 struct obd_device *obd = req->rq_export->exp_obd;
791 if ((obd->obd_flags & OBD_NO_TRANSNO) == 0) {
792 req->rq_repmsg->last_committed =
793 HTON__u64(obd->obd_last_committed);
795 DEBUG_REQ(D_IOCTL, req,
796 "not sending last_committed update");
798 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
799 obd->obd_last_committed, HTON__u64(req->rq_xid));
803 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
804 struct obd_device *obd = req->rq_export->exp_obd;
806 if (obd && (obd->obd_flags & OBD_RECOVERING)) {
807 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
808 return target_queue_final_reply(req, rc);
810 /* Lost a race with recovery; let the error path DTRT. */
811 rc = req->rq_status = -ENOTCONN;
815 CERROR("ost: processing error (opcode=%d): %d\n",
816 req->rq_reqmsg->opc, rc);
817 ptlrpc_error(req->rq_svc, req);
819 CDEBUG(D_INODE, "sending reply\n");
820 if (req->rq_repmsg == NULL)
821 CERROR("handler for opcode %d returned rc=0 without "
822 "creating rq_repmsg; needs to return rc != 0!\n",
823 req->rq_reqmsg->opc);
825 oti_to_request(oti, req);
826 ptlrpc_reply(req->rq_svc, req);
832 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
834 struct ost_obd *ost = &obddev->u.ost;
839 ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
840 OST_BUFSIZE, OST_MAXREQSIZE,
841 OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
843 if (!ost->ost_service) {
844 CERROR("failed to start service\n");
845 GOTO(error_disc, err = -ENOMEM);
848 for (i = 0; i < OST_NUM_THREADS; i++) {
850 sprintf(name, "ll_ost_%02d", i);
851 err = ptlrpc_start_thread(obddev, ost->ost_service, name);
853 CERROR("error starting thread #%d: rc %d\n", i, err);
854 GOTO(error_disc, err = -EINVAL);
864 static int ost_cleanup(struct obd_device * obddev)
866 struct ost_obd *ost = &obddev->u.ost;
871 ptlrpc_stop_all_threads(ost->ost_service);
872 ptlrpc_unregister_service(ost->ost_service);
877 int ost_attach(struct obd_device *dev, obd_count len, void *data)
879 struct lprocfs_static_vars lvars;
881 lprocfs_init_vars(&lvars);
882 return lprocfs_obd_attach(dev, lvars.obd_vars);
885 int ost_detach(struct obd_device *dev)
887 return lprocfs_obd_detach(dev);
890 /* I don't think this function is ever used, since nothing
891 * connects directly to this module.
893 static int ost_connect(struct lustre_handle *conn,
894 struct obd_device *obd, struct obd_uuid *cluuid,
895 struct recovd_obd *recovd,
896 ptlrpc_recovery_cb_t recover)
898 struct obd_export *exp;
902 if (!conn || !obd || !cluuid)
905 rc = class_connect(conn, obd, cluuid);
908 exp = class_conn2export(conn);
914 /* use obd ops to offer management infrastructure */
915 static struct obd_ops ost_obd_ops = {
916 o_owner: THIS_MODULE,
917 o_attach: ost_attach,
918 o_detach: ost_detach,
920 o_cleanup: ost_cleanup,
921 o_connect: ost_connect,
924 static int __init ost_init(void)
926 struct lprocfs_static_vars lvars;
929 lprocfs_init_vars(&lvars);
930 RETURN(class_register_type(&ost_obd_ops, lvars.module_vars,
934 static void __exit ost_exit(void)
936 class_unregister_type(LUSTRE_OST_NAME);
939 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
940 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
941 MODULE_LICENSE("GPL");
943 module_init(ost_init);
944 module_exit(ost_exit);