Whamcloud - gitweb
Check RPC reply status for both MDC and OSC. This allows us to return
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/obd_ost.h>
25 #include <linux/lustre_debug.h>
26
27 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
28                        struct ptlrpc_connection **connection)
29 {
30         struct osc_obd *osc = &conn->oc_dev->u.osc;
31         *cl = osc->osc_client;
32         *connection = osc->osc_conn;
33 }
34
35 static void osc_con2dlmcl(struct obd_conn *conn, struct ptlrpc_client **cl,
36                           struct ptlrpc_connection **connection)
37 {
38         struct osc_obd *osc = &conn->oc_dev->u.osc;
39         *cl = osc->osc_ldlm_client;
40         *connection = osc->osc_conn;
41 }
42
43 static int osc_connect(struct obd_conn *conn)
44 {
45         struct ptlrpc_request *request;
46         struct ptlrpc_client *cl;
47         struct ptlrpc_connection *connection;
48         struct ost_body *body;
49         int rc, size = sizeof(*body);
50         ENTRY;
51
52         osc_con2cl(conn, &cl, &connection);
53         request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL);
54         if (!request)
55                 RETURN(-ENOMEM);
56
57         request->rq_replen = lustre_msg_size(1, &size);
58
59         rc = ptlrpc_queue_wait(request);
60         rc = ptlrpc_check_status(request, rc);
61         if (rc) {
62                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
63                 GOTO(out, rc);
64         }
65
66         body = lustre_msg_buf(request->rq_repmsg, 0);
67         CDEBUG(D_INODE, "received connid %d\n", body->connid);
68
69         conn->oc_id = body->connid;
70         EXIT;
71  out:
72         ptlrpc_free_req(request);
73         return rc;
74 }
75
76 static int osc_disconnect(struct obd_conn *conn)
77 {
78         struct ptlrpc_request *request;
79         struct ptlrpc_client *cl;
80         struct ptlrpc_connection *connection;
81         struct ost_body *body;
82         int rc, size = sizeof(*body);
83         ENTRY;
84
85         osc_con2cl(conn, &cl, &connection);
86         request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL);
87         if (!request)
88                 RETURN(-ENOMEM);
89
90         body = lustre_msg_buf(request->rq_reqmsg, 0);
91         body->connid = conn->oc_id;
92
93         request->rq_replen = lustre_msg_size(1, &size);
94
95         rc = ptlrpc_queue_wait(request);
96         GOTO(out, rc);
97  out:
98         ptlrpc_free_req(request);
99         return rc;
100 }
101
102 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
103 {
104         struct ptlrpc_request *request;
105         struct ptlrpc_client *cl;
106         struct ptlrpc_connection *connection;
107         struct ost_body *body;
108         int rc, size = sizeof(*body);
109         ENTRY;
110
111         osc_con2cl(conn, &cl, &connection);
112         request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
113         if (!request)
114                 RETURN(-ENOMEM);
115
116         body = lustre_msg_buf(request->rq_reqmsg, 0);
117         memcpy(&body->oa, oa, sizeof(*oa));
118         body->connid = conn->oc_id;
119         body->oa.o_valid = ~0;
120
121         request->rq_replen = lustre_msg_size(1, &size);
122
123         rc = ptlrpc_queue_wait(request);
124         rc = ptlrpc_check_status(request, rc);
125         if (rc) {
126                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
127                 GOTO(out, rc);
128         }
129
130         body = lustre_msg_buf(request->rq_repmsg, 0);
131         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
132         if (oa)
133                 memcpy(oa, &body->oa, sizeof(*oa));
134
135         EXIT;
136  out:
137         ptlrpc_free_req(request);
138         return 0;
139 }
140
141 static int osc_open(struct obd_conn *conn, struct obdo *oa)
142 {
143         struct ptlrpc_request *request;
144         struct ptlrpc_client *cl;
145         struct ptlrpc_connection *connection;
146         struct ost_body *body;
147         int rc, size = sizeof(*body);
148         ENTRY;
149
150         osc_con2cl(conn, &cl, &connection);
151         request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
152         if (!request)
153                 RETURN(-ENOMEM);
154
155         body = lustre_msg_buf(request->rq_reqmsg, 0);
156         memcpy(&body->oa, oa, sizeof(*oa));
157         body->connid = conn->oc_id;
158         if (body->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
159                 LBUG();
160
161         request->rq_replen = lustre_msg_size(1, &size);
162
163         rc = ptlrpc_queue_wait(request);
164         rc = ptlrpc_check_status(request, rc);
165         if (rc)
166                 GOTO(out, rc);
167
168         body = lustre_msg_buf(request->rq_repmsg, 0);
169         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
170         if (oa)
171                 memcpy(oa, &body->oa, sizeof(*oa));
172
173         EXIT;
174  out:
175         ptlrpc_free_req(request);
176         return 0;
177 }
178
179 static int osc_close(struct obd_conn *conn, struct obdo *oa)
180 {
181         struct ptlrpc_request *request;
182         struct ptlrpc_client *cl;
183         struct ptlrpc_connection *connection;
184         struct ost_body *body;
185         int rc, size = sizeof(*body);
186         ENTRY;
187
188         osc_con2cl(conn, &cl, &connection);
189         request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
190         if (!request)
191                 RETURN(-ENOMEM);
192
193         body = lustre_msg_buf(request->rq_reqmsg, 0);
194         memcpy(&body->oa, oa, sizeof(*oa));
195         body->connid = conn->oc_id;
196
197         request->rq_replen = lustre_msg_size(1, &size);
198
199         rc = ptlrpc_queue_wait(request);
200         rc = ptlrpc_check_status(request, rc);
201         if (rc)
202                 GOTO(out, rc);
203
204         body = lustre_msg_buf(request->rq_repmsg, 0);
205         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
206         if (oa)
207                 memcpy(oa, &body->oa, sizeof(*oa));
208
209         EXIT;
210  out:
211         ptlrpc_free_req(request);
212         return 0;
213 }
214
215 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
216 {
217         struct ptlrpc_request *request;
218         struct ptlrpc_client *cl;
219         struct ptlrpc_connection *connection;
220         struct ost_body *body;
221         int rc, size = sizeof(*body);
222         ENTRY;
223
224         osc_con2cl(conn, &cl, &connection);
225         request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
226         if (!request)
227                 RETURN(-ENOMEM);
228
229         body = lustre_msg_buf(request->rq_reqmsg, 0);
230         memcpy(&body->oa, oa, sizeof(*oa));
231         body->connid = conn->oc_id;
232
233         request->rq_replen = lustre_msg_size(1, &size);
234
235         rc = ptlrpc_queue_wait(request);
236         rc = ptlrpc_check_status(request, rc);
237         GOTO(out, rc);
238
239  out:
240         ptlrpc_free_req(request);
241         return 0;
242 }
243
244 static int osc_create(struct obd_conn *conn, struct obdo *oa)
245 {
246         struct ptlrpc_request *request;
247         struct ptlrpc_client *cl;
248         struct ptlrpc_connection *connection;
249         struct ost_body *body;
250         int rc, size = sizeof(*body);
251         ENTRY;
252
253         if (!oa) {
254                 CERROR("oa NULL\n");
255                 RETURN(-EINVAL);
256         }
257         osc_con2cl(conn, &cl, &connection);
258         request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
259         if (!request)
260                 RETURN(-ENOMEM);
261
262         body = lustre_msg_buf(request->rq_reqmsg, 0);
263         memcpy(&body->oa, oa, sizeof(*oa));
264         body->oa.o_valid = ~0;
265         body->connid = conn->oc_id;
266
267         request->rq_replen = lustre_msg_size(1, &size);
268
269         rc = ptlrpc_queue_wait(request);
270         rc = ptlrpc_check_status(request, rc);
271         if (rc)
272                 GOTO(out, rc);
273
274         body = lustre_msg_buf(request->rq_repmsg, 0);
275         memcpy(oa, &body->oa, sizeof(*oa));
276
277         EXIT;
278  out:
279         ptlrpc_free_req(request);
280         return 0;
281 }
282
283 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
284                      obd_off offset)
285 {
286         struct ptlrpc_request *request;
287         struct ptlrpc_client *cl;
288         struct ptlrpc_connection *connection;
289         struct ost_body *body;
290         int rc, size = sizeof(*body);
291         ENTRY;
292
293         if (!oa) {
294                 CERROR("oa NULL\n");
295                 RETURN(-EINVAL);
296         }
297         osc_con2cl(conn, &cl, &connection);
298         request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
299         if (!request)
300                 RETURN(-ENOMEM);
301
302         body = lustre_msg_buf(request->rq_reqmsg, 0);
303         memcpy(&body->oa, oa, sizeof(*oa));
304         body->connid = conn->oc_id;
305         body->oa.o_valid = ~0;
306         body->oa.o_size = offset;
307         body->oa.o_blocks = count;
308
309         request->rq_replen = lustre_msg_size(1, &size);
310
311         rc = ptlrpc_queue_wait(request);
312         rc = ptlrpc_check_status(request, rc);
313         if (rc)
314                 GOTO(out, rc);
315
316         body = lustre_msg_buf(request->rq_repmsg, 0);
317         memcpy(oa, &body->oa, sizeof(*oa));
318
319         EXIT;
320  out:
321         ptlrpc_free_req(request);
322         return 0;
323 }
324
325 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
326 {
327         struct ptlrpc_request *request;
328         struct ptlrpc_client *cl;
329         struct ptlrpc_connection *connection;
330         struct ost_body *body;
331         int rc, size = sizeof(*body);
332         ENTRY;
333
334         if (!oa) {
335                 CERROR("oa NULL\n");
336                 RETURN(-EINVAL);
337         }
338         osc_con2cl(conn, &cl, &connection);
339         request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
340         if (!request)
341                 RETURN(-ENOMEM);
342
343         body = lustre_msg_buf(request->rq_reqmsg, 0);
344         memcpy(&body->oa, oa, sizeof(*oa));
345         body->connid = conn->oc_id;
346         body->oa.o_valid = ~0;
347
348         request->rq_replen = lustre_msg_size(1, &size);
349
350         rc = ptlrpc_queue_wait(request);
351         rc = ptlrpc_check_status(request, rc);
352         if (rc)
353                 GOTO(out, rc);
354
355         body = lustre_msg_buf(request->rq_repmsg, 0);
356         memcpy(oa, &body->oa, sizeof(*oa));
357
358         EXIT;
359  out:
360         ptlrpc_free_req(request);
361         return 0;
362 }
363
364 static int osc_sendpage(struct ptlrpc_bulk_desc *desc,
365                         struct niobuf_remote *dst, struct niobuf_local *src)
366 {
367         struct ptlrpc_bulk_page *page;
368         ENTRY;
369
370         ASSERT_FILE_OFFSET(NTOH__u64(src->offset),
371                            RETURN(dump_rniobuf(dst) | dump_lniobuf(src)));
372         page = ptlrpc_prep_bulk_page(desc);
373         if (page == NULL)
374                 RETURN(-ENOMEM);
375
376         page->b_buf = (void *)(unsigned long)src->addr;
377         page->b_buflen = src->len;
378         page->b_xid = dst->xid;
379
380         RETURN(0);
381 }
382
383 static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
384                         struct obdo **oa, obd_count *oa_bufs, struct page **buf,
385                         obd_size *count, obd_off *offset, obd_flag *flags)
386 {
387         struct ptlrpc_client *cl;
388         struct ptlrpc_connection *connection;
389         struct ptlrpc_request *request;
390         struct ost_body *body;
391         struct list_head *tmp, *next;
392         int pages, rc, i, j, size[3] = {sizeof(*body)};
393         void *ptr1, *ptr2;
394         struct ptlrpc_bulk_desc *desc;;
395         ENTRY;
396
397         size[1] = num_oa * sizeof(struct obd_ioobj);
398         pages = 0;
399         for (i = 0; i < num_oa; i++)
400                 pages += oa_bufs[i];
401         size[2] = pages * sizeof(struct niobuf_remote);
402
403         osc_con2cl(conn, &cl, &connection);
404         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
405         if (!request)
406                 GOTO(out3, rc = -ENOMEM);
407
408         body = lustre_msg_buf(request->rq_reqmsg, 0);
409         body->data = OBD_BRW_READ;
410
411         desc = ptlrpc_prep_bulk(connection);
412         if (desc == NULL)
413                 GOTO(out2, rc = -ENOMEM);
414         desc->b_portal = OST_BULK_PORTAL;
415
416         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
417         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
418         for (pages = 0, i = 0; i < num_oa; i++) {
419                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
420                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
421                         struct ptlrpc_bulk_page *page;
422                         page = ptlrpc_prep_bulk_page(desc);
423                         if (page == NULL)
424                                 GOTO(out, rc = -ENOMEM);
425
426                         spin_lock(&connection->c_lock);
427                         page->b_xid = ++connection->c_xid_out;
428                         spin_unlock(&connection->c_lock);
429
430                         page->b_buf = kmap(buf[pages]);
431                         page->b_buflen = PAGE_SIZE;
432                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
433                                         flags[pages], page->b_xid);
434                 }
435         }
436
437         rc = ptlrpc_register_bulk(desc);
438         if (rc)
439                 GOTO(out, rc);
440
441         request->rq_replen = lustre_msg_size(1, size);
442         rc = ptlrpc_queue_wait(request);
443         rc = ptlrpc_check_status(request, rc);
444         if (rc)
445                 ptlrpc_abort_bulk(desc);
446         GOTO(out, rc);
447
448  out:
449         list_for_each_safe(tmp, next, &desc->b_page_list) {
450                 struct ptlrpc_bulk_page *page;
451                 page = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
452
453                 if (page->b_buf != NULL)
454                         kunmap(page->b_buf);
455         }
456
457         ptlrpc_free_bulk(desc);
458  out2:
459         ptlrpc_free_req(request);
460  out3:
461         return rc;
462 }
463
464 static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
465                          struct obdo **oa, obd_count *oa_bufs,
466                          struct page **buf, obd_size *count, obd_off *offset,
467                          obd_flag *flags)
468 {
469         struct ptlrpc_client *cl;
470         struct ptlrpc_connection *connection;
471         struct ptlrpc_request *request;
472         struct ptlrpc_bulk_desc *desc;
473         struct obd_ioobj ioo;
474         struct ost_body *body;
475         struct niobuf_local *local;
476         struct niobuf_remote *remote;
477         long pages;
478         int rc, i, j, size[3] = {sizeof(*body)};
479         void *ptr1, *ptr2;
480         ENTRY;
481
482         size[1] = num_oa * sizeof(ioo);
483         pages = 0;
484         for (i = 0; i < num_oa; i++)
485                 pages += oa_bufs[i];
486         size[2] = pages * sizeof(*remote);
487
488         OBD_ALLOC(local, pages * sizeof(*local));
489         if (local == NULL)
490                 RETURN(-ENOMEM);
491
492         osc_con2cl(conn, &cl, &connection);
493         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
494         if (!request)
495                 GOTO(out, rc = -ENOMEM);
496         body = lustre_msg_buf(request->rq_reqmsg, 0);
497         body->data = OBD_BRW_WRITE;
498
499         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
500         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
501         for (pages = 0, i = 0; i < num_oa; i++) {
502                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
503                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
504                         local[pages].addr = (__u64)(long)kmap(buf[pages]);
505                         local[pages].offset = offset[pages];
506                         local[pages].len = count[pages];
507                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
508                                         flags[pages], 0);
509                 }
510         }
511
512         size[1] = pages * sizeof(struct niobuf_remote);
513         request->rq_replen = lustre_msg_size(2, size);
514
515         rc = ptlrpc_queue_wait(request);
516         rc = ptlrpc_check_status(request, rc);
517         if (rc)
518                 GOTO(out2, rc);
519
520         ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
521         if (ptr2 == NULL)
522                 GOTO(out2, rc = -EINVAL);
523
524         if (request->rq_repmsg->buflens[1] !=
525             pages * sizeof(struct niobuf_remote)) {
526                 CERROR("buffer length wrong (%d vs. %ld)\n",
527                        request->rq_repmsg->buflens[1],
528                        pages * sizeof(struct niobuf_remote));
529                 GOTO(out2, rc = -EINVAL);
530         }
531
532         desc = ptlrpc_prep_bulk(connection);
533         desc->b_portal = OSC_BULK_PORTAL;
534
535         for (pages = 0, i = 0; i < num_oa; i++) {
536                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
537                         ost_unpack_niobuf(&ptr2, &remote);
538                         rc = osc_sendpage(desc, remote, &local[pages]);
539                         if (rc)
540                                 GOTO(out3, rc);
541                 }
542         }
543
544         rc = ptlrpc_send_bulk(desc);
545         GOTO(out3, rc);
546
547  out3:
548         ptlrpc_free_bulk(desc);
549  out2:
550         ptlrpc_free_req(request);
551         for (pages = 0, i = 0; i < num_oa; i++)
552                 for (j = 0; j < oa_bufs[i]; j++, pages++)
553                         kunmap(buf[pages]);
554  out:
555         OBD_FREE(local, pages * sizeof(*local));
556
557         return rc;
558 }
559
560 static int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
561                    struct obdo **oa, obd_count *oa_bufs, struct page **buf,
562                    obd_size *count, obd_off *offset, obd_flag *flags)
563 {
564         if (rw == OBD_BRW_READ)
565                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
566                                     offset, flags);
567         else
568                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
569                                      offset, flags);
570 }
571
572 static int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
573                        struct ldlm_handle *parent_lock, __u64 *res_id,
574                        __u32 type, struct ldlm_extent *extent, __u32 mode,
575                        int *flags, void *data, int datalen,
576                        struct ldlm_handle *lockh)
577 {
578         struct ptlrpc_connection *conn;
579         struct ptlrpc_client *cl;
580         int rc;
581         __u32 mode2;
582
583         /* Filesystem locks are given a bit of special treatment: first we
584          * fixup the lock to start and end on page boundaries. */
585         extent->start &= PAGE_MASK;
586         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
587
588         /* Next, search for already existing extent locks that will cover us */
589         osc_con2dlmcl(oconn, &cl, &conn);
590         rc = ldlm_local_lock_match(ns, res_id, type, extent, mode, lockh);
591         if (rc == 1) {
592                 /* We already have a lock, and it's referenced */
593                 return 0;
594         }
595
596         /* Next, search for locks that we can upgrade (if we're trying to write)
597          * or are more than we need (if we're trying to read).  Because the VFS
598          * and page cache already protect us locally, lots of readers/writers
599          * can share a single PW lock. */
600         if (mode == LCK_PW)
601                 mode2 = LCK_PR;
602         else
603                 mode2 = LCK_PW;
604
605         rc = ldlm_local_lock_match(ns, res_id, type, extent, mode2, lockh);
606         if (rc == 1) {
607                 int flags;
608                 struct ldlm_lock *lock = ldlm_handle2object(lockh);
609                 /* FIXME: This is not incredibly elegant, but it might
610                  * be more elegant than adding another parameter to
611                  * lock_match.  I want a second opinion. */
612                 ldlm_lock_addref(lock, mode);
613                 ldlm_lock_decref(lock, mode2);
614
615                 if (mode == LCK_PR)
616                         return 0;
617
618                 rc = ldlm_cli_convert(cl, lockh, type, &flags);
619                 if (rc)
620                         LBUG();
621
622                 return rc;
623         }
624
625         rc = ldlm_cli_enqueue(cl, conn, ns, parent_lock, res_id, type,
626                               extent, mode, flags, data, datalen, lockh);
627         return rc;
628 }
629
630 static int osc_cancel(struct obd_conn *oconn, __u32 mode,
631                       struct ldlm_handle *lockh)
632 {
633         struct ldlm_lock *lock;
634         ENTRY;
635
636         lock = ldlm_handle2object(lockh);
637         ldlm_lock_decref(lock, mode);
638
639         RETURN(0);
640 }
641
642 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
643 {
644         struct osc_obd *osc = &obddev->u.osc;
645         int rc;
646         ENTRY;
647
648         osc->osc_conn = ptlrpc_uuid_to_connection("ost");
649         if (!osc->osc_conn)
650                 RETURN(-EINVAL);
651
652         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
653         if (osc->osc_client == NULL)
654                 GOTO(out_conn, rc = -ENOMEM);
655
656         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
657         if (osc->osc_ldlm_client == NULL)
658                 GOTO(out_client, rc = -ENOMEM);
659
660         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
661                            osc->osc_client);
662         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
663                            osc->osc_ldlm_client);
664         osc->osc_client->cli_name = "osc";
665         osc->osc_ldlm_client->cli_name = "ldlm";
666
667         MOD_INC_USE_COUNT;
668         RETURN(0);
669
670  out_client:
671         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
672  out_conn:
673         ptlrpc_put_connection(osc->osc_conn);
674         return rc;
675 }
676
677 static int osc_cleanup(struct obd_device * obddev)
678 {
679         struct osc_obd *osc = &obddev->u.osc;
680
681         ptlrpc_cleanup_client(osc->osc_client);
682         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
683         ptlrpc_cleanup_client(osc->osc_ldlm_client);
684         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
685         ptlrpc_put_connection(osc->osc_conn);
686
687         MOD_DEC_USE_COUNT;
688         return 0;
689 }
690
691 struct obd_ops osc_obd_ops = {
692         o_setup:   osc_setup,
693         o_cleanup: osc_cleanup,
694         o_create: osc_create,
695         o_destroy: osc_destroy,
696         o_getattr: osc_getattr,
697         o_setattr: osc_setattr,
698         o_open: osc_open,
699         o_close: osc_close,
700         o_connect: osc_connect,
701         o_disconnect: osc_disconnect,
702         o_brw: osc_brw,
703         o_punch: osc_punch,
704         o_enqueue: osc_enqueue,
705         o_cancel: osc_cancel
706 };
707
708 static int __init osc_init(void)
709 {
710         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
711         return 0;
712 }
713
714 static void __exit osc_exit(void)
715 {
716         obd_unregister_type(LUSTRE_OSC_NAME);
717 }
718
719 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
720 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
721 MODULE_LICENSE("GPL");
722
723 module_init(osc_init);
724 module_exit(osc_exit);