Whamcloud - gitweb
Merge of b_md to HEAD:
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  *  For testing and management it is treated as an obd_device,
23  *  although * it does not export a full OBD method table (the
24  *  requests are coming * in over the wire, so object target modules
25  *  do not have a full * method table.)
26  *
27  */
28
29 #define EXPORT_SYMTAB
30 #define DEBUG_SUBSYSTEM S_OSC
31
32 #include <linux/version.h>
33 #include <linux/module.h>
34 #include <linux/mm.h>
35 #include <linux/highmem.h>
36 #include <linux/lustre_dlm.h>
37 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
38 #include <linux/workqueue.h>
39 #endif
40 #include <linux/kp30.h>
41 #include <linux/lustre_mds.h> /* for mds_objid */
42 #include <linux/obd_ost.h>
43 #include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
44 #include <linux/ctype.h>
45 #include <linux/init.h>
46 #include <linux/lustre_ha.h>
47 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
48 #include <linux/lustre_lite.h> /* for ll_i2info */
49 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
50 #include <linux/lprocfs_status.h>
51
/* lprocfs variable tables that are defined outside this file.
 * status_var_nm_1 is the table osc_attach() hands to lprocfs_reg_obd(). */
52 extern struct lprocfs_vars status_var_nm_1[];
53 extern struct lprocfs_vars status_class_var[];
54
55 static int osc_attach(struct obd_device *dev, obd_count len, void *data)
56 {
57         return lprocfs_reg_obd(dev, status_var_nm_1, dev);
58 }
59
/*
 * Detach hook: undo the lprocfs registration for this device.
 *
 * Returns whatever lprocfs_dereg_obd() returns.
 */
static int osc_detach(struct obd_device *dev)
{
        int status = lprocfs_dereg_obd(dev);

        return status;
}
64
65 /* Pack OSC object metadata for shipment to the MDS. */
66 static int osc_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
67                       struct lov_stripe_md *lsm)
68 {
69         int lmm_size;
70
71         lmm_size = sizeof(**lmmp);
72         if (!lmmp)
73                 RETURN(lmm_size);
74
75         if (*lmmp && !lsm) {
76                 OBD_FREE(*lmmp, lmm_size);
77                 *lmmp = NULL;
78                 RETURN(0);
79         }
80
81         if (!*lmmp) {
82                 OBD_ALLOC(*lmmp, lmm_size);
83                 if (!*lmmp)
84                         RETURN(-ENOMEM);
85         }
86         if (lsm) {
87                 LASSERT(lsm->lsm_object_id);
88                 (*lmmp)->lmm_object_id = (lsm->lsm_object_id);
89         }
90
91         return lmm_size;
92 }
93
94 static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
95                         struct lov_mds_md *lmm)
96 {
97         int lsm_size;
98
99         lsm_size = sizeof(**lsmp);
100         if (!lsmp)
101                 RETURN(lsm_size);
102
103         if (*lsmp && !lmm) {
104                 OBD_FREE(*lsmp, lsm_size);
105                 *lsmp = NULL;
106                 RETURN(0);
107         }
108
109         if (!*lsmp) {
110                 OBD_ALLOC(*lsmp, lsm_size);
111                 if (!*lsmp)
112                         RETURN(-ENOMEM);
113         }
114
115         /* XXX endianness */
116         if (lmm) {
117                 (*lsmp)->lsm_object_id = (lmm->lmm_object_id);
118                 LASSERT((*lsmp)->lsm_object_id);
119         }
120
121         return lsm_size;
122 }
123
124 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
125                        struct lov_stripe_md *md)
126 {
127         struct ptlrpc_request *request;
128         struct ost_body *body;
129         int rc, size = sizeof(*body);
130         ENTRY;
131
132         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
133                                   &size, NULL);
134         if (!request)
135                 RETURN(-ENOMEM);
136
137         body = lustre_msg_buf(request->rq_reqmsg, 0);
138 #warning FIXME: pack only valid fields instead of memcpy, endianness
139         memcpy(&body->oa, oa, sizeof(*oa));
140
141         request->rq_replen = lustre_msg_size(1, &size);
142
143         rc = ptlrpc_queue_wait(request);
144         if (rc) {
145                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
146                 GOTO(out, rc);
147         }
148
149         body = lustre_msg_buf(request->rq_repmsg, 0);
150         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
151         if (oa)
152                 memcpy(oa, &body->oa, sizeof(*oa));
153
154         EXIT;
155  out:
156         ptlrpc_req_finished(request);
157         return rc;
158 }
159
160 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
161                     struct lov_stripe_md *md)
162 {
163         struct ptlrpc_request *request;
164         struct ost_body *body;
165         int rc, size = sizeof(*body);
166         ENTRY;
167
168         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
169                                   NULL);
170         if (!request)
171                 RETURN(-ENOMEM);
172
173         body = lustre_msg_buf(request->rq_reqmsg, 0);
174 #warning FIXME: pack only valid fields instead of memcpy, endianness
175         memcpy(&body->oa, oa, sizeof(*oa));
176
177         request->rq_replen = lustre_msg_size(1, &size);
178
179         rc = ptlrpc_queue_wait(request);
180         if (rc)
181                 GOTO(out, rc);
182
183         body = lustre_msg_buf(request->rq_repmsg, 0);
184         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
185         if (oa)
186                 memcpy(oa, &body->oa, sizeof(*oa));
187
188         EXIT;
189  out:
190         ptlrpc_req_finished(request);
191         return rc;
192 }
193
194 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
195                      struct lov_stripe_md *md)
196 {
197         struct ptlrpc_request *request;
198         struct ost_body *body;
199         int rc, size = sizeof(*body);
200         ENTRY;
201
202         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
203                                   NULL);
204         if (!request)
205                 RETURN(-ENOMEM);
206
207         body = lustre_msg_buf(request->rq_reqmsg, 0);
208 #warning FIXME: pack only valid fields instead of memcpy, endianness
209         memcpy(&body->oa, oa, sizeof(*oa));
210
211         request->rq_replen = lustre_msg_size(1, &size);
212
213         rc = ptlrpc_queue_wait(request);
214         if (rc)
215                 GOTO(out, rc);
216
217         body = lustre_msg_buf(request->rq_repmsg, 0);
218         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
219         if (oa)
220                 memcpy(oa, &body->oa, sizeof(*oa));
221
222         EXIT;
223  out:
224         ptlrpc_req_finished(request);
225         return rc;
226 }
227
228 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
229                        struct lov_stripe_md *md)
230 {
231         struct ptlrpc_request *request;
232         struct ost_body *body;
233         int rc, size = sizeof(*body);
234         ENTRY;
235
236         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
237                                   &size, NULL);
238         if (!request)
239                 RETURN(-ENOMEM);
240
241         body = lustre_msg_buf(request->rq_reqmsg, 0);
242         memcpy(&body->oa, oa, sizeof(*oa));
243
244         request->rq_replen = lustre_msg_size(1, &size);
245
246         rc = ptlrpc_queue_wait(request);
247
248         ptlrpc_req_finished(request);
249         return rc;
250 }
251
252 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
253                       struct lov_stripe_md **ea)
254 {
255         struct ptlrpc_request *request;
256         struct ost_body *body;
257         struct lov_stripe_md *lsm;
258         int rc, size = sizeof(*body);
259         ENTRY;
260
261         LASSERT(oa);
262         LASSERT(ea);
263
264         lsm = *ea;
265         if (!lsm) {
266                 rc = obd_alloc_memmd(conn, &lsm);
267                 if (rc < 0)
268                         RETURN(rc);
269         }
270
271         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
272                                   NULL);
273         if (!request)
274                 GOTO(out, rc = -ENOMEM);
275
276         body = lustre_msg_buf(request->rq_reqmsg, 0);
277         memcpy(&body->oa, oa, sizeof(*oa));
278
279         request->rq_replen = lustre_msg_size(1, &size);
280
281         rc = ptlrpc_queue_wait(request);
282         if (rc)
283                 GOTO(out_req, rc);
284
285         body = lustre_msg_buf(request->rq_repmsg, 0);
286         memcpy(oa, &body->oa, sizeof(*oa));
287
288         lsm->lsm_object_id = oa->o_id;
289         lsm->lsm_stripe_count = 0;
290         *ea = lsm;
291         EXIT;
292 out_req:
293         ptlrpc_req_finished(request);
294 out:
295         if (rc && !*ea)
296                 obd_free_memmd(conn, &lsm);
297         return rc;
298 }
299
300 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
301                      struct lov_stripe_md *md, obd_size start,
302                      obd_size end)
303 {
304         struct ptlrpc_request *request;
305         struct ost_body *body;
306         int rc, size = sizeof(*body);
307         ENTRY;
308
309         if (!oa) {
310                 CERROR("oa NULL\n");
311                 RETURN(-EINVAL);
312         }
313
314         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
315                                   NULL);
316         if (!request)
317                 RETURN(-ENOMEM);
318
319         body = lustre_msg_buf(request->rq_reqmsg, 0);
320 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
321         memcpy(&body->oa, oa, sizeof(*oa));
322
323         /* overload the size and blocks fields in the oa with start/end */
324         body->oa.o_size = HTON__u64(start);
325         body->oa.o_blocks = HTON__u64(end);
326         body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
327
328         request->rq_replen = lustre_msg_size(1, &size);
329
330         rc = ptlrpc_queue_wait(request);
331         if (rc)
332                 GOTO(out, rc);
333
334         body = lustre_msg_buf(request->rq_repmsg, 0);
335         memcpy(oa, &body->oa, sizeof(*oa));
336
337         EXIT;
338  out:
339         ptlrpc_req_finished(request);
340         return rc;
341 }
342
343 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
344                        struct lov_stripe_md *ea)
345 {
346         struct ptlrpc_request *request;
347         struct ost_body *body;
348         int rc, size = sizeof(*body);
349         ENTRY;
350
351         if (!oa) {
352                 CERROR("oa NULL\n");
353                 RETURN(-EINVAL);
354         }
355         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
356                                   &size, NULL);
357         if (!request)
358                 RETURN(-ENOMEM);
359
360         body = lustre_msg_buf(request->rq_reqmsg, 0);
361 #warning FIXME: pack only valid fields instead of memcpy, endianness
362         memcpy(&body->oa, oa, sizeof(*oa));
363
364         request->rq_replen = lustre_msg_size(1, &size);
365
366         rc = ptlrpc_queue_wait(request);
367         if (rc)
368                 GOTO(out, rc);
369
370         body = lustre_msg_buf(request->rq_repmsg, 0);
371         memcpy(oa, &body->oa, sizeof(*oa));
372
373         EXIT;
374  out:
375         ptlrpc_req_finished(request);
376         return rc;
377 }
378
379 /* Our bulk-unmapping bottom half. */
380 static void unmap_and_decref_bulk_desc(void *data)
381 {
382         struct ptlrpc_bulk_desc *desc = data;
383         struct list_head *tmp;
384         ENTRY;
385
386         /* This feels wrong to me. */
387         list_for_each(tmp, &desc->bd_page_list) {
388                 struct ptlrpc_bulk_page *bulk;
389                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
390
391                 kunmap(bulk->bp_page);
392                 obd_kmap_put(1);
393         }
394
395         ptlrpc_bulk_decref(desc);
396         EXIT;
397 }
398
/*
 * Bulk completion handler installed as desc->bd_ptl_ev_hdlr by
 * osc_brw_read() and osc_brw_write().
 *
 * It first notifies the owning brw set with CB_PHASE_FINISH, then queues
 * unmap_and_decref_bulk_desc() on desc->bd_queue so that the kunmap and the
 * descriptor decref happen later, outside this handler.
 */
399 /*  this is the callback function which is invoked by the Portals
400  *  event handler associated with the bulk_sink queue and bulk_source queue. 
401  */
402 static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
403 {
404         ENTRY;
405
        /* The descriptor must belong to a set that has a callback. */
406         LASSERT(desc->bd_brw_set != NULL);
407         LASSERT(desc->bd_brw_set->brw_callback != NULL);
408
        /* Tell the set that this bulk transfer has finished. */
409         desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
410
411         /* We can't kunmap the desc from interrupt context, so we do it from
412          * the bottom half above. */
413         prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
414         schedule_work(&desc->bd_queue);
415
416         EXIT;
417 }
418
/*
 * Read page_count pages of the object described by lsm from the OST.
 *
 * Builds an OST_READ request with three buffers (ost_body, one obd_ioobj,
 * one niobuf_remote per page), kmaps every page into a bulk descriptor,
 * registers the descriptor for the incoming bulk data, adds it to the
 * caller's brw set and then sends the request and waits for the reply.
 *
 * Returns -ENOMEM if the request, the bulk descriptor or a bulk page cannot
 * be allocated, the error from ptlrpc_register_bulk(), or otherwise the
 * status from ptlrpc_queue_wait().
 */
419 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
420                         obd_count page_count, struct brw_page *pga,
421                         struct obd_brw_set *set)
422 {
423         struct obd_import *imp = class_conn2cliimp(conn);
424         struct ptlrpc_connection *connection = imp->imp_connection;
425         struct ptlrpc_request *request = NULL;
426         struct ptlrpc_bulk_desc *desc = NULL;
427         struct ost_body *body;
428         int rc, size[3] = {sizeof(*body)}, mapped = 0;
429         void *iooptr, *nioptr;
430         __u32 xid;
431         ENTRY;
432
        /* Request layout: [0] ost_body, [1] one obd_ioobj,
         * [2] one niobuf_remote per page. */
433         size[1] = sizeof(struct obd_ioobj);
434         size[2] = page_count * sizeof(struct niobuf_remote);
435
436         request = ptlrpc_prep_req(imp, OST_READ, 3, size, NULL);
437         if (!request)
438                 RETURN(-ENOMEM);
439
        /* NOTE(review): body is fetched here but nothing in this function
         * writes to it or reads from it afterwards. */
440         body = lustre_msg_buf(request->rq_reqmsg, 0);
441
442         desc = ptlrpc_prep_bulk(connection);
443         if (!desc)
444                 GOTO(out_req, rc = -ENOMEM);
445         desc->bd_portal = OST_BULK_PORTAL;
446         desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
447         CDEBUG(D_PAGE, "desc = %p\n", desc);
448
449         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
450         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
451         ost_pack_ioo(&iooptr, lsm, page_count);
452         /* end almost identical to brw_write case */
453
        /* Take the next xid from the import under its lock. */
454         spin_lock(&imp->imp_lock);
455         xid = ++imp->imp_last_xid;       /* single xid for all pages */
456         spin_unlock(&imp->imp_lock);
457
        /* Reserve kmap slots for the whole batch before mapping; they are
         * given back with obd_kmap_put() on the error path below. */
458         obd_kmap_get(page_count, 0);
459
        /* "mapped" counts the pages kmapped so far, so the error path knows
         * how many to kunmap. */
460         for (mapped = 0; mapped < page_count; mapped++) {
461                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
462                 if (bulk == NULL)
463                         GOTO(out_unmap, rc = -ENOMEM);
464
465                 bulk->bp_xid = xid;           /* single xid for all pages */
466
467                 bulk->bp_buf = kmap(pga[mapped].pg);
468                 bulk->bp_page = pga[mapped].pg;
469                 bulk->bp_buflen = PAGE_SIZE;
470                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
471                                 pga[mapped].flag, bulk->bp_xid);
472         }
473
474         /*
475          * Register the bulk first, because the reply could arrive out of order,
476          * and we want to be ready for the bulk data.
477          *
478          * One reference is released when brw_finish is complete, the other when
479          * the caller removes us from the "set" list.
480          *
481          * On error, we never do the brw_finish, so we handle all decrefs.
482          */
        /* NOTE(review): in the fault-injection branch the descriptor is
         * neither registered nor added to the set, and the code continues
         * to the request without unmapping the pages. */
483         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
484                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
485                        OBD_FAIL_OSC_BRW_READ_BULK);
486         } else {
487                 rc = ptlrpc_register_bulk(desc);
488                 if (rc)
489                         GOTO(out_unmap, rc);
490                 obd_brw_set_add(set, desc);
491         }
492
        /* Only the first buffer size (the ost_body) is used for the reply. */
493         request->rq_replen = lustre_msg_size(1, size);
494         rc = ptlrpc_queue_wait(request);
495
496         /*
497          * XXX: If there is an error during the processing of the callback,
498          *      such as a timeout in a sleep that it performs, brw_finish
499          *      will never get called, and we'll leak the desc, fail to kunmap
500          *      things, cats will live with dogs.  One solution would be to
501          *      export brw_finish as osc_brw_finish, so that the timeout case
502          *      and its kin could call it for proper cleanup.  An alternative
503          *      would be for an error return from the callback to cause us to
504          *      clean up, but that doesn't help the truly async cases (like
505          *      LOV), which will immediately return from their PHASE_START
506          *      callback, before any such cleanup-requiring error condition can
507          *      be detected.
508          */
        /* Common exit: reached directly after the request completes and
         * also via "goto out_req" from the error path below. */
509  out_req:
510         ptlrpc_req_finished(request);
511         RETURN(rc);
512
513         /* Clean up on error. */
514 out_unmap:
        /* Unmap exactly the pages mapped so far, return all reserved kmap
         * slots, and drop the descriptor reference. */
515         while (mapped-- > 0)
516                 kunmap(pga[mapped].pg);
517         obd_kmap_put(page_count);
518         ptlrpc_bulk_decref(desc);
519         goto out_req;
520 }
521
/*
 * Write page_count pages of the object described by md to the OST.
 *
 * Builds an OST_WRITE request with three buffers (ost_body, one obd_ioobj,
 * one niobuf_remote per page), kmaps every page and waits for the reply.
 * Reply buffer 1 must contain page_count niobuf_remote entries; the xid of
 * each entry is attached to the matching bulk page.  The descriptor is then
 * added to the caller's brw set and sent with ptlrpc_send_bulk().
 *
 * Returns -ENOMEM on allocation failure, -EINVAL if the reply buffer is
 * missing or has the wrong length, the status from ptlrpc_queue_wait() if
 * that fails, 0 when the OBD_FAIL_OSC_BRW_WRITE_BULK fault point fires, or
 * otherwise the status from ptlrpc_send_bulk().
 */
522 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
523                          obd_count page_count, struct brw_page *pga,
524                          struct obd_brw_set *set)
525 {
526         struct ptlrpc_connection *connection =
527                 client_conn2cli(conn)->cl_import.imp_connection;
528         struct ptlrpc_request *request = NULL;
529         struct ptlrpc_bulk_desc *desc = NULL;
530         struct ost_body *body;
531         struct niobuf_local *local = NULL;
532         struct niobuf_remote *remote;
533         int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
534         void *iooptr, *nioptr;
535         ENTRY;
536
        /* Request layout: [0] ost_body, [1] one obd_ioobj,
         * [2] one niobuf_remote per page. */
537         size[1] = sizeof(struct obd_ioobj);
538         size[2] = page_count * sizeof(*remote);
539
540         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
541                                   NULL);
542         if (!request)
543                 RETURN(-ENOMEM);
544
        /* NOTE(review): body is fetched here but nothing in this function
         * writes to it or reads from it afterwards. */
545         body = lustre_msg_buf(request->rq_reqmsg, 0);
546
547         desc = ptlrpc_prep_bulk(connection);
548         if (!desc)
549                GOTO(out_req, rc = -ENOMEM);
550         desc->bd_portal = OSC_BULK_PORTAL;
551         desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
552         CDEBUG(D_PAGE, "desc = %p\n", desc);
553
554         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
555         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
556         ost_pack_ioo(&iooptr, md, page_count);
557         /* end almost identical to brw_read case */
558
        /* Scratch array remembering each page's mapped address and length
         * until the bulk pages are built from the reply. */
559         OBD_ALLOC(local, page_count * sizeof(*local));
560         if (!local)
561                 GOTO(out_desc, rc = -ENOMEM);
562
563         obd_kmap_get(page_count, 0);
564
        /* "mapped" counts the pages kmapped so far, so the error path knows
         * how many to kunmap. */
565         for (mapped = 0; mapped < page_count; mapped++) {
566                 local[mapped].addr = kmap(pga[mapped].pg);
567
                /* NOTE(review): page_count(...) below is applied to a page
                 * although page_count is also this function's parameter;
                 * presumably it resolves to a function-like macro from the
                 * kernel headers -- confirm. */
568                 CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
569                        "%d ; page %d of %d\n",
570                        local[mapped].addr, pga[mapped].pg->flags,
571                        page_count(pga[mapped].pg),
572                        mapped, page_count - 1);
573
574                 local[mapped].offset = pga[mapped].off;
575                 local[mapped].len = pga[mapped].count;
576                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
577                                 pga[mapped].flag, 0);
578         }
579
        /* From here on size[] describes the expected reply: [0] ost_body,
         * [1] one niobuf_remote per page. */
580         size[1] = page_count * sizeof(*remote);
581         request->rq_replen = lustre_msg_size(2, size);
582         rc = ptlrpc_queue_wait(request);
583         if (rc)
584                 GOTO(out_unmap, rc);
585
586         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
587         if (!nioptr)
588                 GOTO(out_unmap, rc = -EINVAL);
589
590         if (request->rq_repmsg->buflens[1] != size[1]) {
591                 CERROR("buffer length wrong (%d vs. %d)\n",
592                        request->rq_repmsg->buflens[1], size[1]);
593                 GOTO(out_unmap, rc = -EINVAL);
594         }
595
        /* Build one bulk page per niobuf_remote in the reply, pairing it
         * with the local mapping taken above and the xid from the reply. */
596         for (j = 0; j < page_count; j++) {
597                 struct ptlrpc_bulk_page *bulk;
598
599                 ost_unpack_niobuf(&nioptr, &remote);
600
601                 bulk = ptlrpc_prep_bulk_page(desc);
602                 if (!bulk)
603                         GOTO(out_unmap, rc = -ENOMEM);
604
605                 bulk->bp_buf = (void *)(unsigned long)local[j].addr;
606                 bulk->bp_buflen = local[j].len;
607                 bulk->bp_xid = remote->xid;
608                 bulk->bp_page = pga[j].pg;
609         }
610
611         if (desc->bd_page_count != page_count)
612                 LBUG();
613
        /* Fault injection: skip the bulk send, clean up and report
         * success. */
614         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
615                 GOTO(out_unmap, rc = 0);
616
        /* The scratch array is no longer needed; the bulk pages hold the
         * mapped addresses. */
617         OBD_FREE(local, page_count * sizeof(*local));
618
619         /* One reference is released when brw_finish is complete, the other
620          * when the caller removes it from the "set" list. */
621         obd_brw_set_add(set, desc);
622         rc = ptlrpc_send_bulk(desc);
623
624         /* XXX: Mike, same question as in osc_brw_read. */
        /* Common exit: reached directly after the bulk send and also via
         * "goto out_req" from the error path below. */
625 out_req:
626         ptlrpc_req_finished(request);
627         RETURN(rc);
628
629         /* Clean up on error. */
630 out_unmap:
        /* Unmap exactly the pages mapped so far and return all reserved
         * kmap slots. */
631         while (mapped-- > 0)
632                 kunmap(pga[mapped].pg);
633
634         obd_kmap_put(page_count);
635
636         OBD_FREE(local, page_count * sizeof(*local));
637 out_desc:
638         ptlrpc_bulk_decref(desc);
639         goto out_req;
640 }
641
642 static int osc_brw(int cmd, struct lustre_handle *conn,
643                    struct lov_stripe_md *md, obd_count page_count,
644                    struct brw_page *pga, struct obd_brw_set *set)
645 {
646         ENTRY;
647
648         while (page_count) {
649                 obd_count pages_per_brw;
650                 int rc;
651
652                 if (page_count > PTL_MD_MAX_IOV)
653                         pages_per_brw = PTL_MD_MAX_IOV;
654                 else
655                         pages_per_brw = page_count;
656
657                 if (cmd & OBD_BRW_WRITE)
658                         rc = osc_brw_write(conn, md, pages_per_brw, pga, set);
659                 else
660                         rc = osc_brw_read(conn, md, pages_per_brw, pga, set);
661
662                 if (rc != 0)
663                         RETURN(rc);
664
665                 page_count -= pages_per_brw;
666                 pga += pages_per_brw;
667         }
668         RETURN(0);
669 }
670
671 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
672                        struct lustre_handle *parent_lock,
673                        __u32 type, void *extentp, int extent_len, __u32 mode,
674                        int *flags, void *callback, void *data, int datalen,
675                        struct lustre_handle *lockh)
676 {
677         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
678         struct obd_device *obddev = class_conn2obd(connh);
679         struct ldlm_extent *extent = extentp;
680         int rc;
681         ENTRY;
682
683         /* Filesystem locks are given a bit of special treatment: if
684          * this is not a file size lock (which has end == -1), we
685          * fixup the lock to start and end on page boundaries. */
686         if (extent->end != OBD_OBJECT_EOF) {
687                 extent->start &= PAGE_MASK;
688                 extent->end = (extent->end & PAGE_MASK) + PAGE_SIZE - 1;
689         }
690
691         /* Next, search for already existing extent locks that will cover us */
692         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
693                              sizeof(extent), mode, lockh);
694         if (rc == 1)
695                 /* We already have a lock, and it's referenced */
696                 RETURN(ELDLM_OK);
697
698         /* If we're trying to read, we also search for an existing PW lock.  The
699          * VFS and page cache already protect us locally, so lots of readers/
700          * writers can share a single PW lock.
701          *
702          * There are problems with conversion deadlocks, so instead of
703          * converting a read lock to a write lock, we'll just enqueue a new
704          * one.
705          *
706          * At some point we should cancel the read lock instead of making them
707          * send us a blocking callback, but there are problems with canceling
708          * locks out from other users right now, too. */
709
710         if (mode == LCK_PR) {
711                 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type,
712                                      extent, sizeof(extent), LCK_PW, lockh);
713                 if (rc == 1) {
714                         /* FIXME: This is not incredibly elegant, but it might
715                          * be more elegant than adding another parameter to
716                          * lock_match.  I want a second opinion. */
717                         ldlm_lock_addref(lockh, LCK_PR);
718                         ldlm_lock_decref(lockh, LCK_PW);
719
720                         RETURN(ELDLM_OK);
721                 }
722         }
723
724         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
725                               res_id, type, extent, sizeof(extent), mode, flags,
726                               ldlm_completion_ast, callback, data, datalen,
727                               lockh);
728         RETURN(rc);
729 }
730
731 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
732                       __u32 mode, struct lustre_handle *lockh)
733 {
734         ENTRY;
735
736         ldlm_lock_decref(lockh, mode);
737
738         RETURN(0);
739 }
740
741 static int osc_cancel_unused(struct lustre_handle *connh,
742                              struct lov_stripe_md *lsm, int flags)
743 {
744         struct obd_device *obddev = class_conn2obd(connh);
745         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
746
747         return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, flags);
748 }
749
750 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
751 {
752         struct ptlrpc_request *request;
753         int rc, size = sizeof(*osfs);
754         ENTRY;
755
756         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
757                                   NULL);
758         if (!request)
759                 RETURN(-ENOMEM);
760
761         request->rq_replen = lustre_msg_size(1, &size);
762
763         rc = ptlrpc_queue_wait(request);
764         if (rc) {
765                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
766                 GOTO(out, rc);
767         }
768
769         obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
770
771         EXIT;
772  out:
773         ptlrpc_req_finished(request);
774         return rc;
775 }
776
/*
 * ioctl dispatcher for the OSC device.
 *
 * Handles the LDLM test/regression commands, registration of the
 * containing LOV, and OBD_IOC_LOV_GET_CONFIG.  karg is treated as a
 * struct obd_ioctl_data except for IOC_OSC_REGISTER_LOV, where it is used
 * as a struct obd_device pointer.
 *
 * Returns the command's status, or -ENOTTY for an unknown command.
 */
777 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
778                          void *karg, void *uarg)
779 {
780         struct obd_device *obddev = class_conn2obd(conn);
781         struct obd_ioctl_data *data = karg;
782         int err = 0;
783         ENTRY;
784
785         switch (cmd) {
786         case IOC_LDLM_TEST: {
787                 err = ldlm_test(obddev, conn);
788                 CERROR("-- done err %d\n", err);
789                 GOTO(out, err);
790         }
791         case IOC_LDLM_REGRESS_START: {
                /* Defaults, used for every value the input string does not
                 * supply. */
792                 unsigned int numthreads = 1;
793                 unsigned int numheld = 10;
794                 unsigned int numres = 10;
795                 unsigned int numext = 10;
796                 char *parse;
797
                /* ioc_inlbuf1 may hold up to four whitespace-separated
                 * numbers, in the order threads, held, resources, extents;
                 * parsing stops at the end of the string. */
798                 if (data->ioc_inllen1) {
799                         parse = data->ioc_inlbuf1;
800                         if (*parse != '\0') {
801                                 while(isspace(*parse)) parse++;
802                                 numthreads = simple_strtoul(parse, &parse, 0);
803                                 while(isspace(*parse)) parse++;
804                         }
805                         if (*parse != '\0') {
806                                 while(isspace(*parse)) parse++;
807                                 numheld = simple_strtoul(parse, &parse, 0);
808                                 while(isspace(*parse)) parse++;
809                         }
810                         if (*parse != '\0') {
811                                 while(isspace(*parse)) parse++;
812                                 numres = simple_strtoul(parse, &parse, 0);
813                                 while(isspace(*parse)) parse++;
814                         }
815                         if (*parse != '\0') {
816                                 while(isspace(*parse)) parse++;
817                                 numext = simple_strtoul(parse, &parse, 0);
818                                 while(isspace(*parse)) parse++;
819                         }
820                 }
821
822                 err = ldlm_regression_start(obddev, conn, numthreads,
823                                 numheld, numres, numext);
824
825                 CERROR("-- done err %d\n", err);
826                 GOTO(out, err);
827         }
828         case IOC_LDLM_REGRESS_STOP: {
829                 err = ldlm_regression_stop();
830                 CERROR("-- done err %d\n", err);
831                 GOTO(out, err);
832         }
833         case IOC_OSC_REGISTER_LOV: {
                /* Only one containing LOV may be registered; here karg is
                 * the LOV's obd_device, not an obd_ioctl_data. */
834                 if (obddev->u.cli.cl_containing_lov)
835                         GOTO(out, err = -EALREADY);
836                 obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
837                 GOTO(out, err);
838         }
839         case OBD_IOC_LOV_GET_CONFIG: {
840                 char *buf;
841                 struct lov_desc *desc;
842                 obd_uuid_t *uuidp;
843
                /* The ioctl data is fetched afresh from uarg into buf; the
                 * incoming len and data values are overwritten.  buf is
                 * released with OBD_FREE(buf, len) on every path below. */
844                 buf = NULL;
845                 len = 0;
846                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
847                         GOTO(out, err = -EINVAL);
848
849                 data = (struct obd_ioctl_data *)buf;
850
                /* inlbuf1 must be able to hold a lov_desc and inlbuf2 one
                 * uuid. */
851                 if (sizeof(*desc) > data->ioc_inllen1) {
852                         OBD_FREE(buf, len);
853                         GOTO(out, err = -EINVAL);
854                 }
855
856                 if (data->ioc_inllen2 < sizeof(*uuidp)) {
857                         OBD_FREE(buf, len);
858                         GOTO(out, err = -EINVAL);
859                 }
860
                /* Describe this OSC as a configuration with exactly one
                 * target, identified by this device's uuid. */
861                 desc = (struct lov_desc *)data->ioc_inlbuf1;
862                 desc->ld_tgt_count = 1;
863                 desc->ld_active_tgt_count = 1;
864                 desc->ld_default_stripe_count = 1;
865                 desc->ld_default_stripe_size = 0;
866                 desc->ld_default_stripe_offset = 0;
867                 desc->ld_pattern = 0;
868                 memcpy(desc->ld_uuid,  obddev->obd_uuid, sizeof(*uuidp));
869
870                 uuidp = (obd_uuid_t *)data->ioc_inlbuf2;
871                 memcpy(uuidp,  obddev->obd_uuid, sizeof(*uuidp));
872
                /* Any non-zero result from copy_to_user() is reported as
                 * -EFAULT. */
873                 err = copy_to_user((void *)uarg, buf, len);
874                 if (err)
875                         err = -EFAULT;
876                 OBD_FREE(buf, len);
877                 GOTO(out, err);
878         }
879         default:
880                 GOTO(out, err = -ENOTTY);
881         }
882 out:
883         return err;
884 }
885
886 static void set_osc_active(struct obd_import *imp, int active)
887 {
888         struct obd_device *notify_obd = imp->imp_obd->u.cli.cl_containing_lov;
889
890         if (notify_obd == NULL)
891                 return;
892
893         /* How gross is _this_? */
894         if (!list_empty(&notify_obd->obd_exports)) {
895                 int rc;
896                 struct lustre_handle fakeconn;
897                 struct obd_ioctl_data ioc_data;
898                 struct obd_export *exp =
899                         list_entry(notify_obd->obd_exports.next,
900                                    struct obd_export, exp_obd_chain);
901
902                 fakeconn.addr = (__u64)(unsigned long)exp;
903                 fakeconn.cookie = exp->exp_cookie;
904                 ioc_data.ioc_inlbuf1 = imp->imp_obd->obd_uuid;
905                 ioc_data.ioc_offset = active;
906                 rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn,
907                                    sizeof ioc_data, &ioc_data, NULL);
908                 if (rc)
909                         CERROR("disabling %s on LOV %p/%s: %d\n",
910                                imp->imp_obd->obd_uuid, notify_obd,
911                                notify_obd->obd_uuid, rc);
912         } else {
913                 CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about "
914                        "%p\n", notify_obd, notify_obd->obd_uuid,
915                        imp->imp_obd->obd_uuid);
916         }
917 }
918
919
920 /* XXX looks a lot like super.c:invalidate_request_list, don't it? */
921 static void abort_inflight_for_import(struct obd_import *imp)
922 {
923         struct list_head *tmp, *n;
924
925         /* Make sure that no new requests get processed for this import.
926          * ptlrpc_queue_wait must (and does) hold imp_lock while testing this
927          * flag and then putting requests on sending_list or delayed_list.
928          */
929         spin_lock(&imp->imp_lock);
930         imp->imp_flags |= IMP_INVALID;
931         spin_unlock(&imp->imp_lock);
932
933         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
934                 struct ptlrpc_request *req =
935                         list_entry(tmp, struct ptlrpc_request, rq_list);
936
937                 DEBUG_REQ(D_HA, req, "inflight");
938                 req->rq_flags |= PTL_RPC_FL_ERR;
939                 wake_up(&req->rq_wait_for_rep);
940         }
941
942         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
943                 struct ptlrpc_request *req =
944                         list_entry(tmp, struct ptlrpc_request, rq_list);
945
946                 DEBUG_REQ(D_HA, req, "aborting waiting req");
947                 req->rq_flags |= PTL_RPC_FL_ERR;
948                 wake_up(&req->rq_wait_for_rep);
949         }
950 }
951
952 static int osc_recover(struct obd_import *imp, int phase)
953 {
954         int rc;
955         ENTRY;
956
957         switch(phase) {
958             case PTLRPC_RECOVD_PHASE_PREPARE: {
959                 struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
960                 ldlm_namespace_cleanup(ns, 1 /* no network ops */);
961                 abort_inflight_for_import(imp);
962                 set_osc_active(imp, 0 /* inactive */);
963                 RETURN(0);
964             }
965             case PTLRPC_RECOVD_PHASE_RECOVER:
966                 rc = ptlrpc_reconnect_import(imp, OST_CONNECT);
967                 if (rc)
968                         RETURN(rc);
969                 set_osc_active(imp, 1 /* active */);
970                 RETURN(0);
971             default:
972                 RETURN(-EINVAL);
973         }
974 }
975
976 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd,
977                        obd_uuid_t cluuid, struct recovd_obd *recovd,
978                        ptlrpc_recovery_cb_t recover)
979 {
980         struct obd_import *imp = &obd->u.cli.cl_import;
981         imp->imp_recover = osc_recover;
982         return client_obd_connect(conn, obd, cluuid, recovd, recover);
983 }
984
985 struct obd_ops osc_obd_ops = {
986         o_attach:       osc_attach,
987         o_detach:       osc_detach,
988         o_setup:        client_obd_setup,
989         o_cleanup:      client_obd_cleanup,
990         o_connect:      osc_connect,
991         o_disconnect:   client_obd_disconnect,
992         o_statfs:       osc_statfs,
993         o_packmd:       osc_packmd,
994         o_unpackmd:     osc_unpackmd,
995         o_create:       osc_create,
996         o_destroy:      osc_destroy,
997         o_getattr:      osc_getattr,
998         o_setattr:      osc_setattr,
999         o_open:         osc_open,
1000         o_close:        osc_close,
1001         o_brw:          osc_brw,
1002         o_punch:        osc_punch,
1003         o_enqueue:      osc_enqueue,
1004         o_cancel:       osc_cancel,
1005         o_cancel_unused: osc_cancel_unused,
1006         o_iocontrol:    osc_iocontrol
1007 };
1008
1009 static int __init osc_init(void)
1010 {
1011         RETURN(class_register_type(&osc_obd_ops, status_class_var,
1012                                    LUSTRE_OSC_NAME));
1013 }
1014
1015 static void __exit osc_exit(void)
1016 {
1017         class_unregister_type(LUSTRE_OSC_NAME);
1018 }
1019
1020 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1021 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
1022 MODULE_LICENSE("GPL");
1023
1024 module_init(osc_init);
1025 module_exit(osc_exit);