Whamcloud - gitweb
Merge from posix_stable
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27
28 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
29                        struct ptlrpc_connection **connection)
30 {
31         struct osc_obd *osc = &conn->oc_dev->u.osc;
32         *cl = osc->osc_client;
33         *connection = osc->osc_conn;
34 }
35
36 static void osc_con2dlmcl(struct obd_conn *conn, struct ptlrpc_client **cl,
37                           struct ptlrpc_connection **connection)
38 {
39         struct osc_obd *osc = &conn->oc_dev->u.osc;
40         *cl = osc->osc_ldlm_client;
41         *connection = osc->osc_conn;
42 }
43
44 static int osc_connect(struct obd_conn *conn)
45 {
46         struct osc_obd *osc = &conn->oc_dev->u.osc;
47         struct ptlrpc_request *request;
48         struct ptlrpc_client *cl;
49         struct ptlrpc_connection *connection;
50         struct ost_body *body;
51         char *tmp = osc->osc_target_uuid;
52         int rc, size = sizeof(osc->osc_target_uuid);
53         ENTRY;
54
55         osc_con2cl(conn, &cl, &connection);
56         request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 1, &size, &tmp);
57         if (!request)
58                 RETURN(-ENOMEM);
59
60         size = sizeof(*body);
61         request->rq_replen = lustre_msg_size(1, &size);
62
63         rc = ptlrpc_queue_wait(request);
64         rc = ptlrpc_check_status(request, rc);
65         if (rc) {
66                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
67                 GOTO(out, rc);
68         }
69
70         body = lustre_msg_buf(request->rq_repmsg, 0);
71         CDEBUG(D_INODE, "received connid %d\n", body->connid);
72
73         /* This might be redundant. */
74         cl->cli_target_devno = request->rq_repmsg->target_id;
75         osc->osc_ldlm_client->cli_target_devno = cl->cli_target_devno;
76         /* XXX: Make this a handle */
77         conn->oc_id = body->connid;
78         EXIT;
79  out:
80         ptlrpc_free_req(request);
81         return rc;
82 }
83
84 static int osc_disconnect(struct obd_conn *conn)
85 {
86         struct ptlrpc_request *request;
87         struct ptlrpc_client *cl;
88         struct ptlrpc_connection *connection;
89         struct ost_body *body;
90         int rc, size = sizeof(*body);
91         ENTRY;
92
93         osc_con2cl(conn, &cl, &connection);
94         request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size,
95                                   NULL);
96         if (!request)
97                 RETURN(-ENOMEM);
98
99         body = lustre_msg_buf(request->rq_reqmsg, 0);
100         body->connid = conn->oc_id;
101
102         request->rq_replen = lustre_msg_size(0, NULL);
103
104         rc = ptlrpc_queue_wait(request);
105         GOTO(out, rc);
106  out:
107         ptlrpc_free_req(request);
108         return rc;
109 }
110
111 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
112 {
113         struct ptlrpc_request *request;
114         struct ptlrpc_client *cl;
115         struct ptlrpc_connection *connection;
116         struct ost_body *body;
117         int rc, size = sizeof(*body);
118         ENTRY;
119
120         osc_con2cl(conn, &cl, &connection);
121         request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
122         if (!request)
123                 RETURN(-ENOMEM);
124
125         body = lustre_msg_buf(request->rq_reqmsg, 0);
126         memcpy(&body->oa, oa, sizeof(*oa));
127         body->connid = conn->oc_id;
128         body->oa.o_valid = ~0;
129
130         request->rq_replen = lustre_msg_size(1, &size);
131
132         rc = ptlrpc_queue_wait(request);
133         rc = ptlrpc_check_status(request, rc);
134         if (rc) {
135                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
136                 GOTO(out, rc);
137         }
138
139         body = lustre_msg_buf(request->rq_repmsg, 0);
140         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
141         if (oa)
142                 memcpy(oa, &body->oa, sizeof(*oa));
143
144         EXIT;
145  out:
146         ptlrpc_free_req(request);
147         return 0;
148 }
149
150 static int osc_open(struct obd_conn *conn, struct obdo *oa)
151 {
152         struct ptlrpc_request *request;
153         struct ptlrpc_client *cl;
154         struct ptlrpc_connection *connection;
155         struct ost_body *body;
156         int rc, size = sizeof(*body);
157         ENTRY;
158
159         osc_con2cl(conn, &cl, &connection);
160         request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
161         if (!request)
162                 RETURN(-ENOMEM);
163
164         body = lustre_msg_buf(request->rq_reqmsg, 0);
165         memcpy(&body->oa, oa, sizeof(*oa));
166         body->connid = conn->oc_id;
167         body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
168
169         request->rq_replen = lustre_msg_size(1, &size);
170
171         rc = ptlrpc_queue_wait(request);
172         rc = ptlrpc_check_status(request, rc);
173         if (rc)
174                 GOTO(out, rc);
175
176         body = lustre_msg_buf(request->rq_repmsg, 0);
177         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
178         if (oa)
179                 memcpy(oa, &body->oa, sizeof(*oa));
180
181         EXIT;
182  out:
183         ptlrpc_free_req(request);
184         return 0;
185 }
186
187 static int osc_close(struct obd_conn *conn, struct obdo *oa)
188 {
189         struct ptlrpc_request *request;
190         struct ptlrpc_client *cl;
191         struct ptlrpc_connection *connection;
192         struct ost_body *body;
193         int rc, size = sizeof(*body);
194         ENTRY;
195
196         osc_con2cl(conn, &cl, &connection);
197         request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
198         if (!request)
199                 RETURN(-ENOMEM);
200
201         body = lustre_msg_buf(request->rq_reqmsg, 0);
202         memcpy(&body->oa, oa, sizeof(*oa));
203         body->connid = conn->oc_id;
204
205         request->rq_replen = lustre_msg_size(1, &size);
206
207         rc = ptlrpc_queue_wait(request);
208         rc = ptlrpc_check_status(request, rc);
209         if (rc)
210                 GOTO(out, rc);
211
212         body = lustre_msg_buf(request->rq_repmsg, 0);
213         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
214         if (oa)
215                 memcpy(oa, &body->oa, sizeof(*oa));
216
217         EXIT;
218  out:
219         ptlrpc_free_req(request);
220         return 0;
221 }
222
223 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
224 {
225         struct ptlrpc_request *request;
226         struct ptlrpc_client *cl;
227         struct ptlrpc_connection *connection;
228         struct ost_body *body;
229         int rc, size = sizeof(*body);
230         ENTRY;
231
232         osc_con2cl(conn, &cl, &connection);
233         request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
234         if (!request)
235                 RETURN(-ENOMEM);
236
237         body = lustre_msg_buf(request->rq_reqmsg, 0);
238         memcpy(&body->oa, oa, sizeof(*oa));
239         body->connid = conn->oc_id;
240
241         request->rq_replen = lustre_msg_size(1, &size);
242
243         rc = ptlrpc_queue_wait(request);
244         rc = ptlrpc_check_status(request, rc);
245         GOTO(out, rc);
246
247  out:
248         ptlrpc_free_req(request);
249         return 0;
250 }
251
252 static int osc_create(struct obd_conn *conn, struct obdo *oa)
253 {
254         struct ptlrpc_request *request;
255         struct ptlrpc_client *cl;
256         struct ptlrpc_connection *connection;
257         struct ost_body *body;
258         struct mds_objid *objid;
259         struct lov_object_id *lov_id;
260         int rc, size = sizeof(*body);
261         ENTRY;
262
263         if (!oa) {
264                 CERROR("oa NULL\n");
265                 RETURN(-EINVAL);
266         }
267         osc_con2cl(conn, &cl, &connection);
268         request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
269         if (!request)
270                 RETURN(-ENOMEM);
271
272         body = lustre_msg_buf(request->rq_reqmsg, 0);
273         memcpy(&body->oa, oa, sizeof(*oa));
274         body->connid = conn->oc_id;
275
276         request->rq_replen = lustre_msg_size(1, &size);
277
278         rc = ptlrpc_queue_wait(request);
279         rc = ptlrpc_check_status(request, rc);
280         if (rc)
281                 GOTO(out, rc);
282
283         body = lustre_msg_buf(request->rq_repmsg, 0);
284         memcpy(oa, &body->oa, sizeof(*oa));
285
286         memset(oa->o_inline, 0, sizeof(oa->o_inline));
287         objid = (struct mds_objid *)oa->o_inline;
288         objid->mo_lov_md.lmd_object_id = oa->o_id;
289         objid->mo_lov_md.lmd_stripe_count = 1;
290         lov_id = (struct lov_object_id *)(oa->o_inline + sizeof(*objid));
291         lov_id->l_device_id = 0;
292         lov_id->l_object_id = oa->o_id;
293
294         EXIT;
295  out:
296         ptlrpc_free_req(request);
297         return 0;
298 }
299
300 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
301                      obd_off offset)
302 {
303         struct ptlrpc_request *request;
304         struct ptlrpc_client *cl;
305         struct ptlrpc_connection *connection;
306         struct ost_body *body;
307         int rc, size = sizeof(*body);
308         ENTRY;
309
310         if (!oa) {
311                 CERROR("oa NULL\n");
312                 RETURN(-EINVAL);
313         }
314         osc_con2cl(conn, &cl, &connection);
315         request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
316         if (!request)
317                 RETURN(-ENOMEM);
318
319         body = lustre_msg_buf(request->rq_reqmsg, 0);
320         memcpy(&body->oa, oa, sizeof(*oa));
321         body->connid = conn->oc_id;
322         body->oa.o_valid = ~0;
323         body->oa.o_size = offset;
324         body->oa.o_blocks = count;
325
326         request->rq_replen = lustre_msg_size(1, &size);
327
328         rc = ptlrpc_queue_wait(request);
329         rc = ptlrpc_check_status(request, rc);
330         if (rc)
331                 GOTO(out, rc);
332
333         body = lustre_msg_buf(request->rq_repmsg, 0);
334         memcpy(oa, &body->oa, sizeof(*oa));
335
336         EXIT;
337  out:
338         ptlrpc_free_req(request);
339         return 0;
340 }
341
342 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
343 {
344         struct ptlrpc_request *request;
345         struct ptlrpc_client *cl;
346         struct ptlrpc_connection *connection;
347         struct ost_body *body;
348         int rc, size = sizeof(*body);
349         ENTRY;
350
351         if (!oa) {
352                 CERROR("oa NULL\n");
353                 RETURN(-EINVAL);
354         }
355         osc_con2cl(conn, &cl, &connection);
356         request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
357         if (!request)
358                 RETURN(-ENOMEM);
359
360         body = lustre_msg_buf(request->rq_reqmsg, 0);
361         memcpy(&body->oa, oa, sizeof(*oa));
362         body->connid = conn->oc_id;
363         body->oa.o_valid = ~0;
364
365         request->rq_replen = lustre_msg_size(1, &size);
366
367         rc = ptlrpc_queue_wait(request);
368         rc = ptlrpc_check_status(request, rc);
369         if (rc)
370                 GOTO(out, rc);
371
372         body = lustre_msg_buf(request->rq_repmsg, 0);
373         memcpy(oa, &body->oa, sizeof(*oa));
374
375         EXIT;
376  out:
377         ptlrpc_free_req(request);
378         return 0;
379 }
380
381 struct osc_brw_cb_data {
382         struct page **buf;
383         struct ptlrpc_request *req;
384         bulk_callback_t callback;
385         void *cb_data;
386 };
387
388 static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
389 {
390         struct osc_brw_cb_data *cb_data = data;
391
392         if (desc->b_flags & PTL_RPC_FL_INTR)
393                 CERROR("got signal\n");
394
395         (cb_data->callback)(desc, cb_data->cb_data);
396
397         ptlrpc_free_bulk(desc);
398         ptlrpc_free_req(cb_data->req);
399
400         OBD_FREE(cb_data, sizeof(*cb_data));
401 }
402
403 static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
404                         struct obdo **oa, obd_count *oa_bufs, struct page **buf,
405                         obd_size *count, obd_off *offset, obd_flag *flags,
406                         bulk_callback_t callback)
407 {
408         struct ptlrpc_client *cl;
409         struct ptlrpc_connection *connection;
410         struct ptlrpc_request *request;
411         struct ost_body *body;
412         struct list_head *tmp;
413         int pages, rc, i, j, size[3] = {sizeof(*body)};
414         void *ptr1, *ptr2;
415         struct ptlrpc_bulk_desc *desc;
416         ENTRY;
417
418         size[1] = num_oa * sizeof(struct obd_ioobj);
419         pages = 0;
420         for (i = 0; i < num_oa; i++)
421                 pages += oa_bufs[i];
422         size[2] = pages * sizeof(struct niobuf_remote);
423
424         osc_con2cl(conn, &cl, &connection);
425         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
426         if (!request)
427                 GOTO(out, rc = -ENOMEM);
428
429         body = lustre_msg_buf(request->rq_reqmsg, 0);
430         body->data = OBD_BRW_READ;
431
432         desc = ptlrpc_prep_bulk(connection);
433         if (!desc)
434                 GOTO(out2, rc = -ENOMEM);
435         desc->b_portal = OST_BULK_PORTAL;
436
437         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
438         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
439         for (pages = 0, i = 0; i < num_oa; i++) {
440                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
441                 /* FIXME: this inner loop is wrong for multiple OAs */
442                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
443                         struct ptlrpc_bulk_page *bulk;
444                         bulk = ptlrpc_prep_bulk_page(desc);
445                         if (bulk == NULL)
446                                 GOTO(out3, rc = -ENOMEM);
447
448                         spin_lock(&connection->c_lock);
449                         bulk->b_xid = ++connection->c_xid_out;
450                         spin_unlock(&connection->c_lock);
451
452                         bulk->b_buf = kmap(buf[pages]);
453                         bulk->b_page = buf[pages];
454                         bulk->b_buflen = PAGE_SIZE;
455                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
456                                         flags[pages], bulk->b_xid);
457                 }
458         }
459
460         rc = ptlrpc_register_bulk(desc);
461         if (rc)
462                 GOTO(out3, rc);
463
464         request->rq_replen = lustre_msg_size(1, size);
465         rc = ptlrpc_queue_wait(request);
466         rc = ptlrpc_check_status(request, rc);
467         if (rc)
468                 ptlrpc_abort_bulk(desc);
469         GOTO(out3, rc);
470
471  out3:
472         list_for_each(tmp, &desc->b_page_list) {
473                 struct ptlrpc_bulk_page *bulk;
474                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
475                 if (bulk->b_page != NULL)
476                         kunmap(bulk->b_page);
477         }
478         ptlrpc_free_bulk(desc);
479  out2:
480         ptlrpc_free_req(request);
481  out:
482         return rc;
483 }
484
485 static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
486 {
487         struct osc_brw_cb_data *cb_data = data;
488         int i;
489         ENTRY;
490
491         if (desc->b_flags & PTL_RPC_FL_INTR)
492                 CERROR("got signal\n");
493
494         for (i = 0; i < desc->b_page_count; i++)
495                 kunmap(cb_data->buf[i]);
496
497         (cb_data->callback)(desc, cb_data->cb_data);
498
499         ptlrpc_free_bulk(desc);
500         ptlrpc_free_req(cb_data->req);
501
502         OBD_FREE(cb_data, sizeof(*cb_data));
503         EXIT;
504 }
505
506 static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
507                          struct obdo **oa, obd_count *oa_bufs,
508                          struct page **pagearray, obd_size *count,
509                          obd_off *offset, obd_flag *flags,
510                          bulk_callback_t callback)
511 {
512         struct ptlrpc_client *cl;
513         struct ptlrpc_connection *connection;
514         struct ptlrpc_request *request;
515         struct ptlrpc_bulk_desc *desc;
516         struct obd_ioobj ioo;
517         struct ost_body *body;
518         struct niobuf_local *local;
519         struct niobuf_remote *remote;
520         struct osc_brw_cb_data *cb_data;
521         long pages;
522         int rc, i, j, size[3] = {sizeof(*body)};
523         void *ptr1, *ptr2;
524         ENTRY;
525
526         size[1] = num_oa * sizeof(ioo);
527         pages = 0;
528         for (i = 0; i < num_oa; i++)
529                 pages += oa_bufs[i];
530         size[2] = pages * sizeof(*remote);
531
532         OBD_ALLOC(local, pages * sizeof(*local));
533         if (local == NULL)
534                 RETURN(-ENOMEM);
535
536         osc_con2cl(conn, &cl, &connection);
537         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
538         if (!request)
539                 GOTO(out, rc = -ENOMEM);
540         body = lustre_msg_buf(request->rq_reqmsg, 0);
541         body->data = OBD_BRW_WRITE;
542
543         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
544         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
545         for (pages = 0, i = 0; i < num_oa; i++) {
546                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
547                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
548                         local[pages].addr = kmap(pagearray[pages]);
549                         local[pages].offset = offset[pages];
550                         local[pages].len = count[pages];
551                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
552                                         flags[pages], 0);
553                 }
554         }
555
556         size[1] = pages * sizeof(struct niobuf_remote);
557         request->rq_replen = lustre_msg_size(2, size);
558
559         rc = ptlrpc_queue_wait(request);
560         rc = ptlrpc_check_status(request, rc);
561         if (rc)
562                 GOTO(out2, rc);
563
564         ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
565         if (ptr2 == NULL)
566                 GOTO(out2, rc = -EINVAL);
567
568         if (request->rq_repmsg->buflens[1] !=
569             pages * sizeof(struct niobuf_remote)) {
570                 CERROR("buffer length wrong (%d vs. %ld)\n",
571                        request->rq_repmsg->buflens[1],
572                        pages * sizeof(struct niobuf_remote));
573                 GOTO(out2, rc = -EINVAL);
574         }
575
576         desc = ptlrpc_prep_bulk(connection);
577         desc->b_portal = OSC_BULK_PORTAL;
578         if (callback) {
579                 desc->b_cb = brw_write_finish;
580                 OBD_ALLOC(cb_data, sizeof(*cb_data));
581                 cb_data->buf = pagearray;
582                 cb_data->callback = callback;
583                 desc->b_cb_data = cb_data;
584         }
585
586         for (pages = 0, i = 0; i < num_oa; i++) {
587                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
588                         struct ptlrpc_bulk_page *page;
589
590                         ost_unpack_niobuf(&ptr2, &remote);
591
592                         page = ptlrpc_prep_bulk_page(desc);
593                         if (page == NULL)
594                                 GOTO(out3, rc = -ENOMEM);
595
596                         page->b_buf = (void *)(unsigned long)local[pages].addr;
597                         page->b_buflen = local[pages].len;
598                         page->b_xid = remote->xid;
599                 }
600         }
601
602         if (desc->b_page_count != pages)
603                 LBUG();
604
605         rc = ptlrpc_send_bulk(desc);
606         if (rc)
607                 GOTO(out3, rc);
608         if (callback)
609                 GOTO(out, rc);
610
611         /* If there's no callback function, sleep here until complete. */
612         wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
613         if (desc->b_flags & PTL_RPC_FL_INTR)
614                 rc = -EINTR;
615
616         GOTO(out3, rc);
617
618  out3:
619         ptlrpc_free_bulk(desc);
620  out2:
621         ptlrpc_free_req(request);
622         for (pages = 0, i = 0; i < num_oa; i++)
623                 for (j = 0; j < oa_bufs[i]; j++, pages++)
624                         kunmap(pagearray[pages]);
625  out:
626         OBD_FREE(local, pages * sizeof(*local));
627
628         return rc;
629 }
630
631 static int osc_brw(int cmd, struct obd_conn *conn, obd_count num_oa,
632                    struct obdo **oa, obd_count *oa_bufs, struct page **buf,
633                    obd_size *count, obd_off *offset, obd_flag *flags,
634                    void *callback)
635 {
636         if (num_oa != 1)
637                 LBUG();
638
639         if (cmd & OBD_BRW_WRITE)
640                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
641                                      offset, flags, (bulk_callback_t)callback);
642         else
643                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
644                                     offset, flags, (bulk_callback_t)callback);
645 }
646
647 static int osc_enqueue(struct obd_conn *oconn,
648                        struct lustre_handle *parent_lock, __u64 *res_id,
649                        __u32 type, void *extentp, int extent_len, __u32 mode,
650                        int *flags, void *callback, void *data, int datalen,
651                        struct lustre_handle *lockh)
652 {
653         struct obd_device *obddev = oconn->oc_dev;
654         struct ptlrpc_connection *conn;
655         struct ptlrpc_client *cl;
656         struct ldlm_extent *extent = extentp;
657         int rc;
658         __u32 mode2;
659
660         /* Filesystem locks are given a bit of special treatment: first we
661          * fixup the lock to start and end on page boundaries. */
662         extent->start &= PAGE_MASK;
663         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
664
665         /* Next, search for already existing extent locks that will cover us */
666         osc_con2dlmcl(oconn, &cl, &conn);
667         rc = ldlm_local_lock_match(obddev->obd_namespace, res_id, type, extent,
668                                    sizeof(extent), mode, lockh);
669         if (rc == 1) {
670                 /* We already have a lock, and it's referenced */
671                 return 0;
672         }
673
674         /* Next, search for locks that we can upgrade (if we're trying to write)
675          * or are more than we need (if we're trying to read).  Because the VFS
676          * and page cache already protect us locally, lots of readers/writers
677          * can share a single PW lock. */
678         if (mode == LCK_PW)
679                 mode2 = LCK_PR;
680         else
681                 mode2 = LCK_PW;
682
683         rc = ldlm_local_lock_match(obddev->obd_namespace, res_id, type, extent,
684                                    sizeof(extent), mode2, lockh);
685         if (rc == 1) {
686                 int flags;
687                 struct ldlm_lock *lock = lustre_handle2object(lockh);
688                 /* FIXME: This is not incredibly elegant, but it might
689                  * be more elegant than adding another parameter to
690                  * lock_match.  I want a second opinion. */
691                 ldlm_lock_addref(lock, mode);
692                 ldlm_lock_decref(lock, mode2);
693
694                 if (mode == LCK_PR)
695                         return 0;
696
697                 rc = ldlm_cli_convert(cl, lockh, mode, &flags);
698                 if (rc)
699                         LBUG();
700
701                 return rc;
702         }
703
704         rc = ldlm_cli_enqueue(cl, conn, NULL, obddev->obd_namespace,
705                               parent_lock, res_id, type, extent, sizeof(extent),
706                               mode, flags, callback, data, datalen, lockh);
707         return rc;
708 }
709
710 static int osc_cancel(struct obd_conn *oconn, __u32 mode,
711                       struct lustre_handle *lockh)
712 {
713         struct ldlm_lock *lock;
714         ENTRY;
715
716         lock = lustre_handle2object(lockh);
717         ldlm_lock_decref(lock, mode);
718
719         RETURN(0);
720 }
721
722 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
723 {
724         struct obd_ioctl_data* data = buf;
725         struct osc_obd *osc = &obddev->u.osc;
726         char server_uuid[37];
727         int rc;
728         ENTRY;
729
730         if (data->ioc_inllen1 < 1) {
731                 CERROR("osc setup requires a TARGET UUID\n");
732                 RETURN(-EINVAL);
733         }
734
735         if (data->ioc_inllen1 > 37) {
736                 CERROR("osc TARGET UUID must be less than 38 characters\n");
737                 RETURN(-EINVAL);
738         }
739
740         if (data->ioc_inllen2 < 1) {
741                 CERROR("osc setup requires a SERVER UUID\n");
742                 RETURN(-EINVAL);
743         }
744
745         if (data->ioc_inllen2 > 37) {
746                 CERROR("osc SERVER UUID must be less than 38 characters\n");
747                 RETURN(-EINVAL);
748         }
749
750         memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
751         memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
752                                                    sizeof(server_uuid)));
753
754         osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
755         if (!osc->osc_conn)
756                 RETURN(-ENOENT);
757
758         obddev->obd_namespace =
759                 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
760         if (obddev->obd_namespace == NULL)
761                 GOTO(out_conn, rc = -ENOMEM);
762
763         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
764         if (osc->osc_client == NULL)
765                 GOTO(out_ns, rc = -ENOMEM);
766
767         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
768         if (osc->osc_ldlm_client == NULL)
769                 GOTO(out_client, rc = -ENOMEM);
770
771         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
772                            osc->osc_client);
773         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
774                            osc->osc_ldlm_client);
775         osc->osc_client->cli_name = "osc";
776         osc->osc_ldlm_client->cli_name = "ldlm";
777
778         MOD_INC_USE_COUNT;
779         RETURN(0);
780
781  out_client:
782         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
783  out_ns:
784         ldlm_namespace_free(obddev->obd_namespace);
785  out_conn:
786         ptlrpc_put_connection(osc->osc_conn);
787         return rc;
788 }
789
790 static int osc_cleanup(struct obd_device * obddev)
791 {
792         struct osc_obd *osc = &obddev->u.osc;
793
794         ldlm_namespace_free(obddev->obd_namespace);
795
796         ptlrpc_cleanup_client(osc->osc_client);
797         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
798         ptlrpc_cleanup_client(osc->osc_ldlm_client);
799         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
800         ptlrpc_put_connection(osc->osc_conn);
801
802         MOD_DEC_USE_COUNT;
803         return 0;
804 }
805
#if 0
/*
 * Disabled: query filesystem statistics from the OST with an OST_STATFS
 * request and unpack the obd_statfs reply into @sfs.
 *
 * Returns 0 on success, -ENOMEM if the request could not be allocated, or
 * the error produced by ptlrpc_queue_wait()/ptlrpc_check_status().
 */
static int osc_statfs(struct obd_conn *conn, struct statfs *sfs)
{
        struct ptlrpc_request *request;
        struct ptlrpc_client *cl;
        struct ptlrpc_connection *connection;
        struct obd_statfs *osfs;
        int rc, size = sizeof(*osfs);
        ENTRY;

        osc_con2cl(conn, &cl, &connection);
        request = ptlrpc_prep_req(cl, connection, OST_STATFS, 0, NULL, NULL);
        if (!request)
                RETURN(-ENOMEM);

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        rc = ptlrpc_check_status(request, rc);
        if (rc)
                GOTO(out, rc);

        osfs = lustre_msg_buf(request->rq_repmsg, 0);
        obd_statfs_unpack(sfs, osfs);

        EXIT;
 out:
        ptlrpc_free_req(request);
        /* Propagate the RPC status rather than always reporting success. */
        return rc;
}
#endif
837
838 struct obd_ops osc_obd_ops = {
839         o_setup:   osc_setup,
840         o_cleanup: osc_cleanup,
841         o_create: osc_create,
842         o_destroy: osc_destroy,
843         o_getattr: osc_getattr,
844         o_setattr: osc_setattr,
845         o_open: osc_open,
846         o_close: osc_close,
847         o_connect: osc_connect,
848         o_disconnect: osc_disconnect,
849         o_brw: osc_brw,
850         o_punch: osc_punch,
851         o_enqueue: osc_enqueue,
852         o_cancel: osc_cancel
853 };
854
855 static int __init osc_init(void)
856 {
857         return obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
858 }
859
860 static void __exit osc_exit(void)
861 {
862         obd_unregister_type(LUSTRE_OSC_NAME);
863 }
864
865 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
866 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
867 MODULE_LICENSE("GPL");
868
869 module_init(osc_init);
870 module_exit(osc_exit);