Whamcloud - gitweb
- Use refcounting to control lifetime of bulk descriptors.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27
28 static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
29                        struct ptlrpc_connection **connection)
30 {
31         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
32         *cl = osc->osc_client;
33         *connection = osc->osc_conn;
34 }
35
36 static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
37                           struct ptlrpc_connection **connection)
38 {
39         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
40         *cl = osc->osc_ldlm_client;
41         *connection = osc->osc_conn;
42 }
43
44 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd)
45 {
46         struct osc_obd *osc = &obd->u.osc;
47         struct obd_import *import;
48         struct ptlrpc_request *request;
49         char *tmp = osc->osc_target_uuid;
50         int rc, size = sizeof(osc->osc_target_uuid);
51         ENTRY;
52
53         OBD_ALLOC(import, sizeof(*import)); 
54         if (!import)
55                 RETURN(-ENOMEM);
56                   
57         MOD_INC_USE_COUNT;
58         rc = class_connect(conn, obd);
59         if (rc)
60                 RETURN(rc); 
61
62         request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn, 
63                                   OST_CONNECT, 1, &size, &tmp);
64         if (!request)
65                 GOTO(out_disco, rc = -ENOMEM);
66
67         request->rq_level = LUSTRE_CONN_NEW;
68         request->rq_replen = lustre_msg_size(0, NULL);
69
70         rc = ptlrpc_queue_wait(request);
71         rc = ptlrpc_check_status(request, rc);
72         if (rc) {
73                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
74                 GOTO(out, rc);
75         }
76
77         /* XXX: Make this a handle */
78         osc->osc_connh.addr = request->rq_repmsg->addr;
79         osc->osc_connh.cookie = request->rq_repmsg->cookie;
80
81         EXIT;
82  out:
83         ptlrpc_free_req(request);
84  out_disco:
85         if (rc) {
86                 class_disconnect(conn);
87                 MOD_DEC_USE_COUNT;
88         }
89         return rc;
90 }
91
92 static int osc_disconnect(struct lustre_handle *conn)
93 {
94         struct ptlrpc_request *request;
95         struct ptlrpc_client *cl;
96         struct ptlrpc_connection *connection;
97         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
98         int rc;
99         ENTRY;
100
101         osc_con2cl(conn, &cl, &connection);
102         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
103                                   OST_DISCONNECT, 0, NULL, NULL);
104         if (!request)
105                 RETURN(-ENOMEM);
106         request->rq_replen = lustre_msg_size(0, NULL);
107
108         rc = ptlrpc_queue_wait(request);
109         if (rc) 
110                 GOTO(out, rc);
111         rc = class_disconnect(conn);
112         if (!rc)
113                 MOD_DEC_USE_COUNT;
114
115  out:
116         ptlrpc_free_req(request);
117         return rc;
118 }
119
120 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
121 {
122         struct ptlrpc_request *request;
123         struct ptlrpc_client *cl;
124         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
125         struct ptlrpc_connection *connection;
126         struct ost_body *body;
127         int rc, size = sizeof(*body);
128         ENTRY;
129
130         osc_con2cl(conn, &cl, &connection);
131         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
132                                    OST_GETATTR, 1, &size, NULL);
133         if (!request)
134                 RETURN(-ENOMEM);
135
136         body = lustre_msg_buf(request->rq_reqmsg, 0);
137         memcpy(&body->oa, oa, sizeof(*oa));
138         body->oa.o_valid = ~0;
139
140         request->rq_replen = lustre_msg_size(1, &size);
141
142         rc = ptlrpc_queue_wait(request);
143         rc = ptlrpc_check_status(request, rc);
144         if (rc) {
145                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
146                 GOTO(out, rc);
147         }
148
149         body = lustre_msg_buf(request->rq_repmsg, 0);
150         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
151         if (oa)
152                 memcpy(oa, &body->oa, sizeof(*oa));
153
154         EXIT;
155  out:
156         ptlrpc_free_req(request);
157         return rc;
158 }
159
160 static int osc_open(struct lustre_handle *conn, struct obdo *oa)
161 {
162         struct ptlrpc_request *request;
163         struct ptlrpc_client *cl;
164         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
165         struct ptlrpc_connection *connection;
166         struct ost_body *body;
167         int rc, size = sizeof(*body);
168         ENTRY;
169
170         osc_con2cl(conn, &cl, &connection);
171         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
172                                    OST_OPEN, 1, &size, NULL);
173         if (!request)
174                 RETURN(-ENOMEM);
175
176         body = lustre_msg_buf(request->rq_reqmsg, 0);
177         memcpy(&body->oa, oa, sizeof(*oa));
178         body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
179
180         request->rq_replen = lustre_msg_size(1, &size);
181
182         rc = ptlrpc_queue_wait(request);
183         rc = ptlrpc_check_status(request, rc);
184         if (rc)
185                 GOTO(out, rc);
186
187         body = lustre_msg_buf(request->rq_repmsg, 0);
188         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
189         if (oa)
190                 memcpy(oa, &body->oa, sizeof(*oa));
191
192         EXIT;
193  out:
194         ptlrpc_free_req(request);
195         return rc;
196 }
197
198 static int osc_close(struct lustre_handle *conn, struct obdo *oa)
199 {
200         struct ptlrpc_request *request;
201         struct ptlrpc_client *cl;
202         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
203         struct ptlrpc_connection *connection;
204         struct ost_body *body;
205         int rc, size = sizeof(*body);
206         ENTRY;
207
208         osc_con2cl(conn, &cl, &connection);
209         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
210                                    OST_CLOSE, 1, &size, NULL);
211         if (!request)
212                 RETURN(-ENOMEM);
213
214         body = lustre_msg_buf(request->rq_reqmsg, 0);
215         memcpy(&body->oa, oa, sizeof(*oa));
216
217         request->rq_replen = lustre_msg_size(1, &size);
218
219         rc = ptlrpc_queue_wait(request);
220         rc = ptlrpc_check_status(request, rc);
221         if (rc)
222                 GOTO(out, rc);
223
224         body = lustre_msg_buf(request->rq_repmsg, 0);
225         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
226         if (oa)
227                 memcpy(oa, &body->oa, sizeof(*oa));
228
229         EXIT;
230  out:
231         ptlrpc_free_req(request);
232         return rc;
233 }
234
235 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
236 {
237         struct ptlrpc_request *request;
238         struct ptlrpc_client *cl;
239         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
240         struct ptlrpc_connection *connection;
241         struct ost_body *body;
242         int rc, size = sizeof(*body);
243         ENTRY;
244
245         osc_con2cl(conn, &cl, &connection);
246         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
247                                   OST_SETATTR, 1, &size, NULL);
248         if (!request)
249                 RETURN(-ENOMEM);
250
251         body = lustre_msg_buf(request->rq_reqmsg, 0);
252         memcpy(&body->oa, oa, sizeof(*oa));
253
254         request->rq_replen = lustre_msg_size(1, &size);
255
256         rc = ptlrpc_queue_wait(request);
257         rc = ptlrpc_check_status(request, rc);
258         GOTO(out, rc);
259
260  out:
261         ptlrpc_free_req(request);
262         return rc;
263 }
264
265 static int osc_create(struct lustre_handle *conn, struct obdo *oa)
266 {
267         struct ptlrpc_request *request;
268         struct ptlrpc_client *cl;
269         struct ptlrpc_connection *connection;
270         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
271         struct ost_body *body;
272         struct mds_objid *objid;
273         struct lov_object_id *lov_id;
274         int rc, size = sizeof(*body);
275         ENTRY;
276
277         if (!oa) {
278                 CERROR("oa NULL\n");
279                 RETURN(-EINVAL);
280         }
281         osc_con2cl(conn, &cl, &connection);
282         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
283                                   OST_CREATE, 1, &size, NULL);
284         if (!request)
285                 RETURN(-ENOMEM);
286
287         body = lustre_msg_buf(request->rq_reqmsg, 0);
288         memcpy(&body->oa, oa, sizeof(*oa));
289
290         request->rq_replen = lustre_msg_size(1, &size);
291
292         rc = ptlrpc_queue_wait(request);
293         rc = ptlrpc_check_status(request, rc);
294         if (rc)
295                 GOTO(out, rc);
296
297         body = lustre_msg_buf(request->rq_repmsg, 0);
298         memcpy(oa, &body->oa, sizeof(*oa));
299
300         memset(oa->o_inline, 0, sizeof(oa->o_inline));
301         objid = (struct mds_objid *)oa->o_inline;
302         objid->mo_lov_md.lmd_object_id = oa->o_id;
303         objid->mo_lov_md.lmd_stripe_count = 1;
304         lov_id = (struct lov_object_id *)(oa->o_inline + sizeof(*objid));
305         lov_id->l_device_id = 0;
306         lov_id->l_object_id = oa->o_id;
307
308         EXIT;
309  out:
310         ptlrpc_free_req(request);
311         return rc;
312 }
313
314 static int osc_punch(struct lustre_handle *conn, struct obdo *oa, obd_size count,
315                      obd_off offset)
316 {
317         struct ptlrpc_request *request;
318         struct ptlrpc_client *cl;
319         struct ptlrpc_connection *connection;
320         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
321         struct ost_body *body;
322         int rc, size = sizeof(*body);
323         ENTRY;
324
325         if (!oa) {
326                 CERROR("oa NULL\n");
327                 RETURN(-EINVAL);
328         }
329         osc_con2cl(conn, &cl, &connection);
330         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
331                                    OST_PUNCH, 1, &size, NULL);
332         if (!request)
333                 RETURN(-ENOMEM);
334
335         body = lustre_msg_buf(request->rq_reqmsg, 0);
336         memcpy(&body->oa, oa, sizeof(*oa));
337         body->oa.o_blocks = count;
338         body->oa.o_valid |= OBD_MD_FLBLOCKS;
339
340         request->rq_replen = lustre_msg_size(1, &size);
341
342         rc = ptlrpc_queue_wait(request);
343         rc = ptlrpc_check_status(request, rc);
344         if (rc)
345                 GOTO(out, rc);
346
347         body = lustre_msg_buf(request->rq_repmsg, 0);
348         memcpy(oa, &body->oa, sizeof(*oa));
349
350         EXIT;
351  out:
352         ptlrpc_free_req(request);
353         return rc;
354 }
355
356 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa)
357 {
358         struct ptlrpc_request *request;
359         struct ptlrpc_client *cl;
360         struct ptlrpc_connection *connection;
361         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
362         struct ost_body *body;
363         int rc, size = sizeof(*body);
364         ENTRY;
365
366         if (!oa) {
367                 CERROR("oa NULL\n");
368                 RETURN(-EINVAL);
369         }
370         osc_con2cl(conn, &cl, &connection);
371         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
372                                    OST_DESTROY, 1, &size, NULL);
373         if (!request)
374                 RETURN(-ENOMEM);
375
376         body = lustre_msg_buf(request->rq_reqmsg, 0);
377         memcpy(&body->oa, oa, sizeof(*oa));
378         body->oa.o_valid = ~0;
379
380         request->rq_replen = lustre_msg_size(1, &size);
381
382         rc = ptlrpc_queue_wait(request);
383         rc = ptlrpc_check_status(request, rc);
384         if (rc)
385                 GOTO(out, rc);
386
387         body = lustre_msg_buf(request->rq_repmsg, 0);
388         memcpy(oa, &body->oa, sizeof(*oa));
389
390         EXIT;
391  out:
392         ptlrpc_free_req(request);
393         return rc;
394 }
395
396 struct osc_brw_cb_data {
397         struct page **buf;
398         bulk_callback_t callback;
399         void *cb_data;
400         void *obd_data;
401         size_t obd_size;
402 };
403
404 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
405 {
406         struct osc_brw_cb_data *cb_data = data;
407         int i;
408         ENTRY;
409
410         if (desc->b_flags & PTL_RPC_FL_INTR)
411                 CERROR("got signal\n");
412
413         for (i = 0; i < desc->b_page_count; i++)
414                 kunmap(cb_data->buf[i]);
415
416         if (cb_data->callback)
417                 (cb_data->callback)(desc, cb_data->cb_data);
418
419         ptlrpc_bulk_decref(desc);
420         if (cb_data->obd_data)
421                 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
422         OBD_FREE(cb_data, sizeof(*cb_data));
423         EXIT;
424 }
425
426 static int osc_brw_read(struct lustre_handle *conn, obd_count num_oa,
427                         struct obdo **oa, obd_count *oa_bufs, struct page **buf,
428                         obd_size *count, obd_off *offset, obd_flag *flags,
429                         bulk_callback_t callback)
430 {
431         struct ptlrpc_client *cl;
432         struct ptlrpc_connection *connection;
433         struct ptlrpc_request *request = NULL;
434         struct ptlrpc_bulk_desc *desc = NULL;
435         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
436         struct ost_body *body;
437         struct osc_brw_cb_data *cb_data = NULL;
438         long pages;
439         int rc, i, j, size[3] = {sizeof(*body)};
440         void *iooptr, *nioptr;
441         ENTRY;
442
443         /* XXX almost identical to brw_write case */
444         size[1] = num_oa * sizeof(struct obd_ioobj);
445         pages = 0;
446         for (i = 0; i < num_oa; i++)
447                 pages += oa_bufs[i];
448         size[2] = pages * sizeof(struct niobuf_remote);
449
450         osc_con2cl(conn, &cl, &connection);
451         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
452                                    OST_BRW, 3, size, NULL);
453         if (!request)
454                 RETURN(-ENOMEM);
455
456         body = lustre_msg_buf(request->rq_reqmsg, 0);
457         body->data = OBD_BRW_READ;
458
459         desc = ptlrpc_prep_bulk(connection);
460         if (!desc)
461                 GOTO(out_free, rc = -ENOMEM);
462         desc->b_portal = OST_BULK_PORTAL;
463         desc->b_cb = brw_finish;
464         OBD_ALLOC(cb_data, sizeof(*cb_data));
465         if (!cb_data)
466                 GOTO(out_free, rc = -ENOMEM);
467         cb_data->buf = NULL;
468         cb_data->callback = callback;
469         desc->b_cb_data = cb_data;
470         /* XXX end almost identical to brw_write case */
471
472         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
473         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
474         for (pages = 0, i = 0; i < num_oa; i++) {
475                 ost_pack_ioo(&iooptr, oa[i], oa_bufs[i]);
476                 /* FIXME: this inner loop is wrong for multiple OAs */
477                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
478                         struct ptlrpc_bulk_page *bulk;
479                         bulk = ptlrpc_prep_bulk_page(desc);
480                         if (bulk == NULL)
481                                 GOTO(out_unmap, rc = -ENOMEM);
482
483                         spin_lock(&connection->c_lock);
484                         bulk->b_xid = ++connection->c_xid_out;
485                         spin_unlock(&connection->c_lock);
486
487                         bulk->b_buf = kmap(buf[pages]);
488                         bulk->b_page = buf[pages];
489                         bulk->b_buflen = PAGE_SIZE;
490                         ost_pack_niobuf(&nioptr, offset[pages], count[pages],
491                                         flags[pages], bulk->b_xid);
492                 }
493         }
494
495         /* 
496          * Register the bulk first, because the reply could arrive out of order,
497          * and we want to be ready for the bulk data.
498          *
499          * One reference is released by the bulk callback, the other when
500          * we finish sleeping on it (if we don't have a callback).
501          */
502         atomic_set(&desc->b_refcount, callback ? 1 : 2);
503         rc = ptlrpc_register_bulk(desc);
504         if (rc)
505                 GOTO(out_unmap, rc);
506
507         request->rq_replen = lustre_msg_size(1, size);
508         rc = ptlrpc_queue_wait(request);
509         rc = ptlrpc_check_status(request, rc);
510         if (rc) {
511                 ptlrpc_bulk_decref(desc);
512                 GOTO(out_unmap, rc);
513         }
514
515         /* Callbacks cause asynchronous handling. */
516         if (callback)
517                 RETURN(0);
518
519         l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
520         ptlrpc_bulk_decref(desc);
521         if (desc->b_flags & PTL_RPC_FL_INTR)
522                 RETURN(-EINTR);
523         RETURN(0);
524         
525         /* Clean up on error. */
526  out_unmap:
527         for (pages = 0, i = 0; i < num_oa; i++)
528                 for (j = 0; j < oa_bufs[i]; j++, pages++)
529                         kunmap(pagearray[pages]);
530  out_free:
531         if (cb_data)
532                 OBD_FREE(cb_data, sizeof(*cb_data));
533         ptlrpc_free_bulk(desc);
534         ptlrpc_free_req(request);
535         return rc;
536 }
537
538 static int osc_brw_write(struct lustre_handle *conn, obd_count num_oa,
539                          struct obdo **oa, obd_count *oa_bufs,
540                          struct page **pagearray, obd_size *count,
541                          obd_off *offset, obd_flag *flags,
542                          bulk_callback_t callback)
543 {
544         struct ptlrpc_client *cl;
545         struct ptlrpc_connection *connection;
546         struct ptlrpc_request *request = NULL;
547         struct ptlrpc_bulk_desc *desc = NULL;
548         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
549         struct ost_body *body;
550         struct niobuf_local *local = NULL;
551         struct niobuf_remote *remote;
552         struct osc_brw_cb_data *cb_data = NULL;
553         long pages;
554         int rc, i, j, size[3] = {sizeof(*body)};
555         void *iooptr, *nioptr;
556         ENTRY;
557
558         /* XXX almost identical to brw_read case */
559         size[1] = num_oa * sizeof(struct obd_ioobj);
560         pages = 0;
561         for (i = 0; i < num_oa; i++)
562                 pages += oa_bufs[i];
563         size[2] = pages * sizeof(struct niobuf_remote);
564
565         osc_con2cl(conn, &cl, &connection);
566         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
567                                    OST_BRW, 3, size, NULL);
568         if (!request)
569                 RETURN(-ENOMEM);
570
571         body = lustre_msg_buf(request->rq_reqmsg, 0);
572         body->data = OBD_BRW_WRITE;
573
574         OBD_ALLOC(local, pages * sizeof(*local));
575         if (!local)
576                 GOTO(out_free, rc = -ENOMEM);
577
578         desc = ptlrpc_prep_bulk(connection);
579         if (!desc)
580                 GOTO(out_free, rc = -ENOMEM);
581         desc->b_portal = OSC_BULK_PORTAL;
582         desc->b_cb = brw_finish;
583         OBD_ALLOC(cb_data, sizeof(*cb_data));
584         if (!cb_data)
585                 GOTO(out_free, rc = -ENOMEM);
586         cb_data->buf = pagearray;
587         cb_data->callback = callback;
588         desc->b_cb_data = cb_data;
589         /* XXX end almost identical to brw_read case */
590         cb_data->obd_data = local;
591         cb_data->obd_size = pages * sizeof(*local);
592
593         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
594         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
595         for (pages = 0, i = 0; i < num_oa; i++) {
596                 ost_pack_ioo(&iooptr, oa[i], oa_bufs[i]);
597                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
598                         local[pages].addr = kmap(pagearray[pages]);
599                         local[pages].offset = offset[pages];
600                         local[pages].len = count[pages];
601                         ost_pack_niobuf(&nioptr, offset[pages], count[pages],
602                                         flags[pages], 0);
603                 }
604         }
605
606         size[1] = pages * sizeof(struct niobuf_remote);
607         request->rq_replen = lustre_msg_size(2, size);
608         rc = ptlrpc_queue_wait(request);
609         rc = ptlrpc_check_status(request, rc);
610         if (rc)
611                 GOTO(out_unmap, rc);
612
613         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
614         if (!nioptr)
615                 GOTO(out_unmap, rc = -EINVAL);
616
617         if (request->rq_repmsg->buflens[1] !=
618             pages * sizeof(struct niobuf_remote)) {
619                 CERROR("buffer length wrong (%d vs. %ld)\n",
620                        request->rq_repmsg->buflens[1],
621                        pages * sizeof(struct niobuf_remote));
622                 GOTO(out_unmap, rc = -EINVAL);
623         }
624
625         for (pages = 0, i = 0; i < num_oa; i++) {
626                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
627                         struct ptlrpc_bulk_page *page;
628
629                         ost_unpack_niobuf(&nioptr, &remote);
630
631                         page = ptlrpc_prep_bulk_page(desc);
632                         if (!page)
633                                 GOTO(out_unmap, rc = -ENOMEM);
634
635                         page->b_buf = (void *)(unsigned long)local[pages].addr;
636                         page->b_buflen = local[pages].len;
637                         page->b_xid = remote->xid;
638                 }
639         }
640
641         if (desc->b_page_count != pages)
642                 LBUG();
643
644         /*
645          * One is released when the bulk is complete, the other when we finish
646          * waiting on it.  (Callback cases don't sleep, so only one ref for
647          * them.)
648          */
649         atomic_set(&desc->b_refcount, callback ? 1 : 2);
650         CDEBUG(D_PAGE, "Set refcount of %p to %d\n", desc,
651                atomic_read(&desc->b_refcount));
652         rc = ptlrpc_send_bulk(desc);
653         if (rc)
654                 GOTO(out_unmap, rc);
655
656         /* Callbacks cause asynchronous handling. */
657         if (callback)
658                 RETURN(0);
659
660         /* If there's no callback function, sleep here until complete. */
661         l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
662         ptlrpc_bulk_decref(desc);
663         if (desc->b_flags & PTL_RPC_FL_INTR)
664                 RETURN(-EINTR);
665         RETURN(0);
666
667         /* Clean up on error. */
668  out_unmap:
669         for (pages = 0, i = 0; i < num_oa; i++)
670                 for (j = 0; j < oa_bufs[i]; j++, pages++)
671                         kunmap(pagearray[pages]);
672
673  out_free:
674         if (cb_data)
675                 OBD_FREE(cb_data, sizeof(*cb_data));
676         if (local)
677                 OBD_FREE(local, pages * sizeof(*local));
678         ptlrpc_free_bulk(desc);
679         ptlrpc_req_finished(request);
680         return rc;
681 }
682
683 static int osc_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
684                    struct obdo **oa, obd_count *oa_bufs, struct page **buf,
685                    obd_size *count, obd_off *offset, obd_flag *flags,
686                    void *callback)
687 {
688         if (num_oa != 1)
689                 LBUG();
690
691         if (cmd & OBD_BRW_WRITE)
692                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
693                                      offset, flags, (bulk_callback_t)callback);
694         else
695                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
696                                     offset, flags, (bulk_callback_t)callback);
697 }
698
699 static int osc_enqueue(struct lustre_handle *oconn,
700                        struct lustre_handle *parent_lock, __u64 *res_id,
701                        __u32 type, void *extentp, int extent_len, __u32 mode,
702                        int *flags, void *callback, void *data, int datalen,
703                        struct lustre_handle *lockh)
704 {
705         struct obd_device *obddev = class_conn2obd(oconn);
706         struct osc_obd *osc = &obddev->u.osc;
707         struct ptlrpc_connection *conn;
708         struct ptlrpc_client *cl;
709         struct ldlm_extent *extent = extentp;
710         int rc;
711         __u32 mode2;
712
713         /* Filesystem locks are given a bit of special treatment: first we
714          * fixup the lock to start and end on page boundaries. */
715         extent->start &= PAGE_MASK;
716         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
717
718         /* Next, search for already existing extent locks that will cover us */
719         osc_con2dlmcl(oconn, &cl, &conn);
720         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
721                              sizeof(extent), mode, lockh);
722         if (rc == 1) {
723                 /* We already have a lock, and it's referenced */
724                 return 0;
725         }
726
727         /* Next, search for locks that we can upgrade (if we're trying to write)
728          * or are more than we need (if we're trying to read).  Because the VFS
729          * and page cache already protect us locally, lots of readers/writers
730          * can share a single PW lock. */
731         if (mode == LCK_PW)
732                 mode2 = LCK_PR;
733         else
734                 mode2 = LCK_PW;
735
736         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
737                              sizeof(extent), mode2, lockh);
738         if (rc == 1) {
739                 int flags;
740                 /* FIXME: This is not incredibly elegant, but it might
741                  * be more elegant than adding another parameter to
742                  * lock_match.  I want a second opinion. */
743                 ldlm_lock_addref(lockh, mode);
744                 ldlm_lock_decref(lockh, mode2);
745
746                 if (mode == LCK_PR)
747                         return 0;
748
749                 rc = ldlm_cli_convert(cl, lockh, &osc->osc_connh, 
750                                       mode, &flags);
751                 if (rc)
752                         LBUG();
753
754                 return rc;
755         }
756
757         rc = ldlm_cli_enqueue(cl, conn, &osc->osc_connh, 
758                               NULL, obddev->obd_namespace,
759                               parent_lock, res_id, type, extent, sizeof(extent),
760                               mode, flags, callback, data, datalen, lockh);
761         return rc;
762 }
763
764 static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
765                       struct lustre_handle *lockh)
766 {
767         ENTRY;
768
769         ldlm_lock_decref(lockh, mode);
770
771         RETURN(0);
772 }
773
774 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
775 {
776         struct obd_ioctl_data* data = buf;
777         struct osc_obd *osc = &obddev->u.osc;
778         char server_uuid[37];
779         int rc;
780         ENTRY;
781
782         if (data->ioc_inllen1 < 1) {
783                 CERROR("osc setup requires a TARGET UUID\n");
784                 RETURN(-EINVAL);
785         }
786
787         if (data->ioc_inllen1 > 37) {
788                 CERROR("osc TARGET UUID must be less than 38 characters\n");
789                 RETURN(-EINVAL);
790         }
791
792         if (data->ioc_inllen2 < 1) {
793                 CERROR("osc setup requires a SERVER UUID\n");
794                 RETURN(-EINVAL);
795         }
796
797         if (data->ioc_inllen2 > 37) {
798                 CERROR("osc SERVER UUID must be less than 38 characters\n");
799                 RETURN(-EINVAL);
800         }
801
802         memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
803         memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
804                                                    sizeof(server_uuid)));
805
806         osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
807         if (!osc->osc_conn)
808                 RETURN(-ENOENT);
809
810         obddev->obd_namespace =
811                 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
812         if (obddev->obd_namespace == NULL)
813                 GOTO(out_conn, rc = -ENOMEM);
814
815         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
816         if (osc->osc_client == NULL)
817                 GOTO(out_ns, rc = -ENOMEM);
818
819         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
820         if (osc->osc_ldlm_client == NULL)
821                 GOTO(out_client, rc = -ENOMEM);
822
823         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
824                            osc->osc_client);
825         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
826                            osc->osc_ldlm_client);
827         osc->osc_client->cli_name = "osc";
828         osc->osc_ldlm_client->cli_name = "ldlm";
829
830         MOD_INC_USE_COUNT;
831         RETURN(0);
832
833  out_client:
834         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
835  out_ns:
836         ldlm_namespace_free(obddev->obd_namespace);
837  out_conn:
838         ptlrpc_put_connection(osc->osc_conn);
839         return rc;
840 }
841
842 static int osc_cleanup(struct obd_device * obddev)
843 {
844         struct osc_obd *osc = &obddev->u.osc;
845
846         ldlm_namespace_free(obddev->obd_namespace);
847
848         ptlrpc_cleanup_client(osc->osc_client);
849         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
850         ptlrpc_cleanup_client(osc->osc_ldlm_client);
851         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
852         ptlrpc_put_connection(osc->osc_conn);
853
854         MOD_DEC_USE_COUNT;
855         return 0;
856 }
857
858 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
859 {
860         struct ptlrpc_request *request;
861         struct ptlrpc_client *cl;
862         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
863         struct ptlrpc_connection *connection;
864         struct obd_statfs *osfs;
865         int rc, size = sizeof(*osfs);
866         ENTRY;
867
868         osc_con2cl(conn, &cl, &connection);
869         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
870                                    OST_STATFS, 0, NULL, NULL);
871         if (!request)
872                 RETURN(-ENOMEM);
873
874         request->rq_replen = lustre_msg_size(1, &size);
875
876         rc = ptlrpc_queue_wait(request);
877         rc = ptlrpc_check_status(request, rc);
878         if (rc) {
879                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
880                 GOTO(out, rc);
881         }
882
883         osfs = lustre_msg_buf(request->rq_repmsg, 0);
884         obd_statfs_unpack(osfs, sfs);
885
886         EXIT;
887  out:
888         ptlrpc_free_req(request);
889         return rc;
890 }
891
892 struct obd_ops osc_obd_ops = {
893         o_setup:        osc_setup,
894         o_cleanup:      osc_cleanup,
895         o_statfs:       osc_statfs,
896         o_create:       osc_create,
897         o_destroy:      osc_destroy,
898         o_getattr:      osc_getattr,
899         o_setattr:      osc_setattr,
900         o_open:         osc_open,
901         o_close:        osc_close,
902         o_connect:      osc_connect,
903         o_disconnect:   osc_disconnect,
904         o_brw:          osc_brw,
905         o_punch:        osc_punch,
906         o_enqueue:      osc_enqueue,
907         o_cancel:       osc_cancel
908 };
909
910 static int __init osc_init(void)
911 {
912         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
913 }
914
915 static void __exit osc_exit(void)
916 {
917         class_unregister_type(LUSTRE_OSC_NAME);
918 }
919
920 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
921 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
922 MODULE_LICENSE("GPL");
923
924 module_init(osc_init);
925 module_exit(osc_exit);