Whamcloud - gitweb
Another part of the statfs story, disabled.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copryright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27
28 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
29                        struct ptlrpc_connection **connection)
30 {
31         struct osc_obd *osc = &conn->oc_dev->u.osc;
32         *cl = osc->osc_client;
33         *connection = osc->osc_conn;
34 }
35
36 static void osc_con2dlmcl(struct obd_conn *conn, struct ptlrpc_client **cl,
37                           struct ptlrpc_connection **connection)
38 {
39         struct osc_obd *osc = &conn->oc_dev->u.osc;
40         *cl = osc->osc_ldlm_client;
41         *connection = osc->osc_conn;
42 }
43
44 static int osc_connect(struct obd_conn *conn)
45 {
46         struct ptlrpc_request *request;
47         struct ptlrpc_client *cl;
48         struct ptlrpc_connection *connection;
49         struct ost_body *body;
50         int rc, size = sizeof(*body);
51         ENTRY;
52
53         osc_con2cl(conn, &cl, &connection);
54         request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL);
55         if (!request)
56                 RETURN(-ENOMEM);
57
58         request->rq_replen = lustre_msg_size(1, &size);
59
60         rc = ptlrpc_queue_wait(request);
61         rc = ptlrpc_check_status(request, rc);
62         if (rc) {
63                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
64                 GOTO(out, rc);
65         }
66
67         body = lustre_msg_buf(request->rq_repmsg, 0);
68         CDEBUG(D_INODE, "received connid %d\n", body->connid);
69
70         conn->oc_id = body->connid;
71         EXIT;
72  out:
73         ptlrpc_free_req(request);
74         return rc;
75 }
76
77 static int osc_disconnect(struct obd_conn *conn)
78 {
79         struct ptlrpc_request *request;
80         struct ptlrpc_client *cl;
81         struct ptlrpc_connection *connection;
82         struct ost_body *body;
83         int rc, size = sizeof(*body);
84         ENTRY;
85
86         osc_con2cl(conn, &cl, &connection);
87         request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size,
88                                   NULL);
89         if (!request)
90                 RETURN(-ENOMEM);
91
92         body = lustre_msg_buf(request->rq_reqmsg, 0);
93         body->connid = conn->oc_id;
94
95         request->rq_replen = lustre_msg_size(1, &size);
96
97         rc = ptlrpc_queue_wait(request);
98         GOTO(out, rc);
99  out:
100         ptlrpc_free_req(request);
101         return rc;
102 }
103
104 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
105 {
106         struct ptlrpc_request *request;
107         struct ptlrpc_client *cl;
108         struct ptlrpc_connection *connection;
109         struct ost_body *body;
110         int rc, size = sizeof(*body);
111         ENTRY;
112
113         osc_con2cl(conn, &cl, &connection);
114         request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
115         if (!request)
116                 RETURN(-ENOMEM);
117
118         body = lustre_msg_buf(request->rq_reqmsg, 0);
119         memcpy(&body->oa, oa, sizeof(*oa));
120         body->connid = conn->oc_id;
121         body->oa.o_valid = ~0;
122
123         request->rq_replen = lustre_msg_size(1, &size);
124
125         rc = ptlrpc_queue_wait(request);
126         rc = ptlrpc_check_status(request, rc);
127         if (rc) {
128                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
129                 GOTO(out, rc);
130         }
131
132         body = lustre_msg_buf(request->rq_repmsg, 0);
133         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
134         if (oa)
135                 memcpy(oa, &body->oa, sizeof(*oa));
136
137         EXIT;
138  out:
139         ptlrpc_free_req(request);
140         return 0;
141 }
142
143 static int osc_open(struct obd_conn *conn, struct obdo *oa)
144 {
145         struct ptlrpc_request *request;
146         struct ptlrpc_client *cl;
147         struct ptlrpc_connection *connection;
148         struct ost_body *body;
149         int rc, size = sizeof(*body);
150         ENTRY;
151
152         osc_con2cl(conn, &cl, &connection);
153         request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
154         if (!request)
155                 RETURN(-ENOMEM);
156
157         body = lustre_msg_buf(request->rq_reqmsg, 0);
158         memcpy(&body->oa, oa, sizeof(*oa));
159         body->connid = conn->oc_id;
160         body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
161
162         request->rq_replen = lustre_msg_size(1, &size);
163
164         rc = ptlrpc_queue_wait(request);
165         rc = ptlrpc_check_status(request, rc);
166         if (rc)
167                 GOTO(out, rc);
168
169         body = lustre_msg_buf(request->rq_repmsg, 0);
170         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
171         if (oa)
172                 memcpy(oa, &body->oa, sizeof(*oa));
173
174         EXIT;
175  out:
176         ptlrpc_free_req(request);
177         return 0;
178 }
179
180 static int osc_close(struct obd_conn *conn, struct obdo *oa)
181 {
182         struct ptlrpc_request *request;
183         struct ptlrpc_client *cl;
184         struct ptlrpc_connection *connection;
185         struct ost_body *body;
186         int rc, size = sizeof(*body);
187         ENTRY;
188
189         osc_con2cl(conn, &cl, &connection);
190         request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
191         if (!request)
192                 RETURN(-ENOMEM);
193
194         body = lustre_msg_buf(request->rq_reqmsg, 0);
195         memcpy(&body->oa, oa, sizeof(*oa));
196         body->connid = conn->oc_id;
197
198         request->rq_replen = lustre_msg_size(1, &size);
199
200         rc = ptlrpc_queue_wait(request);
201         rc = ptlrpc_check_status(request, rc);
202         if (rc)
203                 GOTO(out, rc);
204
205         body = lustre_msg_buf(request->rq_repmsg, 0);
206         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
207         if (oa)
208                 memcpy(oa, &body->oa, sizeof(*oa));
209
210         EXIT;
211  out:
212         ptlrpc_free_req(request);
213         return 0;
214 }
215
216 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
217 {
218         struct ptlrpc_request *request;
219         struct ptlrpc_client *cl;
220         struct ptlrpc_connection *connection;
221         struct ost_body *body;
222         int rc, size = sizeof(*body);
223         ENTRY;
224
225         osc_con2cl(conn, &cl, &connection);
226         request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
227         if (!request)
228                 RETURN(-ENOMEM);
229
230         body = lustre_msg_buf(request->rq_reqmsg, 0);
231         memcpy(&body->oa, oa, sizeof(*oa));
232         body->connid = conn->oc_id;
233
234         request->rq_replen = lustre_msg_size(1, &size);
235
236         rc = ptlrpc_queue_wait(request);
237         rc = ptlrpc_check_status(request, rc);
238         GOTO(out, rc);
239
240  out:
241         ptlrpc_free_req(request);
242         return 0;
243 }
244
245 static int osc_create(struct obd_conn *conn, struct obdo *oa)
246 {
247         struct ptlrpc_request *request;
248         struct ptlrpc_client *cl;
249         struct ptlrpc_connection *connection;
250         struct ost_body *body;
251         struct mds_objid *objid;
252         struct lov_object_id *lov_id;
253         int rc, size = sizeof(*body);
254         ENTRY;
255
256         if (!oa) {
257                 CERROR("oa NULL\n");
258                 RETURN(-EINVAL);
259         }
260         osc_con2cl(conn, &cl, &connection);
261         request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
262         if (!request)
263                 RETURN(-ENOMEM);
264
265         body = lustre_msg_buf(request->rq_reqmsg, 0);
266         memcpy(&body->oa, oa, sizeof(*oa));
267         body->oa.o_valid = ~0;
268         body->connid = conn->oc_id;
269
270         request->rq_replen = lustre_msg_size(1, &size);
271
272         rc = ptlrpc_queue_wait(request);
273         rc = ptlrpc_check_status(request, rc);
274         if (rc)
275                 GOTO(out, rc);
276
277         body = lustre_msg_buf(request->rq_repmsg, 0);
278         memcpy(oa, &body->oa, sizeof(*oa));
279
280         memset(oa->o_inline, 0, sizeof(oa->o_inline));
281         objid = (struct mds_objid *)oa->o_inline;
282         objid->mo_lov_md.lmd_object_id = oa->o_id;
283         objid->mo_lov_md.lmd_stripe_count = 1;
284         lov_id = (struct lov_object_id *)(oa->o_inline + sizeof(*objid));
285         lov_id->l_device_id = 0;
286         lov_id->l_object_id = oa->o_id;
287
288         EXIT;
289  out:
290         ptlrpc_free_req(request);
291         return 0;
292 }
293
294 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
295                      obd_off offset)
296 {
297         struct ptlrpc_request *request;
298         struct ptlrpc_client *cl;
299         struct ptlrpc_connection *connection;
300         struct ost_body *body;
301         int rc, size = sizeof(*body);
302         ENTRY;
303
304         if (!oa) {
305                 CERROR("oa NULL\n");
306                 RETURN(-EINVAL);
307         }
308         osc_con2cl(conn, &cl, &connection);
309         request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
310         if (!request)
311                 RETURN(-ENOMEM);
312
313         body = lustre_msg_buf(request->rq_reqmsg, 0);
314         memcpy(&body->oa, oa, sizeof(*oa));
315         body->connid = conn->oc_id;
316         body->oa.o_valid = ~0;
317         body->oa.o_size = offset;
318         body->oa.o_blocks = count;
319
320         request->rq_replen = lustre_msg_size(1, &size);
321
322         rc = ptlrpc_queue_wait(request);
323         rc = ptlrpc_check_status(request, rc);
324         if (rc)
325                 GOTO(out, rc);
326
327         body = lustre_msg_buf(request->rq_repmsg, 0);
328         memcpy(oa, &body->oa, sizeof(*oa));
329
330         EXIT;
331  out:
332         ptlrpc_free_req(request);
333         return 0;
334 }
335
336 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
337 {
338         struct ptlrpc_request *request;
339         struct ptlrpc_client *cl;
340         struct ptlrpc_connection *connection;
341         struct ost_body *body;
342         int rc, size = sizeof(*body);
343         ENTRY;
344
345         if (!oa) {
346                 CERROR("oa NULL\n");
347                 RETURN(-EINVAL);
348         }
349         osc_con2cl(conn, &cl, &connection);
350         request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
351         if (!request)
352                 RETURN(-ENOMEM);
353
354         body = lustre_msg_buf(request->rq_reqmsg, 0);
355         memcpy(&body->oa, oa, sizeof(*oa));
356         body->connid = conn->oc_id;
357         body->oa.o_valid = ~0;
358
359         request->rq_replen = lustre_msg_size(1, &size);
360
361         rc = ptlrpc_queue_wait(request);
362         rc = ptlrpc_check_status(request, rc);
363         if (rc)
364                 GOTO(out, rc);
365
366         body = lustre_msg_buf(request->rq_repmsg, 0);
367         memcpy(oa, &body->oa, sizeof(*oa));
368
369         EXIT;
370  out:
371         ptlrpc_free_req(request);
372         return 0;
373 }
374
375 struct osc_brw_cb_data {
376         struct page **buf;
377         struct ptlrpc_request *req;
378         bulk_callback_t callback;
379         void *cb_data;
380 };
381
382 static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
383 {
384         struct osc_brw_cb_data *cb_data = data;
385
386         if (desc->b_flags & PTL_RPC_FL_INTR)
387                 CERROR("got signal\n");
388
389         (cb_data->callback)(desc, cb_data->cb_data);
390
391         ptlrpc_free_bulk(desc);
392         ptlrpc_free_req(cb_data->req);
393
394         OBD_FREE(cb_data, sizeof(*cb_data));
395 }
396
397 static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
398                         struct obdo **oa, obd_count *oa_bufs, struct page **buf,
399                         obd_size *count, obd_off *offset, obd_flag *flags,
400                         bulk_callback_t callback)
401 {
402         struct ptlrpc_client *cl;
403         struct ptlrpc_connection *connection;
404         struct ptlrpc_request *request;
405         struct ost_body *body;
406         struct list_head *tmp;
407         int pages, rc, i, j, size[3] = {sizeof(*body)};
408         void *ptr1, *ptr2;
409         struct ptlrpc_bulk_desc *desc;
410         ENTRY;
411
412         size[1] = num_oa * sizeof(struct obd_ioobj);
413         pages = 0;
414         for (i = 0; i < num_oa; i++)
415                 pages += oa_bufs[i];
416         size[2] = pages * sizeof(struct niobuf_remote);
417
418         osc_con2cl(conn, &cl, &connection);
419         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
420         if (!request)
421                 GOTO(out, rc = -ENOMEM);
422
423         body = lustre_msg_buf(request->rq_reqmsg, 0);
424         body->data = OBD_BRW_READ;
425
426         desc = ptlrpc_prep_bulk(connection);
427         if (!desc)
428                 GOTO(out2, rc = -ENOMEM);
429         desc->b_portal = OST_BULK_PORTAL;
430
431         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
432         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
433         for (pages = 0, i = 0; i < num_oa; i++) {
434                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
435                 /* FIXME: this inner loop is wrong for multiple OAs */
436                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
437                         struct ptlrpc_bulk_page *bulk;
438                         bulk = ptlrpc_prep_bulk_page(desc);
439                         if (bulk == NULL)
440                                 GOTO(out3, rc = -ENOMEM);
441
442                         spin_lock(&connection->c_lock);
443                         bulk->b_xid = ++connection->c_xid_out;
444                         spin_unlock(&connection->c_lock);
445
446                         bulk->b_buf = kmap(buf[pages]);
447                         bulk->b_page = buf[pages];
448                         bulk->b_buflen = PAGE_SIZE;
449                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
450                                         flags[pages], bulk->b_xid);
451                 }
452         }
453
454         rc = ptlrpc_register_bulk(desc);
455         if (rc)
456                 GOTO(out3, rc);
457
458         request->rq_replen = lustre_msg_size(1, size);
459         rc = ptlrpc_queue_wait(request);
460         rc = ptlrpc_check_status(request, rc);
461         if (rc)
462                 ptlrpc_abort_bulk(desc);
463         GOTO(out3, rc);
464
465  out3:
466         list_for_each(tmp, &desc->b_page_list) {
467                 struct ptlrpc_bulk_page *bulk;
468                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
469                 if (bulk->b_buf != NULL)
470                         kunmap(bulk->b_page);
471         }
472         ptlrpc_free_bulk(desc);
473  out2:
474         ptlrpc_free_req(request);
475  out:
476         return rc;
477 }
478
479 static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
480 {
481         struct osc_brw_cb_data *cb_data = data;
482         int i;
483         ENTRY;
484
485         if (desc->b_flags & PTL_RPC_FL_INTR)
486                 CERROR("got signal\n");
487
488         for (i = 0; i < desc->b_page_count; i++)
489                 kunmap(cb_data->buf[i]);
490
491         (cb_data->callback)(desc, cb_data->cb_data);
492
493         ptlrpc_free_bulk(desc);
494         ptlrpc_free_req(cb_data->req);
495
496         OBD_FREE(cb_data, sizeof(*cb_data));
497         EXIT;
498 }
499
500 static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
501                          struct obdo **oa, obd_count *oa_bufs,
502                          struct page **pagearray, obd_size *count,
503                          obd_off *offset, obd_flag *flags,
504                          bulk_callback_t callback)
505 {
506         struct ptlrpc_client *cl;
507         struct ptlrpc_connection *connection;
508         struct ptlrpc_request *request;
509         struct ptlrpc_bulk_desc *desc;
510         struct obd_ioobj ioo;
511         struct ost_body *body;
512         struct niobuf_local *local;
513         struct niobuf_remote *remote;
514         struct osc_brw_cb_data *cb_data;
515         long pages;
516         int rc, i, j, size[3] = {sizeof(*body)};
517         void *ptr1, *ptr2;
518         ENTRY;
519
520         size[1] = num_oa * sizeof(ioo);
521         pages = 0;
522         for (i = 0; i < num_oa; i++)
523                 pages += oa_bufs[i];
524         size[2] = pages * sizeof(*remote);
525
526         OBD_ALLOC(local, pages * sizeof(*local));
527         if (local == NULL)
528                 RETURN(-ENOMEM);
529
530         osc_con2cl(conn, &cl, &connection);
531         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
532         if (!request)
533                 GOTO(out, rc = -ENOMEM);
534         body = lustre_msg_buf(request->rq_reqmsg, 0);
535         body->data = OBD_BRW_WRITE;
536
537         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
538         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
539         for (pages = 0, i = 0; i < num_oa; i++) {
540                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
541                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
542                         local[pages].addr = kmap(pagearray[pages]);
543                         local[pages].offset = offset[pages];
544                         local[pages].len = count[pages];
545                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
546                                         flags[pages], 0);
547                 }
548         }
549
550         size[1] = pages * sizeof(struct niobuf_remote);
551         request->rq_replen = lustre_msg_size(2, size);
552
553         rc = ptlrpc_queue_wait(request);
554         rc = ptlrpc_check_status(request, rc);
555         if (rc)
556                 GOTO(out2, rc);
557
558         ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
559         if (ptr2 == NULL)
560                 GOTO(out2, rc = -EINVAL);
561
562         if (request->rq_repmsg->buflens[1] !=
563             pages * sizeof(struct niobuf_remote)) {
564                 CERROR("buffer length wrong (%d vs. %ld)\n",
565                        request->rq_repmsg->buflens[1],
566                        pages * sizeof(struct niobuf_remote));
567                 GOTO(out2, rc = -EINVAL);
568         }
569
570         desc = ptlrpc_prep_bulk(connection);
571         desc->b_portal = OSC_BULK_PORTAL;
572         if (callback) {
573                 desc->b_cb = brw_write_finish;
574                 OBD_ALLOC(cb_data, sizeof(*cb_data));
575                 cb_data->buf = pagearray;
576                 cb_data->callback = callback;
577                 desc->b_cb_data = cb_data;
578         }
579
580         for (pages = 0, i = 0; i < num_oa; i++) {
581                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
582                         struct ptlrpc_bulk_page *page;
583
584                         ost_unpack_niobuf(&ptr2, &remote);
585
586                         page = ptlrpc_prep_bulk_page(desc);
587                         if (page == NULL)
588                                 GOTO(out3, rc = -ENOMEM);
589
590                         page->b_buf = (void *)(unsigned long)local[pages].addr;
591                         page->b_buflen = local[pages].len;
592                         page->b_xid = remote->xid;
593                 }
594         }
595
596         if (desc->b_page_count != pages)
597                 LBUG();
598
599         rc = ptlrpc_send_bulk(desc);
600         if (rc)
601                 GOTO(out3, rc);
602         if (callback)
603                 GOTO(out, rc);
604
605         /* If there's no callback function, sleep here until complete. */
606         wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
607         if (desc->b_flags & PTL_RPC_FL_INTR)
608                 rc = -EINTR;
609
610         GOTO(out3, rc);
611
612  out3:
613         ptlrpc_free_bulk(desc);
614  out2:
615         ptlrpc_free_req(request);
616         for (pages = 0, i = 0; i < num_oa; i++)
617                 for (j = 0; j < oa_bufs[i]; j++, pages++)
618                         kunmap(pagearray[pages]);
619  out:
620         OBD_FREE(local, pages * sizeof(*local));
621
622         return rc;
623 }
624
625 static int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
626                    struct obdo **oa, obd_count *oa_bufs, struct page **buf,
627                    obd_size *count, obd_off *offset, obd_flag *flags,
628                    void *callback)
629 {
630         if (num_oa != 1)
631                 LBUG();
632
633         if (rw == OBD_BRW_READ)
634                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
635                                     offset, flags, (bulk_callback_t)callback);
636         else
637                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
638                                      offset, flags, (bulk_callback_t)callback);
639 }
640
641 static int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
642                        struct ldlm_handle *parent_lock, __u64 *res_id,
643                        __u32 type, struct ldlm_extent *extent, __u32 mode,
644                        int *flags, void *data, int datalen,
645                        struct ldlm_handle *lockh)
646 {
647         struct ptlrpc_connection *conn;
648         struct ptlrpc_client *cl;
649         int rc;
650         __u32 mode2;
651
652         /* Filesystem locks are given a bit of special treatment: first we
653          * fixup the lock to start and end on page boundaries. */
654         extent->start &= PAGE_MASK;
655         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
656
657         /* Next, search for already existing extent locks that will cover us */
658         osc_con2dlmcl(oconn, &cl, &conn);
659         rc = ldlm_local_lock_match(ns, res_id, type, extent, mode, lockh);
660         if (rc == 1) {
661                 /* We already have a lock, and it's referenced */
662                 return 0;
663         }
664
665         /* Next, search for locks that we can upgrade (if we're trying to write)
666          * or are more than we need (if we're trying to read).  Because the VFS
667          * and page cache already protect us locally, lots of readers/writers
668          * can share a single PW lock. */
669         if (mode == LCK_PW)
670                 mode2 = LCK_PR;
671         else
672                 mode2 = LCK_PW;
673
674         rc = ldlm_local_lock_match(ns, res_id, type, extent, mode2, lockh);
675         if (rc == 1) {
676                 int flags;
677                 struct ldlm_lock *lock = ldlm_handle2object(lockh);
678                 /* FIXME: This is not incredibly elegant, but it might
679                  * be more elegant than adding another parameter to
680                  * lock_match.  I want a second opinion. */
681                 ldlm_lock_addref(lock, mode);
682                 ldlm_lock_decref(lock, mode2);
683
684                 if (mode == LCK_PR)
685                         return 0;
686
687                 rc = ldlm_cli_convert(cl, lockh, type, &flags);
688                 if (rc)
689                         LBUG();
690
691                 return rc;
692         }
693
694         rc = ldlm_cli_enqueue(cl, conn, ns, parent_lock, res_id, type,
695                               extent, mode, flags, data, datalen, lockh);
696         return rc;
697 }
698
699 static int osc_cancel(struct obd_conn *oconn, __u32 mode,
700                       struct ldlm_handle *lockh)
701 {
702         struct ldlm_lock *lock;
703         ENTRY;
704
705         lock = ldlm_handle2object(lockh);
706         ldlm_lock_decref(lock, mode);
707
708         RETURN(0);
709 }
710
711 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
712 {
713         struct osc_obd *osc = &obddev->u.osc;
714         int rc;
715         ENTRY;
716
717         osc->osc_conn = ptlrpc_uuid_to_connection("ost");
718         if (!osc->osc_conn)
719                 RETURN(-EINVAL);
720
721         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
722         if (osc->osc_client == NULL)
723                 GOTO(out_conn, rc = -ENOMEM);
724
725         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
726         if (osc->osc_ldlm_client == NULL)
727                 GOTO(out_client, rc = -ENOMEM);
728
729         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
730                            osc->osc_client);
731         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
732                            osc->osc_ldlm_client);
733         osc->osc_client->cli_name = "osc";
734         osc->osc_ldlm_client->cli_name = "ldlm";
735
736         MOD_INC_USE_COUNT;
737         RETURN(0);
738
739  out_client:
740         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
741  out_conn:
742         ptlrpc_put_connection(osc->osc_conn);
743         return rc;
744 }
745
746 static int osc_cleanup(struct obd_device * obddev)
747 {
748         struct osc_obd *osc = &obddev->u.osc;
749
750         ptlrpc_cleanup_client(osc->osc_client);
751         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
752         ptlrpc_cleanup_client(osc->osc_ldlm_client);
753         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
754         ptlrpc_put_connection(osc->osc_conn);
755
756         MOD_DEC_USE_COUNT;
757         return 0;
758 }
759
760 #if 0
761 static int osc_statfs(struct obd_conn *conn, struct statfs *statfs);
762 {
763         struct ptlrpc_request *request;
764         struct ptlrpc_client *cl;
765         struct ptlrpc_connection *connection;
766         struct ost_body *body;
767         int rc, size = sizeof(*body);
768         ENTRY;
769
770         osc_con2cl(conn, &cl, &connection);
771         request = ptlrpc_prep_req(cl, connection, OST_STATFS, 1, &size, NULL);
772         if (!request)
773                 RETURN(-ENOMEM);
774
775         body = lustre_msg_buf(request->rq_reqmsg, 0);
776         memcpy(&body->oa, oa, sizeof(*oa));
777         body->oa.o_valid = ~0;
778         body->connid = conn->oc_id;
779
780         request->rq_replen = lustre_msg_size(1, &size);
781
782         rc = ptlrpc_queue_wait(request);
783         rc = ptlrpc_check_status(request, rc);
784         if (rc)
785                 GOTO(out, rc);
786
787         body = lustre_msg_buf(request->rq_repmsg, 0);
788         memcpy(oa, &body->oa, sizeof(*oa));
789
790         EXIT;
791  out:
792         ptlrpc_free_req(request);
793         return 0;
794 }
795 #endif
796
797 struct obd_ops osc_obd_ops = {
798         o_setup:   osc_setup,
799         o_cleanup: osc_cleanup,
800         o_create: osc_create,
801         o_destroy: osc_destroy,
802         o_getattr: osc_getattr,
803         o_setattr: osc_setattr,
804         o_open: osc_open,
805         o_close: osc_close,
806         o_connect: osc_connect,
807         o_disconnect: osc_disconnect,
808         o_brw: osc_brw,
809         o_punch: osc_punch,
810         o_enqueue: osc_enqueue,
811         o_cancel: osc_cancel
812 };
813
814 static int __init osc_init(void)
815 {
816         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
817         return 0;
818 }
819
820 static void __exit osc_exit(void)
821 {
822         obd_unregister_type(LUSTRE_OSC_NAME);
823 }
824
825 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
826 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
827 MODULE_LICENSE("GPL");
828
829 module_init(osc_init);
830 module_exit(osc_exit);