1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
20 #define DEBUG_SUBSYSTEM S_OSC
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27 #include <linux/init.h>
28 #include <linux/lustre_ha.h>
29 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
30 #include <linux/lustre_lite.h> /* for ll_i2info */
/*
 * osc_getattr(): fetch the attributes of an OST object via an
 * OST_GETATTR RPC.  The caller's obdo is copied wholesale into the
 * request body and the server's reply obdo is copied back out to *oa.
 * Returns the RPC status from ptlrpc_check_status().
 * NOTE(review): lines are elided in this view (braces, error checks,
 * RETURN) -- confirm cleanup paths against the full source.
 */
32 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
33 struct lov_stripe_md *md)
35 struct ptlrpc_request *request;
36 struct ost_body *body;
/* size of the single request/reply buffer (one ost_body) */
37 int rc, size = sizeof(*body);
40 request = ptlrpc_prep_req2(conn, OST_GETATTR, 1, &size, NULL);
44 body = lustre_msg_buf(request->rq_reqmsg, 0);
/* whole-obdo copy; the #warning flags the missing per-field pack */
45 #warning FIXME: pack only valid fields instead of memcpy, endianness
46 memcpy(&body->oa, oa, sizeof(*oa));
48 request->rq_replen = lustre_msg_size(1, &size);
/* send synchronously, then fold the reply status into rc */
50 rc = ptlrpc_queue_wait(request);
51 rc = ptlrpc_check_status(request, rc);
53 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* success path: copy the server's obdo back to the caller */
57 body = lustre_msg_buf(request->rq_repmsg, 0);
58 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
60 memcpy(oa, &body->oa, sizeof(*oa));
64 ptlrpc_free_req(request);
/*
 * osc_open(): open an OST object via an OST_OPEN RPC.
 * Same request/reply shape as osc_getattr: pack the obdo, wait for the
 * reply, copy the server's obdo back to *oa.
 * NOTE(review): elided lines in this view; error handling between the
 * status check and the reply unpack is not visible -- verify upstream.
 */
68 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
69 struct lov_stripe_md *md)
71 struct ptlrpc_request *request;
72 struct ost_body *body;
73 int rc, size = sizeof(*body);
76 request = ptlrpc_prep_req2(conn, OST_OPEN, 1, &size, NULL);
80 body = lustre_msg_buf(request->rq_reqmsg, 0);
81 #warning FIXME: pack only valid fields instead of memcpy, endianness
82 memcpy(&body->oa, oa, sizeof(*oa));
84 request->rq_replen = lustre_msg_size(1, &size);
86 rc = ptlrpc_queue_wait(request);
87 rc = ptlrpc_check_status(request, rc);
91 body = lustre_msg_buf(request->rq_repmsg, 0);
92 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
94 memcpy(oa, &body->oa, sizeof(*oa));
98 ptlrpc_free_req(request);
/*
 * osc_close(): close an OST object via an OST_CLOSE RPC.
 * Mirrors osc_open/osc_getattr: whole-obdo memcpy in, reply obdo out.
 * NOTE(review): elided lines in this view -- confirm error/cleanup
 * paths against the full source.
 */
102 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
103 struct lov_stripe_md *md)
105 struct ptlrpc_request *request;
106 struct ost_body *body;
107 int rc, size = sizeof(*body);
110 request = ptlrpc_prep_req2(conn, OST_CLOSE, 1, &size, NULL);
114 body = lustre_msg_buf(request->rq_reqmsg, 0);
115 #warning FIXME: pack only valid fields instead of memcpy, endianness
116 memcpy(&body->oa, oa, sizeof(*oa));
118 request->rq_replen = lustre_msg_size(1, &size);
120 rc = ptlrpc_queue_wait(request);
121 rc = ptlrpc_check_status(request, rc);
125 body = lustre_msg_buf(request->rq_repmsg, 0);
126 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
128 memcpy(oa, &body->oa, sizeof(*oa));
132 ptlrpc_free_req(request);
/*
 * osc_setattr(): push attribute changes to the OST via OST_SETATTR.
 * Unlike the getattr/open/close siblings, the reply obdo is NOT copied
 * back to the caller -- only the status is returned.
 * NOTE(review): elided lines in this view; cleanup path assumed to run
 * through ptlrpc_free_req() below -- confirm against full source.
 */
136 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
137 struct lov_stripe_md *md)
139 struct ptlrpc_request *request;
140 struct ost_body *body;
141 int rc, size = sizeof(*body);
144 request = ptlrpc_prep_req2(conn, OST_SETATTR, 1, &size, NULL);
148 body = lustre_msg_buf(request->rq_reqmsg, 0);
149 memcpy(&body->oa, oa, sizeof(*oa));
151 request->rq_replen = lustre_msg_size(1, &size);
153 rc = ptlrpc_queue_wait(request);
154 rc = ptlrpc_check_status(request, rc);
158 ptlrpc_free_req(request);
/*
 * osc_create(): create a new OST object via OST_CREATE.
 * Allocates the caller's stripe metadata (*ea) sized from oa->o_easize,
 * sends the RPC, then records the server-assigned object id and a
 * stripe count of 1 into the new lov_stripe_md.
 * NOTE(review): elided lines in this view (allocation failure handling,
 * pre-existing *ea handling) -- verify against the full source.
 */
162 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
163 struct lov_stripe_md **ea)
165 struct ptlrpc_request *request;
166 struct ost_body *body;
167 int rc, size = sizeof(*body);
/* caller receives ownership of *ea; sized by the obdo's easize */
180 OBD_ALLOC(*ea, oa->o_easize);
183 (*ea)->lmd_easize = oa->o_easize;
186 request = ptlrpc_prep_req2(conn, OST_CREATE, 1, &size, NULL);
190 body = lustre_msg_buf(request->rq_reqmsg, 0);
191 memcpy(&body->oa, oa, sizeof(*oa));
193 request->rq_replen = lustre_msg_size(1, &size);
195 rc = ptlrpc_queue_wait(request);
196 rc = ptlrpc_check_status(request, rc);
/* copy the reply obdo back; o_id now holds the new object id */
200 body = lustre_msg_buf(request->rq_repmsg, 0);
201 memcpy(oa, &body->oa, sizeof(*oa));
203 (*ea)->lmd_object_id = oa->o_id;
/* this client path creates single-stripe objects only */
204 (*ea)->lmd_stripe_count = 1;
207 ptlrpc_free_req(request);
/*
 * osc_punch(): truncate/punch a byte range of an OST object via
 * OST_PUNCH.  The [start, end] range is smuggled through the obdo's
 * o_blocks/o_size fields (see the #warning below), with the matching
 * valid bits set so the server knows the overload is intentional.
 * NOTE(review): the 'end' parameter's declaration is elided in this
 * view; signature continues past line 212 in the full source.
 */
211 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
212 struct lov_stripe_md *md, obd_size start,
215 struct ptlrpc_request *request;
216 struct ost_body *body;
217 int rc, size = sizeof(*body);
225 request = ptlrpc_prep_req2(conn, OST_PUNCH, 1, &size, NULL);
229 body = lustre_msg_buf(request->rq_reqmsg, 0);
230 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
231 memcpy(&body->oa, oa, sizeof(*oa));
233 /* overload the blocks and size fields in the oa with start/end */
234 #warning FIXME: endianness, size=start, blocks=end?
235 body->oa.o_blocks = start;
236 body->oa.o_size = end;
237 body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
239 request->rq_replen = lustre_msg_size(1, &size);
241 rc = ptlrpc_queue_wait(request);
242 rc = ptlrpc_check_status(request, rc);
/* success: propagate the server's post-punch obdo to the caller */
246 body = lustre_msg_buf(request->rq_repmsg, 0);
247 memcpy(oa, &body->oa, sizeof(*oa));
251 ptlrpc_free_req(request);
/*
 * osc_destroy(): destroy an OST object via OST_DESTROY.
 * Same pack/send/unpack shape as the other single-body RPCs above.
 * NOTE(review): elided lines in this view -- confirm error/cleanup
 * paths against the full source.
 */
255 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
256 struct lov_stripe_md *ea)
258 struct ptlrpc_request *request;
259 struct ost_body *body;
260 int rc, size = sizeof(*body);
267 request = ptlrpc_prep_req2(conn, OST_DESTROY, 1, &size, NULL);
271 body = lustre_msg_buf(request->rq_reqmsg, 0);
272 #warning FIXME: pack only valid fields instead of memcpy, endianness
273 memcpy(&body->oa, oa, sizeof(*oa));
275 request->rq_replen = lustre_msg_size(1, &size);
277 rc = ptlrpc_queue_wait(request);
278 rc = ptlrpc_check_status(request, rc);
282 body = lustre_msg_buf(request->rq_repmsg, 0);
283 memcpy(oa, &body->oa, sizeof(*oa));
287 ptlrpc_free_req(request);
/*
 * Per-BRW completion context handed to brw_finish() via b_cb_data.
 * NOTE(review): only the first member is visible here; usage below
 * shows additional members cb_data, obd_data and obd_size -- the
 * remainder of the struct is elided in this view.
 */
291 struct osc_brw_cb_data {
/* caller-supplied completion callback (invoked with CB_PHASE_FINISH) */
292 brw_callback_t callback;
298 /* Our bulk-unmapping bottom half. */
/*
 * Runs from task-queue (process) context because kunmap() cannot be
 * called from the interrupt context in which brw_finish() fires.
 * Walks every bulk page on the descriptor, unmaps it, then drops the
 * descriptor reference taken for the bulk transfer.
 */
299 static void unmap_and_decref_bulk_desc(void *data)
301 struct ptlrpc_bulk_desc *desc = data;
302 struct list_head *tmp;
305 /* This feels wrong to me. */
306 list_for_each(tmp, &desc->b_page_list) {
307 struct ptlrpc_bulk_page *bulk;
308 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
310 kunmap(bulk->b_page);
/* release the reference held across the bulk transfer */
313 ptlrpc_bulk_decref(desc);
/*
 * brw_finish(): bulk-I/O completion callback (may run in interrupt
 * context).  Maps timeout/interrupt flags to an error code, invokes the
 * caller's CB_PHASE_FINISH callback, frees the completion context, and
 * defers the kunmap/decref work to process context via a task queue.
 * NOTE(review): the 'err' declaration and parts of the flag handling
 * are elided in this view -- confirm against the full source.
 */
317 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
319 struct osc_brw_cb_data *cb_data = data;
/* distinguish an interrupted wait from a plain timeout */
323 if (desc->b_flags & PTL_RPC_FL_TIMEOUT) {
324 err = (desc->b_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
328 if (cb_data->callback)
329 cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
/* obd_data/obd_size carry the write path's niobuf_local array */
331 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
332 OBD_FREE(cb_data, sizeof(*cb_data));
334 /* We can't kunmap the desc from interrupt context, so we do it from
335 * the bottom half above. */
336 INIT_TQUEUE(&desc->b_queue, 0, 0);
337 PREPARE_TQUEUE(&desc->b_queue, unmap_and_decref_bulk_desc, desc);
338 schedule_task(&desc->b_queue);
/*
 * osc_brw_read(): issue a bulk read of page_count pages from the OST.
 *
 * Builds a 3-buffer OST_READ request (ost_body, obd_ioobj, one
 * niobuf_remote per page), allocates a bulk descriptor and completion
 * context, maps every page and attaches it to the descriptor under a
 * single transfer xid, registers the bulk sink BEFORE sending the RPC
 * (so out-of-order bulk data has somewhere to land), then queues the
 * request and kicks off the caller's CB_PHASE_START callback.
 *
 * On success the descriptor reference is released by brw_finish(); on
 * error this function unwinds the kmaps/allocs itself (out_unmap /
 * out_desc / out_req labels, partially elided in this view).
 * NOTE(review): declarations of 'xid' and 'mapped' and several error
 * checks are elided here -- confirm against the full source.
 */
343 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
344 obd_count page_count, struct brw_page *pga,
345 brw_callback_t callback, struct io_cb_data *data)
347 struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
348 struct ptlrpc_request *request = NULL;
349 struct ptlrpc_bulk_desc *desc = NULL;
350 struct ost_body *body;
351 struct osc_brw_cb_data *cb_data = NULL;
352 int rc, size[3] = {sizeof(*body)};
353 void *iooptr, *nioptr;
358 size[1] = sizeof(struct obd_ioobj);
359 size[2] = page_count * sizeof(struct niobuf_remote);
361 request = ptlrpc_prep_req2(conn, OST_READ, 3, size, NULL);
365 body = lustre_msg_buf(request->rq_reqmsg, 0);
367 desc = ptlrpc_prep_bulk(connection);
369 GOTO(out_req, rc = -ENOMEM);
370 desc->b_portal = OST_BULK_PORTAL;
/* brw_finish() will fire when the bulk transfer completes */
371 desc->b_cb = brw_finish;
372 OBD_ALLOC(cb_data, sizeof(*cb_data));
374 GOTO(out_desc, rc = -ENOMEM);
376 cb_data->callback = callback;
377 cb_data->cb_data = data;
379 desc->b_cb_data = cb_data;
381 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
382 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
383 ost_pack_ioo(&iooptr, md, page_count);
384 /* end almost identical to brw_write case */
/* reserve one xid for the whole transfer under the connection lock */
386 spin_lock(&connection->c_lock);
387 xid = ++connection->c_xid_out; /* single xid for all pages */
388 spin_unlock(&connection->c_lock);
390 for (mapped = 0; mapped < page_count; mapped++) {
391 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
393 GOTO(out_unmap, rc = -ENOMEM);
395 bulk->b_xid = xid; /* single xid for all pages */
/* kmap stays held until unmap_and_decref_bulk_desc (or out_unmap) */
397 bulk->b_buf = kmap(pga[mapped].pg);
398 bulk->b_page = pga[mapped].pg;
399 bulk->b_buflen = PAGE_SIZE;
400 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
401 pga[mapped].flag, bulk->b_xid);
405 * Register the bulk first, because the reply could arrive out of order,
406 * and we want to be ready for the bulk data.
408 * The reference is released when brw_finish is complete.
410 * On error, we never do the brw_finish, so we handle all decrefs.
/* fault-injection hook: optionally skip the bulk registration */
412 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
413 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
414 OBD_FAIL_OSC_BRW_READ_BULK);
416 rc = ptlrpc_register_bulk(desc);
421 request->rq_replen = lustre_msg_size(1, size);
422 rc = ptlrpc_queue_wait(request);
423 rc = ptlrpc_check_status(request, rc);
426 * XXX: If there is an error during the processing of the callback,
427 * such as a timeout in a sleep that it performs, brw_finish
428 * will never get called, and we'll leak the desc, fail to kunmap
429 * things, cats will live with dogs. One solution would be to
430 * export brw_finish as osc_brw_finish, so that the timeout case and
431 * its kin could call it for proper cleanup. An alternative would
432 * be for an error return from the callback to cause us to clean up,
433 * but that doesn't help the truly async cases (like LOV), which
434 * will immediately return from their PHASE_START callback, before
435 * any such cleanup-requiring error condition can be detected.
440 /* Callbacks cause asynchronous handling. */
441 rc = callback(data, 0, CB_PHASE_START);
444 ptlrpc_req_finished(request);
447 /* Clean up on error. */
/* unwind only the pages mapped so far (loop header elided here) */
450 kunmap(pga[mapped].pg);
451 OBD_FREE(cb_data, sizeof(*cb_data));
453 ptlrpc_bulk_decref(desc);
/*
 * osc_brw_write(): issue a bulk write of page_count pages to the OST.
 *
 * Builds a 3-buffer OST_WRITE request (ost_body, obd_ioobj, one
 * niobuf_remote per page), maps each source page into a niobuf_local
 * array (freed later by brw_finish() via cb_data->obd_data), queues the
 * RPC, then matches each page against the xids the server returned in
 * its reply niobufs and pushes the bulk data with ptlrpc_send_bulk().
 *
 * Differs from osc_brw_read in that the bulk is SENT after the reply
 * (the server tells us which xids to use) rather than registered before
 * the request.  Error unwinding is via out_unmap/out_local/out_cb/
 * out_desc/out_req labels, partially elided in this view.
 * NOTE(review): declaration of 'mapped' and several error checks are
 * elided here -- confirm against the full source.
 */
457 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
458 obd_count page_count, struct brw_page *pga,
459 brw_callback_t callback, struct io_cb_data *data)
461 struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
462 struct ptlrpc_request *request = NULL;
463 struct ptlrpc_bulk_desc *desc = NULL;
464 struct ost_body *body;
465 struct niobuf_local *local = NULL;
466 struct niobuf_remote *remote;
467 struct osc_brw_cb_data *cb_data = NULL;
468 int rc, j, size[3] = {sizeof(*body)};
469 void *iooptr, *nioptr;
473 size[1] = sizeof(struct obd_ioobj);
474 size[2] = page_count * sizeof(*remote);
476 request = ptlrpc_prep_req2(conn, OST_WRITE, 3, size, NULL);
480 body = lustre_msg_buf(request->rq_reqmsg, 0);
482 desc = ptlrpc_prep_bulk(connection);
484 GOTO(out_req, rc = -ENOMEM);
485 desc->b_portal = OSC_BULK_PORTAL;
486 desc->b_cb = brw_finish;
487 OBD_ALLOC(cb_data, sizeof(*cb_data));
489 GOTO(out_desc, rc = -ENOMEM);
491 cb_data->callback = callback;
492 cb_data->cb_data = data;
494 desc->b_cb_data = cb_data;
496 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
497 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
498 ost_pack_ioo(&iooptr, md, page_count);
499 /* end almost identical to brw_read case */
501 OBD_ALLOC(local, page_count * sizeof(*local));
503 GOTO(out_cb, rc = -ENOMEM);
/* hand ownership of 'local' to brw_finish() for freeing */
505 cb_data->obd_data = local;
506 cb_data->obd_size = page_count * sizeof(*local);
508 for (mapped = 0; mapped < page_count; mapped++) {
/* kmap held until brw_finish's bottom half (or error unwind) */
509 local[mapped].addr = kmap(pga[mapped].pg);
510 local[mapped].offset = pga[mapped].off;
511 local[mapped].len = pga[mapped].count;
/* xid 0 in the request: the server assigns xids in its reply */
512 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
513 pga[mapped].flag, 0);
/* reply carries ost_body + per-page niobuf_remote array */
516 size[1] = page_count * sizeof(*remote);
517 request->rq_replen = lustre_msg_size(2, size);
518 rc = ptlrpc_queue_wait(request);
519 rc = ptlrpc_check_status(request, rc);
523 nioptr = lustre_msg_buf(request->rq_repmsg, 1);
525 GOTO(out_unmap, rc = -EINVAL);
/* sanity-check the reply niobuf array is exactly the expected size */
527 if (request->rq_repmsg->buflens[1] != size[1]) {
528 CERROR("buffer length wrong (%d vs. %d)\n",
529 request->rq_repmsg->buflens[1], size[1]);
530 GOTO(out_unmap, rc = -EINVAL);
/* attach each mapped page to the bulk under its server-assigned xid */
533 for (j = 0; j < page_count; j++) {
534 struct ptlrpc_bulk_page *bulk;
536 ost_unpack_niobuf(&nioptr, &remote);
538 bulk = ptlrpc_prep_bulk_page(desc);
540 GOTO(out_unmap, rc = -ENOMEM);
542 bulk->b_buf = (void *)(unsigned long)local[j].addr;
543 bulk->b_buflen = local[j].len;
544 bulk->b_xid = remote->xid;
545 bulk->b_page = pga[j].pg;
548 if (desc->b_page_count != page_count)
/* fault-injection hook: optionally skip the bulk send */
551 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
552 GOTO(out_unmap, rc = 0);
554 /* Our reference is released when brw_finish is complete. */
555 rc = ptlrpc_send_bulk(desc);
557 /* XXX: Mike, same question as in osc_brw_read. */
561 /* Callbacks cause asynchronous handling. */
562 rc = callback(data, 0, CB_PHASE_START);
565 ptlrpc_req_finished(request);
568 /* Clean up on error. */
/* unwind only the pages mapped so far (loop header elided here) */
571 kunmap(pga[mapped].pg);
573 OBD_FREE(local, page_count * sizeof(*local));
575 OBD_FREE(cb_data, sizeof(*cb_data));
577 ptlrpc_bulk_decref(desc);
/*
 * osc_brw(): bulk read/write dispatcher.  Routes to osc_brw_write()
 * when OBD_BRW_WRITE is set in cmd, otherwise to osc_brw_read().
 * (Opening/closing braces elided in this view.)
 */
581 static int osc_brw(int cmd, struct lustre_handle *conn,
582 struct lov_stripe_md *md, obd_count page_count,
583 struct brw_page *pga, brw_callback_t callback,
584 struct io_cb_data *data)
586 if (cmd & OBD_BRW_WRITE)
587 return osc_brw_write(conn, md, page_count, pga, callback, data);
589 return osc_brw_read(conn, md, page_count, pga, callback, data);
/*
 * osc_enqueue(): take a DLM extent lock on an OST object.
 *
 * Order of attempts:
 *  1. round the extent out to page boundaries;
 *  2. match an existing lock of the requested mode (fast path);
 *  3. match a lock of a compatible mode (e.g. a PW lock covering a
 *     read) and swap the reference over to the requested mode;
 *  4. otherwise enqueue a brand-new lock via ldlm_cli_enqueue(), then
 *     link it onto the inode's lli_osc_locks list so llite can find it.
 *
 * NOTE(review): several lines are elided (rc declaration, the mode2
 * computation, early returns between the match attempts) -- confirm
 * control flow against the full source.  Also note 'sizeof(extent)' is
 * the size of a POINTER here; whether that is intentional cannot be
 * judged from this view -- check ldlm_lock_match's contract.
 */
592 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md,
593 struct lustre_handle *parent_lock,
594 __u32 type, void *extentp, int extent_len, __u32 mode,
595 int *flags, void *callback, void *data, int datalen,
596 struct lustre_handle *lockh)
/* resource name is keyed by the OST object id */
598 __u64 res_id[RES_NAME_SIZE] = { md->lmd_object_id };
599 struct obd_device *obddev = class_conn2obd(connh);
600 struct ldlm_extent *extent = extentp;
601 struct ldlm_lock *lock;
/* 'data' is an inode by convention on this path (see comment below) */
602 struct inode *inode = data;
603 struct ll_inode_info *lli = ll_i2info(inode);
607 /* Filesystem locks are given a bit of special treatment: first we
608 * fixup the lock to start and end on page boundaries. */
609 extent->start &= PAGE_MASK;
610 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
612 /* Next, search for already existing extent locks that will cover us */
613 //osc_con2dlmcl(conn, &cl, &connection, &rconn);
614 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
615 sizeof(extent), mode, lockh);
617 /* We already have a lock, and it's referenced */
621 /* Next, search for locks that we can upgrade (if we're trying to write)
622 * or are more than we need (if we're trying to read). Because the VFS
623 * and page cache already protect us locally, lots of readers/writers
624 * can share a single PW lock. */
630 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
631 sizeof(extent), mode2, lockh)
634 /* FIXME: This is not incredibly elegant, but it might
635 * be more elegant than adding another parameter to
636 * lock_match. I want a second opinion. */
/* move the reference from the matched mode to the requested mode */
637 ldlm_lock_addref(lockh, mode);
638 ldlm_lock_decref(lockh, mode2);
643 rc = ldlm_cli_convert(lockh, mode, &flags);
650 rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace,
651 parent_lock, res_id, type, extent,
652 sizeof(extent), mode, flags, ldlm_completion_ast,
653 callback, data, datalen, lockh);
657 /* This code must change if we ever stop passing an inode in as data */
658 /* This is ldlm and llite code. It makes me sad that it's in
659 * osc_request.c --phil */
660 lock = ldlm_handle2lock(lockh);
662 /* Lock already has an extra ref from handle2lock */
663 l_lock(&obddev->obd_namespace->ns_lock);
664 list_add(&lock->l_inode_link, &lli->lli_osc_locks);
665 l_unlock(&obddev->obd_namespace->ns_lock);
/*
 * osc_cancel(): release one reference on a DLM lock previously taken
 * by osc_enqueue().  (Braces and return elided in this view.)
 */
671 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
672 __u32 mode, struct lustre_handle *lockh)
676 ldlm_lock_decref(lockh, mode);
/*
 * osc_statfs(): fetch filesystem statistics from the OST via OST_STATFS
 * (empty request, one obd_statfs reply buffer) and unpack them into the
 * caller's struct statfs.  Returns the RPC status.
 * NOTE(review): elided lines in this view -- confirm cleanup path.
 */
681 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
683 struct ptlrpc_request *request;
684 struct obd_statfs *osfs;
685 int rc, size = sizeof(*osfs);
688 request = ptlrpc_prep_req2(conn, OST_STATFS, 0, NULL, NULL);
692 request->rq_replen = lustre_msg_size(1, &size);
694 rc = ptlrpc_queue_wait(request);
695 rc = ptlrpc_check_status(request, rc);
697 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* convert the wire-format statfs into the kernel's struct statfs */
701 osfs = lustre_msg_buf(request->rq_repmsg, 0);
702 obd_statfs_unpack(osfs, sfs);
706 ptlrpc_free_req(request);
/*
 * osc_iocontrol(): ioctl entry point for the OSC device.  Only the
 * IOC_LDLM_* test/regression commands are handled; anything outside
 * that range is rejected.
 * NOTE(review): switch header, default numthreads value, and break
 * statements are elided in this view -- confirm against full source.
 */
710 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
711 void *karg, void *uarg)
713 struct obd_device *obddev = class_conn2obd(conn);
714 struct obd_ioctl_data *data = karg;
/* reject anything outside the LDLM ioctl type/number range */
718 if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) <
719 IOC_LDLM_MIN_NR || _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
720 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
721 _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
726 case IOC_LDLM_TEST: {
727 err = ldlm_test(obddev, conn);
728 CERROR("-- done err %d\n", err);
731 case IOC_LDLM_REGRESS_START: {
732 unsigned int numthreads;
/* optional thread count supplied as an inline string argument */
734 if (data->ioc_inllen1)
735 numthreads = simple_strtoul(data->ioc_inlbuf1, NULL, 0);
739 err = ldlm_regression_start(obddev, conn, numthreads);
740 CERROR("-- done err %d\n", err);
743 case IOC_LDLM_REGRESS_STOP: {
744 err = ldlm_regression_stop();
745 CERROR("-- done err %d\n", err);
749 GOTO(out, err = -EINVAL);
/*
 * OBD method table exported by the OSC.  Connection setup/teardown is
 * delegated to the generic client_obd_* helpers; object, lock, and
 * statfs operations use the osc_* wrappers defined above.
 * NOTE(review): o_open/o_close/o_brw/o_punch entries are elided in this
 * view (osc_open/osc_close/osc_brw/osc_punch exist above) -- confirm
 * against the full source.
 */
755 struct obd_ops osc_obd_ops = {
756 o_setup: client_obd_setup,
757 o_cleanup: client_obd_cleanup,
758 o_statfs: osc_statfs,
759 o_create: osc_create,
760 o_destroy: osc_destroy,
761 o_getattr: osc_getattr,
762 o_setattr: osc_setattr,
765 o_connect: client_obd_connect,
766 o_disconnect: client_obd_disconnect,
769 o_enqueue: osc_enqueue,
770 o_cancel: osc_cancel,
771 o_iocontrol: osc_iocontrol
/* Module init: register the OSC obd type with the class driver. */
774 static int __init osc_init(void)
776 return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
/* Module exit: unregister the OSC obd type. */
779 static void __exit osc_exit(void)
781 class_unregister_type(LUSTRE_OSC_NAME);
/* Standard kernel module metadata and entry/exit hookup. */
784 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
785 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
786 MODULE_LICENSE("GPL");
788 module_init(osc_init);
789 module_exit(osc_exit);