1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
20 #define DEBUG_SUBSYSTEM S_OSC
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27 #include <linux/ctype.h>
28 #include <linux/init.h>
29 #include <linux/lustre_ha.h>
30 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
31 #include <linux/lustre_lite.h> /* for ll_i2info */
/*
 * osc_getattr: fetch the attributes of a single object from the OST.
 * Packs the caller's obdo into an OST_GETATTR request, waits for the
 * reply, and copies the returned obdo back over *oa.
 * NOTE(review): listing is truncated -- braces, ENTRY/RETURN and the
 * NULL/error checks between the numbered lines are missing from view.
 */
33 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
34 struct lov_stripe_md *md)
36 struct ptlrpc_request *request;
37 struct ost_body *body;
38 int rc, size = sizeof(*body);
41 request = ptlrpc_prep_req2(conn, OST_GETATTR, 1, &size, NULL);
/* Request buffer 0 is the ost_body; whole obdo is copied as-is. */
45 body = lustre_msg_buf(request->rq_reqmsg, 0);
46 #warning FIXME: pack only valid fields instead of memcpy, endianness
47 memcpy(&body->oa, oa, sizeof(*oa));
49 request->rq_replen = lustre_msg_size(1, &size);
/* Synchronous RPC: queue, wait, then fold the reply status into rc. */
51 rc = ptlrpc_queue_wait(request);
52 rc = ptlrpc_check_status(request, rc);
54 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* Success path: copy the server's view of the attributes back to the
 * caller's obdo. */
58 body = lustre_msg_buf(request->rq_repmsg, 0);
59 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
61 memcpy(oa, &body->oa, sizeof(*oa));
65 ptlrpc_free_req(request);
/*
 * osc_open: open an object on the OST.  Same request/reply shape as
 * osc_getattr, but issues OST_OPEN; the reply obdo (open handle data,
 * mode, etc.) is copied back over *oa.
 * NOTE(review): listing is truncated -- error-check lines are missing.
 */
69 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
70 struct lov_stripe_md *md)
72 struct ptlrpc_request *request;
73 struct ost_body *body;
74 int rc, size = sizeof(*body);
77 request = ptlrpc_prep_req2(conn, OST_OPEN, 1, &size, NULL);
79 body = lustre_msg_buf(request->rq_reqmsg, 0);
82 #warning FIXME: pack only valid fields instead of memcpy, endianness
83 memcpy(&body->oa, oa, sizeof(*oa));
85 request->rq_replen = lustre_msg_size(1, &size);
/* Synchronous round trip to the OST. */
87 rc = ptlrpc_queue_wait(request);
88 rc = ptlrpc_check_status(request, rc);
92 body = lustre_msg_buf(request->rq_repmsg, 0);
93 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
95 memcpy(oa, &body->oa, sizeof(*oa));
99 ptlrpc_free_req(request);
/*
 * osc_close: close a previously opened object on the OST (OST_CLOSE).
 * Identical request/reply shape to osc_open; the reply obdo is copied
 * back over *oa.
 * NOTE(review): listing is truncated -- error-check lines are missing.
 */
103 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
104 struct lov_stripe_md *md)
106 struct ptlrpc_request *request;
107 struct ost_body *body;
108 int rc, size = sizeof(*body);
111 request = ptlrpc_prep_req2(conn, OST_CLOSE, 1, &size, NULL);
115 body = lustre_msg_buf(request->rq_reqmsg, 0);
116 #warning FIXME: pack only valid fields instead of memcpy, endianness
117 memcpy(&body->oa, oa, sizeof(*oa));
119 request->rq_replen = lustre_msg_size(1, &size);
/* Synchronous round trip to the OST. */
121 rc = ptlrpc_queue_wait(request);
122 rc = ptlrpc_check_status(request, rc);
126 body = lustre_msg_buf(request->rq_repmsg, 0);
127 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
129 memcpy(oa, &body->oa, sizeof(*oa));
133 ptlrpc_free_req(request);
/*
 * osc_setattr: push the caller's obdo attributes to the OST
 * (OST_SETATTR).  Unlike getattr/open/close, nothing is copied back
 * from the reply -- only the status matters.
 * NOTE(review): listing is truncated -- error-check lines are missing.
 */
137 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
138 struct lov_stripe_md *md)
140 struct ptlrpc_request *request;
141 struct ost_body *body;
142 int rc, size = sizeof(*body);
145 request = ptlrpc_prep_req2(conn, OST_SETATTR, 1, &size, NULL);
149 body = lustre_msg_buf(request->rq_reqmsg, 0);
150 memcpy(&body->oa, oa, sizeof(*oa));
152 request->rq_replen = lustre_msg_size(1, &size);
154 rc = ptlrpc_queue_wait(request);
155 rc = ptlrpc_check_status(request, rc);
159 ptlrpc_free_req(request);
/*
 * osc_create: create a new object on the OST (OST_CREATE).  Allocates
 * the caller's stripe metadata (*ea), sends the obdo, and on success
 * records the server-assigned object id in the new lov_stripe_md.
 * NOTE(review): listing is truncated.  From the visible lines, the
 * error paths after OBD_ALLOC succeed do not appear to free *ea --
 * presumably a leak if the RPC fails; confirm against the full source.
 */
163 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
164 struct lov_stripe_md **ea)
166 struct ptlrpc_request *request;
167 struct ost_body *body;
168 int rc, size = sizeof(*body);
/* Allocate the stripe descriptor the caller will own on success. */
181 OBD_ALLOC(*ea, oa->o_easize);
184 (*ea)->lmd_easize = oa->o_easize;
187 request = ptlrpc_prep_req2(conn, OST_CREATE, 1, &size, NULL);
191 body = lustre_msg_buf(request->rq_reqmsg, 0);
192 memcpy(&body->oa, oa, sizeof(*oa));
194 request->rq_replen = lustre_msg_size(1, &size);
196 rc = ptlrpc_queue_wait(request);
197 rc = ptlrpc_check_status(request, rc);
/* Reply carries the new object's obdo; remember its id in the EA.
 * This client always creates single-stripe objects. */
201 body = lustre_msg_buf(request->rq_repmsg, 0);
202 memcpy(oa, &body->oa, sizeof(*oa));
204 (*ea)->lmd_object_id = oa->o_id;
205 (*ea)->lmd_stripe_count = 1;
208 ptlrpc_free_req(request);
/*
 * osc_punch: truncate/punch a byte range in an object (OST_PUNCH).
 * The [start, end] range is smuggled to the server in the obdo's
 * o_blocks/o_size fields, as the comment below notes.
 * NOTE(review): the signature is truncated in this listing -- the
 * final parameter (presumably "obd_size end") is not visible here.
 */
212 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
213 struct lov_stripe_md *md, obd_size start,
216 struct ptlrpc_request *request;
217 struct ost_body *body;
218 int rc, size = sizeof(*body);
226 request = ptlrpc_prep_req2(conn, OST_PUNCH, 1, &size, NULL);
230 body = lustre_msg_buf(request->rq_reqmsg, 0);
231 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
232 memcpy(&body->oa, oa, sizeof(*oa));
234 /* overload the blocks and size fields in the oa with start/end */
235 #warning FIXME: endianness, size=start, blocks=end?
236 body->oa.o_blocks = start;
237 body->oa.o_size = end;
238 body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
240 request->rq_replen = lustre_msg_size(1, &size);
242 rc = ptlrpc_queue_wait(request);
243 rc = ptlrpc_check_status(request, rc);
/* Copy the post-punch attributes back to the caller. */
247 body = lustre_msg_buf(request->rq_repmsg, 0);
248 memcpy(oa, &body->oa, sizeof(*oa));
252 ptlrpc_free_req(request);
/*
 * osc_destroy: delete an object on the OST (OST_DESTROY).  Sends the
 * obdo identifying the object and copies the reply obdo back to the
 * caller.
 * NOTE(review): listing is truncated -- error-check lines are missing.
 */
256 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
257 struct lov_stripe_md *ea)
259 struct ptlrpc_request *request;
260 struct ost_body *body;
261 int rc, size = sizeof(*body);
268 request = ptlrpc_prep_req2(conn, OST_DESTROY, 1, &size, NULL);
272 body = lustre_msg_buf(request->rq_reqmsg, 0);
273 #warning FIXME: pack only valid fields instead of memcpy, endianness
274 memcpy(&body->oa, oa, sizeof(*oa));
276 request->rq_replen = lustre_msg_size(1, &size);
278 rc = ptlrpc_queue_wait(request);
279 rc = ptlrpc_check_status(request, rc);
283 body = lustre_msg_buf(request->rq_repmsg, 0);
284 memcpy(oa, &body->oa, sizeof(*oa));
288 ptlrpc_free_req(request);
/*
 * Per-BRW callback context handed to brw_finish via desc->b_cb_data.
 * NOTE(review): truncated in this listing -- later code also uses
 * cb_data->cb_data, cb_data->obd_data and cb_data->obd_size, so the
 * full struct declares at least those three additional fields.
 */
292 struct osc_brw_cb_data {
293 brw_callback_t callback;
299 /* Our bulk-unmapping bottom half. */
/*
 * Runs from the task queue (process context) because kunmap() cannot
 * be called from the interrupt context where bulk completion fires:
 * kunmap each page on the descriptor's page list, then drop our
 * reference on the descriptor.
 */
300 static void unmap_and_decref_bulk_desc(void *data)
302 struct ptlrpc_bulk_desc *desc = data;
303 struct list_head *tmp;
306 /* This feels wrong to me. */
307 list_for_each(tmp, &desc->b_page_list) {
308 struct ptlrpc_bulk_page *bulk;
309 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
311 kunmap(bulk->b_page);
/* Release the reference taken when the bulk was registered/sent. */
314 ptlrpc_bulk_decref(desc);
318 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
320 struct osc_brw_cb_data *cb_data = data;
324 if (desc->b_flags & PTL_RPC_FL_TIMEOUT) {
325 err = (desc->b_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
329 if (cb_data->callback)
330 cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
332 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
333 OBD_FREE(cb_data, sizeof(*cb_data));
335 /* We can't kunmap the desc from interrupt context, so we do it from
336 * the bottom half above. */
337 INIT_TQUEUE(&desc->b_queue, 0, 0);
338 PREPARE_TQUEUE(&desc->b_queue, unmap_and_decref_bulk_desc, desc);
339 schedule_task(&desc->b_queue);
/*
 * osc_brw_read: bulk-read page_count pages from the OST.
 * Builds an OST_READ request carrying an obd_ioobj plus one
 * niobuf_remote per page, registers a bulk descriptor to receive the
 * data (one xid shared by all pages), then queues the RPC.  Completion
 * and unmapping are handled asynchronously by brw_finish.
 * NOTE(review): listing is truncated -- braces, NULL checks, RETURN/
 * GOTO targets and some declarations (e.g. xid, mapped) are missing
 * from view.
 */
344 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
345 obd_count page_count, struct brw_page *pga,
346 brw_callback_t callback, struct io_cb_data *data)
348 struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
349 struct ptlrpc_request *request = NULL;
350 struct ptlrpc_bulk_desc *desc = NULL;
351 struct ost_body *body;
352 struct osc_brw_cb_data *cb_data = NULL;
353 int rc, size[3] = {sizeof(*body)};
354 void *iooptr, *nioptr;
/* Three request buffers: ost_body, one ioobj, page_count niobufs. */
359 size[1] = sizeof(struct obd_ioobj);
360 size[2] = page_count * sizeof(struct niobuf_remote);
362 request = ptlrpc_prep_req2(conn, OST_READ, 3, size, NULL);
366 body = lustre_msg_buf(request->rq_reqmsg, 0);
368 desc = ptlrpc_prep_bulk(connection);
370 GOTO(out_req, rc = -ENOMEM);
371 desc->b_portal = OST_BULK_PORTAL;
372 desc->b_cb = brw_finish;
373 OBD_ALLOC(cb_data, sizeof(*cb_data));
375 GOTO(out_desc, rc = -ENOMEM);
377 cb_data->callback = callback;
378 cb_data->cb_data = data;
380 desc->b_cb_data = cb_data;
382 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
383 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
384 ost_pack_ioo(&iooptr, md, page_count);
385 /* end almost identical to brw_write case */
/* All pages of this BRW share a single match-bits xid. */
387 spin_lock(&connection->c_lock);
388 xid = ++connection->c_xid_out; /* single xid for all pages */
389 spin_unlock(&connection->c_lock);
/* kmap each destination page and describe it to the server. */
391 for (mapped = 0; mapped < page_count; mapped++) {
392 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
394 GOTO(out_unmap, rc = -ENOMEM);
396 bulk->b_xid = xid; /* single xid for all pages */
398 bulk->b_buf = kmap(pga[mapped].pg);
399 bulk->b_page = pga[mapped].pg;
400 bulk->b_buflen = PAGE_SIZE;
401 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
402 pga[mapped].flag, bulk->b_xid);
406 * Register the bulk first, because the reply could arrive out of order,
407 * and we want to be ready for the bulk data.
409 * The reference is released when brw_finish is complete.
411 * On error, we never do the brw_finish, so we handle all decrefs.
/* Fault-injection hook: optionally skip bulk registration to test
 * server-side timeout handling. */
413 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
414 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
415 OBD_FAIL_OSC_BRW_READ_BULK);
417 rc = ptlrpc_register_bulk(desc);
422 request->rq_replen = lustre_msg_size(1, size);
423 rc = ptlrpc_queue_wait(request);
424 rc = ptlrpc_check_status(request, rc);
427 * XXX: If there is an error during the processing of the callback,
428 * such as a timeout in a sleep that it performs, brw_finish
429 * will never get called, and we'll leak the desc, fail to kunmap
430 * things, cats will live with dogs. One solution would be to
431 * export brw_finish as osc_brw_finish, so that the timeout case and
432 * its kin could call it for proper cleanup. An alternative would
433 * be for an error return from the callback to cause us to clean up,
434 * but that doesn't help the truly async cases (like LOV), which
435 * will immediately return from their PHASE_START callback, before
436 * any such cleanup-requiring error condition can be detected.
441 /* Callbacks cause asynchronous handling. */
442 rc = callback(data, 0, CB_PHASE_START);
445 ptlrpc_req_finished(request);
448 /* Clean up on error. */
/* Error unwind: kunmap whatever we mapped, free the callback context,
 * and drop our bulk reference (brw_finish will never run). */
451 kunmap(pga[mapped].pg);
452 OBD_FREE(cb_data, sizeof(*cb_data));
454 ptlrpc_bulk_decref(desc);
/*
 * osc_brw_write: bulk-write page_count pages to the OST.
 * Sends an OST_WRITE request describing the pages; the server replies
 * with one niobuf_remote per page carrying the xid under which it is
 * ready to receive each page, and only then do we ptlrpc_send_bulk().
 * Completion and unmapping are handled asynchronously by brw_finish.
 * NOTE(review): listing is truncated -- braces, NULL checks and some
 * declarations (e.g. mapped) are missing from view.
 */
458 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
459 obd_count page_count, struct brw_page *pga,
460 brw_callback_t callback, struct io_cb_data *data)
462 struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
463 struct ptlrpc_request *request = NULL;
464 struct ptlrpc_bulk_desc *desc = NULL;
465 struct ost_body *body;
466 struct niobuf_local *local = NULL;
467 struct niobuf_remote *remote;
468 struct osc_brw_cb_data *cb_data = NULL;
469 int rc, j, size[3] = {sizeof(*body)};
470 void *iooptr, *nioptr;
/* Three request buffers: ost_body, one ioobj, page_count niobufs. */
474 size[1] = sizeof(struct obd_ioobj);
475 size[2] = page_count * sizeof(*remote);
477 request = ptlrpc_prep_req2(conn, OST_WRITE, 3, size, NULL);
481 body = lustre_msg_buf(request->rq_reqmsg, 0);
483 desc = ptlrpc_prep_bulk(connection);
485 GOTO(out_req, rc = -ENOMEM);
486 desc->b_portal = OSC_BULK_PORTAL;
487 desc->b_cb = brw_finish;
488 OBD_ALLOC(cb_data, sizeof(*cb_data));
490 GOTO(out_desc, rc = -ENOMEM);
492 cb_data->callback = callback;
493 cb_data->cb_data = data;
495 desc->b_cb_data = cb_data;
497 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
498 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
499 ost_pack_ioo(&iooptr, md, page_count);
500 /* end almost identical to brw_read case */
/* Track our kmapped source buffers; freed later by brw_finish via
 * cb_data->obd_data/obd_size. */
502 OBD_ALLOC(local, page_count * sizeof(*local));
504 GOTO(out_cb, rc = -ENOMEM);
506 cb_data->obd_data = local;
507 cb_data->obd_size = page_count * sizeof(*local);
/* kmap each source page; xid 0 in the request -- the server assigns
 * the real xids in its reply. */
509 for (mapped = 0; mapped < page_count; mapped++) {
510 local[mapped].addr = kmap(pga[mapped].pg);
511 local[mapped].offset = pga[mapped].off;
512 local[mapped].len = pga[mapped].count;
513 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
514 pga[mapped].flag, 0);
/* Reply: ost_body plus page_count niobuf_remotes with server xids. */
517 size[1] = page_count * sizeof(*remote);
518 request->rq_replen = lustre_msg_size(2, size);
519 rc = ptlrpc_queue_wait(request);
520 rc = ptlrpc_check_status(request, rc);
/* Validate the reply's niobuf buffer before trusting its contents. */
524 nioptr = lustre_msg_buf(request->rq_repmsg, 1);
526 GOTO(out_unmap, rc = -EINVAL);
528 if (request->rq_repmsg->buflens[1] != size[1]) {
529 CERROR("buffer length wrong (%d vs. %d)\n",
530 request->rq_repmsg->buflens[1], size[1]);
531 GOTO(out_unmap, rc = -EINVAL);
/* Attach each mapped page to the bulk under its server-assigned xid. */
534 for (j = 0; j < page_count; j++) {
535 struct ptlrpc_bulk_page *bulk;
537 ost_unpack_niobuf(&nioptr, &remote);
539 bulk = ptlrpc_prep_bulk_page(desc);
541 GOTO(out_unmap, rc = -ENOMEM);
543 bulk->b_buf = (void *)(unsigned long)local[j].addr;
544 bulk->b_buflen = local[j].len;
545 bulk->b_xid = remote->xid;
546 bulk->b_page = pga[j].pg;
/* Sanity: every page must have been attached to the descriptor. */
549 if (desc->b_page_count != page_count)
/* Fault-injection hook: pretend success without sending the bulk. */
552 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
553 GOTO(out_unmap, rc = 0);
555 /* Our reference is released when brw_finish is complete. */
556 rc = ptlrpc_send_bulk(desc);
558 /* XXX: Mike, same question as in osc_brw_read. */
562 /* Callbacks cause asynchronous handling. */
563 rc = callback(data, 0, CB_PHASE_START);
566 ptlrpc_req_finished(request);
569 /* Clean up on error. */
/* Error unwind: kunmap mapped pages, free the local niobuf array and
 * callback context, drop the bulk reference. */
572 kunmap(pga[mapped].pg);
574 OBD_FREE(local, page_count * sizeof(*local));
576 OBD_FREE(cb_data, sizeof(*cb_data));
578 ptlrpc_bulk_decref(desc);
/*
 * osc_brw: obd_brw entry point -- dispatch to the read or write
 * implementation based on the OBD_BRW_WRITE bit in cmd.
 */
582 static int osc_brw(int cmd, struct lustre_handle *conn,
583 struct lov_stripe_md *md, obd_count page_count,
584 struct brw_page *pga, brw_callback_t callback,
585 struct io_cb_data *data)
587 if (cmd & OBD_BRW_WRITE)
588 return osc_brw_write(conn, md, page_count, pga, callback, data);
590 return osc_brw_read(conn, md, page_count, pga, callback, data);
/*
 * osc_enqueue: take a DLM extent lock on an object.  Rounds the extent
 * to page boundaries, then tries in order: (1) match an existing lock
 * of the requested mode, (2) match/convert a compatible lock of
 * another mode, (3) enqueue a new lock with the server.
 * NOTE(review): "sizeof(extent)" below is the size of the *pointer*,
 * not of the ldlm_extent -- presumably it should be sizeof(*extent)
 * (cf. the extent_len parameter); confirm against the full source.
 * NOTE(review): ldlm_cli_convert is passed "&flags" although flags is
 * already an int* -- looks like a double-indirection bug; verify.
 */
593 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md,
594 struct lustre_handle *parent_lock,
595 __u32 type, void *extentp, int extent_len, __u32 mode,
596 int *flags, void *callback, void *data, int datalen,
597 struct lustre_handle *lockh)
/* Resource name is the object id (single-stripe objects only here). */
599 __u64 res_id[RES_NAME_SIZE] = { md->lmd_object_id };
600 struct obd_device *obddev = class_conn2obd(connh);
601 struct ldlm_extent *extent = extentp;
605 /* Filesystem locks are given a bit of special treatment: first we
606 * fixup the lock to start and end on page boundaries. */
607 extent->start &= PAGE_MASK;
608 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
610 /* Next, search for already existing extent locks that will cover us */
611 //osc_con2dlmcl(conn, &cl, &connection, &rconn);
612 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
613 sizeof(extent), mode, lockh);
615 /* We already have a lock, and it's referenced */
619 /* Next, search for locks that we can upgrade (if we're trying to write)
620 * or are more than we need (if we're trying to read). Because the VFS
621 * and page cache already protect us locally, lots of readers/writers
622 * can share a single PW lock. */
628 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
629 sizeof(extent), mode2, lockh)
632 /* FIXME: This is not incredibly elegant, but it might
633 * be more elegant than adding another parameter to
634 * lock_match. I want a second opinion. */
/* Re-reference under the requested mode, drop the matched mode. */
635 ldlm_lock_addref(lockh, mode);
636 ldlm_lock_decref(lockh, mode2);
641 rc = ldlm_cli_convert(lockh, mode, &flags);
/* No usable local lock: enqueue with the lock server. */
648 rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace,
649 parent_lock, res_id, type, extent,
650 sizeof(extent), mode, flags, ldlm_completion_ast,
651 callback, data, datalen, lockh);
/*
 * osc_cancel: release one reference on a DLM lock taken by
 * osc_enqueue.  Actual cancellation happens via the DLM when the
 * reference count drops and the server requests it.
 */
655 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
656 __u32 mode, struct lustre_handle *lockh)
660 ldlm_lock_decref(lockh, mode);
/*
 * osc_statfs: query filesystem statistics from the OST (OST_STATFS).
 * The reply's wire-format obd_statfs is unpacked into the caller's
 * struct statfs.
 * NOTE(review): listing is truncated -- error-check lines are missing.
 */
665 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
667 struct ptlrpc_request *request;
668 struct obd_statfs *osfs;
669 int rc, size = sizeof(*osfs);
/* No request body needed; only the reply carries data. */
672 request = ptlrpc_prep_req2(conn, OST_STATFS, 0, NULL, NULL);
676 request->rq_replen = lustre_msg_size(1, &size);
678 rc = ptlrpc_queue_wait(request);
679 rc = ptlrpc_check_status(request, rc);
681 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
685 osfs = lustre_msg_buf(request->rq_repmsg, 0);
686 obd_statfs_unpack(osfs, sfs);
690 ptlrpc_free_req(request);
/*
 * osc_iocontrol: handle LDLM test/regression ioctls on the OSC device.
 * Validates that cmd is in the IOC_LDLM range, then dispatches to the
 * DLM test hooks.  IOC_LDLM_REGRESS_START optionally parses up to four
 * whitespace-separated numbers from inlbuf1: threads, held, resources,
 * extents.
 * NOTE(review): listing is truncated -- switch(), braces and the final
 * return are missing from view.
 */
694 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
695 void *karg, void *uarg)
697 struct obd_device *obddev = class_conn2obd(conn);
698 struct obd_ioctl_data *data = karg;
/* Reject anything outside the LDLM ioctl number space. */
702 if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) <
703 IOC_LDLM_MIN_NR || _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
704 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
705 _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
710 case IOC_LDLM_TEST: {
711 err = ldlm_test(obddev, conn);
712 CERROR("-- done err %d\n", err);
715 case IOC_LDLM_REGRESS_START: {
/* Defaults used when inlbuf1 supplies fewer than four numbers. */
716 unsigned int numthreads = 1;
717 unsigned int numheld = 10;
718 unsigned int numres = 10;
719 unsigned int numext = 10;
722 if (data->ioc_inllen1) {
723 parse = data->ioc_inlbuf1;
724 if (*parse != '\0') {
725 while(isspace(*parse)) parse++;
726 numthreads = simple_strtoul(parse, &parse, 0);
727 while(isspace(*parse)) parse++;
729 if (*parse != '\0') {
730 while(isspace(*parse)) parse++;
731 numheld = simple_strtoul(parse, &parse, 0);
732 while(isspace(*parse)) parse++;
734 if (*parse != '\0') {
735 while(isspace(*parse)) parse++;
736 numres = simple_strtoul(parse, &parse, 0);
737 while(isspace(*parse)) parse++;
739 if (*parse != '\0') {
740 while(isspace(*parse)) parse++;
741 numext = simple_strtoul(parse, &parse, 0);
742 while(isspace(*parse)) parse++;
746 err = ldlm_regression_start(obddev, conn, numthreads,
747 numheld, numres, numext);
749 CERROR("-- done err %d\n", err);
752 case IOC_LDLM_REGRESS_STOP: {
753 err = ldlm_regression_stop();
754 CERROR("-- done err %d\n", err);
758 GOTO(out, err = -EINVAL);
/*
 * OBD method table for the OSC.  Setup/cleanup/connect/disconnect are
 * the generic client_obd_* helpers; the rest are the OSC-specific
 * operations defined above.  (GCC labeled-initializer syntax.)
 */
764 struct obd_ops osc_obd_ops = {
765 o_setup: client_obd_setup,
766 o_cleanup: client_obd_cleanup,
767 o_statfs: osc_statfs,
768 o_create: osc_create,
769 o_destroy: osc_destroy,
770 o_getattr: osc_getattr,
771 o_setattr: osc_setattr,
774 o_connect: client_obd_connect,
775 o_disconnect: client_obd_disconnect,
778 o_enqueue: osc_enqueue,
779 o_cancel: osc_cancel,
780 o_iocontrol: osc_iocontrol
/* Module init: register the OSC device type with the OBD class layer. */
783 static int __init osc_init(void)
785 return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
/* Module exit: unregister the OSC device type. */
788 static void __exit osc_exit(void)
790 class_unregister_type(LUSTRE_OSC_NAME);
793 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
794 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
795 MODULE_LICENSE("GPL");
797 module_init(osc_init);
798 module_exit(osc_exit);