1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 * For testing and management it is treated as an obd_device,
23 * although it does not export a full OBD method table (the
24 * requests are coming in over the wire, so object target modules
25 * do not have a full method table.)
30 #define DEBUG_SUBSYSTEM S_OSC
33 #include <linux/version.h>
34 #include <linux/module.h>
36 #include <linux/highmem.h>
37 #include <linux/lustre_dlm.h>
38 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
39 #include <linux/workqueue.h>
40 #include <linux/smp_lock.h>
42 #include <linux/locks.h>
45 #include <liblustre.h>
48 #include <linux/kp30.h>
49 #include <linux/lustre_mds.h> /* for mds_objid */
50 #include <linux/obd_ost.h>
51 #include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
52 #include <linux/ctype.h>
53 #include <linux/init.h>
54 #include <linux/lustre_ha.h>
55 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
56 #include <linux/lustre_lite.h> /* for ll_i2info */
57 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
58 #include <linux/lprocfs_status.h>
60 /* It is important that ood_fh remain the first item in this structure: that
61 * way, we don't have to re-pack the obdo's inline data before we send it to
62 * the server, we can just send the whole struct unaltered. */
/* Magic stamped into the obdo's o_inline payload by osc_open() and checked
 * by osc_close() to validate that the inline data really is an
 * osc_obdo_data before it is interpreted. */
63 #define OSC_OBDO_DATA_MAGIC 0xD15EA5ED
/* Per-open state smuggled through oa->o_inline between osc_open() and
 * osc_close(): the server file handle plus a reference on the (replayable)
 * open request.
 * NOTE(review): this excerpt appears truncated — the ood_magic member that
 * osc_open()/osc_close() use, and the closing brace, are not visible here. */
64 struct osc_obdo_data {
65 struct lustre_handle ood_fh;
66 struct ptlrpc_request *ood_request;
69 #include <linux/obd_lov.h> /* just for the startup assertion; is that wrong? */
/* Send an OST_SYNCFS RPC on @imp at the given request @level, OR-ing
 * @msg_flags into the request message flags, and wait for the reply.
 * On success logs the server's last_committed/last_xid.
 * NOTE(review): excerpt is truncated — allocation checks, the reply-status
 * branch and the final return are not visible. */
71 static int send_sync(struct obd_import *imp, struct ll_fid *rootfid,
72 int level, int msg_flags)
74 struct ptlrpc_request *req;
75 struct mds_body *body;
76 int rc, size = sizeof(*body);
79 req = ptlrpc_prep_req(imp, OST_SYNCFS, 1, &size, NULL);
81 GOTO(out, rc = -ENOMEM);
83 body = lustre_msg_buf(req->rq_reqmsg, 0);
84 req->rq_level = level;
85 req->rq_replen = lustre_msg_size(1, &size);
87 req->rq_reqmsg->flags |= msg_flags;
88 rc = ptlrpc_queue_wait(req);
91 CDEBUG(D_NET, "last_committed="LPU64
92 ", last_xid="LPU64"\n",
93 req->rq_repmsg->last_committed,
94 req->rq_repmsg->last_xid);
99 ptlrpc_req_finished(req);
/* Tell the server that replay is finished by sending a MSG_LAST_REPLAY-flagged
 * sync at LUSTRE_CONN_RECOVD level.
 * NOTE(review): the declaration/initialization of 'fid' is elided in this
 * excerpt. */
103 static int signal_completed_replay(struct obd_import *imp)
107 return send_sync(imp, &fid, LUSTRE_CONN_RECOVD, MSG_LAST_REPLAY);
/* obd attach method: hook this OSC device into /proc via lprocfs using the
 * OSC's static lprocfs variable table. */
110 static int osc_attach(struct obd_device *dev, obd_count len, void *data)
112 struct lprocfs_static_vars lvars;
114 lprocfs_init_vars(&lvars);
115 return lprocfs_obd_attach(dev, lvars.obd_vars);
/* obd detach method: remove the lprocfs entries created in osc_attach(). */
118 static int osc_detach(struct obd_device *dev)
120 return lprocfs_obd_detach(dev);
123 /* Pack OSC object metadata for shipment to the MDS. */
/* Converts the in-core lov_stripe_md to an on-wire lov_mds_md in *lmmp.
 * Visible behavior: frees *lmmp when asked, allocates it when NULL, and
 * copies the object id across.
 * NOTE(review): excerpt truncated — the NULL/size-query branches and return
 * statements are not visible. */
124 static int osc_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
125 struct lov_stripe_md *lsm)
130 lmm_size = sizeof(**lmmp);
135 OBD_FREE(*lmmp, lmm_size);
141 OBD_ALLOC(*lmmp, lmm_size);
146 LASSERT(lsm->lsm_object_id);
147 (*lmmp)->lmm_object_id = (lsm->lsm_object_id);
/* Inverse of osc_packmd(): unpack an on-wire lov_mds_md into an in-core
 * lov_stripe_md in *lsmp (free / allocate / copy object id).
 * NOTE(review): excerpt truncated — branch conditions and returns elided. */
153 static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
154 struct lov_mds_md *lmm)
159 lsm_size = sizeof(**lsmp);
164 OBD_FREE(*lsmp, lsm_size);
170 OBD_ALLOC(*lsmp, lsm_size);
177 (*lsmp)->lsm_object_id = (lmm->lmm_object_id);
178 LASSERT((*lsmp)->lsm_object_id);
/* Copy the (network-order) transaction number from a request's reply into
 * the caller's obd_trans_info, if both are present. */
184 inline void oti_from_request(struct obd_trans_info *oti,
185 struct ptlrpc_request *req)
187 if (oti && req->rq_repmsg)
188 oti->oti_transno = NTOH__u64(req->rq_repmsg->transno);
/* OST_GETATTR: ship the whole obdo to the OST, wait, and copy the returned
 * attributes back into @oa.
 * NOTE(review): excerpt truncated — request-NULL check, rc handling after
 * queue_wait, and the final return are not visible. */
192 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
193 struct lov_stripe_md *md)
195 struct ptlrpc_request *request;
196 struct ost_body *body;
197 int rc, size = sizeof(*body);
200 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
205 body = lustre_msg_buf(request->rq_reqmsg, 0);
206 #warning FIXME: pack only valid fields instead of memcpy, endianness
207 memcpy(&body->oa, oa, sizeof(*oa));
209 request->rq_replen = lustre_msg_size(1, &size);
211 rc = ptlrpc_queue_wait(request);
213 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
217 body = lustre_msg_buf(request->rq_repmsg, 0);
218 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
219 memcpy(oa, &body->oa, sizeof(*oa));
223 ptlrpc_req_finished(request);
/* OST_OPEN: open the object on the OST. The request is marked replayable;
 * on success the returned file handle and a reference on the request are
 * stashed in oa->o_inline as an osc_obdo_data (consumed by osc_close()).
 * NOTE(review): excerpt truncated — error paths after prep_req/queue_wait
 * and the final return are not visible. */
227 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
228 struct lov_stripe_md *md, struct obd_trans_info *oti)
230 struct ptlrpc_request *request;
231 struct ost_body *body;
232 int rc, size = sizeof(*body);
235 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
/* keep this open for replay across recovery */
240 request->rq_flags |= PTL_RPC_FL_REPLAY;
241 body = lustre_msg_buf(request->rq_reqmsg, 0);
242 #warning FIXME: pack only valid fields instead of memcpy, endianness
243 memcpy(&body->oa, oa, sizeof(*oa));
245 request->rq_replen = lustre_msg_size(1, &size);
247 rc = ptlrpc_queue_wait(request);
252 struct osc_obdo_data ood;
253 body = lustre_msg_buf(request->rq_repmsg, 0);
254 memcpy(oa, &body->oa, sizeof(*oa));
256 /* If the open succeeded, we better have a handle */
257 /* BlueArc OSTs don't send back (o_valid | FLHANDLE). sigh.
258 * Temporary workaround until fixed. -phil 24 Feb 03 */
259 //LASSERT(oa->o_valid & OBD_MD_FLHANDLE);
260 oa->o_valid |= OBD_MD_FLHANDLE;
262 memcpy(&ood.ood_fh, obdo_handle(oa), sizeof(ood.ood_fh));
/* hold a ref on the open request until close drops it */
263 ood.ood_request = ptlrpc_request_addref(request);
264 ood.ood_magic = OSC_OBDO_DATA_MAGIC;
266 /* Save this data in the request; it will be passed back to us
267 * in future obdos. This memcpy is guaranteed to be safe,
268 * because we check at compile-time that sizeof(ood) is smaller
269 * than oa->o_inline. */
270 memcpy(&oa->o_inline, &ood, sizeof(ood));
275 ptlrpc_req_finished(request);
/* OST_CLOSE: close the object previously opened by osc_open(). Validates the
 * osc_obdo_data stashed in oa->o_inline, sends the close, then (under
 * imp_lock) strips the REPLAY flag from the saved open request and drops the
 * reference osc_open() took on it.
 * NOTE(review): excerpt truncated — 'flags' declaration, several braces and
 * the error/return paths are not visible; the two unlock lines are the two
 * arms of an if/else that is partly elided. */
279 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
280 struct lov_stripe_md *md, struct obd_trans_info *oti)
282 struct obd_import *import = class_conn2cliimp(conn);
283 struct ptlrpc_request *request;
284 struct ost_body *body;
285 struct osc_obdo_data *ood;
287 int rc, size = sizeof(*body);
291 ood = (struct osc_obdo_data *)&oa->o_inline;
292 LASSERT(ood->ood_magic == OSC_OBDO_DATA_MAGIC);
294 request = ptlrpc_prep_req(import, OST_CLOSE, 1, &size, NULL);
298 body = lustre_msg_buf(request->rq_reqmsg, 0);
299 #warning FIXME: pack only valid fields instead of memcpy, endianness
300 memcpy(&body->oa, oa, sizeof(*oa));
302 request->rq_replen = lustre_msg_size(1, &size);
304 rc = ptlrpc_queue_wait(request);
306 /* FIXME: Does this mean that the file is still open locally?
307 * If not, and I somehow suspect not, we need to cleanup
312 spin_lock_irqsave(&import->imp_lock, flags);
/* the open no longer needs to be replayed after a successful close */
313 ood->ood_request->rq_flags &= ~PTL_RPC_FL_REPLAY;
314 /* see comments in llite/file.c:ll_mdc_close() */
315 if (ood->ood_request->rq_transno) {
316 LBUG(); /* this can't happen yet */
317 if (!request->rq_transno) {
318 request->rq_transno = ood->ood_request->rq_transno;
319 ptlrpc_retain_replayable_request(request, import);
321 spin_unlock_irqrestore(&import->imp_lock, flags);
323 spin_unlock_irqrestore(&import->imp_lock, flags);
/* drop the reference taken by osc_open() */
324 ptlrpc_req_finished(ood->ood_request);
327 body = lustre_msg_buf(request->rq_repmsg, 0);
328 memcpy(oa, &body->oa, sizeof(*oa));
332 ptlrpc_req_finished(request);
/* OST_SETATTR: push the attributes in @oa to the OST and wait for the reply.
 * NOTE(review): excerpt truncated — NULL check after prep_req and the final
 * return are not visible. */
336 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
337 struct lov_stripe_md *md, struct obd_trans_info *oti)
339 struct ptlrpc_request *request;
340 struct ost_body *body;
341 int rc, size = sizeof(*body);
344 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
349 body = lustre_msg_buf(request->rq_reqmsg, 0);
350 memcpy(&body->oa, oa, sizeof(*oa));
352 request->rq_replen = lustre_msg_size(1, &size);
354 rc = ptlrpc_queue_wait(request);
356 ptlrpc_req_finished(request);
/* OST_CREATE: create a new object on the OST. Allocates an in-core
 * lov_stripe_md (freed again on the error path), sends the create, records
 * the new object id in the lsm, and captures the transno into *oti.
 * NOTE(review): excerpt truncated — the oti/trans_info setup, several error
 * checks, *ea assignment and the returns are not visible. */
360 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
361 struct lov_stripe_md **ea, struct obd_trans_info *oti_in)
363 struct ptlrpc_request *request;
364 struct ost_body *body;
365 struct lov_stripe_md *lsm;
366 struct obd_trans_info *oti, trans_info;
367 int rc, size = sizeof(*body);
375 rc = obd_alloc_memmd(conn, &lsm);
385 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
388 GOTO(out, rc = -ENOMEM);
390 body = lustre_msg_buf(request->rq_reqmsg, 0);
391 memcpy(&body->oa, oa, sizeof(*oa));
393 request->rq_replen = lustre_msg_size(1, &size);
395 rc = ptlrpc_queue_wait(request);
399 body = lustre_msg_buf(request->rq_repmsg, 0);
400 memcpy(oa, &body->oa, sizeof(*oa));
402 lsm->lsm_object_id = oa->o_id;
403 lsm->lsm_stripe_count = 0;
406 oti_from_request(oti, request);
407 CDEBUG(D_HA, "transno: "LPD64"\n", oti->oti_transno);
410 ptlrpc_req_finished(request);
413 obd_free_memmd(conn, &lsm);
/* OST_PUNCH: truncate/punch the byte range [start, end] on the OST. The
 * range is carried in the overloaded o_size/o_blocks fields of the obdo.
 * NOTE(review): excerpt truncated — parameter validation, NULL checks and
 * returns are not visible. */
417 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
418 struct lov_stripe_md *md, obd_size start,
419 obd_size end, struct obd_trans_info *oti)
421 struct ptlrpc_request *request;
422 struct ost_body *body;
423 int rc, size = sizeof(*body);
431 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
436 body = lustre_msg_buf(request->rq_reqmsg, 0);
437 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
438 memcpy(&body->oa, oa, sizeof(*oa));
440 /* overload the size and blocks fields in the oa with start/end */
441 body->oa.o_size = HTON__u64(start);
442 body->oa.o_blocks = HTON__u64(end);
443 body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
445 request->rq_replen = lustre_msg_size(1, &size);
447 rc = ptlrpc_queue_wait(request);
451 body = lustre_msg_buf(request->rq_repmsg, 0);
452 memcpy(oa, &body->oa, sizeof(*oa));
456 ptlrpc_req_finished(request);
/* OST_DESTROY: destroy the object described by @oa on the OST and copy the
 * reply attributes back.
 * NOTE(review): excerpt truncated — NULL/rc checks and returns elided. */
460 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
461 struct lov_stripe_md *ea, struct obd_trans_info *oti)
463 struct ptlrpc_request *request;
464 struct ost_body *body;
465 int rc, size = sizeof(*body);
472 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
477 body = lustre_msg_buf(request->rq_reqmsg, 0);
478 #warning FIXME: pack only valid fields instead of memcpy, endianness
479 memcpy(&body->oa, oa, sizeof(*oa));
481 request->rq_replen = lustre_msg_size(1, &size);
483 rc = ptlrpc_queue_wait(request);
487 body = lustre_msg_buf(request->rq_repmsg, 0);
488 memcpy(oa, &body->oa, sizeof(*oa));
492 ptlrpc_req_finished(request);
496 /* Our bulk-unmapping bottom half. */
/* Runs in process context (scheduled from osc_ptl_ev_hdlr): kunmap every
 * page attached to the bulk descriptor, then drop the descriptor ref. */
497 static void unmap_and_decref_bulk_desc(void *data)
499 struct ptlrpc_bulk_desc *desc = data;
500 struct list_head *tmp;
503 list_for_each(tmp, &desc->bd_page_list) {
504 struct ptlrpc_bulk_page *bulk;
505 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
507 kunmap(bulk->bp_page);
511 ptlrpc_bulk_decref(desc);
516 /* this is the callback function which is invoked by the Portals
517 * event handler associated with the bulk_sink queue and bulk_source queue.
/* Called (possibly in interrupt context) when the bulk transfer completes:
 * fire the brw set's CB_PHASE_FINISH callback, then defer the kunmap work
 * to the bottom half since kunmap cannot run in interrupt context. */
519 static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
523 LASSERT(desc->bd_brw_set != NULL);
524 LASSERT(desc->bd_brw_set->brw_callback != NULL);
526 /* It's important that you don't use desc->bd_brw_set after this
527 * callback runs. If you do, take a reference on it. */
528 desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
530 /* We can't kunmap the desc from interrupt context, so we do it from
531 * the bottom half above. */
532 prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
533 schedule_work(&desc->bd_queue);
539 * This is called when there was a bulk error return. However, we don't know
540 * whether the bulk completed or not. We cancel the portals bulk descriptors,
541 * so that if the OST decides to send them later we don't double free. Then
542 * remove this descriptor from the set so that the set callback doesn't wait
543 * forever for the last CB_PHASE_FINISH to be called, and finally dump all of
544 * the bulk descriptor references.
546 static void osc_ptl_ev_abort(struct ptlrpc_bulk_desc *desc)
550 LASSERT(desc->bd_brw_set != NULL);
552 /* XXX reconcile this with ll_sync_brw_timeout() handling, and/or
553 * just make osc_ptl_ev_hdlr() check desc->bd_flags for either
554 * PTL_BULK_FL_RCVD or PTL_BULK_FL_SENT, and pass CB_PHASE_ABORT
555 * to brw_callback() and do the rest of the cleanup there. I
556 * also think ll_sync_brw_timeout() is missing an PtlMEUnlink,
557 * but I could be wrong.
/* NOTE(review): excerpt truncated — the error branch of
 * ptlrpc_abort_bulk() is elided here. */
559 if (ptlrpc_abort_bulk(desc)) {
563 obd_brw_set_del(desc);
564 unmap_and_decref_bulk_desc(desc);
/* Bulk read path: build an OST_READ request carrying [ost_body, obd_ioobj,
 * niobuf_remote array], kmap each destination page into a bulk sink
 * descriptor sharing one xid, register the bulk BEFORE queueing the RPC
 * (the data may arrive before the reply), wait, and finally verify the
 * optional server checksum returned in oa.o_rdev against a locally computed
 * one.
 * NOTE(review): excerpt heavily truncated — NULL checks, ENTRY/RETURN,
 * 'xid' and 'cksum' declarations, and several braces are not visible. */
569 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
570 obd_count page_count, struct brw_page *pga,
571 struct obd_brw_set *set)
573 struct obd_import *imp = class_conn2cliimp(conn);
574 struct ptlrpc_connection *connection = imp->imp_connection;
575 struct ptlrpc_request *request = NULL;
576 struct ptlrpc_bulk_desc *desc = NULL;
577 struct ost_body *body;
578 int rc, size[3] = {sizeof(*body)}, mapped = 0;
579 struct obd_ioobj *iooptr;
580 struct niobuf_remote *nioptr;
585 size[1] = sizeof(struct obd_ioobj);
586 size[2] = page_count * sizeof(struct niobuf_remote);
588 request = ptlrpc_prep_req(imp, OST_READ, 3, size, NULL);
592 body = lustre_msg_buf(request->rq_reqmsg, 0);
/* request a checksum only when CHECKSUM_BULK is compiled in */
593 body->oa.o_valid = HTON__u32(OBD_MD_FLCKSUM * CHECKSUM_BULK);
595 desc = ptlrpc_prep_bulk(connection);
597 GOTO(out_req, rc = -ENOMEM);
598 desc->bd_portal = OST_BULK_PORTAL;
599 desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
600 CDEBUG(D_PAGE, "desc = %p\n", desc);
602 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
603 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
604 ost_pack_ioo(iooptr, lsm, page_count);
605 /* end almost identical to brw_write case */
607 xid = ptlrpc_next_xid(); /* single xid for all pages */
609 obd_kmap_get(page_count, 0);
611 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
612 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
614 unmap_and_decref_bulk_desc(desc);
615 GOTO(out_req, rc = -ENOMEM);
/* pages must arrive sorted by ascending file offset */
618 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
620 bulk->bp_xid = xid; /* single xid for all pages */
621 bulk->bp_buf = kmap(pga[mapped].pg);
622 bulk->bp_page = pga[mapped].pg;
623 bulk->bp_buflen = PAGE_SIZE;
624 ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
625 pga[mapped].flag, bulk->bp_xid);
629 * Register the bulk first, because the reply could arrive out of order,
630 * and we want to be ready for the bulk data.
632 * One reference is released when osc_ptl_ev_hdlr() is called by
633 * portals, the other when the caller removes us from the "set" list.
635 * On error, we never do the brw_finish, so we handle all decrefs.
637 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
638 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
639 OBD_FAIL_OSC_BRW_READ_BULK);
641 rc = ptlrpc_register_bulk_put(desc);
643 unmap_and_decref_bulk_desc(desc);
646 obd_brw_set_add(set, desc);
649 request->rq_flags |= PTL_RPC_FL_NO_RESEND;
650 request->rq_replen = lustre_msg_size(1, size);
651 rc = ptlrpc_queue_wait(request);
653 /* XXX bug 937 here */
654 if (rc == -ETIMEDOUT && (request->rq_flags & PTL_RPC_FL_RESEND)) {
655 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
656 ptlrpc_req_finished(request);
661 osc_ptl_ev_abort(desc);
666 body = lustre_msg_buf(request->rq_repmsg, 0);
667 if (body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM)) {
668 static int cksum_counter;
/* server stashes its bulk checksum in the (otherwise unused) o_rdev */
669 __u64 server_cksum = NTOH__u64(body->oa.o_rdev);
672 for (mapped = 0; mapped < page_count; mapped++) {
673 char *ptr = kmap(pga[mapped].pg);
674 int off = pga[mapped].off & (PAGE_SIZE - 1);
675 int len = pga[mapped].count;
677 LASSERT(off + len <= PAGE_SIZE);
678 ost_checksum(&cksum, ptr + off, len);
679 kunmap(pga[mapped].pg);
683 if (server_cksum != cksum) {
684 CERROR("Bad checksum: server "LPX64", client "LPX64
685 ", server NID "LPX64"\n", server_cksum, cksum,
686 imp->imp_connection->c_peer.peer_nid);
/* (x & -x) == x: log only at power-of-two counts to rate-limit */
688 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter)
689 CERROR("Checksum %u from "LPX64" OK: "LPX64"\n",
691 imp->imp_connection->c_peer.peer_nid, cksum);
693 static int cksum_missed;
695 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
696 CERROR("Request checksum %u from "LPX64", no reply\n",
698 imp->imp_connection->c_peer.peer_nid);
704 ptlrpc_req_finished(request);
/* Bulk write path: mirror of osc_brw_read(), but sources data from the
 * pages: kmap each page into a bulk source descriptor (one shared xid),
 * accumulate a checksum into oa.o_rdev, register the bulk GET before
 * queueing the OST_WRITE RPC, then wait.
 * NOTE(review): excerpt heavily truncated — 'xid'/'cksum' declarations,
 * NULL checks, braces and returns are not visible. */
708 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *lsm,
709 obd_count page_count, struct brw_page *pga,
710 struct obd_brw_set *set, struct obd_trans_info *oti)
712 struct obd_import *imp = class_conn2cliimp(conn);
713 struct ptlrpc_connection *connection = imp->imp_connection;
714 struct ptlrpc_request *request = NULL;
715 struct ptlrpc_bulk_desc *desc = NULL;
716 struct ost_body *body;
717 int rc, size[3] = {sizeof(*body)}, mapped = 0;
718 struct obd_ioobj *iooptr;
719 struct niobuf_remote *nioptr;
727 size[1] = sizeof(struct obd_ioobj);
728 size[2] = page_count * sizeof(struct niobuf_remote);
730 request = ptlrpc_prep_req(imp, OST_WRITE, 3, size, NULL);
734 body = lustre_msg_buf(request->rq_reqmsg, 0);
736 desc = ptlrpc_prep_bulk(connection);
738 GOTO(out_req, rc = -ENOMEM);
739 desc->bd_portal = OSC_BULK_PORTAL;
740 desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
741 CDEBUG(D_PAGE, "desc = %p\n", desc);
743 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
744 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
745 ost_pack_ioo(iooptr, lsm, page_count);
746 /* end almost identical to brw_read case */
748 xid = ptlrpc_next_xid(); /* single xid for all pages */
750 obd_kmap_get(page_count, 0);
752 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
753 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
755 unmap_and_decref_bulk_desc(desc);
756 GOTO(out_req, rc = -ENOMEM);
/* pages must arrive sorted by ascending file offset */
759 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
761 bulk->bp_xid = xid; /* single xid for all pages */
762 bulk->bp_buf = kmap(pga[mapped].pg);
763 bulk->bp_page = pga[mapped].pg;
764 /* matching ptlrpc_bulk_get assert */
765 LASSERT(pga[mapped].count > 0);
766 bulk->bp_buflen = pga[mapped].count;
767 ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
768 pga[mapped].flag, bulk->bp_xid);
769 ost_checksum(&cksum, bulk->bp_buf, bulk->bp_buflen);
/* client checksum travels in the (otherwise unused) o_rdev field */
773 body->oa.o_rdev = HTON__u64(cksum);
774 body->oa.o_valid |= HTON__u32(OBD_MD_FLCKSUM);
777 * Register the bulk first, because the reply could arrive out of
778 * order, and we want to be ready for the bulk data.
780 * One reference is released when brw_finish is complete, the other
781 * when the caller removes us from the "set" list.
783 * On error, we never do the brw_finish, so we handle all decrefs.
785 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK)) {
786 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
787 OBD_FAIL_OSC_BRW_WRITE_BULK);
789 rc = ptlrpc_register_bulk_get(desc);
791 unmap_and_decref_bulk_desc(desc);
794 obd_brw_set_add(set, desc);
797 request->rq_flags |= PTL_RPC_FL_NO_RESEND;
798 request->rq_replen = lustre_msg_size(1, size);
799 rc = ptlrpc_queue_wait(request);
801 /* XXX bug 937 here */
802 if (rc == -ETIMEDOUT && (request->rq_flags & PTL_RPC_FL_RESEND)) {
803 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
804 ptlrpc_req_finished(request);
809 osc_ptl_ev_abort(desc);
815 ptlrpc_req_finished(request);
820 #define min_t(a,b,c) ( b<c ) ? b : c
/* Upper bounds on a single bulk RPC: at most OSC_BRW_MAX_SIZE bytes and at
 * most min(PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE) pages per brw.
 * NOTE(review): the two identical define pairs below are presumably the two
 * arms of an #if/#else whose preprocessor lines were lost in this excerpt —
 * as written they would be duplicate definitions; verify against the full
 * file. */
823 #warning "FIXME: make values dynamic based on get_info at setup (bug 665)"
824 #define OSC_BRW_MAX_SIZE 65536
825 #define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE)
827 #warning "FIXME: make these values dynamic based on a get_info call at setup"
828 #define OSC_BRW_MAX_SIZE 65536
829 #define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE)
/* Top-level brw entry point: split a large request into chunks of at most
 * OSC_BRW_MAX_IOV pages and dispatch each chunk to osc_brw_write() or
 * osc_brw_read() depending on @cmd.
 * NOTE(review): excerpt truncated — the surrounding while-loop head, rc
 * declaration/checking and the return are not visible. */
831 static int osc_brw(int cmd, struct lustre_handle *conn,
832 struct lov_stripe_md *md, obd_count page_count,
833 struct brw_page *pga, struct obd_brw_set *set,
834 struct obd_trans_info *oti)
839 obd_count pages_per_brw;
842 if (page_count > OSC_BRW_MAX_IOV)
843 pages_per_brw = OSC_BRW_MAX_IOV;
845 pages_per_brw = page_count;
847 if (cmd & OBD_BRW_WRITE)
848 rc = osc_brw_write(conn, md, pages_per_brw, pga,
851 rc = osc_brw_read(conn, md, pages_per_brw, pga, set);
856 page_count -= pages_per_brw;
857 pga += pages_per_brw;
863 /* Note: caller will lock/unlock, and set uptodate on the pages */
864 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/* SAN read path (2.4 only): ask the OST for block mappings via OST_SAN_READ,
 * then read the blocks directly from the shared SAN block device with
 * ll_rw_block(). A zero offset in the reply marks a hole and the page is
 * zero-filled instead.
 * NOTE(review): excerpt heavily truncated — NULL checks, 'dev' declaration,
 * bh iteration, wait_on_buffer calls, braces and returns are elided. */
865 static int sanosc_brw_read(struct lustre_handle *conn,
866 struct lov_stripe_md *lsm,
867 obd_count page_count,
868 struct brw_page *pga,
869 struct obd_brw_set *set)
871 struct ptlrpc_request *request = NULL;
872 struct ost_body *body;
873 struct niobuf_remote *nioptr;
874 struct obd_ioobj *iooptr;
875 int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
878 size[1] = sizeof(struct obd_ioobj);
879 size[2] = page_count * sizeof(*nioptr);
881 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_READ, 3,
886 body = lustre_msg_buf(request->rq_reqmsg, 0);
887 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
888 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
889 ost_pack_ioo(iooptr, lsm, page_count);
891 obd_kmap_get(page_count, 0);
893 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
894 LASSERT(PageLocked(pga[mapped].pg));
895 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
897 kmap(pga[mapped].pg);
898 ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
899 pga[mapped].flag, 0);
902 size[1] = page_count * sizeof(*nioptr);
903 request->rq_replen = lustre_msg_size(2, size);
905 rc = ptlrpc_queue_wait(request);
909 nioptr = lustre_msg_buf(request->rq_repmsg, 1);
911 GOTO(out_unmap, rc = -EINVAL);
913 if (request->rq_repmsg->buflens[1] != size[1]) {
914 CERROR("buffer length wrong (%d vs. %d)\n",
915 request->rq_repmsg->buflens[1], size[1]);
916 GOTO(out_unmap, rc = -EINVAL);
920 for (j = 0; j < page_count; j++, nioptr++) {
921 struct page *page = pga[j].pg;
922 struct buffer_head *bh;
925 ost_unpack_niobuf(nioptr, nioptr);
926 /* got san device associated */
927 LASSERT(class_conn2obd(conn));
928 dev = class_conn2obd(conn)->u.cli.cl_sandev;
/* offset 0 in the reply means a file hole: zero-fill the page */
931 if (!nioptr->offset) {
932 CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
933 page->mapping->host->i_ino,
935 memset(page_address(page), 0, PAGE_SIZE);
939 if (!page->buffers) {
940 create_empty_buffers(page, dev, PAGE_SIZE);
943 clear_bit(BH_New, &bh->b_state);
944 set_bit(BH_Mapped, &bh->b_state);
/* map the buffer directly to the SAN block the OST reported */
945 bh->b_blocknr = (unsigned long)nioptr->offset;
947 clear_bit(BH_Uptodate, &bh->b_state);
949 ll_rw_block(READ, 1, &bh);
953 /* if buffer already existed, it must be the
954 * one we mapped before, check it */
955 LASSERT(!test_bit(BH_New, &bh->b_state));
956 LASSERT(test_bit(BH_Mapped, &bh->b_state));
957 LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);
959 /* wait it's io completion */
960 if (test_bit(BH_Lock, &bh->b_state))
963 if (!test_bit(BH_Uptodate, &bh->b_state))
964 ll_rw_block(READ, 1, &bh);
968 /* must do syncronous write here */
970 if (!buffer_uptodate(bh)) {
978 ptlrpc_req_finished(request);
982 /* Clean up on error. */
984 kunmap(pga[mapped].pg);
986 obd_kmap_put(page_count);
/* SAN write path (2.4 only): obtain block mappings via OST_SAN_WRITE, then
 * write the pages straight to the SAN block device with ll_rw_block(WRITE),
 * marking the buffers mapped/uptodate/dirty first.
 * NOTE(review): excerpt heavily truncated — 'dev'/'bh' setup, NULL checks,
 * wait-on-completion code, braces and returns are elided. */
991 static int sanosc_brw_write(struct lustre_handle *conn,
992 struct lov_stripe_md *lsm,
993 obd_count page_count,
994 struct brw_page *pga,
995 struct obd_brw_set *set)
997 struct ptlrpc_request *request = NULL;
998 struct ost_body *body;
999 struct niobuf_remote *nioptr;
1000 struct obd_ioobj *iooptr;
1001 int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
1004 size[1] = sizeof(struct obd_ioobj);
1005 size[2] = page_count * sizeof(*nioptr);
1007 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_WRITE,
1012 body = lustre_msg_buf(request->rq_reqmsg, 0);
1013 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
1014 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
1015 ost_pack_ioo(iooptr, lsm, page_count);
1017 /* map pages, and pack request */
1018 obd_kmap_get(page_count, 0);
1019 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1020 LASSERT(PageLocked(pga[mapped].pg));
1021 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
1023 kmap(pga[mapped].pg);
1024 ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
1025 pga[mapped].flag, 0);
1028 size[1] = page_count * sizeof(*nioptr);
1029 request->rq_replen = lustre_msg_size(2, size);
1031 rc = ptlrpc_queue_wait(request);
1033 GOTO(out_unmap, rc);
1035 nioptr = lustre_msg_buf(request->rq_repmsg, 1);
1037 GOTO(out_unmap, rc = -EINVAL);
1039 if (request->rq_repmsg->buflens[1] != size[1]) {
1040 CERROR("buffer length wrong (%d vs. %d)\n",
1041 request->rq_repmsg->buflens[1], size[1]);
1042 GOTO(out_unmap, rc = -EINVAL);
1046 for (j = 0; j < page_count; j++, nioptr++) {
1047 struct page *page = pga[j].pg;
1048 struct buffer_head *bh;
1051 ost_unpack_niobuf(nioptr, nioptr);
1052 /* got san device associated */
1053 LASSERT(class_conn2obd(conn));
1054 dev = class_conn2obd(conn)->u.cli.cl_sandev;
1056 if (!page->buffers) {
1057 create_empty_buffers(page, dev, PAGE_SIZE);
1060 LASSERT(!test_bit(BH_New, &page->buffers->b_state));
1061 LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
1062 LASSERT(page->buffers->b_blocknr ==
1063 (unsigned long)nioptr->offset);
1069 /* if buffer locked, wait it's io completion */
1070 if (test_bit(BH_Lock, &bh->b_state))
1073 clear_bit(BH_New, &bh->b_state);
1074 set_bit(BH_Mapped, &bh->b_state);
1076 /* override the block nr */
1077 bh->b_blocknr = (unsigned long)nioptr->offset;
1079 /* we are about to write it, so set it
1081 * page lock should garentee no race condition here */
1082 set_bit(BH_Uptodate, &bh->b_state);
1083 set_bit(BH_Dirty, &bh->b_state);
1085 ll_rw_block(WRITE, 1, &bh);
1087 /* must do syncronous write here */
1089 if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
1097 ptlrpc_req_finished(request);
1101 /* Clean up on error. */
1102 while (mapped-- > 0)
1103 kunmap(pga[mapped].pg);
1105 obd_kmap_put(page_count);
/* SAN top-level brw: same chunking loop as osc_brw(), dispatching to
 * sanosc_brw_write()/sanosc_brw_read() per OSC_BRW_MAX_IOV-page chunk.
 * NOTE(review): excerpt truncated — rc declaration/checking and the final
 * return are not visible. */
1110 static int sanosc_brw(int cmd, struct lustre_handle *conn,
1111 struct lov_stripe_md *lsm, obd_count page_count,
1112 struct brw_page *pga, struct obd_brw_set *set,
1113 struct obd_trans_info *oti)
1117 while (page_count) {
1118 obd_count pages_per_brw;
1121 if (page_count > OSC_BRW_MAX_IOV)
1122 pages_per_brw = OSC_BRW_MAX_IOV;
1124 pages_per_brw = page_count;
1126 if (cmd & OBD_BRW_WRITE)
1127 rc = sanosc_brw_write(conn, lsm, pages_per_brw,
1130 rc = sanosc_brw_read(conn, lsm, pages_per_brw, pga,set);
1135 page_count -= pages_per_brw;
1136 pga += pages_per_brw;
/* Acquire a DLM extent lock on the object named by lsm->lsm_object_id.
 * Rounds non-EOF extents to page boundaries, first tries to match an
 * existing lock of the requested mode, then (for PR requests) an existing
 * PW lock, and only enqueues a fresh lock if neither match.
 * NOTE(review): excerpt truncated — ENTRY/RETURN and some match-result
 * checks are elided. Also note ldlm_lock_match/ldlm_cli_enqueue are passed
 * 'sizeof(extent)' where 'extent' is a pointer; verify against the full
 * file whether sizeof(*extent) was intended. */
1143 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
1144 struct lustre_handle *parent_lock,
1145 __u32 type, void *extentp, int extent_len, __u32 mode,
1146 int *flags, void *callback, void *data, int datalen,
1147 struct lustre_handle *lockh)
1149 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1150 struct obd_device *obddev = class_conn2obd(connh);
1151 struct ldlm_extent *extent = extentp;
1155 /* Filesystem locks are given a bit of special treatment: if
1156 * this is not a file size lock (which has end == -1), we
1157 * fixup the lock to start and end on page boundaries. */
1158 if (extent->end != OBD_OBJECT_EOF) {
1159 extent->start &= PAGE_MASK;
1160 extent->end = (extent->end & PAGE_MASK) + PAGE_SIZE - 1;
1163 /* Next, search for already existing extent locks that will cover us */
1164 rc = ldlm_lock_match(obddev->obd_namespace, 0, &res_id, type, extent,
1165 sizeof(extent), mode, lockh);
1167 /* We already have a lock, and it's referenced */
1168 RETURN(ELDLM_LOCK_MATCHED);
1170 /* If we're trying to read, we also search for an existing PW lock. The
1171 * VFS and page cache already protect us locally, so lots of readers/
1172 * writers can share a single PW lock.
1174 * There are problems with conversion deadlocks, so instead of
1175 * converting a read lock to a write lock, we'll just enqueue a new
1178 * At some point we should cancel the read lock instead of making them
1179 * send us a blocking callback, but there are problems with canceling
1180 * locks out from other users right now, too. */
1182 if (mode == LCK_PR) {
1183 rc = ldlm_lock_match(obddev->obd_namespace, 0, &res_id, type,
1184 extent, sizeof(extent), LCK_PW, lockh);
1186 /* FIXME: This is not incredibly elegant, but it might
1187 * be more elegant than adding another parameter to
1188 * lock_match. I want a second opinion. */
1189 ldlm_lock_addref(lockh, LCK_PR);
1190 ldlm_lock_decref(lockh, LCK_PW);
1192 RETURN(ELDLM_LOCK_MATCHED);
1196 rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
1197 res_id, type, extent, sizeof(extent), mode, flags,
1198 ldlm_completion_ast, callback, data, NULL,
/* Release one reference on the lock identified by @lockh at mode @mode.
 * NOTE(review): ENTRY/RETURN lines elided in this excerpt. */
1203 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
1204 __u32 mode, struct lustre_handle *lockh)
1208 ldlm_lock_decref(lockh, mode);
/* Cancel all unused DLM locks on the resource named by lsm's object id. */
1213 static int osc_cancel_unused(struct lustre_handle *connh,
1214 struct lov_stripe_md *lsm, int flags)
1216 struct obd_device *obddev = class_conn2obd(connh);
1217 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1219 return ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags);
/* OST_STATFS: fetch filesystem statistics from the OST and unpack them into
 * @osfs.
 * NOTE(review): excerpt truncated — NULL check and final return elided. */
1222 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
1224 struct ptlrpc_request *request;
1225 int rc, size = sizeof(*osfs);
1228 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
1233 request->rq_replen = lustre_msg_size(1, &size);
1235 rc = ptlrpc_queue_wait(request);
1237 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
1241 obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
1245 ptlrpc_req_finished(request);
1249 /* Retrieve object striping information.
1251 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
1252 * the maximum number of OST indices which will fit in the user buffer.
1253 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
/* Copies a single-stripe lov_mds_md describing this object to userspace.
 * NOTE(review): excerpt truncated — error returns for the validation
 * branches and the copy_to_user failure path are elided. */
1255 static int osc_getstripe(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1256 struct lov_mds_md *lmmu)
1258 struct lov_mds_md lmm, *lmmk;
1265 rc = copy_from_user(&lmm, lmmu, sizeof(lmm));
1269 if (lmm.lmm_magic != LOV_MAGIC)
1272 if (lmm.lmm_ost_count < 1)
1275 lmm_size = sizeof(lmm) + sizeof(lmm.lmm_objects[0]);
1276 OBD_ALLOC(lmmk, lmm_size);
1280 lmmk->lmm_stripe_count = 1;
1281 lmmk->lmm_ost_count = 1;
1282 lmmk->lmm_object_id = lsm->lsm_object_id;
1283 lmmk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
1285 if (copy_to_user(lmmu, lmmk, lmm_size))
1288 OBD_FREE(lmmk, lmm_size);
/* ioctl dispatcher for the OSC device: LDLM test/regression hooks, LOV
 * registration, single-OST LOV config emulation (OBD_IOC_LOV_GET_CONFIG),
 * and stripe get/set passthroughs.
 * NOTE(review): excerpt truncated — the switch statement head, 'err'/'buf'
 * declarations, several break/GOTO lines and closing braces are elided. */
1293 static int osc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
1294 void *karg, void *uarg)
1296 struct obd_device *obddev = class_conn2obd(conn);
1297 struct obd_ioctl_data *data = karg;
1303 case IOC_LDLM_TEST: {
1304 err = ldlm_test(obddev, conn);
1305 CERROR("-- done err %d\n", err);
1308 case IOC_LDLM_REGRESS_START: {
1309 unsigned int numthreads = 1;
1310 unsigned int numheld = 10;
1311 unsigned int numres = 10;
1312 unsigned int numext = 10;
/* optional inline buffer carries up to four whitespace-separated
 * numbers: threads, held, resources, extents */
1315 if (data->ioc_inllen1) {
1316 parse = data->ioc_inlbuf1;
1317 if (*parse != '\0') {
1318 while(isspace(*parse)) parse++;
1319 numthreads = simple_strtoul(parse, &parse, 0);
1320 while(isspace(*parse)) parse++;
1322 if (*parse != '\0') {
1323 while(isspace(*parse)) parse++;
1324 numheld = simple_strtoul(parse, &parse, 0);
1325 while(isspace(*parse)) parse++;
1327 if (*parse != '\0') {
1328 while(isspace(*parse)) parse++;
1329 numres = simple_strtoul(parse, &parse, 0);
1330 while(isspace(*parse)) parse++;
1332 if (*parse != '\0') {
1333 while(isspace(*parse)) parse++;
1334 numext = simple_strtoul(parse, &parse, 0);
1335 while(isspace(*parse)) parse++;
1339 err = ldlm_regression_start(obddev, conn, numthreads,
1340 numheld, numres, numext);
1342 CERROR("-- done err %d\n", err);
1345 case IOC_LDLM_REGRESS_STOP: {
1346 err = ldlm_regression_stop();
1347 CERROR("-- done err %d\n", err);
1351 case IOC_OSC_REGISTER_LOV: {
1352 if (obddev->u.cli.cl_containing_lov)
1353 GOTO(out, err = -EALREADY);
1354 obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
1357 case OBD_IOC_LOV_GET_CONFIG: {
1359 struct lov_desc *desc;
1360 struct obd_uuid uuid;
1364 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1365 GOTO(out, err = -EINVAL);
1367 data = (struct obd_ioctl_data *)buf;
1369 if (sizeof(*desc) > data->ioc_inllen1) {
1371 GOTO(out, err = -EINVAL);
1374 if (data->ioc_inllen2 < sizeof(uuid)) {
1376 GOTO(out, err = -EINVAL);
/* present this single OSC as a one-target LOV to the caller */
1379 desc = (struct lov_desc *)data->ioc_inlbuf1;
1380 desc->ld_tgt_count = 1;
1381 desc->ld_active_tgt_count = 1;
1382 desc->ld_default_stripe_count = 1;
1383 desc->ld_default_stripe_size = 0;
1384 desc->ld_default_stripe_offset = 0;
1385 desc->ld_pattern = 0;
1386 memcpy(&desc->ld_uuid, &obddev->obd_uuid, sizeof(uuid));
1388 memcpy(data->ioc_inlbuf2, &obddev->obd_uuid, sizeof(uuid));
1390 err = copy_to_user((void *)uarg, buf, len);
1396 case LL_IOC_LOV_SETSTRIPE:
1397 err = obd_alloc_memmd(conn, karg);
1401 case LL_IOC_LOV_GETSTRIPE:
1402 err = osc_getstripe(conn, karg, uarg);
1405 CERROR ("osc_ioctl(): unrecognised ioctl %#x\n", cmd);
1406 GOTO(out, err = -ENOTTY);
/* Tell the LOV that contains this OSC (if any) that the import's target
 * has gone active/inactive, so the LOV can route around a dead OST.
 *
 * The notification is delivered by synthesising a connection handle from
 * the LOV's first export and invoking its IOC_LOV_SET_OSC_ACTIVE ioctl --
 * the code itself calls this out as gross. @active is passed in
 * ioc_offset; the target is identified by its UUID in inlbuf1. */
1412 static void set_osc_active(struct obd_import *imp, int active)
1414 struct obd_device *notify_obd;
1416 LASSERT(imp->imp_obd);
/* Set by IOC_OSC_REGISTER_LOV; NULL means no LOV to notify. */
1418 notify_obd = imp->imp_obd->u.cli.cl_containing_lov;
1420 if (notify_obd == NULL)
1423 /* How gross is _this_? */
1424 if (!list_empty(&notify_obd->obd_exports)) {
1426 struct lustre_handle fakeconn;
1427 struct obd_ioctl_data ioc_data = { 0 };
/* Borrow the LOV's first export to build a usable connection handle. */
1428 struct obd_export *exp =
1429 list_entry(notify_obd->obd_exports.next,
1430 struct obd_export, exp_obd_chain);
1432 fakeconn.addr = (__u64)(unsigned long)exp;
1433 fakeconn.cookie = exp->exp_cookie;
1434 ioc_data.ioc_inlbuf1 =
1435 (char *)&imp->imp_obd->u.cli.cl_target_uuid;
1436 ioc_data.ioc_offset = active;
1437 rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn,
1438 sizeof ioc_data, &ioc_data, NULL);
/* Failure is logged but not propagated: this is best-effort notify. */
1440 CERROR("error disabling %s on LOV %p/%s: %d\n",
1441 imp->imp_obd->u.cli.cl_target_uuid.uuid,
1442 notify_obd, notify_obd->obd_uuid.uuid, rc);
1444 CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about "
1445 "%p\n", notify_obd, notify_obd->obd_uuid.uuid,
1446 imp->imp_obd->obd_uuid.uuid);
/* Drive one phase of import recovery for this OSC.
 *
 * PREPARE   -- quiesce local state: failover imports just cancel unused
 *              locks; non-failover imports are invalidated outright.
 * RECOVER   -- reconnect to the OST and, depending on the server's
 *              connect flags, replay requests and locks, merely resume,
 *              or invalidate; finally re-activate the OSC in its LOV.
 * NOTCONN   -- run PREPARE then RECOVER back to back.
 *
 * Returns 0 on success or a negative error; elided RETURN/brace lines in
 * this listing hide the exact error paths. */
1450 static int osc_recover(struct obd_import *imp, int phase)
1453 unsigned long flags;
1455 struct ptlrpc_request *req;
1456 struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
1459 CDEBUG(D_HA, "%s: entering phase: %d\n",
1460 imp->imp_obd->obd_name, phase);
1463 case PTLRPC_RECOVD_PHASE_PREPARE: {
1464 if (imp->imp_flags & IMP_REPLAYABLE) {
1465 CDEBUG(D_HA, "failover OST\n");
1466 /* If we're a failover OSC/OST, just cancel unused
1467 * locks to simplify lock replay.
1469 ldlm_cli_cancel_unused(ns, NULL, LDLM_FL_LOCAL_ONLY);
1471 CDEBUG(D_HA, "non-failover OST\n");
1472 /* Non-failover OSTs (LLNL scenario) disable the OSC
1473 * and invalidate local state.
1475 ldlm_namespace_cleanup(ns, 1 /* no network ops */);
1476 ptlrpc_abort_inflight(imp, 0);
1477 set_osc_active(imp, 0 /* inactive */);
1482 case PTLRPC_RECOVD_PHASE_RECOVER: {
/* Optimistically clear INVALID before attempting the reconnect. */
1484 imp->imp_flags &= ~IMP_INVALID;
1485 rc = ptlrpc_reconnect_import(imp, OST_CONNECT, &req);
1487 msg_flags = req->rq_repmsg
1488 ? lustre_msg_get_op_flags(req->rq_repmsg)
/* Server still mid-recovery: the caller should retry later. */
1491 if (rc == -EBUSY && (msg_flags & MSG_CONNECT_RECOVERING))
1492 CERROR("reconnect denied by recovery; should retry\n");
/* Reconnect failed outright: invalidate everything unless we came
 * in via NOTCONN (which will retry the whole sequence itself). */
1495 if (phase != PTLRPC_RECOVD_PHASE_NOTCONN) {
1496 CERROR("can't reconnect, invalidating\n");
1497 ldlm_namespace_cleanup(ns, 1);
1498 ptlrpc_abort_inflight(imp, 0);
1500 imp->imp_flags |= IMP_INVALID;
1501 ptlrpc_req_finished(req);
1505 if (msg_flags & MSG_CONNECT_RECOVERING) {
1506 /* Replay if they want it. */
1507 DEBUG_REQ(D_HA, req, "OST wants replay");
1508 rc = ptlrpc_replay(imp);
1512 rc = ldlm_replay_locks(imp);
1516 rc = signal_completed_replay(imp);
1519 } else if (msg_flags & MSG_CONNECT_RECONNECT) {
/* NOTE(review): message says "MDS" but this is the OSC talking to an
 * OST -- looks like a copy/paste from the MDC; confirm upstream. */
1520 DEBUG_REQ(D_HA, req, "reconnecting to MDS\n");
1521 /* Nothing else to do here. */
1523 DEBUG_REQ(D_HA, req, "evicted: invalidating\n");
1524 /* Otherwise, clean everything up. */
1525 ldlm_namespace_cleanup(ns, 1);
1526 ptlrpc_abort_inflight(imp, 0);
1529 ptlrpc_req_finished(req);
/* Connection re-established: mark the import fully usable again. */
1531 spin_lock_irqsave(&imp->imp_lock, flags);
1532 imp->imp_level = LUSTRE_CONN_FULL;
1533 imp->imp_flags &= ~IMP_INVALID;
1534 spin_unlock_irqrestore(&imp->imp_lock, flags);
1536 /* Is this the right place? Should we do this in _PREPARE
1537 * as well? What about raising the level right away?
1539 ptlrpc_wake_delayed(imp);
1541 rc = ptlrpc_resend(imp);
1545 set_osc_active(imp, 1 /* active */);
1549 /* If we get disconnected in the middle, recovery has probably
1550 * failed. Reconnect and find out.
1552 if (rc == -ENOTCONN)
/* NOTCONN is just PREPARE followed by RECOVER. */
1556 case PTLRPC_RECOVD_PHASE_NOTCONN:
1557 osc_recover(imp, PTLRPC_RECOVD_PHASE_PREPARE);
1558 RETURN(osc_recover(imp, PTLRPC_RECOVD_PHASE_RECOVER));
/* Connect entry point: install osc_recover as the import's recovery
 * callback, then defer all real work to the generic client connect. */
1565 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd,
1566 struct obd_uuid *cluuid, struct recovd_obd *recovd,
1567 ptlrpc_recovery_cb_t recover)
1569 struct obd_import *imp = &obd->u.cli.cl_import;
1570 imp->imp_recover = osc_recover;
1571 return client_obd_connect(conn, obd, cluuid, recovd, recover);
/* OBD method table for the regular OSC device type: generic client_obd_*
 * helpers for setup/teardown, osc_* implementations for everything else. */
1574 struct obd_ops osc_obd_ops = {
1575 o_owner: THIS_MODULE,
1576 o_attach: osc_attach,
1577 o_detach: osc_detach,
1578 o_setup: client_obd_setup,
1579 o_cleanup: client_obd_cleanup,
1580 o_connect: osc_connect,
1581 o_disconnect: client_obd_disconnect,
1582 o_statfs: osc_statfs,
1583 o_packmd: osc_packmd,
1584 o_unpackmd: osc_unpackmd,
1585 o_create: osc_create,
1586 o_destroy: osc_destroy,
1587 o_getattr: osc_getattr,
1588 o_setattr: osc_setattr,
1593 o_enqueue: osc_enqueue,
1594 o_cancel: osc_cancel,
1595 o_cancel_unused: osc_cancel_unused,
1596 o_iocontrol: osc_iocontrol
/* OBD method table for the SAN-attached OSC variant: identical to
 * osc_obd_ops except that setup goes through client_sanobd_setup. */
1599 struct obd_ops sanosc_obd_ops = {
1600 o_owner: THIS_MODULE,
1601 o_attach: osc_attach,
1602 o_detach: osc_detach,
1603 o_cleanup: client_obd_cleanup,
1604 o_connect: osc_connect,
1605 o_disconnect: client_obd_disconnect,
1606 o_statfs: osc_statfs,
1607 o_packmd: osc_packmd,
1608 o_unpackmd: osc_unpackmd,
1609 o_create: osc_create,
1610 o_destroy: osc_destroy,
1611 o_getattr: osc_getattr,
1612 o_setattr: osc_setattr,
1616 o_setup: client_sanobd_setup,
1620 o_enqueue: osc_enqueue,
1621 o_cancel: osc_cancel,
1622 o_cancel_unused: osc_cancel_unused,
1623 o_iocontrol: osc_iocontrol,
/* Module init: register both the OSC and SANOSC device types with the
 * class driver, sharing one set of lprocfs variables.  If the second
 * registration fails the first is unwound so init is all-or-nothing.
 * (Elided lines in this listing hide the intermediate error checks.) */
1626 int __init osc_init(void)
1628 struct lprocfs_static_vars lvars;
/* Compile-time-ish guard: osc_obdo_data must fit the per-fd OST area. */
1632 LASSERT(sizeof(struct osc_obdo_data) <= FD_OSTDATA_SIZE);
1634 lprocfs_init_vars(&lvars);
1636 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
1641 rc = class_register_type(&sanosc_obd_ops, lvars.module_vars,
1642 LUSTRE_SANOSC_NAME);
/* SANOSC registration failed: roll back the OSC registration. */
1644 class_unregister_type(LUSTRE_OSC_NAME);
/* Module exit: unregister both device types, in reverse order of init. */
1649 static void __exit osc_exit(void)
1651 class_unregister_type(LUSTRE_SANOSC_NAME);
1652 class_unregister_type(LUSTRE_OSC_NAME);
/* Standard kernel module metadata and entry/exit hookup. */
1656 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1657 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
1658 MODULE_LICENSE("GPL");
1660 module_init(osc_init);
1661 module_exit(osc_exit);