Whamcloud - gitweb
Major fixups for multi-page I/Os in filterobd.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/obd_ost.h>
25
26 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
27                        struct ptlrpc_connection **connection)
28 {
29         struct osc_obd *osc = &conn->oc_dev->u.osc;
30         *cl = osc->osc_client;
31         *connection = osc->osc_conn;
32 }
33
34 static void osc_con2dlmcl(struct obd_conn *conn, struct ptlrpc_client **cl,
35                           struct ptlrpc_connection **connection)
36 {
37         struct osc_obd *osc = &conn->oc_dev->u.osc;
38         *cl = osc->osc_ldlm_client;
39         *connection = osc->osc_conn;
40 }
41
42 static int osc_connect(struct obd_conn *conn)
43 {
44         struct ptlrpc_request *request;
45         struct ptlrpc_client *cl;
46         struct ptlrpc_connection *connection;
47         struct ost_body *body;
48         int rc, size = sizeof(*body);
49         ENTRY;
50
51         osc_con2cl(conn, &cl, &connection);
52         request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL);
53         if (!request)
54                 RETURN(-ENOMEM);
55
56         request->rq_replen = lustre_msg_size(1, &size);
57
58         rc = ptlrpc_queue_wait(request);
59         rc = ptlrpc_check_status(request, rc);
60         if (rc) {
61                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
62                 GOTO(out, rc);
63         }
64
65         body = lustre_msg_buf(request->rq_repmsg, 0);
66         CDEBUG(D_INODE, "received connid %d\n", body->connid);
67
68         conn->oc_id = body->connid;
69         EXIT;
70  out:
71         ptlrpc_free_req(request);
72         return rc;
73 }
74
75 static int osc_disconnect(struct obd_conn *conn)
76 {
77         struct ptlrpc_request *request;
78         struct ptlrpc_client *cl;
79         struct ptlrpc_connection *connection;
80         struct ost_body *body;
81         int rc, size = sizeof(*body);
82         ENTRY;
83
84         osc_con2cl(conn, &cl, &connection);
85         request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL);
86         if (!request)
87                 RETURN(-ENOMEM);
88
89         body = lustre_msg_buf(request->rq_reqmsg, 0);
90         body->connid = conn->oc_id;
91
92         request->rq_replen = lustre_msg_size(1, &size);
93
94         rc = ptlrpc_queue_wait(request);
95         GOTO(out, rc);
96  out:
97         ptlrpc_free_req(request);
98         return rc;
99 }
100
101 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
102 {
103         struct ptlrpc_request *request;
104         struct ptlrpc_client *cl;
105         struct ptlrpc_connection *connection;
106         struct ost_body *body;
107         int rc, size = sizeof(*body);
108         ENTRY;
109
110         osc_con2cl(conn, &cl, &connection);
111         request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
112         if (!request)
113                 RETURN(-ENOMEM);
114
115         body = lustre_msg_buf(request->rq_reqmsg, 0);
116         memcpy(&body->oa, oa, sizeof(*oa));
117         body->connid = conn->oc_id;
118         body->oa.o_valid = ~0;
119
120         request->rq_replen = lustre_msg_size(1, &size);
121
122         rc = ptlrpc_queue_wait(request);
123         rc = ptlrpc_check_status(request, rc);
124         if (rc) {
125                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
126                 GOTO(out, rc);
127         }
128
129         body = lustre_msg_buf(request->rq_repmsg, 0);
130         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
131         if (oa)
132                 memcpy(oa, &body->oa, sizeof(*oa));
133
134         EXIT;
135  out:
136         ptlrpc_free_req(request);
137         return 0;
138 }
139
140 static int osc_open(struct obd_conn *conn, struct obdo *oa)
141 {
142         struct ptlrpc_request *request;
143         struct ptlrpc_client *cl;
144         struct ptlrpc_connection *connection;
145         struct ost_body *body;
146         int rc, size = sizeof(*body);
147         ENTRY;
148
149         osc_con2cl(conn, &cl, &connection);
150         request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
151         if (!request)
152                 RETURN(-ENOMEM);
153
154         body = lustre_msg_buf(request->rq_reqmsg, 0);
155         memcpy(&body->oa, oa, sizeof(*oa));
156         body->connid = conn->oc_id;
157         if (body->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
158                 LBUG();
159
160         request->rq_replen = lustre_msg_size(1, &size);
161
162         rc = ptlrpc_queue_wait(request);
163         rc = ptlrpc_check_status(request, rc);
164         if (rc)
165                 GOTO(out, rc);
166
167         body = lustre_msg_buf(request->rq_repmsg, 0);
168         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
169         if (oa)
170                 memcpy(oa, &body->oa, sizeof(*oa));
171
172         EXIT;
173  out:
174         ptlrpc_free_req(request);
175         return 0;
176 }
177
178 static int osc_close(struct obd_conn *conn, struct obdo *oa)
179 {
180         struct ptlrpc_request *request;
181         struct ptlrpc_client *cl;
182         struct ptlrpc_connection *connection;
183         struct ost_body *body;
184         int rc, size = sizeof(*body);
185         ENTRY;
186
187         osc_con2cl(conn, &cl, &connection);
188         request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
189         if (!request)
190                 RETURN(-ENOMEM);
191
192         body = lustre_msg_buf(request->rq_reqmsg, 0);
193         memcpy(&body->oa, oa, sizeof(*oa));
194         body->connid = conn->oc_id;
195
196         request->rq_replen = lustre_msg_size(1, &size);
197
198         rc = ptlrpc_queue_wait(request);
199         rc = ptlrpc_check_status(request, rc);
200         if (rc)
201                 GOTO(out, rc);
202
203         body = lustre_msg_buf(request->rq_repmsg, 0);
204         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
205         if (oa)
206                 memcpy(oa, &body->oa, sizeof(*oa));
207
208         EXIT;
209  out:
210         ptlrpc_free_req(request);
211         return 0;
212 }
213
214 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
215 {
216         struct ptlrpc_request *request;
217         struct ptlrpc_client *cl;
218         struct ptlrpc_connection *connection;
219         struct ost_body *body;
220         int rc, size = sizeof(*body);
221         ENTRY;
222
223         osc_con2cl(conn, &cl, &connection);
224         request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
225         if (!request)
226                 RETURN(-ENOMEM);
227
228         body = lustre_msg_buf(request->rq_reqmsg, 0);
229         memcpy(&body->oa, oa, sizeof(*oa));
230         body->connid = conn->oc_id;
231
232         request->rq_replen = lustre_msg_size(1, &size);
233
234         rc = ptlrpc_queue_wait(request);
235         rc = ptlrpc_check_status(request, rc);
236         GOTO(out, rc);
237
238  out:
239         ptlrpc_free_req(request);
240         return 0;
241 }
242
243 static int osc_create(struct obd_conn *conn, struct obdo *oa)
244 {
245         struct ptlrpc_request *request;
246         struct ptlrpc_client *cl;
247         struct ptlrpc_connection *connection;
248         struct ost_body *body;
249         int rc, size = sizeof(*body);
250         ENTRY;
251
252         if (!oa) {
253                 CERROR("oa NULL\n");
254                 RETURN(-EINVAL);
255         }
256         osc_con2cl(conn, &cl, &connection);
257         request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
258         if (!request)
259                 RETURN(-ENOMEM);
260
261         body = lustre_msg_buf(request->rq_reqmsg, 0);
262         memcpy(&body->oa, oa, sizeof(*oa));
263         body->oa.o_valid = ~0;
264         body->connid = conn->oc_id;
265
266         request->rq_replen = lustre_msg_size(1, &size);
267
268         rc = ptlrpc_queue_wait(request);
269         rc = ptlrpc_check_status(request, rc);
270         if (rc)
271                 GOTO(out, rc);
272
273         body = lustre_msg_buf(request->rq_repmsg, 0);
274         memcpy(oa, &body->oa, sizeof(*oa));
275
276         EXIT;
277  out:
278         ptlrpc_free_req(request);
279         return 0;
280 }
281
282 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
283                      obd_off offset)
284 {
285         struct ptlrpc_request *request;
286         struct ptlrpc_client *cl;
287         struct ptlrpc_connection *connection;
288         struct ost_body *body;
289         int rc, size = sizeof(*body);
290         ENTRY;
291
292         if (!oa) {
293                 CERROR("oa NULL\n");
294                 RETURN(-EINVAL);
295         }
296         osc_con2cl(conn, &cl, &connection);
297         request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
298         if (!request)
299                 RETURN(-ENOMEM);
300
301         body = lustre_msg_buf(request->rq_reqmsg, 0);
302         memcpy(&body->oa, oa, sizeof(*oa));
303         body->connid = conn->oc_id;
304         body->oa.o_valid = ~0;
305         body->oa.o_size = offset;
306         body->oa.o_blocks = count;
307
308         request->rq_replen = lustre_msg_size(1, &size);
309
310         rc = ptlrpc_queue_wait(request);
311         rc = ptlrpc_check_status(request, rc);
312         if (rc)
313                 GOTO(out, rc);
314
315         body = lustre_msg_buf(request->rq_repmsg, 0);
316         memcpy(oa, &body->oa, sizeof(*oa));
317
318         EXIT;
319  out:
320         ptlrpc_free_req(request);
321         return 0;
322 }
323
324 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
325 {
326         struct ptlrpc_request *request;
327         struct ptlrpc_client *cl;
328         struct ptlrpc_connection *connection;
329         struct ost_body *body;
330         int rc, size = sizeof(*body);
331         ENTRY;
332
333         if (!oa) {
334                 CERROR("oa NULL\n");
335                 RETURN(-EINVAL);
336         }
337         osc_con2cl(conn, &cl, &connection);
338         request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
339         if (!request)
340                 RETURN(-ENOMEM);
341
342         body = lustre_msg_buf(request->rq_reqmsg, 0);
343         memcpy(&body->oa, oa, sizeof(*oa));
344         body->connid = conn->oc_id;
345         body->oa.o_valid = ~0;
346
347         request->rq_replen = lustre_msg_size(1, &size);
348
349         rc = ptlrpc_queue_wait(request);
350         rc = ptlrpc_check_status(request, rc);
351         if (rc)
352                 GOTO(out, rc);
353
354         body = lustre_msg_buf(request->rq_repmsg, 0);
355         memcpy(oa, &body->oa, sizeof(*oa));
356
357         EXIT;
358  out:
359         ptlrpc_free_req(request);
360         return 0;
361 }
362
363 static int osc_sendpage(struct ptlrpc_bulk_desc *desc,
364                         struct niobuf_remote *dst, struct niobuf_local *src)
365 {
366         struct ptlrpc_bulk_page *page;
367         ENTRY;
368
369         page = ptlrpc_prep_bulk_page(desc);
370         if (page == NULL)
371                 RETURN(-ENOMEM);
372
373         page->b_buf = (void *)(unsigned long)src->addr;
374         page->b_buflen = src->len;
375         page->b_xid = dst->xid;
376
377         RETURN(0);
378 }
379
380 static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
381                         struct obdo **oa, obd_count *oa_bufs, struct page **buf,
382                         obd_size *count, obd_off *offset, obd_flag *flags)
383 {
384         struct ptlrpc_client *cl;
385         struct ptlrpc_connection *connection;
386         struct ptlrpc_request *request;
387         struct ost_body *body;
388         struct list_head *tmp;
389         int pages, rc, i, j, size[3] = {sizeof(*body)};
390         void *ptr1, *ptr2;
391         struct ptlrpc_bulk_desc *desc;
392         ENTRY;
393
394         size[1] = num_oa * sizeof(struct obd_ioobj);
395         pages = 0;
396         for (i = 0; i < num_oa; i++)
397                 pages += oa_bufs[i];
398         size[2] = pages * sizeof(struct niobuf_remote);
399
400         osc_con2cl(conn, &cl, &connection);
401         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
402         if (!request)
403                 GOTO(out, rc = -ENOMEM);
404
405         body = lustre_msg_buf(request->rq_reqmsg, 0);
406         body->data = OBD_BRW_READ;
407
408         desc = ptlrpc_prep_bulk(connection);
409         if (!desc)
410                 GOTO(out2, rc = -ENOMEM);
411         desc->b_portal = OST_BULK_PORTAL;
412
413         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
414         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
415         for (pages = 0, i = 0; i < num_oa; i++) {
416                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
417                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
418                         struct ptlrpc_bulk_page *bulk;
419                         bulk = ptlrpc_prep_bulk_page(desc);
420                         if (bulk == NULL)
421                                 GOTO(out3, rc = -ENOMEM);
422
423                         spin_lock(&connection->c_lock);
424                         bulk->b_xid = ++connection->c_xid_out;
425                         spin_unlock(&connection->c_lock);
426
427                         bulk->b_buf = kmap(buf[pages]);
428                         bulk->b_page = buf[pages];
429                         bulk->b_buflen = PAGE_SIZE;
430                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
431                                         flags[pages], bulk->b_xid);
432                 }
433         }
434
435         rc = ptlrpc_register_bulk(desc);
436         if (rc)
437                 GOTO(out3, rc);
438
439         request->rq_replen = lustre_msg_size(1, size);
440         rc = ptlrpc_queue_wait(request);
441         rc = ptlrpc_check_status(request, rc);
442         if (rc)
443                 ptlrpc_abort_bulk(desc);
444         GOTO(out3, rc);
445
446  out3:
447         list_for_each(tmp, &desc->b_page_list) {
448                 struct ptlrpc_bulk_page *bulk;
449                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
450                 if (bulk->b_buf != NULL)
451                         kunmap(bulk->b_page);
452         }
453         ptlrpc_free_bulk(desc);
454  out2:
455         ptlrpc_free_req(request);
456  out:
457         return rc;
458 }
459
460 static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
461                          struct obdo **oa, obd_count *oa_bufs,
462                          struct page **pagearray, obd_size *count, obd_off *offset,
463                          obd_flag *flags)
464 {
465         struct ptlrpc_client *cl;
466         struct ptlrpc_connection *connection;
467         struct ptlrpc_request *request;
468         struct ptlrpc_bulk_desc *desc;
469         struct obd_ioobj ioo;
470         struct ost_body *body;
471         struct niobuf_local *local;
472         struct niobuf_remote *remote;
473         long pages;
474         int rc, i, j, size[3] = {sizeof(*body)};
475         void *ptr1, *ptr2;
476         ENTRY;
477
478         size[1] = num_oa * sizeof(ioo);
479         pages = 0;
480         for (i = 0; i < num_oa; i++)
481                 pages += oa_bufs[i];
482         size[2] = pages * sizeof(*remote);
483
484         OBD_ALLOC(local, pages * sizeof(*local));
485         if (local == NULL)
486                 RETURN(-ENOMEM);
487
488         osc_con2cl(conn, &cl, &connection);
489         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
490         if (!request)
491                 GOTO(out, rc = -ENOMEM);
492         body = lustre_msg_buf(request->rq_reqmsg, 0);
493         body->data = OBD_BRW_WRITE;
494
495         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
496         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
497         for (pages = 0, i = 0; i < num_oa; i++) {
498                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
499                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
500                         local[pages].addr = kmap(pagearray[pages]);
501                         local[pages].offset = offset[pages];
502                         local[pages].len = count[pages];
503                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
504                                         flags[pages], 0);
505                 }
506         }
507
508         size[1] = pages * sizeof(struct niobuf_remote);
509         request->rq_replen = lustre_msg_size(2, size);
510
511         rc = ptlrpc_queue_wait(request);
512         rc = ptlrpc_check_status(request, rc);
513         if (rc)
514                 GOTO(out2, rc);
515
516         ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
517         if (ptr2 == NULL)
518                 GOTO(out2, rc = -EINVAL);
519
520         if (request->rq_repmsg->buflens[1] !=
521             pages * sizeof(struct niobuf_remote)) {
522                 CERROR("buffer length wrong (%d vs. %ld)\n",
523                        request->rq_repmsg->buflens[1],
524                        pages * sizeof(struct niobuf_remote));
525                 GOTO(out2, rc = -EINVAL);
526         }
527
528         desc = ptlrpc_prep_bulk(connection);
529         desc->b_portal = OSC_BULK_PORTAL;
530
531         for (pages = 0, i = 0; i < num_oa; i++) {
532                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
533                         ost_unpack_niobuf(&ptr2, &remote);
534                         rc = osc_sendpage(desc, remote, &local[pages]);
535                         if (rc)
536                                 GOTO(out3, rc);
537                 }
538         }
539
540         rc = ptlrpc_send_bulk(desc);
541         GOTO(out3, rc);
542
543  out3:
544         ptlrpc_free_bulk(desc);
545  out2:
546         ptlrpc_free_req(request);
547         for (pages = 0, i = 0; i < num_oa; i++)
548                 for (j = 0; j < oa_bufs[i]; j++, pages++)
549                         kunmap(pagearray[pages]);
550  out:
551         OBD_FREE(local, pages * sizeof(*local));
552
553         return rc;
554 }
555
556 static int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
557                    struct obdo **oa, obd_count *oa_bufs, struct page **buf,
558                    obd_size *count, obd_off *offset, obd_flag *flags)
559 {
560         if (rw == OBD_BRW_READ)
561                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
562                                     offset, flags);
563         else
564                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
565                                      offset, flags);
566 }
567
568 static int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
569                        struct ldlm_handle *parent_lock, __u64 *res_id,
570                        __u32 type, struct ldlm_extent *extent, __u32 mode,
571                        int *flags, void *data, int datalen,
572                        struct ldlm_handle *lockh)
573 {
574         struct ptlrpc_connection *conn;
575         struct ptlrpc_client *cl;
576         int rc;
577         __u32 mode2;
578
579         /* Filesystem locks are given a bit of special treatment: first we
580          * fixup the lock to start and end on page boundaries. */
581         extent->start &= PAGE_MASK;
582         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
583
584         /* Next, search for already existing extent locks that will cover us */
585         osc_con2dlmcl(oconn, &cl, &conn);
586         rc = ldlm_local_lock_match(ns, res_id, type, extent, mode, lockh);
587         if (rc == 1) {
588                 /* We already have a lock, and it's referenced */
589                 return 0;
590         }
591
592         /* Next, search for locks that we can upgrade (if we're trying to write)
593          * or are more than we need (if we're trying to read).  Because the VFS
594          * and page cache already protect us locally, lots of readers/writers
595          * can share a single PW lock. */
596         if (mode == LCK_PW)
597                 mode2 = LCK_PR;
598         else
599                 mode2 = LCK_PW;
600
601         rc = ldlm_local_lock_match(ns, res_id, type, extent, mode2, lockh);
602         if (rc == 1) {
603                 int flags;
604                 struct ldlm_lock *lock = ldlm_handle2object(lockh);
605                 /* FIXME: This is not incredibly elegant, but it might
606                  * be more elegant than adding another parameter to
607                  * lock_match.  I want a second opinion. */
608                 ldlm_lock_addref(lock, mode);
609                 ldlm_lock_decref(lock, mode2);
610
611                 if (mode == LCK_PR)
612                         return 0;
613
614                 rc = ldlm_cli_convert(cl, lockh, type, &flags);
615                 if (rc)
616                         LBUG();
617
618                 return rc;
619         }
620
621         rc = ldlm_cli_enqueue(cl, conn, ns, parent_lock, res_id, type,
622                               extent, mode, flags, data, datalen, lockh);
623         return rc;
624 }
625
626 static int osc_cancel(struct obd_conn *oconn, __u32 mode,
627                       struct ldlm_handle *lockh)
628 {
629         struct ldlm_lock *lock;
630         ENTRY;
631
632         lock = ldlm_handle2object(lockh);
633         ldlm_lock_decref(lock, mode);
634
635         RETURN(0);
636 }
637
638 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
639 {
640         struct osc_obd *osc = &obddev->u.osc;
641         int rc;
642         ENTRY;
643
644         osc->osc_conn = ptlrpc_uuid_to_connection("ost");
645         if (!osc->osc_conn)
646                 RETURN(-EINVAL);
647
648         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
649         if (osc->osc_client == NULL)
650                 GOTO(out_conn, rc = -ENOMEM);
651
652         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
653         if (osc->osc_ldlm_client == NULL)
654                 GOTO(out_client, rc = -ENOMEM);
655
656         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
657                            osc->osc_client);
658         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
659                            osc->osc_ldlm_client);
660         osc->osc_client->cli_name = "osc";
661         osc->osc_ldlm_client->cli_name = "ldlm";
662
663         MOD_INC_USE_COUNT;
664         RETURN(0);
665
666  out_client:
667         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
668  out_conn:
669         ptlrpc_put_connection(osc->osc_conn);
670         return rc;
671 }
672
673 static int osc_cleanup(struct obd_device * obddev)
674 {
675         struct osc_obd *osc = &obddev->u.osc;
676
677         ptlrpc_cleanup_client(osc->osc_client);
678         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
679         ptlrpc_cleanup_client(osc->osc_ldlm_client);
680         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
681         ptlrpc_put_connection(osc->osc_conn);
682
683         MOD_DEC_USE_COUNT;
684         return 0;
685 }
686
687 struct obd_ops osc_obd_ops = {
688         o_setup:   osc_setup,
689         o_cleanup: osc_cleanup,
690         o_create: osc_create,
691         o_destroy: osc_destroy,
692         o_getattr: osc_getattr,
693         o_setattr: osc_setattr,
694         o_open: osc_open,
695         o_close: osc_close,
696         o_connect: osc_connect,
697         o_disconnect: osc_disconnect,
698         o_brw: osc_brw,
699         o_punch: osc_punch,
700         o_enqueue: osc_enqueue,
701         o_cancel: osc_cancel
702 };
703
704 static int __init osc_init(void)
705 {
706         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
707         return 0;
708 }
709
710 static void __exit osc_exit(void)
711 {
712         obd_unregister_type(LUSTRE_OSC_NAME);
713 }
714
715 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
716 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
717 MODULE_LICENSE("GPL");
718
719 module_init(osc_init);
720 module_exit(osc_exit);