Whamcloud - gitweb
Fixed brw_finish to kunmap the pointers in the bulk descriptor instead of the
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27
28 static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
29                        struct ptlrpc_connection **connection)
30 {
31         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
32         *cl = osc->osc_client;
33         *connection = osc->osc_conn;
34 }
35
36 static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
37                           struct ptlrpc_connection **connection)
38 {
39         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
40         *cl = osc->osc_ldlm_client;
41         *connection = osc->osc_conn;
42 }
43
44 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd)
45 {
46         struct osc_obd *osc = &obd->u.osc;
47         struct obd_import *import;
48         struct ptlrpc_request *request;
49         char *tmp = osc->osc_target_uuid;
50         int rc, size = sizeof(osc->osc_target_uuid);
51         ENTRY;
52
53         OBD_ALLOC(import, sizeof(*import));
54         if (!import)
55                 RETURN(-ENOMEM);
56
57         MOD_INC_USE_COUNT;
58         rc = class_connect(conn, obd);
59         if (rc)
60                 RETURN(rc);
61
62         request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn,
63                                   OST_CONNECT, 1, &size, &tmp);
64         if (!request)
65                 GOTO(out_disco, rc = -ENOMEM);
66
67         request->rq_level = LUSTRE_CONN_NEW;
68         request->rq_replen = lustre_msg_size(0, NULL);
69
70         rc = ptlrpc_queue_wait(request);
71         rc = ptlrpc_check_status(request, rc);
72         if (rc) {
73                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
74                 GOTO(out, rc);
75         }
76
77         /* XXX eventually maybe more refinement */
78         osc->osc_conn->c_level = LUSTRE_CONN_FULL;
79
80         /* XXX: Make this a handle */
81         osc->osc_connh.addr = request->rq_repmsg->addr;
82         osc->osc_connh.cookie = request->rq_repmsg->cookie;
83
84         EXIT;
85  out:
86         ptlrpc_free_req(request);
87  out_disco:
88         if (rc) {
89                 class_disconnect(conn);
90                 MOD_DEC_USE_COUNT;
91         }
92         return rc;
93 }
94
95 static int osc_disconnect(struct lustre_handle *conn)
96 {
97         struct ptlrpc_request *request;
98         struct ptlrpc_client *cl;
99         struct ptlrpc_connection *connection;
100         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
101         int rc;
102         ENTRY;
103
104         osc_con2cl(conn, &cl, &connection);
105         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
106                                   OST_DISCONNECT, 0, NULL, NULL);
107         if (!request)
108                 RETURN(-ENOMEM);
109         request->rq_replen = lustre_msg_size(0, NULL);
110
111         rc = ptlrpc_queue_wait(request);
112         if (rc)
113                 GOTO(out, rc);
114         rc = class_disconnect(conn);
115         if (!rc)
116                 MOD_DEC_USE_COUNT;
117
118  out:
119         ptlrpc_free_req(request);
120         return rc;
121 }
122
123 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
124 {
125         struct ptlrpc_request *request;
126         struct ptlrpc_client *cl;
127         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
128         struct ptlrpc_connection *connection;
129         struct ost_body *body;
130         int rc, size = sizeof(*body);
131         ENTRY;
132
133         osc_con2cl(conn, &cl, &connection);
134         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
135                                    OST_GETATTR, 1, &size, NULL);
136         if (!request)
137                 RETURN(-ENOMEM);
138
139         body = lustre_msg_buf(request->rq_reqmsg, 0);
140         memcpy(&body->oa, oa, sizeof(*oa));
141         body->oa.o_valid = ~0;
142
143         request->rq_replen = lustre_msg_size(1, &size);
144
145         rc = ptlrpc_queue_wait(request);
146         rc = ptlrpc_check_status(request, rc);
147         if (rc) {
148                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
149                 GOTO(out, rc);
150         }
151
152         body = lustre_msg_buf(request->rq_repmsg, 0);
153         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
154         if (oa)
155                 memcpy(oa, &body->oa, sizeof(*oa));
156
157         EXIT;
158  out:
159         ptlrpc_free_req(request);
160         return rc;
161 }
162
163 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
164                     struct lov_stripe_md *md)
165 {
166         struct ptlrpc_request *request;
167         struct ptlrpc_client *cl;
168         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
169         struct ptlrpc_connection *connection;
170         struct ost_body *body;
171         int rc, size = sizeof(*body);
172         ENTRY;
173
174         osc_con2cl(conn, &cl, &connection);
175         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
176                                    OST_OPEN, 1, &size, NULL);
177         if (!request)
178                 RETURN(-ENOMEM);
179
180         body = lustre_msg_buf(request->rq_reqmsg, 0);
181         memcpy(&body->oa, oa, sizeof(*oa));
182         body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
183
184         request->rq_replen = lustre_msg_size(1, &size);
185
186         rc = ptlrpc_queue_wait(request);
187         rc = ptlrpc_check_status(request, rc);
188         if (rc)
189                 GOTO(out, rc);
190
191         body = lustre_msg_buf(request->rq_repmsg, 0);
192         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
193         if (oa)
194                 memcpy(oa, &body->oa, sizeof(*oa));
195
196         EXIT;
197  out:
198         ptlrpc_free_req(request);
199         return rc;
200 }
201
202 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
203                      struct lov_stripe_md *md)
204 {
205         struct ptlrpc_request *request;
206         struct ptlrpc_client *cl;
207         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
208         struct ptlrpc_connection *connection;
209         struct ost_body *body;
210         int rc, size = sizeof(*body);
211         ENTRY;
212
213         osc_con2cl(conn, &cl, &connection);
214         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
215                                    OST_CLOSE, 1, &size, NULL);
216         if (!request)
217                 RETURN(-ENOMEM);
218
219         oa->o_id = md->lmd_object_id;
220         oa->o_mode = S_IFREG;
221         oa->o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
222         body = lustre_msg_buf(request->rq_reqmsg, 0);
223         memcpy(&body->oa, oa, sizeof(*oa));
224
225         request->rq_replen = lustre_msg_size(1, &size);
226
227         rc = ptlrpc_queue_wait(request);
228         rc = ptlrpc_check_status(request, rc);
229         if (rc)
230                 GOTO(out, rc);
231
232         body = lustre_msg_buf(request->rq_repmsg, 0);
233         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
234         if (oa)
235                 memcpy(oa, &body->oa, sizeof(*oa));
236
237         EXIT;
238  out:
239         ptlrpc_free_req(request);
240         return rc;
241 }
242
243 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
244 {
245         struct ptlrpc_request *request;
246         struct ptlrpc_client *cl;
247         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
248         struct ptlrpc_connection *connection;
249         struct ost_body *body;
250         int rc, size = sizeof(*body);
251         ENTRY;
252
253         osc_con2cl(conn, &cl, &connection);
254         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
255                                   OST_SETATTR, 1, &size, NULL);
256         if (!request)
257                 RETURN(-ENOMEM);
258
259         body = lustre_msg_buf(request->rq_reqmsg, 0);
260         memcpy(&body->oa, oa, sizeof(*oa));
261
262         request->rq_replen = lustre_msg_size(1, &size);
263
264         rc = ptlrpc_queue_wait(request);
265         rc = ptlrpc_check_status(request, rc);
266         GOTO(out, rc);
267
268  out:
269         ptlrpc_free_req(request);
270         return rc;
271 }
272
273 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
274                       struct lov_stripe_md **ea)
275 {
276         struct ptlrpc_request *request;
277         struct ptlrpc_client *cl;
278         struct ptlrpc_connection *connection;
279         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
280         struct ost_body *body;
281         int rc, size = sizeof(*body);
282         ENTRY;
283
284         if (!oa) {
285                 CERROR("oa NULL\n");
286                 RETURN(-EINVAL);
287         }
288
289         if (!ea) {
290                 LBUG();
291         }
292
293         if (!*ea) {
294                 OBD_ALLOC(*ea, oa->o_easize);
295                 if (!*ea)
296                         RETURN(-ENOMEM);
297                 (*ea)->lmd_size = oa->o_easize;
298         }
299
300         osc_con2cl(conn, &cl, &connection);
301         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
302                                   OST_CREATE, 1, &size, NULL);
303         if (!request)
304                 RETURN(-ENOMEM);
305
306         body = lustre_msg_buf(request->rq_reqmsg, 0);
307         memcpy(&body->oa, oa, sizeof(*oa));
308
309         request->rq_replen = lustre_msg_size(1, &size);
310
311         rc = ptlrpc_queue_wait(request);
312         rc = ptlrpc_check_status(request, rc);
313         if (rc)
314                 GOTO(out, rc);
315
316         body = lustre_msg_buf(request->rq_repmsg, 0);
317         memcpy(oa, &body->oa, sizeof(*oa));
318
319         (*ea)->lmd_object_id = oa->o_id;
320         (*ea)->lmd_stripe_count = 1;
321         EXIT;
322  out:
323         ptlrpc_free_req(request);
324         return rc;
325 }
326
327 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
328                      struct lov_stripe_md *md, obd_size count,
329                      obd_off offset)
330 {
331         struct ptlrpc_request *request;
332         struct ptlrpc_client *cl;
333         struct ptlrpc_connection *connection;
334         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
335         struct ost_body *body;
336         int rc, size = sizeof(*body);
337         ENTRY;
338
339         if (!oa) {
340                 CERROR("oa NULL\n");
341                 RETURN(-EINVAL);
342         }
343         osc_con2cl(conn, &cl, &connection);
344         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
345                                    OST_PUNCH, 1, &size, NULL);
346         if (!request)
347                 RETURN(-ENOMEM);
348
349         body = lustre_msg_buf(request->rq_reqmsg, 0);
350         memcpy(&body->oa, oa, sizeof(*oa));
351         body->oa.o_blocks = count;
352         body->oa.o_valid |= OBD_MD_FLBLOCKS;
353
354         request->rq_replen = lustre_msg_size(1, &size);
355
356         rc = ptlrpc_queue_wait(request);
357         rc = ptlrpc_check_status(request, rc);
358         if (rc)
359                 GOTO(out, rc);
360
361         body = lustre_msg_buf(request->rq_repmsg, 0);
362         memcpy(oa, &body->oa, sizeof(*oa));
363
364         EXIT;
365  out:
366         ptlrpc_free_req(request);
367         return rc;
368 }
369
370 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
371                        struct lov_stripe_md *ea)
372 {
373         struct ptlrpc_request *request;
374         struct ptlrpc_client *cl;
375         struct ptlrpc_connection *connection;
376         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
377         struct ost_body *body;
378         int rc, size = sizeof(*body);
379         ENTRY;
380
381         if (!oa) {
382                 CERROR("oa NULL\n");
383                 RETURN(-EINVAL);
384         }
385         osc_con2cl(conn, &cl, &connection);
386         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
387                                    OST_DESTROY, 1, &size, NULL);
388         if (!request)
389                 RETURN(-ENOMEM);
390
391         body = lustre_msg_buf(request->rq_reqmsg, 0);
392         memcpy(&body->oa, oa, sizeof(*oa));
393         body->oa.o_valid = ~0;
394
395         request->rq_replen = lustre_msg_size(1, &size);
396
397         rc = ptlrpc_queue_wait(request);
398         rc = ptlrpc_check_status(request, rc);
399         if (rc)
400                 GOTO(out, rc);
401
402         body = lustre_msg_buf(request->rq_repmsg, 0);
403         memcpy(oa, &body->oa, sizeof(*oa));
404
405         EXIT;
406  out:
407         ptlrpc_free_req(request);
408         return rc;
409 }
410
411 struct osc_brw_cb_data {
412         bulk_callback_t callback;
413         void *cb_data;
414         void *obd_data;
415         size_t obd_size;
416 };
417
418 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
419 {
420         struct list_head *tmp, *next;
421         struct osc_brw_cb_data *cb_data = data;
422         ENTRY;
423
424         if (desc->b_flags & PTL_RPC_FL_INTR)
425                 CERROR("got signal\n");
426
427         /* This feels wrong to me. */
428         list_for_each_safe(tmp, next, &desc->b_page_list) {
429                 struct ptlrpc_bulk_page *bulk;
430                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
431
432                 kunmap(bulk->b_page);
433         }
434
435         if (cb_data->callback)
436                 (cb_data->callback)(desc, cb_data->cb_data);
437
438         ptlrpc_bulk_decref(desc);
439         if (cb_data->obd_data)
440                 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
441         OBD_FREE(cb_data, sizeof(*cb_data));
442         EXIT;
443 }
444
445 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
446                         obd_count page_count, struct page **page_array,
447                         obd_size *count, obd_off *offset, obd_flag *flags,
448                         bulk_callback_t callback)
449 {
450         struct ptlrpc_client *cl;
451         struct ptlrpc_connection *connection;
452         struct ptlrpc_request *request = NULL;
453         struct ptlrpc_bulk_desc *desc = NULL;
454         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
455         struct ost_body *body;
456         int rc, j, size[3] = {sizeof(*body)};
457         void *iooptr, *nioptr;
458         struct osc_brw_cb_data *cb_data = NULL;
459         ENTRY;
460
461         size[1] = sizeof(struct obd_ioobj);
462         size[2] = page_count * sizeof(struct niobuf_remote);
463
464         osc_con2cl(conn, &cl, &connection);
465         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
466                                    OST_BRW, 3, size, NULL);
467         if (!request)
468                 RETURN(-ENOMEM);
469
470         body = lustre_msg_buf(request->rq_reqmsg, 0);
471         body->data = OBD_BRW_READ;
472
473         desc = ptlrpc_prep_bulk(connection);
474         if (!desc)
475                 GOTO(out_free, rc = -ENOMEM);
476         desc->b_portal = OST_BULK_PORTAL;
477         desc->b_cb = brw_finish;
478         OBD_ALLOC(cb_data, sizeof(*cb_data));
479         if (!cb_data)
480                 GOTO(out_free, rc = -ENOMEM);
481         cb_data->callback = callback;
482         desc->b_cb_data = cb_data;
483         /* XXX end almost identical to brw_write case */
484
485         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
486         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
487         ost_pack_ioo(&iooptr, md, page_count);
488         for (j = 0; j < page_count; j++) {
489                 struct ptlrpc_bulk_page *bulk;
490                 bulk = ptlrpc_prep_bulk_page(desc);
491                 if (bulk == NULL)
492                         GOTO(out_unmap, rc = -ENOMEM);
493
494                 spin_lock(&connection->c_lock);
495                 bulk->b_xid = ++connection->c_xid_out;
496                 spin_unlock(&connection->c_lock);
497
498                 bulk->b_buf = kmap(page_array[j]);
499                 bulk->b_page = page_array[j];
500                 bulk->b_buflen = PAGE_SIZE;
501                 ost_pack_niobuf(&nioptr, offset[j], count[j],
502                                 flags[j], bulk->b_xid);
503         }
504
505         /*
506          * Register the bulk first, because the reply could arrive out of order,
507          * and we want to be ready for the bulk data.
508          *
509          * One reference is released by the bulk callback, the other when
510          * we finish sleeping on it (if we don't have a callback).
511          */
512         atomic_set(&desc->b_refcount, callback ? 1 : 2);
513         rc = ptlrpc_register_bulk(desc);
514         if (rc)
515                 GOTO(out_unmap, rc);
516
517         request->rq_replen = lustre_msg_size(1, size);
518         rc = ptlrpc_queue_wait(request);
519         rc = ptlrpc_check_status(request, rc);
520         if (rc) {
521                 ptlrpc_bulk_decref(desc);
522                 GOTO(out_unmap, rc);
523         }
524
525         /* Callbacks cause asynchronous handling. */
526         if (callback)
527                 RETURN(0);
528
529         l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
530         ptlrpc_bulk_decref(desc);
531         if (desc->b_flags & PTL_RPC_FL_INTR)
532                 RETURN(-EINTR);
533
534         RETURN(0);
535
536         /* Clean up on error. */
537  out_unmap:
538         for (j = 0; j < desc->b_page_count; j++)
539                 kunmap(page_array[j]);
540  out_free:
541         if (cb_data)
542                 OBD_FREE(cb_data, sizeof(*cb_data));
543         ptlrpc_free_bulk(desc);
544         ptlrpc_free_req(request);
545         return rc;
546 }
547
548 static int osc_brw_write(struct lustre_handle *conn,
549                          struct lov_stripe_md *md, obd_count page_count,
550                          struct page **pagearray, obd_size *count,
551                          obd_off *offset, obd_flag *flags,
552                          bulk_callback_t callback)
553 {
554         struct ptlrpc_client *cl;
555         struct ptlrpc_connection *connection;
556         struct ptlrpc_request *request = NULL;
557         struct ptlrpc_bulk_desc *desc = NULL;
558         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
559         struct ost_body *body;
560         struct niobuf_local *local = NULL;
561         struct niobuf_remote *remote;
562         struct osc_brw_cb_data *cb_data = NULL;
563         int rc, j, size[3] = {sizeof(*body)};
564         void *iooptr, *nioptr;
565         ENTRY;
566
567         size[1] = sizeof(struct obd_ioobj);
568         size[2] = page_count * sizeof(*remote);
569
570         osc_con2cl(conn, &cl, &connection);
571         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
572                                    OST_BRW, 3, size, NULL);
573         if (!request)
574                 RETURN(-ENOMEM);
575
576         body = lustre_msg_buf(request->rq_reqmsg, 0);
577         body->data = OBD_BRW_WRITE;
578
579         OBD_ALLOC(local, page_count * sizeof(*local));
580         if (!local)
581                 GOTO(out_free, rc = -ENOMEM);
582
583         desc = ptlrpc_prep_bulk(connection);
584         if (!desc)
585                 GOTO(out_free, rc = -ENOMEM);
586         desc->b_portal = OSC_BULK_PORTAL;
587         desc->b_cb = brw_finish;
588         OBD_ALLOC(cb_data, sizeof(*cb_data));
589         if (!cb_data)
590                 GOTO(out_free, rc = -ENOMEM);
591         cb_data->callback = callback;
592         desc->b_cb_data = cb_data;
593         /* XXX end almost identical to brw_read case */
594         cb_data->obd_data = local;
595         cb_data->obd_size = page_count * sizeof(*local);
596
597         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
598         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
599         ost_pack_ioo(&iooptr, md, page_count);
600         for (j = 0; j < page_count; j++) {
601                 local[j].addr = kmap(pagearray[j]);
602                 local[j].offset = offset[j];
603                 local[j].len = count[j];
604                 ost_pack_niobuf(&nioptr, offset[j], count[j], flags[j], 0);
605         }
606
607         size[1] = page_count * sizeof(struct niobuf_remote);
608         request->rq_replen = lustre_msg_size(2, size);
609         rc = ptlrpc_queue_wait(request);
610         rc = ptlrpc_check_status(request, rc);
611         if (rc)
612                 GOTO(out_unmap, rc);
613
614         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
615         if (!nioptr)
616                 GOTO(out_unmap, rc = -EINVAL);
617
618         if (request->rq_repmsg->buflens[1] !=
619             page_count * sizeof(struct niobuf_remote)) {
620                 CERROR("buffer length wrong (%d vs. %d)\n",
621                        request->rq_repmsg->buflens[1],
622                        page_count * sizeof(struct niobuf_remote));
623                 GOTO(out_unmap, rc = -EINVAL);
624         }
625
626         for (j = 0; j < page_count; j++) {
627                 struct ptlrpc_bulk_page *page;
628
629                 ost_unpack_niobuf(&nioptr, &remote);
630
631                 page = ptlrpc_prep_bulk_page(desc);
632                 if (!page)
633                         GOTO(out_unmap, rc = -ENOMEM);
634
635                 page->b_buf = (void *)(unsigned long)local[j].addr;
636                 page->b_buflen = local[j].len;
637                 page->b_xid = remote->xid;
638         }
639
640         if (desc->b_page_count != page_count)
641                 LBUG();
642
643         /*
644          * One is released when the bulk is complete, the other when we finish
645          * waiting on it.  (Callback cases don't sleep, so only one ref for
646          * them.)
647          */
648         atomic_set(&desc->b_refcount, callback ? 1 : 2);
649         CDEBUG(D_PAGE, "Set refcount of %p to %d\n", desc,
650                atomic_read(&desc->b_refcount));
651         rc = ptlrpc_send_bulk(desc);
652         if (rc)
653                 GOTO(out_unmap, rc);
654
655         /* Callbacks cause asynchronous handling. */
656         if (callback)
657                 RETURN(0);
658
659         /* If there's no callback function, sleep here until complete. */
660         l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
661         ptlrpc_bulk_decref(desc);
662         if (desc->b_flags & PTL_RPC_FL_INTR)
663                 RETURN(-EINTR);
664         RETURN(0);
665
666         /* Clean up on error. */
667  out_unmap:
668         for (j = 0; j < page_count; j++)
669                 kunmap(pagearray[j]);
670
671  out_free:
672         if (cb_data)
673                 OBD_FREE(cb_data, sizeof(*cb_data));
674         if (local)
675                 OBD_FREE(local, page_count * sizeof(*local));
676         ptlrpc_free_bulk(desc);
677         ptlrpc_req_finished(request);
678         return rc;
679 }
680
681 static int osc_brw(int cmd, struct lustre_handle *conn,
682                    struct lov_stripe_md *md, obd_count page_count,
683                    struct page **page_array,
684                    obd_size *count,
685                    obd_off *offset,
686                    obd_flag *flags,
687                    void *callback)
688 {
689         if (cmd & OBD_BRW_WRITE)
690                 return osc_brw_write(conn, md, page_count, page_array, count,
691                                      offset, flags, (bulk_callback_t)callback);
692         else
693                 return osc_brw_read(conn, md, page_count, page_array, count,
694                                     offset, flags, (bulk_callback_t)callback);
695 }
696
697 static int osc_enqueue(struct lustre_handle *oconn,
698                        struct lustre_handle *parent_lock, __u64 *res_id,
699                        __u32 type, void *extentp, int extent_len, __u32 mode,
700                        int *flags, void *callback, void *data, int datalen,
701                        struct lustre_handle *lockh)
702 {
703         struct obd_device *obddev = class_conn2obd(oconn);
704         struct osc_obd *osc = &obddev->u.osc;
705         struct ptlrpc_connection *conn;
706         struct ptlrpc_client *cl;
707         struct ldlm_extent *extent = extentp;
708         int rc;
709         __u32 mode2;
710
711         /* Filesystem locks are given a bit of special treatment: first we
712          * fixup the lock to start and end on page boundaries. */
713         extent->start &= PAGE_MASK;
714         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
715
716         /* Next, search for already existing extent locks that will cover us */
717         osc_con2dlmcl(oconn, &cl, &conn);
718         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
719                              sizeof(extent), mode, lockh);
720         if (rc == 1) {
721                 /* We already have a lock, and it's referenced */
722                 return 0;
723         }
724
725         /* Next, search for locks that we can upgrade (if we're trying to write)
726          * or are more than we need (if we're trying to read).  Because the VFS
727          * and page cache already protect us locally, lots of readers/writers
728          * can share a single PW lock. */
729         if (mode == LCK_PW)
730                 mode2 = LCK_PR;
731         else
732                 mode2 = LCK_PW;
733
734         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
735                              sizeof(extent), mode2, lockh);
736         if (rc == 1) {
737                 int flags;
738                 /* FIXME: This is not incredibly elegant, but it might
739                  * be more elegant than adding another parameter to
740                  * lock_match.  I want a second opinion. */
741                 ldlm_lock_addref(lockh, mode);
742                 ldlm_lock_decref(lockh, mode2);
743
744                 if (mode == LCK_PR)
745                         return 0;
746
747                 rc = ldlm_cli_convert(cl, lockh, &osc->osc_connh,
748                                       mode, &flags);
749                 if (rc)
750                         LBUG();
751
752                 return rc;
753         }
754
755         rc = ldlm_cli_enqueue(cl, conn, &osc->osc_connh,
756                               NULL, obddev->obd_namespace,
757                               parent_lock, res_id, type, extent, sizeof(extent),
758                               mode, flags, callback, data, datalen, lockh);
759         return rc;
760 }
761
762 static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
763                       struct lustre_handle *lockh)
764 {
765         ENTRY;
766
767         ldlm_lock_decref(lockh, mode);
768
769         RETURN(0);
770 }
771
772 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
773 {
774         struct obd_ioctl_data* data = buf;
775         struct osc_obd *osc = &obddev->u.osc;
776         char server_uuid[37];
777         int rc;
778         ENTRY;
779
780         if (data->ioc_inllen1 < 1) {
781                 CERROR("osc setup requires a TARGET UUID\n");
782                 RETURN(-EINVAL);
783         }
784
785         if (data->ioc_inllen1 > 37) {
786                 CERROR("osc TARGET UUID must be less than 38 characters\n");
787                 RETURN(-EINVAL);
788         }
789
790         if (data->ioc_inllen2 < 1) {
791                 CERROR("osc setup requires a SERVER UUID\n");
792                 RETURN(-EINVAL);
793         }
794
795         if (data->ioc_inllen2 > 37) {
796                 CERROR("osc SERVER UUID must be less than 38 characters\n");
797                 RETURN(-EINVAL);
798         }
799
800         memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
801         memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
802                                                    sizeof(server_uuid)));
803
804         osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
805         if (!osc->osc_conn)
806                 RETURN(-ENOENT);
807
808         obddev->obd_namespace =
809                 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
810         if (obddev->obd_namespace == NULL)
811                 GOTO(out_conn, rc = -ENOMEM);
812
813         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
814         if (osc->osc_client == NULL)
815                 GOTO(out_ns, rc = -ENOMEM);
816
817         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
818         if (osc->osc_ldlm_client == NULL)
819                 GOTO(out_client, rc = -ENOMEM);
820
821         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
822                            osc->osc_client);
823         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
824                            osc->osc_ldlm_client);
825         osc->osc_client->cli_name = "osc";
826         osc->osc_ldlm_client->cli_name = "ldlm";
827
828         MOD_INC_USE_COUNT;
829         RETURN(0);
830
831  out_client:
832         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
833  out_ns:
834         ldlm_namespace_free(obddev->obd_namespace);
835  out_conn:
836         ptlrpc_put_connection(osc->osc_conn);
837         return rc;
838 }
839
840 static int osc_cleanup(struct obd_device * obddev)
841 {
842         struct osc_obd *osc = &obddev->u.osc;
843
844         ldlm_namespace_free(obddev->obd_namespace);
845
846         ptlrpc_cleanup_client(osc->osc_client);
847         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
848         ptlrpc_cleanup_client(osc->osc_ldlm_client);
849         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
850         ptlrpc_put_connection(osc->osc_conn);
851
852         MOD_DEC_USE_COUNT;
853         return 0;
854 }
855
856 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
857 {
858         struct ptlrpc_request *request;
859         struct ptlrpc_client *cl;
860         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
861         struct ptlrpc_connection *connection;
862         struct obd_statfs *osfs;
863         int rc, size = sizeof(*osfs);
864         ENTRY;
865
866         osc_con2cl(conn, &cl, &connection);
867         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
868                                    OST_STATFS, 0, NULL, NULL);
869         if (!request)
870                 RETURN(-ENOMEM);
871
872         request->rq_replen = lustre_msg_size(1, &size);
873
874         rc = ptlrpc_queue_wait(request);
875         rc = ptlrpc_check_status(request, rc);
876         if (rc) {
877                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
878                 GOTO(out, rc);
879         }
880
881         osfs = lustre_msg_buf(request->rq_repmsg, 0);
882         obd_statfs_unpack(osfs, sfs);
883
884         EXIT;
885  out:
886         ptlrpc_free_req(request);
887         return rc;
888 }
889
890 struct obd_ops osc_obd_ops = {
891         o_setup:        osc_setup,
892         o_cleanup:      osc_cleanup,
893         o_statfs:       osc_statfs,
894         o_create:       osc_create,
895         o_destroy:      osc_destroy,
896         o_getattr:      osc_getattr,
897         o_setattr:      osc_setattr,
898         o_open:         osc_open,
899         o_close:        osc_close,
900         o_connect:      osc_connect,
901         o_disconnect:   osc_disconnect,
902         o_brw:          osc_brw,
903         o_punch:        osc_punch,
904         o_enqueue:      osc_enqueue,
905         o_cancel:       osc_cancel
906 };
907
908 static int __init osc_init(void)
909 {
910         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
911 }
912
913 static void __exit osc_exit(void)
914 {
915         class_unregister_type(LUSTRE_OSC_NAME);
916 }
917
918 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
919 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
920 MODULE_LICENSE("GPL");
921
922 module_init(osc_init);
923 module_exit(osc_exit);