1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author: Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
20 #define DEBUG_SUBSYSTEM S_OSC
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27 #include <linux/init.h>
/*
 * osc_getattr: fetch an object's attributes from the OST.
 * Packs the caller's obdo into an OST_GETATTR request, waits for the
 * reply, and copies the returned obdo back into *oa.  'md' is not used
 * in the code visible here.
 * NOTE(review): gaps in the original line numbering show that error
 * checks/braces were elided from this view; code is kept byte-identical.
 */
31 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
32 struct lov_stripe_md *md)
34 struct ptlrpc_request *request;
35 struct ost_body *body;
36 int rc, size = sizeof(*body);
39 request = ptlrpc_prep_req2(conn, OST_GETATTR, 1, &size, NULL);
/* Request buffer 0 carries the obdo; whole-struct memcpy, see #warning. */
43 body = lustre_msg_buf(request->rq_reqmsg, 0);
44 #warning FIXME: pack only valid fields instead of memcpy, endianness
45 memcpy(&body->oa, oa, sizeof(*oa));
47 request->rq_replen = lustre_msg_size(1, &size);
/* Synchronous round trip; rc folds in both transport and OST status. */
49 rc = ptlrpc_queue_wait(request);
50 rc = ptlrpc_check_status(request, rc);
52 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* Copy the server's view of the attributes back to the caller. */
56 body = lustre_msg_buf(request->rq_repmsg, 0);
57 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
59 memcpy(oa, &body->oa, sizeof(*oa));
63 ptlrpc_free_req(request);
/*
 * osc_open: open an object on the OST.
 * Same request shape as osc_getattr: send the obdo in an OST_OPEN
 * request, copy the reply obdo back into *oa.  'md' unused in the
 * visible code.
 * NOTE(review): intervening original lines (error handling, braces)
 * are elided from this view.
 */
67 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
68 struct lov_stripe_md *md)
70 struct ptlrpc_request *request;
71 struct ost_body *body;
72 int rc, size = sizeof(*body);
75 request = ptlrpc_prep_req2(conn, OST_OPEN, 1, &size, NULL);
79 body = lustre_msg_buf(request->rq_reqmsg, 0);
80 #warning FIXME: pack only valid fields instead of memcpy, endianness
81 memcpy(&body->oa, oa, sizeof(*oa));
83 request->rq_replen = lustre_msg_size(1, &size);
85 rc = ptlrpc_queue_wait(request);
86 rc = ptlrpc_check_status(request, rc);
/* On success the reply obdo (e.g. open handle state) replaces *oa. */
90 body = lustre_msg_buf(request->rq_repmsg, 0);
91 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
93 memcpy(oa, &body->oa, sizeof(*oa));
97 ptlrpc_free_req(request);
/*
 * osc_close: close an object on the OST.
 * Mirror image of osc_open with opcode OST_CLOSE; the reply obdo is
 * copied back into *oa.  'md' unused in the visible code.
 * NOTE(review): elided original lines (per the numbering gaps) are not
 * reproduced here.
 */
101 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
102 struct lov_stripe_md *md)
104 struct ptlrpc_request *request;
105 struct ost_body *body;
106 int rc, size = sizeof(*body);
109 request = ptlrpc_prep_req2(conn, OST_CLOSE, 1, &size, NULL);
113 body = lustre_msg_buf(request->rq_reqmsg, 0);
114 #warning FIXME: pack only valid fields instead of memcpy, endianness
115 memcpy(&body->oa, oa, sizeof(*oa));
117 request->rq_replen = lustre_msg_size(1, &size);
119 rc = ptlrpc_queue_wait(request);
120 rc = ptlrpc_check_status(request, rc);
124 body = lustre_msg_buf(request->rq_repmsg, 0);
125 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
127 memcpy(oa, &body->oa, sizeof(*oa));
131 ptlrpc_free_req(request);
/*
 * osc_setattr: push the caller's attributes to the OST.
 * Unlike getattr/open/close, the visible code does NOT copy the reply
 * obdo back — the request is fire-and-check-status only.
 * NOTE(review): error-path lines are elided from this view.
 */
135 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
136 struct lov_stripe_md *md)
138 struct ptlrpc_request *request;
139 struct ost_body *body;
140 int rc, size = sizeof(*body);
143 request = ptlrpc_prep_req2(conn, OST_SETATTR, 1, &size, NULL);
147 body = lustre_msg_buf(request->rq_reqmsg, 0);
148 memcpy(&body->oa, oa, sizeof(*oa));
150 request->rq_replen = lustre_msg_size(1, &size);
152 rc = ptlrpc_queue_wait(request);
153 rc = ptlrpc_check_status(request, rc);
157 ptlrpc_free_req(request);
/*
 * osc_create: create a new object on the OST and allocate the caller's
 * stripe metadata.
 * Allocates *ea (size taken from oa->o_easize), sends OST_CREATE, then
 * records the new object id and a stripe count of 1 in *ea.
 * Ownership: on success the caller owns *ea and must free it.
 * NOTE(review): allocation-failure checks and the cleanup path for *ea
 * on RPC error are elided from this view — cannot confirm *ea is freed
 * on failure.
 */
161 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
162 struct lov_stripe_md **ea)
164 struct ptlrpc_request *request;
165 struct ost_body *body;
166 int rc, size = sizeof(*body);
179 OBD_ALLOC(*ea, oa->o_easize);
182 (*ea)->lmd_easize = oa->o_easize;
185 request = ptlrpc_prep_req2(conn, OST_CREATE, 1, &size, NULL);
189 body = lustre_msg_buf(request->rq_reqmsg, 0);
190 memcpy(&body->oa, oa, sizeof(*oa));
192 request->rq_replen = lustre_msg_size(1, &size);
194 rc = ptlrpc_queue_wait(request);
195 rc = ptlrpc_check_status(request, rc);
/* Reply obdo carries the server-assigned object id; propagate it. */
199 body = lustre_msg_buf(request->rq_repmsg, 0);
200 memcpy(oa, &body->oa, sizeof(*oa));
202 (*ea)->lmd_object_id = oa->o_id;
203 (*ea)->lmd_stripe_count = 1;
206 ptlrpc_free_req(request);
/*
 * osc_punch: punch (truncate/deallocate) a byte range on the OST.
 * The [start, end] range is smuggled through the obdo by overloading
 * o_blocks (= start) and o_size (= end), as the #warning notes.
 * NOTE(review): the 'end' parameter's declaration line is elided from
 * this view (signature is cut after 'obd_size start,').
 */
210 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
211 struct lov_stripe_md *md, obd_size start,
214 struct ptlrpc_request *request;
215 struct ost_body *body;
216 int rc, size = sizeof(*body);
224 request = ptlrpc_prep_req2(conn, OST_PUNCH, 1, &size, NULL);
228 body = lustre_msg_buf(request->rq_reqmsg, 0);
229 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
230 memcpy(&body->oa, oa, sizeof(*oa));
232 /* overload the blocks and size fields in the oa with start/end */
233 #warning FIXME: endianness, size=start, blocks=end?
234 body->oa.o_blocks = start;
235 body->oa.o_size = end;
236 body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
238 request->rq_replen = lustre_msg_size(1, &size);
240 rc = ptlrpc_queue_wait(request);
241 rc = ptlrpc_check_status(request, rc);
245 body = lustre_msg_buf(request->rq_repmsg, 0);
246 memcpy(oa, &body->oa, sizeof(*oa));
250 ptlrpc_free_req(request);
/*
 * osc_destroy: destroy an object on the OST.
 * Standard single-buffer request/reply round trip; the reply obdo is
 * copied back into *oa.  'ea' is unused in the visible code.
 * NOTE(review): error-path lines elided from this view.
 */
254 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
255 struct lov_stripe_md *ea)
257 struct ptlrpc_request *request;
258 struct ost_body *body;
259 int rc, size = sizeof(*body);
266 request = ptlrpc_prep_req2(conn, OST_DESTROY, 1, &size, NULL);
270 body = lustre_msg_buf(request->rq_reqmsg, 0);
271 #warning FIXME: pack only valid fields instead of memcpy, endianness
272 memcpy(&body->oa, oa, sizeof(*oa));
274 request->rq_replen = lustre_msg_size(1, &size);
276 rc = ptlrpc_queue_wait(request);
277 rc = ptlrpc_check_status(request, rc);
281 body = lustre_msg_buf(request->rq_repmsg, 0);
282 memcpy(oa, &body->oa, sizeof(*oa));
286 ptlrpc_free_req(request);
/*
 * Per-bulk-I/O completion context handed to brw_finish via b_cb_data.
 * NOTE(review): later code also accesses cb_data, obd_data and
 * obd_size members — their declarations are elided from this view.
 */
290 struct osc_brw_cb_data {
291 brw_callback_t callback;
297 /* Our bulk-unmapping bottom half. */
/*
 * Runs from task-queue (process) context because kunmap() cannot be
 * called from the interrupt context in which brw_finish fires.
 * Unmaps every page on the descriptor's page list, then drops the
 * brw_finish reference on the descriptor.
 */
298 static void unmap_and_decref_bulk_desc(void *data)
300 struct ptlrpc_bulk_desc *desc = data;
301 struct list_head *tmp;
304 /* This feels wrong to me. */
305 list_for_each(tmp, &desc->b_page_list) {
306 struct ptlrpc_bulk_page *bulk;
307 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
309 kunmap(bulk->b_page);
/* Release the reference taken on behalf of brw_finish. */
312 ptlrpc_bulk_decref(desc);
316 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
318 struct osc_brw_cb_data *cb_data = data;
321 if (desc->b_flags & PTL_RPC_FL_INTR)
322 CERROR("got signal\n");
324 if (cb_data->callback)
325 cb_data->callback(cb_data->cb_data);
327 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
328 OBD_FREE(cb_data, sizeof(*cb_data));
330 /* We can't kunmap the desc from interrupt context, so we do it from
331 * the bottom half above. */
332 INIT_TQUEUE(&desc->b_queue, 0, 0);
333 PREPARE_TQUEUE(&desc->b_queue, unmap_and_decref_bulk_desc, desc);
334 schedule_task(&desc->b_queue);
/*
 * osc_brw_read: bulk-read page_count pages from the OST.
 *
 * Builds a 3-buffer OST_READ request (ost_body, obd_ioobj, one
 * niobuf_remote per page), registers a bulk sink covering the kmapped
 * pages, sends the request, and either returns immediately (caller
 * supplied a callback -> asynchronous completion via brw_finish) or
 * sleeps until the bulk data arrives.
 *
 * Reference counting: one extra ref is taken before registering the
 * bulk; brw_finish's deferred bottom half drops one, and the
 * synchronous path drops the other after waiting.
 *
 * NOTE(review): the declarations of 'xid' and 'mapped', several error
 * checks, and the out_* labels are elided from this view; the code
 * below is kept byte-identical to what is visible.
 */
339 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
340 obd_count page_count, struct page **page_array,
341 obd_size *count, obd_off *offset, obd_flag *flags,
342 brw_callback_t callback, void *data)
344 struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
345 struct ptlrpc_request *request = NULL;
346 struct ptlrpc_bulk_desc *desc = NULL;
347 struct ost_body *body;
348 struct osc_brw_cb_data *cb_data = NULL;
349 int rc, size[3] = {sizeof(*body)};
350 void *iooptr, *nioptr;
/* Buffers: [0] ost_body, [1] io object descriptor, [2] niobuf array. */
355 size[1] = sizeof(struct obd_ioobj);
356 size[2] = page_count * sizeof(struct niobuf_remote);
358 request = ptlrpc_prep_req2(conn, OST_READ, 3, size, NULL);
362 body = lustre_msg_buf(request->rq_reqmsg, 0);
364 desc = ptlrpc_prep_bulk(connection);
366 GOTO(out_req, rc = -ENOMEM);
367 desc->b_portal = OST_BULK_PORTAL;
368 desc->b_cb = brw_finish;
369 OBD_ALLOC(cb_data, sizeof(*cb_data));
371 GOTO(out_desc, rc = -ENOMEM);
373 cb_data->callback = callback;
374 cb_data->cb_data = data;
375 desc->b_cb_data = cb_data;
377 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
378 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
379 ost_pack_ioo(&iooptr, md, page_count);
380 /* end almost identical to brw_write case */
/* All pages of this I/O share one transfer id (xid). */
382 spin_lock(&connection->c_lock);
383 xid = ++connection->c_xid_out; /* single xid for all pages */
384 spin_unlock(&connection->c_lock);
386 for (mapped = 0; mapped < page_count; mapped++) {
387 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
389 GOTO(out_unmap, rc = -ENOMEM);
391 bulk->b_xid = xid; /* single xid for all pages */
/* kmap holds a kernel mapping until unmap_and_decref_bulk_desc. */
393 bulk->b_buf = kmap(page_array[mapped]);
394 bulk->b_page = page_array[mapped];
395 bulk->b_buflen = PAGE_SIZE;
396 ost_pack_niobuf(&nioptr, offset[mapped], count[mapped],
397 flags[mapped], bulk->b_xid);
401 * Register the bulk first, because the reply could arrive out of order,
402 * and we want to be ready for the bulk data.
404 * One reference is released when brw_finish is complete, the
405 * other here when we finish waiting on it if we don't have a callback.
407 * We don't reference the bulk descriptor again here if there is a
408 * callback, so we don't need an additional refcount on it.
410 * On error, we never do the brw_finish, so we handle all decrefs.
413 ptlrpc_bulk_addref(desc);
414 rc = ptlrpc_register_bulk(desc);
/* Reply has only the ost_body buffer. */
418 request->rq_replen = lustre_msg_size(1, size);
419 rc = ptlrpc_queue_wait(request);
420 rc = ptlrpc_check_status(request, rc);
422 /* XXX: Mike, this is the only place I'm not sure of. If we had
423 * an error here, will we always call brw_finish? If yes, then
424 * out_desc_2 will do too much and we should jump to out_desc.
425 * If maybe, then we are screwed, and we need to set things up
426 * so that bulk_sink_callback is called for each bulk page,
427 * even on error so brw_finish is always called. It would need
428 * to be passed an error code as a parameter to know what to do.
430 * That would also help with the partial completion case, so
431 * we could say in brw_finish "these pages are done, don't
432 * restart them" and osc_brw callers can know this.
437 /* Callbacks cause asynchronous handling. */
439 GOTO(out_req, rc = 0);
441 /* If there's no callback function, sleep here until complete. */
442 l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
443 if (desc->b_flags & PTL_RPC_FL_INTR)
444 GOTO(out_desc, rc = -EINTR);
/* Synchronous success path: drop our ref and the request. */
448 ptlrpc_bulk_decref(desc);
450 ptlrpc_req_finished(request);
453 /* Clean up on error. */
456 ptlrpc_bulk_decref(desc);
/* Unmap whatever the loop above managed to kmap before failing. */
459 kunmap(page_array[mapped]);
460 OBD_FREE(cb_data, sizeof(*cb_data));
/*
 * osc_brw_write: bulk-write page_count pages to the OST.
 *
 * Sends a 3-buffer OST_WRITE request; the reply returns one
 * niobuf_remote per page telling us the xid to use for each bulk
 * source page.  We then push the bulk data with ptlrpc_send_bulk and
 * either return (callback supplied -> async via brw_finish) or sleep
 * until the send completes.
 *
 * The niobuf_local array is stashed in cb_data->obd_data so brw_finish
 * can free it at completion time.
 *
 * NOTE(review): declaration of 'mapped', several error checks, labels
 * and the trailing kmap-loop line packing flags are elided from this
 * view; code below is byte-identical to what is visible.
 */
464 static int osc_brw_write(struct lustre_handle *conn,
465 struct lov_stripe_md *md, obd_count page_count,
466 struct page **pagearray, obd_size *count,
467 obd_off *offset, obd_flag *flags,
468 brw_callback_t callback, void *data)
470 struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
471 struct ptlrpc_request *request = NULL;
472 struct ptlrpc_bulk_desc *desc = NULL;
473 struct ost_body *body;
474 struct niobuf_local *local = NULL;
475 struct niobuf_remote *remote;
476 struct osc_brw_cb_data *cb_data = NULL;
477 int rc, j, size[3] = {sizeof(*body)};
478 void *iooptr, *nioptr;
482 size[1] = sizeof(struct obd_ioobj);
483 size[2] = page_count * sizeof(*remote);
485 request = ptlrpc_prep_req2(conn, OST_WRITE, 3, size, NULL);
489 body = lustre_msg_buf(request->rq_reqmsg, 0);
491 desc = ptlrpc_prep_bulk(connection);
493 GOTO(out_req, rc = -ENOMEM);
494 desc->b_portal = OSC_BULK_PORTAL;
495 desc->b_cb = brw_finish;
496 OBD_ALLOC(cb_data, sizeof(*cb_data));
498 GOTO(out_desc, rc = -ENOMEM);
500 cb_data->callback = callback;
501 cb_data->cb_data = data;
502 desc->b_cb_data = cb_data;
504 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
505 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
506 ost_pack_ioo(&iooptr, md, page_count);
507 /* end almost identical to brw_read case */
/* Local niobuf array; ownership passes to brw_finish via obd_data. */
509 OBD_ALLOC(local, page_count * sizeof(*local));
511 GOTO(out_cb, rc = -ENOMEM);
513 cb_data->obd_data = local;
514 cb_data->obd_size = page_count * sizeof(*local);
/* kmap each page and record its offset/len both locally and on wire. */
516 for (mapped = 0; mapped < page_count; mapped++) {
517 local[mapped].addr = kmap(pagearray[mapped]);
518 local[mapped].offset = offset[mapped];
519 local[mapped].len = count[mapped];
520 ost_pack_niobuf(&nioptr, offset[mapped], count[mapped],
/* Reply: [0] ost_body, [1] per-page niobuf_remote array (reuses size[1]). */
524 size[1] = page_count * sizeof(*remote);
525 request->rq_replen = lustre_msg_size(2, size);
526 rc = ptlrpc_queue_wait(request);
527 rc = ptlrpc_check_status(request, rc);
/* Validate the reply's niobuf buffer before trusting its contents. */
531 nioptr = lustre_msg_buf(request->rq_repmsg, 1);
533 GOTO(out_unmap, rc = -EINVAL);
535 if (request->rq_repmsg->buflens[1] != size[1]) {
536 CERROR("buffer length wrong (%d vs. %d)\n",
537 request->rq_repmsg->buflens[1], size[1]);
538 GOTO(out_unmap, rc = -EINVAL);
/* Attach each mapped page to the bulk under the server-assigned xid. */
541 for (j = 0; j < page_count; j++) {
542 struct ptlrpc_bulk_page *bulk;
544 ost_unpack_niobuf(&nioptr, &remote);
546 bulk = ptlrpc_prep_bulk_page(desc);
548 GOTO(out_unmap, rc = -ENOMEM);
550 bulk->b_buf = (void *)(unsigned long)local[j].addr;
551 bulk->b_buflen = local[j].len;
552 bulk->b_xid = remote->xid;
553 bulk->b_page = pagearray[j];
556 if (desc->b_page_count != page_count)
560 * One reference is released when brw_finish is complete, the
561 * other here when we finish waiting on it if we don't have a callback.
563 * We don't reference the bulk descriptor again here if there is a
564 * callback, so we don't need an additional refcount on it.
567 ptlrpc_bulk_addref(desc);
568 rc = ptlrpc_send_bulk(desc);
570 /* XXX: Mike, same question as in osc_brw_read. */
574 /* Callbacks cause asynchronous handling. */
576 GOTO(out_req, rc = 0);
578 /* If there's no callback function, sleep here until complete. */
579 l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
580 if (desc->b_flags & PTL_RPC_FL_INTR)
581 GOTO(out_desc, rc = -EINTR);
585 ptlrpc_bulk_decref(desc);
587 ptlrpc_req_finished(request);
590 /* Clean up on error. */
593 ptlrpc_bulk_decref(desc);
596 kunmap(pagearray[mapped]);
598 OBD_FREE(local, page_count * sizeof(*local));
600 OBD_FREE(cb_data, sizeof(*cb_data));
/*
 * osc_brw: bulk read/write dispatcher.
 * Routes to osc_brw_write when OBD_BRW_WRITE is set in cmd, otherwise
 * to osc_brw_read; all other arguments are passed through unchanged.
 */
604 static int osc_brw(int cmd, struct lustre_handle *conn,
605 struct lov_stripe_md *md, obd_count page_count,
606 struct page **page_array, obd_size *count, obd_off *offset,
607 obd_flag *flags, brw_callback_t callback, void *data)
609 if (cmd & OBD_BRW_WRITE)
610 return osc_brw_write(conn, md, page_count, page_array, count,
611 offset, flags, callback, data);
613 return osc_brw_read(conn, md, page_count, page_array, count,
614 offset, flags, callback, data);
/*
 * osc_enqueue: acquire a DLM extent lock covering [start, end].
 *
 * Steps: (1) round the extent out to page boundaries; (2) try to match
 * an existing lock of the requested mode; (3) try to match/convert a
 * compatible lock of the alternate mode (mode2, declared in a line
 * elided from this view); (4) fall back to a full ldlm_cli_enqueue.
 *
 * FIX(review): all three call sites passed sizeof(extent) — the size
 * of the *pointer* (4/8 bytes) — as the extent-data length where
 * sizeof(*extent), i.e. sizeof(struct ldlm_extent), is required.
 * Corrected below; no other code changed.
 *
 * NOTE(review): ldlm_cli_convert is passed &flags although 'flags' is
 * already an int * — looks like an extra level of indirection; confirm
 * against the ldlm_cli_convert prototype.
 */
617 static int osc_enqueue(struct lustre_handle *connh,
618 struct lustre_handle *parent_lock, __u64 *res_id,
619 __u32 type, void *extentp, int extent_len, __u32 mode,
620 int *flags, void *callback, void *data, int datalen,
621 struct lustre_handle *lockh)
623 struct obd_device *obddev = class_conn2obd(connh);
624 struct ldlm_extent *extent = extentp;
628 /* Filesystem locks are given a bit of special treatment: first we
629 * fixup the lock to start and end on page boundaries. */
630 extent->start &= PAGE_MASK;
631 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
633 /* Next, search for already existing extent locks that will cover us */
634 //osc_con2dlmcl(conn, &cl, &connection, &rconn);
635 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
636 sizeof(*extent), mode, lockh);
638 /* We already have a lock, and it's referenced */
642 /* Next, search for locks that we can upgrade (if we're trying to write)
643 * or are more than we need (if we're trying to read). Because the VFS
644 * and page cache already protect us locally, lots of readers/writers
645 * can share a single PW lock. */
651 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
652 sizeof(*extent), mode2, lockh);
655 /* FIXME: This is not incredibly elegant, but it might
656 * be more elegant than adding another parameter to
657 * lock_match. I want a second opinion. */
658 ldlm_lock_addref(lockh, mode);
659 ldlm_lock_decref(lockh, mode2);
664 rc = ldlm_cli_convert(lockh, mode, &flags);
671 rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace,
672 parent_lock, res_id, type, extent, sizeof(*extent),
673 mode, flags, ldlm_completion_ast, callback, data, datalen, lockh);
/*
 * osc_cancel: release one reference on a held DLM lock.
 * Purely local — drops the mode reference; any wire-level cancel is
 * handled elsewhere (not visible here).  'oconn' is unused in the
 * visible code.
 */
677 static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
678 struct lustre_handle *lockh)
682 ldlm_lock_decref(lockh, mode);
/*
 * osc_statfs: query filesystem statistics from the OST.
 * Sends a bufferless OST_STATFS request and unpacks the obd_statfs
 * reply into the caller's struct statfs.
 * NOTE(review): error-path lines are elided from this view.
 */
687 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
689 struct ptlrpc_request *request;
690 struct obd_statfs *osfs;
691 int rc, size = sizeof(*osfs);
694 request = ptlrpc_prep_req2(conn, OST_STATFS, 0, NULL, NULL);
698 request->rq_replen = lustre_msg_size(1, &size);
700 rc = ptlrpc_queue_wait(request);
701 rc = ptlrpc_check_status(request, rc);
703 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* Convert the wire-format statfs into the kernel struct statfs. */
707 osfs = lustre_msg_buf(request->rq_repmsg, 0);
708 obd_statfs_unpack(osfs, sfs);
712 ptlrpc_free_req(request);
/*
 * OBD method table exported by the OSC.  Connection management is
 * delegated to the generic client_obd_* helpers; object operations
 * map to the osc_* RPC wrappers above.
 * NOTE(review): lines 724-725 and 728-729 of the original (likely
 * o_punch/o_brw and similar entries) are elided from this view.
 */
716 struct obd_ops osc_obd_ops = {
717 o_setup: client_obd_setup,
718 o_cleanup: client_obd_cleanup,
719 o_statfs: osc_statfs,
720 o_create: osc_create,
721 o_destroy: osc_destroy,
722 o_getattr: osc_getattr,
723 o_setattr: osc_setattr,
726 o_connect: client_obd_connect,
727 o_disconnect: client_obd_disconnect,
730 o_enqueue: osc_enqueue,
/* Module entry point: register the OSC device type with the OBD class. */
734 static int __init osc_init(void)
736 return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
/* Module exit point: unregister the OSC device type. */
739 static void __exit osc_exit(void)
741 class_unregister_type(LUSTRE_OSC_NAME);
744 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
745 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
746 MODULE_LICENSE("GPL");
748 module_init(osc_init);
749 module_exit(osc_exit);