Whamcloud - gitweb
This commit contains probably 92% of the striping infrastructure
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27
28 static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
29                        struct ptlrpc_connection **connection)
30 {
31         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
32         *cl = osc->osc_client;
33         *connection = osc->osc_conn;
34 }
35
36 static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
37                           struct ptlrpc_connection **connection)
38 {
39         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
40         *cl = osc->osc_ldlm_client;
41         *connection = osc->osc_conn;
42 }
43
44 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd)
45 {
46         struct osc_obd *osc = &obd->u.osc;
47         struct obd_import *import;
48         struct ptlrpc_request *request;
49         char *tmp = osc->osc_target_uuid;
50         int rc, size = sizeof(osc->osc_target_uuid);
51         ENTRY;
52
53         OBD_ALLOC(import, sizeof(*import)); 
54         if (!import)
55                 RETURN(-ENOMEM);
56                   
57         MOD_INC_USE_COUNT;
58         rc = class_connect(conn, obd);
59         if (rc)
60                 RETURN(rc); 
61
62         request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn, 
63                                   OST_CONNECT, 1, &size, &tmp);
64         if (!request)
65                 GOTO(out_disco, rc = -ENOMEM);
66
67         request->rq_level = LUSTRE_CONN_NEW;
68         request->rq_replen = lustre_msg_size(0, NULL);
69
70         rc = ptlrpc_queue_wait(request);
71         rc = ptlrpc_check_status(request, rc);
72         if (rc) {
73                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
74                 GOTO(out, rc);
75         }
76
77         /* XXX: Make this a handle */
78         osc->osc_connh.addr = request->rq_repmsg->addr;
79         osc->osc_connh.cookie = request->rq_repmsg->cookie;
80
81         EXIT;
82  out:
83         ptlrpc_free_req(request);
84  out_disco:
85         if (rc) {
86                 class_disconnect(conn);
87                 MOD_DEC_USE_COUNT;
88         }
89         return rc;
90 }
91
92 static int osc_disconnect(struct lustre_handle *conn)
93 {
94         struct ptlrpc_request *request;
95         struct ptlrpc_client *cl;
96         struct ptlrpc_connection *connection;
97         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
98         int rc;
99         ENTRY;
100
101         osc_con2cl(conn, &cl, &connection);
102         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
103                                   OST_DISCONNECT, 0, NULL, NULL);
104         if (!request)
105                 RETURN(-ENOMEM);
106         request->rq_replen = lustre_msg_size(0, NULL);
107
108         rc = ptlrpc_queue_wait(request);
109         if (rc) 
110                 GOTO(out, rc);
111         rc = class_disconnect(conn);
112         if (!rc)
113                 MOD_DEC_USE_COUNT;
114
115  out:
116         ptlrpc_free_req(request);
117         return rc;
118 }
119
120 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
121 {
122         struct ptlrpc_request *request;
123         struct ptlrpc_client *cl;
124         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
125         struct ptlrpc_connection *connection;
126         struct ost_body *body;
127         int rc, size = sizeof(*body);
128         ENTRY;
129
130         osc_con2cl(conn, &cl, &connection);
131         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
132                                    OST_GETATTR, 1, &size, NULL);
133         if (!request)
134                 RETURN(-ENOMEM);
135
136         body = lustre_msg_buf(request->rq_reqmsg, 0);
137         memcpy(&body->oa, oa, sizeof(*oa));
138         body->oa.o_valid = ~0;
139
140         request->rq_replen = lustre_msg_size(1, &size);
141
142         rc = ptlrpc_queue_wait(request);
143         rc = ptlrpc_check_status(request, rc);
144         if (rc) {
145                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
146                 GOTO(out, rc);
147         }
148
149         body = lustre_msg_buf(request->rq_repmsg, 0);
150         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
151         if (oa)
152                 memcpy(oa, &body->oa, sizeof(*oa));
153
154         EXIT;
155  out:
156         ptlrpc_free_req(request);
157         return rc;
158 }
159
160 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
161                     struct lov_stripe_md *md)
162 {
163         struct ptlrpc_request *request;
164         struct ptlrpc_client *cl;
165         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
166         struct ptlrpc_connection *connection;
167         struct ost_body *body;
168         int rc, size = sizeof(*body);
169         ENTRY;
170
171         osc_con2cl(conn, &cl, &connection);
172         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
173                                    OST_OPEN, 1, &size, NULL);
174         if (!request)
175                 RETURN(-ENOMEM);
176
177         body = lustre_msg_buf(request->rq_reqmsg, 0);
178         memcpy(&body->oa, oa, sizeof(*oa));
179         body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
180
181         request->rq_replen = lustre_msg_size(1, &size);
182
183         rc = ptlrpc_queue_wait(request);
184         rc = ptlrpc_check_status(request, rc);
185         if (rc)
186                 GOTO(out, rc);
187
188         body = lustre_msg_buf(request->rq_repmsg, 0);
189         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
190         if (oa)
191                 memcpy(oa, &body->oa, sizeof(*oa));
192
193         EXIT;
194  out:
195         ptlrpc_free_req(request);
196         return rc;
197 }
198
199 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
200                      struct lov_stripe_md *md)
201 {
202         struct ptlrpc_request *request;
203         struct ptlrpc_client *cl;
204         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
205         struct ptlrpc_connection *connection;
206         struct ost_body *body;
207         int rc, size = sizeof(*body);
208         ENTRY;
209
210         osc_con2cl(conn, &cl, &connection);
211         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
212                                    OST_CLOSE, 1, &size, NULL);
213         if (!request)
214                 RETURN(-ENOMEM);
215
216         oa->o_id = md->lmd_object_id;
217         oa->o_mode = S_IFREG;
218         oa->o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
219         body = lustre_msg_buf(request->rq_reqmsg, 0);
220         memcpy(&body->oa, oa, sizeof(*oa));
221
222         request->rq_replen = lustre_msg_size(1, &size);
223
224         rc = ptlrpc_queue_wait(request);
225         rc = ptlrpc_check_status(request, rc);
226         if (rc)
227                 GOTO(out, rc);
228
229         body = lustre_msg_buf(request->rq_repmsg, 0);
230         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
231         if (oa)
232                 memcpy(oa, &body->oa, sizeof(*oa));
233
234         EXIT;
235  out:
236         ptlrpc_free_req(request);
237         return rc;
238 }
239
240 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
241 {
242         struct ptlrpc_request *request;
243         struct ptlrpc_client *cl;
244         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
245         struct ptlrpc_connection *connection;
246         struct ost_body *body;
247         int rc, size = sizeof(*body);
248         ENTRY;
249
250         osc_con2cl(conn, &cl, &connection);
251         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
252                                   OST_SETATTR, 1, &size, NULL);
253         if (!request)
254                 RETURN(-ENOMEM);
255
256         body = lustre_msg_buf(request->rq_reqmsg, 0);
257         memcpy(&body->oa, oa, sizeof(*oa));
258
259         request->rq_replen = lustre_msg_size(1, &size);
260
261         rc = ptlrpc_queue_wait(request);
262         rc = ptlrpc_check_status(request, rc);
263         GOTO(out, rc);
264
265  out:
266         ptlrpc_free_req(request);
267         return rc;
268 }
269
270 static int osc_create(struct lustre_handle *conn, struct obdo *oa, 
271                       struct lov_stripe_md **ea)
272 {
273         struct ptlrpc_request *request;
274         struct ptlrpc_client *cl;
275         struct ptlrpc_connection *connection;
276         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
277         struct ost_body *body;
278         int rc, size = sizeof(*body);
279         ENTRY;
280
281         if (!oa) {
282                 CERROR("oa NULL\n");
283                 RETURN(-EINVAL);
284         }
285
286         if (!ea) { 
287                 LBUG();
288         }
289
290         if (!*ea) { 
291                 OBD_ALLOC(*ea, oa->o_easize);
292                 if (!*ea) 
293                         RETURN(-ENOMEM); 
294                 (*ea)->lmd_size = oa->o_easize;
295         }
296
297         osc_con2cl(conn, &cl, &connection);
298         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
299                                   OST_CREATE, 1, &size, NULL);
300         if (!request)
301                 RETURN(-ENOMEM);
302
303         body = lustre_msg_buf(request->rq_reqmsg, 0);
304         memcpy(&body->oa, oa, sizeof(*oa));
305
306         request->rq_replen = lustre_msg_size(1, &size);
307
308         rc = ptlrpc_queue_wait(request);
309         rc = ptlrpc_check_status(request, rc);
310         if (rc)
311                 GOTO(out, rc);
312
313         body = lustre_msg_buf(request->rq_repmsg, 0);
314         memcpy(oa, &body->oa, sizeof(*oa));
315
316         (*ea)->lmd_object_id = oa->o_id;
317         (*ea)->lmd_stripe_count = 1;
318         EXIT;
319  out:
320         ptlrpc_free_req(request);
321         return rc;
322 }
323
324 static int osc_punch(struct lustre_handle *conn, struct obdo *oa, 
325                      struct lov_stripe_md *md, obd_size count,
326                      obd_off offset)
327 {
328         struct ptlrpc_request *request;
329         struct ptlrpc_client *cl;
330         struct ptlrpc_connection *connection;
331         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
332         struct ost_body *body;
333         int rc, size = sizeof(*body);
334         ENTRY;
335
336         if (!oa) {
337                 CERROR("oa NULL\n");
338                 RETURN(-EINVAL);
339         }
340         osc_con2cl(conn, &cl, &connection);
341         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
342                                    OST_PUNCH, 1, &size, NULL);
343         if (!request)
344                 RETURN(-ENOMEM);
345
346         body = lustre_msg_buf(request->rq_reqmsg, 0);
347         memcpy(&body->oa, oa, sizeof(*oa));
348         body->oa.o_blocks = count;
349         body->oa.o_valid |= OBD_MD_FLBLOCKS;
350
351         request->rq_replen = lustre_msg_size(1, &size);
352
353         rc = ptlrpc_queue_wait(request);
354         rc = ptlrpc_check_status(request, rc);
355         if (rc)
356                 GOTO(out, rc);
357
358         body = lustre_msg_buf(request->rq_repmsg, 0);
359         memcpy(oa, &body->oa, sizeof(*oa));
360
361         EXIT;
362  out:
363         ptlrpc_free_req(request);
364         return rc;
365 }
366
367 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa, 
368                        struct lov_stripe_md *ea)
369 {
370         struct ptlrpc_request *request;
371         struct ptlrpc_client *cl;
372         struct ptlrpc_connection *connection;
373         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
374         struct ost_body *body;
375         int rc, size = sizeof(*body);
376         ENTRY;
377
378         if (!oa) {
379                 CERROR("oa NULL\n");
380                 RETURN(-EINVAL);
381         }
382         osc_con2cl(conn, &cl, &connection);
383         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
384                                    OST_DESTROY, 1, &size, NULL);
385         if (!request)
386                 RETURN(-ENOMEM);
387
388         body = lustre_msg_buf(request->rq_reqmsg, 0);
389         memcpy(&body->oa, oa, sizeof(*oa));
390         body->oa.o_valid = ~0;
391
392         request->rq_replen = lustre_msg_size(1, &size);
393
394         rc = ptlrpc_queue_wait(request);
395         rc = ptlrpc_check_status(request, rc);
396         if (rc)
397                 GOTO(out, rc);
398
399         body = lustre_msg_buf(request->rq_repmsg, 0);
400         memcpy(oa, &body->oa, sizeof(*oa));
401
402         EXIT;
403  out:
404         ptlrpc_free_req(request);
405         return rc;
406 }
407
408 struct osc_brw_cb_data {
409         struct page **buf;
410         bulk_callback_t callback;
411         void *cb_data;
412         void *obd_data;
413         size_t obd_size;
414 };
415
416 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
417 {
418         struct osc_brw_cb_data *cb_data = data;
419         int i;
420         ENTRY;
421
422         if (desc->b_flags & PTL_RPC_FL_INTR)
423                 CERROR("got signal\n");
424
425         for (i = 0; i < desc->b_page_count; i++)
426                 kunmap(cb_data->buf[i]);
427
428         if (cb_data->callback)
429                 (cb_data->callback)(desc, cb_data->cb_data);
430
431         ptlrpc_bulk_decref(desc);
432         if (cb_data->obd_data)
433                 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
434         OBD_FREE(cb_data, sizeof(*cb_data));
435         EXIT;
436 }
437
438 static int osc_brw_read(struct lustre_handle *conn, 
439                         struct lov_stripe_md *md, obd_count page_count, 
440                         struct page **page_array,
441                         obd_size *count, obd_off *offset, obd_flag *flags,
442                         bulk_callback_t callback)
443 {
444         struct ptlrpc_client *cl;
445         struct ptlrpc_connection *connection;
446         struct ptlrpc_request *request = NULL;
447         struct ptlrpc_bulk_desc *desc = NULL;
448         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
449         struct ost_body *body;
450         int rc, j, size[3] = {sizeof(*body)};
451         void *iooptr, *nioptr;
452         struct osc_brw_cb_data *cb_data = NULL;
453
454         ENTRY;
455
456         size[1] = sizeof(struct obd_ioobj);
457         size[2] = page_count * sizeof(struct niobuf_remote);
458
459         osc_con2cl(conn, &cl, &connection);
460         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
461                                    OST_BRW, 3, size, NULL);
462         if (!request)
463                 RETURN(-ENOMEM);
464
465         body = lustre_msg_buf(request->rq_reqmsg, 0);
466         body->data = OBD_BRW_READ;
467
468         desc = ptlrpc_prep_bulk(connection);
469         if (!desc)
470                 GOTO(out_free, rc = -ENOMEM);
471         desc->b_portal = OST_BULK_PORTAL;
472         desc->b_cb = brw_finish;
473         OBD_ALLOC(cb_data, sizeof(*cb_data));
474         if (!cb_data)
475                 GOTO(out_free, rc = -ENOMEM);
476         cb_data->buf = NULL;
477         cb_data->callback = callback;
478         desc->b_cb_data = cb_data;
479         /* XXX end almost identical to brw_write case */
480
481         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
482         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
483         ost_pack_ioo(&iooptr, md, page_count); 
484         for (j = 0; j < page_count; j++) { 
485                 struct ptlrpc_bulk_page *bulk;
486                 bulk = ptlrpc_prep_bulk_page(desc);
487                 if (bulk == NULL)
488                         GOTO(out_unmap, rc = -ENOMEM);
489                 
490                 spin_lock(&connection->c_lock);
491                 bulk->b_xid = ++connection->c_xid_out;
492                 spin_unlock(&connection->c_lock);
493                 
494                 bulk->b_buf = kmap(page_array[j]);
495                 bulk->b_page = page_array[j];
496                 bulk->b_buflen = PAGE_SIZE;
497                 ost_pack_niobuf(&nioptr, offset[j], count[j],
498                                 flags[j], bulk->b_xid);
499         }
500
501         /* 
502          * Register the bulk first, because the reply could arrive out of order,
503          * and we want to be ready for the bulk data.
504          *
505          * One reference is released by the bulk callback, the other when
506          * we finish sleeping on it (if we don't have a callback).
507          */
508         atomic_set(&desc->b_refcount, callback ? 1 : 2);
509         rc = ptlrpc_register_bulk(desc);
510         if (rc)
511                 GOTO(out_unmap, rc);
512
513         request->rq_replen = lustre_msg_size(1, size);
514         rc = ptlrpc_queue_wait(request);
515         rc = ptlrpc_check_status(request, rc);
516         if (rc) {
517                 ptlrpc_bulk_decref(desc);
518                 GOTO(out_unmap, rc);
519         }
520
521         /* Callbacks cause asynchronous handling. */
522         if (callback)
523                 RETURN(0);
524
525         l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
526         ptlrpc_bulk_decref(desc);
527         if (desc->b_flags & PTL_RPC_FL_INTR)
528                 RETURN(-EINTR);
529
530         RETURN(0);
531         
532         /* Clean up on error. */
533  out_unmap:
534         for (j = 0; j < desc->b_page_count; j++)
535                         kunmap(pagearray[j]);
536  out_free:
537         if (cb_data)
538                 OBD_FREE(cb_data, sizeof(*cb_data));
539         ptlrpc_free_bulk(desc);
540         ptlrpc_free_req(request);
541         return rc;
542 }
543
544 static int osc_brw_write(struct lustre_handle *conn,
545                          struct lov_stripe_md *md, obd_count page_count,
546                          struct page **pagearray, obd_size *count,
547                          obd_off *offset, obd_flag *flags,
548                          bulk_callback_t callback)
549 {
550         struct ptlrpc_client *cl;
551         struct ptlrpc_connection *connection;
552         struct ptlrpc_request *request = NULL;
553         struct ptlrpc_bulk_desc *desc = NULL;
554         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
555         struct ost_body *body;
556         struct niobuf_local *local = NULL;
557         struct niobuf_remote *remote;
558         struct osc_brw_cb_data *cb_data = NULL;
559         int rc, j, size[3] = {sizeof(*body)};
560         void *iooptr, *nioptr;
561         ENTRY;
562
563         size[1] = sizeof(struct obd_ioobj);
564         size[2] = page_count * sizeof(*remote);
565
566         osc_con2cl(conn, &cl, &connection);
567         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
568                                    OST_BRW, 3, size, NULL);
569         if (!request)
570                 RETURN(-ENOMEM);
571
572         body = lustre_msg_buf(request->rq_reqmsg, 0);
573         body->data = OBD_BRW_WRITE;
574
575         OBD_ALLOC(local, page_count * sizeof(*local));
576         if (!local)
577                 GOTO(out_free, rc = -ENOMEM);
578
579         desc = ptlrpc_prep_bulk(connection);
580         if (!desc)
581                 GOTO(out_free, rc = -ENOMEM);
582         desc->b_portal = OSC_BULK_PORTAL;
583         desc->b_cb = brw_finish;
584         OBD_ALLOC(cb_data, sizeof(*cb_data));
585         if (!cb_data)
586                 GOTO(out_free, rc = -ENOMEM);
587         cb_data->buf = pagearray;
588         cb_data->callback = callback;
589         desc->b_cb_data = cb_data;
590         /* XXX end almost identical to brw_read case */
591         cb_data->obd_data = local;
592         cb_data->obd_size = page_count * sizeof(*local);
593
594         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
595         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
596         ost_pack_ioo(&iooptr, md, page_count);
597         for (j = 0; j < page_count; j++) {
598                 local[j].addr = kmap(pagearray[j]);
599                 local[j].offset = offset[j];
600                 local[j].len = count[j];
601                 ost_pack_niobuf(&nioptr, offset[j], count[j],
602                                 flags[j], 0);
603         }
604
605         size[1] = page_count * sizeof(struct niobuf_remote);
606         request->rq_replen = lustre_msg_size(2, size);
607         rc = ptlrpc_queue_wait(request);
608         rc = ptlrpc_check_status(request, rc);
609         if (rc)
610                 GOTO(out_unmap, rc);
611
612         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
613         if (!nioptr)
614                 GOTO(out_unmap, rc = -EINVAL);
615
616         if (request->rq_repmsg->buflens[1] !=
617             page_count * sizeof(struct niobuf_remote)) {
618                 CERROR("buffer length wrong (%d vs. %d)\n",
619                        request->rq_repmsg->buflens[1],
620                        page_count * sizeof(struct niobuf_remote));
621                 GOTO(out_unmap, rc = -EINVAL);
622         }
623
624
625         for (j = 0; j < page_count; j++) {
626                 struct ptlrpc_bulk_page *page;
627                 
628                 ost_unpack_niobuf(&nioptr, &remote);
629                 
630                 page = ptlrpc_prep_bulk_page(desc);
631                 if (!page)
632                         GOTO(out_unmap, rc = -ENOMEM);
633                 
634                 page->b_buf = (void *)(unsigned long)local[j].addr;
635                 page->b_buflen = local[j].len;
636                 page->b_xid = remote->xid;
637         }
638         
639         if (desc->b_page_count != page_count)
640                 LBUG();
641
642         /*
643          * One is released when the bulk is complete, the other when we finish
644          * waiting on it.  (Callback cases don't sleep, so only one ref for
645          * them.)
646          */
647         atomic_set(&desc->b_refcount, callback ? 1 : 2);
648         CDEBUG(D_PAGE, "Set refcount of %p to %d\n", desc,
649                atomic_read(&desc->b_refcount));
650         rc = ptlrpc_send_bulk(desc);
651         if (rc)
652                 GOTO(out_unmap, rc);
653
654         /* Callbacks cause asynchronous handling. */
655         if (callback)
656                 RETURN(0);
657
658         /* If there's no callback function, sleep here until complete. */
659         l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
660         ptlrpc_bulk_decref(desc);
661         if (desc->b_flags & PTL_RPC_FL_INTR)
662                 RETURN(-EINTR);
663         RETURN(0);
664
665         /* Clean up on error. */
666  out_unmap:
667         for (j = 0; j < page_count; j++)
668                 kunmap(pagearray[j]);
669
670  out_free:
671         if (cb_data)
672                 OBD_FREE(cb_data, sizeof(*cb_data));
673         if (local)
674                 OBD_FREE(local, page_count * sizeof(*local));
675         ptlrpc_free_bulk(desc);
676         ptlrpc_req_finished(request);
677         return rc;
678 }
679
680 static int osc_brw(int cmd, struct lustre_handle *conn, 
681                    struct lov_stripe_md *md, obd_count page_count, 
682                    struct page **page_array,
683                    obd_size *count, 
684                    obd_off *offset, 
685                    obd_flag *flags,
686                    void *callback)
687 {
688         if (cmd & OBD_BRW_WRITE)
689                 return osc_brw_write(conn, md, page_count, page_array, count,
690                                      offset, flags, (bulk_callback_t)callback);
691         else
692                 return osc_brw_read(conn, md, page_count, page_array, count,
693                                     offset, flags, (bulk_callback_t)callback);
694 }
695
696 static int osc_enqueue(struct lustre_handle *oconn,
697                        struct lustre_handle *parent_lock, __u64 *res_id,
698                        __u32 type, void *extentp, int extent_len, __u32 mode,
699                        int *flags, void *callback, void *data, int datalen,
700                        struct lustre_handle *lockh)
701 {
702         struct obd_device *obddev = class_conn2obd(oconn);
703         struct osc_obd *osc = &obddev->u.osc;
704         struct ptlrpc_connection *conn;
705         struct ptlrpc_client *cl;
706         struct ldlm_extent *extent = extentp;
707         int rc;
708         __u32 mode2;
709
710         /* Filesystem locks are given a bit of special treatment: first we
711          * fixup the lock to start and end on page boundaries. */
712         extent->start &= PAGE_MASK;
713         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
714
715         /* Next, search for already existing extent locks that will cover us */
716         osc_con2dlmcl(oconn, &cl, &conn);
717         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
718                              sizeof(extent), mode, lockh);
719         if (rc == 1) {
720                 /* We already have a lock, and it's referenced */
721                 return 0;
722         }
723
724         /* Next, search for locks that we can upgrade (if we're trying to write)
725          * or are more than we need (if we're trying to read).  Because the VFS
726          * and page cache already protect us locally, lots of readers/writers
727          * can share a single PW lock. */
728         if (mode == LCK_PW)
729                 mode2 = LCK_PR;
730         else
731                 mode2 = LCK_PW;
732
733         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
734                              sizeof(extent), mode2, lockh);
735         if (rc == 1) {
736                 int flags;
737                 /* FIXME: This is not incredibly elegant, but it might
738                  * be more elegant than adding another parameter to
739                  * lock_match.  I want a second opinion. */
740                 ldlm_lock_addref(lockh, mode);
741                 ldlm_lock_decref(lockh, mode2);
742
743                 if (mode == LCK_PR)
744                         return 0;
745
746                 rc = ldlm_cli_convert(cl, lockh, &osc->osc_connh, 
747                                       mode, &flags);
748                 if (rc)
749                         LBUG();
750
751                 return rc;
752         }
753
754         rc = ldlm_cli_enqueue(cl, conn, &osc->osc_connh, 
755                               NULL, obddev->obd_namespace,
756                               parent_lock, res_id, type, extent, sizeof(extent),
757                               mode, flags, callback, data, datalen, lockh);
758         return rc;
759 }
760
761 static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
762                       struct lustre_handle *lockh)
763 {
764         ENTRY;
765
766         ldlm_lock_decref(lockh, mode);
767
768         RETURN(0);
769 }
770
771 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
772 {
773         struct obd_ioctl_data* data = buf;
774         struct osc_obd *osc = &obddev->u.osc;
775         char server_uuid[37];
776         int rc;
777         ENTRY;
778
779         if (data->ioc_inllen1 < 1) {
780                 CERROR("osc setup requires a TARGET UUID\n");
781                 RETURN(-EINVAL);
782         }
783
784         if (data->ioc_inllen1 > 37) {
785                 CERROR("osc TARGET UUID must be less than 38 characters\n");
786                 RETURN(-EINVAL);
787         }
788
789         if (data->ioc_inllen2 < 1) {
790                 CERROR("osc setup requires a SERVER UUID\n");
791                 RETURN(-EINVAL);
792         }
793
794         if (data->ioc_inllen2 > 37) {
795                 CERROR("osc SERVER UUID must be less than 38 characters\n");
796                 RETURN(-EINVAL);
797         }
798
799         memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
800         memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
801                                                    sizeof(server_uuid)));
802
803         osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
804         if (!osc->osc_conn)
805                 RETURN(-ENOENT);
806
807         obddev->obd_namespace =
808                 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
809         if (obddev->obd_namespace == NULL)
810                 GOTO(out_conn, rc = -ENOMEM);
811
812         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
813         if (osc->osc_client == NULL)
814                 GOTO(out_ns, rc = -ENOMEM);
815
816         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
817         if (osc->osc_ldlm_client == NULL)
818                 GOTO(out_client, rc = -ENOMEM);
819
820         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
821                            osc->osc_client);
822         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
823                            osc->osc_ldlm_client);
824         osc->osc_client->cli_name = "osc";
825         osc->osc_ldlm_client->cli_name = "ldlm";
826
827         MOD_INC_USE_COUNT;
828         RETURN(0);
829
830  out_client:
831         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
832  out_ns:
833         ldlm_namespace_free(obddev->obd_namespace);
834  out_conn:
835         ptlrpc_put_connection(osc->osc_conn);
836         return rc;
837 }
838
839 static int osc_cleanup(struct obd_device * obddev)
840 {
841         struct osc_obd *osc = &obddev->u.osc;
842
843         ldlm_namespace_free(obddev->obd_namespace);
844
845         ptlrpc_cleanup_client(osc->osc_client);
846         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
847         ptlrpc_cleanup_client(osc->osc_ldlm_client);
848         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
849         ptlrpc_put_connection(osc->osc_conn);
850
851         MOD_DEC_USE_COUNT;
852         return 0;
853 }
854
855 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
856 {
857         struct ptlrpc_request *request;
858         struct ptlrpc_client *cl;
859         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
860         struct ptlrpc_connection *connection;
861         struct obd_statfs *osfs;
862         int rc, size = sizeof(*osfs);
863         ENTRY;
864
865         osc_con2cl(conn, &cl, &connection);
866         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
867                                    OST_STATFS, 0, NULL, NULL);
868         if (!request)
869                 RETURN(-ENOMEM);
870
871         request->rq_replen = lustre_msg_size(1, &size);
872
873         rc = ptlrpc_queue_wait(request);
874         rc = ptlrpc_check_status(request, rc);
875         if (rc) {
876                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
877                 GOTO(out, rc);
878         }
879
880         osfs = lustre_msg_buf(request->rq_repmsg, 0);
881         obd_statfs_unpack(osfs, sfs);
882
883         EXIT;
884  out:
885         ptlrpc_free_req(request);
886         return rc;
887 }
888
889 struct obd_ops osc_obd_ops = {
890         o_setup:        osc_setup,
891         o_cleanup:      osc_cleanup,
892         o_statfs:       osc_statfs,
893         o_create:       osc_create,
894         o_destroy:      osc_destroy,
895         o_getattr:      osc_getattr,
896         o_setattr:      osc_setattr,
897         o_open:         osc_open,
898         o_close:        osc_close,
899         o_connect:      osc_connect,
900         o_disconnect:   osc_disconnect,
901         o_brw:          osc_brw,
902         o_punch:        osc_punch,
903         o_enqueue:      osc_enqueue,
904         o_cancel:       osc_cancel
905 };
906
907 static int __init osc_init(void)
908 {
909         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
910 }
911
912 static void __exit osc_exit(void)
913 {
914         class_unregister_type(LUSTRE_OSC_NAME);
915 }
916
917 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
918 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
919 MODULE_LICENSE("GPL");
920
921 module_init(osc_init);
922 module_exit(osc_exit);