Whamcloud - gitweb
Landing of b_recovery (at last).
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  *  For testing and management it is treated as an obd_device,
23  *  although it does not export a full OBD method table (the
24  *  requests are coming in over the wire, so object target modules
25  *  do not have a full method table.)
26  *
27  */
28
29 #define EXPORT_SYMTAB
30 #define DEBUG_SUBSYSTEM S_OSC
31
32 #include <linux/version.h>
33 #include <linux/module.h>
34 #include <linux/mm.h>
35 #include <linux/highmem.h>
36 #include <linux/lustre_dlm.h>
37 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
38 #include <linux/workqueue.h>
39 #endif
40 #include <linux/kp30.h>
41 #include <linux/lustre_mds.h> /* for mds_objid */
42 #include <linux/obd_ost.h>
43 #include <linux/ctype.h>
44 #include <linux/init.h>
45 #include <linux/lustre_ha.h>
46 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
47 #include <linux/lustre_lite.h> /* for ll_i2info */
48 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
49 #include <linux/lprocfs_status.h>
50
51 extern struct lprocfs_vars status_var_nm_1[];
52 extern struct lprocfs_vars status_class_var[];
53
54 int osc_attach(struct obd_device *dev, obd_count len, void *data)
55 {
56         return lprocfs_reg_obd(dev, status_var_nm_1, dev);
57 }
58
/*
 * Detach hook for the OSC device type.
 *
 * Hands @dev to lprocfs_dereg_obd() and returns its result.
 */
int osc_detach(struct obd_device *dev)
{
        int rc = lprocfs_dereg_obd(dev);

        return rc;
}
63
64 /* Pack OSC object metadata for shipment to the MDS. */
65 static int osc_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
66                       struct lov_stripe_md *lsm)
67 {
68         int lmm_size;
69
70         lmm_size = sizeof(**lmmp);
71         if (!lmmp)
72                 RETURN(lmm_size);
73
74         if (*lmmp && !lsm) {
75                 OBD_FREE(*lmmp, lmm_size);
76                 *lmmp = NULL;
77                 RETURN(0);
78         }
79
80         if (!*lmmp) {
81                 OBD_ALLOC(*lmmp, lmm_size);
82                 if (!*lmmp)
83                         RETURN(-ENOMEM);
84         }
85         if (lsm)
86                 (*lmmp)->lmm_object_id = (lsm->lsm_object_id);
87
88         return lmm_size;
89 }
90
91 static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
92                         struct lov_mds_md *lmm)
93 {
94         int lsm_size;
95
96         lsm_size = sizeof(**lsmp);
97         if (!lsmp)
98                 RETURN(lsm_size);
99
100         if (*lsmp && !lmm) {
101                 OBD_FREE(*lsmp, lsm_size);
102                 *lsmp = NULL;
103                 RETURN(0);
104         }
105
106         if (!*lsmp) {
107                 OBD_ALLOC(*lsmp, lsm_size);
108                 if (!*lsmp)
109                         RETURN(-ENOMEM);
110         }
111
112         /* XXX endianness */
113         if (lmm)
114                 (*lsmp)->lsm_object_id = (lmm->lmm_object_id);
115
116         return lsm_size;
117 }
118
119 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
120                        struct lov_stripe_md *md)
121 {
122         struct ptlrpc_request *request;
123         struct ost_body *body;
124         int rc, size = sizeof(*body);
125         ENTRY;
126
127         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
128                                   &size, NULL);
129         if (!request)
130                 RETURN(-ENOMEM);
131
132         body = lustre_msg_buf(request->rq_reqmsg, 0);
133 #warning FIXME: pack only valid fields instead of memcpy, endianness
134         memcpy(&body->oa, oa, sizeof(*oa));
135
136         request->rq_replen = lustre_msg_size(1, &size);
137
138         rc = ptlrpc_queue_wait(request);
139         if (rc) {
140                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
141                 GOTO(out, rc);
142         }
143
144         body = lustre_msg_buf(request->rq_repmsg, 0);
145         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
146         if (oa)
147                 memcpy(oa, &body->oa, sizeof(*oa));
148
149         EXIT;
150  out:
151         ptlrpc_req_finished(request);
152         return rc;
153 }
154
155 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
156                     struct lov_stripe_md *md)
157 {
158         struct ptlrpc_request *request;
159         struct ost_body *body;
160         int rc, size = sizeof(*body);
161         ENTRY;
162
163         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
164                                   NULL);
165         if (!request)
166                 RETURN(-ENOMEM);
167
168         body = lustre_msg_buf(request->rq_reqmsg, 0);
169 #warning FIXME: pack only valid fields instead of memcpy, endianness
170         memcpy(&body->oa, oa, sizeof(*oa));
171
172         request->rq_replen = lustre_msg_size(1, &size);
173
174         rc = ptlrpc_queue_wait(request);
175         if (rc)
176                 GOTO(out, rc);
177
178         body = lustre_msg_buf(request->rq_repmsg, 0);
179         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
180         if (oa)
181                 memcpy(oa, &body->oa, sizeof(*oa));
182
183         EXIT;
184  out:
185         ptlrpc_req_finished(request);
186         return rc;
187 }
188
189 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
190                      struct lov_stripe_md *md)
191 {
192         struct ptlrpc_request *request;
193         struct ost_body *body;
194         int rc, size = sizeof(*body);
195         ENTRY;
196
197         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
198                                   NULL);
199         if (!request)
200                 RETURN(-ENOMEM);
201
202         body = lustre_msg_buf(request->rq_reqmsg, 0);
203 #warning FIXME: pack only valid fields instead of memcpy, endianness
204         memcpy(&body->oa, oa, sizeof(*oa));
205
206         request->rq_replen = lustre_msg_size(1, &size);
207
208         rc = ptlrpc_queue_wait(request);
209         if (rc)
210                 GOTO(out, rc);
211
212         body = lustre_msg_buf(request->rq_repmsg, 0);
213         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
214         if (oa)
215                 memcpy(oa, &body->oa, sizeof(*oa));
216
217         EXIT;
218  out:
219         ptlrpc_req_finished(request);
220         return rc;
221 }
222
223 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
224                        struct lov_stripe_md *md)
225 {
226         struct ptlrpc_request *request;
227         struct ost_body *body;
228         int rc, size = sizeof(*body);
229         ENTRY;
230
231         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
232                                   &size, NULL);
233         if (!request)
234                 RETURN(-ENOMEM);
235
236         body = lustre_msg_buf(request->rq_reqmsg, 0);
237         memcpy(&body->oa, oa, sizeof(*oa));
238
239         request->rq_replen = lustre_msg_size(1, &size);
240
241         rc = ptlrpc_queue_wait(request);
242
243         ptlrpc_req_finished(request);
244         return rc;
245 }
246
247 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
248                       struct lov_stripe_md **ea)
249 {
250         struct ptlrpc_request *request;
251         struct ost_body *body;
252         struct lov_stripe_md *lsm;
253         int rc, size = sizeof(*body);
254         ENTRY;
255
256         LASSERT(oa);
257         LASSERT(ea);
258
259         lsm = *ea;
260         if (!lsm) {
261                 rc = obd_alloc_memmd(conn, &lsm);
262                 if (rc < 0)
263                         RETURN(rc);
264         }
265
266         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
267                                   NULL);
268         if (!request)
269                 GOTO(out, rc = -ENOMEM);
270
271         body = lustre_msg_buf(request->rq_reqmsg, 0);
272         memcpy(&body->oa, oa, sizeof(*oa));
273
274         request->rq_replen = lustre_msg_size(1, &size);
275
276         rc = ptlrpc_queue_wait(request);
277         if (rc)
278                 GOTO(out_req, rc);
279
280         body = lustre_msg_buf(request->rq_repmsg, 0);
281         memcpy(oa, &body->oa, sizeof(*oa));
282
283         lsm->lsm_object_id = oa->o_id;
284         lsm->lsm_stripe_count = 0;
285         *ea = lsm;
286         EXIT;
287 out_req:
288         ptlrpc_req_finished(request);
289 out:
290         if (rc && !*ea)
291                 obd_free_memmd(conn, &lsm);
292         return rc;
293 }
294
295 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
296                      struct lov_stripe_md *md, obd_size start,
297                      obd_size end)
298 {
299         struct ptlrpc_request *request;
300         struct ost_body *body;
301         int rc, size = sizeof(*body);
302         ENTRY;
303
304         if (!oa) {
305                 CERROR("oa NULL\n");
306                 RETURN(-EINVAL);
307         }
308
309         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
310                                   NULL);
311         if (!request)
312                 RETURN(-ENOMEM);
313
314         body = lustre_msg_buf(request->rq_reqmsg, 0);
315 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
316         memcpy(&body->oa, oa, sizeof(*oa));
317
318         /* overload the size and blocks fields in the oa with start/end */
319         body->oa.o_size = HTON__u64(start);
320         body->oa.o_blocks = HTON__u64(end);
321         body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
322
323         request->rq_replen = lustre_msg_size(1, &size);
324
325         rc = ptlrpc_queue_wait(request);
326         if (rc)
327                 GOTO(out, rc);
328
329         body = lustre_msg_buf(request->rq_repmsg, 0);
330         memcpy(oa, &body->oa, sizeof(*oa));
331
332         EXIT;
333  out:
334         ptlrpc_req_finished(request);
335         return rc;
336 }
337
338 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
339                        struct lov_stripe_md *ea)
340 {
341         struct ptlrpc_request *request;
342         struct ost_body *body;
343         int rc, size = sizeof(*body);
344         ENTRY;
345
346         if (!oa) {
347                 CERROR("oa NULL\n");
348                 RETURN(-EINVAL);
349         }
350         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
351                                   &size, NULL);
352         if (!request)
353                 RETURN(-ENOMEM);
354
355         body = lustre_msg_buf(request->rq_reqmsg, 0);
356 #warning FIXME: pack only valid fields instead of memcpy, endianness
357         memcpy(&body->oa, oa, sizeof(*oa));
358
359         request->rq_replen = lustre_msg_size(1, &size);
360
361         rc = ptlrpc_queue_wait(request);
362         if (rc)
363                 GOTO(out, rc);
364
365         body = lustre_msg_buf(request->rq_repmsg, 0);
366         memcpy(oa, &body->oa, sizeof(*oa));
367
368         EXIT;
369  out:
370         ptlrpc_req_finished(request);
371         return rc;
372 }
373
374 /* Our bulk-unmapping bottom half. */
375 static void unmap_and_decref_bulk_desc(void *data)
376 {
377         struct ptlrpc_bulk_desc *desc = data;
378         struct list_head *tmp;
379         ENTRY;
380
381         /* This feels wrong to me. */
382         list_for_each(tmp, &desc->bd_page_list) {
383                 struct ptlrpc_bulk_page *bulk;
384                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
385
386                 kunmap(bulk->bp_page);
387                 obd_kmap_put(1);
388         }
389
390         ptlrpc_bulk_decref(desc);
391         EXIT;
392 }
393
394 /*  this is the callback function which is invoked by the Portals
395  *  event handler associated with the bulk_sink queue and bulk_source queue. 
396  */
397 static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
398 {
399         ENTRY;
400
401         LASSERT(desc->bd_brw_set != NULL);
402         LASSERT(desc->bd_brw_set->brw_callback != NULL);
403
404         desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
405
406         /* We can't kunmap the desc from interrupt context, so we do it from
407          * the bottom half above. */
408         prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
409         schedule_work(&desc->bd_queue);
410
411         EXIT;
412 }
413
414 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
415                         obd_count page_count, struct brw_page *pga,
416                         struct obd_brw_set *set)
417 {
418         struct obd_import *imp = class_conn2cliimp(conn);
419         struct ptlrpc_connection *connection = imp->imp_connection;
420         struct ptlrpc_request *request = NULL;
421         struct ptlrpc_bulk_desc *desc = NULL;
422         struct ost_body *body;
423         int rc, size[3] = {sizeof(*body)}, mapped = 0;
424         void *iooptr, *nioptr;
425         __u32 xid;
426         ENTRY;
427
428         size[1] = sizeof(struct obd_ioobj);
429         size[2] = page_count * sizeof(struct niobuf_remote);
430
431         request = ptlrpc_prep_req(imp, OST_READ, 3, size, NULL);
432         if (!request)
433                 RETURN(-ENOMEM);
434
435         body = lustre_msg_buf(request->rq_reqmsg, 0);
436
437         desc = ptlrpc_prep_bulk(connection);
438         if (!desc)
439                 GOTO(out_req, rc = -ENOMEM);
440         desc->bd_portal = OST_BULK_PORTAL;
441         desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
442         CDEBUG(D_PAGE, "desc = %p\n", desc);
443
444         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
445         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
446         ost_pack_ioo(&iooptr, lsm, page_count);
447         /* end almost identical to brw_write case */
448
449         spin_lock(&imp->imp_lock);
450         xid = ++imp->imp_last_xid;       /* single xid for all pages */
451         spin_unlock(&imp->imp_lock);
452
453         obd_kmap_get(page_count, 0);
454
455         for (mapped = 0; mapped < page_count; mapped++) {
456                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
457                 if (bulk == NULL)
458                         GOTO(out_unmap, rc = -ENOMEM);
459
460                 bulk->bp_xid = xid;           /* single xid for all pages */
461
462                 bulk->bp_buf = kmap(pga[mapped].pg);
463                 bulk->bp_page = pga[mapped].pg;
464                 bulk->bp_buflen = PAGE_SIZE;
465                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
466                                 pga[mapped].flag, bulk->bp_xid);
467         }
468
469         /*
470          * Register the bulk first, because the reply could arrive out of order,
471          * and we want to be ready for the bulk data.
472          *
473          * One reference is released when brw_finish is complete, the other when
474          * the caller removes us from the "set" list.
475          *
476          * On error, we never do the brw_finish, so we handle all decrefs.
477          */
478         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
479                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
480                        OBD_FAIL_OSC_BRW_READ_BULK);
481         } else {
482                 rc = ptlrpc_register_bulk(desc);
483                 if (rc)
484                         GOTO(out_unmap, rc);
485                 obd_brw_set_add(set, desc);
486         }
487
488         request->rq_replen = lustre_msg_size(1, size);
489         rc = ptlrpc_queue_wait(request);
490
491         /*
492          * XXX: If there is an error during the processing of the callback,
493          *      such as a timeout in a sleep that it performs, brw_finish
494          *      will never get called, and we'll leak the desc, fail to kunmap
495          *      things, cats will live with dogs.  One solution would be to
496          *      export brw_finish as osc_brw_finish, so that the timeout case
497          *      and its kin could call it for proper cleanup.  An alternative
498          *      would be for an error return from the callback to cause us to
499          *      clean up, but that doesn't help the truly async cases (like
500          *      LOV), which will immediately return from their PHASE_START
501          *      callback, before any such cleanup-requiring error condition can
502          *      be detected.
503          */
504  out_req:
505         ptlrpc_req_finished(request);
506         RETURN(rc);
507
508         /* Clean up on error. */
509 out_unmap:
510         while (mapped-- > 0)
511                 kunmap(pga[mapped].pg);
512         obd_kmap_put(page_count);
513         ptlrpc_bulk_decref(desc);
514         goto out_req;
515 }
516
517 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
518                          obd_count page_count, struct brw_page *pga,
519                          struct obd_brw_set *set)
520 {
521         struct ptlrpc_connection *connection =
522                 client_conn2cli(conn)->cl_import.imp_connection;
523         struct ptlrpc_request *request = NULL;
524         struct ptlrpc_bulk_desc *desc = NULL;
525         struct ost_body *body;
526         struct niobuf_local *local = NULL;
527         struct niobuf_remote *remote;
528         int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
529         void *iooptr, *nioptr;
530         ENTRY;
531
532         size[1] = sizeof(struct obd_ioobj);
533         size[2] = page_count * sizeof(*remote);
534
535         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
536                                   NULL);
537         if (!request)
538                 RETURN(-ENOMEM);
539
540         body = lustre_msg_buf(request->rq_reqmsg, 0);
541
542         desc = ptlrpc_prep_bulk(connection);
543         if (!desc)
544                GOTO(out_req, rc = -ENOMEM);
545         desc->bd_portal = OSC_BULK_PORTAL;
546         desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
547         CDEBUG(D_PAGE, "desc = %p\n", desc);
548
549         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
550         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
551         ost_pack_ioo(&iooptr, md, page_count);
552         /* end almost identical to brw_read case */
553
554         OBD_ALLOC(local, page_count * sizeof(*local));
555         if (!local)
556                 GOTO(out_desc, rc = -ENOMEM);
557
558         obd_kmap_get(page_count, 0);
559
560         for (mapped = 0; mapped < page_count; mapped++) {
561                 local[mapped].addr = kmap(pga[mapped].pg);
562
563                 CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
564                        "%d ; page %d of %d\n",
565                        local[mapped].addr, pga[mapped].pg->flags,
566                        page_count(pga[mapped].pg),
567                        mapped, page_count - 1);
568
569                 local[mapped].offset = pga[mapped].off;
570                 local[mapped].len = pga[mapped].count;
571                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
572                                 pga[mapped].flag, 0);
573         }
574
575         size[1] = page_count * sizeof(*remote);
576         request->rq_replen = lustre_msg_size(2, size);
577         rc = ptlrpc_queue_wait(request);
578         if (rc)
579                 GOTO(out_unmap, rc);
580
581         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
582         if (!nioptr)
583                 GOTO(out_unmap, rc = -EINVAL);
584
585         if (request->rq_repmsg->buflens[1] != size[1]) {
586                 CERROR("buffer length wrong (%d vs. %d)\n",
587                        request->rq_repmsg->buflens[1], size[1]);
588                 GOTO(out_unmap, rc = -EINVAL);
589         }
590
591         for (j = 0; j < page_count; j++) {
592                 struct ptlrpc_bulk_page *bulk;
593
594                 ost_unpack_niobuf(&nioptr, &remote);
595
596                 bulk = ptlrpc_prep_bulk_page(desc);
597                 if (!bulk)
598                         GOTO(out_unmap, rc = -ENOMEM);
599
600                 bulk->bp_buf = (void *)(unsigned long)local[j].addr;
601                 bulk->bp_buflen = local[j].len;
602                 bulk->bp_xid = remote->xid;
603                 bulk->bp_page = pga[j].pg;
604         }
605
606         if (desc->bd_page_count != page_count)
607                 LBUG();
608
609         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
610                 GOTO(out_unmap, rc = 0);
611
612         OBD_FREE(local, page_count * sizeof(*local));
613
614         /* One reference is released when brw_finish is complete, the other
615          * when the caller removes it from the "set" list. */
616         obd_brw_set_add(set, desc);
617         rc = ptlrpc_send_bulk(desc);
618
619         /* XXX: Mike, same question as in osc_brw_read. */
620 out_req:
621         ptlrpc_req_finished(request);
622         RETURN(rc);
623
624         /* Clean up on error. */
625 out_unmap:
626         while (mapped-- > 0)
627                 kunmap(pga[mapped].pg);
628
629         obd_kmap_put(page_count);
630
631         OBD_FREE(local, page_count * sizeof(*local));
632 out_desc:
633         ptlrpc_bulk_decref(desc);
634         goto out_req;
635 }
636
637 static int osc_brw(int cmd, struct lustre_handle *conn,
638                    struct lov_stripe_md *md, obd_count page_count,
639                    struct brw_page *pga, struct obd_brw_set *set)
640 {
641         ENTRY;
642
643         while (page_count) {
644                 obd_count pages_per_brw;
645                 int rc;
646
647                 if (page_count > PTL_MD_MAX_IOV)
648                         pages_per_brw = PTL_MD_MAX_IOV;
649                 else
650                         pages_per_brw = page_count;
651
652                 if (cmd & OBD_BRW_WRITE)
653                         rc = osc_brw_write(conn, md, pages_per_brw, pga, set);
654                 else
655                         rc = osc_brw_read(conn, md, pages_per_brw, pga, set);
656
657                 if (rc != 0)
658                         RETURN(rc);
659
660                 page_count -= pages_per_brw;
661                 pga += pages_per_brw;
662         }
663         RETURN(0);
664 }
665
666 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
667                        struct lustre_handle *parent_lock,
668                        __u32 type, void *extentp, int extent_len, __u32 mode,
669                        int *flags, void *callback, void *data, int datalen,
670                        struct lustre_handle *lockh)
671 {
672         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
673         struct obd_device *obddev = class_conn2obd(connh);
674         struct ldlm_extent *extent = extentp;
675         int rc;
676         ENTRY;
677
678         /* Filesystem locks are given a bit of special treatment: if
679          * this is not a file size lock (which has end == -1), we
680          * fixup the lock to start and end on page boundaries. */
681         if (extent->end != OBD_OBJECT_EOF) {
682                 extent->start &= PAGE_MASK;
683                 extent->end = (extent->end & PAGE_MASK) + PAGE_SIZE - 1;
684         }
685
686         /* Next, search for already existing extent locks that will cover us */
687         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
688                              sizeof(extent), mode, lockh);
689         if (rc == 1)
690                 /* We already have a lock, and it's referenced */
691                 RETURN(ELDLM_OK);
692
693         /* If we're trying to read, we also search for an existing PW lock.  The
694          * VFS and page cache already protect us locally, so lots of readers/
695          * writers can share a single PW lock.
696          *
697          * There are problems with conversion deadlocks, so instead of
698          * converting a read lock to a write lock, we'll just enqueue a new
699          * one.
700          *
701          * At some point we should cancel the read lock instead of making them
702          * send us a blocking callback, but there are problems with canceling
703          * locks out from other users right now, too. */
704
705         if (mode == LCK_PR) {
706                 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type,
707                                      extent, sizeof(extent), LCK_PW, lockh);
708                 if (rc == 1) {
709                         /* FIXME: This is not incredibly elegant, but it might
710                          * be more elegant than adding another parameter to
711                          * lock_match.  I want a second opinion. */
712                         ldlm_lock_addref(lockh, LCK_PR);
713                         ldlm_lock_decref(lockh, LCK_PW);
714
715                         RETURN(ELDLM_OK);
716                 }
717         }
718
719         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
720                               res_id, type, extent, sizeof(extent), mode, flags,
721                               ldlm_completion_ast, callback, data, datalen,
722                               lockh);
723         RETURN(rc);
724 }
725
726 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
727                       __u32 mode, struct lustre_handle *lockh)
728 {
729         ENTRY;
730
731         ldlm_lock_decref(lockh, mode);
732
733         RETURN(0);
734 }
735
736 static int osc_cancel_unused(struct lustre_handle *connh,
737                              struct lov_stripe_md *lsm, int flags)
738 {
739         struct obd_device *obddev = class_conn2obd(connh);
740         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
741
742         return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, flags);
743 }
744
745 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
746 {
747         struct ptlrpc_request *request;
748         int rc, size = sizeof(*osfs);
749         ENTRY;
750
751         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
752                                   NULL);
753         if (!request)
754                 RETURN(-ENOMEM);
755
756         request->rq_replen = lustre_msg_size(1, &size);
757
758         rc = ptlrpc_queue_wait(request);
759         if (rc) {
760                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
761                 GOTO(out, rc);
762         }
763
764         obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
765
766         EXIT;
767  out:
768         ptlrpc_req_finished(request);
769         return rc;
770 }
771
772 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
773                          void *karg, void *uarg)
774 {
775         struct obd_device *obddev = class_conn2obd(conn);
776         struct obd_ioctl_data *data = karg;
777         int err = 0;
778         ENTRY;
779
780         switch (cmd) {
781         case IOC_LDLM_TEST: {
782                 err = ldlm_test(obddev, conn);
783                 CERROR("-- done err %d\n", err);
784                 GOTO(out, err);
785         }
786         case IOC_LDLM_REGRESS_START: {
787                 unsigned int numthreads = 1;
788                 unsigned int numheld = 10;
789                 unsigned int numres = 10;
790                 unsigned int numext = 10;
791                 char *parse;
792
793                 if (data->ioc_inllen1) {
794                         parse = data->ioc_inlbuf1;
795                         if (*parse != '\0') {
796                                 while(isspace(*parse)) parse++;
797                                 numthreads = simple_strtoul(parse, &parse, 0);
798                                 while(isspace(*parse)) parse++;
799                         }
800                         if (*parse != '\0') {
801                                 while(isspace(*parse)) parse++;
802                                 numheld = simple_strtoul(parse, &parse, 0);
803                                 while(isspace(*parse)) parse++;
804                         }
805                         if (*parse != '\0') {
806                                 while(isspace(*parse)) parse++;
807                                 numres = simple_strtoul(parse, &parse, 0);
808                                 while(isspace(*parse)) parse++;
809                         }
810                         if (*parse != '\0') {
811                                 while(isspace(*parse)) parse++;
812                                 numext = simple_strtoul(parse, &parse, 0);
813                                 while(isspace(*parse)) parse++;
814                         }
815                 }
816
817                 err = ldlm_regression_start(obddev, conn, numthreads,
818                                 numheld, numres, numext);
819
820                 CERROR("-- done err %d\n", err);
821                 GOTO(out, err);
822         }
823         case IOC_LDLM_REGRESS_STOP: {
824                 err = ldlm_regression_stop();
825                 CERROR("-- done err %d\n", err);
826                 GOTO(out, err);
827         }
828         case IOC_OSC_REGISTER_LOV: {
829                 if (obddev->u.cli.cl_containing_lov)
830                         GOTO(out, err = -EALREADY);
831                 obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
832                 GOTO(out, err);
833         }
834         case OBD_IOC_LOV_GET_CONFIG: {
835                 char *buf;
836                 struct lov_desc *desc;
837                 obd_uuid_t *uuidp;
838
839                 buf = NULL;
840                 len = 0;
841                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
842                         GOTO(out, err = -EINVAL);
843
844                 data = (struct obd_ioctl_data *)buf;
845
846                 if (sizeof(*desc) > data->ioc_inllen1) {
847                         OBD_FREE(buf, len);
848                         GOTO(out, err = -EINVAL);
849                 }
850
851                 if (data->ioc_inllen2 < sizeof(*uuidp)) {
852                         OBD_FREE(buf, len);
853                         GOTO(out, err = -EINVAL);
854                 }
855
856                 desc = (struct lov_desc *)data->ioc_inlbuf1;
857                 desc->ld_tgt_count = 1;
858                 desc->ld_active_tgt_count = 1;
859                 desc->ld_default_stripe_count = 1;
860                 desc->ld_default_stripe_size = 0;
861                 desc->ld_default_stripe_offset = 0;
862                 desc->ld_pattern = 0;
863                 memcpy(desc->ld_uuid,  obddev->obd_uuid, sizeof(*uuidp));
864
865                 uuidp = (obd_uuid_t *)data->ioc_inlbuf2;
866                 memcpy(uuidp,  obddev->obd_uuid, sizeof(*uuidp));
867
868                 err = copy_to_user((void *)uarg, buf, len);
869                 if (err)
870                         err = -EFAULT;
871                 OBD_FREE(buf, len);
872                 GOTO(out, err);
873         }
874         default:
875                 GOTO(out, err = -ENOTTY);
876         }
877 out:
878         return err;
879 }
880
881 struct obd_ops osc_obd_ops = {
882         o_attach:       osc_attach,
883         o_detach:       osc_detach,
884         o_setup:        client_obd_setup,
885         o_cleanup:      client_obd_cleanup,
886         o_connect:      client_obd_connect,
887         o_disconnect:   client_obd_disconnect,
888         o_statfs:       osc_statfs,
889         o_packmd:       osc_packmd,
890         o_unpackmd:     osc_unpackmd,
891         o_create:       osc_create,
892         o_destroy:      osc_destroy,
893         o_getattr:      osc_getattr,
894         o_setattr:      osc_setattr,
895         o_open:         osc_open,
896         o_close:        osc_close,
897         o_brw:          osc_brw,
898         o_punch:        osc_punch,
899         o_enqueue:      osc_enqueue,
900         o_cancel:       osc_cancel,
901         o_cancel_unused: osc_cancel_unused,
902         o_iocontrol:    osc_iocontrol
903 };
904
905 static int __init osc_init(void)
906 {
907         RETURN(class_register_type(&osc_obd_ops, status_class_var,
908                                    LUSTRE_OSC_NAME));
909 }
910
911 static void __exit osc_exit(void)
912 {
913         class_unregister_type(LUSTRE_OSC_NAME);
914 }
915
916 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
917 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
918 MODULE_LICENSE("GPL");
919
920 module_init(osc_init);
921 module_exit(osc_exit);