Whamcloud - gitweb
* Split struct niobuf into niobuf_local and niobuf_remote
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copryright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/obd_ost.h>
25
26 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
27                        struct ptlrpc_connection **connection)
28 {
29         struct osc_obd *osc = &conn->oc_dev->u.osc;
30         *cl = osc->osc_client;
31         *connection = osc->osc_conn;
32 }
33
34 static void osc_con2dlmcl(struct obd_conn *conn, struct ptlrpc_client **cl,
35                           struct ptlrpc_connection **connection)
36 {
37         struct osc_obd *osc = &conn->oc_dev->u.osc;
38         *cl = osc->osc_ldlm_client;
39         *connection = osc->osc_conn;
40 }
41
42 static int osc_connect(struct obd_conn *conn)
43 {
44         struct ptlrpc_request *request;
45         struct ptlrpc_client *cl;
46         struct ptlrpc_connection *connection;
47         struct ost_body *body;
48         int rc, size = sizeof(*body);
49         ENTRY;
50
51         osc_con2cl(conn, &cl, &connection);
52         request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL);
53         if (!request)
54                 RETURN(-ENOMEM);
55
56         request->rq_replen = lustre_msg_size(1, &size);
57
58         rc = ptlrpc_queue_wait(request);
59         if (rc)
60                 GOTO(out, rc);
61
62         body = lustre_msg_buf(request->rq_repmsg, 0);
63         CDEBUG(D_INODE, "received connid %d\n", body->connid);
64
65         conn->oc_id = body->connid;
66         EXIT;
67  out:
68         ptlrpc_free_req(request);
69         return rc;
70 }
71
72 static int osc_disconnect(struct obd_conn *conn)
73 {
74         struct ptlrpc_request *request;
75         struct ptlrpc_client *cl;
76         struct ptlrpc_connection *connection;
77         struct ost_body *body;
78         int rc, size = sizeof(*body);
79         ENTRY;
80
81         osc_con2cl(conn, &cl, &connection);
82         request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL);
83         if (!request)
84                 RETURN(-ENOMEM);
85
86         body = lustre_msg_buf(request->rq_reqmsg, 0);
87         body->connid = conn->oc_id;
88
89         request->rq_replen = lustre_msg_size(1, &size);
90
91         rc = ptlrpc_queue_wait(request);
92         GOTO(out, rc);
93  out:
94         ptlrpc_free_req(request);
95         return rc;
96 }
97
98 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
99 {
100         struct ptlrpc_request *request;
101         struct ptlrpc_client *cl;
102         struct ptlrpc_connection *connection;
103         struct ost_body *body;
104         int rc, size = sizeof(*body);
105         ENTRY;
106
107         osc_con2cl(conn, &cl, &connection);
108         request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
109         if (!request)
110                 RETURN(-ENOMEM);
111
112         body = lustre_msg_buf(request->rq_reqmsg, 0);
113         memcpy(&body->oa, oa, sizeof(*oa));
114         body->connid = conn->oc_id;
115         body->oa.o_valid = ~0;
116
117         request->rq_replen = lustre_msg_size(1, &size);
118
119         rc = ptlrpc_queue_wait(request);
120         if (rc)
121                 GOTO(out, rc);
122
123         body = lustre_msg_buf(request->rq_repmsg, 0);
124         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
125         if (oa)
126                 memcpy(oa, &body->oa, sizeof(*oa));
127
128         EXIT;
129  out:
130         ptlrpc_free_req(request);
131         return 0;
132 }
133
134 static int osc_open(struct obd_conn *conn, struct obdo *oa)
135 {
136         struct ptlrpc_request *request;
137         struct ptlrpc_client *cl;
138         struct ptlrpc_connection *connection;
139         struct ost_body *body;
140         int rc, size = sizeof(*body);
141         ENTRY;
142
143         osc_con2cl(conn, &cl, &connection);
144         request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
145         if (!request)
146                 RETURN(-ENOMEM);
147
148         body = lustre_msg_buf(request->rq_reqmsg, 0);
149         memcpy(&body->oa, oa, sizeof(*oa));
150         body->connid = conn->oc_id;
151         if (body->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
152                 LBUG();
153
154         request->rq_replen = lustre_msg_size(1, &size);
155
156         rc = ptlrpc_queue_wait(request);
157         if (rc)
158                 GOTO(out, rc);
159
160         body = lustre_msg_buf(request->rq_repmsg, 0);
161         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
162         if (oa)
163                 memcpy(oa, &body->oa, sizeof(*oa));
164
165         EXIT;
166  out:
167         ptlrpc_free_req(request);
168         return 0;
169 }
170
171 static int osc_close(struct obd_conn *conn, struct obdo *oa)
172 {
173         struct ptlrpc_request *request;
174         struct ptlrpc_client *cl;
175         struct ptlrpc_connection *connection;
176         struct ost_body *body;
177         int rc, size = sizeof(*body);
178         ENTRY;
179
180         osc_con2cl(conn, &cl, &connection);
181         request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
182         if (!request)
183                 RETURN(-ENOMEM);
184
185         body = lustre_msg_buf(request->rq_reqmsg, 0);
186         memcpy(&body->oa, oa, sizeof(*oa));
187         body->connid = conn->oc_id;
188
189         request->rq_replen = lustre_msg_size(1, &size);
190
191         rc = ptlrpc_queue_wait(request);
192         if (rc)
193                 GOTO(out, rc);
194
195         body = lustre_msg_buf(request->rq_repmsg, 0);
196         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
197         if (oa)
198                 memcpy(oa, &body->oa, sizeof(*oa));
199
200         EXIT;
201  out:
202         ptlrpc_free_req(request);
203         return 0;
204 }
205
206 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
207 {
208         struct ptlrpc_request *request;
209         struct ptlrpc_client *cl;
210         struct ptlrpc_connection *connection;
211         struct ost_body *body;
212         int rc, size = sizeof(*body);
213         ENTRY;
214
215         osc_con2cl(conn, &cl, &connection);
216         request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
217         if (!request)
218                 RETURN(-ENOMEM);
219
220         body = lustre_msg_buf(request->rq_reqmsg, 0);
221         memcpy(&body->oa, oa, sizeof(*oa));
222         body->connid = conn->oc_id;
223
224         request->rq_replen = lustre_msg_size(1, &size);
225
226         rc = ptlrpc_queue_wait(request);
227         GOTO(out, rc);
228
229  out:
230         ptlrpc_free_req(request);
231         return 0;
232 }
233
234 static int osc_create(struct obd_conn *conn, struct obdo *oa)
235 {
236         struct ptlrpc_request *request;
237         struct ptlrpc_client *cl;
238         struct ptlrpc_connection *connection;
239         struct ost_body *body;
240         int rc, size = sizeof(*body);
241         ENTRY;
242
243         if (!oa) {
244                 CERROR("oa NULL\n");
245                 RETURN(-EINVAL);
246         }
247         osc_con2cl(conn, &cl, &connection);
248         request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
249         if (!request)
250                 RETURN(-ENOMEM);
251
252         body = lustre_msg_buf(request->rq_reqmsg, 0);
253         memcpy(&body->oa, oa, sizeof(*oa));
254         body->oa.o_valid = ~0;
255         body->connid = conn->oc_id;
256
257         request->rq_replen = lustre_msg_size(1, &size);
258
259         rc = ptlrpc_queue_wait(request);
260         if (rc)
261                 GOTO(out, rc);
262
263         body = lustre_msg_buf(request->rq_repmsg, 0);
264         memcpy(oa, &body->oa, sizeof(*oa));
265
266         EXIT;
267  out:
268         ptlrpc_free_req(request);
269         return 0;
270 }
271
272 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
273                      obd_off offset)
274 {
275         struct ptlrpc_request *request;
276         struct ptlrpc_client *cl;
277         struct ptlrpc_connection *connection;
278         struct ost_body *body;
279         int rc, size = sizeof(*body);
280         ENTRY;
281
282         if (!oa) {
283                 CERROR("oa NULL\n");
284                 RETURN(-EINVAL);
285         }
286         osc_con2cl(conn, &cl, &connection);
287         request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
288         if (!request)
289                 RETURN(-ENOMEM);
290
291         body = lustre_msg_buf(request->rq_reqmsg, 0);
292         memcpy(&body->oa, oa, sizeof(*oa));
293         body->connid = conn->oc_id;
294         body->oa.o_valid = ~0;
295         body->oa.o_size = offset;
296         body->oa.o_blocks = count;
297
298         request->rq_replen = lustre_msg_size(1, &size);
299
300         rc = ptlrpc_queue_wait(request);
301         if (rc)
302                 GOTO(out, rc);
303
304         body = lustre_msg_buf(request->rq_repmsg, 0);
305         memcpy(oa, &body->oa, sizeof(*oa));
306
307         EXIT;
308  out:
309         ptlrpc_free_req(request);
310         return 0;
311 }
312
313 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
314 {
315         struct ptlrpc_request *request;
316         struct ptlrpc_client *cl;
317         struct ptlrpc_connection *connection;
318         struct ost_body *body;
319         int rc, size = sizeof(*body);
320         ENTRY;
321
322         if (!oa) {
323                 CERROR("oa NULL\n");
324                 RETURN(-EINVAL);
325         }
326         osc_con2cl(conn, &cl, &connection);
327         request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
328         if (!request)
329                 RETURN(-ENOMEM);
330
331         body = lustre_msg_buf(request->rq_reqmsg, 0);
332         memcpy(&body->oa, oa, sizeof(*oa));
333         body->connid = conn->oc_id;
334         body->oa.o_valid = ~0;
335
336         request->rq_replen = lustre_msg_size(1, &size);
337
338         rc = ptlrpc_queue_wait(request);
339         if (rc)
340                 GOTO(out, rc);
341
342         body = lustre_msg_buf(request->rq_repmsg, 0);
343         memcpy(oa, &body->oa, sizeof(*oa));
344
345         EXIT;
346  out:
347         ptlrpc_free_req(request);
348         return 0;
349 }
350
351 static int osc_sendpage(struct ptlrpc_bulk_desc *desc,
352                         struct niobuf_remote *dst, struct niobuf_local *src)
353 {
354         struct ptlrpc_bulk_page *page;
355         ENTRY;
356
357         page = ptlrpc_prep_bulk_page(desc);
358         if (page == NULL)
359                 RETURN(-ENOMEM);
360
361         page->b_buf = (void *)(unsigned long)src->addr;
362         page->b_buflen = src->len;
363         page->b_xid = dst->xid;
364
365         RETURN(0);
366 }
367
368 static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
369                         struct obdo **oa, obd_count *oa_bufs, struct page **buf,
370                         obd_size *count, obd_off *offset, obd_flag *flags)
371 {
372         struct ptlrpc_client *cl;
373         struct ptlrpc_connection *connection;
374         struct ptlrpc_request *request;
375         struct ost_body *body;
376         struct list_head *tmp, *next;
377         int pages, rc, i, j, size[3] = {sizeof(*body)};
378         void *ptr1, *ptr2;
379         struct ptlrpc_bulk_desc *desc;;
380         ENTRY;
381
382         size[1] = num_oa * sizeof(struct obd_ioobj);
383         pages = 0;
384         for (i = 0; i < num_oa; i++)
385                 pages += oa_bufs[i];
386         size[2] = pages * sizeof(struct niobuf_remote);
387
388         osc_con2cl(conn, &cl, &connection);
389         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
390         if (!request)
391                 GOTO(out3, rc = -ENOMEM);
392
393         body = lustre_msg_buf(request->rq_reqmsg, 0);
394         body->data = OBD_BRW_READ;
395
396         desc = ptlrpc_prep_bulk(connection);
397         if (desc == NULL)
398                 GOTO(out2, rc = -ENOMEM);
399         desc->b_portal = OST_BULK_PORTAL;
400
401         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
402         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
403         for (pages = 0, i = 0; i < num_oa; i++) {
404                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
405                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
406                         struct ptlrpc_bulk_page *page;
407                         page = ptlrpc_prep_bulk_page(desc);
408                         if (page == NULL)
409                                 GOTO(out, rc = -ENOMEM);
410
411                         spin_lock(&connection->c_lock);
412                         page->b_xid = ++connection->c_xid_out;
413                         spin_unlock(&connection->c_lock);
414
415                         page->b_buf = kmap(buf[pages]);
416                         page->b_buflen = PAGE_SIZE;
417                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
418                                         flags[pages], page->b_xid);
419                 }
420         }
421
422         rc = ptlrpc_register_bulk(desc);
423         if (rc)
424                 GOTO(out, rc);
425
426         request->rq_replen = lustre_msg_size(1, size);
427         rc = ptlrpc_queue_wait(request);
428         if (rc)
429                 ptlrpc_abort_bulk(desc);
430         GOTO(out, rc);
431
432  out:
433         list_for_each_safe(tmp, next, &desc->b_page_list) {
434                 struct ptlrpc_bulk_page *page;
435                 page = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
436
437                 if (page->b_buf != NULL)
438                         kunmap(page->b_buf);
439         }
440
441         ptlrpc_free_bulk(desc);
442  out2:
443         ptlrpc_free_req(request);
444  out3:
445         return rc;
446 }
447
448 static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
449                          struct obdo **oa, obd_count *oa_bufs,
450                          struct page **buf, obd_size *count, obd_off *offset,
451                          obd_flag *flags)
452 {
453         struct ptlrpc_client *cl;
454         struct ptlrpc_connection *connection;
455         struct ptlrpc_request *request;
456         struct ptlrpc_bulk_desc *desc;
457         struct obd_ioobj ioo;
458         struct ost_body *body;
459         struct niobuf_local *local;
460         struct niobuf_remote *remote;
461         long pages;
462         int rc, i, j, size[3] = {sizeof(*body)};
463         void *ptr1, *ptr2;
464         ENTRY;
465
466         size[1] = num_oa * sizeof(ioo);
467         pages = 0;
468         for (i = 0; i < num_oa; i++)
469                 pages += oa_bufs[i];
470         size[2] = pages * sizeof(*remote);
471
472         OBD_ALLOC(local, pages * sizeof(*local));
473         if (local == NULL)
474                 RETURN(-ENOMEM);
475
476         osc_con2cl(conn, &cl, &connection);
477         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
478         if (!request)
479                 GOTO(out3, rc = -ENOMEM);
480         body = lustre_msg_buf(request->rq_reqmsg, 0);
481         body->data = OBD_BRW_WRITE;
482
483         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
484         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
485         for (pages = 0, i = 0; i < num_oa; i++) {
486                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
487                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
488                         local[pages].addr = (__u64)(long)kmap(buf[pages]);
489                         local[pages].offset = offset[pages];
490                         local[pages].len = count[pages];
491                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
492                                         flags[pages], 0);
493                 }
494         }
495
496         size[1] = pages * sizeof(struct niobuf_remote);
497         request->rq_replen = lustre_msg_size(2, size);
498
499         rc = ptlrpc_queue_wait(request);
500         if (rc)
501                 GOTO(out2, rc);
502
503         ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
504         if (ptr2 == NULL)
505                 GOTO(out2, rc = -EINVAL);
506
507         if (request->rq_repmsg->buflens[1] !=
508             pages * sizeof(struct niobuf_remote)) {
509                 CERROR("buffer length wrong (%d vs. %ld)\n",
510                        request->rq_repmsg->buflens[1],
511                        pages * sizeof(struct niobuf_remote));
512                 GOTO(out2, rc = -EINVAL);
513         }
514
515         desc = ptlrpc_prep_bulk(connection);
516         desc->b_portal = OSC_BULK_PORTAL;
517
518         for (pages = 0, i = 0; i < num_oa; i++) {
519                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
520                         ost_unpack_niobuf(&ptr2, &remote);
521                         rc = osc_sendpage(desc, remote, &local[pages]);
522                         if (rc)
523                                 GOTO(out, rc);
524                 }
525         }
526
527         rc = ptlrpc_send_bulk(desc);
528         GOTO(out, rc);
529
530  out:
531         ptlrpc_free_bulk(desc);
532  out2:
533         ptlrpc_free_req(request);
534         for (pages = 0, i = 0; i < num_oa; i++)
535                 for (j = 0; j < oa_bufs[i]; j++, pages++)
536                         kunmap(buf[pages]);
537  out3:
538         OBD_FREE(local, pages * sizeof(*local));
539
540         return rc;
541 }
542
543 static int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
544                    struct obdo **oa, obd_count *oa_bufs, struct page **buf,
545                    obd_size *count, obd_off *offset, obd_flag *flags)
546 {
547         if (rw == OBD_BRW_READ)
548                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
549                                     offset, flags);
550         else
551                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
552                                      offset, flags);
553 }
554
555 static int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
556                        struct ldlm_handle *parent_lock, __u64 *res_id,
557                        __u32 type, struct ldlm_extent *extent, __u32 mode,
558                        int *flags, void *data, int datalen,
559                        struct ldlm_handle *lockh)
560 {
561         struct ptlrpc_connection *conn;
562         struct ptlrpc_client *cl;
563         int rc;
564         __u32 mode2;
565
566         /* Filesystem locks are given a bit of special treatment: first we
567          * fixup the lock to start and end on page boundaries. */
568         extent->start &= PAGE_MASK;
569         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
570
571         /* Next, search for already existing extent locks that will cover us */
572         osc_con2dlmcl(oconn, &cl, &conn);
573         rc = ldlm_local_lock_match(ns, res_id, type, extent, mode, lockh);
574         if (rc == 1) {
575                 /* We already have a lock, and it's referenced */
576                 return 0;
577         }
578
579         /* Next, search for locks that we can upgrade (if we're trying to write)
580          * or are more than we need (if we're trying to read).  Because the VFS
581          * and page cache already protect us locally, lots of readers/writers
582          * can share a single PW lock. */
583         if (mode == LCK_PW)
584                 mode2 = LCK_PR;
585         else
586                 mode2 = LCK_PW;
587
588         rc = ldlm_local_lock_match(ns, res_id, type, extent, mode2, lockh);
589         if (rc == 1) {
590                 int flags;
591                 struct ldlm_lock *lock = ldlm_handle2object(lockh);
592                 /* FIXME: This is not incredibly elegant, but it might
593                  * be more elegant than adding another parameter to
594                  * lock_match.  I want a second opinion. */
595                 ldlm_lock_addref(lock, mode);
596                 ldlm_lock_decref(lock, mode2);
597
598                 if (mode == LCK_PR)
599                         return 0;
600
601                 rc = ldlm_cli_convert(cl, lockh, type, &flags);
602                 if (rc)
603                         LBUG();
604
605                 return rc;
606         }
607
608         rc = ldlm_cli_enqueue(cl, conn, ns, parent_lock, res_id, type,
609                               extent, mode, flags, data, datalen, lockh);
610         return rc;
611 }
612
613 static int osc_cancel(struct obd_conn *oconn, __u32 mode,
614                       struct ldlm_handle *lockh)
615 {
616         struct ldlm_lock *lock;
617         ENTRY;
618
619         lock = ldlm_handle2object(lockh);
620         ldlm_lock_decref(lock, mode);
621
622         RETURN(0);
623 }
624
625 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
626 {
627         struct osc_obd *osc = &obddev->u.osc;
628         int rc;
629         ENTRY;
630
631         osc->osc_conn = ptlrpc_uuid_to_connection("ost");
632         if (!osc->osc_conn)
633                 RETURN(-EINVAL);
634
635         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
636         if (osc->osc_client == NULL)
637                 GOTO(out_conn, rc = -ENOMEM);
638
639         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
640         if (osc->osc_ldlm_client == NULL)
641                 GOTO(out_client, rc = -ENOMEM);
642
643         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
644                            osc->osc_client);
645         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
646                            osc->osc_ldlm_client);
647         osc->osc_client->cli_name = "osc";
648         osc->osc_ldlm_client->cli_name = "ldlm";
649
650         MOD_INC_USE_COUNT;
651         RETURN(0);
652
653  out_client:
654         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
655  out_conn:
656         ptlrpc_put_connection(osc->osc_conn);
657         return rc;
658 }
659
660 static int osc_cleanup(struct obd_device * obddev)
661 {
662         struct osc_obd *osc = &obddev->u.osc;
663
664         ptlrpc_cleanup_client(osc->osc_client);
665         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
666         ptlrpc_cleanup_client(osc->osc_ldlm_client);
667         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
668         ptlrpc_put_connection(osc->osc_conn);
669
670         MOD_DEC_USE_COUNT;
671         return 0;
672 }
673
674 struct obd_ops osc_obd_ops = {
675         o_setup:   osc_setup,
676         o_cleanup: osc_cleanup,
677         o_create: osc_create,
678         o_destroy: osc_destroy,
679         o_getattr: osc_getattr,
680         o_setattr: osc_setattr,
681         o_open: osc_open,
682         o_close: osc_close,
683         o_connect: osc_connect,
684         o_disconnect: osc_disconnect,
685         o_brw: osc_brw,
686         o_punch: osc_punch,
687         o_enqueue: osc_enqueue,
688         o_cancel: osc_cancel
689 };
690
691 static int __init osc_init(void)
692 {
693         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
694         return 0;
695 }
696
697 static void __exit osc_exit(void)
698 {
699         obd_unregister_type(LUSTRE_OSC_NAME);
700 }
701
702 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
703 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
704 MODULE_LICENSE("GPL");
705
706 module_init(osc_init);
707 module_exit(osc_exit);