1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
20 #define DEBUG_SUBSYSTEM S_OSC
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
/*
 * Resolve conn to its OSC device with class_conn2obd() and return, through
 * the two out parameters, the request client (osc_client) and the
 * connection (osc_conn) stored in that device.
 */
28 static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
29 struct ptlrpc_connection **connection)
31 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
32 *cl = osc->osc_client;
33 *connection = osc->osc_conn;
/*
 * Same lookup as osc_con2cl(), but returns the lock-manager client
 * (osc_ldlm_client) instead of the request client.  The connection handed
 * back is the same osc_conn.
 */
36 static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
37 struct ptlrpc_connection **connection)
39 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
40 *cl = osc->osc_ldlm_client;
41 *connection = osc->osc_conn;
/*
 * Connect this OSC device to its target.
 *
 * The local connection is registered with class_connect(); then an
 * OST_CONNECT request is built whose single buffer is the target UUID held
 * in osc->osc_target_uuid, and the function waits for the reply.  On the
 * path that reaches the assignments below, the connection level is raised to
 * LUSTRE_CONN_FULL and the addr/cookie pair of the reply message is saved in
 * osc->osc_connh.  The request is freed, and class_disconnect() is called on
 * what appears to be the out_disco cleanup path (the label line itself is
 * not visible in this listing).
 *
 * NOTE(review): import is allocated with OBD_ALLOC() but no use or release
 * of it is visible here; confirm it is not leaked.
 */
44 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd)
46 struct osc_obd *osc = &obd->u.osc;
47 struct obd_import *import;
48 struct ptlrpc_request *request;
49 char *tmp = osc->osc_target_uuid;
50 int rc, size = sizeof(osc->osc_target_uuid);
53 OBD_ALLOC(import, sizeof(*import));
58 rc = class_connect(conn, obd);
/* One request buffer of size bytes, filled from tmp (the target UUID). */
62 request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn,
63 OST_CONNECT, 1, &size, &tmp);
65 GOTO(out_disco, rc = -ENOMEM);
67 request->rq_level = LUSTRE_CONN_NEW;
/* The reply is sized for zero buffers. */
68 request->rq_replen = lustre_msg_size(0, NULL);
70 rc = ptlrpc_queue_wait(request);
71 rc = ptlrpc_check_status(request, rc);
73 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
77 /* XXX eventually maybe more refinement */
78 osc->osc_conn->c_level = LUSTRE_CONN_FULL;
80 /* XXX: Make this a handle */
81 osc->osc_connh.addr = request->rq_repmsg->addr;
82 osc->osc_connh.cookie = request->rq_repmsg->cookie;
86 ptlrpc_free_req(request);
89 class_disconnect(conn);
/*
 * Disconnect from the target.
 *
 * Sends an OST_DISCONNECT request (no request buffers, reply sized for zero
 * buffers) on the handle stored in osc->osc_connh and waits for it, then
 * drops the local connection with class_disconnect() and frees the request.
 * The declaration of rc and any checks between these calls are not visible
 * in this listing.
 */
95 static int osc_disconnect(struct lustre_handle *conn)
97 struct ptlrpc_request *request;
98 struct ptlrpc_client *cl;
99 struct ptlrpc_connection *connection;
100 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
104 osc_con2cl(conn, &cl, &connection);
105 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
106 OST_DISCONNECT, 0, NULL, NULL);
109 request->rq_replen = lustre_msg_size(0, NULL);
111 rc = ptlrpc_queue_wait(request);
114 rc = class_disconnect(conn);
119 ptlrpc_free_req(request);
/*
 * Fetch object attributes from the target.
 *
 * The caller obdo is copied into the single ost_body buffer of an
 * OST_GETATTR request with every bit of o_valid set.  The reply is sized for
 * one ost_body; after the wait, the obdo found in the reply is copied back
 * over *oa and the request is freed.  A failure reported by
 * ptlrpc_check_status() is logged with CERROR.
 */
123 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
125 struct ptlrpc_request *request;
126 struct ptlrpc_client *cl;
127 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
128 struct ptlrpc_connection *connection;
129 struct ost_body *body;
130 int rc, size = sizeof(*body);
133 osc_con2cl(conn, &cl, &connection);
134 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
135 OST_GETATTR, 1, &size, NULL);
139 body = lustre_msg_buf(request->rq_reqmsg, 0);
140 memcpy(&body->oa, oa, sizeof(*oa));
/* Only the copy in the request gets all o_valid bits set, not *oa. */
141 body->oa.o_valid = ~0;
143 request->rq_replen = lustre_msg_size(1, &size);
145 rc = ptlrpc_queue_wait(request);
146 rc = ptlrpc_check_status(request, rc);
148 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* From here on body points into the reply message. */
152 body = lustre_msg_buf(request->rq_repmsg, 0);
153 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
155 memcpy(oa, &body->oa, sizeof(*oa));
159 ptlrpc_free_req(request);
/*
 * Open an object on the target.
 *
 * Sends an OST_OPEN request carrying a copy of *oa whose o_valid is
 * restricted to OBD_MD_FLMODE | OBD_MD_FLID, waits for the reply, copies the
 * obdo from the reply back over *oa and frees the request.
 * md is not referenced by any line visible in this listing.
 */
163 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
164 struct lov_stripe_md *md)
166 struct ptlrpc_request *request;
167 struct ptlrpc_client *cl;
168 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
169 struct ptlrpc_connection *connection;
170 struct ost_body *body;
171 int rc, size = sizeof(*body);
174 osc_con2cl(conn, &cl, &connection);
175 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
176 OST_OPEN, 1, &size, NULL);
180 body = lustre_msg_buf(request->rq_reqmsg, 0);
181 memcpy(&body->oa, oa, sizeof(*oa));
182 body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
184 request->rq_replen = lustre_msg_size(1, &size);
186 rc = ptlrpc_queue_wait(request);
187 rc = ptlrpc_check_status(request, rc);
/* From here on body points into the reply message. */
191 body = lustre_msg_buf(request->rq_repmsg, 0);
192 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
194 memcpy(oa, &body->oa, sizeof(*oa));
198 ptlrpc_free_req(request);
/*
 * Close an object on the target.
 *
 * Unlike osc_open(), the caller obdo itself is modified before it is sent:
 * o_id is taken from md->lmd_object_id, o_mode is forced to S_IFREG and
 * o_valid is set to OBD_MD_FLMODE | OBD_MD_FLID.  The obdo is then copied
 * into an OST_CLOSE request; after the wait, the obdo from the reply is
 * copied back over *oa and the request is freed.
 */
202 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
203 struct lov_stripe_md *md)
205 struct ptlrpc_request *request;
206 struct ptlrpc_client *cl;
207 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
208 struct ptlrpc_connection *connection;
209 struct ost_body *body;
210 int rc, size = sizeof(*body);
213 osc_con2cl(conn, &cl, &connection);
214 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
215 OST_CLOSE, 1, &size, NULL);
219 oa->o_id = md->lmd_object_id;
220 oa->o_mode = S_IFREG;
221 oa->o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
222 body = lustre_msg_buf(request->rq_reqmsg, 0);
223 memcpy(&body->oa, oa, sizeof(*oa));
225 request->rq_replen = lustre_msg_size(1, &size);
227 rc = ptlrpc_queue_wait(request);
228 rc = ptlrpc_check_status(request, rc);
/* From here on body points into the reply message. */
232 body = lustre_msg_buf(request->rq_repmsg, 0);
233 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
235 memcpy(oa, &body->oa, sizeof(*oa));
239 ptlrpc_free_req(request);
/*
 * Push object attributes to the target.
 *
 * Copies *oa unchanged into the ost_body of an OST_SETATTR request, waits
 * for a reply sized for one ost_body and frees the request.  No line visible
 * in this listing copies the reply back into *oa.
 */
243 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
245 struct ptlrpc_request *request;
246 struct ptlrpc_client *cl;
247 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
248 struct ptlrpc_connection *connection;
249 struct ost_body *body;
250 int rc, size = sizeof(*body);
253 osc_con2cl(conn, &cl, &connection);
254 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
255 OST_SETATTR, 1, &size, NULL);
259 body = lustre_msg_buf(request->rq_reqmsg, 0);
260 memcpy(&body->oa, oa, sizeof(*oa));
262 request->rq_replen = lustre_msg_size(1, &size);
264 rc = ptlrpc_queue_wait(request);
265 rc = ptlrpc_check_status(request, rc);
269 ptlrpc_free_req(request);
/*
 * Create an object on the target and describe it in a new stripe descriptor.
 *
 * Allocates *ea with oa->o_easize bytes and records that size in lmd_size,
 * then sends *oa in an OST_CREATE request.  After the wait, the obdo from
 * the reply is copied back over *oa, its o_id becomes (*ea)->lmd_object_id
 * and lmd_stripe_count is set to 1.  The request is freed at the end.
 *
 * NOTE(review): the handling of *ea when allocation or the request fails is
 * not visible in this listing; confirm it is released on error.
 */
273 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
274 struct lov_stripe_md **ea)
276 struct ptlrpc_request *request;
277 struct ptlrpc_client *cl;
278 struct ptlrpc_connection *connection;
279 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
280 struct ost_body *body;
281 int rc, size = sizeof(*body);
294 OBD_ALLOC(*ea, oa->o_easize);
297 (*ea)->lmd_size = oa->o_easize;
300 osc_con2cl(conn, &cl, &connection);
301 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
302 OST_CREATE, 1, &size, NULL);
306 body = lustre_msg_buf(request->rq_reqmsg, 0);
307 memcpy(&body->oa, oa, sizeof(*oa));
309 request->rq_replen = lustre_msg_size(1, &size);
311 rc = ptlrpc_queue_wait(request);
312 rc = ptlrpc_check_status(request, rc);
/* From here on body points into the reply message. */
316 body = lustre_msg_buf(request->rq_repmsg, 0);
317 memcpy(oa, &body->oa, sizeof(*oa));
/* The object id comes from the obdo just copied out of the reply. */
319 (*ea)->lmd_object_id = oa->o_id;
320 (*ea)->lmd_stripe_count = 1;
323 ptlrpc_free_req(request);
/*
 * Send an OST_PUNCH request for the object described by *oa.
 *
 * The request carries a copy of *oa in which o_blocks is overwritten with
 * count and OBD_MD_FLBLOCKS is added to o_valid.  After the wait, the obdo
 * from the reply is copied back over *oa and the request is freed.
 * The end of the parameter list is not visible in this listing, and md is
 * not referenced by any visible line.
 */
327 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
328 struct lov_stripe_md *md, obd_size count,
331 struct ptlrpc_request *request;
332 struct ptlrpc_client *cl;
333 struct ptlrpc_connection *connection;
334 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
335 struct ost_body *body;
336 int rc, size = sizeof(*body);
343 osc_con2cl(conn, &cl, &connection);
344 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
345 OST_PUNCH, 1, &size, NULL);
349 body = lustre_msg_buf(request->rq_reqmsg, 0);
350 memcpy(&body->oa, oa, sizeof(*oa));
/* count travels in the o_blocks field of the request copy. */
351 body->oa.o_blocks = count;
352 body->oa.o_valid |= OBD_MD_FLBLOCKS;
354 request->rq_replen = lustre_msg_size(1, &size);
356 rc = ptlrpc_queue_wait(request);
357 rc = ptlrpc_check_status(request, rc);
/* From here on body points into the reply message. */
361 body = lustre_msg_buf(request->rq_repmsg, 0);
362 memcpy(oa, &body->oa, sizeof(*oa));
366 ptlrpc_free_req(request);
/*
 * Destroy an object on the target.
 *
 * Sends an OST_DESTROY request carrying a copy of *oa with every bit of
 * o_valid set, waits for the reply, copies the obdo from the reply back over
 * *oa and frees the request.  ea is not referenced by any line visible in
 * this listing.
 */
370 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
371 struct lov_stripe_md *ea)
373 struct ptlrpc_request *request;
374 struct ptlrpc_client *cl;
375 struct ptlrpc_connection *connection;
376 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
377 struct ost_body *body;
378 int rc, size = sizeof(*body);
385 osc_con2cl(conn, &cl, &connection);
386 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
387 OST_DESTROY, 1, &size, NULL);
391 body = lustre_msg_buf(request->rq_reqmsg, 0);
392 memcpy(&body->oa, oa, sizeof(*oa));
393 body->oa.o_valid = ~0;
395 request->rq_replen = lustre_msg_size(1, &size);
397 rc = ptlrpc_queue_wait(request);
398 rc = ptlrpc_check_status(request, rc);
/* From here on body points into the reply message. */
402 body = lustre_msg_buf(request->rq_repmsg, 0);
403 memcpy(oa, &body->oa, sizeof(*oa));
407 ptlrpc_free_req(request);
/*
 * State attached to a bulk descriptor (desc->b_cb_data) and consumed by
 * brw_finish().  callback is the optional caller completion function.
 * brw_finish() also reads cb_data, obd_data and obd_size members whose
 * declarations are not visible in this listing.
 */
411 struct osc_brw_cb_data {
412 bulk_callback_t callback;
/*
 * Completion handler installed as desc->b_cb by the bulk read and write
 * paths.
 *
 * Logs a message when the descriptor carries PTL_RPC_FL_INTR, kunmaps every
 * page on desc->b_page_list, invokes the caller callback (if one was
 * supplied) with desc and cb_data->cb_data, drops one reference on desc, and
 * finally frees the optional obd_data buffer and the cb_data structure
 * itself.
 */
418 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
420 struct list_head *tmp, *next;
421 struct osc_brw_cb_data *cb_data = data;
424 if (desc->b_flags & PTL_RPC_FL_INTR)
425 CERROR("got signal\n");
427 /* This feels wrong to me. */
428 list_for_each_safe(tmp, next, &desc->b_page_list) {
429 struct ptlrpc_bulk_page *bulk;
430 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
432 kunmap(bulk->b_page);
435 if (cb_data->callback)
436 (cb_data->callback)(desc, cb_data->cb_data);
438 ptlrpc_bulk_decref(desc);
/* obd_data is optional; it is freed with the size recorded next to it. */
439 if (cb_data->obd_data)
440 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
441 OBD_FREE(cb_data, sizeof(*cb_data));
/*
 * Read page_count pages of the object described by md into page_array.
 *
 * Builds an OST_BRW request with three buffers (an ost_body, one obd_ioobj
 * and one niobuf_remote per page) and marks it OBD_BRW_READ.  A bulk
 * descriptor is prepared on OST_BULK_PORTAL with brw_finish as its
 * completion handler and a newly allocated osc_brw_cb_data carrying the
 * caller callback.  Each page is added to the descriptor, given a fresh xid
 * taken from connection->c_xid_out under c_lock, kmapped, and described to
 * the server with ost_pack_niobuf().
 *
 * The descriptor refcount starts at 1 when a callback is supplied and at 2
 * otherwise, and the bulk is registered before the request is queued.  In
 * the no-callback case the function appears to wait on desc->b_waitq until
 * ptlrpc_check_bulk_received() holds and then drops a reference; the
 * conditions selecting that path are not visible in this listing.
 *
 * The cleanup code at the end kunmaps desc->b_page_count pages and frees
 * cb_data, the bulk descriptor and the request.
 *
 * NOTE(review): desc->b_flags is read immediately after
 * ptlrpc_bulk_decref(desc); if that call can drop the last reference this
 * is a use after free.  Confirm against ptlrpc_bulk_decref().
 */
445 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
446 obd_count page_count, struct page **page_array,
447 obd_size *count, obd_off *offset, obd_flag *flags,
448 bulk_callback_t callback)
450 struct ptlrpc_client *cl;
451 struct ptlrpc_connection *connection;
452 struct ptlrpc_request *request = NULL;
453 struct ptlrpc_bulk_desc *desc = NULL;
454 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
455 struct ost_body *body;
456 int rc, j, size[3] = {sizeof(*body)};
457 void *iooptr, *nioptr;
458 struct osc_brw_cb_data *cb_data = NULL;
461 size[1] = sizeof(struct obd_ioobj);
462 size[2] = page_count * sizeof(struct niobuf_remote);
464 osc_con2cl(conn, &cl, &connection);
465 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
466 OST_BRW, 3, size, NULL);
470 body = lustre_msg_buf(request->rq_reqmsg, 0);
471 body->data = OBD_BRW_READ;
473 desc = ptlrpc_prep_bulk(connection);
475 GOTO(out_free, rc = -ENOMEM);
476 desc->b_portal = OST_BULK_PORTAL;
477 desc->b_cb = brw_finish;
478 OBD_ALLOC(cb_data, sizeof(*cb_data));
480 GOTO(out_free, rc = -ENOMEM);
481 cb_data->callback = callback;
482 desc->b_cb_data = cb_data;
483 /* XXX end almost identical to brw_write case */
485 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
486 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
487 ost_pack_ioo(&iooptr, md, page_count);
488 for (j = 0; j < page_count; j++) {
489 struct ptlrpc_bulk_page *bulk;
490 bulk = ptlrpc_prep_bulk_page(desc);
492 GOTO(out_unmap, rc = -ENOMEM);
494 spin_lock(&connection->c_lock);
495 bulk->b_xid = ++connection->c_xid_out;
496 spin_unlock(&connection->c_lock);
498 bulk->b_buf = kmap(page_array[j]);
499 bulk->b_page = page_array[j];
500 bulk->b_buflen = PAGE_SIZE;
501 ost_pack_niobuf(&nioptr, offset[j], count[j],
502 flags[j], bulk->b_xid);
506 * Register the bulk first, because the reply could arrive out of order,
507 * and we want to be ready for the bulk data.
509 * One reference is released by the bulk callback, the other when
510 * we finish sleeping on it (if we don't have a callback).
512 atomic_set(&desc->b_refcount, callback ? 1 : 2);
513 rc = ptlrpc_register_bulk(desc);
517 request->rq_replen = lustre_msg_size(1, size);
518 rc = ptlrpc_queue_wait(request);
519 rc = ptlrpc_check_status(request, rc);
521 ptlrpc_bulk_decref(desc);
525 /* Callbacks cause asynchronous handling. */
529 l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
530 ptlrpc_bulk_decref(desc);
531 if (desc->b_flags & PTL_RPC_FL_INTR)
536 /* Clean up on error. */
538 for (j = 0; j < desc->b_page_count; j++)
539 kunmap(page_array[j]);
542 OBD_FREE(cb_data, sizeof(*cb_data));
543 ptlrpc_free_bulk(desc);
544 ptlrpc_free_req(request);
/*
 * Write page_count pages from pagearray to the object described by md.
 *
 * Phase 1: an OST_BRW request with three buffers (ost_body, one obd_ioobj,
 * one niobuf_remote per page) is marked OBD_BRW_WRITE.  A niobuf_local array
 * is allocated to remember the kmapped address, offset and length of every
 * page; ownership of that array is recorded in cb_data (obd_data/obd_size)
 * so that brw_finish() can free it.  The request is queued and the reply is
 * expected to carry, in buffer 1, one niobuf_remote per page.
 *
 * Phase 2: for every page a bulk page is added to the descriptor using the
 * local address/length and the xid taken from the matching remote niobuf,
 * and the bulk is sent with ptlrpc_send_bulk().  The descriptor refcount
 * starts at 1 when a callback is supplied and at 2 otherwise; in the
 * no-callback case the function waits on desc->b_waitq until
 * ptlrpc_check_bulk_sent() holds.
 *
 * The cleanup code at the end kunmaps the pages and frees cb_data, local,
 * the bulk descriptor and the request.  Labels and several conditions are
 * not visible in this listing.
 *
 * NOTE(review): desc->b_flags is read immediately after
 * ptlrpc_bulk_decref(desc); if that call can drop the last reference this
 * is a use after free.  Confirm against ptlrpc_bulk_decref().
 */
548 static int osc_brw_write(struct lustre_handle *conn,
549 struct lov_stripe_md *md, obd_count page_count,
550 struct page **pagearray, obd_size *count,
551 obd_off *offset, obd_flag *flags,
552 bulk_callback_t callback)
554 struct ptlrpc_client *cl;
555 struct ptlrpc_connection *connection;
556 struct ptlrpc_request *request = NULL;
557 struct ptlrpc_bulk_desc *desc = NULL;
558 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
559 struct ost_body *body;
560 struct niobuf_local *local = NULL;
561 struct niobuf_remote *remote;
562 struct osc_brw_cb_data *cb_data = NULL;
563 int rc, j, size[3] = {sizeof(*body)};
564 void *iooptr, *nioptr;
567 size[1] = sizeof(struct obd_ioobj);
568 size[2] = page_count * sizeof(*remote);
570 osc_con2cl(conn, &cl, &connection);
571 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
572 OST_BRW, 3, size, NULL);
576 body = lustre_msg_buf(request->rq_reqmsg, 0);
577 body->data = OBD_BRW_WRITE;
579 OBD_ALLOC(local, page_count * sizeof(*local));
581 GOTO(out_free, rc = -ENOMEM);
583 desc = ptlrpc_prep_bulk(connection);
585 GOTO(out_free, rc = -ENOMEM);
586 desc->b_portal = OSC_BULK_PORTAL;
587 desc->b_cb = brw_finish;
588 OBD_ALLOC(cb_data, sizeof(*cb_data));
590 GOTO(out_free, rc = -ENOMEM);
591 cb_data->callback = callback;
592 desc->b_cb_data = cb_data;
593 /* XXX end almost identical to brw_read case */
/* brw_finish() frees obd_data, so local is handed over to cb_data here. */
594 cb_data->obd_data = local;
595 cb_data->obd_size = page_count * sizeof(*local);
597 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
598 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
599 ost_pack_ioo(&iooptr, md, page_count);
600 for (j = 0; j < page_count; j++) {
601 local[j].addr = kmap(pagearray[j]);
602 local[j].offset = offset[j];
603 local[j].len = count[j];
604 ost_pack_niobuf(&nioptr, offset[j], count[j], flags[j], 0);
/* size[] is reused for the reply: buffer 0 body, buffer 1 remote niobufs. */
607 size[1] = page_count * sizeof(struct niobuf_remote);
608 request->rq_replen = lustre_msg_size(2, size);
609 rc = ptlrpc_queue_wait(request);
610 rc = ptlrpc_check_status(request, rc);
614 nioptr = lustre_msg_buf(request->rq_repmsg, 1);
616 GOTO(out_unmap, rc = -EINVAL);
618 if (request->rq_repmsg->buflens[1] !=
619 page_count * sizeof(struct niobuf_remote)) {
/* The product is a size_t; cast it so the argument matches the %d
 * conversion on platforms where size_t is wider than int. */
620 CERROR("buffer length wrong (%d vs. %d)\n",
621 request->rq_repmsg->buflens[1],
622 (int)(page_count * sizeof(struct niobuf_remote)));
623 GOTO(out_unmap, rc = -EINVAL);
626 for (j = 0; j < page_count; j++) {
627 struct ptlrpc_bulk_page *page;
629 ost_unpack_niobuf(&nioptr, &remote);
631 page = ptlrpc_prep_bulk_page(desc);
633 GOTO(out_unmap, rc = -ENOMEM);
635 page->b_buf = (void *)(unsigned long)local[j].addr;
636 page->b_buflen = local[j].len;
637 page->b_xid = remote->xid;
640 if (desc->b_page_count != page_count)
644 * One is released when the bulk is complete, the other when we finish
645 * waiting on it. (Callback cases don't sleep, so only one ref for
648 atomic_set(&desc->b_refcount, callback ? 1 : 2);
649 CDEBUG(D_PAGE, "Set refcount of %p to %d\n", desc,
650 atomic_read(&desc->b_refcount));
651 rc = ptlrpc_send_bulk(desc);
655 /* Callbacks cause asynchronous handling. */
659 /* If there's no callback function, sleep here until complete. */
660 l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
661 ptlrpc_bulk_decref(desc);
662 if (desc->b_flags & PTL_RPC_FL_INTR)
666 /* Clean up on error. */
668 for (j = 0; j < page_count; j++)
669 kunmap(pagearray[j]);
673 OBD_FREE(cb_data, sizeof(*cb_data));
675 OBD_FREE(local, page_count * sizeof(*local));
676 ptlrpc_free_bulk(desc);
677 ptlrpc_req_finished(request);
/*
 * Bulk I/O entry point: dispatches to osc_brw_write() when cmd has
 * OBD_BRW_WRITE set and to osc_brw_read() otherwise, passing the arguments
 * through and casting callback to bulk_callback_t.  The tail of the
 * parameter list (count, offset, flags, callback) is not visible in this
 * listing.
 */
681 static int osc_brw(int cmd, struct lustre_handle *conn,
682 struct lov_stripe_md *md, obd_count page_count,
683 struct page **page_array,
689 if (cmd & OBD_BRW_WRITE)
690 return osc_brw_write(conn, md, page_count, page_array, count,
691 offset, flags, (bulk_callback_t)callback);
693 return osc_brw_read(conn, md, page_count, page_array, count,
694 offset, flags, (bulk_callback_t)callback);
/*
 * Obtain an extent lock on res_id for this OSC connection.
 *
 * The extent passed in through extentp is modified in place: start is
 * rounded down and end is rounded up to a page boundary.  The function then
 *   1. looks for an existing lock in the requested mode with
 *      ldlm_lock_match();
 *   2. looks for an existing lock in mode2 (assigned on lines not visible in
 *      this listing); when one is found its reference is moved from mode2
 *      to mode and ldlm_cli_convert() may be called;
 *   3. otherwise enqueues a new lock with ldlm_cli_enqueue().
 * The conditions separating these steps are not visible in this listing.
 *
 * The length passed alongside extent is sizeof(*extent), i.e. the size of
 * the struct ldlm_extent itself; sizeof(extent) would only be the size of
 * the pointer.  extent_len is not referenced by any visible line.
 */
697 static int osc_enqueue(struct lustre_handle *oconn,
698 struct lustre_handle *parent_lock, __u64 *res_id,
699 __u32 type, void *extentp, int extent_len, __u32 mode,
700 int *flags, void *callback, void *data, int datalen,
701 struct lustre_handle *lockh)
703 struct obd_device *obddev = class_conn2obd(oconn);
704 struct osc_obd *osc = &obddev->u.osc;
705 struct ptlrpc_connection *conn;
706 struct ptlrpc_client *cl;
707 struct ldlm_extent *extent = extentp;
711 /* Filesystem locks are given a bit of special treatment: first we
712 * fixup the lock to start and end on page boundaries. */
713 extent->start &= PAGE_MASK;
714 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
716 /* Next, search for already existing extent locks that will cover us */
717 osc_con2dlmcl(oconn, &cl, &conn);
718 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
719 sizeof(*extent), mode, lockh);
721 /* We already have a lock, and it's referenced */
725 /* Next, search for locks that we can upgrade (if we're trying to write)
726 * or are more than we need (if we're trying to read). Because the VFS
727 * and page cache already protect us locally, lots of readers/writers
728 * can share a single PW lock. */
734 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
735 sizeof(*extent), mode2, lockh);
738 /* FIXME: This is not incredibly elegant, but it might
739 * be more elegant than adding another parameter to
740 * lock_match. I want a second opinion. */
741 ldlm_lock_addref(lockh, mode);
742 ldlm_lock_decref(lockh, mode2);
747 rc = ldlm_cli_convert(cl, lockh, &osc->osc_connh,
755 rc = ldlm_cli_enqueue(cl, conn, &osc->osc_connh,
756 NULL, obddev->obd_namespace,
757 parent_lock, res_id, type, extent, sizeof(*extent),
758 mode, flags, callback, data, datalen, lockh);
/*
 * Release one reference, in the given mode, on the lock identified by lockh
 * by calling ldlm_lock_decref().  oconn is not referenced by any line
 * visible in this listing.
 */
762 static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
763 struct lustre_handle *lockh)
767 ldlm_lock_decref(lockh, mode);
/*
 * Configure an OSC device from the obd_ioctl_data passed in buf.
 *
 * inlbuf1 holds the target UUID and inlbuf2 the server UUID; an error is
 * logged when either length is below 1 or above 37.  The target UUID is
 * copied into osc->osc_target_uuid and the server UUID into a local 37-byte
 * buffer, which ptlrpc_uuid_to_connection() turns into osc->osc_conn.
 * Afterwards the function creates a client lock namespace named osc,
 * allocates the request client and the ldlm client, and initialises each
 * with its request and reply portals and a name.
 *
 * The trailing free/put calls match the out_client, out_ns and out_conn
 * GOTO targets; the label lines themselves are not visible in this listing.
 *
 * NOTE(review): when ioc_inllen2 is 37 the memcpy fills server_uuid
 * completely, so the buffer is NUL terminated only if the source bytes
 * include the terminator.  Confirm the ioctl packing guarantees that before
 * server_uuid is used as a string.
 * NOTE(review): the size of osc->osc_target_uuid is not visible here;
 * confirm it is at least 37 bytes.
 */
772 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
774 struct obd_ioctl_data* data = buf;
775 struct osc_obd *osc = &obddev->u.osc;
776 char server_uuid[37];
780 if (data->ioc_inllen1 < 1) {
781 CERROR("osc setup requires a TARGET UUID\n");
785 if (data->ioc_inllen1 > 37) {
786 CERROR("osc TARGET UUID must be less than 38 characters\n");
790 if (data->ioc_inllen2 < 1) {
791 CERROR("osc setup requires a SERVER UUID\n");
795 if (data->ioc_inllen2 > 37) {
796 CERROR("osc SERVER UUID must be less than 38 characters\n");
800 memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
801 memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
802 sizeof(server_uuid)));
804 osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
808 obddev->obd_namespace =
809 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
810 if (obddev->obd_namespace == NULL)
811 GOTO(out_conn, rc = -ENOMEM);
813 OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
814 if (osc->osc_client == NULL)
815 GOTO(out_ns, rc = -ENOMEM);
817 OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
818 if (osc->osc_ldlm_client == NULL)
819 GOTO(out_client, rc = -ENOMEM);
821 ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
823 ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
824 osc->osc_ldlm_client);
825 osc->osc_client->cli_name = "osc";
826 osc->osc_ldlm_client->cli_name = "ldlm";
/* Error unwinding: release in reverse order of acquisition. */
832 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
834 ldlm_namespace_free(obddev->obd_namespace);
836 ptlrpc_put_connection(osc->osc_conn);
/*
 * Release what osc_setup() acquired: free the lock namespace, clean up and
 * free the request client and the ldlm client, and drop the reference on
 * the connection.
 */
840 static int osc_cleanup(struct obd_device * obddev)
842 struct osc_obd *osc = &obddev->u.osc;
844 ldlm_namespace_free(obddev->obd_namespace);
846 ptlrpc_cleanup_client(osc->osc_client);
847 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
848 ptlrpc_cleanup_client(osc->osc_ldlm_client);
849 OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
850 ptlrpc_put_connection(osc->osc_conn);
/*
 * Query filesystem statistics from the target.
 *
 * Sends an OST_STATFS request with no request buffers and a reply sized for
 * one struct obd_statfs.  After the wait, the reply buffer is converted into
 * *sfs with obd_statfs_unpack() and the request is freed.  A failure
 * reported by ptlrpc_check_status() is logged with CERROR.
 */
856 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
858 struct ptlrpc_request *request;
859 struct ptlrpc_client *cl;
860 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
861 struct ptlrpc_connection *connection;
862 struct obd_statfs *osfs;
863 int rc, size = sizeof(*osfs);
866 osc_con2cl(conn, &cl, &connection);
867 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
868 OST_STATFS, 0, NULL, NULL);
/* size is used only for the reply; the request itself has no buffers. */
872 request->rq_replen = lustre_msg_size(1, &size);
874 rc = ptlrpc_queue_wait(request);
875 rc = ptlrpc_check_status(request, rc);
877 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
881 osfs = lustre_msg_buf(request->rq_repmsg, 0);
882 obd_statfs_unpack(osfs, sfs);
886 ptlrpc_free_req(request);
/*
 * Method table for the OSC device type; osc_init() registers it with
 * class_register_type().  Entries use the GNU field: value initializer
 * form.  Some entries of the table are not visible in this listing.
 */
890 struct obd_ops osc_obd_ops = {
892 o_cleanup: osc_cleanup,
893 o_statfs: osc_statfs,
894 o_create: osc_create,
895 o_destroy: osc_destroy,
896 o_getattr: osc_getattr,
897 o_setattr: osc_setattr,
900 o_connect: osc_connect,
901 o_disconnect: osc_disconnect,
904 o_enqueue: osc_enqueue,
/*
 * Module entry point: registers the OSC method table under LUSTRE_OSC_NAME
 * and returns the result of class_register_type().
 */
908 static int __init osc_init(void)
910 return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
/* Module exit point: unregisters the type registered by osc_init(). */
913 static void __exit osc_exit(void)
915 class_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata, followed by the hookup of the init and exit functions. */
918 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
919 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
920 MODULE_LICENSE("GPL");
922 module_init(osc_init);
923 module_exit(osc_exit);