Whamcloud - gitweb
Fix problem with multiple connections to the same local device.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27
28 static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
29                        struct ptlrpc_connection **connection,
30                        struct lustre_handle **rconn)
31 {
32         struct obd_export *export = class_conn2export(conn);
33         struct osc_obd *osc = &export->export_obd->u.osc;
34         *cl = osc->osc_client;
35         *connection = osc->osc_conn;
36         *rconn = &export->export_import;
37 }
38
39 static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
40                           struct ptlrpc_connection **connection,
41                           struct lustre_handle **rconn)
42 {
43         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
44         *cl = osc->osc_ldlm_client;
45         *connection = osc->osc_conn;
46 }
47
48 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd)
49 {
50         struct osc_obd *osc = &obd->u.osc;
51         struct obd_import *import;
52         struct ptlrpc_request *request;
53         char *tmp = osc->osc_target_uuid;
54         int rc, size = sizeof(osc->osc_target_uuid);
55         ENTRY;
56
57         OBD_ALLOC(import, sizeof(*import));
58         if (!import)
59                 RETURN(-ENOMEM);
60
61         MOD_INC_USE_COUNT;
62         rc = class_connect(conn, obd);
63         if (rc)
64                 RETURN(rc);
65
66         request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn,
67                                   OST_CONNECT, 1, &size, &tmp);
68         if (!request)
69                 GOTO(out_disco, rc = -ENOMEM);
70
71         request->rq_level = LUSTRE_CONN_NEW;
72         request->rq_replen = lustre_msg_size(0, NULL);
73         request->rq_reqmsg->addr = -1;
74         /* Sending our local connection info breaks for local connections
75         request->rq_reqmsg->addr = conn->addr;
76         request->rq_reqmsg->cookie = conn->cookie;
77          */
78
79         rc = ptlrpc_queue_wait(request);
80         rc = ptlrpc_check_status(request, rc);
81         if (rc) {
82                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
83                 GOTO(out, rc);
84         }
85
86         /* XXX eventually maybe more refinement */
87         osc->osc_conn->c_level = LUSTRE_CONN_FULL;
88
89         class_import2export(conn, (struct lustre_handle *)request->rq_repmsg);
90
91         EXIT;
92  out:
93         ptlrpc_free_req(request);
94  out_disco:
95         if (rc) {
96                 class_disconnect(conn);
97                 MOD_DEC_USE_COUNT;
98         }
99         return rc;
100 }
101
102 static int osc_disconnect(struct lustre_handle *conn)
103 {
104         struct ptlrpc_request *request;
105         struct ptlrpc_client *cl;
106         struct ptlrpc_connection *connection;
107         struct lustre_handle *rconn;
108         int rc;
109         ENTRY;
110
111         osc_con2cl(conn, &cl, &connection, &rconn);
112         request = ptlrpc_prep_req2(cl, connection, rconn,
113                                    OST_DISCONNECT, 0, NULL, NULL);
114         if (!request)
115                 RETURN(-ENOMEM);
116         request->rq_replen = lustre_msg_size(0, NULL);
117
118         rc = ptlrpc_queue_wait(request);
119         if (rc)
120                 GOTO(out, rc);
121         rc = class_disconnect(conn);
122         if (!rc)
123                 MOD_DEC_USE_COUNT;
124
125  out:
126         ptlrpc_free_req(request);
127         return rc;
128 }
129
130 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
131 {
132         struct ptlrpc_request *request;
133         struct ptlrpc_client *cl;
134         struct ptlrpc_connection *connection;
135         struct lustre_handle *rconn;
136         struct ost_body *body;
137         int rc, size = sizeof(*body);
138         ENTRY;
139
140         osc_con2cl(conn, &cl, &connection, &rconn);
141         request = ptlrpc_prep_req2(cl, connection, rconn,
142                                    OST_GETATTR, 1, &size, NULL);
143         if (!request)
144                 RETURN(-ENOMEM);
145
146         body = lustre_msg_buf(request->rq_reqmsg, 0);
147         memcpy(&body->oa, oa, sizeof(*oa));
148         body->oa.o_valid = ~0;
149
150         request->rq_replen = lustre_msg_size(1, &size);
151
152         rc = ptlrpc_queue_wait(request);
153         rc = ptlrpc_check_status(request, rc);
154         if (rc) {
155                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
156                 GOTO(out, rc);
157         }
158
159         body = lustre_msg_buf(request->rq_repmsg, 0);
160         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
161         if (oa)
162                 memcpy(oa, &body->oa, sizeof(*oa));
163
164         EXIT;
165  out:
166         ptlrpc_free_req(request);
167         return rc;
168 }
169
170 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
171                     struct lov_stripe_md *md)
172 {
173         struct ptlrpc_request *request;
174         struct ptlrpc_client *cl;
175         struct ptlrpc_connection *connection;
176         struct lustre_handle *rconn;
177         struct ost_body *body;
178         int rc, size = sizeof(*body);
179         ENTRY;
180
181         osc_con2cl(conn, &cl, &connection, &rconn);
182         request = ptlrpc_prep_req2(cl, connection, rconn,
183                                    OST_OPEN, 1, &size, NULL);
184         if (!request)
185                 RETURN(-ENOMEM);
186
187         body = lustre_msg_buf(request->rq_reqmsg, 0);
188         memcpy(&body->oa, oa, sizeof(*oa));
189         body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
190
191         request->rq_replen = lustre_msg_size(1, &size);
192
193         rc = ptlrpc_queue_wait(request);
194         rc = ptlrpc_check_status(request, rc);
195         if (rc)
196                 GOTO(out, rc);
197
198         body = lustre_msg_buf(request->rq_repmsg, 0);
199         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
200         if (oa)
201                 memcpy(oa, &body->oa, sizeof(*oa));
202
203         EXIT;
204  out:
205         ptlrpc_free_req(request);
206         return rc;
207 }
208
209 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
210                      struct lov_stripe_md *md)
211 {
212         struct ptlrpc_request *request;
213         struct ptlrpc_client *cl;
214         struct ptlrpc_connection *connection;
215         struct lustre_handle *rconn;
216         struct ost_body *body;
217         int rc, size = sizeof(*body);
218         ENTRY;
219
220         osc_con2cl(conn, &cl, &connection, &rconn);
221         request = ptlrpc_prep_req2(cl, connection, rconn,
222                                    OST_CLOSE, 1, &size, NULL);
223         if (!request)
224                 RETURN(-ENOMEM);
225
226         oa->o_id = md->lmd_object_id;
227         oa->o_mode = S_IFREG;
228         oa->o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
229         body = lustre_msg_buf(request->rq_reqmsg, 0);
230         memcpy(&body->oa, oa, sizeof(*oa));
231
232         request->rq_replen = lustre_msg_size(1, &size);
233
234         rc = ptlrpc_queue_wait(request);
235         rc = ptlrpc_check_status(request, rc);
236         if (rc)
237                 GOTO(out, rc);
238
239         body = lustre_msg_buf(request->rq_repmsg, 0);
240         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
241         if (oa)
242                 memcpy(oa, &body->oa, sizeof(*oa));
243
244         EXIT;
245  out:
246         ptlrpc_free_req(request);
247         return rc;
248 }
249
250 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
251 {
252         struct ptlrpc_request *request;
253         struct ptlrpc_client *cl;
254         struct ptlrpc_connection *connection;
255         struct lustre_handle *rconn;
256         struct ost_body *body;
257         int rc, size = sizeof(*body);
258         ENTRY;
259
260         osc_con2cl(conn, &cl, &connection, &rconn);
261         request = ptlrpc_prep_req2(cl, connection, rconn,
262                                   OST_SETATTR, 1, &size, NULL);
263         if (!request)
264                 RETURN(-ENOMEM);
265
266         body = lustre_msg_buf(request->rq_reqmsg, 0);
267         memcpy(&body->oa, oa, sizeof(*oa));
268
269         request->rq_replen = lustre_msg_size(1, &size);
270
271         rc = ptlrpc_queue_wait(request);
272         rc = ptlrpc_check_status(request, rc);
273         GOTO(out, rc);
274
275  out:
276         ptlrpc_free_req(request);
277         return rc;
278 }
279
280 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
281                       struct lov_stripe_md **ea)
282 {
283         struct ptlrpc_request *request;
284         struct ptlrpc_client *cl;
285         struct ptlrpc_connection *connection;
286         struct lustre_handle *rconn;
287         struct ost_body *body;
288         int rc, size = sizeof(*body);
289         ENTRY;
290
291         if (!oa) {
292                 CERROR("oa NULL\n");
293                 RETURN(-EINVAL);
294         }
295
296         if (!ea) {
297                 LBUG();
298         }
299
300         if (!*ea) {
301                 OBD_ALLOC(*ea, oa->o_easize);
302                 if (!*ea)
303                         RETURN(-ENOMEM);
304                 (*ea)->lmd_size = oa->o_easize;
305         }
306
307         osc_con2cl(conn, &cl, &connection, &rconn);
308         request = ptlrpc_prep_req2(cl, connection, rconn,
309                                   OST_CREATE, 1, &size, NULL);
310         if (!request)
311                 RETURN(-ENOMEM);
312
313         body = lustre_msg_buf(request->rq_reqmsg, 0);
314         memcpy(&body->oa, oa, sizeof(*oa));
315
316         request->rq_replen = lustre_msg_size(1, &size);
317
318         rc = ptlrpc_queue_wait(request);
319         rc = ptlrpc_check_status(request, rc);
320         if (rc)
321                 GOTO(out, rc);
322
323         body = lustre_msg_buf(request->rq_repmsg, 0);
324         memcpy(oa, &body->oa, sizeof(*oa));
325
326         (*ea)->lmd_object_id = oa->o_id;
327         (*ea)->lmd_stripe_count = 1;
328         EXIT;
329  out:
330         ptlrpc_free_req(request);
331         return rc;
332 }
333
334 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
335                      struct lov_stripe_md *md, obd_size count,
336                      obd_off offset)
337 {
338         struct ptlrpc_request *request;
339         struct ptlrpc_client *cl;
340         struct ptlrpc_connection *connection;
341         struct lustre_handle *rconn;
342         struct ost_body *body;
343         int rc, size = sizeof(*body);
344         ENTRY;
345
346         if (!oa) {
347                 CERROR("oa NULL\n");
348                 RETURN(-EINVAL);
349         }
350         osc_con2cl(conn, &cl, &connection, &rconn);
351         request = ptlrpc_prep_req2(cl, connection, rconn,
352                                    OST_PUNCH, 1, &size, NULL);
353         if (!request)
354                 RETURN(-ENOMEM);
355
356         body = lustre_msg_buf(request->rq_reqmsg, 0);
357         memcpy(&body->oa, oa, sizeof(*oa));
358         body->oa.o_blocks = count;
359         body->oa.o_valid |= OBD_MD_FLBLOCKS;
360
361         request->rq_replen = lustre_msg_size(1, &size);
362
363         rc = ptlrpc_queue_wait(request);
364         rc = ptlrpc_check_status(request, rc);
365         if (rc)
366                 GOTO(out, rc);
367
368         body = lustre_msg_buf(request->rq_repmsg, 0);
369         memcpy(oa, &body->oa, sizeof(*oa));
370
371         EXIT;
372  out:
373         ptlrpc_free_req(request);
374         return rc;
375 }
376
377 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
378                        struct lov_stripe_md *ea)
379 {
380         struct ptlrpc_request *request;
381         struct ptlrpc_client *cl;
382         struct ptlrpc_connection *connection;
383         struct lustre_handle *rconn;
384         struct ost_body *body;
385         int rc, size = sizeof(*body);
386         ENTRY;
387
388         if (!oa) {
389                 CERROR("oa NULL\n");
390                 RETURN(-EINVAL);
391         }
392         osc_con2cl(conn, &cl, &connection, &rconn);
393         request = ptlrpc_prep_req2(cl, connection, rconn,
394                                    OST_DESTROY, 1, &size, NULL);
395         if (!request)
396                 RETURN(-ENOMEM);
397
398         body = lustre_msg_buf(request->rq_reqmsg, 0);
399         memcpy(&body->oa, oa, sizeof(*oa));
400         body->oa.o_valid = ~0;
401
402         request->rq_replen = lustre_msg_size(1, &size);
403
404         rc = ptlrpc_queue_wait(request);
405         rc = ptlrpc_check_status(request, rc);
406         if (rc)
407                 GOTO(out, rc);
408
409         body = lustre_msg_buf(request->rq_repmsg, 0);
410         memcpy(oa, &body->oa, sizeof(*oa));
411
412         EXIT;
413  out:
414         ptlrpc_free_req(request);
415         return rc;
416 }
417
418 struct osc_brw_cb_data {
419         bulk_callback_t callback;
420         void *cb_data;
421         void *obd_data;
422         size_t obd_size;
423 };
424
425 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
426 {
427         struct list_head *tmp, *next;
428         struct osc_brw_cb_data *cb_data = data;
429         ENTRY;
430
431         if (desc->b_flags & PTL_RPC_FL_INTR)
432                 CERROR("got signal\n");
433
434         /* This feels wrong to me. */
435         list_for_each_safe(tmp, next, &desc->b_page_list) {
436                 struct ptlrpc_bulk_page *bulk;
437                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
438
439                 kunmap(bulk->b_page);
440         }
441
442         if (cb_data->callback)
443                 (cb_data->callback)(desc, cb_data->cb_data);
444
445         ptlrpc_bulk_decref(desc);
446         if (cb_data->obd_data)
447                 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
448         OBD_FREE(cb_data, sizeof(*cb_data));
449         EXIT;
450 }
451
452 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
453                         obd_count page_count, struct page **page_array,
454                         obd_size *count, obd_off *offset, obd_flag *flags,
455                         bulk_callback_t callback)
456 {
457         struct ptlrpc_client *cl;
458         struct ptlrpc_connection *connection;
459         struct lustre_handle *rconn;
460         struct ptlrpc_request *request = NULL;
461         struct ptlrpc_bulk_desc *desc = NULL;
462         struct ost_body *body;
463         int rc, j, size[3] = {sizeof(*body)};
464         void *iooptr, *nioptr;
465         struct osc_brw_cb_data *cb_data = NULL;
466         ENTRY;
467
468         size[1] = sizeof(struct obd_ioobj);
469         size[2] = page_count * sizeof(struct niobuf_remote);
470
471         osc_con2cl(conn, &cl, &connection, &rconn);
472         request = ptlrpc_prep_req2(cl, connection, rconn,
473                                    OST_BRW, 3, size, NULL);
474         if (!request)
475                 RETURN(-ENOMEM);
476
477         body = lustre_msg_buf(request->rq_reqmsg, 0);
478         body->data = OBD_BRW_READ;
479
480         desc = ptlrpc_prep_bulk(connection);
481         if (!desc)
482                 GOTO(out_free, rc = -ENOMEM);
483         desc->b_portal = OST_BULK_PORTAL;
484         desc->b_cb = brw_finish;
485         OBD_ALLOC(cb_data, sizeof(*cb_data));
486         if (!cb_data)
487                 GOTO(out_free, rc = -ENOMEM);
488         cb_data->callback = callback;
489         desc->b_cb_data = cb_data;
490         /* XXX end almost identical to brw_write case */
491
492         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
493         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
494         ost_pack_ioo(&iooptr, md, page_count);
495         for (j = 0; j < page_count; j++) {
496                 struct ptlrpc_bulk_page *bulk;
497                 bulk = ptlrpc_prep_bulk_page(desc);
498                 if (bulk == NULL)
499                         GOTO(out_unmap, rc = -ENOMEM);
500
501                 spin_lock(&connection->c_lock);
502                 bulk->b_xid = ++connection->c_xid_out;
503                 spin_unlock(&connection->c_lock);
504
505                 bulk->b_buf = kmap(page_array[j]);
506                 bulk->b_page = page_array[j];
507                 bulk->b_buflen = PAGE_SIZE;
508                 ost_pack_niobuf(&nioptr, offset[j], count[j],
509                                 flags[j], bulk->b_xid);
510         }
511
512         /*
513          * Register the bulk first, because the reply could arrive out of order,
514          * and we want to be ready for the bulk data.
515          *
516          * One reference is released by the bulk callback, the other when
517          * we finish sleeping on it (if we don't have a callback).
518          */
519         atomic_set(&desc->b_refcount, callback ? 1 : 2);
520         rc = ptlrpc_register_bulk(desc);
521         if (rc)
522                 GOTO(out_unmap, rc);
523
524         request->rq_replen = lustre_msg_size(1, size);
525         rc = ptlrpc_queue_wait(request);
526         rc = ptlrpc_check_status(request, rc);
527         if (rc) {
528                 ptlrpc_bulk_decref(desc);
529                 GOTO(out_unmap, rc);
530         }
531
532         /* Callbacks cause asynchronous handling. */
533         if (callback)
534                 RETURN(0);
535
536         l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
537         ptlrpc_bulk_decref(desc);
538         if (desc->b_flags & PTL_RPC_FL_INTR)
539                 RETURN(-EINTR);
540
541         RETURN(0);
542
543         /* Clean up on error. */
544  out_unmap:
545         for (j = 0; j < desc->b_page_count; j++)
546                 kunmap(page_array[j]);
547  out_free:
548         if (cb_data)
549                 OBD_FREE(cb_data, sizeof(*cb_data));
550         ptlrpc_free_bulk(desc);
551         ptlrpc_free_req(request);
552         return rc;
553 }
554
555 static int osc_brw_write(struct lustre_handle *conn,
556                          struct lov_stripe_md *md, obd_count page_count,
557                          struct page **pagearray, obd_size *count,
558                          obd_off *offset, obd_flag *flags,
559                          bulk_callback_t callback)
560 {
561         struct ptlrpc_client *cl;
562         struct ptlrpc_connection *connection;
563         struct lustre_handle *rconn;
564         struct ptlrpc_request *request = NULL;
565         struct ptlrpc_bulk_desc *desc = NULL;
566         struct ost_body *body;
567         struct niobuf_local *local = NULL;
568         struct niobuf_remote *remote;
569         struct osc_brw_cb_data *cb_data = NULL;
570         int rc, j, size[3] = {sizeof(*body)};
571         void *iooptr, *nioptr;
572         ENTRY;
573
574         size[1] = sizeof(struct obd_ioobj);
575         size[2] = page_count * sizeof(*remote);
576
577         osc_con2cl(conn, &cl, &connection, &rconn);
578         request = ptlrpc_prep_req2(cl, connection, rconn,
579                                    OST_BRW, 3, size, NULL);
580         if (!request)
581                 RETURN(-ENOMEM);
582
583         body = lustre_msg_buf(request->rq_reqmsg, 0);
584         body->data = OBD_BRW_WRITE;
585
586         OBD_ALLOC(local, page_count * sizeof(*local));
587         if (!local)
588                 GOTO(out_free, rc = -ENOMEM);
589
590         desc = ptlrpc_prep_bulk(connection);
591         if (!desc)
592                 GOTO(out_free, rc = -ENOMEM);
593         desc->b_portal = OSC_BULK_PORTAL;
594         desc->b_cb = brw_finish;
595         OBD_ALLOC(cb_data, sizeof(*cb_data));
596         if (!cb_data)
597                 GOTO(out_free, rc = -ENOMEM);
598         cb_data->callback = callback;
599         desc->b_cb_data = cb_data;
600         /* XXX end almost identical to brw_read case */
601         cb_data->obd_data = local;
602         cb_data->obd_size = page_count * sizeof(*local);
603
604         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
605         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
606         ost_pack_ioo(&iooptr, md, page_count);
607         for (j = 0; j < page_count; j++) {
608                 local[j].addr = kmap(pagearray[j]);
609                 local[j].offset = offset[j];
610                 local[j].len = count[j];
611                 ost_pack_niobuf(&nioptr, offset[j], count[j], flags[j], 0);
612         }
613
614         size[1] = page_count * sizeof(struct niobuf_remote);
615         request->rq_replen = lustre_msg_size(2, size);
616         rc = ptlrpc_queue_wait(request);
617         rc = ptlrpc_check_status(request, rc);
618         if (rc)
619                 GOTO(out_unmap, rc);
620
621         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
622         if (!nioptr)
623                 GOTO(out_unmap, rc = -EINVAL);
624
625         if (request->rq_repmsg->buflens[1] !=
626             page_count * sizeof(struct niobuf_remote)) {
627                 CERROR("buffer length wrong (%d vs. %d)\n",
628                        request->rq_repmsg->buflens[1],
629                        page_count * sizeof(struct niobuf_remote));
630                 GOTO(out_unmap, rc = -EINVAL);
631         }
632
633         for (j = 0; j < page_count; j++) {
634                 struct ptlrpc_bulk_page *page;
635
636                 ost_unpack_niobuf(&nioptr, &remote);
637
638                 page = ptlrpc_prep_bulk_page(desc);
639                 if (!page)
640                         GOTO(out_unmap, rc = -ENOMEM);
641
642                 page->b_buf = (void *)(unsigned long)local[j].addr;
643                 page->b_buflen = local[j].len;
644                 page->b_xid = remote->xid;
645         }
646
647         if (desc->b_page_count != page_count)
648                 LBUG();
649
650         /*
651          * One is released when the bulk is complete, the other when we finish
652          * waiting on it.  (Callback cases don't sleep, so only one ref for
653          * them.)
654          */
655         atomic_set(&desc->b_refcount, callback ? 1 : 2);
656         CDEBUG(D_PAGE, "Set refcount of %p to %d\n", desc,
657                atomic_read(&desc->b_refcount));
658         rc = ptlrpc_send_bulk(desc);
659         if (rc)
660                 GOTO(out_unmap, rc);
661
662         /* Callbacks cause asynchronous handling. */
663         if (callback)
664                 RETURN(0);
665
666         /* If there's no callback function, sleep here until complete. */
667         l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
668         ptlrpc_bulk_decref(desc);
669         if (desc->b_flags & PTL_RPC_FL_INTR)
670                 RETURN(-EINTR);
671         RETURN(0);
672
673         /* Clean up on error. */
674  out_unmap:
675         for (j = 0; j < page_count; j++)
676                 kunmap(pagearray[j]);
677
678  out_free:
679         if (cb_data)
680                 OBD_FREE(cb_data, sizeof(*cb_data));
681         if (local)
682                 OBD_FREE(local, page_count * sizeof(*local));
683         ptlrpc_free_bulk(desc);
684         ptlrpc_req_finished(request);
685         return rc;
686 }
687
688 static int osc_brw(int cmd, struct lustre_handle *conn,
689                    struct lov_stripe_md *md, obd_count page_count,
690                    struct page **page_array,
691                    obd_size *count,
692                    obd_off *offset,
693                    obd_flag *flags,
694                    void *callback)
695 {
696         if (cmd & OBD_BRW_WRITE)
697                 return osc_brw_write(conn, md, page_count, page_array, count,
698                                      offset, flags, (bulk_callback_t)callback);
699         else
700                 return osc_brw_read(conn, md, page_count, page_array, count,
701                                     offset, flags, (bulk_callback_t)callback);
702 }
703
704 static int osc_enqueue(struct lustre_handle *conn,
705                        struct lustre_handle *parent_lock, __u64 *res_id,
706                        __u32 type, void *extentp, int extent_len, __u32 mode,
707                        int *flags, void *callback, void *data, int datalen,
708                        struct lustre_handle *lockh)
709 {
710         struct obd_device *obddev = class_conn2obd(conn);
711         struct ptlrpc_connection *connection;
712         struct ptlrpc_client *cl;
713         struct lustre_handle *rconn;
714         struct ldlm_extent *extent = extentp;
715         int rc;
716         __u32 mode2;
717
718         /* Filesystem locks are given a bit of special treatment: first we
719          * fixup the lock to start and end on page boundaries. */
720         extent->start &= PAGE_MASK;
721         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
722
723         /* Next, search for already existing extent locks that will cover us */
724         osc_con2dlmcl(conn, &cl, &connection, &rconn);
725         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
726                              sizeof(extent), mode, lockh);
727         if (rc == 1) {
728                 /* We already have a lock, and it's referenced */
729                 return 0;
730         }
731
732         /* Next, search for locks that we can upgrade (if we're trying to write)
733          * or are more than we need (if we're trying to read).  Because the VFS
734          * and page cache already protect us locally, lots of readers/writers
735          * can share a single PW lock. */
736         if (mode == LCK_PW)
737                 mode2 = LCK_PR;
738         else
739                 mode2 = LCK_PW;
740
741         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
742                              sizeof(extent), mode2, lockh);
743         if (rc == 1) {
744                 int flags;
745                 /* FIXME: This is not incredibly elegant, but it might
746                  * be more elegant than adding another parameter to
747                  * lock_match.  I want a second opinion. */
748                 ldlm_lock_addref(lockh, mode);
749                 ldlm_lock_decref(lockh, mode2);
750
751                 if (mode == LCK_PR)
752                         return 0;
753
754                 rc = ldlm_cli_convert(cl, lockh, rconn, mode, &flags);
755                 if (rc)
756                         LBUG();
757
758                 return rc;
759         }
760
761         rc = ldlm_cli_enqueue(cl, connection, rconn, NULL,obddev->obd_namespace,
762                               parent_lock, res_id, type, extent, sizeof(extent),
763                               mode, flags, callback, data, datalen, lockh);
764         return rc;
765 }
766
767 static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
768                       struct lustre_handle *lockh)
769 {
770         ENTRY;
771
772         ldlm_lock_decref(lockh, mode);
773
774         RETURN(0);
775 }
776
777 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
778 {
779         struct obd_ioctl_data* data = buf;
780         struct osc_obd *osc = &obddev->u.osc;
781         char server_uuid[37];
782         int rc;
783         ENTRY;
784
785         if (data->ioc_inllen1 < 1) {
786                 CERROR("osc setup requires a TARGET UUID\n");
787                 RETURN(-EINVAL);
788         }
789
790         if (data->ioc_inllen1 > 37) {
791                 CERROR("osc TARGET UUID must be less than 38 characters\n");
792                 RETURN(-EINVAL);
793         }
794
795         if (data->ioc_inllen2 < 1) {
796                 CERROR("osc setup requires a SERVER UUID\n");
797                 RETURN(-EINVAL);
798         }
799
800         if (data->ioc_inllen2 > 37) {
801                 CERROR("osc SERVER UUID must be less than 38 characters\n");
802                 RETURN(-EINVAL);
803         }
804
805         memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
806         memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
807                                                    sizeof(server_uuid)));
808
809         osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
810         if (!osc->osc_conn)
811                 RETURN(-ENOENT);
812
813         obddev->obd_namespace =
814                 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
815         if (obddev->obd_namespace == NULL)
816                 GOTO(out_conn, rc = -ENOMEM);
817
818         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
819         if (osc->osc_client == NULL)
820                 GOTO(out_ns, rc = -ENOMEM);
821
822         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
823         if (osc->osc_ldlm_client == NULL)
824                 GOTO(out_client, rc = -ENOMEM);
825
826         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
827                            osc->osc_client);
828         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
829                            osc->osc_ldlm_client);
830         osc->osc_client->cli_name = "osc";
831         osc->osc_ldlm_client->cli_name = "ldlm";
832
833         MOD_INC_USE_COUNT;
834         RETURN(0);
835
836  out_client:
837         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
838  out_ns:
839         ldlm_namespace_free(obddev->obd_namespace);
840  out_conn:
841         ptlrpc_put_connection(osc->osc_conn);
842         return rc;
843 }
844
845 static int osc_cleanup(struct obd_device * obddev)
846 {
847         struct osc_obd *osc = &obddev->u.osc;
848
849         ldlm_namespace_free(obddev->obd_namespace);
850
851         ptlrpc_cleanup_client(osc->osc_client);
852         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
853         ptlrpc_cleanup_client(osc->osc_ldlm_client);
854         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
855         ptlrpc_put_connection(osc->osc_conn);
856
857         MOD_DEC_USE_COUNT;
858         return 0;
859 }
860
861 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
862 {
863         struct ptlrpc_request *request;
864         struct ptlrpc_client *cl;
865         struct ptlrpc_connection *connection;
866         struct lustre_handle *rconn;
867         struct obd_statfs *osfs;
868         int rc, size = sizeof(*osfs);
869         ENTRY;
870
871         osc_con2cl(conn, &cl, &connection, &rconn);
872         request = ptlrpc_prep_req2(cl, connection, rconn,
873                                    OST_STATFS, 0, NULL, NULL);
874         if (!request)
875                 RETURN(-ENOMEM);
876
877         request->rq_replen = lustre_msg_size(1, &size);
878
879         rc = ptlrpc_queue_wait(request);
880         rc = ptlrpc_check_status(request, rc);
881         if (rc) {
882                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
883                 GOTO(out, rc);
884         }
885
886         osfs = lustre_msg_buf(request->rq_repmsg, 0);
887         obd_statfs_unpack(osfs, sfs);
888
889         EXIT;
890  out:
891         ptlrpc_free_req(request);
892         return rc;
893 }
894
895 struct obd_ops osc_obd_ops = {
896         o_setup:        osc_setup,
897         o_cleanup:      osc_cleanup,
898         o_statfs:       osc_statfs,
899         o_create:       osc_create,
900         o_destroy:      osc_destroy,
901         o_getattr:      osc_getattr,
902         o_setattr:      osc_setattr,
903         o_open:         osc_open,
904         o_close:        osc_close,
905         o_connect:      osc_connect,
906         o_disconnect:   osc_disconnect,
907         o_brw:          osc_brw,
908         o_punch:        osc_punch,
909         o_enqueue:      osc_enqueue,
910         o_cancel:       osc_cancel
911 };
912
913 static int __init osc_init(void)
914 {
915         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
916 }
917
918 static void __exit osc_exit(void)
919 {
920         class_unregister_type(LUSTRE_OSC_NAME);
921 }
922
923 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
924 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
925 MODULE_LICENSE("GPL");
926
927 module_init(osc_init);
928 module_exit(osc_exit);