1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 * For testing and management it is treated as an obd_device,
 * although it does not export a full OBD method table (the
 * requests are coming in over the wire, so object target modules
 * do not have a full method table.)
30 #define DEBUG_SUBSYSTEM S_OSC
32 #include <linux/version.h>
33 #include <linux/module.h>
35 #include <linux/highmem.h>
36 #include <linux/lustre_dlm.h>
37 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
38 #include <linux/workqueue.h>
40 #include <linux/kp30.h>
41 #include <linux/lustre_mds.h> /* for mds_objid */
42 #include <linux/obd_ost.h>
43 #include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
44 #include <linux/ctype.h>
45 #include <linux/init.h>
46 #include <linux/lustre_ha.h>
47 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
48 #include <linux/lustre_lite.h> /* for ll_i2info */
49 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
50 #include <linux/lprocfs_status.h>
52 extern struct lprocfs_vars status_var_nm_1[];
53 extern struct lprocfs_vars status_class_var[];
55 static int osc_attach(struct obd_device *dev, obd_count len, void *data)
57 return lprocfs_reg_obd(dev, status_var_nm_1, dev);
/*
 * Detach-time hook: tear down the lprocfs entries created by
 * osc_attach().  Returns the lprocfs deregistration result.
 */
static int osc_detach(struct obd_device *dev)
{
        return lprocfs_dereg_obd(dev);
}
/* Pack OSC object metadata for shipment to the MDS. */
/* NOTE(review): this listing has dropped the branch/return lines between
 * the statements below; the free and alloc paths are alternatives chosen
 * by the (not visible) checks on lmmp/*lmmp. */
static int osc_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
        /* A plain OSC object packs to one fixed-size lov_mds_md. */
        lmm_size = sizeof(**lmmp);
        /* Free path: caller is releasing a previously packed buffer. */
        OBD_FREE(*lmmp, lmm_size);
        /* Alloc path: build the outgoing wire buffer... */
        OBD_ALLOC(*lmmp, lmm_size);
        /* ...and record the object id; 0 is never a valid object id. */
        LASSERT(lsm->lsm_object_id);
        (*lmmp)->lmm_object_id = (lsm->lsm_object_id);
/* Unpack OSC object metadata received from the MDS into an in-memory
 * lov_stripe_md; mirror image of osc_packmd() above.
 * NOTE(review): branch/return lines between the statements are not
 * visible in this listing. */
static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm)
        lsm_size = sizeof(**lsmp);
        /* Free path: caller is releasing a previously unpacked lsm. */
        OBD_FREE(*lsmp, lsm_size);
        /* Alloc path: build a fresh lsm from the wire data. */
        OBD_ALLOC(*lsmp, lsm_size);
        (*lsmp)->lsm_object_id = (lmm->lmm_object_id);
        LASSERT((*lsmp)->lsm_object_id);
/* Fetch one object's attributes from the OST: send OST_GETATTR with the
 * whole obdo packed into the request, and on success copy the server's
 * obdo back out through @oa.  @md is unused in the visible code. */
static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
                       struct lov_stripe_md *md)
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);   /* single buffer: the ost_body */
        /* NOTE(review): the continuation/error-check lines of this call are
         * not visible in this listing. */
        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness
        memcpy(&body->oa, oa, sizeof(*oa));
        /* The reply carries a single ost_body as well. */
        request->rq_replen = lustre_msg_size(1, &size);
        rc = ptlrpc_queue_wait(request);
        CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
        /* Success: copy the server's view of the attributes back out. */
        body = lustre_msg_buf(request->rq_repmsg, 0);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(oa, &body->oa, sizeof(*oa));
        ptlrpc_req_finished(request);
/* Open an object on the OST (OST_OPEN): pack the obdo into the request
 * and copy the updated obdo back from the reply.  Same shape as
 * osc_getattr() above. */
static int osc_open(struct lustre_handle *conn, struct obdo *oa,
                    struct lov_stripe_md *md)
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness
        memcpy(&body->oa, oa, sizeof(*oa));
        request->rq_replen = lustre_msg_size(1, &size);
        rc = ptlrpc_queue_wait(request);
        /* Success: propagate the server's obdo back to the caller. */
        body = lustre_msg_buf(request->rq_repmsg, 0);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(oa, &body->oa, sizeof(*oa));
        ptlrpc_req_finished(request);
/* Close an object on the OST (OST_CLOSE); mirror of osc_open() above.
 * The obdo is shipped whole and refreshed from the reply. */
static int osc_close(struct lustre_handle *conn, struct obdo *oa,
                     struct lov_stripe_md *md)
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness
        memcpy(&body->oa, oa, sizeof(*oa));
        request->rq_replen = lustre_msg_size(1, &size);
        rc = ptlrpc_queue_wait(request);
        /* Success: propagate the server's obdo back to the caller. */
        body = lustre_msg_buf(request->rq_repmsg, 0);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(oa, &body->oa, sizeof(*oa));
        ptlrpc_req_finished(request);
/* Push attribute changes to the OST (OST_SETATTR).  Unlike the getattr/
 * open/close paths, no reply unpacking is visible here -- only the RPC
 * return code matters to the caller. */
static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
                       struct lov_stripe_md *md)
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
        body = lustre_msg_buf(request->rq_reqmsg, 0);
        memcpy(&body->oa, oa, sizeof(*oa));
        request->rq_replen = lustre_msg_size(1, &size);
        rc = ptlrpc_queue_wait(request);
        ptlrpc_req_finished(request);
/* Create a new object on the OST: allocate an in-memory lsm, send
 * OST_CREATE, and record the server-assigned object id in the lsm
 * (returned to the caller via *ea).
 * NOTE(review): the 'out' label and several error checks are on lines
 * not visible in this listing. */
static int osc_create(struct lustre_handle *conn, struct obdo *oa,
                      struct lov_stripe_md **ea)
        struct ptlrpc_request *request;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size = sizeof(*body);
        rc = obd_alloc_memmd(conn, &lsm);
        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
        GOTO(out, rc = -ENOMEM);
        body = lustre_msg_buf(request->rq_reqmsg, 0);
        memcpy(&body->oa, oa, sizeof(*oa));
        request->rq_replen = lustre_msg_size(1, &size);
        rc = ptlrpc_queue_wait(request);
        /* Success: pick up the server-assigned id from the reply obdo. */
        body = lustre_msg_buf(request->rq_repmsg, 0);
        memcpy(oa, &body->oa, sizeof(*oa));
        /* A plain OSC object carries no striping of its own. */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_stripe_count = 0;
        ptlrpc_req_finished(request);
        /* Error path: give back the lsm allocated above. */
        obd_free_memmd(conn, &lsm);
/* Truncate/punch a byte range on an OST object (OST_PUNCH).  The range
 * is smuggled to the server in the obdo's size/blocks fields. */
static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
                     struct lov_stripe_md *md, obd_size start,
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness, valid
        memcpy(&body->oa, oa, sizeof(*oa));
        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = HTON__u64(start);
        body->oa.o_blocks = HTON__u64(end);
        /* NOTE(review): OR-ing a byte-swapped mask into the o_valid field
         * that was memcpy'd in host order looks inconsistent -- already
         * flagged by the #warning above; confirm the wire convention. */
        body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        request->rq_replen = lustre_msg_size(1, &size);
        rc = ptlrpc_queue_wait(request);
        /* Success: refresh the caller's obdo from the reply. */
        body = lustre_msg_buf(request->rq_repmsg, 0);
        memcpy(oa, &body->oa, sizeof(*oa));
        ptlrpc_req_finished(request);
/* Destroy an object on the OST (OST_DESTROY); same request/reply shape
 * as osc_getattr() above.  @ea is unused in the visible code. */
static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
                       struct lov_stripe_md *ea)
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness
        memcpy(&body->oa, oa, sizeof(*oa));
        request->rq_replen = lustre_msg_size(1, &size);
        rc = ptlrpc_queue_wait(request);
        /* Success: refresh the caller's obdo from the reply. */
        body = lustre_msg_buf(request->rq_repmsg, 0);
        memcpy(oa, &body->oa, sizeof(*oa));
        ptlrpc_req_finished(request);
381 /* Our bulk-unmapping bottom half. */
382 static void unmap_and_decref_bulk_desc(void *data)
384 struct ptlrpc_bulk_desc *desc = data;
385 struct list_head *tmp;
388 /* This feels wrong to me. */
389 list_for_each(tmp, &desc->bd_page_list) {
390 struct ptlrpc_bulk_page *bulk;
391 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
393 kunmap(bulk->bp_page);
397 ptlrpc_bulk_decref(desc);
401 /* this is the callback function which is invoked by the Portals
402 * event handler associated with the bulk_sink queue and bulk_source queue.
404 static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
408 LASSERT(desc->bd_brw_set != NULL);
409 LASSERT(desc->bd_brw_set->brw_callback != NULL);
411 desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
413 /* We can't kunmap the desc from interrupt context, so we do it from
414 * the bottom half above. */
415 prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
416 schedule_work(&desc->bd_queue);
/* Issue a bulk read: build an OST_READ request describing page_count
 * pages, register a bulk sink for the data, then queue the RPC.
 * NOTE(review): the out_req/out_unmap labels, several error checks and
 * the declaration of 'xid' are on lines not visible in this listing. */
static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                        obd_count page_count, struct brw_page *pga,
                        struct obd_brw_set *set)
        struct obd_import *imp = class_conn2cliimp(conn);
        struct ptlrpc_connection *connection = imp->imp_connection;
        struct ptlrpc_request *request = NULL;
        struct ptlrpc_bulk_desc *desc = NULL;
        struct ost_body *body;
        int rc, size[3] = {sizeof(*body)}, mapped = 0;
        void *iooptr, *nioptr;
        /* Three request buffers: ost_body, one ioobj, the niobuf array. */
        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(struct niobuf_remote);
        request = ptlrpc_prep_req(imp, OST_READ, 3, size, NULL);
        body = lustre_msg_buf(request->rq_reqmsg, 0);
        desc = ptlrpc_prep_bulk(connection);
        GOTO(out_req, rc = -ENOMEM);
        desc->bd_portal = OST_BULK_PORTAL;
        desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
        CDEBUG(D_PAGE, "desc = %p\n", desc);
        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
        ost_pack_ioo(&iooptr, lsm, page_count);
        /* end almost identical to brw_write case */
        spin_lock(&imp->imp_lock);
        xid = ++imp->imp_last_xid; /* single xid for all pages */
        spin_unlock(&imp->imp_lock);
        obd_kmap_get(page_count, 0);
        /* Map each destination page and describe it in the niobuf array. */
        for (mapped = 0; mapped < page_count; mapped++) {
                struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
                GOTO(out_unmap, rc = -ENOMEM);
                bulk->bp_xid = xid; /* single xid for all pages */
                bulk->bp_buf = kmap(pga[mapped].pg);
                bulk->bp_page = pga[mapped].pg;
                bulk->bp_buflen = PAGE_SIZE;
                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
                                pga[mapped].flag, bulk->bp_xid);
        /*
         * Register the bulk first, because the reply could arrive out of
         * order, and we want to be ready for the bulk data.
         *
         * One reference is released when brw_finish is complete, the other
         * when the caller removes us from the "set" list.
         *
         * On error, we never do the brw_finish, so we handle all decrefs.
         */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
                CERROR("obd_fail_loc=%x, skipping register_bulk\n",
                       OBD_FAIL_OSC_BRW_READ_BULK);
        rc = ptlrpc_register_bulk(desc);
        obd_brw_set_add(set, desc);
        request->rq_replen = lustre_msg_size(1, size);
        rc = ptlrpc_queue_wait(request);
        /*
         * XXX: If there is an error during the processing of the callback,
         *      such as a timeout in a sleep that it performs, brw_finish
         *      will never get called, and we'll leak the desc, fail to kunmap
         *      things, cats will live with dogs.  One solution would be to
         *      export brw_finish as osc_brw_finish, so that the timeout case
         *      and its kin could call it for proper cleanup.  An alternative
         *      would be for an error return from the callback to cause us to
         *      clean up, but that doesn't help the truly async cases (like
         *      LOV), which will immediately return from their PHASE_START
         *      callback, before any such cleanup-requiring error condition
         *      can be noticed.
         */
        ptlrpc_req_finished(request);
        /* Clean up on error. */
        kunmap(pga[mapped].pg);
        obd_kmap_put(page_count);
        ptlrpc_bulk_decref(desc);
/* Issue a bulk write: send an OST_WRITE request describing the pages,
 * receive per-page xids from the server's reply niobufs, attach the
 * mapped source pages to the bulk descriptor, then push the data with
 * ptlrpc_send_bulk().
 * NOTE(review): the out_req/out_desc/out_unmap labels and several error
 * checks are on lines not visible in this listing. */
static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
                         obd_count page_count, struct brw_page *pga,
                         struct obd_brw_set *set)
        struct ptlrpc_connection *connection =
                client_conn2cli(conn)->cl_import.imp_connection;
        struct ptlrpc_request *request = NULL;
        struct ptlrpc_bulk_desc *desc = NULL;
        struct ost_body *body;
        struct niobuf_local *local = NULL;
        struct niobuf_remote *remote;
        int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
        void *iooptr, *nioptr;
        /* Three request buffers: ost_body, one ioobj, the niobuf array. */
        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(*remote);
        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
        body = lustre_msg_buf(request->rq_reqmsg, 0);
        desc = ptlrpc_prep_bulk(connection);
        GOTO(out_req, rc = -ENOMEM);
        desc->bd_portal = OSC_BULK_PORTAL;
        desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
        CDEBUG(D_PAGE, "desc = %p\n", desc);
        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
        ost_pack_ioo(&iooptr, md, page_count);
        /* end almost identical to brw_read case */
        OBD_ALLOC(local, page_count * sizeof(*local));
        GOTO(out_desc, rc = -ENOMEM);
        obd_kmap_get(page_count, 0);
        /* Map each source page and describe it in the request niobufs. */
        for (mapped = 0; mapped < page_count; mapped++) {
                local[mapped].addr = kmap(pga[mapped].pg);
                CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
                       "%d ; page %d of %d\n",
                       local[mapped].addr, pga[mapped].pg->flags,
                       page_count(pga[mapped].pg),
                       mapped, page_count - 1);
                local[mapped].offset = pga[mapped].off;
                local[mapped].len = pga[mapped].count;
                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
                                pga[mapped].flag, 0);
        /* Reply: ost_body plus the server's niobuf array with bulk xids. */
        size[1] = page_count * sizeof(*remote);
        request->rq_replen = lustre_msg_size(2, size);
        rc = ptlrpc_queue_wait(request);
        nioptr = lustre_msg_buf(request->rq_repmsg, 1);
        GOTO(out_unmap, rc = -EINVAL);
        if (request->rq_repmsg->buflens[1] != size[1]) {
                CERROR("buffer length wrong (%d vs. %d)\n",
                       request->rq_repmsg->buflens[1], size[1]);
                GOTO(out_unmap, rc = -EINVAL);
        /* Attach each mapped page to the bulk descriptor, tagged with the
         * xid the server assigned in its reply niobuf. */
        for (j = 0; j < page_count; j++) {
                struct ptlrpc_bulk_page *bulk;
                ost_unpack_niobuf(&nioptr, &remote);
                bulk = ptlrpc_prep_bulk_page(desc);
                GOTO(out_unmap, rc = -ENOMEM);
                bulk->bp_buf = (void *)(unsigned long)local[j].addr;
                bulk->bp_buflen = local[j].len;
                bulk->bp_xid = remote->xid;
                bulk->bp_page = pga[j].pg;
        if (desc->bd_page_count != page_count)
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
                GOTO(out_unmap, rc = 0);
        OBD_FREE(local, page_count * sizeof(*local));
        /* One reference is released when brw_finish is complete, the other
         * when the caller removes it from the "set" list. */
        obd_brw_set_add(set, desc);
        rc = ptlrpc_send_bulk(desc);
        /* XXX: Mike, same question as in osc_brw_read. */
        ptlrpc_req_finished(request);
        /* Clean up on error. */
        kunmap(pga[mapped].pg);
        obd_kmap_put(page_count);
        OBD_FREE(local, page_count * sizeof(*local));
        ptlrpc_bulk_decref(desc);
/* Top-level bulk I/O entry point: issue reads or writes in chunks of at
 * most PTL_MD_MAX_IOV pages (the Portals per-MD fragment limit).
 * NOTE(review): the surrounding loop/else/return lines are not visible
 * in this listing; the chunk-advance statements at the bottom imply the
 * body repeats until page_count is exhausted. */
static int osc_brw(int cmd, struct lustre_handle *conn,
                   struct lov_stripe_md *md, obd_count page_count,
                   struct brw_page *pga, struct obd_brw_set *set)
        obd_count pages_per_brw;
        /* Clamp each chunk to the Portals fragment limit. */
        if (page_count > PTL_MD_MAX_IOV)
                pages_per_brw = PTL_MD_MAX_IOV;
                pages_per_brw = page_count;
        if (cmd & OBD_BRW_WRITE)
                rc = osc_brw_write(conn, md, pages_per_brw, pga, set);
                rc = osc_brw_read(conn, md, pages_per_brw, pga, set);
        /* Advance past the chunk just issued. */
        page_count -= pages_per_brw;
        pga += pages_per_brw;
/* Take a DLM extent lock on an object: first try to match a lock this
 * client already holds (including an existing PW lock when only PR was
 * requested), and fall back to a fresh enqueue against the server. */
static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
                       struct lustre_handle *parent_lock,
                       __u32 type, void *extentp, int extent_len, __u32 mode,
                       int *flags, void *callback, void *data, int datalen,
                       struct lustre_handle *lockh)
        __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
        struct obd_device *obddev = class_conn2obd(connh);
        struct ldlm_extent *extent = extentp;
        /* Filesystem locks are given a bit of special treatment: if
         * this is not a file size lock (which has end == -1), we
         * fixup the lock to start and end on page boundaries. */
        if (extent->end != OBD_OBJECT_EOF) {
                extent->start &= PAGE_MASK;
                extent->end = (extent->end & PAGE_MASK) + PAGE_SIZE - 1;
        /* Next, search for already existing extent locks that will cover us */
        /* NOTE(review): sizeof(extent) here (and below) is the size of the
         * *pointer*, and the extent_len parameter is never used -- these
         * presumably meant sizeof(*extent)/extent_len; confirm against the
         * ldlm cookie-length contract before changing. */
        rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
                             sizeof(extent), mode, lockh);
        /* We already have a lock, and it's referenced */
        /* If we're trying to read, we also search for an existing PW lock.
         * The VFS and page cache already protect us locally, so lots of
         * readers/writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        if (mode == LCK_PR) {
                rc = ldlm_lock_match(obddev->obd_namespace, res_id, type,
                                     extent, sizeof(extent), LCK_PW, lockh);
                /* FIXME: This is not incredibly elegant, but it might
                 * be more elegant than adding another parameter to
                 * lock_match. I want a second opinion. */
                /* Hold the matched PW lock under a PR reference and drop
                 * the PW reference the match just took. */
                ldlm_lock_addref(lockh, LCK_PR);
                ldlm_lock_decref(lockh, LCK_PW);
        /* No match: enqueue a new lock with the server. */
        rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
                              res_id, type, extent, sizeof(extent), mode, flags,
                              ldlm_completion_ast, callback, data, datalen,
733 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
734 __u32 mode, struct lustre_handle *lockh)
738 ldlm_lock_decref(lockh, mode);
743 static int osc_cancel_unused(struct lustre_handle *connh,
744 struct lov_stripe_md *lsm, int flags)
746 struct obd_device *obddev = class_conn2obd(connh);
747 __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
749 return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, flags);
/* Query filesystem statistics from the OST (OST_STATFS): the request
 * carries no buffers; the reply holds one packed obd_statfs which is
 * unpacked into @osfs on success. */
static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
        struct ptlrpc_request *request;
        int rc, size = sizeof(*osfs);
        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
        request->rq_replen = lustre_msg_size(1, &size);
        rc = ptlrpc_queue_wait(request);
        CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
        /* Success: decode the wire-format statfs into the caller's struct. */
        obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
        ptlrpc_req_finished(request);
/* Device ioctl dispatcher for the OSC: DLM test/regression hooks, LOV
 * registration, and a fabricated single-target LOV configuration.
 * NOTE(review): the switch scaffolding, 'break's and the declarations of
 * 'err', 'parse', 'buf' and 'uuidp' are on lines not visible in this
 * listing. */
static int osc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
                         void *karg, void *uarg)
        struct obd_device *obddev = class_conn2obd(conn);
        struct obd_ioctl_data *data = karg;
        case IOC_LDLM_TEST: {
                err = ldlm_test(obddev, conn);
                CERROR("-- done err %d\n", err);
        case IOC_LDLM_REGRESS_START: {
                /* Defaults used when no parameter string is supplied. */
                unsigned int numthreads = 1;
                unsigned int numheld = 10;
                unsigned int numres = 10;
                unsigned int numext = 10;
                /* Parse up to four whitespace-separated numbers:
                 * threads, held locks, resources, extents. */
                if (data->ioc_inllen1) {
                        parse = data->ioc_inlbuf1;
                        if (*parse != '\0') {
                                while(isspace(*parse)) parse++;
                                numthreads = simple_strtoul(parse, &parse, 0);
                                while(isspace(*parse)) parse++;
                        if (*parse != '\0') {
                                while(isspace(*parse)) parse++;
                                numheld = simple_strtoul(parse, &parse, 0);
                                while(isspace(*parse)) parse++;
                        if (*parse != '\0') {
                                while(isspace(*parse)) parse++;
                                numres = simple_strtoul(parse, &parse, 0);
                                while(isspace(*parse)) parse++;
                        if (*parse != '\0') {
                                while(isspace(*parse)) parse++;
                                numext = simple_strtoul(parse, &parse, 0);
                                while(isspace(*parse)) parse++;
                err = ldlm_regression_start(obddev, conn, numthreads,
                                            numheld, numres, numext);
                CERROR("-- done err %d\n", err);
        case IOC_LDLM_REGRESS_STOP: {
                err = ldlm_regression_stop();
                CERROR("-- done err %d\n", err);
        case IOC_OSC_REGISTER_LOV: {
                /* Remember the LOV above us so recovery can notify it
                 * (see set_osc_active() below). */
                if (obddev->u.cli.cl_containing_lov)
                        GOTO(out, err = -EALREADY);
                obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
        case OBD_IOC_LOV_GET_CONFIG: {
                struct lov_desc *desc;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);
                data = (struct obd_ioctl_data *)buf;
                /* Caller must provide room for the descriptor and a uuid. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        GOTO(out, err = -EINVAL);
                if (data->ioc_inllen2 < sizeof(*uuidp)) {
                        GOTO(out, err = -EINVAL);
                /* Fabricate a single-target LOV descriptor for this OSC. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(desc->ld_uuid, obddev->obd_uuid, sizeof(*uuidp));
                uuidp = (obd_uuid_t *)data->ioc_inlbuf2;
                memcpy(uuidp, obddev->obd_uuid, sizeof(*uuidp));
                err = copy_to_user((void *)uarg, buf, len);
                /* NOTE(review): "%#lx" expects unsigned long but 'cmd' is
                 * unsigned int -- should be "%#x". */
                CERROR ("osc_ioctl(): unrecognised ioctl %#lx\n", cmd);
                GOTO(out, err = -ENOTTY);
889 static void set_osc_active(struct obd_import *imp, int active)
891 struct obd_device *notify_obd = imp->imp_obd->u.cli.cl_containing_lov;
893 if (notify_obd == NULL)
896 /* How gross is _this_? */
897 if (!list_empty(¬ify_obd->obd_exports)) {
899 struct lustre_handle fakeconn;
900 struct obd_ioctl_data ioc_data;
901 struct obd_export *exp =
902 list_entry(notify_obd->obd_exports.next,
903 struct obd_export, exp_obd_chain);
905 fakeconn.addr = (__u64)(unsigned long)exp;
906 fakeconn.cookie = exp->exp_cookie;
907 ioc_data.ioc_inlbuf1 = imp->imp_obd->obd_uuid;
908 ioc_data.ioc_offset = active;
909 rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn,
910 sizeof ioc_data, &ioc_data, NULL);
912 CERROR("disabling %s on LOV %p/%s: %d\n",
913 imp->imp_obd->obd_uuid, notify_obd,
914 notify_obd->obd_uuid, rc);
916 CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about "
917 "%p\n", notify_obd, notify_obd->obd_uuid,
918 imp->imp_obd->obd_uuid);
923 /* XXX looks a lot like super.c:invalidate_request_list, don't it? */
924 static void abort_inflight_for_import(struct obd_import *imp)
926 struct list_head *tmp, *n;
928 /* Make sure that no new requests get processed for this import.
929 * ptlrpc_queue_wait must (and does) hold imp_lock while testing this
930 * flag and then putting requests on sending_list or delayed_list.
932 spin_lock(&imp->imp_lock);
933 imp->imp_flags |= IMP_INVALID;
934 spin_unlock(&imp->imp_lock);
936 list_for_each_safe(tmp, n, &imp->imp_sending_list) {
937 struct ptlrpc_request *req =
938 list_entry(tmp, struct ptlrpc_request, rq_list);
940 DEBUG_REQ(D_HA, req, "inflight");
941 req->rq_flags |= PTL_RPC_FL_ERR;
942 wake_up(&req->rq_wait_for_rep);
945 list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
946 struct ptlrpc_request *req =
947 list_entry(tmp, struct ptlrpc_request, rq_list);
949 DEBUG_REQ(D_HA, req, "aborting waiting req");
950 req->rq_flags |= PTL_RPC_FL_ERR;
951 wake_up(&req->rq_wait_for_rep);
/* Recovery callback installed on the import by osc_connect().
 * PREPARE phase: flush the DLM namespace, abort in-flight RPCs, and
 * tell the containing LOV this OSC is inactive.  RECOVER phase:
 * reconnect and mark the import valid/active again.
 * NOTE(review): the switch scaffolding, break/return lines and the
 * declaration of 'rc' are not visible in this listing. */
static int osc_recover(struct obd_import *imp, int phase)
        case PTLRPC_RECOVD_PHASE_PREPARE: {
                struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
                ldlm_namespace_cleanup(ns, 1 /* no network ops */);
                abort_inflight_for_import(imp);
                set_osc_active(imp, 0 /* inactive */);
        case PTLRPC_RECOVD_PHASE_RECOVER:
                imp->imp_flags &= ~IMP_INVALID;
                rc = ptlrpc_reconnect_import(imp, OST_CONNECT);
                /* Reconnect failed: leave the import marked invalid. */
                imp->imp_flags |= IMP_INVALID;
                set_osc_active(imp, 1 /* active */);
982 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd,
983 obd_uuid_t cluuid, struct recovd_obd *recovd,
984 ptlrpc_recovery_cb_t recover)
986 struct obd_import *imp = &obd->u.cli.cl_import;
987 imp->imp_recover = osc_recover;
988 return client_obd_connect(conn, obd, cluuid, recovd, recover);
991 struct obd_ops osc_obd_ops = {
992 o_attach: osc_attach,
993 o_detach: osc_detach,
994 o_setup: client_obd_setup,
995 o_cleanup: client_obd_cleanup,
996 o_connect: osc_connect,
997 o_disconnect: client_obd_disconnect,
998 o_statfs: osc_statfs,
999 o_packmd: osc_packmd,
1000 o_unpackmd: osc_unpackmd,
1001 o_create: osc_create,
1002 o_destroy: osc_destroy,
1003 o_getattr: osc_getattr,
1004 o_setattr: osc_setattr,
1009 o_enqueue: osc_enqueue,
1010 o_cancel: osc_cancel,
1011 o_cancel_unused: osc_cancel_unused,
1012 o_iocontrol: osc_iocontrol
1015 static int __init osc_init(void)
1017 RETURN(class_register_type(&osc_obd_ops, status_class_var,
1021 static void __exit osc_exit(void)
1023 class_unregister_type(LUSTRE_OSC_NAME);
/* Standard kernel module metadata and entry/exit registration. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);