Whamcloud - gitweb
- documentation update for MDS recovery
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/obd_ost.h>
25
26 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
27                        struct ptlrpc_connection **connection)
28 {
29         struct osc_obd *osc = &conn->oc_dev->u.osc;
30         *cl = osc->osc_client;
31         *connection = osc->osc_conn;
32 }
33
34 static int osc_connect(struct obd_conn *conn)
35 {
36         struct ptlrpc_request *request;
37         struct ptlrpc_client *cl;
38         struct ptlrpc_connection *connection;
39         struct ost_body *body;
40         int rc, size = sizeof(*body);
41         ENTRY;
42
43         osc_con2cl(conn, &cl, &connection);
44         request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL);
45         if (!request)
46                 RETURN(-ENOMEM);
47
48         request->rq_replen = lustre_msg_size(1, &size);
49
50         rc = ptlrpc_queue_wait(request);
51         if (rc)
52                 GOTO(out, rc);
53
54         body = lustre_msg_buf(request->rq_repmsg, 0);
55         CDEBUG(D_INODE, "received connid %d\n", body->connid);
56
57         conn->oc_id = body->connid;
58         EXIT;
59  out:
60         ptlrpc_free_req(request);
61         return rc;
62 }
63
64 static int osc_disconnect(struct obd_conn *conn)
65 {
66         struct ptlrpc_request *request;
67         struct ptlrpc_client *cl;
68         struct ptlrpc_connection *connection;
69         struct ost_body *body;
70         int rc, size = sizeof(*body);
71         ENTRY;
72
73         osc_con2cl(conn, &cl, &connection);
74         request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL);
75         if (!request)
76                 RETURN(-ENOMEM);
77
78         body = lustre_msg_buf(request->rq_reqmsg, 0);
79         body->connid = conn->oc_id;
80
81         request->rq_replen = lustre_msg_size(1, &size);
82
83         rc = ptlrpc_queue_wait(request);
84         GOTO(out, rc);
85  out:
86         ptlrpc_free_req(request);
87         return rc;
88 }
89
90 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
91 {
92         struct ptlrpc_request *request;
93         struct ptlrpc_client *cl;
94         struct ptlrpc_connection *connection;
95         struct ost_body *body;
96         int rc, size = sizeof(*body);
97         ENTRY;
98
99         osc_con2cl(conn, &cl, &connection);
100         request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
101         if (!request)
102                 RETURN(-ENOMEM);
103
104         body = lustre_msg_buf(request->rq_reqmsg, 0);
105         memcpy(&body->oa, oa, sizeof(*oa));
106         body->connid = conn->oc_id;
107         body->oa.o_valid = ~0;
108
109         request->rq_replen = lustre_msg_size(1, &size);
110
111         rc = ptlrpc_queue_wait(request);
112         if (rc)
113                 GOTO(out, rc);
114
115         body = lustre_msg_buf(request->rq_repmsg, 0);
116         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
117         if (oa)
118                 memcpy(oa, &body->oa, sizeof(*oa));
119
120         EXIT;
121  out:
122         ptlrpc_free_req(request);
123         return 0;
124 }
125
126 static int osc_open(struct obd_conn *conn, struct obdo *oa)
127 {
128         struct ptlrpc_request *request;
129         struct ptlrpc_client *cl;
130         struct ptlrpc_connection *connection;
131         struct ost_body *body;
132         int rc, size = sizeof(*body);
133         ENTRY;
134
135         osc_con2cl(conn, &cl, &connection);
136         request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
137         if (!request)
138                 RETURN(-ENOMEM);
139
140         body = lustre_msg_buf(request->rq_reqmsg, 0);
141         memcpy(&body->oa, oa, sizeof(*oa));
142         body->connid = conn->oc_id;
143         if (body->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
144                 LBUG();
145
146         request->rq_replen = lustre_msg_size(1, &size);
147
148         rc = ptlrpc_queue_wait(request);
149         if (rc)
150                 GOTO(out, rc);
151
152         body = lustre_msg_buf(request->rq_repmsg, 0);
153         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
154         if (oa)
155                 memcpy(oa, &body->oa, sizeof(*oa));
156
157         EXIT;
158  out:
159         ptlrpc_free_req(request);
160         return 0;
161 }
162
163 static int osc_close(struct obd_conn *conn, struct obdo *oa)
164 {
165         struct ptlrpc_request *request;
166         struct ptlrpc_client *cl;
167         struct ptlrpc_connection *connection;
168         struct ost_body *body;
169         int rc, size = sizeof(*body);
170         ENTRY;
171
172         osc_con2cl(conn, &cl, &connection);
173         request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
174         if (!request)
175                 RETURN(-ENOMEM);
176
177         body = lustre_msg_buf(request->rq_reqmsg, 0);
178         memcpy(&body->oa, oa, sizeof(*oa));
179         body->connid = conn->oc_id;
180
181         request->rq_replen = lustre_msg_size(1, &size);
182
183         rc = ptlrpc_queue_wait(request);
184         if (rc)
185                 GOTO(out, rc);
186
187         body = lustre_msg_buf(request->rq_repmsg, 0);
188         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
189         if (oa)
190                 memcpy(oa, &body->oa, sizeof(*oa));
191
192         EXIT;
193  out:
194         ptlrpc_free_req(request);
195         return 0;
196 }
197
198 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
199 {
200         struct ptlrpc_request *request;
201         struct ptlrpc_client *cl;
202         struct ptlrpc_connection *connection;
203         struct ost_body *body;
204         int rc, size = sizeof(*body);
205         ENTRY;
206
207         osc_con2cl(conn, &cl, &connection);
208         request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
209         if (!request)
210                 RETURN(-ENOMEM);
211
212         body = lustre_msg_buf(request->rq_reqmsg, 0);
213         memcpy(&body->oa, oa, sizeof(*oa));
214         body->connid = conn->oc_id;
215
216         request->rq_replen = lustre_msg_size(1, &size);
217
218         rc = ptlrpc_queue_wait(request);
219         GOTO(out, rc);
220
221  out:
222         ptlrpc_free_req(request);
223         return 0;
224 }
225
226 static int osc_create(struct obd_conn *conn, struct obdo *oa)
227 {
228         struct ptlrpc_request *request;
229         struct ptlrpc_client *cl;
230         struct ptlrpc_connection *connection;
231         struct ost_body *body;
232         int rc, size = sizeof(*body);
233         ENTRY;
234
235         if (!oa) {
236                 CERROR("oa NULL\n");
237                 RETURN(-EINVAL);
238         }
239         osc_con2cl(conn, &cl, &connection);
240         request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
241         if (!request)
242                 RETURN(-ENOMEM);
243
244         body = lustre_msg_buf(request->rq_reqmsg, 0);
245         memcpy(&body->oa, oa, sizeof(*oa));
246         body->oa.o_valid = ~0;
247         body->connid = conn->oc_id;
248
249         request->rq_replen = lustre_msg_size(1, &size);
250
251         rc = ptlrpc_queue_wait(request);
252         if (rc)
253                 GOTO(out, rc);
254
255         body = lustre_msg_buf(request->rq_repmsg, 0);
256         memcpy(oa, &body->oa, sizeof(*oa));
257
258         EXIT;
259  out:
260         ptlrpc_free_req(request);
261         return 0;
262 }
263
264 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
265                      obd_off offset)
266 {
267         struct ptlrpc_request *request;
268         struct ptlrpc_client *cl;
269         struct ptlrpc_connection *connection;
270         struct ost_body *body;
271         int rc, size = sizeof(*body);
272         ENTRY;
273
274         if (!oa) {
275                 CERROR("oa NULL\n");
276                 RETURN(-EINVAL);
277         }
278         osc_con2cl(conn, &cl, &connection);
279         request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
280         if (!request)
281                 RETURN(-ENOMEM);
282
283         body = lustre_msg_buf(request->rq_reqmsg, 0);
284         memcpy(&body->oa, oa, sizeof(*oa));
285         body->connid = conn->oc_id;
286         body->oa.o_valid = ~0;
287         body->oa.o_size = offset;
288         body->oa.o_blocks = count;
289
290         request->rq_replen = lustre_msg_size(1, &size);
291
292         rc = ptlrpc_queue_wait(request);
293         if (rc)
294                 GOTO(out, rc);
295
296         body = lustre_msg_buf(request->rq_repmsg, 0);
297         memcpy(oa, &body->oa, sizeof(*oa));
298
299         EXIT;
300  out:
301         ptlrpc_free_req(request);
302         return 0;
303 }
304
305 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
306 {
307         struct ptlrpc_request *request;
308         struct ptlrpc_client *cl;
309         struct ptlrpc_connection *connection;
310         struct ost_body *body;
311         int rc, size = sizeof(*body);
312         ENTRY;
313
314         if (!oa) {
315                 CERROR("oa NULL\n");
316                 RETURN(-EINVAL);
317         }
318         osc_con2cl(conn, &cl, &connection);
319         request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
320         if (!request)
321                 RETURN(-ENOMEM);
322
323         body = lustre_msg_buf(request->rq_reqmsg, 0);
324         memcpy(&body->oa, oa, sizeof(*oa));
325         body->connid = conn->oc_id;
326         body->oa.o_valid = ~0;
327
328         request->rq_replen = lustre_msg_size(1, &size);
329
330         rc = ptlrpc_queue_wait(request);
331         if (rc)
332                 GOTO(out, rc);
333
334         body = lustre_msg_buf(request->rq_repmsg, 0);
335         memcpy(oa, &body->oa, sizeof(*oa));
336
337         EXIT;
338  out:
339         ptlrpc_free_req(request);
340         return 0;
341 }
342
343 int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
344                  struct niobuf *dst, struct niobuf *src)
345 {
346         struct ptlrpc_client *cl;
347         struct ptlrpc_connection *connection;
348
349         osc_con2cl(conn, &cl, &connection);
350
351         if (cl->cli_obd) {
352                 /* local sendpage */
353                 memcpy((char *)(unsigned long)dst->addr,
354                        (char *)(unsigned long)src->addr, src->len);
355         } else {
356                 struct ptlrpc_bulk_desc *bulk;
357                 int rc;
358
359                 bulk = ptlrpc_prep_bulk(connection);
360                 if (bulk == NULL)
361                         RETURN(-ENOMEM);
362
363                 bulk->b_buf = (void *)(unsigned long)src->addr;
364                 bulk->b_buflen = src->len;
365                 bulk->b_xid = dst->xid;
366                 rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
367                 if (rc != 0) {
368                         CERROR("send_bulk failed: %d\n", rc);
369                         ptlrpc_free_bulk(bulk);
370                         LBUG();
371                         RETURN(rc);
372                 }
373                 wait_event_interruptible(bulk->b_waitq,
374                                          ptlrpc_check_bulk_sent(bulk));
375
376                 if (bulk->b_flags & PTL_RPC_FL_INTR) {
377                         ptlrpc_free_bulk(bulk);
378                         RETURN(-EINTR);
379                 }
380
381                 ptlrpc_free_bulk(bulk);
382         }
383
384         return 0;
385 }
386
387 int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
388                  obd_count *oa_bufs, struct page **buf, obd_size *count,
389                  obd_off *offset, obd_flag *flags)
390 {
391         struct ptlrpc_client *cl;
392         struct ptlrpc_connection *connection;
393         struct ptlrpc_request *request;
394         struct ost_body *body;
395         struct obd_ioobj ioo;
396         struct niobuf src;
397         int pages, rc, i, j, size[3] = {sizeof(*body)};
398         void *ptr1, *ptr2;
399         struct ptlrpc_bulk_desc **bulk;
400         ENTRY;
401
402         size[1] = num_oa * sizeof(ioo);
403         pages = 0;
404         for (i = 0; i < num_oa; i++)
405                 pages += oa_bufs[i];
406         size[2] = pages * sizeof(src);
407
408         OBD_ALLOC(bulk, pages * sizeof(*bulk));
409         if (bulk == NULL)
410                 RETURN(-ENOMEM);
411
412         osc_con2cl(conn, &cl, &connection);
413         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
414         if (!request)
415                 GOTO(out, rc = -ENOMEM);
416
417         body = lustre_msg_buf(request->rq_reqmsg, 0);
418         body->data = OBD_BRW_READ;
419
420         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
421         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
422         for (pages = 0, i = 0; i < num_oa; i++) {
423                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
424                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
425                         bulk[pages] = ptlrpc_prep_bulk(connection);
426                         if (bulk[pages] == NULL)
427                                 GOTO(out, rc = -ENOMEM);
428
429                         spin_lock(&connection->c_lock);
430                         bulk[pages]->b_xid = ++connection->c_xid_out;
431                         spin_unlock(&connection->c_lock);
432
433                         bulk[pages]->b_buf = kmap(buf[pages]);
434                         bulk[pages]->b_buflen = PAGE_SIZE;
435                         bulk[pages]->b_portal = OST_BULK_PORTAL;
436                         ost_pack_niobuf(&ptr2, bulk[pages]->b_buf,
437                                         offset[pages], count[pages],
438                                         flags[pages], bulk[pages]->b_xid);
439
440                         rc = ptlrpc_register_bulk(bulk[pages]);
441                         if (rc)
442                                 GOTO(out, rc);
443                 }
444         }
445
446         request->rq_replen = lustre_msg_size(1, size);
447         rc = ptlrpc_queue_wait(request);
448         GOTO(out, rc);
449
450  out:
451         /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
452          * abort those bulk listeners. */
453
454         for (pages = 0, i = 0; i < num_oa; i++) {
455                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
456                         if (bulk[pages] == NULL)
457                                 continue;
458                         kunmap(buf[pages]);
459                         ptlrpc_free_bulk(bulk[pages]);
460                 }
461         }
462
463         OBD_FREE(bulk, pages * sizeof(*bulk));
464         ptlrpc_free_req(request);
465         return rc;
466 }
467
468 int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
469                   obd_count *oa_bufs, struct page **buf, obd_size *count,
470                   obd_off *offset, obd_flag *flags)
471 {
472         struct ptlrpc_client *cl;
473         struct ptlrpc_connection *connection;
474         struct ptlrpc_request *request;
475         struct obd_ioobj ioo;
476         struct ost_body *body;
477         struct niobuf *src;
478         int pages, rc, i, j, size[3] = {sizeof(*body)};
479         void *ptr1, *ptr2;
480         ENTRY;
481
482         size[1] = num_oa * sizeof(ioo);
483         pages = 0;
484         for (i = 0; i < num_oa; i++)
485                 pages += oa_bufs[i];
486         size[2] = pages * sizeof(*src);
487
488         OBD_ALLOC(src, size[2]);
489         if (!src)
490                 RETURN(-ENOMEM);
491
492         osc_con2cl(conn, &cl, &connection);
493         request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
494         if (!request)
495                 RETURN(-ENOMEM);
496         body = lustre_msg_buf(request->rq_reqmsg, 0);
497         body->data = OBD_BRW_WRITE;
498
499         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
500         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
501         for (pages = 0, i = 0; i < num_oa; i++) {
502                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
503                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
504                         ost_pack_niobuf(&ptr2, kmap(buf[pages]), offset[pages],
505                                         count[pages], flags[pages], 0);
506                 }
507         }
508         memcpy(src, lustre_msg_buf(request->rq_reqmsg, 2), size[2]);
509
510         size[1] = pages * sizeof(struct niobuf);
511         request->rq_replen = lustre_msg_size(2, size);
512
513         rc = ptlrpc_queue_wait(request);
514         if (rc)
515                 GOTO(out, rc);
516
517         ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
518         if (ptr2 == NULL)
519                 GOTO(out, rc = -EINVAL);
520
521         if (request->rq_repmsg->buflens[1] != pages * sizeof(struct niobuf)) {
522                 CERROR("buffer length wrong (%d vs. %d)\n",
523                        request->rq_repmsg->buflens[1],
524                        pages * sizeof(struct niobuf));
525                 GOTO(out, rc = -EINVAL);
526         }
527
528         for (pages = 0, i = 0; i < num_oa; i++) {
529                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
530                         struct niobuf *dst;
531                         ost_unpack_niobuf(&ptr2, &dst);
532                         osc_sendpage(conn, request, dst, &src[pages]);
533                 }
534         }
535         OBD_FREE(src, size[2]);
536  out:
537         for (pages = 0, i = 0; i < num_oa; i++)
538                 for (j = 0; j < oa_bufs[i]; j++, pages++)
539                         kunmap(buf[pages]);
540
541         ptlrpc_free_req(request);
542         return 0;
543 }
544
545 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
546               struct obdo **oa, obd_count *oa_bufs, struct page **buf,
547               obd_size *count, obd_off *offset, obd_flag *flags)
548 {
549         if (rw == OBD_BRW_READ)
550                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
551                                     offset, flags);
552         else
553                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
554                                      offset, flags);
555 }
556
557 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
558 {
559         struct osc_obd *osc = &obddev->u.osc;
560         int rc;
561         ENTRY;
562
563         osc->osc_conn = ptlrpc_uuid_to_connection("ost");
564         if (!osc->osc_conn)
565                 RETURN(-EINVAL);
566
567         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
568         if (osc->osc_client == NULL)
569                 GOTO(out_conn, rc = -ENOMEM);
570
571         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
572         if (osc->osc_ldlm_client == NULL)
573                 GOTO(out_client, rc = -ENOMEM);
574
575         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
576                            osc->osc_client);
577         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
578                            osc->osc_ldlm_client);
579
580         MOD_INC_USE_COUNT;
581         RETURN(0);
582
583  out_client:
584         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
585  out_conn:
586         ptlrpc_put_connection(osc->osc_conn);
587         return rc;
588 }
589
590 static int osc_cleanup(struct obd_device * obddev)
591 {
592         struct osc_obd *osc = &obddev->u.osc;
593
594         ptlrpc_cleanup_client(osc->osc_client);
595         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
596         ptlrpc_cleanup_client(osc->osc_ldlm_client);
597         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
598         ptlrpc_put_connection(osc->osc_conn);
599
600         MOD_DEC_USE_COUNT;
601         return 0;
602 }
603
604 struct obd_ops osc_obd_ops = {
605         o_setup:   osc_setup,
606         o_cleanup: osc_cleanup,
607         o_create: osc_create,
608         o_destroy: osc_destroy,
609         o_getattr: osc_getattr,
610         o_setattr: osc_setattr,
611         o_open: osc_open,
612         o_close: osc_close,
613         o_connect: osc_connect,
614         o_disconnect: osc_disconnect,
615         o_brw: osc_brw,
616         o_punch: osc_punch
617 };
618
619 static int __init osc_init(void)
620 {
621         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
622         return 0;
623 }
624
625 static void __exit osc_exit(void)
626 {
627         obd_unregister_type(LUSTRE_OSC_NAME);
628 }
629
/* Module metadata. */
MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
MODULE_LICENSE("GPL");

/* Hook osc_init()/osc_exit() up as the module's load and unload handlers. */
module_init(osc_init);
module_exit(osc_exit);