Whamcloud - gitweb
WARNING: This commit breaks everything. It will be back in shape within 12
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/obd_ost.h>
25
26 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
27                        struct ptlrpc_connection **connection)
28 {
29         struct osc_obd *osc = &conn->oc_dev->u.osc;
30         *cl = osc->osc_client;
31         *connection = osc->osc_conn;
32 }
33
34 static void osc_con2dlmcl(struct obd_conn *conn, struct ptlrpc_client **cl,
35                           struct ptlrpc_connection **connection)
36 {
37         struct osc_obd *osc = &conn->oc_dev->u.osc;
38         *cl = osc->osc_ldlm_client;
39         *connection = osc->osc_conn;
40 }
41
42 static int osc_connect(struct obd_conn *conn)
43 {
44         struct ptlrpc_request *request;
45         struct ptlrpc_client *cl;
46         struct ptlrpc_connection *connection;
47         struct ost_body *body;
48         int rc, size = sizeof(*body);
49         ENTRY;
50
51         osc_con2cl(conn, &cl, &connection);
52         request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL);
53         if (!request)
54                 RETURN(-ENOMEM);
55
56         request->rq_replen = lustre_msg_size(1, &size);
57
58         rc = ptlrpc_queue_wait(request);
59         rc = ptlrpc_check_status(request, rc);
60         if (rc) {
61                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
62                 GOTO(out, rc);
63         }
64
65         body = lustre_msg_buf(request->rq_repmsg, 0);
66         CDEBUG(D_INODE, "received connid %d\n", body->connid);
67
68         conn->oc_id = body->connid;
69         EXIT;
70  out:
71         ptlrpc_free_req(request);
72         return rc;
73 }
74
75 static int osc_disconnect(struct obd_conn *conn)
76 {
77         struct ptlrpc_request *request;
78         struct ptlrpc_client *cl;
79         struct ptlrpc_connection *connection;
80         struct ost_body *body;
81         int rc, size = sizeof(*body);
82         ENTRY;
83
84         osc_con2cl(conn, &cl, &connection);
85         request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size,
86                                   NULL);
87         if (!request)
88                 RETURN(-ENOMEM);
89
90         body = lustre_msg_buf(request->rq_reqmsg, 0);
91         body->connid = conn->oc_id;
92
93         request->rq_replen = lustre_msg_size(1, &size);
94
95         rc = ptlrpc_queue_wait(request);
96         GOTO(out, rc);
97  out:
98         ptlrpc_free_req(request);
99         return rc;
100 }
101
102 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
103 {
104         struct ptlrpc_request *request;
105         struct ptlrpc_client *cl;
106         struct ptlrpc_connection *connection;
107         struct ost_body *body;
108         int rc, size = sizeof(*body);
109         ENTRY;
110
111         osc_con2cl(conn, &cl, &connection);
112         request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
113         if (!request)
114                 RETURN(-ENOMEM);
115
116         body = lustre_msg_buf(request->rq_reqmsg, 0);
117         memcpy(&body->oa, oa, sizeof(*oa));
118         body->connid = conn->oc_id;
119         body->oa.o_valid = ~0;
120
121         request->rq_replen = lustre_msg_size(1, &size);
122
123         rc = ptlrpc_queue_wait(request);
124         rc = ptlrpc_check_status(request, rc);
125         if (rc) {
126                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
127                 GOTO(out, rc);
128         }
129
130         body = lustre_msg_buf(request->rq_repmsg, 0);
131         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
132         if (oa)
133                 memcpy(oa, &body->oa, sizeof(*oa));
134
135         EXIT;
136  out:
137         ptlrpc_free_req(request);
138         return 0;
139 }
140
141 static int osc_open(struct obd_conn *conn, struct obdo *oa)
142 {
143         struct ptlrpc_request *request;
144         struct ptlrpc_client *cl;
145         struct ptlrpc_connection *connection;
146         struct ost_body *body;
147         int rc, size = sizeof(*body);
148         ENTRY;
149
150         osc_con2cl(conn, &cl, &connection);
151         request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
152         if (!request)
153                 RETURN(-ENOMEM);
154
155         body = lustre_msg_buf(request->rq_reqmsg, 0);
156         memcpy(&body->oa, oa, sizeof(*oa));
157         body->connid = conn->oc_id;
158         if (body->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
159                 LBUG();
160
161         request->rq_replen = lustre_msg_size(1, &size);
162
163         rc = ptlrpc_queue_wait(request);
164         rc = ptlrpc_check_status(request, rc);
165         if (rc)
166                 GOTO(out, rc);
167
168         body = lustre_msg_buf(request->rq_repmsg, 0);
169         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
170         if (oa)
171                 memcpy(oa, &body->oa, sizeof(*oa));
172
173         EXIT;
174  out:
175         ptlrpc_free_req(request);
176         return 0;
177 }
178
179 static int osc_close(struct obd_conn *conn, struct obdo *oa)
180 {
181         struct ptlrpc_request *request;
182         struct ptlrpc_client *cl;
183         struct ptlrpc_connection *connection;
184         struct ost_body *body;
185         int rc, size = sizeof(*body);
186         ENTRY;
187
188         osc_con2cl(conn, &cl, &connection);
189         request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
190         if (!request)
191                 RETURN(-ENOMEM);
192
193         body = lustre_msg_buf(request->rq_reqmsg, 0);
194         memcpy(&body->oa, oa, sizeof(*oa));
195         body->connid = conn->oc_id;
196
197         request->rq_replen = lustre_msg_size(1, &size);
198
199         rc = ptlrpc_queue_wait(request);
200         rc = ptlrpc_check_status(request, rc);
201         if (rc)
202                 GOTO(out, rc);
203
204         body = lustre_msg_buf(request->rq_repmsg, 0);
205         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
206         if (oa)
207                 memcpy(oa, &body->oa, sizeof(*oa));
208
209         EXIT;
210  out:
211         ptlrpc_free_req(request);
212         return 0;
213 }
214
215 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
216 {
217         struct ptlrpc_request *request;
218         struct ptlrpc_client *cl;
219         struct ptlrpc_connection *connection;
220         struct ost_body *body;
221         int rc, size = sizeof(*body);
222         ENTRY;
223
224         osc_con2cl(conn, &cl, &connection);
225         request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
226         if (!request)
227                 RETURN(-ENOMEM);
228
229         body = lustre_msg_buf(request->rq_reqmsg, 0);
230         memcpy(&body->oa, oa, sizeof(*oa));
231         body->connid = conn->oc_id;
232
233         request->rq_replen = lustre_msg_size(1, &size);
234
235         rc = ptlrpc_queue_wait(request);
236         rc = ptlrpc_check_status(request, rc);
237         GOTO(out, rc);
238
239  out:
240         ptlrpc_free_req(request);
241         return 0;
242 }
243
244 static int osc_create(struct obd_conn *conn, struct obdo *oa)
245 {
246         struct ptlrpc_request *request;
247         struct ptlrpc_client *cl;
248         struct ptlrpc_connection *connection;
249         struct ost_body *body;
250         int rc, size = sizeof(*body);
251         ENTRY;
252
253         if (!oa) {
254                 CERROR("oa NULL\n");
255                 RETURN(-EINVAL);
256         }
257         osc_con2cl(conn, &cl, &connection);
258         request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
259         if (!request)
260                 RETURN(-ENOMEM);
261
262         body = lustre_msg_buf(request->rq_reqmsg, 0);
263         memcpy(&body->oa, oa, sizeof(*oa));
264         body->oa.o_valid = ~0;
265         body->connid = conn->oc_id;
266
267         request->rq_replen = lustre_msg_size(1, &size);
268
269         rc = ptlrpc_queue_wait(request);
270         rc = ptlrpc_check_status(request, rc);
271         if (rc)
272                 GOTO(out, rc);
273
274         body = lustre_msg_buf(request->rq_repmsg, 0);
275         memcpy(oa, &body->oa, sizeof(*oa));
276
277         EXIT;
278  out:
279         ptlrpc_free_req(request);
280         return 0;
281 }
282
283 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
284                      obd_off offset)
285 {
286         struct ptlrpc_request *request;
287         struct ptlrpc_client *cl;
288         struct ptlrpc_connection *connection;
289         struct ost_body *body;
290         int rc, size = sizeof(*body);
291         ENTRY;
292
293         if (!oa) {
294                 CERROR("oa NULL\n");
295                 RETURN(-EINVAL);
296         }
297         osc_con2cl(conn, &cl, &connection);
298         request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
299         if (!request)
300                 RETURN(-ENOMEM);
301
302         body = lustre_msg_buf(request->rq_reqmsg, 0);
303         memcpy(&body->oa, oa, sizeof(*oa));
304         body->connid = conn->oc_id;
305         body->oa.o_valid = ~0;
306         body->oa.o_size = offset;
307         body->oa.o_blocks = count;
308
309         request->rq_replen = lustre_msg_size(1, &size);
310
311         rc = ptlrpc_queue_wait(request);
312         rc = ptlrpc_check_status(request, rc);
313         if (rc)
314                 GOTO(out, rc);
315
316         body = lustre_msg_buf(request->rq_repmsg, 0);
317         memcpy(oa, &body->oa, sizeof(*oa));
318
319         EXIT;
320  out:
321         ptlrpc_free_req(request);
322         return 0;
323 }
324
325 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
326 {
327         struct ptlrpc_request *request;
328         struct ptlrpc_client *cl;
329         struct ptlrpc_connection *connection;
330         struct ost_body *body;
331         int rc, size = sizeof(*body);
332         ENTRY;
333
334         if (!oa) {
335                 CERROR("oa NULL\n");
336                 RETURN(-EINVAL);
337         }
338         osc_con2cl(conn, &cl, &connection);
339         request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
340         if (!request)
341                 RETURN(-ENOMEM);
342
343         body = lustre_msg_buf(request->rq_reqmsg, 0);
344         memcpy(&body->oa, oa, sizeof(*oa));
345         body->connid = conn->oc_id;
346         body->oa.o_valid = ~0;
347
348         request->rq_replen = lustre_msg_size(1, &size);
349
350         rc = ptlrpc_queue_wait(request);
351         rc = ptlrpc_check_status(request, rc);
352         if (rc)
353                 GOTO(out, rc);
354
355         body = lustre_msg_buf(request->rq_repmsg, 0);
356         memcpy(oa, &body->oa, sizeof(*oa));
357
358         EXIT;
359  out:
360         ptlrpc_free_req(request);
361         return 0;
362 }
363
364 struct osc_brw_cb_data {
365         struct page **buf;
366         struct ptlrpc_request *req;
367         bulk_callback_t callback;
368         void *cb_data;
369 };
370
371 static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
372 {
373         struct osc_brw_cb_data *cb_data = data;
374
375         if (desc->b_flags & PTL_RPC_FL_INTR)
376                 CERROR("got signal\n");
377
378         (cb_data->callback)(desc, cb_data->cb_data);
379
380         ptlrpc_free_bulk(desc);
381         ptlrpc_free_req(cb_data->req);
382
383         OBD_FREE(cb_data, sizeof(*cb_data));
384 }
385
386 static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
387                         struct obdo **oa, obd_count *oa_bufs, struct page **buf,
388                         obd_size *count, obd_off *offset, obd_flag *flags,
389                         bulk_callback_t callback)
390 {
391         struct ptlrpc_client *cl;
392         struct ptlrpc_connection *connection;
393         struct ptlrpc_request *request;
394         struct ost_body *body;
395         struct list_head *tmp;
396         int pages, rc, i, j, size[3] = {sizeof(*body)};
397         void *ptr1, *ptr2;
398         struct ptlrpc_bulk_desc *desc;
399         ENTRY;
400
401         size[1] = num_oa * sizeof(struct obd_ioobj);
402         pages = 0;
403         for (i = 0; i < num_oa; i++)
404                 pages += oa_bufs[i];
405         size[2] = pages * sizeof(struct niobuf_remote);
406
407         osc_con2cl(conn, &cl, &connection);
408         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
409         if (!request)
410                 GOTO(out, rc = -ENOMEM);
411
412         body = lustre_msg_buf(request->rq_reqmsg, 0);
413         body->data = OBD_BRW_READ;
414
415         desc = ptlrpc_prep_bulk(connection);
416         if (!desc)
417                 GOTO(out2, rc = -ENOMEM);
418         desc->b_portal = OST_BULK_PORTAL;
419
420         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
421         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
422         for (pages = 0, i = 0; i < num_oa; i++) {
423                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
424                 /* FIXME: this inner loop is wrong for multiple OAs */
425                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
426                         struct ptlrpc_bulk_page *bulk;
427                         bulk = ptlrpc_prep_bulk_page(desc);
428                         if (bulk == NULL)
429                                 GOTO(out3, rc = -ENOMEM);
430
431                         spin_lock(&connection->c_lock);
432                         bulk->b_xid = ++connection->c_xid_out;
433                         spin_unlock(&connection->c_lock);
434
435                         bulk->b_buf = kmap(buf[pages]);
436                         bulk->b_page = buf[pages];
437                         bulk->b_buflen = PAGE_SIZE;
438                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
439                                         flags[pages], bulk->b_xid);
440                 }
441         }
442
443         rc = ptlrpc_register_bulk(desc);
444         if (rc)
445                 GOTO(out3, rc);
446
447         request->rq_replen = lustre_msg_size(1, size);
448         rc = ptlrpc_queue_wait(request);
449         rc = ptlrpc_check_status(request, rc);
450         if (rc)
451                 ptlrpc_abort_bulk(desc);
452         GOTO(out3, rc);
453
454  out3:
455         list_for_each(tmp, &desc->b_page_list) {
456                 struct ptlrpc_bulk_page *bulk;
457                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
458                 if (bulk->b_buf != NULL)
459                         kunmap(bulk->b_page);
460         }
461         ptlrpc_free_bulk(desc);
462  out2:
463         ptlrpc_free_req(request);
464  out:
465         return rc;
466 }
467
468 static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
469 {
470         struct osc_brw_cb_data *cb_data = data;
471         int i;
472
473         if (desc->b_flags & PTL_RPC_FL_INTR)
474                 CERROR("got signal\n");
475
476         for (i = 0; i < desc->b_page_count; i++)
477                 kunmap(cb_data->buf[i]);
478
479         (cb_data->callback)(desc, cb_data->cb_data);
480
481         ptlrpc_free_bulk(desc);
482         ptlrpc_free_req(cb_data->req);
483
484         OBD_FREE(cb_data, sizeof(*cb_data));
485 }
486
487 static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
488                          struct obdo **oa, obd_count *oa_bufs,
489                          struct page **pagearray, obd_size *count,
490                          obd_off *offset, obd_flag *flags,
491                          bulk_callback_t callback)
492 {
493         struct ptlrpc_client *cl;
494         struct ptlrpc_connection *connection;
495         struct ptlrpc_request *request;
496         struct ptlrpc_bulk_desc *desc;
497         struct obd_ioobj ioo;
498         struct ost_body *body;
499         struct niobuf_local *local;
500         struct niobuf_remote *remote;
501         struct osc_brw_cb_data *cb_data;
502         long pages;
503         int rc, i, j, size[3] = {sizeof(*body)};
504         void *ptr1, *ptr2;
505         ENTRY;
506
507         size[1] = num_oa * sizeof(ioo);
508         pages = 0;
509         for (i = 0; i < num_oa; i++)
510                 pages += oa_bufs[i];
511         size[2] = pages * sizeof(*remote);
512
513         OBD_ALLOC(local, pages * sizeof(*local));
514         if (local == NULL)
515                 RETURN(-ENOMEM);
516
517         osc_con2cl(conn, &cl, &connection);
518         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
519         if (!request)
520                 GOTO(out, rc = -ENOMEM);
521         body = lustre_msg_buf(request->rq_reqmsg, 0);
522         body->data = OBD_BRW_WRITE;
523
524         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
525         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
526         for (pages = 0, i = 0; i < num_oa; i++) {
527                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
528                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
529                         local[pages].addr = kmap(pagearray[pages]);
530                         local[pages].offset = offset[pages];
531                         local[pages].len = count[pages];
532                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
533                                         flags[pages], 0);
534                 }
535         }
536
537         size[1] = pages * sizeof(struct niobuf_remote);
538         request->rq_replen = lustre_msg_size(2, size);
539
540         rc = ptlrpc_queue_wait(request);
541         rc = ptlrpc_check_status(request, rc);
542         if (rc)
543                 GOTO(out2, rc);
544
545         ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
546         if (ptr2 == NULL)
547                 GOTO(out2, rc = -EINVAL);
548
549         if (request->rq_repmsg->buflens[1] !=
550             pages * sizeof(struct niobuf_remote)) {
551                 CERROR("buffer length wrong (%d vs. %ld)\n",
552                        request->rq_repmsg->buflens[1],
553                        pages * sizeof(struct niobuf_remote));
554                 GOTO(out2, rc = -EINVAL);
555         }
556
557         desc = ptlrpc_prep_bulk(connection);
558         desc->b_portal = OSC_BULK_PORTAL;
559         if (callback) {
560                 desc->b_cb = brw_write_finish;
561                 OBD_ALLOC(cb_data, sizeof(*cb_data));
562                 cb_data->buf = pagearray;
563                 cb_data->callback = callback;
564                 desc->b_cb_data = cb_data;
565         }
566
567         for (pages = 0, i = 0; i < num_oa; i++) {
568                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
569                         struct ptlrpc_bulk_page *page;
570
571                         ost_unpack_niobuf(&ptr2, &remote);
572
573                         page = ptlrpc_prep_bulk_page(desc);
574                         if (page == NULL)
575                                 GOTO(out3, rc = -ENOMEM);
576
577                         page->b_buf = (void *)(unsigned long)local[pages].addr;
578                         page->b_buflen = local[pages].len;
579                         page->b_xid = remote->xid;
580                 }
581         }
582
583         if (desc->b_page_count != pages)
584                 LBUG();
585
586         rc = ptlrpc_send_bulk(desc);
587         if (rc)
588                 GOTO(out3, rc);
589         if (callback)
590                 GOTO(out, rc);
591
592         /* If there's no callback function, sleep here until complete. */
593         wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
594         if (desc->b_flags & PTL_RPC_FL_INTR)
595                 rc = -EINTR;
596
597         GOTO(out3, rc);
598
599  out3:
600         ptlrpc_free_bulk(desc);
601  out2:
602         ptlrpc_free_req(request);
603         for (pages = 0, i = 0; i < num_oa; i++)
604                 for (j = 0; j < oa_bufs[i]; j++, pages++)
605                         kunmap(pagearray[pages]);
606  out:
607         OBD_FREE(local, pages * sizeof(*local));
608
609         return rc;
610 }
611
612 static int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
613                    struct obdo **oa, obd_count *oa_bufs, struct page **buf,
614                    obd_size *count, obd_off *offset, obd_flag *flags,
615                    void *callback)
616 {
617         if (num_oa != 1)
618                 LBUG();
619
620         if (rw == OBD_BRW_READ)
621                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
622                                     offset, flags, (bulk_callback_t)callback);
623         else
624                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
625                                      offset, flags, (bulk_callback_t)callback);
626 }
627
628 static int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
629                        struct ldlm_handle *parent_lock, __u64 *res_id,
630                        __u32 type, struct ldlm_extent *extent, __u32 mode,
631                        int *flags, void *data, int datalen,
632                        struct ldlm_handle *lockh)
633 {
634         struct ptlrpc_connection *conn;
635         struct ptlrpc_client *cl;
636         int rc;
637         __u32 mode2;
638
639         /* Filesystem locks are given a bit of special treatment: first we
640          * fixup the lock to start and end on page boundaries. */
641         extent->start &= PAGE_MASK;
642         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
643
644         /* Next, search for already existing extent locks that will cover us */
645         osc_con2dlmcl(oconn, &cl, &conn);
646         rc = ldlm_local_lock_match(ns, res_id, type, extent, mode, lockh);
647         if (rc == 1) {
648                 /* We already have a lock, and it's referenced */
649                 return 0;
650         }
651
652         /* Next, search for locks that we can upgrade (if we're trying to write)
653          * or are more than we need (if we're trying to read).  Because the VFS
654          * and page cache already protect us locally, lots of readers/writers
655          * can share a single PW lock. */
656         if (mode == LCK_PW)
657                 mode2 = LCK_PR;
658         else
659                 mode2 = LCK_PW;
660
661         rc = ldlm_local_lock_match(ns, res_id, type, extent, mode2, lockh);
662         if (rc == 1) {
663                 int flags;
664                 struct ldlm_lock *lock = ldlm_handle2object(lockh);
665                 /* FIXME: This is not incredibly elegant, but it might
666                  * be more elegant than adding another parameter to
667                  * lock_match.  I want a second opinion. */
668                 ldlm_lock_addref(lock, mode);
669                 ldlm_lock_decref(lock, mode2);
670
671                 if (mode == LCK_PR)
672                         return 0;
673
674                 rc = ldlm_cli_convert(cl, lockh, type, &flags);
675                 if (rc)
676                         LBUG();
677
678                 return rc;
679         }
680
681         rc = ldlm_cli_enqueue(cl, conn, ns, parent_lock, res_id, type,
682                               extent, mode, flags, data, datalen, lockh);
683         return rc;
684 }
685
686 static int osc_cancel(struct obd_conn *oconn, __u32 mode,
687                       struct ldlm_handle *lockh)
688 {
689         struct ldlm_lock *lock;
690         ENTRY;
691
692         lock = ldlm_handle2object(lockh);
693         ldlm_lock_decref(lock, mode);
694
695         RETURN(0);
696 }
697
698 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
699 {
700         struct osc_obd *osc = &obddev->u.osc;
701         int rc;
702         ENTRY;
703
704         osc->osc_conn = ptlrpc_uuid_to_connection("ost");
705         if (!osc->osc_conn)
706                 RETURN(-EINVAL);
707
708         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
709         if (osc->osc_client == NULL)
710                 GOTO(out_conn, rc = -ENOMEM);
711
712         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
713         if (osc->osc_ldlm_client == NULL)
714                 GOTO(out_client, rc = -ENOMEM);
715
716         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
717                            osc->osc_client);
718         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
719                            osc->osc_ldlm_client);
720         osc->osc_client->cli_name = "osc";
721         osc->osc_ldlm_client->cli_name = "ldlm";
722
723         MOD_INC_USE_COUNT;
724         RETURN(0);
725
726  out_client:
727         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
728  out_conn:
729         ptlrpc_put_connection(osc->osc_conn);
730         return rc;
731 }
732
733 static int osc_cleanup(struct obd_device * obddev)
734 {
735         struct osc_obd *osc = &obddev->u.osc;
736
737         ptlrpc_cleanup_client(osc->osc_client);
738         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
739         ptlrpc_cleanup_client(osc->osc_ldlm_client);
740         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
741         ptlrpc_put_connection(osc->osc_conn);
742
743         MOD_DEC_USE_COUNT;
744         return 0;
745 }
746
747 struct obd_ops osc_obd_ops = {
748         o_setup:   osc_setup,
749         o_cleanup: osc_cleanup,
750         o_create: osc_create,
751         o_destroy: osc_destroy,
752         o_getattr: osc_getattr,
753         o_setattr: osc_setattr,
754         o_open: osc_open,
755         o_close: osc_close,
756         o_connect: osc_connect,
757         o_disconnect: osc_disconnect,
758         o_brw: osc_brw,
759         o_punch: osc_punch,
760         o_enqueue: osc_enqueue,
761         o_cancel: osc_cancel
762 };
763
764 static int __init osc_init(void)
765 {
766         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
767         return 0;
768 }
769
770 static void __exit osc_exit(void)
771 {
772         obd_unregister_type(LUSTRE_OSC_NAME);
773 }
774
775 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
776 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
777 MODULE_LICENSE("GPL");
778
779 module_init(osc_init);
780 module_exit(osc_exit);