Whamcloud - gitweb
Fixups to handle error recovery when we are out of memory. Some of them
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/random.h>
24 #include <linux/lustre_dlm.h>
25 #include <linux/obd_ost.h>
26
27 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
28                        struct ptlrpc_connection **connection)
29 {
30         struct osc_obd *osc = &conn->oc_dev->u.osc;
31         *cl = osc->osc_client;
32         *connection = osc->osc_conn;
33 }
34
35 static int osc_connect(struct obd_conn *conn)
36 {
37         struct ptlrpc_request *request;
38         struct ptlrpc_client *cl;
39         struct ptlrpc_connection *connection;
40         struct ost_body *body;
41         int rc, size = sizeof(*body);
42         ENTRY;
43
44         osc_con2cl(conn, &cl, &connection);
45         request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL);
46         if (!request)
47                 RETURN(-ENOMEM);
48
49         request->rq_replen = lustre_msg_size(1, &size);
50
51         rc = ptlrpc_queue_wait(request);
52         if (rc)
53                 GOTO(out, rc);
54
55         body = lustre_msg_buf(request->rq_repmsg, 0);
56         CDEBUG(D_INODE, "received connid %d\n", body->connid);
57
58         conn->oc_id = body->connid;
59         EXIT;
60  out:
61         ptlrpc_free_req(request);
62         return rc;
63 }
64
65 static int osc_disconnect(struct obd_conn *conn)
66 {
67         struct ptlrpc_request *request;
68         struct ptlrpc_client *cl;
69         struct ptlrpc_connection *connection;
70         struct ost_body *body;
71         int rc, size = sizeof(*body);
72         ENTRY;
73
74         osc_con2cl(conn, &cl, &connection);
75         request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL);
76         if (!request)
77                 RETURN(-ENOMEM);
78
79         body = lustre_msg_buf(request->rq_reqmsg, 0);
80         body->connid = conn->oc_id;
81
82         request->rq_replen = lustre_msg_size(1, &size);
83
84         rc = ptlrpc_queue_wait(request);
85         GOTO(out, rc);
86  out:
87         ptlrpc_free_req(request);
88         return rc;
89 }
90
91 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
92 {
93         struct ptlrpc_request *request;
94         struct ptlrpc_client *cl;
95         struct ptlrpc_connection *connection;
96         struct ost_body *body;
97         int rc, size = sizeof(*body);
98         ENTRY;
99
100         osc_con2cl(conn, &cl, &connection);
101         request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
102         if (!request)
103                 RETURN(-ENOMEM);
104
105         body = lustre_msg_buf(request->rq_reqmsg, 0);
106         memcpy(&body->oa, oa, sizeof(*oa));
107         body->connid = conn->oc_id;
108         body->oa.o_valid = ~0;
109
110         request->rq_replen = lustre_msg_size(1, &size);
111
112         rc = ptlrpc_queue_wait(request);
113         if (rc)
114                 GOTO(out, rc);
115
116         body = lustre_msg_buf(request->rq_repmsg, 0);
117         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
118         if (oa)
119                 memcpy(oa, &body->oa, sizeof(*oa));
120
121         EXIT;
122  out:
123         ptlrpc_free_req(request);
124         return 0;
125 }
126
127 static int osc_open(struct obd_conn *conn, struct obdo *oa)
128 {
129         struct ptlrpc_request *request;
130         struct ptlrpc_client *cl;
131         struct ptlrpc_connection *connection;
132         struct ost_body *body;
133         int rc, size = sizeof(*body);
134         ENTRY;
135
136         osc_con2cl(conn, &cl, &connection);
137         request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
138         if (!request)
139                 RETURN(-ENOMEM);
140
141         body = lustre_msg_buf(request->rq_reqmsg, 0);
142         memcpy(&body->oa, oa, sizeof(*oa));
143         body->connid = conn->oc_id;
144         if (body->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
145                 LBUG();
146
147         request->rq_replen = lustre_msg_size(1, &size);
148
149         rc = ptlrpc_queue_wait(request);
150         if (rc)
151                 GOTO(out, rc);
152
153         body = lustre_msg_buf(request->rq_repmsg, 0);
154         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
155         if (oa)
156                 memcpy(oa, &body->oa, sizeof(*oa));
157
158         EXIT;
159  out:
160         ptlrpc_free_req(request);
161         return 0;
162 }
163
164 static int osc_close(struct obd_conn *conn, struct obdo *oa)
165 {
166         struct ptlrpc_request *request;
167         struct ptlrpc_client *cl;
168         struct ptlrpc_connection *connection;
169         struct ost_body *body;
170         int rc, size = sizeof(*body);
171         ENTRY;
172
173         osc_con2cl(conn, &cl, &connection);
174         request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
175         if (!request)
176                 RETURN(-ENOMEM);
177
178         body = lustre_msg_buf(request->rq_reqmsg, 0);
179         memcpy(&body->oa, oa, sizeof(*oa));
180         body->connid = conn->oc_id;
181
182         request->rq_replen = lustre_msg_size(1, &size);
183
184         rc = ptlrpc_queue_wait(request);
185         if (rc)
186                 GOTO(out, rc);
187
188         body = lustre_msg_buf(request->rq_repmsg, 0);
189         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
190         if (oa)
191                 memcpy(oa, &body->oa, sizeof(*oa));
192
193         EXIT;
194  out:
195         ptlrpc_free_req(request);
196         return 0;
197 }
198
199 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
200 {
201         struct ptlrpc_request *request;
202         struct ptlrpc_client *cl;
203         struct ptlrpc_connection *connection;
204         struct ost_body *body;
205         int rc, size = sizeof(*body);
206         ENTRY;
207
208         osc_con2cl(conn, &cl, &connection);
209         request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
210         if (!request)
211                 RETURN(-ENOMEM);
212
213         body = lustre_msg_buf(request->rq_reqmsg, 0);
214         memcpy(&body->oa, oa, sizeof(*oa));
215         body->connid = conn->oc_id;
216
217         request->rq_replen = lustre_msg_size(1, &size);
218
219         rc = ptlrpc_queue_wait(request);
220         GOTO(out, rc);
221
222  out:
223         ptlrpc_free_req(request);
224         return 0;
225 }
226
227 static int osc_create(struct obd_conn *conn, struct obdo *oa)
228 {
229         struct ptlrpc_request *request;
230         struct ptlrpc_client *cl;
231         struct ptlrpc_connection *connection;
232         struct ost_body *body;
233         int rc, size = sizeof(*body);
234         ENTRY;
235
236         if (!oa) {
237                 CERROR("oa NULL\n");
238                 RETURN(-EINVAL);
239         }
240         osc_con2cl(conn, &cl, &connection);
241         request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
242         if (!request)
243                 RETURN(-ENOMEM);
244
245         body = lustre_msg_buf(request->rq_reqmsg, 0);
246         memcpy(&body->oa, oa, sizeof(*oa));
247         body->oa.o_valid = ~0;
248         body->connid = conn->oc_id;
249
250         request->rq_replen = lustre_msg_size(1, &size);
251
252         rc = ptlrpc_queue_wait(request);
253         if (rc)
254                 GOTO(out, rc);
255
256         body = lustre_msg_buf(request->rq_repmsg, 0);
257         memcpy(oa, &body->oa, sizeof(*oa));
258
259         EXIT;
260  out:
261         ptlrpc_free_req(request);
262         return 0;
263 }
264
265 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
266                      obd_off offset)
267 {
268         struct ptlrpc_request *request;
269         struct ptlrpc_client *cl;
270         struct ptlrpc_connection *connection;
271         struct ost_body *body;
272         int rc, size = sizeof(*body);
273         ENTRY;
274
275         if (!oa) {
276                 CERROR("oa NULL\n");
277                 RETURN(-EINVAL);
278         }
279         osc_con2cl(conn, &cl, &connection);
280         request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
281         if (!request)
282                 RETURN(-ENOMEM);
283
284         body = lustre_msg_buf(request->rq_reqmsg, 0);
285         memcpy(&body->oa, oa, sizeof(*oa));
286         body->connid = conn->oc_id;
287         body->oa.o_valid = ~0;
288         body->oa.o_size = offset;
289         body->oa.o_blocks = count;
290
291         request->rq_replen = lustre_msg_size(1, &size);
292
293         rc = ptlrpc_queue_wait(request);
294         if (rc)
295                 GOTO(out, rc);
296
297         body = lustre_msg_buf(request->rq_repmsg, 0);
298         memcpy(oa, &body->oa, sizeof(*oa));
299
300         EXIT;
301  out:
302         ptlrpc_free_req(request);
303         return 0;
304 }
305
306 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
307 {
308         struct ptlrpc_request *request;
309         struct ptlrpc_client *cl;
310         struct ptlrpc_connection *connection;
311         struct ost_body *body;
312         int rc, size = sizeof(*body);
313         ENTRY;
314
315         if (!oa) {
316                 CERROR("oa NULL\n");
317                 RETURN(-EINVAL);
318         }
319         osc_con2cl(conn, &cl, &connection);
320         request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
321         if (!request)
322                 RETURN(-ENOMEM);
323
324         body = lustre_msg_buf(request->rq_reqmsg, 0);
325         memcpy(&body->oa, oa, sizeof(*oa));
326         body->connid = conn->oc_id;
327         body->oa.o_valid = ~0;
328
329         request->rq_replen = lustre_msg_size(1, &size);
330
331         rc = ptlrpc_queue_wait(request);
332         if (rc)
333                 GOTO(out, rc);
334
335         body = lustre_msg_buf(request->rq_repmsg, 0);
336         memcpy(oa, &body->oa, sizeof(*oa));
337
338         EXIT;
339  out:
340         ptlrpc_free_req(request);
341         return 0;
342 }
343
344 int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
345                  struct niobuf *dst, struct niobuf *src)
346 {
347         struct ptlrpc_client *cl;
348         struct ptlrpc_connection *connection;
349
350         osc_con2cl(conn, &cl, &connection);
351
352         if (cl->cli_obd) {
353                 /* local sendpage */
354                 memcpy((char *)(unsigned long)dst->addr,
355                        (char *)(unsigned long)src->addr, src->len);
356         } else {
357                 struct ptlrpc_bulk_desc *bulk;
358                 int rc;
359
360                 bulk = ptlrpc_prep_bulk(connection);
361                 if (bulk == NULL)
362                         RETURN(-ENOMEM);
363
364                 bulk->b_buf = (void *)(unsigned long)src->addr;
365                 bulk->b_buflen = src->len;
366                 bulk->b_xid = dst->xid;
367                 rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
368                 if (rc != 0) {
369                         CERROR("send_bulk failed: %d\n", rc);
370                         ptlrpc_free_bulk(bulk);
371                         LBUG();
372                         RETURN(rc);
373                 }
374                 wait_event_interruptible(bulk->b_waitq,
375                                          ptlrpc_check_bulk_sent(bulk));
376
377                 if (bulk->b_flags & PTL_RPC_FL_INTR) {
378                         ptlrpc_free_bulk(bulk);
379                         RETURN(-EINTR);
380                 }
381
382                 ptlrpc_free_bulk(bulk);
383         }
384
385         return 0;
386 }
387
388 int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
389                  obd_count *oa_bufs, struct page **buf, obd_size *count,
390                  obd_off *offset, obd_flag *flags)
391 {
392         struct ptlrpc_client *cl;
393         struct ptlrpc_connection *connection;
394         struct ptlrpc_request *request;
395         struct ost_body *body;
396         struct obd_ioobj ioo;
397         struct niobuf src;
398         int pages, rc, i, j, size[3] = {sizeof(*body)};
399         void *ptr1, *ptr2;
400         struct ptlrpc_bulk_desc **bulk;
401         ENTRY;
402
403         size[1] = num_oa * sizeof(ioo);
404         pages = 0;
405         for (i = 0; i < num_oa; i++)
406                 pages += oa_bufs[i];
407         size[2] = pages * sizeof(src);
408
409         OBD_ALLOC(bulk, pages * sizeof(*bulk));
410         if (bulk == NULL)
411                 RETURN(-ENOMEM);
412
413         osc_con2cl(conn, &cl, &connection);
414         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
415         if (!request)
416                 GOTO(out, rc = -ENOMEM);
417
418         body = lustre_msg_buf(request->rq_reqmsg, 0);
419         body->data = OBD_BRW_READ;
420
421         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
422         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
423         for (pages = 0, i = 0; i < num_oa; i++) {
424                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
425                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
426                         bulk[pages] = ptlrpc_prep_bulk(connection);
427                         if (bulk[pages] == NULL)
428                                 GOTO(out, rc = -ENOMEM);
429
430                         spin_lock(&connection->c_lock);
431                         bulk[pages]->b_xid = ++connection->c_xid_out;
432                         spin_unlock(&connection->c_lock);
433
434                         bulk[pages]->b_buf = kmap(buf[pages]);
435                         bulk[pages]->b_buflen = PAGE_SIZE;
436                         bulk[pages]->b_portal = OST_BULK_PORTAL;
437                         ost_pack_niobuf(&ptr2, bulk[pages]->b_buf,
438                                         offset[pages], count[pages],
439                                         flags[pages], bulk[pages]->b_xid);
440
441                         rc = ptlrpc_register_bulk(bulk[pages]);
442                         if (rc)
443                                 GOTO(out, rc);
444                 }
445         }
446
447         request->rq_replen = lustre_msg_size(1, size);
448         rc = ptlrpc_queue_wait(request);
449         GOTO(out, rc);
450
451  out:
452         /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
453          * abort those bulk listeners. */
454
455         for (pages = 0, i = 0; i < num_oa; i++) {
456                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
457                         if (bulk[pages] == NULL)
458                                 continue;
459                         kunmap(buf[pages]);
460                         OBD_FREE(bulk[pages], sizeof(**bulk));
461                 }
462         }
463
464         OBD_FREE(bulk, pages * sizeof(*bulk));
465         ptlrpc_free_req(request);
466         return rc;
467 }
468
469 int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
470                   obd_count *oa_bufs, struct page **buf, obd_size *count,
471                   obd_off *offset, obd_flag *flags)
472 {
473         struct ptlrpc_client *cl;
474         struct ptlrpc_connection *connection;
475         struct ptlrpc_request *request;
476         struct obd_ioobj ioo;
477         struct ost_body *body;
478         struct niobuf *src;
479         int pages, rc, i, j, size[3] = {sizeof(*body)};
480         void *ptr1, *ptr2;
481         ENTRY;
482
483         size[1] = num_oa * sizeof(ioo);
484         pages = 0;
485         for (i = 0; i < num_oa; i++)
486                 pages += oa_bufs[i];
487         size[2] = pages * sizeof(*src);
488
489         OBD_ALLOC(src, size[2]);
490         if (!src)
491                 RETURN(-ENOMEM);
492
493         osc_con2cl(conn, &cl, &connection);
494         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
495         if (!request)
496                 RETURN(-ENOMEM);
497         body = lustre_msg_buf(request->rq_reqmsg, 0);
498         body->data = OBD_BRW_WRITE;
499
500         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
501         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
502         for (pages = 0, i = 0; i < num_oa; i++) {
503                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
504                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
505                         ost_pack_niobuf(&ptr2, kmap(buf[pages]), offset[pages],
506                                         count[pages], flags[pages], 0);
507                 }
508         }
509         memcpy(src, lustre_msg_buf(request->rq_reqmsg, 2), size[2]);
510
511         size[1] = pages * sizeof(struct niobuf);
512         request->rq_replen = lustre_msg_size(2, size);
513
514         rc = ptlrpc_queue_wait(request);
515         if (rc)
516                 GOTO(out, rc);
517
518         ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
519         if (ptr2 == NULL)
520                 GOTO(out, rc = -EINVAL);
521
522         if (request->rq_repmsg->buflens[1] != pages * sizeof(struct niobuf)) {
523                 CERROR("buffer length wrong (%d vs. %d)\n",
524                        request->rq_repmsg->buflens[1],
525                        pages * sizeof(struct niobuf));
526                 GOTO(out, rc = -EINVAL);
527         }
528
529         for (pages = 0, i = 0; i < num_oa; i++) {
530                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
531                         struct niobuf *dst;
532                         ost_unpack_niobuf(&ptr2, &dst);
533                         osc_sendpage(conn, request, dst, &src[pages]);
534                 }
535         }
536         OBD_FREE(src, size[2]);
537  out:
538         for (pages = 0, i = 0; i < num_oa; i++)
539                 for (j = 0; j < oa_bufs[i]; j++, pages++)
540                         kunmap(buf[pages]);
541
542         ptlrpc_free_req(request);
543         return 0;
544 }
545
546 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
547               struct obdo **oa, obd_count *oa_bufs, struct page **buf,
548               obd_size *count, obd_off *offset, obd_flag *flags)
549 {
550         if (rw == OBD_BRW_READ)
551                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
552                                     offset, flags);
553         else
554                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
555                                      offset, flags);
556 }
557
558 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
559 {
560         struct osc_obd *osc = &obddev->u.osc;
561         __u32 ns_id;
562         int rc;
563         ENTRY;
564
565         osc->osc_conn = ptlrpc_uuid_to_connection("ost");
566         if (!osc->osc_conn)
567                 RETURN(-EINVAL);
568
569         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
570         if (osc->osc_client == NULL)
571                 GOTO(out_conn, rc = -ENOMEM);
572
573         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
574         if (osc->osc_ldlm_client == NULL)
575                 GOTO(out_client, rc = -ENOMEM);
576
577         ptlrpc_init_client(NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
578                            osc->osc_client);
579         ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
580                            osc->osc_ldlm_client);
581
582         get_random_bytes(&ns_id, sizeof(ns_id));
583         rc = ldlm_cli_namespace_new(obddev, osc->osc_ldlm_client, osc->osc_conn,
584                                     ns_id);
585         if (rc) {
586                 CERROR("Couldn't create new namespace %u: %d\n", ns_id, rc);
587                 GOTO(out_ldlm_client, rc);
588         }
589
590         MOD_INC_USE_COUNT;
591         RETURN(0);
592
593  out_ldlm_client:
594         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
595  out_client:
596         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
597  out_conn:
598         ptlrpc_put_connection(osc->osc_conn);
599         return rc;
600 }
601
602 static int osc_cleanup(struct obd_device * obddev)
603 {
604         struct osc_obd *osc = &obddev->u.osc;
605
606         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
607         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
608         ptlrpc_put_connection(osc->osc_conn);
609
610         MOD_DEC_USE_COUNT;
611         return 0;
612 }
613
614 struct obd_ops osc_obd_ops = {
615         o_setup:   osc_setup,
616         o_cleanup: osc_cleanup,
617         o_create: osc_create,
618         o_destroy: osc_destroy,
619         o_getattr: osc_getattr,
620         o_setattr: osc_setattr,
621         o_open: osc_open,
622         o_close: osc_close,
623         o_connect: osc_connect,
624         o_disconnect: osc_disconnect,
625         o_brw: osc_brw,
626         o_punch: osc_punch
627 };
628
629 static int __init osc_init(void)
630 {
631         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
632         return 0;
633 }
634
635 static void __exit osc_exit(void)
636 {
637         obd_unregister_type(LUSTRE_OSC_NAME);
638 }
639
640 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
641 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
642 MODULE_LICENSE("GPL");
643
644 module_init(osc_init);
645 module_exit(osc_exit);