Whamcloud - gitweb
merge b_md onto HEAD. as best as I can remember:
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  *  For testing and management it is treated as an obd_device,
23  *  although it does not export a full OBD method table (the
24  *  requests are coming in over the wire, so object target modules
25  *  do not have a full method table.)
26  *
27  */
28
29 #define EXPORT_SYMTAB
30 #define DEBUG_SUBSYSTEM S_OSC
31
32 #include <linux/version.h>
33 #include <linux/module.h>
34 #include <linux/mm.h>
35 #include <linux/highmem.h>
36 #include <linux/lustre_dlm.h>
37 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
38 #include <linux/workqueue.h>
39 #endif
40 #include <linux/kp30.h>
41 #include <linux/lustre_mds.h> /* for mds_objid */
42 #include <linux/obd_ost.h>
43 #include <linux/ctype.h>
44 #include <linux/init.h>
45 #include <linux/lustre_ha.h>
46 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
47 #include <linux/lustre_lite.h> /* for ll_i2info */
48 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
49 #include <linux/lprocfs_status.h>
50
51 extern struct lprocfs_vars status_var_nm_1[];
52 extern struct lprocfs_vars status_class_var[];
53
54 int osc_attach(struct obd_device *dev, obd_count len, void *data)
55 {
56         return lprocfs_reg_obd(dev, status_var_nm_1, dev);
57 }
58
/*
 * Detach hook: counterpart of osc_attach(); passes the device to
 * lprocfs_dereg_obd() and returns its result.
 */
int osc_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_dereg_obd(dev);
        return rc;
}
63
64 /* Pack OSC object metadata for shipment to the MDS. */
65 static int osc_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
66                       struct lov_stripe_md *lsm)
67 {
68         int lmm_size;
69
70         lmm_size = sizeof(**lmmp);
71         if (!lmmp)
72                 RETURN(lmm_size);
73
74         if (*lmmp && !lsm) {
75                 OBD_FREE(*lmmp, lmm_size);
76                 *lmmp = NULL;
77                 RETURN(0);
78         }
79
80         if (!*lmmp) {
81                 OBD_ALLOC(*lmmp, lmm_size);
82                 if (!*lmmp)
83                         RETURN(-ENOMEM);
84         }
85         if (lsm)
86                 (*lmmp)->lmm_object_id = (lsm->lsm_object_id);
87
88         return lmm_size;
89 }
90
91 static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
92                         struct lov_mds_md *lmm)
93 {
94         int lsm_size;
95
96         lsm_size = sizeof(**lsmp);
97         if (!lsmp)
98                 RETURN(lsm_size);
99
100         if (*lsmp && !lmm) {
101                 OBD_FREE(*lsmp, lsm_size);
102                 *lsmp = NULL;
103                 RETURN(0);
104         }
105
106         if (!*lsmp) {
107                 OBD_ALLOC(*lsmp, lsm_size);
108                 if (!*lsmp)
109                         RETURN(-ENOMEM);
110         }
111
112         /* XXX endianness */
113         if (lmm)
114                 (*lsmp)->lsm_object_id = (lmm->lmm_object_id);
115
116         return lsm_size;
117 }
118
119 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
120                        struct lov_stripe_md *md)
121 {
122         struct ptlrpc_request *request;
123         struct ost_body *body;
124         int rc, size = sizeof(*body);
125         ENTRY;
126
127         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
128                                   &size, NULL);
129         if (!request)
130                 RETURN(-ENOMEM);
131
132         body = lustre_msg_buf(request->rq_reqmsg, 0);
133 #warning FIXME: pack only valid fields instead of memcpy, endianness
134         memcpy(&body->oa, oa, sizeof(*oa));
135
136         request->rq_replen = lustre_msg_size(1, &size);
137
138         rc = ptlrpc_queue_wait(request);
139         rc = ptlrpc_check_status(request, rc);
140         if (rc) {
141                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
142                 GOTO(out, rc);
143         }
144
145         body = lustre_msg_buf(request->rq_repmsg, 0);
146         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
147         if (oa)
148                 memcpy(oa, &body->oa, sizeof(*oa));
149
150         EXIT;
151  out:
152         ptlrpc_req_finished(request);
153         return rc;
154 }
155
156 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
157                     struct lov_stripe_md *md)
158 {
159         struct ptlrpc_request *request;
160         struct ost_body *body;
161         int rc, size = sizeof(*body);
162         ENTRY;
163
164         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
165                                   NULL);
166         if (!request)
167                 RETURN(-ENOMEM);
168
169         body = lustre_msg_buf(request->rq_reqmsg, 0);
170 #warning FIXME: pack only valid fields instead of memcpy, endianness
171         memcpy(&body->oa, oa, sizeof(*oa));
172
173         request->rq_replen = lustre_msg_size(1, &size);
174
175         rc = ptlrpc_queue_wait(request);
176         rc = ptlrpc_check_status(request, rc);
177         if (rc)
178                 GOTO(out, rc);
179
180         body = lustre_msg_buf(request->rq_repmsg, 0);
181         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
182         if (oa)
183                 memcpy(oa, &body->oa, sizeof(*oa));
184
185         EXIT;
186  out:
187         ptlrpc_req_finished(request);
188         return rc;
189 }
190
191 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
192                      struct lov_stripe_md *md)
193 {
194         struct ptlrpc_request *request;
195         struct ost_body *body;
196         int rc, size = sizeof(*body);
197         ENTRY;
198
199         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
200                                   NULL);
201         if (!request)
202                 RETURN(-ENOMEM);
203
204         body = lustre_msg_buf(request->rq_reqmsg, 0);
205 #warning FIXME: pack only valid fields instead of memcpy, endianness
206         memcpy(&body->oa, oa, sizeof(*oa));
207
208         request->rq_replen = lustre_msg_size(1, &size);
209
210         rc = ptlrpc_queue_wait(request);
211         rc = ptlrpc_check_status(request, rc);
212         if (rc)
213                 GOTO(out, rc);
214
215         body = lustre_msg_buf(request->rq_repmsg, 0);
216         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
217         if (oa)
218                 memcpy(oa, &body->oa, sizeof(*oa));
219
220         EXIT;
221  out:
222         ptlrpc_req_finished(request);
223         return rc;
224 }
225
226 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
227                        struct lov_stripe_md *md)
228 {
229         struct ptlrpc_request *request;
230         struct ost_body *body;
231         int rc, size = sizeof(*body);
232         ENTRY;
233
234         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
235                                   &size, NULL);
236         if (!request)
237                 RETURN(-ENOMEM);
238
239         body = lustre_msg_buf(request->rq_reqmsg, 0);
240         memcpy(&body->oa, oa, sizeof(*oa));
241
242         request->rq_replen = lustre_msg_size(1, &size);
243
244         rc = ptlrpc_queue_wait(request);
245         rc = ptlrpc_check_status(request, rc);
246
247         ptlrpc_req_finished(request);
248         return rc;
249 }
250
251 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
252                       struct lov_stripe_md **ea)
253 {
254         struct ptlrpc_request *request;
255         struct ost_body *body;
256         struct lov_stripe_md *lsm;
257         int rc, size = sizeof(*body);
258         ENTRY;
259
260         LASSERT(oa);
261         LASSERT(ea);
262
263         lsm = *ea;
264         if (!lsm) {
265                 rc = obd_alloc_memmd(conn, &lsm);
266                 if (rc < 0)
267                         RETURN(rc);
268         }
269
270         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
271                                   NULL);
272         if (!request)
273                 GOTO(out, rc = -ENOMEM);
274
275         body = lustre_msg_buf(request->rq_reqmsg, 0);
276         memcpy(&body->oa, oa, sizeof(*oa));
277
278         request->rq_replen = lustre_msg_size(1, &size);
279
280         rc = ptlrpc_queue_wait(request);
281         rc = ptlrpc_check_status(request, rc);
282         if (rc)
283                 GOTO(out_req, rc);
284
285         body = lustre_msg_buf(request->rq_repmsg, 0);
286         memcpy(oa, &body->oa, sizeof(*oa));
287
288         lsm->lsm_object_id = oa->o_id;
289         lsm->lsm_stripe_count = 0;
290         *ea = lsm;
291         EXIT;
292 out_req:
293         ptlrpc_req_finished(request);
294 out:
295         if (rc && !*ea)
296                 obd_free_memmd(conn, &lsm);
297         return rc;
298 }
299
300 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
301                      struct lov_stripe_md *md, obd_size start,
302                      obd_size end)
303 {
304         struct ptlrpc_request *request;
305         struct ost_body *body;
306         int rc, size = sizeof(*body);
307         ENTRY;
308
309         if (!oa) {
310                 CERROR("oa NULL\n");
311                 RETURN(-EINVAL);
312         }
313
314         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
315                                   NULL);
316         if (!request)
317                 RETURN(-ENOMEM);
318
319         body = lustre_msg_buf(request->rq_reqmsg, 0);
320 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
321         memcpy(&body->oa, oa, sizeof(*oa));
322
323         /* overload the size and blocks fields in the oa with start/end */
324         body->oa.o_size = HTON__u64(start);
325         body->oa.o_blocks = HTON__u64(end);
326         body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
327
328         request->rq_replen = lustre_msg_size(1, &size);
329
330         rc = ptlrpc_queue_wait(request);
331         rc = ptlrpc_check_status(request, rc);
332         if (rc)
333                 GOTO(out, rc);
334
335         body = lustre_msg_buf(request->rq_repmsg, 0);
336         memcpy(oa, &body->oa, sizeof(*oa));
337
338         EXIT;
339  out:
340         ptlrpc_req_finished(request);
341         return rc;
342 }
343
344 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
345                        struct lov_stripe_md *ea)
346 {
347         struct ptlrpc_request *request;
348         struct ost_body *body;
349         int rc, size = sizeof(*body);
350         ENTRY;
351
352         if (!oa) {
353                 CERROR("oa NULL\n");
354                 RETURN(-EINVAL);
355         }
356         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
357                                   &size, NULL);
358         if (!request)
359                 RETURN(-ENOMEM);
360
361         body = lustre_msg_buf(request->rq_reqmsg, 0);
362 #warning FIXME: pack only valid fields instead of memcpy, endianness
363         memcpy(&body->oa, oa, sizeof(*oa));
364
365         request->rq_replen = lustre_msg_size(1, &size);
366
367         rc = ptlrpc_queue_wait(request);
368         rc = ptlrpc_check_status(request, rc);
369         if (rc)
370                 GOTO(out, rc);
371
372         body = lustre_msg_buf(request->rq_repmsg, 0);
373         memcpy(oa, &body->oa, sizeof(*oa));
374
375         EXIT;
376  out:
377         ptlrpc_req_finished(request);
378         return rc;
379 }
380
381 /* Our bulk-unmapping bottom half. */
382 static void unmap_and_decref_bulk_desc(void *data)
383 {
384         struct ptlrpc_bulk_desc *desc = data;
385         struct list_head *tmp;
386         ENTRY;
387
388         /* This feels wrong to me. */
389         list_for_each(tmp, &desc->bd_page_list) {
390                 struct ptlrpc_bulk_page *bulk;
391                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
392
393                 kunmap(bulk->bp_page);
394                 obd_kmap_put(1);
395         }
396
397         ptlrpc_bulk_decref(desc);
398         EXIT;
399 }
400
401 /*  this is the callback function which is invoked by the Portals
402  *  event handler associated with the bulk_sink queue and bulk_source queue. 
403  */
404 static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
405 {
406         ENTRY;
407
408         LASSERT(desc->bd_brw_set != NULL);
409         LASSERT(desc->bd_brw_set->brw_callback != NULL);
410
411         desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
412
413         /* We can't kunmap the desc from interrupt context, so we do it from
414          * the bottom half above. */
415         prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
416         schedule_work(&desc->bd_queue);
417
418         EXIT;
419 }
420
421 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
422                         obd_count page_count, struct brw_page *pga,
423                         struct obd_brw_set *set)
424 {
425         struct ptlrpc_connection *connection =
426                 client_conn2cli(conn)->cl_import.imp_connection;
427         struct ptlrpc_request *request = NULL;
428         struct ptlrpc_bulk_desc *desc = NULL;
429         struct ost_body *body;
430         int rc, size[3] = {sizeof(*body)}, mapped = 0;
431         void *iooptr, *nioptr;
432         __u32 xid;
433         ENTRY;
434
435         size[1] = sizeof(struct obd_ioobj);
436         size[2] = page_count * sizeof(struct niobuf_remote);
437
438         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_READ, 3, size,
439                                   NULL);
440         if (!request)
441                 RETURN(-ENOMEM);
442
443         body = lustre_msg_buf(request->rq_reqmsg, 0);
444
445         desc = ptlrpc_prep_bulk(connection);
446         if (!desc)
447                 GOTO(out_req, rc = -ENOMEM);
448         desc->bd_portal = OST_BULK_PORTAL;
449         desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
450         CDEBUG(D_PAGE, "desc = %p\n", desc);
451
452         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
453         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
454         ost_pack_ioo(&iooptr, lsm, page_count);
455         /* end almost identical to brw_write case */
456
457         spin_lock(&connection->c_lock);
458         xid = ++connection->c_xid_out;       /* single xid for all pages */
459         spin_unlock(&connection->c_lock);
460
461         obd_kmap_get(page_count, 0);
462
463         for (mapped = 0; mapped < page_count; mapped++) {
464                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
465                 if (bulk == NULL)
466                         GOTO(out_unmap, rc = -ENOMEM);
467
468                 bulk->bp_xid = xid;           /* single xid for all pages */
469
470                 bulk->bp_buf = kmap(pga[mapped].pg);
471                 bulk->bp_page = pga[mapped].pg;
472                 bulk->bp_buflen = PAGE_SIZE;
473                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
474                                 pga[mapped].flag, bulk->bp_xid);
475         }
476
477         /*
478          * Register the bulk first, because the reply could arrive out of order,
479          * and we want to be ready for the bulk data.
480          *
481          * One reference is released when brw_finish is complete, the other when
482          * the caller removes us from the "set" list.
483          *
484          * On error, we never do the brw_finish, so we handle all decrefs.
485          */
486         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
487                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
488                        OBD_FAIL_OSC_BRW_READ_BULK);
489         } else {
490                 rc = ptlrpc_register_bulk(desc);
491                 if (rc)
492                         GOTO(out_unmap, rc);
493                 obd_brw_set_add(set, desc);
494         }
495
496         request->rq_replen = lustre_msg_size(1, size);
497         rc = ptlrpc_queue_wait(request);
498         rc = ptlrpc_check_status(request, rc);
499
500         /*
501          * XXX: If there is an error during the processing of the callback,
502          *      such as a timeout in a sleep that it performs, brw_finish
503          *      will never get called, and we'll leak the desc, fail to kunmap
504          *      things, cats will live with dogs.  One solution would be to
505          *      export brw_finish as osc_brw_finish, so that the timeout case
506          *      and its kin could call it for proper cleanup.  An alternative
507          *      would be for an error return from the callback to cause us to
508          *      clean up, but that doesn't help the truly async cases (like
509          *      LOV), which will immediately return from their PHASE_START
510          *      callback, before any such cleanup-requiring error condition can
511          *      be detected.
512          */
513  out_req:
514         ptlrpc_req_finished(request);
515         RETURN(rc);
516
517         /* Clean up on error. */
518 out_unmap:
519         while (mapped-- > 0)
520                 kunmap(pga[mapped].pg);
521         obd_kmap_put(page_count);
522         ptlrpc_bulk_decref(desc);
523         goto out_req;
524 }
525
526 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
527                          obd_count page_count, struct brw_page *pga,
528                          struct obd_brw_set *set)
529 {
530         struct ptlrpc_connection *connection =
531                 client_conn2cli(conn)->cl_import.imp_connection;
532         struct ptlrpc_request *request = NULL;
533         struct ptlrpc_bulk_desc *desc = NULL;
534         struct ost_body *body;
535         struct niobuf_local *local = NULL;
536         struct niobuf_remote *remote;
537         int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
538         void *iooptr, *nioptr;
539         ENTRY;
540
541         size[1] = sizeof(struct obd_ioobj);
542         size[2] = page_count * sizeof(*remote);
543
544         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
545                                   NULL);
546         if (!request)
547                 RETURN(-ENOMEM);
548
549         body = lustre_msg_buf(request->rq_reqmsg, 0);
550
551         desc = ptlrpc_prep_bulk(connection);
552         if (!desc)
553                GOTO(out_req, rc = -ENOMEM);
554         desc->bd_portal = OSC_BULK_PORTAL;
555         desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
556         CDEBUG(D_PAGE, "desc = %p\n", desc);
557
558         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
559         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
560         ost_pack_ioo(&iooptr, md, page_count);
561         /* end almost identical to brw_read case */
562
563         OBD_ALLOC(local, page_count * sizeof(*local));
564         if (!local)
565                 GOTO(out_desc, rc = -ENOMEM);
566
567         obd_kmap_get(page_count, 0);
568
569         for (mapped = 0; mapped < page_count; mapped++) {
570                 local[mapped].addr = kmap(pga[mapped].pg);
571
572                 CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
573                        "%d ; page %d of %d\n",
574                        local[mapped].addr, pga[mapped].pg->flags,
575                        page_count(pga[mapped].pg),
576                        mapped, page_count - 1);
577
578                 local[mapped].offset = pga[mapped].off;
579                 local[mapped].len = pga[mapped].count;
580                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
581                                 pga[mapped].flag, 0);
582         }
583
584         size[1] = page_count * sizeof(*remote);
585         request->rq_replen = lustre_msg_size(2, size);
586         rc = ptlrpc_queue_wait(request);
587         rc = ptlrpc_check_status(request, rc);
588         if (rc)
589                 GOTO(out_unmap, rc);
590
591         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
592         if (!nioptr)
593                 GOTO(out_unmap, rc = -EINVAL);
594
595         if (request->rq_repmsg->buflens[1] != size[1]) {
596                 CERROR("buffer length wrong (%d vs. %d)\n",
597                        request->rq_repmsg->buflens[1], size[1]);
598                 GOTO(out_unmap, rc = -EINVAL);
599         }
600
601         for (j = 0; j < page_count; j++) {
602                 struct ptlrpc_bulk_page *bulk;
603
604                 ost_unpack_niobuf(&nioptr, &remote);
605
606                 bulk = ptlrpc_prep_bulk_page(desc);
607                 if (!bulk)
608                         GOTO(out_unmap, rc = -ENOMEM);
609
610                 bulk->bp_buf = (void *)(unsigned long)local[j].addr;
611                 bulk->bp_buflen = local[j].len;
612                 bulk->bp_xid = remote->xid;
613                 bulk->bp_page = pga[j].pg;
614         }
615
616         if (desc->bd_page_count != page_count)
617                 LBUG();
618
619         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
620                 GOTO(out_unmap, rc = 0);
621
622         OBD_FREE(local, page_count * sizeof(*local));
623
624         /* One reference is released when brw_finish is complete, the other
625          * when the caller removes it from the "set" list. */
626         obd_brw_set_add(set, desc);
627         rc = ptlrpc_send_bulk(desc);
628
629         /* XXX: Mike, same question as in osc_brw_read. */
630 out_req:
631         ptlrpc_req_finished(request);
632         RETURN(rc);
633
634         /* Clean up on error. */
635 out_unmap:
636         while (mapped-- > 0)
637                 kunmap(pga[mapped].pg);
638
639         obd_kmap_put(page_count);
640
641         OBD_FREE(local, page_count * sizeof(*local));
642 out_desc:
643         ptlrpc_bulk_decref(desc);
644         goto out_req;
645 }
646
647 static int osc_brw(int cmd, struct lustre_handle *conn,
648                    struct lov_stripe_md *md, obd_count page_count,
649                    struct brw_page *pga, struct obd_brw_set *set)
650 {
651         ENTRY;
652
653         while (page_count) {
654                 obd_count pages_per_brw;
655                 int rc;
656
657                 if (page_count > PTL_MD_MAX_IOV)
658                         pages_per_brw = PTL_MD_MAX_IOV;
659                 else
660                         pages_per_brw = page_count;
661
662                 if (cmd & OBD_BRW_WRITE)
663                         rc = osc_brw_write(conn, md, pages_per_brw, pga, set);
664                 else
665                         rc = osc_brw_read(conn, md, pages_per_brw, pga, set);
666
667                 if (rc != 0)
668                         RETURN(rc);
669
670                 page_count -= pages_per_brw;
671                 pga += pages_per_brw;
672         }
673         RETURN(0);
674 }
675
676 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
677                        struct lustre_handle *parent_lock,
678                        __u32 type, void *extentp, int extent_len, __u32 mode,
679                        int *flags, void *callback, void *data, int datalen,
680                        struct lustre_handle *lockh)
681 {
682         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
683         struct obd_device *obddev = class_conn2obd(connh);
684         struct ldlm_extent *extent = extentp;
685         int rc;
686         ENTRY;
687
688         /* Filesystem locks are given a bit of special treatment: if
689          * this is not a file size lock (which has end == -1), we
690          * fixup the lock to start and end on page boundaries. */
691         if (extent->end != OBD_OBJECT_EOF) {
692                 extent->start &= PAGE_MASK;
693                 extent->end = (extent->end & PAGE_MASK) + PAGE_SIZE - 1;
694         }
695
696         /* Next, search for already existing extent locks that will cover us */
697         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
698                              sizeof(extent), mode, lockh);
699         if (rc == 1)
700                 /* We already have a lock, and it's referenced */
701                 RETURN(ELDLM_OK);
702
703         /* If we're trying to read, we also search for an existing PW lock.  The
704          * VFS and page cache already protect us locally, so lots of readers/
705          * writers can share a single PW lock.
706          *
707          * There are problems with conversion deadlocks, so instead of
708          * converting a read lock to a write lock, we'll just enqueue a new
709          * one.
710          *
711          * At some point we should cancel the read lock instead of making them
712          * send us a blocking callback, but there are problems with canceling
713          * locks out from other users right now, too. */
714
715         if (mode == LCK_PR) {
716                 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type,
717                                      extent, sizeof(extent), LCK_PW, lockh);
718                 if (rc == 1) {
719                         /* FIXME: This is not incredibly elegant, but it might
720                          * be more elegant than adding another parameter to
721                          * lock_match.  I want a second opinion. */
722                         ldlm_lock_addref(lockh, LCK_PR);
723                         ldlm_lock_decref(lockh, LCK_PW);
724
725                         RETURN(ELDLM_OK);
726                 }
727         }
728
729         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
730                               res_id, type, extent, sizeof(extent), mode, flags,
731                               ldlm_completion_ast, callback, data, datalen,
732                               lockh);
733         RETURN(rc);
734 }
735
736 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
737                       __u32 mode, struct lustre_handle *lockh)
738 {
739         ENTRY;
740
741         ldlm_lock_decref(lockh, mode);
742
743         RETURN(0);
744 }
745
746 static int osc_cancel_unused(struct lustre_handle *connh,
747                              struct lov_stripe_md *lsm, int flags)
748 {
749         struct obd_device *obddev = class_conn2obd(connh);
750         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
751
752         return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, flags);
753 }
754
755 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
756 {
757         struct ptlrpc_request *request;
758         int rc, size = sizeof(*osfs);
759         ENTRY;
760
761         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
762                                   NULL);
763         if (!request)
764                 RETURN(-ENOMEM);
765
766         request->rq_replen = lustre_msg_size(1, &size);
767
768         rc = ptlrpc_queue_wait(request);
769         rc = ptlrpc_check_status(request, rc);
770         if (rc) {
771                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
772                 GOTO(out, rc);
773         }
774
775         obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
776
777         EXIT;
778  out:
779         ptlrpc_req_finished(request);
780         return rc;
781 }
782
783 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
784                          void *karg, void *uarg)
785 {
786         struct obd_device *obddev = class_conn2obd(conn);
787         struct obd_ioctl_data *data = karg;
788         int err = 0;
789         ENTRY;
790
791         switch (cmd) {
792         case IOC_LDLM_TEST: {
793                 err = ldlm_test(obddev, conn);
794                 CERROR("-- done err %d\n", err);
795                 GOTO(out, err);
796         }
797         case IOC_LDLM_REGRESS_START: {
798                 unsigned int numthreads = 1;
799                 unsigned int numheld = 10;
800                 unsigned int numres = 10;
801                 unsigned int numext = 10;
802                 char *parse;
803
804                 if (data->ioc_inllen1) {
805                         parse = data->ioc_inlbuf1;
806                         if (*parse != '\0') {
807                                 while(isspace(*parse)) parse++;
808                                 numthreads = simple_strtoul(parse, &parse, 0);
809                                 while(isspace(*parse)) parse++;
810                         }
811                         if (*parse != '\0') {
812                                 while(isspace(*parse)) parse++;
813                                 numheld = simple_strtoul(parse, &parse, 0);
814                                 while(isspace(*parse)) parse++;
815                         }
816                         if (*parse != '\0') {
817                                 while(isspace(*parse)) parse++;
818                                 numres = simple_strtoul(parse, &parse, 0);
819                                 while(isspace(*parse)) parse++;
820                         }
821                         if (*parse != '\0') {
822                                 while(isspace(*parse)) parse++;
823                                 numext = simple_strtoul(parse, &parse, 0);
824                                 while(isspace(*parse)) parse++;
825                         }
826                 }
827
828                 err = ldlm_regression_start(obddev, conn, numthreads,
829                                 numheld, numres, numext);
830
831                 CERROR("-- done err %d\n", err);
832                 GOTO(out, err);
833         }
834         case IOC_LDLM_REGRESS_STOP: {
835                 err = ldlm_regression_stop();
836                 CERROR("-- done err %d\n", err);
837                 GOTO(out, err);
838         }
839         case IOC_OSC_REGISTER_LOV: {
840                 if (obddev->u.cli.cl_containing_lov)
841                         GOTO(out, err = -EALREADY);
842                 obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
843                 GOTO(out, err);
844         }
845         case OBD_IOC_LOV_GET_CONFIG: {
846                 char *buf;
847                 struct lov_desc *desc;
848                 obd_uuid_t *uuidp;
849
850                 buf = NULL;
851                 len = 0;
852                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
853                         GOTO(out, err = -EINVAL);
854
855                 data = (struct obd_ioctl_data *)buf;
856
857                 if (sizeof(*desc) > data->ioc_inllen1) {
858                         OBD_FREE(buf, len);
859                         GOTO(out, err = -EINVAL);
860                 }
861
862                 if (data->ioc_inllen2 < sizeof(*uuidp)) {
863                         OBD_FREE(buf, len);
864                         GOTO(out, err = -EINVAL);
865                 }
866
867                 desc = (struct lov_desc *)data->ioc_inlbuf1;
868                 desc->ld_tgt_count = 1;
869                 desc->ld_active_tgt_count = 1;
870                 desc->ld_default_stripe_count = 1;
871                 desc->ld_default_stripe_size = 0;
872                 desc->ld_default_stripe_offset = 0;
873                 desc->ld_pattern = 0;
874                 memcpy(desc->ld_uuid,  obddev->obd_uuid, sizeof(*uuidp));
875
876                 uuidp = (obd_uuid_t *)data->ioc_inlbuf2;
877                 memcpy(uuidp,  obddev->obd_uuid, sizeof(*uuidp));
878
879                 err = copy_to_user((void *)uarg, buf, len);
880                 if (err)
881                         err = -EFAULT;
882                 OBD_FREE(buf, len);
883                 GOTO(out, err);
884         }
885         default:
886                 GOTO(out, err = -ENOTTY);
887         }
888 out:
889         return err;
890 }
891
892 struct obd_ops osc_obd_ops = {
893         o_attach:       osc_attach,
894         o_detach:       osc_detach,
895         o_setup:        client_obd_setup,
896         o_cleanup:      client_obd_cleanup,
897         o_connect:      client_obd_connect,
898         o_disconnect:   client_obd_disconnect,
899         o_statfs:       osc_statfs,
900         o_packmd:       osc_packmd,
901         o_unpackmd:     osc_unpackmd,
902         o_create:       osc_create,
903         o_destroy:      osc_destroy,
904         o_getattr:      osc_getattr,
905         o_setattr:      osc_setattr,
906         o_open:         osc_open,
907         o_close:        osc_close,
908         o_brw:          osc_brw,
909         o_punch:        osc_punch,
910         o_enqueue:      osc_enqueue,
911         o_cancel:       osc_cancel,
912         o_cancel_unused: osc_cancel_unused,
913         o_iocontrol:    osc_iocontrol
914 };
915
916 static int __init osc_init(void)
917 {
918         RETURN(class_register_type(&osc_obd_ops, status_class_var,
919                                    LUSTRE_OSC_NAME));
920 }
921
922 static void __exit osc_exit(void)
923 {
924         class_unregister_type(LUSTRE_OSC_NAME);
925 }
926
927 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
928 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
929 MODULE_LICENSE("GPL");
930
931 module_init(osc_init);
932 module_exit(osc_exit);