1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 * For testing and management it is treated as an obd_device,
23 * although it does not export a full OBD method table (the
24 * requests are coming in over the wire, so object target modules
25 * do not have a full method table.)
30 #define DEBUG_SUBSYSTEM S_OSC
33 #include <linux/version.h>
34 #include <linux/module.h>
36 #include <linux/highmem.h>
37 #include <linux/lustre_dlm.h>
38 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
39 #include <linux/workqueue.h>
40 #include <linux/smp_lock.h>
42 #include <linux/locks.h>
45 #include <liblustre.h>
48 #include <linux/kp30.h>
49 #include <linux/lustre_mds.h> /* for mds_objid */
50 #include <linux/obd_ost.h>
53 #include <linux/ctype.h>
54 #include <linux/init.h>
59 #include <linux/lustre_ha.h>
60 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
61 #include <linux/lustre_lite.h> /* for ll_i2info */
62 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
63 #include <linux/lprocfs_status.h>
/* Attach /proc (lprocfs) statistics variables to this OSC device. */
65 static int osc_attach(struct obd_device *dev, obd_count len, void *data)
67         struct lprocfs_static_vars lvars;
69         lprocfs_init_vars(&lvars);
70         return lprocfs_obd_attach(dev, lvars.obd_vars);
/* Tear down /proc state; inverse of osc_attach(). */
73 static int osc_detach(struct obd_device *dev)
75         return lprocfs_obd_detach(dev);
78 /* Pack OSC object metadata for disk storage (LE byte order). */
/* NOTE(review): the conditionals surrounding the OBD_FREE (free-request
 * path) and OBD_ALLOC (caller wants us to allocate) branches are not
 * visible in this excerpt — confirm against the full source. */
79 static int osc_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
80 struct lov_stripe_md *lsm)
85 lmm_size = sizeof(**lmmp);
90 OBD_FREE(*lmmp, lmm_size);
96 OBD_ALLOC(*lmmp, lmm_size);
/* Object id must be valid before packing; stored little-endian on disk. */
102 LASSERT(lsm->lsm_object_id);
103 (*lmmp)->lmm_object_id = cpu_to_le64 (lsm->lsm_object_id);
109 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* NOTE(review): lmm_bytes is a signed int compared against a size_t
 * (sizeof); a negative lmm_bytes would convert to a huge unsigned value
 * and pass the "too small" check — verify callers never pass < 0. */
110 static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
111 struct lov_mds_md *lmm, int lmm_bytes)
117 if (lmm_bytes < sizeof (*lmm)) {
118 CERROR("lov_mds_md too small: %d, need %d\n",
119 lmm_bytes, (int)sizeof(*lmm));
122 /* XXX LOV_MAGIC etc check? */
124 if (lmm->lmm_object_id == cpu_to_le64 (0)) {
125 CERROR ("lov_mds_md: zero lmm_object_id\n");
130 lsm_size = sizeof(**lsmp);
135 OBD_FREE(*lsmp, lsm_size);
141 OBD_ALLOC(*lsmp, lsm_size);
147 /* XXX zero *lsmp? */
148 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
149 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
150 LASSERT((*lsmp)->lsm_object_id);
/* Max bytes per bulk RPC and max pages per I/O vector.  Hard-coded on the
 * client until the OST advertises its limits (hence the #warning). */
156 #warning "FIXME: make this be sent from OST"
157 #define OSC_BRW_MAX_SIZE 65536
158 #define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE)
/* Completion callback for async GETATTR: unpack the reply ost_body and
 * copy it into the caller's obdo. */
160 static int osc_getattr_interpret(struct ptlrpc_request *req,
161 struct osc_getattr_async_args *aa, int rc)
163 struct obdo *oa = aa->aa_oa;
164 struct ost_body *body;
168 CERROR("failed: rc = %d\n", rc);
172 body = lustre_swab_repbuf (req, 0, sizeof (*body),
173 lustre_swab_ost_body);
175 CERROR ("can't unpack ost_body\n");
179 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
180 memcpy(oa, &body->oa, sizeof(*oa));
182 /* This should really be sent by the OST */
183 oa->o_blksize = OSC_BRW_MAX_SIZE;
184 oa->o_valid |= OBD_MD_FLBLKSZ;
/* Queue a GETATTR RPC on 'set' without waiting; the reply is handled by
 * osc_getattr_interpret(), which fills the caller's obdo. */
189 static int osc_getattr_async(struct lustre_handle *conn, struct obdo *oa,
190 struct lov_stripe_md *md,
191 struct ptlrpc_request_set *set)
193 struct ptlrpc_request *request;
194 struct ost_body *body;
195 int size = sizeof(*body);
196 struct osc_getattr_async_args *aa;
199 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
204 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
205 memcpy(&body->oa, oa, sizeof(*oa));
207 request->rq_replen = lustre_msg_size(1, &size);
208 request->rq_interpret_reply = osc_getattr_interpret;
/* Per-request scratch space must be big enough for our args struct. */
210 LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
211 aa = (struct osc_getattr_async_args *)&request->rq_async_args;
214 ptlrpc_set_add_req (set, request);
/* Synchronous GETATTR: send the obdo, wait, and copy the reply's
 * attributes back into *oa. */
218 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
219 struct lov_stripe_md *md)
221 struct ptlrpc_request *request;
222 struct ost_body *body;
223 int rc, size = sizeof(*body);
226 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
231 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
232 memcpy(&body->oa, oa, sizeof(*oa));
234 request->rq_replen = lustre_msg_size(1, &size);
236 rc = ptlrpc_queue_wait(request);
238 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
242 body = lustre_swab_repbuf(request, 0, sizeof (*body),
243 lustre_swab_ost_body);
245 CERROR ("can't unpack ost_body\n");
246 GOTO (out, rc = -EPROTO);
249 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
250 memcpy(oa, &body->oa, sizeof(*oa));
252 /* This should really be sent by the OST */
253 oa->o_blksize = OSC_BRW_MAX_SIZE;
254 oa->o_valid |= OBD_MD_FLBLKSZ;
258 ptlrpc_req_finished(request);
262 /* The import lock must already be held. */
/* Walk a request list and rewrite the file handle inside the ost_body of
 * every queued request of opcode 'op' whose handle matches 'old' — used
 * after open replay produces a new server handle. */
263 static inline void osc_update_body_handle(struct list_head *head,
264 struct lustre_handle *old,
265 struct lustre_handle *new, int op)
267 struct list_head *tmp;
268 struct ost_body *body;
269 struct ptlrpc_request *req;
270 struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
272 list_for_each(tmp, head) {
273 req = list_entry(tmp, struct ptlrpc_request, rq_list);
275 /* XXX ok to remove when bug 1303 resolved - rread 05/27/03 */
276 LASSERT (req != last_req);
279 if (req->rq_reqmsg->opc != op)
281 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
282 if (memcmp(obdo_handle(&body->oa), old, sizeof(*old)))
285 DEBUG_REQ(D_HA, req, "updating close body with new fh");
286 memcpy(obdo_handle(&body->oa), new, sizeof(*new));
/* Replay callback for OST_OPEN: after recovery the server hands back a new
 * file handle; record it in the client handle and patch any queued/delayed
 * CLOSE requests that still carry the stale handle. */
290 static void osc_replay_open(struct ptlrpc_request *req)
292 struct lustre_handle old;
293 struct ost_body *body;
294 struct obd_client_handle *och = req->rq_replay_data;
295 struct lustre_handle *oa_handle;
298 body = lustre_swab_repbuf (req, 0, sizeof (*body),
299 lustre_swab_ost_body);
300 LASSERT (body != NULL);
302 oa_handle = obdo_handle(&body->oa);
304 memcpy(&old, &och->och_fh, sizeof(old));
305 CDEBUG(D_HA, "updating cookie from "LPD64" to "LPD64"\n",
306 och->och_fh.cookie, oa_handle->cookie);
307 memcpy(&och->och_fh, oa_handle, sizeof(och->och_fh));
309 /* A few frames up, ptlrpc_replay holds the lock, so this is safe. */
310 osc_update_body_handle(&req->rq_import->imp_sending_list, &old,
311 &och->och_fh, OST_CLOSE);
312 osc_update_body_handle(&req->rq_import->imp_delayed_list, &old,
313 &och->och_fh, OST_CLOSE);
/* Open an object on the OST.  The request is marked replayable and kept
 * (with a reference in och->och_req) so recovery can re-open and then fix
 * up the handle via osc_replay_open(). */
318 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
319 struct lov_stripe_md *md, struct obd_trans_info *oti,
320 struct obd_client_handle *och)
322 struct ptlrpc_request *request;
323 struct ost_body *body;
325 int rc, size = sizeof(*body);
327 LASSERT(och != NULL);
329 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
334 spin_lock_irqsave (&request->rq_lock, flags);
335 request->rq_replay = 1;
336 spin_unlock_irqrestore (&request->rq_lock, flags);
338 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
339 memcpy(&body->oa, oa, sizeof(*oa));
341 request->rq_replen = lustre_msg_size(1, &size);
343 rc = ptlrpc_queue_wait(request);
347 body = lustre_swab_repbuf (request, 0, sizeof (*body),
348 lustre_swab_ost_body);
350 CERROR ("Can't unpack ost_body\n");
351 GOTO (out, rc = -EPROTO);
354 memcpy(oa, &body->oa, sizeof(*oa));
356 /* If the open succeeded, we better have a handle */
357 /* BlueArc OSTs don't send back (o_valid | FLHANDLE). sigh.
358 * Temporary workaround until fixed. -phil 24 Feb 03 */
359 // if ((oa->o_valid & OBD_MD_FLHANDLE) == 0) {
360 // CERROR ("No file handle\n");
361 // GOTO (out, rc = -EPROTO);
363 oa->o_valid |= OBD_MD_FLHANDLE;
365 /* This should really be sent by the OST */
366 oa->o_blksize = OSC_BRW_MAX_SIZE;
367 oa->o_valid |= OBD_MD_FLBLKSZ;
/* Remember the server handle and pin the request for replay. */
369 memcpy(&och->och_fh, obdo_handle(oa), sizeof(och->och_fh));
370 request->rq_replay_cb = osc_replay_open;
371 request->rq_replay_data = och;
372 och->och_req = ptlrpc_request_addref(request);
373 och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
377 ptlrpc_req_finished(request);
/* Close an object on the OST.  The client handle lives in oa->o_inline;
 * zero magic means the object was never opened on this OST (e.g. the OST
 * was inactive), so there is nothing to close.  Close errors are
 * deliberately suppressed (bug 1036).  Also drops the pinned replay
 * reference taken by osc_open(). */
381 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
382 struct lov_stripe_md *md, struct obd_trans_info *oti)
384 struct obd_import *import = class_conn2cliimp(conn);
385 struct ptlrpc_request *request;
386 struct ost_body *body;
387 struct obd_client_handle *och;
389 int rc, size = sizeof(*body);
393 och = (struct obd_client_handle *)&oa->o_inline;
394 if (och->och_magic == 0) {
395 /* Zero magic means that this file was never opened on this
396 * OST--almost certainly because the OST was inactive at
400 LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
402 request = ptlrpc_prep_req(import, OST_CLOSE, 1, &size, NULL);
406 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
407 memcpy(&body->oa, oa, sizeof(*oa));
409 request->rq_replen = lustre_msg_size(1, &size);
411 rc = ptlrpc_queue_wait(request);
413 CDEBUG(D_HA, "Suppressing close error %d\n", rc); // bug 1036
415 /* och_req == NULL can't happen any more, right? --phik */
416 if (och->och_req != NULL) {
417 spin_lock_irqsave(&import->imp_lock, flags);
418 spin_lock (&och->och_req->rq_lock);
/* The open no longer needs replaying once it has been closed. */
419 och->och_req->rq_replay = 0;
420 spin_unlock (&och->och_req->rq_lock);
421 /* see comments in llite/file.c:ll_mdc_close() */
422 if (och->och_req->rq_transno) {
423 /* this can't happen yet, because the OSTs don't yet
424 * issue transnos for OPEN requests -phik 21 Apr 2003 */
426 if (!request->rq_transno && import->imp_replayable) {
427 request->rq_transno = och->och_req->rq_transno;
428 ptlrpc_retain_replayable_request(request,
431 spin_unlock_irqrestore(&import->imp_lock, flags);
433 spin_unlock_irqrestore(&import->imp_lock, flags);
436 ptlrpc_req_finished(och->och_req);
440 body = lustre_swab_repbuf (request, 0, sizeof (*body),
441 lustre_swab_ost_body);
444 CDEBUG(D_HA, "Suppressing close error %d\n", rc); // bug 1036
446 memcpy(oa, &body->oa, sizeof(*oa));
449 ptlrpc_req_finished(request);
/* Synchronous SETATTR: push the attributes in *oa to the OST.  The reply
 * body is not copied back here. */
453 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
454 struct lov_stripe_md *md, struct obd_trans_info *oti)
456 struct ptlrpc_request *request;
457 struct ost_body *body;
458 int rc, size = sizeof(*body);
461 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
466 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
467 memcpy(&body->oa, oa, sizeof(*oa));
469 request->rq_replen = lustre_msg_size(1, &size);
471 rc = ptlrpc_queue_wait(request);
473 ptlrpc_req_finished(request);
/* Create an object on the OST.  Allocates striping metadata if the caller
 * did not supply any (freed again on failure), fills it from the new
 * object id, and records the reply transno in *oti for recovery. */
477 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
478 struct lov_stripe_md **ea, struct obd_trans_info *oti)
480 struct ptlrpc_request *request;
481 struct ost_body *body;
482 struct lov_stripe_md *lsm;
483 int rc, size = sizeof(*body);
491 rc = obd_alloc_memmd(conn, &lsm);
496 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
499 GOTO(out, rc = -ENOMEM);
501 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
502 memcpy(&body->oa, oa, sizeof(*oa));
504 request->rq_replen = lustre_msg_size(1, &size);
506 rc = ptlrpc_queue_wait(request);
510 body = lustre_swab_repbuf (request, 0, sizeof (*body),
511 lustre_swab_ost_body);
513 CERROR ("can't unpack ost_body\n");
514 GOTO (out_req, rc = -EPROTO);
517 memcpy(oa, &body->oa, sizeof(*oa));
519 /* This should really be sent by the OST */
520 oa->o_blksize = OSC_BRW_MAX_SIZE;
521 oa->o_valid |= OBD_MD_FLBLKSZ;
523 lsm->lsm_object_id = oa->o_id;
524 lsm->lsm_stripe_count = 0;
525 lsm->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
529 oti->oti_transno = request->rq_repmsg->transno;
531 CDEBUG(D_HA, "transno: "LPD64"\n", request->rq_repmsg->transno);
534 ptlrpc_req_finished(request);
537 obd_free_memmd(conn, &lsm);
/* Truncate/punch the byte range [start, end] of an object.  The range is
 * smuggled through the obdo's size/blocks fields (see comment below). */
541 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
542 struct lov_stripe_md *md, obd_size start,
543 obd_size end, struct obd_trans_info *oti)
545 struct ptlrpc_request *request;
546 struct ost_body *body;
547 int rc, size = sizeof(*body);
555 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
560 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
561 memcpy(&body->oa, oa, sizeof(*oa));
563 /* overload the size and blocks fields in the oa with start/end */
564 body->oa.o_size = start;
565 body->oa.o_blocks = end;
566 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
568 request->rq_replen = lustre_msg_size(1, &size);
570 rc = ptlrpc_queue_wait(request);
574 body = lustre_swab_repbuf (request, 0, sizeof (*body),
575 lustre_swab_ost_body);
577 CERROR ("can't unpack ost_body\n");
578 GOTO (out, rc = -EPROTO);
581 memcpy(oa, &body->oa, sizeof(*oa));
585 ptlrpc_req_finished(request);
/* Destroy an object on the OST and copy the reply attributes back. */
589 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
590 struct lov_stripe_md *ea, struct obd_trans_info *oti)
592 struct ptlrpc_request *request;
593 struct ost_body *body;
594 int rc, size = sizeof(*body);
601 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
606 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
607 memcpy(&body->oa, oa, sizeof(*oa));
609 request->rq_replen = lustre_msg_size(1, &size);
611 rc = ptlrpc_queue_wait(request);
615 body = lustre_swab_repbuf (request, 0, sizeof (*body),
616 lustre_swab_ost_body);
618 CERROR ("Can't unpack body\n");
619 GOTO (out, rc = -EPROTO);
622 memcpy(oa, &body->oa, sizeof(*oa));
626 ptlrpc_req_finished(request);
630 /* We assume that the reason this OSC got a short read is because it read
631 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
632 * via the LOV, and it _knows_ it's reading inside the file, it's just that
633 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the unread tail: skip fully-read pages, zero the partial page
 * containing EOF, then zero all remaining pages. */
634 static void handle_short_read(int nob_read, obd_count page_count,
635 struct brw_page *pga)
639 /* skip bytes read OK */
640 while (nob_read > 0) {
641 LASSERT (page_count > 0);
643 if (pga->count > nob_read) {
644 /* EOF inside this page */
645 ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
646 memset(ptr + nob_read, 0, pga->count - nob_read);
653 nob_read -= pga->count;
658 /* zero remaining pages */
659 while (page_count-- > 0) {
660 ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
661 memset(ptr, 0, pga->count);
/* Validate the per-niobuf return-code vector in a BRW_WRITE reply:
 * byte-swap it if needed, fail on any negative rc, and reject non-zero
 * positive values as protocol errors. */
667 static int check_write_rcs (struct ptlrpc_request *request,
668 int niocount, obd_count page_count,
669 struct brw_page *pga)
674 /* return error if any niobuf was in error */
675 remote_rcs = lustre_swab_repbuf(request, 1,
676 sizeof(*remote_rcs) * niocount, NULL);
677 if (remote_rcs == NULL) {
678 CERROR ("Missing/short RC vector on BRW_WRITE reply\n");
681 if (lustre_msg_swabbed (request->rq_repmsg))
682 for (i = 0; i < niocount; i++)
683 __swab32s (&remote_rcs[i]);
685 for (i = 0; i < niocount; i++) {
686 if (remote_rcs[i] < 0)
687 return (remote_rcs[i]);
689 if (remote_rcs[i] != 0) {
690 CERROR ("rc[%d] invalid (%d) req %p\n",
691 i, remote_rcs[i], request);
/* Two brw_pages can share one remote niobuf iff they have identical flags
 * and p2 starts exactly where p1 ends (file-offset contiguity). */
699 static inline int can_merge_pages (struct brw_page *p1, struct brw_page *p2)
701 if (p1->flag != p2->flag) {
702 /* XXX we don't make much use of 'flag' right now
703 * but this will warn about usage when we do */
704 CERROR ("different flags set %d, %d\n",
709 return (p1->off + p1->count == p2->off);
/* Accumulate an ost_checksum over the first 'nob' bytes of the page
 * array, clamping the last page's contribution to the bytes remaining. */
713 static __u64 cksum_pages(int nob, obd_count page_count, struct brw_page *pga)
720 LASSERT (page_count > 0);
722 ptr = kmap (pga->pg);
723 ost_checksum (&cksum, ptr + (pga->off & (PAGE_SIZE - 1)),
724 pga->count > nob ? nob : pga->count);
/* Build a bulk read/write RPC: merge contiguous same-flag pages into
 * remote niobufs, register each page with the bulk descriptor, attach a
 * checksum for writes, and size the reply buffer (per-niobuf RCs for
 * writes, one RC for reads).  Returns the request plus the byte and
 * niobuf counts via the out-parameters.  Sets rq_no_resend. */
736 static int osc_brw_prep_request(struct obd_import *imp,
737 struct lov_stripe_md *lsm, obd_count page_count,
738 struct brw_page *pga, int cmd,
739 int *requested_nobp, int *niocountp,
740 struct ptlrpc_request **reqp)
742 struct ptlrpc_request *req;
743 struct ptlrpc_bulk_desc *desc;
744 struct ost_body *body;
745 struct obd_ioobj *ioobj;
746 struct niobuf_remote *niobuf;
755 opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
757 for (niocount = i = 1; i < page_count; i++)
758 if (!can_merge_pages (&pga[i - 1], &pga[i]))
761 size[0] = sizeof (*body);
762 size[1] = sizeof (*ioobj);
763 size[2] = niocount * sizeof (*niobuf);
765 req = ptlrpc_prep_req (imp, opc, 3, size, NULL);
769 if (opc == OST_WRITE)
770 desc = ptlrpc_prep_bulk_imp(req, BULK_GET_SOURCE,
773 desc = ptlrpc_prep_bulk_imp(req, BULK_PUT_SINK,
776 GOTO (out, rc = -ENOMEM);
777 /* NB request now owns desc and will free it when it gets freed */
779 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
780 ioobj = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*ioobj));
781 niobuf = lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf));
783 ioobj->ioo_id = lsm->lsm_object_id;
785 ioobj->ioo_type = S_IFREG;
786 ioobj->ioo_bufcnt = niocount;
788 LASSERT (page_count > 0);
789 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
790 struct brw_page *pg = &pga[i];
791 struct brw_page *pg_prev = pg - 1;
793 LASSERT (pg->count > 0);
794 LASSERT ((pg->off & (PAGE_SIZE - 1)) + pg->count <= PAGE_SIZE);
/* Pages must arrive sorted by ascending file offset (see sort_brw_pages). */
795 LASSERT (i == 0 || pg->off > pg_prev->off);
797 rc = ptlrpc_prep_bulk_page (desc, pg->pg,
798 pg->off & (PAGE_SIZE - 1),
803 requested_nob += pg->count;
806 can_merge_pages (pg_prev, pg)) {
808 niobuf->len += pg->count;
810 niobuf->offset = pg->off;
811 niobuf->len = pg->count;
812 niobuf->flags = pg->flag;
816 LASSERT ((void *)(niobuf - niocount) ==
817 lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
819 body->oa.o_valid |= OBD_MD_FLCKSUM;
/* FIX: 'opc' only ever holds OST_WRITE or OST_READ (set above); the old
 * comparison against OST_BRW_WRITE was the wrong constant namespace and
 * matches the OST_WRITE checks below.  o_rdev is overloaded to carry the
 * write checksum. */
820 if (opc == OST_WRITE)
821 body->oa.o_rdev = cksum_pages (requested_nob, page_count, pga);
823 spin_lock_irqsave (&req->rq_lock, flags);
824 req->rq_no_resend = 1;
825 spin_unlock_irqrestore (&req->rq_lock, flags);
827 /* size[0] still sizeof (*body) */
828 if (opc == OST_WRITE) {
829 /* 1 RC per niobuf */
830 size[1] = sizeof(__u32) * niocount;
831 req->rq_replen = lustre_msg_size(2, size);
833 /* 1 RC for the whole I/O */
834 req->rq_replen = lustre_msg_size(1, size);
837 *niocountp = niocount;
838 *requested_nobp = requested_nob;
843 ptlrpc_req_finished (req);
/* Post-process a completed bulk RPC.  Writes: validate the per-niobuf RC
 * vector.  Reads: sanity-check the byte count, zero-fill short reads, and
 * verify the server checksum (carried in o_rdev) when present; checksum
 * log messages are rate-limited to power-of-two counter values. */
847 static int osc_brw_fini_request (struct ptlrpc_request *req,
848 int requested_nob, int niocount,
849 obd_count page_count, struct brw_page *pga,
855 if (req->rq_reqmsg->opc == OST_WRITE) {
857 CERROR ("Unexpected +ve rc %d\n", rc);
861 return (check_write_rcs(req, niocount, page_count, pga));
864 if (rc > requested_nob) {
865 CERROR ("Unexpected rc %d (%d requested)\n",
870 if (rc < requested_nob)
871 handle_short_read (rc, page_count, pga);
874 imp = req->rq_import;
/* FIX: use lustre_swab_repbuf, consistent with every other reply-body
 * unpack site in this file (getattr/open/create/punch/destroy). */
875 body = lustre_swab_repbuf (req, 0, sizeof (*body),
876 lustre_swab_ost_body);
878 CERROR ("Can't unpack body\n");
879 } else if (body->oa.o_valid & OBD_MD_FLCKSUM) {
880 static int cksum_counter;
881 __u64 server_cksum = body->oa.o_rdev;
882 __u64 cksum = cksum_pages (rc, page_count, pga);
885 if (server_cksum != cksum) {
886 CERROR("Bad checksum: server "LPX64", client "LPX64
887 ", server NID "LPX64"\n", server_cksum, cksum,
888 imp->imp_connection->c_peer.peer_nid);
890 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter)
891 CERROR("Checksum %u from "LPX64" OK: "LPX64"\n",
893 imp->imp_connection->c_peer.peer_nid, cksum);
895 static int cksum_missed;
897 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
898 CERROR("Request checksum %u from "LPX64", no reply\n",
900 imp->imp_connection->c_peer.peer_nid);
/* Synchronous bulk I/O: build the request, wait for it, and run the
 * common completion checks.  A bulk timeout with rq_resend set is handled
 * specially (request dropped and, presumably, retried — the retry path is
 * not visible in this excerpt). */
906 static int osc_brw_internal(struct lustre_handle *conn,
907 struct lov_stripe_md *lsm,
908 obd_count page_count, struct brw_page *pga, int cmd)
912 struct ptlrpc_request *request;
917 rc = osc_brw_prep_request(class_conn2cliimp(conn), lsm, page_count, pga,
918 cmd, &requested_nob, &niocount, &request);
919 /* NB ^ sets rq_no_resend */
924 rc = ptlrpc_queue_wait(request);
926 if (rc == -ETIMEDOUT && request->rq_resend) {
927 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
928 ptlrpc_req_finished(request);
932 rc = osc_brw_fini_request (request, requested_nob, niocount,
933 page_count, pga, rc);
935 ptlrpc_req_finished(request);
/* Async completion callback for bulk I/O: recover the parameters stashed
 * in rq_async_args and run the common completion checks.  Bulk-timeout
 * resend is not implemented on this path yet (LBUG, bug 937). */
939 static int brw_interpret(struct ptlrpc_request *request,
940 struct osc_brw_async_args *aa, int rc)
942 int requested_nob = aa->aa_requested_nob;
943 int niocount = aa->aa_nio_count;
944 obd_count page_count = aa->aa_page_count;
945 struct brw_page *pga = aa->aa_pga;
948 /* XXX bug 937 here */
949 if (rc == -ETIMEDOUT && request->rq_resend) {
950 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
951 LBUG(); /* re-send. later. */
955 rc = osc_brw_fini_request (request, requested_nob, niocount,
956 page_count, pga, rc);
/* Asynchronous bulk I/O: build the request, stash the completion
 * parameters in rq_async_args for brw_interpret(), and add it to 'set'. */
960 static int async_internal(struct lustre_handle *conn, struct lov_stripe_md *lsm,
961 obd_count page_count, struct brw_page *pga,
962 struct ptlrpc_request_set *set, int cmd)
964 struct ptlrpc_request *request;
967 struct osc_brw_async_args *aa;
971 rc = osc_brw_prep_request (class_conn2cliimp(conn),
972 lsm, page_count, pga, cmd,
973 &requested_nob, &nio_count, &request);
974 /* NB ^ sets rq_no_resend */
/* Per-request scratch space must be big enough for our args struct. */
977 LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
978 aa = (struct osc_brw_async_args *)&request->rq_async_args;
979 aa->aa_requested_nob = requested_nob;
980 aa->aa_nio_count = nio_count;
981 aa->aa_page_count = page_count;
984 request->rq_interpret_reply = brw_interpret;
985 ptlrpc_set_add_req(set, request);
/* Typed min() via a GCC statement expression; evaluates each argument
 * exactly once.  Presumably a userspace (liblustre) fallback for the
 * kernel's min_t — TODO confirm it isn't shadowing a kernel definition. */
991 #define min_t(type,x,y) \
992 ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
/* Shellsort the brw_page array by ascending file offset (3x+1 strides),
 * as required by osc_brw_prep_request()'s ordering assertion. */
996 * ugh, we want disk allocation on the target to happen in offset order. we'll
997 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
998 * fine for our small page arrays and doesn't require allocation. its an
999 * insertion sort that swaps elements that are strides apart, shrinking the
1000 * stride down until its '1' and the array is sorted.
1002 static void sort_brw_pages(struct brw_page *array, int num)
1005 struct brw_page tmp;
1009 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1014 for (i = stride ; i < num ; i++) {
1017 while (j >= stride && array[j - stride].off > tmp.off) {
1018 array[j] = array[j - stride];
1023 } while (stride > 1);
1026 /* make sure the regions we're passing to elan don't violate its '4
1027 * fragments' constraint. portal headers are a fragment, all full
1028 * PAGE_SIZE long pages count as 1 fragment, and each partial page
1029 * counts as a fragment. I think. see bug 934. */
/* Returns the (possibly reduced) number of leading pages that fit within
 * the fragment budget. */
1030 static obd_count check_elan_limit(struct brw_page *pg, obd_count pages)
1033 int saw_whole_frag = 0;
1036 for (i = 0 ; frags_left && i < pages ; pg++, i++) {
1037 if (pg->count == PAGE_SIZE) {
1038 if (!saw_whole_frag) {
/* OBD brw entry point (synchronous).  OBD_BRW_CHECK just probes whether
 * the import is usable.  Otherwise, split the page array into
 * OSC_BRW_MAX_IOV-sized chunks (sorted and elan-limited) and issue one
 * synchronous bulk RPC per chunk. */
1049 static int osc_brw(int cmd, struct lustre_handle *conn,
1050 struct lov_stripe_md *md, obd_count page_count,
1051 struct brw_page *pga, struct obd_trans_info *oti)
1055 if (cmd == OBD_BRW_CHECK) {
1056 /* The caller just wants to know if there's a chance that this
1057 * I/O can succeed */
1058 struct obd_import *imp = class_conn2cliimp(conn);
1060 if (imp == NULL || imp->imp_invalid)
1065 while (page_count) {
1066 obd_count pages_per_brw;
1069 if (page_count > OSC_BRW_MAX_IOV)
1070 pages_per_brw = OSC_BRW_MAX_IOV;
1072 pages_per_brw = page_count;
1074 sort_brw_pages(pga, pages_per_brw);
1075 pages_per_brw = check_elan_limit(pga, pages_per_brw);
1077 rc = osc_brw_internal(conn, md, pages_per_brw, pga, cmd);
1082 page_count -= pages_per_brw;
1083 pga += pages_per_brw;
/* Asynchronous counterpart of osc_brw(): same chunking, but each chunk's
 * RPC is queued on 'set' via async_internal() instead of waited on. */
1088 static int osc_brw_async(int cmd, struct lustre_handle *conn,
1089 struct lov_stripe_md *md, obd_count page_count,
1090 struct brw_page *pga, struct ptlrpc_request_set *set,
1091 struct obd_trans_info *oti)
1095 if (cmd == OBD_BRW_CHECK) {
1096 /* The caller just wants to know if there's a chance that this
1097 * I/O can succeed */
1098 struct obd_import *imp = class_conn2cliimp(conn);
1100 if (imp == NULL || imp->imp_invalid)
1105 while (page_count) {
1106 obd_count pages_per_brw;
1109 if (page_count > OSC_BRW_MAX_IOV)
1110 pages_per_brw = OSC_BRW_MAX_IOV;
1112 pages_per_brw = page_count;
1114 sort_brw_pages(pga, pages_per_brw);
1115 pages_per_brw = check_elan_limit(pga, pages_per_brw);
1117 rc = async_internal(conn, md, pages_per_brw, pga, set, cmd);
1122 page_count -= pages_per_brw;
1123 pga += pages_per_brw;
1129 /* Note: caller will lock/unlock, and set uptodate on the pages */
1130 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/* SAN read path (2.4 kernels only): ask the OST for block mappings via
 * OST_SAN_READ, then read each mapped block directly from the shared SAN
 * block device with buffer heads; offset 0 in the reply denotes a hole
 * and the page is zero-filled instead. */
1131 static int sanosc_brw_read(struct lustre_handle *conn,
1132 struct lov_stripe_md *lsm,
1133 obd_count page_count,
1134 struct brw_page *pga)
1136 struct ptlrpc_request *request = NULL;
1137 struct ost_body *body;
1138 struct niobuf_remote *nioptr;
1139 struct obd_ioobj *iooptr;
1140 int rc, size[3] = {sizeof(*body)}, mapped = 0;
1144 /* XXX does not handle 'new' brw protocol */
1146 size[1] = sizeof(struct obd_ioobj);
1147 size[2] = page_count * sizeof(*nioptr);
1149 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_READ, 3,
1154 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
1155 iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof (*iooptr));
1156 nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
1157 sizeof (*nioptr) * page_count);
1159 iooptr->ioo_id = lsm->lsm_object_id;
1161 iooptr->ioo_type = S_IFREG;
1162 iooptr->ioo_bufcnt = page_count;
1164 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1165 LASSERT(PageLocked(pga[mapped].pg));
1166 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
1168 nioptr->offset = pga[mapped].off;
1169 nioptr->len = pga[mapped].count;
1170 nioptr->flags = pga[mapped].flag;
1173 size[1] = page_count * sizeof(*nioptr);
1174 request->rq_replen = lustre_msg_size(2, size);
1176 rc = ptlrpc_queue_wait(request);
1180 swab = lustre_msg_swabbed (request->rq_repmsg);
1181 LASSERT_REPSWAB (request, 1);
1182 nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
1184 /* nioptr missing or short */
1185 GOTO(out_req, rc = -EPROTO);
1189 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1190 struct page *page = pga[mapped].pg;
1191 struct buffer_head *bh;
1195 lustre_swab_niobuf_remote (nioptr);
1197 /* got san device associated */
1198 LASSERT(class_conn2obd(conn));
1199 dev = class_conn2obd(conn)->u.cli.cl_sandev;
/* Reply offset 0 means a hole: zero the page, no device read. */
1202 if (!nioptr->offset) {
1203 CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
1204 page->mapping->host->i_ino,
1206 memset(page_address(page), 0, PAGE_SIZE);
1210 if (!page->buffers) {
1211 create_empty_buffers(page, dev, PAGE_SIZE);
1214 clear_bit(BH_New, &bh->b_state);
1215 set_bit(BH_Mapped, &bh->b_state);
1216 bh->b_blocknr = (unsigned long)nioptr->offset;
1218 clear_bit(BH_Uptodate, &bh->b_state);
1220 ll_rw_block(READ, 1, &bh);
1224 /* if buffer already existed, it must be the
1225 * one we mapped before, check it */
1226 LASSERT(!test_bit(BH_New, &bh->b_state));
1227 LASSERT(test_bit(BH_Mapped, &bh->b_state));
1228 LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);
1230 /* wait for its I/O completion */
1231 if (test_bit(BH_Lock, &bh->b_state))
1234 if (!test_bit(BH_Uptodate, &bh->b_state))
1235 ll_rw_block(READ, 1, &bh);
1239 /* must do synchronous read here */
1241 if (!buffer_uptodate(bh)) {
1249 ptlrpc_req_finished(request);
/* SAN write path (2.4 kernels only): ask the OST for block mappings via
 * OST_SAN_WRITE, then write each page synchronously to the shared SAN
 * block device through buffer heads. */
1253 static int sanosc_brw_write(struct lustre_handle *conn,
1254 struct lov_stripe_md *lsm,
1255 obd_count page_count,
1256 struct brw_page *pga)
1258 struct ptlrpc_request *request = NULL;
1259 struct ost_body *body;
1260 struct niobuf_remote *nioptr;
1261 struct obd_ioobj *iooptr;
1262 int rc, size[3] = {sizeof(*body)}, mapped = 0;
1266 size[1] = sizeof(struct obd_ioobj);
1267 size[2] = page_count * sizeof(*nioptr);
1269 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_WRITE,
1274 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
1275 iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof (*iooptr));
1276 nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
1277 sizeof (*nioptr) * page_count);
1279 iooptr->ioo_id = lsm->lsm_object_id;
1281 iooptr->ioo_type = S_IFREG;
1282 iooptr->ioo_bufcnt = page_count;
1285 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1286 LASSERT(PageLocked(pga[mapped].pg));
1287 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
1289 nioptr->offset = pga[mapped].off;
1290 nioptr->len = pga[mapped].count;
1291 nioptr->flags = pga[mapped].flag;
1294 size[1] = page_count * sizeof(*nioptr);
1295 request->rq_replen = lustre_msg_size(2, size);
1297 rc = ptlrpc_queue_wait(request);
1301 swab = lustre_msg_swabbed (request->rq_repmsg);
1302 LASSERT_REPSWAB (request, 1);
1303 nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
1305 CERROR("absent/short niobuf array\n");
1306 GOTO(out_req, rc = -EPROTO);
1310 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1311 struct page *page = pga[mapped].pg;
1312 struct buffer_head *bh;
1316 lustre_swab_niobuf_remote (nioptr);
1318 /* got san device associated */
1319 LASSERT(class_conn2obd(conn));
1320 dev = class_conn2obd(conn)->u.cli.cl_sandev;
1322 if (!page->buffers) {
1323 create_empty_buffers(page, dev, PAGE_SIZE);
1326 LASSERT(!test_bit(BH_New, &page->buffers->b_state));
1327 LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
1328 LASSERT(page->buffers->b_blocknr ==
1329 (unsigned long)nioptr->offset);
1335 /* if buffer locked, wait for its I/O completion */
1336 if (test_bit(BH_Lock, &bh->b_state))
1339 clear_bit(BH_New, &bh->b_state);
1340 set_bit(BH_Mapped, &bh->b_state);
1342 /* override the block nr */
1343 bh->b_blocknr = (unsigned long)nioptr->offset;
1345 /* we are about to write it, so set it
1347 * page lock should guarantee no race condition here */
1348 set_bit(BH_Uptodate, &bh->b_state);
1349 set_bit(BH_Dirty, &bh->b_state);
1351 ll_rw_block(WRITE, 1, &bh);
1353 /* must do synchronous write here */
1355 if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
1363 ptlrpc_req_finished(request);
/* SAN brw entry point: split the page array into OSC_BRW_MAX_IOV chunks
 * and dispatch each to the read or write SAN path. */
1367 static int sanosc_brw(int cmd, struct lustre_handle *conn,
1368 struct lov_stripe_md *lsm, obd_count page_count,
1369 struct brw_page *pga, struct obd_trans_info *oti)
1373 while (page_count) {
1374 obd_count pages_per_brw;
1377 if (page_count > OSC_BRW_MAX_IOV)
1378 pages_per_brw = OSC_BRW_MAX_IOV;
1380 pages_per_brw = page_count;
1382 if (cmd & OBD_BRW_WRITE)
1383 rc = sanosc_brw_write(conn, lsm, pages_per_brw, pga);
1385 rc = sanosc_brw_read(conn, lsm, pages_per_brw, pga);
1390 page_count -= pages_per_brw;
1391 pga += pages_per_brw;
/* Take an extent lock on the object.  The extent is first widened to page
 * boundaries; we then try to match an existing lock (including a PW lock
 * when only PR was requested — a PW lock protects readers too) before
 * falling back to a full DLM enqueue. */
1398 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
1399 struct lustre_handle *parent_lock,
1400 __u32 type, void *extentp, int extent_len, __u32 mode,
1401 int *flags, void *callback, void *data,
1402 struct lustre_handle *lockh)
1404 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1405 struct obd_device *obddev = class_conn2obd(connh);
1406 struct ldlm_extent *extent = extentp;
1410 /* Filesystem lock extents are extended to page boundaries so that
1411 * dealing with the page cache is a little smoother. */
1412 extent->start -= extent->start & ~PAGE_MASK;
1413 extent->end |= ~PAGE_MASK;
1415 /* Next, search for already existing extent locks that will cover us */
/* FIX: sizeof(*extent) — the old sizeof(extent) passed the size of the
 * pointer, not of the ldlm_extent lock-value block (classic sizeof-
 * pointer bug; note the unused extent_len parameter). */
1416 rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_MATCH_DATA, &res_id,
1417 type, extent, sizeof(*extent), mode, data, lockh);
1419 /* We already have a lock, and it's referenced */
1422 /* If we're trying to read, we also search for an existing PW lock. The
1423 * VFS and page cache already protect us locally, so lots of readers/
1424 * writers can share a single PW lock.
1426 * There are problems with conversion deadlocks, so instead of
1427 * converting a read lock to a write lock, we'll just enqueue a new
1430 * At some point we should cancel the read lock instead of making them
1431 * send us a blocking callback, but there are problems with canceling
1432 * locks out from other users right now, too. */
1434 if (mode == LCK_PR) {
1435 rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_MATCH_DATA,
1436 &res_id, type, extent, sizeof(*extent),
1437 LCK_PW, data, lockh);
1439 /* FIXME: This is not incredibly elegant, but it might
1440 * be more elegant than adding another parameter to
1441 * lock_match. I want a second opinion. */
1442 ldlm_lock_addref(lockh, LCK_PR);
1443 ldlm_lock_decref(lockh, LCK_PW);
1449 rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
1450 res_id, type, extent, sizeof(*extent), mode, flags,
1451 ldlm_completion_ast, callback, data, lockh);
/* osc_match(): look for an already-granted extent lock covering the
 * (page-aligned) request without contacting the server.  As in
 * osc_enqueue(), a PR request is also satisfied by a cached PW lock,
 * in which case the PW reference is swapped for a PR reference.
 *
 * FIX(review): both ldlm_lock_match() calls passed sizeof(extent) --
 * the size of the *pointer* -- as the extent cookie length rather than
 * the size of the ldlm_extent structure; corrected to sizeof(*extent).
 * NOTE(review): rc declaration, result checks, the lockh argument of
 * the second match, and the final RETURN are elided in this excerpt. */
1455 static int osc_match(struct lustre_handle *connh, struct lov_stripe_md *lsm,
1456 __u32 type, void *extentp, int extent_len, __u32 mode,
1457 int *flags, void *data, struct lustre_handle *lockh)
1459 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1460 struct obd_device *obddev = class_conn2obd(connh);
1461 struct ldlm_extent *extent = extentp;
1465 /* Filesystem lock extents are extended to page boundaries so that
1466 * dealing with the page cache is a little smoother */
1467 extent->start -= extent->start & ~PAGE_MASK;
1468 extent->end |= ~PAGE_MASK;
1470 /* Next, search for already existing extent locks that will cover us */
1471 rc = ldlm_lock_match(obddev->obd_namespace, *flags, &res_id, type,
1472 extent, sizeof(*extent), mode, data, lockh);
1476 /* If we're trying to read, we also search for an existing PW lock. The
1477 * VFS and page cache already protect us locally, so lots of readers/
1478 * writers can share a single PW lock. */
1479 if (mode == LCK_PR) {
1480 rc = ldlm_lock_match(obddev->obd_namespace, *flags, &res_id,
1481 type, extent, sizeof(*extent), LCK_PW,
1484 /* FIXME: This is not incredibly elegant, but it might
1485 * be more elegant than adding another parameter to
1486 * lock_match. I want a second opinion. */
1487 ldlm_lock_addref(lockh, LCK_PR);
1488 ldlm_lock_decref(lockh, LCK_PW);
/* Drop one reference (of the given @mode) on a lock previously taken
 * through osc_enqueue()/osc_match().  The stripe-md argument is unused
 * in the visible body (ENTRY/RETURN lines elided in this excerpt). */
1494 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
1495 __u32 mode, struct lustre_handle *lockh)
1499 ldlm_lock_decref(lockh, mode);
/* Cancel all unused DLM locks on this object's resource; thin wrapper
 * around ldlm_cli_cancel_unused() (trailing argument line elided). */
1504 static int osc_cancel_unused(struct lustre_handle *connh,
1505 struct lov_stripe_md *lsm, int flags, void *opaque)
1507 struct obd_device *obddev = class_conn2obd(connh);
1508 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1510 return ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags,
/* OST_STATFS RPC: fetch filesystem usage statistics from the OST and
 * copy the (byte-swabbed) reply into @osfs.  Returns 0 on success or a
 * negative errno on RPC/unpack failure (some error-path and RETURN
 * lines are elided in this excerpt). */
1514 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
1516 struct obd_statfs *msfs;
1517 struct ptlrpc_request *request;
1518 int rc, size = sizeof(*osfs);
1521 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
/* Reply contains a single obd_statfs buffer. */
1526 request->rq_replen = lustre_msg_size(1, &size);
1528 rc = ptlrpc_queue_wait(request);
1530 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* Swab the reply in place; NULL means it was too small or corrupt. */
1534 msfs = lustre_swab_repbuf (request, 0, sizeof (*msfs),
1535 lustre_swab_obd_statfs);
1537 CERROR ("Can't unpack obd_statfs\n");
1538 GOTO (out, rc = -EPROTO);
1541 memcpy (osfs, msfs, sizeof (*msfs));
1545 ptlrpc_req_finished(request);
1549 /* Retrieve object striping information.
1551 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
1552 * the maximum number of OST indices which will fit in the user buffer.
1553 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
1555 static int osc_getstripe(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1556 struct lov_mds_md *lmmu)
1558 struct lov_mds_md lmm, *lmmk;
/* Pull in the user's header to learn magic and available slot count
 * (the check on rc is elided in this excerpt). */
1565 rc = copy_from_user(&lmm, lmmu, sizeof(lmm));
1569 if (lmm.lmm_magic != LOV_MAGIC)
1572 if (lmm.lmm_ost_count < 1)
/* Kernel-side copy: header plus exactly one object entry. */
1575 lmm_size = sizeof(lmm) + sizeof(lmm.lmm_objects[0]);
1576 OBD_ALLOC(lmmk, lmm_size);
/* Describe this single object as a one-stripe layout. */
1580 lmmk->lmm_stripe_count = 1;
1581 lmmk->lmm_ost_count = 1;
1582 lmmk->lmm_object_id = lsm->lsm_object_id;
1583 lmmk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
1585 if (copy_to_user(lmmu, lmmk, lmm_size))
1588 OBD_FREE(lmmk, lmm_size);
/* Multiplexed ioctl handler for the OSC device.  Dispatches on @cmd;
 * error paths use GOTO(out, ...).  The switch statement, break lines,
 * out label and final return are elided in this excerpt. */
1593 static int osc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
1594 void *karg, void *uarg)
1596 struct obd_device *obddev = class_conn2obd(conn);
1597 struct obd_ioctl_data *data = karg;
/* Remember which LOV device stacks on top of us; refuse a second one. */
1602 case IOC_OSC_REGISTER_LOV: {
1603 if (obddev->u.cli.cl_containing_lov)
1604 GOTO(out, err = -EALREADY);
1605 obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
/* Present this single OSC as a one-target LOV configuration. */
1608 case OBD_IOC_LOV_GET_CONFIG: {
1610 struct lov_desc *desc;
1611 struct obd_uuid uuid;
1615 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1616 GOTO(out, err = -EINVAL);
1618 data = (struct obd_ioctl_data *)buf;
/* Caller must provide room for the descriptor and one UUID. */
1620 if (sizeof(*desc) > data->ioc_inllen1) {
1622 GOTO(out, err = -EINVAL);
1625 if (data->ioc_inllen2 < sizeof(uuid)) {
1627 GOTO(out, err = -EINVAL);
1630 desc = (struct lov_desc *)data->ioc_inlbuf1;
1631 desc->ld_tgt_count = 1;
1632 desc->ld_active_tgt_count = 1;
1633 desc->ld_default_stripe_count = 1;
1634 desc->ld_default_stripe_size = 0;
1635 desc->ld_default_stripe_offset = 0;
1636 desc->ld_pattern = 0;
1637 memcpy(&desc->ld_uuid, &obddev->obd_uuid, sizeof(uuid));
1639 memcpy(data->ioc_inlbuf2, &obddev->obd_uuid, sizeof(uuid));
/* NOTE(review): copy_to_user() returns the number of bytes NOT copied,
 * not a negative errno, so err may be a positive byte count on fault
 * rather than -EFAULT -- confirm whether that is intended. */
1641 err = copy_to_user((void *)uarg, buf, len);
1644 obd_ioctl_freedata(buf, len);
1647 case LL_IOC_LOV_SETSTRIPE:
1648 err = obd_alloc_memmd(conn, karg);
1652 case LL_IOC_LOV_GETSTRIPE:
1653 err = osc_getstripe(conn, karg, uarg);
1655 case OBD_IOC_CLIENT_RECOVER:
1656 err = ptlrpc_recover_import(obddev->u.cli.cl_import,
1659 case IOC_OSC_SET_ACTIVE:
1660 err = ptlrpc_set_import_active(obddev->u.cli.cl_import,
1664 CERROR ("osc_ioctl(): unrecognised ioctl %#x\n", cmd);
1665 GOTO(out, err = -ENOTTY);
/* obd get_info handler: currently only answers the "lock_to_stripe"
 * key; rejects NULL value/length pointers (the value computation and
 * returns are elided in this excerpt). */
1671 static int osc_get_info(struct lustre_handle *conn, obd_count keylen,
1672 void *key, __u32 *vallen, void *val)
1675 if (!vallen || !val)
/* keylen must cover the NUL terminator as well as the key text,
 * hence strictly greater than strlen(). */
1678 if (keylen > strlen("lock_to_stripe") &&
1679 strcmp(key, "lock_to_stripe") == 0) {
1680 __u32 *stripe = val;
1681 *vallen = sizeof(*stripe);
/* Method table exported by the plain (network) OSC obd type.  Uses
 * GNU-style labelled initializers; some slots (e.g. o_brw, o_punch)
 * are elided in this excerpt. */
1688 struct obd_ops osc_obd_ops = {
1689 o_owner: THIS_MODULE,
1690 o_attach: osc_attach,
1691 o_detach: osc_detach,
1692 o_setup: client_obd_setup,
1693 o_cleanup: client_obd_cleanup,
1694 o_connect: client_import_connect,
1695 o_disconnect: client_import_disconnect,
1696 o_statfs: osc_statfs,
1697 o_packmd: osc_packmd,
1698 o_unpackmd: osc_unpackmd,
1699 o_create: osc_create,
1700 o_destroy: osc_destroy,
1701 o_getattr: osc_getattr,
1702 o_getattr_async: osc_getattr_async,
1703 o_setattr: osc_setattr,
1707 o_brw_async: osc_brw_async,
1709 o_enqueue: osc_enqueue,
1711 o_cancel: osc_cancel,
1712 o_cancel_unused: osc_cancel_unused,
1713 o_iocontrol: osc_iocontrol,
1714 o_get_info: osc_get_info
/* Method table for the SAN variant of the OSC.  Shares most handlers
 * with osc_obd_ops but sets up via client_sanobd_setup; some slots are
 * elided in this excerpt. */
1717 struct obd_ops sanosc_obd_ops = {
1718 o_owner: THIS_MODULE,
1719 o_attach: osc_attach,
1720 o_detach: osc_detach,
1721 o_cleanup: client_obd_cleanup,
1722 o_connect: client_import_connect,
1723 o_disconnect: client_import_disconnect,
1724 o_statfs: osc_statfs,
1725 o_packmd: osc_packmd,
1726 o_unpackmd: osc_unpackmd,
1727 o_create: osc_create,
1728 o_destroy: osc_destroy,
1729 o_getattr: osc_getattr,
1730 o_getattr_async: osc_getattr_async,
1731 o_setattr: osc_setattr,
1735 o_setup: client_sanobd_setup,
1739 o_enqueue: osc_enqueue,
1741 o_cancel: osc_cancel,
1742 o_cancel_unused: osc_cancel_unused,
1743 o_iocontrol: osc_iocontrol,
/* Module init: register the OSC and SANOSC obd types with their
 * lprocfs hooks.  If SANOSC registration fails, the OSC registration
 * is rolled back (rc checks and RETURN lines elided in this excerpt). */
1746 int __init osc_init(void)
1748 struct lprocfs_static_vars lvars;
/* A client handle must fit in the space the file descriptor data and
 * inline obd buffers reserve for it. */
1752 LASSERT(sizeof(struct obd_client_handle) <= FD_OSTDATA_SIZE);
1753 LASSERT(sizeof(struct obd_client_handle) <= OBD_INLINESZ);
1755 lprocfs_init_vars(&lvars);
1757 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
1762 rc = class_register_type(&sanosc_obd_ops, lvars.module_vars,
1763 LUSTRE_SANOSC_NAME);
/* Roll back the first registration if the second failed. */
1765 class_unregister_type(LUSTRE_OSC_NAME);
/* Module unload: unregister both obd types in reverse order of
 * registration. */
1770 static void __exit osc_exit(void)
1772 class_unregister_type(LUSTRE_SANOSC_NAME);
1773 class_unregister_type(LUSTRE_OSC_NAME);
/* Standard kernel module metadata and init/exit entry points. */
1777 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1778 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
1779 MODULE_LICENSE("GPL");
1781 module_init(osc_init);
1782 module_exit(osc_exit);