Whamcloud - gitweb
Minor bug fixes to get export handles used by the DLM. I believe the
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copryright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27
28 static void osc_obd2cl(struct obd_device *obd, struct ptlrpc_client **cl,
29                        struct ptlrpc_connection **connection)
30 {
31         struct osc_obd *osc = &obd->u.osc;
32         *cl = osc->osc_client;
33         *connection = osc->osc_conn;
34 }
35
36 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
37                        struct ptlrpc_connection **connection)
38 {
39         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
40         *cl = osc->osc_client;
41         *connection = osc->osc_conn;
42 }
43
44 static void osc_con2dlmcl(struct obd_conn *conn, struct ptlrpc_client **cl,
45                           struct ptlrpc_connection **connection)
46 {
47         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
48         *cl = osc->osc_ldlm_client;
49         *connection = osc->osc_conn;
50 }
51
52 static int osc_connect(struct obd_conn *conn, struct obd_device *obd)
53 {
54         struct osc_obd *osc = &obd->u.osc;
55         struct obd_import *import;
56         struct ptlrpc_request *request;
57         struct ptlrpc_client *cl;
58         struct ptlrpc_connection *connection;
59         char *tmp = osc->osc_target_uuid;
60         int rc, size = sizeof(osc->osc_target_uuid);
61         ENTRY;
62
63         OBD_ALLOC(import, sizeof(*import)); 
64         if (!import)
65                 RETURN(-ENOMEM);
66                   
67         MOD_INC_USE_COUNT;
68         rc = gen_connect(conn, obd);
69         if (rc) 
70                 GOTO(out, rc);
71
72         osc_obd2cl(obd, &cl, &connection);
73         request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn, 
74                                   OST_CONNECT, 1, &size, &tmp);
75         if (!request)
76                 RETURN(-ENOMEM);
77
78         request->rq_replen = lustre_msg_size(0, NULL);
79
80         rc = ptlrpc_queue_wait(request);
81         rc = ptlrpc_check_status(request, rc);
82         if (rc) {
83                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
84                 GOTO(out, rc);
85         }
86
87         /* XXX: Make this a handle */
88         osc->osc_connh.addr = request->rq_repmsg->addr;
89         osc->osc_connh.cookie = request->rq_repmsg->cookie;
90
91         EXIT;
92  out:
93         ptlrpc_free_req(request);
94         if (rc)
95                 MOD_DEC_USE_COUNT;
96         return rc;
97 }
98
99 static int osc_disconnect(struct obd_conn *conn)
100 {
101         struct ptlrpc_request *request;
102         struct ptlrpc_client *cl;
103         struct ptlrpc_connection *connection;
104         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
105         int rc;
106         ENTRY;
107
108         osc_con2cl(conn, &cl, &connection);
109         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
110                                   OST_DISCONNECT, 0, NULL, NULL);
111         if (!request)
112                 RETURN(-ENOMEM);
113         request->rq_replen = lustre_msg_size(0, NULL);
114
115         rc = ptlrpc_queue_wait(request);
116         if (rc) 
117                 GOTO(out, rc);
118         rc = gen_disconnect(conn);
119         if (!rc)
120                 MOD_DEC_USE_COUNT;
121
122  out:
123         ptlrpc_free_req(request);
124         return rc;
125 }
126
127 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
128 {
129         struct ptlrpc_request *request;
130         struct ptlrpc_client *cl;
131         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
132         struct ptlrpc_connection *connection;
133         struct ost_body *body;
134         int rc, size = sizeof(*body);
135         ENTRY;
136
137         osc_con2cl(conn, &cl, &connection);
138         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
139                                    OST_GETATTR, 1, &size, NULL);
140         if (!request)
141                 RETURN(-ENOMEM);
142
143         body = lustre_msg_buf(request->rq_reqmsg, 0);
144         memcpy(&body->oa, oa, sizeof(*oa));
145         body->oa.o_valid = ~0;
146
147         request->rq_replen = lustre_msg_size(1, &size);
148
149         rc = ptlrpc_queue_wait(request);
150         rc = ptlrpc_check_status(request, rc);
151         if (rc) {
152                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
153                 GOTO(out, rc);
154         }
155
156         body = lustre_msg_buf(request->rq_repmsg, 0);
157         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
158         if (oa)
159                 memcpy(oa, &body->oa, sizeof(*oa));
160
161         EXIT;
162  out:
163         ptlrpc_free_req(request);
164         return 0;
165 }
166
167 static int osc_open(struct obd_conn *conn, struct obdo *oa)
168 {
169         struct ptlrpc_request *request;
170         struct ptlrpc_client *cl;
171         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
172         struct ptlrpc_connection *connection;
173         struct ost_body *body;
174         int rc, size = sizeof(*body);
175         ENTRY;
176
177         osc_con2cl(conn, &cl, &connection);
178         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
179                                    OST_OPEN, 1, &size, NULL);
180         if (!request)
181                 RETURN(-ENOMEM);
182
183         body = lustre_msg_buf(request->rq_reqmsg, 0);
184         memcpy(&body->oa, oa, sizeof(*oa));
185         body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
186
187         request->rq_replen = lustre_msg_size(1, &size);
188
189         rc = ptlrpc_queue_wait(request);
190         rc = ptlrpc_check_status(request, rc);
191         if (rc)
192                 GOTO(out, rc);
193
194         body = lustre_msg_buf(request->rq_repmsg, 0);
195         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
196         if (oa)
197                 memcpy(oa, &body->oa, sizeof(*oa));
198
199         EXIT;
200  out:
201         ptlrpc_free_req(request);
202         return 0;
203 }
204
205 static int osc_close(struct obd_conn *conn, struct obdo *oa)
206 {
207         struct ptlrpc_request *request;
208         struct ptlrpc_client *cl;
209         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
210         struct ptlrpc_connection *connection;
211         struct ost_body *body;
212         int rc, size = sizeof(*body);
213         ENTRY;
214
215         osc_con2cl(conn, &cl, &connection);
216         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
217                                    OST_CLOSE, 1, &size, NULL);
218         if (!request)
219                 RETURN(-ENOMEM);
220
221         body = lustre_msg_buf(request->rq_reqmsg, 0);
222         memcpy(&body->oa, oa, sizeof(*oa));
223
224         request->rq_replen = lustre_msg_size(1, &size);
225
226         rc = ptlrpc_queue_wait(request);
227         rc = ptlrpc_check_status(request, rc);
228         if (rc)
229                 GOTO(out, rc);
230
231         body = lustre_msg_buf(request->rq_repmsg, 0);
232         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
233         if (oa)
234                 memcpy(oa, &body->oa, sizeof(*oa));
235
236         EXIT;
237  out:
238         ptlrpc_free_req(request);
239         return 0;
240 }
241
242 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
243 {
244         struct ptlrpc_request *request;
245         struct ptlrpc_client *cl;
246         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
247         struct ptlrpc_connection *connection;
248         struct ost_body *body;
249         int rc, size = sizeof(*body);
250         ENTRY;
251
252         osc_con2cl(conn, &cl, &connection);
253         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
254                                   OST_SETATTR, 1, &size, NULL);
255         if (!request)
256                 RETURN(-ENOMEM);
257
258         body = lustre_msg_buf(request->rq_reqmsg, 0);
259         memcpy(&body->oa, oa, sizeof(*oa));
260
261         request->rq_replen = lustre_msg_size(1, &size);
262
263         rc = ptlrpc_queue_wait(request);
264         rc = ptlrpc_check_status(request, rc);
265         GOTO(out, rc);
266
267  out:
268         ptlrpc_free_req(request);
269         return 0;
270 }
271
272 static int osc_create(struct obd_conn *conn, struct obdo *oa)
273 {
274         struct ptlrpc_request *request;
275         struct ptlrpc_client *cl;
276         struct ptlrpc_connection *connection;
277         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
278         struct ost_body *body;
279         struct mds_objid *objid;
280         struct lov_object_id *lov_id;
281         int rc, size = sizeof(*body);
282         ENTRY;
283
284         if (!oa) {
285                 CERROR("oa NULL\n");
286                 RETURN(-EINVAL);
287         }
288         osc_con2cl(conn, &cl, &connection);
289         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
290                                   OST_CREATE, 1, &size, NULL);
291         if (!request)
292                 RETURN(-ENOMEM);
293
294         body = lustre_msg_buf(request->rq_reqmsg, 0);
295         memcpy(&body->oa, oa, sizeof(*oa));
296
297         request->rq_replen = lustre_msg_size(1, &size);
298
299         rc = ptlrpc_queue_wait(request);
300         rc = ptlrpc_check_status(request, rc);
301         if (rc)
302                 GOTO(out, rc);
303
304         body = lustre_msg_buf(request->rq_repmsg, 0);
305         memcpy(oa, &body->oa, sizeof(*oa));
306
307         memset(oa->o_inline, 0, sizeof(oa->o_inline));
308         objid = (struct mds_objid *)oa->o_inline;
309         objid->mo_lov_md.lmd_object_id = oa->o_id;
310         objid->mo_lov_md.lmd_stripe_count = 1;
311         lov_id = (struct lov_object_id *)(oa->o_inline + sizeof(*objid));
312         lov_id->l_device_id = 0;
313         lov_id->l_object_id = oa->o_id;
314
315         EXIT;
316  out:
317         ptlrpc_free_req(request);
318         return 0;
319 }
320
321 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
322                      obd_off offset)
323 {
324         struct ptlrpc_request *request;
325         struct ptlrpc_client *cl;
326         struct ptlrpc_connection *connection;
327         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
328         struct ost_body *body;
329         int rc, size = sizeof(*body);
330         ENTRY;
331
332         if (!oa) {
333                 CERROR("oa NULL\n");
334                 RETURN(-EINVAL);
335         }
336         osc_con2cl(conn, &cl, &connection);
337         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
338                                    OST_PUNCH, 1, &size, NULL);
339         if (!request)
340                 RETURN(-ENOMEM);
341
342         body = lustre_msg_buf(request->rq_reqmsg, 0);
343         memcpy(&body->oa, oa, sizeof(*oa));
344         body->oa.o_blocks = count;
345         body->oa.o_valid |= OBD_MD_FLBLOCKS;
346
347         request->rq_replen = lustre_msg_size(1, &size);
348
349         rc = ptlrpc_queue_wait(request);
350         rc = ptlrpc_check_status(request, rc);
351         if (rc)
352                 GOTO(out, rc);
353
354         body = lustre_msg_buf(request->rq_repmsg, 0);
355         memcpy(oa, &body->oa, sizeof(*oa));
356
357         EXIT;
358  out:
359         ptlrpc_free_req(request);
360         return 0;
361 }
362
363 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
364 {
365         struct ptlrpc_request *request;
366         struct ptlrpc_client *cl;
367         struct ptlrpc_connection *connection;
368         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
369         struct ost_body *body;
370         int rc, size = sizeof(*body);
371         ENTRY;
372
373         if (!oa) {
374                 CERROR("oa NULL\n");
375                 RETURN(-EINVAL);
376         }
377         osc_con2cl(conn, &cl, &connection);
378         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
379                                    OST_DESTROY, 1, &size, NULL);
380         if (!request)
381                 RETURN(-ENOMEM);
382
383         body = lustre_msg_buf(request->rq_reqmsg, 0);
384         memcpy(&body->oa, oa, sizeof(*oa));
385         body->oa.o_valid = ~0;
386
387         request->rq_replen = lustre_msg_size(1, &size);
388
389         rc = ptlrpc_queue_wait(request);
390         rc = ptlrpc_check_status(request, rc);
391         if (rc)
392                 GOTO(out, rc);
393
394         body = lustre_msg_buf(request->rq_repmsg, 0);
395         memcpy(oa, &body->oa, sizeof(*oa));
396
397         EXIT;
398  out:
399         ptlrpc_free_req(request);
400         return 0;
401 }
402
403 struct osc_brw_cb_data {
404         struct page **buf;
405         struct ptlrpc_request *req;
406         bulk_callback_t callback;
407         void *cb_data;
408 };
409
410 static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
411 {
412         struct osc_brw_cb_data *cb_data = data;
413
414         if (desc->b_flags & PTL_RPC_FL_INTR)
415                 CERROR("got signal\n");
416
417         (cb_data->callback)(desc, cb_data->cb_data);
418
419         ptlrpc_free_bulk(desc);
420         ptlrpc_free_req(cb_data->req);
421
422         OBD_FREE(cb_data, sizeof(*cb_data));
423 }
424
425 static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
426                         struct obdo **oa, obd_count *oa_bufs, struct page **buf,
427                         obd_size *count, obd_off *offset, obd_flag *flags,
428                         bulk_callback_t callback)
429 {
430         struct ptlrpc_client *cl;
431         struct ptlrpc_connection *connection;
432         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
433         struct ptlrpc_request *request;
434         struct ost_body *body;
435         struct list_head *tmp;
436         int pages, rc, i, j, size[3] = {sizeof(*body)};
437         void *ptr1, *ptr2;
438         struct ptlrpc_bulk_desc *desc;
439         ENTRY;
440
441         size[1] = num_oa * sizeof(struct obd_ioobj);
442         pages = 0;
443         for (i = 0; i < num_oa; i++)
444                 pages += oa_bufs[i];
445         size[2] = pages * sizeof(struct niobuf_remote);
446
447         osc_con2cl(conn, &cl, &connection);
448         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
449                                   OST_BRW, 3, size, NULL);
450         if (!request)
451                 GOTO(out, rc = -ENOMEM);
452
453         body = lustre_msg_buf(request->rq_reqmsg, 0);
454         body->data = OBD_BRW_READ;
455
456         desc = ptlrpc_prep_bulk(connection);
457         if (!desc)
458                 GOTO(out2, rc = -ENOMEM);
459         desc->b_portal = OST_BULK_PORTAL;
460
461         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
462         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
463         for (pages = 0, i = 0; i < num_oa; i++) {
464                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
465                 /* FIXME: this inner loop is wrong for multiple OAs */
466                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
467                         struct ptlrpc_bulk_page *bulk;
468                         bulk = ptlrpc_prep_bulk_page(desc);
469                         if (bulk == NULL)
470                                 GOTO(out3, rc = -ENOMEM);
471
472                         spin_lock(&connection->c_lock);
473                         bulk->b_xid = ++connection->c_xid_out;
474                         spin_unlock(&connection->c_lock);
475
476                         bulk->b_buf = kmap(buf[pages]);
477                         bulk->b_page = buf[pages];
478                         bulk->b_buflen = PAGE_SIZE;
479                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
480                                         flags[pages], bulk->b_xid);
481                 }
482         }
483
484         rc = ptlrpc_register_bulk(desc);
485         if (rc)
486                 GOTO(out3, rc);
487
488         request->rq_replen = lustre_msg_size(1, size);
489         rc = ptlrpc_queue_wait(request);
490         rc = ptlrpc_check_status(request, rc);
491         if (rc)
492                 ptlrpc_abort_bulk(desc);
493         GOTO(out3, rc);
494
495  out3:
496         list_for_each(tmp, &desc->b_page_list) {
497                 struct ptlrpc_bulk_page *bulk;
498                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
499                 if (bulk->b_page != NULL)
500                         kunmap(bulk->b_page);
501         }
502         ptlrpc_free_bulk(desc);
503  out2:
504         ptlrpc_free_req(request);
505  out:
506         return rc;
507 }
508
509 static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
510 {
511         struct osc_brw_cb_data *cb_data = data;
512         int i;
513         ENTRY;
514
515         if (desc->b_flags & PTL_RPC_FL_INTR)
516                 CERROR("got signal\n");
517
518         for (i = 0; i < desc->b_page_count; i++)
519                 kunmap(cb_data->buf[i]);
520
521         (cb_data->callback)(desc, cb_data->cb_data);
522
523         ptlrpc_free_bulk(desc);
524         ptlrpc_free_req(cb_data->req);
525
526         OBD_FREE(cb_data, sizeof(*cb_data));
527         EXIT;
528 }
529
530 static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
531                          struct obdo **oa, obd_count *oa_bufs,
532                          struct page **pagearray, obd_size *count,
533                          obd_off *offset, obd_flag *flags,
534                          bulk_callback_t callback)
535 {
536         struct ptlrpc_client *cl;
537         struct ptlrpc_connection *connection;
538         struct ptlrpc_request *request;
539         struct ptlrpc_bulk_desc *desc;
540         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
541         struct obd_ioobj ioo;
542         struct ost_body *body;
543         struct niobuf_local *local;
544         struct niobuf_remote *remote;
545         struct osc_brw_cb_data *cb_data;
546         long pages;
547         int rc, i, j, size[3] = {sizeof(*body)};
548         void *ptr1, *ptr2;
549         ENTRY;
550
551         size[1] = num_oa * sizeof(ioo);
552         pages = 0;
553         for (i = 0; i < num_oa; i++)
554                 pages += oa_bufs[i];
555         size[2] = pages * sizeof(*remote);
556
557         OBD_ALLOC(local, pages * sizeof(*local));
558         if (local == NULL)
559                 RETURN(-ENOMEM);
560
561         osc_con2cl(conn, &cl, &connection);
562         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
563                                   OST_BRW, 3, size, NULL);
564         if (!request)
565                 GOTO(out, rc = -ENOMEM);
566         body = lustre_msg_buf(request->rq_reqmsg, 0);
567         body->data = OBD_BRW_WRITE;
568
569         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
570         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
571         for (pages = 0, i = 0; i < num_oa; i++) {
572                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
573                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
574                         local[pages].addr = kmap(pagearray[pages]);
575                         local[pages].offset = offset[pages];
576                         local[pages].len = count[pages];
577                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
578                                         flags[pages], 0);
579                 }
580         }
581
582         size[1] = pages * sizeof(struct niobuf_remote);
583         request->rq_replen = lustre_msg_size(2, size);
584
585         rc = ptlrpc_queue_wait(request);
586         rc = ptlrpc_check_status(request, rc);
587         if (rc)
588                 GOTO(out2, rc);
589
590         ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
591         if (ptr2 == NULL)
592                 GOTO(out2, rc = -EINVAL);
593
594         if (request->rq_repmsg->buflens[1] !=
595             pages * sizeof(struct niobuf_remote)) {
596                 CERROR("buffer length wrong (%d vs. %ld)\n",
597                        request->rq_repmsg->buflens[1],
598                        pages * sizeof(struct niobuf_remote));
599                 GOTO(out2, rc = -EINVAL);
600         }
601
602         desc = ptlrpc_prep_bulk(connection);
603         desc->b_portal = OSC_BULK_PORTAL;
604         if (callback) {
605                 desc->b_cb = brw_write_finish;
606                 OBD_ALLOC(cb_data, sizeof(*cb_data));
607                 cb_data->buf = pagearray;
608                 cb_data->callback = callback;
609                 desc->b_cb_data = cb_data;
610         }
611
612         for (pages = 0, i = 0; i < num_oa; i++) {
613                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
614                         struct ptlrpc_bulk_page *page;
615
616                         ost_unpack_niobuf(&ptr2, &remote);
617
618                         page = ptlrpc_prep_bulk_page(desc);
619                         if (page == NULL)
620                                 GOTO(out3, rc = -ENOMEM);
621
622                         page->b_buf = (void *)(unsigned long)local[pages].addr;
623                         page->b_buflen = local[pages].len;
624                         page->b_xid = remote->xid;
625                 }
626         }
627
628         if (desc->b_page_count != pages)
629                 LBUG();
630
631         rc = ptlrpc_send_bulk(desc);
632         if (rc)
633                 GOTO(out3, rc);
634         if (callback)
635                 GOTO(out, rc);
636
637         /* If there's no callback function, sleep here until complete. */
638         wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
639         if (desc->b_flags & PTL_RPC_FL_INTR)
640                 rc = -EINTR;
641
642         GOTO(out3, rc);
643
644  out3:
645         ptlrpc_free_bulk(desc);
646  out2:
647         ptlrpc_free_req(request);
648         for (pages = 0, i = 0; i < num_oa; i++)
649                 for (j = 0; j < oa_bufs[i]; j++, pages++)
650                         kunmap(pagearray[pages]);
651  out:
652         OBD_FREE(local, pages * sizeof(*local));
653
654         return rc;
655 }
656
657 static int osc_brw(int cmd, struct obd_conn *conn, obd_count num_oa,
658                    struct obdo **oa, obd_count *oa_bufs, struct page **buf,
659                    obd_size *count, obd_off *offset, obd_flag *flags,
660                    void *callback)
661 {
662         if (num_oa != 1)
663                 LBUG();
664
665         if (cmd & OBD_BRW_WRITE)
666                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
667                                      offset, flags, (bulk_callback_t)callback);
668         else
669                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
670                                     offset, flags, (bulk_callback_t)callback);
671 }
672
673 static int osc_enqueue(struct obd_conn *oconn,
674                        struct lustre_handle *parent_lock, __u64 *res_id,
675                        __u32 type, void *extentp, int extent_len, __u32 mode,
676                        int *flags, void *callback, void *data, int datalen,
677                        struct lustre_handle *lockh)
678 {
679         struct obd_device *obddev = gen_conn2obd(oconn);
680         struct osc_obd *osc = &obddev->u.osc;
681         struct ptlrpc_connection *conn;
682         struct ptlrpc_client *cl;
683         struct ldlm_extent *extent = extentp;
684         int rc;
685         __u32 mode2;
686
687         /* Filesystem locks are given a bit of special treatment: first we
688          * fixup the lock to start and end on page boundaries. */
689         extent->start &= PAGE_MASK;
690         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
691
692         /* Next, search for already existing extent locks that will cover us */
693         osc_con2dlmcl(oconn, &cl, &conn);
694         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
695                              sizeof(extent), mode, lockh);
696         if (rc == 1) {
697                 /* We already have a lock, and it's referenced */
698                 return 0;
699         }
700
701         /* Next, search for locks that we can upgrade (if we're trying to write)
702          * or are more than we need (if we're trying to read).  Because the VFS
703          * and page cache already protect us locally, lots of readers/writers
704          * can share a single PW lock. */
705         if (mode == LCK_PW)
706                 mode2 = LCK_PR;
707         else
708                 mode2 = LCK_PW;
709
710         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
711                              sizeof(extent), mode2, lockh);
712         if (rc == 1) {
713                 int flags;
714                 /* FIXME: This is not incredibly elegant, but it might
715                  * be more elegant than adding another parameter to
716                  * lock_match.  I want a second opinion. */
717                 ldlm_lock_addref(lockh, mode);
718                 ldlm_lock_decref(lockh, mode2);
719
720                 if (mode == LCK_PR)
721                         return 0;
722
723                 rc = ldlm_cli_convert(cl, lockh, &osc->osc_connh, 
724                                       mode, &flags);
725                 if (rc)
726                         LBUG();
727
728                 return rc;
729         }
730
731         rc = ldlm_cli_enqueue(cl, conn, &osc->osc_connh, 
732                               NULL, obddev->obd_namespace,
733                               parent_lock, res_id, type, extent, sizeof(extent),
734                               mode, flags, callback, data, datalen, lockh);
735         return rc;
736 }
737
738 static int osc_cancel(struct obd_conn *oconn, __u32 mode,
739                       struct lustre_handle *lockh)
740 {
741         ENTRY;
742
743         ldlm_lock_decref(lockh, mode);
744
745         RETURN(0);
746 }
747
748 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
749 {
750         struct obd_ioctl_data* data = buf;
751         struct osc_obd *osc = &obddev->u.osc;
752         char server_uuid[37];
753         int rc;
754         ENTRY;
755
756         if (data->ioc_inllen1 < 1) {
757                 CERROR("osc setup requires a TARGET UUID\n");
758                 RETURN(-EINVAL);
759         }
760
761         if (data->ioc_inllen1 > 37) {
762                 CERROR("osc TARGET UUID must be less than 38 characters\n");
763                 RETURN(-EINVAL);
764         }
765
766         if (data->ioc_inllen2 < 1) {
767                 CERROR("osc setup requires a SERVER UUID\n");
768                 RETURN(-EINVAL);
769         }
770
771         if (data->ioc_inllen2 > 37) {
772                 CERROR("osc SERVER UUID must be less than 38 characters\n");
773                 RETURN(-EINVAL);
774         }
775
776         memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
777         memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
778                                                    sizeof(server_uuid)));
779
780         osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
781         if (!osc->osc_conn)
782                 RETURN(-ENOENT);
783
784         obddev->obd_namespace =
785                 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
786         if (obddev->obd_namespace == NULL)
787                 GOTO(out_conn, rc = -ENOMEM);
788
789         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
790         if (osc->osc_client == NULL)
791                 GOTO(out_ns, rc = -ENOMEM);
792
793         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
794         if (osc->osc_ldlm_client == NULL)
795                 GOTO(out_client, rc = -ENOMEM);
796
797         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
798                            osc->osc_client);
799         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
800                            osc->osc_ldlm_client);
801         osc->osc_client->cli_name = "osc";
802         osc->osc_ldlm_client->cli_name = "ldlm";
803
804         MOD_INC_USE_COUNT;
805         RETURN(0);
806
807  out_client:
808         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
809  out_ns:
810         ldlm_namespace_free(obddev->obd_namespace);
811  out_conn:
812         ptlrpc_put_connection(osc->osc_conn);
813         return rc;
814 }
815
816 static int osc_cleanup(struct obd_device * obddev)
817 {
818         struct osc_obd *osc = &obddev->u.osc;
819
820         ldlm_namespace_free(obddev->obd_namespace);
821
822         ptlrpc_cleanup_client(osc->osc_client);
823         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
824         ptlrpc_cleanup_client(osc->osc_ldlm_client);
825         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
826         ptlrpc_put_connection(osc->osc_conn);
827
828         MOD_DEC_USE_COUNT;
829         return 0;
830 }
831
832 #if 0
833 static int osc_statfs(struct obd_conn *conn, struct statfs *sfs);
834 {
835         struct ptlrpc_request *request;
836         struct ptlrpc_client *cl;
837         struct ptlrpc_connection *connection;
838         struct obd_statfs *osfs;
839         int rc, size = sizeof(*osfs);
840         ENTRY;
841
842         osc_con2cl(conn, &cl, &connection);
843         request = ptlrpc_prep_req(cl, connection, OST_STATFS, 0, NULL, NULL);
844         if (!request)
845                 RETURN(-ENOMEM);
846
847         request->rq_replen = lustre_msg_size(1, &size);
848
849         rc = ptlrpc_queue_wait(request);
850         rc = ptlrpc_check_status(request, rc);
851         if (rc)
852                 GOTO(out, rc);
853
854         osfs = lustre_msg_buf(request->rq_repmsg, 0);
855         obd_statfs_unpack(sfs, osfs);
856
857         EXIT;
858  out:
859         ptlrpc_free_req(request);
860         return 0;
861 }
862 #endif
863
864 struct obd_ops osc_obd_ops = {
865         o_setup:   osc_setup,
866         o_cleanup: osc_cleanup,
867         o_create: osc_create,
868         o_destroy: osc_destroy,
869         o_getattr: osc_getattr,
870         o_setattr: osc_setattr,
871         o_open: osc_open,
872         o_close: osc_close,
873         o_connect: osc_connect,
874         o_disconnect: osc_disconnect,
875         o_brw: osc_brw,
876         o_punch: osc_punch,
877         o_enqueue: osc_enqueue,
878         o_cancel: osc_cancel
879 };
880
881 static int __init osc_init(void)
882 {
883         return obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
884 }
885
886 static void __exit osc_exit(void)
887 {
888         obd_unregister_type(LUSTRE_OSC_NAME);
889 }
890
891 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
892 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
893 MODULE_LICENSE("GPL");
894
895 module_init(osc_init);
896 module_exit(osc_exit);