Whamcloud - gitweb
fix a blunder in osc_connect that frees the connection before
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copryright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27
28 static void osc_obd2cl(struct obd_device *obd, struct ptlrpc_client **cl,
29                        struct ptlrpc_connection **connection)
30 {
31         struct osc_obd *osc = &obd->u.osc;
32         *cl = osc->osc_client;
33         *connection = osc->osc_conn;
34 }
35
36 static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
37                        struct ptlrpc_connection **connection)
38 {
39         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
40         *cl = osc->osc_client;
41         *connection = osc->osc_conn;
42 }
43
44 static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
45                           struct ptlrpc_connection **connection)
46 {
47         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
48         *cl = osc->osc_ldlm_client;
49         *connection = osc->osc_conn;
50 }
51
52 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd)
53 {
54         struct osc_obd *osc = &obd->u.osc;
55         struct obd_import *import;
56         struct ptlrpc_request *request;
57         struct ptlrpc_client *cl;
58         struct ptlrpc_connection *connection;
59         char *tmp = osc->osc_target_uuid;
60         int rc, size = sizeof(osc->osc_target_uuid);
61         ENTRY;
62
63         OBD_ALLOC(import, sizeof(*import)); 
64         if (!import)
65                 RETURN(-ENOMEM);
66                   
67         MOD_INC_USE_COUNT;
68         rc = class_connect(conn, obd);
69         if (rc)
70                 RETURN(rc); 
71
72         osc_obd2cl(obd, &cl, &connection);
73         request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn, 
74                                   OST_CONNECT, 1, &size, &tmp);
75         if (!request)
76                 GOTO(out_disco, -ENOMEM);
77
78         request->rq_replen = lustre_msg_size(0, NULL);
79
80         rc = ptlrpc_queue_wait(request);
81         rc = ptlrpc_check_status(request, rc);
82         if (rc) {
83                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
84                 GOTO(out, rc);
85         }
86
87         /* XXX: Make this a handle */
88         osc->osc_connh.addr = request->rq_repmsg->addr;
89         osc->osc_connh.cookie = request->rq_repmsg->cookie;
90
91         EXIT;
92         return 0;
93  out:
94         ptlrpc_free_req(request);
95  out_disco:
96         class_disconnect(conn); 
97         if (rc)
98                 MOD_DEC_USE_COUNT;
99         return rc;
100 }
101
102 static int osc_disconnect(struct lustre_handle *conn)
103 {
104         struct ptlrpc_request *request;
105         struct ptlrpc_client *cl;
106         struct ptlrpc_connection *connection;
107         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
108         int rc;
109         ENTRY;
110
111         osc_con2cl(conn, &cl, &connection);
112         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
113                                   OST_DISCONNECT, 0, NULL, NULL);
114         if (!request)
115                 RETURN(-ENOMEM);
116         request->rq_replen = lustre_msg_size(0, NULL);
117
118         rc = ptlrpc_queue_wait(request);
119         if (rc) 
120                 GOTO(out, rc);
121         rc = class_disconnect(conn);
122         if (!rc)
123                 MOD_DEC_USE_COUNT;
124
125  out:
126         ptlrpc_free_req(request);
127         return rc;
128 }
129
130 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
131 {
132         struct ptlrpc_request *request;
133         struct ptlrpc_client *cl;
134         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
135         struct ptlrpc_connection *connection;
136         struct ost_body *body;
137         int rc, size = sizeof(*body);
138         ENTRY;
139
140         osc_con2cl(conn, &cl, &connection);
141         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
142                                    OST_GETATTR, 1, &size, NULL);
143         if (!request)
144                 RETURN(-ENOMEM);
145
146         body = lustre_msg_buf(request->rq_reqmsg, 0);
147         memcpy(&body->oa, oa, sizeof(*oa));
148         body->oa.o_valid = ~0;
149
150         request->rq_replen = lustre_msg_size(1, &size);
151
152         rc = ptlrpc_queue_wait(request);
153         rc = ptlrpc_check_status(request, rc);
154         if (rc) {
155                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
156                 GOTO(out, rc);
157         }
158
159         body = lustre_msg_buf(request->rq_repmsg, 0);
160         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
161         if (oa)
162                 memcpy(oa, &body->oa, sizeof(*oa));
163
164         EXIT;
165  out:
166         ptlrpc_free_req(request);
167         return 0;
168 }
169
170 static int osc_open(struct lustre_handle *conn, struct obdo *oa)
171 {
172         struct ptlrpc_request *request;
173         struct ptlrpc_client *cl;
174         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
175         struct ptlrpc_connection *connection;
176         struct ost_body *body;
177         int rc, size = sizeof(*body);
178         ENTRY;
179
180         osc_con2cl(conn, &cl, &connection);
181         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
182                                    OST_OPEN, 1, &size, NULL);
183         if (!request)
184                 RETURN(-ENOMEM);
185
186         body = lustre_msg_buf(request->rq_reqmsg, 0);
187         memcpy(&body->oa, oa, sizeof(*oa));
188         body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
189
190         request->rq_replen = lustre_msg_size(1, &size);
191
192         rc = ptlrpc_queue_wait(request);
193         rc = ptlrpc_check_status(request, rc);
194         if (rc)
195                 GOTO(out, rc);
196
197         body = lustre_msg_buf(request->rq_repmsg, 0);
198         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
199         if (oa)
200                 memcpy(oa, &body->oa, sizeof(*oa));
201
202         EXIT;
203  out:
204         ptlrpc_free_req(request);
205         return 0;
206 }
207
208 static int osc_close(struct lustre_handle *conn, struct obdo *oa)
209 {
210         struct ptlrpc_request *request;
211         struct ptlrpc_client *cl;
212         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
213         struct ptlrpc_connection *connection;
214         struct ost_body *body;
215         int rc, size = sizeof(*body);
216         ENTRY;
217
218         osc_con2cl(conn, &cl, &connection);
219         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
220                                    OST_CLOSE, 1, &size, NULL);
221         if (!request)
222                 RETURN(-ENOMEM);
223
224         body = lustre_msg_buf(request->rq_reqmsg, 0);
225         memcpy(&body->oa, oa, sizeof(*oa));
226
227         request->rq_replen = lustre_msg_size(1, &size);
228
229         rc = ptlrpc_queue_wait(request);
230         rc = ptlrpc_check_status(request, rc);
231         if (rc)
232                 GOTO(out, rc);
233
234         body = lustre_msg_buf(request->rq_repmsg, 0);
235         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
236         if (oa)
237                 memcpy(oa, &body->oa, sizeof(*oa));
238
239         EXIT;
240  out:
241         ptlrpc_free_req(request);
242         return 0;
243 }
244
245 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
246 {
247         struct ptlrpc_request *request;
248         struct ptlrpc_client *cl;
249         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
250         struct ptlrpc_connection *connection;
251         struct ost_body *body;
252         int rc, size = sizeof(*body);
253         ENTRY;
254
255         osc_con2cl(conn, &cl, &connection);
256         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
257                                   OST_SETATTR, 1, &size, NULL);
258         if (!request)
259                 RETURN(-ENOMEM);
260
261         body = lustre_msg_buf(request->rq_reqmsg, 0);
262         memcpy(&body->oa, oa, sizeof(*oa));
263
264         request->rq_replen = lustre_msg_size(1, &size);
265
266         rc = ptlrpc_queue_wait(request);
267         rc = ptlrpc_check_status(request, rc);
268         GOTO(out, rc);
269
270  out:
271         ptlrpc_free_req(request);
272         return 0;
273 }
274
275 static int osc_create(struct lustre_handle *conn, struct obdo *oa)
276 {
277         struct ptlrpc_request *request;
278         struct ptlrpc_client *cl;
279         struct ptlrpc_connection *connection;
280         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
281         struct ost_body *body;
282         struct mds_objid *objid;
283         struct lov_object_id *lov_id;
284         int rc, size = sizeof(*body);
285         ENTRY;
286
287         if (!oa) {
288                 CERROR("oa NULL\n");
289                 RETURN(-EINVAL);
290         }
291         osc_con2cl(conn, &cl, &connection);
292         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
293                                   OST_CREATE, 1, &size, NULL);
294         if (!request)
295                 RETURN(-ENOMEM);
296
297         body = lustre_msg_buf(request->rq_reqmsg, 0);
298         memcpy(&body->oa, oa, sizeof(*oa));
299
300         request->rq_replen = lustre_msg_size(1, &size);
301
302         rc = ptlrpc_queue_wait(request);
303         rc = ptlrpc_check_status(request, rc);
304         if (rc)
305                 GOTO(out, rc);
306
307         body = lustre_msg_buf(request->rq_repmsg, 0);
308         memcpy(oa, &body->oa, sizeof(*oa));
309
310         memset(oa->o_inline, 0, sizeof(oa->o_inline));
311         objid = (struct mds_objid *)oa->o_inline;
312         objid->mo_lov_md.lmd_object_id = oa->o_id;
313         objid->mo_lov_md.lmd_stripe_count = 1;
314         lov_id = (struct lov_object_id *)(oa->o_inline + sizeof(*objid));
315         lov_id->l_device_id = 0;
316         lov_id->l_object_id = oa->o_id;
317
318         EXIT;
319  out:
320         ptlrpc_free_req(request);
321         return 0;
322 }
323
324 static int osc_punch(struct lustre_handle *conn, struct obdo *oa, obd_size count,
325                      obd_off offset)
326 {
327         struct ptlrpc_request *request;
328         struct ptlrpc_client *cl;
329         struct ptlrpc_connection *connection;
330         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
331         struct ost_body *body;
332         int rc, size = sizeof(*body);
333         ENTRY;
334
335         if (!oa) {
336                 CERROR("oa NULL\n");
337                 RETURN(-EINVAL);
338         }
339         osc_con2cl(conn, &cl, &connection);
340         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
341                                    OST_PUNCH, 1, &size, NULL);
342         if (!request)
343                 RETURN(-ENOMEM);
344
345         body = lustre_msg_buf(request->rq_reqmsg, 0);
346         memcpy(&body->oa, oa, sizeof(*oa));
347         body->oa.o_blocks = count;
348         body->oa.o_valid |= OBD_MD_FLBLOCKS;
349
350         request->rq_replen = lustre_msg_size(1, &size);
351
352         rc = ptlrpc_queue_wait(request);
353         rc = ptlrpc_check_status(request, rc);
354         if (rc)
355                 GOTO(out, rc);
356
357         body = lustre_msg_buf(request->rq_repmsg, 0);
358         memcpy(oa, &body->oa, sizeof(*oa));
359
360         EXIT;
361  out:
362         ptlrpc_free_req(request);
363         return 0;
364 }
365
366 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa)
367 {
368         struct ptlrpc_request *request;
369         struct ptlrpc_client *cl;
370         struct ptlrpc_connection *connection;
371         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
372         struct ost_body *body;
373         int rc, size = sizeof(*body);
374         ENTRY;
375
376         if (!oa) {
377                 CERROR("oa NULL\n");
378                 RETURN(-EINVAL);
379         }
380         osc_con2cl(conn, &cl, &connection);
381         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
382                                    OST_DESTROY, 1, &size, NULL);
383         if (!request)
384                 RETURN(-ENOMEM);
385
386         body = lustre_msg_buf(request->rq_reqmsg, 0);
387         memcpy(&body->oa, oa, sizeof(*oa));
388         body->oa.o_valid = ~0;
389
390         request->rq_replen = lustre_msg_size(1, &size);
391
392         rc = ptlrpc_queue_wait(request);
393         rc = ptlrpc_check_status(request, rc);
394         if (rc)
395                 GOTO(out, rc);
396
397         body = lustre_msg_buf(request->rq_repmsg, 0);
398         memcpy(oa, &body->oa, sizeof(*oa));
399
400         EXIT;
401  out:
402         ptlrpc_free_req(request);
403         return 0;
404 }
405
406 struct osc_brw_cb_data {
407         struct page **buf;
408         struct ptlrpc_request *req;
409         bulk_callback_t callback;
410         void *cb_data;
411 };
412
413 static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
414 {
415         struct osc_brw_cb_data *cb_data = data;
416
417         if (desc->b_flags & PTL_RPC_FL_INTR)
418                 CERROR("got signal\n");
419
420         (cb_data->callback)(desc, cb_data->cb_data);
421
422         ptlrpc_free_bulk(desc);
423         ptlrpc_free_req(cb_data->req);
424
425         OBD_FREE(cb_data, sizeof(*cb_data));
426 }
427
428 static int osc_brw_read(struct lustre_handle *conn, obd_count num_oa,
429                         struct obdo **oa, obd_count *oa_bufs, struct page **buf,
430                         obd_size *count, obd_off *offset, obd_flag *flags,
431                         bulk_callback_t callback)
432 {
433         struct ptlrpc_client *cl;
434         struct ptlrpc_connection *connection;
435         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
436         struct ptlrpc_request *request;
437         struct ost_body *body;
438         struct list_head *tmp;
439         int pages, rc, i, j, size[3] = {sizeof(*body)};
440         void *ptr1, *ptr2;
441         struct ptlrpc_bulk_desc *desc;
442         ENTRY;
443
444         size[1] = num_oa * sizeof(struct obd_ioobj);
445         pages = 0;
446         for (i = 0; i < num_oa; i++)
447                 pages += oa_bufs[i];
448         size[2] = pages * sizeof(struct niobuf_remote);
449
450         osc_con2cl(conn, &cl, &connection);
451         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
452                                   OST_BRW, 3, size, NULL);
453         if (!request)
454                 GOTO(out, rc = -ENOMEM);
455
456         body = lustre_msg_buf(request->rq_reqmsg, 0);
457         body->data = OBD_BRW_READ;
458
459         desc = ptlrpc_prep_bulk(connection);
460         if (!desc)
461                 GOTO(out2, rc = -ENOMEM);
462         desc->b_portal = OST_BULK_PORTAL;
463
464         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
465         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
466         for (pages = 0, i = 0; i < num_oa; i++) {
467                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
468                 /* FIXME: this inner loop is wrong for multiple OAs */
469                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
470                         struct ptlrpc_bulk_page *bulk;
471                         bulk = ptlrpc_prep_bulk_page(desc);
472                         if (bulk == NULL)
473                                 GOTO(out3, rc = -ENOMEM);
474
475                         spin_lock(&connection->c_lock);
476                         bulk->b_xid = ++connection->c_xid_out;
477                         spin_unlock(&connection->c_lock);
478
479                         bulk->b_buf = kmap(buf[pages]);
480                         bulk->b_page = buf[pages];
481                         bulk->b_buflen = PAGE_SIZE;
482                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
483                                         flags[pages], bulk->b_xid);
484                 }
485         }
486
487         rc = ptlrpc_register_bulk(desc);
488         if (rc)
489                 GOTO(out3, rc);
490
491         request->rq_replen = lustre_msg_size(1, size);
492         rc = ptlrpc_queue_wait(request);
493         rc = ptlrpc_check_status(request, rc);
494         if (rc)
495                 ptlrpc_abort_bulk(desc);
496         GOTO(out3, rc);
497
498  out3:
499         list_for_each(tmp, &desc->b_page_list) {
500                 struct ptlrpc_bulk_page *bulk;
501                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
502                 if (bulk->b_page != NULL)
503                         kunmap(bulk->b_page);
504         }
505         ptlrpc_free_bulk(desc);
506  out2:
507         ptlrpc_free_req(request);
508  out:
509         return rc;
510 }
511
512 static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
513 {
514         struct osc_brw_cb_data *cb_data = data;
515         int i;
516         ENTRY;
517
518         if (desc->b_flags & PTL_RPC_FL_INTR)
519                 CERROR("got signal\n");
520
521         for (i = 0; i < desc->b_page_count; i++)
522                 kunmap(cb_data->buf[i]);
523
524         (cb_data->callback)(desc, cb_data->cb_data);
525
526         ptlrpc_free_bulk(desc);
527         ptlrpc_free_req(cb_data->req);
528
529         OBD_FREE(cb_data, sizeof(*cb_data));
530         EXIT;
531 }
532
533 static int osc_brw_write(struct lustre_handle *conn, obd_count num_oa,
534                          struct obdo **oa, obd_count *oa_bufs,
535                          struct page **pagearray, obd_size *count,
536                          obd_off *offset, obd_flag *flags,
537                          bulk_callback_t callback)
538 {
539         struct ptlrpc_client *cl;
540         struct ptlrpc_connection *connection;
541         struct ptlrpc_request *request;
542         struct ptlrpc_bulk_desc *desc;
543         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
544         struct obd_ioobj ioo;
545         struct ost_body *body;
546         struct niobuf_local *local;
547         struct niobuf_remote *remote;
548         struct osc_brw_cb_data *cb_data;
549         long pages;
550         int rc, i, j, size[3] = {sizeof(*body)};
551         void *ptr1, *ptr2;
552         ENTRY;
553
554         size[1] = num_oa * sizeof(ioo);
555         pages = 0;
556         for (i = 0; i < num_oa; i++)
557                 pages += oa_bufs[i];
558         size[2] = pages * sizeof(*remote);
559
560         OBD_ALLOC(local, pages * sizeof(*local));
561         if (local == NULL)
562                 RETURN(-ENOMEM);
563
564         osc_con2cl(conn, &cl, &connection);
565         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
566                                   OST_BRW, 3, size, NULL);
567         if (!request)
568                 GOTO(out, rc = -ENOMEM);
569         body = lustre_msg_buf(request->rq_reqmsg, 0);
570         body->data = OBD_BRW_WRITE;
571
572         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
573         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
574         for (pages = 0, i = 0; i < num_oa; i++) {
575                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
576                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
577                         local[pages].addr = kmap(pagearray[pages]);
578                         local[pages].offset = offset[pages];
579                         local[pages].len = count[pages];
580                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
581                                         flags[pages], 0);
582                 }
583         }
584
585         size[1] = pages * sizeof(struct niobuf_remote);
586         request->rq_replen = lustre_msg_size(2, size);
587
588         rc = ptlrpc_queue_wait(request);
589         rc = ptlrpc_check_status(request, rc);
590         if (rc)
591                 GOTO(out2, rc);
592
593         ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
594         if (ptr2 == NULL)
595                 GOTO(out2, rc = -EINVAL);
596
597         if (request->rq_repmsg->buflens[1] !=
598             pages * sizeof(struct niobuf_remote)) {
599                 CERROR("buffer length wrong (%d vs. %ld)\n",
600                        request->rq_repmsg->buflens[1],
601                        pages * sizeof(struct niobuf_remote));
602                 GOTO(out2, rc = -EINVAL);
603         }
604
605         desc = ptlrpc_prep_bulk(connection);
606         desc->b_portal = OSC_BULK_PORTAL;
607         if (callback) {
608                 desc->b_cb = brw_write_finish;
609                 OBD_ALLOC(cb_data, sizeof(*cb_data));
610                 cb_data->buf = pagearray;
611                 cb_data->callback = callback;
612                 desc->b_cb_data = cb_data;
613         }
614
615         for (pages = 0, i = 0; i < num_oa; i++) {
616                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
617                         struct ptlrpc_bulk_page *page;
618
619                         ost_unpack_niobuf(&ptr2, &remote);
620
621                         page = ptlrpc_prep_bulk_page(desc);
622                         if (page == NULL)
623                                 GOTO(out3, rc = -ENOMEM);
624
625                         page->b_buf = (void *)(unsigned long)local[pages].addr;
626                         page->b_buflen = local[pages].len;
627                         page->b_xid = remote->xid;
628                 }
629         }
630
631         if (desc->b_page_count != pages)
632                 LBUG();
633
634         rc = ptlrpc_send_bulk(desc);
635         if (rc)
636                 GOTO(out3, rc);
637         if (callback)
638                 GOTO(out, rc);
639
640         /* If there's no callback function, sleep here until complete. */
641         wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
642         if (desc->b_flags & PTL_RPC_FL_INTR)
643                 rc = -EINTR;
644
645         GOTO(out3, rc);
646
647  out3:
648         ptlrpc_free_bulk(desc);
649  out2:
650         ptlrpc_free_req(request);
651         for (pages = 0, i = 0; i < num_oa; i++)
652                 for (j = 0; j < oa_bufs[i]; j++, pages++)
653                         kunmap(pagearray[pages]);
654  out:
655         OBD_FREE(local, pages * sizeof(*local));
656
657         return rc;
658 }
659
660 static int osc_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
661                    struct obdo **oa, obd_count *oa_bufs, struct page **buf,
662                    obd_size *count, obd_off *offset, obd_flag *flags,
663                    void *callback)
664 {
665         if (num_oa != 1)
666                 LBUG();
667
668         if (cmd & OBD_BRW_WRITE)
669                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
670                                      offset, flags, (bulk_callback_t)callback);
671         else
672                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
673                                     offset, flags, (bulk_callback_t)callback);
674 }
675
676 static int osc_enqueue(struct lustre_handle *oconn,
677                        struct lustre_handle *parent_lock, __u64 *res_id,
678                        __u32 type, void *extentp, int extent_len, __u32 mode,
679                        int *flags, void *callback, void *data, int datalen,
680                        struct lustre_handle *lockh)
681 {
682         struct obd_device *obddev = class_conn2obd(oconn);
683         struct osc_obd *osc = &obddev->u.osc;
684         struct ptlrpc_connection *conn;
685         struct ptlrpc_client *cl;
686         struct ldlm_extent *extent = extentp;
687         int rc;
688         __u32 mode2;
689
690         /* Filesystem locks are given a bit of special treatment: first we
691          * fixup the lock to start and end on page boundaries. */
692         extent->start &= PAGE_MASK;
693         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
694
695         /* Next, search for already existing extent locks that will cover us */
696         osc_con2dlmcl(oconn, &cl, &conn);
697         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
698                              sizeof(extent), mode, lockh);
699         if (rc == 1) {
700                 /* We already have a lock, and it's referenced */
701                 return 0;
702         }
703
704         /* Next, search for locks that we can upgrade (if we're trying to write)
705          * or are more than we need (if we're trying to read).  Because the VFS
706          * and page cache already protect us locally, lots of readers/writers
707          * can share a single PW lock. */
708         if (mode == LCK_PW)
709                 mode2 = LCK_PR;
710         else
711                 mode2 = LCK_PW;
712
713         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
714                              sizeof(extent), mode2, lockh);
715         if (rc == 1) {
716                 int flags;
717                 /* FIXME: This is not incredibly elegant, but it might
718                  * be more elegant than adding another parameter to
719                  * lock_match.  I want a second opinion. */
720                 ldlm_lock_addref(lockh, mode);
721                 ldlm_lock_decref(lockh, mode2);
722
723                 if (mode == LCK_PR)
724                         return 0;
725
726                 rc = ldlm_cli_convert(cl, lockh, &osc->osc_connh, 
727                                       mode, &flags);
728                 if (rc)
729                         LBUG();
730
731                 return rc;
732         }
733
734         rc = ldlm_cli_enqueue(cl, conn, &osc->osc_connh, 
735                               NULL, obddev->obd_namespace,
736                               parent_lock, res_id, type, extent, sizeof(extent),
737                               mode, flags, callback, data, datalen, lockh);
738         return rc;
739 }
740
741 static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
742                       struct lustre_handle *lockh)
743 {
744         ENTRY;
745
746         ldlm_lock_decref(lockh, mode);
747
748         RETURN(0);
749 }
750
751 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
752 {
753         struct obd_ioctl_data* data = buf;
754         struct osc_obd *osc = &obddev->u.osc;
755         char server_uuid[37];
756         int rc;
757         ENTRY;
758
759         if (data->ioc_inllen1 < 1) {
760                 CERROR("osc setup requires a TARGET UUID\n");
761                 RETURN(-EINVAL);
762         }
763
764         if (data->ioc_inllen1 > 37) {
765                 CERROR("osc TARGET UUID must be less than 38 characters\n");
766                 RETURN(-EINVAL);
767         }
768
769         if (data->ioc_inllen2 < 1) {
770                 CERROR("osc setup requires a SERVER UUID\n");
771                 RETURN(-EINVAL);
772         }
773
774         if (data->ioc_inllen2 > 37) {
775                 CERROR("osc SERVER UUID must be less than 38 characters\n");
776                 RETURN(-EINVAL);
777         }
778
779         memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
780         memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
781                                                    sizeof(server_uuid)));
782
783         osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
784         if (!osc->osc_conn)
785                 RETURN(-ENOENT);
786
787         obddev->obd_namespace =
788                 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
789         if (obddev->obd_namespace == NULL)
790                 GOTO(out_conn, rc = -ENOMEM);
791
792         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
793         if (osc->osc_client == NULL)
794                 GOTO(out_ns, rc = -ENOMEM);
795
796         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
797         if (osc->osc_ldlm_client == NULL)
798                 GOTO(out_client, rc = -ENOMEM);
799
800         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
801                            osc->osc_client);
802         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
803                            osc->osc_ldlm_client);
804         osc->osc_client->cli_name = "osc";
805         osc->osc_ldlm_client->cli_name = "ldlm";
806
807         MOD_INC_USE_COUNT;
808         RETURN(0);
809
810  out_client:
811         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
812  out_ns:
813         ldlm_namespace_free(obddev->obd_namespace);
814  out_conn:
815         ptlrpc_put_connection(osc->osc_conn);
816         return rc;
817 }
818
819 static int osc_cleanup(struct obd_device * obddev)
820 {
821         struct osc_obd *osc = &obddev->u.osc;
822
823         ldlm_namespace_free(obddev->obd_namespace);
824
825         ptlrpc_cleanup_client(osc->osc_client);
826         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
827         ptlrpc_cleanup_client(osc->osc_ldlm_client);
828         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
829         ptlrpc_put_connection(osc->osc_conn);
830
831         MOD_DEC_USE_COUNT;
832         return 0;
833 }
834
835 #if 0
836 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs);
837 {
838         struct ptlrpc_request *request;
839         struct ptlrpc_client *cl;
840         struct ptlrpc_connection *connection;
841         struct obd_statfs *osfs;
842         int rc, size = sizeof(*osfs);
843         ENTRY;
844
845         osc_con2cl(conn, &cl, &connection);
846         request = ptlrpc_prep_req(cl, connection, OST_STATFS, 0, NULL, NULL);
847         if (!request)
848                 RETURN(-ENOMEM);
849
850         request->rq_replen = lustre_msg_size(1, &size);
851
852         rc = ptlrpc_queue_wait(request);
853         rc = ptlrpc_check_status(request, rc);
854         if (rc)
855                 GOTO(out, rc);
856
857         osfs = lustre_msg_buf(request->rq_repmsg, 0);
858         obd_statfs_unpack(sfs, osfs);
859
860         EXIT;
861  out:
862         ptlrpc_free_req(request);
863         return 0;
864 }
865 #endif
866
867 struct obd_ops osc_obd_ops = {
868         o_setup:   osc_setup,
869         o_cleanup: osc_cleanup,
870         o_create: osc_create,
871         o_destroy: osc_destroy,
872         o_getattr: osc_getattr,
873         o_setattr: osc_setattr,
874         o_open: osc_open,
875         o_close: osc_close,
876         o_connect: osc_connect,
877         o_disconnect: osc_disconnect,
878         o_brw: osc_brw,
879         o_punch: osc_punch,
880         o_enqueue: osc_enqueue,
881         o_cancel: osc_cancel
882 };
883
884 static int __init osc_init(void)
885 {
886         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
887 }
888
889 static void __exit osc_exit(void)
890 {
891         class_unregister_type(LUSTRE_OSC_NAME);
892 }
893
894 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
895 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
896 MODULE_LICENSE("GPL");
897
898 module_init(osc_init);
899 module_exit(osc_exit);