1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 * For testing and management it is treated as an obd_device,
 * although it does not export a full OBD method table (the
 * requests are coming in over the wire, so object target modules
 * do not have a full method table.)
30 #define DEBUG_SUBSYSTEM S_OSC
32 #include <linux/version.h>
33 #include <linux/module.h>
35 #include <linux/highmem.h>
36 #include <linux/lustre_dlm.h>
37 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
38 #include <linux/workqueue.h>
40 #include <linux/kp30.h>
41 #include <linux/lustre_mds.h> /* for mds_objid */
42 #include <linux/obd_ost.h>
43 #include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
44 #include <linux/ctype.h>
45 #include <linux/init.h>
46 #include <linux/lustre_ha.h>
47 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
48 #include <linux/lustre_lite.h> /* for ll_i2info */
49 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
50 #include <linux/lprocfs_status.h>
52 extern struct lprocfs_vars status_var_nm_1[];
53 extern struct lprocfs_vars status_class_var[];
/* lprocfs hook: publish this OSC device's status variables under /proc
 * when the obd device is attached. */
static int osc_attach(struct obd_device *dev, obd_count len, void *data)
        return lprocfs_reg_obd(dev, status_var_nm_1, dev);
/* lprocfs hook: remove the /proc entries again when the device detaches. */
static int osc_detach(struct obd_device *dev)
        return lprocfs_dereg_obd(dev);
/* Pack OSC object metadata for shipment to the MDS. */
static int osc_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
        /* A bare OSC always packs one fixed-size lov_mds_md. */
        lmm_size = sizeof(**lmmp);
        /* NOTE(review): the conditions guarding the free/alloc branches are
         * elided in this view -- presumably *lmmp is freed when no packing is
         * requested and allocated when *lmmp == NULL; confirm in full source. */
        OBD_FREE(*lmmp, lmm_size);
        OBD_ALLOC(*lmmp, lmm_size);
        LASSERT(lsm->lsm_object_id);        /* object id 0 is never valid */
        (*lmmp)->lmm_object_id = (lsm->lsm_object_id);
/* Unpack a lov_mds_md received from the MDS into an in-memory
 * lov_stripe_md; the inverse of osc_packmd. */
static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm)
        lsm_size = sizeof(**lsmp);
        /* NOTE(review): free/alloc branch conditions elided in this view --
         * mirror of osc_packmd; confirm against full source. */
        OBD_FREE(*lsmp, lsm_size);
        OBD_ALLOC(*lsmp, lsm_size);
        (*lsmp)->lsm_object_id = (lmm->lmm_object_id);
        LASSERT((*lsmp)->lsm_object_id);    /* object id 0 is never valid */
/* Fetch the attributes of one OST object: ship the caller's obdo in an
 * OST_GETATTR request and copy the obdo from the reply back into 'oa'. */
static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
                       struct lov_stripe_md *md)
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness
        memcpy(&body->oa, oa, sizeof(*oa));

        /* Reply carries a single ost_body. */
        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);

        body = lustre_msg_buf(request->rq_repmsg, 0);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        /* Hand the OST's view of the attributes back to the caller. */
        memcpy(oa, &body->oa, sizeof(*oa));

        ptlrpc_req_finished(request);
/* Open an OST object: send OST_OPEN with the caller's obdo and copy
 * the (possibly updated) obdo from the reply back into 'oa'. */
static int osc_open(struct lustre_handle *conn, struct obdo *oa,
                    struct lov_stripe_md *md)
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);

        body = lustre_msg_buf(request->rq_repmsg, 0);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(oa, &body->oa, sizeof(*oa));

        ptlrpc_req_finished(request);
/* Close an OST object previously opened with osc_open; mirrors osc_open
 * but sends OST_CLOSE. */
static int osc_close(struct lustre_handle *conn, struct obdo *oa,
                     struct lov_stripe_md *md)
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);

        body = lustre_msg_buf(request->rq_repmsg, 0);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(oa, &body->oa, sizeof(*oa));

        ptlrpc_req_finished(request);
/* Push attribute changes to the OST via OST_SETATTR.  Unlike getattr,
 * the reply obdo is not copied back; callers only see the return code. */
static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
                       struct lov_stripe_md *md)
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
        body = lustre_msg_buf(request->rq_reqmsg, 0);
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        ptlrpc_req_finished(request);
/* Create an object on the OST.  On success the new object id arrives in
 * the reply obdo and is recorded in a freshly allocated lov_stripe_md
 * returned to the caller through 'ea'. */
static int osc_create(struct lustre_handle *conn, struct obdo *oa,
                      struct lov_stripe_md **ea)
        struct ptlrpc_request *request;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size = sizeof(*body);

        rc = obd_alloc_memmd(conn, &lsm);

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);

        body = lustre_msg_buf(request->rq_repmsg, 0);
        memcpy(oa, &body->oa, sizeof(*oa));

        /* Record the id the OST assigned; a bare OSC has no striping. */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_stripe_count = 0;

        ptlrpc_req_finished(request);
        /* Error path: release the stripe md allocated above. */
        obd_free_memmd(conn, &lsm);
/* Punch (truncate) a byte range of an object.  The [start, end] range
 * rides to the OST in the obdo's size/blocks fields (see below). */
static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
                     struct lov_stripe_md *md, obd_size start,
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness, valid
        memcpy(&body->oa, oa, sizeof(*oa));

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = HTON__u64(start);
        body->oa.o_blocks = HTON__u64(end);
        body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);

        body = lustre_msg_buf(request->rq_repmsg, 0);
        memcpy(oa, &body->oa, sizeof(*oa));

        ptlrpc_req_finished(request);
/* Destroy an object on the OST (OST_DESTROY); the reply obdo is copied
 * back so the caller sees any server-side updates. */
static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
                       struct lov_stripe_md *ea)
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);

        body = lustre_msg_buf(request->rq_repmsg, 0);
        memcpy(oa, &body->oa, sizeof(*oa));

        ptlrpc_req_finished(request);
/* Our bulk-unmapping bottom half. */
/* Runs in process context (scheduled from osc_ptl_ev_hdlr) because
 * kunmap() must not be called from interrupt context. */
static void unmap_and_decref_bulk_desc(void *data)
        struct ptlrpc_bulk_desc *desc = data;
        struct list_head *tmp;

        /* This feels wrong to me. */
        list_for_each(tmp, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                kunmap(bulk->bp_page);

        /* Drop the reference held for the bulk transfer. */
        ptlrpc_bulk_decref(desc);
/* this is the callback function which is invoked by the Portals
 * event handler associated with the bulk_sink queue and bulk_source queue. */
static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
        LASSERT(desc->bd_brw_set != NULL);
        LASSERT(desc->bd_brw_set->brw_callback != NULL);

        /* Notify the brw set that this descriptor's transfer finished. */
        desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);

        /* We can't kunmap the desc from interrupt context, so we do it from
         * the bottom half above. */
        prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
        schedule_work(&desc->bd_queue);
/* Read 'page_count' pages from the OST with one OST_READ bulk transfer.
 * Every page is kmapped and attached to a bulk descriptor which is
 * registered with Portals *before* the request is queued, so the sink
 * buffers are ready even if bulk data races the RPC reply. */
static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                        obd_count page_count, struct brw_page *pga,
                        struct obd_brw_set *set)
        struct obd_import *imp = class_conn2cliimp(conn);
        struct ptlrpc_connection *connection = imp->imp_connection;
        struct ptlrpc_request *request = NULL;
        struct ptlrpc_bulk_desc *desc = NULL;
        struct ost_body *body;
        int rc, size[3] = {sizeof(*body)}, mapped = 0;
        void *iooptr, *nioptr;

        /* Request msg: ost_body + one ioobj + page_count remote niobufs. */
        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(struct niobuf_remote);

        request = ptlrpc_prep_req(imp, OST_READ, 3, size, NULL);

        body = lustre_msg_buf(request->rq_reqmsg, 0);

        desc = ptlrpc_prep_bulk(connection);
                GOTO(out_req, rc = -ENOMEM);
        desc->bd_portal = OST_BULK_PORTAL;
        desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
        CDEBUG(D_PAGE, "desc = %p\n", desc);

        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
        ost_pack_ioo(&iooptr, lsm, page_count);
        /* end almost identical to brw_write case */

        spin_lock(&imp->imp_lock);
        xid = ++imp->imp_last_xid;       /* single xid for all pages */
        spin_unlock(&imp->imp_lock);

        obd_kmap_get(page_count, 0);

        for (mapped = 0; mapped < page_count; mapped++) {
                struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
                        GOTO(out_unmap, rc = -ENOMEM);

                bulk->bp_xid = xid;      /* single xid for all pages */

                bulk->bp_buf = kmap(pga[mapped].pg);
                bulk->bp_page = pga[mapped].pg;
                bulk->bp_buflen = PAGE_SIZE;
                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
                                pga[mapped].flag, bulk->bp_xid);

        /*
         * Register the bulk first, because the reply could arrive out of order,
         * and we want to be ready for the bulk data.
         *
         * One reference is released when brw_finish is complete, the other when
         * the caller removes us from the "set" list.
         *
         * On error, we never do the brw_finish, so we handle all decrefs.
         */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
                CERROR("obd_fail_loc=%x, skipping register_bulk\n",
                       OBD_FAIL_OSC_BRW_READ_BULK);
        rc = ptlrpc_register_bulk(desc);

        obd_brw_set_add(set, desc);

        request->rq_replen = lustre_msg_size(1, size);
        rc = ptlrpc_queue_wait(request);

        /*
         * XXX: If there is an error during the processing of the callback,
         *      such as a timeout in a sleep that it performs, brw_finish
         *      will never get called, and we'll leak the desc, fail to kunmap
         *      things, cats will live with dogs.  One solution would be to
         *      export brw_finish as osc_brw_finish, so that the timeout case
         *      and its kin could call it for proper cleanup.  An alternative
         *      would be for an error return from the callback to cause us to
         *      clean up, but that doesn't help the truly async cases (like
         *      LOV), which will immediately return from their PHASE_START
         *      callback, before any such cleanup-requiring error condition can
         */
        ptlrpc_req_finished(request);

        /* Clean up on error. */
                kunmap(pga[mapped].pg);
        obd_kmap_put(page_count);
        ptlrpc_bulk_decref(desc);
/* Write 'page_count' pages to the OST with one OST_WRITE request.  The
 * request describes the pages; the reply returns the remote niobufs
 * (with the xids the OST expects) so the bulk data is only pushed after
 * the reply arrives. */
static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
                         obd_count page_count, struct brw_page *pga,
                         struct obd_brw_set *set)
        struct ptlrpc_connection *connection =
                client_conn2cli(conn)->cl_import.imp_connection;
        struct ptlrpc_request *request = NULL;
        struct ptlrpc_bulk_desc *desc = NULL;
        struct ost_body *body;
        struct niobuf_local *local = NULL;
        struct niobuf_remote *remote;
        int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
        void *iooptr, *nioptr;

        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(*remote);

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,

        body = lustre_msg_buf(request->rq_reqmsg, 0);

        desc = ptlrpc_prep_bulk(connection);
                GOTO(out_req, rc = -ENOMEM);
        desc->bd_portal = OSC_BULK_PORTAL;
        desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
        CDEBUG(D_PAGE, "desc = %p\n", desc);

        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
        ost_pack_ioo(&iooptr, md, page_count);
        /* end almost identical to brw_read case */

        /* Scratch array remembering each page's mapping and extent. */
        OBD_ALLOC(local, page_count * sizeof(*local));
                GOTO(out_desc, rc = -ENOMEM);

        obd_kmap_get(page_count, 0);

        for (mapped = 0; mapped < page_count; mapped++) {
                local[mapped].addr = kmap(pga[mapped].pg);

                CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
                       "%d ; page %d of %d\n",
                       local[mapped].addr, pga[mapped].pg->flags,
                       page_count(pga[mapped].pg),
                       mapped, page_count - 1);

                local[mapped].offset = pga[mapped].off;
                local[mapped].len = pga[mapped].count;
                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
                                pga[mapped].flag, 0);

        /* Reply: ost_body + the remote niobufs assigned by the OST. */
        size[1] = page_count * sizeof(*remote);
        request->rq_replen = lustre_msg_size(2, size);
        rc = ptlrpc_queue_wait(request);

        nioptr = lustre_msg_buf(request->rq_repmsg, 1);
                GOTO(out_unmap, rc = -EINVAL);

        if (request->rq_repmsg->buflens[1] != size[1]) {
                CERROR("buffer length wrong (%d vs. %d)\n",
                       request->rq_repmsg->buflens[1], size[1]);
                GOTO(out_unmap, rc = -EINVAL);

        /* Pair each mapped page with the xid the OST expects for it. */
        for (j = 0; j < page_count; j++) {
                struct ptlrpc_bulk_page *bulk;

                ost_unpack_niobuf(&nioptr, &remote);

                bulk = ptlrpc_prep_bulk_page(desc);
                        GOTO(out_unmap, rc = -ENOMEM);

                bulk->bp_buf = (void *)(unsigned long)local[j].addr;
                bulk->bp_buflen = local[j].len;
                bulk->bp_xid = remote->xid;
                bulk->bp_page = pga[j].pg;

        if (desc->bd_page_count != page_count)

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
                GOTO(out_unmap, rc = 0);

        OBD_FREE(local, page_count * sizeof(*local));

        /* One reference is released when brw_finish is complete, the other
         * when the caller removes it from the "set" list. */
        obd_brw_set_add(set, desc);
        rc = ptlrpc_send_bulk(desc);

        /* XXX: Mike, same question as in osc_brw_read. */
        ptlrpc_req_finished(request);

        /* Clean up on error. */
                kunmap(pga[mapped].pg);
        obd_kmap_put(page_count);

        OBD_FREE(local, page_count * sizeof(*local));

        ptlrpc_bulk_decref(desc);
/* Bulk I/O entry point: split the request into chunks of at most
 * PTL_MD_MAX_IOV pages and hand each chunk to the read or write path. */
static int osc_brw(int cmd, struct lustre_handle *conn,
                   struct lov_stripe_md *md, obd_count page_count,
                   struct brw_page *pga, struct obd_brw_set *set)
        obd_count pages_per_brw;

        if (page_count > PTL_MD_MAX_IOV)
                pages_per_brw = PTL_MD_MAX_IOV;
                pages_per_brw = page_count;

        if (cmd & OBD_BRW_WRITE)
                rc = osc_brw_write(conn, md, pages_per_brw, pga, set);
                rc = osc_brw_read(conn, md, pages_per_brw, pga, set);

        /* Advance to the next chunk. */
        page_count -= pages_per_brw;
        pga += pages_per_brw;
/* Take an extent lock on the object: first try to match an existing
 * local lock (including an existing PW lock when only PR is wanted)
 * before falling back to a full enqueue to the server. */
static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
                       struct lustre_handle *parent_lock,
                       __u32 type, void *extentp, int extent_len, __u32 mode,
                       int *flags, void *callback, void *data, int datalen,
                       struct lustre_handle *lockh)
        __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
        struct obd_device *obddev = class_conn2obd(connh);
        struct ldlm_extent *extent = extentp;

        /* Filesystem locks are given a bit of special treatment: if
         * this is not a file size lock (which has end == -1), we
         * fixup the lock to start and end on page boundaries. */
        if (extent->end != OBD_OBJECT_EOF) {
                extent->start &= PAGE_MASK;
                extent->end = (extent->end & PAGE_MASK) + PAGE_SIZE - 1;

        /* Next, search for already existing extent locks that will cover us */
        /* NOTE(review): sizeof(extent) is the size of the *pointer*, not of
         * *extent -- looks suspicious; the 'extent_len' parameter is unused.
         * Confirm intended semantics against ldlm_lock_match. */
        rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
                             sizeof(extent), mode, lockh);

        /* We already have a lock, and it's referenced */

        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        if (mode == LCK_PR) {
                rc = ldlm_lock_match(obddev->obd_namespace, res_id, type,
                                     extent, sizeof(extent), LCK_PW, lockh);
                /* FIXME: This is not incredibly elegant, but it might
                 * be more elegant than adding another parameter to
                 * lock_match.  I want a second opinion. */
                ldlm_lock_addref(lockh, LCK_PR);
                ldlm_lock_decref(lockh, LCK_PW);

        rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
                              res_id, type, extent, sizeof(extent), mode, flags,
                              ldlm_completion_ast, callback, data, datalen,
/* Drop one reference on a lock obtained via osc_enqueue. */
static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
                      __u32 mode, struct lustre_handle *lockh)
        ldlm_lock_decref(lockh, mode);
/* Cancel all unused locks on this object's resource. */
static int osc_cancel_unused(struct lustre_handle *connh,
                             struct lov_stripe_md *lsm, int flags)
        struct obd_device *obddev = class_conn2obd(connh);
        __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };

        return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, flags);
/* Query the OST's free-space statistics (OST_STATFS) and unpack the
 * wire format into the caller's obd_statfs. */
static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
        struct ptlrpc_request *request;
        int rc, size = sizeof(*osfs);

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);

        obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));

        ptlrpc_req_finished(request);
/* ioctl dispatcher for the OSC: LDLM test/regression hooks, LOV
 * registration, and a single-target emulation of LOV_GET_CONFIG. */
static int osc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
                         void *karg, void *uarg)
        struct obd_device *obddev = class_conn2obd(conn);
        struct obd_ioctl_data *data = karg;

        case IOC_LDLM_TEST: {
                err = ldlm_test(obddev, conn);
                CERROR("-- done err %d\n", err);

        case IOC_LDLM_REGRESS_START: {
                unsigned int numthreads = 1;
                unsigned int numheld = 10;
                unsigned int numres = 10;
                unsigned int numext = 10;

                /* Optional inline buffer: up to four whitespace-separated
                 * numbers -- threads, held locks, resources, extents. */
                if (data->ioc_inllen1) {
                        parse = data->ioc_inlbuf1;
                        if (*parse != '\0') {
                                while(isspace(*parse)) parse++;
                                numthreads = simple_strtoul(parse, &parse, 0);
                                while(isspace(*parse)) parse++;
                        if (*parse != '\0') {
                                while(isspace(*parse)) parse++;
                                numheld = simple_strtoul(parse, &parse, 0);
                                while(isspace(*parse)) parse++;
                        if (*parse != '\0') {
                                while(isspace(*parse)) parse++;
                                numres = simple_strtoul(parse, &parse, 0);
                                while(isspace(*parse)) parse++;
                        if (*parse != '\0') {
                                while(isspace(*parse)) parse++;
                                numext = simple_strtoul(parse, &parse, 0);
                                while(isspace(*parse)) parse++;

                err = ldlm_regression_start(obddev, conn, numthreads,
                                            numheld, numres, numext);

                CERROR("-- done err %d\n", err);

        case IOC_LDLM_REGRESS_STOP: {
                err = ldlm_regression_stop();
                CERROR("-- done err %d\n", err);

        case IOC_OSC_REGISTER_LOV: {
                /* Remember the LOV stacked above us so we can notify it of
                 * connection state changes (see set_osc_active). */
                if (obddev->u.cli.cl_containing_lov)
                        GOTO(out, err = -EALREADY);
                obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;

        case OBD_IOC_LOV_GET_CONFIG: {
                struct lov_desc *desc;

                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                if (sizeof(*desc) > data->ioc_inllen1) {
                        GOTO(out, err = -EINVAL);

                if (data->ioc_inllen2 < sizeof(*uuidp)) {
                        GOTO(out, err = -EINVAL);

                /* Fake a one-target LOV description for this single OSC. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(desc->ld_uuid, obddev->obd_uuid, sizeof(*uuidp));

                uuidp = (obd_uuid_t *)data->ioc_inlbuf2;
                memcpy(uuidp, obddev->obd_uuid, sizeof(*uuidp));

                err = copy_to_user((void *)uarg, buf, len);

                CERROR ("osc_ioctl(): unrecognised ioctl %#lx\n", cmd);
                GOTO(out, err = -ENOTTY);
887 static void set_osc_active(struct obd_import *imp, int active)
889 struct obd_device *notify_obd = imp->imp_obd->u.cli.cl_containing_lov;
891 if (notify_obd == NULL)
894 /* How gross is _this_? */
895 if (!list_empty(¬ify_obd->obd_exports)) {
897 struct lustre_handle fakeconn;
898 struct obd_ioctl_data ioc_data;
899 struct obd_export *exp =
900 list_entry(notify_obd->obd_exports.next,
901 struct obd_export, exp_obd_chain);
903 fakeconn.addr = (__u64)(unsigned long)exp;
904 fakeconn.cookie = exp->exp_cookie;
905 ioc_data.ioc_inlbuf1 = imp->imp_obd->obd_uuid;
906 ioc_data.ioc_offset = active;
907 rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn,
908 sizeof ioc_data, &ioc_data, NULL);
910 CERROR("disabling %s on LOV %p/%s: %d\n",
911 imp->imp_obd->obd_uuid, notify_obd,
912 notify_obd->obd_uuid, rc);
914 CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about "
915 "%p\n", notify_obd, notify_obd->obd_uuid,
916 imp->imp_obd->obd_uuid);
/* XXX looks a lot like super.c:invalidate_request_list, don't it? */
static void abort_inflight_for_import(struct obd_import *imp)
        struct list_head *tmp, *n;

        /* Make sure that no new requests get processed for this import.
         * ptlrpc_queue_wait must (and does) hold imp_lock while testing this
         * flag and then putting requests on sending_list or delayed_list. */
        spin_lock(&imp->imp_lock);
        imp->imp_flags |= IMP_INVALID;
        spin_unlock(&imp->imp_lock);

        /* Fail every request currently on the wire... */
        list_for_each_safe(tmp, n, &imp->imp_sending_list) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_list);

                DEBUG_REQ(D_HA, req, "inflight");
                req->rq_flags |= PTL_RPC_FL_ERR;
                wake_up(&req->rq_wait_for_rep);

        /* ...and every request still waiting to be sent. */
        list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_list);

                DEBUG_REQ(D_HA, req, "aborting waiting req");
                req->rq_flags |= PTL_RPC_FL_ERR;
                wake_up(&req->rq_wait_for_rep);
/* Recovery callback installed on the import (see osc_connect): in the
 * PREPARE phase invalidate the import and flush local lock/RPC state;
 * in the RECOVER phase reconnect and reactivate. */
static int osc_recover(struct obd_import *imp, int phase)
        case PTLRPC_RECOVD_PHASE_PREPARE: {
                struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
                ldlm_namespace_cleanup(ns, 1 /* no network ops */);
                abort_inflight_for_import(imp);
                set_osc_active(imp, 0 /* inactive */);

        case PTLRPC_RECOVD_PHASE_RECOVER:
                imp->imp_flags &= ~IMP_INVALID;
                rc = ptlrpc_reconnect_import(imp, OST_CONNECT);
                        /* Reconnect failed; mark the import dead again. */
                        imp->imp_flags |= IMP_INVALID;

                set_osc_active(imp, 1 /* active */);
/* Connect to the OST: install our recovery handler on the import, then
 * defer to the generic client connect path. */
static int osc_connect(struct lustre_handle *conn, struct obd_device *obd,
                       obd_uuid_t cluuid, struct recovd_obd *recovd,
                       ptlrpc_recovery_cb_t recover)
        struct obd_import *imp = &obd->u.cli.cl_import;
        imp->imp_recover = osc_recover;
        return client_obd_connect(conn, obd, cluuid, recovd, recover);
/* Method table exported to the obd class layer; setup/cleanup/disconnect
 * are handled by the generic client_obd_* helpers. */
struct obd_ops osc_obd_ops = {
        o_attach:        osc_attach,
        o_detach:        osc_detach,
        o_setup:         client_obd_setup,
        o_cleanup:       client_obd_cleanup,
        o_connect:       osc_connect,
        o_disconnect:    client_obd_disconnect,
        o_statfs:        osc_statfs,
        o_packmd:        osc_packmd,
        o_unpackmd:      osc_unpackmd,
        o_create:        osc_create,
        o_destroy:       osc_destroy,
        o_getattr:       osc_getattr,
        o_setattr:       osc_setattr,
        o_enqueue:       osc_enqueue,
        o_cancel:        osc_cancel,
        o_cancel_unused: osc_cancel_unused,
        o_iocontrol:     osc_iocontrol
/* Module entry: register the OSC obd type with the class driver. */
static int __init osc_init(void)
        RETURN(class_register_type(&osc_obd_ops, status_class_var,
/* Module exit: unregister the OSC obd type. */
static void __exit osc_exit(void)
        class_unregister_type(LUSTRE_OSC_NAME);
1024 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1025 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
1026 MODULE_LICENSE("GPL");
1028 module_init(osc_init);
1029 module_exit(osc_exit);