1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 * For testing and management it is treated as an obd_device,
23 * although * it does not export a full OBD method table (the
24 * requests are coming * in over the wire, so object target modules
25 * do not have a full * method table.)
30 #define DEBUG_SUBSYSTEM S_OSC
33 #include <linux/version.h>
34 #include <linux/module.h>
36 #include <linux/highmem.h>
37 #include <linux/lustre_dlm.h>
38 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
39 #include <linux/workqueue.h>
40 #include <linux/smp_lock.h>
42 #include <linux/locks.h>
45 #include <liblustre.h>
48 #include <linux/kp30.h>
49 #include <linux/lustre_mds.h> /* for mds_objid */
50 #include <linux/obd_ost.h>
51 #include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
52 #include <linux/ctype.h>
53 #include <linux/init.h>
54 #include <linux/lustre_ha.h>
55 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
56 #include <linux/lustre_lite.h> /* for ll_i2info */
57 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
58 #include <linux/lprocfs_status.h>
60 /* It is important that ood_fh remain the first item in this structure: that
61 * way, we don't have to re-pack the obdo's inline data before we send it to
62 * the server, we can just send the whole struct unaltered. */
/* Magic stamped into ood_magic by osc_open() and checked by osc_close()
 * to validate that oa->o_inline really carries an osc_obdo_data. */
63 #define OSC_OBDO_DATA_MAGIC 0xD15EA5ED
/* Per-open state smuggled through the obdo's o_inline area between
 * osc_open() and osc_close().  ood_fh must stay first (see comment above)
 * so the obdo inline data can be sent unaltered.
 * NOTE(review): osc_open() also assigns an ood_magic member, which is not
 * visible in this excerpt -- the struct body here is incomplete. */
64 struct osc_obdo_data {
65         struct lustre_handle ood_fh;
66         struct ptlrpc_request *ood_request;
69 #include <linux/obd_lov.h> /* just for the startup assertion; is that wrong? */
/* Send an OST_SYNCFS RPC over @imp to flush the target filesystem.
 * @level is placed in rq_level; @msg_flags (e.g. MSG_LAST_REPLAY) is OR'd
 * into the request message flags.  @rootfid is unused in the visible code
 * -- TODO confirm against caller.
 * NOTE(review): this excerpt is missing lines (error checks, braces);
 * comments describe only what is visible. */
71 static int send_sync(struct obd_import *imp, struct ll_fid *rootfid,
72 int level, int msg_flags)
74         struct ptlrpc_request *req;
75         struct mds_body *body;
76         int rc, size = sizeof(*body);
/* One request buffer holding an mds_body; a missing check presumably
 * jumps to the GOTO below on allocation failure. */
79 req = ptlrpc_prep_req(imp, OST_SYNCFS, 1, &size, NULL);
81 GOTO(out, rc = -ENOMEM);
83 body = lustre_msg_buf(req->rq_reqmsg, 0);
84 req->rq_level = level;
85 req->rq_replen = lustre_msg_size(1, &size);
87 req->rq_reqmsg->flags |= msg_flags;
/* Synchronous round trip to the OST. */
88 rc = ptlrpc_queue_wait(req);
91 CDEBUG(D_NET, "last_committed="LPU64
92 ", last_xid="LPU64"\n",
93 req->rq_repmsg->last_committed,
94 req->rq_repmsg->last_xid);
99 ptlrpc_req_finished(req);
/* Tell the OST that replay is complete by sending a sync marked with
 * MSG_LAST_REPLAY at recovery connection level.
 * NOTE(review): the declaration of `fid` is in lines missing from this
 * excerpt. */
103 static int signal_completed_replay(struct obd_import *imp)
107         return send_sync(imp, &fid, LUSTRE_CONN_RECOVD, MSG_LAST_REPLAY);
/* obd attach hook: register this device's /proc (lprocfs) entries.
 * @len and @data are unused in the visible code. */
110 static int osc_attach(struct obd_device *dev, obd_count len, void *data)
112         struct lprocfs_static_vars lvars;
114         lprocfs_init_vars(&lvars);
115         return lprocfs_obd_attach(dev, lvars.obd_vars);
/* obd detach hook: tear down the lprocfs entries created in osc_attach(). */
118 static int osc_detach(struct obd_device *dev)
120         return lprocfs_obd_detach(dev);
123 /* Pack OSC object metadata for shipment to the MDS. */
/* Convert in-memory stripe metadata (@lsm) into wire form (*@lmmp).
 * Visible behavior: frees *lmmp when asked (missing condition lines),
 * allocates it on demand, and copies the object id across.
 * NOTE(review): the free/alloc branches' conditions and the endianness
 * handling are in lines missing from this excerpt. */
124 static int osc_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
125 struct lov_stripe_md *lsm)
130         lmm_size = sizeof(**lmmp);
135         OBD_FREE(*lmmp, lmm_size);
141         OBD_ALLOC(*lmmp, lmm_size);
/* An object id of 0 is never valid. */
146         LASSERT(lsm->lsm_object_id);
147         (*lmmp)->lmm_object_id = (lsm->lsm_object_id);
/* Inverse of osc_packmd(): unpack wire metadata (@lmm) into an in-memory
 * lov_stripe_md (*@lsmp), allocating/freeing *lsmp as required.
 * NOTE(review): branch conditions are in lines missing from this excerpt. */
153 static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
154 struct lov_mds_md *lmm)
159         lsm_size = sizeof(**lsmp);
164         OBD_FREE(*lsmp, lsm_size);
170         OBD_ALLOC(*lsmp, lsm_size);
177         (*lsmp)->lsm_object_id = (lmm->lmm_object_id);
/* Sanity: never accept a zero object id from the wire. */
178         LASSERT((*lsmp)->lsm_object_id);
/* Copy the server-assigned transaction number from a reply into @oti.
 * Safe to call with a NULL oti or before a reply has arrived (no-op). */
184 inline void oti_from_request(struct obd_trans_info *oti,
185 struct ptlrpc_request *req)
187         if (oti && req->rq_repmsg)
/* Reply fields are wire (network) order; convert to host order. */
188                 oti->oti_transno = NTOH__u64(req->rq_repmsg->transno);
/* Fetch object attributes from the OST (OST_GETATTR).  The obdo @oa is
 * sent whole and overwritten with the reply's copy on success. */
192 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
193 struct lov_stripe_md *md)
195         struct ptlrpc_request *request;
196         struct ost_body *body;
197         int rc, size = sizeof(*body);
200         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
205         body = lustre_msg_buf(request->rq_reqmsg, 0);
206 #warning FIXME: pack only valid fields instead of memcpy, endianness
207         memcpy(&body->oa, oa, sizeof(*oa));
209         request->rq_replen = lustre_msg_size(1, &size);
211         rc = ptlrpc_queue_wait(request);
213                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* Success: copy the authoritative attributes back into the caller's oa. */
217         body = lustre_msg_buf(request->rq_repmsg, 0);
218         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
219         memcpy(oa, &body->oa, sizeof(*oa));
223         ptlrpc_req_finished(request);
/* Open an object on the OST (OST_OPEN).  The request is marked REPLAY so
 * the open is re-sent after recovery; a reference to it plus the returned
 * file handle are stashed in oa->o_inline (struct osc_obdo_data) for
 * osc_close() to find later. */
227 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
228 struct lov_stripe_md *md, struct obd_trans_info *oti)
230         struct ptlrpc_request *request;
231         struct ost_body *body;
232         int rc, size = sizeof(*body);
235         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
/* Keep this request for replay until the matching close. */
240         request->rq_flags |= PTL_RPC_FL_REPLAY;
241         body = lustre_msg_buf(request->rq_reqmsg, 0);
242 #warning FIXME: pack only valid fields instead of memcpy, endianness
243         memcpy(&body->oa, oa, sizeof(*oa));
245         request->rq_replen = lustre_msg_size(1, &size);
247         rc = ptlrpc_queue_wait(request);
252                 struct osc_obdo_data ood;
253                 body = lustre_msg_buf(request->rq_repmsg, 0);
254                 memcpy(oa, &body->oa, sizeof(*oa));
256                 /* If the open succeeded, we better have a handle */
257                 /* BlueArc OSTs don't send back (o_valid | FLHANDLE). sigh.
258                  * Temporary workaround until fixed. -phil 24 Feb 03 */
259                 //LASSERT(oa->o_valid & OBD_MD_FLHANDLE);
260                 oa->o_valid |= OBD_MD_FLHANDLE;
262                 memcpy(&ood.ood_fh, obdo_handle(oa), sizeof(ood.ood_fh));
/* Pin the open request; osc_close() drops this reference. */
263                 ood.ood_request = ptlrpc_request_addref(request);
264                 ood.ood_magic = OSC_OBDO_DATA_MAGIC;
266                 /* Save this data in the request; it will be passed back to us
267                  * in future obdos. This memcpy is guaranteed to be safe,
268                  * because we check at compile-time that sizeof(ood) is smaller
269                  * than oa->o_inline. */
270                 memcpy(&oa->o_inline, &ood, sizeof(ood));
275         ptlrpc_req_finished(request);
/* Close an object on the OST (OST_CLOSE).  Retrieves the osc_obdo_data
 * that osc_open() hid in oa->o_inline, clears the open request's REPLAY
 * flag under imp_lock, and drops the open-request reference taken in
 * osc_open().
 * NOTE(review): several condition/brace lines are missing from this
 * excerpt, including the `flags` declaration used by the spinlock calls. */
279 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
280 struct lov_stripe_md *md, struct obd_trans_info *oti)
282         struct obd_import *import = class_conn2cliimp(conn);
283         struct ptlrpc_request *request;
284         struct ost_body *body;
285         struct osc_obdo_data *ood;
287         int rc, size = sizeof(*body);
/* Recover the per-open state stashed by osc_open(); the magic guards
 * against an obdo that never went through open. */
291         ood = (struct osc_obdo_data *)&oa->o_inline;
292         LASSERT(ood->ood_magic == OSC_OBDO_DATA_MAGIC);
294         request = ptlrpc_prep_req(import, OST_CLOSE, 1, &size, NULL);
298         body = lustre_msg_buf(request->rq_reqmsg, 0);
299 #warning FIXME: pack only valid fields instead of memcpy, endianness
300         memcpy(&body->oa, oa, sizeof(*oa));
302         request->rq_replen = lustre_msg_size(1, &size);
304         rc = ptlrpc_queue_wait(request);
306         /* FIXME: Does this mean that the file is still open locally?
307          * If not, and I somehow suspect not, we need to cleanup
/* The open request no longer needs to be replayed once closed. */
312         spin_lock_irqsave(&import->imp_lock, flags);
313         ood->ood_request->rq_flags &= ~PTL_RPC_FL_REPLAY;
314         /* see comments in llite/file.c:ll_mdc_close() */
315         if (ood->ood_request->rq_transno) {
316                 LBUG(); /* this can't happen yet */
317                 if (!request->rq_transno) {
318                         request->rq_transno = ood->ood_request->rq_transno;
319                         ptlrpc_retain_replayable_request(request, import);
321                 spin_unlock_irqrestore(&import->imp_lock, flags);
323                 spin_unlock_irqrestore(&import->imp_lock, flags);
/* Drop the reference osc_open() took on the open request. */
324         ptlrpc_req_finished(ood->ood_request);
327         body = lustre_msg_buf(request->rq_repmsg, 0);
328         memcpy(oa, &body->oa, sizeof(*oa));
332         ptlrpc_req_finished(request);
/* Push attribute changes to the OST (OST_SETATTR).  The whole obdo is
 * shipped; the reply body is not copied back in the visible code. */
336 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
337 struct lov_stripe_md *md, struct obd_trans_info *oti)
339         struct ptlrpc_request *request;
340         struct ost_body *body;
341         int rc, size = sizeof(*body);
344         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
349         body = lustre_msg_buf(request->rq_reqmsg, 0);
350         memcpy(&body->oa, oa, sizeof(*oa));
352         request->rq_replen = lustre_msg_size(1, &size);
354         rc = ptlrpc_queue_wait(request);
356         ptlrpc_req_finished(request);
/* Create an object on the OST (OST_CREATE).  Allocates stripe metadata
 * via obd_alloc_memmd() when needed, records the new object id in the
 * lsm, and captures the transno for replay via oti_from_request().
 * NOTE(review): selection between @oti_in and local trans_info happens in
 * lines missing from this excerpt. */
360 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
361 struct lov_stripe_md **ea, struct obd_trans_info *oti_in)
363         struct ptlrpc_request *request;
364         struct ost_body *body;
365         struct lov_stripe_md *lsm;
366         struct obd_trans_info *oti, trans_info;
367         int rc, size = sizeof(*body);
375         rc = obd_alloc_memmd(conn, &lsm);
385         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
388                 GOTO(out, rc = -ENOMEM);
390         body = lustre_msg_buf(request->rq_reqmsg, 0);
391         memcpy(&body->oa, oa, sizeof(*oa));
393         request->rq_replen = lustre_msg_size(1, &size);
395         rc = ptlrpc_queue_wait(request);
399         body = lustre_msg_buf(request->rq_repmsg, 0);
400         memcpy(oa, &body->oa, sizeof(*oa));
/* Record the server-chosen object id in the in-memory metadata. */
402         lsm->lsm_object_id = oa->o_id;
403         lsm->lsm_stripe_count = 0;
406         oti_from_request(oti, request);
407         CDEBUG(D_HA, "transno: "LPD64"\n", oti->oti_transno);
410         ptlrpc_req_finished(request);
/* Error path: release the metadata we allocated above. */
413         obd_free_memmd(conn, &lsm);
/* Truncate/punch a byte range [@start, @end] in an object (OST_PUNCH).
 * The range is smuggled through the obdo's o_size/o_blocks fields. */
417 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
418 struct lov_stripe_md *md, obd_size start,
419 obd_size end, struct obd_trans_info *oti)
421         struct ptlrpc_request *request;
422         struct ost_body *body;
423         int rc, size = sizeof(*body);
431         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
436         body = lustre_msg_buf(request->rq_reqmsg, 0);
437 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
438         memcpy(&body->oa, oa, sizeof(*oa));
440         /* overload the size and blocks fields in the oa with start/end */
441         body->oa.o_size = HTON__u64(start);
442         body->oa.o_blocks = HTON__u64(end);
/* NOTE(review): OR-ing an HTON__u32() value into o_valid only works when
 * the rest of the field is in the same byte order -- consistent with the
 * #warning above about unfinished endianness handling. */
443         body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
445         request->rq_replen = lustre_msg_size(1, &size);
447         rc = ptlrpc_queue_wait(request);
451         body = lustre_msg_buf(request->rq_repmsg, 0);
452         memcpy(oa, &body->oa, sizeof(*oa));
456         ptlrpc_req_finished(request);
/* Destroy an object on the OST (OST_DESTROY); same request/reply shape
 * as getattr/setattr: ship the obdo, copy the reply's obdo back. */
460 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
461 struct lov_stripe_md *ea, struct obd_trans_info *oti)
463         struct ptlrpc_request *request;
464         struct ost_body *body;
465         int rc, size = sizeof(*body);
472         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
477         body = lustre_msg_buf(request->rq_reqmsg, 0);
478 #warning FIXME: pack only valid fields instead of memcpy, endianness
479         memcpy(&body->oa, oa, sizeof(*oa));
481         request->rq_replen = lustre_msg_size(1, &size);
483         rc = ptlrpc_queue_wait(request);
487         body = lustre_msg_buf(request->rq_repmsg, 0);
488         memcpy(oa, &body->oa, sizeof(*oa));
492         ptlrpc_req_finished(request);
496 /* Our bulk-unmapping bottom half. */
/* Bottom-half worker: kunmap every page attached to a bulk descriptor,
 * then drop the descriptor reference.  Runs in process context because
 * kunmap() is not safe from the interrupt-context Portals callback
 * (see osc_ptl_ev_hdlr). */
497 static void unmap_and_decref_bulk_desc(void *data)
499         struct ptlrpc_bulk_desc *desc = data;
500         struct list_head *tmp;
503         list_for_each(tmp, &desc->bd_page_list) {
504                 struct ptlrpc_bulk_page *bulk;
505                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
507                 kunmap(bulk->bp_page);
511         ptlrpc_bulk_decref(desc);
516 /* this is the callback function which is invoked by the Portals
517 * event handler associated with the bulk_sink queue and bulk_source queue.
/* Portals event callback for bulk completion (may run in interrupt
 * context): advance the brw set's state machine, then defer the page
 * unmapping/decref to the workqueue bottom half. */
519 static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
523         LASSERT(desc->bd_brw_set != NULL);
524         LASSERT(desc->bd_brw_set->brw_callback != NULL);
526         desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
528         /* We can't kunmap the desc from interrupt context, so we do it from
529          * the bottom half above. */
530         prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
531         schedule_work(&desc->bd_queue);
537 * This is called when there was a bulk error return. However, we don't know
538 * whether the bulk completed or not. We cancel the portals bulk descriptors,
539 * so that if the OST decides to send them later we don't double free. Then
540 * remove this descriptor from the set so that the set callback doesn't wait
541 * forever for the last CB_PHASE_FINISH to be called, and finally dump all of
542 * the bulk descriptor references.
/* Error path for a failed bulk transfer: cancel the Portals descriptors
 * (so a late server send can't double-free), detach from the brw set
 * (so the set callback doesn't wait forever), and release everything
 * synchronously -- see the block comment above. */
544 static void osc_ptl_ev_abort(struct ptlrpc_bulk_desc *desc)
548         LASSERT(desc->bd_brw_set != NULL);
550         ptlrpc_abort_bulk(desc);
551         obd_brw_set_del(desc);
552         unmap_and_decref_bulk_desc(desc);
/* Bulk read (OST_READ): build a 3-buffer request (ost_body, obd_ioobj,
 * niobuf_remote array), kmap and register each destination page as a
 * bulk sink, then queue the RPC.  Optionally verifies a server-supplied
 * checksum (carried in oa.o_rdev) over the received pages.
 * NOTE(review): declarations of nioptr/xid/cksum and several braces are
 * in lines missing from this excerpt. */
557 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
558 obd_count page_count, struct brw_page *pga,
559 struct obd_brw_set *set)
561         struct obd_import *imp = class_conn2cliimp(conn);
562         struct ptlrpc_connection *connection = imp->imp_connection;
563         struct ptlrpc_request *request = NULL;
564         struct ptlrpc_bulk_desc *desc = NULL;
565         struct ost_body *body;
566         int rc, size[3] = {sizeof(*body)}, mapped = 0;
567         struct obd_ioobj *iooptr;
573         size[1] = sizeof(struct obd_ioobj);
574         size[2] = page_count * sizeof(struct niobuf_remote);
576         request = ptlrpc_prep_req(imp, OST_READ, 3, size, NULL);
580         body = lustre_msg_buf(request->rq_reqmsg, 0);
/* CHECKSUM_BULK is a compile-time 0/1 switch: request a checksum only
 * when the feature is built in. */
581         body->oa.o_valid = HTON__u32(OBD_MD_FLCKSUM * CHECKSUM_BULK);
583         desc = ptlrpc_prep_bulk(connection);
585                 GOTO(out_req, rc = -ENOMEM);
586         desc->bd_portal = OST_BULK_PORTAL;
587         desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
588         CDEBUG(D_PAGE, "desc = %p\n", desc);
590         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
591         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
592         ost_pack_ioo(&iooptr, lsm, page_count);
593         /* end almost identical to brw_write case */
595         xid = ptlrpc_next_xid(); /* single xid for all pages */
597         obd_kmap_get(page_count, 0);
/* Map each destination page and describe it to the server. */
599         for (mapped = 0; mapped < page_count; mapped++) {
600                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
602                         unmap_and_decref_bulk_desc(desc);
603                         GOTO(out_req, rc = -ENOMEM);
606                 bulk->bp_xid = xid; /* single xid for all pages */
608                 bulk->bp_buf = kmap(pga[mapped].pg);
609                 bulk->bp_page = pga[mapped].pg;
610                 bulk->bp_buflen = PAGE_SIZE;
611                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
612                                 pga[mapped].flag, bulk->bp_xid);
616          * Register the bulk first, because the reply could arrive out of order,
617          * and we want to be ready for the bulk data.
619          * One reference is released when osc_ptl_ev_hdlr() is called by
620          * portals, the other when the caller removes us from the "set" list.
622          * On error, we never do the brw_finish, so we handle all decrefs.
624         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
625                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
626                        OBD_FAIL_OSC_BRW_READ_BULK);
628                 rc = ptlrpc_register_bulk_put(desc);
630                         unmap_and_decref_bulk_desc(desc);
633         obd_brw_set_add(set, desc);
636         request->rq_flags |= PTL_RPC_FL_NO_RESEND;
637         request->rq_replen = lustre_msg_size(1, size);
638         rc = ptlrpc_queue_wait(request);
640         /* XXX bug 937 here */
641         if (rc == -ETIMEDOUT && (request->rq_flags & PTL_RPC_FL_RESEND)) {
642                 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
643                 ptlrpc_req_finished(request);
/* RPC failed: cancel the registered bulk and drop all refs. */
648         osc_ptl_ev_abort(desc);
653         body = lustre_msg_buf(request->rq_repmsg, 0);
654         if (body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM)) {
655                 static int cksum_counter;
/* Server reuses o_rdev to carry the bulk checksum. */
656                 __u64 server_cksum = NTOH__u64(body->oa.o_rdev);
659                 for (mapped = 0; mapped < page_count; mapped++) {
660                         char *ptr = kmap(pga[mapped].pg);
661                         int off = pga[mapped].off & (PAGE_SIZE - 1);
662                         int len = pga[mapped].count;
664                         LASSERT(off + len <= PAGE_SIZE);
665                         ost_checksum(&cksum, ptr + off, len);
666                         kunmap(pga[mapped].pg);
670                 if (server_cksum != cksum) {
671                         CERROR("Bad checksum: server "LPX64", client "LPX64
672                                ", server NID "LPX64"\n", server_cksum, cksum,
673                                imp->imp_connection->c_peer.peer_nid);
/* (x & -x) == x is true only for powers of two (and 0): log with
 * exponential backoff instead of on every RPC. */
675                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter)
676                         CERROR("Checksum %u from "LPX64" OK: "LPX64"\n",
678                                imp->imp_connection->c_peer.peer_nid, cksum);
680                 static int cksum_missed;
682                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
683                         CERROR("Request checksum %u from "LPX64", no reply\n",
685                                imp->imp_connection->c_peer.peer_nid);
691         ptlrpc_req_finished(request);
/* Bulk write (OST_WRITE): mirror image of osc_brw_read() -- maps each
 * source page as a bulk source, accumulates a checksum while packing
 * (shipped to the server in oa.o_rdev), registers the bulk before
 * queueing the RPC.  Note the write path uses OSC_BULK_PORTAL and
 * per-page bp_buflen = pga[].count rather than a full PAGE_SIZE.
 * NOTE(review): declarations of nioptr/xid/cksum and several braces are
 * in lines missing from this excerpt. */
695 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *lsm,
696 obd_count page_count, struct brw_page *pga,
697 struct obd_brw_set *set, struct obd_trans_info *oti)
699         struct obd_import *imp = class_conn2cliimp(conn);
700         struct ptlrpc_connection *connection = imp->imp_connection;
701         struct ptlrpc_request *request = NULL;
702         struct ptlrpc_bulk_desc *desc = NULL;
703         struct ost_body *body;
704         int rc, size[3] = {sizeof(*body)}, mapped = 0;
705         struct obd_ioobj *iooptr;
714         size[1] = sizeof(struct obd_ioobj);
715         size[2] = page_count * sizeof(struct niobuf_remote);
717         request = ptlrpc_prep_req(imp, OST_WRITE, 3, size, NULL);
721         body = lustre_msg_buf(request->rq_reqmsg, 0);
723         desc = ptlrpc_prep_bulk(connection);
725                 GOTO(out_req, rc = -ENOMEM);
726         desc->bd_portal = OSC_BULK_PORTAL;
727         desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
728         CDEBUG(D_PAGE, "desc = %p\n", desc);
730         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
731         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
732         ost_pack_ioo(&iooptr, lsm, page_count);
733         /* end almost identical to brw_read case */
735         xid = ptlrpc_next_xid(); /* single xid for all pages */
737         obd_kmap_get(page_count, 0);
/* Map each source page, describe it to the server, and checksum it. */
739         for (mapped = 0; mapped < page_count; mapped++) {
740                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
742                         unmap_and_decref_bulk_desc(desc);
743                         GOTO(out_req, rc = -ENOMEM);
746                 bulk->bp_xid = xid; /* single xid for all pages */
748                 bulk->bp_buf = kmap(pga[mapped].pg);
749                 bulk->bp_page = pga[mapped].pg;
750                 bulk->bp_buflen = pga[mapped].count;
751                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
752                                 pga[mapped].flag, bulk->bp_xid);
753                 ost_checksum(&cksum, bulk->bp_buf, bulk->bp_buflen);
/* Ship the checksum in o_rdev, matching the read path's convention. */
757         body->oa.o_rdev = HTON__u64(cksum);
758         body->oa.o_valid |= HTON__u32(OBD_MD_FLCKSUM);
761          * Register the bulk first, because the reply could arrive out of
762          * order, and we want to be ready for the bulk data.
764          * One reference is released when brw_finish is complete, the other
765          * when the caller removes us from the "set" list.
767          * On error, we never do the brw_finish, so we handle all decrefs.
769         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK)) {
770                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
771                        OBD_FAIL_OSC_BRW_WRITE_BULK);
773                 rc = ptlrpc_register_bulk_get(desc);
775                         unmap_and_decref_bulk_desc(desc);
778         obd_brw_set_add(set, desc);
781         request->rq_flags |= PTL_RPC_FL_NO_RESEND;
782         request->rq_replen = lustre_msg_size(1, size);
783         rc = ptlrpc_queue_wait(request);
785         /* XXX bug 937 here */
786         if (rc == -ETIMEDOUT && (request->rq_flags & PTL_RPC_FL_RESEND)) {
787                 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
788                 ptlrpc_req_finished(request);
/* RPC failed: cancel the registered bulk and drop all refs. */
793         osc_ptl_ev_abort(desc);
799         ptlrpc_req_finished(request);
/* Minimum of two values cast to @type, used by OSC_BRW_MAX_IOV below.
 * Fully parenthesized: the original '( b<c ) ? b : c' expansion was not
 * wrapped, so using it inside a larger expression (e.g. 'min_t(...) + 1')
 * bound the conditional operator incorrectly, and expression arguments
 * could change meaning.  Arguments are evaluated more than once -- do not
 * pass expressions with side effects. */
#define min_t(type, a, b) ((type)(a) < (type)(b) ? (type)(a) : (type)(b))
807 #warning "FIXME: make values dynamic based on get_info at setup (bug 665)"
/* Largest single bulk transfer in bytes; see the #warning above -- this
 * should come from the server via get_info. */
808 #define OSC_BRW_MAX_SIZE 65536
/* Pages per brw chunk: bounded by both the Portals IOV limit and the
 * byte limit above. */
809 #define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE)
/* Top-level brw entry point: split a large request into chunks of at most
 * OSC_BRW_MAX_IOV pages and dispatch each to osc_brw_write()/osc_brw_read()
 * depending on @cmd.
 * NOTE(review): the enclosing `while (page_count)` loop head and braces
 * are in lines missing from this excerpt; the decrement/advance at the
 * bottom implies it. */
811 static int osc_brw(int cmd, struct lustre_handle *conn,
812 struct lov_stripe_md *md, obd_count page_count,
813 struct brw_page *pga, struct obd_brw_set *set,
814 struct obd_trans_info *oti)
819                 obd_count pages_per_brw;
/* Clamp each chunk to the transport's IOV limit. */
822                 if (page_count > OSC_BRW_MAX_IOV)
823                         pages_per_brw = OSC_BRW_MAX_IOV;
825                         pages_per_brw = page_count;
827                 if (cmd & OBD_BRW_WRITE)
828                         rc = osc_brw_write(conn, md, pages_per_brw, pga,
831                         rc = osc_brw_read(conn, md, pages_per_brw, pga, set);
/* Advance to the next chunk. */
836                 page_count -= pages_per_brw;
837                 pga += pages_per_brw;
843 /* Note: caller will lock/unlock, and set uptodate on the pages */
844 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/* SAN read path (2.4 kernels only): instead of bulk transfer, ask the OST
 * (OST_SAN_READ) for the on-disk block numbers of each page, then read
 * the blocks directly from the shared SAN block device via buffer heads.
 * A zero offset in the reply denotes a hole (page is zero-filled).
 * NOTE(review): many brace/condition lines and the ENTRY/RETURN
 * scaffolding are missing from this excerpt. */
845 static int sanosc_brw_read(struct lustre_handle *conn,
846 struct lov_stripe_md *md,
847 obd_count page_count,
848 struct brw_page *pga,
849 struct obd_brw_set *set)
851         struct ptlrpc_request *request = NULL;
852         struct ost_body *body;
853         struct niobuf_remote *remote, *nio_rep;
854         int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
855         struct obd_ioobj *iooptr;
859         size[1] = sizeof(struct obd_ioobj);
860         size[2] = page_count * sizeof(*remote);
862         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_READ, 3,
867         body = lustre_msg_buf(request->rq_reqmsg, 0);
868         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
869         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
870         ost_pack_ioo(&iooptr, md, page_count);
872         obd_kmap_get(page_count, 0);
874         for (mapped = 0; mapped < page_count; mapped++) {
875                 LASSERT(PageLocked(pga[mapped].pg));
877                 kmap(pga[mapped].pg);
878                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
879                                 pga[mapped].flag, 0);
/* Reply carries the niobuf array back, now filled with disk offsets. */
882         size[1] = page_count * sizeof(*remote);
883         request->rq_replen = lustre_msg_size(2, size);
885         rc = ptlrpc_queue_wait(request);
889         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
891                 GOTO(out_unmap, rc = -EINVAL);
893         if (request->rq_repmsg->buflens[1] != size[1]) {
894                 CERROR("buffer length wrong (%d vs. %d)\n",
895                        request->rq_repmsg->buflens[1], size[1]);
896                 GOTO(out_unmap, rc = -EINVAL);
/* Byte-swap the reply niobufs in place. */
899         for (j = 0; j < page_count; j++) {
900                 ost_unpack_niobuf(&nioptr, &remote);
903         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
904         nio_rep = (struct niobuf_remote*)nioptr;
/* Now read each mapped block straight off the SAN device. */
907         for (j = 0; j < page_count; j++) {
908                 struct page *page = pga[j].pg;
909                 struct buffer_head *bh;
912                 /* got san device associated */
913                 LASSERT(class_conn2obd(conn));
914                 dev = class_conn2obd(conn)->u.cli.cl_sandev;
/* Offset 0 marks a hole: no disk block, just zero the page. */
917                 if (!nio_rep[j].offset) {
918                         CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
919                                page->mapping->host->i_ino,
921                         memset(page_address(page), 0, PAGE_SIZE);
925                 if (!page->buffers) {
926                         create_empty_buffers(page, dev, PAGE_SIZE);
929                         clear_bit(BH_New, &bh->b_state);
930                         set_bit(BH_Mapped, &bh->b_state);
/* NOTE(review): truncating a 64-bit SAN offset to unsigned long --
 * presumably OK for these devices, but worth confirming. */
931                         bh->b_blocknr = (unsigned long)nio_rep[j].offset;
933                         clear_bit(BH_Uptodate, &bh->b_state);
935                         ll_rw_block(READ, 1, &bh);
939                         /* if buffer already existed, it must be the
940                          * one we mapped before, check it */
941                         LASSERT(!test_bit(BH_New, &bh->b_state));
942                         LASSERT(test_bit(BH_Mapped, &bh->b_state));
943                         LASSERT(bh->b_blocknr ==
944                                 (unsigned long)nio_rep[j].offset);
946                         /* wait it's io completion */
947                         if (test_bit(BH_Lock, &bh->b_state))
950                         if (!test_bit(BH_Uptodate, &bh->b_state))
951                                 ll_rw_block(READ, 1, &bh);
955         /* must do syncronous write here */
957                 if (!buffer_uptodate(bh)) {
965         ptlrpc_req_finished(request);
969 /* Clean up on error. */
971                 kunmap(pga[mapped].pg);
973         obd_kmap_put(page_count);
/* SAN write path (2.4 kernels only): ask the OST (OST_SAN_WRITE) where
 * each page should land on disk, then write the pages synchronously to
 * the shared SAN block device through buffer heads, overriding b_blocknr
 * with the server-supplied offsets.
 * NOTE(review): many brace/condition lines are missing from this excerpt. */
979 static int sanosc_brw_write(struct lustre_handle *conn,
980 struct lov_stripe_md *md,
981 obd_count page_count,
982 struct brw_page *pga,
983 struct obd_brw_set *set)
984         struct ptlrpc_request *request = NULL;
985         struct ost_body *body;
986         struct niobuf_remote *remote, *nio_rep;
987         int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
988         struct obd_ioobj *iooptr;
992         size[1] = sizeof(struct obd_ioobj);
993         size[2] = page_count * sizeof(*remote);
995         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_WRITE,
1000         body = lustre_msg_buf(request->rq_reqmsg, 0);
1001         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
1002         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
1003         ost_pack_ioo(&iooptr, md, page_count);
1005         /* map pages, and pack request */
1006         obd_kmap_get(page_count, 0);
1007         for (mapped = 0; mapped < page_count; mapped++) {
1008                 LASSERT(PageLocked(pga[mapped].pg));
1010                 kmap(pga[mapped].pg);
1011                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
1012                                 pga[mapped].flag, 0);
/* Reply carries the niobuf array back with assigned disk offsets. */
1015         size[1] = page_count * sizeof(*remote);
1016         request->rq_replen = lustre_msg_size(2, size);
1018         rc = ptlrpc_queue_wait(request);
1020                 GOTO(out_unmap, rc);
1022         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
1024                 GOTO(out_unmap, rc = -EINVAL);
1026         if (request->rq_repmsg->buflens[1] != size[1]) {
1027                 CERROR("buffer length wrong (%d vs. %d)\n",
1028                        request->rq_repmsg->buflens[1], size[1]);
1029                 GOTO(out_unmap, rc = -EINVAL);
/* Byte-swap the reply niobufs in place. */
1032         for (j = 0; j < page_count; j++) {
1033                 ost_unpack_niobuf(&nioptr, &remote);
1036         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
1037         nio_rep = (struct niobuf_remote*)nioptr;
/* Write each page to its server-assigned block. */
1040         for (j = 0; j < page_count; j++) {
1041                 struct page *page = pga[j].pg;
1042                 struct buffer_head *bh;
1045                 /* got san device associated */
1046                 LASSERT(class_conn2obd(conn));
1047                 dev = class_conn2obd(conn)->u.cli.cl_sandev;
1049                 if (!page->buffers) {
1050                         create_empty_buffers(page, dev, PAGE_SIZE);
1053                         LASSERT(!test_bit(BH_New, &page->buffers->b_state));
1054                         LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
1055                         LASSERT(page->buffers->b_blocknr ==
1056                                 (unsigned long)nio_rep[j].offset);
1062                 /* if buffer locked, wait it's io completion */
1063                 if (test_bit(BH_Lock, &bh->b_state))
1066                 clear_bit(BH_New, &bh->b_state);
1067                 set_bit(BH_Mapped, &bh->b_state);
1069                 /* override the block nr */
1070                 bh->b_blocknr = (unsigned long)nio_rep[j].offset;
1072                 /* we are about to write it, so set it
1074                  * page lock should garentee no race condition here */
1075                 set_bit(BH_Uptodate, &bh->b_state);
1076                 set_bit(BH_Dirty, &bh->b_state);
1078                 ll_rw_block(WRITE, 1, &bh);
1080         /* must do syncronous write here */
1082                 if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
1090         ptlrpc_req_finished(request);
1094 /* Clean up on error. */
1095         while (mapped-- > 0)
1096                 kunmap(pga[mapped].pg);
1098         obd_kmap_put(page_count);
/* Non-2.4-kernel stub: SAN brw read is only implemented on the 2.4 branch
 * above.  Body (presumably an error return) is in lines missing from this
 * excerpt. */
1103 static int sanosc_brw_read(struct lustre_handle *conn,
1104 struct lov_stripe_md *md,
1105 obd_count page_count,
1106 struct brw_page *pga,
1107 struct obd_brw_set *set)
/* Non-2.4-kernel stub: SAN brw write is only implemented on the 2.4 branch
 * above.  Body (presumably an error return) is in lines missing from this
 * excerpt. */
1113 static int sanosc_brw_write(struct lustre_handle *conn,
1114 struct lov_stripe_md *md,
1115 obd_count page_count,
1116 struct brw_page *pga,
1117 struct obd_brw_set *set)
/* SAN analogue of osc_brw(): chunk the request into OSC_BRW_MAX_IOV-page
 * pieces and dispatch to sanosc_brw_write()/sanosc_brw_read() per @cmd. */
1124 static int sanosc_brw(int cmd, struct lustre_handle *conn,
1125 struct lov_stripe_md *md, obd_count page_count,
1126 struct brw_page *pga, struct obd_brw_set *set,
1127 struct obd_trans_info *oti)
1131         while (page_count) {
1132                 obd_count pages_per_brw;
1135                 if (page_count > OSC_BRW_MAX_IOV)
1136                         pages_per_brw = OSC_BRW_MAX_IOV;
1138                         pages_per_brw = page_count;
1140                 if (cmd & OBD_BRW_WRITE)
1141                         rc = sanosc_brw_write(conn, md, pages_per_brw,
1144                         rc = sanosc_brw_read(conn, md, pages_per_brw, pga, set);
1149                 page_count -= pages_per_brw;
1150                 pga += pages_per_brw;
/* Acquire an extent lock on the object: first try to match an existing
 * compatible lock, then (for reads) piggyback on an existing PW lock,
 * and finally enqueue a new lock with the DLM. */
1156 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
1157 struct lustre_handle *parent_lock,
1158 __u32 type, void *extentp, int extent_len, __u32 mode,
1159 int *flags, void *callback, void *data, int datalen,
1160 struct lustre_handle *lockh)
1162         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1163         struct obd_device *obddev = class_conn2obd(connh);
1164         struct ldlm_extent *extent = extentp;
1168         /* Filesystem locks are given a bit of special treatment: if
1169          * this is not a file size lock (which has end == -1), we
1170          * fixup the lock to start and end on page boundaries. */
1171         if (extent->end != OBD_OBJECT_EOF) {
1172                 extent->start &= PAGE_MASK;
1173                 extent->end = (extent->end & PAGE_MASK) + PAGE_SIZE - 1;
1176         /* Next, search for already existing extent locks that will cover us */
/* NOTE(review): sizeof(extent) here is the size of the POINTER, not of
 * struct ldlm_extent -- almost certainly should be sizeof(*extent) (the
 * same mistake repeats in the two calls below).  Not fixed here because
 * surrounding lines are missing from this excerpt; flagging for a
 * follow-up patch. */
1177         rc = ldlm_lock_match(obddev->obd_namespace, 0, &res_id, type, extent,
1178                              sizeof(extent), mode, lockh);
1180         /* We already have a lock, and it's referenced */
1183         /* If we're trying to read, we also search for an existing PW lock. The
1184          * VFS and page cache already protect us locally, so lots of readers/
1185          * writers can share a single PW lock.
1187          * There are problems with conversion deadlocks, so instead of
1188          * converting a read lock to a write lock, we'll just enqueue a new
1191          * At some point we should cancel the read lock instead of making them
1192          * send us a blocking callback, but there are problems with canceling
1193          * locks out from other users right now, too. */
1195         if (mode == LCK_PR) {
1196                 rc = ldlm_lock_match(obddev->obd_namespace, 0, &res_id, type,
1197                                      extent, sizeof(extent), LCK_PW, lockh);
1199                 /* FIXME: This is not incredibly elegant, but it might
1200                  * be more elegant than adding another parameter to
1201                  * lock_match. I want a second opinion. */
/* Matched a PW lock: take a PR reference and drop the PW one taken by
 * lock_match, so the caller sees a PR-referenced handle. */
1202                 ldlm_lock_addref(lockh, LCK_PR);
1203                 ldlm_lock_decref(lockh, LCK_PW);
/* No match anywhere: enqueue a brand new lock. */
1209         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
1210                               res_id, type, extent, sizeof(extent), mode, flags,
1211                               ldlm_completion_ast, callback, data, NULL,
/* Release one reference on an extent lock; the DLM cancels it when the
 * last reference is dropped. */
1216 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
1217 __u32 mode, struct lustre_handle *lockh)
1221         ldlm_lock_decref(lockh, mode);
/* Cancel all unused locks on this object's resource (e.g. at unlink or
 * lock-cleanup time). */
1226 static int osc_cancel_unused(struct lustre_handle *connh,
1227 struct lov_stripe_md *lsm, int flags)
1229         struct obd_device *obddev = class_conn2obd(connh);
1230         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1232         return ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags);
/* Query filesystem statistics from the OST (OST_STATFS): empty request,
 * reply unpacked from wire form into @osfs. */
1235 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
1237         struct ptlrpc_request *request;
1238         int rc, size = sizeof(*osfs);
1241         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
1246         request->rq_replen = lustre_msg_size(1, &size);
1248         rc = ptlrpc_queue_wait(request);
1250                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
1254         obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
1258         ptlrpc_req_finished(request);
1262 /* Retrieve object striping information.
1264 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
1265 * the maximum number of OST indices which will fit in the user buffer.
1266 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
/* Return single-stripe layout info to userspace (LL_IOC_LOV_GETSTRIPE).
 * Validates the user's lov_mds_md header, builds a kernel copy describing
 * exactly one object (an OSC has no real striping), and copies it out. */
1268 static int osc_getstripe(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1269 struct lov_mds_md *lmmu)
1271         struct lov_mds_md lmm, *lmmk;
1278         rc = copy_from_user(&lmm, lmmu, sizeof(lmm));
1282         if (lmm.lmm_magic != LOV_MAGIC)
/* Caller must leave room for at least one OST index. */
1285         if (lmm.lmm_ost_count < 1)
1288         lmm_size = sizeof(lmm) + sizeof(lmm.lmm_objects[0]);
1289         OBD_ALLOC(lmmk, lmm_size);
1293         lmmk->lmm_stripe_count = 1;
1294         lmmk->lmm_ost_count = 1;
1295         lmmk->lmm_object_id = lsm->lsm_object_id;
1296         lmmk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
1298         if (copy_to_user(lmmu, lmmk, lmm_size))
1301         OBD_FREE(lmmk, lmm_size);
/* ioctl dispatcher for the OSC: DLM test/regression hooks, LOV
 * registration, LOV config reporting, and stripe get/set.
 * NOTE(review): the switch statement head, several break/GOTO lines and
 * closing braces are missing from this excerpt. */
1306 static int osc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
1307 void *karg, void *uarg)
1309         struct obd_device *obddev = class_conn2obd(conn);
1310         struct obd_ioctl_data *data = karg;
1316         case IOC_LDLM_TEST: {
1317                 err = ldlm_test(obddev, conn);
1318                 CERROR("-- done err %d\n", err);
1321         case IOC_LDLM_REGRESS_START: {
1322                 unsigned int numthreads = 1;
1323                 unsigned int numheld = 10;
1324                 unsigned int numres = 10;
1325                 unsigned int numext = 10;
/* Optional inline buffer: up to four whitespace-separated numbers
 * (threads, held, resources, extents) override the defaults above. */
1328                 if (data->ioc_inllen1) {
1329                         parse = data->ioc_inlbuf1;
1330                         if (*parse != '\0') {
1331                                 while(isspace(*parse)) parse++;
1332                                 numthreads = simple_strtoul(parse, &parse, 0);
1333                                 while(isspace(*parse)) parse++;
1335                         if (*parse != '\0') {
1336                                 while(isspace(*parse)) parse++;
1337                                 numheld = simple_strtoul(parse, &parse, 0);
1338                                 while(isspace(*parse)) parse++;
1340                         if (*parse != '\0') {
1341                                 while(isspace(*parse)) parse++;
1342                                 numres = simple_strtoul(parse, &parse, 0);
1343                                 while(isspace(*parse)) parse++;
1345                         if (*parse != '\0') {
1346                                 while(isspace(*parse)) parse++;
1347                                 numext = simple_strtoul(parse, &parse, 0);
1348                                 while(isspace(*parse)) parse++;
1352                 err = ldlm_regression_start(obddev, conn, numthreads,
1353                                             numheld, numres, numext);
1355                 CERROR("-- done err %d\n", err);
1358         case IOC_LDLM_REGRESS_STOP: {
1359                 err = ldlm_regression_stop();
1360                 CERROR("-- done err %d\n", err);
1364         case IOC_OSC_REGISTER_LOV: {
/* Remember which LOV contains us, for set_osc_active() notifications. */
1365                 if (obddev->u.cli.cl_containing_lov)
1366                         GOTO(out, err = -EALREADY);
1367                 obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
1370         case OBD_IOC_LOV_GET_CONFIG: {
1372                 struct lov_desc *desc;
1373                 struct obd_uuid uuid;
1377                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1378                         GOTO(out, err = -EINVAL);
1380                 data = (struct obd_ioctl_data *)buf;
1382                 if (sizeof(*desc) > data->ioc_inllen1) {
1384                         GOTO(out, err = -EINVAL);
1387                 if (data->ioc_inllen2 < sizeof(uuid)) {
1389                         GOTO(out, err = -EINVAL);
/* A bare OSC reports itself as a one-target, one-stripe "LOV". */
1392                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1393                 desc->ld_tgt_count = 1;
1394                 desc->ld_active_tgt_count = 1;
1395                 desc->ld_default_stripe_count = 1;
1396                 desc->ld_default_stripe_size = 0;
1397                 desc->ld_default_stripe_offset = 0;
1398                 desc->ld_pattern = 0;
1399                 memcpy(&desc->ld_uuid, &obddev->obd_uuid, sizeof(uuid));
1401                 memcpy(data->ioc_inlbuf2, &obddev->obd_uuid, sizeof(uuid));
1403                 err = copy_to_user((void *)uarg, buf, len);
1409         case LL_IOC_LOV_SETSTRIPE:
1410                 err = obd_alloc_memmd(conn, karg);
1414         case LL_IOC_LOV_GETSTRIPE:
1415                 err = osc_getstripe(conn, karg, uarg);
1418                 CERROR ("osc_ioctl(): unrecognised ioctl %#x\n", cmd);
1419                 GOTO(out, err = -ENOTTY);
1425 static void set_osc_active(struct obd_import *imp, int active)
1427 struct obd_device *notify_obd;
1429 LASSERT(imp->imp_obd);
1431 notify_obd = imp->imp_obd->u.cli.cl_containing_lov;
1433 if (notify_obd == NULL)
1436 /* How gross is _this_? */
1437 if (!list_empty(¬ify_obd->obd_exports)) {
1439 struct lustre_handle fakeconn;
1440 struct obd_ioctl_data ioc_data = { 0 };
1441 struct obd_export *exp =
1442 list_entry(notify_obd->obd_exports.next,
1443 struct obd_export, exp_obd_chain);
1445 fakeconn.addr = (__u64)(unsigned long)exp;
1446 fakeconn.cookie = exp->exp_cookie;
1447 ioc_data.ioc_inlbuf1 =
1448 (char *)&imp->imp_obd->u.cli.cl_target_uuid;
1449 ioc_data.ioc_offset = active;
1450 rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn,
1451 sizeof ioc_data, &ioc_data, NULL);
1453 CERROR("error disabling %s on LOV %p/%s: %d\n",
1454 imp->imp_obd->u.cli.cl_target_uuid.uuid,
1455 notify_obd, notify_obd->obd_uuid.uuid, rc);
1457 CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about "
1458 "%p\n", notify_obd, notify_obd->obd_uuid.uuid,
1459 imp->imp_obd->obd_uuid.uuid);
/* osc_recover(): recovery callback for this OSC's import (installed by
 * osc_connect() as imp->imp_recover and driven by the recovery daemon).
 *
 * imp   - the import being recovered
 * phase - PTLRPC_RECOVD_PHASE_PREPARE, _RECOVER, or _NOTCONN
 *
 * PREPARE: replayable (failover) imports just cancel unused locks to
 * simplify replay; non-failover imports tear down local lock/RPC state
 * and deactivate the OSC in the containing LOV.
 * RECOVER: reconnect to the OST, then replay or invalidate depending on
 * the connect reply flags, and reactivate the OSC.
 * NOTCONN: run PREPARE then RECOVER back to back.
 *
 * NOTE(review): this listing is elided -- switch/brace/error-check lines
 * between the visible statements are not shown here.
 */
1463 static int osc_recover(struct obd_import *imp, int phase)
1466 unsigned long flags;
1468 struct ptlrpc_request *req;
1469 struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
1472 CDEBUG(D_HA, "%s: entering phase: %d\n",
1473 imp->imp_obd->obd_name, phase);
1476 case PTLRPC_RECOVD_PHASE_PREPARE: {
1477 if (imp->imp_flags & IMP_REPLAYABLE) {
1478 CDEBUG(D_HA, "failover OST\n");
1479 /* If we're a failover OSC/OST, just cancel unused
1480 * locks to simplify lock replay.
1482 ldlm_cli_cancel_unused(ns, NULL, LDLM_FL_LOCAL_ONLY);
1484 CDEBUG(D_HA, "non-failover OST\n");
1485 /* Non-failover OSTs (LLNL scenario) disable the OSC
1486 * and invalidate local state.
1488 ldlm_namespace_cleanup(ns, 1 /* no network ops */);
1489 ptlrpc_abort_inflight(imp, 0);
1490 set_osc_active(imp, 0 /* inactive */);
1495 case PTLRPC_RECOVD_PHASE_RECOVER: {
/* optimistically clear INVALID before trying to reconnect */
1497 imp->imp_flags &= ~IMP_INVALID;
1498 rc = ptlrpc_reconnect_import(imp, OST_CONNECT, &req);
/* reply may be absent on error; treat missing repmsg as no flags */
1500 msg_flags = req->rq_repmsg
1501 ? lustre_msg_get_op_flags(req->rq_repmsg)
1504 if (rc == -EBUSY && (msg_flags & MSG_CONNECT_RECOVERING))
1505 CERROR("reconnect denied by recovery; should retry\n");
/* reconnect failed: invalidate local state unless we came in
 * through the NOTCONN path (which will retry) */
1508 if (phase != PTLRPC_RECOVD_PHASE_NOTCONN) {
1509 CERROR("can't reconnect, invalidating\n");
1510 ldlm_namespace_cleanup(ns, 1);
1511 ptlrpc_abort_inflight(imp, 0);
1513 imp->imp_flags |= IMP_INVALID;
1514 ptlrpc_req_finished(req);
/* server says it is in recovery: replay requests and locks,
 * then tell it replay is complete */
1518 if (msg_flags & MSG_CONNECT_RECOVERING) {
1519 /* Replay if they want it. */
1520 DEBUG_REQ(D_HA, req, "OST wants replay");
1521 rc = ptlrpc_replay(imp);
1525 rc = ldlm_replay_locks(imp);
1529 rc = signal_completed_replay(imp);
1532 } else if (msg_flags & MSG_CONNECT_RECONNECT) {
1533 DEBUG_REQ(D_HA, req, "reconnecting to MDS\n");
1534 /* Nothing else to do here. */
1536 DEBUG_REQ(D_HA, req, "evicted: invalidating\n");
1537 /* Otherwise, clean everything up. */
1538 ldlm_namespace_cleanup(ns, 1);
1539 ptlrpc_abort_inflight(imp, 0);
1542 ptlrpc_req_finished(req);
/* mark the import usable again under imp_lock */
1544 spin_lock_irqsave(&imp->imp_lock, flags);
1545 imp->imp_level = LUSTRE_CONN_FULL;
1546 imp->imp_flags &= ~IMP_INVALID;
1547 spin_unlock_irqrestore(&imp->imp_lock, flags);
1549 /* Is this the right place? Should we do this in _PREPARE
1550 * as well? What about raising the level right away?
1552 ptlrpc_wake_delayed(imp);
1554 rc = ptlrpc_resend(imp);
1558 set_osc_active(imp, 1 /* active */);
1562 /* If we get disconnected in the middle, recovery has probably
1563 * failed. Reconnect and find out.
1565 if (rc == -ENOTCONN)
1569 case PTLRPC_RECOVD_PHASE_NOTCONN:
1570 osc_recover(imp, PTLRPC_RECOVD_PHASE_PREPARE);
1571 RETURN(osc_recover(imp, PTLRPC_RECOVD_PHASE_RECOVER));
/* osc_connect(): o_connect method for the OSC.  Installs osc_recover()
 * as the import's recovery callback, then delegates the actual connect
 * to the generic client_obd_connect(). */
1578 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd,
1579 struct obd_uuid *cluuid, struct recovd_obd *recovd,
1580 ptlrpc_recovery_cb_t recover)
1582 struct obd_import *imp = &obd->u.cli.cl_import;
1583 imp->imp_recover = osc_recover;
1584 return client_obd_connect(conn, obd, cluuid, recovd, recover);
/* Method table for the regular OSC obd type (LUSTRE_OSC_NAME).
 * Lifecycle and connection methods come from the generic client code;
 * object, attribute, and lock methods are OSC-specific. */
1587 struct obd_ops osc_obd_ops = {
1588 o_owner: THIS_MODULE,
1589 o_attach: osc_attach,
1590 o_detach: osc_detach,
1591 o_setup: client_obd_setup,
1592 o_cleanup: client_obd_cleanup,
1593 o_connect: osc_connect,
1594 o_disconnect: client_obd_disconnect,
1595 o_statfs: osc_statfs,
1596 o_packmd: osc_packmd,
1597 o_unpackmd: osc_unpackmd,
1598 o_create: osc_create,
1599 o_destroy: osc_destroy,
1600 o_getattr: osc_getattr,
1601 o_setattr: osc_setattr,
1606 o_enqueue: osc_enqueue,
1607 o_cancel: osc_cancel,
1608 o_cancel_unused: osc_cancel_unused,
1609 o_iocontrol: osc_iocontrol
/* Method table for the SAN OSC obd type (LUSTRE_SANOSC_NAME).
 * Identical to osc_obd_ops except o_setup uses client_sanobd_setup. */
1612 struct obd_ops sanosc_obd_ops = {
1613 o_owner: THIS_MODULE,
1614 o_attach: osc_attach,
1615 o_detach: osc_detach,
1616 o_cleanup: client_obd_cleanup,
1617 o_connect: osc_connect,
1618 o_disconnect: client_obd_disconnect,
1619 o_statfs: osc_statfs,
1620 o_packmd: osc_packmd,
1621 o_unpackmd: osc_unpackmd,
1622 o_create: osc_create,
1623 o_destroy: osc_destroy,
1624 o_getattr: osc_getattr,
1625 o_setattr: osc_setattr,
/* SAN variant: device setup differs from the regular client path */
1629 o_setup: client_sanobd_setup,
1633 o_enqueue: osc_enqueue,
1634 o_cancel: osc_cancel,
1635 o_cancel_unused: osc_cancel_unused,
1636 o_iocontrol: osc_iocontrol,
/* osc_init(): module entry point.  Registers both the OSC and SANOSC
 * obd types with the class driver; on failure of the second
 * registration, the first is rolled back.
 * NOTE(review): error-check/brace lines are elided in this listing. */
1639 int __init osc_init(void)
1641 struct lprocfs_static_vars lvars;
/* per-file-descriptor OST data must fit in the reserved slot */
1645 LASSERT(sizeof(struct osc_obdo_data) <= FD_OSTDATA_SIZE);
1647 lprocfs_init_vars(&lvars);
1649 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
1654 rc = class_register_type(&sanosc_obd_ops, lvars.module_vars,
1655 LUSTRE_SANOSC_NAME);
/* undo the OSC registration if SANOSC registration failed */
1657 class_unregister_type(LUSTRE_OSC_NAME);
/* osc_exit(): module exit point -- unregister both obd types,
 * in reverse order of registration. */
1662 static void __exit osc_exit(void)
1664 class_unregister_type(LUSTRE_SANOSC_NAME);
1665 class_unregister_type(LUSTRE_OSC_NAME);
/* Standard kernel module metadata and init/exit hookup. */
1669 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1670 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
1671 MODULE_LICENSE("GPL");
1673 module_init(osc_init);
1674 module_exit(osc_exit);