/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
 *
 * This code is issued under the GNU General Public License.
 * See the file COPYING in this distribution.
 *
 * Author: Peter Braam <braam@clusterfs.com>
 *
 * This server is single threaded at present (but can easily be multi
 * threaded). For testing and management it is treated as an
 * obd_device, although it does not export a full OBD method table
 * (the requests are coming in over the wire, so object target
 * modules do not have a full method table.)
 */
20 #define DEBUG_SUBSYSTEM S_OSC
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27 #include <linux/ctype.h>
28 #include <linux/init.h>
29 #include <linux/lustre_ha.h>
30 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
31 #include <linux/lustre_lite.h> /* for ll_i2info */
33 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
34 struct lov_stripe_md *md)
36 struct ptlrpc_request *request;
37 struct ost_body *body;
38 int rc, size = sizeof(*body);
41 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
46 body = lustre_msg_buf(request->rq_reqmsg, 0);
47 #warning FIXME: pack only valid fields instead of memcpy, endianness
48 memcpy(&body->oa, oa, sizeof(*oa));
50 request->rq_replen = lustre_msg_size(1, &size);
52 rc = ptlrpc_queue_wait(request);
53 rc = ptlrpc_check_status(request, rc);
55 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
59 body = lustre_msg_buf(request->rq_repmsg, 0);
60 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
62 memcpy(oa, &body->oa, sizeof(*oa));
66 ptlrpc_free_req(request);
70 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
71 struct lov_stripe_md *md)
73 struct ptlrpc_request *request;
74 struct ost_body *body;
75 int rc, size = sizeof(*body);
78 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
83 body = lustre_msg_buf(request->rq_reqmsg, 0);
84 #warning FIXME: pack only valid fields instead of memcpy, endianness
85 memcpy(&body->oa, oa, sizeof(*oa));
87 request->rq_replen = lustre_msg_size(1, &size);
89 rc = ptlrpc_queue_wait(request);
90 rc = ptlrpc_check_status(request, rc);
94 body = lustre_msg_buf(request->rq_repmsg, 0);
95 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
97 memcpy(oa, &body->oa, sizeof(*oa));
101 ptlrpc_free_req(request);
105 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
106 struct lov_stripe_md *md)
108 struct ptlrpc_request *request;
109 struct ost_body *body;
110 int rc, size = sizeof(*body);
113 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
118 body = lustre_msg_buf(request->rq_reqmsg, 0);
119 #warning FIXME: pack only valid fields instead of memcpy, endianness
120 memcpy(&body->oa, oa, sizeof(*oa));
122 request->rq_replen = lustre_msg_size(1, &size);
124 rc = ptlrpc_queue_wait(request);
125 rc = ptlrpc_check_status(request, rc);
129 body = lustre_msg_buf(request->rq_repmsg, 0);
130 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
132 memcpy(oa, &body->oa, sizeof(*oa));
136 ptlrpc_free_req(request);
140 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
141 struct lov_stripe_md *md)
143 struct ptlrpc_request *request;
144 struct ost_body *body;
145 int rc, size = sizeof(*body);
148 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
153 body = lustre_msg_buf(request->rq_reqmsg, 0);
154 memcpy(&body->oa, oa, sizeof(*oa));
156 request->rq_replen = lustre_msg_size(1, &size);
158 rc = ptlrpc_queue_wait(request);
159 rc = ptlrpc_check_status(request, rc);
161 ptlrpc_free_req(request);
165 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
166 struct lov_stripe_md **ea)
168 struct ptlrpc_request *request;
169 struct ost_body *body;
170 int rc, size = sizeof(*body);
183 // XXX check oa->o_valid & OBD_MD_FLEASIZE first...
184 OBD_ALLOC(*ea, oa->o_easize);
187 (*ea)->lsm_mds_easize = oa->o_easize;
190 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
195 body = lustre_msg_buf(request->rq_reqmsg, 0);
196 memcpy(&body->oa, oa, sizeof(*oa));
198 request->rq_replen = lustre_msg_size(1, &size);
200 rc = ptlrpc_queue_wait(request);
201 rc = ptlrpc_check_status(request, rc);
205 body = lustre_msg_buf(request->rq_repmsg, 0);
206 memcpy(oa, &body->oa, sizeof(*oa));
208 (*ea)->lsm_object_id = oa->o_id;
209 (*ea)->lsm_stripe_count = 0;
212 ptlrpc_free_req(request);
216 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
217 struct lov_stripe_md *md, obd_size start,
220 struct ptlrpc_request *request;
221 struct ost_body *body;
222 int rc, size = sizeof(*body);
230 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
235 body = lustre_msg_buf(request->rq_reqmsg, 0);
236 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
237 memcpy(&body->oa, oa, sizeof(*oa));
239 /* overload the blocks and size fields in the oa with start/end */
240 #warning FIXME: endianness, size=start, blocks=end?
241 body->oa.o_blocks = start;
242 body->oa.o_size = end;
243 body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
245 request->rq_replen = lustre_msg_size(1, &size);
247 rc = ptlrpc_queue_wait(request);
248 rc = ptlrpc_check_status(request, rc);
252 body = lustre_msg_buf(request->rq_repmsg, 0);
253 memcpy(oa, &body->oa, sizeof(*oa));
257 ptlrpc_free_req(request);
261 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
262 struct lov_stripe_md *ea)
264 struct ptlrpc_request *request;
265 struct ost_body *body;
266 int rc, size = sizeof(*body);
273 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
278 body = lustre_msg_buf(request->rq_reqmsg, 0);
279 #warning FIXME: pack only valid fields instead of memcpy, endianness
280 memcpy(&body->oa, oa, sizeof(*oa));
282 request->rq_replen = lustre_msg_size(1, &size);
284 rc = ptlrpc_queue_wait(request);
285 rc = ptlrpc_check_status(request, rc);
289 body = lustre_msg_buf(request->rq_repmsg, 0);
290 memcpy(oa, &body->oa, sizeof(*oa));
294 ptlrpc_free_req(request);
298 struct osc_brw_cb_data {
299 brw_callback_t callback;
305 /* Our bulk-unmapping bottom half. */
306 static void unmap_and_decref_bulk_desc(void *data)
308 struct ptlrpc_bulk_desc *desc = data;
309 struct list_head *tmp;
312 /* This feels wrong to me. */
313 list_for_each(tmp, &desc->bd_page_list) {
314 struct ptlrpc_bulk_page *bulk;
315 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
317 kunmap(bulk->bp_page);
320 ptlrpc_bulk_decref(desc);
324 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
326 struct osc_brw_cb_data *cb_data = data;
330 if (desc->bd_flags & PTL_RPC_FL_TIMEOUT) {
331 err = (desc->bd_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
335 if (cb_data->callback)
336 cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
338 if (cb_data->obd_data)
339 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
340 OBD_FREE(cb_data, sizeof(*cb_data));
342 /* We can't kunmap the desc from interrupt context, so we do it from
343 * the bottom half above. */
344 INIT_TQUEUE(&desc->bd_queue, 0, 0);
345 PREPARE_TQUEUE(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
346 schedule_task(&desc->bd_queue);
351 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
352 obd_count page_count, struct brw_page *pga,
353 brw_callback_t callback, struct io_cb_data *data)
355 struct ptlrpc_connection *connection =
356 client_conn2cli(conn)->cl_import.imp_connection;
357 struct ptlrpc_request *request = NULL;
358 struct ptlrpc_bulk_desc *desc = NULL;
359 struct ost_body *body;
360 struct osc_brw_cb_data *cb_data = NULL;
361 int rc, size[3] = {sizeof(*body)};
362 void *iooptr, *nioptr;
367 size[1] = sizeof(struct obd_ioobj);
368 size[2] = page_count * sizeof(struct niobuf_remote);
370 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_READ, 3, size,
375 body = lustre_msg_buf(request->rq_reqmsg, 0);
377 desc = ptlrpc_prep_bulk(connection);
379 GOTO(out_req, rc = -ENOMEM);
380 desc->bd_portal = OST_BULK_PORTAL;
381 desc->bd_cb = brw_finish;
382 OBD_ALLOC(cb_data, sizeof(*cb_data));
384 GOTO(out_desc, rc = -ENOMEM);
386 cb_data->callback = callback;
387 cb_data->cb_data = data;
389 desc->bd_cb_data = cb_data;
391 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
392 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
393 ost_pack_ioo(&iooptr, md, page_count);
394 /* end almost identical to brw_write case */
396 spin_lock(&connection->c_lock);
397 xid = ++connection->c_xid_out; /* single xid for all pages */
398 spin_unlock(&connection->c_lock);
400 for (mapped = 0; mapped < page_count; mapped++) {
401 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
403 GOTO(out_unmap, rc = -ENOMEM);
405 bulk->bp_xid = xid; /* single xid for all pages */
407 bulk->bp_buf = kmap(pga[mapped].pg);
408 bulk->bp_page = pga[mapped].pg;
409 bulk->bp_buflen = PAGE_SIZE;
410 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
411 pga[mapped].flag, bulk->bp_xid);
415 * Register the bulk first, because the reply could arrive out of order,
416 * and we want to be ready for the bulk data.
418 * The reference is released when brw_finish is complete.
420 * On error, we never do the brw_finish, so we handle all decrefs.
422 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
423 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
424 OBD_FAIL_OSC_BRW_READ_BULK);
426 rc = ptlrpc_register_bulk(desc);
431 request->rq_replen = lustre_msg_size(1, size);
432 rc = ptlrpc_queue_wait(request);
433 rc = ptlrpc_check_status(request, rc);
436 * XXX: If there is an error during the processing of the callback,
437 * such as a timeout in a sleep that it performs, brw_finish
438 * will never get called, and we'll leak the desc, fail to kunmap
439 * things, cats will live with dogs. One solution would be to
440 * export brw_finish as osc_brw_finish, so that the timeout case
441 * and its kin could call it for proper cleanup. An alternative
442 * would be for an error return from the callback to cause us to
443 * clean up, but that doesn't help the truly async cases (like
444 * LOV), which will immediately return from their PHASE_START
445 * callback, before any such cleanup-requiring error condition can
451 /* Callbacks cause asynchronous handling. */
452 rc = callback(data, 0, CB_PHASE_START);
455 ptlrpc_req_finished(request);
458 /* Clean up on error. */
461 kunmap(pga[mapped].pg);
462 OBD_FREE(cb_data, sizeof(*cb_data));
464 ptlrpc_bulk_decref(desc);
468 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
469 obd_count page_count, struct brw_page *pga,
470 brw_callback_t callback, struct io_cb_data *data)
472 struct ptlrpc_connection *connection =
473 client_conn2cli(conn)->cl_import.imp_connection;
474 struct ptlrpc_request *request = NULL;
475 struct ptlrpc_bulk_desc *desc = NULL;
476 struct ost_body *body;
477 struct niobuf_local *local = NULL;
478 struct niobuf_remote *remote;
479 struct osc_brw_cb_data *cb_data = NULL;
480 int rc, j, size[3] = {sizeof(*body)};
481 void *iooptr, *nioptr;
485 size[1] = sizeof(struct obd_ioobj);
486 size[2] = page_count * sizeof(*remote);
488 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
493 body = lustre_msg_buf(request->rq_reqmsg, 0);
495 desc = ptlrpc_prep_bulk(connection);
497 GOTO(out_req, rc = -ENOMEM);
498 desc->bd_portal = OSC_BULK_PORTAL;
499 desc->bd_cb = brw_finish;
500 OBD_ALLOC(cb_data, sizeof(*cb_data));
502 GOTO(out_desc, rc = -ENOMEM);
504 cb_data->callback = callback;
505 cb_data->cb_data = data;
507 desc->bd_cb_data = cb_data;
509 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
510 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
511 ost_pack_ioo(&iooptr, md, page_count);
512 /* end almost identical to brw_read case */
514 OBD_ALLOC(local, page_count * sizeof(*local));
516 GOTO(out_cb, rc = -ENOMEM);
518 cb_data->obd_data = local;
519 cb_data->obd_size = page_count * sizeof(*local);
521 for (mapped = 0; mapped < page_count; mapped++) {
522 local[mapped].addr = kmap(pga[mapped].pg);
524 CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
525 "%d ; page %d of %d\n",
526 local[mapped].addr, pga[mapped].pg->flags,
527 page_count(pga[mapped].pg),
528 mapped, page_count - 1);
530 local[mapped].offset = pga[mapped].off;
531 local[mapped].len = pga[mapped].count;
532 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
533 pga[mapped].flag, 0);
536 size[1] = page_count * sizeof(*remote);
537 request->rq_replen = lustre_msg_size(2, size);
538 rc = ptlrpc_queue_wait(request);
539 rc = ptlrpc_check_status(request, rc);
543 nioptr = lustre_msg_buf(request->rq_repmsg, 1);
545 GOTO(out_unmap, rc = -EINVAL);
547 if (request->rq_repmsg->buflens[1] != size[1]) {
548 CERROR("buffer length wrong (%d vs. %d)\n",
549 request->rq_repmsg->buflens[1], size[1]);
550 GOTO(out_unmap, rc = -EINVAL);
553 for (j = 0; j < page_count; j++) {
554 struct ptlrpc_bulk_page *bulk;
556 ost_unpack_niobuf(&nioptr, &remote);
558 bulk = ptlrpc_prep_bulk_page(desc);
560 GOTO(out_unmap, rc = -ENOMEM);
562 bulk->bp_buf = (void *)(unsigned long)local[j].addr;
563 bulk->bp_buflen = local[j].len;
564 bulk->bp_xid = remote->xid;
565 bulk->bp_page = pga[j].pg;
568 if (desc->bd_page_count != page_count)
571 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
572 GOTO(out_unmap, rc = 0);
574 /* Our reference is released when brw_finish is complete. */
575 rc = ptlrpc_send_bulk(desc);
577 /* XXX: Mike, same question as in osc_brw_read. */
581 /* Callbacks cause asynchronous handling. */
582 rc = callback(data, 0, CB_PHASE_START);
585 ptlrpc_req_finished(request);
588 /* Clean up on error. */
591 kunmap(pga[mapped].pg);
593 OBD_FREE(local, page_count * sizeof(*local));
595 OBD_FREE(cb_data, sizeof(*cb_data));
597 ptlrpc_bulk_decref(desc);
601 static int osc_brw(int cmd, struct lustre_handle *conn,
602 struct lov_stripe_md *md, obd_count page_count,
603 struct brw_page *pga, brw_callback_t callback,
604 struct io_cb_data *data)
606 if (cmd & OBD_BRW_WRITE)
607 return osc_brw_write(conn, md, page_count, pga, callback, data);
609 return osc_brw_read(conn, md, page_count, pga, callback, data);
612 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
613 struct lustre_handle *parent_lock,
614 __u32 type, void *extentp, int extent_len, __u32 mode,
615 int *flags, void *callback, void *data, int datalen,
616 struct lustre_handle *lockh)
618 __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
619 struct obd_device *obddev = class_conn2obd(connh);
620 struct ldlm_extent *extent = extentp;
624 /* Filesystem locks are given a bit of special treatment: first we
625 * fixup the lock to start and end on page boundaries. */
626 extent->start &= PAGE_MASK;
627 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
629 /* Next, search for already existing extent locks that will cover us */
630 //osc_con2dlmcl(conn, &cl, &connection, &rconn);
631 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
632 sizeof(extent), mode, lockh);
634 /* We already have a lock, and it's referenced */
638 /* Next, search for locks that we can upgrade (if we're trying to write)
639 * or are more than we need (if we're trying to read). Because the VFS
640 * and page cache already protect us locally, lots of readers/writers
641 * can share a single PW lock. */
647 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
648 sizeof(extent), mode2, lockh);
651 /* FIXME: This is not incredibly elegant, but it might
652 * be more elegant than adding another parameter to
653 * lock_match. I want a second opinion. */
654 ldlm_lock_addref(lockh, mode);
655 ldlm_lock_decref(lockh, mode2);
660 rc = ldlm_cli_convert(lockh, mode, &flags);
667 rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace,
668 parent_lock, res_id, type, extent,
669 sizeof(extent), mode, flags, ldlm_completion_ast,
670 callback, data, datalen, lockh);
674 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
675 __u32 mode, struct lustre_handle *lockh)
679 ldlm_lock_decref(lockh, mode);
684 static int osc_cancel_unused(struct lustre_handle *connh,
685 struct lov_stripe_md *lsm, int local)
687 struct obd_device *obddev = class_conn2obd(connh);
688 __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
690 return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, local);
693 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
695 struct ptlrpc_request *request;
696 int rc, size = sizeof(*osfs);
699 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
704 request->rq_replen = lustre_msg_size(1, &size);
706 rc = ptlrpc_queue_wait(request);
707 rc = ptlrpc_check_status(request, rc);
709 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
713 obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
717 ptlrpc_free_req(request);
721 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
722 void *karg, void *uarg)
724 struct obd_device *obddev = class_conn2obd(conn);
725 struct obd_ioctl_data *data = karg;
729 if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE ||
730 _IOC_NR(cmd) < IOC_LDLM_MIN_NR || _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
731 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
732 _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
737 case IOC_LDLM_TEST: {
738 err = ldlm_test(obddev, conn);
739 CERROR("-- done err %d\n", err);
742 case IOC_LDLM_REGRESS_START: {
743 unsigned int numthreads = 1;
744 unsigned int numheld = 10;
745 unsigned int numres = 10;
746 unsigned int numext = 10;
749 if (data->ioc_inllen1) {
750 parse = data->ioc_inlbuf1;
751 if (*parse != '\0') {
752 while(isspace(*parse)) parse++;
753 numthreads = simple_strtoul(parse, &parse, 0);
754 while(isspace(*parse)) parse++;
756 if (*parse != '\0') {
757 while(isspace(*parse)) parse++;
758 numheld = simple_strtoul(parse, &parse, 0);
759 while(isspace(*parse)) parse++;
761 if (*parse != '\0') {
762 while(isspace(*parse)) parse++;
763 numres = simple_strtoul(parse, &parse, 0);
764 while(isspace(*parse)) parse++;
766 if (*parse != '\0') {
767 while(isspace(*parse)) parse++;
768 numext = simple_strtoul(parse, &parse, 0);
769 while(isspace(*parse)) parse++;
773 err = ldlm_regression_start(obddev, conn, numthreads,
774 numheld, numres, numext);
776 CERROR("-- done err %d\n", err);
779 case IOC_LDLM_REGRESS_STOP: {
780 err = ldlm_regression_stop();
781 CERROR("-- done err %d\n", err);
785 GOTO(out, err = -EINVAL);
791 struct obd_ops osc_obd_ops = {
792 o_setup: client_obd_setup,
793 o_cleanup: client_obd_cleanup,
794 o_statfs: osc_statfs,
795 o_create: osc_create,
796 o_destroy: osc_destroy,
797 o_getattr: osc_getattr,
798 o_setattr: osc_setattr,
801 o_connect: client_obd_connect,
802 o_disconnect: client_obd_disconnect,
805 o_enqueue: osc_enqueue,
806 o_cancel: osc_cancel,
807 o_cancel_unused: osc_cancel_unused,
808 o_iocontrol: osc_iocontrol
811 static int __init osc_init(void)
813 return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
816 static void __exit osc_exit(void)
818 class_unregister_type(LUSTRE_OSC_NAME);
821 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
822 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
823 MODULE_LICENSE("GPL");
825 module_init(osc_init);
826 module_exit(osc_exit);