Whamcloud - gitweb
Fix RPC request leak and potential refcounting problems on bulk descriptor.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27
28 static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
29                        struct ptlrpc_connection **connection,
30                        struct lustre_handle **rconn)
31 {
32         struct obd_export *export = class_conn2export(conn);
33         struct osc_obd *osc = &export->exp_obd->u.osc;
34
35         *cl = osc->osc_client;
36         *connection = osc->osc_conn;
37         *rconn = &export->exp_rconnh;
38 }
39
40 static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
41                           struct ptlrpc_connection **connection,
42                           struct lustre_handle **rconn)
43 {
44         struct obd_export *export = class_conn2export(conn);
45         struct osc_obd *osc = &export->exp_obd->u.osc;
46
47         *cl = osc->osc_ldlm_client;
48         *connection = osc->osc_conn;
49         *rconn = &export->exp_rconnh;
50 }
51
52 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd)
53 {
54         struct osc_obd *osc = &obd->u.osc;
55         //struct obd_import *import;
56         struct ptlrpc_request *request;
57         char *tmp = osc->osc_target_uuid;
58         int rc, size = sizeof(osc->osc_target_uuid);
59         ENTRY;
60
61         /* not used yet
62         OBD_ALLOC(import, sizeof(*import));
63         if (!import)
64                 RETURN(-ENOMEM);
65          */
66
67         MOD_INC_USE_COUNT;
68         rc = class_connect(conn, obd);
69         if (rc)
70                 RETURN(rc);
71
72         request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn,
73                                   OST_CONNECT, 1, &size, &tmp);
74         if (!request)
75                 GOTO(out_disco, rc = -ENOMEM);
76
77         request->rq_level = LUSTRE_CONN_NEW;
78         request->rq_replen = lustre_msg_size(0, NULL);
79         request->rq_reqmsg->addr = -1;
80         /* Sending our local connection info breaks for local connections
81         request->rq_reqmsg->addr = conn->addr;
82         request->rq_reqmsg->cookie = conn->cookie;
83          */
84
85         rc = ptlrpc_queue_wait(request);
86         rc = ptlrpc_check_status(request, rc);
87         if (rc) {
88                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
89                 GOTO(out, rc);
90         }
91
92         /* XXX eventually maybe more refinement */
93         osc->osc_conn->c_level = LUSTRE_CONN_FULL;
94
95         class_rconn2export(conn, (struct lustre_handle *)request->rq_repmsg);
96
97         EXIT;
98  out:
99         ptlrpc_free_req(request);
100  out_disco:
101         if (rc) {
102                 class_disconnect(conn);
103                 MOD_DEC_USE_COUNT;
104         }
105         return rc;
106 }
107
108 static int osc_disconnect(struct lustre_handle *conn)
109 {
110         struct ptlrpc_request *request;
111         struct ptlrpc_client *cl;
112         struct ptlrpc_connection *connection;
113         struct lustre_handle *rconn;
114         int rc;
115         ENTRY;
116
117         osc_con2cl(conn, &cl, &connection, &rconn);
118         request = ptlrpc_prep_req2(cl, connection, rconn,
119                                    OST_DISCONNECT, 0, NULL, NULL);
120         if (!request)
121                 RETURN(-ENOMEM);
122         request->rq_replen = lustre_msg_size(0, NULL);
123
124         rc = ptlrpc_queue_wait(request);
125         if (rc)
126                 GOTO(out, rc);
127         rc = class_disconnect(conn);
128         if (!rc)
129                 MOD_DEC_USE_COUNT;
130
131  out:
132         ptlrpc_free_req(request);
133         return rc;
134 }
135
136 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
137 {
138         struct ptlrpc_request *request;
139         struct ptlrpc_client *cl;
140         struct ptlrpc_connection *connection;
141         struct lustre_handle *rconn;
142         struct ost_body *body;
143         int rc, size = sizeof(*body);
144         ENTRY;
145
146         osc_con2cl(conn, &cl, &connection, &rconn);
147         request = ptlrpc_prep_req2(cl, connection, rconn,
148                                    OST_GETATTR, 1, &size, NULL);
149         if (!request)
150                 RETURN(-ENOMEM);
151
152         body = lustre_msg_buf(request->rq_reqmsg, 0);
153         memcpy(&body->oa, oa, sizeof(*oa));
154         body->oa.o_valid = ~0;
155
156         request->rq_replen = lustre_msg_size(1, &size);
157
158         rc = ptlrpc_queue_wait(request);
159         rc = ptlrpc_check_status(request, rc);
160         if (rc) {
161                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
162                 GOTO(out, rc);
163         }
164
165         body = lustre_msg_buf(request->rq_repmsg, 0);
166         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
167         if (oa)
168                 memcpy(oa, &body->oa, sizeof(*oa));
169
170         EXIT;
171  out:
172         ptlrpc_free_req(request);
173         return rc;
174 }
175
176 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
177                     struct lov_stripe_md *md)
178 {
179         struct ptlrpc_request *request;
180         struct ptlrpc_client *cl;
181         struct ptlrpc_connection *connection;
182         struct lustre_handle *rconn;
183         struct ost_body *body;
184         int rc, size = sizeof(*body);
185         ENTRY;
186
187         osc_con2cl(conn, &cl, &connection, &rconn);
188         request = ptlrpc_prep_req2(cl, connection, rconn,
189                                    OST_OPEN, 1, &size, NULL);
190         if (!request)
191                 RETURN(-ENOMEM);
192
193         body = lustre_msg_buf(request->rq_reqmsg, 0);
194         memcpy(&body->oa, oa, sizeof(*oa));
195         body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
196
197         request->rq_replen = lustre_msg_size(1, &size);
198
199         rc = ptlrpc_queue_wait(request);
200         rc = ptlrpc_check_status(request, rc);
201         if (rc)
202                 GOTO(out, rc);
203
204         body = lustre_msg_buf(request->rq_repmsg, 0);
205         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
206         if (oa)
207                 memcpy(oa, &body->oa, sizeof(*oa));
208
209         EXIT;
210  out:
211         ptlrpc_free_req(request);
212         return rc;
213 }
214
215 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
216                      struct lov_stripe_md *md)
217 {
218         struct ptlrpc_request *request;
219         struct ptlrpc_client *cl;
220         struct ptlrpc_connection *connection;
221         struct lustre_handle *rconn;
222         struct ost_body *body;
223         int rc, size = sizeof(*body);
224         ENTRY;
225
226         osc_con2cl(conn, &cl, &connection, &rconn);
227         request = ptlrpc_prep_req2(cl, connection, rconn,
228                                    OST_CLOSE, 1, &size, NULL);
229         if (!request)
230                 RETURN(-ENOMEM);
231
232         oa->o_id = md->lmd_object_id;
233         oa->o_mode = S_IFREG;
234         oa->o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
235         body = lustre_msg_buf(request->rq_reqmsg, 0);
236         memcpy(&body->oa, oa, sizeof(*oa));
237
238         request->rq_replen = lustre_msg_size(1, &size);
239
240         rc = ptlrpc_queue_wait(request);
241         rc = ptlrpc_check_status(request, rc);
242         if (rc)
243                 GOTO(out, rc);
244
245         body = lustre_msg_buf(request->rq_repmsg, 0);
246         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
247         if (oa)
248                 memcpy(oa, &body->oa, sizeof(*oa));
249
250         EXIT;
251  out:
252         ptlrpc_free_req(request);
253         return rc;
254 }
255
256 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
257 {
258         struct ptlrpc_request *request;
259         struct ptlrpc_client *cl;
260         struct ptlrpc_connection *connection;
261         struct lustre_handle *rconn;
262         struct ost_body *body;
263         int rc, size = sizeof(*body);
264         ENTRY;
265
266         osc_con2cl(conn, &cl, &connection, &rconn);
267         request = ptlrpc_prep_req2(cl, connection, rconn,
268                                   OST_SETATTR, 1, &size, NULL);
269         if (!request)
270                 RETURN(-ENOMEM);
271
272         body = lustre_msg_buf(request->rq_reqmsg, 0);
273         memcpy(&body->oa, oa, sizeof(*oa));
274
275         request->rq_replen = lustre_msg_size(1, &size);
276
277         rc = ptlrpc_queue_wait(request);
278         rc = ptlrpc_check_status(request, rc);
279         GOTO(out, rc);
280
281  out:
282         ptlrpc_free_req(request);
283         return rc;
284 }
285
286 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
287                       struct lov_stripe_md **ea)
288 {
289         struct ptlrpc_request *request;
290         struct ptlrpc_client *cl;
291         struct ptlrpc_connection *connection;
292         struct lustre_handle *rconn;
293         struct ost_body *body;
294         int rc, size = sizeof(*body);
295         ENTRY;
296
297         if (!oa) {
298                 CERROR("oa NULL\n");
299                 RETURN(-EINVAL);
300         }
301
302         if (!ea) {
303                 LBUG();
304         }
305
306         if (!*ea) {
307                 OBD_ALLOC(*ea, oa->o_easize);
308                 if (!*ea)
309                         RETURN(-ENOMEM);
310                 (*ea)->lmd_size = oa->o_easize;
311         }
312
313         osc_con2cl(conn, &cl, &connection, &rconn);
314         request = ptlrpc_prep_req2(cl, connection, rconn,
315                                   OST_CREATE, 1, &size, NULL);
316         if (!request)
317                 RETURN(-ENOMEM);
318
319         body = lustre_msg_buf(request->rq_reqmsg, 0);
320         memcpy(&body->oa, oa, sizeof(*oa));
321
322         request->rq_replen = lustre_msg_size(1, &size);
323
324         rc = ptlrpc_queue_wait(request);
325         rc = ptlrpc_check_status(request, rc);
326         if (rc)
327                 GOTO(out, rc);
328
329         body = lustre_msg_buf(request->rq_repmsg, 0);
330         memcpy(oa, &body->oa, sizeof(*oa));
331
332         (*ea)->lmd_object_id = oa->o_id;
333         (*ea)->lmd_stripe_count = 1;
334         EXIT;
335  out:
336         ptlrpc_free_req(request);
337         return rc;
338 }
339
340 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
341                      struct lov_stripe_md *md, obd_size count,
342                      obd_off offset)
343 {
344         struct ptlrpc_request *request;
345         struct ptlrpc_client *cl;
346         struct ptlrpc_connection *connection;
347         struct lustre_handle *rconn;
348         struct ost_body *body;
349         int rc, size = sizeof(*body);
350         ENTRY;
351
352         if (!oa) {
353                 CERROR("oa NULL\n");
354                 RETURN(-EINVAL);
355         }
356         osc_con2cl(conn, &cl, &connection, &rconn);
357         request = ptlrpc_prep_req2(cl, connection, rconn,
358                                    OST_PUNCH, 1, &size, NULL);
359         if (!request)
360                 RETURN(-ENOMEM);
361
362         body = lustre_msg_buf(request->rq_reqmsg, 0);
363         memcpy(&body->oa, oa, sizeof(*oa));
364         body->oa.o_blocks = count;
365         body->oa.o_valid |= OBD_MD_FLBLOCKS;
366
367         request->rq_replen = lustre_msg_size(1, &size);
368
369         rc = ptlrpc_queue_wait(request);
370         rc = ptlrpc_check_status(request, rc);
371         if (rc)
372                 GOTO(out, rc);
373
374         body = lustre_msg_buf(request->rq_repmsg, 0);
375         memcpy(oa, &body->oa, sizeof(*oa));
376
377         EXIT;
378  out:
379         ptlrpc_free_req(request);
380         return rc;
381 }
382
383 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
384                        struct lov_stripe_md *ea)
385 {
386         struct ptlrpc_request *request;
387         struct ptlrpc_client *cl;
388         struct ptlrpc_connection *connection;
389         struct lustre_handle *rconn;
390         struct ost_body *body;
391         int rc, size = sizeof(*body);
392         ENTRY;
393
394         if (!oa) {
395                 CERROR("oa NULL\n");
396                 RETURN(-EINVAL);
397         }
398         osc_con2cl(conn, &cl, &connection, &rconn);
399         request = ptlrpc_prep_req2(cl, connection, rconn,
400                                    OST_DESTROY, 1, &size, NULL);
401         if (!request)
402                 RETURN(-ENOMEM);
403
404         body = lustre_msg_buf(request->rq_reqmsg, 0);
405         memcpy(&body->oa, oa, sizeof(*oa));
406         body->oa.o_valid = ~0;
407
408         request->rq_replen = lustre_msg_size(1, &size);
409
410         rc = ptlrpc_queue_wait(request);
411         rc = ptlrpc_check_status(request, rc);
412         if (rc)
413                 GOTO(out, rc);
414
415         body = lustre_msg_buf(request->rq_repmsg, 0);
416         memcpy(oa, &body->oa, sizeof(*oa));
417
418         EXIT;
419  out:
420         ptlrpc_free_req(request);
421         return rc;
422 }
423
/*
 * Per-transfer state hung off a bulk descriptor's b_cb_data by
 * osc_brw_read()/osc_brw_write() and consumed (and freed) by brw_finish().
 */
struct osc_brw_cb_data {
        /* Optional completion hook; brw_finish() calls it when non-NULL. */
        brw_callback_t callback;
        /* Argument handed to callback. */
        void *cb_data;
        /* Buffer that brw_finish() releases with OBD_FREE; the write path
         * stores its niobuf_local array here. */
        void *obd_data;
        /* Size passed to OBD_FREE for obd_data. */
        size_t obd_size;
};
430
431 /* Our bulk-unmapping bottom half. */
432 static void unmap_and_decref_bulk_desc(void *data)
433 {
434         struct ptlrpc_bulk_desc *desc = data;
435         struct list_head *tmp;
436         ENTRY;
437
438         /* This feels wrong to me. */
439         list_for_each(tmp, &desc->b_page_list) {
440                 struct ptlrpc_bulk_page *bulk;
441                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
442
443                 kunmap(bulk->b_page);
444         }
445
446         ptlrpc_bulk_decref(desc);
447         EXIT;
448 }
449
450 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
451 {
452         struct osc_brw_cb_data *cb_data = data;
453         ENTRY;
454
455         if (desc->b_flags & PTL_RPC_FL_INTR)
456                 CERROR("got signal\n");
457
458         if (cb_data->callback)
459                 cb_data->callback(cb_data->cb_data);
460
461         OBD_FREE(cb_data->obd_data, cb_data->obd_size);
462         OBD_FREE(cb_data, sizeof(*cb_data));
463
464         /* We can't kunmap the desc from interrupt context, so we do it from
465          * the bottom half above. */
466         INIT_TQUEUE(&desc->b_queue, 0, 0);
467         PREPARE_TQUEUE(&desc->b_queue, unmap_and_decref_bulk_desc, desc);
468         schedule_task(&desc->b_queue);
469
470         EXIT;
471 }
472
473 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
474                         obd_count page_count, struct page **page_array,
475                         obd_size *count, obd_off *offset, obd_flag *flags,
476                         brw_callback_t callback, void *data)
477 {
478         struct ptlrpc_client *cl;
479         struct ptlrpc_connection *connection;
480         struct lustre_handle *rconn;
481         struct ptlrpc_request *request = NULL;
482         struct ptlrpc_bulk_desc *desc = NULL;
483         struct ost_body *body;
484         struct osc_brw_cb_data *cb_data = NULL;
485         int rc, size[3] = {sizeof(*body)};
486         void *iooptr, *nioptr;
487         int mapped = 0;
488         ENTRY;
489
490         size[1] = sizeof(struct obd_ioobj);
491         size[2] = page_count * sizeof(struct niobuf_remote);
492
493         osc_con2cl(conn, &cl, &connection, &rconn);
494         request = ptlrpc_prep_req2(cl, connection, rconn,
495                                    OST_BRW, 3, size, NULL);
496         if (!request)
497                 RETURN(-ENOMEM);
498
499         body = lustre_msg_buf(request->rq_reqmsg, 0);
500         body->data = OBD_BRW_READ;
501
502         desc = ptlrpc_prep_bulk(connection);
503         if (!desc)
504                 GOTO(out_req, rc = -ENOMEM);
505         desc->b_portal = OST_BULK_PORTAL;
506         desc->b_cb = brw_finish;
507         OBD_ALLOC(cb_data, sizeof(*cb_data));
508         if (!cb_data)
509                 GOTO(out_desc, rc = -ENOMEM);
510
511         cb_data->callback = callback;
512         cb_data->cb_data = data;
513         desc->b_cb_data = cb_data;
514
515         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
516         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
517         ost_pack_ioo(&iooptr, md, page_count);
518         /* end almost identical to brw_write case */
519
520         for (mapped = 0; mapped < page_count; mapped++) {
521                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
522                 if (bulk == NULL)
523                         GOTO(out_unmap, rc = -ENOMEM);
524
525                 spin_lock(&connection->c_lock);
526                 bulk->b_xid = ++connection->c_xid_out;
527                 spin_unlock(&connection->c_lock);
528
529                 bulk->b_buf = kmap(page_array[mapped]);
530                 bulk->b_page = page_array[mapped];
531                 bulk->b_buflen = PAGE_SIZE;
532                 ost_pack_niobuf(&nioptr, offset[mapped], count[mapped],
533                                 flags[mapped], bulk->b_xid);
534         }
535
536         /*
537          * Register the bulk first, because the reply could arrive out of order,
538          * and we want to be ready for the bulk data.
539          *
540          * One reference is released when brw_finish is complete, the
541          * other here when we finish waiting on it if we don't have a callback.
542          *
543          * We don't reference the bulk descriptor again here if there is a
544          * callback, so we don't need an additional refcount on it.
545          *
546          * On error, we never do the brw_finish, so we handle all decrefs.
547          */
548         if (!callback)
549                 ptlrpc_bulk_addref(desc);
550         rc = ptlrpc_register_bulk(desc);
551         if (rc)
552                 GOTO(out_desc2, rc);
553
554         request->rq_replen = lustre_msg_size(1, size);
555         rc = ptlrpc_queue_wait(request);
556         rc = ptlrpc_check_status(request, rc);
557
558         /* XXX: Mike, this is the only place I'm not sure of.  If we had
559          *      an error here, will we always call brw_finish?  If yes, then
560          *      out_desc_2 will do too much and we should jump to out_desc.
561          *      If maybe, then we are screwed, and we need to set things up
562          *      so that bulk_sink_callback is called for each bulk page,
563          *      even on error so brw_finish is always called.  It would need
564          *      to be passed an error code as a parameter to know what to do.
565          *
566          *      That would also help with the partial completion case, so
567          *      we could say in brw_finish "these pages are done, don't
568          *      restart them" and osc_brw callers can know this.
569          */
570         if (rc)
571                 GOTO(out_desc2, rc);
572
573         /* Callbacks cause asynchronous handling. */
574         if (callback)
575                 GOTO(out_req, rc = 0);
576
577         /* If there's no callback function, sleep here until complete. */
578         l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
579         if (desc->b_flags & PTL_RPC_FL_INTR)
580                 GOTO(out_desc, rc = -EINTR);
581
582         EXIT;
583 out_desc:
584         ptlrpc_bulk_decref(desc);
585 out_req:
586         ptlrpc_req_finished(request);
587         RETURN(rc);
588
589         /* Clean up on error. */
590 out_desc2:
591         if (!callback)
592                 ptlrpc_bulk_decref(desc);
593 out_unmap:
594         while (mapped-- > 0)
595                 kunmap(page_array[mapped]);
596         OBD_FREE(cb_data, sizeof(*cb_data));
597         goto out_desc;
598 }
599
600 static int osc_brw_write(struct lustre_handle *conn,
601                          struct lov_stripe_md *md, obd_count page_count,
602                          struct page **pagearray, obd_size *count,
603                          obd_off *offset, obd_flag *flags,
604                          brw_callback_t callback, void *data)
605 {
606         struct ptlrpc_client *cl;
607         struct ptlrpc_connection *connection;
608         struct lustre_handle *rconn;
609         struct ptlrpc_request *request = NULL;
610         struct ptlrpc_bulk_desc *desc = NULL;
611         struct ost_body *body;
612         struct niobuf_local *local = NULL;
613         struct niobuf_remote *remote;
614         struct osc_brw_cb_data *cb_data = NULL;
615         int rc, j, size[3] = {sizeof(*body)};
616         void *iooptr, *nioptr;
617         int mapped = 0;
618         ENTRY;
619
620         size[1] = sizeof(struct obd_ioobj);
621         size[2] = page_count * sizeof(*remote);
622
623         osc_con2cl(conn, &cl, &connection, &rconn);
624         request = ptlrpc_prep_req2(cl, connection, rconn,
625                                    OST_BRW, 3, size, NULL);
626         if (!request)
627                 RETURN(-ENOMEM);
628
629         body = lustre_msg_buf(request->rq_reqmsg, 0);
630         body->data = OBD_BRW_WRITE;
631
632         desc = ptlrpc_prep_bulk(connection);
633         if (!desc)
634                 GOTO(out_req, rc = -ENOMEM);
635         desc->b_portal = OSC_BULK_PORTAL;
636         desc->b_cb = brw_finish;
637         OBD_ALLOC(cb_data, sizeof(*cb_data));
638         if (!cb_data)
639                 GOTO(out_desc, rc = -ENOMEM);
640
641         cb_data->callback = callback;
642         cb_data->cb_data = data;
643         desc->b_cb_data = cb_data;
644
645         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
646         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
647         ost_pack_ioo(&iooptr, md, page_count);
648         /* end almost identical to brw_read case */
649
650         OBD_ALLOC(local, page_count * sizeof(*local));
651         if (!local)
652                 GOTO(out_cb, rc = -ENOMEM);
653
654         cb_data->obd_data = local;
655         cb_data->obd_size = page_count * sizeof(*local);
656
657         for (mapped = 0; mapped < page_count; mapped++) {
658                 local[mapped].addr = kmap(pagearray[mapped]);
659                 local[mapped].offset = offset[mapped];
660                 local[mapped].len = count[mapped];
661                 ost_pack_niobuf(&nioptr, offset[mapped], count[mapped],
662                                 flags[mapped], 0);
663         }
664
665         size[1] = page_count * sizeof(*remote);
666         request->rq_replen = lustre_msg_size(2, size);
667         rc = ptlrpc_queue_wait(request);
668         rc = ptlrpc_check_status(request, rc);
669         if (rc)
670                 GOTO(out_unmap, rc);
671
672         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
673         if (!nioptr)
674                 GOTO(out_unmap, rc = -EINVAL);
675
676         if (request->rq_repmsg->buflens[1] != size[1]) {
677                 CERROR("buffer length wrong (%d vs. %d)\n",
678                        request->rq_repmsg->buflens[1], size[1]);
679                 GOTO(out_unmap, rc = -EINVAL);
680         }
681
682         for (j = 0; j < page_count; j++) {
683                 struct ptlrpc_bulk_page *bulk;
684
685                 ost_unpack_niobuf(&nioptr, &remote);
686
687                 bulk = ptlrpc_prep_bulk_page(desc);
688                 if (!bulk)
689                         GOTO(out_unmap, rc = -ENOMEM);
690
691                 bulk->b_buf = (void *)(unsigned long)local[j].addr;
692                 bulk->b_buflen = local[j].len;
693                 bulk->b_xid = remote->xid;
694         }
695
696         if (desc->b_page_count != page_count)
697                 LBUG();
698
699         /*
700          * One reference is released when brw_finish is complete, the
701          * other here when we finish waiting on it if we don't have a callback.
702          *
703          * We don't reference the bulk descriptor again here if there is a
704          * callback, so we don't need an additional refcount on it.
705          */
706         if (!callback)
707                 ptlrpc_bulk_addref(desc);
708         rc = ptlrpc_send_bulk(desc);
709
710         /* XXX: Mike, same question as in osc_brw_read. */
711         if (rc)
712                 GOTO(out_desc2, rc);
713
714         /* Callbacks cause asynchronous handling. */
715         if (callback)
716                 GOTO(out_req, rc = 0);
717
718         /* If there's no callback function, sleep here until complete. */
719         l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
720         if (desc->b_flags & PTL_RPC_FL_INTR)
721                 GOTO(out_desc, rc = -EINTR);
722
723         EXIT;
724 out_desc:
725         ptlrpc_bulk_decref(desc);
726 out_req:
727         ptlrpc_req_finished(request);
728         return rc;
729
730         /* Clean up on error. */
731 out_desc2:
732         if (!callback)
733                 ptlrpc_bulk_decref(desc);
734 out_unmap:
735         while (mapped-- > 0)
736                 kunmap(pagearray[mapped]);
737
738         OBD_FREE(local, page_count * sizeof(*local));
739 out_cb:
740         OBD_FREE(cb_data, sizeof(*cb_data));
741         goto out_desc;
742 }
743
744 static int osc_brw(int cmd, struct lustre_handle *conn,
745                    struct lov_stripe_md *md, obd_count page_count,
746                    struct page **page_array, obd_size *count, obd_off *offset,
747                    obd_flag *flags, brw_callback_t callback, void *data)
748 {
749         if (cmd & OBD_BRW_WRITE)
750                 return osc_brw_write(conn, md, page_count, page_array, count,
751                                      offset, flags, callback, data);
752         else
753                 return osc_brw_read(conn, md, page_count, page_array, count,
754                                     offset, flags, callback, data);
755 }
756
757 static int osc_enqueue(struct lustre_handle *conn,
758                        struct lustre_handle *parent_lock, __u64 *res_id,
759                        __u32 type, void *extentp, int extent_len, __u32 mode,
760                        int *flags, void *callback, void *data, int datalen,
761                        struct lustre_handle *lockh)
762 {
763         struct obd_device *obddev = class_conn2obd(conn);
764         struct ptlrpc_connection *connection;
765         struct ptlrpc_client *cl;
766         struct lustre_handle *rconn;
767         struct ldlm_extent *extent = extentp;
768         int rc;
769         __u32 mode2;
770
771         /* Filesystem locks are given a bit of special treatment: first we
772          * fixup the lock to start and end on page boundaries. */
773         extent->start &= PAGE_MASK;
774         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
775
776         /* Next, search for already existing extent locks that will cover us */
777         osc_con2dlmcl(conn, &cl, &connection, &rconn);
778         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
779                              sizeof(extent), mode, lockh);
780         if (rc == 1) {
781                 /* We already have a lock, and it's referenced */
782                 return 0;
783         }
784
785         /* Next, search for locks that we can upgrade (if we're trying to write)
786          * or are more than we need (if we're trying to read).  Because the VFS
787          * and page cache already protect us locally, lots of readers/writers
788          * can share a single PW lock. */
789         if (mode == LCK_PW)
790                 mode2 = LCK_PR;
791         else
792                 mode2 = LCK_PW;
793
794         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
795                              sizeof(extent), mode2, lockh);
796         if (rc == 1) {
797                 int flags;
798                 /* FIXME: This is not incredibly elegant, but it might
799                  * be more elegant than adding another parameter to
800                  * lock_match.  I want a second opinion. */
801                 ldlm_lock_addref(lockh, mode);
802                 ldlm_lock_decref(lockh, mode2);
803
804                 if (mode == LCK_PR)
805                         return 0;
806
807                 rc = ldlm_cli_convert(cl, lockh, rconn, mode, &flags);
808                 if (rc)
809                         LBUG();
810
811                 return rc;
812         }
813
814         rc = ldlm_cli_enqueue(cl, connection, rconn, NULL,obddev->obd_namespace,
815                               parent_lock, res_id, type, extent, sizeof(extent),
816                               mode, flags, callback, data, datalen, lockh);
817         return rc;
818 }
819
820 static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
821                       struct lustre_handle *lockh)
822 {
823         ENTRY;
824
825         ldlm_lock_decref(lockh, mode);
826
827         RETURN(0);
828 }
829
830 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
831 {
832         struct obd_ioctl_data* data = buf;
833         struct osc_obd *osc = &obddev->u.osc;
834         char server_uuid[37];
835         int rc;
836         ENTRY;
837
838         if (data->ioc_inllen1 < 1) {
839                 CERROR("osc setup requires a TARGET UUID\n");
840                 RETURN(-EINVAL);
841         }
842
843         if (data->ioc_inllen1 > 37) {
844                 CERROR("osc TARGET UUID must be less than 38 characters\n");
845                 RETURN(-EINVAL);
846         }
847
848         if (data->ioc_inllen2 < 1) {
849                 CERROR("osc setup requires a SERVER UUID\n");
850                 RETURN(-EINVAL);
851         }
852
853         if (data->ioc_inllen2 > 37) {
854                 CERROR("osc SERVER UUID must be less than 38 characters\n");
855                 RETURN(-EINVAL);
856         }
857
858         memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
859         memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
860                                                    sizeof(server_uuid)));
861
862         osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
863         if (!osc->osc_conn)
864                 RETURN(-ENOENT);
865
866         obddev->obd_namespace =
867                 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
868         if (obddev->obd_namespace == NULL)
869                 GOTO(out_conn, rc = -ENOMEM);
870
871         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
872         if (osc->osc_client == NULL)
873                 GOTO(out_ns, rc = -ENOMEM);
874
875         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
876         if (osc->osc_ldlm_client == NULL)
877                 GOTO(out_client, rc = -ENOMEM);
878
879         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
880                            osc->osc_client);
881         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
882                            osc->osc_ldlm_client);
883         osc->osc_client->cli_name = "osc";
884         osc->osc_ldlm_client->cli_name = "ldlm";
885
886         MOD_INC_USE_COUNT;
887         RETURN(0);
888
889  out_client:
890         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
891  out_ns:
892         ldlm_namespace_free(obddev->obd_namespace);
893  out_conn:
894         ptlrpc_put_connection(osc->osc_conn);
895         return rc;
896 }
897
898 static int osc_cleanup(struct obd_device * obddev)
899 {
900         struct osc_obd *osc = &obddev->u.osc;
901
902         ldlm_namespace_free(obddev->obd_namespace);
903
904         ptlrpc_cleanup_client(osc->osc_client);
905         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
906         ptlrpc_cleanup_client(osc->osc_ldlm_client);
907         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
908         ptlrpc_put_connection(osc->osc_conn);
909
910         MOD_DEC_USE_COUNT;
911         return 0;
912 }
913
914 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
915 {
916         struct ptlrpc_request *request;
917         struct ptlrpc_client *cl;
918         struct ptlrpc_connection *connection;
919         struct lustre_handle *rconn;
920         struct obd_statfs *osfs;
921         int rc, size = sizeof(*osfs);
922         ENTRY;
923
924         osc_con2cl(conn, &cl, &connection, &rconn);
925         request = ptlrpc_prep_req2(cl, connection, rconn,
926                                    OST_STATFS, 0, NULL, NULL);
927         if (!request)
928                 RETURN(-ENOMEM);
929
930         request->rq_replen = lustre_msg_size(1, &size);
931
932         rc = ptlrpc_queue_wait(request);
933         rc = ptlrpc_check_status(request, rc);
934         if (rc) {
935                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
936                 GOTO(out, rc);
937         }
938
939         osfs = lustre_msg_buf(request->rq_repmsg, 0);
940         obd_statfs_unpack(osfs, sfs);
941
942         EXIT;
943  out:
944         ptlrpc_free_req(request);
945         return rc;
946 }
947
948 struct obd_ops osc_obd_ops = {
949         o_setup:        osc_setup,
950         o_cleanup:      osc_cleanup,
951         o_statfs:       osc_statfs,
952         o_create:       osc_create,
953         o_destroy:      osc_destroy,
954         o_getattr:      osc_getattr,
955         o_setattr:      osc_setattr,
956         o_open:         osc_open,
957         o_close:        osc_close,
958         o_connect:      osc_connect,
959         o_disconnect:   osc_disconnect,
960         o_brw:          osc_brw,
961         o_punch:        osc_punch,
962         o_enqueue:      osc_enqueue,
963         o_cancel:       osc_cancel
964 };
965
966 static int __init osc_init(void)
967 {
968         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
969 }
970
971 static void __exit osc_exit(void)
972 {
973         class_unregister_type(LUSTRE_OSC_NAME);
974 }
975
/* Module metadata. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
MODULE_LICENSE("GPL");

/* Hook osc_init()/osc_exit() up as the module load and unload handlers. */
module_init(osc_init);
module_exit(osc_exit);