Whamcloud - gitweb
Merge b_md to HEAD for 0.5.19 release.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  *  For testing and management it is treated as an obd_device,
23  *  although it does not export a full OBD method table (the
24  *  requests are coming in over the wire, so object target modules
25  *  do not have a full method table.)
26  *
27  */
28
29 #define EXPORT_SYMTAB
30 #define DEBUG_SUBSYSTEM S_OSC
31
32 #include <linux/version.h>
33 #include <linux/module.h>
34 #include <linux/mm.h>
35 #include <linux/highmem.h>
36 #include <linux/lustre_dlm.h>
37 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
38 #include <linux/workqueue.h>
39 #endif
40 #include <linux/kp30.h>
41 #include <linux/lustre_mds.h> /* for mds_objid */
42 #include <linux/obd_ost.h>
43 #include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
44 #include <linux/ctype.h>
45 #include <linux/init.h>
46 #include <linux/lustre_ha.h>
47 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
48 #include <linux/lustre_lite.h> /* for ll_i2info */
49 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
50 #include <linux/lprocfs_status.h>
51
52 extern struct lprocfs_vars status_var_nm_1[];
53 extern struct lprocfs_vars status_class_var[];
54
55 static int osc_attach(struct obd_device *dev, obd_count len, void *data)
56 {
57         return lprocfs_reg_obd(dev, status_var_nm_1, dev);
58 }
59
/* Detach hook: undo osc_attach() by deregistering the device from lprocfs.
 * Returns whatever lprocfs_dereg_obd() returns. */
static int osc_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_dereg_obd(dev);
        return rc;
}
64
65 /* Pack OSC object metadata for shipment to the MDS. */
66 static int osc_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
67                       struct lov_stripe_md *lsm)
68 {
69         int lmm_size;
70         ENTRY;
71
72         lmm_size = sizeof(**lmmp);
73         if (!lmmp)
74                 RETURN(lmm_size);
75
76         if (*lmmp && !lsm) {
77                 OBD_FREE(*lmmp, lmm_size);
78                 *lmmp = NULL;
79                 RETURN(0);
80         }
81
82         if (!*lmmp) {
83                 OBD_ALLOC(*lmmp, lmm_size);
84                 if (!*lmmp)
85                         RETURN(-ENOMEM);
86         }
87         if (lsm) {
88                 LASSERT(lsm->lsm_object_id);
89                 (*lmmp)->lmm_object_id = (lsm->lsm_object_id);
90         }
91
92         RETURN(lmm_size);
93 }
94
95 static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
96                         struct lov_mds_md *lmm)
97 {
98         int lsm_size;
99         ENTRY;
100
101         lsm_size = sizeof(**lsmp);
102         if (!lsmp)
103                 RETURN(lsm_size);
104
105         if (*lsmp && !lmm) {
106                 OBD_FREE(*lsmp, lsm_size);
107                 *lsmp = NULL;
108                 RETURN(0);
109         }
110
111         if (!*lsmp) {
112                 OBD_ALLOC(*lsmp, lsm_size);
113                 if (!*lsmp)
114                         RETURN(-ENOMEM);
115         }
116
117         /* XXX endianness */
118         if (lmm) {
119                 (*lsmp)->lsm_object_id = (lmm->lmm_object_id);
120                 LASSERT((*lsmp)->lsm_object_id);
121         }
122
123         RETURN(lsm_size);
124 }
125
126 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
127                        struct lov_stripe_md *md)
128 {
129         struct ptlrpc_request *request;
130         struct ost_body *body;
131         int rc, size = sizeof(*body);
132         ENTRY;
133
134         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
135                                   &size, NULL);
136         if (!request)
137                 RETURN(-ENOMEM);
138
139         body = lustre_msg_buf(request->rq_reqmsg, 0);
140 #warning FIXME: pack only valid fields instead of memcpy, endianness
141         memcpy(&body->oa, oa, sizeof(*oa));
142
143         request->rq_replen = lustre_msg_size(1, &size);
144
145         rc = ptlrpc_queue_wait(request);
146         if (rc) {
147                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
148                 GOTO(out, rc);
149         }
150
151         body = lustre_msg_buf(request->rq_repmsg, 0);
152         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
153         if (oa)
154                 memcpy(oa, &body->oa, sizeof(*oa));
155
156         EXIT;
157  out:
158         ptlrpc_req_finished(request);
159         return rc;
160 }
161
162 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
163                     struct lov_stripe_md *md)
164 {
165         struct ptlrpc_request *request;
166         struct ost_body *body;
167         int rc, size = sizeof(*body);
168         ENTRY;
169
170         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
171                                   NULL);
172         if (!request)
173                 RETURN(-ENOMEM);
174
175         body = lustre_msg_buf(request->rq_reqmsg, 0);
176 #warning FIXME: pack only valid fields instead of memcpy, endianness
177         memcpy(&body->oa, oa, sizeof(*oa));
178
179         request->rq_replen = lustre_msg_size(1, &size);
180
181         rc = ptlrpc_queue_wait(request);
182         if (rc)
183                 GOTO(out, rc);
184
185         body = lustre_msg_buf(request->rq_repmsg, 0);
186         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
187         if (oa)
188                 memcpy(oa, &body->oa, sizeof(*oa));
189
190         EXIT;
191  out:
192         ptlrpc_req_finished(request);
193         return rc;
194 }
195
196 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
197                      struct lov_stripe_md *md)
198 {
199         struct ptlrpc_request *request;
200         struct ost_body *body;
201         int rc, size = sizeof(*body);
202         ENTRY;
203
204         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
205                                   NULL);
206         if (!request)
207                 RETURN(-ENOMEM);
208
209         body = lustre_msg_buf(request->rq_reqmsg, 0);
210 #warning FIXME: pack only valid fields instead of memcpy, endianness
211         memcpy(&body->oa, oa, sizeof(*oa));
212
213         request->rq_replen = lustre_msg_size(1, &size);
214
215         rc = ptlrpc_queue_wait(request);
216         if (rc)
217                 GOTO(out, rc);
218
219         body = lustre_msg_buf(request->rq_repmsg, 0);
220         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
221         if (oa)
222                 memcpy(oa, &body->oa, sizeof(*oa));
223
224         EXIT;
225  out:
226         ptlrpc_req_finished(request);
227         return rc;
228 }
229
230 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
231                        struct lov_stripe_md *md)
232 {
233         struct ptlrpc_request *request;
234         struct ost_body *body;
235         int rc, size = sizeof(*body);
236         ENTRY;
237
238         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
239                                   &size, NULL);
240         if (!request)
241                 RETURN(-ENOMEM);
242
243         body = lustre_msg_buf(request->rq_reqmsg, 0);
244         memcpy(&body->oa, oa, sizeof(*oa));
245
246         request->rq_replen = lustre_msg_size(1, &size);
247
248         rc = ptlrpc_queue_wait(request);
249
250         ptlrpc_req_finished(request);
251         return rc;
252 }
253
254 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
255                       struct lov_stripe_md **ea)
256 {
257         struct ptlrpc_request *request;
258         struct ost_body *body;
259         struct lov_stripe_md *lsm;
260         int rc, size = sizeof(*body);
261         ENTRY;
262
263         LASSERT(oa);
264         LASSERT(ea);
265
266         lsm = *ea;
267         if (!lsm) {
268                 rc = obd_alloc_memmd(conn, &lsm);
269                 if (rc < 0)
270                         RETURN(rc);
271         }
272
273         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
274                                   NULL);
275         if (!request)
276                 GOTO(out, rc = -ENOMEM);
277
278         body = lustre_msg_buf(request->rq_reqmsg, 0);
279         memcpy(&body->oa, oa, sizeof(*oa));
280
281         request->rq_replen = lustre_msg_size(1, &size);
282
283         rc = ptlrpc_queue_wait(request);
284         if (rc)
285                 GOTO(out_req, rc);
286
287         body = lustre_msg_buf(request->rq_repmsg, 0);
288         memcpy(oa, &body->oa, sizeof(*oa));
289
290         lsm->lsm_object_id = oa->o_id;
291         lsm->lsm_stripe_count = 0;
292         *ea = lsm;
293         EXIT;
294 out_req:
295         ptlrpc_req_finished(request);
296 out:
297         if (rc && !*ea)
298                 obd_free_memmd(conn, &lsm);
299         return rc;
300 }
301
302 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
303                      struct lov_stripe_md *md, obd_size start,
304                      obd_size end)
305 {
306         struct ptlrpc_request *request;
307         struct ost_body *body;
308         int rc, size = sizeof(*body);
309         ENTRY;
310
311         if (!oa) {
312                 CERROR("oa NULL\n");
313                 RETURN(-EINVAL);
314         }
315
316         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
317                                   NULL);
318         if (!request)
319                 RETURN(-ENOMEM);
320
321         body = lustre_msg_buf(request->rq_reqmsg, 0);
322 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
323         memcpy(&body->oa, oa, sizeof(*oa));
324
325         /* overload the size and blocks fields in the oa with start/end */
326         body->oa.o_size = HTON__u64(start);
327         body->oa.o_blocks = HTON__u64(end);
328         body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
329
330         request->rq_replen = lustre_msg_size(1, &size);
331
332         rc = ptlrpc_queue_wait(request);
333         if (rc)
334                 GOTO(out, rc);
335
336         body = lustre_msg_buf(request->rq_repmsg, 0);
337         memcpy(oa, &body->oa, sizeof(*oa));
338
339         EXIT;
340  out:
341         ptlrpc_req_finished(request);
342         return rc;
343 }
344
345 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
346                        struct lov_stripe_md *ea)
347 {
348         struct ptlrpc_request *request;
349         struct ost_body *body;
350         int rc, size = sizeof(*body);
351         ENTRY;
352
353         if (!oa) {
354                 CERROR("oa NULL\n");
355                 RETURN(-EINVAL);
356         }
357         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
358                                   &size, NULL);
359         if (!request)
360                 RETURN(-ENOMEM);
361
362         body = lustre_msg_buf(request->rq_reqmsg, 0);
363 #warning FIXME: pack only valid fields instead of memcpy, endianness
364         memcpy(&body->oa, oa, sizeof(*oa));
365
366         request->rq_replen = lustre_msg_size(1, &size);
367
368         rc = ptlrpc_queue_wait(request);
369         if (rc)
370                 GOTO(out, rc);
371
372         body = lustre_msg_buf(request->rq_repmsg, 0);
373         memcpy(oa, &body->oa, sizeof(*oa));
374
375         EXIT;
376  out:
377         ptlrpc_req_finished(request);
378         return rc;
379 }
380
381 /* Our bulk-unmapping bottom half. */
382 static void unmap_and_decref_bulk_desc(void *data)
383 {
384         struct ptlrpc_bulk_desc *desc = data;
385         struct list_head *tmp;
386         ENTRY;
387
388         /* This feels wrong to me. */
389         list_for_each(tmp, &desc->bd_page_list) {
390                 struct ptlrpc_bulk_page *bulk;
391                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
392
393                 kunmap(bulk->bp_page);
394                 obd_kmap_put(1);
395         }
396
397         ptlrpc_bulk_decref(desc);
398         EXIT;
399 }
400
401 /*  this is the callback function which is invoked by the Portals
402  *  event handler associated with the bulk_sink queue and bulk_source queue.
403  */
404 static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
405 {
406         ENTRY;
407
408         LASSERT(desc->bd_brw_set != NULL);
409         LASSERT(desc->bd_brw_set->brw_callback != NULL);
410
411         desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
412
413         /* We can't kunmap the desc from interrupt context, so we do it from
414          * the bottom half above. */
415         prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
416         schedule_work(&desc->bd_queue);
417
418         EXIT;
419 }
420
421 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
422                         obd_count page_count, struct brw_page *pga,
423                         struct obd_brw_set *set)
424 {
425         struct obd_import *imp = class_conn2cliimp(conn);
426         struct ptlrpc_connection *connection = imp->imp_connection;
427         struct ptlrpc_request *request = NULL;
428         struct ptlrpc_bulk_desc *desc = NULL;
429         struct ost_body *body;
430         int rc, size[3] = {sizeof(*body)}, mapped = 0;
431         unsigned long flags;
432         struct obd_ioobj *iooptr;
433         void *nioptr;
434         __u32 xid;
435         ENTRY;
436
437         size[1] = sizeof(struct obd_ioobj);
438         size[2] = page_count * sizeof(struct niobuf_remote);
439
440         request = ptlrpc_prep_req(imp, OST_READ, 3, size, NULL);
441         if (!request)
442                 RETURN(-ENOMEM);
443
444         body = lustre_msg_buf(request->rq_reqmsg, 0);
445
446         desc = ptlrpc_prep_bulk(connection);
447         if (!desc)
448                 GOTO(out_req, rc = -ENOMEM);
449         desc->bd_portal = OST_BULK_PORTAL;
450         desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
451         CDEBUG(D_PAGE, "desc = %p\n", desc);
452
453         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
454         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
455         ost_pack_ioo(&iooptr, lsm, page_count);
456         /* end almost identical to brw_write case */
457
458         spin_lock_irqsave(&imp->imp_lock, flags);
459         xid = ++imp->imp_last_xid;       /* single xid for all pages */
460         spin_unlock_irqrestore(&imp->imp_lock, flags);
461
462         obd_kmap_get(page_count, 0);
463
464         for (mapped = 0; mapped < page_count; mapped++) {
465                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
466                 if (bulk == NULL)
467                         GOTO(out_unmap, rc = -ENOMEM);
468
469                 bulk->bp_xid = xid;           /* single xid for all pages */
470
471                 bulk->bp_buf = kmap(pga[mapped].pg);
472                 bulk->bp_page = pga[mapped].pg;
473                 bulk->bp_buflen = PAGE_SIZE;
474                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
475                                 pga[mapped].flag, bulk->bp_xid);
476         }
477
478         /*
479          * Register the bulk first, because the reply could arrive out of order,
480          * and we want to be ready for the bulk data.
481          *
482          * One reference is released when brw_finish is complete, the other when
483          * the caller removes us from the "set" list.
484          *
485          * On error, we never do the brw_finish, so we handle all decrefs.
486          */
487         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
488                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
489                        OBD_FAIL_OSC_BRW_READ_BULK);
490         } else {
491                 rc = ptlrpc_register_bulk(desc);
492                 if (rc)
493                         GOTO(out_unmap, rc);
494                 obd_brw_set_add(set, desc);
495         }
496
497         request->rq_replen = lustre_msg_size(1, size);
498         rc = ptlrpc_queue_wait(request);
499
500         /*
501          * XXX: If there is an error during the processing of the callback,
502          *      such as a timeout in a sleep that it performs, brw_finish
503          *      will never get called, and we'll leak the desc, fail to kunmap
504          *      things, cats will live with dogs.  One solution would be to
505          *      export brw_finish as osc_brw_finish, so that the timeout case
506          *      and its kin could call it for proper cleanup.  An alternative
507          *      would be for an error return from the callback to cause us to
508          *      clean up, but that doesn't help the truly async cases (like
509          *      LOV), which will immediately return from their PHASE_START
510          *      callback, before any such cleanup-requiring error condition can
511          *      be detected.
512          */
513  out_req:
514         ptlrpc_req_finished(request);
515         RETURN(rc);
516
517         /* Clean up on error. */
518 out_unmap:
519         while (mapped-- > 0)
520                 kunmap(pga[mapped].pg);
521         obd_kmap_put(page_count);
522         ptlrpc_bulk_decref(desc);
523         goto out_req;
524 }
525
526 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *lsm,
527                          obd_count page_count, struct brw_page *pga,
528                          struct obd_brw_set *set)
529 {
530         struct obd_import *imp = class_conn2cliimp(conn);
531         struct ptlrpc_connection *connection = imp->imp_connection;
532         struct ptlrpc_request *request = NULL;
533         struct ptlrpc_bulk_desc *desc = NULL;
534         struct ost_body *body;
535         struct niobuf_local *local = NULL;
536         struct niobuf_remote *remote;
537         int rc, size[3] = {sizeof(*body)}, mapped = 0;
538         int j;
539         struct obd_ioobj *iooptr;
540         void *nioptr;
541         ENTRY;
542
543         size[1] = sizeof(struct obd_ioobj);
544         size[2] = page_count * sizeof(struct niobuf_remote);
545
546         request = ptlrpc_prep_req(imp, OST_WRITE, 3, size, NULL);
547         if (!request)
548                 RETURN(-ENOMEM);
549
550         body = lustre_msg_buf(request->rq_reqmsg, 0);
551
552         desc = ptlrpc_prep_bulk(connection);
553         if (!desc)
554                 GOTO(out_req, rc = -ENOMEM);
555         desc->bd_portal = OSC_BULK_PORTAL;
556         desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
557         CDEBUG(D_PAGE, "desc = %p\n", desc);
558
559         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
560         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
561         ost_pack_ioo(&iooptr, lsm, page_count);
562         /* end almost identical to brw_read case */
563
564         OBD_ALLOC(local, page_count * sizeof(*local));
565         if (!local)
566                 GOTO(out_desc, rc = -ENOMEM);
567
568         obd_kmap_get(page_count, 0);
569
570         for (mapped = 0; mapped < page_count; mapped++) {
571                 local[mapped].addr = kmap(pga[mapped].pg);
572
573                 CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->refcount = "
574                        "%d ; page %d of %d\n",
575                        local[mapped].addr, pga[mapped].pg->flags,
576                        page_count(pga[mapped].pg),
577                        mapped, page_count - 1);
578
579                 local[mapped].offset = pga[mapped].off;
580                 local[mapped].len = pga[mapped].count;
581                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
582                                 pga[mapped].flag, 0);
583         }
584
585         size[1] = page_count * sizeof(*remote);
586         request->rq_replen = lustre_msg_size(2, size);
587         rc = ptlrpc_queue_wait(request);
588         if (rc)
589                 GOTO(out_unmap, rc);
590
591         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
592         if (!nioptr)
593                 GOTO(out_unmap, rc = -EINVAL);
594
595         if (request->rq_repmsg->buflens[1] != size[1]) {
596                 CERROR("buffer length wrong (%d vs. %d)\n",
597                        request->rq_repmsg->buflens[1], size[1]);
598                 GOTO(out_unmap, rc = -EINVAL);
599         }
600
601         for (j = 0; j < page_count; j++) {
602                 struct ptlrpc_bulk_page *bulk;
603
604                 ost_unpack_niobuf(&nioptr, &remote);
605
606                 bulk = ptlrpc_prep_bulk_page(desc);
607                 if (!bulk)
608                         GOTO(out_unmap, rc = -ENOMEM);
609
610                 bulk->bp_buf = local[j].addr;
611                 bulk->bp_buflen = local[j].len;
612                 bulk->bp_xid = remote->xid;
613                 bulk->bp_page = pga[j].pg;
614         }
615
616         if (desc->bd_page_count != page_count)
617                 LBUG();
618
619         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
620                 GOTO(out_unmap, rc = 0);
621
622         OBD_FREE(local, page_count * sizeof(*local));
623
624         /* One reference is released when brw_finish is complete, the other
625          * when the caller removes it from the "set" list. */
626         obd_brw_set_add(set, desc);
627         rc = ptlrpc_send_bulk(desc);
628
629         /* XXX: Mike, same question as in osc_brw_read. */
630 out_req:
631         ptlrpc_req_finished(request);
632         RETURN(rc);
633
634         /* Clean up on error. */
635 out_unmap:
636         while (mapped-- > 0)
637                 kunmap(pga[mapped].pg);
638
639         obd_kmap_put(page_count);
640
641         OBD_FREE(local, page_count * sizeof(*local));
642 out_desc:
643         ptlrpc_bulk_decref(desc);
644         goto out_req;
645 }
646
647 static int osc_brw(int cmd, struct lustre_handle *conn,
648                    struct lov_stripe_md *md, obd_count page_count,
649                    struct brw_page *pga, struct obd_brw_set *set)
650 {
651         ENTRY;
652
653         while (page_count) {
654                 obd_count pages_per_brw;
655                 int rc;
656
657                 if (page_count > PTL_MD_MAX_IOV)
658                         pages_per_brw = PTL_MD_MAX_IOV;
659                 else
660                         pages_per_brw = page_count;
661
662                 if (cmd & OBD_BRW_WRITE)
663                         rc = osc_brw_write(conn, md, pages_per_brw, pga, set);
664                 else
665                         rc = osc_brw_read(conn, md, pages_per_brw, pga, set);
666
667                 if (rc != 0)
668                         RETURN(rc);
669
670                 page_count -= pages_per_brw;
671                 pga += pages_per_brw;
672         }
673         RETURN(0);
674 }
675
676 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
677                        struct lustre_handle *parent_lock,
678                        __u32 type, void *extentp, int extent_len, __u32 mode,
679                        int *flags, void *callback, void *data, int datalen,
680                        struct lustre_handle *lockh)
681 {
682         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
683         struct obd_device *obddev = class_conn2obd(connh);
684         struct ldlm_extent *extent = extentp;
685         int rc;
686         ENTRY;
687
688         /* Filesystem locks are given a bit of special treatment: if
689          * this is not a file size lock (which has end == -1), we
690          * fixup the lock to start and end on page boundaries. */
691         if (extent->end != OBD_OBJECT_EOF) {
692                 extent->start &= PAGE_MASK;
693                 extent->end = (extent->end & PAGE_MASK) + PAGE_SIZE - 1;
694         }
695
696         /* Next, search for already existing extent locks that will cover us */
697         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
698                              sizeof(extent), mode, lockh);
699         if (rc == 1)
700                 /* We already have a lock, and it's referenced */
701                 RETURN(ELDLM_OK);
702
703         /* If we're trying to read, we also search for an existing PW lock.  The
704          * VFS and page cache already protect us locally, so lots of readers/
705          * writers can share a single PW lock.
706          *
707          * There are problems with conversion deadlocks, so instead of
708          * converting a read lock to a write lock, we'll just enqueue a new
709          * one.
710          *
711          * At some point we should cancel the read lock instead of making them
712          * send us a blocking callback, but there are problems with canceling
713          * locks out from other users right now, too. */
714
715         if (mode == LCK_PR) {
716                 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type,
717                                      extent, sizeof(extent), LCK_PW, lockh);
718                 if (rc == 1) {
719                         /* FIXME: This is not incredibly elegant, but it might
720                          * be more elegant than adding another parameter to
721                          * lock_match.  I want a second opinion. */
722                         ldlm_lock_addref(lockh, LCK_PR);
723                         ldlm_lock_decref(lockh, LCK_PW);
724
725                         RETURN(ELDLM_OK);
726                 }
727         }
728
729         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
730                               res_id, type, extent, sizeof(extent), mode, flags,
731                               ldlm_completion_ast, callback, data, datalen,
732                               lockh);
733         RETURN(rc);
734 }
735
736 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
737                       __u32 mode, struct lustre_handle *lockh)
738 {
739         ENTRY;
740
741         ldlm_lock_decref(lockh, mode);
742
743         RETURN(0);
744 }
745
746 static int osc_cancel_unused(struct lustre_handle *connh,
747                              struct lov_stripe_md *lsm, int flags)
748 {
749         struct obd_device *obddev = class_conn2obd(connh);
750         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
751
752         return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, flags);
753 }
754
755 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
756 {
757         struct ptlrpc_request *request;
758         int rc, size = sizeof(*osfs);
759         ENTRY;
760
761         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
762                                   NULL);
763         if (!request)
764                 RETURN(-ENOMEM);
765
766         request->rq_replen = lustre_msg_size(1, &size);
767
768         rc = ptlrpc_queue_wait(request);
769         if (rc) {
770                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
771                 GOTO(out, rc);
772         }
773
774         obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
775
776         EXIT;
777  out:
778         ptlrpc_req_finished(request);
779         return rc;
780 }
781
782 /* Retrieve object striping information.
783  *
784  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
785  * the maximum number of OST indices which will fit in the user buffer.
786  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
787  */
788 static int osc_getstripe(struct lustre_handle *conn, struct lov_stripe_md *lsm,
789                          struct lov_mds_md *lmmu)
790 {
791         struct lov_mds_md lmm, *lmmk;
792         int rc, lmm_size;
793         ENTRY;
794
795         if (!lsm)
796                 RETURN(-ENODATA);
797
798         rc = copy_from_user(&lmm, lmmu, sizeof(lmm));
799         if (rc)
800                 RETURN(-EFAULT);
801
802         if (lmm.lmm_magic != LOV_MAGIC)
803                 RETURN(-EINVAL);
804
805         if (lmm.lmm_ost_count < 1)
806                 RETURN(-EOVERFLOW);
807
808         lmm_size = sizeof(lmm) + sizeof(lmm.lmm_objects[0]);
809         OBD_ALLOC(lmmk, lmm_size);
810         if (rc < 0)
811                 RETURN(rc);
812
813         lmmk->lmm_stripe_count = 1;
814         lmmk->lmm_ost_count = 1;
815         lmmk->lmm_object_id = lsm->lsm_object_id;
816         lmmk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
817
818         if (copy_to_user(lmmu, lmmk, lmm_size))
819                 rc = -EFAULT;
820
821         OBD_FREE(lmmk, lmm_size);
822
823         RETURN(rc);
824 }
825
826 static int osc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
827                          void *karg, void *uarg)
828 {
829         struct obd_device *obddev = class_conn2obd(conn);
830         struct obd_ioctl_data *data = karg;
831         int err = 0;
832         ENTRY;
833
834         switch (cmd) {
835         case IOC_LDLM_TEST: {
836                 err = ldlm_test(obddev, conn);
837                 CERROR("-- done err %d\n", err);
838                 GOTO(out, err);
839         }
840         case IOC_LDLM_REGRESS_START: {
841                 unsigned int numthreads = 1;
842                 unsigned int numheld = 10;
843                 unsigned int numres = 10;
844                 unsigned int numext = 10;
845                 char *parse;
846
847                 if (data->ioc_inllen1) {
848                         parse = data->ioc_inlbuf1;
849                         if (*parse != '\0') {
850                                 while(isspace(*parse)) parse++;
851                                 numthreads = simple_strtoul(parse, &parse, 0);
852                                 while(isspace(*parse)) parse++;
853                         }
854                         if (*parse != '\0') {
855                                 while(isspace(*parse)) parse++;
856                                 numheld = simple_strtoul(parse, &parse, 0);
857                                 while(isspace(*parse)) parse++;
858                         }
859                         if (*parse != '\0') {
860                                 while(isspace(*parse)) parse++;
861                                 numres = simple_strtoul(parse, &parse, 0);
862                                 while(isspace(*parse)) parse++;
863                         }
864                         if (*parse != '\0') {
865                                 while(isspace(*parse)) parse++;
866                                 numext = simple_strtoul(parse, &parse, 0);
867                                 while(isspace(*parse)) parse++;
868                         }
869                 }
870
871                 err = ldlm_regression_start(obddev, conn, numthreads,
872                                 numheld, numres, numext);
873
874                 CERROR("-- done err %d\n", err);
875                 GOTO(out, err);
876         }
877         case IOC_LDLM_REGRESS_STOP: {
878                 err = ldlm_regression_stop();
879                 CERROR("-- done err %d\n", err);
880                 GOTO(out, err);
881         }
882         case IOC_OSC_REGISTER_LOV: {
883                 if (obddev->u.cli.cl_containing_lov)
884                         GOTO(out, err = -EALREADY);
885                 obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
886                 GOTO(out, err);
887         }
888         case OBD_IOC_LOV_GET_CONFIG: {
889                 char *buf;
890                 struct lov_desc *desc;
891                 obd_uuid_t *uuidp;
892
893                 buf = NULL;
894                 len = 0;
895                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
896                         GOTO(out, err = -EINVAL);
897
898                 data = (struct obd_ioctl_data *)buf;
899
900                 if (sizeof(*desc) > data->ioc_inllen1) {
901                         OBD_FREE(buf, len);
902                         GOTO(out, err = -EINVAL);
903                 }
904
905                 if (data->ioc_inllen2 < sizeof(*uuidp)) {
906                         OBD_FREE(buf, len);
907                         GOTO(out, err = -EINVAL);
908                 }
909
910                 desc = (struct lov_desc *)data->ioc_inlbuf1;
911                 desc->ld_tgt_count = 1;
912                 desc->ld_active_tgt_count = 1;
913                 desc->ld_default_stripe_count = 1;
914                 desc->ld_default_stripe_size = 0;
915                 desc->ld_default_stripe_offset = 0;
916                 desc->ld_pattern = 0;
917                 memcpy(desc->ld_uuid,  obddev->obd_uuid, sizeof(*uuidp));
918
919                 uuidp = (obd_uuid_t *)data->ioc_inlbuf2;
920                 memcpy(uuidp,  obddev->obd_uuid, sizeof(*uuidp));
921
922                 err = copy_to_user((void *)uarg, buf, len);
923                 if (err)
924                         err = -EFAULT;
925                 OBD_FREE(buf, len);
926                 GOTO(out, err);
927         }
928         case LL_IOC_LOV_SETSTRIPE:
929                 err = obd_alloc_memmd(conn, karg);
930                 if (err > 0)
931                         err = 0;
932                 GOTO(out, err);
933         case LL_IOC_LOV_GETSTRIPE:
934                 err = osc_getstripe(conn, karg, uarg);
935                 GOTO(out, err);
936         default:
937                 CERROR ("osc_ioctl(): unrecognised ioctl %#x\n", cmd);
938                 GOTO(out, err = -ENOTTY);
939         }
940 out:
941         return err;
942 }
943
944 static void set_osc_active(struct obd_import *imp, int active)
945 {
946         struct obd_device *notify_obd = imp->imp_obd->u.cli.cl_containing_lov;
947
948         if (notify_obd == NULL)
949                 return;
950
951         /* How gross is _this_? */
952         if (!list_empty(&notify_obd->obd_exports)) {
953                 int rc;
954                 struct lustre_handle fakeconn;
955                 struct obd_ioctl_data ioc_data;
956                 struct obd_export *exp =
957                         list_entry(notify_obd->obd_exports.next,
958                                    struct obd_export, exp_obd_chain);
959
960                 fakeconn.addr = (__u64)(unsigned long)exp;
961                 fakeconn.cookie = exp->exp_cookie;
962                 ioc_data.ioc_inlbuf1 = imp->imp_obd->u.cli.cl_target_uuid;
963                 ioc_data.ioc_offset = active;
964                 rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn,
965                                    sizeof ioc_data, &ioc_data, NULL);
966                 if (rc)
967                         CERROR("disabling %s on LOV %p/%s: %d\n",
968                                imp->imp_obd->obd_uuid, notify_obd,
969                                notify_obd->obd_uuid, rc);
970         } else {
971                 CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about "
972                        "%p\n", notify_obd, notify_obd->obd_uuid,
973                        imp->imp_obd->obd_uuid);
974         }
975 }
976
977 static int osc_recover(struct obd_import *imp, int phase)
978 {
979         int rc;
980         unsigned long flags;
981         struct ptlrpc_request *req;
982         ENTRY;
983
984         switch(phase) {
985
986             case PTLRPC_RECOVD_PHASE_PREPARE: {
987                 struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
988                 ldlm_namespace_cleanup(ns, 1 /* no network ops */);
989                 ptlrpc_abort_inflight(imp);
990                 set_osc_active(imp, 0 /* inactive */);
991                 RETURN(0);
992             }
993
994             case PTLRPC_RECOVD_PHASE_RECOVER:
995                 imp->imp_flags &= ~IMP_INVALID;
996                 rc = ptlrpc_reconnect_import(imp, OST_CONNECT, &req);
997                 ptlrpc_req_finished(req);
998                 if (rc) {
999                         imp->imp_flags |= IMP_INVALID;
1000                         RETURN(rc);
1001                 }
1002
1003                 spin_lock_irqsave(&imp->imp_lock, flags);
1004                 imp->imp_level = LUSTRE_CONN_FULL;
1005                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1006
1007                 /* Is this the right place?  Should we do this in _PREPARE
1008                  * as well?  What about raising the level right away?
1009                  */
1010                 ptlrpc_wake_delayed(imp);
1011
1012                 set_osc_active(imp, 1 /* active */);
1013                 RETURN(0);
1014
1015             case PTLRPC_RECOVD_PHASE_NOTCONN:
1016                 osc_recover(imp, PTLRPC_RECOVD_PHASE_PREPARE);
1017                 RETURN(osc_recover(imp, PTLRPC_RECOVD_PHASE_RECOVER));
1018
1019             default:
1020                 RETURN(-EINVAL);
1021         }
1022 }
1023
1024 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd,
1025                        obd_uuid_t cluuid, struct recovd_obd *recovd,
1026                        ptlrpc_recovery_cb_t recover)
1027 {
1028         struct obd_import *imp = &obd->u.cli.cl_import;
1029         imp->imp_recover = osc_recover;
1030         return client_obd_connect(conn, obd, cluuid, recovd, recover);
1031 }
1032
1033 struct obd_ops osc_obd_ops = {
1034         o_owner:        THIS_MODULE,
1035         o_attach:       osc_attach,
1036         o_detach:       osc_detach,
1037         o_setup:        client_obd_setup,
1038         o_cleanup:      client_obd_cleanup,
1039         o_connect:      osc_connect,
1040         o_disconnect:   client_obd_disconnect,
1041         o_statfs:       osc_statfs,
1042         o_packmd:       osc_packmd,
1043         o_unpackmd:     osc_unpackmd,
1044         o_create:       osc_create,
1045         o_destroy:      osc_destroy,
1046         o_getattr:      osc_getattr,
1047         o_setattr:      osc_setattr,
1048         o_open:         osc_open,
1049         o_close:        osc_close,
1050         o_brw:          osc_brw,
1051         o_punch:        osc_punch,
1052         o_enqueue:      osc_enqueue,
1053         o_cancel:       osc_cancel,
1054         o_cancel_unused: osc_cancel_unused,
1055         o_iocontrol:    osc_iocontrol
1056 };
1057
1058 static int __init osc_init(void)
1059 {
1060         RETURN(class_register_type(&osc_obd_ops, status_class_var,
1061                                    LUSTRE_OSC_NAME));
1062 }
1063
1064 static void __exit osc_exit(void)
1065 {
1066         class_unregister_type(LUSTRE_OSC_NAME);
1067 }
1068
1069 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1070 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
1071 MODULE_LICENSE("GPL");
1072
1073 module_init(osc_init);
1074 module_exit(osc_exit);