1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
20 #define DEBUG_SUBSYSTEM S_OSC
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27 #include <linux/init.h>
28 #include <linux/lustre_ha.h>
/* osc_getattr: fetch the attributes of an OST object over the wire.
 * Packs the caller's *oa into an OST_GETATTR request, waits synchronously
 * for the reply, and copies the server-returned attributes back into *oa.
 * The md argument is not referenced in the visible statements.
 * NOTE(review): brace/error-check/return lines appear elided from this
 * view of the file; comments describe only the visible statements. */
30 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
31 struct lov_stripe_md *md)
33 struct ptlrpc_request *request;
34 struct ost_body *body;
35 int rc, size = sizeof(*body);
38 request = ptlrpc_prep_req2(conn, OST_GETATTR, 1, &size, NULL);
/* The whole obdo is copied verbatim into the request buffer; the
 * #warning below records that only valid fields should be packed and
 * that endianness is not handled. */
42 body = lustre_msg_buf(request->rq_reqmsg, 0);
43 #warning FIXME: pack only valid fields instead of memcpy, endianness
44 memcpy(&body->oa, oa, sizeof(*oa));
46 request->rq_replen = lustre_msg_size(1, &size);
/* Queue the request and wait; fold transport rc and server status together. */
48 rc = ptlrpc_queue_wait(request);
49 rc = ptlrpc_check_status(request, rc);
51 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* Copy the attributes from the reply back into the caller's obdo. */
55 body = lustre_msg_buf(request->rq_repmsg, 0);
56 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
58 memcpy(oa, &body->oa, sizeof(*oa));
62 ptlrpc_free_req(request);
/* osc_open: open an OST object.  Same request/reply shape as osc_getattr,
 * but sends OST_OPEN; the updated obdo from the reply is copied back to
 * the caller.  NOTE(review): error-handling lines appear elided from this
 * view of the file. */
66 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
67 struct lov_stripe_md *md)
69 struct ptlrpc_request *request;
70 struct ost_body *body;
71 int rc, size = sizeof(*body);
74 request = ptlrpc_prep_req2(conn, OST_OPEN, 1, &size, NULL);
78 body = lustre_msg_buf(request->rq_reqmsg, 0);
79 #warning FIXME: pack only valid fields instead of memcpy, endianness
80 memcpy(&body->oa, oa, sizeof(*oa))
82 request->rq_replen = lustre_msg_size(1, &size);
/* Synchronous send; rc combines transport result and server status. */
84 rc = ptlrpc_queue_wait(request);
85 rc = ptlrpc_check_status(request, rc);
89 body = lustre_msg_buf(request->rq_repmsg, 0);
90 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
92 memcpy(oa, &body->oa, sizeof(*oa));
96 ptlrpc_free_req(request);
/* osc_close: close an OST object via an OST_CLOSE request; mirrors
 * osc_open.  The reply's obdo is copied back into *oa.
 * NOTE(review): error-handling lines appear elided from this view. */
100 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
101 struct lov_stripe_md *md)
103 struct ptlrpc_request *request;
104 struct ost_body *body;
105 int rc, size = sizeof(*body);
108 request = ptlrpc_prep_req2(conn, OST_CLOSE, 1, &size, NULL);
112 body = lustre_msg_buf(request->rq_reqmsg, 0);
113 #warning FIXME: pack only valid fields instead of memcpy, endianness
114 memcpy(&body->oa, oa, sizeof(*oa));
116 request->rq_replen = lustre_msg_size(1, &size);
118 rc = ptlrpc_queue_wait(request);
119 rc = ptlrpc_check_status(request, rc);
123 body = lustre_msg_buf(request->rq_repmsg, 0);
124 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
126 memcpy(oa, &body->oa, sizeof(*oa));
130 ptlrpc_free_req(request);
/* osc_setattr: push the caller's attributes to the OST with OST_SETATTR.
 * Unlike getattr/open/close, the visible code does not copy anything back
 * from the reply.  NOTE(review): error-handling lines appear elided from
 * this view of the file. */
134 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
135 struct lov_stripe_md *md)
137 struct ptlrpc_request *request;
138 struct ost_body *body;
139 int rc, size = sizeof(*body);
142 request = ptlrpc_prep_req2(conn, OST_SETATTR, 1, &size, NULL);
146 body = lustre_msg_buf(request->rq_reqmsg, 0);
147 memcpy(&body->oa, oa, sizeof(*oa));
149 request->rq_replen = lustre_msg_size(1, &size);
151 rc = ptlrpc_queue_wait(request);
152 rc = ptlrpc_check_status(request, rc);
156 ptlrpc_free_req(request);
/* osc_create: create a new object on the OST.  Allocates the stripe
 * metadata (*ea) sized from oa->o_easize, sends OST_CREATE, then records
 * the new object id and a stripe count of 1 in *ea.  Ownership of *ea
 * passes to the caller — presumably freed via osc_destroy or the LOV
 * layer; TODO confirm against callers.
 * NOTE(review): allocation-failure and RPC-error handling lines appear
 * elided from this view of the file. */
160 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
161 struct lov_stripe_md **ea)
163 struct ptlrpc_request *request;
164 struct ost_body *body;
165 int rc, size = sizeof(*body);
/* Allocate the caller-visible stripe descriptor before issuing the RPC. */
178 OBD_ALLOC(*ea, oa->o_easize);
181 (*ea)->lmd_easize = oa->o_easize;
184 request = ptlrpc_prep_req2(conn, OST_CREATE, 1, &size, NULL);
188 body = lustre_msg_buf(request->rq_reqmsg, 0);
189 memcpy(&body->oa, oa, sizeof(*oa));
191 request->rq_replen = lustre_msg_size(1, &size);
193 rc = ptlrpc_queue_wait(request);
194 rc = ptlrpc_check_status(request, rc);
/* The reply carries the id the server assigned; publish it in *ea. */
198 body = lustre_msg_buf(request->rq_repmsg, 0);
199 memcpy(oa, &body->oa, sizeof(*oa));
201 (*ea)->lmd_object_id = oa->o_id;
202 (*ea)->lmd_stripe_count = 1;
205 ptlrpc_free_req(request);
/* osc_punch: truncate/punch a byte range of an OST object.  The [start,
 * end) range is smuggled to the server in the oa's o_blocks/o_size fields
 * (see the FIXME below).  The reply obdo is copied back to the caller.
 * NOTE(review): the parameter list and error-handling lines appear
 * partially elided from this view; the 'end' identifier used below is
 * presumably a parameter on an elided line. */
209 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
210 struct lov_stripe_md *md, obd_size start,
213 struct ptlrpc_request *request;
214 struct ost_body *body;
215 int rc, size = sizeof(*body);
223 request = ptlrpc_prep_req2(conn, OST_PUNCH, 1, &size, NULL);
227 body = lustre_msg_buf(request->rq_reqmsg, 0);
228 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
229 memcpy(&body->oa, oa, sizeof(*oa));
231 /* overload the blocks and size fields in the oa with start/end */
232 #warning FIXME: endianness, size=start, blocks=end?
233 body->oa.o_blocks = start;
234 body->oa.o_size = end;
235 body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
237 request->rq_replen = lustre_msg_size(1, &size);
239 rc = ptlrpc_queue_wait(request);
240 rc = ptlrpc_check_status(request, rc);
244 body = lustre_msg_buf(request->rq_repmsg, 0);
245 memcpy(oa, &body->oa, sizeof(*oa));
249 ptlrpc_free_req(request);
/* osc_destroy: remove an OST object with an OST_DESTROY request; the
 * reply obdo is copied back into *oa.  The ea argument is not referenced
 * in the visible statements.  NOTE(review): error-handling lines appear
 * elided from this view of the file. */
253 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
254 struct lov_stripe_md *ea)
256 struct ptlrpc_request *request;
257 struct ost_body *body;
258 int rc, size = sizeof(*body);
265 request = ptlrpc_prep_req2(conn, OST_DESTROY, 1, &size, NULL);
269 body = lustre_msg_buf(request->rq_reqmsg, 0);
270 #warning FIXME: pack only valid fields instead of memcpy, endianness
271 memcpy(&body->oa, oa, sizeof(*oa));
273 request->rq_replen = lustre_msg_size(1, &size);
275 rc = ptlrpc_queue_wait(request);
276 rc = ptlrpc_check_status(request, rc);
280 body = lustre_msg_buf(request->rq_repmsg, 0);
281 memcpy(oa, &body->oa, sizeof(*oa));
285 ptlrpc_free_req(request);
/* Per-bulk-transfer context handed to brw_finish() via desc->b_cb_data.
 * brw_finish() below also reads cb_data, obd_data and obd_size members —
 * their declarations appear elided from this view of the file. */
289 struct osc_brw_cb_data {
290 brw_callback_t callback;
296 /* Our bulk-unmapping bottom half. */
/* Runs from a task queue (scheduled by brw_finish) because kunmap()
 * cannot be called from interrupt context: walks every page attached to
 * the bulk descriptor, unmaps it, then drops the descriptor reference. */
297 static void unmap_and_decref_bulk_desc(void *data)
299 struct ptlrpc_bulk_desc *desc = data;
300 struct list_head *tmp;
303 /* This feels wrong to me. */
304 list_for_each(tmp, &desc->b_page_list) {
305 struct ptlrpc_bulk_page *bulk;
306 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
308 kunmap(bulk->b_page);
/* Release the reference taken when the bulk was set up. */
311 ptlrpc_bulk_decref(desc);
/* brw_finish: bulk-transfer completion callback (desc->b_cb).  Maps a
 * timeout/interrupt on the descriptor to an error code, invokes the
 * caller's FINISH-phase callback, frees the per-transfer context, and
 * defers page unmapping to the bottom half above.
 * NOTE(review): the declaration of 'err' and some error-path lines
 * appear elided from this view of the file. */
315 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
317 struct osc_brw_cb_data *cb_data = data;
321 if (desc->b_flags & PTL_RPC_FL_TIMEOUT) {
322 err = (desc->b_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
/* Tell the upper layer the transfer is done (possibly with err). */
326 if (cb_data->callback)
327 cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
329 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
330 OBD_FREE(cb_data, sizeof(*cb_data));
332 /* We can't kunmap the desc from interrupt context, so we do it from
333 * the bottom half above. */
334 INIT_TQUEUE(&desc->b_queue, 0, 0);
335 PREPARE_TQUEUE(&desc->b_queue, unmap_and_decref_bulk_desc, desc);
336 schedule_task(&desc->b_queue);
/* osc_brw_read: issue a bulk read of page_count pages from the OST.
 * Builds an OST_READ request (body + ioobj + niobuf array), kmaps each
 * page into a bulk sink, registers the bulk BEFORE queueing the RPC so
 * data arriving out of order can land, then hands completion to the
 * caller's callback (START phase here, FINISH phase in brw_finish).
 * NOTE(review): many lines (declarations of xid/mapped, NULL checks,
 * RETURN/GOTO paths, labels out_req/out_desc/out_unmap) appear elided
 * from this view of the file; comments describe only what is visible. */
341 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
342 obd_count page_count, struct brw_page *pga,
343 brw_callback_t callback, struct io_cb_data *data)
345 struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
346 struct ptlrpc_request *request = NULL;
347 struct ptlrpc_bulk_desc *desc = NULL;
348 struct ost_body *body;
349 struct osc_brw_cb_data *cb_data = NULL;
350 int rc, size[3] = {sizeof(*body)};
351 void *iooptr, *nioptr;
/* Three request buffers: ost_body, one obd_ioobj, page_count niobufs. */
356 size[1] = sizeof(struct obd_ioobj);
357 size[2] = page_count * sizeof(struct niobuf_remote);
359 request = ptlrpc_prep_req2(conn, OST_READ, 3, size, NULL);
363 body = lustre_msg_buf(request->rq_reqmsg, 0);
/* Set up the bulk sink and its completion context. */
365 desc = ptlrpc_prep_bulk(connection);
367 GOTO(out_req, rc = -ENOMEM);
368 desc->b_portal = OST_BULK_PORTAL;
369 desc->b_cb = brw_finish;
370 OBD_ALLOC(cb_data, sizeof(*cb_data));
372 GOTO(out_desc, rc = -ENOMEM);
374 cb_data->callback = callback;
375 cb_data->cb_data = data;
377 desc->b_cb_data = cb_data;
379 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
380 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
381 ost_pack_ioo(&iooptr, md, page_count);
382 /* end almost identical to brw_write case */
/* One xid covers the whole transfer; taken under the connection lock. */
384 spin_lock(&connection->c_lock);
385 xid = ++connection->c_xid_out; /* single xid for all pages */
386 spin_unlock(&connection->c_lock);
/* Attach and kmap each destination page, and pack a matching niobuf. */
388 for (mapped = 0; mapped < page_count; mapped++) {
389 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
391 GOTO(out_unmap, rc = -ENOMEM);
393 bulk->b_xid = xid; /* single xid for all pages */
395 bulk->b_buf = kmap(pga[mapped].pg);
396 bulk->b_page = pga[mapped].pg;
397 bulk->b_buflen = PAGE_SIZE;
398 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
399 pga[mapped].flag, bulk->b_xid);
403 * Register the bulk first, because the reply could arrive out of order,
404 * and we want to be ready for the bulk data.
406 * The reference is released when brw_finish is complete.
408 * On error, we never do the brw_finish, so we handle all decrefs.
410 rc = ptlrpc_register_bulk(desc);
414 request->rq_replen = lustre_msg_size(1, size);
415 rc = ptlrpc_queue_wait(request);
416 rc = ptlrpc_check_status(request, rc);
418 /* XXX: Mike, this is the only place I'm not sure of. If we have
419 * an error here, will we have always called brw_finish? If no,
420 * then out_req will not clean up and we should go to out_desc.
421 * If maybe, then we are screwed, and we need to set things up
422 * so that bulk_sink_callback is called for each bulk page,
423 * even on error so brw_finish is always called. It would need
424 * to be passed an error code as a parameter to know what to do.
426 * That would also help with the partial completion case, so
427 * we could say in brw_finish "these pages are done, don't
428 * restart them" and osc_brw callers can know this.
433 /* Callbacks cause asynchronous handling. */
434 rc = callback(data, 0, CB_PHASE_START);
438 ptlrpc_req_finished(request);
441 /* Clean up on error. */
/* NOTE(review): 'page_array' here vs 'pga' above — looks like an
 * inconsistency in the error path; confirm against full source. */
444 kunmap(page_array[mapped]);
445 OBD_FREE(cb_data, sizeof(*cb_data));
447 ptlrpc_bulk_decref(desc);
/* osc_brw_write: issue a bulk write of page_count pages to the OST.
 * Builds an OST_WRITE request describing the pages, waits for the
 * server's reply containing per-page remote niobufs (with the xids the
 * server expects), attaches the kmapped source pages to a bulk source
 * using those xids, and sends the bulk.  Completion is reported through
 * the caller's callback (START phase here, FINISH in brw_finish).
 * NOTE(review): declarations (e.g. 'mapped'), NULL checks, GOTO labels
 * and some error-path lines appear elided from this view of the file. */
451 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
452 obd_count page_count, struct brw_page *pga,
453 brw_callback_t callback, struct io_cb_data *data)
455 struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
456 struct ptlrpc_request *request = NULL;
457 struct ptlrpc_bulk_desc *desc = NULL;
458 struct ost_body *body;
459 struct niobuf_local *local = NULL;
460 struct niobuf_remote *remote;
461 struct osc_brw_cb_data *cb_data = NULL;
462 int rc, j, size[3] = {sizeof(*body)};
463 void *iooptr, *nioptr;
467 size[1] = sizeof(struct obd_ioobj);
468 size[2] = page_count * sizeof(*remote);
470 request = ptlrpc_prep_req2(conn, OST_WRITE, 3, size, NULL);
474 body = lustre_msg_buf(request->rq_reqmsg, 0);
/* Bulk source setup; note OSC_BULK_PORTAL here vs OST_BULK_PORTAL on
 * the read side. */
476 desc = ptlrpc_prep_bulk(connection);
478 GOTO(out_req, rc = -ENOMEM);
479 desc->b_portal = OSC_BULK_PORTAL;
480 desc->b_cb = brw_finish;
481 OBD_ALLOC(cb_data, sizeof(*cb_data));
483 GOTO(out_desc, rc = -ENOMEM);
485 cb_data->callback = callback;
486 cb_data->cb_data = data;
488 desc->b_cb_data = cb_data;
490 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
491 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
492 ost_pack_ioo(&iooptr, md, page_count);
493 /* end almost identical to brw_read case */
/* Local niobuf array is freed by brw_finish via cb_data->obd_data. */
495 OBD_ALLOC(local, page_count * sizeof(*local));
497 GOTO(out_cb, rc = -ENOMEM);
499 cb_data->obd_data = local;
500 cb_data->obd_size = page_count * sizeof(*local);
/* kmap each source page and describe it in the request. */
502 for (mapped = 0; mapped < page_count; mapped++) {
503 local[mapped].addr = kmap(pga[mapped].pg);
504 local[mapped].offset = pga[mapped].off;
505 local[mapped].len = pga[mapped].count;
506 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
507 pga[mapped].flag, 0);
510 size[1] = page_count * sizeof(*remote);
511 request->rq_replen = lustre_msg_size(2, size);
512 rc = ptlrpc_queue_wait(request);
513 rc = ptlrpc_check_status(request, rc);
/* Validate the reply's niobuf buffer before trusting its contents. */
517 nioptr = lustre_msg_buf(request->rq_repmsg, 1);
519 GOTO(out_unmap, rc = -EINVAL);
521 if (request->rq_repmsg->buflens[1] != size[1]) {
522 CERROR("buffer length wrong (%d vs. %d)\n",
523 request->rq_repmsg->buflens[1], size[1]);
524 GOTO(out_unmap, rc = -EINVAL);
/* Attach each page to the bulk using the server-assigned xid. */
527 for (j = 0; j < page_count; j++) {
528 struct ptlrpc_bulk_page *bulk;
530 ost_unpack_niobuf(&nioptr, &remote);
532 bulk = ptlrpc_prep_bulk_page(desc);
534 GOTO(out_unmap, rc = -ENOMEM);
536 bulk->b_buf = (void *)(unsigned long)local[j].addr;
537 bulk->b_buflen = local[j].len;
538 bulk->b_xid = remote->xid;
539 bulk->b_page = pga[j].pg;
542 if (desc->b_page_count != page_count)
545 /* Our reference is released when brw_finish is complete. */
546 rc = ptlrpc_send_bulk(desc);
548 /* XXX: Mike, same question as in osc_brw_read. */
552 /* Callbacks cause asynchronous handling. */
553 rc = callback(data, 0, CB_PHASE_START);
557 ptlrpc_req_finished(request);
560 /* Clean up on error. */
/* NOTE(review): 'pagearray' here vs 'pga' above — looks like an
 * inconsistency in the error path; confirm against full source. */
563 kunmap(pagearray[mapped]);
565 OBD_FREE(local, page_count * sizeof(*local));
567 OBD_FREE(cb_data, sizeof(*cb_data));
569 ptlrpc_bulk_decref(desc);
/* osc_brw: bulk read/write dispatcher — routes to osc_brw_write when the
 * OBD_BRW_WRITE bit is set in cmd, otherwise to osc_brw_read. */
573 static int osc_brw(int cmd, struct lustre_handle *conn,
574 struct lov_stripe_md *md, obd_count page_count,
575 struct brw_page *pga, brw_callback_t callback,
576 struct io_cb_data *data)
578 if (cmd & OBD_BRW_WRITE)
579 return osc_brw_write(conn, md, page_count, pga, callback, data);
581 return osc_brw_read(conn, md, page_count, pga, callback, data);
/* osc_enqueue: take a DLM extent lock on the object named by md.
 * Rounds the extent to page boundaries, then tries in order:
 *   1) match an existing lock of the requested mode,
 *   2) match a compatible lock of another mode (mode2) and swap the
 *      reference over to the requested mode,
 *   3) fall through to a full ldlm_cli_enqueue RPC.
 * NOTE(review): declarations of rc/mode2 and several branch/return lines
 * appear elided from this view.  Also note ldlm_lock_match is passed
 * sizeof(extent) where 'extent' is a pointer — presumably should be
 * sizeof(*extent); confirm against the full source and the ldlm API. */
584 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md,
585 struct lustre_handle *parent_lock,
586 __u32 type, void *extentp, int extent_len, __u32 mode,
587 int *flags, void *callback, void *data, int datalen,
588 struct lustre_handle *lockh)
590 __u64 res_id[RES_NAME_SIZE] = { md->lmd_object_id };
591 struct obd_device *obddev = class_conn2obd(connh);
592 struct ldlm_extent *extent = extentp;
596 /* Filesystem locks are given a bit of special treatment: first we
597 * fixup the lock to start and end on page boundaries. */
598 extent->start &= PAGE_MASK;
599 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
601 /* Next, search for already existing extent locks that will cover us */
602 //osc_con2dlmcl(conn, &cl, &connection, &rconn);
603 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
604 sizeof(extent), mode, lockh);
606 /* We already have a lock, and it's referenced */
610 /* Next, search for locks that we can upgrade (if we're trying to write)
611 * or are more than we need (if we're trying to read). Because the VFS
612 * and page cache already protect us locally, lots of readers/writers
613 * can share a single PW lock. */
619 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
620 sizeof(extent), mode2, lockh);
623 /* FIXME: This is not incredibly elegant, but it might
624 * be more elegant than adding another parameter to
625 * lock_match. I want a second opinion. */
/* Move our reference from the matched mode to the requested mode. */
626 ldlm_lock_addref(lockh, mode);
627 ldlm_lock_decref(lockh, mode2);
632 rc = ldlm_cli_convert(lockh, mode, &flags);
/* No usable local lock: enqueue with the server. */
639 rc = ldlm_cli_enqueue(connh, NULL,obddev->obd_namespace,
640 parent_lock, res_id, type, extent,
641 sizeof(extent), mode, flags, ldlm_completion_ast,
642 callback, data, datalen, lockh);
/* osc_cancel: drop one reference on the lock in the given mode; the DLM
 * handles actual cancellation when references reach zero. */
646 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
647 __u32 mode, struct lustre_handle *lockh)
651 ldlm_lock_decref(lockh, mode);
/* osc_statfs: query filesystem statistics from the OST via OST_STATFS
 * and unpack the wire-format obd_statfs reply into the caller's statfs.
 * NOTE(review): error-check/return lines appear elided from this view. */
656 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
658 struct ptlrpc_request *request;
659 struct obd_statfs *osfs;
660 int rc, size = sizeof(*osfs);
/* No request body needed — the reply carries the statfs data. */
663 request = ptlrpc_prep_req2(conn, OST_STATFS, 0, NULL, NULL);
667 request->rq_replen = lustre_msg_size(1, &size);
669 rc = ptlrpc_queue_wait(request);
670 rc = ptlrpc_check_status(request, rc);
672 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
676 osfs = lustre_msg_buf(request->rq_repmsg, 0);
677 obd_statfs_unpack(osfs, sfs);
681 ptlrpc_free_req(request);
/* osc_iocontrol: ioctl entry point for LDLM test/regression commands.
 * Rejects anything outside the IOC_LDLM_* range, then dispatches on the
 * ioctl number.  NOTE(review): the switch statement, some returns, and
 * the declaration of 'err' appear elided from this view of the file. */
685 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
686 void *karg, void *uarg)
688 struct obd_device *obddev = class_conn2obd(conn);
689 struct obd_ioctl_data *data = karg;
/* Only IOC_LDLM_TYPE ioctls within [MIN_NR, MAX_NR] are accepted. */
693 if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) <
694 IOC_LDLM_MIN_NR || _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
695 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
696 _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
701 case IOC_LDLM_TEST: {
702 err = ldlm_test(obddev, conn);
703 CERROR("-- done err %d\n", err);
706 case IOC_LDLM_REGRESS_START: {
707 unsigned int numthreads;
/* Thread count is optional; parsed from the inline buffer if present. */
709 if (data->ioc_inllen1)
710 numthreads = simple_strtoul(data->ioc_inlbuf1, NULL, 0);
714 err = ldlm_regression_start(obddev, conn, numthreads);
715 CERROR("-- done err %d\n", err);
718 case IOC_LDLM_REGRESS_STOP: {
719 err = ldlm_regression_stop();
720 CERROR("-- done err %d\n", err);
724 GOTO(out, err = -EINVAL);
/* OBD method table exported by the OSC.  Connection/setup/teardown are
 * delegated to the generic client_obd_* helpers; object and lock
 * operations use the osc_* implementations above.  Not a full method
 * table — requests normally arrive over the wire (see file header). */
730 struct obd_ops osc_obd_ops = {
731 o_setup: client_obd_setup,
732 o_cleanup: client_obd_cleanup,
733 o_statfs: osc_statfs,
734 o_create: osc_create,
735 o_destroy: osc_destroy,
736 o_getattr: osc_getattr,
737 o_setattr: osc_setattr,
740 o_connect: client_obd_connect,
741 o_disconnect: client_obd_disconnect,
744 o_enqueue: osc_enqueue,
745 o_cancel: osc_cancel,
746 o_iocontrol: osc_iocontrol
/* Module init: register the OSC device type with the OBD class layer. */
749 static int __init osc_init(void)
751 return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
/* Module exit: unregister the OSC device type. */
754 static void __exit osc_exit(void)
756 class_unregister_type(LUSTRE_OSC_NAME);
759 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
760 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
761 MODULE_LICENSE("GPL");
763 module_init(osc_init);
764 module_exit(osc_exit);