1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 * For testing and management it is treated as an obd_device,
23 * although it does not export a full OBD method table (the
24 * requests are coming in over the wire, so object target modules
25 * do not have a full method table.)
30 #define DEBUG_SUBSYSTEM S_OSC
32 #include <linux/version.h>
33 #include <linux/module.h>
35 #include <linux/highmem.h>
36 #include <linux/lustre_dlm.h>
37 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
38 #include <linux/workqueue.h>
40 #include <linux/kp30.h>
41 #include <linux/lustre_mds.h> /* for mds_objid */
42 #include <linux/obd_ost.h>
43 #include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
44 #include <linux/ctype.h>
45 #include <linux/init.h>
46 #include <linux/lustre_ha.h>
47 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
48 #include <linux/lustre_lite.h> /* for ll_i2info */
49 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
50 #include <linux/lprocfs_status.h>
/* Hook this OBD device into lprocfs: register the OSC's /proc variables. */
52 static int osc_attach(struct obd_device *dev, obd_count len, void *data)
54 struct lprocfs_static_vars lvars;
56 lprocfs_init_vars(&lvars);
57 return lprocfs_obd_attach(dev, lvars.obd_vars);
/* Undo osc_attach(): remove this device's lprocfs entries. */
60 static int osc_detach(struct obd_device *dev)
62 return lprocfs_obd_detach(dev);
65 /* Pack OSC object metadata for shipment to the MDS.
 * Converts an in-core lov_stripe_md into an on-the-wire lov_mds_md.
 * NOTE(review): the free/alloc branches and size-query path are not visible
 * in this excerpt; presumably a NULL/negative-size *lmmp selects between
 * querying, freeing and allocating — confirm against the full source. */
66 static int osc_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
67 struct lov_stripe_md *lsm)
72 lmm_size = sizeof(**lmmp);
77 OBD_FREE(*lmmp, lmm_size);
83 OBD_ALLOC(*lmmp, lmm_size);
/* A zero object id would mean an uninitialized stripe MD. */
88 LASSERT(lsm->lsm_object_id);
89 (*lmmp)->lmm_object_id = (lsm->lsm_object_id);
/* Inverse of osc_packmd(): build an in-core lov_stripe_md from the wire
 * lov_mds_md.  Mirrors the free/alloc structure of osc_packmd (the
 * branch lines are missing from this excerpt). */
95 static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
96 struct lov_mds_md *lmm)
101 lsm_size = sizeof(**lsmp);
106 OBD_FREE(*lsmp, lsm_size);
112 OBD_ALLOC(*lsmp, lsm_size);
119 (*lsmp)->lsm_object_id = (lmm->lmm_object_id);
120 LASSERT((*lsmp)->lsm_object_id);
/* Copy the server-assigned transaction number out of an RPC reply into the
 * obd_trans_info, byte-swapped to host order.  Safe to call with a NULL oti
 * or before a reply has arrived (both are checked). */
126 inline void oti_from_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
128 if (oti && req->rq_repmsg)
129 oti->oti_transno = NTOH__u64(req->rq_repmsg->transno);
/* Fetch object attributes from the OST: send an OST_GETATTR RPC carrying a
 * single ost_body (the obdo), wait synchronously, and copy the reply's obdo
 * back into *oa. */
133 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
134 struct lov_stripe_md *md)
136 struct ptlrpc_request *request;
137 struct ost_body *body;
138 int rc, size = sizeof(*body);
141 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
146 body = lustre_msg_buf(request->rq_reqmsg, 0);
147 #warning FIXME: pack only valid fields instead of memcpy, endianness
148 memcpy(&body->oa, oa, sizeof(*oa));
150 request->rq_replen = lustre_msg_size(1, &size);
152 rc = ptlrpc_queue_wait(request);
154 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* Success: the reply body carries the authoritative attributes. */
158 body = lustre_msg_buf(request->rq_repmsg, 0);
159 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
160 memcpy(oa, &body->oa, sizeof(*oa));
164 ptlrpc_req_finished(request);
/* Open an object on the OST via an OST_OPEN RPC; on success the reply obdo
 * (including any open handle data the server packs there) is copied back
 * into *oa. */
168 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
169 struct lov_stripe_md *md, struct obd_trans_info *oti)
171 struct ptlrpc_request *request;
172 struct ost_body *body;
173 int rc, size = sizeof(*body);
176 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
/* Open should be replayed after recovery, but replay flagging is not wired
 * up yet (see the #warning below). */
181 #warning FIXME: request->rq_flags |= PTL_RPC_FL_REPLAY;
182 body = lustre_msg_buf(request->rq_reqmsg, 0);
183 #warning FIXME: pack only valid fields instead of memcpy, endianness
184 memcpy(&body->oa, oa, sizeof(*oa));
186 request->rq_replen = lustre_msg_size(1, &size);
188 rc = ptlrpc_queue_wait(request);
192 body = lustre_msg_buf(request->rq_repmsg, 0);
193 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
195 memcpy(oa, &body->oa, sizeof(*oa));
199 ptlrpc_req_finished(request);
/* Close an object on the OST (OST_CLOSE RPC); structurally identical to
 * osc_open: pack obdo, wait, copy the reply obdo back. */
203 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
204 struct lov_stripe_md *md, struct obd_trans_info *oti)
206 struct ptlrpc_request *request;
207 struct ost_body *body;
208 int rc, size = sizeof(*body);
211 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
216 body = lustre_msg_buf(request->rq_reqmsg, 0);
217 #warning FIXME: pack only valid fields instead of memcpy, endianness
218 memcpy(&body->oa, oa, sizeof(*oa));
220 request->rq_replen = lustre_msg_size(1, &size);
222 rc = ptlrpc_queue_wait(request);
226 body = lustre_msg_buf(request->rq_repmsg, 0);
227 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
229 memcpy(oa, &body->oa, sizeof(*oa));
233 ptlrpc_req_finished(request);
/* Push attribute changes to the OST (OST_SETATTR RPC).  Unlike getattr/open,
 * nothing is copied back from the reply in this excerpt. */
237 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
238 struct lov_stripe_md *md, struct obd_trans_info *oti)
240 struct ptlrpc_request *request;
241 struct ost_body *body;
242 int rc, size = sizeof(*body);
245 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
250 body = lustre_msg_buf(request->rq_reqmsg, 0);
251 memcpy(&body->oa, oa, sizeof(*oa));
253 request->rq_replen = lustre_msg_size(1, &size);
255 rc = ptlrpc_queue_wait(request);
257 ptlrpc_req_finished(request);
/* Create an object on the OST.  Allocates an in-core stripe MD, sends
 * OST_CREATE, records the new object id into the lsm, and captures the
 * transaction number for replay via oti_from_request().
 * NOTE(review): how lsm/*ea and oti/oti_in/trans_info are tied together is
 * in lines missing from this excerpt — presumably *ea receives lsm on
 * success and obd_free_memmd() runs only on error; confirm. */
261 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
262 struct lov_stripe_md **ea, struct obd_trans_info *oti_in)
264 struct ptlrpc_request *request;
265 struct ost_body *body;
266 struct lov_stripe_md *lsm;
267 struct obd_trans_info *oti, trans_info;
268 int rc, size = sizeof(*body);
276 rc = obd_alloc_memmd(conn, &lsm);
286 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
289 GOTO(out, rc = -ENOMEM);
291 body = lustre_msg_buf(request->rq_reqmsg, 0);
292 memcpy(&body->oa, oa, sizeof(*oa));
294 request->rq_replen = lustre_msg_size(1, &size);
296 rc = ptlrpc_queue_wait(request);
/* The server chose the object id; propagate it into the stripe MD. */
300 body = lustre_msg_buf(request->rq_repmsg, 0);
301 memcpy(oa, &body->oa, sizeof(*oa));
303 lsm->lsm_object_id = oa->o_id;
304 lsm->lsm_stripe_count = 0;
307 oti_from_request(oti, request);
308 CDEBUG(D_HA, "transno: "LPD64"\n", oti->oti_transno);
311 ptlrpc_req_finished(request);
314 obd_free_memmd(conn, &lsm);
/* Truncate/punch a byte range on the OST.  The [start,end] range is smuggled
 * through the obdo's o_size/o_blocks fields (in network byte order), as the
 * comment below notes. */
318 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
319 struct lov_stripe_md *md, obd_size start,
320 obd_size end, struct obd_trans_info *oti)
322 struct ptlrpc_request *request;
323 struct ost_body *body;
324 int rc, size = sizeof(*body);
332 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
337 body = lustre_msg_buf(request->rq_reqmsg, 0);
338 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
339 memcpy(&body->oa, oa, sizeof(*oa));
341 /* overload the size and blocks fields in the oa with start/end */
342 body->oa.o_size = HTON__u64(start);
343 body->oa.o_blocks = HTON__u64(end);
344 body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
346 request->rq_replen = lustre_msg_size(1, &size);
348 rc = ptlrpc_queue_wait(request);
352 body = lustre_msg_buf(request->rq_repmsg, 0);
353 memcpy(oa, &body->oa, sizeof(*oa));
357 ptlrpc_req_finished(request);
/* Destroy an object on the OST (OST_DESTROY RPC): pack obdo, wait, copy the
 * reply obdo back. */
361 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
362 struct lov_stripe_md *ea, struct obd_trans_info *oti)
364 struct ptlrpc_request *request;
365 struct ost_body *body;
366 int rc, size = sizeof(*body);
373 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
378 body = lustre_msg_buf(request->rq_reqmsg, 0);
379 #warning FIXME: pack only valid fields instead of memcpy, endianness
380 memcpy(&body->oa, oa, sizeof(*oa));
382 request->rq_replen = lustre_msg_size(1, &size);
384 rc = ptlrpc_queue_wait(request);
388 body = lustre_msg_buf(request->rq_repmsg, 0);
389 memcpy(oa, &body->oa, sizeof(*oa));
393 ptlrpc_req_finished(request);
397 /* Our bulk-unmapping bottom half.
 * Runs from process context (scheduled by osc_ptl_ev_hdlr below) because
 * kunmap() cannot be called from interrupt context: kunmap every page on
 * the descriptor's page list, then drop the bulk reference. */
398 static void unmap_and_decref_bulk_desc(void *data)
400 struct ptlrpc_bulk_desc *desc = data;
401 struct list_head *tmp;
404 /* This feels wrong to me. */
405 list_for_each(tmp, &desc->bd_page_list) {
406 struct ptlrpc_bulk_page *bulk;
407 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
409 kunmap(bulk->bp_page);
413 ptlrpc_bulk_decref(desc);
418 /* this is the callback function which is invoked by the Portals
419 * event handler associated with the bulk_sink queue and bulk_source queue.
 * Fires the brw set's completion callback, then defers the kunmap/decref
 * work to the bottom half above (we may be in interrupt context here). */
421 static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
425 LASSERT(desc->bd_brw_set != NULL);
426 LASSERT(desc->bd_brw_set->brw_callback != NULL);
428 desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
430 /* We can't kunmap the desc from interrupt context, so we do it from
431 * the bottom half above. */
432 prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
433 schedule_work(&desc->bd_queue);
/* Issue a bulk read (OST_READ) for up to page_count pages.
 * Builds a 3-buffer request (ost_body, obd_ioobj, niobuf_remote array),
 * kmaps every page into a bulk descriptor sharing one xid, registers the
 * bulk sink BEFORE queueing the RPC so data arriving early is not lost,
 * and adds the descriptor to the caller's brw set for completion handling.
 * NOTE(review): declarations of nioptr/xid/flags and several error-check
 * branches fall in lines missing from this excerpt. */
438 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
439 obd_count page_count, struct brw_page *pga,
440 struct obd_brw_set *set)
442 struct obd_import *imp = class_conn2cliimp(conn);
443 struct ptlrpc_connection *connection = imp->imp_connection;
444 struct ptlrpc_request *request = NULL;
445 struct ptlrpc_bulk_desc *desc = NULL;
446 struct ost_body *body;
447 int rc, size[3] = {sizeof(*body)}, mapped = 0;
449 struct obd_ioobj *iooptr;
454 size[1] = sizeof(struct obd_ioobj);
455 size[2] = page_count * sizeof(struct niobuf_remote);
457 request = ptlrpc_prep_req(imp, OST_READ, 3, size, NULL);
461 body = lustre_msg_buf(request->rq_reqmsg, 0);
463 desc = ptlrpc_prep_bulk(connection);
465 GOTO(out_req, rc = -ENOMEM);
466 desc->bd_portal = OST_BULK_PORTAL;
467 desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
468 CDEBUG(D_PAGE, "desc = %p\n", desc);
470 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
471 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
472 ost_pack_ioo(&iooptr, lsm, page_count);
473 /* end almost identical to brw_write case */
475 spin_lock_irqsave(&imp->imp_lock, flags);
476 xid = ++imp->imp_last_xid; /* single xid for all pages */
477 spin_unlock_irqrestore(&imp->imp_lock, flags);
479 obd_kmap_get(page_count, 0);
/* Map each target page and describe it in the niobuf array. */
481 for (mapped = 0; mapped < page_count; mapped++) {
482 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
484 GOTO(out_unmap, rc = -ENOMEM);
486 bulk->bp_xid = xid; /* single xid for all pages */
488 bulk->bp_buf = kmap(pga[mapped].pg);
489 bulk->bp_page = pga[mapped].pg;
490 bulk->bp_buflen = PAGE_SIZE;
491 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
492 pga[mapped].flag, bulk->bp_xid);
496 * Register the bulk first, because the reply could arrive out of order,
497 * and we want to be ready for the bulk data.
499 * One reference is released when brw_finish is complete, the other when
500 * the caller removes us from the "set" list.
502 * On error, we never do the brw_finish, so we handle all decrefs.
504 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
505 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
506 OBD_FAIL_OSC_BRW_READ_BULK);
508 rc = ptlrpc_register_bulk_put(desc);
511 obd_brw_set_add(set, desc);
514 request->rq_replen = lustre_msg_size(1, size);
515 rc = ptlrpc_queue_wait(request);
518 * XXX: If there is an error during the processing of the callback,
519 * such as a timeout in a sleep that it performs, brw_finish
520 * will never get called, and we'll leak the desc, fail to kunmap
521 * things, cats will live with dogs. One solution would be to
522 * export brw_finish as osc_brw_finish, so that the timeout case
523 * and its kin could call it for proper cleanup. An alternative
524 * would be for an error return from the callback to cause us to
525 * clean up, but that doesn't help the truly async cases (like
526 * LOV), which will immediately return from their PHASE_START
527 * callback, before any such cleanup-requiring error condition can
531 ptlrpc_req_finished(request);
534 /* Clean up on error. */
537 kunmap(pga[mapped].pg);
538 obd_kmap_put(page_count);
539 ptlrpc_bulk_decref(desc);
/* Issue a bulk write (OST_WRITE); mirror image of osc_brw_read with the
 * source-side bulk portal (OSC_BULK_PORTAL) and register_bulk_get instead
 * of register_bulk_put.  See osc_brw_read for the overall flow.
 * NOTE(review): nioptr/xid/flags declarations and error branches fall in
 * lines missing from this excerpt. */
543 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *lsm,
544 obd_count page_count, struct brw_page *pga,
545 struct obd_brw_set *set, struct obd_trans_info *oti)
547 struct obd_import *imp = class_conn2cliimp(conn);
548 struct ptlrpc_connection *connection = imp->imp_connection;
549 struct ptlrpc_request *request = NULL;
550 struct ptlrpc_bulk_desc *desc = NULL;
551 struct ost_body *body;
552 int rc, size[3] = {sizeof(*body)}, mapped = 0;
554 struct obd_ioobj *iooptr;
559 size[1] = sizeof(struct obd_ioobj);
560 size[2] = page_count * sizeof(struct niobuf_remote);
562 request = ptlrpc_prep_req(imp, OST_WRITE, 3, size, NULL);
566 body = lustre_msg_buf(request->rq_reqmsg, 0);
568 desc = ptlrpc_prep_bulk(connection);
570 GOTO(out_req, rc = -ENOMEM);
571 desc->bd_portal = OSC_BULK_PORTAL;
572 desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
573 CDEBUG(D_PAGE, "desc = %p\n", desc);
575 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
576 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
577 ost_pack_ioo(&iooptr, lsm, page_count);
578 /* end almost identical to brw_read case */
580 spin_lock_irqsave(&imp->imp_lock, flags);
581 xid = ++imp->imp_last_xid; /* single xid for all pages */
582 spin_unlock_irqrestore(&imp->imp_lock, flags);
584 obd_kmap_get(page_count, 0);
/* Map each source page and describe it in the niobuf array. */
586 for (mapped = 0; mapped < page_count; mapped++) {
587 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
589 GOTO(out_unmap, rc = -ENOMEM);
591 bulk->bp_xid = xid; /* single xid for all pages */
593 bulk->bp_buf = kmap(pga[mapped].pg);
594 bulk->bp_page = pga[mapped].pg;
595 bulk->bp_buflen = PAGE_SIZE;
596 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
597 pga[mapped].flag, bulk->bp_xid);
601 * Register the bulk first, because the reply could arrive out of
602 * order, and we want to be ready for the bulk data.
604 * One reference is released when brw_finish is complete, the other
605 * when the caller removes us from the "set" list.
607 * On error, we never do the brw_finish, so we handle all decrefs.
609 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK)) {
610 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
611 OBD_FAIL_OSC_BRW_WRITE_BULK);
613 rc = ptlrpc_register_bulk_get(desc);
616 obd_brw_set_add(set, desc);
619 request->rq_replen = lustre_msg_size(1, size);
620 rc = ptlrpc_queue_wait(request);
623 * XXX: If there is an error during the processing of the callback,
624 * such as a timeout in a sleep that it performs, brw_finish
625 * will never get called, and we'll leak the desc, fail to kunmap
626 * things, cats will live with dogs. One solution would be to
627 * export brw_finish as osc_brw_finish, so that the timeout case
628 * and its kin could call it for proper cleanup. An alternative
629 * would be for an error return from the callback to cause us to
630 * clean up, but that doesn't help the truly async cases (like
631 * LOV), which will immediately return from their PHASE_START
632 * callback, before any such cleanup-requiring error condition can
636 ptlrpc_req_finished(request);
639 /* Clean up on error. */
642 kunmap(pga[mapped].pg);
643 obd_kmap_put(page_count);
644 ptlrpc_bulk_decref(desc);
/* Top-level bulk entry point: split a large request into chunks of at most
 * PTL_MD_MAX_IOV pages and dispatch each to osc_brw_write or osc_brw_read
 * depending on cmd.  (The loop construct enclosing the per-chunk advance is
 * in lines missing from this excerpt.) */
648 static int osc_brw(int cmd, struct lustre_handle *conn,
649 struct lov_stripe_md *md, obd_count page_count,
650 struct brw_page *pga, struct obd_brw_set *set,
651 struct obd_trans_info *oti)
656 obd_count pages_per_brw;
659 if (page_count > PTL_MD_MAX_IOV)
660 pages_per_brw = PTL_MD_MAX_IOV;
662 pages_per_brw = page_count;
664 if (cmd & OBD_BRW_WRITE)
665 rc = osc_brw_write(conn, md, pages_per_brw, pga, set, oti);
667 rc = osc_brw_read(conn, md, pages_per_brw, pga, set);
/* Advance to the next chunk. */
672 page_count -= pages_per_brw;
673 pga += pages_per_brw;
/* Acquire an extent lock on an object.  First rounds the extent to page
 * boundaries (unless it is a file-size lock ending at EOF), then tries to
 * match an existing compatible lock (including reusing a PW lock for a PR
 * request), and only enqueues a new lock with the server as a last resort. */
678 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
679 struct lustre_handle *parent_lock,
680 __u32 type, void *extentp, int extent_len, __u32 mode,
681 int *flags, void *callback, void *data, int datalen,
682 struct lustre_handle *lockh)
684 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
685 struct obd_device *obddev = class_conn2obd(connh);
686 struct ldlm_extent *extent = extentp;
690 /* Filesystem locks are given a bit of special treatment: if
691 * this is not a file size lock (which has end == -1), we
692 * fixup the lock to start and end on page boundaries. */
693 if (extent->end != OBD_OBJECT_EOF) {
694 extent->start &= PAGE_MASK;
695 extent->end = (extent->end & PAGE_MASK) + PAGE_SIZE - 1;
698 /* Next, search for already existing extent locks that will cover us */
/* NOTE(review): sizeof(extent) here (and in the two calls below) is the
 * size of the POINTER, not of the ldlm_extent it points to — this looks
 * like it should be sizeof(*extent); confirm against the ldlm_lock_match()
 * and ldlm_cli_enqueue() contracts before changing. */
699 rc = ldlm_lock_match(obddev->obd_namespace, 0, &res_id, type, extent,
700 sizeof(extent), mode, lockh);
702 /* We already have a lock, and it's referenced */
705 /* If we're trying to read, we also search for an existing PW lock. The
706 * VFS and page cache already protect us locally, so lots of readers/
707 * writers can share a single PW lock.
709 * There are problems with conversion deadlocks, so instead of
710 * converting a read lock to a write lock, we'll just enqueue a new
713 * At some point we should cancel the read lock instead of making them
714 * send us a blocking callback, but there are problems with canceling
715 * locks out from other users right now, too. */
717 if (mode == LCK_PR) {
718 rc = ldlm_lock_match(obddev->obd_namespace, 0, &res_id, type,
719 extent, sizeof(extent), LCK_PW, lockh)
721 /* FIXME: This is not incredibly elegant, but it might
722 * be more elegant than adding another parameter to
723 * lock_match. I want a second opinion. */
724 ldlm_lock_addref(lockh, LCK_PR);
725 ldlm_lock_decref(lockh, LCK_PW);
/* No local lock covers us: go to the server. */
731 rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
732 res_id, type, extent, sizeof(extent), mode, flags,
733 ldlm_completion_ast, callback, data, NULL,
/* Release one reference on a held lock; actual cancellation happens when
 * the reference count allows it. */
738 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
739 __u32 mode, struct lustre_handle *lockh)
743 ldlm_lock_decref(lockh, mode);
/* Cancel all unused locks on this object's resource in our namespace. */
748 static int osc_cancel_unused(struct lustre_handle *connh,
749 struct lov_stripe_md *lsm, int flags)
751 struct obd_device *obddev = class_conn2obd(connh);
752 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
754 return ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags);
/* Query filesystem statistics from the OST: empty OST_STATFS request,
 * reply unpacked into *osfs. */
757 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
759 struct ptlrpc_request *request;
760 int rc, size = sizeof(*osfs);
763 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
768 request->rq_replen = lustre_msg_size(1, &size);
770 rc = ptlrpc_queue_wait(request);
772 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
776 obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
780 ptlrpc_req_finished(request);
784 /* Retrieve object striping information.
786 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
787 * the maximum number of OST indices which will fit in the user buffer.
788 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 * An OSC always has exactly one stripe, so a kernel-side lmm with a single
 * trailing object entry is built and copied out to user space. */
790 static int osc_getstripe(struct lustre_handle *conn, struct lov_stripe_md *lsm,
791 struct lov_mds_md *lmmu)
793 struct lov_mds_md lmm, *lmmk;
/* Validate the user-supplied header before trusting its fields. */
800 rc = copy_from_user(&lmm, lmmu, sizeof(lmm));
804 if (lmm.lmm_magic != LOV_MAGIC)
807 if (lmm.lmm_ost_count < 1)
/* Header plus exactly one lov_object_id slot. */
810 lmm_size = sizeof(lmm) + sizeof(lmm.lmm_objects[0]);
811 OBD_ALLOC(lmmk, lmm_size);
815 lmmk->lmm_stripe_count = 1;
816 lmmk->lmm_ost_count = 1;
817 lmmk->lmm_object_id = lsm->lsm_object_id;
818 lmmk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
820 if (copy_to_user(lmmu, lmmk, lmm_size))
823 OBD_FREE(lmmk, lmm_size);
/* ioctl dispatcher for the OSC: LDLM test/regression hooks, LOV
 * registration, LOV config queries (this OSC pretends to be a 1-target
 * LOV), and stripe get/set passthroughs.
 * NOTE(review): several closing braces, 'break's and the OBD_FREE of the
 * obd_ioctl_getdata() buffer fall in lines missing from this excerpt. */
828 static int osc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
829 void *karg, void *uarg)
831 struct obd_device *obddev = class_conn2obd(conn);
832 struct obd_ioctl_data *data = karg;
838 case IOC_LDLM_TEST: {
839 err = ldlm_test(obddev, conn);
840 CERROR("-- done err %d\n", err);
843 case IOC_LDLM_REGRESS_START: {
844 unsigned int numthreads = 1;
845 unsigned int numheld = 10;
846 unsigned int numres = 10;
847 unsigned int numext = 10;
/* Optional whitespace-separated "threads held res ext" tuple in inlbuf1. */
850 if (data->ioc_inllen1) {
851 parse = data->ioc_inlbuf1;
852 if (*parse != '\0') {
853 while(isspace(*parse)) parse++;
854 numthreads = simple_strtoul(parse, &parse, 0);
855 while(isspace(*parse)) parse++;
857 if (*parse != '\0') {
858 while(isspace(*parse)) parse++;
859 numheld = simple_strtoul(parse, &parse, 0);
860 while(isspace(*parse)) parse++;
862 if (*parse != '\0') {
863 while(isspace(*parse)) parse++;
864 numres = simple_strtoul(parse, &parse, 0);
865 while(isspace(*parse)) parse++;
867 if (*parse != '\0') {
868 while(isspace(*parse)) parse++;
869 numext = simple_strtoul(parse, &parse, 0);
870 while(isspace(*parse)) parse++;
874 err = ldlm_regression_start(obddev, conn, numthreads,
875 numheld, numres, numext);
877 CERROR("-- done err %d\n", err);
880 case IOC_LDLM_REGRESS_STOP: {
881 err = ldlm_regression_stop();
882 CERROR("-- done err %d\n", err);
886 case IOC_OSC_REGISTER_LOV: {
887 if (obddev->u.cli.cl_containing_lov)
888 GOTO(out, err = -EALREADY);
889 obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
892 case OBD_IOC_LOV_GET_CONFIG: {
894 struct lov_desc *desc;
895 struct obd_uuid uuid;
899 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
900 GOTO(out, err = -EINVAL);
902 data = (struct obd_ioctl_data *)buf;
904 if (sizeof(*desc) > data->ioc_inllen1) {
906 GOTO(out, err = -EINVAL);
909 if (data->ioc_inllen2 < sizeof(uuid.uuid)) {
911 GOTO(out, err = -EINVAL);
/* Report this OSC as a degenerate single-target LOV. */
914 desc = (struct lov_desc *)data->ioc_inlbuf1;
915 desc->ld_tgt_count = 1;
916 desc->ld_active_tgt_count = 1;
917 desc->ld_default_stripe_count = 1;
918 desc->ld_default_stripe_size = 0;
919 desc->ld_default_stripe_offset = 0;
920 desc->ld_pattern = 0;
921 memcpy(desc->ld_uuid.uuid, obddev->obd_uuid.uuid, sizeof(uuid.uuid));
923 memcpy(data->ioc_inlbuf2, obddev->obd_uuid.uuid,
926 err = copy_to_user((void *)uarg, buf, len);
932 case LL_IOC_LOV_SETSTRIPE:
933 err = obd_alloc_memmd(conn, karg);
937 case LL_IOC_LOV_GETSTRIPE:
938 err = osc_getstripe(conn, karg, uarg);
941 CERROR ("osc_ioctl(): unrecognised ioctl %#x\n", cmd);
942 GOTO(out, err = -ENOTTY);
/* Tell the containing LOV (if one registered via IOC_OSC_REGISTER_LOV)
 * that this OSC went active/inactive, by faking a connection to the LOV's
 * first export and issuing IOC_LOV_SET_OSC_ACTIVE through obd_iocontrol. */
948 static void set_osc_active(struct obd_import *imp, int active)
950 struct obd_device *notify_obd;
952 LASSERT(imp->imp_obd);
954 notify_obd = imp->imp_obd->u.cli.cl_containing_lov;
/* No LOV above us: nothing to notify. */
956 if (notify_obd == NULL)
959 /* How gross is _this_? */
960 if (!list_empty(&notify_obd->obd_exports)) {
962 struct lustre_handle fakeconn;
963 struct obd_ioctl_data ioc_data = { 0 };
964 struct obd_export *exp =
965 list_entry(notify_obd->obd_exports.next,
966 struct obd_export, exp_obd_chain);
/* Hand-built handle pointing at the export, bypassing class_connect. */
968 fakeconn.addr = (__u64)(unsigned long)exp;
969 fakeconn.cookie = exp->exp_cookie;
970 ioc_data.ioc_inlbuf1 = &imp->imp_obd->u.cli.cl_target_uuid;
971 ioc_data.ioc_offset = active;
972 rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn,
973 sizeof ioc_data, &ioc_data, NULL);
975 CERROR("disabling %s on LOV %p/%s: %d\n",
976 imp->imp_obd->u.cli.cl_target_uuid.uuid,
977 notify_obd, notify_obd->obd_uuid.uuid, rc);
980 CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about "
981 "%p\n", notify_obd, notify_obd->obd_uuid.uuid,
982 imp->imp_obd->obd_uuid.uuid);
/* Recovery driver invoked by the recovd for this import.
 * PREPARE: flush the lock namespace, abort in-flight RPCs, mark inactive.
 * RECOVER: reconnect, raise the import to FULL, wake delayed requests,
 * mark active.  NOTCONN: run PREPARE then RECOVER back-to-back. */
986 static int osc_recover(struct obd_import *imp, int phase)
990 struct ptlrpc_request *req;
995 case PTLRPC_RECOVD_PHASE_PREPARE: {
996 struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
997 ldlm_namespace_cleanup(ns, 1 /* no network ops */);
998 ptlrpc_abort_inflight(imp, 0);
999 set_osc_active(imp, 0 /* inactive */);
1003 case PTLRPC_RECOVD_PHASE_RECOVER:
1004 imp->imp_flags &= ~IMP_INVALID;
1005 rc = ptlrpc_reconnect_import(imp, OST_CONNECT, &req);
1006 ptlrpc_req_finished(req);
/* Reconnect failed: re-invalidate the import. */
1008 imp->imp_flags |= IMP_INVALID;
1012 spin_lock_irqsave(&imp->imp_lock, flags);
1013 imp->imp_level = LUSTRE_CONN_FULL;
1014 spin_unlock_irqrestore(&imp->imp_lock, flags);
1016 /* Is this the right place? Should we do this in _PREPARE
1017 * as well? What about raising the level right away?
1019 ptlrpc_wake_delayed(imp);
1021 set_osc_active(imp, 1 /* active */);
1024 case PTLRPC_RECOVD_PHASE_NOTCONN:
1025 osc_recover(imp, PTLRPC_RECOVD_PHASE_PREPARE);
1026 RETURN(osc_recover(imp, PTLRPC_RECOVD_PHASE_RECOVER));
/* Connect wrapper: install osc_recover as the import's recovery callback,
 * then defer to the generic client connect. */
1033 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd,
1034 struct obd_uuid *cluuid, struct recovd_obd *recovd,
1035 ptlrpc_recovery_cb_t recover)
1037 struct obd_import *imp = &obd->u.cli.cl_import;
1038 imp->imp_recover = osc_recover;
1039 return client_obd_connect(conn, obd, cluuid, recovd, recover);
/* OBD method table for the OSC; generic client_obd_* handlers are used
 * where no OSC-specific behavior is needed. */
1042 struct obd_ops osc_obd_ops = {
1043 o_owner: THIS_MODULE,
1044 o_attach: osc_attach,
1045 o_detach: osc_detach,
1046 o_setup: client_obd_setup,
1047 o_cleanup: client_obd_cleanup,
1048 o_connect: osc_connect,
1049 o_disconnect: client_obd_disconnect,
1050 o_statfs: osc_statfs,
1051 o_packmd: osc_packmd,
1052 o_unpackmd: osc_unpackmd,
1053 o_create: osc_create,
1054 o_destroy: osc_destroy,
1055 o_getattr: osc_getattr,
1056 o_setattr: osc_setattr,
1061 o_enqueue: osc_enqueue,
1062 o_cancel: osc_cancel,
1063 o_cancel_unused: osc_cancel_unused,
1064 o_iocontrol: osc_iocontrol
/* Module init: register the OSC obd type (with its lprocfs variables)
 * with the class driver. */
1067 static int __init osc_init(void)
1069 struct lprocfs_static_vars lvars;
1071 lprocfs_init_vars(&lvars);
1072 RETURN(class_register_type(&osc_obd_ops, lvars.module_vars,
/* Module exit: unregister the OSC obd type. */
1076 static void __exit osc_exit(void)
1078 class_unregister_type(LUSTRE_OSC_NAME);
/* Kernel module metadata and entry/exit hookup. */
1081 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1082 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
1083 MODULE_LICENSE("GPL");
1085 module_init(osc_init);
1086 module_exit(osc_exit);