Whamcloud - gitweb
- More changes in the connection handle stuff. We are back to where
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copryright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27
28 static void osc_obd2cl(struct obd_device *obd, struct ptlrpc_client **cl,
29                        struct ptlrpc_connection **connection)
30 {
31         struct osc_obd *osc = &obd->u.osc;
32         *cl = osc->osc_client;
33         *connection = osc->osc_conn;
34 }
35
36 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
37                        struct ptlrpc_connection **connection)
38 {
39         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
40         *cl = osc->osc_client;
41         *connection = osc->osc_conn;
42 }
43
44 static void osc_con2dlmcl(struct obd_conn *conn, struct ptlrpc_client **cl,
45                           struct ptlrpc_connection **connection)
46 {
47         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
48         *cl = osc->osc_ldlm_client;
49         *connection = osc->osc_conn;
50 }
51
52 static int osc_connect(struct obd_conn *conn, struct obd_device *obd)
53 {
54         struct osc_obd *osc = &obd->u.osc;
55         struct obd_import *import;
56         struct ptlrpc_request *request;
57         struct ptlrpc_client *cl;
58         struct ptlrpc_connection *connection;
59         struct ost_body *body;
60         char *tmp = osc->osc_target_uuid;
61         int rc, size = sizeof(osc->osc_target_uuid);
62         ENTRY;
63
64         OBD_ALLOC(import, sizeof(*import)); 
65         if (!import)
66                 RETURN(-ENOMEM);
67                   
68         MOD_INC_USE_COUNT;
69         rc = gen_connect(conn, obd);
70         if (rc) 
71                 GOTO(out, rc);
72
73         osc_obd2cl(obd, &cl, &connection);
74         request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 1, &size, &tmp);
75         if (!request)
76                 RETURN(-ENOMEM);
77
78         size = sizeof(*body);
79         request->rq_replen = lustre_msg_size(1, &size);
80
81         rc = ptlrpc_queue_wait(request);
82         rc = ptlrpc_check_status(request, rc);
83         if (rc) {
84                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
85                 GOTO(out, rc);
86         }
87
88         body = lustre_msg_buf(request->rq_repmsg, 0);
89         CDEBUG(D_INODE, "received connid %d\n", body->connid);
90
91
92         /* XXX: Make this a handle */
93         osc->osc_connh.addr = request->rq_repmsg->addr;
94         osc->osc_connh.cookie = request->rq_repmsg->cookie;
95         /* This might be redundant. */
96         cl->cli_target_devno = request->rq_repmsg->target_id;
97         osc->osc_ldlm_client->cli_target_devno = cl->cli_target_devno;
98         conn->oc_id = body->connid;
99         EXIT;
100  out:
101         ptlrpc_free_req(request);
102         if (rc)
103                 MOD_DEC_USE_COUNT;
104         return rc;
105 }
106
107 static int osc_disconnect(struct obd_conn *conn)
108 {
109         struct ptlrpc_request *request;
110         struct ptlrpc_client *cl;
111         struct ptlrpc_connection *connection;
112         struct ost_body *body;
113         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
114         int rc, size = sizeof(*body);
115         ENTRY;
116
117         osc_con2cl(conn, &cl, &connection);
118         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
119                                   OST_DISCONNECT, 1, &size, NULL);
120         if (!request)
121                 RETURN(-ENOMEM);
122
123         body = lustre_msg_buf(request->rq_reqmsg, 0);
124         body->connid = conn->oc_id;
125
126         request->rq_replen = lustre_msg_size(0, NULL);
127
128         rc = ptlrpc_queue_wait(request);
129         if (rc) 
130                 GOTO(out, rc);
131         rc = gen_disconnect(conn);
132         if (!rc)
133                 MOD_DEC_USE_COUNT;
134
135  out:
136         ptlrpc_free_req(request);
137         return rc;
138 }
139
140 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
141 {
142         struct ptlrpc_request *request;
143         struct ptlrpc_client *cl;
144         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
145         struct ptlrpc_connection *connection;
146         struct ost_body *body;
147         int rc, size = sizeof(*body);
148         ENTRY;
149
150         osc_con2cl(conn, &cl, &connection);
151         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
152                                    OST_GETATTR, 1, &size, NULL);
153         if (!request)
154                 RETURN(-ENOMEM);
155
156         body = lustre_msg_buf(request->rq_reqmsg, 0);
157         memcpy(&body->oa, oa, sizeof(*oa));
158         body->connid = conn->oc_id;
159         body->oa.o_valid = ~0;
160
161         request->rq_replen = lustre_msg_size(1, &size);
162
163         rc = ptlrpc_queue_wait(request);
164         rc = ptlrpc_check_status(request, rc);
165         if (rc) {
166                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
167                 GOTO(out, rc);
168         }
169
170         body = lustre_msg_buf(request->rq_repmsg, 0);
171         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
172         if (oa)
173                 memcpy(oa, &body->oa, sizeof(*oa));
174
175         EXIT;
176  out:
177         ptlrpc_free_req(request);
178         return 0;
179 }
180
181 static int osc_open(struct obd_conn *conn, struct obdo *oa)
182 {
183         struct ptlrpc_request *request;
184         struct ptlrpc_client *cl;
185         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
186         struct ptlrpc_connection *connection;
187         struct ost_body *body;
188         int rc, size = sizeof(*body);
189         ENTRY;
190
191         osc_con2cl(conn, &cl, &connection);
192         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
193                                    OST_OPEN, 1, &size, NULL);
194         if (!request)
195                 RETURN(-ENOMEM);
196
197         body = lustre_msg_buf(request->rq_reqmsg, 0);
198         memcpy(&body->oa, oa, sizeof(*oa));
199         body->connid = conn->oc_id;
200         body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
201
202         request->rq_replen = lustre_msg_size(1, &size);
203
204         rc = ptlrpc_queue_wait(request);
205         rc = ptlrpc_check_status(request, rc);
206         if (rc)
207                 GOTO(out, rc);
208
209         body = lustre_msg_buf(request->rq_repmsg, 0);
210         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
211         if (oa)
212                 memcpy(oa, &body->oa, sizeof(*oa));
213
214         EXIT;
215  out:
216         ptlrpc_free_req(request);
217         return 0;
218 }
219
220 static int osc_close(struct obd_conn *conn, struct obdo *oa)
221 {
222         struct ptlrpc_request *request;
223         struct ptlrpc_client *cl;
224         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
225         struct ptlrpc_connection *connection;
226         struct ost_body *body;
227         int rc, size = sizeof(*body);
228         ENTRY;
229
230         osc_con2cl(conn, &cl, &connection);
231         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
232                                    OST_CLOSE, 1, &size, NULL);
233         if (!request)
234                 RETURN(-ENOMEM);
235
236         body = lustre_msg_buf(request->rq_reqmsg, 0);
237         memcpy(&body->oa, oa, sizeof(*oa));
238         body->connid = conn->oc_id;
239
240         request->rq_replen = lustre_msg_size(1, &size);
241
242         rc = ptlrpc_queue_wait(request);
243         rc = ptlrpc_check_status(request, rc);
244         if (rc)
245                 GOTO(out, rc);
246
247         body = lustre_msg_buf(request->rq_repmsg, 0);
248         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
249         if (oa)
250                 memcpy(oa, &body->oa, sizeof(*oa));
251
252         EXIT;
253  out:
254         ptlrpc_free_req(request);
255         return 0;
256 }
257
258 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
259 {
260         struct ptlrpc_request *request;
261         struct ptlrpc_client *cl;
262         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
263         struct ptlrpc_connection *connection;
264         struct ost_body *body;
265         int rc, size = sizeof(*body);
266         ENTRY;
267
268         osc_con2cl(conn, &cl, &connection);
269         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
270                                   OST_SETATTR, 1, &size, NULL);
271         if (!request)
272                 RETURN(-ENOMEM);
273
274         body = lustre_msg_buf(request->rq_reqmsg, 0);
275         memcpy(&body->oa, oa, sizeof(*oa));
276         body->connid = conn->oc_id;
277
278         request->rq_replen = lustre_msg_size(1, &size);
279
280         rc = ptlrpc_queue_wait(request);
281         rc = ptlrpc_check_status(request, rc);
282         GOTO(out, rc);
283
284  out:
285         ptlrpc_free_req(request);
286         return 0;
287 }
288
289 static int osc_create(struct obd_conn *conn, struct obdo *oa)
290 {
291         struct ptlrpc_request *request;
292         struct ptlrpc_client *cl;
293         struct ptlrpc_connection *connection;
294         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
295         struct ost_body *body;
296         struct mds_objid *objid;
297         struct lov_object_id *lov_id;
298         int rc, size = sizeof(*body);
299         ENTRY;
300
301         if (!oa) {
302                 CERROR("oa NULL\n");
303                 RETURN(-EINVAL);
304         }
305         osc_con2cl(conn, &cl, &connection);
306         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
307                                   OST_CREATE, 1, &size, NULL);
308         if (!request)
309                 RETURN(-ENOMEM);
310
311         body = lustre_msg_buf(request->rq_reqmsg, 0);
312         memcpy(&body->oa, oa, sizeof(*oa));
313         body->connid = conn->oc_id;
314
315         request->rq_replen = lustre_msg_size(1, &size);
316
317         rc = ptlrpc_queue_wait(request);
318         rc = ptlrpc_check_status(request, rc);
319         if (rc)
320                 GOTO(out, rc);
321
322         body = lustre_msg_buf(request->rq_repmsg, 0);
323         memcpy(oa, &body->oa, sizeof(*oa));
324
325         memset(oa->o_inline, 0, sizeof(oa->o_inline));
326         objid = (struct mds_objid *)oa->o_inline;
327         objid->mo_lov_md.lmd_object_id = oa->o_id;
328         objid->mo_lov_md.lmd_stripe_count = 1;
329         lov_id = (struct lov_object_id *)(oa->o_inline + sizeof(*objid));
330         lov_id->l_device_id = 0;
331         lov_id->l_object_id = oa->o_id;
332
333         EXIT;
334  out:
335         ptlrpc_free_req(request);
336         return 0;
337 }
338
339 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
340                      obd_off offset)
341 {
342         struct ptlrpc_request *request;
343         struct ptlrpc_client *cl;
344         struct ptlrpc_connection *connection;
345         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
346         struct ost_body *body;
347         int rc, size = sizeof(*body);
348         ENTRY;
349
350         if (!oa) {
351                 CERROR("oa NULL\n");
352                 RETURN(-EINVAL);
353         }
354         osc_con2cl(conn, &cl, &connection);
355         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
356                                    OST_PUNCH, 1, &size, NULL);
357         if (!request)
358                 RETURN(-ENOMEM);
359
360         body = lustre_msg_buf(request->rq_reqmsg, 0);
361         memcpy(&body->oa, oa, sizeof(*oa));
362         body->connid = conn->oc_id;
363         body->oa.o_blocks = count;
364         body->oa.o_valid |= OBD_MD_FLBLOCKS;
365
366         request->rq_replen = lustre_msg_size(1, &size);
367
368         rc = ptlrpc_queue_wait(request);
369         rc = ptlrpc_check_status(request, rc);
370         if (rc)
371                 GOTO(out, rc);
372
373         body = lustre_msg_buf(request->rq_repmsg, 0);
374         memcpy(oa, &body->oa, sizeof(*oa));
375
376         EXIT;
377  out:
378         ptlrpc_free_req(request);
379         return 0;
380 }
381
382 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
383 {
384         struct ptlrpc_request *request;
385         struct ptlrpc_client *cl;
386         struct ptlrpc_connection *connection;
387         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
388         struct ost_body *body;
389         int rc, size = sizeof(*body);
390         ENTRY;
391
392         if (!oa) {
393                 CERROR("oa NULL\n");
394                 RETURN(-EINVAL);
395         }
396         osc_con2cl(conn, &cl, &connection);
397         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
398                                    OST_DESTROY, 1, &size, NULL);
399         if (!request)
400                 RETURN(-ENOMEM);
401
402         body = lustre_msg_buf(request->rq_reqmsg, 0);
403         memcpy(&body->oa, oa, sizeof(*oa));
404         body->connid = conn->oc_id;
405         body->oa.o_valid = ~0;
406
407         request->rq_replen = lustre_msg_size(1, &size);
408
409         rc = ptlrpc_queue_wait(request);
410         rc = ptlrpc_check_status(request, rc);
411         if (rc)
412                 GOTO(out, rc);
413
414         body = lustre_msg_buf(request->rq_repmsg, 0);
415         memcpy(oa, &body->oa, sizeof(*oa));
416
417         EXIT;
418  out:
419         ptlrpc_free_req(request);
420         return 0;
421 }
422
423 struct osc_brw_cb_data {
424         struct page **buf;
425         struct ptlrpc_request *req;
426         bulk_callback_t callback;
427         void *cb_data;
428 };
429
430 static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
431 {
432         struct osc_brw_cb_data *cb_data = data;
433
434         if (desc->b_flags & PTL_RPC_FL_INTR)
435                 CERROR("got signal\n");
436
437         (cb_data->callback)(desc, cb_data->cb_data);
438
439         ptlrpc_free_bulk(desc);
440         ptlrpc_free_req(cb_data->req);
441
442         OBD_FREE(cb_data, sizeof(*cb_data));
443 }
444
445 static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
446                         struct obdo **oa, obd_count *oa_bufs, struct page **buf,
447                         obd_size *count, obd_off *offset, obd_flag *flags,
448                         bulk_callback_t callback)
449 {
450         struct ptlrpc_client *cl;
451         struct ptlrpc_connection *connection;
452         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
453         struct ptlrpc_request *request;
454         struct ost_body *body;
455         struct list_head *tmp;
456         int pages, rc, i, j, size[3] = {sizeof(*body)};
457         void *ptr1, *ptr2;
458         struct ptlrpc_bulk_desc *desc;
459         ENTRY;
460
461         size[1] = num_oa * sizeof(struct obd_ioobj);
462         pages = 0;
463         for (i = 0; i < num_oa; i++)
464                 pages += oa_bufs[i];
465         size[2] = pages * sizeof(struct niobuf_remote);
466
467         osc_con2cl(conn, &cl, &connection);
468         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
469                                   OST_BRW, 3, size, NULL);
470         if (!request)
471                 GOTO(out, rc = -ENOMEM);
472
473         body = lustre_msg_buf(request->rq_reqmsg, 0);
474         body->data = OBD_BRW_READ;
475
476         desc = ptlrpc_prep_bulk(connection);
477         if (!desc)
478                 GOTO(out2, rc = -ENOMEM);
479         desc->b_portal = OST_BULK_PORTAL;
480
481         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
482         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
483         for (pages = 0, i = 0; i < num_oa; i++) {
484                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
485                 /* FIXME: this inner loop is wrong for multiple OAs */
486                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
487                         struct ptlrpc_bulk_page *bulk;
488                         bulk = ptlrpc_prep_bulk_page(desc);
489                         if (bulk == NULL)
490                                 GOTO(out3, rc = -ENOMEM);
491
492                         spin_lock(&connection->c_lock);
493                         bulk->b_xid = ++connection->c_xid_out;
494                         spin_unlock(&connection->c_lock);
495
496                         bulk->b_buf = kmap(buf[pages]);
497                         bulk->b_page = buf[pages];
498                         bulk->b_buflen = PAGE_SIZE;
499                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
500                                         flags[pages], bulk->b_xid);
501                 }
502         }
503
504         rc = ptlrpc_register_bulk(desc);
505         if (rc)
506                 GOTO(out3, rc);
507
508         request->rq_replen = lustre_msg_size(1, size);
509         rc = ptlrpc_queue_wait(request);
510         rc = ptlrpc_check_status(request, rc);
511         if (rc)
512                 ptlrpc_abort_bulk(desc);
513         GOTO(out3, rc);
514
515  out3:
516         list_for_each(tmp, &desc->b_page_list) {
517                 struct ptlrpc_bulk_page *bulk;
518                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
519                 if (bulk->b_page != NULL)
520                         kunmap(bulk->b_page);
521         }
522         ptlrpc_free_bulk(desc);
523  out2:
524         ptlrpc_free_req(request);
525  out:
526         return rc;
527 }
528
529 static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
530 {
531         struct osc_brw_cb_data *cb_data = data;
532         int i;
533         ENTRY;
534
535         if (desc->b_flags & PTL_RPC_FL_INTR)
536                 CERROR("got signal\n");
537
538         for (i = 0; i < desc->b_page_count; i++)
539                 kunmap(cb_data->buf[i]);
540
541         (cb_data->callback)(desc, cb_data->cb_data);
542
543         ptlrpc_free_bulk(desc);
544         ptlrpc_free_req(cb_data->req);
545
546         OBD_FREE(cb_data, sizeof(*cb_data));
547         EXIT;
548 }
549
550 static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
551                          struct obdo **oa, obd_count *oa_bufs,
552                          struct page **pagearray, obd_size *count,
553                          obd_off *offset, obd_flag *flags,
554                          bulk_callback_t callback)
555 {
556         struct ptlrpc_client *cl;
557         struct ptlrpc_connection *connection;
558         struct ptlrpc_request *request;
559         struct ptlrpc_bulk_desc *desc;
560         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
561         struct obd_ioobj ioo;
562         struct ost_body *body;
563         struct niobuf_local *local;
564         struct niobuf_remote *remote;
565         struct osc_brw_cb_data *cb_data;
566         long pages;
567         int rc, i, j, size[3] = {sizeof(*body)};
568         void *ptr1, *ptr2;
569         ENTRY;
570
571         size[1] = num_oa * sizeof(ioo);
572         pages = 0;
573         for (i = 0; i < num_oa; i++)
574                 pages += oa_bufs[i];
575         size[2] = pages * sizeof(*remote);
576
577         OBD_ALLOC(local, pages * sizeof(*local));
578         if (local == NULL)
579                 RETURN(-ENOMEM);
580
581         osc_con2cl(conn, &cl, &connection);
582         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
583                                   OST_BRW, 3, size, NULL);
584         if (!request)
585                 GOTO(out, rc = -ENOMEM);
586         body = lustre_msg_buf(request->rq_reqmsg, 0);
587         body->data = OBD_BRW_WRITE;
588
589         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
590         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
591         for (pages = 0, i = 0; i < num_oa; i++) {
592                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
593                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
594                         local[pages].addr = kmap(pagearray[pages]);
595                         local[pages].offset = offset[pages];
596                         local[pages].len = count[pages];
597                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
598                                         flags[pages], 0);
599                 }
600         }
601
602         size[1] = pages * sizeof(struct niobuf_remote);
603         request->rq_replen = lustre_msg_size(2, size);
604
605         rc = ptlrpc_queue_wait(request);
606         rc = ptlrpc_check_status(request, rc);
607         if (rc)
608                 GOTO(out2, rc);
609
610         ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
611         if (ptr2 == NULL)
612                 GOTO(out2, rc = -EINVAL);
613
614         if (request->rq_repmsg->buflens[1] !=
615             pages * sizeof(struct niobuf_remote)) {
616                 CERROR("buffer length wrong (%d vs. %ld)\n",
617                        request->rq_repmsg->buflens[1],
618                        pages * sizeof(struct niobuf_remote));
619                 GOTO(out2, rc = -EINVAL);
620         }
621
622         desc = ptlrpc_prep_bulk(connection);
623         desc->b_portal = OSC_BULK_PORTAL;
624         if (callback) {
625                 desc->b_cb = brw_write_finish;
626                 OBD_ALLOC(cb_data, sizeof(*cb_data));
627                 cb_data->buf = pagearray;
628                 cb_data->callback = callback;
629                 desc->b_cb_data = cb_data;
630         }
631
632         for (pages = 0, i = 0; i < num_oa; i++) {
633                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
634                         struct ptlrpc_bulk_page *page;
635
636                         ost_unpack_niobuf(&ptr2, &remote);
637
638                         page = ptlrpc_prep_bulk_page(desc);
639                         if (page == NULL)
640                                 GOTO(out3, rc = -ENOMEM);
641
642                         page->b_buf = (void *)(unsigned long)local[pages].addr;
643                         page->b_buflen = local[pages].len;
644                         page->b_xid = remote->xid;
645                 }
646         }
647
648         if (desc->b_page_count != pages)
649                 LBUG();
650
651         rc = ptlrpc_send_bulk(desc);
652         if (rc)
653                 GOTO(out3, rc);
654         if (callback)
655                 GOTO(out, rc);
656
657         /* If there's no callback function, sleep here until complete. */
658         wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
659         if (desc->b_flags & PTL_RPC_FL_INTR)
660                 rc = -EINTR;
661
662         GOTO(out3, rc);
663
664  out3:
665         ptlrpc_free_bulk(desc);
666  out2:
667         ptlrpc_free_req(request);
668         for (pages = 0, i = 0; i < num_oa; i++)
669                 for (j = 0; j < oa_bufs[i]; j++, pages++)
670                         kunmap(pagearray[pages]);
671  out:
672         OBD_FREE(local, pages * sizeof(*local));
673
674         return rc;
675 }
676
677 static int osc_brw(int cmd, struct obd_conn *conn, obd_count num_oa,
678                    struct obdo **oa, obd_count *oa_bufs, struct page **buf,
679                    obd_size *count, obd_off *offset, obd_flag *flags,
680                    void *callback)
681 {
682         if (num_oa != 1)
683                 LBUG();
684
685         if (cmd & OBD_BRW_WRITE)
686                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
687                                      offset, flags, (bulk_callback_t)callback);
688         else
689                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
690                                     offset, flags, (bulk_callback_t)callback);
691 }
692
693 static int osc_enqueue(struct obd_conn *oconn,
694                        struct lustre_handle *parent_lock, __u64 *res_id,
695                        __u32 type, void *extentp, int extent_len, __u32 mode,
696                        int *flags, void *callback, void *data, int datalen,
697                        struct lustre_handle *lockh)
698 {
699         struct obd_device *obddev = gen_conn2obd(oconn);
700         struct ptlrpc_connection *conn;
701         struct osc_obd *osc = &gen_conn2obd(oconn)->u.osc;
702         struct ptlrpc_client *cl;
703         struct ldlm_extent *extent = extentp;
704         int rc;
705         __u32 mode2;
706
707         /* Filesystem locks are given a bit of special treatment: first we
708          * fixup the lock to start and end on page boundaries. */
709         extent->start &= PAGE_MASK;
710         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
711
712         /* Next, search for already existing extent locks that will cover us */
713         osc_con2dlmcl(oconn, &cl, &conn);
714         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
715                              sizeof(extent), mode, lockh);
716         if (rc == 1) {
717                 /* We already have a lock, and it's referenced */
718                 return 0;
719         }
720
721         /* Next, search for locks that we can upgrade (if we're trying to write)
722          * or are more than we need (if we're trying to read).  Because the VFS
723          * and page cache already protect us locally, lots of readers/writers
724          * can share a single PW lock. */
725         if (mode == LCK_PW)
726                 mode2 = LCK_PR;
727         else
728                 mode2 = LCK_PW;
729
730         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
731                              sizeof(extent), mode2, lockh);
732         if (rc == 1) {
733                 int flags;
734                 /* FIXME: This is not incredibly elegant, but it might
735                  * be more elegant than adding another parameter to
736                  * lock_match.  I want a second opinion. */
737                 ldlm_lock_addref(lockh, mode);
738                 ldlm_lock_decref(lockh, mode2);
739
740                 if (mode == LCK_PR)
741                         return 0;
742
743                 rc = ldlm_cli_convert(cl, lockh, mode, &flags);
744                 if (rc)
745                         LBUG();
746
747                 return rc;
748         }
749
750         rc = ldlm_cli_enqueue(cl, conn, NULL, obddev->obd_namespace,
751                               parent_lock, res_id, type, extent, sizeof(extent),
752                               mode, flags, callback, data, datalen, lockh);
753         return rc;
754 }
755
756 static int osc_cancel(struct obd_conn *oconn, __u32 mode,
757                       struct lustre_handle *lockh)
758 {
759         ENTRY;
760
761         ldlm_lock_decref(lockh, mode);
762
763         RETURN(0);
764 }
765
766 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
767 {
768         struct obd_ioctl_data* data = buf;
769         struct osc_obd *osc = &obddev->u.osc;
770         char server_uuid[37];
771         int rc;
772         ENTRY;
773
774         if (data->ioc_inllen1 < 1) {
775                 CERROR("osc setup requires a TARGET UUID\n");
776                 RETURN(-EINVAL);
777         }
778
779         if (data->ioc_inllen1 > 37) {
780                 CERROR("osc TARGET UUID must be less than 38 characters\n");
781                 RETURN(-EINVAL);
782         }
783
784         if (data->ioc_inllen2 < 1) {
785                 CERROR("osc setup requires a SERVER UUID\n");
786                 RETURN(-EINVAL);
787         }
788
789         if (data->ioc_inllen2 > 37) {
790                 CERROR("osc SERVER UUID must be less than 38 characters\n");
791                 RETURN(-EINVAL);
792         }
793
794         memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
795         memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
796                                                    sizeof(server_uuid)));
797
798         osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
799         if (!osc->osc_conn)
800                 RETURN(-ENOENT);
801
802         obddev->obd_namespace =
803                 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
804         if (obddev->obd_namespace == NULL)
805                 GOTO(out_conn, rc = -ENOMEM);
806
807         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
808         if (osc->osc_client == NULL)
809                 GOTO(out_ns, rc = -ENOMEM);
810
811         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
812         if (osc->osc_ldlm_client == NULL)
813                 GOTO(out_client, rc = -ENOMEM);
814
815         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
816                            osc->osc_client);
817         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
818                            osc->osc_ldlm_client);
819         osc->osc_client->cli_name = "osc";
820         osc->osc_ldlm_client->cli_name = "ldlm";
821
822         MOD_INC_USE_COUNT;
823         RETURN(0);
824
825  out_client:
826         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
827  out_ns:
828         ldlm_namespace_free(obddev->obd_namespace);
829  out_conn:
830         ptlrpc_put_connection(osc->osc_conn);
831         return rc;
832 }
833
834 static int osc_cleanup(struct obd_device * obddev)
835 {
836         struct osc_obd *osc = &obddev->u.osc;
837
838         ldlm_namespace_free(obddev->obd_namespace);
839
840         ptlrpc_cleanup_client(osc->osc_client);
841         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
842         ptlrpc_cleanup_client(osc->osc_ldlm_client);
843         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
844         ptlrpc_put_connection(osc->osc_conn);
845
846         MOD_DEC_USE_COUNT;
847         return 0;
848 }
849
850 #if 0
851 static int osc_statfs(struct obd_conn *conn, struct statfs *sfs);
852 {
853         struct ptlrpc_request *request;
854         struct ptlrpc_client *cl;
855         struct ptlrpc_connection *connection;
856         struct obd_statfs *osfs;
857         int rc, size = sizeof(*osfs);
858         ENTRY;
859
860         osc_con2cl(conn, &cl, &connection);
861         request = ptlrpc_prep_req(cl, connection, OST_STATFS, 0, NULL, NULL);
862         if (!request)
863                 RETURN(-ENOMEM);
864
865         request->rq_replen = lustre_msg_size(1, &size);
866
867         rc = ptlrpc_queue_wait(request);
868         rc = ptlrpc_check_status(request, rc);
869         if (rc)
870                 GOTO(out, rc);
871
872         osfs = lustre_msg_buf(request->rq_repmsg, 0);
873         obd_statfs_unpack(sfs, osfs);
874
875         EXIT;
876  out:
877         ptlrpc_free_req(request);
878         return 0;
879 }
880 #endif
881
882 struct obd_ops osc_obd_ops = {
883         o_setup:   osc_setup,
884         o_cleanup: osc_cleanup,
885         o_create: osc_create,
886         o_destroy: osc_destroy,
887         o_getattr: osc_getattr,
888         o_setattr: osc_setattr,
889         o_open: osc_open,
890         o_close: osc_close,
891         o_connect: osc_connect,
892         o_disconnect: osc_disconnect,
893         o_brw: osc_brw,
894         o_punch: osc_punch,
895         o_enqueue: osc_enqueue,
896         o_cancel: osc_cancel
897 };
898
899 static int __init osc_init(void)
900 {
901         return obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
902 }
903
904 static void __exit osc_exit(void)
905 {
906         obd_unregister_type(LUSTRE_OSC_NAME);
907 }
908
909 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
910 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
911 MODULE_LICENSE("GPL");
912
913 module_init(osc_init);
914 module_exit(osc_exit);