1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
20 #define DEBUG_SUBSYSTEM S_OSC
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
/*
 * Resolve a local connection handle into the pieces needed to build an
 * OST request.
 *
 * Looks up the export for @conn with class_conn2export() and returns,
 * through the output pointers:
 *   *cl         - the OSC's request client (osc->osc_client)
 *   *connection - the OSC's ptlrpc connection (osc->osc_conn)
 *   *rconn      - the address of the export's exp_rconnh handle
 *
 * NOTE(review): the export returned by class_conn2export() is dereferenced
 * with no visible NULL check -- confirm it cannot fail for handles that
 * reach this helper.
 */
28 static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
29 struct ptlrpc_connection **connection,
30 struct lustre_handle **rconn)
32 struct obd_export *export = class_conn2export(conn);
33 struct osc_obd *osc = &export->exp_obd->u.osc;
35 *cl = osc->osc_client;
36 *connection = osc->osc_conn;
37 *rconn = &export->exp_rconnh;
/*
 * Same lookup as osc_con2cl(), but hands back the lock-manager client.
 *
 * Outputs:
 *   *cl         - osc->osc_ldlm_client (instead of osc->osc_client)
 *   *connection - osc->osc_conn
 *   *rconn      - the address of the export's exp_rconnh handle
 *
 * NOTE(review): as in osc_con2cl(), no NULL check on the export is visible.
 */
40 static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
41 struct ptlrpc_connection **connection,
42 struct lustre_handle **rconn)
44 struct obd_export *export = class_conn2export(conn);
45 struct osc_obd *osc = &export->exp_obd->u.osc;
47 *cl = osc->osc_ldlm_client;
48 *connection = osc->osc_conn;
49 *rconn = &export->exp_rconnh;
/*
 * Establish this OSC's connection to its OST target.
 *
 * Statements visible here, in order:
 *   - class_connect(conn, obd) sets up the local connection handle;
 *   - an OST_CONNECT request is prepared with a single buffer holding
 *     osc->osc_target_uuid (length sizeof(osc->osc_target_uuid));
 *   - rq_level is set to LUSTRE_CONN_NEW, the reply is sized for zero
 *     buffers, and the request message addr is set to -1 rather than to
 *     the local handle;
 *   - the request is sent with ptlrpc_queue_wait() and its result is fed
 *     through ptlrpc_check_status(); a failure is logged with CERROR;
 *   - the connection level is raised to LUSTRE_CONN_FULL and the reply
 *     message is passed to class_rconn2export() cast to a lustre_handle;
 *   - the request is freed.
 *
 * class_disconnect(conn) at the end is presumably the out_disco unwind
 * step targeted by the -ENOMEM GOTO -- confirm against the labels.
 *
 * NOTE(review): the reply message pointer is cast straight to
 * struct lustre_handle *; this assumes the reply begins with the remote
 * handle -- confirm against the message layout.
 */
52 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd)
54 struct osc_obd *osc = &obd->u.osc;
55 //struct obd_import *import;
56 struct ptlrpc_request *request;
57 char *tmp = osc->osc_target_uuid;
58 int rc, size = sizeof(osc->osc_target_uuid);
/* NOTE(review): 'import' is only declared in the commented-out line above;
 * presumably this allocation is compiled out -- confirm. */
62 OBD_ALLOC(import, sizeof(*import));
68 rc = class_connect(conn, obd);
72 request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn,
73 OST_CONNECT, 1, &size, &tmp);
75 GOTO(out_disco, rc = -ENOMEM);
77 request->rq_level = LUSTRE_CONN_NEW;
78 request->rq_replen = lustre_msg_size(0, NULL);
79 request->rq_reqmsg->addr = -1;
80 /* Sending our local connection info breaks for local connections
81 request->rq_reqmsg->addr = conn->addr;
82 request->rq_reqmsg->cookie = conn->cookie;
85 rc = ptlrpc_queue_wait(request);
86 rc = ptlrpc_check_status(request, rc);
88 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
92 /* XXX eventually maybe more refinement */
93 osc->osc_conn->c_level = LUSTRE_CONN_FULL;
95 class_rconn2export(conn, (struct lustre_handle *)request->rq_repmsg);
99 ptlrpc_free_req(request);
102 class_disconnect(conn);
/*
 * Tear down the connection identified by @conn.
 *
 * Builds an OST_DISCONNECT request with no request buffers on the client,
 * connection and remote handle resolved by osc_con2cl(), sizes the reply
 * for zero buffers, waits for it with ptlrpc_queue_wait(), then calls
 * class_disconnect(conn) and frees the request.
 *
 * NOTE(review): unlike the other RPC helpers in this file, no
 * ptlrpc_check_status() call is visible here -- confirm that is intended.
 */
108 static int osc_disconnect(struct lustre_handle *conn)
110 struct ptlrpc_request *request;
111 struct ptlrpc_client *cl;
112 struct ptlrpc_connection *connection;
113 struct lustre_handle *rconn;
117 osc_con2cl(conn, &cl, &connection, &rconn);
118 request = ptlrpc_prep_req2(cl, connection, rconn,
119 OST_DISCONNECT, 0, NULL, NULL);
122 request->rq_replen = lustre_msg_size(0, NULL);
124 rc = ptlrpc_queue_wait(request);
127 rc = class_disconnect(conn);
132 ptlrpc_free_req(request);
/*
 * Fetch the attributes of the object described by @oa from the OST.
 *
 * The caller's obdo is copied into the single ost_body request buffer with
 * o_valid forced to ~0 (every bit set).  The reply is sized for one
 * ost_body; after ptlrpc_queue_wait()/ptlrpc_check_status() a failure is
 * logged with CERROR, and the obdo from the reply is copied back over *@oa.
 * The request is freed before returning.
 */
136 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
138 struct ptlrpc_request *request;
139 struct ptlrpc_client *cl;
140 struct ptlrpc_connection *connection;
141 struct lustre_handle *rconn;
142 struct ost_body *body;
143 int rc, size = sizeof(*body);
146 osc_con2cl(conn, &cl, &connection, &rconn);
147 request = ptlrpc_prep_req2(cl, connection, rconn,
148 OST_GETATTR, 1, &size, NULL);
152 body = lustre_msg_buf(request->rq_reqmsg, 0);
153 memcpy(&body->oa, oa, sizeof(*oa));
154 body->oa.o_valid = ~0;
156 request->rq_replen = lustre_msg_size(1, &size);
158 rc = ptlrpc_queue_wait(request);
159 rc = ptlrpc_check_status(request, rc);
161 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
165 body = lustre_msg_buf(request->rq_repmsg, 0);
166 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
168 memcpy(oa, &body->oa, sizeof(*oa));
172 ptlrpc_free_req(request);
/*
 * Send an OST_OPEN request for the object described by @oa.
 *
 * The caller's obdo is copied into the request body and the copy's o_valid
 * is restricted to OBD_MD_FLMODE | OBD_MD_FLID.  The reply carries one
 * ost_body whose obdo is copied back over *@oa after the status check.
 * The request is freed before returning.
 *
 * @md is not referenced by any statement visible here.
 */
176 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
177 struct lov_stripe_md *md)
179 struct ptlrpc_request *request;
180 struct ptlrpc_client *cl;
181 struct ptlrpc_connection *connection;
182 struct lustre_handle *rconn;
183 struct ost_body *body;
184 int rc, size = sizeof(*body);
187 osc_con2cl(conn, &cl, &connection, &rconn);
188 request = ptlrpc_prep_req2(cl, connection, rconn,
189 OST_OPEN, 1, &size, NULL);
193 body = lustre_msg_buf(request->rq_reqmsg, 0);
194 memcpy(&body->oa, oa, sizeof(*oa));
195 body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
197 request->rq_replen = lustre_msg_size(1, &size);
199 rc = ptlrpc_queue_wait(request);
200 rc = ptlrpc_check_status(request, rc);
204 body = lustre_msg_buf(request->rq_repmsg, 0);
205 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
207 memcpy(oa, &body->oa, sizeof(*oa));
211 ptlrpc_free_req(request);
/*
 * Send an OST_CLOSE request for the object named by @md.
 *
 * Note that the caller's obdo is modified before it is sent: o_id is taken
 * from md->lmd_object_id, o_mode is set to S_IFREG and o_valid to
 * OBD_MD_FLMODE | OBD_MD_FLID.  The updated obdo is then copied into the
 * request body.  The reply carries one ost_body whose obdo is copied back
 * over *@oa after the status check.  The request is freed before returning.
 */
215 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
216 struct lov_stripe_md *md)
218 struct ptlrpc_request *request;
219 struct ptlrpc_client *cl;
220 struct ptlrpc_connection *connection;
221 struct lustre_handle *rconn;
222 struct ost_body *body;
223 int rc, size = sizeof(*body);
226 osc_con2cl(conn, &cl, &connection, &rconn);
227 request = ptlrpc_prep_req2(cl, connection, rconn,
228 OST_CLOSE, 1, &size, NULL);
232 oa->o_id = md->lmd_object_id;
233 oa->o_mode = S_IFREG;
234 oa->o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
235 body = lustre_msg_buf(request->rq_reqmsg, 0);
236 memcpy(&body->oa, oa, sizeof(*oa));
238 request->rq_replen = lustre_msg_size(1, &size);
240 rc = ptlrpc_queue_wait(request);
241 rc = ptlrpc_check_status(request, rc);
245 body = lustre_msg_buf(request->rq_repmsg, 0);
246 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
248 memcpy(oa, &body->oa, sizeof(*oa));
252 ptlrpc_free_req(request);
/*
 * Push the attributes in @oa to the OST with an OST_SETATTR request.
 *
 * The caller's obdo is copied unchanged into the single ost_body request
 * buffer.  The reply is sized for one ost_body, but no statement visible
 * here copies anything from the reply back into *@oa.  The request is
 * freed before returning.
 */
256 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
258 struct ptlrpc_request *request;
259 struct ptlrpc_client *cl;
260 struct ptlrpc_connection *connection;
261 struct lustre_handle *rconn;
262 struct ost_body *body;
263 int rc, size = sizeof(*body);
266 osc_con2cl(conn, &cl, &connection, &rconn);
267 request = ptlrpc_prep_req2(cl, connection, rconn,
268 OST_SETATTR, 1, &size, NULL);
272 body = lustre_msg_buf(request->rq_reqmsg, 0);
273 memcpy(&body->oa, oa, sizeof(*oa));
275 request->rq_replen = lustre_msg_size(1, &size);
277 rc = ptlrpc_queue_wait(request);
278 rc = ptlrpc_check_status(request, rc);
282 ptlrpc_free_req(request);
/*
 * Create an object on the OST and build the stripe metadata describing it.
 *
 * *@ea is allocated here with oa->o_easize bytes and its lmd_size is set to
 * that size.  An OST_CREATE request carrying a copy of *@oa is sent; the
 * obdo from the reply is copied back over *@oa, and the new metadata gets
 * lmd_object_id = oa->o_id and lmd_stripe_count = 1.  The request is freed
 * before returning.
 *
 * NOTE(review): no statement visible here frees *@ea when the request
 * fails -- confirm whether the caller is expected to release it.
 */
286 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
287 struct lov_stripe_md **ea)
289 struct ptlrpc_request *request;
290 struct ptlrpc_client *cl;
291 struct ptlrpc_connection *connection;
292 struct lustre_handle *rconn;
293 struct ost_body *body;
294 int rc, size = sizeof(*body);
307 OBD_ALLOC(*ea, oa->o_easize);
310 (*ea)->lmd_size = oa->o_easize;
313 osc_con2cl(conn, &cl, &connection, &rconn);
314 request = ptlrpc_prep_req2(cl, connection, rconn,
315 OST_CREATE, 1, &size, NULL);
319 body = lustre_msg_buf(request->rq_reqmsg, 0);
320 memcpy(&body->oa, oa, sizeof(*oa));
322 request->rq_replen = lustre_msg_size(1, &size);
324 rc = ptlrpc_queue_wait(request);
325 rc = ptlrpc_check_status(request, rc);
329 body = lustre_msg_buf(request->rq_repmsg, 0);
330 memcpy(oa, &body->oa, sizeof(*oa));
332 (*ea)->lmd_object_id = oa->o_id;
333 (*ea)->lmd_stripe_count = 1;
336 ptlrpc_free_req(request);
/*
 * Send an OST_PUNCH request for the object described by @oa.
 *
 * The caller's obdo is copied into the request body; in that copy o_blocks
 * is set to @count and OBD_MD_FLBLOCKS is OR-ed into o_valid.  The obdo
 * from the reply is copied back over *@oa after the status check, and the
 * request is freed before returning.
 *
 * @md is not referenced by any statement visible here.
 */
340 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
341 struct lov_stripe_md *md, obd_size count,
344 struct ptlrpc_request *request;
345 struct ptlrpc_client *cl;
346 struct ptlrpc_connection *connection;
347 struct lustre_handle *rconn;
348 struct ost_body *body;
349 int rc, size = sizeof(*body);
356 osc_con2cl(conn, &cl, &connection, &rconn);
357 request = ptlrpc_prep_req2(cl, connection, rconn,
358 OST_PUNCH, 1, &size, NULL);
362 body = lustre_msg_buf(request->rq_reqmsg, 0);
363 memcpy(&body->oa, oa, sizeof(*oa));
364 body->oa.o_blocks = count;
365 body->oa.o_valid |= OBD_MD_FLBLOCKS;
367 request->rq_replen = lustre_msg_size(1, &size);
369 rc = ptlrpc_queue_wait(request);
370 rc = ptlrpc_check_status(request, rc);
374 body = lustre_msg_buf(request->rq_repmsg, 0);
375 memcpy(oa, &body->oa, sizeof(*oa));
379 ptlrpc_free_req(request);
/*
 * Send an OST_DESTROY request for the object described by @oa.
 *
 * The caller's obdo is copied into the request body with o_valid forced to
 * ~0 (every bit set).  The obdo from the reply is copied back over *@oa
 * after the status check, and the request is freed before returning.
 *
 * @ea is not referenced by any statement visible here.
 */
383 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
384 struct lov_stripe_md *ea)
386 struct ptlrpc_request *request;
387 struct ptlrpc_client *cl;
388 struct ptlrpc_connection *connection;
389 struct lustre_handle *rconn;
390 struct ost_body *body;
391 int rc, size = sizeof(*body);
398 osc_con2cl(conn, &cl, &connection, &rconn);
399 request = ptlrpc_prep_req2(cl, connection, rconn,
400 OST_DESTROY, 1, &size, NULL);
404 body = lustre_msg_buf(request->rq_reqmsg, 0);
405 memcpy(&body->oa, oa, sizeof(*oa));
406 body->oa.o_valid = ~0;
408 request->rq_replen = lustre_msg_size(1, &size);
410 rc = ptlrpc_queue_wait(request);
411 rc = ptlrpc_check_status(request, rc);
415 body = lustre_msg_buf(request->rq_repmsg, 0);
416 memcpy(oa, &body->oa, sizeof(*oa));
420 ptlrpc_free_req(request);
/*
 * Per-transfer state hung off a bulk descriptor's b_cb_data by
 * osc_brw_read()/osc_brw_write() and consumed (then freed) by brw_finish().
 *
 * Besides the completion callback below, code in this file also uses the
 * members cb_data (argument passed to the callback), obd_data and obd_size
 * (a buffer and its length that brw_finish() releases with OBD_FREE).
 */
424 struct osc_brw_cb_data {
425 brw_callback_t callback;
/*
 * Our bulk-unmapping bottom half.
 *
 * @data is the ptlrpc_bulk_desc queued by brw_finish().  Every
 * ptlrpc_bulk_page on desc->b_page_list has its b_page kunmap()ed, and then
 * one reference on the descriptor is dropped with ptlrpc_bulk_decref().
 */
432 static void unmap_and_decref_bulk_desc(void *data)
434 struct ptlrpc_bulk_desc *desc = data;
435 struct list_head *tmp;
438 /* This feels wrong to me. */
439 list_for_each(tmp, &desc->b_page_list) {
440 struct ptlrpc_bulk_page *bulk;
441 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
443 kunmap(bulk->b_page);
446 ptlrpc_bulk_decref(desc);
/*
 * Bulk completion callback installed as desc->b_cb by the brw paths.
 *
 * @data is the osc_brw_cb_data attached to the descriptor.  The function
 * logs "got signal" when PTL_RPC_FL_INTR is set on the descriptor, invokes
 * the caller's callback (if any) with its cb_data argument, frees the
 * obd_data buffer and the cb_data structure itself, and finally queues
 * unmap_and_decref_bulk_desc() on desc->b_queue with schedule_task().
 *
 * NOTE(review): osc_brw_read() never visibly sets obd_data/obd_size, so the
 * first OBD_FREE relies on OBD_ALLOC zero-filling the structure and on
 * OBD_FREE tolerating a NULL pointer with size 0 -- confirm both.
 */
450 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
452 struct osc_brw_cb_data *cb_data = data;
455 if (desc->b_flags & PTL_RPC_FL_INTR)
456 CERROR("got signal\n");
458 if (cb_data->callback)
459 cb_data->callback(cb_data->cb_data);
461 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
462 OBD_FREE(cb_data, sizeof(*cb_data));
464 /* We can't kunmap the desc from interrupt context, so we do it from
465 * the bottom half above. */
466 INIT_TQUEUE(&desc->b_queue, 0, 0);
467 PREPARE_TQUEUE(&desc->b_queue, unmap_and_decref_bulk_desc, desc);
468 schedule_task(&desc->b_queue);
/*
 * Read @page_count pages of the object described by @md into @page_array.
 *
 * Statements visible here, in order:
 *   - an OST_BRW request is built with three buffers: an ost_body (its data
 *     field set to OBD_BRW_READ), one obd_ioobj, and @page_count
 *     niobuf_remote entries;
 *   - a bulk descriptor is prepared on OST_BULK_PORTAL with brw_finish() as
 *     its callback and a newly allocated osc_brw_cb_data, holding @callback
 *     and @data, as the callback argument;
 *   - each page is given a fresh xid taken from connection->c_xid_out under
 *     c_lock, is kmap()ed, and is described to the server with
 *     ost_pack_niobuf() using offset[], count[] and flags[];
 *   - an extra descriptor reference is taken and the bulk is registered
 *     before the request is sent, then the request is queued and its status
 *     checked;
 *   - per the comments below, a callback makes completion asynchronous and
 *     the function returns 0; without one it sleeps until
 *     ptlrpc_check_bulk_received() and returns -EINTR if PTL_RPC_FL_INTR is
 *     set on the descriptor.
 *
 * The statements after "Clean up on error" drop the descriptor reference,
 * kunmap page_array[mapped] (presumably for each page mapped so far) and
 * free cb_data.
 *
 * NOTE(review): the XXX comment below records an unresolved question about
 * whether brw_finish() runs after a failed request; the unwind order here
 * depends on that answer, so change it with care.
 */
473 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
474 obd_count page_count, struct page **page_array,
475 obd_size *count, obd_off *offset, obd_flag *flags,
476 brw_callback_t callback, void *data)
478 struct ptlrpc_client *cl;
479 struct ptlrpc_connection *connection;
480 struct lustre_handle *rconn;
481 struct ptlrpc_request *request = NULL;
482 struct ptlrpc_bulk_desc *desc = NULL;
483 struct ost_body *body;
484 struct osc_brw_cb_data *cb_data = NULL;
485 int rc, size[3] = {sizeof(*body)};
486 void *iooptr, *nioptr;
490 size[1] = sizeof(struct obd_ioobj);
491 size[2] = page_count * sizeof(struct niobuf_remote);
493 osc_con2cl(conn, &cl, &connection, &rconn);
494 request = ptlrpc_prep_req2(cl, connection, rconn,
495 OST_BRW, 3, size, NULL);
499 body = lustre_msg_buf(request->rq_reqmsg, 0);
500 body->data = OBD_BRW_READ;
502 desc = ptlrpc_prep_bulk(connection);
504 GOTO(out_req, rc = -ENOMEM);
505 desc->b_portal = OST_BULK_PORTAL;
506 desc->b_cb = brw_finish;
507 OBD_ALLOC(cb_data, sizeof(*cb_data));
509 GOTO(out_desc, rc = -ENOMEM);
511 cb_data->callback = callback;
512 cb_data->cb_data = data;
513 desc->b_cb_data = cb_data;
515 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
516 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
517 ost_pack_ioo(&iooptr, md, page_count);
518 /* end almost identical to brw_write case */
520 for (mapped = 0; mapped < page_count; mapped++) {
521 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
523 GOTO(out_unmap, rc = -ENOMEM);
525 spin_lock(&connection->c_lock);
526 bulk->b_xid = ++connection->c_xid_out;
527 spin_unlock(&connection->c_lock);
529 bulk->b_buf = kmap(page_array[mapped]);
530 bulk->b_page = page_array[mapped];
531 bulk->b_buflen = PAGE_SIZE;
532 ost_pack_niobuf(&nioptr, offset[mapped], count[mapped],
533 flags[mapped], bulk->b_xid);
537 * Register the bulk first, because the reply could arrive out of order,
538 * and we want to be ready for the bulk data.
540 * One reference is released when brw_finish is complete, the
541 * other here when we finish waiting on it if we don't have a callback.
543 * We don't reference the bulk descriptor again here if there is a
544 * callback, so we don't need an additional refcount on it.
546 * On error, we never do the brw_finish, so we handle all decrefs.
549 ptlrpc_bulk_addref(desc);
550 rc = ptlrpc_register_bulk(desc);
554 request->rq_replen = lustre_msg_size(1, size);
555 rc = ptlrpc_queue_wait(request);
556 rc = ptlrpc_check_status(request, rc);
558 /* XXX: Mike, this is the only place I'm not sure of. If we had
559 * an error here, will we always call brw_finish? If yes, then
560 * out_desc_2 will do too much and we should jump to out_desc.
561 * If maybe, then we are screwed, and we need to set things up
562 * so that bulk_sink_callback is called for each bulk page,
563 * even on error so brw_finish is always called. It would need
564 * to be passed an error code as a parameter to know what to do.
566 * That would also help with the partial completion case, so
567 * we could say in brw_finish "these pages are done, don't
568 * restart them" and osc_brw callers can know this.
573 /* Callbacks cause asynchronous handling. */
575 GOTO(out_req, rc = 0);
577 /* If there's no callback function, sleep here until complete. */
578 l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
579 if (desc->b_flags & PTL_RPC_FL_INTR)
580 GOTO(out_desc, rc = -EINTR);
584 ptlrpc_bulk_decref(desc);
586 ptlrpc_req_finished(request);
589 /* Clean up on error. */
592 ptlrpc_bulk_decref(desc);
595 kunmap(page_array[mapped]);
596 OBD_FREE(cb_data, sizeof(*cb_data));
/*
 * Write @page_count pages from @pagearray to the object described by @md.
 *
 * Statements visible here, in order:
 *   - an OST_BRW request is built with three buffers: an ost_body (its data
 *     field set to OBD_BRW_WRITE), one obd_ioobj, and @page_count
 *     niobuf_remote entries;
 *   - a bulk descriptor is prepared on OSC_BULK_PORTAL with brw_finish() as
 *     its callback and a newly allocated osc_brw_cb_data holding @callback
 *     and @data;
 *   - an array of @page_count niobuf_local entries is allocated and recorded
 *     in cb_data->obd_data/obd_size so that brw_finish() frees it;
 *   - each page is kmap()ed into local[] and described to the server with
 *     ost_pack_niobuf();
 *   - the request is sent first; buffer 1 of the reply must be exactly
 *     page_count * sizeof(struct niobuf_remote) bytes, otherwise -EINVAL;
 *   - each reply niobuf supplies the xid for one bulk page, whose buffer and
 *     length come from local[];
 *   - an extra descriptor reference is taken and the data is pushed with
 *     ptlrpc_send_bulk();
 *   - per the comments below, a callback makes completion asynchronous and
 *     the function returns 0; without one it sleeps until
 *     ptlrpc_check_bulk_sent() and returns -EINTR if PTL_RPC_FL_INTR is set.
 *
 * The statements after "Clean up on error" drop the descriptor reference,
 * kunmap pagearray[mapped] (presumably for each page mapped so far), and
 * free local[] and cb_data.
 *
 * NOTE(review): no statement visible here sets bulk->b_page, yet
 * unmap_and_decref_bulk_desc() kunmaps bulk->b_page for every page on the
 * descriptor -- confirm the write path initialises it somewhere.
 */
600 static int osc_brw_write(struct lustre_handle *conn,
601 struct lov_stripe_md *md, obd_count page_count,
602 struct page **pagearray, obd_size *count,
603 obd_off *offset, obd_flag *flags,
604 brw_callback_t callback, void *data)
606 struct ptlrpc_client *cl;
607 struct ptlrpc_connection *connection;
608 struct lustre_handle *rconn;
609 struct ptlrpc_request *request = NULL;
610 struct ptlrpc_bulk_desc *desc = NULL;
611 struct ost_body *body;
612 struct niobuf_local *local = NULL;
613 struct niobuf_remote *remote;
614 struct osc_brw_cb_data *cb_data = NULL;
615 int rc, j, size[3] = {sizeof(*body)};
616 void *iooptr, *nioptr;
620 size[1] = sizeof(struct obd_ioobj);
621 size[2] = page_count * sizeof(*remote);
623 osc_con2cl(conn, &cl, &connection, &rconn);
624 request = ptlrpc_prep_req2(cl, connection, rconn,
625 OST_BRW, 3, size, NULL);
629 body = lustre_msg_buf(request->rq_reqmsg, 0);
630 body->data = OBD_BRW_WRITE;
632 desc = ptlrpc_prep_bulk(connection);
634 GOTO(out_req, rc = -ENOMEM);
635 desc->b_portal = OSC_BULK_PORTAL;
636 desc->b_cb = brw_finish;
637 OBD_ALLOC(cb_data, sizeof(*cb_data));
639 GOTO(out_desc, rc = -ENOMEM);
641 cb_data->callback = callback;
642 cb_data->cb_data = data;
643 desc->b_cb_data = cb_data;
645 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
646 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
647 ost_pack_ioo(&iooptr, md, page_count);
648 /* end almost identical to brw_read case */
650 OBD_ALLOC(local, page_count * sizeof(*local));
652 GOTO(out_cb, rc = -ENOMEM);
654 cb_data->obd_data = local;
655 cb_data->obd_size = page_count * sizeof(*local);
657 for (mapped = 0; mapped < page_count; mapped++) {
658 local[mapped].addr = kmap(pagearray[mapped]);
659 local[mapped].offset = offset[mapped];
660 local[mapped].len = count[mapped];
661 ost_pack_niobuf(&nioptr, offset[mapped], count[mapped],
665 size[1] = page_count * sizeof(*remote);
666 request->rq_replen = lustre_msg_size(2, size);
667 rc = ptlrpc_queue_wait(request);
668 rc = ptlrpc_check_status(request, rc);
672 nioptr = lustre_msg_buf(request->rq_repmsg, 1);
674 GOTO(out_unmap, rc = -EINVAL);
676 if (request->rq_repmsg->buflens[1] != size[1]) {
677 CERROR("buffer length wrong (%d vs. %d)\n",
678 request->rq_repmsg->buflens[1], size[1]);
679 GOTO(out_unmap, rc = -EINVAL);
682 for (j = 0; j < page_count; j++) {
683 struct ptlrpc_bulk_page *bulk;
685 ost_unpack_niobuf(&nioptr, &remote);
687 bulk = ptlrpc_prep_bulk_page(desc);
689 GOTO(out_unmap, rc = -ENOMEM);
691 bulk->b_buf = (void *)(unsigned long)local[j].addr;
692 bulk->b_buflen = local[j].len;
693 bulk->b_xid = remote->xid;
696 if (desc->b_page_count != page_count)
700 * One reference is released when brw_finish is complete, the
701 * other here when we finish waiting on it if we don't have a callback.
703 * We don't reference the bulk descriptor again here if there is a
704 * callback, so we don't need an additional refcount on it.
707 ptlrpc_bulk_addref(desc);
708 rc = ptlrpc_send_bulk(desc);
710 /* XXX: Mike, same question as in osc_brw_read. */
714 /* Callbacks cause asynchronous handling. */
716 GOTO(out_req, rc = 0);
718 /* If there's no callback function, sleep here until complete. */
719 l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
720 if (desc->b_flags & PTL_RPC_FL_INTR)
721 GOTO(out_desc, rc = -EINTR);
725 ptlrpc_bulk_decref(desc);
727 ptlrpc_req_finished(request);
730 /* Clean up on error. */
733 ptlrpc_bulk_decref(desc);
736 kunmap(pagearray[mapped]);
738 OBD_FREE(local, page_count * sizeof(*local));
740 OBD_FREE(cb_data, sizeof(*cb_data));
/*
 * Bulk read/write entry point: dispatch on @cmd.
 *
 * When the OBD_BRW_WRITE bit is set in @cmd the call is forwarded to
 * osc_brw_write(); the remaining return forwards to osc_brw_read().  All
 * other arguments are passed through unchanged and the helper's return
 * value is returned as is.
 */
744 static int osc_brw(int cmd, struct lustre_handle *conn,
745 struct lov_stripe_md *md, obd_count page_count,
746 struct page **page_array, obd_size *count, obd_off *offset,
747 obd_flag *flags, brw_callback_t callback, void *data)
749 if (cmd & OBD_BRW_WRITE)
750 return osc_brw_write(conn, md, page_count, page_array, count,
751 offset, flags, callback, data);
753 return osc_brw_read(conn, md, page_count, page_array, count,
754 offset, flags, callback, data);
/*
 * Obtain an extent lock of @mode on resource @res_id, reusing a lock this
 * client already holds when possible.
 *
 * Statements visible here, in order:
 *   - the caller's extent (@extentp) is modified in place: start is rounded
 *     down and end rounded up to a page boundary;
 *   - ldlm_lock_match() is tried with @mode in this device's namespace;
 *   - ldlm_lock_match() is tried again with a second mode (mode2, chosen in
 *     lines not shown here); on that path the reference is moved from mode2
 *     to @mode and ldlm_cli_convert() is called;
 *   - otherwise a new lock is requested with ldlm_cli_enqueue(), using the
 *     lock-manager client from osc_con2dlmcl().
 *
 * @lockh receives the handle of the matched or newly enqueued lock.
 *
 * NOTE(review): 'extent' is declared as a pointer, so sizeof(extent) in the
 * three calls below is the size of a pointer, not of struct ldlm_extent.
 * The @extent_len parameter is not referenced by any visible statement --
 * confirm which length the callees expect.
 */
757 static int osc_enqueue(struct lustre_handle *conn,
758 struct lustre_handle *parent_lock, __u64 *res_id,
759 __u32 type, void *extentp, int extent_len, __u32 mode,
760 int *flags, void *callback, void *data, int datalen,
761 struct lustre_handle *lockh)
763 struct obd_device *obddev = class_conn2obd(conn);
764 struct ptlrpc_connection *connection;
765 struct ptlrpc_client *cl;
766 struct lustre_handle *rconn;
767 struct ldlm_extent *extent = extentp;
771 /* Filesystem locks are given a bit of special treatment: first we
772 * fixup the lock to start and end on page boundaries. */
773 extent->start &= PAGE_MASK;
774 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
776 /* Next, search for already existing extent locks that will cover us */
777 osc_con2dlmcl(conn, &cl, &connection, &rconn);
778 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
779 sizeof(extent), mode, lockh);
781 /* We already have a lock, and it's referenced */
785 /* Next, search for locks that we can upgrade (if we're trying to write)
786 * or are more than we need (if we're trying to read). Because the VFS
787 * and page cache already protect us locally, lots of readers/writers
788 * can share a single PW lock. */
794 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
795 sizeof(extent), mode2, lockh);
798 /* FIXME: This is not incredibly elegant, but it might
799 * be more elegant than adding another parameter to
800 * lock_match. I want a second opinion. */
801 ldlm_lock_addref(lockh, mode);
802 ldlm_lock_decref(lockh, mode2);
807 rc = ldlm_cli_convert(cl, lockh, rconn, mode, &flags);
814 rc = ldlm_cli_enqueue(cl, connection, rconn, NULL,obddev->obd_namespace,
815 parent_lock, res_id, type, extent, sizeof(extent),
816 mode, flags, callback, data, datalen, lockh);
/*
 * Release one reference of @mode on the lock identified by @lockh by
 * calling ldlm_lock_decref().
 *
 * @oconn is not referenced by any statement visible here.
 */
820 static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
821 struct lustre_handle *lockh)
825 ldlm_lock_decref(lockh, mode);
/*
 * Configure an OSC device from the obd_ioctl_data passed in @buf.
 *
 * inlbuf1 carries the target UUID, copied into osc->osc_target_uuid;
 * inlbuf2 carries the server UUID, copied into a local 37-byte buffer and
 * resolved with ptlrpc_uuid_to_connection().  Each length is checked
 * against the range 1..37 and a CERROR is logged when a check fails.
 *
 * Once the connection is obtained the function creates the "osc" client
 * LDLM namespace, allocates osc_client and osc_ldlm_client, initialises
 * them on the OST and LDLM request/reply portals, and names them "osc" and
 * "ldlm".  Allocation failures return -ENOMEM through the out_client /
 * out_ns / out_conn targets; the trailing OBD_FREE, ldlm_namespace_free and
 * ptlrpc_put_connection calls release what was set up, in reverse order.
 *
 * @len is not referenced by any statement visible here.
 *
 * NOTE(review): a 37-byte server UUID fills server_uuid completely and no
 * terminator is written by any visible statement -- confirm that
 * ioc_inllen2 counts a trailing NUL.  The error messages say "less than 38
 * characters" while the checks accept lengths up to 37.
 */
830 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
832 struct obd_ioctl_data* data = buf;
833 struct osc_obd *osc = &obddev->u.osc;
834 char server_uuid[37];
838 if (data->ioc_inllen1 < 1) {
839 CERROR("osc setup requires a TARGET UUID\n");
843 if (data->ioc_inllen1 > 37) {
844 CERROR("osc TARGET UUID must be less than 38 characters\n");
848 if (data->ioc_inllen2 < 1) {
849 CERROR("osc setup requires a SERVER UUID\n");
853 if (data->ioc_inllen2 > 37) {
854 CERROR("osc SERVER UUID must be less than 38 characters\n");
858 memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
859 memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
860 sizeof(server_uuid)));
862 osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
866 obddev->obd_namespace =
867 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
868 if (obddev->obd_namespace == NULL)
869 GOTO(out_conn, rc = -ENOMEM);
871 OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
872 if (osc->osc_client == NULL)
873 GOTO(out_ns, rc = -ENOMEM);
875 OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
876 if (osc->osc_ldlm_client == NULL)
877 GOTO(out_client, rc = -ENOMEM);
879 ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
881 ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
882 osc->osc_ldlm_client);
883 osc->osc_client->cli_name = "osc";
884 osc->osc_ldlm_client->cli_name = "ldlm";
890 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
892 ldlm_namespace_free(obddev->obd_namespace);
894 ptlrpc_put_connection(osc->osc_conn);
/*
 * Release everything osc_setup() created for @obddev.
 *
 * Frees the LDLM namespace, runs ptlrpc_cleanup_client() on and then frees
 * both the request client and the lock-manager client, and finally drops
 * the reference on the ptlrpc connection.
 */
898 static int osc_cleanup(struct obd_device * obddev)
900 struct osc_obd *osc = &obddev->u.osc;
902 ldlm_namespace_free(obddev->obd_namespace);
904 ptlrpc_cleanup_client(osc->osc_client);
905 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
906 ptlrpc_cleanup_client(osc->osc_ldlm_client);
907 OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
908 ptlrpc_put_connection(osc->osc_conn);
/*
 * Query filesystem statistics from the OST and store them in @sfs.
 *
 * An OST_STATFS request with no request buffers is sent; the reply is sized
 * for one struct obd_statfs.  After ptlrpc_queue_wait() and
 * ptlrpc_check_status() a failure is logged with CERROR, and the reply
 * buffer is converted into *@sfs with obd_statfs_unpack().  The request is
 * freed before returning.
 */
914 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
916 struct ptlrpc_request *request;
917 struct ptlrpc_client *cl;
918 struct ptlrpc_connection *connection;
919 struct lustre_handle *rconn;
920 struct obd_statfs *osfs;
921 int rc, size = sizeof(*osfs);
924 osc_con2cl(conn, &cl, &connection, &rconn);
925 request = ptlrpc_prep_req2(cl, connection, rconn,
926 OST_STATFS, 0, NULL, NULL);
930 request->rq_replen = lustre_msg_size(1, &size);
932 rc = ptlrpc_queue_wait(request);
933 rc = ptlrpc_check_status(request, rc);
935 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
939 osfs = lustre_msg_buf(request->rq_repmsg, 0);
940 obd_statfs_unpack(osfs, sfs);
944 ptlrpc_free_req(request);
/*
 * OBD method table for the OSC device type, registered by osc_init().
 *
 * Each entry binds an obd_ops slot to the static handler of the same name
 * in this file, using GCC's "field: value" initializer syntax.
 */
948 struct obd_ops osc_obd_ops = {
950 o_cleanup: osc_cleanup,
951 o_statfs: osc_statfs,
952 o_create: osc_create,
953 o_destroy: osc_destroy,
954 o_getattr: osc_getattr,
955 o_setattr: osc_setattr,
958 o_connect: osc_connect,
959 o_disconnect: osc_disconnect,
962 o_enqueue: osc_enqueue,
/*
 * Module load hook: register the OSC method table under LUSTRE_OSC_NAME and
 * return whatever class_register_type() returns.
 */
966 static int __init osc_init(void)
968 return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
/*
 * Module unload hook: unregister the LUSTRE_OSC_NAME device type.
 */
971 static void __exit osc_exit(void)
973 class_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata, followed by the load/unload entry-point registration. */
976 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
977 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
978 MODULE_LICENSE("GPL");
980 module_init(osc_init);
981 module_exit(osc_exit);